diff options
-rw-r--r-- | TODO.rst | 2 | ||||
-rw-r--r-- | include/salticidae/conn.h | 64 | ||||
-rw-r--r-- | include/salticidae/network.h | 6 | ||||
-rw-r--r-- | src/conn.cpp | 31 | ||||
-rw-r--r-- | test/test_p2p.cpp | 7 |
5 files changed, 65 insertions, 45 deletions
diff --git a/TODO.rst b/TODO.rst new file mode 100644 index 0000000..b6fb085 --- /dev/null +++ b/TODO.rst @@ -0,0 +1,2 @@ +- use config factory class to configure ConnPool, MsgNetwork, PeerNetwork, etc. +- stress test for PeerNetwork diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index 480809f..5f5fef4 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -94,8 +94,10 @@ class ConnPool { void send_data(int, int); void conn_server(int, int); - /** Terminate the connection. */ - void terminate(); + /** Terminate the connection (from the worker thread). */ + void worker_terminate(); + /** Terminate the connection (from the dispatcher thread). */ + void disp_terminate(bool blocking = true); public: Conn(): ready_send(false) {} @@ -121,16 +123,9 @@ class ConnPool { } protected: - /** Close the IO and clear all on-going or planned events. */ - virtual void stop() { - if (!self_ref) return; - ev_connect.clear(); - ev_socket.clear(); - send_buffer.get_queue().unreg_handler(); - ::close(fd); - self_ref = nullptr; /* remove the self-cycle */ - } - + /** Close the IO and clear all on-going or planned events. Remove the + * connection from a Worker. */ + virtual void stop(); /** Called when new data is available. */ virtual void on_read() {} /** Called when the underlying connection is established. */ @@ -153,7 +148,7 @@ class ConnPool { /* owned by user loop */ BoxObj<ThreadCall> user_tcall; - conn_callback_t conn_cb; + conn_callback_t conn_cb; /* owned by the dispatcher */ Event ev_listen; @@ -170,9 +165,11 @@ class ConnPool { EventContext ec; ThreadCall tcall; std::thread handle; + bool disp_flag; + std::atomic<size_t> nconn; public: - Worker(): tcall(ec) {} + Worker(): tcall(ec), disp_flag(false), nconn(0) {} /* the following functions are called by the dispatcher */ void start() { @@ -203,15 +200,13 @@ class ConnPool { else conn->send_data(fd, what); }); - - // std::bind(&Conn::recv_data, conn_ptr, _1, _2)); - //conn->ev_write = Event(ec, client_fd, Event::WRITE, - // std::bind(&Conn::send_data, conn_ptr, _1, _2)); conn->ev_socket.add(Event::READ | Event::WRITE); - //conn->ev_write.add(); + nconn++; }); } + void unfeed() { nconn--; } + void stop() { tcall.call([this](ThreadCall::Handle &) { ec.stop(); }); } @@ -219,6 +214,9 @@ class ConnPool { std::thread &get_handle() { return handle; } const EventContext &get_ec() { return ec; } ThreadCall *get_tcall() { return &tcall; } + void set_dispatcher() { disp_flag = true; } + bool is_dispatcher() const { return disp_flag; } + size_t get_nconn() { return nconn; } }; /* related to workers */ @@ -248,7 +246,18 @@ class ConnPool { //}; Worker &select_worker() { - return workers[1]; + size_t idx = 0; + size_t best = workers[idx].get_nconn(); + for (size_t i = 0; i < nworker; i++) + { + size_t t = workers[i].get_nconn(); + if (t < best) + { + best = t; + idx = i; + } + } + return workers[idx]; } public: @@ -256,7 +265,7 @@ class ConnPool { int max_listen_backlog = 10, double conn_server_timeout = 2, size_t seg_buff_size = 4096, - size_t nworker = 2): + size_t nworker = 4): ec(ec), max_listen_backlog(max_listen_backlog), conn_server_timeout(conn_server_timeout), @@ -267,6 +276,7 @@ class ConnPool { user_tcall = new ThreadCall(ec); disp_ec = workers[0].get_ec(); disp_tcall = workers[0].get_tcall(); + workers[0].set_dispatcher(); } ~ConnPool() { @@ -336,18 +346,6 @@ class ConnPool { template<typename Func> void reg_conn_handler(Func cb) { conn_cb = cb; } - - void terminate(const conn_t &conn, bool blocking = true) { - int fd = conn->fd; - auto worker = conn->worker; - if (worker) - worker->get_tcall()->call([conn](ThreadCall::Handle &) { - conn->stop(); - }, blocking); - else - conn->stop(); - remove_conn(fd); - } }; } diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 60d8f20..4f934cd 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -470,7 +470,7 @@ void PeerNetwork<O, _, __>::Conn::on_setup() { assert(!ev_timeout); ev_timeout = Event(worker->get_ec(), -1, [conn](int, int) { SALTICIDAE_LOG_INFO("peer ping-pong timeout"); - conn->terminate(); + conn->worker_terminate(); }); /* the initial ping-pong to set up the connection */ tcall_reset_timeout(worker, conn, pn->conn_timeout); @@ -508,7 +508,7 @@ void PeerNetwork<O, _, __>::Peer::reset_conn(conn_t new_conn) { //SALTICIDAE_LOG_DEBUG("moving send buffer"); //new_conn->move_send_buffer(conn); SALTICIDAE_LOG_INFO("terminating old connection %s", std::string(*conn).c_str()); - conn->cpool->terminate(conn); + conn->disp_terminate(); } addr = new_conn->get_addr(); conn = new_conn; @@ -557,7 +557,7 @@ bool PeerNetwork<O, _, __>::check_new_conn(const conn_t &conn, uint16_t port) { { if (conn != p->conn) { - conn->cpool->terminate(conn); + conn->disp_terminate(); return true; } return false; diff --git a/src/conn.cpp b/src/conn.cpp index f170d0a..841002f 100644 --- a/src/conn.cpp +++ b/src/conn.cpp @@ -67,7 +67,7 @@ void ConnPool::Conn::send_data(int fd, int events) { if (ret < 0 && errno != EWOULDBLOCK) { SALTICIDAE_LOG_INFO("send(%d) failure: %s", fd, strerror(errno)); - terminate(); + worker_terminate(); return; } } @@ -102,13 +102,13 @@ void ConnPool::Conn::recv_data(int fd, int events) { if (errno == EWOULDBLOCK) break; SALTICIDAE_LOG_INFO("recv(%d) failure: %s", fd, strerror(errno)); /* connection err or half-opened connection */ - terminate(); + worker_terminate(); return; } if (ret == 0) { //SALTICIDAE_LOG_INFO("recv(%d) terminates", fd, strerror(errno)); - terminate(); + worker_terminate(); return; } buff_seg.resize(ret); @@ -118,7 +118,17 @@ void ConnPool::Conn::recv_data(int fd, int events) { on_read(); } -void ConnPool::Conn::terminate() { +void ConnPool::Conn::stop() { + if (!self_ref) return; + if (worker) worker->unfeed(); + ev_connect.clear(); + ev_socket.clear(); + send_buffer.get_queue().unreg_handler(); + ::close(fd); + self_ref = nullptr; /* remove the self-cycle */ +} + +void ConnPool::Conn::worker_terminate() { stop(); cpool->disp_tcall->call( [cpool=this->cpool, fd=this->fd](ThreadCall::Handle &) { @@ -126,6 +136,15 @@ void ConnPool::Conn::terminate() { }); } +void ConnPool::Conn::disp_terminate(bool blocking) { + if (worker && !worker->is_dispatcher()) + worker->get_tcall()->call([conn=self()](ThreadCall::Handle &) { + conn->stop(); + }, blocking); + else stop(); + cpool->remove_conn(fd); +} + void ConnPool::accept_client(int fd, int) { int client_fd; struct sockaddr client_addr; @@ -173,7 +192,7 @@ void ConnPool::Conn::conn_server(int fd, int events) { { if (events & Event::TIMEOUT) SALTICIDAE_LOG_INFO("%s connect timeout", std::string(*this).c_str()); - cpool->terminate(conn); + conn->disp_terminate(); return; } } @@ -237,7 +256,7 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) { sizeof(struct sockaddr_in)) < 0 && errno != EINPROGRESS) { SALTICIDAE_LOG_INFO("cannot connect to %s", std::string(addr).c_str()); - terminate(conn); + conn->disp_terminate(); } else { diff --git a/test/test_p2p.cpp b/test/test_p2p.cpp index f902afa..99c28a0 100644 --- a/test/test_p2p.cpp +++ b/test/test_p2p.cpp @@ -75,10 +75,11 @@ const uint8_t MsgAck::opcode; using MyNet = salticidae::PeerNetwork<uint8_t>; -NetAddr addrs[] = { +std::vector<NetAddr> addrs = { NetAddr("127.0.0.1:12345"), NetAddr("127.0.0.1:12346"), - NetAddr("127.0.0.1:12347") + NetAddr("127.0.0.1:12347"), + NetAddr("127.0.0.1:12348") }; void signal_handler(int) { @@ -97,7 +98,7 @@ int main(int argc, char **argv) { int i; net.start(); net.listen(addrs[i = atoi(argv[1])]); - for (int j = 0; j < 3; j++) + for (int j = 0; j < addrs.size(); j++) if (i != j) net.add_peer(addrs[j]); ec.dispatch(); } catch (salticidae::SalticidaeError &e) {} |