diff options
Diffstat (limited to 'include/salticidae/network.h')
-rw-r--r-- | include/salticidae/network.h | 64 |
1 files changed, 41 insertions, 23 deletions
diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 4e966d1..d82772f 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -103,7 +103,7 @@ class MsgNetwork: public ConnPool { void on_read() override; }; - using conn_t = RcObj<Conn>; + using conn_t = ArcObj<Conn>; #ifdef SALTICIDAE_MSG_STAT class msg_stat_by_opcode_t: public std::unordered_map<typename Msg::opcode_t, @@ -121,6 +121,8 @@ class MsgNetwork: public ConnPool { std::unordered_map< typename Msg::opcode_t, std::function<void(const Msg &msg, Conn &)>> handler_map; + using queue_t = MPSCQueueEventDriven<std::pair<Msg, conn_t>>; + queue_t incoming_msgs; protected: #ifdef SALTICIDAE_MSG_STAT @@ -134,10 +136,38 @@ class MsgNetwork: public ConnPool { MsgNetwork(const EventContext &ec, int max_listen_backlog, double conn_server_timeout, - size_t seg_buff_size): + size_t seg_buff_size, + size_t burst_size = 1000): ConnPool(ec, max_listen_backlog, conn_server_timeout, - seg_buff_size) {} + seg_buff_size) { + incoming_msgs.reg_handler(ec, [this, burst_size](queue_t &q) { + std::pair<Msg, conn_t> item; + size_t cnt = 0; + while (q.try_dequeue(item)) + { + auto &msg = item.first; + auto &conn = item.second; + auto it = handler_map.find(msg.get_opcode()); + if (it == handler_map.end()) + SALTICIDAE_LOG_WARN("unknown opcode: %s", + get_hex(msg.get_opcode()).c_str()); + else /* call the handler */ + { + SALTICIDAE_LOG_DEBUG("got message %s from %s", + std::string(msg).c_str(), + std::string(*conn).c_str()); + it->second(msg, *conn); +#ifdef SALTICIDAE_MSG_STAT + conn->nrecv++; + recv_by_opcode.add(msg); +#endif + } + if (++cnt == burst_size) return true; + } + return false; + }); + } template<typename Func> typename std::enable_if<std::is_constructible< @@ -189,7 +219,7 @@ class ClientNetwork: public MsgNetwork<OpcodeType> { void on_teardown() override; }; - using conn_t = RcObj<Conn>; + using conn_t = ArcObj<Conn>; protected: ConnPool::Conn *create_conn() override { return new Conn(); } @@ -249,7 +279,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { void on_teardown() override; }; - using conn_t = RcObj<Conn>; + using conn_t = ArcObj<Conn>; private: struct Peer { @@ -361,10 +391,11 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { } }; +/* this callback is run by a worker */ template<typename OpcodeType> void MsgNetwork<OpcodeType>::Conn::on_read() { ConnPool::Conn::on_read(); - auto &recv_buffer = read(); + auto &recv_buffer = get_recv_buffer(); auto mn = get_net(); while (get_fd() != -1) { @@ -389,21 +420,8 @@ void MsgNetwork<OpcodeType>::Conn::on_read() { return; } #endif - auto it = mn->handler_map.find(msg.get_opcode()); - if (it == mn->handler_map.end()) - SALTICIDAE_LOG_WARN("unknown opcode: %s", - get_hex(msg.get_opcode()).c_str()); - else /* call the handler */ - { - SALTICIDAE_LOG_DEBUG("got message %s from %s", - std::string(msg).c_str(), - std::string(*this).c_str()); - it->second(msg, *this); -#ifdef SALTICIDAE_MSG_STAT - nrecv++; - mn->recv_by_opcode.add(msg); -#endif - } + mn->incoming_msgs.enqueue( + std::make_pair(std::move(msg), static_pointer_cast<Conn>(self()))); } } } @@ -414,8 +432,8 @@ void PeerNetwork<O, _, __>::Peer::reset_conn(conn_t new_conn) { { if (conn) { - SALTICIDAE_LOG_DEBUG("moving send buffer"); - new_conn->move_send_buffer(conn); + //SALTICIDAE_LOG_DEBUG("moving send buffer"); + //new_conn->move_send_buffer(conn); SALTICIDAE_LOG_INFO("terminating old connection %s", std::string(*conn).c_str()); conn->terminate(); } |