aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/salticidae/conn.h1
-rw-r--r--include/salticidae/network.h159
2 files changed, 101 insertions, 59 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index 316d9cf..7f74a87 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -513,6 +513,7 @@ class ConnPool {
{
auto &conn = it.second;
conn->stop();
+ conn->set_terminated();
release_conn(conn);
}
}
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index 4725b95..c59bbb3 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -271,13 +271,13 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
};
private:
- struct Peer;
+ struct PeerConn;
public:
class Conn: public MsgNet::Conn {
friend PeerNetwork;
- Peer *peer;
- BoxObj<Peer> _dead_peer;
+ PeerConn *peer;
+ BoxObj<PeerConn> _dead_peer;
TimerEvent ev_timeout;
void reset_timeout(double timeout);
@@ -308,7 +308,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
using unknown_peer_callback_t = std::function<void(const NetAddr &claimed_addr, const X509 *cert)>;
private:
- class Peer {
+ class PeerConn {
friend PeerNetwork;
/** connection addr, may be different due to passive mode */
uint256_t peer_id;
@@ -326,8 +326,8 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
bool inbound_handshake;
double ping_period;
- Peer() = delete;
- Peer(const uint256_t &peer_id,
+ PeerConn() = delete;
+ PeerConn(const uint256_t &peer_id,
conn_t conn, conn_t inbound_conn, conn_t outbound_conn,
const PeerNetwork *pn):
peer_id(peer_id),
@@ -335,13 +335,13 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
inbound_conn(inbound_conn),
outbound_conn(outbound_conn),
ev_ping_timer(
- TimerEvent(pn->disp_ec, std::bind(&Peer::ping_timer, this, _1))),
+ TimerEvent(pn->disp_ec, std::bind(&PeerConn::ping_timer, this, _1))),
connected(false),
outbound_handshake(false),
inbound_handshake(false),
ping_period(pn->ping_period) {}
- Peer &operator=(const Peer &) = delete;
- Peer(const Peer &) = delete;
+ PeerConn &operator=(const PeerConn &) = delete;
+ PeerConn(const PeerConn &) = delete;
void reset_ping_timer();
void send_ping();
@@ -351,15 +351,24 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
ev_ping_timer.del();
}
public:
- ~Peer() {
+ ~PeerConn() {
if (inbound_conn) inbound_conn->peer = nullptr;
if (outbound_conn) outbound_conn->peer = nullptr;
}
};
+ struct Peer {
+ uint256_t peer_id;
+ TimerEvent ev_retry_timer;
+ conn_t peer_conn;
+
+ Peer(const conn_t &conn):
+ peer_id(), ev_retry_timer(), peer_conn(conn) {}
+ };
+
std::unordered_map<NetAddr, conn_t> pending_peers;
- std::unordered_map<NetAddr, std::pair<uint256_t, TimerEvent>> known_peers;
- std::unordered_map<uint256_t, BoxObj<Peer>> pid2peer;
+ std::unordered_map<NetAddr, BoxObj<Peer>> known_peers;
+ std::unordered_map<uint256_t, BoxObj<PeerConn>> pid2peer;
using pinfo_slock_t = std::shared_lock<std::shared_timed_mutex>;
using pinfo_ulock_t = std::shared_lock<std::shared_timed_mutex>;
@@ -407,9 +416,10 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
void pong_handler(MsgPong &&msg, const conn_t &conn);
void _ping_msg_cb(const conn_t &conn, uint16_t port);
void _pong_msg_cb(const conn_t &conn, uint16_t port);
- bool check_handshake(Peer *peer);
+ void check_handshake(PeerConn *peer);
+ void move_peer_buffer(conn_t &old_conn, const conn_t &new_conn);
void replace_conn(const conn_t &conn);
- void start_active_conn(const NetAddr &addr);
+ conn_t start_active_conn(const NetAddr &addr);
static void tcall_reset_timeout(ConnPool::Worker *worker,
const conn_t &conn, double timeout);
inline conn_t _get_peer_conn(const NetAddr &addr) const;
@@ -636,6 +646,18 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) {
pinfo_ulock_t _g(known_peers_lock);
auto it = known_peers.find(addr);
if (it == known_peers.end()) return;
+ auto &pp = it->second;
+ if (conn == pp->peer_conn)
+ {
+ pinfo_ulock_t __g(pid2peer_lock);
+ auto it2 = pid2peer.find(pp->peer_id);
+ if (it2 != pid2peer.end())
+ {
+ auto &p = it2->second;
+ if (p->connected && p->conn != conn)
+ move_peer_buffer(pp->peer_conn, p->conn);
+ }
+ }
if (p)
{
if (conn != p->conn) return;
@@ -645,8 +667,8 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) {
p->connected = false;
p->outbound_handshake = false;
p->inbound_handshake = false;
- known_peers[p->peer_addr] = std::make_pair(uint256_t(), TimerEvent());
- Peer *peer = nullptr;
+ pp->peer_id = uint256_t();
+ PeerConn *peer = nullptr;
{
pinfo_ulock_t __g(pid2peer_lock);
auto it2 = pid2peer.find(p->peer_id);
@@ -661,9 +683,9 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) {
}
else
{
- if (!it->second.first.is_null()) return;
+ if (!it->second->peer_id.is_null()) return;
}
- auto &ev_retry_timer = it->second.second;
+ auto &ev_retry_timer = it->second->ev_retry_timer;
ev_retry_timer = TimerEvent(this->disp_ec, [this, addr](TimerEvent &) {
try {
start_active_conn(addr);
@@ -673,14 +695,14 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) {
}
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::Peer::reset_ping_timer() {
+void PeerNetwork<O, _, __>::PeerConn::reset_ping_timer() {
assert(ev_ping_timer);
ev_ping_timer.del();
ev_ping_timer.add(gen_rand_timeout(ping_period));
}
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::Peer::send_ping() {
+void PeerNetwork<O, _, __>::PeerConn::send_ping() {
auto pn = conn->get_net();
ping_timer_ok = false;
pong_msg_ok = false;
@@ -689,7 +711,7 @@ void PeerNetwork<O, _, __>::Peer::send_ping() {
}
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::Peer::ping_timer(TimerEvent &) {
+void PeerNetwork<O, _, __>::PeerConn::ping_timer(TimerEvent &) {
ping_timer_ok = true;
if (pong_msg_ok)
{
@@ -699,10 +721,25 @@ void PeerNetwork<O, _, __>::Peer::ping_timer(TimerEvent &) {
}
template<typename O, O _, O __>
-bool PeerNetwork<O, _, __>::check_handshake(Peer *p) {
+void PeerNetwork<O, _, __>::move_peer_buffer(conn_t &old_conn, const conn_t &new_conn) {
+ assert(old_conn->is_terminated() && old_conn != new_conn);
+ for (;;)
+ {
+ bytearray_t buff_seg = old_conn->send_buffer.move_pop();
+ if (!buff_seg.size()) break;
+ new_conn->write(std::move(buff_seg));
+ }
+ old_conn = new_conn;
+ this->user_tcall->async_call([this, new_conn](ThreadCall::Handle &) {
+ if (peer_cb) peer_cb(new_conn, true);
+ });
+}
+
+template<typename O, O _, O __>
+void PeerNetwork<O, _, __>::check_handshake(PeerConn *p) {
if (!(p->inbound_handshake && p->outbound_handshake) ||
p->connected)
- return false;
+ return;
p->clear_all_events();
if (p->inbound_conn && p->inbound_conn != p->conn)
p->inbound_conn->peer = nullptr;
@@ -714,29 +751,33 @@ bool PeerNetwork<O, _, __>::check_handshake(Peer *p) {
p->send_ping();
{
pinfo_ulock_t _g(known_peers_lock);
- known_peers[p->peer_addr] = std::make_pair(p->peer_id, TimerEvent());
+ auto &pp = known_peers[p->peer_addr];
+ pp->peer_id = p->peer_id;
+ pp->ev_retry_timer.del();
+ auto &old_conn = pp->peer_conn;
+ auto &conn = p->conn;
+ if (old_conn != conn)
+ {
+ if (old_conn->is_terminated())
+ move_peer_buffer(old_conn, conn);
+ else
+ this->disp_terminate(old_conn);
+ }
}
pending_peers.erase(p->conn->get_addr());
- if (p->connected)
+ auto color_begin = "";
+ auto color_end = "";
+ if (logger.is_tty())
{
- auto color_begin = "";
- auto color_end = "";
- if (logger.is_tty())
- {
- color_begin = TTY_COLOR_BLUE;
- color_end = TTY_COLOR_RESET;
- }
- SALTICIDAE_LOG_INFO("%sPeerNetwork: established connection %s <-> %s via %s",
- color_begin,
- std::string(listen_addr).c_str(),
- std::string(p->peer_addr).c_str(),
- std::string(*(p->conn)).c_str(),
- color_end);
+ color_begin = TTY_COLOR_BLUE;
+ color_end = TTY_COLOR_RESET;
}
- this->user_tcall->async_call([this, conn=p->conn](ThreadCall::Handle &) {
- if (peer_cb) peer_cb(conn, true);
- });
- return true;
+ SALTICIDAE_LOG_INFO("%sPeerNetwork: established connection %s <-> %s via %s",
+ color_begin,
+ std::string(listen_addr).c_str(),
+ std::string(p->peer_addr).c_str(),
+ std::string(*(p->conn)).c_str(),
+ color_end);
}
template<typename O, O _, O __>
@@ -756,9 +797,10 @@ void PeerNetwork<O, _, __>::replace_conn(const conn_t &conn) {
}
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::start_active_conn(const NetAddr &addr) {
+typename PeerNetwork<O, _, __>::conn_t PeerNetwork<O, _, __>::start_active_conn(const NetAddr &addr) {
auto conn = static_pointer_cast<Conn>(MsgNet::_connect(addr));
replace_conn(conn);
+ return conn;
}
template<typename O, O _, O __>
@@ -766,12 +808,7 @@ inline typename PeerNetwork<O, _, __>::conn_t PeerNetwork<O, _, __>::_get_peer_c
auto it = known_peers.find(addr);
if (it == known_peers.end())
throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
- const auto &peer_id = it->second.first;
- if (peer_id.is_null())
- throw PeerNetworkError(SALTI_ERROR_PEER_NOT_READY);
- auto it2 = pid2peer.find(peer_id);
- assert(it2 != pid2peer.end());
- return it2->second->conn;
+ return it->second->peer_conn;
}
/* end: functions invoked by the dispatcher */
@@ -827,7 +864,7 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) {
else
{
it = pid2peer.insert(std::make_pair(peer_id,
- new Peer(peer_id, conn, conn, nullptr, this))).first;
+ new PeerConn(peer_id, conn, conn, nullptr, this))).first;
}
auto p = it->second.get();
p->inbound_handshake = true;
@@ -895,7 +932,7 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) {
else
{
it = pid2peer.insert(std::make_pair(peer_id,
- new Peer(peer_id, conn, nullptr, conn, this))).first;
+ new PeerConn(peer_id, conn, nullptr, conn, this))).first;
}
auto p = it->second.get();
p->outbound_handshake = true;
@@ -960,11 +997,15 @@ void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) {
this->disp_tcall->async_call([this, addr](ThreadCall::Handle &) {
try {
pinfo_ulock_t _g(known_peers_lock);
- if (!known_peers.insert(std::make_pair(addr,
- std::make_pair(uint256_t(), TimerEvent()))).second)
+ if (known_peers.count(addr))
throw PeerNetworkError(SALTI_ERROR_PEER_ALREADY_EXISTS);
- if (!pending_peers.count(addr))
- start_active_conn(addr);
+ auto it = pending_peers.find(addr);
+ conn_t conn;
+ if (it == pending_peers.end())
+ conn = start_active_conn(addr);
+ else
+ conn = it->second;
+ known_peers.insert(std::make_pair(addr, new Peer(conn)));
} catch (const PeerNetworkError &) {
this->recoverable_error(std::current_exception());
} catch (...) { this->disp_error_cb(std::current_exception()); }
@@ -980,7 +1021,7 @@ void PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) {
auto it = known_peers.find(addr);
if (it == known_peers.end())
throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
- auto peer_id = it->second.first;
+ auto peer_id = it->second->peer_id;
known_peers.erase(it);
auto it2 = pending_peers.find(addr);
if (it2 != pending_peers.end())
@@ -992,7 +1033,7 @@ void PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) {
auto it3 = pid2peer.find(peer_id);
if (it3 != pid2peer.end())
{
- auto p = it3->second.get();
+ auto &p = it3->second;
this->disp_terminate(p->conn);
pid2peer.erase(it3);
}
@@ -1015,12 +1056,12 @@ PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &addr) const {
auto it = known_peers.find(addr);
if (it == known_peers.end())
throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
- if (it->second.first.is_null())
+ if (it->second->peer_id.is_null())
{
conn = nullptr;
return;
}
- auto it2 = pid2peer.find(it->second.first);
+ auto it2 = pid2peer.find(it->second->peer_id);
assert(it2 != pid2peer.end());
conn = it2->second->conn;
} catch (const PeerNetworkError &) {