diff options
Diffstat (limited to 'include/salticidae/network.h')
-rw-r--r-- | include/salticidae/network.h | 63 |
1 files changed, 40 insertions, 23 deletions
diff --git a/include/salticidae/network.h b/include/salticidae/network.h index e5165bf..a63976b 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -275,9 +275,9 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { const NetAddr &get_peer() { return peer_id; } protected: - void on_close() override { + void stop() override { ev_timeout.clear(); - MsgNet::Conn::on_close(); + MsgNet::Conn::stop(); } void on_setup() override; @@ -302,7 +302,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { Peer(NetAddr addr, conn_t conn, const EventContext &ec): addr(addr), conn(conn), ev_ping_timer( - Event(ec, -1, 0, std::bind(&Peer::ping_timer, this, _1, _2))), + Event(ec, -1, std::bind(&Peer::ping_timer, this, _1, _2))), connected(false) {} ~Peer() {} Peer &operator=(const Peer &) = delete; @@ -358,6 +358,8 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { void _pong_msg_cb(const conn_t &conn, uint16_t port); bool check_new_conn(const conn_t &conn, uint16_t port); void start_active_conn(const NetAddr &paddr); + static void tcall_reset_timeout(ConnPool::Worker *worker, + const conn_t &conn, double timeout); protected: ConnPool::Conn *create_conn() override { return new Conn(); } @@ -385,6 +387,8 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { this->reg_handler(generic_bind(&PeerNetwork::msg_pong, this, _1, _2)); } + ~PeerNetwork() { this->stop(); } + void add_peer(const NetAddr &paddr); const conn_t get_peer_conn(const NetAddr &paddr) const; using MsgNet::send_msg; @@ -445,15 +449,28 @@ void MsgNetwork<OpcodeType>::send_msg(const MsgType &_msg, Conn &conn) { #endif } +template<typename O, O _, O __> +void PeerNetwork<O, _, __>::tcall_reset_timeout(ConnPool::Worker *worker, + const conn_t &conn, double timeout) { + worker->get_tcall()->call([conn, t=timeout](ThreadCall::Handle &) { + assert(conn->ev_timeout); + conn->ev_timeout.del(); + conn->ev_timeout.add_with_timeout(t, 0); + SALTICIDAE_LOG_INFO("reset timeout %.2f", t); + }); +} + /* begin: functions invoked by the dispatcher */ template<typename O, O _, O __> void PeerNetwork<O, _, __>::Conn::on_setup() { MsgNet::Conn::on_setup(); auto pn = get_net(); + auto conn = static_pointer_cast<Conn>(this->self()); + auto worker = this->worker; assert(!ev_timeout); - ev_timeout = Event(pn->dispatcher_ec, -1, 0, [this](int, int) { + ev_timeout = Event(worker->get_ec(), -1, [conn](int, int) { SALTICIDAE_LOG_INFO("peer ping-pong timeout"); - this->terminate(); + conn->terminate(); }); if (this->get_mode() == Conn::ConnMode::ACTIVE) { @@ -461,9 +478,8 @@ void PeerNetwork<O, _, __>::Conn::on_setup() { if (pn->id_mode == IP_BASED) peer_id.port = 0; } /* the initial ping-pong to set up the connection */ - auto &conn = static_cast<Conn &>(*this); - reset_timeout(pn->conn_timeout); - pn->send_msg(MsgPing(pn->listen_port), conn); + tcall_reset_timeout(worker, conn, pn->conn_timeout); + pn->send_msg(MsgPing(pn->listen_port), *conn); } template<typename O, O _, O __> @@ -481,11 +497,11 @@ void PeerNetwork<O, _, __>::Conn::on_teardown() { std::string(*this).c_str(), std::string(peer_id).c_str()); // try to reconnect - p->ev_retry_timer = Event(pn->dispatcher_ec, -1, 0, + p->ev_retry_timer = Event(pn->disp_ec, -1, [pn, peer_id = this->peer_id](int, int) { pn->start_active_conn(peer_id); }); - p->ev_retry_timer.add_with_timeout(pn->gen_conn_timeout()); + p->ev_retry_timer.add_with_timeout(pn->gen_conn_timeout(), 0); } template<typename O, O _, O __> @@ -497,7 +513,7 @@ void PeerNetwork<O, _, __>::Peer::reset_conn(conn_t new_conn) { //SALTICIDAE_LOG_DEBUG("moving send buffer"); //new_conn->move_send_buffer(conn); SALTICIDAE_LOG_INFO("terminating old connection %s", std::string(*conn).c_str()); - conn->terminate(); + conn->cpool->terminate(conn); } addr = new_conn->get_addr(); conn = new_conn; @@ -506,19 +522,11 @@ void PeerNetwork<O, _, __>::Peer::reset_conn(conn_t new_conn) { } template<typename O, O _, O __> -void PeerNetwork<O, _, __>::Conn::reset_timeout(double timeout) { - assert(ev_timeout); - ev_timeout.del(); - ev_timeout.add_with_timeout(timeout); - SALTICIDAE_LOG_INFO("reset timeout %.2f", timeout); -} - -template<typename O, O _, O __> void PeerNetwork<O, _, __>::Peer::reset_ping_timer() { assert(ev_ping_timer); ev_ping_timer.del(); ev_ping_timer.add_with_timeout( - gen_rand_timeout(conn->get_net()->ping_period)); + gen_rand_timeout(conn->get_net()->ping_period), 0); } template<typename O, O _, O __> @@ -526,7 +534,7 @@ void PeerNetwork<O, _, __>::Peer::send_ping() { auto pn = conn->get_net(); ping_timer_ok = false; pong_msg_ok = false; - conn->reset_timeout(pn->conn_timeout); + tcall_reset_timeout(conn->worker, conn, pn->conn_timeout); pn->send_msg(MsgPing(pn->listen_port), *conn); } @@ -554,7 +562,7 @@ bool PeerNetwork<O, _, __>::check_new_conn(const conn_t &conn, uint16_t port) { { if (conn != p->conn) { - conn->terminate(); + conn->cpool->terminate(conn); return true; } return false; @@ -631,7 +639,7 @@ void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) { auto it = id2peer.find(addr); if (it != id2peer.end()) throw PeerNetworkError("peer already exists"); - id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->dispatcher_ec))); + id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->disp_ec))); start_active_conn(addr); }, true); } @@ -646,6 +654,9 @@ PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &paddr) const { throw PeerNetworkError("peer does not exist"); auto ptr = new conn_t(it->second->conn); h.set_result(ptr); + h.set_deleter([](void *data) { + delete static_cast<conn_t *>(data); + }); })); auto conn = *ret; delete ret; @@ -657,6 +668,9 @@ bool PeerNetwork<O, _, __>::has_peer(const NetAddr &paddr) const { auto ret = static_cast<bool *>(this->disp_tcall->call( [this, paddr](ThreadCall::Handle &h) { h.set_result(id2peer.count(paddr)); + h.set_deleter([](void *data) { + delete static_cast<bool *>(data); + }); })); auto has = *ret; delete ret; @@ -699,6 +713,9 @@ void ClientNetwork<OpcodeType>::send_msg(const MsgType &msg, const NetAddr &addr throw PeerNetworkError("client does not exist"); auto ptr = new conn_t(it->second->conn); h.set_result(ptr); + h.set_deleter([](void *data) { + delete static_cast<conn_t *>(data); + }); })); send_msg(msg, **ret); delete ret; |