aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/salticidae/conn.h4
-rw-r--r--include/salticidae/event.h9
-rw-r--r--include/salticidae/network.h46
-rw-r--r--include/salticidae/ref.h5
-rw-r--r--test/test_p2p_stress.cpp19
5 files changed, 43 insertions, 40 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index a345845..316d9cf 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -575,10 +575,10 @@ class ConnPool {
}
template<typename Func>
- void reg_conn_handler(Func cb) { conn_cb = cb; }
+ void reg_conn_handler(Func &&cb) { conn_cb = std::forward<Func>(cb); }
template<typename Func>
- void reg_error_handler(Func cb) { error_cb = cb; }
+ void reg_error_handler(Func &&cb) { error_cb = std::forward<Func>(cb); }
void terminate(const conn_t &conn) {
disp_tcall->async_call([this, conn](ThreadCall::Handle &) {
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index 78ae12d..30d8f3c 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -671,6 +671,7 @@ class ThreadCall {
friend ThreadCall;
public:
Handle(): notifier(nullptr) {}
+ Handle(const Handle &) = delete;
void exec() {
callback(*this);
if (notifier)
@@ -707,16 +708,16 @@ class ThreadCall {
}
template<typename Func>
- void async_call(Func callback) {
+ void async_call(Func &&callback) {
auto h = new Handle();
- h->callback = callback;
+ h->callback = std::forward<Func>(callback);
q.enqueue(h);
}
template<typename Func>
- Result call(Func callback) {
+ Result call(Func &&callback) {
auto h = new Handle();
- h->callback = callback;
+ h->callback = std::forward<Func>(callback);
ThreadNotifier<Result> notifier;
h->notifier = &notifier;
q.enqueue(h);
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index 467fe6a..4725b95 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -183,19 +183,21 @@ class MsgNetwork: public ConnPool {
template<typename Func>
typename std::enable_if<std::is_constructible<
- typename callback_traits<Func>::msg_type, DataStream &&>::value>::type
- reg_handler(Func handler) {
- using callback_t = callback_traits<Func>;
+ typename callback_traits<
+ typename std::remove_reference<Func>::type>::msg_type,
+ DataStream &&>::value>::type
+ reg_handler(Func &&handler) {
+ using callback_t = callback_traits<typename std::remove_reference<Func>::type>;
set_handler(callback_t::msg_type::opcode,
- [handler](const Msg &msg, const conn_t &conn) {
+ [handler=std::forward<Func>(handler)](const Msg &msg, const conn_t &conn) {
handler(typename callback_t::msg_type(msg.get_payload()),
static_pointer_cast<typename callback_t::conn_type>(conn));
});
}
template<typename Func>
- inline void set_handler(OpcodeType opcode, Func handler) {
- handler_map[opcode] = handler;
+ inline void set_handler(OpcodeType opcode, Func &&handler) {
+ handler_map[opcode] = std::forward<Func>(handler);
}
template<typename MsgType>
@@ -275,12 +277,13 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
class Conn: public MsgNet::Conn {
friend PeerNetwork;
Peer *peer;
+ BoxObj<Peer> _dead_peer;
TimerEvent ev_timeout;
void reset_timeout(double timeout);
public:
- Conn(): MsgNet::Conn(), peer(nullptr) {}
+ Conn(): MsgNet::Conn(), peer(nullptr), _dead_peer(nullptr) {}
NetAddr get_peer_addr() {
auto ret = *(static_cast<NetAddr *>(
get_net()->disp_tcall->call([this](ThreadCall::Handle &h) {
@@ -507,9 +510,9 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
void listen(NetAddr listen_addr);
conn_t connect(const NetAddr &addr) = delete;
template<typename Func>
- void reg_unknown_peer_handler(Func cb) { unknown_peer_cb = cb; }
+ void reg_unknown_peer_handler(Func &&cb) { unknown_peer_cb = std::forward<Func>(cb); }
template<typename Func>
- void reg_peer_handler(Func cb) { peer_cb = cb; }
+ void reg_peer_handler(Func &&cb) { peer_cb = std::forward<Func>(cb); }
};
/* this callback is run by a worker */
@@ -630,11 +633,6 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) {
SALTICIDAE_LOG_INFO("connection lost: %s", std::string(*conn).c_str());
auto p = conn->peer;
if (p) addr = p->peer_addr;
- TimerEvent retry_timer(this->disp_ec, [this, addr](TimerEvent &) {
- try {
- start_active_conn(addr);
- } catch (...) { this->disp_error_cb(std::current_exception()); }
- });
pinfo_ulock_t _g(known_peers_lock);
auto it = known_peers.find(addr);
if (it == known_peers.end()) return;
@@ -648,12 +646,16 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) {
p->outbound_handshake = false;
p->inbound_handshake = false;
known_peers[p->peer_addr] = std::make_pair(uint256_t(), TimerEvent());
+ Peer *peer = nullptr;
{
pinfo_ulock_t __g(pid2peer_lock);
- p->conn->peer = nullptr;
- pid2peer.erase(p->peer_id);
+ auto it2 = pid2peer.find(p->peer_id);
+ peer = it2->second.unwrap();
+ pid2peer.erase(it2);
}
- this->user_tcall->async_call([this, conn](ThreadCall::Handle &) {
+ peer->conn = nullptr;
+ conn->_dead_peer = peer;
+ this->user_tcall->async_call([this, conn, peer](ThreadCall::Handle &) {
if (peer_cb) peer_cb(conn, false);
});
}
@@ -662,7 +664,11 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) {
if (!it->second.first.is_null()) return;
}
auto &ev_retry_timer = it->second.second;
- ev_retry_timer = std::move(retry_timer);
+ ev_retry_timer = TimerEvent(this->disp_ec, [this, addr](TimerEvent &) {
+ try {
+ start_active_conn(addr);
+ } catch (...) { this->disp_error_cb(std::current_exception()); }
+ });
ev_retry_timer.add(gen_conn_timeout());
}
@@ -809,7 +815,7 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) {
SALTICIDAE_LOG_DEBUG("%s terminating old connection %s",
std::string(listen_addr).c_str(),
std::string(*old_conn).c_str());
- old_conn->peer = nullptr;
+ assert(old_conn->peer == nullptr);
old_conn->get_net()->disp_terminate(old_conn);
}
old_conn = conn;
@@ -870,7 +876,7 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) {
SALTICIDAE_LOG_DEBUG("%s terminating old connection %s",
std::string(listen_addr).c_str(),
std::string(*old_conn).c_str());
- old_conn->peer = nullptr;
+ assert(old_conn->peer == nullptr);
old_conn->get_net()->disp_terminate(old_conn);
}
old_conn = conn;
diff --git a/include/salticidae/ref.h b/include/salticidae/ref.h
index 44eb01e..9f4bfed 100644
--- a/include/salticidae/ref.h
+++ b/include/salticidae/ref.h
@@ -98,6 +98,11 @@ class _BoxObj {
T &operator *() const { return *obj; }
T *get() const { return obj; }
+ T *unwrap() {
+ auto ret = obj;
+ obj = nullptr;
+ return ret;
+ }
operator bool() const { return obj != nullptr; }
bool operator==(const _BoxObj &other) const { return obj == other.obj; }
bool operator!=(const _BoxObj &other) const { return obj != other.obj; }
diff --git a/test/test_p2p_stress.cpp b/test/test_p2p_stress.cpp
index cb6595c..fc9a430 100644
--- a/test/test_p2p_stress.cpp
+++ b/test/test_p2p_stress.cpp
@@ -110,10 +110,7 @@ struct AppContext {
void install_proto(AppContext &app, const size_t &seg_buff_size) {
auto &ec = app.ec;
auto &net = *app.net;
- auto send_rand = [&](int size, const MyNet::conn_t &conn) {
- auto addr = conn->get_peer_addr();
- assert(!addr.is_null());
- auto &tc = app.tc[addr];
+ auto send_rand = [&](int size, const MyNet::conn_t &conn, TestContext &tc) {
MsgRand msg(tc.view, size);
tc.hash = msg.hash;
net.send_msg(std::move(msg), conn);
@@ -125,11 +122,10 @@ void install_proto(AppContext &app, const size_t &seg_buff_size) {
if (connected)
{
auto addr = conn->get_peer_addr();
- assert(!addr.is_null());
auto &tc = app.tc[addr];
tc.state = 1;
tc.view++;
- send_rand(tc.state, conn);
+ send_rand(tc.state, conn, tc);
}
});
net.reg_error_handler([ec](const std::exception_ptr _err, bool fatal) {
@@ -147,11 +143,6 @@ void install_proto(AppContext &app, const size_t &seg_buff_size) {
net.reg_handler([&, send_rand](MsgAck &&msg, const MyNet::conn_t &conn) {
auto addr = conn->get_peer_addr();
if (addr.is_null()) return;
- if (app.tc.find(addr) == app.tc.end())
- {
- SALTICIDAE_LOG_WARN("%s\n", std::string(addr).c_str());
- throw std::runtime_error("violation");
- }
auto &tc = app.tc[addr];
if (msg.view != tc.view)
{
@@ -166,7 +157,7 @@ void install_proto(AppContext &app, const size_t &seg_buff_size) {
if (tc.state == seg_buff_size * 2)
{
- send_rand(tc.state, conn);
+ send_rand(tc.state, conn, tc);
tc.state = -1;
tc.timer = TimerEvent(ec, [&, conn](TimerEvent &) {
tc.ncompleted++;
@@ -182,9 +173,9 @@ void install_proto(AppContext &app, const size_t &seg_buff_size) {
SALTICIDAE_LOG_INFO("rand-bomboard phase, ending in %.2f secs", t);
}
else if (tc.state == -1)
- send_rand(rand() % (seg_buff_size * 10), conn);
+ send_rand(rand() % (seg_buff_size * 10), conn, tc);
else
- send_rand(++tc.state, conn);
+ send_rand(++tc.state, conn, tc);
});
}