From 0465243c710ede74a78885077140d8673efbc647 Mon Sep 17 00:00:00 2001 From: Determinant Date: Fri, 20 Jul 2018 20:08:59 -0400 Subject: improve msg & msg network interface --- include/salticidae/network.h | 263 +++++++++++++++++++++++++++---------------- 1 file changed, 164 insertions(+), 99 deletions(-) (limited to 'include/salticidae/network.h') diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 0ea7455..1e0f560 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -32,16 +32,51 @@ namespace salticidae { +template +inline auto handler_bind(ReturnType(ClassType::* f)(Args...), FArgs&&... fargs) { + return std::function(std::bind(f, std::forward(fargs)...)); +} + /** Network of nodes who can send async messages. */ -template +template class MsgNetwork: public ConnPool { public: + using Msg = MsgBase; + /* match lambdas */ + template + struct callback_traits: + public callback_traits {}; + + /* match plain functions */ + template + struct callback_traits { + using ret_type = ReturnType; + using conn_type = ConnType; + using msg_type = typename std::remove_reference::type; + }; + + /* match function pointers */ + template + struct callback_traits: + public callback_traits {}; + + /* match const member functions */ + template + struct callback_traits: + public callback_traits {}; + + /* match member functions */ + template + struct callback_traits: + public callback_traits {}; + class Conn: public ConnPool::Conn { enum MsgState { HEADER, PAYLOAD }; - MsgType msg; + + Msg msg; MsgState msg_state; MsgNetwork *mn; @@ -74,13 +109,13 @@ class MsgNetwork: public ConnPool { }; using conn_t = RcObj; - using msg_callback_t = std::function; + using msg_callback_t = std::function; #ifdef SALTICIDAE_MSG_STAT class msg_stat_by_opcode_t: - public std::unordered_map> { public: - void add(const MsgType &msg) { + void add(const Msg &msg) { auto &p = this->operator[](msg.get_opcode()); p.first++; p.second += msg.get_length(); @@ -89,7 +124,7 @@ class MsgNetwork: public ConnPool { #endif private: - std::unordered_map handler_map; protected: @@ -101,6 +136,7 @@ class MsgNetwork: public ConnPool { ConnPool::conn_t create_conn() override { return (new Conn(this))->self(); } public: + MsgNetwork(const EventContext &eb, int max_listen_backlog, double conn_server_timeout, @@ -109,7 +145,18 @@ class MsgNetwork: public ConnPool { conn_server_timeout, seg_buff_size) {} - void reg_handler(typename MsgType::opcode_t opcode, msg_callback_t handler); + template + typename std::enable_if::msg_type, DataStream &&>::value>::type + reg_handler(Func handler) { + using callback_t = callback_traits; + handler_map[callback_t::msg_type::opcode] = [handler](const Msg &msg, conn_t conn) { + handler(typename callback_t::msg_type(msg.get_payload()), + static_pointer_cast(conn)); + }; + } + + template void send_msg(const MsgType &msg, conn_t conn); using ConnPool::listen; #ifdef SALTICIDAE_MSG_STAT @@ -126,9 +173,10 @@ class MsgNetwork: public ConnPool { }; /** Simple network that handles client-server requests. */ -template -class ClientNetwork: public MsgNetwork { - using MsgNet = MsgNetwork; +template +class ClientNetwork: public MsgNetwork { + using MsgNet = MsgNetwork; + using Msg = typename MsgNet::Msg; std::unordered_map addr2conn; public: @@ -159,6 +207,7 @@ class ClientNetwork: public MsgNetwork { conn_server_timeout, seg_buff_size) {} + template void send_msg(const MsgType &msg, const NetAddr &addr); conn_t connect(const NetAddr &addr) = delete; }; @@ -169,9 +218,12 @@ class PeerNetworkError: public SalticidaeError { /** Peer-to-peer network where any two nodes could hold a bi-diretional message * channel, established by either side. */ -template -class PeerNetwork: public MsgNetwork { - using MsgNet= MsgNetwork; +template +class PeerNetwork: public MsgNetwork { + using MsgNet = MsgNetwork; + using Msg = typename MsgNet::Msg; public: enum IdentityMode { IP_BASED, @@ -244,8 +296,34 @@ class PeerNetwork: public MsgNetwork { double conn_timeout; uint16_t listen_port; - void msg_ping(const MsgType &msg, ConnPool::conn_t conn); - void msg_pong(const MsgType &msg, ConnPool::conn_t conn); + struct MsgPing { + static const OpcodeType opcode; + DataStream serialized; + uint16_t port; + MsgPing(uint16_t port) { + serialized << htole(port); + } + MsgPing(DataStream &&s) { + s >> port; + port = letoh(port); + } + }; + + struct MsgPong { + static const OpcodeType opcode; + DataStream serialized; + uint16_t port; + MsgPong(uint16_t port) { + serialized << htole(port); + } + MsgPong(DataStream &&s) { + s >> port; + port = letoh(port); + } + }; + + void msg_ping(MsgPing &&msg, conn_t conn); + void msg_pong(MsgPong &&msg, conn_t conn); void reset_conn_timeout(conn_t conn); bool check_new_conn(conn_t conn, uint16_t port); void start_active_conn(const NetAddr &paddr); @@ -275,7 +353,9 @@ class PeerNetwork: public MsgNetwork { void add_peer(const NetAddr &paddr); const conn_t get_peer_conn(const NetAddr &paddr) const; + template void send_msg(const MsgType &msg, const Peer *peer); + template void send_msg(const MsgType &msg, const NetAddr &paddr); void listen(NetAddr listen_addr); bool has_peer(const NetAddr &paddr) const; @@ -285,18 +365,18 @@ class PeerNetwork: public MsgNetwork { } }; -template -void MsgNetwork::Conn::on_read() { +template +void MsgNetwork::Conn::on_read() { auto &recv_buffer = read(); auto conn = static_pointer_cast(self()); while (get_fd() != -1) { if (msg_state == Conn::HEADER) { - if (recv_buffer.size() < MsgType::header_size) break; + if (recv_buffer.size() < Msg::header_size) break; /* new header available */ - bytearray_t data = recv_buffer.pop(MsgType::header_size); - msg = MsgType(data.data()); + bytearray_t data = recv_buffer.pop(Msg::header_size); + msg = Msg(data.data()); msg_state = Conn::PAYLOAD; } if (msg_state == Conn::PAYLOAD) @@ -331,8 +411,8 @@ void MsgNetwork::Conn::on_read() { } } -template -void PeerNetwork::Peer::reset_conn(conn_t new_conn) { +template +void PeerNetwork::Peer::reset_conn(conn_t new_conn) { if (conn != new_conn) { if (conn) @@ -348,23 +428,21 @@ void PeerNetwork::Peer::reset_conn(conn_t new_conn) { clear_all_events(); } -template -void PeerNetwork::Conn::on_setup() { +template +void PeerNetwork::Conn::on_setup() { assert(!ev_timeout); ev_timeout = Event(pn->eb, -1, 0, [this](evutil_socket_t, short) { SALTICIDAE_LOG_INFO("peer ping-pong timeout"); this->terminate(); }); /* the initial ping-pong to set up the connection */ - MsgType ping; - ping.gen_ping(pn->listen_port); auto conn = static_pointer_cast(this->self()); pn->reset_conn_timeout(conn); - pn->MsgNet::send_msg(ping, conn); + pn->MsgNet::send_msg(MsgPing(pn->listen_port), conn); } -template -void PeerNetwork::Conn::on_teardown() { +template +void PeerNetwork::Conn::on_teardown() { auto it = pn->id2peer.find(peer_id); if (it == pn->id2peer.end()) return; auto p = it->second.get(); @@ -383,8 +461,8 @@ void PeerNetwork::Conn::on_teardown() { p->ev_retry_timer.add_with_timeout(pn->gen_conn_timeout()); } -template -bool PeerNetwork::check_new_conn(conn_t conn, uint16_t port) { +template +bool PeerNetwork::check_new_conn(conn_t conn, uint16_t port) { if (conn->peer_id.is_null()) { /* passive connections can eventually have ids after getting the port number in IP_BASED_PORT mode */ @@ -412,21 +490,18 @@ bool PeerNetwork::check_new_conn(conn_t conn, uint16_t port) { return false; } -template -void PeerNetwork::msg_ping(const MsgType &msg, ConnPool::conn_t conn_) { +template +void PeerNetwork::msg_ping(MsgPing &&msg, conn_t conn_) { auto conn = static_pointer_cast(conn_); - uint16_t port; - msg.parse_ping(port); + uint16_t port = msg.port; SALTICIDAE_LOG_INFO("ping from %s, port %u", std::string(*conn).c_str(), ntohs(port)); if (check_new_conn(conn, port)) return; auto p = id2peer.find(conn->peer_id)->second.get(); - MsgType pong; - pong.gen_pong(this->listen_port); - send_msg(pong, p); + send_msg(MsgPong(this->listen_port), p); } -template -void PeerNetwork::msg_pong(const MsgType &msg, ConnPool::conn_t conn_) { +template +void PeerNetwork::msg_pong(MsgPong &&msg, conn_t conn_) { auto conn = static_pointer_cast(conn_); auto it = id2peer.find(conn->peer_id); if (it == id2peer.end()) @@ -435,8 +510,7 @@ void PeerNetwork::msg_pong(const MsgType &msg, ConnPool::conn_t conn_) return; } auto p = it->second.get(); - uint16_t port; - msg.parse_pong(port); + uint16_t port = msg.port; if (check_new_conn(conn, port)) return; p->pong_msg_ok = true; if (p->ping_timer_ok) @@ -446,18 +520,16 @@ void PeerNetwork::msg_pong(const MsgType &msg, ConnPool::conn_t conn_) } } -template -void PeerNetwork::listen(NetAddr listen_addr) { +template +void PeerNetwork::listen(NetAddr listen_addr) { MsgNet::listen(listen_addr); listen_port = listen_addr.port; - this->reg_handler(MsgType::OPCODE_PING, - std::bind(&PeerNetwork::msg_ping, this, _1, _2)); - this->reg_handler(MsgType::OPCODE_PONG, - std::bind(&PeerNetwork::msg_pong, this, _1, _2)); + this->reg_handler(handler_bind(&PeerNetwork::msg_ping, this, _1, _2)); + this->reg_handler(handler_bind(&PeerNetwork::msg_pong, this, _1, _2)); } -template -void PeerNetwork::start_active_conn(const NetAddr &addr) { +template +void PeerNetwork::start_active_conn(const NetAddr &addr) { auto p = id2peer.find(addr)->second.get(); if (p->connected) return; auto conn = static_pointer_cast(connect(addr)); @@ -468,8 +540,8 @@ void PeerNetwork::start_active_conn(const NetAddr &addr) { conn->peer_id.port = 0; } -template -void PeerNetwork::add_peer(const NetAddr &addr) { +template +void PeerNetwork::add_peer(const NetAddr &addr) { auto it = id2peer.find(addr); if (it != id2peer.end()) throw PeerNetworkError("peer already exists"); @@ -478,28 +550,26 @@ void PeerNetwork::add_peer(const NetAddr &addr) { start_active_conn(addr); } -template -const typename PeerNetwork::conn_t -PeerNetwork::get_peer_conn(const NetAddr &paddr) const { +template +const typename PeerNetwork::conn_t +PeerNetwork::get_peer_conn(const NetAddr &paddr) const { auto it = id2peer.find(paddr); if (it == id2peer.end()) throw PeerNetworkError("peer does not exist"); return it->second->conn; } -template -bool PeerNetwork::has_peer(const NetAddr &paddr) const { +template +bool PeerNetwork::has_peer(const NetAddr &paddr) const { return id2peer.count(paddr); } +template template -void MsgNetwork::reg_handler(typename MsgType::opcode_t opcode, - msg_callback_t handler) { - handler_map[opcode] = handler; -} - -template -void MsgNetwork::send_msg(const MsgType &msg, conn_t conn) { +void MsgNetwork::send_msg(const MsgType &_msg, conn_t conn) { + Msg msg; + msg.set_opcode(MsgType::opcode); + msg.set_payload(std::move(_msg.serialized)); bytearray_t msg_data = msg.serialize(); SALTICIDAE_LOG_DEBUG("wrote message %s to %s", std::string(msg).c_str(), @@ -511,65 +581,53 @@ void MsgNetwork::send_msg(const MsgType &msg, conn_t conn) { #endif } +template template -void PeerNetwork::send_msg(const MsgType &msg, const Peer *peer) { - bytearray_t msg_data = msg.serialize(); - SALTICIDAE_LOG_DEBUG("wrote message %s to %s", - std::string(msg).c_str(), - std::string(peer->addr).c_str()); +void PeerNetwork::send_msg(const MsgType &msg, const Peer *peer) { if (peer->connected) - { - SALTICIDAE_LOG_DEBUG("wrote to ConnPool"); - peer->conn->write(std::move(msg_data)); - } + MsgNet::send_msg(msg, peer->conn); else - { SALTICIDAE_LOG_DEBUG("dropped"); - } -#ifdef SALTICIDAE_MSG_STAT - peer->conn->nsent++; - this->sent_by_opcode.add(msg); -#endif } +template template -void PeerNetwork::send_msg(const MsgType &msg, const NetAddr &addr) { +void PeerNetwork::send_msg(const MsgType &msg, const NetAddr &addr) { auto it = id2peer.find(addr); if (it == id2peer.end()) { - SALTICIDAE_LOG_ERROR("sending to non-existing peer: %s", std::string(addr).c_str()); + SALTICIDAE_LOG_ERROR("sending to non-existing peer: %s", + std::string(addr).c_str()); throw PeerNetworkError("peer does not exist"); } send_msg(msg, it->second.get()); } -template -void PeerNetwork::Peer::reset_ping_timer() { +template +void PeerNetwork::Peer::reset_ping_timer() { assert(ev_ping_timer); ev_ping_timer.del(); ev_ping_timer.add_with_timeout(gen_rand_timeout(pn->ping_period)); } -template -void PeerNetwork::reset_conn_timeout(conn_t conn) { +template +void PeerNetwork::reset_conn_timeout(conn_t conn) { assert(conn->ev_timeout); conn->ev_timeout.del(); conn->ev_timeout.add_with_timeout(conn_timeout); SALTICIDAE_LOG_INFO("reset timeout %.2f", conn_timeout); } -template -void PeerNetwork::Peer::send_ping() { +template +void PeerNetwork::Peer::send_ping() { ping_timer_ok = false; pong_msg_ok = false; - MsgType ping; - ping.gen_ping(pn->listen_port); pn->reset_conn_timeout(conn); - pn->send_msg(ping, this); + pn->send_msg(MsgPing(pn->listen_port), this); } -template -void PeerNetwork::Peer::ping_timer(evutil_socket_t, short) { +template +void PeerNetwork::Peer::ping_timer(evutil_socket_t, short) { ping_timer_ok = true; if (pong_msg_ok) { @@ -578,13 +636,13 @@ void PeerNetwork::Peer::ping_timer(evutil_socket_t, short) { } } -template -const std::vector &PeerNetwork::all_peers() const { +template +const std::vector &PeerNetwork::all_peers() const { return peer_list; } -template -void ClientNetwork::Conn::on_setup() { +template +void ClientNetwork::Conn::on_setup() { assert(this->get_mode() == Conn::PASSIVE); const auto &addr = this->get_addr(); cn->addr2conn.erase(addr); @@ -593,19 +651,26 @@ void ClientNetwork::Conn::on_setup() { static_pointer_cast(this->self()))); } -template -void ClientNetwork::Conn::on_teardown() { +template +void ClientNetwork::Conn::on_teardown() { assert(this->get_mode() == Conn::PASSIVE); cn->addr2conn.erase(this->get_addr()); } +template template -void ClientNetwork::send_msg(const MsgType &msg, const NetAddr &addr) { +void ClientNetwork::send_msg(const MsgType &msg, const NetAddr &addr) { auto it = addr2conn.find(addr); if (it == addr2conn.end()) return; MsgNet::send_msg(msg, it->second); } +template +const O PeerNetwork::MsgPing::opcode = OPCODE_PING; + +template +const O PeerNetwork::MsgPong::opcode = OPCODE_PONG; + } #endif -- cgit v1.2.3