diff options
Diffstat (limited to 'include/salticidae/network.h')
-rw-r--r-- | include/salticidae/network.h | 93 |
1 files changed, 56 insertions, 37 deletions
diff --git a/include/salticidae/network.h b/include/salticidae/network.h index a09f8ac..e9fdae6 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -199,14 +199,16 @@ class MsgNetwork: public ConnPool { } template<typename MsgType> - void send_msg(MsgType &&msg, const conn_t &conn); - inline void _send_msg(Msg &&msg, const conn_t &conn); - inline void _send_msg_dispatcher(const Msg &msg, const conn_t &conn); + inline void send_msg(const MsgType &msg, const conn_t &conn); + inline void _send_msg(const Msg &msg, const conn_t &conn); + template<typename MsgType> + inline void send_msg_deferred(MsgType &&msg, const conn_t &conn); + inline void _send_msg_deferred(Msg &&msg, const conn_t &conn); void stop() { stop_workers(); } using ConnPool::listen; - conn_t connect(const NetAddr &addr) { - return static_pointer_cast<Conn>(ConnPool::connect(addr)); + conn_t connect(const NetAddr &addr, bool blocking = true) { + return static_pointer_cast<Conn>(ConnPool::connect(addr, blocking)); } }; @@ -446,8 +448,11 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { const conn_t get_peer_conn(const NetAddr &paddr) const; using MsgNet::send_msg; template<typename MsgType> - void send_msg(MsgType &&msg, const NetAddr &paddr); - inline void _send_msg(Msg &&msg, const NetAddr &paddr); + inline void send_msg(const MsgType &msg, const NetAddr &paddr); + inline void _send_msg(const Msg &msg, const NetAddr &paddr); + template<typename MsgType> + inline void send_msg_deferred(MsgType &&msg, const NetAddr &paddr); + inline void _send_msg_deferred(Msg &&msg, const NetAddr &paddr); template<typename MsgType> void multicast_msg(MsgType &&msg, const std::vector<NetAddr> &paddrs); inline void _multicast_msg(Msg &&msg, const std::vector<NetAddr> &paddrs); @@ -496,20 +501,39 @@ void MsgNetwork<OpcodeType>::Conn::on_read() { template<typename OpcodeType> template<typename MsgType> -void MsgNetwork<OpcodeType>::send_msg(MsgType &&msg, const conn_t &conn) { - return _send_msg(MsgType(std::move(msg)), conn); +inline void MsgNetwork<OpcodeType>::send_msg_deferred(MsgType &&msg, const conn_t &conn) { + return _send_msg_deferred(MsgType(std::move(msg)), conn); } template<typename OpcodeType> -inline void MsgNetwork<OpcodeType>::_send_msg(Msg &&msg, const conn_t &conn) { +inline void MsgNetwork<OpcodeType>::_send_msg_deferred(Msg &&msg, const conn_t &conn) { this->disp_tcall->async_call( [this, msg=std::move(msg), conn](ThreadCall::Handle &) { try { - this->_send_msg_dispatcher(msg, conn); + this->send_msg(msg, conn); } catch (...) { this->recoverable_error(std::current_exception()); } }); } +template<typename OpcodeType> +template<typename MsgType> +inline void MsgNetwork<OpcodeType>::send_msg(const MsgType &msg, const conn_t &conn) { + return _send_msg(msg, conn); +} + +template<typename OpcodeType> +inline void MsgNetwork<OpcodeType>::_send_msg(const Msg &msg, const conn_t &conn) { + bytearray_t msg_data = msg.serialize(); + SALTICIDAE_LOG_DEBUG("wrote message %s to %s", + std::string(msg).c_str(), + std::string(*conn).c_str()); +#ifdef SALTICIDAE_MSG_STAT + conn->nsent++; + conn->nsentb += msg.get_length(); +#endif + conn->write(std::move(msg_data)); +} + template<typename O, O _, O __> void PeerNetwork<O, _, __>::tcall_reset_timeout(ConnPool::Worker *worker, const conn_t &conn, double timeout) { @@ -686,19 +710,6 @@ typename PeerNetwork<O, _, __>::Peer *PeerNetwork<O, _, __>::get_peer(const NetA if (it != id2upeer.end()) return it->second.get(); return nullptr; } - -template<typename OpcodeType> -inline void MsgNetwork<OpcodeType>::_send_msg_dispatcher(const Msg &msg, const conn_t &conn) { - bytearray_t msg_data = msg.serialize(); - SALTICIDAE_LOG_DEBUG("wrote message %s to %s", - std::string(msg).c_str(), - std::string(*conn).c_str()); -#ifdef SALTICIDAE_MSG_STAT - conn->nsent++; - conn->nsentb += msg.get_length(); -#endif - conn->write(std::move(msg_data)); -} /* end: functions invoked by the dispatcher */ /* begin: functions invoked by the user loop */ @@ -826,33 +837,41 @@ bool PeerNetwork<O, _, __>::has_peer(const NetAddr &paddr) const { template<typename O, O _, O __> template<typename MsgType> -void PeerNetwork<O, _, __>::send_msg(MsgType &&msg, const NetAddr &paddr) { - return _send_msg(MsgType(std::move(msg)), paddr); +inline void PeerNetwork<O, _, __>::send_msg_deferred(MsgType &&msg, const NetAddr &paddr) { + return _send_msg_deferred(MsgType(std::move(msg)), paddr); } template<typename O, O _, O __> -void PeerNetwork<O, _, __>::_send_msg(Msg &&msg, const NetAddr &paddr) { +inline void PeerNetwork<O, _, __>::_send_msg_deferred(Msg &&msg, const NetAddr &paddr) { this->disp_tcall->async_call( - [this, msg=std::move(msg), paddr](ThreadCall::Handle &) { + [this, msg=std::move(msg), paddr](ThreadCall::Handle &) { try { - auto p = get_peer(paddr); - if (!p) - throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); - this->_send_msg_dispatcher(msg, p->conn); - } catch (const PeerNetworkError &) { - this->recoverable_error(std::current_exception()); + send_msg(msg, paddr); } catch (...) { this->recoverable_error(std::current_exception()); } }); } template<typename O, O _, O __> template<typename MsgType> -void PeerNetwork<O, _, __>::multicast_msg(MsgType &&msg, const std::vector<NetAddr> &paddrs) { +inline void PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const NetAddr &paddr) { + return _send_msg(msg, paddr); +} + +template<typename O, O _, O __> +inline void PeerNetwork<O, _, __>::_send_msg(const Msg &msg, const NetAddr &paddr) { + auto p = get_peer(paddr); + if (!p) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); + this->send_msg(msg, p->conn); +} + +template<typename O, O _, O __> +template<typename MsgType> +inline void PeerNetwork<O, _, __>::multicast_msg(MsgType &&msg, const std::vector<NetAddr> &paddrs) { return _multicast_msg(MsgType(std::move(msg)), paddrs); } template<typename O, O _, O __> -void PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<NetAddr> &paddrs) { +inline void PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<NetAddr> &paddrs) { this->disp_tcall->async_call( [this, msg=std::move(msg), paddrs](ThreadCall::Handle &) { try { @@ -861,7 +880,7 @@ void PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<NetAddr> auto p = get_peer(addr); if (!p) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); - this->_send_msg_dispatcher(msg, p->conn); + this->send_msg(msg, p->conn); } } catch (const PeerNetworkError &) { this->recoverable_error(std::current_exception()); |