From 85552ce1b0bc997f58341f21ab8bbcf7d937ab4b Mon Sep 17 00:00:00 2001 From: Determinant Date: Wed, 26 Jun 2019 19:13:43 -0400 Subject: change to new (more flexible) p2p design --- include/salticidae/conn.h | 11 +- include/salticidae/network.h | 507 +++++++++++++++++++++++++------------------ include/salticidae/stream.h | 7 + include/salticidae/util.h | 1 + 4 files changed, 305 insertions(+), 221 deletions(-) (limited to 'include') diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index ff75e34..87966ac 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -144,8 +144,9 @@ class ConnPool { EventContext ec; EventContext disp_ec; ThreadCall* disp_tcall; - /* owned by user loop */ BoxObj user_tcall; + const bool enable_tls; + RcObj tls_cert; using worker_error_callback_t = std::function; worker_error_callback_t disp_error_cb; @@ -178,7 +179,6 @@ class ConnPool { const double conn_server_timeout; const size_t seg_buff_size; const size_t queue_capacity; - const bool enable_tls; tls_context_t tls_ctx; conn_callback_t conn_cb; @@ -422,11 +422,11 @@ class ConnPool { ConnPool(const EventContext &ec, const Config &config): ec(ec), + enable_tls(config._enable_tls), max_listen_backlog(config._max_listen_backlog), conn_server_timeout(config._conn_server_timeout), seg_buff_size(config._seg_buff_size), queue_capacity(config._queue_capacity), - enable_tls(config._enable_tls), tls_ctx(nullptr), listen_fd(-1), nworker(config._nworker), @@ -435,9 +435,10 @@ class ConnPool { { tls_ctx = new TLSContext(); if (config._tls_cert) - tls_ctx->use_cert(*config._tls_cert); + tls_cert = config._tls_cert; else - tls_ctx->use_cert_file(config._tls_cert_file); + tls_cert = new X509(X509::create_from_pem_file(config._tls_cert_file)); + tls_ctx->use_cert(*tls_cert); if (config._tls_key) tls_ctx->use_privkey(*config._tls_key); else diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 7f0964d..975084f 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -32,6 +32,8 @@ #include "salticidae/conn.h" #ifdef __cplusplus +#include +#include namespace salticidae { /** Network of nodes who can send async messages. */ template @@ -157,9 +159,6 @@ class MsgNetwork: public ConnPool { { auto &msg = item.first; auto &conn = item.second; -#ifdef SALTICIDAE_CBINDINGS_INJECT_CALLBACK - salticidae_injected_msg_callback(&msg, conn.get()); -#else auto it = handler_map.find(msg.get_opcode()); if (it == handler_map.end()) SALTICIDAE_LOG_WARN("unknown opcode: %s", @@ -175,7 +174,6 @@ class MsgNetwork: public ConnPool { #endif it->second(msg, conn); } -#endif if (++cnt == burst_size) return true; } return false; @@ -266,25 +264,29 @@ class PeerNetwork: public MsgNetwork { using unknown_callback_t = std::function; enum IdentityMode { - IP_BASED, - IP_PORT_BASED + ADDR_BASED, + CERT_BASED }; + private: + struct Peer; + + public: class Conn: public MsgNet::Conn { friend PeerNetwork; - NetAddr peer_id; + Peer *peer; TimerEvent ev_timeout; + TimerEvent ev_retry_timer; + void reset_timeout(double timeout); public: - Conn() = default; + Conn(): MsgNet::Conn(), peer(nullptr) {} PeerNetwork *get_net() { return static_cast(ConnPool::Conn::get_pool()); } - const NetAddr &get_peer() { return peer_id; } - protected: void stop() override { ev_timeout.del(); @@ -297,81 +299,99 @@ class PeerNetwork: public MsgNetwork { private: struct Peer { /** connection addr, may be different due to passive mode */ - NetAddr addr; + uint256_t nonce; + uint256_t peer_id; + NetAddr peer_addr; /** the underlying connection, may be invalid when connected = false */ conn_t conn; + conn_t inbound_conn; + conn_t outbound_conn; + TimerEvent ev_ping_timer; - TimerEvent ev_retry_timer; bool ping_timer_ok; bool pong_msg_ok; bool connected; + bool outbound_handshake; + bool inbound_handshake; + double ping_period; Peer() = delete; - Peer(NetAddr addr, conn_t conn, const EventContext &ec): - addr(addr), conn(conn), + Peer(conn_t conn, conn_t inbound_conn, conn_t outbound_conn, const PeerNetwork *pn): + conn(conn), + inbound_conn(inbound_conn), + outbound_conn(outbound_conn), ev_ping_timer( - TimerEvent(ec, std::bind(&Peer::ping_timer, this, _1))), - connected(false) {} + TimerEvent(pn->disp_ec, std::bind(&Peer::ping_timer, this, _1))), + connected(false), + outbound_handshake(false), + inbound_handshake(false), + ping_period(pn->ping_period) {} ~Peer() {} Peer &operator=(const Peer &) = delete; Peer(const Peer &) = delete; - void ping_timer(TimerEvent &); void reset_ping_timer(); void send_ping(); + void ping_timer(TimerEvent &); void clear_all_events() { if (ev_ping_timer) ev_ping_timer.del(); } - void reset_conn(const conn_t &conn); }; - std::unordered_map> id2peer; - std::unordered_map> id2upeer; + std::unordered_map pending_peers; + std::unordered_map known_peers; + std::unordered_map> pid2peer; unknown_callback_t unknown_peer_cb; const IdentityMode id_mode; double retry_conn_delay; double ping_period; double conn_timeout; - uint16_t listen_port; + NetAddr listen_addr; bool allow_unknown_peer; + uint256_t my_pname; + uint256_t my_nonce; struct MsgPing { static const OpcodeType opcode; DataStream serialized; - uint16_t port; - MsgPing(uint16_t port) { - serialized << htole(port); + uint256_t pname; + uint256_t nonce; + uint256_t peer_id; + MsgPing() { serialized << false; } + MsgPing(const uint256_t &_pname, const uint256_t &_nonce) { + serialized << true << _pname << _nonce; } MsgPing(DataStream &&s) { - s >> port; - port = letoh(port); + uint8_t flag; + s >> flag; + if (flag) + { + s >> pname >> nonce; + DataStream tmp; + tmp << pname << nonce; + peer_id = tmp.get_hash(); + } } }; - struct MsgPong { + struct MsgPong: public MsgPing { static const OpcodeType opcode; - DataStream serialized; - uint16_t port; - MsgPong(uint16_t port) { - serialized << htole(port); - } - MsgPong(DataStream &&s) { - s >> port; - port = letoh(port); - } + MsgPong(): MsgPing() {} + MsgPong(const uint256_t &_pname, const uint256_t _nonce): MsgPing(_pname, _nonce) {} + MsgPong(DataStream &&s): MsgPing(std::move(s)) {} }; void msg_ping(MsgPing &&msg, const conn_t &conn); void msg_pong(MsgPong &&msg, const conn_t &conn); void _ping_msg_cb(const conn_t &conn, uint16_t port); void _pong_msg_cb(const conn_t &conn, uint16_t port); - bool check_new_conn(const conn_t &conn, uint16_t port); - void start_active_conn(const NetAddr &paddr); + bool check_handshake(Peer *peer); + void start_active_conn(const NetAddr &addr); static void tcall_reset_timeout(ConnPool::Worker *worker, const conn_t &conn, double timeout); - Peer *get_peer(const NetAddr &id) const; + inline conn_t _get_peer_conn(const NetAddr &addr) const; protected: ConnPool::Conn *create_conn() override { return new Conn(); } @@ -400,7 +420,7 @@ class PeerNetwork: public MsgNetwork { _ping_period(30), _conn_timeout(180), _allow_unknown_peer(false), - _id_mode(IP_PORT_BASED) {} + _id_mode(ADDR_BASED) {} Config &retry_conn_delay(double x) { @@ -442,20 +462,20 @@ class PeerNetwork: public MsgNetwork { virtual ~PeerNetwork() { this->stop(); } - void add_peer(const NetAddr &paddr); - void del_peer(const NetAddr &paddr); - bool has_peer(const NetAddr &paddr) const; - const conn_t get_peer_conn(const NetAddr &paddr) const; + void add_peer(const NetAddr &addr); + void del_peer(const NetAddr &addr); + bool has_peer(const NetAddr &addr) const; + conn_t get_peer_conn(const NetAddr &addr) const; using MsgNet::send_msg; template - inline void send_msg(const MsgType &msg, const NetAddr &paddr); - inline void _send_msg(const Msg &msg, const NetAddr &paddr); + inline void send_msg(const MsgType &msg, const NetAddr &addr); + inline void _send_msg(const Msg &msg, const NetAddr &addr); template - inline void send_msg_deferred(MsgType &&msg, const NetAddr &paddr); - inline void _send_msg_deferred(Msg &&msg, const NetAddr &paddr); + inline void send_msg_deferred(MsgType &&msg, const NetAddr &addr); + inline void _send_msg_deferred(Msg &&msg, const NetAddr &addr); template - void multicast_msg(MsgType &&msg, const std::vector &paddrs); - inline void _multicast_msg(Msg &&msg, const std::vector &paddrs); + void multicast_msg(MsgType &&msg, const std::vector &addrs); + inline void _multicast_msg(Msg &&msg, const std::vector &addrs); void listen(NetAddr listen_addr); conn_t connect(const NetAddr &addr) = delete; @@ -555,6 +575,7 @@ void PeerNetwork::on_setup(const ConnPool::conn_t &_conn) { auto conn = static_pointer_cast(_conn); auto worker = conn->worker; auto &ev_timeout = conn->ev_timeout; + conn->ev_retry_timer.del(); assert(!ev_timeout); ev_timeout = TimerEvent(worker->get_ec(), [worker, conn](TimerEvent &) { try { @@ -564,54 +585,42 @@ void PeerNetwork::on_setup(const ConnPool::conn_t &_conn) { }); /* the initial ping-pong to set up the connection */ tcall_reset_timeout(worker, conn, conn_timeout); - send_msg(MsgPing(listen_port), conn); + pending_peers[conn->get_addr()] = conn; + if (conn->get_mode() == Conn::ConnMode::ACTIVE) + send_msg(MsgPing(my_pname, my_nonce), conn); } template void PeerNetwork::on_teardown(const ConnPool::conn_t &_conn) { MsgNet::on_teardown(_conn); auto conn = static_pointer_cast(_conn); + const auto &addr = conn->get_addr(); + conn->ev_retry_timer.clear(); conn->ev_timeout.clear(); - const auto &peer_id = conn->peer_id; - auto p = get_peer(peer_id); - if (!p) return; - if (conn != p->conn) return; - p->ev_ping_timer.del(); - p->connected = false; - //p->conn = nullptr; - SALTICIDAE_LOG_INFO("connection lost: %s", std::string(*conn).c_str()); - // try to reconnect - p->ev_retry_timer = TimerEvent(this->disp_ec, [this, peer_id](TimerEvent &) { - try { - start_active_conn(peer_id); - } catch (...) { this->disp_error_cb(std::current_exception()); } - }); - p->ev_retry_timer.add(gen_conn_timeout()); -} - -template -void PeerNetwork::Peer::reset_conn(const conn_t &new_conn) { - if (conn != new_conn) + pending_peers.erase(addr); + auto p = conn->peer; + if (p) { - if (conn) - { - //SALTICIDAE_LOG_DEBUG("moving send buffer"); - //new_conn->move_send_buffer(conn); - SALTICIDAE_LOG_INFO("terminating old connection %s", std::string(*conn).c_str()); - auto net = conn->get_net(); - net->disp_terminate(conn); - } - addr = new_conn->get_addr(); - conn = new_conn; + if (conn != p->conn) return; + p->ev_ping_timer.del(); + p->connected = false; + known_peers[p->peer_addr] = uint256_t(); + // try to reconnect + conn->ev_retry_timer = TimerEvent(this->disp_ec, [this, addr](TimerEvent &) { + try { + start_active_conn(addr); + } catch (...) { this->disp_error_cb(std::current_exception()); } + }); + conn->ev_retry_timer.add(gen_conn_timeout()); } - clear_all_events(); + SALTICIDAE_LOG_INFO("connection lost: %s", std::string(*conn).c_str()); } template void PeerNetwork::Peer::reset_ping_timer() { assert(ev_ping_timer); ev_ping_timer.del(); - ev_ping_timer.add(gen_rand_timeout(conn->get_net()->ping_period)); + ev_ping_timer.add(gen_rand_timeout(ping_period)); } template @@ -620,7 +629,7 @@ void PeerNetwork::Peer::send_ping() { ping_timer_ok = false; pong_msg_ok = false; tcall_reset_timeout(conn->worker, conn, pn->conn_timeout); - pn->send_msg(MsgPing(pn->listen_port), conn); + pn->send_msg(MsgPing(), conn); } template @@ -634,47 +643,20 @@ void PeerNetwork::Peer::ping_timer(TimerEvent &) { } template -bool PeerNetwork::check_new_conn(const conn_t &conn, uint16_t port) { - if (conn->peer_id.is_null()) - { /* passive connections can eventually have ids after getting the port - number in IP_BASED_PORT mode */ - conn->peer_id.ip = conn->get_addr().ip; - conn->peer_id.port = id_mode == IP_BASED ? 0: port; - } - const auto &id = conn->peer_id; - auto it = id2peer.find(id); - if (it == id2peer.end()) - { /* found an unknown peer */ - const auto &addr = conn->get_addr(); - this->user_tcall->async_call([this, id](ThreadCall::Handle &) { - if (unknown_peer_cb) unknown_peer_cb(id); - }); - if (allow_unknown_peer) - { - auto it2 = id2upeer.find(id); - if (it2 == id2upeer.end()) - it = id2upeer.insert(std::make_pair(id, new Peer(addr, nullptr, this->disp_ec))).first; - } - else - { - this->disp_terminate(conn); - return true; - } - } - auto p = it->second.get(); - if (p->connected) - { - if (conn != p->conn) - { - this->disp_terminate(conn); - return true; - } +bool PeerNetwork::check_handshake(Peer *p) { + if (!(p->inbound_handshake && p->outbound_handshake) || + p->connected) return false; - } - p->reset_conn(conn); + p->clear_all_events(); + if (p->inbound_conn && p->inbound_conn != p->conn) + p->inbound_conn->peer = nullptr; + if (p->outbound_conn && p->outbound_conn != p->conn) + p->outbound_conn->peer = nullptr; + p->conn->peer = p; p->connected = true; p->reset_ping_timer(); p->send_ping(); + known_peers[p->peer_addr] = p->peer_id; if (p->connected) { auto color_begin = ""; @@ -684,82 +666,182 @@ bool PeerNetwork::check_new_conn(const conn_t &conn, uint16_t port) { color_begin = TTY_COLOR_BLUE; color_end = TTY_COLOR_RESET; } - SALTICIDAE_LOG_INFO("%sPeerNetwork: established connection with %s via %s%s", + SALTICIDAE_LOG_INFO("%sPeerNetwork: established connection with %s <-> %s via %s", color_begin, - std::string(conn->peer_id).c_str(), std::string(*conn).c_str(), + std::string(listen_addr).c_str(), + std::string(p->peer_addr).c_str(), + std::string(*(p->conn)).c_str(), color_end); } - return false; + return true; } template void PeerNetwork::start_active_conn(const NetAddr &addr) { - auto p = get_peer(addr); - if (p->connected) return; auto conn = static_pointer_cast(MsgNet::_connect(addr)); - //assert(p->conn == nullptr); - p->conn = conn; - conn->peer_id = addr; - if (id_mode == IP_BASED) - conn->peer_id.port = 0; + pending_peers[addr] = conn; } template -typename PeerNetwork::Peer *PeerNetwork::get_peer(const NetAddr &addr) const { - auto it = id2peer.find(addr); - if (it != id2peer.end()) return it->second.get(); - it = id2upeer.find(addr); - if (it != id2upeer.end()) return it->second.get(); - return nullptr; +inline typename PeerNetwork::conn_t PeerNetwork::_get_peer_conn(const NetAddr &addr) const { + auto it = pending_peers.find(addr); + if (it == pending_peers.end()) + throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); + return it->second; } /* end: functions invoked by the dispatcher */ /* begin: functions invoked by the user loop */ template void PeerNetwork::msg_ping(MsgPing &&msg, const conn_t &conn) { - uint16_t port = msg.port; - this->disp_tcall->async_call([this, conn, port](ThreadCall::Handle &) { + this->disp_tcall->async_call([this, conn, msg=std::move(msg)](ThreadCall::Handle &) { try { - if (conn->get_mode() == ConnPool::Conn::DEAD) return; - SALTICIDAE_LOG_INFO("ping from %s, port %u", - std::string(*conn).c_str(), ntohs(port)); - if (check_new_conn(conn, port)) return; - send_msg(MsgPong(this->listen_port), conn); + auto conn_mode = conn->get_mode(); + if (conn_mode == ConnPool::Conn::DEAD) return; + if (!msg.peer_id.is_null()) + { + if (conn_mode == Conn::ConnMode::PASSIVE) + { + send_msg(MsgPong(my_pname, my_nonce), conn); + SALTICIDAE_LOG_INFO("%s inbound handshake from %s", + std::string(listen_addr).c_str(), + std::string(*conn).c_str()); + auto it = pid2peer.find(msg.peer_id); + if (it != pid2peer.end()) + { + if (msg.nonce < my_nonce) + { + auto p = it->second.get(); + auto &old_conn = p->inbound_conn; + if (old_conn && old_conn->get_mode() != Conn::ConnMode::DEAD) + { + SALTICIDAE_LOG_INFO("%s terminating old connection %s", + std::string(listen_addr).c_str(), + std::string(*old_conn).c_str()); + old_conn->peer = nullptr; + old_conn->get_net()->disp_terminate(old_conn); + } + old_conn = conn; + p->conn = conn; + } + } + else + { + it = pid2peer.insert(std::make_pair( + msg.peer_id, + new Peer(conn, conn, nullptr, this))).first; + } + auto p = it->second.get(); + p->inbound_handshake = true; + check_handshake(p); + } + else + SALTICIDAE_LOG_WARN("unexpected inbound handshake from %s", + std::string(*conn).c_str()); + } + else + { + SALTICIDAE_LOG_INFO("ping from %s", std::string(*conn).c_str()); + send_msg(MsgPong(), conn); + } } catch (...) { this->disp_error_cb(std::current_exception()); } }); } template void PeerNetwork::msg_pong(MsgPong &&msg, const conn_t &conn) { - uint16_t port = msg.port; - this->disp_tcall->async_call([this, conn, port](ThreadCall::Handle &) { + this->disp_tcall->async_call([this, conn, msg=std::move(msg)](ThreadCall::Handle &) { try { - if (conn->get_mode() == ConnPool::Conn::DEAD) return; - auto p = get_peer(conn->peer_id); - if (!p) + auto conn_mode = conn->get_mode(); + if (conn_mode == ConnPool::Conn::DEAD) return; + if (!msg.peer_id.is_null()) { - SALTICIDAE_LOG_WARN("pong message discarded"); - return; + if (conn_mode == Conn::ConnMode::ACTIVE) + { + SALTICIDAE_LOG_INFO("%s outbound handshake to %s", + std::string(listen_addr).c_str(), + std::string(*conn).c_str()); + auto it = pid2peer.find(msg.peer_id); + if (it != pid2peer.end()) + { + if (my_nonce < msg.nonce) + { + auto p = it->second.get(); + auto &old_conn = p->outbound_conn; + if (old_conn && old_conn->get_mode() != Conn::ConnMode::DEAD) + { + SALTICIDAE_LOG_INFO("%s terminating old connection %s", + std::string(listen_addr).c_str(), + std::string(*old_conn).c_str()); + old_conn->peer = nullptr; + old_conn->get_net()->disp_terminate(old_conn); + } + old_conn = conn; + p->conn = conn; + } + else + { + SALTICIDAE_LOG_INFO("%s terminating low connection %s", + std::string(listen_addr).c_str(), + std::string(*conn).c_str()); + conn->get_net()->disp_terminate(conn); + } + } + else + { + it = pid2peer.insert(std::make_pair( + msg.peer_id, + new Peer(conn, nullptr, conn, this))).first; + } + auto p = it->second.get(); + p->outbound_handshake = true; + p->peer_addr = conn->get_addr(); + p->reset_ping_timer(); + check_handshake(p); + } + else + SALTICIDAE_LOG_WARN("unexpected outbound handshake from %s", + std::string(*conn).c_str()); } - if (check_new_conn(conn, port)) return; - p->pong_msg_ok = true; - if (p->ping_timer_ok) + else { - p->reset_ping_timer(); - p->send_ping(); + auto p = conn->peer; + if (!p) return; + p->pong_msg_ok = true; + if (p->ping_timer_ok) + { + p->reset_ping_timer(); + p->send_ping(); + } } } catch (...) { this->disp_error_cb(std::current_exception()); } }); } template -void PeerNetwork::listen(NetAddr listen_addr) { +void PeerNetwork::listen(NetAddr _listen_addr) { auto ret = *(static_cast( - this->disp_tcall->call([this, listen_addr](ThreadCall::Handle &h) { + this->disp_tcall->call([this, _listen_addr](ThreadCall::Handle &h) { std::exception_ptr err = nullptr; try { - MsgNet::_listen(listen_addr); - listen_port = listen_addr.port; + MsgNet::_listen(_listen_addr); + listen_addr = _listen_addr; + DataStream pid; + if (id_mode == CERT_BASED) + { + if (!this->enable_tls) + throw PeerNetworkError(SALTI_ERROR_TLS_LOAD_CERT); + pid << this->tls_cert->get_der(); + } + else + { + pid << listen_addr; + } + my_pname = pid.get_hash(); + uint8_t rand_bytes[32]; + if (!RAND_bytes(rand_bytes, 32)) + throw PeerNetworkError(SALTI_ERROR_RAND_SOURCE); + my_nonce.load(rand_bytes); } catch (...) { err = std::current_exception(); } @@ -772,19 +854,10 @@ template void PeerNetwork::add_peer(const NetAddr &addr) { this->disp_tcall->async_call([this, addr](ThreadCall::Handle &) { try { - auto it = id2peer.find(addr); - if (it != id2peer.end()) + if (!known_peers.insert(std::make_pair(addr, uint256_t())).second) throw PeerNetworkError(SALTI_ERROR_PEER_ALREADY_EXISTS); - auto it2 = id2upeer.find(addr); - if (it2 != id2upeer.end()) - { /* move to the known peer set */ - auto p = std::move(it2->second); - id2upeer.erase(it2); - id2peer.insert(std::make_pair(addr, std::move(p))); - } - else - id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->disp_ec))); - start_active_conn(addr); + if (!pending_peers.count(addr)) + start_active_conn(addr); } catch (const PeerNetworkError &) { this->recoverable_error(std::current_exception()); } catch (...) { this->disp_error_cb(std::current_exception()); } @@ -795,11 +868,14 @@ template void PeerNetwork::del_peer(const NetAddr &addr) { this->disp_tcall->async_call([this, addr](ThreadCall::Handle &) { try { - auto it = id2peer.find(addr); - if (it == id2peer.end()) + if (!known_peers.erase(addr)) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); - this->disp_terminate(it->second->conn); - id2peer.erase(it); + auto it = pending_peers.find(addr); + assert(it != pending_peers.end()); + auto conn = it->second; + auto p = conn->peer; + if (p) pid2peer.erase(p->peer_id); + this->disp_terminate(conn); } catch (const PeerNetworkError &) { this->recoverable_error(std::current_exception()); } catch (...) { this->disp_error_cb(std::current_exception()); } @@ -807,17 +883,24 @@ void PeerNetwork::del_peer(const NetAddr &addr) { } template -const typename PeerNetwork::conn_t -PeerNetwork::get_peer_conn(const NetAddr &paddr) const { +typename PeerNetwork::conn_t +PeerNetwork::get_peer_conn(const NetAddr &addr) const { auto ret = *(static_cast *>( - this->disp_tcall->call([this, paddr](ThreadCall::Handle &h) { + this->disp_tcall->call([this, addr](ThreadCall::Handle &h) { conn_t conn; std::exception_ptr err = nullptr; try { - auto p = get_peer(paddr); - if (!p) + auto it = known_peers.find(addr); + if (it == known_peers.end()) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); - conn = p->conn; + if (it->second.is_null()) + { + conn = nullptr; + return; + } + auto it2 = pid2peer.find(it->second); + assert(it2 != pid2peer.end()); + conn = it2->second->conn; } catch (const PeerNetworkError &) { this->recoverable_error(std::current_exception()); } catch (...) { @@ -830,60 +913,53 @@ PeerNetwork::get_peer_conn(const NetAddr &paddr) const { } template -bool PeerNetwork::has_peer(const NetAddr &paddr) const { +bool PeerNetwork::has_peer(const NetAddr &addr) const { return *(static_cast(this->disp_tcall->call( - [this, paddr](ThreadCall::Handle &h) { - h.set_result(id2peer.count(paddr)); + [this, addr](ThreadCall::Handle &h) { + h.set_result(known_peers.count(addr)); }).get())); } template template -inline void PeerNetwork::send_msg_deferred(MsgType &&msg, const NetAddr &paddr) { - return _send_msg_deferred(std::move(msg), paddr); +inline void PeerNetwork::send_msg_deferred(MsgType &&msg, const NetAddr &addr) { + return _send_msg_deferred(std::move(msg), addr); } template -inline void PeerNetwork::_send_msg_deferred(Msg &&msg, const NetAddr &paddr) { +inline void PeerNetwork::_send_msg_deferred(Msg &&msg, const NetAddr &addr) { this->disp_tcall->async_call( - [this, msg=std::move(msg), paddr](ThreadCall::Handle &) { + [this, msg=std::move(msg), addr](ThreadCall::Handle &) { try { - _send_msg(msg, paddr); + _send_msg(msg, addr); } catch (...) { this->recoverable_error(std::current_exception()); } }); } template template -inline void PeerNetwork::send_msg(const MsgType &msg, const NetAddr &paddr) { - return _send_msg(msg, paddr); +inline void PeerNetwork::send_msg(const MsgType &msg, const NetAddr &addr) { + return _send_msg(msg, addr); } template -inline void PeerNetwork::_send_msg(const Msg &msg, const NetAddr &paddr) { - auto p = get_peer(paddr); - if (!p) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); - MsgNet::_send_msg(msg, p->conn); +inline void PeerNetwork::_send_msg(const Msg &msg, const NetAddr &addr) { + MsgNet::_send_msg(msg, _get_peer_conn(addr)); } template template -inline void PeerNetwork::multicast_msg(MsgType &&msg, const std::vector &paddrs) { - return _multicast_msg(MsgType(std::move(msg)), paddrs); +inline void PeerNetwork::multicast_msg(MsgType &&msg, const std::vector &addrs) { + return _multicast_msg(MsgType(std::move(msg)), addrs); } template -inline void PeerNetwork::_multicast_msg(Msg &&msg, const std::vector &paddrs) { +inline void PeerNetwork::_multicast_msg(Msg &&msg, const std::vector &addrs) { this->disp_tcall->async_call( - [this, msg=std::move(msg), paddrs](ThreadCall::Handle &) { + [this, msg=std::move(msg), addrs](ThreadCall::Handle &) { try { - for (auto &addr: paddrs) - { - auto p = get_peer(addr); - if (!p) - throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); - MsgNet::_send_msg(msg, p->conn); - } + for (auto &addr: addrs) + MsgNet::_send_msg(msg, _get_peer_conn(addr)); } catch (const PeerNetworkError &) { this->recoverable_error(std::current_exception()); } catch (...) { this->recoverable_error(std::current_exception()); } @@ -987,8 +1063,7 @@ typedef enum msgnetwork_conn_mode_t { } msgnetwork_conn_mode_t; typedef enum peernetwork_id_mode_t { - ID_MODE_IP_BASED, - ID_MODE_IP_PORT_BASED + ID_MODE_ADDR_BASED } peernetwork_id_mode_t; #ifdef __cplusplus @@ -1052,19 +1127,19 @@ msgnetwork_config_t *peernetwork_config_as_msgnetwork_config(peernetwork_config_ peernetwork_t *peernetwork_new(const eventcontext_t *ec, const peernetwork_config_t *config, SalticidaeCError *err); void peernetwork_free(const peernetwork_t *self); -void peernetwork_add_peer(peernetwork_t *self, const netaddr_t *paddr); -void peernetwork_del_peer(peernetwork_t *self, const netaddr_t *paddr); -bool peernetwork_has_peer(const peernetwork_t *self, const netaddr_t *paddr); -const peernetwork_conn_t *peernetwork_get_peer_conn(const peernetwork_t *self, const netaddr_t *paddr, SalticidaeCError *cerror); +void peernetwork_add_peer(peernetwork_t *self, const netaddr_t *addr); +void peernetwork_del_peer(peernetwork_t *self, const netaddr_t *addr); +bool peernetwork_has_peer(const peernetwork_t *self, const netaddr_t *addr); +const peernetwork_conn_t *peernetwork_get_peer_conn(const peernetwork_t *self, const netaddr_t *addr, SalticidaeCError *cerror); msgnetwork_t *peernetwork_as_msgnetwork(peernetwork_t *self); peernetwork_t *msgnetwork_as_peernetwork_unsafe(msgnetwork_t *self); msgnetwork_conn_t *msgnetwork_conn_new_from_peernetwork_conn(const peernetwork_conn_t *conn); peernetwork_conn_t *peernetwork_conn_new_from_msgnetwork_conn_unsafe(const msgnetwork_conn_t *conn); peernetwork_conn_t *peernetwork_conn_copy(const peernetwork_conn_t *self); void peernetwork_conn_free(const peernetwork_conn_t *self); -void peernetwork_send_msg(peernetwork_t *self, const msg_t * msg, const netaddr_t *paddr); -void peernetwork_send_msg_deferred_by_move(peernetwork_t *self, msg_t * _moved_msg, const netaddr_t *paddr); -void peernetwork_multicast_msg_by_move(peernetwork_t *self, msg_t *_moved_msg, const netaddr_array_t *paddrs); +void peernetwork_send_msg(peernetwork_t *self, const msg_t * msg, const netaddr_t *addr); +void peernetwork_send_msg_deferred_by_move(peernetwork_t *self, msg_t * _moved_msg, const netaddr_t *addr); +void peernetwork_multicast_msg_by_move(peernetwork_t *self, msg_t *_moved_msg, const netaddr_array_t *addrs); void peernetwork_listen(peernetwork_t *self, const netaddr_t *listen_addr, SalticidaeCError *err); typedef void (*msgnetwork_unknown_peer_callback_t)(const netaddr_t *, void *userdata); diff --git a/include/salticidae/stream.h b/include/salticidae/stream.h index 7d1456e..dc47792 100644 --- a/include/salticidae/stream.h +++ b/include/salticidae/stream.h @@ -243,6 +243,13 @@ class Blob { return !(*this == other); } + bool operator<(const Blob &other) const { + for (size_t i = _len - 1; i > 0; i--) + if (data[i] != other.data[i]) + return data[i] < other.data[i]; + return data[0] < other.data[0]; + } + size_t cheap_hash() const { return *data; } void serialize(DataStream &s) const { diff --git a/include/salticidae/util.h b/include/salticidae/util.h index 27f4e8e..5059f13 100644 --- a/include/salticidae/util.h +++ b/include/salticidae/util.h @@ -99,6 +99,7 @@ enum SalticidaeErrorCode { SALTI_ERROR_TLS_KEY_NOT_MATCH, SALTI_ERROR_TLS_NO_PEER_CERT, SALTI_ERROR_FD, + SALTI_ERROR_RAND_SOURCE, SALTI_ERROR_UNKNOWN }; -- cgit v1.2.3-70-g09d2