From 82bd1d81e918cc8a1a46b6c36a81cbba43750bb1 Mon Sep 17 00:00:00 2001 From: Determinant Date: Thu, 15 Nov 2018 00:24:38 -0500 Subject: use a simple worker selection policy; add todo-list --- include/salticidae/conn.h | 64 +++++++++++++++++++++----------------------- include/salticidae/network.h | 6 ++--- 2 files changed, 34 insertions(+), 36 deletions(-) (limited to 'include') diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index 480809f..5f5fef4 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -94,8 +94,10 @@ class ConnPool { void send_data(int, int); void conn_server(int, int); - /** Terminate the connection. */ - void terminate(); + /** Terminate the connection (from the worker thread). */ + void worker_terminate(); + /** Terminate the connection (from the dispatcher thread). */ + void disp_terminate(bool blocking = true); public: Conn(): ready_send(false) {} @@ -121,16 +123,9 @@ class ConnPool { } protected: - /** Close the IO and clear all on-going or planned events. */ - virtual void stop() { - if (!self_ref) return; - ev_connect.clear(); - ev_socket.clear(); - send_buffer.get_queue().unreg_handler(); - ::close(fd); - self_ref = nullptr; /* remove the self-cycle */ - } - + /** Close the IO and clear all on-going or planned events. Remove the + * connection from a Worker. */ + virtual void stop(); /** Called when new data is available. */ virtual void on_read() {} /** Called when the underlying connection is established. */ @@ -153,7 +148,7 @@ class ConnPool { /* owned by user loop */ BoxObj user_tcall; - conn_callback_t conn_cb; + conn_callback_t conn_cb; /* owned by the dispatcher */ Event ev_listen; @@ -170,9 +165,11 @@ class ConnPool { EventContext ec; ThreadCall tcall; std::thread handle; + bool disp_flag; + std::atomic nconn; public: - Worker(): tcall(ec) {} + Worker(): tcall(ec), disp_flag(false), nconn(0) {} /* the following functions are called by the dispatcher */ void start() { @@ -203,15 +200,13 @@ class ConnPool { else conn->send_data(fd, what); }); - - // std::bind(&Conn::recv_data, conn_ptr, _1, _2)); - //conn->ev_write = Event(ec, client_fd, Event::WRITE, - // std::bind(&Conn::send_data, conn_ptr, _1, _2)); conn->ev_socket.add(Event::READ | Event::WRITE); - //conn->ev_write.add(); + nconn++; }); } + void unfeed() { nconn--; } + void stop() { tcall.call([this](ThreadCall::Handle &) { ec.stop(); }); } @@ -219,6 +214,9 @@ class ConnPool { std::thread &get_handle() { return handle; } const EventContext &get_ec() { return ec; } ThreadCall *get_tcall() { return &tcall; } + void set_dispatcher() { disp_flag = true; } + bool is_dispatcher() const { return disp_flag; } + size_t get_nconn() { return nconn; } }; /* related to workers */ @@ -248,7 +246,18 @@ class ConnPool { //}; Worker &select_worker() { - return workers[1]; + size_t idx = 0; + size_t best = workers[idx].get_nconn(); + for (size_t i = 0; i < nworker; i++) + { + size_t t = workers[i].get_nconn(); + if (t < best) + { + best = t; + idx = i; + } + } + return workers[idx]; } public: @@ -256,7 +265,7 @@ class ConnPool { int max_listen_backlog = 10, double conn_server_timeout = 2, size_t seg_buff_size = 4096, - size_t nworker = 2): + size_t nworker = 4): ec(ec), max_listen_backlog(max_listen_backlog), conn_server_timeout(conn_server_timeout), @@ -267,6 +276,7 @@ class ConnPool { user_tcall = new ThreadCall(ec); disp_ec = workers[0].get_ec(); disp_tcall = workers[0].get_tcall(); + workers[0].set_dispatcher(); } ~ConnPool() { @@ -336,18 +346,6 @@ class ConnPool { template void reg_conn_handler(Func cb) { conn_cb = cb; } - - void terminate(const conn_t &conn, bool blocking = true) { - int fd = conn->fd; - auto worker = conn->worker; - if (worker) - worker->get_tcall()->call([conn](ThreadCall::Handle &) { - conn->stop(); - }, blocking); - else - conn->stop(); - remove_conn(fd); - } }; } diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 60d8f20..4f934cd 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -470,7 +470,7 @@ void PeerNetwork::Conn::on_setup() { assert(!ev_timeout); ev_timeout = Event(worker->get_ec(), -1, [conn](int, int) { SALTICIDAE_LOG_INFO("peer ping-pong timeout"); - conn->terminate(); + conn->worker_terminate(); }); /* the initial ping-pong to set up the connection */ tcall_reset_timeout(worker, conn, pn->conn_timeout); @@ -508,7 +508,7 @@ void PeerNetwork::Peer::reset_conn(conn_t new_conn) { //SALTICIDAE_LOG_DEBUG("moving send buffer"); //new_conn->move_send_buffer(conn); SALTICIDAE_LOG_INFO("terminating old connection %s", std::string(*conn).c_str()); - conn->cpool->terminate(conn); + conn->disp_terminate(); } addr = new_conn->get_addr(); conn = new_conn; @@ -557,7 +557,7 @@ bool PeerNetwork::check_new_conn(const conn_t &conn, uint16_t port) { { if (conn != p->conn) { - conn->cpool->terminate(conn); + conn->disp_terminate(); return true; } return false; -- cgit v1.2.3-70-g09d2