diff options
-rw-r--r-- | include/salticidae/conn.h | 4 | ||||
-rw-r--r-- | include/salticidae/event.h | 9 | ||||
-rw-r--r-- | include/salticidae/network.h | 46 | ||||
-rw-r--r-- | include/salticidae/ref.h | 5 | ||||
-rw-r--r-- | test/test_p2p_stress.cpp | 19 |
5 files changed, 43 insertions, 40 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index a345845..316d9cf 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -575,10 +575,10 @@ class ConnPool { } template<typename Func> - void reg_conn_handler(Func cb) { conn_cb = cb; } + void reg_conn_handler(Func &&cb) { conn_cb = std::forward<Func>(cb); } template<typename Func> - void reg_error_handler(Func cb) { error_cb = cb; } + void reg_error_handler(Func &&cb) { error_cb = std::forward<Func>(cb); } void terminate(const conn_t &conn) { disp_tcall->async_call([this, conn](ThreadCall::Handle &) { diff --git a/include/salticidae/event.h b/include/salticidae/event.h index 78ae12d..30d8f3c 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -671,6 +671,7 @@ class ThreadCall { friend ThreadCall; public: Handle(): notifier(nullptr) {} + Handle(const Handle &) = delete; void exec() { callback(*this); if (notifier) @@ -707,16 +708,16 @@ class ThreadCall { } template<typename Func> - void async_call(Func callback) { + void async_call(Func &&callback) { auto h = new Handle(); - h->callback = callback; + h->callback = std::forward<Func>(callback); q.enqueue(h); } template<typename Func> - Result call(Func callback) { + Result call(Func &&callback) { auto h = new Handle(); - h->callback = callback; + h->callback = std::forward<Func>(callback); ThreadNotifier<Result> notifier; h->notifier = ¬ifier; q.enqueue(h); diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 467fe6a..4725b95 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -183,19 +183,21 @@ class MsgNetwork: public ConnPool { template<typename Func> typename std::enable_if<std::is_constructible< - typename callback_traits<Func>::msg_type, DataStream &&>::value>::type - reg_handler(Func handler) { - using callback_t = callback_traits<Func>; + typename callback_traits< + typename std::remove_reference<Func>::type>::msg_type, + DataStream &&>::value>::type + reg_handler(Func &&handler) { + using callback_t = callback_traits<typename std::remove_reference<Func>::type>; set_handler(callback_t::msg_type::opcode, - [handler](const Msg &msg, const conn_t &conn) { + [handler=std::forward<Func>(handler)](const Msg &msg, const conn_t &conn) { handler(typename callback_t::msg_type(msg.get_payload()), static_pointer_cast<typename callback_t::conn_type>(conn)); }); } template<typename Func> - inline void set_handler(OpcodeType opcode, Func handler) { - handler_map[opcode] = handler; + inline void set_handler(OpcodeType opcode, Func &&handler) { + handler_map[opcode] = std::forward<Func>(handler); } template<typename MsgType> @@ -275,12 +277,13 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { class Conn: public MsgNet::Conn { friend PeerNetwork; Peer *peer; + BoxObj<Peer> _dead_peer; TimerEvent ev_timeout; void reset_timeout(double timeout); public: - Conn(): MsgNet::Conn(), peer(nullptr) {} + Conn(): MsgNet::Conn(), peer(nullptr), _dead_peer(nullptr) {} NetAddr get_peer_addr() { auto ret = *(static_cast<NetAddr *>( get_net()->disp_tcall->call([this](ThreadCall::Handle &h) { @@ -507,9 +510,9 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { void listen(NetAddr listen_addr); conn_t connect(const NetAddr &addr) = delete; template<typename Func> - void reg_unknown_peer_handler(Func cb) { unknown_peer_cb = cb; } + void reg_unknown_peer_handler(Func &&cb) { unknown_peer_cb = std::forward<Func>(cb); } template<typename Func> - void reg_peer_handler(Func cb) { peer_cb = cb; } + void reg_peer_handler(Func &&cb) { peer_cb = std::forward<Func>(cb); } }; /* this callback is run by a worker */ @@ -630,11 +633,6 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) { SALTICIDAE_LOG_INFO("connection lost: %s", std::string(*conn).c_str()); auto p = conn->peer; if (p) addr = p->peer_addr; - TimerEvent retry_timer(this->disp_ec, [this, addr](TimerEvent &) { - try { - start_active_conn(addr); - } catch (...) { this->disp_error_cb(std::current_exception()); } - }); pinfo_ulock_t _g(known_peers_lock); auto it = known_peers.find(addr); if (it == known_peers.end()) return; @@ -648,12 +646,16 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) { p->outbound_handshake = false; p->inbound_handshake = false; known_peers[p->peer_addr] = std::make_pair(uint256_t(), TimerEvent()); + Peer *peer = nullptr; { pinfo_ulock_t __g(pid2peer_lock); - p->conn->peer = nullptr; - pid2peer.erase(p->peer_id); + auto it2 = pid2peer.find(p->peer_id); + peer = it2->second.unwrap(); + pid2peer.erase(it2); } - this->user_tcall->async_call([this, conn](ThreadCall::Handle &) { + peer->conn = nullptr; + conn->_dead_peer = peer; + this->user_tcall->async_call([this, conn, peer](ThreadCall::Handle &) { if (peer_cb) peer_cb(conn, false); }); } @@ -662,7 +664,11 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) { if (!it->second.first.is_null()) return; } auto &ev_retry_timer = it->second.second; - ev_retry_timer = std::move(retry_timer); + ev_retry_timer = TimerEvent(this->disp_ec, [this, addr](TimerEvent &) { + try { + start_active_conn(addr); + } catch (...) { this->disp_error_cb(std::current_exception()); } + }); ev_retry_timer.add(gen_conn_timeout()); } @@ -809,7 +815,7 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) { SALTICIDAE_LOG_DEBUG("%s terminating old connection %s", std::string(listen_addr).c_str(), std::string(*old_conn).c_str()); - old_conn->peer = nullptr; + assert(old_conn->peer == nullptr); old_conn->get_net()->disp_terminate(old_conn); } old_conn = conn; @@ -870,7 +876,7 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) { SALTICIDAE_LOG_DEBUG("%s terminating old connection %s", std::string(listen_addr).c_str(), std::string(*old_conn).c_str()); - old_conn->peer = nullptr; + assert(old_conn->peer == nullptr); old_conn->get_net()->disp_terminate(old_conn); } old_conn = conn; diff --git a/include/salticidae/ref.h b/include/salticidae/ref.h index 44eb01e..9f4bfed 100644 --- a/include/salticidae/ref.h +++ b/include/salticidae/ref.h @@ -98,6 +98,11 @@ class _BoxObj { T &operator *() const { return *obj; } T *get() const { return obj; } + T *unwrap() { + auto ret = obj; + obj = nullptr; + return ret; + } operator bool() const { return obj != nullptr; } bool operator==(const _BoxObj &other) const { return obj == other.obj; } bool operator!=(const _BoxObj &other) const { return obj != other.obj; } diff --git a/test/test_p2p_stress.cpp b/test/test_p2p_stress.cpp index cb6595c..fc9a430 100644 --- a/test/test_p2p_stress.cpp +++ b/test/test_p2p_stress.cpp @@ -110,10 +110,7 @@ struct AppContext { void install_proto(AppContext &app, const size_t &seg_buff_size) { auto &ec = app.ec; auto &net = *app.net; - auto send_rand = [&](int size, const MyNet::conn_t &conn) { - auto addr = conn->get_peer_addr(); - assert(!addr.is_null()); - auto &tc = app.tc[addr]; + auto send_rand = [&](int size, const MyNet::conn_t &conn, TestContext &tc) { MsgRand msg(tc.view, size); tc.hash = msg.hash; net.send_msg(std::move(msg), conn); @@ -125,11 +122,10 @@ void install_proto(AppContext &app, const size_t &seg_buff_size) { if (connected) { auto addr = conn->get_peer_addr(); - assert(!addr.is_null()); auto &tc = app.tc[addr]; tc.state = 1; tc.view++; - send_rand(tc.state, conn); + send_rand(tc.state, conn, tc); } }); net.reg_error_handler([ec](const std::exception_ptr _err, bool fatal) { @@ -147,11 +143,6 @@ void install_proto(AppContext &app, const size_t &seg_buff_size) { net.reg_handler([&, send_rand](MsgAck &&msg, const MyNet::conn_t &conn) { auto addr = conn->get_peer_addr(); if (addr.is_null()) return; - if (app.tc.find(addr) == app.tc.end()) - { - SALTICIDAE_LOG_WARN("%s\n", std::string(addr).c_str()); - throw std::runtime_error("violation"); - } auto &tc = app.tc[addr]; if (msg.view != tc.view) { @@ -166,7 +157,7 @@ void install_proto(AppContext &app, const size_t &seg_buff_size) { if (tc.state == seg_buff_size * 2) { - send_rand(tc.state, conn); + send_rand(tc.state, conn, tc); tc.state = -1; tc.timer = TimerEvent(ec, [&, conn](TimerEvent &) { tc.ncompleted++; @@ -182,9 +173,9 @@ void install_proto(AppContext &app, const size_t &seg_buff_size) { SALTICIDAE_LOG_INFO("rand-bomboard phase, ending in %.2f secs", t); } else if (tc.state == -1) - send_rand(rand() % (seg_buff_size * 10), conn); + send_rand(rand() % (seg_buff_size * 10), conn, tc); else - send_rand(++tc.state, conn); + send_rand(++tc.state, conn, tc); }); } |