aboutsummaryrefslogtreecommitdiff
path: root/include/salticidae/network.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/salticidae/network.h')
-rw-r--r--include/salticidae/network.h93
1 files changed, 56 insertions, 37 deletions
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index a09f8ac..e9fdae6 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -199,14 +199,16 @@ class MsgNetwork: public ConnPool {
}
template<typename MsgType>
- void send_msg(MsgType &&msg, const conn_t &conn);
- inline void _send_msg(Msg &&msg, const conn_t &conn);
- inline void _send_msg_dispatcher(const Msg &msg, const conn_t &conn);
+ inline void send_msg(const MsgType &msg, const conn_t &conn);
+ inline void _send_msg(const Msg &msg, const conn_t &conn);
+ template<typename MsgType>
+ inline void send_msg_deferred(MsgType &&msg, const conn_t &conn);
+ inline void _send_msg_deferred(Msg &&msg, const conn_t &conn);
void stop() { stop_workers(); }
using ConnPool::listen;
- conn_t connect(const NetAddr &addr) {
- return static_pointer_cast<Conn>(ConnPool::connect(addr));
+ conn_t connect(const NetAddr &addr, bool blocking = true) {
+ return static_pointer_cast<Conn>(ConnPool::connect(addr, blocking));
}
};
@@ -446,8 +448,11 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
const conn_t get_peer_conn(const NetAddr &paddr) const;
using MsgNet::send_msg;
template<typename MsgType>
- void send_msg(MsgType &&msg, const NetAddr &paddr);
- inline void _send_msg(Msg &&msg, const NetAddr &paddr);
+ inline void send_msg(const MsgType &msg, const NetAddr &paddr);
+ inline void _send_msg(const Msg &msg, const NetAddr &paddr);
+ template<typename MsgType>
+ inline void send_msg_deferred(MsgType &&msg, const NetAddr &paddr);
+ inline void _send_msg_deferred(Msg &&msg, const NetAddr &paddr);
template<typename MsgType>
void multicast_msg(MsgType &&msg, const std::vector<NetAddr> &paddrs);
inline void _multicast_msg(Msg &&msg, const std::vector<NetAddr> &paddrs);
@@ -496,20 +501,39 @@ void MsgNetwork<OpcodeType>::Conn::on_read() {
template<typename OpcodeType>
template<typename MsgType>
-void MsgNetwork<OpcodeType>::send_msg(MsgType &&msg, const conn_t &conn) {
- return _send_msg(MsgType(std::move(msg)), conn);
+inline void MsgNetwork<OpcodeType>::send_msg_deferred(MsgType &&msg, const conn_t &conn) {
+ return _send_msg_deferred(MsgType(std::move(msg)), conn);
}
template<typename OpcodeType>
-inline void MsgNetwork<OpcodeType>::_send_msg(Msg &&msg, const conn_t &conn) {
+inline void MsgNetwork<OpcodeType>::_send_msg_deferred(Msg &&msg, const conn_t &conn) {
this->disp_tcall->async_call(
[this, msg=std::move(msg), conn](ThreadCall::Handle &) {
try {
- this->_send_msg_dispatcher(msg, conn);
+ this->send_msg(msg, conn);
} catch (...) { this->recoverable_error(std::current_exception()); }
});
}
+template<typename OpcodeType>
+template<typename MsgType>
+inline void MsgNetwork<OpcodeType>::send_msg(const MsgType &msg, const conn_t &conn) {
+ return _send_msg(msg, conn);
+}
+
+template<typename OpcodeType>
+inline void MsgNetwork<OpcodeType>::_send_msg(const Msg &msg, const conn_t &conn) {
+ bytearray_t msg_data = msg.serialize();
+ SALTICIDAE_LOG_DEBUG("wrote message %s to %s",
+ std::string(msg).c_str(),
+ std::string(*conn).c_str());
+#ifdef SALTICIDAE_MSG_STAT
+ conn->nsent++;
+ conn->nsentb += msg.get_length();
+#endif
+ conn->write(std::move(msg_data));
+}
+
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::tcall_reset_timeout(ConnPool::Worker *worker,
const conn_t &conn, double timeout) {
@@ -686,19 +710,6 @@ typename PeerNetwork<O, _, __>::Peer *PeerNetwork<O, _, __>::get_peer(const NetA
if (it != id2upeer.end()) return it->second.get();
return nullptr;
}
-
-template<typename OpcodeType>
-inline void MsgNetwork<OpcodeType>::_send_msg_dispatcher(const Msg &msg, const conn_t &conn) {
- bytearray_t msg_data = msg.serialize();
- SALTICIDAE_LOG_DEBUG("wrote message %s to %s",
- std::string(msg).c_str(),
- std::string(*conn).c_str());
-#ifdef SALTICIDAE_MSG_STAT
- conn->nsent++;
- conn->nsentb += msg.get_length();
-#endif
- conn->write(std::move(msg_data));
-}
/* end: functions invoked by the dispatcher */
/* begin: functions invoked by the user loop */
@@ -826,33 +837,41 @@ bool PeerNetwork<O, _, __>::has_peer(const NetAddr &paddr) const {
template<typename O, O _, O __>
template<typename MsgType>
-void PeerNetwork<O, _, __>::send_msg(MsgType &&msg, const NetAddr &paddr) {
- return _send_msg(MsgType(std::move(msg)), paddr);
+inline void PeerNetwork<O, _, __>::send_msg_deferred(MsgType &&msg, const NetAddr &paddr) {
+ return _send_msg_deferred(MsgType(std::move(msg)), paddr);
}
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::_send_msg(Msg &&msg, const NetAddr &paddr) {
+inline void PeerNetwork<O, _, __>::_send_msg_deferred(Msg &&msg, const NetAddr &paddr) {
this->disp_tcall->async_call(
- [this, msg=std::move(msg), paddr](ThreadCall::Handle &) {
+ [this, msg=std::move(msg), paddr](ThreadCall::Handle &) {
try {
- auto p = get_peer(paddr);
- if (!p)
- throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
- this->_send_msg_dispatcher(msg, p->conn);
- } catch (const PeerNetworkError &) {
- this->recoverable_error(std::current_exception());
+ send_msg(msg, paddr);
} catch (...) { this->recoverable_error(std::current_exception()); }
});
}
template<typename O, O _, O __>
template<typename MsgType>
-void PeerNetwork<O, _, __>::multicast_msg(MsgType &&msg, const std::vector<NetAddr> &paddrs) {
+inline void PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const NetAddr &paddr) {
+ return _send_msg(msg, paddr);
+}
+
+template<typename O, O _, O __>
+inline void PeerNetwork<O, _, __>::_send_msg(const Msg &msg, const NetAddr &paddr) {
+ auto p = get_peer(paddr);
+ if (!p) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
+ this->send_msg(msg, p->conn);
+}
+
+template<typename O, O _, O __>
+template<typename MsgType>
+inline void PeerNetwork<O, _, __>::multicast_msg(MsgType &&msg, const std::vector<NetAddr> &paddrs) {
return _multicast_msg(MsgType(std::move(msg)), paddrs);
}
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<NetAddr> &paddrs) {
+inline void PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<NetAddr> &paddrs) {
this->disp_tcall->async_call(
[this, msg=std::move(msg), paddrs](ThreadCall::Handle &) {
try {
@@ -861,7 +880,7 @@ void PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<NetAddr>
auto p = get_peer(addr);
if (!p)
throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
- this->_send_msg_dispatcher(msg, p->conn);
+ this->send_msg(msg, p->conn);
}
} catch (const PeerNetworkError &) {
this->recoverable_error(std::current_exception());