diff options
Diffstat (limited to 'include/salticidae/network.h')
-rw-r--r-- | include/salticidae/network.h | 263 |
1 files changed, 164 insertions, 99 deletions
diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 0ea7455..1e0f560 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -32,16 +32,51 @@ namespace salticidae { +template<typename ClassType, typename ReturnType, typename... Args, typename... FArgs> +inline auto handler_bind(ReturnType(ClassType::* f)(Args...), FArgs&&... fargs) { + return std::function<ReturnType(Args...)>(std::bind(f, std::forward<FArgs>(fargs)...)); +} + /** Network of nodes who can send async messages. */ -template<typename MsgType> +template<typename OpcodeType> class MsgNetwork: public ConnPool { public: + using Msg = MsgBase<OpcodeType>; + /* match lambdas */ + template<typename T> + struct callback_traits: + public callback_traits<decltype(&T::operator())> {}; + + /* match plain functions */ + template<typename ReturnType, typename MsgType, typename ConnType> + struct callback_traits<ReturnType(MsgType, ConnType)> { + using ret_type = ReturnType; + using conn_type = ConnType; + using msg_type = typename std::remove_reference<MsgType>::type; + }; + + /* match function pointers */ + template<typename ReturnType, typename... Args> + struct callback_traits<ReturnType(*)(Args...)>: + public callback_traits<ReturnType(Args...)> {}; + + /* match const member functions */ + template<typename ClassType, typename ReturnType, typename... Args> + struct callback_traits<ReturnType(ClassType::*)(Args...) const>: + public callback_traits<ReturnType(Args...)> {}; + + /* match member functions */ + template<typename ClassType, typename ReturnType, typename... Args> + struct callback_traits<ReturnType(ClassType::*)(Args...)>: + public callback_traits<ReturnType(Args...)> {}; + class Conn: public ConnPool::Conn { enum MsgState { HEADER, PAYLOAD }; - MsgType msg; + + Msg msg; MsgState msg_state; MsgNetwork *mn; @@ -74,13 +109,13 @@ class MsgNetwork: public ConnPool { }; using conn_t = RcObj<Conn>; - using msg_callback_t = std::function<void(const MsgType &msg, conn_t conn)>; + using msg_callback_t = std::function<void(const Msg &msg, conn_t conn)>; #ifdef SALTICIDAE_MSG_STAT class msg_stat_by_opcode_t: - public std::unordered_map<typename MsgType::opcode_t, + public std::unordered_map<typename Msg::opcode_t, std::pair<uint32_t, size_t>> { public: - void add(const MsgType &msg) { + void add(const Msg &msg) { auto &p = this->operator[](msg.get_opcode()); p.first++; p.second += msg.get_length(); @@ -89,7 +124,7 @@ class MsgNetwork: public ConnPool { #endif private: - std::unordered_map<typename MsgType::opcode_t, + std::unordered_map<typename Msg::opcode_t, msg_callback_t> handler_map; protected: @@ -101,6 +136,7 @@ class MsgNetwork: public ConnPool { ConnPool::conn_t create_conn() override { return (new Conn(this))->self(); } public: + MsgNetwork(const EventContext &eb, int max_listen_backlog, double conn_server_timeout, @@ -109,7 +145,18 @@ class MsgNetwork: public ConnPool { conn_server_timeout, seg_buff_size) {} - void reg_handler(typename MsgType::opcode_t opcode, msg_callback_t handler); + template<typename Func> + typename std::enable_if<std::is_constructible< + typename callback_traits<Func>::msg_type, DataStream &&>::value>::type + reg_handler(Func handler) { + using callback_t = callback_traits<Func>; + handler_map[callback_t::msg_type::opcode] = [handler](const Msg &msg, conn_t conn) { + handler(typename callback_t::msg_type(msg.get_payload()), + static_pointer_cast<typename callback_t::conn_type::type>(conn)); + }; + } + + template<typename MsgType> void send_msg(const MsgType &msg, conn_t conn); using ConnPool::listen; #ifdef SALTICIDAE_MSG_STAT @@ -126,9 +173,10 @@ class MsgNetwork: public ConnPool { }; /** Simple network that handles client-server requests. */ -template<typename MsgType> -class ClientNetwork: public MsgNetwork<MsgType> { - using MsgNet = MsgNetwork<MsgType>; +template<typename OpcodeType> +class ClientNetwork: public MsgNetwork<OpcodeType> { + using MsgNet = MsgNetwork<OpcodeType>; + using Msg = typename MsgNet::Msg; std::unordered_map<NetAddr, typename MsgNet::conn_t> addr2conn; public: @@ -159,6 +207,7 @@ class ClientNetwork: public MsgNetwork<MsgType> { conn_server_timeout, seg_buff_size) {} + template<typename MsgType> void send_msg(const MsgType &msg, const NetAddr &addr); conn_t connect(const NetAddr &addr) = delete; }; @@ -169,9 +218,12 @@ class PeerNetworkError: public SalticidaeError { /** Peer-to-peer network where any two nodes could hold a bi-diretional message * channel, established by either side. */ -template<typename MsgType> -class PeerNetwork: public MsgNetwork<MsgType> { - using MsgNet= MsgNetwork<MsgType>; +template<typename OpcodeType = uint8_t, + OpcodeType OPCODE_PING = 0xf0, + OpcodeType OPCODE_PONG = 0xf1> +class PeerNetwork: public MsgNetwork<OpcodeType> { + using MsgNet = MsgNetwork<OpcodeType>; + using Msg = typename MsgNet::Msg; public: enum IdentityMode { IP_BASED, @@ -244,8 +296,34 @@ class PeerNetwork: public MsgNetwork<MsgType> { double conn_timeout; uint16_t listen_port; - void msg_ping(const MsgType &msg, ConnPool::conn_t conn); - void msg_pong(const MsgType &msg, ConnPool::conn_t conn); + struct MsgPing { + static const OpcodeType opcode; + DataStream serialized; + uint16_t port; + MsgPing(uint16_t port) { + serialized << htole(port); + } + MsgPing(DataStream &&s) { + s >> port; + port = letoh(port); + } + }; + + struct MsgPong { + static const OpcodeType opcode; + DataStream serialized; + uint16_t port; + MsgPong(uint16_t port) { + serialized << htole(port); + } + MsgPong(DataStream &&s) { + s >> port; + port = letoh(port); + } + }; + + void msg_ping(MsgPing &&msg, conn_t conn); + void msg_pong(MsgPong &&msg, conn_t conn); void reset_conn_timeout(conn_t conn); bool check_new_conn(conn_t conn, uint16_t port); void start_active_conn(const NetAddr &paddr); @@ -275,7 +353,9 @@ class PeerNetwork: public MsgNetwork<MsgType> { void add_peer(const NetAddr &paddr); const conn_t get_peer_conn(const NetAddr &paddr) const; + template<typename MsgType> void send_msg(const MsgType &msg, const Peer *peer); + template<typename MsgType> void send_msg(const MsgType &msg, const NetAddr &paddr); void listen(NetAddr listen_addr); bool has_peer(const NetAddr &paddr) const; @@ -285,18 +365,18 @@ class PeerNetwork: public MsgNetwork<MsgType> { } }; -template<typename MsgType> -void MsgNetwork<MsgType>::Conn::on_read() { +template<typename OpcodeType> +void MsgNetwork<OpcodeType>::Conn::on_read() { auto &recv_buffer = read(); auto conn = static_pointer_cast<Conn>(self()); while (get_fd() != -1) { if (msg_state == Conn::HEADER) { - if (recv_buffer.size() < MsgType::header_size) break; + if (recv_buffer.size() < Msg::header_size) break; /* new header available */ - bytearray_t data = recv_buffer.pop(MsgType::header_size); - msg = MsgType(data.data()); + bytearray_t data = recv_buffer.pop(Msg::header_size); + msg = Msg(data.data()); msg_state = Conn::PAYLOAD; } if (msg_state == Conn::PAYLOAD) @@ -331,8 +411,8 @@ void MsgNetwork<MsgType>::Conn::on_read() { } } -template<typename MsgType> -void PeerNetwork<MsgType>::Peer::reset_conn(conn_t new_conn) { +template<typename O, O _, O __> +void PeerNetwork<O, _, __>::Peer::reset_conn(conn_t new_conn) { if (conn != new_conn) { if (conn) @@ -348,23 +428,21 @@ void PeerNetwork<MsgType>::Peer::reset_conn(conn_t new_conn) { clear_all_events(); } -template<typename MsgType> -void PeerNetwork<MsgType>::Conn::on_setup() { +template<typename O, O _, O __> +void PeerNetwork<O, _, __>::Conn::on_setup() { assert(!ev_timeout); ev_timeout = Event(pn->eb, -1, 0, [this](evutil_socket_t, short) { SALTICIDAE_LOG_INFO("peer ping-pong timeout"); this->terminate(); }); /* the initial ping-pong to set up the connection */ - MsgType ping; - ping.gen_ping(pn->listen_port); auto conn = static_pointer_cast<Conn>(this->self()); pn->reset_conn_timeout(conn); - pn->MsgNet::send_msg(ping, conn); + pn->MsgNet::send_msg(MsgPing(pn->listen_port), conn); } -template<typename MsgType> -void PeerNetwork<MsgType>::Conn::on_teardown() { +template<typename O, O _, O __> +void PeerNetwork<O, _, __>::Conn::on_teardown() { auto it = pn->id2peer.find(peer_id); if (it == pn->id2peer.end()) return; auto p = it->second.get(); @@ -383,8 +461,8 @@ void PeerNetwork<MsgType>::Conn::on_teardown() { p->ev_retry_timer.add_with_timeout(pn->gen_conn_timeout()); } -template<typename MsgType> -bool PeerNetwork<MsgType>::check_new_conn(conn_t conn, uint16_t port) { +template<typename O, O _, O __> +bool PeerNetwork<O, _, __>::check_new_conn(conn_t conn, uint16_t port) { if (conn->peer_id.is_null()) { /* passive connections can eventually have ids after getting the port number in IP_BASED_PORT mode */ @@ -412,21 +490,18 @@ bool PeerNetwork<MsgType>::check_new_conn(conn_t conn, uint16_t port) { return false; } -template<typename MsgType> -void PeerNetwork<MsgType>::msg_ping(const MsgType &msg, ConnPool::conn_t conn_) { +template<typename O, O _, O __> +void PeerNetwork<O, _, __>::msg_ping(MsgPing &&msg, conn_t conn_) { auto conn = static_pointer_cast<Conn>(conn_); - uint16_t port; - msg.parse_ping(port); + uint16_t port = msg.port; SALTICIDAE_LOG_INFO("ping from %s, port %u", std::string(*conn).c_str(), ntohs(port)); if (check_new_conn(conn, port)) return; auto p = id2peer.find(conn->peer_id)->second.get(); - MsgType pong; - pong.gen_pong(this->listen_port); - send_msg(pong, p); + send_msg(MsgPong(this->listen_port), p); } -template<typename MsgType> -void PeerNetwork<MsgType>::msg_pong(const MsgType &msg, ConnPool::conn_t conn_) { +template<typename O, O _, O __> +void PeerNetwork<O, _, __>::msg_pong(MsgPong &&msg, conn_t conn_) { auto conn = static_pointer_cast<Conn>(conn_); auto it = id2peer.find(conn->peer_id); if (it == id2peer.end()) @@ -435,8 +510,7 @@ void PeerNetwork<MsgType>::msg_pong(const MsgType &msg, ConnPool::conn_t conn_) return; } auto p = it->second.get(); - uint16_t port; - msg.parse_pong(port); + uint16_t port = msg.port; if (check_new_conn(conn, port)) return; p->pong_msg_ok = true; if (p->ping_timer_ok) @@ -446,18 +520,16 @@ void PeerNetwork<MsgType>::msg_pong(const MsgType &msg, ConnPool::conn_t conn_) } } -template<typename MsgType> -void PeerNetwork<MsgType>::listen(NetAddr listen_addr) { +template<typename O, O _, O __> +void PeerNetwork<O, _, __>::listen(NetAddr listen_addr) { MsgNet::listen(listen_addr); listen_port = listen_addr.port; - this->reg_handler(MsgType::OPCODE_PING, - std::bind(&PeerNetwork::msg_ping, this, _1, _2)); - this->reg_handler(MsgType::OPCODE_PONG, - std::bind(&PeerNetwork::msg_pong, this, _1, _2)); + this->reg_handler(handler_bind(&PeerNetwork::msg_ping, this, _1, _2)); + this->reg_handler(handler_bind(&PeerNetwork::msg_pong, this, _1, _2)); } -template<typename MsgType> -void PeerNetwork<MsgType>::start_active_conn(const NetAddr &addr) { +template<typename O, O _, O __> +void PeerNetwork<O, _, __>::start_active_conn(const NetAddr &addr) { auto p = id2peer.find(addr)->second.get(); if (p->connected) return; auto conn = static_pointer_cast<Conn>(connect(addr)); @@ -468,8 +540,8 @@ void PeerNetwork<MsgType>::start_active_conn(const NetAddr &addr) { conn->peer_id.port = 0; } -template<typename MsgType> -void PeerNetwork<MsgType>::add_peer(const NetAddr &addr) { +template<typename O, O _, O __> +void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) { auto it = id2peer.find(addr); if (it != id2peer.end()) throw PeerNetworkError("peer already exists"); @@ -478,28 +550,26 @@ void PeerNetwork<MsgType>::add_peer(const NetAddr &addr) { start_active_conn(addr); } -template<typename MsgType> -const typename PeerNetwork<MsgType>::conn_t -PeerNetwork<MsgType>::get_peer_conn(const NetAddr &paddr) const { +template<typename O, O _, O __> +const typename PeerNetwork<O, _, __>::conn_t +PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &paddr) const { auto it = id2peer.find(paddr); if (it == id2peer.end()) throw PeerNetworkError("peer does not exist"); return it->second->conn; } -template<typename MsgType> -bool PeerNetwork<MsgType>::has_peer(const NetAddr &paddr) const { +template<typename O, O _, O __> +bool PeerNetwork<O, _, __>::has_peer(const NetAddr &paddr) const { return id2peer.count(paddr); } +template<typename OpcodeType> template<typename MsgType> -void MsgNetwork<MsgType>::reg_handler(typename MsgType::opcode_t opcode, - msg_callback_t handler) { - handler_map[opcode] = handler; -} - -template<typename MsgType> -void MsgNetwork<MsgType>::send_msg(const MsgType &msg, conn_t conn) { +void MsgNetwork<OpcodeType>::send_msg(const MsgType &_msg, conn_t conn) { + Msg msg; + msg.set_opcode(MsgType::opcode); + msg.set_payload(std::move(_msg.serialized)); bytearray_t msg_data = msg.serialize(); SALTICIDAE_LOG_DEBUG("wrote message %s to %s", std::string(msg).c_str(), @@ -511,65 +581,53 @@ void MsgNetwork<MsgType>::send_msg(const MsgType &msg, conn_t conn) { #endif } +template<typename O, O _, O __> template<typename MsgType> -void PeerNetwork<MsgType>::send_msg(const MsgType &msg, const Peer *peer) { - bytearray_t msg_data = msg.serialize(); - SALTICIDAE_LOG_DEBUG("wrote message %s to %s", - std::string(msg).c_str(), - std::string(peer->addr).c_str()); +void PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const Peer *peer) { if (peer->connected) - { - SALTICIDAE_LOG_DEBUG("wrote to ConnPool"); - peer->conn->write(std::move(msg_data)); - } + MsgNet::send_msg(msg, peer->conn); else - { SALTICIDAE_LOG_DEBUG("dropped"); - } -#ifdef SALTICIDAE_MSG_STAT - peer->conn->nsent++; - this->sent_by_opcode.add(msg); -#endif } +template<typename O, O _, O __> template<typename MsgType> -void PeerNetwork<MsgType>::send_msg(const MsgType &msg, const NetAddr &addr) { +void PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const NetAddr &addr) { auto it = id2peer.find(addr); if (it == id2peer.end()) { - SALTICIDAE_LOG_ERROR("sending to non-existing peer: %s", std::string(addr).c_str()); + SALTICIDAE_LOG_ERROR("sending to non-existing peer: %s", + std::string(addr).c_str()); throw PeerNetworkError("peer does not exist"); } send_msg(msg, it->second.get()); } -template<typename MsgType> -void PeerNetwork<MsgType>::Peer::reset_ping_timer() { +template<typename O, O _, O __> +void PeerNetwork<O, _, __>::Peer::reset_ping_timer() { assert(ev_ping_timer); ev_ping_timer.del(); ev_ping_timer.add_with_timeout(gen_rand_timeout(pn->ping_period)); } -template<typename MsgType> -void PeerNetwork<MsgType>::reset_conn_timeout(conn_t conn) { +template<typename O, O _, O __> +void PeerNetwork<O, _, __>::reset_conn_timeout(conn_t conn) { assert(conn->ev_timeout); conn->ev_timeout.del(); conn->ev_timeout.add_with_timeout(conn_timeout); SALTICIDAE_LOG_INFO("reset timeout %.2f", conn_timeout); } -template<typename MsgType> -void PeerNetwork<MsgType>::Peer::send_ping() { +template<typename O, O _, O __> +void PeerNetwork<O, _, __>::Peer::send_ping() { ping_timer_ok = false; pong_msg_ok = false; - MsgType ping; - ping.gen_ping(pn->listen_port); pn->reset_conn_timeout(conn); - pn->send_msg(ping, this); + pn->send_msg(MsgPing(pn->listen_port), this); } -template<typename MsgType> -void PeerNetwork<MsgType>::Peer::ping_timer(evutil_socket_t, short) { +template<typename O, O _, O __> +void PeerNetwork<O, _, __>::Peer::ping_timer(evutil_socket_t, short) { ping_timer_ok = true; if (pong_msg_ok) { @@ -578,13 +636,13 @@ void PeerNetwork<MsgType>::Peer::ping_timer(evutil_socket_t, short) { } } -template<typename MsgType> -const std::vector<NetAddr> &PeerNetwork<MsgType>::all_peers() const { +template<typename O, O _, O __> +const std::vector<NetAddr> &PeerNetwork<O, _, __>::all_peers() const { return peer_list; } -template<typename MsgType> -void ClientNetwork<MsgType>::Conn::on_setup() { +template<typename OpcodeType> +void ClientNetwork<OpcodeType>::Conn::on_setup() { assert(this->get_mode() == Conn::PASSIVE); const auto &addr = this->get_addr(); cn->addr2conn.erase(addr); @@ -593,19 +651,26 @@ void ClientNetwork<MsgType>::Conn::on_setup() { static_pointer_cast<Conn>(this->self()))); } -template<typename MsgType> -void ClientNetwork<MsgType>::Conn::on_teardown() { +template<typename OpcodeType> +void ClientNetwork<OpcodeType>::Conn::on_teardown() { assert(this->get_mode() == Conn::PASSIVE); cn->addr2conn.erase(this->get_addr()); } +template<typename OpcodeType> template<typename MsgType> -void ClientNetwork<MsgType>::send_msg(const MsgType &msg, const NetAddr &addr) { +void ClientNetwork<OpcodeType>::send_msg(const MsgType &msg, const NetAddr &addr) { auto it = addr2conn.find(addr); if (it == addr2conn.end()) return; MsgNet::send_msg(msg, it->second); } +template<typename O, O OPCODE_PING, O _> +const O PeerNetwork<O, OPCODE_PING, _>::MsgPing::opcode = OPCODE_PING; + +template<typename O, O _, O OPCODE_PONG> +const O PeerNetwork<O, _, OPCODE_PONG>::MsgPong::opcode = OPCODE_PONG; + } #endif |