diff options
Diffstat (limited to 'include')
-rw-r--r-- | include/salticidae/conn.h | 37 | ||||
-rw-r--r-- | include/salticidae/network.h | 55 |
2 files changed, 72 insertions, 20 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index 56d2d6a..a594da6 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -69,10 +69,11 @@ class ConnPool { ACTIVE, /**< the connection is established by connect() */ PASSIVE, /**< the connection is established by accept() */ }; - + protected: std::atomic<bool> terminated; size_t seg_buff_size; + size_t max_recv_buff_size; int fd; Worker *worker; ConnPool *cpool; @@ -86,13 +87,14 @@ class ConnPool { FdEvent ev_socket; /** does not need to wait if true */ bool ready_send; + bool ready_recv; typedef void (socket_io_func)(const conn_t &, int, int); socket_io_func *send_data_func; socket_io_func *recv_data_func; BoxObj<TLS> tls; BoxObj<const X509> peer_cert; - + static socket_io_func _recv_data; static socket_io_func _send_data; @@ -107,12 +109,13 @@ class ConnPool { virtual void stop(); public: - Conn(): terminated(false), worker(nullptr), ready_send(false), + Conn(): terminated(false), worker(nullptr), + ready_send(false), ready_recv(false), send_data_func(nullptr), recv_data_func(nullptr), tls(nullptr), peer_cert(nullptr) {} Conn(const Conn &) = delete; Conn(Conn &&other) = delete; - + virtual ~Conn() { SALTICIDAE_LOG_INFO("destroyed %s", std::string(*this).c_str()); } @@ -139,6 +142,7 @@ class ConnPool { }; protected: + int system_state; EventContext ec; EventContext disp_ec; ThreadCall* disp_tcall; @@ -178,6 +182,7 @@ class ConnPool { const int max_listen_backlog; const double conn_server_timeout; const size_t seg_buff_size; + const size_t max_recv_buff_size; const size_t queue_capacity; tls_context_t tls_ctx; @@ -230,7 +235,12 @@ class ConnPool { /* the following functions are called by the dispatcher */ void start() { - handle = std::thread([this]() { ec.dispatch(); }); + handle = std::thread([this]() { + sigset_t mask; + sigfillset(&mask); + pthread_sigmask(SIG_BLOCK, &mask, NULL); + ec.dispatch(); + }); } void enable_send_buffer(const conn_t &conn, int client_fd) { @@ -240,7 +250,8 @@ class ConnPool { if (conn->ready_send) { conn->ev_socket.del(); - conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE); + conn->ev_socket.add((conn->ready_recv ? 0 : FdEvent::READ) | + FdEvent::WRITE); conn->send_data_func(conn, client_fd, FdEvent::WRITE); } return false; @@ -318,7 +329,6 @@ class ConnPool { /* related to workers */ size_t nworker; salticidae::BoxObj<Worker[]> workers; - int system_state; void accept_client(int, int); void conn_server(const conn_t &conn, int, int); @@ -348,6 +358,7 @@ class ConnPool { int _max_listen_backlog; double _conn_server_timeout; size_t _seg_buff_size; + size_t _max_recv_buff_size; size_t _nworker; size_t _queue_capacity; bool _enable_tls; @@ -363,6 +374,7 @@ class ConnPool { _max_listen_backlog(10), _conn_server_timeout(2), _seg_buff_size(4096), + _max_recv_buff_size(4096), _nworker(1), _queue_capacity(0), _enable_tls(false), @@ -388,6 +400,11 @@ class ConnPool { return *this; } + Config &max_recv_buff_size(size_t x) { + _max_recv_buff_size = x; + return *this; + } + Config &nworker(size_t x) { _nworker = std::max((size_t)1, x); return *this; @@ -435,17 +452,17 @@ class ConnPool { }; ConnPool(const EventContext &ec, const Config &config): - ec(ec), + system_state(0), ec(ec), enable_tls(config._enable_tls), async_id(0), max_listen_backlog(config._max_listen_backlog), conn_server_timeout(config._conn_server_timeout), seg_buff_size(config._seg_buff_size), + max_recv_buff_size(config._max_recv_buff_size), queue_capacity(config._queue_capacity), tls_ctx(nullptr), listen_fd(-1), - nworker(config._nworker), - system_state(0) { + nworker(config._nworker) { if (enable_tls) { tls_ctx = new TLSContext(); diff --git a/include/salticidae/network.h b/include/salticidae/network.h index fd69b58..6f7a034 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -45,7 +45,7 @@ class MsgNetwork: public ConnPool { template<typename T> struct callback_traits: public callback_traits<decltype(&T::operator())> {}; - + /* match plain functions */ template<typename ReturnType, typename MsgType, typename ConnType> struct callback_traits<ReturnType(MsgType, ConnType)> { @@ -53,17 +53,17 @@ class MsgNetwork: public ConnPool { using conn_type = typename std::remove_reference<ConnType>::type::type; using msg_type = typename std::remove_reference<MsgType>::type; }; - + /* match function pointers */ template<typename ReturnType, typename... Args> struct callback_traits<ReturnType(*)(Args...)>: public callback_traits<ReturnType(Args...)> {}; - + /* match const member functions */ template<typename ClassType, typename ReturnType, typename... Args> struct callback_traits<ReturnType(ClassType::*)(Args...) const>: public callback_traits<ReturnType(Args...)> {}; - + /* match member functions */ template<typename ClassType, typename ReturnType, typename... Args> struct callback_traits<ReturnType(ClassType::*)(Args...)>: @@ -78,6 +78,8 @@ class MsgNetwork: public ConnPool { Msg msg; MsgState msg_state; + bool msg_sleep; + TimerEvent ev_enqueue_poll; protected: #ifdef SALTICIDAE_MSG_STAT @@ -88,7 +90,7 @@ class MsgNetwork: public ConnPool { #endif public: - Conn(): msg_state(HEADER) + Conn(): msg_state(HEADER), msg_sleep(false) #ifdef SALTICIDAE_MSG_STAT , nsent(0), nrecv(0), nsentb(0), nrecvb(0) #endif @@ -128,6 +130,26 @@ class MsgNetwork: public ConnPool { ConnPool::Conn *create_conn() override { return new Conn(); } void on_read(const ConnPool::conn_t &) override; + void on_setup(const ConnPool::conn_t &_conn) override { + auto conn = static_pointer_cast<Conn>(_conn); + conn->ev_enqueue_poll = TimerEvent(conn->worker->get_ec(), + [this, conn](TimerEvent &) { + if (!incoming_msgs.enqueue(std::make_pair(conn->msg, conn), false)) + { + conn->msg_sleep = true; + conn->ev_enqueue_poll.add(0); + return; + } + conn->msg_sleep = false; + on_read(conn); + }); + } + + void on_teardown(const ConnPool::conn_t &_conn) override { + auto conn = static_pointer_cast<Conn>(_conn); + conn->ev_enqueue_poll.clear(); + } + public: class Config: public ConnPool::Config { @@ -153,7 +175,7 @@ class MsgNetwork: public ConnPool { incoming_msgs.reg_handler(ec, [this, burst_size=config._burst_size](queue_t &q) { std::pair<Msg, conn_t> item; size_t cnt = 0; - while (q.try_dequeue(item)) + while (q.try_dequeue(item) && this->system_state == 1) { auto &msg = item.first; auto &conn = item.second; @@ -525,8 +547,9 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { /* this callback is run by a worker */ template<typename OpcodeType> void MsgNetwork<OpcodeType>::on_read(const ConnPool::conn_t &_conn) { - ConnPool::on_read(_conn); auto conn = static_pointer_cast<Conn>(_conn); + if (conn->msg_sleep) return; + ConnPool::on_read(_conn); auto &recv_buffer = conn->recv_buffer; auto &msg = conn->msg; auto &msg_state = conn->msg_state; @@ -550,13 +573,25 @@ void MsgNetwork<OpcodeType>::on_read(const ConnPool::conn_t &_conn) { if (!msg.verify_checksum()) { SALTICIDAE_LOG_WARN("checksums do not match, dropping the message"); - return; + break; } #endif - while (!incoming_msgs.enqueue(std::make_pair(msg, conn), true)) - std::this_thread::yield(); + if (!incoming_msgs.enqueue(std::make_pair(msg, conn), false)) + { + conn->msg_sleep = true; + conn->ev_enqueue_poll.add(0); + return; + } } } + if (conn->ready_recv && recv_buffer.size() < conn->max_recv_buff_size) + { + /* resume reading from socket */ + conn->ev_socket.del(); + conn->ev_socket.add(FdEvent::READ | + (conn->ready_send ? 0: FdEvent::WRITE)); + conn->send_data_func(conn, conn->fd, FdEvent::READ); + } } template<typename OpcodeType> |