diff options
Diffstat (limited to 'include/salticidae/network.h')
-rw-r--r-- | include/salticidae/network.h | 217 |
1 files changed, 132 insertions, 85 deletions
diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 37ecc75..eff5369 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -129,6 +129,7 @@ class MsgNetwork: public ConnPool { queue_t incoming_msgs; protected: + const uint32_t msg_magic; ConnPool::Conn *create_conn() override { return new Conn(); } void on_read(const ConnPool::conn_t &) override; @@ -159,6 +160,7 @@ class MsgNetwork: public ConnPool { size_t _max_msg_size; size_t _max_msg_queue_size; size_t _burst_size; + uint32_t _msg_magic; public: Config(): Config(ConnPool::Config()) {} @@ -166,7 +168,8 @@ class MsgNetwork: public ConnPool { ConnPool::Config(config), _max_msg_size(1024), _max_msg_queue_size(65536), - _burst_size(1000) {} + _burst_size(1000), + _msg_magic(0x0) {} Config &max_msg_size(size_t x) { _max_msg_size = x; @@ -182,6 +185,10 @@ class MsgNetwork: public ConnPool { _burst_size = x; return *this; } + + Config &msg_magic(uint32_t x) { + _msg_magic = x; + } }; virtual ~MsgNetwork() { stop(); } @@ -189,7 +196,8 @@ class MsgNetwork: public ConnPool { MsgNetwork(const EventContext &ec, const Config &config): ConnPool(ec, config), max_msg_size(config._max_msg_size), - max_msg_queue_size(config._max_msg_queue_size) { + max_msg_queue_size(config._max_msg_queue_size), + msg_magic(config._msg_magic) { incoming_msgs.set_capacity(max_msg_queue_size); incoming_msgs.reg_handler(ec, [this, burst_size=config._burst_size](queue_t &q) { std::pair<Msg, conn_t> item; @@ -293,6 +301,37 @@ class ClientNetwork: public MsgNetwork<OpcodeType> { inline int32_t _send_msg_deferred(Msg &&msg, const NetAddr &addr); }; +struct PeerId: public uint256_t { + using uint256_t::uint256_t; + PeerId(const NetAddr &addr) { + *(static_cast<uint256_t *>(this)) = salticidae::get_hash(addr); + } + + PeerId(const X509 &cert) { + *(static_cast<uint256_t *>(this)) = salticidae::get_hash(cert.get_der()); + } +}; + +} + +namespace std { + template <> + struct hash<salticidae::PeerId> { + size_t operator()(const salticidae::PeerId &p) const { + return hash<salticidae::uint256_t>()(p); + } + }; + + template <> + struct hash<const salticidae::PeerId> { + size_t operator()(const salticidae::PeerId &p) const { + return hash<salticidae::uint256_t>()(p); + } + }; +} + +namespace salticidae { + /** Peer-to-peer network where any two nodes could hold a bi-diretional message * channel, established by either side. */ template<typename OpcodeType = uint8_t, @@ -344,20 +383,6 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { using peer_callback_t = std::function<void(const conn_t &peer_conn, bool connected)>; using unknown_peer_callback_t = std::function<void(const NetAddr &claimed_addr, const X509 *cert)>; - struct PeerId: public uint256_t { - PeerId(const NetAddr &addr) { - DataStream tmp; - tmp << addr; - *(static_cast<uint256_t *>(this)) = tmp.get_hash(); - } - - PeerId(const X509 &cert) { - DataStream tmp; - tmp << cert.get_der(); - *(static_cast<uint256_t *>(this)) = tmp.get_hash(); - } - }; - private: struct Peer { @@ -425,6 +450,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { NetAddr listen_addr; bool allow_unknown_peer; uint256_t my_nonce; + uint256_t passive_nonce; struct MsgPing { static const OpcodeType opcode; @@ -460,7 +486,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { conn_t start_active_conn(const NetAddr &addr); static void tcall_reset_timeout(ConnPool::Worker *worker, const conn_t &conn, double timeout); - inline conn_t _get_peer_conn(const NetAddr &addr) const; + inline conn_t _get_peer_conn(const PeerId &peer) const; protected: ConnPool::Conn *create_conn() override { return new Conn(); } @@ -522,6 +548,9 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { ping_period(config._ping_period), conn_timeout(config._conn_timeout), allow_unknown_peer(config._allow_unknown_peer) { + uint64_t ones[4]; + memset(ones, 0xff, 32); + passive_nonce.load((uint8_t *)ones); this->reg_handler(generic_bind(&PeerNetwork::ping_handler, this, _1, _2)); this->reg_handler(generic_bind(&PeerNetwork::pong_handler, this, _1, _2)); } @@ -529,26 +558,26 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { virtual ~PeerNetwork() { this->stop(); } /* register a peer as known */ - int32_t add_peer(const PeerId &pid); + int32_t add_peer(const PeerId &peer); /* set the peer's public IP */ - int32_t set_peer_addr(const PeerId &pid, const NetAddr &addr); + int32_t set_peer_addr(const PeerId &peer, const NetAddr &addr); /* try to connect to the peer: once (ntry = 1), indefinitely (ntry = -1), give up retry (ntry = 0) */ - int32_t try_conn_peer(const PeerId &pid, ssize_t ntry = -1, double retry_delay = 2); + int32_t conn_peer(const PeerId &peer, ssize_t ntry = -1, double retry_delay = 2); /* unregister the peer */ - int32_t del_peer(const PeerId &pid); - bool has_peer(const PeerId &pid) const; + int32_t del_peer(const PeerId &peer); + bool has_peer(const PeerId &peer) const; size_t get_npending() const; conn_t get_peer_conn(const PeerId &addr) const; using MsgNet::send_msg; template<typename MsgType> - inline bool send_msg(const MsgType &msg, const NetAddr &addr); - inline bool _send_msg(const Msg &msg, const NetAddr &addr); + inline bool send_msg(const MsgType &msg, const PeerId &peer); + inline bool _send_msg(const Msg &msg, const PeerId &peer); template<typename MsgType> - inline int32_t send_msg_deferred(MsgType &&msg, const NetAddr &addr); - inline int32_t _send_msg_deferred(Msg &&msg, const NetAddr &addr); + inline int32_t send_msg_deferred(MsgType &&msg, const PeerId &peer); + inline int32_t _send_msg_deferred(Msg &&msg, const PeerId &peer); template<typename MsgType> - inline int32_t multicast_msg(MsgType &&msg, const std::vector<NetAddr> &addrs); - inline int32_t _multicast_msg(Msg &&msg, const std::vector<NetAddr> &addrs); + inline int32_t multicast_msg(MsgType &&msg, const std::vector<PeerId> &peers); + inline int32_t _multicast_msg(Msg &&msg, const std::vector<PeerId> &peers); void listen(NetAddr listen_addr); conn_t connect(const NetAddr &addr) = delete; @@ -637,7 +666,7 @@ inline int32_t MsgNetwork<OpcodeType>::_send_msg_deferred(Msg &&msg, const conn_ template<typename OpcodeType> template<typename MsgType> inline bool MsgNetwork<OpcodeType>::send_msg(const MsgType &msg, const conn_t &conn) { - return _send_msg(msg, conn); + return _send_msg(Msg(msg, msg_magic), conn); } template<typename OpcodeType> @@ -700,11 +729,11 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) { auto p = conn->peer; if (!p) return; assert(conn == p->conn); + p->connected = false; + p->conn = nullptr; p->inbound_conn = nullptr; p->outbound_conn = nullptr; p->ev_ping_timer.del(); - p->connected = false; - p->conn = nullptr; this->user_tcall->async_call([this, conn](ThreadCall::Handle &) { if (peer_cb) peer_cb(conn, false); }); @@ -725,18 +754,21 @@ template<typename O, O _, O __> void PeerNetwork<O, _, __>::Peer::update_conn(const conn_t &new_conn) { if (conn != new_conn) { - conn->peer = nullptr; - if (conn->is_terminated()) + if (conn) { - for (;;) + conn->peer = nullptr; + if (conn->is_terminated()) { - bytearray_t buff_seg = conn->send_buffer.move_pop(); - if (!buff_seg.size()) break; - new_conn->write(std::move(buff_seg)); + for (;;) + { + bytearray_t buff_seg = conn->send_buffer.move_pop(); + if (!buff_seg.size()) break; + new_conn->write(std::move(buff_seg)); + } } + else + conn->get_net()->disp_terminate(conn); } - else - this->disp_terminate(conn); conn = new_conn; } } @@ -791,10 +823,10 @@ void PeerNetwork<O, _, __>::finish_handshake(Peer *p) { color_begin = TTY_COLOR_BLUE; color_end = TTY_COLOR_RESET; } - SALTICIDAE_LOG_INFO("%sPeerNetwork: established connection %s <-> %s via %s", + SALTICIDAE_LOG_INFO("%sPeerNetwork: established connection %s <-> %s(%s)", color_begin, std::string(listen_addr).c_str(), - std::string(p->addr).c_str(), + get_hex10(p->id).c_str(), std::string(*(p->conn)).c_str(), color_end); } @@ -823,11 +855,11 @@ typename PeerNetwork<O, _, __>::conn_t PeerNetwork<O, _, __>::start_active_conn( } template<typename O, O _, O __> -inline typename PeerNetwork<O, _, __>::conn_t PeerNetwork<O, _, __>::_get_peer_conn(const NetAddr &addr) const { - auto it = known_peers.find(addr); +inline typename PeerNetwork<O, _, __>::conn_t PeerNetwork<O, _, __>::_get_peer_conn(const PeerId &pid) const { + auto it = known_peers.find(pid); if (it == known_peers.end()) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); - return it->second->peer_conn; + return it->second->conn; } /* end: functions invoked by the dispatcher */ @@ -843,7 +875,7 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) { if (conn->get_mode() == Conn::ConnMode::PASSIVE) { pinfo_slock_t _g(known_peers_lock); - auto pit = known_peers.find(msg.claimed_addr); + auto pit = known_peers.find(pid); if (pit == known_peers.end()) { this->user_tcall->async_call([this, addr=msg.claimed_addr, conn](ThreadCall::Handle &) { @@ -856,7 +888,7 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) { SALTICIDAE_LOG_INFO("%s inbound handshake from %s", std::string(listen_addr).c_str(), std::string(*conn).c_str()); - send_msg(MsgPong(listen_addr, my_nonce), conn); + send_msg(MsgPong(listen_addr, p->addr.is_null() ? passive_nonce : my_nonce), conn); if (p->connected) { //conn->get_net()->disp_terminate(conn); @@ -879,7 +911,7 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) { this->disp_terminate(conn); return; } - finish_handshake(p); + finish_handshake(p.get()); } else SALTICIDAE_LOG_WARN("unexpected inbound handshake from %s", @@ -905,17 +937,16 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) { if (conn->get_mode() == Conn::ConnMode::ACTIVE) { pinfo_ulock_t _g(known_peers_lock); - SALTICIDAE_LOG_INFO("%s outbound handshake to %s", - std::string(listen_addr).c_str(), - std::string(*conn).c_str()); auto pit = known_peers.find(pid); - assert(pit != known_peers.end()); - auto &p = pit->second; - if (p->connected) + if (pit == known_peers.end()) { - conn->get_net()->disp_terminate(conn); + this->disp_terminate(conn); return; } + SALTICIDAE_LOG_INFO("%s outbound handshake to %s", + std::string(listen_addr).c_str(), + std::string(*conn).c_str()); + auto &p = pit->second; auto &old_conn = p->outbound_conn; if (old_conn && !old_conn->is_terminated()) { @@ -940,7 +971,7 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) { SALTICIDAE_LOG_WARN("multiple peer addresses share the same identity"); old_peer_addr = peer_addr; p->reset_ping_timer(); - finish_handshake(p); + finish_handshake(p.get()); } else SALTICIDAE_LOG_WARN("unexpected outbound handshake from %s", @@ -994,7 +1025,7 @@ int32_t PeerNetwork<O, _, __>::add_peer(const PeerId &pid) { } template<typename O, O _, O __> -int32_t PeerNetwork<O, _, __>::try_conn_peer(const PeerId &pid, ssize_t ntry, double retry_delay) { +int32_t PeerNetwork<O, _, __>::conn_peer(const PeerId &pid, ssize_t ntry, double retry_delay) { auto id = this->gen_async_id(); this->disp_tcall->async_call([this, pid, ntry, retry_delay, id](ThreadCall::Handle &) { try { @@ -1003,13 +1034,15 @@ int32_t PeerNetwork<O, _, __>::try_conn_peer(const PeerId &pid, ssize_t ntry, do if (it == known_peers.end()) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); auto &p = it->second; - if (p.addr.is_null()) + if (p->addr.is_null()) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_READY); - // FIXME: ??? - if (p.peer_conn) - p.peer_conn->disp_terminate(); - p.retry_delay = retry_delay; - p.peer_conn = ntry ? start_active_conn(p.addr) : conn_t(); + p->ntry = ntry; + p->retry_delay = retry_delay; + p->connected = false; + p->inbound_conn = nullptr; + p->outbound_conn = nullptr; + p->ev_ping_timer.del(); + p->update_conn(ntry ? start_active_conn(p->addr) : conn_t()); } catch (const PeerNetworkError &) { this->recoverable_error(std::current_exception(), id); } catch (...) { this->disp_error_cb(std::current_exception()); } @@ -1026,7 +1059,7 @@ int32_t PeerNetwork<O, _, __>::set_peer_addr(const PeerId &pid, const NetAddr &a auto it = known_peers.find(pid); if (it == known_peers.end()) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); - it->addr = addr; + it->second->addr = addr; } catch (const PeerNetworkError &) { this->recoverable_error(std::current_exception(), id); } catch (...) { this->disp_error_cb(std::current_exception()); } @@ -1094,17 +1127,17 @@ size_t PeerNetwork<O, _, __>::get_npending() const { template<typename O, O _, O __> template<typename MsgType> -inline int32_t PeerNetwork<O, _, __>::send_msg_deferred(MsgType &&msg, const NetAddr &addr) { - return _send_msg_deferred(std::move(msg), addr); +inline int32_t PeerNetwork<O, _, __>::send_msg_deferred(MsgType &&msg, const PeerId &pid) { + return _send_msg_deferred(std::move(msg), pid); } template<typename O, O _, O __> -inline int32_t PeerNetwork<O, _, __>::_send_msg_deferred(Msg &&msg, const NetAddr &addr) { +inline int32_t PeerNetwork<O, _, __>::_send_msg_deferred(Msg &&msg, const PeerId &pid) { auto id = this->gen_async_id(); this->disp_tcall->async_call( - [this, msg=std::move(msg), addr, id](ThreadCall::Handle &) { + [this, msg=std::move(msg), pid, id](ThreadCall::Handle &) { try { - if (!_send_msg(msg, addr)) + if (!_send_msg(msg, pid)) throw PeerNetworkError(SALTI_ERROR_CONN_NOT_READY); } catch (...) { this->recoverable_error(std::current_exception(), id); } }); @@ -1113,31 +1146,31 @@ inline int32_t PeerNetwork<O, _, __>::_send_msg_deferred(Msg &&msg, const NetAdd template<typename O, O _, O __> template<typename MsgType> -inline bool PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const NetAddr &addr) { - return _send_msg(msg, addr); +inline bool PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const PeerId &pid) { + return _send_msg(Msg(msg, this->msg_magic), pid); } template<typename O, O _, O __> -inline bool PeerNetwork<O, _, __>::_send_msg(const Msg &msg, const NetAddr &addr) { +inline bool PeerNetwork<O, _, __>::_send_msg(const Msg &msg, const PeerId &pid) { pinfo_slock_t _g(known_peers_lock); - return MsgNet::_send_msg(msg, _get_peer_conn(addr)); + return MsgNet::_send_msg(msg, _get_peer_conn(pid)); } template<typename O, O _, O __> template<typename MsgType> -inline int32_t PeerNetwork<O, _, __>::multicast_msg(MsgType &&msg, const std::vector<NetAddr> &addrs) { - return _multicast_msg(MsgType(std::move(msg)), addrs); +inline int32_t PeerNetwork<O, _, __>::multicast_msg(MsgType &&msg, const std::vector<PeerId> &pids) { + return _multicast_msg(MsgType(std::move(msg), this->msg_magic), pids); } template<typename O, O _, O __> -inline int32_t PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<NetAddr> &addrs) { +inline int32_t PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<PeerId> &pids) { auto id = this->gen_async_id(); this->disp_tcall->async_call( - [this, msg=std::move(msg), addrs, id](ThreadCall::Handle &) { + [this, msg=std::move(msg), pids, id](ThreadCall::Handle &) { try { bool succ = true; - for (auto &addr: addrs) - succ &= MsgNet::_send_msg(msg, _get_peer_conn(addr)); + for (auto &pid: pids) + succ &= MsgNet::_send_msg(msg, _get_peer_conn(pid)); if (!succ) throw PeerNetworkError(SALTI_ERROR_CONN_NOT_READY); } catch (...) { this->recoverable_error(std::current_exception(), id); } }); @@ -1205,6 +1238,9 @@ const O PeerNetwork<O, _, OPCODE_PONG>::MsgPong::opcode = OPCODE_PONG; } #ifdef SALTICIDAE_CBINDINGS +using peerid_t = salticidae::PeerId; +using peerid_array_t = std::vector<peerid_t>; + using msgnetwork_t = salticidae::MsgNetwork<_opcode_t>; using msgnetwork_config_t = msgnetwork_t::Config; using msgnetwork_conn_t = msgnetwork_t::conn_t; @@ -1220,6 +1256,9 @@ using clientnetwork_conn_t = clientnetwork_t::conn_t; #else #ifdef SALTICIDAE_CBINDINGS +typedef struct peerid_t peerid_t; +typedef struct peerid_t_array_t peerid_array_t; + typedef struct msgnetwork_t msgnetwork_t; typedef struct msgnetwork_config_t msgnetwork_config_t; typedef struct msgnetwork_conn_t msgnetwork_conn_t; @@ -1301,6 +1340,14 @@ bool msgnetwork_conn_is_terminated(const msgnetwork_conn_t *conn); /* PeerNetwork */ +//peerid_t *peerid_new(); +void peerid_free(const peerid_t *self); +peerid_t *peerid_new_from_netaddr(const netaddr_t *addr); +peerid_t *peerid_new_from_x509(const x509_t *cert); +peerid_array_t *peerid_array_new(); +peerid_array_t *peerid_array_new_from_peerids(const peerid_t * const *pids, size_t npids); +void peerid_array_free(peerid_array_t *self); + peernetwork_config_t *peernetwork_config_new(); void peernetwork_config_free(const peernetwork_config_t *self); void peernetwork_config_ping_period(peernetwork_config_t *self, double t); @@ -1310,10 +1357,10 @@ msgnetwork_config_t *peernetwork_config_as_msgnetwork_config(peernetwork_config_ peernetwork_t *peernetwork_new(const eventcontext_t *ec, const peernetwork_config_t *config, SalticidaeCError *err); void peernetwork_free(const peernetwork_t *self); -int32_t peernetwork_add_peer(peernetwork_t *self, const netaddr_t *addr); -int32_t peernetwork_del_peer(peernetwork_t *self, const netaddr_t *addr); -bool peernetwork_has_peer(const peernetwork_t *self, const netaddr_t *addr); -const peernetwork_conn_t *peernetwork_get_peer_conn(const peernetwork_t *self, const netaddr_t *addr, SalticidaeCError *cerror); +int32_t peernetwork_add_peer(peernetwork_t *self, const peerid_t *pid); +int32_t peernetwork_del_peer(peernetwork_t *self, const peerid_t *pid); +bool peernetwork_has_peer(const peernetwork_t *self, const peerid_t *pid); +const peernetwork_conn_t *peernetwork_get_peer_conn(const peernetwork_t *self, const peerid_t *pid, SalticidaeCError *cerror); msgnetwork_t *peernetwork_as_msgnetwork(peernetwork_t *self); peernetwork_t *msgnetwork_as_peernetwork_unsafe(msgnetwork_t *self); msgnetwork_conn_t *msgnetwork_conn_new_from_peernetwork_conn(const peernetwork_conn_t *conn); @@ -1321,9 +1368,9 @@ peernetwork_conn_t *peernetwork_conn_new_from_msgnetwork_conn_unsafe(const msgne peernetwork_conn_t *peernetwork_conn_copy(const peernetwork_conn_t *self); netaddr_t *peernetwork_conn_get_peer_addr(const peernetwork_conn_t *self); void peernetwork_conn_free(const peernetwork_conn_t *self); -bool peernetwork_send_msg(peernetwork_t *self, const msg_t * msg, const netaddr_t *addr); -int32_t peernetwork_send_msg_deferred_by_move(peernetwork_t *self, msg_t * _moved_msg, const netaddr_t *addr); -int32_t peernetwork_multicast_msg_by_move(peernetwork_t *self, msg_t *_moved_msg, const netaddr_array_t *addrs); +bool peernetwork_send_msg(peernetwork_t *self, const msg_t * msg, const peerid_t *peer); +int32_t peernetwork_send_msg_deferred_by_move(peernetwork_t *self, msg_t * _moved_msg, const peerid_t *peer); +int32_t peernetwork_multicast_msg_by_move(peernetwork_t *self, msg_t *_moved_msg, const peerid_array_t *peers); void peernetwork_listen(peernetwork_t *self, const netaddr_t *listen_addr, SalticidaeCError *err); typedef void (*peernetwork_peer_callback_t)(const peernetwork_conn_t *, bool connected, void *userdata); |