From 20669e2b5e956babf888bca577e39a71d83bae79 Mon Sep 17 00:00:00 2001 From: Determinant Date: Fri, 16 Nov 2018 21:40:53 -0500 Subject: add test_p2p_stress.cpp; fix bugs --- src/conn.cpp | 45 +++++++++++++++++++++++++++++++++------------ 1 file changed, 33 insertions(+), 12 deletions(-) (limited to 'src') diff --git a/src/conn.cpp b/src/conn.cpp index 5967339..c29dee9 100644 --- a/src/conn.cpp +++ b/src/conn.cpp @@ -40,14 +40,25 @@ ConnPool::Conn::operator std::string() const { s << ""; + << "mode="; + switch (mode) + { + case Conn::ACTIVE: s << "active"; break; + case Conn::PASSIVE: s << "passive"; break; + case Conn::DEAD: s << "dead"; break; + } + s << ">"; return std::move(s); } /* the following two functions are executed by exactly one worker per Conn object */ void ConnPool::Conn::send_data(int fd, int events) { - if (!(events & Event::WRITE)) return; + if (events & Event::ERROR) + { + worker_terminate(); + return; + } auto conn = self(); /* pin the connection */ ssize_t ret = seg_buff_size; for (;;) @@ -88,7 +99,11 @@ void ConnPool::Conn::send_data(int fd, int events) { } void ConnPool::Conn::recv_data(int fd, int events) { - if (!(events & Event::READ)) return; + if (events & Event::ERROR) + { + worker_terminate(); + return; + } auto conn = self(); /* pin the connection */ ssize_t ret = seg_buff_size; while (ret == (ssize_t)seg_buff_size) @@ -125,24 +140,29 @@ void ConnPool::Conn::stop() { ev_connect.clear(); ev_socket.clear(); send_buffer.get_queue().unreg_handler(); - ::close(fd); mode = ConnMode::DEAD; } } void ConnPool::Conn::worker_terminate() { stop(); - cpool->disp_tcall->call( - [cpool=this->cpool, fd=this->fd](ThreadCall::Handle &) { - cpool->remove_conn(fd); - }); + if (!worker->is_dispatcher()) + cpool->disp_tcall->async_call( + [cpool=this->cpool, fd=this->fd](ThreadCall::Handle &) { + cpool->remove_conn(fd); + }); + else cpool->remove_conn(fd); } void ConnPool::Conn::disp_terminate() { if (worker && !worker->is_dispatcher()) - worker->get_tcall()->call([conn=self()](ThreadCall::Handle &) { - conn->stop(); - }); + { + auto conn = self(); + if (conn) + worker->get_tcall()->call([conn](ThreadCall::Handle &) { + conn->stop(); + }); + } else stop(); cpool->remove_conn(fd); } @@ -277,12 +297,13 @@ void ConnPool::remove_conn(int fd) { /* temporarily pin the conn before it dies */ auto conn = it->second; //assert(conn->fd == fd); - conn->fd = -1; pool.erase(it); /* inform the upper layer the connection will be destroyed */ conn->on_teardown(); update_conn(conn, false); conn->self_ref = nullptr; /* remove the self-cycle */ + ::close(conn->fd); + conn->fd = -1; } } -- cgit v1.2.3-70-g09d2