diff options
Diffstat (limited to 'include/salticidae/network.h')
-rw-r--r-- | include/salticidae/network.h | 55 |
1 files changed, 45 insertions, 10 deletions
diff --git a/include/salticidae/network.h b/include/salticidae/network.h index fd69b58..6f7a034 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -45,7 +45,7 @@ class MsgNetwork: public ConnPool { template<typename T> struct callback_traits: public callback_traits<decltype(&T::operator())> {}; - + /* match plain functions */ template<typename ReturnType, typename MsgType, typename ConnType> struct callback_traits<ReturnType(MsgType, ConnType)> { @@ -53,17 +53,17 @@ class MsgNetwork: public ConnPool { using conn_type = typename std::remove_reference<ConnType>::type::type; using msg_type = typename std::remove_reference<MsgType>::type; }; - + /* match function pointers */ template<typename ReturnType, typename... Args> struct callback_traits<ReturnType(*)(Args...)>: public callback_traits<ReturnType(Args...)> {}; - + /* match const member functions */ template<typename ClassType, typename ReturnType, typename... Args> struct callback_traits<ReturnType(ClassType::*)(Args...) const>: public callback_traits<ReturnType(Args...)> {}; - + /* match member functions */ template<typename ClassType, typename ReturnType, typename... Args> struct callback_traits<ReturnType(ClassType::*)(Args...)>: @@ -78,6 +78,8 @@ class MsgNetwork: public ConnPool { Msg msg; MsgState msg_state; + bool msg_sleep; + TimerEvent ev_enqueue_poll; protected: #ifdef SALTICIDAE_MSG_STAT @@ -88,7 +90,7 @@ class MsgNetwork: public ConnPool { #endif public: - Conn(): msg_state(HEADER) + Conn(): msg_state(HEADER), msg_sleep(false) #ifdef SALTICIDAE_MSG_STAT , nsent(0), nrecv(0), nsentb(0), nrecvb(0) #endif @@ -128,6 +130,26 @@ class MsgNetwork: public ConnPool { ConnPool::Conn *create_conn() override { return new Conn(); } void on_read(const ConnPool::conn_t &) override; + void on_setup(const ConnPool::conn_t &_conn) override { + auto conn = static_pointer_cast<Conn>(_conn); + conn->ev_enqueue_poll = TimerEvent(conn->worker->get_ec(), + [this, conn](TimerEvent &) { + if (!incoming_msgs.enqueue(std::make_pair(conn->msg, conn), false)) + { + conn->msg_sleep = true; + conn->ev_enqueue_poll.add(0); + return; + } + conn->msg_sleep = false; + on_read(conn); + }); + } + + void on_teardown(const ConnPool::conn_t &_conn) override { + auto conn = static_pointer_cast<Conn>(_conn); + conn->ev_enqueue_poll.clear(); + } + public: class Config: public ConnPool::Config { @@ -153,7 +175,7 @@ class MsgNetwork: public ConnPool { incoming_msgs.reg_handler(ec, [this, burst_size=config._burst_size](queue_t &q) { std::pair<Msg, conn_t> item; size_t cnt = 0; - while (q.try_dequeue(item)) + while (q.try_dequeue(item) && this->system_state == 1) { auto &msg = item.first; auto &conn = item.second; @@ -525,8 +547,9 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { /* this callback is run by a worker */ template<typename OpcodeType> void MsgNetwork<OpcodeType>::on_read(const ConnPool::conn_t &_conn) { - ConnPool::on_read(_conn); auto conn = static_pointer_cast<Conn>(_conn); + if (conn->msg_sleep) return; + ConnPool::on_read(_conn); auto &recv_buffer = conn->recv_buffer; auto &msg = conn->msg; auto &msg_state = conn->msg_state; @@ -550,13 +573,25 @@ void MsgNetwork<OpcodeType>::on_read(const ConnPool::conn_t &_conn) { if (!msg.verify_checksum()) { SALTICIDAE_LOG_WARN("checksums do not match, dropping the message"); - return; + break; } #endif - while (!incoming_msgs.enqueue(std::make_pair(msg, conn), true)) - std::this_thread::yield(); + if (!incoming_msgs.enqueue(std::make_pair(msg, conn), false)) + { + conn->msg_sleep = true; + conn->ev_enqueue_poll.add(0); + return; + } } } + if (conn->ready_recv && recv_buffer.size() < conn->max_recv_buff_size) + { + /* resume reading from socket */ + conn->ev_socket.del(); + conn->ev_socket.add(FdEvent::READ | + (conn->ready_send ? 0: FdEvent::WRITE)); + conn->send_data_func(conn, conn->fd, FdEvent::READ); + } } template<typename OpcodeType> |