diff options
Diffstat (limited to 'include/salticidae/conn.h')
-rw-r--r-- | include/salticidae/conn.h | 91 |
1 files changed, 54 insertions, 37 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index 2ef6b50..73b3022 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -57,13 +57,13 @@ struct ConnPoolError: public SalticidaeError { /** Abstraction for connection management. */ class ConnPool { + class Worker; public: class Conn; /** The handle to a bi-directional connection. */ using conn_t = ArcObj<Conn>; /** The type of callback invoked when connection status is changed. */ using conn_callback_t = std::function<void(Conn &, bool)>; - /** Abstraction for a bi-directional connection. */ class Conn { friend ConnPool; @@ -77,17 +77,16 @@ class ConnPool { size_t seg_buff_size; conn_t self_ref; int fd; + Worker *worker; ConnPool *cpool; ConnMode mode; NetAddr addr; - // TODO: send_buffer should be a thread-safe mpsc queue MPSCWriteBuffer send_buffer; SegBuffer recv_buffer; - Event ev_read; - Event ev_write; Event ev_connect; + Event ev_socket; /** does not need to wait if true */ bool ready_send; @@ -95,8 +94,6 @@ class ConnPool { void send_data(int, int); void conn_server(int, int); - /** Stop the worker and I/O events. */ - void stop(); /** Terminate the connection. */ void terminate(); @@ -125,10 +122,11 @@ class ConnPool { protected: /** Close the IO and clear all on-going or planned events. */ - virtual void on_close() { - ev_read.clear(); - ev_write.clear(); + virtual void stop() { + if (fd == -1) return; ev_connect.clear(); + ev_socket.clear(); + send_buffer.get_queue().unreg_handler(); ::close(fd); fd = -1; self_ref = nullptr; /* remove the self-cycle */ @@ -142,6 +140,13 @@ class ConnPool { virtual void on_teardown() {} }; + protected: + EventContext ec; + EventContext disp_ec; + ThreadCall* disp_tcall; + /** Should be implemented by derived class to return a new Conn object. */ + virtual Conn *create_conn() = 0; + private: const int max_listen_backlog; const double conn_server_timeout; @@ -164,11 +169,11 @@ class ConnPool { class Worker { EventContext ec; - ThreadCall msgr; + ThreadCall tcall; std::thread handle; public: - Worker(): msgr(ec) {} + Worker(): tcall(ec) {} /* the following functions are called by the dispatcher */ void start() { @@ -176,7 +181,7 @@ class ConnPool { } void feed(const conn_t &conn, int client_fd) { - msgr.call([this, conn, client_fd](ThreadCall::Handle &) { + tcall.call([this, conn, client_fd](ThreadCall::Handle &) { SALTICIDAE_LOG_INFO("worker %x got %s", std::this_thread::get_id(), std::string(*conn).c_str()); @@ -185,11 +190,15 @@ class ConnPool { .reg_handler(this->ec, [conn, client_fd] (MPSCWriteBuffer::queue_t &) { if (conn->ready_send && conn->fd != -1) + { + conn->ev_socket.del(); + conn->ev_socket.add(Event::READ | Event::WRITE); conn->send_data(client_fd, Event::WRITE); + } return false; }); //auto conn_ptr = conn.get(); - conn->ev_read = Event(ec, client_fd, Event::READ | Event::WRITE, [conn=conn](int fd, int what) { + conn->ev_socket = Event(ec, client_fd, [conn=conn](int fd, int what) { if (what & Event::READ) conn->recv_data(fd, what); else @@ -199,19 +208,18 @@ class ConnPool { // std::bind(&Conn::recv_data, conn_ptr, _1, _2)); //conn->ev_write = Event(ec, client_fd, Event::WRITE, // std::bind(&Conn::send_data, conn_ptr, _1, _2)); - conn->ev_read.add(); + conn->ev_socket.add(Event::READ | Event::WRITE); //conn->ev_write.add(); }); } void stop() { - msgr.call([this](ThreadCall::Handle &) { - ec.stop(); - }); + tcall.call([this](ThreadCall::Handle &) { ec.stop(); }); } std::thread &get_handle() { return handle; } const EventContext &get_ec() { return ec; } + ThreadCall *get_tcall() { return &tcall; } }; /* related to workers */ @@ -220,7 +228,7 @@ class ConnPool { void accept_client(int, int); conn_t add_conn(const conn_t &conn); - void terminate(int fd); + void remove_conn(int fd); protected: conn_t _connect(const NetAddr &addr); @@ -244,43 +252,30 @@ class ConnPool { return workers[1]; } - protected: - EventContext ec; - EventContext dispatcher_ec; - BoxObj<ThreadCall> disp_tcall; - /** Should be implemented by derived class to return a new Conn object. */ - virtual Conn *create_conn() = 0; - public: ConnPool(const EventContext &ec, int max_listen_backlog = 10, double conn_server_timeout = 2, size_t seg_buff_size = 4096, size_t nworker = 2): + ec(ec), max_listen_backlog(max_listen_backlog), conn_server_timeout(conn_server_timeout), seg_buff_size(seg_buff_size), listen_fd(-1), - nworker(std::max((size_t)1, nworker)), - ec(ec) { + nworker(std::max((size_t)1, nworker)) { workers = new Worker[nworker]; - dispatcher_ec = workers[0].get_ec(); - user_tcall = new ThreadCall(ec); - disp_tcall = new ThreadCall(dispatcher_ec); + disp_ec = workers[0].get_ec(); + disp_tcall = workers[0].get_tcall(); } ~ConnPool() { - /* stop all workers */ - for (size_t i = 0; i < nworker; i++) - workers[i].stop(); - /* join all worker threads */ - for (size_t i = 0; i < nworker; i++) - workers[i].get_handle().join(); + stop(); for (auto it: pool) { conn_t conn = it.second; - conn->on_close(); + conn->stop(); } if (listen_fd != -1) close(listen_fd); } @@ -294,6 +289,17 @@ class ConnPool { workers[i].start(); } + void stop() { + SALTICIDAE_LOG_INFO("stopping all threads..."); + /* stop all workers */ + for (size_t i = 0; i < nworker; i++) + workers[i].stop(); + /* join all worker threads */ + for (size_t i = 0; i < nworker; i++) + workers[i].get_handle().join(); + nworker = 0; + } + /** Actively connect to remote addr. */ conn_t connect(const NetAddr &addr, bool blocking = true) { if (blocking) @@ -303,6 +309,9 @@ class ConnPool { auto ptr = new conn_t(_connect(addr)); std::atomic_thread_fence(std::memory_order_release); h.set_result(ptr); + h.set_deleter([](void *data) { + delete static_cast<conn_t *>(data); + }); }, true)); auto conn = *ret; delete ret; @@ -328,6 +337,14 @@ class ConnPool { template<typename Func> void reg_conn_handler(Func cb) { conn_cb = cb; } + + void terminate(const conn_t &conn, bool blocking = true) { + int fd = conn->fd; + conn->worker->get_tcall()->call([conn](ThreadCall::Handle &) { + conn->stop(); + }, blocking); + remove_conn(fd); + } }; } |