From d0cb90e3fdd1bcb943ecadff542edd4260bbae1c Mon Sep 17 00:00:00 2001 From: Determinant Date: Mon, 24 Jun 2019 16:55:52 -0400 Subject: clean up code --- include/salticidae/conn.h | 99 ++++++++++++++++++++------------------------ include/salticidae/network.h | 97 +++++++++++++++++++++++++++++-------------- include/salticidae/util.h | 4 ++ 3 files changed, 116 insertions(+), 84 deletions(-) (limited to 'include') diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index 9e2408f..ff75e34 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -103,6 +103,10 @@ class ConnPool { static socket_io_func _send_data_tls_handshake; static socket_io_func _recv_data_dummy; + /** Close the IO and clear all on-going or planned events. Remove the + * connection from a Worker. */ + virtual void stop(); + public: Conn(): terminated(false), worker(nullptr), ready_send(false), send_data_func(nullptr), recv_data_func(nullptr), @@ -114,7 +118,7 @@ class ConnPool { SALTICIDAE_LOG_INFO("destroyed %s", std::string(*this).c_str()); } - bool is_terminated() { + bool is_terminated() const { return terminated.load(std::memory_order_acquire); } @@ -134,35 +138,40 @@ class ConnPool { bool write(bytearray_t &&data) { return send_buffer.push(std::move(data), !cpool->queue_capacity); } - - protected: - /** Close the IO and clear all on-going or planned events. Remove the - * connection from a Worker. */ - virtual void stop(); }; protected: EventContext ec; EventContext disp_ec; ThreadCall* disp_tcall; - /** Should be implemented by derived class to return a new Conn object. */ - virtual Conn *create_conn() = 0; + /* owned by user loop */ + BoxObj user_tcall; + using worker_error_callback_t = std::function; worker_error_callback_t disp_error_cb; worker_error_callback_t worker_error_cb; + conn_t _connect(const NetAddr &addr); + void _listen(NetAddr listen_addr); + void recoverable_error(const std::exception_ptr err) const { + user_tcall->async_call([this, err](ThreadCall::Handle &) { + if (error_cb) error_cb(err, false); + }); + } + /** Terminate the connection (from the worker thread). */ void worker_terminate(const conn_t &conn); /** Terminate the connection (from the dispatcher thread). */ void disp_terminate(const conn_t &conn); - void conn_server(const conn_t &conn, int, int); - /** Called when new data is available. */ - virtual void on_read(const conn_t &) {} - /** Called when the underlying connection is established. */ - virtual void on_setup(const conn_t &) {} - /** Called when the underlying connection breaks. */ - virtual void on_teardown(const conn_t &) {} + /** Should be implemented by derived class to return a new Conn object. */ + virtual Conn *create_conn() = 0; + /** Called when new data is available. */ + virtual void on_read(const conn_t &) {} + /** Called when the underlying connection is established. */ + virtual void on_setup(const conn_t &) {} + /** Called when the underlying connection breaks. */ + virtual void on_teardown(const conn_t &) {} private: const int max_listen_backlog; @@ -172,11 +181,6 @@ class ConnPool { const bool enable_tls; tls_context_t tls_ctx; - /* owned by user loop */ - protected: - BoxObj user_tcall; - - private: conn_callback_t conn_cb; error_callback_t error_cb; @@ -191,10 +195,8 @@ class ConnPool { if (enable_tls && connected) { conn->worker->get_tcall()->async_call([this, conn, ret](ThreadCall::Handle &) { - if (ret) - conn->recv_data_func = Conn::_recv_data_tls; - else - worker_terminate(conn); + if (ret) conn->recv_data_func = Conn::_recv_data_tls; + else worker_terminate(conn); }); } }); @@ -225,17 +227,27 @@ class ConnPool { handle = std::thread([this]() { ec.dispatch(); }); } + void enable_send_buffer(const conn_t &conn, int client_fd) { + conn->get_send_buffer() + .get_queue() + .reg_handler(this->ec, [conn, client_fd] + (MPSCWriteBuffer::queue_t &) { + if (conn->ready_send) + { + conn->ev_socket.del(); + conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE); + conn->send_data_func(conn, client_fd, FdEvent::WRITE); + } + return false; + }); + } + void feed(const conn_t &conn, int client_fd) { /* the caller should finalize all the preparation */ tcall.async_call([this, conn, client_fd](ThreadCall::Handle &) { try { conn->ev_connect.clear(); - if (conn->mode == Conn::ConnMode::DEAD) - { - SALTICIDAE_LOG_INFO("worker %x discarding dead connection", - std::this_thread::get_id()); - return; - } + assert(conn->mode != Conn::ConnMode::DEAD); auto cpool = conn->cpool; if (cpool->enable_tls) { @@ -249,25 +261,14 @@ class ConnPool { { conn->send_data_func = Conn::_send_data; conn->recv_data_func = Conn::_recv_data; + enable_send_buffer(conn, client_fd); cpool->update_conn(conn, true); } assert(conn->fd != -1); + assert(conn->worker == this); SALTICIDAE_LOG_INFO("worker %x got %s", std::this_thread::get_id(), std::string(*conn).c_str()); - assert(conn->worker == this); - conn->get_send_buffer() - .get_queue() - .reg_handler(this->ec, [conn, client_fd] - (MPSCWriteBuffer::queue_t &) { - if (conn->ready_send) - { - conn->ev_socket.del(); - conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE); - conn->send_data_func(conn, client_fd, FdEvent::WRITE); - } - return false; - }); conn->ev_socket = FdEvent(ec, client_fd, [this, conn](int fd, int what) { try { if (what & FdEvent::READ) @@ -306,21 +307,11 @@ class ConnPool { int system_state; void accept_client(int, int); + void conn_server(const conn_t &conn, int, int); conn_t add_conn(const conn_t &conn); void del_conn(const conn_t &conn); void release_conn(const conn_t &conn); - protected: - conn_t _connect(const NetAddr &addr); - void _listen(NetAddr listen_addr); - void recoverable_error(const std::exception_ptr err) const { - user_tcall->async_call([this, err](ThreadCall::Handle &) { - if (error_cb) error_cb(err, false); - }); - } - - private: - Worker &select_worker() { size_t idx = 0; size_t best = workers[idx].get_nconn(); diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 20dc696..05f4b7b 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -226,23 +226,19 @@ class ClientNetwork: public MsgNetwork { public: class Conn: public MsgNet::Conn { friend ClientNetwork; - public: Conn() = default; - ClientNetwork *get_net() { return static_cast(ConnPool::Conn::get_pool()); } - - protected: - void on_setup() override; - void on_teardown() override; }; using conn_t = ArcObj; protected: ConnPool::Conn *create_conn() override { return new Conn(); } + void on_setup(const ConnPool::conn_t &) override; + void on_teardown(const ConnPool::conn_t &) override; public: using Config = typename MsgNet::Config; @@ -251,7 +247,11 @@ class ClientNetwork: public MsgNetwork { using MsgNet::send_msg; template - void send_msg(MsgType &&msg, const NetAddr &addr); + inline void send_msg(const MsgType &msg, const NetAddr &addr); + inline void _send_msg(const Msg &msg, const NetAddr &addr); + template + inline void send_msg_deferred(MsgType &&msg, const NetAddr &addr); + inline void _send_msg_deferred(Msg &&msg, const NetAddr &addr); }; /** Peer-to-peer network where any two nodes could hold a bi-diretional message @@ -503,7 +503,7 @@ void MsgNetwork::on_read(const ConnPool::conn_t &_conn) { template template inline void MsgNetwork::send_msg_deferred(MsgType &&msg, const conn_t &conn) { - return _send_msg_deferred(MsgType(std::move(msg)), conn); + return _send_msg_deferred(std::move(msg), conn); } template @@ -511,7 +511,7 @@ inline void MsgNetwork::_send_msg_deferred(Msg &&msg, const conn_t & this->disp_tcall->async_call( [this, msg=std::move(msg), conn](ThreadCall::Handle &) { try { - this->send_msg(msg, conn); + this->_send_msg(msg, conn); } catch (...) { this->recoverable_error(std::current_exception()); } }); } @@ -841,7 +841,7 @@ bool PeerNetwork::has_peer(const NetAddr &paddr) const { template template inline void PeerNetwork::send_msg_deferred(MsgType &&msg, const NetAddr &paddr) { - return _send_msg_deferred(MsgType(std::move(msg)), paddr); + return _send_msg_deferred(std::move(msg), paddr); } template @@ -849,7 +849,7 @@ inline void PeerNetwork::_send_msg_deferred(Msg &&msg, const NetAddr & this->disp_tcall->async_call( [this, msg=std::move(msg), paddr](ThreadCall::Handle &) { try { - send_msg(msg, paddr); + _send_msg(msg, paddr); } catch (...) { this->recoverable_error(std::current_exception()); } }); } @@ -864,7 +864,7 @@ template inline void PeerNetwork::_send_msg(const Msg &msg, const NetAddr &paddr) { auto p = get_peer(paddr); if (!p) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); - this->send_msg(msg, p->conn); + MsgNet::_send_msg(msg, p->conn); } template @@ -883,7 +883,7 @@ inline void PeerNetwork::_multicast_msg(Msg &&msg, const std::vectorsend_msg(msg, p->conn); + MsgNet::_send_msg(msg, p->conn); } } catch (const PeerNetworkError &) { this->recoverable_error(std::current_exception()); @@ -894,35 +894,54 @@ inline void PeerNetwork::_multicast_msg(Msg &&msg, const std::vector -void ClientNetwork::Conn::on_setup() { - MsgNet::Conn::on_setup(); - assert(this->get_mode() == Conn::PASSIVE); - const auto &addr = this->get_addr(); - auto cn = get_net(); +void ClientNetwork::on_setup(const ConnPool::conn_t &_conn) { + MsgNet::on_setup(_conn); + auto conn = static_pointer_cast(_conn); + assert(conn->get_mode() == Conn::PASSIVE); + const auto &addr = conn->get_addr(); + auto cn = conn->get_net(); cn->addr2conn.erase(addr); - cn->addr2conn.insert( - std::make_pair(addr, static_pointer_cast(this->self()))); + cn->addr2conn.insert(std::make_pair(addr, conn)); } template -void ClientNetwork::Conn::on_teardown() { - MsgNet::Conn::on_teardown(); - get_net()->addr2conn.erase(this->get_addr()); +void ClientNetwork::on_teardown(const ConnPool::conn_t &_conn) { + MsgNet::on_teardown(_conn); + auto conn = static_pointer_cast(_conn); + conn->get_net()->addr2conn.erase(conn->get_addr()); } template template -void ClientNetwork::send_msg(MsgType &&msg, const NetAddr &addr) { +inline void ClientNetwork::send_msg_deferred(MsgType &&msg, const NetAddr &addr) { + return _send_msg_deferred(std::move(msg), addr); +} + +template +inline void ClientNetwork::_send_msg_deferred(Msg &&msg, const NetAddr &addr) { this->disp_tcall->async_call( - [this, addr, msg=std::forward(msg)](ThreadCall::Handle &) { + [this, msg=std::move(msg), addr](ThreadCall::Handle &) { try { - auto it = addr2conn.find(addr); - if (it != addr2conn.end()) - send_msg(std::move(msg), it->second); - } catch (...) { this->disp_error_cb(std::current_exception()); } + _send_msg(msg, addr); + } catch (...) { this->recoverable_error(std::current_exception()); } }); } +template +template +inline void ClientNetwork::send_msg(const MsgType &msg, const NetAddr &addr) { + return _send_msg(msg, addr); +} + +template +inline void ClientNetwork::_send_msg(const Msg &msg, const NetAddr &addr) { + auto it = addr2conn.find(addr); + if (it != addr2conn.end()) + MsgNet::_send_msg(msg, it->second); + else + throw ClientNetworkError(SALTI_ERROR_PEER_NOT_EXIST); +} + template const O PeerNetwork::MsgPing::opcode = OPCODE_PING; @@ -939,6 +958,9 @@ using msgnetwork_conn_t = msgnetwork_t::conn_t; using peernetwork_t = salticidae::PeerNetwork<_opcode_t>; using peernetwork_config_t = peernetwork_t::Config; using peernetwork_conn_t = peernetwork_t::conn_t; + +using clientnetwork_t = salticidae::ClientNetwork<_opcode_t>; +using clientnetwork_conn_t = clientnetwork_t::conn_t; #endif #else @@ -951,6 +973,9 @@ typedef struct msgnetwork_conn_t msgnetwork_conn_t; typedef struct peernetwork_t peernetwork_t; typedef struct peernetwork_config_t peernetwork_config_t; typedef struct peernetwork_conn_t peernetwork_conn_t; + +typedef struct clientnetwork_t clientnetwork_t; +typedef struct clientnetwork_conn_t clientnetwork_conn_t; #endif #endif @@ -1042,11 +1067,23 @@ void peernetwork_send_msg(peernetwork_t *self, const msg_t * msg, const netaddr_ void peernetwork_send_msg_deferred_by_move(peernetwork_t *self, msg_t * _moved_msg, const netaddr_t *paddr); void peernetwork_multicast_msg_by_move(peernetwork_t *self, msg_t *_moved_msg, const netaddr_array_t *paddrs); void peernetwork_listen(peernetwork_t *self, const netaddr_t *listen_addr, SalticidaeCError *err); -void peernetwork_stop(peernetwork_t *self); typedef void (*msgnetwork_unknown_peer_callback_t)(const netaddr_t *, void *userdata); void peernetwork_reg_unknown_peer_handler(peernetwork_t *self, msgnetwork_unknown_peer_callback_t cb, void *userdata); +/* ClientNetwork */ + +clientnetwork_t *clientnetwork_new(const eventcontext_t *ec, const msgnetwork_config_t *config, SalticidaeCError *err); +void clientnetwork_free(const clientnetwork_t *self); +msgnetwork_t *clientnetwork_as_msgnetwork(clientnetwork_t *self); +clientnetwork_t *msgnetwork_as_clientnetwork_unsafe(msgnetwork_t *self); +msgnetwork_conn_t *msgnetwork_conn_new_from_clientnetwork_conn(const clientnetwork_conn_t *conn); +clientnetwork_conn_t *clientnetwork_conn_new_from_msgnetwork_conn_unsafe(const msgnetwork_conn_t *conn); +clientnetwork_conn_t *clientnetwork_conn_copy(const clientnetwork_conn_t *self); +void clientnetwork_conn_free(const clientnetwork_conn_t *self); +void clientnetwork_send_msg(clientnetwork_t *self, const msg_t * msg, const netaddr_t *addr); +void clientnetwork_send_msg_deferred_by_move(clientnetwork_t *self, msg_t * _moved_msg, const netaddr_t *addr); + #ifdef __cplusplus } #endif diff --git a/include/salticidae/util.h b/include/salticidae/util.h index 952b18b..27f4e8e 100644 --- a/include/salticidae/util.h +++ b/include/salticidae/util.h @@ -145,6 +145,10 @@ class PeerNetworkError: public ConnPoolError { using ConnPoolError::ConnPoolError; }; +class ClientNetworkError: public ConnPoolError { + using ConnPoolError::ConnPoolError; +}; + extern const char *TTY_COLOR_RED; extern const char *TTY_COLOR_GREEN; extern const char *TTY_COLOR_YELLOW; -- cgit v1.2.3-70-g09d2