diff options
Diffstat (limited to 'include/salticidae/network.h')
-rw-r--r-- | include/salticidae/network.h | 236 |
1 files changed, 157 insertions, 79 deletions
diff --git a/include/salticidae/network.h b/include/salticidae/network.h index dd05ed3..152ca60 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -345,12 +345,26 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { using peer_callback_t = std::function<void(const conn_t &peer_conn, bool connected)>; using unknown_peer_callback_t = std::function<void(const NetAddr &claimed_addr, const X509 *cert)>; + struct PeerId: public uint256_t { + PeerId(const NetAddr &addr) { + DataStream tmp; + tmp << addr; + *this = tmp.get_hash(); + } + + PeerId(const X509 &cert) { + DataStream tmp; + tmp << cert.get_der(); + *this = tmp.get_hash(); + } + }; + private: class PeerConn { friend PeerNetwork; /** connection addr, may be different due to passive mode */ - uint256_t peer_id; - NetAddr peer_addr; + uint256_t session_id; + PeerId peer_id; /** the underlying connection, may be invalid when connected = false */ conn_t conn; conn_t inbound_conn; @@ -365,10 +379,10 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { double ping_period; PeerConn() = delete; - PeerConn(const uint256_t &peer_id, + PeerConn(const uint256_t &session_id, const PeerId &peer_id, conn_t conn, conn_t inbound_conn, conn_t outbound_conn, const PeerNetwork *pn): - peer_id(peer_id), + session_id(session_id), conn(conn), inbound_conn(inbound_conn), outbound_conn(outbound_conn), @@ -396,33 +410,41 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { }; struct Peer { - uint256_t peer_id; + uint256_t session_id; + NetAddr addr; + double retry_delay; + ssize_t ntry; TimerEvent ev_retry_timer; conn_t peer_conn; - Peer(const conn_t &conn): - peer_id(), ev_retry_timer(), peer_conn(conn) {} + Peer(): + session_id(), + retry_delay(0), ntry(0), + ev_retry_timer() {} }; + /* connections whose PeerId is unknown */ std::unordered_map<NetAddr, conn_t> pending_peers; - std::unordered_map<NetAddr, BoxObj<Peer>> known_peers; - std::unordered_map<uint256_t, BoxObj<PeerConn>> pid2peer; + /* registered peers */ + std::unordered_map<PeerId, BoxObj<Peer>> known_peers; + /* peer connection deduplication map */ + std::unordered_map<uint256_t, BoxObj<PeerConn>> sid2peer; using pinfo_slock_t = std::shared_lock<std::shared_timed_mutex>; using pinfo_ulock_t = std::shared_lock<std::shared_timed_mutex>; mutable std::shared_timed_mutex known_peers_lock; - mutable std::shared_timed_mutex pid2peer_lock; + mutable std::shared_timed_mutex sid2peer_lock; peer_callback_t peer_cb; unknown_peer_callback_t unknown_peer_cb; const IdentityMode id_mode; - double retry_conn_delay; double ping_period; double conn_timeout; NetAddr listen_addr; bool allow_unknown_peer; + bool force_two_way_handshake; uint256_t my_nonce; struct MsgPing { @@ -464,12 +486,10 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { protected: ConnPool::Conn *create_conn() override { return new Conn(); } - virtual double gen_conn_timeout() { - return gen_rand_timeout(retry_conn_delay); - } void on_setup(const ConnPool::conn_t &) override; void on_teardown(const ConnPool::conn_t &) override; - uint256_t gen_peer_id(const conn_t &conn, const NetAddr &claimed_addr, const uint256_t &nonce) { + + uint256_t get_session_id(const conn_t &conn, const NetAddr &claimed_addr, const uint256_t &nonce) { DataStream tmp; if (!this->enable_tls || id_mode == ADDR_BASED) tmp << nonce << claimed_addr; @@ -478,14 +498,22 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { return tmp.get_hash(); } + PeerId get_peer_id(const conn_t &conn, const NetAddr &addr) { + DataStream tmp; + if (!this->enable_tls || id_mode == ADDR_BASED) + return PeerId(addr); + else + return PeerId(*conn->get_peer_cert()); + } + public: class Config: public MsgNet::Config { friend PeerNetwork; - double _retry_conn_delay; double _ping_period; double _conn_timeout; bool _allow_unknown_peer; + bool _force_two_way_handshake; IdentityMode _id_mode; public: @@ -493,18 +521,13 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { Config(const typename MsgNet::Config &config): MsgNet::Config(config), - _retry_conn_delay(2), _ping_period(30), _conn_timeout(180), _allow_unknown_peer(false), + _force_two_way_handshake(false), _id_mode(CERT_BASED) {} - Config &retry_conn_delay(double x) { - _retry_conn_delay = x; - return *this; - } - Config &ping_period(double x) { _ping_period = x; return *this; @@ -524,26 +547,37 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { _allow_unknown_peer = x; return *this; } + + Config &force_two_way_handshake(bool x) { + _force_two_way_handshake = x; + return *this; + } }; PeerNetwork(const EventContext &ec, const Config &config): MsgNet(ec, config), id_mode(config._id_mode), - retry_conn_delay(config._retry_conn_delay), ping_period(config._ping_period), conn_timeout(config._conn_timeout), - allow_unknown_peer(config._allow_unknown_peer) { + allow_unknown_peer(config._allow_unknown_peer), + force_two_way_handshake(config._force_two_way_handshake) { this->reg_handler(generic_bind(&PeerNetwork::ping_handler, this, _1, _2)); this->reg_handler(generic_bind(&PeerNetwork::pong_handler, this, _1, _2)); } virtual ~PeerNetwork() { this->stop(); } - int32_t add_peer(const NetAddr &addr); - int32_t del_peer(const NetAddr &addr); - bool has_peer(const NetAddr &addr) const; + /* register a peer as known */ + int32_t add_peer(const PeerId &pid); + /* set the peer's public IP */ + int32_t set_peer_addr(const PeerId &pid, const NetAddr &addr); + /* try to connect to the peer: once (ntry = 1), indefinitely (ntry = -1), give up retry (ntry = 0) */ + int32_t try_conn_peer(const PeerId &pid, ssize_t ntry = -1, double retry_delay = 2); + /* unregister the peer */ + int32_t del_peer(const PeerId &pid); + bool has_peer(const PeerId &pid) const; size_t get_npending() const; - conn_t get_peer_conn(const NetAddr &addr) const; + conn_t get_peer_conn(const PeerId &addr) const; using MsgNet::send_msg; template<typename MsgType> inline bool send_msg(const MsgType &msg, const NetAddr &addr); @@ -703,16 +737,16 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) { pending_peers.erase(addr); SALTICIDAE_LOG_INFO("connection lost: %s", std::string(*conn).c_str()); auto p = conn->peer; - if (p) addr = p->peer_addr; + auto pid = p ? p->peer_id : PeerId(); pinfo_ulock_t _g(known_peers_lock); - auto it = known_peers.find(addr); + auto it = known_peers.find(pid); if (it == known_peers.end()) return; auto &pp = it->second; if (conn == pp->peer_conn) { - pinfo_ulock_t __g(pid2peer_lock); - auto it2 = pid2peer.find(pp->peer_id); - if (it2 != pid2peer.end()) + pinfo_ulock_t __g(sid2peer_lock); + auto it2 = sid2peer.find(pp->session_id); + if (it2 != sid2peer.end()) { auto &p = it2->second; if (p->connected && p->conn != conn) @@ -728,13 +762,13 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) { p->connected = false; p->outbound_handshake = false; p->inbound_handshake = false; - pp->peer_id = uint256_t(); + pp->session_id = uint256_t(); PeerConn *peer = nullptr; { - pinfo_ulock_t __g(pid2peer_lock); - auto it2 = pid2peer.find(p->peer_id); + pinfo_ulock_t __g(sid2peer_lock); + auto it2 = sid2peer.find(p->session_id); peer = it2->second.unwrap(); - pid2peer.erase(it2); + sid2peer.erase(it2); } peer->conn = nullptr; conn->_dead_peer = peer; @@ -744,7 +778,7 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) { } else { - if (!it->second->peer_id.is_null()) return; + if (!it->second->session_id.is_null()) return; } auto &ev_retry_timer = it->second->ev_retry_timer; ev_retry_timer = TimerEvent(this->disp_ec, [this, addr](TimerEvent &) { @@ -752,7 +786,10 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) { start_active_conn(addr); } catch (...) { this->disp_error_cb(std::current_exception()); } }); - ev_retry_timer.add(gen_conn_timeout()); + /* auto retry the connection */ + if (pp->ntry > 0) pp->ntry--; + if (pp->ntry) + ev_retry_timer.add(gen_rand_timeout(pp->retry_delay)); } template<typename O, O _, O __> @@ -798,8 +835,10 @@ void PeerNetwork<O, _, __>::move_peer_buffer(conn_t &old_conn, const conn_t &new template<typename O, O _, O __> void PeerNetwork<O, _, __>::check_handshake(PeerConn *p) { - if (!(p->inbound_handshake && p->outbound_handshake) || - p->connected) + if ((force_two_way_handshake && + !(p->inbound_handshake && p->outbound_handshake)) || + !(p->inbound_handshake || p->outbound_handshake) || + p->connected) return; p->clear_all_events(); if (p->inbound_conn && p->inbound_conn != p->conn) @@ -813,7 +852,7 @@ void PeerNetwork<O, _, __>::check_handshake(PeerConn *p) { { pinfo_ulock_t _g(known_peers_lock); auto &pp = known_peers[p->peer_addr]; - pp->peer_id = p->peer_id; + pp->session_id = p->session_id; pp->ev_retry_timer.del(); auto &old_conn = pp->peer_conn; auto &conn = p->conn; @@ -885,11 +924,11 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) { if (conn->is_terminated()) return; if (!msg.claimed_addr.is_null()) { - auto peer_id = gen_peer_id(conn, msg.claimed_addr, msg.nonce); + auto session_id = get_session_id(conn, msg.claimed_addr, msg.nonce); if (conn->get_mode() == Conn::ConnMode::PASSIVE) { pinfo_slock_t _g(known_peers_lock); - pinfo_ulock_t __g(pid2peer_lock); + pinfo_ulock_t __g(sid2peer_lock); if (!known_peers.count(msg.claimed_addr)) { this->user_tcall->async_call([this, addr=msg.claimed_addr, conn](ThreadCall::Handle &) { @@ -902,8 +941,8 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) { std::string(listen_addr).c_str(), std::string(*conn).c_str()); send_msg(MsgPong(listen_addr, my_nonce), conn); - auto it = pid2peer.find(peer_id); - if (it != pid2peer.end()) + auto it = sid2peer.find(session_id); + if (it != sid2peer.end()) { auto p = it->second.get(); if (p->connected) @@ -928,8 +967,9 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) { } else { - it = pid2peer.insert(std::make_pair(peer_id, - new PeerConn(peer_id, conn, conn, nullptr, this))).first; + it = sid2peer.insert(std::make_pair(session_id, + new PeerConn(session_id, get_peer_id(conn, msg.claimed_addr), + conn, conn, nullptr, this))).first; } auto p = it->second.get(); p->inbound_handshake = true; @@ -955,16 +995,16 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) { if (conn->is_terminated()) return; if (!msg.claimed_addr.is_null()) { - auto peer_id = gen_peer_id(conn, msg.claimed_addr, msg.nonce); + auto session_id = get_session_id(conn, msg.claimed_addr, msg.nonce); if (conn->get_mode() == Conn::ConnMode::ACTIVE) { pinfo_ulock_t _g(known_peers_lock); - pinfo_ulock_t __g(pid2peer_lock); + pinfo_ulock_t __g(sid2peer_lock); SALTICIDAE_LOG_INFO("%s outbound handshake to %s", std::string(listen_addr).c_str(), std::string(*conn).c_str()); - auto it = pid2peer.find(peer_id); - if (it != pid2peer.end()) + auto it = sid2peer.find(session_id); + if (it != sid2peer.end()) { auto p = it->second.get(); if (p->connected) @@ -996,8 +1036,9 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) { } else { - it = pid2peer.insert(std::make_pair(peer_id, - new PeerConn(peer_id, conn, nullptr, conn, this))).first; + it = sid2peer.insert(std::make_pair(session_id, + new PeerConn(session_id, get_peer_id(conn, msg.claimed_addr), + conn, nullptr, conn, this))).first; } auto p = it->second.get(); p->outbound_handshake = true; @@ -1050,20 +1091,14 @@ void PeerNetwork<O, _, __>::listen(NetAddr _listen_addr) { } template<typename O, O _, O __> -int32_t PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) { +int32_t PeerNetwork<O, _, __>::add_peer(const PeerId &pid) { auto id = this->gen_async_id(); - this->disp_tcall->async_call([this, addr, id](ThreadCall::Handle &) { + this->disp_tcall->async_call([this, pid, id](ThreadCall::Handle &) { try { pinfo_ulock_t _g(known_peers_lock); - if (known_peers.count(addr)) + if (known_peers.count(pid)) throw PeerNetworkError(SALTI_ERROR_PEER_ALREADY_EXISTS); - auto it = pending_peers.find(addr); - conn_t conn; - if (it == pending_peers.end()) - conn = start_active_conn(addr); - else - conn = it->second; - known_peers.insert(std::make_pair(addr, new Peer(conn))); + known_peers.insert(std::make_pair(pid, new Peer())); } catch (const PeerNetworkError &) { this->recoverable_error(std::current_exception(), id); } catch (...) { this->disp_error_cb(std::current_exception()); } @@ -1072,16 +1107,60 @@ int32_t PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) { } template<typename O, O _, O __> -int32_t PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) { +int32_t PeerNetwork<O, _, __>::try_conn_peer(const PeerId &pid, ssize_t ntry, double retry_delay) { + auto id = this->gen_async_id(); + this->disp_tcall->async_call([this, pid, ntry, retry_delay, id](ThreadCall::Handle &) { + try { + pinfo_ulock_t _g(known_peers_lock); + auto it = known_peers.find(pid); + if (it == known_peers.end()) + throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); + auto &p = it->second; + if (p.addr.is_null()) + throw PeerNetworkError(SALTI_ERROR_PEER_NOT_READY); + // FIXME: ??? + if (p.peer_conn) + p.peer_conn->disp_terminate(); + p.retry_delay = retry_delay; + p.peer_conn = ntry ? start_active_conn(p.addr) : conn_t(); + } catch (const PeerNetworkError &) { + this->recoverable_error(std::current_exception(), id); + } catch (...) { this->disp_error_cb(std::current_exception()); } + }); + return id; +} + +template<typename O, O _, O __> +int32_t PeerNetwork<O, _, __>::set_peer_addr(const PeerId &pid, const NetAddr &addr) { + auto id = this->gen_async_id(); + this->disp_tcall->async_call([this, pid, addr, id](ThreadCall::Handle &) { + try { + pinfo_ulock_t _g(known_peers_lock); + auto it = known_peers.find(pid); + if (it == known_peers.end()) + throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); + auto it2 = pending_peers.find(addr); + if (it2 != pending_peers.end()) + it->second.peer_conn = it2->second; + } catch (const PeerNetworkError &) { + this->recoverable_error(std::current_exception(), id); + } catch (...) { this->disp_error_cb(std::current_exception()); } + }); + return id; +} + + +template<typename O, O _, O __> +int32_t PeerNetwork<O, _, __>::del_peer(const PeerId &pid) { auto id = this->gen_async_id(); - this->disp_tcall->async_call([this, addr, id](ThreadCall::Handle &) { + this->disp_tcall->async_call([this, pid, id](ThreadCall::Handle &) { try { pinfo_ulock_t _g(known_peers_lock); - pinfo_ulock_t __g(pid2peer_lock); - auto it = known_peers.find(addr); + pinfo_ulock_t __g(sid2peer_lock); + auto it = known_peers.find(pid); if (it == known_peers.end()) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); - auto peer_id = it->second->peer_id; + auto addr = it->second->addr; known_peers.erase(it); auto it2 = pending_peers.find(addr); if (it2 != pending_peers.end()) @@ -1090,12 +1169,12 @@ int32_t PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) { this->disp_terminate(it2->second); pending_peers.erase(it2); } - auto it3 = pid2peer.find(peer_id); - if (it3 != pid2peer.end()) + auto it3 = sid2peer.find(pid); + if (it3 != sid2peer.end()) { auto &p = it3->second; this->disp_terminate(p->conn); - pid2peer.erase(it3); + sid2peer.erase(it3); } } catch (const PeerNetworkError &) { this->recoverable_error(std::current_exception(), id); @@ -1106,21 +1185,21 @@ int32_t PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) { template<typename O, O _, O __> typename PeerNetwork<O, _, __>::conn_t -PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &addr) const { +PeerNetwork<O, _, __>::get_peer_conn(const PeerId &pid) const { auto ret = *(static_cast<conn_t *>( this->disp_tcall->call([this, addr](ThreadCall::Handle &h) { conn_t conn; pinfo_slock_t _g(known_peers_lock); - pinfo_slock_t __g(pid2peer_lock); + pinfo_slock_t __g(sid2peer_lock); auto it = known_peers.find(addr); if (it == known_peers.end()) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); - if (it->second->peer_id.is_null()) + if (it->second->session_id.is_null()) conn = nullptr; else { - auto it2 = pid2peer.find(it->second->peer_id); - assert(it2 != pid2peer.end()); + auto it2 = sid2peer.find(it->second->session_id); + assert(it2 != sid2peer.end()); conn = it2->second->conn; } h.set_result(std::move(conn)); @@ -1313,7 +1392,7 @@ void msgnetwork_config_max_msg_queue_size(msgnetwork_config_t *self, size_t size void msgnetwork_config_burst_size(msgnetwork_config_t *self, size_t burst_size); void msgnetwork_config_max_listen_backlog(msgnetwork_config_t *self, int backlog); void msgnetwork_config_conn_server_timeout(msgnetwork_config_t *self, double timeout); -void msgnetwork_config_seg_buff_size(msgnetwork_config_t *self, size_t size); +void msgnetwork_config_recv_chunk_size(msgnetwork_config_t *self, size_t size); void msgnetwork_config_nworker(msgnetwork_config_t *self, size_t nworker); void msgnetwork_config_max_recv_buff_size(msgnetwork_config_t *self, size_t size); void msgnetwork_config_max_send_buff_size(msgnetwork_config_t *self, size_t size); @@ -1356,7 +1435,6 @@ bool msgnetwork_conn_is_terminated(const msgnetwork_conn_t *conn); peernetwork_config_t *peernetwork_config_new(); void peernetwork_config_free(const peernetwork_config_t *self); -void peernetwork_config_retry_conn_delay(peernetwork_config_t *self, double t); void peernetwork_config_ping_period(peernetwork_config_t *self, double t); void peernetwork_config_conn_timeout(peernetwork_config_t *self, double t); void peernetwork_config_id_mode(peernetwork_config_t *self, peernetwork_id_mode_t mode); |