diff options
Diffstat (limited to 'include/salticidae/network.h')
-rw-r--r-- | include/salticidae/network.h | 115 |
1 files changed, 63 insertions, 52 deletions
diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 40f17a1..19d6db0 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -89,10 +89,6 @@ class MsgNetwork: public ConnPool { mutable std::atomic<size_t> nsentb; mutable std::atomic<size_t> nrecvb; #endif - void stop() override { - ev_enqueue_poll.clear(); - ConnPool::Conn::stop(); - } public: Conn(): msg_state(HEADER), msg_sleep(false) @@ -138,12 +134,10 @@ class MsgNetwork: public ConnPool { ConnPool::Conn *create_conn() override { return new Conn(); } void on_read(const ConnPool::conn_t &) override; - void on_setup(const ConnPool::conn_t &_conn) override { + void on_worker_setup(const ConnPool::conn_t &_conn) override { auto conn = static_pointer_cast<Conn>(_conn); - auto worker = conn->worker; - worker->get_tcall()->async_call([this, conn, worker](ThreadCall::Handle &) { - conn->ev_enqueue_poll = TimerEvent(worker->get_ec(), - [this, conn](TimerEvent &) { + conn->ev_enqueue_poll = TimerEvent(conn->worker->get_ec(), + [this, conn](TimerEvent &) { if (!incoming_msgs.enqueue(std::make_pair(conn->msg, conn), false)) { conn->msg_sleep = true; @@ -153,7 +147,12 @@ class MsgNetwork: public ConnPool { conn->msg_sleep = false; on_read(conn); }); - }); + } + + void on_worker_teardown(const ConnPool::conn_t &_conn) override { + auto conn = static_pointer_cast<Conn>(_conn); + conn->ev_enqueue_poll.clear(); + ConnPool::on_worker_teardown(_conn); } public: @@ -287,8 +286,8 @@ class ClientNetwork: public MsgNetwork<OpcodeType> { protected: ConnPool::Conn *create_conn() override { return new Conn(); } - void on_setup(const ConnPool::conn_t &) override; - void on_teardown(const ConnPool::conn_t &) override; + void on_dispatcher_setup(const ConnPool::conn_t &) override; + void on_dispatcher_teardown(const ConnPool::conn_t &) override; public: using Config = typename MsgNet::Config; @@ -376,12 +375,6 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { PeerNetwork *get_net() { return static_cast<PeerNetwork *>(ConnPool::Conn::get_pool()); } - - protected: - void stop() override { - ev_timeout.clear(); - MsgNet::Conn::stop(); - } }; using conn_t = ArcObj<Conn>; @@ -520,8 +513,10 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { protected: ConnPool::Conn *create_conn() override { return new Conn(); } - void on_setup(const ConnPool::conn_t &) override; - void on_teardown(const ConnPool::conn_t &) override; + void on_worker_setup(const ConnPool::conn_t &) override; + void on_worker_teardown(const ConnPool::conn_t &) override; + void on_dispatcher_setup(const ConnPool::conn_t &) override; + void on_dispatcher_teardown(const ConnPool::conn_t &) override; PeerId _get_peer_id(const X509 *cert, const NetAddr &addr) { if (!this->enable_tls || id_mode == ADDR_BASED) @@ -738,47 +733,65 @@ void PeerNetwork<O, _, __>::tcall_reset_timeout(ConnPool::Worker *worker, }); } -/* begin: functions invoked by the dispatcher */ template<typename O, O _, O __> -void PeerNetwork<O, _, __>::on_setup(const ConnPool::conn_t &_conn) { - MsgNet::on_setup(_conn); +void PeerNetwork<O, _, __>::on_worker_setup(const ConnPool::conn_t &_conn) { + MsgNet::on_worker_setup(_conn); auto conn = static_pointer_cast<Conn>(_conn); auto worker = conn->worker; + auto &ev_timeout = conn->ev_timeout; + assert(!ev_timeout); + ev_timeout = TimerEvent(worker->get_ec(), [=](TimerEvent &) { + try { + SALTICIDAE_LOG_INFO("%s%s%s: peer ping-pong timeout", + tty_secondary_color, + id_hex.c_str(), + tty_reset_color); + this->worker_terminate(conn); + } catch (...) { worker->error_callback(std::current_exception()); } + }); +} + +template<typename O, O _, O __> +void PeerNetwork<O, _, __>::on_worker_teardown(const ConnPool::conn_t &_conn) { + auto conn = static_pointer_cast<Conn>(_conn); + conn->ev_timeout.clear(); + MsgNet::on_worker_teardown(_conn); +} + +/* begin: functions invoked by the dispatcher */ + +/* the initial ping-pong to set up the connection */ +template<typename O, O _, O __> +void PeerNetwork<O, _, __>::on_dispatcher_setup(const ConnPool::conn_t &_conn) { + MsgNet::on_dispatcher_setup(_conn); + auto conn = static_pointer_cast<Conn>(_conn); SALTICIDAE_LOG_INFO("%s%s%s: setup connection %s", tty_secondary_color, id_hex.c_str(), tty_reset_color, std::string(*conn).c_str()); - worker->get_tcall()->async_call([this, conn, worker](ThreadCall::Handle &) { - auto &ev_timeout = conn->ev_timeout; - assert(!ev_timeout); - ev_timeout = TimerEvent(worker->get_ec(), [=](TimerEvent &) { - try { - SALTICIDAE_LOG_INFO("%s%s%s: peer ping-pong timeout", - tty_secondary_color, - id_hex.c_str(), - tty_reset_color); - this->worker_terminate(conn); - } catch (...) { worker->error_callback(std::current_exception()); } - }); - }); - /* the initial ping-pong to set up the connection */ - tcall_reset_timeout(worker, conn, conn_timeout); + tcall_reset_timeout(conn->worker, conn, conn_timeout); if (conn->get_mode() == Conn::ConnMode::ACTIVE) { auto pid = get_peer_id(conn, conn->get_addr()); - pinfo_slock_t _g(known_peers_lock); - send_msg(MsgPing( - listen_addr, - known_peers.find(pid)->second->get_nonce()), conn); + auto it = known_peers.find(pid); + if (it == known_peers.end()) + throw PeerNetworkError(SALTI_ERROR_PEER_NOT_MATCH); + else + { + pinfo_slock_t _g(known_peers_lock); + send_msg(MsgPing( + listen_addr, + it->second->get_nonce()), conn); + } } else replace_pending_conn(conn); } template<typename O, O _, O __> -void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) { - MsgNet::on_teardown(_conn); +void PeerNetwork<O, _, __>::on_dispatcher_teardown(const ConnPool::conn_t &_conn) { + MsgNet::on_dispatcher_teardown(_conn); auto conn = static_pointer_cast<Conn>(_conn); auto addr = conn->get_addr(); pending_peers.erase(addr); @@ -949,8 +962,7 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) { this->user_tcall->async_call([this, addr=msg.claimed_addr, conn](ThreadCall::Handle &) { if (unknown_peer_cb) unknown_peer_cb(addr, conn->get_peer_cert()); }); - this->disp_terminate(conn); - return; + throw PeerNetworkError(SALTI_ERROR_PEER_NOT_MATCH); } auto &p = pit->second; if (p->state != Peer::State::DISCONNECTED || @@ -1018,8 +1030,7 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) { SALTICIDAE_LOG_WARN( "%s%s%s: unexpected pong from an unknown peer", tty_secondary_color, id_hex.c_str(), tty_reset_color); - this->disp_terminate(conn); - return; + throw PeerNetworkError(SALTI_ERROR_PEER_NOT_MATCH); } auto &p = pit->second; assert(!p->addr.is_null() && p->addr == conn->get_addr()); @@ -1290,8 +1301,8 @@ inline int32_t PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vecto /* end: functions invoked by the user loop */ template<typename OpcodeType> -void ClientNetwork<OpcodeType>::on_setup(const ConnPool::conn_t &_conn) { - MsgNet::on_setup(_conn); +void ClientNetwork<OpcodeType>::on_dispatcher_setup(const ConnPool::conn_t &_conn) { + MsgNet::on_dispatcher_setup(_conn); auto conn = static_pointer_cast<Conn>(_conn); assert(conn->get_mode() == Conn::PASSIVE); const auto &addr = conn->get_addr(); @@ -1301,8 +1312,8 @@ void ClientNetwork<OpcodeType>::on_setup(const ConnPool::conn_t &_conn) { } template<typename OpcodeType> -void ClientNetwork<OpcodeType>::on_teardown(const ConnPool::conn_t &_conn) { - MsgNet::on_teardown(_conn); +void ClientNetwork<OpcodeType>::on_dispatcher_teardown(const ConnPool::conn_t &_conn) { + MsgNet::on_dispatcher_teardown(_conn); auto conn = static_pointer_cast<Conn>(_conn); conn->get_net()->addr2conn.erase(conn->get_addr()); } |