aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/salticidae/network.h174
1 files changed, 94 insertions, 80 deletions
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index 598c326..0db69e4 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -389,20 +389,26 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
PeerId id;
NetAddr addr; /** remote address (if set) */
uint32_t my_nonce;
+ conn_t conn;
double retry_delay;
ssize_t ntry;
TimerEvent ev_retry_timer;
/** the underlying connection, may be invalid when connected = false */
- conn_t conn;
+ conn_t chosen_conn;
conn_t inbound_conn;
conn_t outbound_conn;
TimerEvent ev_ping_timer;
bool ping_timer_ok;
bool pong_msg_ok;
- bool connected;
+
+ enum State {
+ DISCONNECTED,
+ CONNECTED,
+ RESET
+ } state;
double ping_period;
@@ -411,13 +417,12 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
retry_delay(0), ntry(0),
ev_ping_timer(
TimerEvent(pn->disp_ec, std::bind(&Peer::ping_timer, this, _1))),
- connected(false),
+ state(DISCONNECTED),
ping_period(pn->ping_period) {}
Peer &operator=(const Peer &) = delete;
Peer(const Peer &) = delete;
- void update_conn(const conn_t &conn);
void reset_ping_timer();
void send_ping();
void ping_timer(TimerEvent &);
@@ -494,7 +499,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
void _ping_msg_cb(const conn_t &conn, uint16_t port);
void _pong_msg_cb(const conn_t &conn, uint16_t port);
void finish_handshake(Peer *peer);
- void replace_conn(const conn_t &conn);
+ void replace_pending_conn(const conn_t &conn);
conn_t start_active_conn(const NetAddr &addr);
static void tcall_reset_timeout(ConnPool::Worker *worker,
const conn_t &conn, double timeout);
@@ -724,14 +729,14 @@ void PeerNetwork<O, _, __>::on_setup(const ConnPool::conn_t &_conn) {
});
/* the initial ping-pong to set up the connection */
tcall_reset_timeout(worker, conn, conn_timeout);
- replace_conn(conn);
+ replace_pending_conn(conn);
if (conn->get_mode() == Conn::ConnMode::ACTIVE)
{
auto pid = get_peer_id(conn, conn->get_addr());
pinfo_slock_t _g(known_peers_lock);
send_msg(MsgPing(
listen_addr,
- known_peers.find(pid)->second->my_nonce), conn);
+ known_peers.find(pid)->second->get_nonce()), conn);
}
}
@@ -744,49 +749,29 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) {
SALTICIDAE_LOG_INFO("connection lost: %s", std::string(*conn).c_str());
auto p = conn->peer;
if (!p) return;
- assert(conn == p->conn);
- p->connected = false;
- p->conn = nullptr;
- p->inbound_conn = nullptr;
- p->outbound_conn = nullptr;
- p->ev_ping_timer.del();
- p->my_nonce = 0;
- this->user_tcall->async_call([this, conn](ThreadCall::Handle &) {
- if (peer_cb) peer_cb(conn, false);
- });
+ /* if this connect was the active peer connection */
+ bool reset = p->state == Peer::State::RESET;
+ if (p->conn == conn)
+ {
+ p->state = Peer::State::DISCONNECTED;
+ p->inbound_conn = nullptr;
+ p->outbound_conn = nullptr;
+ p->ev_ping_timer.del();
+ p->my_nonce = 0;
+ this->user_tcall->async_call([this, conn](ThreadCall::Handle &) {
+ if (peer_cb) peer_cb(conn, false);
+ });
+ }
/* auto retry the connection */
if (p->ntry > 0) p->ntry--;
if (p->ntry)
{
- p->ev_retry_timer = TimerEvent(this->disp_ec, [this, addr](TimerEvent &) {
+ p->ev_retry_timer = TimerEvent(this->disp_ec, [this, addr=p->addr, p](TimerEvent &) {
try {
- start_active_conn(addr);
+ start_active_conn(addr)->peer = p;
} catch (...) { this->disp_error_cb(std::current_exception()); }
});
- p->ev_retry_timer.add(gen_rand_timeout(p->retry_delay));
- }
-}
-
-template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::Peer::update_conn(const conn_t &new_conn) {
- if (conn != new_conn)
- {
- if (conn)
- {
- conn->peer = nullptr;
- if (conn->is_terminated())
- {
- for (;;)
- {
- bytearray_t buff_seg = conn->send_buffer.move_pop();
- if (!buff_seg.size()) break;
- new_conn->write(std::move(buff_seg));
- }
- }
- else
- conn->get_net()->disp_terminate(conn);
- }
- conn = new_conn;
+ p->ev_retry_timer.add(reset ? 0 : gen_rand_timeout(p->retry_delay));
}
}
@@ -799,11 +784,11 @@ void PeerNetwork<O, _, __>::Peer::reset_ping_timer() {
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::Peer::send_ping() {
- auto pn = conn->get_net();
+ auto pn = chosen_conn->get_net();
ping_timer_ok = false;
pong_msg_ok = false;
- tcall_reset_timeout(conn->worker, conn, pn->conn_timeout);
- pn->send_msg(MsgPing(), conn);
+ tcall_reset_timeout(chosen_conn->worker, chosen_conn, pn->conn_timeout);
+ pn->send_msg(MsgPing(), chosen_conn);
}
template<typename O, O _, O __>
@@ -818,17 +803,32 @@ void PeerNetwork<O, _, __>::Peer::ping_timer(TimerEvent &) {
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::finish_handshake(Peer *p) {
- if (p->connected) return;
+ assert(p->state == Peer::State::DISCONNECTED);
p->clear_all_events();
- if (p->inbound_conn && p->inbound_conn != p->conn)
+ if (p->inbound_conn && p->inbound_conn != p->chosen_conn)
p->inbound_conn->peer = nullptr;
- if (p->outbound_conn && p->outbound_conn != p->conn)
+ if (p->outbound_conn && p->outbound_conn != p->chosen_conn)
p->outbound_conn->peer = nullptr;
- p->conn->peer = p;
- p->connected = true;
+ p->state = Peer::State::CONNECTED;
p->reset_ping_timer();
p->send_ping();
p->ev_retry_timer.del();
+ auto &old_conn = p->conn;
+ auto &new_conn = p->chosen_conn;
+ if (old_conn)
+ {
+ /* there is some previously terminated connection */
+ assert(p->conn->is_terminated());
+ for (;;)
+ {
+ bytearray_t buff_seg = old_conn->send_buffer.move_pop();
+ if (!buff_seg.size()) break;
+ new_conn->write(std::move(buff_seg));
+ }
+ old_conn->peer = nullptr;
+ }
+ old_conn = new_conn;
+ new_conn->peer = p;
this->user_tcall->async_call([this, conn=p->conn](ThreadCall::Handle &) {
if (peer_cb) peer_cb(conn, true);
});
@@ -849,7 +849,7 @@ void PeerNetwork<O, _, __>::finish_handshake(Peer *p) {
}
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::replace_conn(const conn_t &conn) {
+void PeerNetwork<O, _, __>::replace_pending_conn(const conn_t &conn) {
const auto &addr = conn->get_addr();
auto it = pending_peers.find(addr);
if (it != pending_peers.end())
@@ -867,7 +867,7 @@ void PeerNetwork<O, _, __>::replace_conn(const conn_t &conn) {
template<typename O, O _, O __>
typename PeerNetwork<O, _, __>::conn_t PeerNetwork<O, _, __>::start_active_conn(const NetAddr &addr) {
auto conn = static_pointer_cast<Conn>(MsgNet::_connect(addr));
- replace_conn(conn);
+ replace_pending_conn(conn);
return conn;
}
@@ -886,11 +886,11 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) {
this->disp_tcall->async_call([this, conn, msg=std::move(msg)](ThreadCall::Handle &) {
try {
if (conn->is_terminated()) return;
- if (!msg.claimed_addr.is_null())
+ if (!msg.claimed_addr.is_null()) /* handshake ping */
{
- auto pid = get_peer_id(conn, msg.claimed_addr);
if (conn->get_mode() == Conn::ConnMode::PASSIVE)
{
+ auto pid = get_peer_id(conn, msg.claimed_addr);
pinfo_slock_t _g(known_peers_lock);
auto pit = known_peers.find(pid);
if (pit == known_peers.end())
@@ -902,37 +902,41 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) {
return;
}
auto &p = pit->second;
- if (p->connected) return;
+ if (p->state != Peer::State::DISCONNECTED ||
+ (!p->addr.is_null() && p->addr != msg.claimed_addr)) return;
SALTICIDAE_LOG_INFO("%s inbound handshake from %s",
std::string(listen_addr).c_str(),
std::string(*conn).c_str());
send_msg(MsgPong(
listen_addr,
- p->addr.is_null() ? passive_nonce : p->my_nonce), conn);
+ p->addr.is_null() ? passive_nonce : p->get_nonce()), conn);
auto &old_conn = p->inbound_conn;
if (old_conn && !old_conn->is_terminated())
{
- SALTICIDAE_LOG_DEBUG("%s terminating old connection %s",
+ SALTICIDAE_LOG_DEBUG("%s terminating stale handshake connection %s",
std::string(listen_addr).c_str(),
std::string(*old_conn).c_str());
assert(old_conn->peer == nullptr);
this->disp_terminate(old_conn);
}
old_conn = conn;
- if (msg.nonce < p->my_nonce)
- p->update_conn(conn);
+ if (msg.nonce < p->get_nonce() || p->addr.is_null())
+ {
+ SALTICIDAE_LOG_DEBUG("connection %s chosen", std::string(*conn).c_str());
+ p->chosen_conn = conn;
+ finish_handshake(p.get());
+ }
else
{
+ SALTICIDAE_LOG_DEBUG("%04x >= %04x, terminating", msg.nonce, p->get_nonce());
this->disp_terminate(conn);
- return;
}
- finish_handshake(p.get());
}
else
SALTICIDAE_LOG_WARN("unexpected inbound handshake from %s",
std::string(*conn).c_str());
}
- else
+ else /* heartbeat ping */
{
SALTICIDAE_LOG_INFO("ping from %s", std::string(*conn).c_str());
send_msg(MsgPong(), conn);
@@ -946,7 +950,7 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) {
this->disp_tcall->async_call([this, conn, msg=std::move(msg)](ThreadCall::Handle &) {
try {
if (conn->is_terminated()) return;
- if (!msg.claimed_addr.is_null())
+ if (!msg.claimed_addr.is_null()) /* handshake pong */
{
if (conn->get_mode() == Conn::ConnMode::ACTIVE)
{
@@ -955,46 +959,48 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) {
auto pit = known_peers.find(pid);
if (pit == known_peers.end())
{
+ SALTICIDAE_LOG_WARN("unexpected pong from an unknown peer");
this->disp_terminate(conn);
return;
}
auto &p = pit->second;
- if (p->connected) return;
+ assert(!p->addr.is_null() && p->addr == conn->get_addr());
+ if (p->state != Peer::State::DISCONNECTED ||
+ p->addr != msg.claimed_addr) return;
SALTICIDAE_LOG_INFO("%s outbound handshake to %s",
std::string(listen_addr).c_str(),
std::string(*conn).c_str());
auto &old_conn = p->outbound_conn;
if (old_conn && !old_conn->is_terminated())
{
- SALTICIDAE_LOG_DEBUG("%s terminating old connection %s",
+ SALTICIDAE_LOG_DEBUG("%s terminating stale handshake connection %s",
std::string(listen_addr).c_str(),
std::string(*old_conn).c_str());
assert(old_conn->peer == nullptr);
old_conn->get_net()->disp_terminate(old_conn);
}
old_conn = conn;
- assert(!p->addr.is_null());
- if (p->my_nonce < msg.nonce)
- p->update_conn(conn);
+ if (p->get_nonce() < msg.nonce)
+ {
+ SALTICIDAE_LOG_DEBUG("connection %s chosen", std::string(*conn).c_str());
+ p->chosen_conn = conn;
+ p->reset_ping_timer();
+ finish_handshake(p.get());
+ }
else
{
+ SALTICIDAE_LOG_DEBUG(
+ "%04x >= %04x, terminating and resetting",
+ p->get_nonce(), msg.nonce);
p->my_nonce = 0;
this->disp_terminate(conn);
- return;
}
- auto &peer_addr = conn->get_addr();
- auto &old_peer_addr = p->addr;
- if (!old_peer_addr.is_null() && old_peer_addr != peer_addr)
- SALTICIDAE_LOG_WARN("multiple peer addresses share the same identity");
- old_peer_addr = peer_addr;
- p->reset_ping_timer();
- finish_handshake(p.get());
}
else
SALTICIDAE_LOG_WARN("unexpected outbound handshake from %s",
std::string(*conn).c_str());
}
- else
+ else /* heartbeat pong */
{
auto p = conn->peer;
if (!p)
@@ -1051,12 +1057,19 @@ int32_t PeerNetwork<O, _, __>::conn_peer(const PeerId &pid, ssize_t ntry, double
throw PeerNetworkError(SALTI_ERROR_PEER_NOT_READY);
p->ntry = ntry;
p->retry_delay = retry_delay;
- p->connected = false;
p->inbound_conn = nullptr;
p->outbound_conn = nullptr;
p->ev_ping_timer.del();
p->my_nonce = 0;
- p->update_conn(ntry ? start_active_conn(p->addr) : conn_t());
+ /* has to terminate established connection *before* making the next
+ * attempt */
+ if (!p->conn || p->state == Peer::State::DISCONNECTED)
+ start_active_conn(p->addr);
+ else if (p->state == Peer::State::CONNECTED)
+ {
+ p->state = Peer::State::RESET;
+ this->disp_terminate(p->conn);
+ }
} catch (const PeerNetworkError &) {
this->recoverable_error(std::current_exception(), id);
} catch (...) { this->disp_error_cb(std::current_exception()); }
@@ -1092,6 +1105,7 @@ int32_t PeerNetwork<O, _, __>::del_peer(const PeerId &pid) {
if (it == known_peers.end())
throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
auto addr = it->second->addr;
+ this->disp_terminate(it->second->conn);
known_peers.erase(it);
auto it2 = pending_peers.find(addr);
if (it2 != pending_peers.end())