diff options
Diffstat (limited to 'include')
-rw-r--r-- | include/salticidae/conn.h | 73 | ||||
-rw-r--r-- | include/salticidae/network.h | 150 | ||||
-rw-r--r-- | include/salticidae/util.h | 1 |
3 files changed, 118 insertions, 106 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index 7f74a87..0058274 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -60,7 +60,7 @@ class ConnPool { /** The type of callback invoked when connection status is changed. */ using conn_callback_t = std::function<bool(const conn_t &, bool)>; /** The type of callback invoked when an error occured (during async execution). */ - using error_callback_t = std::function<void(const std::exception_ptr, bool)>; + using error_callback_t = std::function<void(const std::exception_ptr, bool, int32_t)>; /** Abstraction for a bi-directional connection. */ class Conn { friend ConnPool; @@ -149,12 +149,14 @@ class ConnPool { using worker_error_callback_t = std::function<void(const std::exception_ptr err)>; worker_error_callback_t disp_error_cb; worker_error_callback_t worker_error_cb; + std::atomic<uint16_t> async_id; + int32_t gen_async_id() { return async_id.fetch_add(1, std::memory_order_relaxed); } conn_t _connect(const NetAddr &addr); void _listen(NetAddr listen_addr); - void recoverable_error(const std::exception_ptr err) const { - user_tcall->async_call([this, err](ThreadCall::Handle &) { - if (error_cb) error_cb(err, false); + void recoverable_error(const std::exception_ptr err, int32_t id) const { + user_tcall->async_call([this, err, id](ThreadCall::Handle &) { + if (error_cb) error_cb(err, false, id); }); } @@ -279,7 +281,7 @@ class ConnPool { else conn->send_data_func(conn, fd, what); } catch (...) { - conn->cpool->recoverable_error(std::current_exception()); + conn->cpool->recoverable_error(std::current_exception(), -1); conn->cpool->worker_terminate(conn); } }); @@ -426,6 +428,7 @@ class ConnPool { ConnPool(const EventContext &ec, const Config &config): ec(ec), enable_tls(config._enable_tls), + async_id(0), max_listen_backlog(config._max_listen_backlog), conn_server_timeout(config._conn_server_timeout), seg_buff_size(config._seg_buff_size), @@ -459,8 +462,8 @@ class ConnPool { disp_error_cb = [this](const std::exception_ptr err) { user_tcall->async_call([this, err](ThreadCall::Handle &) { stop_workers(); - std::rethrow_exception(err); - //if (error_cb) error_cb(err, true); + //std::rethrow_exception(err); + if (error_cb) error_cb(err, true, -1); }); disp_ec.stop(); workers[0].stop_tcall(); @@ -528,36 +531,36 @@ class ConnPool { } /** Actively connect to remote addr. */ - conn_t connect(const NetAddr &addr, bool blocking = true) { - if (blocking) - { - auto ret = *(static_cast<std::pair<conn_t, std::exception_ptr> *>( - disp_tcall->call([this, addr](ThreadCall::Handle &h) { - conn_t conn; - std::exception_ptr err = nullptr; - try { - conn = _connect(addr); - } catch (...) { - err = std::current_exception(); - } - h.set_result(std::make_pair(std::move(conn), err)); - }).get())); - if (ret.second) std::rethrow_exception(ret.second); - return std::move(ret.first); - } - else - { - disp_tcall->async_call([this, addr](ThreadCall::Handle &) { - try { - _connect(addr); - } catch (...) { - disp_error_cb(std::current_exception()); - } - }); - return nullptr; - } + conn_t connect_sync(const NetAddr &addr) { + auto ret = *(static_cast<std::pair<conn_t, std::exception_ptr> *>( + disp_tcall->call([this, addr](ThreadCall::Handle &h) { + conn_t conn; + std::exception_ptr err = nullptr; + try { + conn = _connect(addr); + } catch (...) { + err = std::current_exception(); + } + h.set_result(std::make_pair(std::move(conn), err)); + }).get())); + if (ret.second) std::rethrow_exception(ret.second); + return std::move(ret.first); } + /** Actively connect to remote addr (async). */ + int32_t connect(const NetAddr &addr) { + auto id = gen_async_id(); + disp_tcall->async_call([this, addr, id](ThreadCall::Handle &) { + try { + _connect(addr); + } catch (...) { + this->recoverable_error(std::current_exception(), id); + } + }); + return id; + } + + /** Listen for passive connections (connection initiated from remote). * Does not need to be called if do not want to accept any passive * connections. */ diff --git a/include/salticidae/network.h b/include/salticidae/network.h index c59bbb3..4c5fea6 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -110,8 +110,6 @@ class MsgNetwork: public ConnPool { nrecvb.store(0, std::memory_order_relaxed); } #endif - - protected: }; using conn_t = ArcObj<Conn>; @@ -127,7 +125,6 @@ class MsgNetwork: public ConnPool { queue_t incoming_msgs; protected: - ConnPool::Conn *create_conn() override { return new Conn(); } void on_read(const ConnPool::conn_t &) override; @@ -201,16 +198,16 @@ class MsgNetwork: public ConnPool { } template<typename MsgType> - inline void send_msg(const MsgType &msg, const conn_t &conn); - inline void _send_msg(const Msg &msg, const conn_t &conn); + inline bool send_msg(const MsgType &msg, const conn_t &conn); + inline bool _send_msg(const Msg &msg, const conn_t &conn); template<typename MsgType> - inline void send_msg_deferred(MsgType &&msg, const conn_t &conn); - inline void _send_msg_deferred(Msg &&msg, const conn_t &conn); + inline int32_t send_msg_deferred(MsgType &&msg, const conn_t &conn); + inline int32_t _send_msg_deferred(Msg &&msg, const conn_t &conn); void stop() { stop_workers(); } using ConnPool::listen; - conn_t connect(const NetAddr &addr, bool blocking = true) { - return static_pointer_cast<Conn>(ConnPool::connect(addr, blocking)); + conn_t connect_sync(const NetAddr &addr) { + return static_pointer_cast<Conn>(ConnPool::connect_sync(addr)); } }; @@ -248,11 +245,11 @@ class ClientNetwork: public MsgNetwork<OpcodeType> { using MsgNet::send_msg; template<typename MsgType> - inline void send_msg(const MsgType &msg, const NetAddr &addr); - inline void _send_msg(const Msg &msg, const NetAddr &addr); + inline bool send_msg(const MsgType &msg, const NetAddr &addr); + inline bool _send_msg(const Msg &msg, const NetAddr &addr); template<typename MsgType> - inline void send_msg_deferred(MsgType &&msg, const NetAddr &addr); - inline void _send_msg_deferred(Msg &&msg, const NetAddr &addr); + inline int32_t send_msg_deferred(MsgType &&msg, const NetAddr &addr); + inline int32_t _send_msg_deferred(Msg &&msg, const NetAddr &addr); }; /** Peer-to-peer network where any two nodes could hold a bi-diretional message @@ -501,21 +498,21 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { virtual ~PeerNetwork() { this->stop(); } - void add_peer(const NetAddr &addr); - void del_peer(const NetAddr &addr); + int32_t add_peer(const NetAddr &addr); + int32_t del_peer(const NetAddr &addr); bool has_peer(const NetAddr &addr) const; size_t get_npending() const; conn_t get_peer_conn(const NetAddr &addr) const; using MsgNet::send_msg; template<typename MsgType> - inline void send_msg(const MsgType &msg, const NetAddr &addr); - inline void _send_msg(const Msg &msg, const NetAddr &addr); + inline bool send_msg(const MsgType &msg, const NetAddr &addr); + inline bool _send_msg(const Msg &msg, const NetAddr &addr); template<typename MsgType> - inline void send_msg_deferred(MsgType &&msg, const NetAddr &addr); - inline void _send_msg_deferred(Msg &&msg, const NetAddr &addr); + inline int32_t send_msg_deferred(MsgType &&msg, const NetAddr &addr); + inline int32_t _send_msg_deferred(Msg &&msg, const NetAddr &addr); template<typename MsgType> - void multicast_msg(MsgType &&msg, const std::vector<NetAddr> &addrs); - inline void _multicast_msg(Msg &&msg, const std::vector<NetAddr> &addrs); + inline int32_t multicast_msg(MsgType &&msg, const std::vector<NetAddr> &addrs); + inline int32_t _multicast_msg(Msg &&msg, const std::vector<NetAddr> &addrs); void listen(NetAddr listen_addr); conn_t connect(const NetAddr &addr) = delete; @@ -564,28 +561,31 @@ void MsgNetwork<OpcodeType>::on_read(const ConnPool::conn_t &_conn) { template<typename OpcodeType> template<typename MsgType> -inline void MsgNetwork<OpcodeType>::send_msg_deferred(MsgType &&msg, const conn_t &conn) { +inline int32_t MsgNetwork<OpcodeType>::send_msg_deferred(MsgType &&msg, const conn_t &conn) { return _send_msg_deferred(std::move(msg), conn); } template<typename OpcodeType> -inline void MsgNetwork<OpcodeType>::_send_msg_deferred(Msg &&msg, const conn_t &conn) { +inline int32_t MsgNetwork<OpcodeType>::_send_msg_deferred(Msg &&msg, const conn_t &conn) { + auto id = this->gen_async_id(); this->disp_tcall->async_call( - [this, msg=std::move(msg), conn](ThreadCall::Handle &) { + [this, msg=std::move(msg), conn, id](ThreadCall::Handle &) { try { - this->_send_msg(msg, conn); - } catch (...) { this->recoverable_error(std::current_exception()); } + if (!_send_msg(msg, conn)) + throw SalticidaeError(SALTI_ERROR_CONN_NOT_READY); + } catch (...) { this->recoverable_error(std::current_exception(), id); } }); + return id; } template<typename OpcodeType> template<typename MsgType> -inline void MsgNetwork<OpcodeType>::send_msg(const MsgType &msg, const conn_t &conn) { +inline bool MsgNetwork<OpcodeType>::send_msg(const MsgType &msg, const conn_t &conn) { return _send_msg(msg, conn); } template<typename OpcodeType> -inline void MsgNetwork<OpcodeType>::_send_msg(const Msg &msg, const conn_t &conn) { +inline bool MsgNetwork<OpcodeType>::_send_msg(const Msg &msg, const conn_t &conn) { bytearray_t msg_data = msg.serialize(); SALTICIDAE_LOG_DEBUG("wrote message %s to %s", std::string(msg).c_str(), @@ -594,7 +594,7 @@ inline void MsgNetwork<OpcodeType>::_send_msg(const Msg &msg, const conn_t &conn conn->nsent++; conn->nsentb += msg.get_length(); #endif - conn->write(std::move(msg_data)); + return conn->write(std::move(msg_data)); } template<typename O, O _, O __> @@ -993,8 +993,9 @@ void PeerNetwork<O, _, __>::listen(NetAddr _listen_addr) { } template<typename O, O _, O __> -void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) { - this->disp_tcall->async_call([this, addr](ThreadCall::Handle &) { +int32_t PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) { + auto id = this->gen_async_id(); + this->disp_tcall->async_call([this, addr, id](ThreadCall::Handle &) { try { pinfo_ulock_t _g(known_peers_lock); if (known_peers.count(addr)) @@ -1007,14 +1008,16 @@ void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) { conn = it->second; known_peers.insert(std::make_pair(addr, new Peer(conn))); } catch (const PeerNetworkError &) { - this->recoverable_error(std::current_exception()); + this->recoverable_error(std::current_exception(), id); } catch (...) { this->disp_error_cb(std::current_exception()); } }); + return id; } template<typename O, O _, O __> -void PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) { - this->disp_tcall->async_call([this, addr](ThreadCall::Handle &) { +int32_t PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) { + auto id = this->gen_async_id(); + this->disp_tcall->async_call([this, addr, id](ThreadCall::Handle &) { try { pinfo_ulock_t _g(known_peers_lock); pinfo_ulock_t __g(pid2peer_lock); @@ -1038,9 +1041,10 @@ void PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) { pid2peer.erase(it3); } } catch (const PeerNetworkError &) { - this->recoverable_error(std::current_exception()); + this->recoverable_error(std::current_exception(), id); } catch (...) { this->disp_error_cb(std::current_exception()); } }); + return id; } template<typename O, O _, O __> @@ -1064,8 +1068,6 @@ PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &addr) const { auto it2 = pid2peer.find(it->second->peer_id); assert(it2 != pid2peer.end()); conn = it2->second->conn; - } catch (const PeerNetworkError &) { - this->recoverable_error(std::current_exception()); } catch (...) { err = std::current_exception(); } @@ -1094,50 +1096,54 @@ size_t PeerNetwork<O, _, __>::get_npending() const { template<typename O, O _, O __> template<typename MsgType> -inline void PeerNetwork<O, _, __>::send_msg_deferred(MsgType &&msg, const NetAddr &addr) { +inline int32_t PeerNetwork<O, _, __>::send_msg_deferred(MsgType &&msg, const NetAddr &addr) { return _send_msg_deferred(std::move(msg), addr); } template<typename O, O _, O __> -inline void PeerNetwork<O, _, __>::_send_msg_deferred(Msg &&msg, const NetAddr &addr) { +inline int32_t PeerNetwork<O, _, __>::_send_msg_deferred(Msg &&msg, const NetAddr &addr) { + auto id = this->gen_async_id(); this->disp_tcall->async_call( - [this, msg=std::move(msg), addr](ThreadCall::Handle &) { + [this, msg=std::move(msg), addr, id](ThreadCall::Handle &) { try { - _send_msg(msg, addr); - } catch (...) { this->recoverable_error(std::current_exception()); } + if (!_send_msg(msg, addr)) + throw PeerNetworkError(SALTI_ERROR_CONN_NOT_READY); + } catch (...) { this->recoverable_error(std::current_exception(), id); } }); + return id; } template<typename O, O _, O __> template<typename MsgType> -inline void PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const NetAddr &addr) { +inline bool PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const NetAddr &addr) { return _send_msg(msg, addr); } template<typename O, O _, O __> -inline void PeerNetwork<O, _, __>::_send_msg(const Msg &msg, const NetAddr &addr) { +inline bool PeerNetwork<O, _, __>::_send_msg(const Msg &msg, const NetAddr &addr) { pinfo_slock_t _g(known_peers_lock); - pinfo_slock_t __g(pid2peer_lock); - MsgNet::_send_msg(msg, _get_peer_conn(addr)); + return MsgNet::_send_msg(msg, _get_peer_conn(addr)); } template<typename O, O _, O __> template<typename MsgType> -inline void PeerNetwork<O, _, __>::multicast_msg(MsgType &&msg, const std::vector<NetAddr> &addrs) { +inline int32_t PeerNetwork<O, _, __>::multicast_msg(MsgType &&msg, const std::vector<NetAddr> &addrs) { return _multicast_msg(MsgType(std::move(msg)), addrs); } template<typename O, O _, O __> -inline void PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<NetAddr> &addrs) { +inline int32_t PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<NetAddr> &addrs) { + auto id = this->gen_async_id(); this->disp_tcall->async_call( - [this, msg=std::move(msg), addrs](ThreadCall::Handle &) { + [this, msg=std::move(msg), addrs, id](ThreadCall::Handle &) { try { + bool succ = true; for (auto &addr: addrs) - MsgNet::_send_msg(msg, _get_peer_conn(addr)); - } catch (const PeerNetworkError &) { - this->recoverable_error(std::current_exception()); - } catch (...) { this->recoverable_error(std::current_exception()); } + succ &= MsgNet::_send_msg(msg, _get_peer_conn(addr)); + if (!succ) throw PeerNetworkError(SALTI_ERROR_CONN_NOT_READY); + } catch (...) { this->recoverable_error(std::current_exception(), id); } }); + return id; } /* end: functions invoked by the user loop */ @@ -1162,33 +1168,34 @@ void ClientNetwork<OpcodeType>::on_teardown(const ConnPool::conn_t &_conn) { template<typename OpcodeType> template<typename MsgType> -inline void ClientNetwork<OpcodeType>::send_msg_deferred(MsgType &&msg, const NetAddr &addr) { +inline int32_t ClientNetwork<OpcodeType>::send_msg_deferred(MsgType &&msg, const NetAddr &addr) { return _send_msg_deferred(std::move(msg), addr); } template<typename OpcodeType> -inline void ClientNetwork<OpcodeType>::_send_msg_deferred(Msg &&msg, const NetAddr &addr) { +inline int32_t ClientNetwork<OpcodeType>::_send_msg_deferred(Msg &&msg, const NetAddr &addr) { + auto id = this->gen_async_id(); this->disp_tcall->async_call( - [this, msg=std::move(msg), addr](ThreadCall::Handle &) { + [this, msg=std::move(msg), addr, id](ThreadCall::Handle &) { try { _send_msg(msg, addr); - } catch (...) { this->recoverable_error(std::current_exception()); } + } catch (...) { this->recoverable_error(std::current_exception(), id); } }); + return id; } template<typename OpcodeType> template<typename MsgType> -inline void ClientNetwork<OpcodeType>::send_msg(const MsgType &msg, const NetAddr &addr) { +inline bool ClientNetwork<OpcodeType>::send_msg(const MsgType &msg, const NetAddr &addr) { return _send_msg(msg, addr); } template<typename OpcodeType> -inline void ClientNetwork<OpcodeType>::_send_msg(const Msg &msg, const NetAddr &addr) { +inline bool ClientNetwork<OpcodeType>::_send_msg(const Msg &msg, const NetAddr &addr) { auto it = addr2conn.find(addr); - if (it != addr2conn.end()) - MsgNet::_send_msg(msg, it->second); - else + if (it == addr2conn.end()) throw ClientNetworkError(SALTI_ERROR_PEER_NOT_EXIST); + return MsgNet::_send_msg(msg, it->second); } template<typename O, O OPCODE_PING, O _> @@ -1264,9 +1271,10 @@ void msgnetwork_config_tls_cert_by_move(msgnetwork_config_t *self, x509_t *cert) msgnetwork_t *msgnetwork_new(const eventcontext_t *ec, const msgnetwork_config_t *config, SalticidaeCError *err); void msgnetwork_free(const msgnetwork_t *self); -void msgnetwork_send_msg(msgnetwork_t *self, const msg_t *msg, const msgnetwork_conn_t *conn); -void msgnetwork_send_msg_deferred_by_move(msgnetwork_t *self, msg_t *_moved_msg, const msgnetwork_conn_t *conn); -msgnetwork_conn_t *msgnetwork_connect(msgnetwork_t *self, const netaddr_t *addr, bool blocking, SalticidaeCError *err); +bool msgnetwork_send_msg(msgnetwork_t *self, const msg_t *msg, const msgnetwork_conn_t *conn); +int32_t msgnetwork_send_msg_deferred_by_move(msgnetwork_t *self, msg_t *_moved_msg, const msgnetwork_conn_t *conn); +msgnetwork_conn_t *msgnetwork_connect_sync(msgnetwork_t *self, const netaddr_t *addr, SalticidaeCError *err); +int32_t msgnetwork_connect(msgnetwork_t *self, const netaddr_t *addr); msgnetwork_conn_t *msgnetwork_conn_copy(const msgnetwork_conn_t *self); void msgnetwork_conn_free(const msgnetwork_conn_t *self); void msgnetwork_listen(msgnetwork_t *self, const netaddr_t *listen_addr, SalticidaeCError *err); @@ -1281,7 +1289,7 @@ typedef bool (*msgnetwork_conn_callback_t)(const msgnetwork_conn_t *, bool conne void msgnetwork_reg_conn_handler(msgnetwork_t *self, msgnetwork_conn_callback_t cb, void *userdata); -typedef void (*msgnetwork_error_callback_t)(const SalticidaeCError *, bool fatal, void *userdata); +typedef void (*msgnetwork_error_callback_t)(const SalticidaeCError *, bool fatal, int32_t async_id, void *userdata); void msgnetwork_reg_error_handler(msgnetwork_t *self, msgnetwork_error_callback_t cb, void *userdata); msgnetwork_t *msgnetwork_conn_get_net(const msgnetwork_conn_t *conn); @@ -1302,8 +1310,8 @@ msgnetwork_config_t *peernetwork_config_as_msgnetwork_config(peernetwork_config_ peernetwork_t *peernetwork_new(const eventcontext_t *ec, const peernetwork_config_t *config, SalticidaeCError *err); void peernetwork_free(const peernetwork_t *self); -void peernetwork_add_peer(peernetwork_t *self, const netaddr_t *addr); -void peernetwork_del_peer(peernetwork_t *self, const netaddr_t *addr); +int32_t peernetwork_add_peer(peernetwork_t *self, const netaddr_t *addr); +int32_t peernetwork_del_peer(peernetwork_t *self, const netaddr_t *addr); bool peernetwork_has_peer(const peernetwork_t *self, const netaddr_t *addr); const peernetwork_conn_t *peernetwork_get_peer_conn(const peernetwork_t *self, const netaddr_t *addr, SalticidaeCError *cerror); msgnetwork_t *peernetwork_as_msgnetwork(peernetwork_t *self); @@ -1314,8 +1322,8 @@ peernetwork_conn_t *peernetwork_conn_copy(const peernetwork_conn_t *self); netaddr_t *peernetwork_conn_get_peer_addr(const peernetwork_conn_t *self); void peernetwork_conn_free(const peernetwork_conn_t *self); bool peernetwork_send_msg(peernetwork_t *self, const msg_t * msg, const netaddr_t *addr); -void peernetwork_send_msg_deferred_by_move(peernetwork_t *self, msg_t * _moved_msg, const netaddr_t *addr); -void peernetwork_multicast_msg_by_move(peernetwork_t *self, msg_t *_moved_msg, const netaddr_array_t *addrs); +int32_t peernetwork_send_msg_deferred_by_move(peernetwork_t *self, msg_t * _moved_msg, const netaddr_t *addr); +int32_t peernetwork_multicast_msg_by_move(peernetwork_t *self, msg_t *_moved_msg, const netaddr_array_t *addrs); void peernetwork_listen(peernetwork_t *self, const netaddr_t *listen_addr, SalticidaeCError *err); typedef void (*peernetwork_peer_callback_t)(const peernetwork_conn_t *, bool connected, void *userdata); @@ -1335,7 +1343,7 @@ clientnetwork_conn_t *clientnetwork_conn_new_from_msgnetwork_conn_unsafe(const m clientnetwork_conn_t *clientnetwork_conn_copy(const clientnetwork_conn_t *self); void clientnetwork_conn_free(const clientnetwork_conn_t *self); bool clientnetwork_send_msg(clientnetwork_t *self, const msg_t * msg, const netaddr_t *addr); -void clientnetwork_send_msg_deferred_by_move(clientnetwork_t *self, msg_t * _moved_msg, const netaddr_t *addr); +int32_t clientnetwork_send_msg_deferred_by_move(clientnetwork_t *self, msg_t * _moved_msg, const netaddr_t *addr); #ifdef __cplusplus } diff --git a/include/salticidae/util.h b/include/salticidae/util.h index 063058c..017d301 100644 --- a/include/salticidae/util.h +++ b/include/salticidae/util.h @@ -101,6 +101,7 @@ enum SalticidaeErrorCode { SALTI_ERROR_TLS_NO_PEER_CERT, SALTI_ERROR_FD, SALTI_ERROR_RAND_SOURCE, + SALTI_ERROR_CONN_NOT_READY, SALTI_ERROR_UNKNOWN }; |