aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/salticidae/network.h84
-rw-r--r--include/salticidae/util.h1
-rw-r--r--src/util.cpp1
-rw-r--r--test/test_p2p_stress.cpp1
4 files changed, 64 insertions, 23 deletions
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index d3f3bae..e28e3df 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -275,7 +275,6 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
friend PeerNetwork;
Peer *peer;
TimerEvent ev_timeout;
- TimerEvent ev_retry_timer;
void reset_timeout(double timeout);
@@ -320,7 +319,9 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
double ping_period;
Peer() = delete;
- Peer(const uint256_t &peer_id, conn_t conn, conn_t inbound_conn, conn_t outbound_conn, const PeerNetwork *pn):
+ Peer(const uint256_t &peer_id,
+ conn_t conn, conn_t inbound_conn, conn_t outbound_conn,
+ const PeerNetwork *pn):
peer_id(peer_id),
conn(conn),
inbound_conn(inbound_conn),
@@ -346,7 +347,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
};
std::unordered_map<NetAddr, conn_t> pending_peers;
- std::unordered_map<NetAddr, uint256_t> known_peers;
+ std::unordered_map<NetAddr, std::pair<uint256_t, TimerEvent>> known_peers;
std::unordered_map<uint256_t, BoxObj<Peer>> pid2peer;
peer_callback_t peer_cb;
unknown_peer_callback_t unknown_peer_cb;
@@ -389,6 +390,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
void _ping_msg_cb(const conn_t &conn, uint16_t port);
void _pong_msg_cb(const conn_t &conn, uint16_t port);
bool check_handshake(Peer *peer);
+ void replace_conn(const conn_t &conn);
void start_active_conn(const NetAddr &addr);
static void tcall_reset_timeout(ConnPool::Worker *worker,
const conn_t &conn, double timeout);
@@ -474,6 +476,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
void add_peer(const NetAddr &addr);
void del_peer(const NetAddr &addr);
bool has_peer(const NetAddr &addr) const;
+ size_t get_npending() const;
conn_t get_peer_conn(const NetAddr &addr) const;
using MsgNet::send_msg;
template<typename MsgType>
@@ -502,7 +505,7 @@ void MsgNetwork<OpcodeType>::on_read(const ConnPool::conn_t &_conn) {
auto &recv_buffer = conn->recv_buffer;
auto &msg = conn->msg;
auto &msg_state = conn->msg_state;
- while (true) //(!conn->is_terminated())
+ while (true)
{
if (msg_state == Conn::HEADER)
{
@@ -586,19 +589,18 @@ void PeerNetwork<O, _, __>::on_setup(const ConnPool::conn_t &_conn) {
auto conn = static_pointer_cast<Conn>(_conn);
auto worker = conn->worker;
auto &ev_timeout = conn->ev_timeout;
- conn->ev_retry_timer.del();
assert(!ev_timeout);
ev_timeout = TimerEvent(worker->get_ec(), [listen_addr=this->listen_addr, worker, conn](TimerEvent &) {
try {
SALTICIDAE_LOG_INFO("peer ping-pong timeout %s <-> %s",
std::string(listen_addr).c_str(),
std::string(conn->get_peer_addr()).c_str());
- //conn->get_net()->worker_terminate(conn);
+ conn->get_net()->worker_terminate(conn);
} catch (...) { worker->error_callback(std::current_exception()); }
});
/* the initial ping-pong to set up the connection */
tcall_reset_timeout(worker, conn, conn_timeout);
- pending_peers[conn->get_addr()] = conn;
+ replace_conn(conn);
if (conn->get_mode() == Conn::ConnMode::ACTIVE)
send_msg(MsgPing(listen_addr, my_nonce), conn);
}
@@ -608,7 +610,6 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) {
MsgNet::on_teardown(_conn);
auto conn = static_pointer_cast<Conn>(_conn);
auto addr = conn->get_addr();
- conn->ev_retry_timer.clear();
conn->ev_timeout.clear();
pending_peers.erase(addr);
SALTICIDAE_LOG_INFO("connection lost: %s", std::string(*conn).c_str());
@@ -621,7 +622,6 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) {
});
auto it = known_peers.find(addr);
if (it == known_peers.end()) return;
- pending_peers[addr] = conn;
if (p)
{
if (conn != p->conn) return;
@@ -631,20 +631,19 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) {
p->connected = false;
p->outbound_handshake = false;
p->inbound_handshake = false;
- known_peers[p->peer_addr] = uint256_t();
+ known_peers[p->peer_addr] = std::make_pair(uint256_t(), TimerEvent());
pid2peer.erase(p->peer_id);
this->user_tcall->async_call([this, conn](ThreadCall::Handle &) {
if (peer_cb) peer_cb(conn, false);
});
- conn->ev_retry_timer = std::move(retry_timer);
- conn->ev_retry_timer.add(gen_conn_timeout());
}
else
{
- if (!it->second.is_null()) return;
- conn->ev_retry_timer = std::move(retry_timer);
- conn->ev_retry_timer.add(gen_conn_timeout());
+ if (!it->second.first.is_null()) return;
}
+ auto &ev_retry_timer = it->second.second;
+ ev_retry_timer = std::move(retry_timer);
+ ev_retry_timer.add(gen_conn_timeout());
}
template<typename O, O _, O __>
@@ -687,7 +686,8 @@ bool PeerNetwork<O, _, __>::check_handshake(Peer *p) {
p->connected = true;
p->reset_ping_timer();
p->send_ping();
- known_peers[p->peer_addr] = p->peer_id;
+ known_peers[p->peer_addr] = std::make_pair(p->peer_id, TimerEvent());
+ pending_peers.erase(p->conn->get_addr());
if (p->connected)
{
auto color_begin = "";
@@ -711,9 +711,26 @@ bool PeerNetwork<O, _, __>::check_handshake(Peer *p) {
}
template<typename O, O _, O __>
+void PeerNetwork<O, _, __>::replace_conn(const conn_t &conn) {
+ const auto &addr = conn->get_addr();
+ auto it = pending_peers.find(addr);
+ if (it != pending_peers.end())
+ {
+ auto &old_conn = it->second;
+ if (old_conn != conn)
+ {
+ if (old_conn->get_mode() != Conn::ConnMode::DEAD)
+ this->disp_terminate(old_conn);
+ pending_peers.erase(it);
+ }
+ }
+ pending_peers.insert(std::make_pair(addr, conn));
+}
+
+template<typename O, O _, O __>
void PeerNetwork<O, _, __>::start_active_conn(const NetAddr &addr) {
auto conn = static_pointer_cast<Conn>(MsgNet::_connect(addr));
- pending_peers[addr] = conn;
+ replace_conn(conn);
}
template<typename O, O _, O __>
@@ -721,7 +738,10 @@ inline typename PeerNetwork<O, _, __>::conn_t PeerNetwork<O, _, __>::_get_peer_c
auto it = known_peers.find(addr);
if (it == known_peers.end())
throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
- auto it2 = pid2peer.find(it->second);
+ auto &peer_id = it->second.first;
+ if (peer_id.is_null())
+ throw PeerNetworkError(SALTI_ERROR_PEER_NOT_READY);
+ auto it2 = pid2peer.find(peer_id);
assert(it2 != pid2peer.end());
return it2->second->conn;
}
@@ -849,7 +869,16 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) {
}
auto p = it->second.get();
p->outbound_handshake = true;
- p->peer_addr = conn->get_addr();
+ auto &peer_addr = conn->get_addr();
+ auto &old_peer_addr = p->peer_addr;
+ if (!old_peer_addr.is_null() && old_peer_addr != peer_addr)
+ {
+ SALTICIDAE_LOG_WARN("multiple peer addresses share the same identity");
+ known_peers.erase(old_peer_addr);
+ if (p->conn && p->conn->get_mode() != Conn::ConnMode::DEAD)
+ this->disp_terminate(p->conn);
+ }
+ old_peer_addr = peer_addr;
p->reset_ping_timer();
check_handshake(p);
}
@@ -900,7 +929,8 @@ template<typename O, O _, O __>
void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) {
this->disp_tcall->async_call([this, addr](ThreadCall::Handle &) {
try {
- if (!known_peers.insert(std::make_pair(addr, uint256_t())).second)
+ if (!known_peers.insert(std::make_pair(addr,
+ std::make_pair(uint256_t(), TimerEvent()))).second)
throw PeerNetworkError(SALTI_ERROR_PEER_ALREADY_EXISTS);
if (!pending_peers.count(addr))
start_active_conn(addr);
@@ -917,7 +947,7 @@ void PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) {
auto it = known_peers.find(addr);
if (it == known_peers.end())
throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
- auto peer_id = it->second;
+ auto peer_id = it->second.first;
known_peers.erase(it);
auto it2 = pending_peers.find(addr);
if (it2 != pending_peers.end())
@@ -950,12 +980,12 @@ PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &addr) const {
auto it = known_peers.find(addr);
if (it == known_peers.end())
throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
- if (it->second.is_null())
+ if (it->second.first.is_null())
{
conn = nullptr;
return;
}
- auto it2 = pid2peer.find(it->second);
+ auto it2 = pid2peer.find(it->second.first);
assert(it2 != pid2peer.end());
conn = it2->second->conn;
} catch (const PeerNetworkError &) {
@@ -978,6 +1008,14 @@ bool PeerNetwork<O, _, __>::has_peer(const NetAddr &addr) const {
}
template<typename O, O _, O __>
+size_t PeerNetwork<O, _, __>::get_npending() const {
+ return *(static_cast<bool *>(this->disp_tcall->call(
+ [this](ThreadCall::Handle &h) {
+ h.set_result(pending_peers.size());
+ }).get()));
+}
+
+template<typename O, O _, O __>
template<typename MsgType>
inline void PeerNetwork<O, _, __>::send_msg_deferred(MsgType &&msg, const NetAddr &addr) {
return _send_msg_deferred(std::move(msg), addr);
diff --git a/include/salticidae/util.h b/include/salticidae/util.h
index 5059f13..063058c 100644
--- a/include/salticidae/util.h
+++ b/include/salticidae/util.h
@@ -85,6 +85,7 @@ enum SalticidaeErrorCode {
SALTI_ERROR_CONNECT,
SALTI_ERROR_PEER_ALREADY_EXISTS,
SALTI_ERROR_PEER_NOT_EXIST,
+ SALTI_ERROR_PEER_NOT_READY,
SALTI_ERROR_NETADDR_INVALID,
SALTI_ERROR_OPTVAL_INVALID,
SALTI_ERROR_OPTNAME_ALREADY_EXISTS,
diff --git a/src/util.cpp b/src/util.cpp
index b1f60db..ff41c9b 100644
--- a/src/util.cpp
+++ b/src/util.cpp
@@ -42,6 +42,7 @@ const char *SALTICIDAE_ERROR_STRINGS[] = {
"unable to connect",
"peer already exists",
"peer does not exist",
+ "peer is not ready",
"invalid NetAddr format",
"invalid OptVal format",
"option name already exists",
diff --git a/test/test_p2p_stress.cpp b/test/test_p2p_stress.cpp
index 1eb4a0d..ca4fcda 100644
--- a/test/test_p2p_stress.cpp
+++ b/test/test_p2p_stress.cpp
@@ -148,6 +148,7 @@ void install_proto(AppContext &app, const size_t &seg_buff_size) {
for (const auto &p: app.tc)
s += salticidae::stringprintf(" %d(%d)", ntohs(p.first.port), p.second.ncompleted);
SALTICIDAE_LOG_INFO("%d completed:%s", ntohs(app.addr.port), s.c_str());
+ SALTICIDAE_LOG_INFO("%d npending: %lu", ntohs(app.addr.port), net.get_npending());
});
double t = salticidae::gen_rand_timeout(10);
tc.timer.add(t);