From dd09443b0b3c0b5d1a8c034644d1065dd25bf5a9 Mon Sep 17 00:00:00 2001 From: Determinant Date: Sun, 11 Nov 2018 23:02:59 -0500 Subject: start debugging multiloops design --- include/salticidae/network.h | 64 ++++++++++++++++++++++++++++---------------- 1 file changed, 41 insertions(+), 23 deletions(-) (limited to 'include/salticidae/network.h') diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 4e966d1..d82772f 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -103,7 +103,7 @@ class MsgNetwork: public ConnPool { void on_read() override; }; - using conn_t = RcObj; + using conn_t = ArcObj; #ifdef SALTICIDAE_MSG_STAT class msg_stat_by_opcode_t: public std::unordered_map> handler_map; + using queue_t = MPSCQueueEventDriven>; + queue_t incoming_msgs; protected: #ifdef SALTICIDAE_MSG_STAT @@ -134,10 +136,38 @@ class MsgNetwork: public ConnPool { MsgNetwork(const EventContext &ec, int max_listen_backlog, double conn_server_timeout, - size_t seg_buff_size): + size_t seg_buff_size, + size_t burst_size = 1000): ConnPool(ec, max_listen_backlog, conn_server_timeout, - seg_buff_size) {} + seg_buff_size) { + incoming_msgs.reg_handler(ec, [this, burst_size](queue_t &q) { + std::pair item; + size_t cnt = 0; + while (q.try_dequeue(item)) + { + auto &msg = item.first; + auto &conn = item.second; + auto it = handler_map.find(msg.get_opcode()); + if (it == handler_map.end()) + SALTICIDAE_LOG_WARN("unknown opcode: %s", + get_hex(msg.get_opcode()).c_str()); + else /* call the handler */ + { + SALTICIDAE_LOG_DEBUG("got message %s from %s", + std::string(msg).c_str(), + std::string(*conn).c_str()); + it->second(msg, *conn); +#ifdef SALTICIDAE_MSG_STAT + conn->nrecv++; + recv_by_opcode.add(msg); +#endif + } + if (++cnt == burst_size) return true; + } + return false; + }); + } template typename std::enable_if { void on_teardown() override; }; - using conn_t = RcObj; + using conn_t = ArcObj; protected: ConnPool::Conn *create_conn() override { return new Conn(); } @@ -249,7 +279,7 @@ class PeerNetwork: public MsgNetwork { void on_teardown() override; }; - using conn_t = RcObj; + using conn_t = ArcObj; private: struct Peer { @@ -361,10 +391,11 @@ class PeerNetwork: public MsgNetwork { } }; +/* this callback is run by a worker */ template void MsgNetwork::Conn::on_read() { ConnPool::Conn::on_read(); - auto &recv_buffer = read(); + auto &recv_buffer = get_recv_buffer(); auto mn = get_net(); while (get_fd() != -1) { @@ -389,21 +420,8 @@ void MsgNetwork::Conn::on_read() { return; } #endif - auto it = mn->handler_map.find(msg.get_opcode()); - if (it == mn->handler_map.end()) - SALTICIDAE_LOG_WARN("unknown opcode: %s", - get_hex(msg.get_opcode()).c_str()); - else /* call the handler */ - { - SALTICIDAE_LOG_DEBUG("got message %s from %s", - std::string(msg).c_str(), - std::string(*this).c_str()); - it->second(msg, *this); -#ifdef SALTICIDAE_MSG_STAT - nrecv++; - mn->recv_by_opcode.add(msg); -#endif - } + mn->incoming_msgs.enqueue( + std::make_pair(std::move(msg), static_pointer_cast(self()))); } } } @@ -414,8 +432,8 @@ void PeerNetwork::Peer::reset_conn(conn_t new_conn) { { if (conn) { - SALTICIDAE_LOG_DEBUG("moving send buffer"); - new_conn->move_send_buffer(conn); + //SALTICIDAE_LOG_DEBUG("moving send buffer"); + //new_conn->move_send_buffer(conn); SALTICIDAE_LOG_INFO("terminating old connection %s", std::string(*conn).c_str()); conn->terminate(); } -- cgit v1.2.3