diff options
-rw-r--r-- | include/salticidae/network.h | 84 | ||||
-rw-r--r-- | include/salticidae/util.h | 1 | ||||
-rw-r--r-- | src/util.cpp | 1 | ||||
-rw-r--r-- | test/test_p2p_stress.cpp | 1 |
4 files changed, 64 insertions, 23 deletions
diff --git a/include/salticidae/network.h b/include/salticidae/network.h index d3f3bae..e28e3df 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -275,7 +275,6 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { friend PeerNetwork; Peer *peer; TimerEvent ev_timeout; - TimerEvent ev_retry_timer; void reset_timeout(double timeout); @@ -320,7 +319,9 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { double ping_period; Peer() = delete; - Peer(const uint256_t &peer_id, conn_t conn, conn_t inbound_conn, conn_t outbound_conn, const PeerNetwork *pn): + Peer(const uint256_t &peer_id, + conn_t conn, conn_t inbound_conn, conn_t outbound_conn, + const PeerNetwork *pn): peer_id(peer_id), conn(conn), inbound_conn(inbound_conn), @@ -346,7 +347,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { }; std::unordered_map<NetAddr, conn_t> pending_peers; - std::unordered_map<NetAddr, uint256_t> known_peers; + std::unordered_map<NetAddr, std::pair<uint256_t, TimerEvent>> known_peers; std::unordered_map<uint256_t, BoxObj<Peer>> pid2peer; peer_callback_t peer_cb; unknown_peer_callback_t unknown_peer_cb; @@ -389,6 +390,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { void _ping_msg_cb(const conn_t &conn, uint16_t port); void _pong_msg_cb(const conn_t &conn, uint16_t port); bool check_handshake(Peer *peer); + void replace_conn(const conn_t &conn); void start_active_conn(const NetAddr &addr); static void tcall_reset_timeout(ConnPool::Worker *worker, const conn_t &conn, double timeout); @@ -474,6 +476,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { void add_peer(const NetAddr &addr); void del_peer(const NetAddr &addr); bool has_peer(const NetAddr &addr) const; + size_t get_npending() const; conn_t get_peer_conn(const NetAddr &addr) const; using MsgNet::send_msg; template<typename MsgType> @@ -502,7 +505,7 @@ void MsgNetwork<OpcodeType>::on_read(const ConnPool::conn_t &_conn) { auto &recv_buffer = conn->recv_buffer; auto &msg = conn->msg; auto &msg_state = conn->msg_state; - while (true) //(!conn->is_terminated()) + while (true) { if (msg_state == Conn::HEADER) { @@ -586,19 +589,18 @@ void PeerNetwork<O, _, __>::on_setup(const ConnPool::conn_t &_conn) { auto conn = static_pointer_cast<Conn>(_conn); auto worker = conn->worker; auto &ev_timeout = conn->ev_timeout; - conn->ev_retry_timer.del(); assert(!ev_timeout); ev_timeout = TimerEvent(worker->get_ec(), [listen_addr=this->listen_addr, worker, conn](TimerEvent &) { try { SALTICIDAE_LOG_INFO("peer ping-pong timeout %s <-> %s", std::string(listen_addr).c_str(), std::string(conn->get_peer_addr()).c_str()); - //conn->get_net()->worker_terminate(conn); + conn->get_net()->worker_terminate(conn); } catch (...) { worker->error_callback(std::current_exception()); } }); /* the initial ping-pong to set up the connection */ tcall_reset_timeout(worker, conn, conn_timeout); - pending_peers[conn->get_addr()] = conn; + replace_conn(conn); if (conn->get_mode() == Conn::ConnMode::ACTIVE) send_msg(MsgPing(listen_addr, my_nonce), conn); } @@ -608,7 +610,6 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) { MsgNet::on_teardown(_conn); auto conn = static_pointer_cast<Conn>(_conn); auto addr = conn->get_addr(); - conn->ev_retry_timer.clear(); conn->ev_timeout.clear(); pending_peers.erase(addr); SALTICIDAE_LOG_INFO("connection lost: %s", std::string(*conn).c_str()); @@ -621,7 +622,6 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) { }); auto it = known_peers.find(addr); if (it == known_peers.end()) return; - pending_peers[addr] = conn; if (p) { if (conn != p->conn) return; @@ -631,20 +631,19 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) { p->connected = false; p->outbound_handshake = false; p->inbound_handshake = false; - known_peers[p->peer_addr] = uint256_t(); + known_peers[p->peer_addr] = std::make_pair(uint256_t(), TimerEvent()); pid2peer.erase(p->peer_id); this->user_tcall->async_call([this, conn](ThreadCall::Handle &) { if (peer_cb) peer_cb(conn, false); }); - conn->ev_retry_timer = std::move(retry_timer); - conn->ev_retry_timer.add(gen_conn_timeout()); } else { - if (!it->second.is_null()) return; - conn->ev_retry_timer = std::move(retry_timer); - conn->ev_retry_timer.add(gen_conn_timeout()); + if (!it->second.first.is_null()) return; } + auto &ev_retry_timer = it->second.second; + ev_retry_timer = std::move(retry_timer); + ev_retry_timer.add(gen_conn_timeout()); } template<typename O, O _, O __> @@ -687,7 +686,8 @@ bool PeerNetwork<O, _, __>::check_handshake(Peer *p) { p->connected = true; p->reset_ping_timer(); p->send_ping(); - known_peers[p->peer_addr] = p->peer_id; + known_peers[p->peer_addr] = std::make_pair(p->peer_id, TimerEvent()); + pending_peers.erase(p->conn->get_addr()); if (p->connected) { auto color_begin = ""; @@ -711,9 +711,26 @@ bool PeerNetwork<O, _, __>::check_handshake(Peer *p) { } template<typename O, O _, O __> +void PeerNetwork<O, _, __>::replace_conn(const conn_t &conn) { + const auto &addr = conn->get_addr(); + auto it = pending_peers.find(addr); + if (it != pending_peers.end()) + { + auto &old_conn = it->second; + if (old_conn != conn) + { + if (old_conn->get_mode() != Conn::ConnMode::DEAD) + this->disp_terminate(old_conn); + pending_peers.erase(it); + } + } + pending_peers.insert(std::make_pair(addr, conn)); +} + +template<typename O, O _, O __> void PeerNetwork<O, _, __>::start_active_conn(const NetAddr &addr) { auto conn = static_pointer_cast<Conn>(MsgNet::_connect(addr)); - pending_peers[addr] = conn; + replace_conn(conn); } template<typename O, O _, O __> @@ -721,7 +738,10 @@ inline typename PeerNetwork<O, _, __>::conn_t PeerNetwork<O, _, __>::_get_peer_c auto it = known_peers.find(addr); if (it == known_peers.end()) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); - auto it2 = pid2peer.find(it->second); + auto &peer_id = it->second.first; + if (peer_id.is_null()) + throw PeerNetworkError(SALTI_ERROR_PEER_NOT_READY); + auto it2 = pid2peer.find(peer_id); assert(it2 != pid2peer.end()); return it2->second->conn; } @@ -849,7 +869,16 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) { } auto p = it->second.get(); p->outbound_handshake = true; - p->peer_addr = conn->get_addr(); + auto &peer_addr = conn->get_addr(); + auto &old_peer_addr = p->peer_addr; + if (!old_peer_addr.is_null() && old_peer_addr != peer_addr) + { + SALTICIDAE_LOG_WARN("multiple peer addresses share the same identity"); + known_peers.erase(old_peer_addr); + if (p->conn && p->conn->get_mode() != Conn::ConnMode::DEAD) + this->disp_terminate(p->conn); + } + old_peer_addr = peer_addr; p->reset_ping_timer(); check_handshake(p); } @@ -900,7 +929,8 @@ template<typename O, O _, O __> void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) { this->disp_tcall->async_call([this, addr](ThreadCall::Handle &) { try { - if (!known_peers.insert(std::make_pair(addr, uint256_t())).second) + if (!known_peers.insert(std::make_pair(addr, + std::make_pair(uint256_t(), TimerEvent()))).second) throw PeerNetworkError(SALTI_ERROR_PEER_ALREADY_EXISTS); if (!pending_peers.count(addr)) start_active_conn(addr); @@ -917,7 +947,7 @@ void PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) { auto it = known_peers.find(addr); if (it == known_peers.end()) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); - auto peer_id = it->second; + auto peer_id = it->second.first; known_peers.erase(it); auto it2 = pending_peers.find(addr); if (it2 != pending_peers.end()) @@ -950,12 +980,12 @@ PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &addr) const { auto it = known_peers.find(addr); if (it == known_peers.end()) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); - if (it->second.is_null()) + if (it->second.first.is_null()) { conn = nullptr; return; } - auto it2 = pid2peer.find(it->second); + auto it2 = pid2peer.find(it->second.first); assert(it2 != pid2peer.end()); conn = it2->second->conn; } catch (const PeerNetworkError &) { @@ -978,6 +1008,14 @@ bool PeerNetwork<O, _, __>::has_peer(const NetAddr &addr) const { } template<typename O, O _, O __> +size_t PeerNetwork<O, _, __>::get_npending() const { + return *(static_cast<bool *>(this->disp_tcall->call( + [this](ThreadCall::Handle &h) { + h.set_result(pending_peers.size()); + }).get())); +} + +template<typename O, O _, O __> template<typename MsgType> inline void PeerNetwork<O, _, __>::send_msg_deferred(MsgType &&msg, const NetAddr &addr) { return _send_msg_deferred(std::move(msg), addr); diff --git a/include/salticidae/util.h b/include/salticidae/util.h index 5059f13..063058c 100644 --- a/include/salticidae/util.h +++ b/include/salticidae/util.h @@ -85,6 +85,7 @@ enum SalticidaeErrorCode { SALTI_ERROR_CONNECT, SALTI_ERROR_PEER_ALREADY_EXISTS, SALTI_ERROR_PEER_NOT_EXIST, + SALTI_ERROR_PEER_NOT_READY, SALTI_ERROR_NETADDR_INVALID, SALTI_ERROR_OPTVAL_INVALID, SALTI_ERROR_OPTNAME_ALREADY_EXISTS, diff --git a/src/util.cpp b/src/util.cpp index b1f60db..ff41c9b 100644 --- a/src/util.cpp +++ b/src/util.cpp @@ -42,6 +42,7 @@ const char *SALTICIDAE_ERROR_STRINGS[] = { "unable to connect", "peer already exists", "peer does not exist", + "peer is not ready", "invalid NetAddr format", "invalid OptVal format", "option name already exists", diff --git a/test/test_p2p_stress.cpp b/test/test_p2p_stress.cpp index 1eb4a0d..ca4fcda 100644 --- a/test/test_p2p_stress.cpp +++ b/test/test_p2p_stress.cpp @@ -148,6 +148,7 @@ void install_proto(AppContext &app, const size_t &seg_buff_size) { for (const auto &p: app.tc) s += salticidae::stringprintf(" %d(%d)", ntohs(p.first.port), p.second.ncompleted); SALTICIDAE_LOG_INFO("%d completed:%s", ntohs(app.addr.port), s.c_str()); + SALTICIDAE_LOG_INFO("%d npending: %lu", ntohs(app.addr.port), net.get_npending()); }); double t = salticidae::gen_rand_timeout(10); tc.timer.add(t); |