From 85b9affbce70ac4b5922158802b227a42c42a203 Mon Sep 17 00:00:00 2001 From: Determinant Date: Mon, 1 Jul 2019 18:36:41 -0400 Subject: preserve outgoing messages with best effort --- include/salticidae/conn.h | 1 + include/salticidae/network.h | 159 +++++++++++++++++++++++++++---------------- 2 files changed, 101 insertions(+), 59 deletions(-) diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index 316d9cf..7f74a87 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -513,6 +513,7 @@ class ConnPool { { auto &conn = it.second; conn->stop(); + conn->set_terminated(); release_conn(conn); } } diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 4725b95..c59bbb3 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -271,13 +271,13 @@ class PeerNetwork: public MsgNetwork { }; private: - struct Peer; + struct PeerConn; public: class Conn: public MsgNet::Conn { friend PeerNetwork; - Peer *peer; - BoxObj _dead_peer; + PeerConn *peer; + BoxObj _dead_peer; TimerEvent ev_timeout; void reset_timeout(double timeout); @@ -308,7 +308,7 @@ class PeerNetwork: public MsgNetwork { using unknown_peer_callback_t = std::function; private: - class Peer { + class PeerConn { friend PeerNetwork; /** connection addr, may be different due to passive mode */ uint256_t peer_id; @@ -326,8 +326,8 @@ class PeerNetwork: public MsgNetwork { bool inbound_handshake; double ping_period; - Peer() = delete; - Peer(const uint256_t &peer_id, + PeerConn() = delete; + PeerConn(const uint256_t &peer_id, conn_t conn, conn_t inbound_conn, conn_t outbound_conn, const PeerNetwork *pn): peer_id(peer_id), @@ -335,13 +335,13 @@ class PeerNetwork: public MsgNetwork { inbound_conn(inbound_conn), outbound_conn(outbound_conn), ev_ping_timer( - TimerEvent(pn->disp_ec, std::bind(&Peer::ping_timer, this, _1))), + TimerEvent(pn->disp_ec, std::bind(&PeerConn::ping_timer, this, _1))), connected(false), outbound_handshake(false), inbound_handshake(false), ping_period(pn->ping_period) {} - Peer &operator=(const Peer &) = delete; - Peer(const Peer &) = delete; + PeerConn &operator=(const PeerConn &) = delete; + PeerConn(const PeerConn &) = delete; void reset_ping_timer(); void send_ping(); @@ -351,15 +351,24 @@ class PeerNetwork: public MsgNetwork { ev_ping_timer.del(); } public: - ~Peer() { + ~PeerConn() { if (inbound_conn) inbound_conn->peer = nullptr; if (outbound_conn) outbound_conn->peer = nullptr; } }; + struct Peer { + uint256_t peer_id; + TimerEvent ev_retry_timer; + conn_t peer_conn; + + Peer(const conn_t &conn): + peer_id(), ev_retry_timer(), peer_conn(conn) {} + }; + std::unordered_map pending_peers; - std::unordered_map> known_peers; - std::unordered_map> pid2peer; + std::unordered_map> known_peers; + std::unordered_map> pid2peer; using pinfo_slock_t = std::shared_lock; using pinfo_ulock_t = std::shared_lock; @@ -407,9 +416,10 @@ class PeerNetwork: public MsgNetwork { void pong_handler(MsgPong &&msg, const conn_t &conn); void _ping_msg_cb(const conn_t &conn, uint16_t port); void _pong_msg_cb(const conn_t &conn, uint16_t port); - bool check_handshake(Peer *peer); + void check_handshake(PeerConn *peer); + void move_peer_buffer(conn_t &old_conn, const conn_t &new_conn); void replace_conn(const conn_t &conn); - void start_active_conn(const NetAddr &addr); + conn_t start_active_conn(const NetAddr &addr); static void tcall_reset_timeout(ConnPool::Worker *worker, const conn_t &conn, double timeout); inline conn_t _get_peer_conn(const NetAddr &addr) const; @@ -636,6 +646,18 @@ void PeerNetwork::on_teardown(const ConnPool::conn_t &_conn) { pinfo_ulock_t _g(known_peers_lock); auto it = known_peers.find(addr); if (it == known_peers.end()) return; + auto &pp = it->second; + if (conn == pp->peer_conn) + { + pinfo_ulock_t __g(pid2peer_lock); + auto it2 = pid2peer.find(pp->peer_id); + if (it2 != pid2peer.end()) + { + auto &p = it2->second; + if (p->connected && p->conn != conn) + move_peer_buffer(pp->peer_conn, p->conn); + } + } if (p) { if (conn != p->conn) return; @@ -645,8 +667,8 @@ void PeerNetwork::on_teardown(const ConnPool::conn_t &_conn) { p->connected = false; p->outbound_handshake = false; p->inbound_handshake = false; - known_peers[p->peer_addr] = std::make_pair(uint256_t(), TimerEvent()); - Peer *peer = nullptr; + pp->peer_id = uint256_t(); + PeerConn *peer = nullptr; { pinfo_ulock_t __g(pid2peer_lock); auto it2 = pid2peer.find(p->peer_id); @@ -661,9 +683,9 @@ void PeerNetwork::on_teardown(const ConnPool::conn_t &_conn) { } else { - if (!it->second.first.is_null()) return; + if (!it->second->peer_id.is_null()) return; } - auto &ev_retry_timer = it->second.second; + auto &ev_retry_timer = it->second->ev_retry_timer; ev_retry_timer = TimerEvent(this->disp_ec, [this, addr](TimerEvent &) { try { start_active_conn(addr); @@ -673,14 +695,14 @@ void PeerNetwork::on_teardown(const ConnPool::conn_t &_conn) { } template -void PeerNetwork::Peer::reset_ping_timer() { +void PeerNetwork::PeerConn::reset_ping_timer() { assert(ev_ping_timer); ev_ping_timer.del(); ev_ping_timer.add(gen_rand_timeout(ping_period)); } template -void PeerNetwork::Peer::send_ping() { +void PeerNetwork::PeerConn::send_ping() { auto pn = conn->get_net(); ping_timer_ok = false; pong_msg_ok = false; @@ -689,7 +711,7 @@ void PeerNetwork::Peer::send_ping() { } template -void PeerNetwork::Peer::ping_timer(TimerEvent &) { +void PeerNetwork::PeerConn::ping_timer(TimerEvent &) { ping_timer_ok = true; if (pong_msg_ok) { @@ -699,10 +721,25 @@ void PeerNetwork::Peer::ping_timer(TimerEvent &) { } template -bool PeerNetwork::check_handshake(Peer *p) { +void PeerNetwork::move_peer_buffer(conn_t &old_conn, const conn_t &new_conn) { + assert(old_conn->is_terminated() && old_conn != new_conn); + for (;;) + { + bytearray_t buff_seg = old_conn->send_buffer.move_pop(); + if (!buff_seg.size()) break; + new_conn->write(std::move(buff_seg)); + } + old_conn = new_conn; + this->user_tcall->async_call([this, new_conn](ThreadCall::Handle &) { + if (peer_cb) peer_cb(new_conn, true); + }); +} + +template +void PeerNetwork::check_handshake(PeerConn *p) { if (!(p->inbound_handshake && p->outbound_handshake) || p->connected) - return false; + return; p->clear_all_events(); if (p->inbound_conn && p->inbound_conn != p->conn) p->inbound_conn->peer = nullptr; @@ -714,29 +751,33 @@ bool PeerNetwork::check_handshake(Peer *p) { p->send_ping(); { pinfo_ulock_t _g(known_peers_lock); - known_peers[p->peer_addr] = std::make_pair(p->peer_id, TimerEvent()); + auto &pp = known_peers[p->peer_addr]; + pp->peer_id = p->peer_id; + pp->ev_retry_timer.del(); + auto &old_conn = pp->peer_conn; + auto &conn = p->conn; + if (old_conn != conn) + { + if (old_conn->is_terminated()) + move_peer_buffer(old_conn, conn); + else + this->disp_terminate(old_conn); + } } pending_peers.erase(p->conn->get_addr()); - if (p->connected) + auto color_begin = ""; + auto color_end = ""; + if (logger.is_tty()) { - auto color_begin = ""; - auto color_end = ""; - if (logger.is_tty()) - { - color_begin = TTY_COLOR_BLUE; - color_end = TTY_COLOR_RESET; - } - SALTICIDAE_LOG_INFO("%sPeerNetwork: established connection %s <-> %s via %s", - color_begin, - std::string(listen_addr).c_str(), - std::string(p->peer_addr).c_str(), - std::string(*(p->conn)).c_str(), - color_end); + color_begin = TTY_COLOR_BLUE; + color_end = TTY_COLOR_RESET; } - this->user_tcall->async_call([this, conn=p->conn](ThreadCall::Handle &) { - if (peer_cb) peer_cb(conn, true); - }); - return true; + SALTICIDAE_LOG_INFO("%sPeerNetwork: established connection %s <-> %s via %s", + color_begin, + std::string(listen_addr).c_str(), + std::string(p->peer_addr).c_str(), + std::string(*(p->conn)).c_str(), + color_end); } template @@ -756,9 +797,10 @@ void PeerNetwork::replace_conn(const conn_t &conn) { } template -void PeerNetwork::start_active_conn(const NetAddr &addr) { +typename PeerNetwork::conn_t PeerNetwork::start_active_conn(const NetAddr &addr) { auto conn = static_pointer_cast(MsgNet::_connect(addr)); replace_conn(conn); + return conn; } template @@ -766,12 +808,7 @@ inline typename PeerNetwork::conn_t PeerNetwork::_get_peer_c auto it = known_peers.find(addr); if (it == known_peers.end()) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); - const auto &peer_id = it->second.first; - if (peer_id.is_null()) - throw PeerNetworkError(SALTI_ERROR_PEER_NOT_READY); - auto it2 = pid2peer.find(peer_id); - assert(it2 != pid2peer.end()); - return it2->second->conn; + return it->second->peer_conn; } /* end: functions invoked by the dispatcher */ @@ -827,7 +864,7 @@ void PeerNetwork::ping_handler(MsgPing &&msg, const conn_t &conn) { else { it = pid2peer.insert(std::make_pair(peer_id, - new Peer(peer_id, conn, conn, nullptr, this))).first; + new PeerConn(peer_id, conn, conn, nullptr, this))).first; } auto p = it->second.get(); p->inbound_handshake = true; @@ -895,7 +932,7 @@ void PeerNetwork::pong_handler(MsgPong &&msg, const conn_t &conn) { else { it = pid2peer.insert(std::make_pair(peer_id, - new Peer(peer_id, conn, nullptr, conn, this))).first; + new PeerConn(peer_id, conn, nullptr, conn, this))).first; } auto p = it->second.get(); p->outbound_handshake = true; @@ -960,11 +997,15 @@ void PeerNetwork::add_peer(const NetAddr &addr) { this->disp_tcall->async_call([this, addr](ThreadCall::Handle &) { try { pinfo_ulock_t _g(known_peers_lock); - if (!known_peers.insert(std::make_pair(addr, - std::make_pair(uint256_t(), TimerEvent()))).second) + if (known_peers.count(addr)) throw PeerNetworkError(SALTI_ERROR_PEER_ALREADY_EXISTS); - if (!pending_peers.count(addr)) - start_active_conn(addr); + auto it = pending_peers.find(addr); + conn_t conn; + if (it == pending_peers.end()) + conn = start_active_conn(addr); + else + conn = it->second; + known_peers.insert(std::make_pair(addr, new Peer(conn))); } catch (const PeerNetworkError &) { this->recoverable_error(std::current_exception()); } catch (...) { this->disp_error_cb(std::current_exception()); } @@ -980,7 +1021,7 @@ void PeerNetwork::del_peer(const NetAddr &addr) { auto it = known_peers.find(addr); if (it == known_peers.end()) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); - auto peer_id = it->second.first; + auto peer_id = it->second->peer_id; known_peers.erase(it); auto it2 = pending_peers.find(addr); if (it2 != pending_peers.end()) @@ -992,7 +1033,7 @@ void PeerNetwork::del_peer(const NetAddr &addr) { auto it3 = pid2peer.find(peer_id); if (it3 != pid2peer.end()) { - auto p = it3->second.get(); + auto &p = it3->second; this->disp_terminate(p->conn); pid2peer.erase(it3); } @@ -1015,12 +1056,12 @@ PeerNetwork::get_peer_conn(const NetAddr &addr) const { auto it = known_peers.find(addr); if (it == known_peers.end()) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); - if (it->second.first.is_null()) + if (it->second->peer_id.is_null()) { conn = nullptr; return; } - auto it2 = pid2peer.find(it->second.first); + auto it2 = pid2peer.find(it->second->peer_id); assert(it2 != pid2peer.end()); conn = it2->second->conn; } catch (const PeerNetworkError &) { -- cgit v1.2.3