diff options
-rw-r--r-- | include/salticidae/conn.h | 73 | ||||
-rw-r--r-- | include/salticidae/network.h | 150 | ||||
-rw-r--r-- | include/salticidae/util.h | 1 | ||||
-rw-r--r-- | src/conn.cpp | 4 | ||||
-rw-r--r-- | src/network.cpp | 40 | ||||
-rw-r--r-- | src/util.cpp | 1 | ||||
-rw-r--r-- | test/bench_network.cpp | 2 | ||||
-rw-r--r-- | test/bench_network_tls.cpp | 2 | ||||
-rw-r--r-- | test/test_msgnet.cpp | 6 | ||||
-rw-r--r-- | test/test_msgnet_c.c | 6 | ||||
-rw-r--r-- | test/test_msgnet_tls.cpp | 6 | ||||
-rw-r--r-- | test/test_p2p.cpp | 6 | ||||
-rw-r--r-- | test/test_p2p_stress.cpp | 6 | ||||
-rw-r--r-- | test/test_p2p_tls.cpp | 6 |
14 files changed, 163 insertions, 146 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index 7f74a87..0058274 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -60,7 +60,7 @@ class ConnPool { /** The type of callback invoked when connection status is changed. */ using conn_callback_t = std::function<bool(const conn_t &, bool)>; /** The type of callback invoked when an error occured (during async execution). */ - using error_callback_t = std::function<void(const std::exception_ptr, bool)>; + using error_callback_t = std::function<void(const std::exception_ptr, bool, int32_t)>; /** Abstraction for a bi-directional connection. */ class Conn { friend ConnPool; @@ -149,12 +149,14 @@ class ConnPool { using worker_error_callback_t = std::function<void(const std::exception_ptr err)>; worker_error_callback_t disp_error_cb; worker_error_callback_t worker_error_cb; + std::atomic<uint16_t> async_id; + int32_t gen_async_id() { return async_id.fetch_add(1, std::memory_order_relaxed); } conn_t _connect(const NetAddr &addr); void _listen(NetAddr listen_addr); - void recoverable_error(const std::exception_ptr err) const { - user_tcall->async_call([this, err](ThreadCall::Handle &) { - if (error_cb) error_cb(err, false); + void recoverable_error(const std::exception_ptr err, int32_t id) const { + user_tcall->async_call([this, err, id](ThreadCall::Handle &) { + if (error_cb) error_cb(err, false, id); }); } @@ -279,7 +281,7 @@ class ConnPool { else conn->send_data_func(conn, fd, what); } catch (...) { - conn->cpool->recoverable_error(std::current_exception()); + conn->cpool->recoverable_error(std::current_exception(), -1); conn->cpool->worker_terminate(conn); } }); @@ -426,6 +428,7 @@ class ConnPool { ConnPool(const EventContext &ec, const Config &config): ec(ec), enable_tls(config._enable_tls), + async_id(0), max_listen_backlog(config._max_listen_backlog), conn_server_timeout(config._conn_server_timeout), seg_buff_size(config._seg_buff_size), @@ -459,8 +462,8 @@ class ConnPool { disp_error_cb = [this](const std::exception_ptr err) { user_tcall->async_call([this, err](ThreadCall::Handle &) { stop_workers(); - std::rethrow_exception(err); - //if (error_cb) error_cb(err, true); + //std::rethrow_exception(err); + if (error_cb) error_cb(err, true, -1); }); disp_ec.stop(); workers[0].stop_tcall(); @@ -528,36 +531,36 @@ class ConnPool { } /** Actively connect to remote addr. */ - conn_t connect(const NetAddr &addr, bool blocking = true) { - if (blocking) - { - auto ret = *(static_cast<std::pair<conn_t, std::exception_ptr> *>( - disp_tcall->call([this, addr](ThreadCall::Handle &h) { - conn_t conn; - std::exception_ptr err = nullptr; - try { - conn = _connect(addr); - } catch (...) { - err = std::current_exception(); - } - h.set_result(std::make_pair(std::move(conn), err)); - }).get())); - if (ret.second) std::rethrow_exception(ret.second); - return std::move(ret.first); - } - else - { - disp_tcall->async_call([this, addr](ThreadCall::Handle &) { - try { - _connect(addr); - } catch (...) { - disp_error_cb(std::current_exception()); - } - }); - return nullptr; - } + conn_t connect_sync(const NetAddr &addr) { + auto ret = *(static_cast<std::pair<conn_t, std::exception_ptr> *>( + disp_tcall->call([this, addr](ThreadCall::Handle &h) { + conn_t conn; + std::exception_ptr err = nullptr; + try { + conn = _connect(addr); + } catch (...) { + err = std::current_exception(); + } + h.set_result(std::make_pair(std::move(conn), err)); + }).get())); + if (ret.second) std::rethrow_exception(ret.second); + return std::move(ret.first); } + /** Actively connect to remote addr (async). */ + int32_t connect(const NetAddr &addr) { + auto id = gen_async_id(); + disp_tcall->async_call([this, addr, id](ThreadCall::Handle &) { + try { + _connect(addr); + } catch (...) { + this->recoverable_error(std::current_exception(), id); + } + }); + return id; + } + + /** Listen for passive connections (connection initiated from remote). * Does not need to be called if do not want to accept any passive * connections. */ diff --git a/include/salticidae/network.h b/include/salticidae/network.h index c59bbb3..4c5fea6 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -110,8 +110,6 @@ class MsgNetwork: public ConnPool { nrecvb.store(0, std::memory_order_relaxed); } #endif - - protected: }; using conn_t = ArcObj<Conn>; @@ -127,7 +125,6 @@ class MsgNetwork: public ConnPool { queue_t incoming_msgs; protected: - ConnPool::Conn *create_conn() override { return new Conn(); } void on_read(const ConnPool::conn_t &) override; @@ -201,16 +198,16 @@ class MsgNetwork: public ConnPool { } template<typename MsgType> - inline void send_msg(const MsgType &msg, const conn_t &conn); - inline void _send_msg(const Msg &msg, const conn_t &conn); + inline bool send_msg(const MsgType &msg, const conn_t &conn); + inline bool _send_msg(const Msg &msg, const conn_t &conn); template<typename MsgType> - inline void send_msg_deferred(MsgType &&msg, const conn_t &conn); - inline void _send_msg_deferred(Msg &&msg, const conn_t &conn); + inline int32_t send_msg_deferred(MsgType &&msg, const conn_t &conn); + inline int32_t _send_msg_deferred(Msg &&msg, const conn_t &conn); void stop() { stop_workers(); } using ConnPool::listen; - conn_t connect(const NetAddr &addr, bool blocking = true) { - return static_pointer_cast<Conn>(ConnPool::connect(addr, blocking)); + conn_t connect_sync(const NetAddr &addr) { + return static_pointer_cast<Conn>(ConnPool::connect_sync(addr)); } }; @@ -248,11 +245,11 @@ class ClientNetwork: public MsgNetwork<OpcodeType> { using MsgNet::send_msg; template<typename MsgType> - inline void send_msg(const MsgType &msg, const NetAddr &addr); - inline void _send_msg(const Msg &msg, const NetAddr &addr); + inline bool send_msg(const MsgType &msg, const NetAddr &addr); + inline bool _send_msg(const Msg &msg, const NetAddr &addr); template<typename MsgType> - inline void send_msg_deferred(MsgType &&msg, const NetAddr &addr); - inline void _send_msg_deferred(Msg &&msg, const NetAddr &addr); + inline int32_t send_msg_deferred(MsgType &&msg, const NetAddr &addr); + inline int32_t _send_msg_deferred(Msg &&msg, const NetAddr &addr); }; /** Peer-to-peer network where any two nodes could hold a bi-diretional message @@ -501,21 +498,21 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { virtual ~PeerNetwork() { this->stop(); } - void add_peer(const NetAddr &addr); - void del_peer(const NetAddr &addr); + int32_t add_peer(const NetAddr &addr); + int32_t del_peer(const NetAddr &addr); bool has_peer(const NetAddr &addr) const; size_t get_npending() const; conn_t get_peer_conn(const NetAddr &addr) const; using MsgNet::send_msg; template<typename MsgType> - inline void send_msg(const MsgType &msg, const NetAddr &addr); - inline void _send_msg(const Msg &msg, const NetAddr &addr); + inline bool send_msg(const MsgType &msg, const NetAddr &addr); + inline bool _send_msg(const Msg &msg, const NetAddr &addr); template<typename MsgType> - inline void send_msg_deferred(MsgType &&msg, const NetAddr &addr); - inline void _send_msg_deferred(Msg &&msg, const NetAddr &addr); + inline int32_t send_msg_deferred(MsgType &&msg, const NetAddr &addr); + inline int32_t _send_msg_deferred(Msg &&msg, const NetAddr &addr); template<typename MsgType> - void multicast_msg(MsgType &&msg, const std::vector<NetAddr> &addrs); - inline void _multicast_msg(Msg &&msg, const std::vector<NetAddr> &addrs); + inline int32_t multicast_msg(MsgType &&msg, const std::vector<NetAddr> &addrs); + inline int32_t _multicast_msg(Msg &&msg, const std::vector<NetAddr> &addrs); void listen(NetAddr listen_addr); conn_t connect(const NetAddr &addr) = delete; @@ -564,28 +561,31 @@ void MsgNetwork<OpcodeType>::on_read(const ConnPool::conn_t &_conn) { template<typename OpcodeType> template<typename MsgType> -inline void MsgNetwork<OpcodeType>::send_msg_deferred(MsgType &&msg, const conn_t &conn) { +inline int32_t MsgNetwork<OpcodeType>::send_msg_deferred(MsgType &&msg, const conn_t &conn) { return _send_msg_deferred(std::move(msg), conn); } template<typename OpcodeType> -inline void MsgNetwork<OpcodeType>::_send_msg_deferred(Msg &&msg, const conn_t &conn) { +inline int32_t MsgNetwork<OpcodeType>::_send_msg_deferred(Msg &&msg, const conn_t &conn) { + auto id = this->gen_async_id(); this->disp_tcall->async_call( - [this, msg=std::move(msg), conn](ThreadCall::Handle &) { + [this, msg=std::move(msg), conn, id](ThreadCall::Handle &) { try { - this->_send_msg(msg, conn); - } catch (...) { this->recoverable_error(std::current_exception()); } + if (!_send_msg(msg, conn)) + throw SalticidaeError(SALTI_ERROR_CONN_NOT_READY); + } catch (...) { this->recoverable_error(std::current_exception(), id); } }); + return id; } template<typename OpcodeType> template<typename MsgType> -inline void MsgNetwork<OpcodeType>::send_msg(const MsgType &msg, const conn_t &conn) { +inline bool MsgNetwork<OpcodeType>::send_msg(const MsgType &msg, const conn_t &conn) { return _send_msg(msg, conn); } template<typename OpcodeType> -inline void MsgNetwork<OpcodeType>::_send_msg(const Msg &msg, const conn_t &conn) { +inline bool MsgNetwork<OpcodeType>::_send_msg(const Msg &msg, const conn_t &conn) { bytearray_t msg_data = msg.serialize(); SALTICIDAE_LOG_DEBUG("wrote message %s to %s", std::string(msg).c_str(), @@ -594,7 +594,7 @@ inline void MsgNetwork<OpcodeType>::_send_msg(const Msg &msg, const conn_t &conn conn->nsent++; conn->nsentb += msg.get_length(); #endif - conn->write(std::move(msg_data)); + return conn->write(std::move(msg_data)); } template<typename O, O _, O __> @@ -993,8 +993,9 @@ void PeerNetwork<O, _, __>::listen(NetAddr _listen_addr) { } template<typename O, O _, O __> -void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) { - this->disp_tcall->async_call([this, addr](ThreadCall::Handle &) { +int32_t PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) { + auto id = this->gen_async_id(); + this->disp_tcall->async_call([this, addr, id](ThreadCall::Handle &) { try { pinfo_ulock_t _g(known_peers_lock); if (known_peers.count(addr)) @@ -1007,14 +1008,16 @@ void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) { conn = it->second; known_peers.insert(std::make_pair(addr, new Peer(conn))); } catch (const PeerNetworkError &) { - this->recoverable_error(std::current_exception()); + this->recoverable_error(std::current_exception(), id); } catch (...) { this->disp_error_cb(std::current_exception()); } }); + return id; } template<typename O, O _, O __> -void PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) { - this->disp_tcall->async_call([this, addr](ThreadCall::Handle &) { +int32_t PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) { + auto id = this->gen_async_id(); + this->disp_tcall->async_call([this, addr, id](ThreadCall::Handle &) { try { pinfo_ulock_t _g(known_peers_lock); pinfo_ulock_t __g(pid2peer_lock); @@ -1038,9 +1041,10 @@ void PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) { pid2peer.erase(it3); } } catch (const PeerNetworkError &) { - this->recoverable_error(std::current_exception()); + this->recoverable_error(std::current_exception(), id); } catch (...) { this->disp_error_cb(std::current_exception()); } }); + return id; } template<typename O, O _, O __> @@ -1064,8 +1068,6 @@ PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &addr) const { auto it2 = pid2peer.find(it->second->peer_id); assert(it2 != pid2peer.end()); conn = it2->second->conn; - } catch (const PeerNetworkError &) { - this->recoverable_error(std::current_exception()); } catch (...) { err = std::current_exception(); } @@ -1094,50 +1096,54 @@ size_t PeerNetwork<O, _, __>::get_npending() const { template<typename O, O _, O __> template<typename MsgType> -inline void PeerNetwork<O, _, __>::send_msg_deferred(MsgType &&msg, const NetAddr &addr) { +inline int32_t PeerNetwork<O, _, __>::send_msg_deferred(MsgType &&msg, const NetAddr &addr) { return _send_msg_deferred(std::move(msg), addr); } template<typename O, O _, O __> -inline void PeerNetwork<O, _, __>::_send_msg_deferred(Msg &&msg, const NetAddr &addr) { +inline int32_t PeerNetwork<O, _, __>::_send_msg_deferred(Msg &&msg, const NetAddr &addr) { + auto id = this->gen_async_id(); this->disp_tcall->async_call( - [this, msg=std::move(msg), addr](ThreadCall::Handle &) { + [this, msg=std::move(msg), addr, id](ThreadCall::Handle &) { try { - _send_msg(msg, addr); - } catch (...) { this->recoverable_error(std::current_exception()); } + if (!_send_msg(msg, addr)) + throw PeerNetworkError(SALTI_ERROR_CONN_NOT_READY); + } catch (...) { this->recoverable_error(std::current_exception(), id); } }); + return id; } template<typename O, O _, O __> template<typename MsgType> -inline void PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const NetAddr &addr) { +inline bool PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const NetAddr &addr) { return _send_msg(msg, addr); } template<typename O, O _, O __> -inline void PeerNetwork<O, _, __>::_send_msg(const Msg &msg, const NetAddr &addr) { +inline bool PeerNetwork<O, _, __>::_send_msg(const Msg &msg, const NetAddr &addr) { pinfo_slock_t _g(known_peers_lock); - pinfo_slock_t __g(pid2peer_lock); - MsgNet::_send_msg(msg, _get_peer_conn(addr)); + return MsgNet::_send_msg(msg, _get_peer_conn(addr)); } template<typename O, O _, O __> template<typename MsgType> -inline void PeerNetwork<O, _, __>::multicast_msg(MsgType &&msg, const std::vector<NetAddr> &addrs) { +inline int32_t PeerNetwork<O, _, __>::multicast_msg(MsgType &&msg, const std::vector<NetAddr> &addrs) { return _multicast_msg(MsgType(std::move(msg)), addrs); } template<typename O, O _, O __> -inline void PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<NetAddr> &addrs) { +inline int32_t PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<NetAddr> &addrs) { + auto id = this->gen_async_id(); this->disp_tcall->async_call( - [this, msg=std::move(msg), addrs](ThreadCall::Handle &) { + [this, msg=std::move(msg), addrs, id](ThreadCall::Handle &) { try { + bool succ = true; for (auto &addr: addrs) - MsgNet::_send_msg(msg, _get_peer_conn(addr)); - } catch (const PeerNetworkError &) { - this->recoverable_error(std::current_exception()); - } catch (...) { this->recoverable_error(std::current_exception()); } + succ &= MsgNet::_send_msg(msg, _get_peer_conn(addr)); + if (!succ) throw PeerNetworkError(SALTI_ERROR_CONN_NOT_READY); + } catch (...) { this->recoverable_error(std::current_exception(), id); } }); + return id; } /* end: functions invoked by the user loop */ @@ -1162,33 +1168,34 @@ void ClientNetwork<OpcodeType>::on_teardown(const ConnPool::conn_t &_conn) { template<typename OpcodeType> template<typename MsgType> -inline void ClientNetwork<OpcodeType>::send_msg_deferred(MsgType &&msg, const NetAddr &addr) { +inline int32_t ClientNetwork<OpcodeType>::send_msg_deferred(MsgType &&msg, const NetAddr &addr) { return _send_msg_deferred(std::move(msg), addr); } template<typename OpcodeType> -inline void ClientNetwork<OpcodeType>::_send_msg_deferred(Msg &&msg, const NetAddr &addr) { +inline int32_t ClientNetwork<OpcodeType>::_send_msg_deferred(Msg &&msg, const NetAddr &addr) { + auto id = this->gen_async_id(); this->disp_tcall->async_call( - [this, msg=std::move(msg), addr](ThreadCall::Handle &) { + [this, msg=std::move(msg), addr, id](ThreadCall::Handle &) { try { _send_msg(msg, addr); - } catch (...) { this->recoverable_error(std::current_exception()); } + } catch (...) { this->recoverable_error(std::current_exception(), id); } }); + return id; } template<typename OpcodeType> template<typename MsgType> -inline void ClientNetwork<OpcodeType>::send_msg(const MsgType &msg, const NetAddr &addr) { +inline bool ClientNetwork<OpcodeType>::send_msg(const MsgType &msg, const NetAddr &addr) { return _send_msg(msg, addr); } template<typename OpcodeType> -inline void ClientNetwork<OpcodeType>::_send_msg(const Msg &msg, const NetAddr &addr) { +inline bool ClientNetwork<OpcodeType>::_send_msg(const Msg &msg, const NetAddr &addr) { auto it = addr2conn.find(addr); - if (it != addr2conn.end()) - MsgNet::_send_msg(msg, it->second); - else + if (it == addr2conn.end()) throw ClientNetworkError(SALTI_ERROR_PEER_NOT_EXIST); + return MsgNet::_send_msg(msg, it->second); } template<typename O, O OPCODE_PING, O _> @@ -1264,9 +1271,10 @@ void msgnetwork_config_tls_cert_by_move(msgnetwork_config_t *self, x509_t *cert) msgnetwork_t *msgnetwork_new(const eventcontext_t *ec, const msgnetwork_config_t *config, SalticidaeCError *err); void msgnetwork_free(const msgnetwork_t *self); -void msgnetwork_send_msg(msgnetwork_t *self, const msg_t *msg, const msgnetwork_conn_t *conn); -void msgnetwork_send_msg_deferred_by_move(msgnetwork_t *self, msg_t *_moved_msg, const msgnetwork_conn_t *conn); -msgnetwork_conn_t *msgnetwork_connect(msgnetwork_t *self, const netaddr_t *addr, bool blocking, SalticidaeCError *err); +bool msgnetwork_send_msg(msgnetwork_t *self, const msg_t *msg, const msgnetwork_conn_t *conn); +int32_t msgnetwork_send_msg_deferred_by_move(msgnetwork_t *self, msg_t *_moved_msg, const msgnetwork_conn_t *conn); +msgnetwork_conn_t *msgnetwork_connect_sync(msgnetwork_t *self, const netaddr_t *addr, SalticidaeCError *err); +int32_t msgnetwork_connect(msgnetwork_t *self, const netaddr_t *addr); msgnetwork_conn_t *msgnetwork_conn_copy(const msgnetwork_conn_t *self); void msgnetwork_conn_free(const msgnetwork_conn_t *self); void msgnetwork_listen(msgnetwork_t *self, const netaddr_t *listen_addr, SalticidaeCError *err); @@ -1281,7 +1289,7 @@ typedef bool (*msgnetwork_conn_callback_t)(const msgnetwork_conn_t *, bool conne void msgnetwork_reg_conn_handler(msgnetwork_t *self, msgnetwork_conn_callback_t cb, void *userdata); -typedef void (*msgnetwork_error_callback_t)(const SalticidaeCError *, bool fatal, void *userdata); +typedef void (*msgnetwork_error_callback_t)(const SalticidaeCError *, bool fatal, int32_t async_id, void *userdata); void msgnetwork_reg_error_handler(msgnetwork_t *self, msgnetwork_error_callback_t cb, void *userdata); msgnetwork_t *msgnetwork_conn_get_net(const msgnetwork_conn_t *conn); @@ -1302,8 +1310,8 @@ msgnetwork_config_t *peernetwork_config_as_msgnetwork_config(peernetwork_config_ peernetwork_t *peernetwork_new(const eventcontext_t *ec, const peernetwork_config_t *config, SalticidaeCError *err); void peernetwork_free(const peernetwork_t *self); -void peernetwork_add_peer(peernetwork_t *self, const netaddr_t *addr); -void peernetwork_del_peer(peernetwork_t *self, const netaddr_t *addr); +int32_t peernetwork_add_peer(peernetwork_t *self, const netaddr_t *addr); +int32_t peernetwork_del_peer(peernetwork_t *self, const netaddr_t *addr); bool peernetwork_has_peer(const peernetwork_t *self, const netaddr_t *addr); const peernetwork_conn_t *peernetwork_get_peer_conn(const peernetwork_t *self, const netaddr_t *addr, SalticidaeCError *cerror); msgnetwork_t *peernetwork_as_msgnetwork(peernetwork_t *self); @@ -1314,8 +1322,8 @@ peernetwork_conn_t *peernetwork_conn_copy(const peernetwork_conn_t *self); netaddr_t *peernetwork_conn_get_peer_addr(const peernetwork_conn_t *self); void peernetwork_conn_free(const peernetwork_conn_t *self); bool peernetwork_send_msg(peernetwork_t *self, const msg_t * msg, const netaddr_t *addr); -void peernetwork_send_msg_deferred_by_move(peernetwork_t *self, msg_t * _moved_msg, const netaddr_t *addr); -void peernetwork_multicast_msg_by_move(peernetwork_t *self, msg_t *_moved_msg, const netaddr_array_t *addrs); +int32_t peernetwork_send_msg_deferred_by_move(peernetwork_t *self, msg_t * _moved_msg, const netaddr_t *addr); +int32_t peernetwork_multicast_msg_by_move(peernetwork_t *self, msg_t *_moved_msg, const netaddr_array_t *addrs); void peernetwork_listen(peernetwork_t *self, const netaddr_t *listen_addr, SalticidaeCError *err); typedef void (*peernetwork_peer_callback_t)(const peernetwork_conn_t *, bool connected, void *userdata); @@ -1335,7 +1343,7 @@ clientnetwork_conn_t *clientnetwork_conn_new_from_msgnetwork_conn_unsafe(const m clientnetwork_conn_t *clientnetwork_conn_copy(const clientnetwork_conn_t *self); void clientnetwork_conn_free(const clientnetwork_conn_t *self); bool clientnetwork_send_msg(clientnetwork_t *self, const msg_t * msg, const netaddr_t *addr); -void clientnetwork_send_msg_deferred_by_move(clientnetwork_t *self, msg_t * _moved_msg, const netaddr_t *addr); +int32_t clientnetwork_send_msg_deferred_by_move(clientnetwork_t *self, msg_t * _moved_msg, const netaddr_t *addr); #ifdef __cplusplus } diff --git a/include/salticidae/util.h b/include/salticidae/util.h index 063058c..017d301 100644 --- a/include/salticidae/util.h +++ b/include/salticidae/util.h @@ -101,6 +101,7 @@ enum SalticidaeErrorCode { SALTI_ERROR_TLS_NO_PEER_CERT, SALTI_ERROR_FD, SALTI_ERROR_RAND_SOURCE, + SALTI_ERROR_CONN_NOT_READY, SALTI_ERROR_UNKNOWN }; diff --git a/src/conn.cpp b/src/conn.cpp index abea7ac..13486ee 100644 --- a/src/conn.cpp +++ b/src/conn.cpp @@ -303,7 +303,7 @@ void ConnPool::accept_client(int fd, int) { conn->worker = &worker; worker.feed(conn, client_fd); } - } catch (...) { recoverable_error(std::current_exception()); } + } catch (...) { recoverable_error(std::current_exception(), -1); } } void ConnPool::conn_server(const conn_t &conn, int fd, int events) { @@ -324,7 +324,7 @@ void ConnPool::conn_server(const conn_t &conn, int fd, int events) { } } catch (...) { disp_terminate(conn); - recoverable_error(std::current_exception()); + recoverable_error(std::current_exception(), -1); } } diff --git a/src/network.cpp b/src/network.cpp index 05e7b50..2e98439 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -67,22 +67,26 @@ void msgnetwork_config_tls_cert_by_move(msgnetwork_config_t *self, x509_t *cert) self->tls_cert(new x509_t(std::move(*cert))); } -void msgnetwork_send_msg(msgnetwork_t *self, const msg_t *msg, const msgnetwork_conn_t *conn) { - self->_send_msg(*msg, *conn); +bool msgnetwork_send_msg(msgnetwork_t *self, const msg_t *msg, const msgnetwork_conn_t *conn) { + return self->_send_msg(*msg, *conn); } -void msgnetwork_send_msg_deferred_by_move(msgnetwork_t *self, +int32_t msgnetwork_send_msg_deferred_by_move(msgnetwork_t *self, msg_t *_moved_msg, const msgnetwork_conn_t *conn) { - self->_send_msg_deferred(std::move(*_moved_msg), *conn); + return self->_send_msg_deferred(std::move(*_moved_msg), *conn); } -msgnetwork_conn_t *msgnetwork_connect(msgnetwork_t *self, const netaddr_t *addr, bool blocking, SalticidaeCError *cerror) { +msgnetwork_conn_t *msgnetwork_connect_sync(msgnetwork_t *self, const netaddr_t *addr, SalticidaeCError *cerror) { SALTICIDAE_CERROR_TRY(cerror) - return new msgnetwork_conn_t(self->connect(*addr, blocking)); + return new msgnetwork_conn_t(self->connect_sync(*addr)); SALTICIDAE_CERROR_CATCH(cerror) return nullptr; } +int32_t msgnetwork_connect(msgnetwork_t *self, const netaddr_t *addr) { + return self->connect(*addr); +} + msgnetwork_conn_t *msgnetwork_conn_copy(const msgnetwork_conn_t *self) { return new msgnetwork_conn_t(*self); } @@ -126,7 +130,7 @@ void msgnetwork_reg_conn_handler(msgnetwork_t *self, void msgnetwork_reg_error_handler(msgnetwork_t *self, msgnetwork_error_callback_t cb, void *userdata) { - self->reg_error_handler([=](const std::exception_ptr _err, bool fatal) { + self->reg_error_handler([=](const std::exception_ptr _err, bool fatal, int32_t async_id) { SalticidaeCError cerror; try { std::rethrow_exception(_err); @@ -135,7 +139,7 @@ void msgnetwork_reg_error_handler(msgnetwork_t *self, } catch (...) { cerror = salticidae_cerror_unknown(); } - cb(&cerror, fatal, userdata); + cb(&cerror, fatal, async_id, userdata); }); } @@ -194,12 +198,12 @@ peernetwork_t *peernetwork_new(const eventcontext_t *ec, const peernetwork_confi void peernetwork_free(const peernetwork_t *self) { delete self; } -void peernetwork_add_peer(peernetwork_t *self, const netaddr_t *addr) { - self->add_peer(*addr); +int32_t peernetwork_add_peer(peernetwork_t *self, const netaddr_t *addr) { + return self->add_peer(*addr); } -void peernetwork_del_peer(peernetwork_t *self, const netaddr_t *addr) { - self->del_peer(*addr); +int32_t peernetwork_del_peer(peernetwork_t *self, const netaddr_t *addr) { + return self->del_peer(*addr); } bool peernetwork_has_peer(const peernetwork_t *self, const netaddr_t *addr) { @@ -248,14 +252,14 @@ bool peernetwork_send_msg(peernetwork_t *self, const msg_t * msg, const netaddr_ } } -void peernetwork_send_msg_deferred_by_move(peernetwork_t *self, +int32_t peernetwork_send_msg_deferred_by_move(peernetwork_t *self, msg_t *_moved_msg, const netaddr_t *addr) { - self->_send_msg_deferred(std::move(*_moved_msg), *addr); + return self->_send_msg_deferred(std::move(*_moved_msg), *addr); } -void peernetwork_multicast_msg_by_move(peernetwork_t *self, +int32_t peernetwork_multicast_msg_by_move(peernetwork_t *self, msg_t *_moved_msg, const netaddr_array_t *addrs) { - self->_multicast_msg(std::move(*_moved_msg), *addrs); + return self->_multicast_msg(std::move(*_moved_msg), *addrs); } void peernetwork_listen(peernetwork_t *self, const netaddr_t *listen_addr, SalticidaeCError *cerror) { @@ -318,8 +322,8 @@ bool clientnetwork_send_msg(clientnetwork_t *self, const msg_t * msg, const neta } } -void clientnetwork_send_msg_deferred_by_move(clientnetwork_t *self, msg_t *_moved_msg, const netaddr_t *addr) { - self->_send_msg_deferred(std::move(*_moved_msg), *addr); +int32_t clientnetwork_send_msg_deferred_by_move(clientnetwork_t *self, msg_t *_moved_msg, const netaddr_t *addr) { + return self->_send_msg_deferred(std::move(*_moved_msg), *addr); } } diff --git a/src/util.cpp b/src/util.cpp index ff41c9b..0fb990e 100644 --- a/src/util.cpp +++ b/src/util.cpp @@ -58,6 +58,7 @@ const char *SALTICIDAE_ERROR_STRINGS[] = { "tls fail to get peer cert", "fd error", "rand source is not available, try again", + "connection is not ready", "unknown error" }; diff --git a/test/bench_network.cpp b/test/bench_network.cpp index 5575a66..1021ec4 100644 --- a/test/bench_network.cpp +++ b/test/bench_network.cpp @@ -117,7 +117,7 @@ struct MyNet: public MsgNetworkByteOp { { printf("[%s] disconnected, retrying.\n", this->name.c_str()); /* try to reconnect to the same address */ - connect(conn->get_addr(), false); + connect(conn->get_addr()); } return true; }); diff --git a/test/bench_network_tls.cpp b/test/bench_network_tls.cpp index b5280b4..7c682ba 100644 --- a/test/bench_network_tls.cpp +++ b/test/bench_network_tls.cpp @@ -119,7 +119,7 @@ struct MyNet: public MsgNetworkByteOp { { printf("[%s] disconnected, retrying.\n", this->name.c_str()); /* try to reconnect to the same address */ - connect(conn->get_addr(), false); + connect(conn->get_addr()); } return true; }); diff --git a/test/test_msgnet.cpp b/test/test_msgnet.cpp index 50e35dc..9b52f85 100644 --- a/test/test_msgnet.cpp +++ b/test/test_msgnet.cpp @@ -108,7 +108,7 @@ struct MyNet: public MsgNetworkByteOp { { printf("[%s] disconnected, retrying.\n", this->name.c_str()); /* try to reconnect to the same address */ - connect(conn->get_addr(), false); + connect(conn->get_addr()); } return true; }); @@ -148,8 +148,8 @@ int main() { bob.listen(bob_addr); /* try to connect once */ - alice.connect(bob_addr, false); - bob.connect(alice_addr, false); + alice.connect(bob_addr); + bob.connect(alice_addr); /* the main loop can be shutdown by ctrl-c or kill */ auto shutdown = [&](int) {ec.stop();}; diff --git a/test/test_msgnet_c.c b/test/test_msgnet_c.c index 1417d2b..2ee9443 100644 --- a/test/test_msgnet_c.c +++ b/test/test_msgnet_c.c @@ -139,7 +139,7 @@ bool conn_handler(const msgnetwork_conn_t *conn, bool connected, void *userdata) printf("[%s] Disconnected, retrying.\n", name); /* try to reconnect to the same address */ const netaddr_t *addr = msgnetwork_conn_get_addr(conn); - msgnetwork_connect(net, addr, false, &err); check_err(&err); + msgnetwork_connect(net, addr); } return true; } @@ -194,8 +194,8 @@ int main() { msgnetwork_listen(bob.net, bob_addr, &err); check_err(&err); /* try to connect once */ - msgnetwork_conn_free(msgnetwork_connect(alice.net, bob_addr, false, &err)); check_err(&err); - msgnetwork_conn_free(msgnetwork_connect(bob.net, alice_addr, false, &err)); check_err(&err); + msgnetwork_connect(alice.net, bob_addr); + msgnetwork_connect(bob.net, alice_addr); netaddr_free(alice_addr); netaddr_free(bob_addr); diff --git a/test/test_msgnet_tls.cpp b/test/test_msgnet_tls.cpp index a779ba5..07fe6ac 100644 --- a/test/test_msgnet_tls.cpp +++ b/test/test_msgnet_tls.cpp @@ -121,7 +121,7 @@ struct MyNet: public MsgNetworkByteOp { { printf("[%s] disconnected, retrying.\n", this->name.c_str()); /* try to reconnect to the same address */ - connect(conn->get_addr(), false); + connect(conn->get_addr()); } return res; }); @@ -161,8 +161,8 @@ int main() { bob.listen(bob_addr); /* try to connect once */ - alice.connect(bob_addr, false); - bob.connect(alice_addr, false); + alice.connect(bob_addr); + bob.connect(alice_addr); /* the main loop can be shutdown by ctrl-c or kill */ auto shutdown = [&](int) {ec.stop();}; diff --git a/test/test_p2p.cpp b/test/test_p2p.cpp index 3a30ed2..d097562 100644 --- a/test/test_p2p.cpp +++ b/test/test_p2p.cpp @@ -80,12 +80,12 @@ struct Net { net->reg_handler([this](const MsgText &msg, const PeerNetwork::conn_t &) { fprintf(stdout, "net %lu: peer %lu says %s\n", this->id, msg.id, msg.text.c_str()); }); - net->reg_error_handler([this](const std::exception_ptr _err, bool fatal) { + net->reg_error_handler([this](const std::exception_ptr _err, bool fatal, int32_t async_id) { try { std::rethrow_exception(_err); } catch (const std::exception &err) { - fprintf(stdout, "net %lu: captured %s error during an async call: %s\n", - this->id, fatal ? "fatal" : "recoverable", err.what()); + fprintf(stdout, "net %lu: captured %s error during an async call %d: %s\n", + this->id, fatal ? "fatal" : "recoverable", async_id, err.what()); } }); net->reg_peer_handler([this](const PeerNetwork::conn_t &conn, bool connected) { diff --git a/test/test_p2p_stress.cpp b/test/test_p2p_stress.cpp index fc9a430..9fe1b83 100644 --- a/test/test_p2p_stress.cpp +++ b/test/test_p2p_stress.cpp @@ -128,12 +128,12 @@ void install_proto(AppContext &app, const size_t &seg_buff_size) { send_rand(tc.state, conn, tc); } }); - net.reg_error_handler([ec](const std::exception_ptr _err, bool fatal) { + net.reg_error_handler([ec](const std::exception_ptr _err, bool fatal, int32_t async_id) { try { std::rethrow_exception(_err); } catch (const std::exception & err) { - SALTICIDAE_LOG_WARN("captured %s error: %s", - fatal ? "fatal" : "recoverable", err.what()); + SALTICIDAE_LOG_WARN("captured %s error during async call %d: %s", + fatal ? "fatal" : "recoverable", async_id, err.what()); } }); net.reg_handler([&](MsgRand &&msg, const MyNet::conn_t &conn) { diff --git a/test/test_p2p_tls.cpp b/test/test_p2p_tls.cpp index f814643..698bbac 100644 --- a/test/test_p2p_tls.cpp +++ b/test/test_p2p_tls.cpp @@ -105,12 +105,12 @@ struct Net { } return true; }); - net->reg_error_handler([this](const std::exception_ptr _err, bool fatal) { + net->reg_error_handler([this](const std::exception_ptr _err, bool fatal, int32_t async_id) { try { std::rethrow_exception(_err); } catch (const std::exception &err) { - fprintf(stdout, "net %lu: captured %s error during an async call: %s\n", - this->id, fatal ? "fatal" : "recoverable", err.what()); + fprintf(stdout, "net %lu: captured %s error during an async call %d: %s\n", + this->id, fatal ? "fatal" : "recoverable", async_id, err.what()); } }); net->reg_peer_handler([this](const PeerNetwork::conn_t &conn, bool connected) { |