diff options
Diffstat (limited to 'include/salticidae/network.h')
-rw-r--r-- | include/salticidae/network.h | 79 |
1 files changed, 40 insertions, 39 deletions
diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 07c6ba5..20dc696 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -109,7 +109,6 @@ class MsgNetwork: public ConnPool { #endif protected: - void on_read() override; }; using conn_t = ArcObj<Conn>; @@ -127,6 +126,7 @@ class MsgNetwork: public ConnPool { protected: ConnPool::Conn *create_conn() override { return new Conn(); } + void on_read(const ConnPool::conn_t &) override; public: @@ -287,12 +287,9 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { protected: void stop() override { - ev_timeout.clear(); + ev_timeout.del(); MsgNet::Conn::stop(); } - - void on_setup() override; - void on_teardown() override; }; using conn_t = ArcObj<Conn>; @@ -326,7 +323,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { if (ev_ping_timer) ev_ping_timer.del(); } - void reset_conn(conn_t conn); + void reset_conn(const conn_t &conn); }; std::unordered_map<NetAddr, BoxObj<Peer>> id2peer; @@ -381,6 +378,8 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { virtual double gen_conn_timeout() { return gen_rand_timeout(retry_conn_delay); } + void on_setup(const ConnPool::conn_t &) override; + void on_teardown(const ConnPool::conn_t &) override; public: @@ -466,11 +465,13 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { /* this callback is run by a worker */ template<typename OpcodeType> -void MsgNetwork<OpcodeType>::Conn::on_read() { - ConnPool::Conn::on_read(); - auto &recv_buffer = this->recv_buffer; - auto mn = get_net(); - while (self_ref) +void MsgNetwork<OpcodeType>::on_read(const ConnPool::conn_t &_conn) { + ConnPool::on_read(_conn); + auto conn = static_pointer_cast<Conn>(_conn); + auto &recv_buffer = conn->recv_buffer; + auto &msg = conn->msg; + auto &msg_state = conn->msg_state; + while (true) //(!conn->is_terminated()) { if (msg_state == Conn::HEADER) { @@ -493,8 +494,7 @@ void MsgNetwork<OpcodeType>::Conn::on_read() { return; } #endif - auto conn = static_pointer_cast<Conn>(self()); - while (!mn->incoming_msgs.enqueue(std::make_pair(msg, conn), false)) + while (!incoming_msgs.enqueue(std::make_pair(msg, conn), false)) std::this_thread::yield(); } } @@ -550,46 +550,47 @@ void PeerNetwork<O, _, __>::tcall_reset_timeout(ConnPool::Worker *worker, /* begin: functions invoked by the dispatcher */ template<typename O, O _, O __> -void PeerNetwork<O, _, __>::Conn::on_setup() { - MsgNet::Conn::on_setup(); - auto pn = get_net(); - auto conn = static_pointer_cast<Conn>(this->self()); - auto worker = this->worker; +void PeerNetwork<O, _, __>::on_setup(const ConnPool::conn_t &_conn) { + MsgNet::on_setup(_conn); + auto conn = static_pointer_cast<Conn>(_conn); + auto worker = conn->worker; + auto &ev_timeout = conn->ev_timeout; assert(!ev_timeout); ev_timeout = TimerEvent(worker->get_ec(), [worker, conn](TimerEvent &) { try { SALTICIDAE_LOG_INFO("peer ping-pong timeout"); - conn->worker_terminate(); + conn->get_net()->worker_terminate(conn); } catch (...) { worker->error_callback(std::current_exception()); } }); /* the initial ping-pong to set up the connection */ - tcall_reset_timeout(worker, conn, pn->conn_timeout); - pn->send_msg(MsgPing(pn->listen_port), conn); + tcall_reset_timeout(worker, conn, conn_timeout); + send_msg(MsgPing(listen_port), conn); } template<typename O, O _, O __> -void PeerNetwork<O, _, __>::Conn::on_teardown() { - MsgNet::Conn::on_teardown(); - auto pn = get_net(); - auto p = pn->get_peer(peer_id); +void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) { + MsgNet::on_teardown(_conn); + auto conn = static_pointer_cast<Conn>(_conn); + conn->ev_timeout.clear(); + const auto &peer_id = conn->peer_id; + auto p = get_peer(peer_id); if (!p) return; - if (this != p->conn.get()) return; + if (conn != p->conn) return; p->ev_ping_timer.del(); p->connected = false; //p->conn = nullptr; - SALTICIDAE_LOG_INFO("connection lost: %s", std::string(*this).c_str()); + SALTICIDAE_LOG_INFO("connection lost: %s", std::string(*conn).c_str()); // try to reconnect - p->ev_retry_timer = TimerEvent(pn->disp_ec, - [pn, peer_id = this->peer_id](TimerEvent &) { + p->ev_retry_timer = TimerEvent(this->disp_ec, [this, peer_id](TimerEvent &) { try { - pn->start_active_conn(peer_id); - } catch (...) { pn->disp_error_cb(std::current_exception()); } + start_active_conn(peer_id); + } catch (...) { this->disp_error_cb(std::current_exception()); } }); - p->ev_retry_timer.add(pn->gen_conn_timeout()); + p->ev_retry_timer.add(gen_conn_timeout()); } template<typename O, O _, O __> -void PeerNetwork<O, _, __>::Peer::reset_conn(conn_t new_conn) { +void PeerNetwork<O, _, __>::Peer::reset_conn(const conn_t &new_conn) { if (conn != new_conn) { if (conn) @@ -597,7 +598,8 @@ void PeerNetwork<O, _, __>::Peer::reset_conn(conn_t new_conn) { //SALTICIDAE_LOG_DEBUG("moving send buffer"); //new_conn->move_send_buffer(conn); SALTICIDAE_LOG_INFO("terminating old connection %s", std::string(*conn).c_str()); - conn->disp_terminate(); + auto net = conn->get_net(); + net->disp_terminate(conn); } addr = new_conn->get_addr(); conn = new_conn; @@ -656,7 +658,7 @@ bool PeerNetwork<O, _, __>::check_new_conn(const conn_t &conn, uint16_t port) { } else { - conn->disp_terminate(); + this->disp_terminate(conn); return true; } } @@ -665,7 +667,7 @@ bool PeerNetwork<O, _, __>::check_new_conn(const conn_t &conn, uint16_t port) { { if (conn != p->conn) { - conn->disp_terminate(); + this->disp_terminate(conn); return true; } return false; @@ -797,7 +799,7 @@ void PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) { auto it = id2peer.find(addr); if (it == id2peer.end()) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); - it->second->conn->disp_terminate(); + this->disp_terminate(it->second->conn); id2peer.erase(it); } catch (const PeerNetworkError &) { this->recoverable_error(std::current_exception()); @@ -899,8 +901,7 @@ void ClientNetwork<OpcodeType>::Conn::on_setup() { auto cn = get_net(); cn->addr2conn.erase(addr); cn->addr2conn.insert( - std::make_pair(addr, - static_pointer_cast<Conn>(this->self()))); + std::make_pair(addr, static_pointer_cast<Conn>(this->self()))); } template<typename OpcodeType> |