From d0cb90e3fdd1bcb943ecadff542edd4260bbae1c Mon Sep 17 00:00:00 2001 From: Determinant Date: Mon, 24 Jun 2019 16:55:52 -0400 Subject: clean up code --- include/salticidae/conn.h | 99 +++++++++++++++++++++-------------------------- 1 file changed, 45 insertions(+), 54 deletions(-) (limited to 'include/salticidae/conn.h') diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index 9e2408f..ff75e34 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -103,6 +103,10 @@ class ConnPool { static socket_io_func _send_data_tls_handshake; static socket_io_func _recv_data_dummy; + /** Close the IO and clear all on-going or planned events. Remove the + * connection from a Worker. */ + virtual void stop(); + public: Conn(): terminated(false), worker(nullptr), ready_send(false), send_data_func(nullptr), recv_data_func(nullptr), @@ -114,7 +118,7 @@ class ConnPool { SALTICIDAE_LOG_INFO("destroyed %s", std::string(*this).c_str()); } - bool is_terminated() { + bool is_terminated() const { return terminated.load(std::memory_order_acquire); } @@ -134,35 +138,40 @@ class ConnPool { bool write(bytearray_t &&data) { return send_buffer.push(std::move(data), !cpool->queue_capacity); } - - protected: - /** Close the IO and clear all on-going or planned events. Remove the - * connection from a Worker. */ - virtual void stop(); }; protected: EventContext ec; EventContext disp_ec; ThreadCall* disp_tcall; - /** Should be implemented by derived class to return a new Conn object. */ - virtual Conn *create_conn() = 0; + /* owned by user loop */ + BoxObj user_tcall; + using worker_error_callback_t = std::function; worker_error_callback_t disp_error_cb; worker_error_callback_t worker_error_cb; + conn_t _connect(const NetAddr &addr); + void _listen(NetAddr listen_addr); + void recoverable_error(const std::exception_ptr err) const { + user_tcall->async_call([this, err](ThreadCall::Handle &) { + if (error_cb) error_cb(err, false); + }); + } + /** Terminate the connection (from the worker thread). */ void worker_terminate(const conn_t &conn); /** Terminate the connection (from the dispatcher thread). */ void disp_terminate(const conn_t &conn); - void conn_server(const conn_t &conn, int, int); - /** Called when new data is available. */ - virtual void on_read(const conn_t &) {} - /** Called when the underlying connection is established. */ - virtual void on_setup(const conn_t &) {} - /** Called when the underlying connection breaks. */ - virtual void on_teardown(const conn_t &) {} + /** Should be implemented by derived class to return a new Conn object. */ + virtual Conn *create_conn() = 0; + /** Called when new data is available. */ + virtual void on_read(const conn_t &) {} + /** Called when the underlying connection is established. */ + virtual void on_setup(const conn_t &) {} + /** Called when the underlying connection breaks. */ + virtual void on_teardown(const conn_t &) {} private: const int max_listen_backlog; @@ -172,11 +181,6 @@ class ConnPool { const bool enable_tls; tls_context_t tls_ctx; - /* owned by user loop */ - protected: - BoxObj user_tcall; - - private: conn_callback_t conn_cb; error_callback_t error_cb; @@ -191,10 +195,8 @@ class ConnPool { if (enable_tls && connected) { conn->worker->get_tcall()->async_call([this, conn, ret](ThreadCall::Handle &) { - if (ret) - conn->recv_data_func = Conn::_recv_data_tls; - else - worker_terminate(conn); + if (ret) conn->recv_data_func = Conn::_recv_data_tls; + else worker_terminate(conn); }); } }); @@ -225,17 +227,27 @@ class ConnPool { handle = std::thread([this]() { ec.dispatch(); }); } + void enable_send_buffer(const conn_t &conn, int client_fd) { + conn->get_send_buffer() + .get_queue() + .reg_handler(this->ec, [conn, client_fd] + (MPSCWriteBuffer::queue_t &) { + if (conn->ready_send) + { + conn->ev_socket.del(); + conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE); + conn->send_data_func(conn, client_fd, FdEvent::WRITE); + } + return false; + }); + } + void feed(const conn_t &conn, int client_fd) { /* the caller should finalize all the preparation */ tcall.async_call([this, conn, client_fd](ThreadCall::Handle &) { try { conn->ev_connect.clear(); - if (conn->mode == Conn::ConnMode::DEAD) - { - SALTICIDAE_LOG_INFO("worker %x discarding dead connection", - std::this_thread::get_id()); - return; - } + assert(conn->mode != Conn::ConnMode::DEAD); auto cpool = conn->cpool; if (cpool->enable_tls) { @@ -249,25 +261,14 @@ class ConnPool { { conn->send_data_func = Conn::_send_data; conn->recv_data_func = Conn::_recv_data; + enable_send_buffer(conn, client_fd); cpool->update_conn(conn, true); } assert(conn->fd != -1); + assert(conn->worker == this); SALTICIDAE_LOG_INFO("worker %x got %s", std::this_thread::get_id(), std::string(*conn).c_str()); - assert(conn->worker == this); - conn->get_send_buffer() - .get_queue() - .reg_handler(this->ec, [conn, client_fd] - (MPSCWriteBuffer::queue_t &) { - if (conn->ready_send) - { - conn->ev_socket.del(); - conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE); - conn->send_data_func(conn, client_fd, FdEvent::WRITE); - } - return false; - }); conn->ev_socket = FdEvent(ec, client_fd, [this, conn](int fd, int what) { try { if (what & FdEvent::READ) @@ -306,21 +307,11 @@ class ConnPool { int system_state; void accept_client(int, int); + void conn_server(const conn_t &conn, int, int); conn_t add_conn(const conn_t &conn); void del_conn(const conn_t &conn); void release_conn(const conn_t &conn); - protected: - conn_t _connect(const NetAddr &addr); - void _listen(NetAddr listen_addr); - void recoverable_error(const std::exception_ptr err) const { - user_tcall->async_call([this, err](ThreadCall::Handle &) { - if (error_cb) error_cb(err, false); - }); - } - - private: - Worker &select_worker() { size_t idx = 0; size_t best = workers[idx].get_nconn(); -- cgit v1.2.3