diff options
-rw-r--r-- | include/salticidae/network.h | 174 |
1 files changed, 94 insertions, 80 deletions
diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 598c326..0db69e4 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -389,20 +389,26 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { PeerId id; NetAddr addr; /** remote address (if set) */ uint32_t my_nonce; + conn_t conn; double retry_delay; ssize_t ntry; TimerEvent ev_retry_timer; /** the underlying connection, may be invalid when connected = false */ - conn_t conn; + conn_t chosen_conn; conn_t inbound_conn; conn_t outbound_conn; TimerEvent ev_ping_timer; bool ping_timer_ok; bool pong_msg_ok; - bool connected; + + enum State { + DISCONNECTED, + CONNECTED, + RESET + } state; double ping_period; @@ -411,13 +417,12 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { retry_delay(0), ntry(0), ev_ping_timer( TimerEvent(pn->disp_ec, std::bind(&Peer::ping_timer, this, _1))), - connected(false), + state(DISCONNECTED), ping_period(pn->ping_period) {} Peer &operator=(const Peer &) = delete; Peer(const Peer &) = delete; - void update_conn(const conn_t &conn); void reset_ping_timer(); void send_ping(); void ping_timer(TimerEvent &); @@ -494,7 +499,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { void _ping_msg_cb(const conn_t &conn, uint16_t port); void _pong_msg_cb(const conn_t &conn, uint16_t port); void finish_handshake(Peer *peer); - void replace_conn(const conn_t &conn); + void replace_pending_conn(const conn_t &conn); conn_t start_active_conn(const NetAddr &addr); static void tcall_reset_timeout(ConnPool::Worker *worker, const conn_t &conn, double timeout); @@ -724,14 +729,14 @@ void PeerNetwork<O, _, __>::on_setup(const ConnPool::conn_t &_conn) { }); /* the initial ping-pong to set up the connection */ tcall_reset_timeout(worker, conn, conn_timeout); - replace_conn(conn); + replace_pending_conn(conn); if (conn->get_mode() == Conn::ConnMode::ACTIVE) { auto pid = get_peer_id(conn, conn->get_addr()); pinfo_slock_t _g(known_peers_lock); send_msg(MsgPing( listen_addr, - known_peers.find(pid)->second->my_nonce), conn); + known_peers.find(pid)->second->get_nonce()), conn); } } @@ -744,49 +749,29 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) { SALTICIDAE_LOG_INFO("connection lost: %s", std::string(*conn).c_str()); auto p = conn->peer; if (!p) return; - assert(conn == p->conn); - p->connected = false; - p->conn = nullptr; - p->inbound_conn = nullptr; - p->outbound_conn = nullptr; - p->ev_ping_timer.del(); - p->my_nonce = 0; - this->user_tcall->async_call([this, conn](ThreadCall::Handle &) { - if (peer_cb) peer_cb(conn, false); - }); + /* if this connect was the active peer connection */ + bool reset = p->state == Peer::State::RESET; + if (p->conn == conn) + { + p->state = Peer::State::DISCONNECTED; + p->inbound_conn = nullptr; + p->outbound_conn = nullptr; + p->ev_ping_timer.del(); + p->my_nonce = 0; + this->user_tcall->async_call([this, conn](ThreadCall::Handle &) { + if (peer_cb) peer_cb(conn, false); + }); + } /* auto retry the connection */ if (p->ntry > 0) p->ntry--; if (p->ntry) { - p->ev_retry_timer = TimerEvent(this->disp_ec, [this, addr](TimerEvent &) { + p->ev_retry_timer = TimerEvent(this->disp_ec, [this, addr=p->addr, p](TimerEvent &) { try { - start_active_conn(addr); + start_active_conn(addr)->peer = p; } catch (...) { this->disp_error_cb(std::current_exception()); } }); - p->ev_retry_timer.add(gen_rand_timeout(p->retry_delay)); - } -} - -template<typename O, O _, O __> -void PeerNetwork<O, _, __>::Peer::update_conn(const conn_t &new_conn) { - if (conn != new_conn) - { - if (conn) - { - conn->peer = nullptr; - if (conn->is_terminated()) - { - for (;;) - { - bytearray_t buff_seg = conn->send_buffer.move_pop(); - if (!buff_seg.size()) break; - new_conn->write(std::move(buff_seg)); - } - } - else - conn->get_net()->disp_terminate(conn); - } - conn = new_conn; + p->ev_retry_timer.add(reset ? 0 : gen_rand_timeout(p->retry_delay)); } } @@ -799,11 +784,11 @@ void PeerNetwork<O, _, __>::Peer::reset_ping_timer() { template<typename O, O _, O __> void PeerNetwork<O, _, __>::Peer::send_ping() { - auto pn = conn->get_net(); + auto pn = chosen_conn->get_net(); ping_timer_ok = false; pong_msg_ok = false; - tcall_reset_timeout(conn->worker, conn, pn->conn_timeout); - pn->send_msg(MsgPing(), conn); + tcall_reset_timeout(chosen_conn->worker, chosen_conn, pn->conn_timeout); + pn->send_msg(MsgPing(), chosen_conn); } template<typename O, O _, O __> @@ -818,17 +803,32 @@ void PeerNetwork<O, _, __>::Peer::ping_timer(TimerEvent &) { template<typename O, O _, O __> void PeerNetwork<O, _, __>::finish_handshake(Peer *p) { - if (p->connected) return; + assert(p->state == Peer::State::DISCONNECTED); p->clear_all_events(); - if (p->inbound_conn && p->inbound_conn != p->conn) + if (p->inbound_conn && p->inbound_conn != p->chosen_conn) p->inbound_conn->peer = nullptr; - if (p->outbound_conn && p->outbound_conn != p->conn) + if (p->outbound_conn && p->outbound_conn != p->chosen_conn) p->outbound_conn->peer = nullptr; - p->conn->peer = p; - p->connected = true; + p->state = Peer::State::CONNECTED; p->reset_ping_timer(); p->send_ping(); p->ev_retry_timer.del(); + auto &old_conn = p->conn; + auto &new_conn = p->chosen_conn; + if (old_conn) + { + /* there is some previously terminated connection */ + assert(p->conn->is_terminated()); + for (;;) + { + bytearray_t buff_seg = old_conn->send_buffer.move_pop(); + if (!buff_seg.size()) break; + new_conn->write(std::move(buff_seg)); + } + old_conn->peer = nullptr; + } + old_conn = new_conn; + new_conn->peer = p; this->user_tcall->async_call([this, conn=p->conn](ThreadCall::Handle &) { if (peer_cb) peer_cb(conn, true); }); @@ -849,7 +849,7 @@ void PeerNetwork<O, _, __>::finish_handshake(Peer *p) { } template<typename O, O _, O __> -void PeerNetwork<O, _, __>::replace_conn(const conn_t &conn) { +void PeerNetwork<O, _, __>::replace_pending_conn(const conn_t &conn) { const auto &addr = conn->get_addr(); auto it = pending_peers.find(addr); if (it != pending_peers.end()) @@ -867,7 +867,7 @@ void PeerNetwork<O, _, __>::replace_conn(const conn_t &conn) { template<typename O, O _, O __> typename PeerNetwork<O, _, __>::conn_t PeerNetwork<O, _, __>::start_active_conn(const NetAddr &addr) { auto conn = static_pointer_cast<Conn>(MsgNet::_connect(addr)); - replace_conn(conn); + replace_pending_conn(conn); return conn; } @@ -886,11 +886,11 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) { this->disp_tcall->async_call([this, conn, msg=std::move(msg)](ThreadCall::Handle &) { try { if (conn->is_terminated()) return; - if (!msg.claimed_addr.is_null()) + if (!msg.claimed_addr.is_null()) /* handshake ping */ { - auto pid = get_peer_id(conn, msg.claimed_addr); if (conn->get_mode() == Conn::ConnMode::PASSIVE) { + auto pid = get_peer_id(conn, msg.claimed_addr); pinfo_slock_t _g(known_peers_lock); auto pit = known_peers.find(pid); if (pit == known_peers.end()) @@ -902,37 +902,41 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) { return; } auto &p = pit->second; - if (p->connected) return; + if (p->state != Peer::State::DISCONNECTED || + (!p->addr.is_null() && p->addr != msg.claimed_addr)) return; SALTICIDAE_LOG_INFO("%s inbound handshake from %s", std::string(listen_addr).c_str(), std::string(*conn).c_str()); send_msg(MsgPong( listen_addr, - p->addr.is_null() ? passive_nonce : p->my_nonce), conn); + p->addr.is_null() ? passive_nonce : p->get_nonce()), conn); auto &old_conn = p->inbound_conn; if (old_conn && !old_conn->is_terminated()) { - SALTICIDAE_LOG_DEBUG("%s terminating old connection %s", + SALTICIDAE_LOG_DEBUG("%s terminating stale handshake connection %s", std::string(listen_addr).c_str(), std::string(*old_conn).c_str()); assert(old_conn->peer == nullptr); this->disp_terminate(old_conn); } old_conn = conn; - if (msg.nonce < p->my_nonce) - p->update_conn(conn); + if (msg.nonce < p->get_nonce() || p->addr.is_null()) + { + SALTICIDAE_LOG_DEBUG("connection %s chosen", std::string(*conn).c_str()); + p->chosen_conn = conn; + finish_handshake(p.get()); + } else { + SALTICIDAE_LOG_DEBUG("%04x >= %04x, terminating", msg.nonce, p->get_nonce()); this->disp_terminate(conn); - return; } - finish_handshake(p.get()); } else SALTICIDAE_LOG_WARN("unexpected inbound handshake from %s", std::string(*conn).c_str()); } - else + else /* heartbeat ping */ { SALTICIDAE_LOG_INFO("ping from %s", std::string(*conn).c_str()); send_msg(MsgPong(), conn); @@ -946,7 +950,7 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) { this->disp_tcall->async_call([this, conn, msg=std::move(msg)](ThreadCall::Handle &) { try { if (conn->is_terminated()) return; - if (!msg.claimed_addr.is_null()) + if (!msg.claimed_addr.is_null()) /* handshake pong */ { if (conn->get_mode() == Conn::ConnMode::ACTIVE) { @@ -955,46 +959,48 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) { auto pit = known_peers.find(pid); if (pit == known_peers.end()) { + SALTICIDAE_LOG_WARN("unexpected pong from an unknown peer"); this->disp_terminate(conn); return; } auto &p = pit->second; - if (p->connected) return; + assert(!p->addr.is_null() && p->addr == conn->get_addr()); + if (p->state != Peer::State::DISCONNECTED || + p->addr != msg.claimed_addr) return; SALTICIDAE_LOG_INFO("%s outbound handshake to %s", std::string(listen_addr).c_str(), std::string(*conn).c_str()); auto &old_conn = p->outbound_conn; if (old_conn && !old_conn->is_terminated()) { - SALTICIDAE_LOG_DEBUG("%s terminating old connection %s", + SALTICIDAE_LOG_DEBUG("%s terminating stale handshake connection %s", std::string(listen_addr).c_str(), std::string(*old_conn).c_str()); assert(old_conn->peer == nullptr); old_conn->get_net()->disp_terminate(old_conn); } old_conn = conn; - assert(!p->addr.is_null()); - if (p->my_nonce < msg.nonce) - p->update_conn(conn); + if (p->get_nonce() < msg.nonce) + { + SALTICIDAE_LOG_DEBUG("connection %s chosen", std::string(*conn).c_str()); + p->chosen_conn = conn; + p->reset_ping_timer(); + finish_handshake(p.get()); + } else { + SALTICIDAE_LOG_DEBUG( + "%04x >= %04x, terminating and resetting", + p->get_nonce(), msg.nonce); p->my_nonce = 0; this->disp_terminate(conn); - return; } - auto &peer_addr = conn->get_addr(); - auto &old_peer_addr = p->addr; - if (!old_peer_addr.is_null() && old_peer_addr != peer_addr) - SALTICIDAE_LOG_WARN("multiple peer addresses share the same identity"); - old_peer_addr = peer_addr; - p->reset_ping_timer(); - finish_handshake(p.get()); } else SALTICIDAE_LOG_WARN("unexpected outbound handshake from %s", std::string(*conn).c_str()); } - else + else /* heartbeat pong */ { auto p = conn->peer; if (!p) @@ -1051,12 +1057,19 @@ int32_t PeerNetwork<O, _, __>::conn_peer(const PeerId &pid, ssize_t ntry, double throw PeerNetworkError(SALTI_ERROR_PEER_NOT_READY); p->ntry = ntry; p->retry_delay = retry_delay; - p->connected = false; p->inbound_conn = nullptr; p->outbound_conn = nullptr; p->ev_ping_timer.del(); p->my_nonce = 0; - p->update_conn(ntry ? start_active_conn(p->addr) : conn_t()); + /* has to terminate established connection *before* making the next + * attempt */ + if (!p->conn || p->state == Peer::State::DISCONNECTED) + start_active_conn(p->addr); + else if (p->state == Peer::State::CONNECTED) + { + p->state = Peer::State::RESET; + this->disp_terminate(p->conn); + } } catch (const PeerNetworkError &) { this->recoverable_error(std::current_exception(), id); } catch (...) { this->disp_error_cb(std::current_exception()); } @@ -1092,6 +1105,7 @@ int32_t PeerNetwork<O, _, __>::del_peer(const PeerId &pid) { if (it == known_peers.end()) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); auto addr = it->second->addr; + this->disp_terminate(it->second->conn); known_peers.erase(it); auto it2 = pending_peers.find(addr); if (it2 != pending_peers.end()) |