From 0f341fe7f092f704e1c1952c72085eb1ebd2086a Mon Sep 17 00:00:00 2001 From: Determinant Date: Wed, 14 Nov 2018 15:19:32 -0500 Subject: use ThreadCall pattern --- include/salticidae/conn.h | 267 ++++++++++++++-------------------------------- 1 file changed, 81 insertions(+), 186 deletions(-) (limited to 'include/salticidae/conn.h') diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index a86a4d2..2ef6b50 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -95,6 +95,8 @@ class ConnPool { void send_data(int, int); void conn_server(int, int); + /** Stop the worker and I/O events. */ + void stop(); /** Terminate the connection. */ void terminate(); @@ -146,54 +148,27 @@ class ConnPool { const size_t seg_buff_size; /* owned by user loop */ - int mlisten_fd[2]; /**< for connection events sent to the user loop */ - Event ev_mlisten; + BoxObj user_tcall; conn_callback_t conn_cb; /* owned by the dispatcher */ + Event ev_listen; std::unordered_map pool; int listen_fd; /**< for accepting new network connections */ - Event ev_listen; - Event ev_dlisten; - std::mutex cp_mlock; void update_conn(const conn_t &conn, bool connected) { - auto dcmd = new UserConn(conn, connected); - write(mlisten_fd[1], &dcmd, sizeof(dcmd)); + user_tcall->call([this, conn, connected](ThreadCall::Handle &) { + if (conn_cb) conn_cb(*conn, connected); + }); } - struct Worker; - class WorkerFeed; - - class WorkerCmd { - public: - virtual ~WorkerCmd() = default; - virtual void exec(Worker *worker) = 0; - }; - class Worker { EventContext ec; - Event ev_ctl; - int ctl_fd[2]; /**< for control messages from dispatcher */ + ThreadCall msgr; std::thread handle; public: - Worker() { - if (pipe2(ctl_fd, O_NONBLOCK)) - throw ConnPoolError(std::string("failed to create worker pipe")); - ev_ctl = Event(ec, ctl_fd[0], Event::READ, [this](int fd, int) { - WorkerCmd *dcmd; - read(fd, &dcmd, sizeof(dcmd)); - dcmd->exec(this); - delete dcmd; - }); - ev_ctl.add(); - } - - ~Worker() { - close(ctl_fd[0]); - close(ctl_fd[1]); - } + Worker(): msgr(ec) {} /* the following functions are called by the dispatcher */ void start() { @@ -201,143 +176,69 @@ class ConnPool { } void feed(const conn_t &conn, int client_fd) { - auto dcmd = new WorkerFeed(conn, client_fd); - write(ctl_fd[1], &dcmd, sizeof(dcmd)); + msgr.call([this, conn, client_fd](ThreadCall::Handle &) { + SALTICIDAE_LOG_INFO("worker %x got %s", + std::this_thread::get_id(), + std::string(*conn).c_str()); + conn->get_send_buffer() + .get_queue() + .reg_handler(this->ec, [conn, client_fd] + (MPSCWriteBuffer::queue_t &) { + if (conn->ready_send && conn->fd != -1) + conn->send_data(client_fd, Event::WRITE); + return false; + }); + //auto conn_ptr = conn.get(); + conn->ev_read = Event(ec, client_fd, Event::READ | Event::WRITE, [conn=conn](int fd, int what) { + if (what & Event::READ) + conn->recv_data(fd, what); + else + conn->send_data(fd, what); + }); + + // std::bind(&Conn::recv_data, conn_ptr, _1, _2)); + //conn->ev_write = Event(ec, client_fd, Event::WRITE, + // std::bind(&Conn::send_data, conn_ptr, _1, _2)); + conn->ev_read.add(); + //conn->ev_write.add(); + }); } void stop() { - auto dcmd = new WorkerStop(); - write(ctl_fd[1], &dcmd, sizeof(dcmd)); + msgr.call([this](ThreadCall::Handle &) { + ec.stop(); + }); } std::thread &get_handle() { return handle; } const EventContext &get_ec() { return ec; } }; - class WorkerFeed: public WorkerCmd { - conn_t conn; - int client_fd; - - public: - WorkerFeed(const conn_t &conn, int client_fd): - conn(conn), client_fd(client_fd) {} - void exec(Worker *worker) override { - SALTICIDAE_LOG_INFO("worker %x got %s", - std::this_thread::get_id(), - std::string(*conn).c_str()); - auto &ec = worker->get_ec(); - conn->get_send_buffer() - .get_queue() - .reg_handler(ec, [conn=this->conn, - client_fd=this->client_fd](MPSCWriteBuffer::queue_t &) { - if (conn->ready_send && conn->fd != -1) - conn->send_data(client_fd, Event::WRITE); - return false; - }); - //auto conn_ptr = conn.get(); - conn->ev_read = Event(ec, client_fd, Event::READ | Event::WRITE, [conn=conn](int fd, int what) { - if (what & Event::READ) - conn->recv_data(fd, what); - else - conn->send_data(fd, what); - }); - - // std::bind(&Conn::recv_data, conn_ptr, _1, _2)); - //conn->ev_write = Event(ec, client_fd, Event::WRITE, - // std::bind(&Conn::send_data, conn_ptr, _1, _2)); - conn->ev_read.add(); - //conn->ev_write.add(); - } - }; - - class WorkerStop: public WorkerCmd { - public: - void exec(Worker *worker) override { worker->get_ec().stop(); } - }; - /* related to workers */ size_t nworker; salticidae::BoxObj workers; void accept_client(int, int); conn_t add_conn(const conn_t &conn); - conn_t _connect(const NetAddr &addr); - void _post_terminate(int fd); - void _accept_listen(int listen_fd); - void _connect_listen(const conn_t &conn); + void terminate(int fd); protected: - class DispatchCmd { - public: - virtual ~DispatchCmd() = default; - virtual void exec(ConnPool *cpool) = 0; - }; + conn_t _connect(const NetAddr &addr); + void _listen(NetAddr listen_addr); private: - class DspConnect: public DispatchCmd { - const NetAddr addr; - public: - DspConnect(const NetAddr &addr): addr(addr) {} - void exec(ConnPool *cpool) override { - cpool->update_conn(cpool->_connect(addr), true); - } - }; - - class DspPostTerm: public DispatchCmd { - int fd; - public: - DspPostTerm(int fd): fd(fd) {} - void exec(ConnPool *cpool) override { - cpool->_post_terminate(fd); - } - }; - - class DspMulticast: public DispatchCmd { - std::vector receivers; - bytearray_t data; - public: - DspMulticast(std::vector &&receivers, bytearray_t &&data): - receivers(std::move(receivers)), - data(std::move(data)) {} - void exec(ConnPool *) override { - for (auto &r: receivers) r->write(bytearray_t(data)); - } - }; - - class DspAcceptListen: public DispatchCmd { - int listen_fd; - public: - DspAcceptListen(int listen_fd): listen_fd(listen_fd) {} - void exec(ConnPool *cpool) override { - cpool->_accept_listen(listen_fd); - } - }; - class DspConnectListen: public DispatchCmd { - conn_t conn; - public: - DspConnectListen(const conn_t &conn): conn(conn) {} - void exec(ConnPool *cpool) override { - cpool->_connect_listen(conn); - } - }; - - class UserConn: public DispatchCmd { - conn_t conn; - bool connected; - public: - UserConn(const conn_t &conn, bool connected): - conn(conn), connected(connected) {} - void exec(ConnPool *cpool) override { - if (cpool->conn_cb) - cpool->conn_cb(*conn, connected); - } - }; - - void post_terminate(int fd) { - auto dcmd = new DspPostTerm(fd); - write(dlisten_fd[1], &dcmd, sizeof(dcmd)); - } + //class DspMulticast: public DispatchCmd { + // std::vector receivers; + // bytearray_t data; + // public: + // DspMulticast(std::vector &&receivers, bytearray_t &&data): + // receivers(std::move(receivers)), + // data(std::move(data)) {} + // void exec(ConnPool *) override { + // for (auto &r: receivers) r->write(bytearray_t(data)); + // } + //}; Worker &select_worker() { return workers[1]; @@ -346,7 +247,7 @@ class ConnPool { protected: EventContext ec; EventContext dispatcher_ec; - int dlisten_fd[2]; /**< for control command sent to the dispatcher */ + BoxObj disp_tcall; /** Should be implemented by derived class to return a new Conn object. */ virtual Conn *create_conn() = 0; @@ -360,35 +261,13 @@ class ConnPool { conn_server_timeout(conn_server_timeout), seg_buff_size(seg_buff_size), listen_fd(-1), - nworker(std::min((size_t)1, nworker)), + nworker(std::max((size_t)1, nworker)), ec(ec) { - if (pipe2(mlisten_fd, O_NONBLOCK)) - throw ConnPoolError(std::string("failed to create main pipe")); - if (pipe2(dlisten_fd, O_NONBLOCK)) - throw ConnPoolError(std::string("failed to create dispatcher pipe")); - - ev_mlisten = Event(ec, mlisten_fd[0], Event::READ, [this](int fd, int) { - DispatchCmd *dcmd; - read(fd, &dcmd, sizeof(dcmd)); - dcmd->exec(this); - delete dcmd; - }); - ev_mlisten.add(); - workers = new Worker[nworker]; dispatcher_ec = workers[0].get_ec(); - ev_dlisten = Event(dispatcher_ec, dlisten_fd[0], Event::READ, [this](int fd, int) { - DispatchCmd *dcmd; - read(fd, &dcmd, sizeof(dcmd)); - dcmd->exec(this); - delete dcmd; - }); - ev_dlisten.add(); - - SALTICIDAE_LOG_INFO("starting all threads..."); - for (size_t i = 0; i < nworker; i++) - workers[i].start(); + user_tcall = new ThreadCall(ec); + disp_tcall = new ThreadCall(dispatcher_ec); } ~ConnPool() { @@ -404,24 +283,36 @@ class ConnPool { conn->on_close(); } if (listen_fd != -1) close(listen_fd); - for (int i = 0; i < 2; i++) - { - close(mlisten_fd[i]); - close(dlisten_fd[i]); - } } ConnPool(const ConnPool &) = delete; ConnPool(ConnPool &&) = delete; + void start() { + SALTICIDAE_LOG_INFO("starting all threads..."); + for (size_t i = 0; i < nworker; i++) + workers[i].start(); + } + /** Actively connect to remote addr. */ conn_t connect(const NetAddr &addr, bool blocking = true) { if (blocking) - return _connect(addr); + { + auto ret = static_cast(disp_tcall->call( + [this, addr](ThreadCall::Handle &h) { + auto ptr = new conn_t(_connect(addr)); + std::atomic_thread_fence(std::memory_order_release); + h.set_result(ptr); + }, true)); + auto conn = *ret; + delete ret; + return std::move(conn); + } else { - auto dcmd = new DspConnect(addr); - write(dlisten_fd[1], &dcmd, sizeof(dcmd)); + disp_tcall->call([this, addr](ThreadCall::Handle &) { + _connect(addr); + }, false); return nullptr; } } @@ -429,7 +320,11 @@ class ConnPool { /** Listen for passive connections (connection initiated from remote). * Does not need to be called if do not want to accept any passive * connections. */ - void listen(NetAddr listen_addr); + void listen(NetAddr listen_addr) { + disp_tcall->call([this, listen_addr](ThreadCall::Handle &) { + _listen(listen_addr); + }, true); + } template void reg_conn_handler(Func cb) { conn_cb = cb; } -- cgit v1.2.3