diff options
Diffstat (limited to 'include/salticidae/network.h')
-rw-r--r-- | include/salticidae/network.h | 217 |
1 files changed, 90 insertions, 127 deletions
diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 18406ea..e5165bf 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -206,7 +206,6 @@ class ClientNetwork: public MsgNetwork<OpcodeType> { private: std::unordered_map<NetAddr, typename MsgNet::conn_t> addr2conn; - std::mutex cn_mlock; public: class Conn: public MsgNet::Conn { @@ -298,7 +297,6 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { bool ping_timer_ok; bool pong_msg_ok; bool connected; - std::mutex mlock; Peer() = delete; Peer(NetAddr addr, conn_t conn, const EventContext &ec): @@ -321,7 +319,6 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { }; std::unordered_map <NetAddr, BoxObj<Peer>> id2peer; - std::mutex pn_mlock; const IdentityMode id_mode; double retry_conn_delay; @@ -355,30 +352,11 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { } }; - struct PingCmd: public ConnPool::DispatchCmd { - conn_t conn; - uint16_t port; - PingCmd(const conn_t &conn, uint16_t port): - conn(conn), port(port) {} - void exec(ConnPool *cpool) override { - auto pn = static_cast<PeerNetwork *>(cpool); - pn->_ping_msg_cb(conn, port); - } - }; - - struct PongCmd: public PingCmd { - using PingCmd::PingCmd; - void exec(ConnPool *cpool) override { - auto pn = static_cast<PeerNetwork *>(cpool); - pn->_pong_msg_cb(this->conn, this->port); - } - }; - void msg_ping(MsgPing &&msg, Conn &conn); void msg_pong(MsgPong &&msg, Conn &conn); void _ping_msg_cb(const conn_t &conn, uint16_t port); void _pong_msg_cb(const conn_t &conn, uint16_t port); - bool check_new_conn(Conn &conn, uint16_t port); + bool check_new_conn(const conn_t &conn, uint16_t port); void start_active_conn(const NetAddr &paddr); protected: @@ -402,12 +380,14 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { id_mode(id_mode), retry_conn_delay(retry_conn_delay), ping_period(ping_period), - conn_timeout(conn_timeout) {} + conn_timeout(conn_timeout) { + this->reg_handler(generic_bind(&PeerNetwork::msg_ping, this, _1, _2)); + this->reg_handler(generic_bind(&PeerNetwork::msg_pong, this, _1, _2)); + } void add_peer(const NetAddr &paddr); const conn_t get_peer_conn(const NetAddr &paddr) const; - template<typename MsgType> - void _send_msg(const MsgType &msg, const Peer *peer); + using MsgNet::send_msg; template<typename MsgType> void send_msg(const MsgType &msg, const NetAddr &paddr); void listen(NetAddr listen_addr); @@ -471,7 +451,7 @@ void PeerNetwork<O, _, __>::Conn::on_setup() { MsgNet::Conn::on_setup(); auto pn = get_net(); assert(!ev_timeout); - ev_timeout = Event(pn->ec, -1, 0, [this](int, int) { + ev_timeout = Event(pn->dispatcher_ec, -1, 0, [this](int, int) { SALTICIDAE_LOG_INFO("peer ping-pong timeout"); this->terminate(); }); @@ -483,18 +463,16 @@ void PeerNetwork<O, _, __>::Conn::on_setup() { /* the initial ping-pong to set up the connection */ auto &conn = static_cast<Conn &>(*this); reset_timeout(pn->conn_timeout); - pn->MsgNet::send_msg(MsgPing(pn->listen_port), conn); + pn->send_msg(MsgPing(pn->listen_port), conn); } template<typename O, O _, O __> void PeerNetwork<O, _, __>::Conn::on_teardown() { MsgNet::Conn::on_teardown(); auto pn = get_net(); - mutex_lg_t _pn_lg(pn->pn_mlock); auto it = pn->id2peer.find(peer_id); if (it == pn->id2peer.end()) return; auto p = it->second.get(); - mutex_lg_t _p_lg(p->mlock); if (this != p->conn.get()) return; p->ev_ping_timer.del(); p->connected = false; @@ -505,7 +483,6 @@ void PeerNetwork<O, _, __>::Conn::on_teardown() { // try to reconnect p->ev_retry_timer = Event(pn->dispatcher_ec, -1, 0, [pn, peer_id = this->peer_id](int, int) { - mutex_lg_t _pn_lg(pn->pn_mlock); pn->start_active_conn(peer_id); }); p->ev_retry_timer.add_with_timeout(pn->gen_conn_timeout()); @@ -550,12 +527,11 @@ void PeerNetwork<O, _, __>::Peer::send_ping() { ping_timer_ok = false; pong_msg_ok = false; conn->reset_timeout(pn->conn_timeout); - pn->_send_msg(MsgPing(pn->listen_port), this); + pn->send_msg(MsgPing(pn->listen_port), *conn); } template<typename O, O _, O __> void PeerNetwork<O, _, __>::Peer::ping_timer(int, int) { - mutex_lg_t _p_lg(mlock); ping_timer_ok = true; if (pong_msg_ok) { @@ -565,149 +541,132 @@ void PeerNetwork<O, _, __>::Peer::ping_timer(int, int) { } template<typename O, O _, O __> -bool PeerNetwork<O, _, __>::check_new_conn(Conn &conn, uint16_t port) { - if (conn.peer_id.is_null()) +bool PeerNetwork<O, _, __>::check_new_conn(const conn_t &conn, uint16_t port) { + if (conn->peer_id.is_null()) { /* passive connections can eventually have ids after getting the port number in IP_BASED_PORT mode */ assert(id_mode == IP_PORT_BASED); - conn.peer_id.ip = conn.get_addr().ip; - conn.peer_id.port = port; + conn->peer_id.ip = conn->get_addr().ip; + conn->peer_id.port = port; } - auto p = id2peer.find(conn.peer_id)->second.get(); - mutex_lg_t _p_lg(p->mlock); + auto p = id2peer.find(conn->peer_id)->second.get(); if (p->connected) { - if (conn.self() != p->conn) + if (conn != p->conn) { - conn.terminate(); + conn->terminate(); return true; } return false; } - p->reset_conn(static_pointer_cast<Conn>(conn.self())); + p->reset_conn(conn); p->connected = true; p->reset_ping_timer(); p->send_ping(); if (p->connected) SALTICIDAE_LOG_INFO("PeerNetwork: established connection with %s via %s", - std::string(conn.peer_id).c_str(), std::string(conn).c_str()); + std::string(conn->peer_id).c_str(), std::string(*conn).c_str()); return false; } template<typename O, O _, O __> -void PeerNetwork<O, _, __>::_ping_msg_cb(const conn_t &conn, uint16_t port) { - mutex_lg_t _pn_lg(pn_mlock); - if (check_new_conn(*conn, port)) return; - auto p = id2peer.find(conn->peer_id)->second.get(); - mutex_lg_t _p_lg(p->mlock); - _send_msg(MsgPong(this->listen_port), p); -} - -template<typename O, O _, O __> -void PeerNetwork<O, _, __>::_pong_msg_cb(const conn_t &conn, uint16_t port) { - mutex_lg_t _pn_lg(pn_mlock); - auto it = id2peer.find(conn->peer_id); - if (it == id2peer.end()) - { - SALTICIDAE_LOG_WARN("pong message discarded"); - return; - } - if (check_new_conn(*conn, port)) return; - auto p = it->second.get(); - mutex_lg_t _p_lg(p->mlock); - p->pong_msg_ok = true; - if (p->ping_timer_ok) - { - p->reset_ping_timer(); - p->send_ping(); - } -} - -/* end: functions invoked by the dispatcher */ - -/* this function could be both invoked by the dispatcher and the user loop */ -template<typename O, O _, O __> void PeerNetwork<O, _, __>::start_active_conn(const NetAddr &addr) { auto p = id2peer.find(addr)->second.get(); - mutex_lg_t _p_lg(p->mlock); if (p->connected) return; - auto conn = static_pointer_cast<Conn>(MsgNet::connect(addr)); + auto conn = static_pointer_cast<Conn>(MsgNet::_connect(addr)); assert(p->conn == nullptr); p->conn = conn; } +/* end: functions invoked by the dispatcher */ /* begin: functions invoked by the user loop */ template<typename O, O _, O __> -void PeerNetwork<O, _, __>::msg_ping(MsgPing &&msg, Conn &conn) { +void PeerNetwork<O, _, __>::msg_ping(MsgPing &&msg, Conn &_conn) { + auto conn = static_pointer_cast<Conn>(_conn.self()); + if (!conn) return; uint16_t port = msg.port; - SALTICIDAE_LOG_INFO("ping from %s, port %u", std::string(conn).c_str(), ntohs(port)); - auto dcmd = new PingCmd(static_pointer_cast<Conn>(conn.self()), port); - write(this->dlisten_fd[1], &dcmd, sizeof(dcmd)); + this->disp_tcall->call([this, conn, port](ThreadCall::Handle &msg) { + SALTICIDAE_LOG_INFO("ping from %s, port %u", + std::string(*conn).c_str(), ntohs(port)); + if (check_new_conn(conn, port)) return; + auto p = id2peer.find(conn->peer_id)->second.get(); + send_msg(MsgPong(this->listen_port), *conn); + }); } template<typename O, O _, O __> -void PeerNetwork<O, _, __>::msg_pong(MsgPong &&msg, Conn &conn) { - auto dcmd = new PongCmd(static_pointer_cast<Conn>(conn.self()), msg.port); - write(this->dlisten_fd[1], &dcmd, sizeof(dcmd)); +void PeerNetwork<O, _, __>::msg_pong(MsgPong &&msg, Conn &_conn) { + auto conn = static_pointer_cast<Conn>(_conn.self()); + if (!conn) return; + uint16_t port = msg.port; + this->disp_tcall->call([this, conn, port](ThreadCall::Handle &msg) { + auto it = id2peer.find(conn->peer_id); + if (it == id2peer.end()) + { + SALTICIDAE_LOG_WARN("pong message discarded"); + return; + } + if (check_new_conn(conn, port)) return; + auto p = it->second.get(); + p->pong_msg_ok = true; + if (p->ping_timer_ok) + { + p->reset_ping_timer(); + p->send_ping(); + } + }); } template<typename O, O _, O __> void PeerNetwork<O, _, __>::listen(NetAddr listen_addr) { - MsgNet::listen(listen_addr); - listen_port = listen_addr.port; - this->reg_handler(generic_bind(&PeerNetwork::msg_ping, this, _1, _2)); - this->reg_handler(generic_bind(&PeerNetwork::msg_pong, this, _1, _2)); + this->disp_tcall->call([this, listen_addr](ThreadCall::Handle &msg) { + MsgNet::_listen(listen_addr); + listen_port = listen_addr.port; + }, true); } template<typename O, O _, O __> void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) { - mutex_lg_t _pn_lg(pn_mlock); - auto it = id2peer.find(addr); - if (it != id2peer.end()) - throw PeerNetworkError("peer already exists"); - id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->dispatcher_ec))); - start_active_conn(addr); + this->disp_tcall->call([this, addr](ThreadCall::Handle &) { + auto it = id2peer.find(addr); + if (it != id2peer.end()) + throw PeerNetworkError("peer already exists"); + id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->dispatcher_ec))); + start_active_conn(addr); + }, true); } template<typename O, O _, O __> const typename PeerNetwork<O, _, __>::conn_t PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &paddr) const { - mutex_lg_t _pn_lg(pn_mlock); - auto it = id2peer.find(paddr); - if (it == id2peer.end()) - throw PeerNetworkError("peer does not exist"); - return it->second->conn; + auto ret = static_cast<conn_t *>(this->disp_tcall->call( + [this, paddr](ThreadCall::Handle &h) { + auto it = id2peer.find(paddr); + if (it == id2peer.end()) + throw PeerNetworkError("peer does not exist"); + auto ptr = new conn_t(it->second->conn); + h.set_result(ptr); + })); + auto conn = *ret; + delete ret; + return std::move(conn); } template<typename O, O _, O __> bool PeerNetwork<O, _, __>::has_peer(const NetAddr &paddr) const { - mutex_lg_t _pn_lg(pn_mlock); - return id2peer.count(paddr); -} - -template<typename O, O _, O __> -template<typename MsgType> -void PeerNetwork<O, _, __>::_send_msg(const MsgType &msg, const Peer *peer) { - if (peer->connected) - MsgNet::send_msg(msg, *(peer->conn)); - else - SALTICIDAE_LOG_DEBUG("dropped"); + auto ret = static_cast<bool *>(this->disp_tcall->call( + [this, paddr](ThreadCall::Handle &h) { + h.set_result(id2peer.count(paddr)); + })); + auto has = *ret; + delete ret; + return has; } template<typename O, O _, O __> template<typename MsgType> void PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const NetAddr &addr) { - mutex_lg_t _pn_lg(pn_mlock); - auto it = id2peer.find(addr); - if (it == id2peer.end()) - { - SALTICIDAE_LOG_ERROR("sending to non-existing peer: %s", - std::string(addr).c_str()); - throw PeerNetworkError("peer does not exist"); - } - auto p = it->second.get(); - mutex_lg_t _p_lg(p->mlock); - _send_msg(msg, p); + send_msg(msg, *get_peer_conn(addr)); } /* end: functions invoked by the user loop */ @@ -716,7 +675,6 @@ void ClientNetwork<OpcodeType>::Conn::on_setup() { MsgNet::Conn::on_setup(); assert(this->get_mode() == Conn::PASSIVE); const auto &addr = this->get_addr(); - mutex_lg_t _cn_lg(cn_mlock); auto cn = get_net(); cn->addr2conn.erase(addr); cn->addr2conn.insert( @@ -728,17 +686,22 @@ template<typename OpcodeType> void ClientNetwork<OpcodeType>::Conn::on_teardown() { MsgNet::Conn::on_teardown(); assert(this->get_mode() == Conn::PASSIVE); - mutex_lg_t _cn_lg(cn_mlock); get_net()->addr2conn.erase(this->get_addr()); } template<typename OpcodeType> template<typename MsgType> void ClientNetwork<OpcodeType>::send_msg(const MsgType &msg, const NetAddr &addr) { - mutex_lg_t _cn_lg(cn_mlock); - auto it = addr2conn.find(addr); - if (it == addr2conn.end()) return; - MsgNet::send_msg(msg, *(it->second)); + auto ret = static_cast<conn_t *>(this->disp_tcall->call( + [this, addr](ThreadCall::Handle &h) { + auto it = addr2conn.find(addr); + if (it == addr2conn.end()) + throw PeerNetworkError("client does not exist"); + auto ptr = new conn_t(it->second->conn); + h.set_result(ptr); + })); + send_msg(msg, **ret); + delete ret; } template<typename O, O OPCODE_PING, O _> |