diff options
-rw-r--r-- | include/salticidae/network.h | 388 |
1 files changed, 128 insertions, 260 deletions
diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 152ca60..37ecc75 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -309,23 +309,22 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { }; private: - struct PeerConn; + struct Peer; public: class Conn: public MsgNet::Conn { friend PeerNetwork; - PeerConn *peer; - BoxObj<PeerConn> _dead_peer; + Peer *peer; TimerEvent ev_timeout; void reset_timeout(double timeout); public: - Conn(): MsgNet::Conn(), peer(nullptr), _dead_peer(nullptr) {} + Conn(): MsgNet::Conn(), peer(nullptr) {} NetAddr get_peer_addr() { auto ret = *(static_cast<NetAddr *>( get_net()->disp_tcall->call([this](ThreadCall::Handle &h) { - h.set_result(peer ? NetAddr(peer->peer_addr) : NetAddr()); + h.set_result(peer ? NetAddr(peer->addr) : NetAddr()); }).get())); return ret; } @@ -349,22 +348,26 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { PeerId(const NetAddr &addr) { DataStream tmp; tmp << addr; - *this = tmp.get_hash(); + *(static_cast<uint256_t *>(this)) = tmp.get_hash(); } PeerId(const X509 &cert) { DataStream tmp; tmp << cert.get_der(); - *this = tmp.get_hash(); + *(static_cast<uint256_t *>(this)) = tmp.get_hash(); } }; private: - class PeerConn { - friend PeerNetwork; - /** connection addr, may be different due to passive mode */ - uint256_t session_id; - PeerId peer_id; + + struct Peer { + PeerId id; + NetAddr addr; /** remote address (if set) */ + + double retry_delay; + ssize_t ntry; + TimerEvent ev_retry_timer; + /** the underlying connection, may be invalid when connected = false */ conn_t conn; conn_t inbound_conn; @@ -374,27 +377,21 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { bool ping_timer_ok; bool pong_msg_ok; bool connected; - bool outbound_handshake; - bool inbound_handshake; double ping_period; - PeerConn() = delete; - PeerConn(const uint256_t &session_id, const PeerId &peer_id, - conn_t conn, conn_t inbound_conn, conn_t outbound_conn, - const PeerNetwork *pn): - session_id(session_id), - conn(conn), - inbound_conn(inbound_conn), - outbound_conn(outbound_conn), - ev_ping_timer( - TimerEvent(pn->disp_ec, std::bind(&PeerConn::ping_timer, this, _1))), - connected(false), - outbound_handshake(false), - inbound_handshake(false), - ping_period(pn->ping_period) {} - PeerConn &operator=(const PeerConn &) = delete; - PeerConn(const PeerConn &) = delete; + Peer(const PeerId &pid, const PeerNetwork *pn): + id(pid), + retry_delay(0), ntry(0), + ev_ping_timer( + TimerEvent(pn->disp_ec, std::bind(&Peer::ping_timer, this, _1))), + connected(false), + ping_period(pn->ping_period) {} + + Peer &operator=(const Peer &) = delete; + Peer(const Peer &) = delete; + + void update_conn(const conn_t &conn); void reset_ping_timer(); void send_ping(); void ping_timer(TimerEvent &); @@ -403,38 +400,21 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { ev_ping_timer.del(); } public: - ~PeerConn() { + ~Peer() { if (inbound_conn) inbound_conn->peer = nullptr; if (outbound_conn) outbound_conn->peer = nullptr; } }; - struct Peer { - uint256_t session_id; - NetAddr addr; - double retry_delay; - ssize_t ntry; - TimerEvent ev_retry_timer; - conn_t peer_conn; - - Peer(): - session_id(), - retry_delay(0), ntry(0), - ev_retry_timer() {} - }; - /* connections whose PeerId is unknown */ std::unordered_map<NetAddr, conn_t> pending_peers; /* registered peers */ std::unordered_map<PeerId, BoxObj<Peer>> known_peers; - /* peer connection deduplication map */ - std::unordered_map<uint256_t, BoxObj<PeerConn>> sid2peer; using pinfo_slock_t = std::shared_lock<std::shared_timed_mutex>; using pinfo_ulock_t = std::shared_lock<std::shared_timed_mutex>; mutable std::shared_timed_mutex known_peers_lock; - mutable std::shared_timed_mutex sid2peer_lock; peer_callback_t peer_cb; unknown_peer_callback_t unknown_peer_cb; @@ -444,7 +424,6 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { double conn_timeout; NetAddr listen_addr; bool allow_unknown_peer; - bool force_two_way_handshake; uint256_t my_nonce; struct MsgPing { @@ -476,8 +455,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { void pong_handler(MsgPong &&msg, const conn_t &conn); void _ping_msg_cb(const conn_t &conn, uint16_t port); void _pong_msg_cb(const conn_t &conn, uint16_t port); - void check_handshake(PeerConn *peer); - void move_peer_buffer(conn_t &old_conn, const conn_t &new_conn); + void finish_handshake(Peer *peer); void replace_conn(const conn_t &conn); conn_t start_active_conn(const NetAddr &addr); static void tcall_reset_timeout(ConnPool::Worker *worker, @@ -489,15 +467,6 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { void on_setup(const ConnPool::conn_t &) override; void on_teardown(const ConnPool::conn_t &) override; - uint256_t get_session_id(const conn_t &conn, const NetAddr &claimed_addr, const uint256_t &nonce) { - DataStream tmp; - if (!this->enable_tls || id_mode == ADDR_BASED) - tmp << nonce << claimed_addr; - else - tmp << conn->get_peer_cert()->get_der(); - return tmp.get_hash(); - } - PeerId get_peer_id(const conn_t &conn, const NetAddr &addr) { DataStream tmp; if (!this->enable_tls || id_mode == ADDR_BASED) @@ -513,7 +482,6 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { double _ping_period; double _conn_timeout; bool _allow_unknown_peer; - bool _force_two_way_handshake; IdentityMode _id_mode; public: @@ -524,7 +492,6 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { _ping_period(30), _conn_timeout(180), _allow_unknown_peer(false), - _force_two_way_handshake(false), _id_mode(CERT_BASED) {} @@ -547,11 +514,6 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { _allow_unknown_peer = x; return *this; } - - Config &force_two_way_handshake(bool x) { - _force_two_way_handshake = x; - return *this; - } }; PeerNetwork(const EventContext &ec, const Config &config): @@ -559,8 +521,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { id_mode(config._id_mode), ping_period(config._ping_period), conn_timeout(config._conn_timeout), - allow_unknown_peer(config._allow_unknown_peer), - force_two_way_handshake(config._force_two_way_handshake) { + allow_unknown_peer(config._allow_unknown_peer) { this->reg_handler(generic_bind(&PeerNetwork::ping_handler, this, _1, _2)); this->reg_handler(generic_bind(&PeerNetwork::pong_handler, this, _1, _2)); } @@ -737,70 +698,58 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) { pending_peers.erase(addr); SALTICIDAE_LOG_INFO("connection lost: %s", std::string(*conn).c_str()); auto p = conn->peer; - auto pid = p ? p->peer_id : PeerId(); - pinfo_ulock_t _g(known_peers_lock); - auto it = known_peers.find(pid); - if (it == known_peers.end()) return; - auto &pp = it->second; - if (conn == pp->peer_conn) + if (!p) return; + assert(conn == p->conn); + p->inbound_conn = nullptr; + p->outbound_conn = nullptr; + p->ev_ping_timer.del(); + p->connected = false; + p->conn = nullptr; + this->user_tcall->async_call([this, conn](ThreadCall::Handle &) { + if (peer_cb) peer_cb(conn, false); + }); + /* auto retry the connection */ + if (p->ntry > 0) p->ntry--; + if (p->ntry) { - pinfo_ulock_t __g(sid2peer_lock); - auto it2 = sid2peer.find(pp->session_id); - if (it2 != sid2peer.end()) - { - auto &p = it2->second; - if (p->connected && p->conn != conn) - move_peer_buffer(pp->peer_conn, p->conn); - } + p->ev_retry_timer = TimerEvent(this->disp_ec, [this, addr](TimerEvent &) { + try { + start_active_conn(addr); + } catch (...) { this->disp_error_cb(std::current_exception()); } + }); + p->ev_retry_timer.add(gen_rand_timeout(p->retry_delay)); } - if (p) +} + +template<typename O, O _, O __> +void PeerNetwork<O, _, __>::Peer::update_conn(const conn_t &new_conn) { + if (conn != new_conn) { - if (conn != p->conn) return; - p->inbound_conn = nullptr; - p->outbound_conn = nullptr; - p->ev_ping_timer.del(); - p->connected = false; - p->outbound_handshake = false; - p->inbound_handshake = false; - pp->session_id = uint256_t(); - PeerConn *peer = nullptr; + conn->peer = nullptr; + if (conn->is_terminated()) { - pinfo_ulock_t __g(sid2peer_lock); - auto it2 = sid2peer.find(p->session_id); - peer = it2->second.unwrap(); - sid2peer.erase(it2); + for (;;) + { + bytearray_t buff_seg = conn->send_buffer.move_pop(); + if (!buff_seg.size()) break; + new_conn->write(std::move(buff_seg)); + } } - peer->conn = nullptr; - conn->_dead_peer = peer; - this->user_tcall->async_call([this, conn, peer](ThreadCall::Handle &) { - if (peer_cb) peer_cb(conn, false); - }); - } - else - { - if (!it->second->session_id.is_null()) return; + else + this->disp_terminate(conn); + conn = new_conn; } - auto &ev_retry_timer = it->second->ev_retry_timer; - ev_retry_timer = TimerEvent(this->disp_ec, [this, addr](TimerEvent &) { - try { - start_active_conn(addr); - } catch (...) { this->disp_error_cb(std::current_exception()); } - }); - /* auto retry the connection */ - if (pp->ntry > 0) pp->ntry--; - if (pp->ntry) - ev_retry_timer.add(gen_rand_timeout(pp->retry_delay)); } template<typename O, O _, O __> -void PeerNetwork<O, _, __>::PeerConn::reset_ping_timer() { +void PeerNetwork<O, _, __>::Peer::reset_ping_timer() { assert(ev_ping_timer); ev_ping_timer.del(); ev_ping_timer.add(gen_rand_timeout(ping_period)); } template<typename O, O _, O __> -void PeerNetwork<O, _, __>::PeerConn::send_ping() { +void PeerNetwork<O, _, __>::Peer::send_ping() { auto pn = conn->get_net(); ping_timer_ok = false; pong_msg_ok = false; @@ -809,7 +758,7 @@ void PeerNetwork<O, _, __>::PeerConn::send_ping() { } template<typename O, O _, O __> -void PeerNetwork<O, _, __>::PeerConn::ping_timer(TimerEvent &) { +void PeerNetwork<O, _, __>::Peer::ping_timer(TimerEvent &) { ping_timer_ok = true; if (pong_msg_ok) { @@ -819,27 +768,8 @@ void PeerNetwork<O, _, __>::PeerConn::ping_timer(TimerEvent &) { } template<typename O, O _, O __> -void PeerNetwork<O, _, __>::move_peer_buffer(conn_t &old_conn, const conn_t &new_conn) { - assert(old_conn->is_terminated() && old_conn != new_conn); - for (;;) - { - bytearray_t buff_seg = old_conn->send_buffer.move_pop(); - if (!buff_seg.size()) break; - new_conn->write(std::move(buff_seg)); - } - old_conn = new_conn; - this->user_tcall->async_call([this, new_conn](ThreadCall::Handle &) { - if (peer_cb) peer_cb(new_conn, true); - }); -} - -template<typename O, O _, O __> -void PeerNetwork<O, _, __>::check_handshake(PeerConn *p) { - if ((force_two_way_handshake && - !(p->inbound_handshake && p->outbound_handshake)) || - !(p->inbound_handshake || p->outbound_handshake) || - p->connected) - return; +void PeerNetwork<O, _, __>::finish_handshake(Peer *p) { + if (p->connected) return; p->clear_all_events(); if (p->inbound_conn && p->inbound_conn != p->conn) p->inbound_conn->peer = nullptr; @@ -849,25 +779,10 @@ void PeerNetwork<O, _, __>::check_handshake(PeerConn *p) { p->connected = true; p->reset_ping_timer(); p->send_ping(); - { - pinfo_ulock_t _g(known_peers_lock); - auto &pp = known_peers[p->peer_addr]; - pp->session_id = p->session_id; - pp->ev_retry_timer.del(); - auto &old_conn = pp->peer_conn; - auto &conn = p->conn; - if (old_conn != conn) - { - if (old_conn->is_terminated()) - move_peer_buffer(old_conn, conn); - else - this->disp_terminate(old_conn); - } - else - this->user_tcall->async_call([this, conn](ThreadCall::Handle &) { - if (peer_cb) peer_cb(conn, true); - }); - } + p->ev_retry_timer.del(); + this->user_tcall->async_call([this, conn=p->conn](ThreadCall::Handle &) { + if (peer_cb) peer_cb(conn, true); + }); pending_peers.erase(p->conn->get_addr()); auto color_begin = ""; auto color_end = ""; @@ -879,7 +794,7 @@ void PeerNetwork<O, _, __>::check_handshake(PeerConn *p) { SALTICIDAE_LOG_INFO("%sPeerNetwork: established connection %s <-> %s via %s", color_begin, std::string(listen_addr).c_str(), - std::string(p->peer_addr).c_str(), + std::string(p->addr).c_str(), std::string(*(p->conn)).c_str(), color_end); } @@ -924,12 +839,12 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) { if (conn->is_terminated()) return; if (!msg.claimed_addr.is_null()) { - auto session_id = get_session_id(conn, msg.claimed_addr, msg.nonce); + auto pid = get_peer_id(conn, msg.claimed_addr); if (conn->get_mode() == Conn::ConnMode::PASSIVE) { pinfo_slock_t _g(known_peers_lock); - pinfo_ulock_t __g(sid2peer_lock); - if (!known_peers.count(msg.claimed_addr)) + auto pit = known_peers.find(msg.claimed_addr); + if (pit == known_peers.end()) { this->user_tcall->async_call([this, addr=msg.claimed_addr, conn](ThreadCall::Handle &) { if (unknown_peer_cb) unknown_peer_cb(addr, conn->get_peer_cert()); @@ -937,43 +852,34 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) { this->disp_terminate(conn); return; } + auto &p = pit->second; SALTICIDAE_LOG_INFO("%s inbound handshake from %s", std::string(listen_addr).c_str(), std::string(*conn).c_str()); send_msg(MsgPong(listen_addr, my_nonce), conn); - auto it = sid2peer.find(session_id); - if (it != sid2peer.end()) + if (p->connected) { - auto p = it->second.get(); - if (p->connected) - { - //conn->get_net()->disp_terminate(conn); - return; - } - auto &old_conn = p->inbound_conn; - if (old_conn && !old_conn->is_terminated()) - { - SALTICIDAE_LOG_DEBUG("%s terminating old connection %s", - std::string(listen_addr).c_str(), - std::string(*old_conn).c_str()); - assert(old_conn->peer == nullptr); - old_conn->get_net()->disp_terminate(old_conn); - } - old_conn = conn; - if (msg.nonce < my_nonce) - { - p->conn = conn; - } + //conn->get_net()->disp_terminate(conn); + return; } + auto &old_conn = p->inbound_conn; + if (old_conn && !old_conn->is_terminated()) + { + SALTICIDAE_LOG_DEBUG("%s terminating old connection %s", + std::string(listen_addr).c_str(), + std::string(*old_conn).c_str()); + assert(old_conn->peer == nullptr); + this->disp_terminate(old_conn); + } + old_conn = conn; + if (msg.nonce < my_nonce || p->addr.is_null()) + p->update_conn(conn); else { - it = sid2peer.insert(std::make_pair(session_id, - new PeerConn(session_id, get_peer_id(conn, msg.claimed_addr), - conn, conn, nullptr, this))).first; + this->disp_terminate(conn); + return; } - auto p = it->second.get(); - p->inbound_handshake = true; - check_handshake(p); + finish_handshake(p); } else SALTICIDAE_LOG_WARN("unexpected inbound handshake from %s", @@ -995,65 +901,46 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) { if (conn->is_terminated()) return; if (!msg.claimed_addr.is_null()) { - auto session_id = get_session_id(conn, msg.claimed_addr, msg.nonce); + auto pid = get_peer_id(conn, msg.claimed_addr); if (conn->get_mode() == Conn::ConnMode::ACTIVE) { pinfo_ulock_t _g(known_peers_lock); - pinfo_ulock_t __g(sid2peer_lock); SALTICIDAE_LOG_INFO("%s outbound handshake to %s", std::string(listen_addr).c_str(), std::string(*conn).c_str()); - auto it = sid2peer.find(session_id); - if (it != sid2peer.end()) + auto pit = known_peers.find(pid); + assert(pit != known_peers.end()); + auto &p = pit->second; + if (p->connected) { - auto p = it->second.get(); - if (p->connected) - { - conn->get_net()->disp_terminate(conn); - return; - } - auto &old_conn = p->outbound_conn; - if (old_conn && !old_conn->is_terminated()) - { - SALTICIDAE_LOG_DEBUG("%s terminating old connection %s", - std::string(listen_addr).c_str(), - std::string(*old_conn).c_str()); - assert(old_conn->peer == nullptr); - old_conn->get_net()->disp_terminate(old_conn); - } - old_conn = conn; - if (my_nonce < msg.nonce) - { - p->conn = conn; - } - else - { - SALTICIDAE_LOG_DEBUG("%s terminating low connection %s", + conn->get_net()->disp_terminate(conn); + return; + } + auto &old_conn = p->outbound_conn; + if (old_conn && !old_conn->is_terminated()) + { + SALTICIDAE_LOG_DEBUG("%s terminating old connection %s", std::string(listen_addr).c_str(), - std::string(*conn).c_str()); - conn->get_net()->disp_terminate(conn); - } + std::string(*old_conn).c_str()); + assert(old_conn->peer == nullptr); + old_conn->get_net()->disp_terminate(old_conn); } + old_conn = conn; + assert(!p->addr.is_null()); + if (my_nonce < msg.nonce) + p->update_conn(conn); else { - it = sid2peer.insert(std::make_pair(session_id, - new PeerConn(session_id, get_peer_id(conn, msg.claimed_addr), - conn, nullptr, conn, this))).first; + this->disp_terminate(conn); + return; } - auto p = it->second.get(); - p->outbound_handshake = true; auto &peer_addr = conn->get_addr(); - auto &old_peer_addr = p->peer_addr; + auto &old_peer_addr = p->addr; if (!old_peer_addr.is_null() && old_peer_addr != peer_addr) - { SALTICIDAE_LOG_WARN("multiple peer addresses share the same identity"); - known_peers.erase(old_peer_addr); - if (p->conn && !p->conn->is_terminated()) - this->disp_terminate(p->conn); - } old_peer_addr = peer_addr; p->reset_ping_timer(); - check_handshake(p); + finish_handshake(p); } else SALTICIDAE_LOG_WARN("unexpected outbound handshake from %s", @@ -1098,7 +985,7 @@ int32_t PeerNetwork<O, _, __>::add_peer(const PeerId &pid) { pinfo_ulock_t _g(known_peers_lock); if (known_peers.count(pid)) throw PeerNetworkError(SALTI_ERROR_PEER_ALREADY_EXISTS); - known_peers.insert(std::make_pair(pid, new Peer())); + known_peers.insert(std::make_pair(pid, new Peer(pid, this))); } catch (const PeerNetworkError &) { this->recoverable_error(std::current_exception(), id); } catch (...) { this->disp_error_cb(std::current_exception()); } @@ -1139,9 +1026,7 @@ int32_t PeerNetwork<O, _, __>::set_peer_addr(const PeerId &pid, const NetAddr &a auto it = known_peers.find(pid); if (it == known_peers.end()) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); - auto it2 = pending_peers.find(addr); - if (it2 != pending_peers.end()) - it->second.peer_conn = it2->second; + it->addr = addr; } catch (const PeerNetworkError &) { this->recoverable_error(std::current_exception(), id); } catch (...) { this->disp_error_cb(std::current_exception()); } @@ -1156,7 +1041,6 @@ int32_t PeerNetwork<O, _, __>::del_peer(const PeerId &pid) { this->disp_tcall->async_call([this, pid, id](ThreadCall::Handle &) { try { pinfo_ulock_t _g(known_peers_lock); - pinfo_ulock_t __g(sid2peer_lock); auto it = known_peers.find(pid); if (it == known_peers.end()) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); @@ -1169,13 +1053,6 @@ int32_t PeerNetwork<O, _, __>::del_peer(const PeerId &pid) { this->disp_terminate(it2->second); pending_peers.erase(it2); } - auto it3 = sid2peer.find(pid); - if (it3 != sid2peer.end()) - { - auto &p = it3->second; - this->disp_terminate(p->conn); - sid2peer.erase(it3); - } } catch (const PeerNetworkError &) { this->recoverable_error(std::current_exception(), id); } catch (...) { this->disp_error_cb(std::current_exception()); } @@ -1187,32 +1064,23 @@ template<typename O, O _, O __> typename PeerNetwork<O, _, __>::conn_t PeerNetwork<O, _, __>::get_peer_conn(const PeerId &pid) const { auto ret = *(static_cast<conn_t *>( - this->disp_tcall->call([this, addr](ThreadCall::Handle &h) { + this->disp_tcall->call([this, pid](ThreadCall::Handle &h) { conn_t conn; pinfo_slock_t _g(known_peers_lock); - pinfo_slock_t __g(sid2peer_lock); - auto it = known_peers.find(addr); + auto it = known_peers.find(pid); if (it == known_peers.end()) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); - if (it->second->session_id.is_null()) - conn = nullptr; - else - { - auto it2 = sid2peer.find(it->second->session_id); - assert(it2 != sid2peer.end()); - conn = it2->second->conn; - } - h.set_result(std::move(conn)); + h.set_result(it->second->conn); }).get())); return ret; } template<typename O, O _, O __> -bool PeerNetwork<O, _, __>::has_peer(const NetAddr &addr) const { +bool PeerNetwork<O, _, __>::has_peer(const PeerId &pid) const { return *(static_cast<bool *>(this->disp_tcall->call( - [this, addr](ThreadCall::Handle &h) { + [this, pid](ThreadCall::Handle &h) { pinfo_slock_t _g(known_peers_lock); - h.set_result(known_peers.count(addr)); + h.set_result(known_peers.count(pid)); }).get())); } |