diff options
author | Determinant <[email protected]> | 2018-11-16 21:40:53 -0500 |
---|---|---|
committer | Determinant <[email protected]> | 2018-11-16 21:40:53 -0500 |
commit | 20669e2b5e956babf888bca577e39a71d83bae79 (patch) | |
tree | 22733578ee8731cebd43f50be33e09c765c61714 /src | |
parent | b27216a50bd6566b6fd9203d3acf191005ae5763 (diff) |
add test_p2p_stress.cpp; fix bugs
Diffstat (limited to 'src')
-rw-r--r-- | src/conn.cpp | 45 |
1 files changed, 33 insertions, 12 deletions
diff --git a/src/conn.cpp b/src/conn.cpp index 5967339..c29dee9 100644 --- a/src/conn.cpp +++ b/src/conn.cpp @@ -40,14 +40,25 @@ ConnPool::Conn::operator std::string() const { s << "<Conn " << "fd=" << std::to_string(fd) << " " << "addr=" << std::string(addr) << " " - << "mode=" << ((mode == Conn::ACTIVE) ? "active" : "passive") << ">"; + << "mode="; + switch (mode) + { + case Conn::ACTIVE: s << "active"; break; + case Conn::PASSIVE: s << "passive"; break; + case Conn::DEAD: s << "dead"; break; + } + s << ">"; return std::move(s); } /* the following two functions are executed by exactly one worker per Conn object */ void ConnPool::Conn::send_data(int fd, int events) { - if (!(events & Event::WRITE)) return; + if (events & Event::ERROR) + { + worker_terminate(); + return; + } auto conn = self(); /* pin the connection */ ssize_t ret = seg_buff_size; for (;;) @@ -88,7 +99,11 @@ void ConnPool::Conn::send_data(int fd, int events) { } void ConnPool::Conn::recv_data(int fd, int events) { - if (!(events & Event::READ)) return; + if (events & Event::ERROR) + { + worker_terminate(); + return; + } auto conn = self(); /* pin the connection */ ssize_t ret = seg_buff_size; while (ret == (ssize_t)seg_buff_size) @@ -125,24 +140,29 @@ void ConnPool::Conn::stop() { ev_connect.clear(); ev_socket.clear(); send_buffer.get_queue().unreg_handler(); - ::close(fd); mode = ConnMode::DEAD; } } void ConnPool::Conn::worker_terminate() { stop(); - cpool->disp_tcall->call( - [cpool=this->cpool, fd=this->fd](ThreadCall::Handle &) { - cpool->remove_conn(fd); - }); + if (!worker->is_dispatcher()) + cpool->disp_tcall->async_call( + [cpool=this->cpool, fd=this->fd](ThreadCall::Handle &) { + cpool->remove_conn(fd); + }); + else cpool->remove_conn(fd); } void ConnPool::Conn::disp_terminate() { if (worker && !worker->is_dispatcher()) - worker->get_tcall()->call([conn=self()](ThreadCall::Handle &) { - conn->stop(); - }); + { + auto conn = self(); + if (conn) + worker->get_tcall()->call([conn](ThreadCall::Handle &) { + conn->stop(); + }); + } else stop(); cpool->remove_conn(fd); } @@ -277,12 +297,13 @@ void ConnPool::remove_conn(int fd) { /* temporarily pin the conn before it dies */ auto conn = it->second; //assert(conn->fd == fd); - conn->fd = -1; pool.erase(it); /* inform the upper layer the connection will be destroyed */ conn->on_teardown(); update_conn(conn, false); conn->self_ref = nullptr; /* remove the self-cycle */ + ::close(conn->fd); + conn->fd = -1; } } |