diff options
Diffstat (limited to 'include/salticidae/network.h')
-rw-r--r-- | include/salticidae/network.h | 150 |
1 files changed, 79 insertions, 71 deletions
diff --git a/include/salticidae/network.h b/include/salticidae/network.h index c59bbb3..4c5fea6 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -110,8 +110,6 @@ class MsgNetwork: public ConnPool { nrecvb.store(0, std::memory_order_relaxed); } #endif - - protected: }; using conn_t = ArcObj<Conn>; @@ -127,7 +125,6 @@ class MsgNetwork: public ConnPool { queue_t incoming_msgs; protected: - ConnPool::Conn *create_conn() override { return new Conn(); } void on_read(const ConnPool::conn_t &) override; @@ -201,16 +198,16 @@ class MsgNetwork: public ConnPool { } template<typename MsgType> - inline void send_msg(const MsgType &msg, const conn_t &conn); - inline void _send_msg(const Msg &msg, const conn_t &conn); + inline bool send_msg(const MsgType &msg, const conn_t &conn); + inline bool _send_msg(const Msg &msg, const conn_t &conn); template<typename MsgType> - inline void send_msg_deferred(MsgType &&msg, const conn_t &conn); - inline void _send_msg_deferred(Msg &&msg, const conn_t &conn); + inline int32_t send_msg_deferred(MsgType &&msg, const conn_t &conn); + inline int32_t _send_msg_deferred(Msg &&msg, const conn_t &conn); void stop() { stop_workers(); } using ConnPool::listen; - conn_t connect(const NetAddr &addr, bool blocking = true) { - return static_pointer_cast<Conn>(ConnPool::connect(addr, blocking)); + conn_t connect_sync(const NetAddr &addr) { + return static_pointer_cast<Conn>(ConnPool::connect_sync(addr)); } }; @@ -248,11 +245,11 @@ class ClientNetwork: public MsgNetwork<OpcodeType> { using MsgNet::send_msg; template<typename MsgType> - inline void send_msg(const MsgType &msg, const NetAddr &addr); - inline void _send_msg(const Msg &msg, const NetAddr &addr); + inline bool send_msg(const MsgType &msg, const NetAddr &addr); + inline bool _send_msg(const Msg &msg, const NetAddr &addr); template<typename MsgType> - inline void send_msg_deferred(MsgType &&msg, const NetAddr &addr); - inline void _send_msg_deferred(Msg &&msg, const NetAddr &addr); + inline int32_t send_msg_deferred(MsgType &&msg, const NetAddr &addr); + inline int32_t _send_msg_deferred(Msg &&msg, const NetAddr &addr); }; /** Peer-to-peer network where any two nodes could hold a bi-diretional message @@ -501,21 +498,21 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { virtual ~PeerNetwork() { this->stop(); } - void add_peer(const NetAddr &addr); - void del_peer(const NetAddr &addr); + int32_t add_peer(const NetAddr &addr); + int32_t del_peer(const NetAddr &addr); bool has_peer(const NetAddr &addr) const; size_t get_npending() const; conn_t get_peer_conn(const NetAddr &addr) const; using MsgNet::send_msg; template<typename MsgType> - inline void send_msg(const MsgType &msg, const NetAddr &addr); - inline void _send_msg(const Msg &msg, const NetAddr &addr); + inline bool send_msg(const MsgType &msg, const NetAddr &addr); + inline bool _send_msg(const Msg &msg, const NetAddr &addr); template<typename MsgType> - inline void send_msg_deferred(MsgType &&msg, const NetAddr &addr); - inline void _send_msg_deferred(Msg &&msg, const NetAddr &addr); + inline int32_t send_msg_deferred(MsgType &&msg, const NetAddr &addr); + inline int32_t _send_msg_deferred(Msg &&msg, const NetAddr &addr); template<typename MsgType> - void multicast_msg(MsgType &&msg, const std::vector<NetAddr> &addrs); - inline void _multicast_msg(Msg &&msg, const std::vector<NetAddr> &addrs); + inline int32_t multicast_msg(MsgType &&msg, const std::vector<NetAddr> &addrs); + inline int32_t _multicast_msg(Msg &&msg, const std::vector<NetAddr> &addrs); void listen(NetAddr listen_addr); conn_t connect(const NetAddr &addr) = delete; @@ -564,28 +561,31 @@ void MsgNetwork<OpcodeType>::on_read(const ConnPool::conn_t &_conn) { template<typename OpcodeType> template<typename MsgType> -inline void MsgNetwork<OpcodeType>::send_msg_deferred(MsgType &&msg, const conn_t &conn) { +inline int32_t MsgNetwork<OpcodeType>::send_msg_deferred(MsgType &&msg, const conn_t &conn) { return _send_msg_deferred(std::move(msg), conn); } template<typename OpcodeType> -inline void MsgNetwork<OpcodeType>::_send_msg_deferred(Msg &&msg, const conn_t &conn) { +inline int32_t MsgNetwork<OpcodeType>::_send_msg_deferred(Msg &&msg, const conn_t &conn) { + auto id = this->gen_async_id(); this->disp_tcall->async_call( - [this, msg=std::move(msg), conn](ThreadCall::Handle &) { + [this, msg=std::move(msg), conn, id](ThreadCall::Handle &) { try { - this->_send_msg(msg, conn); - } catch (...) { this->recoverable_error(std::current_exception()); } + if (!_send_msg(msg, conn)) + throw SalticidaeError(SALTI_ERROR_CONN_NOT_READY); + } catch (...) { this->recoverable_error(std::current_exception(), id); } }); + return id; } template<typename OpcodeType> template<typename MsgType> -inline void MsgNetwork<OpcodeType>::send_msg(const MsgType &msg, const conn_t &conn) { +inline bool MsgNetwork<OpcodeType>::send_msg(const MsgType &msg, const conn_t &conn) { return _send_msg(msg, conn); } template<typename OpcodeType> -inline void MsgNetwork<OpcodeType>::_send_msg(const Msg &msg, const conn_t &conn) { +inline bool MsgNetwork<OpcodeType>::_send_msg(const Msg &msg, const conn_t &conn) { bytearray_t msg_data = msg.serialize(); SALTICIDAE_LOG_DEBUG("wrote message %s to %s", std::string(msg).c_str(), @@ -594,7 +594,7 @@ inline void MsgNetwork<OpcodeType>::_send_msg(const Msg &msg, const conn_t &conn conn->nsent++; conn->nsentb += msg.get_length(); #endif - conn->write(std::move(msg_data)); + return conn->write(std::move(msg_data)); } template<typename O, O _, O __> @@ -993,8 +993,9 @@ void PeerNetwork<O, _, __>::listen(NetAddr _listen_addr) { } template<typename O, O _, O __> -void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) { - this->disp_tcall->async_call([this, addr](ThreadCall::Handle &) { +int32_t PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) { + auto id = this->gen_async_id(); + this->disp_tcall->async_call([this, addr, id](ThreadCall::Handle &) { try { pinfo_ulock_t _g(known_peers_lock); if (known_peers.count(addr)) @@ -1007,14 +1008,16 @@ void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) { conn = it->second; known_peers.insert(std::make_pair(addr, new Peer(conn))); } catch (const PeerNetworkError &) { - this->recoverable_error(std::current_exception()); + this->recoverable_error(std::current_exception(), id); } catch (...) { this->disp_error_cb(std::current_exception()); } }); + return id; } template<typename O, O _, O __> -void PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) { - this->disp_tcall->async_call([this, addr](ThreadCall::Handle &) { +int32_t PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) { + auto id = this->gen_async_id(); + this->disp_tcall->async_call([this, addr, id](ThreadCall::Handle &) { try { pinfo_ulock_t _g(known_peers_lock); pinfo_ulock_t __g(pid2peer_lock); @@ -1038,9 +1041,10 @@ void PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) { pid2peer.erase(it3); } } catch (const PeerNetworkError &) { - this->recoverable_error(std::current_exception()); + this->recoverable_error(std::current_exception(), id); } catch (...) { this->disp_error_cb(std::current_exception()); } }); + return id; } template<typename O, O _, O __> @@ -1064,8 +1068,6 @@ PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &addr) const { auto it2 = pid2peer.find(it->second->peer_id); assert(it2 != pid2peer.end()); conn = it2->second->conn; - } catch (const PeerNetworkError &) { - this->recoverable_error(std::current_exception()); } catch (...) { err = std::current_exception(); } @@ -1094,50 +1096,54 @@ size_t PeerNetwork<O, _, __>::get_npending() const { template<typename O, O _, O __> template<typename MsgType> -inline void PeerNetwork<O, _, __>::send_msg_deferred(MsgType &&msg, const NetAddr &addr) { +inline int32_t PeerNetwork<O, _, __>::send_msg_deferred(MsgType &&msg, const NetAddr &addr) { return _send_msg_deferred(std::move(msg), addr); } template<typename O, O _, O __> -inline void PeerNetwork<O, _, __>::_send_msg_deferred(Msg &&msg, const NetAddr &addr) { +inline int32_t PeerNetwork<O, _, __>::_send_msg_deferred(Msg &&msg, const NetAddr &addr) { + auto id = this->gen_async_id(); this->disp_tcall->async_call( - [this, msg=std::move(msg), addr](ThreadCall::Handle &) { + [this, msg=std::move(msg), addr, id](ThreadCall::Handle &) { try { - _send_msg(msg, addr); - } catch (...) { this->recoverable_error(std::current_exception()); } + if (!_send_msg(msg, addr)) + throw PeerNetworkError(SALTI_ERROR_CONN_NOT_READY); + } catch (...) { this->recoverable_error(std::current_exception(), id); } }); + return id; } template<typename O, O _, O __> template<typename MsgType> -inline void PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const NetAddr &addr) { +inline bool PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const NetAddr &addr) { return _send_msg(msg, addr); } template<typename O, O _, O __> -inline void PeerNetwork<O, _, __>::_send_msg(const Msg &msg, const NetAddr &addr) { +inline bool PeerNetwork<O, _, __>::_send_msg(const Msg &msg, const NetAddr &addr) { pinfo_slock_t _g(known_peers_lock); - pinfo_slock_t __g(pid2peer_lock); - MsgNet::_send_msg(msg, _get_peer_conn(addr)); + return MsgNet::_send_msg(msg, _get_peer_conn(addr)); } template<typename O, O _, O __> template<typename MsgType> -inline void PeerNetwork<O, _, __>::multicast_msg(MsgType &&msg, const std::vector<NetAddr> &addrs) { +inline int32_t PeerNetwork<O, _, __>::multicast_msg(MsgType &&msg, const std::vector<NetAddr> &addrs) { return _multicast_msg(MsgType(std::move(msg)), addrs); } template<typename O, O _, O __> -inline void PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<NetAddr> &addrs) { +inline int32_t PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<NetAddr> &addrs) { + auto id = this->gen_async_id(); this->disp_tcall->async_call( - [this, msg=std::move(msg), addrs](ThreadCall::Handle &) { + [this, msg=std::move(msg), addrs, id](ThreadCall::Handle &) { try { + bool succ = true; for (auto &addr: addrs) - MsgNet::_send_msg(msg, _get_peer_conn(addr)); - } catch (const PeerNetworkError &) { - this->recoverable_error(std::current_exception()); - } catch (...) { this->recoverable_error(std::current_exception()); } + succ &= MsgNet::_send_msg(msg, _get_peer_conn(addr)); + if (!succ) throw PeerNetworkError(SALTI_ERROR_CONN_NOT_READY); + } catch (...) { this->recoverable_error(std::current_exception(), id); } }); + return id; } /* end: functions invoked by the user loop */ @@ -1162,33 +1168,34 @@ void ClientNetwork<OpcodeType>::on_teardown(const ConnPool::conn_t &_conn) { template<typename OpcodeType> template<typename MsgType> -inline void ClientNetwork<OpcodeType>::send_msg_deferred(MsgType &&msg, const NetAddr &addr) { +inline int32_t ClientNetwork<OpcodeType>::send_msg_deferred(MsgType &&msg, const NetAddr &addr) { return _send_msg_deferred(std::move(msg), addr); } template<typename OpcodeType> -inline void ClientNetwork<OpcodeType>::_send_msg_deferred(Msg &&msg, const NetAddr &addr) { +inline int32_t ClientNetwork<OpcodeType>::_send_msg_deferred(Msg &&msg, const NetAddr &addr) { + auto id = this->gen_async_id(); this->disp_tcall->async_call( - [this, msg=std::move(msg), addr](ThreadCall::Handle &) { + [this, msg=std::move(msg), addr, id](ThreadCall::Handle &) { try { _send_msg(msg, addr); - } catch (...) { this->recoverable_error(std::current_exception()); } + } catch (...) { this->recoverable_error(std::current_exception(), id); } }); + return id; } template<typename OpcodeType> template<typename MsgType> -inline void ClientNetwork<OpcodeType>::send_msg(const MsgType &msg, const NetAddr &addr) { +inline bool ClientNetwork<OpcodeType>::send_msg(const MsgType &msg, const NetAddr &addr) { return _send_msg(msg, addr); } template<typename OpcodeType> -inline void ClientNetwork<OpcodeType>::_send_msg(const Msg &msg, const NetAddr &addr) { +inline bool ClientNetwork<OpcodeType>::_send_msg(const Msg &msg, const NetAddr &addr) { auto it = addr2conn.find(addr); - if (it != addr2conn.end()) - MsgNet::_send_msg(msg, it->second); - else + if (it == addr2conn.end()) throw ClientNetworkError(SALTI_ERROR_PEER_NOT_EXIST); + return MsgNet::_send_msg(msg, it->second); } template<typename O, O OPCODE_PING, O _> @@ -1264,9 +1271,10 @@ void msgnetwork_config_tls_cert_by_move(msgnetwork_config_t *self, x509_t *cert) msgnetwork_t *msgnetwork_new(const eventcontext_t *ec, const msgnetwork_config_t *config, SalticidaeCError *err); void msgnetwork_free(const msgnetwork_t *self); -void msgnetwork_send_msg(msgnetwork_t *self, const msg_t *msg, const msgnetwork_conn_t *conn); -void msgnetwork_send_msg_deferred_by_move(msgnetwork_t *self, msg_t *_moved_msg, const msgnetwork_conn_t *conn); -msgnetwork_conn_t *msgnetwork_connect(msgnetwork_t *self, const netaddr_t *addr, bool blocking, SalticidaeCError *err); +bool msgnetwork_send_msg(msgnetwork_t *self, const msg_t *msg, const msgnetwork_conn_t *conn); +int32_t msgnetwork_send_msg_deferred_by_move(msgnetwork_t *self, msg_t *_moved_msg, const msgnetwork_conn_t *conn); +msgnetwork_conn_t *msgnetwork_connect_sync(msgnetwork_t *self, const netaddr_t *addr, SalticidaeCError *err); +int32_t msgnetwork_connect(msgnetwork_t *self, const netaddr_t *addr); msgnetwork_conn_t *msgnetwork_conn_copy(const msgnetwork_conn_t *self); void msgnetwork_conn_free(const msgnetwork_conn_t *self); void msgnetwork_listen(msgnetwork_t *self, const netaddr_t *listen_addr, SalticidaeCError *err); @@ -1281,7 +1289,7 @@ typedef bool (*msgnetwork_conn_callback_t)(const msgnetwork_conn_t *, bool conne void msgnetwork_reg_conn_handler(msgnetwork_t *self, msgnetwork_conn_callback_t cb, void *userdata); -typedef void (*msgnetwork_error_callback_t)(const SalticidaeCError *, bool fatal, void *userdata); +typedef void (*msgnetwork_error_callback_t)(const SalticidaeCError *, bool fatal, int32_t async_id, void *userdata); void msgnetwork_reg_error_handler(msgnetwork_t *self, msgnetwork_error_callback_t cb, void *userdata); msgnetwork_t *msgnetwork_conn_get_net(const msgnetwork_conn_t *conn); @@ -1302,8 +1310,8 @@ msgnetwork_config_t *peernetwork_config_as_msgnetwork_config(peernetwork_config_ peernetwork_t *peernetwork_new(const eventcontext_t *ec, const peernetwork_config_t *config, SalticidaeCError *err); void peernetwork_free(const peernetwork_t *self); -void peernetwork_add_peer(peernetwork_t *self, const netaddr_t *addr); -void peernetwork_del_peer(peernetwork_t *self, const netaddr_t *addr); +int32_t peernetwork_add_peer(peernetwork_t *self, const netaddr_t *addr); +int32_t peernetwork_del_peer(peernetwork_t *self, const netaddr_t *addr); bool peernetwork_has_peer(const peernetwork_t *self, const netaddr_t *addr); const peernetwork_conn_t *peernetwork_get_peer_conn(const peernetwork_t *self, const netaddr_t *addr, SalticidaeCError *cerror); msgnetwork_t *peernetwork_as_msgnetwork(peernetwork_t *self); @@ -1314,8 +1322,8 @@ peernetwork_conn_t *peernetwork_conn_copy(const peernetwork_conn_t *self); netaddr_t *peernetwork_conn_get_peer_addr(const peernetwork_conn_t *self); void peernetwork_conn_free(const peernetwork_conn_t *self); bool peernetwork_send_msg(peernetwork_t *self, const msg_t * msg, const netaddr_t *addr); -void peernetwork_send_msg_deferred_by_move(peernetwork_t *self, msg_t * _moved_msg, const netaddr_t *addr); -void peernetwork_multicast_msg_by_move(peernetwork_t *self, msg_t *_moved_msg, const netaddr_array_t *addrs); +int32_t peernetwork_send_msg_deferred_by_move(peernetwork_t *self, msg_t * _moved_msg, const netaddr_t *addr); +int32_t peernetwork_multicast_msg_by_move(peernetwork_t *self, msg_t *_moved_msg, const netaddr_array_t *addrs); void peernetwork_listen(peernetwork_t *self, const netaddr_t *listen_addr, SalticidaeCError *err); typedef void (*peernetwork_peer_callback_t)(const peernetwork_conn_t *, bool connected, void *userdata); @@ -1335,7 +1343,7 @@ clientnetwork_conn_t *clientnetwork_conn_new_from_msgnetwork_conn_unsafe(const m clientnetwork_conn_t *clientnetwork_conn_copy(const clientnetwork_conn_t *self); void clientnetwork_conn_free(const clientnetwork_conn_t *self); bool clientnetwork_send_msg(clientnetwork_t *self, const msg_t * msg, const netaddr_t *addr); -void clientnetwork_send_msg_deferred_by_move(clientnetwork_t *self, msg_t * _moved_msg, const netaddr_t *addr); +int32_t clientnetwork_send_msg_deferred_by_move(clientnetwork_t *self, msg_t * _moved_msg, const netaddr_t *addr); #ifdef __cplusplus } |