aboutsummaryrefslogtreecommitdiff
path: root/include/salticidae/network.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/salticidae/network.h')
-rw-r--r--include/salticidae/network.h64
1 files changed, 41 insertions, 23 deletions
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index 4e966d1..d82772f 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -103,7 +103,7 @@ class MsgNetwork: public ConnPool {
void on_read() override;
};
- using conn_t = RcObj<Conn>;
+ using conn_t = ArcObj<Conn>;
#ifdef SALTICIDAE_MSG_STAT
class msg_stat_by_opcode_t:
public std::unordered_map<typename Msg::opcode_t,
@@ -121,6 +121,8 @@ class MsgNetwork: public ConnPool {
std::unordered_map<
typename Msg::opcode_t,
std::function<void(const Msg &msg, Conn &)>> handler_map;
+ using queue_t = MPSCQueueEventDriven<std::pair<Msg, conn_t>>;
+ queue_t incoming_msgs;
protected:
#ifdef SALTICIDAE_MSG_STAT
@@ -134,10 +136,38 @@ class MsgNetwork: public ConnPool {
MsgNetwork(const EventContext &ec,
int max_listen_backlog,
double conn_server_timeout,
- size_t seg_buff_size):
+ size_t seg_buff_size,
+ size_t burst_size = 1000):
ConnPool(ec, max_listen_backlog,
conn_server_timeout,
- seg_buff_size) {}
+ seg_buff_size) {
+ incoming_msgs.reg_handler(ec, [this, burst_size](queue_t &q) {
+ std::pair<Msg, conn_t> item;
+ size_t cnt = 0;
+ while (q.try_dequeue(item))
+ {
+ auto &msg = item.first;
+ auto &conn = item.second;
+ auto it = handler_map.find(msg.get_opcode());
+ if (it == handler_map.end())
+ SALTICIDAE_LOG_WARN("unknown opcode: %s",
+ get_hex(msg.get_opcode()).c_str());
+ else /* call the handler */
+ {
+ SALTICIDAE_LOG_DEBUG("got message %s from %s",
+ std::string(msg).c_str(),
+ std::string(*conn).c_str());
+ it->second(msg, *conn);
+#ifdef SALTICIDAE_MSG_STAT
+ conn->nrecv++;
+ recv_by_opcode.add(msg);
+#endif
+ }
+ if (++cnt == burst_size) return true;
+ }
+ return false;
+ });
+ }
template<typename Func>
typename std::enable_if<std::is_constructible<
@@ -189,7 +219,7 @@ class ClientNetwork: public MsgNetwork<OpcodeType> {
void on_teardown() override;
};
- using conn_t = RcObj<Conn>;
+ using conn_t = ArcObj<Conn>;
protected:
ConnPool::Conn *create_conn() override { return new Conn(); }
@@ -249,7 +279,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
void on_teardown() override;
};
- using conn_t = RcObj<Conn>;
+ using conn_t = ArcObj<Conn>;
private:
struct Peer {
@@ -361,10 +391,11 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
}
};
+/* this callback is run by a worker */
template<typename OpcodeType>
void MsgNetwork<OpcodeType>::Conn::on_read() {
ConnPool::Conn::on_read();
- auto &recv_buffer = read();
+ auto &recv_buffer = get_recv_buffer();
auto mn = get_net();
while (get_fd() != -1)
{
@@ -389,21 +420,8 @@ void MsgNetwork<OpcodeType>::Conn::on_read() {
return;
}
#endif
- auto it = mn->handler_map.find(msg.get_opcode());
- if (it == mn->handler_map.end())
- SALTICIDAE_LOG_WARN("unknown opcode: %s",
- get_hex(msg.get_opcode()).c_str());
- else /* call the handler */
- {
- SALTICIDAE_LOG_DEBUG("got message %s from %s",
- std::string(msg).c_str(),
- std::string(*this).c_str());
- it->second(msg, *this);
-#ifdef SALTICIDAE_MSG_STAT
- nrecv++;
- mn->recv_by_opcode.add(msg);
-#endif
- }
+ mn->incoming_msgs.enqueue(
+ std::make_pair(std::move(msg), static_pointer_cast<Conn>(self())));
}
}
}
@@ -414,8 +432,8 @@ void PeerNetwork<O, _, __>::Peer::reset_conn(conn_t new_conn) {
{
if (conn)
{
- SALTICIDAE_LOG_DEBUG("moving send buffer");
- new_conn->move_send_buffer(conn);
+ //SALTICIDAE_LOG_DEBUG("moving send buffer");
+ //new_conn->move_send_buffer(conn);
SALTICIDAE_LOG_INFO("terminating old connection %s", std::string(*conn).c_str());
conn->terminate();
}