From 13e2643356bfcdeffd9f6e854f07ee68f29dc23b Mon Sep 17 00:00:00 2001 From: Determinant Date: Sun, 23 Jun 2019 02:00:19 -0400 Subject: get rid of `self()` and `release_self()`; fix hidden bugs --- include/salticidae/conn.h | 63 +++++++++++++++++++++++------------------------ 1 file changed, 31 insertions(+), 32 deletions(-) (limited to 'include/salticidae/conn.h') diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index ceec176..9e2408f 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -59,6 +59,7 @@ class ConnPool { using conn_t = ArcObj; /** The type of callback invoked when connection status is changed. */ using conn_callback_t = std::function; + /** The type of callback invoked when an error occured (during async execution). */ using error_callback_t = std::function; /** Abstraction for a bi-directional connection. */ class Conn { @@ -71,9 +72,8 @@ class ConnPool { }; protected: + std::atomic terminated; size_t seg_buff_size; - conn_t self_ref; - std::mutex ref_mlock; int fd; Worker *worker; ConnPool *cpool; @@ -85,7 +85,6 @@ class ConnPool { TimedFdEvent ev_connect; FdEvent ev_socket; - TimerEvent ev_send_wait; /** does not need to wait if true */ bool ready_send; @@ -104,15 +103,8 @@ class ConnPool { static socket_io_func _send_data_tls_handshake; static socket_io_func _recv_data_dummy; - void conn_server(int, int); - - /** Terminate the connection (from the worker thread). */ - void worker_terminate(); - /** Terminate the connection (from the dispatcher thread). */ - void disp_terminate(); - public: - Conn(): worker(nullptr), ready_send(false), + Conn(): terminated(false), worker(nullptr), ready_send(false), send_data_func(nullptr), recv_data_func(nullptr), tls(nullptr), peer_cert(nullptr) {} Conn(const Conn &) = delete; @@ -122,15 +114,12 @@ class ConnPool { SALTICIDAE_LOG_INFO("destroyed %s", std::string(*this).c_str()); } - /** Get the handle to itself. */ - conn_t self() { - mutex_lg_t _(ref_mlock); - return self_ref; + bool is_terminated() { + return terminated.load(std::memory_order_acquire); } - void release_self() { - mutex_lg_t _(ref_mlock); - self_ref = nullptr; + bool set_terminated() { + return !terminated.exchange(true, std::memory_order_acq_rel); } operator std::string() const; @@ -150,12 +139,6 @@ class ConnPool { /** Close the IO and clear all on-going or planned events. Remove the * connection from a Worker. */ virtual void stop(); - /** Called when new data is available. */ - virtual void on_read() {} - /** Called when the underlying connection is established. */ - virtual void on_setup() {} - /** Called when the underlying connection breaks. */ - virtual void on_teardown() {} }; protected: @@ -168,6 +151,18 @@ class ConnPool { worker_error_callback_t disp_error_cb; worker_error_callback_t worker_error_cb; + /** Terminate the connection (from the worker thread). */ + void worker_terminate(const conn_t &conn); + /** Terminate the connection (from the dispatcher thread). */ + void disp_terminate(const conn_t &conn); + + void conn_server(const conn_t &conn, int, int); + /** Called when new data is available. */ + virtual void on_read(const conn_t &) {} + /** Called when the underlying connection is established. */ + virtual void on_setup(const conn_t &) {} + /** Called when the underlying connection breaks. */ + virtual void on_teardown(const conn_t &) {} private: const int max_listen_backlog; @@ -195,11 +190,11 @@ class ConnPool { bool ret = !conn_cb || conn_cb(conn, connected); if (enable_tls && connected) { - conn->worker->get_tcall()->async_call([conn, ret](ThreadCall::Handle &) { + conn->worker->get_tcall()->async_call([this, conn, ret](ThreadCall::Handle &) { if (ret) conn->recv_data_func = Conn::_recv_data_tls; else - conn->worker_terminate(); + worker_terminate(conn); }); } }); @@ -214,6 +209,7 @@ class ConnPool { ConnPool::worker_error_callback_t on_fatal_error; public: + Worker(): tcall(ec), disp_flag(false), nconn(0) {} void set_error_callback(ConnPool::worker_error_callback_t _on_error) { @@ -233,6 +229,7 @@ class ConnPool { /* the caller should finalize all the preparation */ tcall.async_call([this, conn, client_fd](ThreadCall::Handle &) { try { + conn->ev_connect.clear(); if (conn->mode == Conn::ConnMode::DEAD) { SALTICIDAE_LOG_INFO("worker %x discarding dead connection", @@ -258,6 +255,7 @@ class ConnPool { SALTICIDAE_LOG_INFO("worker %x got %s", std::this_thread::get_id(), std::string(*conn).c_str()); + assert(conn->worker == this); conn->get_send_buffer() .get_queue() .reg_handler(this->ec, [conn, client_fd] @@ -270,7 +268,7 @@ class ConnPool { } return false; }); - conn->ev_socket = FdEvent(ec, client_fd, [this, conn=conn](int fd, int what) { + conn->ev_socket = FdEvent(ec, client_fd, [this, conn](int fd, int what) { try { if (what & FdEvent::READ) conn->recv_data_func(conn, fd, what); @@ -278,7 +276,7 @@ class ConnPool { conn->send_data_func(conn, fd, what); } catch (...) { conn->cpool->recoverable_error(std::current_exception()); - conn->worker_terminate(); + conn->cpool->worker_terminate(conn); } }); conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE); @@ -310,6 +308,7 @@ class ConnPool { void accept_client(int, int); conn_t add_conn(const conn_t &conn); void del_conn(const conn_t &conn); + void release_conn(const conn_t &conn); protected: conn_t _connect(const NetAddr &addr); @@ -494,6 +493,7 @@ class ConnPool { ConnPool(ConnPool &&) = delete; void start() { + std::atomic_thread_fence(std::memory_order_acq_rel); if (system_state) return; SALTICIDAE_LOG_INFO("starting all threads..."); for (size_t i = 0; i < nworker; i++) @@ -516,10 +516,9 @@ class ConnPool { workers[i].get_handle().join(); for (auto it: pool) { - conn_t conn = it.second; + auto &conn = it.second; conn->stop(); - conn->self_ref = nullptr; - ::close(conn->fd); + release_conn(conn); } } @@ -589,7 +588,7 @@ class ConnPool { void terminate(const conn_t &conn) { disp_tcall->async_call([this, conn](ThreadCall::Handle &) { try { - conn->disp_terminate(); + disp_terminate(conn); } catch (...) { disp_error_cb(std::current_exception()); } -- cgit v1.2.3