aboutsummaryrefslogtreecommitdiff
path: root/include/salticidae/network.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/salticidae/network.h')
-rw-r--r--include/salticidae/network.h150
1 files changed, 79 insertions, 71 deletions
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index c59bbb3..4c5fea6 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -110,8 +110,6 @@ class MsgNetwork: public ConnPool {
nrecvb.store(0, std::memory_order_relaxed);
}
#endif
-
- protected:
};
using conn_t = ArcObj<Conn>;
@@ -127,7 +125,6 @@ class MsgNetwork: public ConnPool {
queue_t incoming_msgs;
protected:
-
ConnPool::Conn *create_conn() override { return new Conn(); }
void on_read(const ConnPool::conn_t &) override;
@@ -201,16 +198,16 @@ class MsgNetwork: public ConnPool {
}
template<typename MsgType>
- inline void send_msg(const MsgType &msg, const conn_t &conn);
- inline void _send_msg(const Msg &msg, const conn_t &conn);
+ inline bool send_msg(const MsgType &msg, const conn_t &conn);
+ inline bool _send_msg(const Msg &msg, const conn_t &conn);
template<typename MsgType>
- inline void send_msg_deferred(MsgType &&msg, const conn_t &conn);
- inline void _send_msg_deferred(Msg &&msg, const conn_t &conn);
+ inline int32_t send_msg_deferred(MsgType &&msg, const conn_t &conn);
+ inline int32_t _send_msg_deferred(Msg &&msg, const conn_t &conn);
void stop() { stop_workers(); }
using ConnPool::listen;
- conn_t connect(const NetAddr &addr, bool blocking = true) {
- return static_pointer_cast<Conn>(ConnPool::connect(addr, blocking));
+ conn_t connect_sync(const NetAddr &addr) {
+ return static_pointer_cast<Conn>(ConnPool::connect_sync(addr));
}
};
@@ -248,11 +245,11 @@ class ClientNetwork: public MsgNetwork<OpcodeType> {
using MsgNet::send_msg;
template<typename MsgType>
- inline void send_msg(const MsgType &msg, const NetAddr &addr);
- inline void _send_msg(const Msg &msg, const NetAddr &addr);
+ inline bool send_msg(const MsgType &msg, const NetAddr &addr);
+ inline bool _send_msg(const Msg &msg, const NetAddr &addr);
template<typename MsgType>
- inline void send_msg_deferred(MsgType &&msg, const NetAddr &addr);
- inline void _send_msg_deferred(Msg &&msg, const NetAddr &addr);
+ inline int32_t send_msg_deferred(MsgType &&msg, const NetAddr &addr);
+ inline int32_t _send_msg_deferred(Msg &&msg, const NetAddr &addr);
};
/** Peer-to-peer network where any two nodes could hold a bi-diretional message
@@ -501,21 +498,21 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
virtual ~PeerNetwork() { this->stop(); }
- void add_peer(const NetAddr &addr);
- void del_peer(const NetAddr &addr);
+ int32_t add_peer(const NetAddr &addr);
+ int32_t del_peer(const NetAddr &addr);
bool has_peer(const NetAddr &addr) const;
size_t get_npending() const;
conn_t get_peer_conn(const NetAddr &addr) const;
using MsgNet::send_msg;
template<typename MsgType>
- inline void send_msg(const MsgType &msg, const NetAddr &addr);
- inline void _send_msg(const Msg &msg, const NetAddr &addr);
+ inline bool send_msg(const MsgType &msg, const NetAddr &addr);
+ inline bool _send_msg(const Msg &msg, const NetAddr &addr);
template<typename MsgType>
- inline void send_msg_deferred(MsgType &&msg, const NetAddr &addr);
- inline void _send_msg_deferred(Msg &&msg, const NetAddr &addr);
+ inline int32_t send_msg_deferred(MsgType &&msg, const NetAddr &addr);
+ inline int32_t _send_msg_deferred(Msg &&msg, const NetAddr &addr);
template<typename MsgType>
- void multicast_msg(MsgType &&msg, const std::vector<NetAddr> &addrs);
- inline void _multicast_msg(Msg &&msg, const std::vector<NetAddr> &addrs);
+ inline int32_t multicast_msg(MsgType &&msg, const std::vector<NetAddr> &addrs);
+ inline int32_t _multicast_msg(Msg &&msg, const std::vector<NetAddr> &addrs);
void listen(NetAddr listen_addr);
conn_t connect(const NetAddr &addr) = delete;
@@ -564,28 +561,31 @@ void MsgNetwork<OpcodeType>::on_read(const ConnPool::conn_t &_conn) {
template<typename OpcodeType>
template<typename MsgType>
-inline void MsgNetwork<OpcodeType>::send_msg_deferred(MsgType &&msg, const conn_t &conn) {
+inline int32_t MsgNetwork<OpcodeType>::send_msg_deferred(MsgType &&msg, const conn_t &conn) {
return _send_msg_deferred(std::move(msg), conn);
}
template<typename OpcodeType>
-inline void MsgNetwork<OpcodeType>::_send_msg_deferred(Msg &&msg, const conn_t &conn) {
+inline int32_t MsgNetwork<OpcodeType>::_send_msg_deferred(Msg &&msg, const conn_t &conn) {
+ auto id = this->gen_async_id();
this->disp_tcall->async_call(
- [this, msg=std::move(msg), conn](ThreadCall::Handle &) {
+ [this, msg=std::move(msg), conn, id](ThreadCall::Handle &) {
try {
- this->_send_msg(msg, conn);
- } catch (...) { this->recoverable_error(std::current_exception()); }
+ if (!_send_msg(msg, conn))
+ throw SalticidaeError(SALTI_ERROR_CONN_NOT_READY);
+ } catch (...) { this->recoverable_error(std::current_exception(), id); }
});
+ return id;
}
template<typename OpcodeType>
template<typename MsgType>
-inline void MsgNetwork<OpcodeType>::send_msg(const MsgType &msg, const conn_t &conn) {
+inline bool MsgNetwork<OpcodeType>::send_msg(const MsgType &msg, const conn_t &conn) {
return _send_msg(msg, conn);
}
template<typename OpcodeType>
-inline void MsgNetwork<OpcodeType>::_send_msg(const Msg &msg, const conn_t &conn) {
+inline bool MsgNetwork<OpcodeType>::_send_msg(const Msg &msg, const conn_t &conn) {
bytearray_t msg_data = msg.serialize();
SALTICIDAE_LOG_DEBUG("wrote message %s to %s",
std::string(msg).c_str(),
@@ -594,7 +594,7 @@ inline void MsgNetwork<OpcodeType>::_send_msg(const Msg &msg, const conn_t &conn
conn->nsent++;
conn->nsentb += msg.get_length();
#endif
- conn->write(std::move(msg_data));
+ return conn->write(std::move(msg_data));
}
template<typename O, O _, O __>
@@ -993,8 +993,9 @@ void PeerNetwork<O, _, __>::listen(NetAddr _listen_addr) {
}
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) {
- this->disp_tcall->async_call([this, addr](ThreadCall::Handle &) {
+int32_t PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) {
+ auto id = this->gen_async_id();
+ this->disp_tcall->async_call([this, addr, id](ThreadCall::Handle &) {
try {
pinfo_ulock_t _g(known_peers_lock);
if (known_peers.count(addr))
@@ -1007,14 +1008,16 @@ void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) {
conn = it->second;
known_peers.insert(std::make_pair(addr, new Peer(conn)));
} catch (const PeerNetworkError &) {
- this->recoverable_error(std::current_exception());
+ this->recoverable_error(std::current_exception(), id);
} catch (...) { this->disp_error_cb(std::current_exception()); }
});
+ return id;
}
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) {
- this->disp_tcall->async_call([this, addr](ThreadCall::Handle &) {
+int32_t PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) {
+ auto id = this->gen_async_id();
+ this->disp_tcall->async_call([this, addr, id](ThreadCall::Handle &) {
try {
pinfo_ulock_t _g(known_peers_lock);
pinfo_ulock_t __g(pid2peer_lock);
@@ -1038,9 +1041,10 @@ void PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) {
pid2peer.erase(it3);
}
} catch (const PeerNetworkError &) {
- this->recoverable_error(std::current_exception());
+ this->recoverable_error(std::current_exception(), id);
} catch (...) { this->disp_error_cb(std::current_exception()); }
});
+ return id;
}
template<typename O, O _, O __>
@@ -1064,8 +1068,6 @@ PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &addr) const {
auto it2 = pid2peer.find(it->second->peer_id);
assert(it2 != pid2peer.end());
conn = it2->second->conn;
- } catch (const PeerNetworkError &) {
- this->recoverable_error(std::current_exception());
} catch (...) {
err = std::current_exception();
}
@@ -1094,50 +1096,54 @@ size_t PeerNetwork<O, _, __>::get_npending() const {
template<typename O, O _, O __>
template<typename MsgType>
-inline void PeerNetwork<O, _, __>::send_msg_deferred(MsgType &&msg, const NetAddr &addr) {
+inline int32_t PeerNetwork<O, _, __>::send_msg_deferred(MsgType &&msg, const NetAddr &addr) {
return _send_msg_deferred(std::move(msg), addr);
}
template<typename O, O _, O __>
-inline void PeerNetwork<O, _, __>::_send_msg_deferred(Msg &&msg, const NetAddr &addr) {
+inline int32_t PeerNetwork<O, _, __>::_send_msg_deferred(Msg &&msg, const NetAddr &addr) {
+ auto id = this->gen_async_id();
this->disp_tcall->async_call(
- [this, msg=std::move(msg), addr](ThreadCall::Handle &) {
+ [this, msg=std::move(msg), addr, id](ThreadCall::Handle &) {
try {
- _send_msg(msg, addr);
- } catch (...) { this->recoverable_error(std::current_exception()); }
+ if (!_send_msg(msg, addr))
+ throw PeerNetworkError(SALTI_ERROR_CONN_NOT_READY);
+ } catch (...) { this->recoverable_error(std::current_exception(), id); }
});
+ return id;
}
template<typename O, O _, O __>
template<typename MsgType>
-inline void PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const NetAddr &addr) {
+inline bool PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const NetAddr &addr) {
return _send_msg(msg, addr);
}
template<typename O, O _, O __>
-inline void PeerNetwork<O, _, __>::_send_msg(const Msg &msg, const NetAddr &addr) {
+inline bool PeerNetwork<O, _, __>::_send_msg(const Msg &msg, const NetAddr &addr) {
pinfo_slock_t _g(known_peers_lock);
- pinfo_slock_t __g(pid2peer_lock);
- MsgNet::_send_msg(msg, _get_peer_conn(addr));
+ return MsgNet::_send_msg(msg, _get_peer_conn(addr));
}
template<typename O, O _, O __>
template<typename MsgType>
-inline void PeerNetwork<O, _, __>::multicast_msg(MsgType &&msg, const std::vector<NetAddr> &addrs) {
+inline int32_t PeerNetwork<O, _, __>::multicast_msg(MsgType &&msg, const std::vector<NetAddr> &addrs) {
return _multicast_msg(MsgType(std::move(msg)), addrs);
}
template<typename O, O _, O __>
-inline void PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<NetAddr> &addrs) {
+inline int32_t PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<NetAddr> &addrs) {
+ auto id = this->gen_async_id();
this->disp_tcall->async_call(
- [this, msg=std::move(msg), addrs](ThreadCall::Handle &) {
+ [this, msg=std::move(msg), addrs, id](ThreadCall::Handle &) {
try {
+ bool succ = true;
for (auto &addr: addrs)
- MsgNet::_send_msg(msg, _get_peer_conn(addr));
- } catch (const PeerNetworkError &) {
- this->recoverable_error(std::current_exception());
- } catch (...) { this->recoverable_error(std::current_exception()); }
+ succ &= MsgNet::_send_msg(msg, _get_peer_conn(addr));
+ if (!succ) throw PeerNetworkError(SALTI_ERROR_CONN_NOT_READY);
+ } catch (...) { this->recoverable_error(std::current_exception(), id); }
});
+ return id;
}
/* end: functions invoked by the user loop */
@@ -1162,33 +1168,34 @@ void ClientNetwork<OpcodeType>::on_teardown(const ConnPool::conn_t &_conn) {
template<typename OpcodeType>
template<typename MsgType>
-inline void ClientNetwork<OpcodeType>::send_msg_deferred(MsgType &&msg, const NetAddr &addr) {
+inline int32_t ClientNetwork<OpcodeType>::send_msg_deferred(MsgType &&msg, const NetAddr &addr) {
return _send_msg_deferred(std::move(msg), addr);
}
template<typename OpcodeType>
-inline void ClientNetwork<OpcodeType>::_send_msg_deferred(Msg &&msg, const NetAddr &addr) {
+inline int32_t ClientNetwork<OpcodeType>::_send_msg_deferred(Msg &&msg, const NetAddr &addr) {
+ auto id = this->gen_async_id();
this->disp_tcall->async_call(
- [this, msg=std::move(msg), addr](ThreadCall::Handle &) {
+ [this, msg=std::move(msg), addr, id](ThreadCall::Handle &) {
try {
_send_msg(msg, addr);
- } catch (...) { this->recoverable_error(std::current_exception()); }
+ } catch (...) { this->recoverable_error(std::current_exception(), id); }
});
+ return id;
}
template<typename OpcodeType>
template<typename MsgType>
-inline void ClientNetwork<OpcodeType>::send_msg(const MsgType &msg, const NetAddr &addr) {
+inline bool ClientNetwork<OpcodeType>::send_msg(const MsgType &msg, const NetAddr &addr) {
return _send_msg(msg, addr);
}
template<typename OpcodeType>
-inline void ClientNetwork<OpcodeType>::_send_msg(const Msg &msg, const NetAddr &addr) {
+inline bool ClientNetwork<OpcodeType>::_send_msg(const Msg &msg, const NetAddr &addr) {
auto it = addr2conn.find(addr);
- if (it != addr2conn.end())
- MsgNet::_send_msg(msg, it->second);
- else
+ if (it == addr2conn.end())
throw ClientNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
+ return MsgNet::_send_msg(msg, it->second);
}
template<typename O, O OPCODE_PING, O _>
@@ -1264,9 +1271,10 @@ void msgnetwork_config_tls_cert_by_move(msgnetwork_config_t *self, x509_t *cert)
msgnetwork_t *msgnetwork_new(const eventcontext_t *ec, const msgnetwork_config_t *config, SalticidaeCError *err);
void msgnetwork_free(const msgnetwork_t *self);
-void msgnetwork_send_msg(msgnetwork_t *self, const msg_t *msg, const msgnetwork_conn_t *conn);
-void msgnetwork_send_msg_deferred_by_move(msgnetwork_t *self, msg_t *_moved_msg, const msgnetwork_conn_t *conn);
-msgnetwork_conn_t *msgnetwork_connect(msgnetwork_t *self, const netaddr_t *addr, bool blocking, SalticidaeCError *err);
+bool msgnetwork_send_msg(msgnetwork_t *self, const msg_t *msg, const msgnetwork_conn_t *conn);
+int32_t msgnetwork_send_msg_deferred_by_move(msgnetwork_t *self, msg_t *_moved_msg, const msgnetwork_conn_t *conn);
+msgnetwork_conn_t *msgnetwork_connect_sync(msgnetwork_t *self, const netaddr_t *addr, SalticidaeCError *err);
+int32_t msgnetwork_connect(msgnetwork_t *self, const netaddr_t *addr);
msgnetwork_conn_t *msgnetwork_conn_copy(const msgnetwork_conn_t *self);
void msgnetwork_conn_free(const msgnetwork_conn_t *self);
void msgnetwork_listen(msgnetwork_t *self, const netaddr_t *listen_addr, SalticidaeCError *err);
@@ -1281,7 +1289,7 @@ typedef bool (*msgnetwork_conn_callback_t)(const msgnetwork_conn_t *, bool conne
void msgnetwork_reg_conn_handler(msgnetwork_t *self, msgnetwork_conn_callback_t cb, void *userdata);
-typedef void (*msgnetwork_error_callback_t)(const SalticidaeCError *, bool fatal, void *userdata);
+typedef void (*msgnetwork_error_callback_t)(const SalticidaeCError *, bool fatal, int32_t async_id, void *userdata);
void msgnetwork_reg_error_handler(msgnetwork_t *self, msgnetwork_error_callback_t cb, void *userdata);
msgnetwork_t *msgnetwork_conn_get_net(const msgnetwork_conn_t *conn);
@@ -1302,8 +1310,8 @@ msgnetwork_config_t *peernetwork_config_as_msgnetwork_config(peernetwork_config_
peernetwork_t *peernetwork_new(const eventcontext_t *ec, const peernetwork_config_t *config, SalticidaeCError *err);
void peernetwork_free(const peernetwork_t *self);
-void peernetwork_add_peer(peernetwork_t *self, const netaddr_t *addr);
-void peernetwork_del_peer(peernetwork_t *self, const netaddr_t *addr);
+int32_t peernetwork_add_peer(peernetwork_t *self, const netaddr_t *addr);
+int32_t peernetwork_del_peer(peernetwork_t *self, const netaddr_t *addr);
bool peernetwork_has_peer(const peernetwork_t *self, const netaddr_t *addr);
const peernetwork_conn_t *peernetwork_get_peer_conn(const peernetwork_t *self, const netaddr_t *addr, SalticidaeCError *cerror);
msgnetwork_t *peernetwork_as_msgnetwork(peernetwork_t *self);
@@ -1314,8 +1322,8 @@ peernetwork_conn_t *peernetwork_conn_copy(const peernetwork_conn_t *self);
netaddr_t *peernetwork_conn_get_peer_addr(const peernetwork_conn_t *self);
void peernetwork_conn_free(const peernetwork_conn_t *self);
bool peernetwork_send_msg(peernetwork_t *self, const msg_t * msg, const netaddr_t *addr);
-void peernetwork_send_msg_deferred_by_move(peernetwork_t *self, msg_t * _moved_msg, const netaddr_t *addr);
-void peernetwork_multicast_msg_by_move(peernetwork_t *self, msg_t *_moved_msg, const netaddr_array_t *addrs);
+int32_t peernetwork_send_msg_deferred_by_move(peernetwork_t *self, msg_t * _moved_msg, const netaddr_t *addr);
+int32_t peernetwork_multicast_msg_by_move(peernetwork_t *self, msg_t *_moved_msg, const netaddr_array_t *addrs);
void peernetwork_listen(peernetwork_t *self, const netaddr_t *listen_addr, SalticidaeCError *err);
typedef void (*peernetwork_peer_callback_t)(const peernetwork_conn_t *, bool connected, void *userdata);
@@ -1335,7 +1343,7 @@ clientnetwork_conn_t *clientnetwork_conn_new_from_msgnetwork_conn_unsafe(const m
clientnetwork_conn_t *clientnetwork_conn_copy(const clientnetwork_conn_t *self);
void clientnetwork_conn_free(const clientnetwork_conn_t *self);
bool clientnetwork_send_msg(clientnetwork_t *self, const msg_t * msg, const netaddr_t *addr);
-void clientnetwork_send_msg_deferred_by_move(clientnetwork_t *self, msg_t * _moved_msg, const netaddr_t *addr);
+int32_t clientnetwork_send_msg_deferred_by_move(clientnetwork_t *self, msg_t * _moved_msg, const netaddr_t *addr);
#ifdef __cplusplus
}