diff options
-rw-r--r-- | include/salticidae/conn.h | 40 | ||||
-rw-r--r-- | include/salticidae/network.h | 55 | ||||
-rw-r--r-- | test/test_p2p_stress.cpp | 1 |
3 files changed, 47 insertions, 49 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index 6fc1288..546df5f 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -148,9 +148,9 @@ class ConnPool { ThreadCall* disp_tcall; /** Should be implemented by derived class to return a new Conn object. */ virtual Conn *create_conn() = 0; - using _error_callback_t = std::function<void(const std::exception_ptr err)>; - _error_callback_t disp_error_cb; - _error_callback_t worker_error_cb; + using worker_error_callback_t = std::function<void(const std::exception_ptr err)>; + worker_error_callback_t disp_error_cb; + worker_error_callback_t worker_error_cb; private: @@ -176,20 +176,17 @@ class ConnPool { } class Worker { - public: - - private: EventContext ec; ThreadCall tcall; std::thread handle; bool disp_flag; std::atomic<size_t> nconn; - ConnPool::_error_callback_t on_fatal_error; + ConnPool::worker_error_callback_t on_fatal_error; public: Worker(): tcall(ec), disp_flag(false), nconn(0) {} - void set_error_callback(ConnPool::_error_callback_t _on_error) { + void set_error_callback(ConnPool::worker_error_callback_t _on_error) { on_fatal_error = std::move(_on_error); } @@ -269,6 +266,17 @@ class ConnPool { protected: conn_t _connect(const NetAddr &addr); void _listen(NetAddr listen_addr); + void recoverable_error(const std::exception_ptr err) { + user_tcall->async_call([this, err](ThreadCall::Handle &) { + if (error_cb) { + try { + std::rethrow_exception(err); + } catch (const std::exception &e) { + error_cb(e, false); + } + } + }); + } private: @@ -287,15 +295,6 @@ class ConnPool { return workers[idx]; } - void on_fatal_error(const std::exception &error) { - stop_workers(); - if (error_cb) error_cb(error, true); - } - - void on_recoverable_error(const std::exception &error) { - if (error_cb) error_cb(error, false); - } - public: class Config { @@ -359,7 +358,8 @@ class ConnPool { try { std::rethrow_exception(_err); } catch (const std::exception &err) { - on_fatal_error(err); + stop_workers(); + if (error_cb) error_cb(err, true); } }); disp_ec.stop(); @@ -430,8 +430,8 @@ class ConnPool { conn_t connect(const NetAddr &addr, bool blocking = true) { if (blocking) { - auto ret = *(static_cast<std::pair<conn_t, std::exception_ptr> *>(disp_tcall->call( - [this, addr](ThreadCall::Handle &h) { + auto ret = *(static_cast<std::pair<conn_t, std::exception_ptr> *>( + disp_tcall->call([this, addr](ThreadCall::Handle &h) { conn_t conn; std::exception_ptr err = nullptr; try { diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 9c57749..b119e78 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -488,24 +488,10 @@ inline void MsgNetwork<OpcodeType>::_send_msg(Msg &&msg, const conn_t &conn) { [this, msg=std::move(msg), conn](ThreadCall::Handle &) { try { this->_send_msg_dispatcher(msg, conn); - throw SalticidaeError("wow"); - } catch (...) { this->disp_error_cb(std::current_exception()); } + } catch (...) { this->recoverable_error(std::current_exception()); } }); } -template<typename OpcodeType> -inline void MsgNetwork<OpcodeType>::_send_msg_dispatcher(const Msg &msg, const conn_t &conn) { - bytearray_t msg_data = msg.serialize(); - SALTICIDAE_LOG_DEBUG("wrote message %s to %s", - std::string(msg).c_str(), - std::string(*conn).c_str()); -#ifdef SALTICIDAE_MSG_STAT - conn->nsent++; - conn->nsentb += msg.get_length(); -#endif - conn->write(std::move(msg_data)); -} - template<typename O, O _, O __> void PeerNetwork<O, _, __>::tcall_reset_timeout(ConnPool::Worker *worker, const conn_t &conn, double timeout) { @@ -654,6 +640,19 @@ void PeerNetwork<O, _, __>::start_active_conn(const NetAddr &addr) { if (id_mode == IP_BASED) conn->peer_id.port = 0; } + +template<typename OpcodeType> +inline void MsgNetwork<OpcodeType>::_send_msg_dispatcher(const Msg &msg, const conn_t &conn) { + bytearray_t msg_data = msg.serialize(); + SALTICIDAE_LOG_DEBUG("wrote message %s to %s", + std::string(msg).c_str(), + std::string(*conn).c_str()); +#ifdef SALTICIDAE_MSG_STAT + conn->nsent++; + conn->nsentb += msg.get_length(); +#endif + conn->write(std::move(msg_data)); +} /* end: functions invoked by the dispatcher */ /* begin: functions invoked by the user loop */ @@ -697,18 +696,18 @@ void PeerNetwork<O, _, __>::msg_pong(MsgPong &&msg, const conn_t &conn) { template<typename O, O _, O __> void PeerNetwork<O, _, __>::listen(NetAddr listen_addr) { - auto ret = *(static_cast<SalticidaeError *>( + auto ret = *(static_cast<std::exception_ptr *>( this->disp_tcall->call([this, listen_addr](ThreadCall::Handle &h) { - SalticidaeError err; + std::exception_ptr err = nullptr; try { MsgNet::_listen(listen_addr); listen_port = listen_addr.port; - } catch (SalticidaeError &e) { - err = e; + } catch (...) { + err = std::current_exception(); } h.set_result(std::move(err)); }).get())); - if (ret.get_code()) throw ret; + if (ret) std::rethrow_exception(ret); } template<typename O, O _, O __> @@ -727,21 +726,21 @@ void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) { template<typename O, O _, O __> const typename PeerNetwork<O, _, __>::conn_t PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &paddr) const { - auto ret = *(static_cast<std::pair<conn_t, SalticidaeError> *>( + auto ret = *(static_cast<std::pair<conn_t, std::exception_ptr> *>( this->disp_tcall->call([this, paddr](ThreadCall::Handle &h) { conn_t conn; - SalticidaeError err; + std::exception_ptr err = nullptr; try { auto it = id2peer.find(paddr); if (it == id2peer.end()) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXISTS); conn = it->second->conn; - } catch (SalticidaeError &e) { - err = e; + } catch (...) { + err = std::current_exception(); } - h.set_result(std::make_pair(std::move(conn), std::move(err))); + h.set_result(std::make_pair(std::move(conn), err)); }).get())); - if (ret.second.get_code()) throw ret.second; + if (ret.second) std::rethrow_exception(ret.second); return std::move(ret.first); } @@ -768,7 +767,7 @@ void PeerNetwork<O, _, __>::_send_msg(Msg &&msg, const NetAddr &paddr) { if (it == id2peer.end()) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXISTS); this->_send_msg_dispatcher(msg, it->second->conn); - } catch (...) { this->disp_error_cb(std::current_exception()); } + } catch (...) { this->recoverable_error(std::current_exception()); } }); } @@ -790,7 +789,7 @@ void PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<NetAddr> throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXISTS); this->_send_msg_dispatcher(msg, it->second->conn); } - } catch (...) { this->disp_error_cb(std::current_exception()); } + } catch (...) { this->recoverable_error(std::current_exception()); } }); } diff --git a/test/test_p2p_stress.cpp b/test/test_p2p_stress.cpp index 7321217..3a71660 100644 --- a/test/test_p2p_stress.cpp +++ b/test/test_p2p_stress.cpp @@ -117,7 +117,6 @@ void install_proto(AppContext &app, const size_t &seg_buff_size) { net.reg_error_handler([ec](const std::exception &err, bool fatal) { SALTICIDAE_LOG_WARN("main thread captured %s error: %s", fatal ? "fatal" : "recoverable", err.what()); - ec.stop(); }); net.reg_handler([&](MsgRand &&msg, const MyNet::conn_t &conn) { uint256_t hash = salticidae::get_hash(msg.bytes); |