diff options
-rw-r--r-- | include/salticidae/network.h | 81 | ||||
-rw-r--r-- | src/network.cpp | 24 | ||||
-rw-r--r-- | test/test_p2p_min.cpp | 9 | ||||
-rw-r--r-- | test/test_p2p_stress.cpp | 8 |
4 files changed, 74 insertions, 48 deletions
diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 0db69e4..fb55f83 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -388,7 +388,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { struct Peer { PeerId id; NetAddr addr; /** remote address (if set) */ - uint32_t my_nonce; + uint32_t nonce; conn_t conn; double retry_delay; @@ -403,22 +403,21 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { TimerEvent ev_ping_timer; bool ping_timer_ok; bool pong_msg_ok; + double ping_period; enum State { DISCONNECTED, CONNECTED, RESET } state; - double ping_period; - Peer(const PeerId &pid, const PeerNetwork *pn): id(pid), retry_delay(0), ntry(0), ev_ping_timer( TimerEvent(pn->disp_ec, std::bind(&Peer::ping_timer, this, _1))), - state(DISCONNECTED), - ping_period(pn->ping_period) {} + ping_period(pn->ping_period), + state(DISCONNECTED) {} Peer &operator=(const Peer &) = delete; Peer(const Peer &) = delete; @@ -431,14 +430,14 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { ev_ping_timer.del(); } uint32_t get_nonce() { - if (my_nonce == 0) + if (nonce == 0) { uint16_t n; if (!RAND_bytes((uint8_t *)&n, 2)) throw PeerNetworkError(SALTI_ERROR_RAND_SOURCE); - my_nonce = n + 1; + nonce = n + 1; } - return my_nonce; + return nonce; } public: @@ -454,7 +453,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { std::unordered_map<PeerId, BoxObj<Peer>> known_peers; using pinfo_slock_t = std::shared_lock<std::shared_timed_mutex>; - using pinfo_ulock_t = std::shared_lock<std::shared_timed_mutex>; + using pinfo_ulock_t = std::unique_lock<std::shared_timed_mutex>; mutable std::shared_timed_mutex known_peers_lock; @@ -500,7 +499,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { void _pong_msg_cb(const conn_t &conn, uint16_t port); void finish_handshake(Peer *peer); void replace_pending_conn(const conn_t &conn); - conn_t start_active_conn(const NetAddr &addr); + void start_active_conn(Peer *peer); static void tcall_reset_timeout(ConnPool::Worker *worker, const conn_t &conn, double timeout); inline conn_t _get_peer_conn(const PeerId &peer) const; @@ -519,7 +518,6 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { } public: - class Config: public MsgNet::Config { friend PeerNetwork; double _ping_period; @@ -574,13 +572,15 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { /* register a peer as known */ int32_t add_peer(const PeerId &peer); + /* unregister the peer */ + int32_t del_peer(const PeerId &peer); /* set the peer's public IP */ int32_t set_peer_addr(const PeerId &peer, const NetAddr &addr); /* try to connect to the peer: once (ntry = 1), indefinitely (ntry = -1), give up retry (ntry = 0) */ int32_t conn_peer(const PeerId &peer, ssize_t ntry = -1, double retry_delay = 2); - /* unregister the peer */ - int32_t del_peer(const PeerId &peer); + /* check if a peer is registered */ bool has_peer(const PeerId &peer) const; + size_t get_npending() const; conn_t get_peer_conn(const PeerId &addr) const; using MsgNet::send_msg; @@ -662,7 +662,7 @@ void MsgNetwork<OpcodeType>::on_read(const ConnPool::conn_t &_conn) { template<typename OpcodeType> template<typename MsgType> inline int32_t MsgNetwork<OpcodeType>::send_msg_deferred(MsgType &&msg, const conn_t &conn) { - return _send_msg_deferred(std::move(msg), conn); + return _send_msg_deferred(Msg(std::move(msg), msg_magic), conn); } template<typename OpcodeType> @@ -757,7 +757,7 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) { p->inbound_conn = nullptr; p->outbound_conn = nullptr; p->ev_ping_timer.del(); - p->my_nonce = 0; + p->nonce = 0; this->user_tcall->async_call([this, conn](ThreadCall::Handle &) { if (peer_cb) peer_cb(conn, false); }); @@ -768,7 +768,8 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) { { p->ev_retry_timer = TimerEvent(this->disp_ec, [this, addr=p->addr, p](TimerEvent &) { try { - start_active_conn(addr)->peer = p; + start_active_conn(p); + p->ev_retry_timer.add(gen_rand_timeout(p->retry_delay)); } catch (...) { this->disp_error_cb(std::current_exception()); } }); p->ev_retry_timer.add(reset ? 0 : gen_rand_timeout(p->retry_delay)); @@ -865,10 +866,11 @@ void PeerNetwork<O, _, __>::replace_pending_conn(const conn_t &conn) { } template<typename O, O _, O __> -typename PeerNetwork<O, _, __>::conn_t PeerNetwork<O, _, __>::start_active_conn(const NetAddr &addr) { - auto conn = static_pointer_cast<Conn>(MsgNet::_connect(addr)); +void PeerNetwork<O, _, __>::start_active_conn(Peer *p) { + assert(!p->addr.is_null()); + auto conn = static_pointer_cast<Conn>(MsgNet::_connect(p->addr)); + p->outbound_conn = conn; replace_pending_conn(conn); - return conn; } template<typename O, O _, O __> @@ -911,7 +913,7 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) { listen_addr, p->addr.is_null() ? passive_nonce : p->get_nonce()), conn); auto &old_conn = p->inbound_conn; - if (old_conn && !old_conn->is_terminated()) + if (old_conn && old_conn != conn) { SALTICIDAE_LOG_DEBUG("%s terminating stale handshake connection %s", std::string(listen_addr).c_str(), @@ -955,7 +957,7 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) { if (conn->get_mode() == Conn::ConnMode::ACTIVE) { auto pid = get_peer_id(conn, conn->get_addr()); - pinfo_ulock_t _g(known_peers_lock); + pinfo_slock_t _g(known_peers_lock); auto pit = known_peers.find(pid); if (pit == known_peers.end()) { @@ -971,7 +973,7 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) { std::string(listen_addr).c_str(), std::string(*conn).c_str()); auto &old_conn = p->outbound_conn; - if (old_conn && !old_conn->is_terminated()) + if (old_conn && old_conn != conn) { SALTICIDAE_LOG_DEBUG("%s terminating stale handshake connection %s", std::string(listen_addr).c_str(), @@ -992,7 +994,7 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) { SALTICIDAE_LOG_DEBUG( "%04x >= %04x, terminating and resetting", p->get_nonce(), msg.nonce); - p->my_nonce = 0; + p->nonce = 0; this->disp_terminate(conn); } } @@ -1048,7 +1050,7 @@ int32_t PeerNetwork<O, _, __>::conn_peer(const PeerId &pid, ssize_t ntry, double auto id = this->gen_async_id(); this->disp_tcall->async_call([this, pid, ntry, retry_delay, id](ThreadCall::Handle &) { try { - pinfo_ulock_t _g(known_peers_lock); + pinfo_slock_t _g(known_peers_lock); auto it = known_peers.find(pid); if (it == known_peers.end()) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); @@ -1060,11 +1062,11 @@ int32_t PeerNetwork<O, _, __>::conn_peer(const PeerId &pid, ssize_t ntry, double p->inbound_conn = nullptr; p->outbound_conn = nullptr; p->ev_ping_timer.del(); - p->my_nonce = 0; + p->nonce = 0; /* has to terminate established connection *before* making the next * attempt */ if (!p->conn || p->state == Peer::State::DISCONNECTED) - start_active_conn(p->addr); + start_active_conn(p.get()); else if (p->state == Peer::State::CONNECTED) { p->state = Peer::State::RESET; @@ -1082,7 +1084,7 @@ int32_t PeerNetwork<O, _, __>::set_peer_addr(const PeerId &pid, const NetAddr &a auto id = this->gen_async_id(); this->disp_tcall->async_call([this, pid, addr, id](ThreadCall::Handle &) { try { - pinfo_ulock_t _g(known_peers_lock); + pinfo_slock_t _g(known_peers_lock); auto it = known_peers.find(pid); if (it == known_peers.end()) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); @@ -1110,8 +1112,9 @@ int32_t PeerNetwork<O, _, __>::del_peer(const PeerId &pid) { auto it2 = pending_peers.find(addr); if (it2 != pending_peers.end()) { - if (!it2->second->peer) - this->disp_terminate(it2->second); + auto &conn = it2->second; + if (!conn->peer) + this->disp_terminate(conn); pending_peers.erase(it2); } } catch (const PeerNetworkError &) { @@ -1156,7 +1159,7 @@ size_t PeerNetwork<O, _, __>::get_npending() const { template<typename O, O _, O __> template<typename MsgType> inline int32_t PeerNetwork<O, _, __>::send_msg_deferred(MsgType &&msg, const PeerId &pid) { - return _send_msg_deferred(std::move(msg), pid); + return _send_msg_deferred(Msg(std::move(msg), this->msg_magic), pid); } template<typename O, O _, O __> @@ -1187,7 +1190,7 @@ inline bool PeerNetwork<O, _, __>::_send_msg(const Msg &msg, const PeerId &pid) template<typename O, O _, O __> template<typename MsgType> inline int32_t PeerNetwork<O, _, __>::multicast_msg(MsgType &&msg, const std::vector<PeerId> &pids) { - return _multicast_msg(MsgType(std::move(msg), this->msg_magic), pids); + return _multicast_msg(Msg(std::move(msg), this->msg_magic), pids); } template<typename O, O _, O __> @@ -1196,6 +1199,7 @@ inline int32_t PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vecto this->disp_tcall->async_call( [this, msg=std::move(msg), pids, id](ThreadCall::Handle &) { try { + pinfo_slock_t _g(known_peers_lock); bool succ = true; for (auto &pid: pids) succ &= MsgNet::_send_msg(msg, _get_peer_conn(pid)); @@ -1228,7 +1232,7 @@ void ClientNetwork<OpcodeType>::on_teardown(const ConnPool::conn_t &_conn) { template<typename OpcodeType> template<typename MsgType> inline int32_t ClientNetwork<OpcodeType>::send_msg_deferred(MsgType &&msg, const NetAddr &addr) { - return _send_msg_deferred(std::move(msg), addr); + return _send_msg_deferred(Msg(std::move(msg), this->msg_magic), addr); } template<typename OpcodeType> @@ -1246,7 +1250,7 @@ inline int32_t ClientNetwork<OpcodeType>::_send_msg_deferred(Msg &&msg, const Ne template<typename OpcodeType> template<typename MsgType> inline bool ClientNetwork<OpcodeType>::send_msg(const MsgType &msg, const NetAddr &addr) { - return _send_msg(msg, addr); + return _send_msg(Msg(msg, this->msg_magic), addr); } template<typename OpcodeType> @@ -1368,7 +1372,6 @@ bool msgnetwork_conn_is_terminated(const msgnetwork_conn_t *conn); /* PeerNetwork */ -//peerid_t *peerid_new(); void peerid_free(const peerid_t *self); peerid_t *peerid_new_from_netaddr(const netaddr_t *addr); peerid_t *peerid_new_from_x509(const x509_t *cert); @@ -1385,10 +1388,12 @@ msgnetwork_config_t *peernetwork_config_as_msgnetwork_config(peernetwork_config_ peernetwork_t *peernetwork_new(const eventcontext_t *ec, const peernetwork_config_t *config, SalticidaeCError *err); void peernetwork_free(const peernetwork_t *self); -int32_t peernetwork_add_peer(peernetwork_t *self, const peerid_t *pid); -int32_t peernetwork_del_peer(peernetwork_t *self, const peerid_t *pid); -bool peernetwork_has_peer(const peernetwork_t *self, const peerid_t *pid); -const peernetwork_conn_t *peernetwork_get_peer_conn(const peernetwork_t *self, const peerid_t *pid, SalticidaeCError *cerror); +int32_t peernetwork_add_peer(peernetwork_t *self, const peerid_t *peer); +int32_t peernetwork_del_peer(peernetwork_t *self, const peerid_t *peer); +int32_t peernetwork_conn_peer(peernetwork_t *self, const peerid_t *peer, ssize_t ntry, double retry_delay); +bool peernetwork_has_peer(const peernetwork_t *self, const peerid_t *peer); +const peernetwork_conn_t *peernetwork_get_peer_conn(const peernetwork_t *self, const peerid_t *peer, SalticidaeCError *cerror); +int32_t peernetwork_set_peer_addr(peernetwork_t *self, const peerid_t *peer, const netaddr_t *addr); msgnetwork_t *peernetwork_as_msgnetwork(peernetwork_t *self); peernetwork_t *msgnetwork_as_peernetwork_unsafe(msgnetwork_t *self); msgnetwork_conn_t *msgnetwork_conn_new_from_peernetwork_conn(const peernetwork_conn_t *conn); diff --git a/src/network.cpp b/src/network.cpp index 286c3ef..4182a41 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -227,27 +227,35 @@ peernetwork_t *peernetwork_new(const eventcontext_t *ec, const peernetwork_confi void peernetwork_free(const peernetwork_t *self) { delete self; } -int32_t peernetwork_add_peer(peernetwork_t *self, const peerid_t *pid) { - return self->add_peer(*pid); +int32_t peernetwork_add_peer(peernetwork_t *self, const peerid_t *peer) { + return self->add_peer(*peer); } -int32_t peernetwork_del_peer(peernetwork_t *self, const peerid_t *pid) { - return self->del_peer(*pid); +int32_t peernetwork_del_peer(peernetwork_t *self, const peerid_t *peer) { + return self->del_peer(*peer); } -bool peernetwork_has_peer(const peernetwork_t *self, const peerid_t *pid) { - return self->has_peer(*pid); +int32_t peernetwork_conn_peer(peernetwork_t *self, const peerid_t *peer, ssize_t ntry, double retry_delay) { + return self->conn_peer(*peer, ntry, retry_delay); +} + +bool peernetwork_has_peer(const peernetwork_t *self, const peerid_t *peer) { + return self->has_peer(*peer); } const peernetwork_conn_t *peernetwork_get_peer_conn(const peernetwork_t *self, - const peerid_t *pid, + const peerid_t *peer, SalticidaeCError *cerror) { SALTICIDAE_CERROR_TRY(cerror) - return new peernetwork_conn_t(self->get_peer_conn(*pid)); + return new peernetwork_conn_t(self->get_peer_conn(*peer)); SALTICIDAE_CERROR_CATCH(cerror) return nullptr; } +int32_t peernetwork_set_peer_addr(peernetwork_t *self, const peerid_t *peer, const netaddr_t *addr) { + return self->set_peer_addr(*peer, *addr); +} + msgnetwork_t *peernetwork_as_msgnetwork(peernetwork_t *self) { return self; } peernetwork_t *msgnetwork_as_peernetwork_unsafe(msgnetwork_t *self) { diff --git a/test/test_p2p_min.cpp b/test/test_p2p_min.cpp index a221d79..bc62eda 100644 --- a/test/test_p2p_min.cpp +++ b/test/test_p2p_min.cpp @@ -43,7 +43,14 @@ int main() { for (size_t i = 0; i < nodes.size(); i++) for (size_t j = 0; j < nodes.size(); j++) if (i != j) - nodes[i].second->add_peer(nodes[j].first); + { + auto &node = nodes[i].second; + auto &peer_addr = nodes[j].first; + salticidae::PeerId pid{peer_addr}; + node->add_peer(pid); + node->set_peer_addr(pid, peer_addr); + node->conn_peer(pid); + } ec.dispatch(); return 0; } diff --git a/test/test_p2p_stress.cpp b/test/test_p2p_stress.cpp index dca9cf4..f5a0b5d 100644 --- a/test/test_p2p_stress.cpp +++ b/test/test_p2p_stress.cpp @@ -238,7 +238,13 @@ int main(int argc, char **argv) { masksigs(); a.net->listen(a.addr); for (auto &paddr: addrs) - if (paddr != a.addr) a.net->add_peer(paddr); + if (paddr != a.addr) + { + salticidae::PeerId pid{paddr}; + a.net->add_peer(pid); + a.net->set_peer_addr(pid, paddr); + a.net->conn_peer(pid); + } a.ec.dispatch();})); EventContext ec; |