aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2018-11-16 21:40:53 -0500
committerDeterminant <[email protected]>2018-11-16 21:40:53 -0500
commit20669e2b5e956babf888bca577e39a71d83bae79 (patch)
tree22733578ee8731cebd43f50be33e09c765c61714 /src
parentb27216a50bd6566b6fd9203d3acf191005ae5763 (diff)
add test_p2p_stress.cpp; fix bugs
Diffstat (limited to 'src')
-rw-r--r--src/conn.cpp45
1 files changed, 33 insertions, 12 deletions
diff --git a/src/conn.cpp b/src/conn.cpp
index 5967339..c29dee9 100644
--- a/src/conn.cpp
+++ b/src/conn.cpp
@@ -40,14 +40,25 @@ ConnPool::Conn::operator std::string() const {
s << "<Conn "
<< "fd=" << std::to_string(fd) << " "
<< "addr=" << std::string(addr) << " "
- << "mode=" << ((mode == Conn::ACTIVE) ? "active" : "passive") << ">";
+ << "mode=";
+ switch (mode)
+ {
+ case Conn::ACTIVE: s << "active"; break;
+ case Conn::PASSIVE: s << "passive"; break;
+ case Conn::DEAD: s << "dead"; break;
+ }
+ s << ">";
return std::move(s);
}
/* the following two functions are executed by exactly one worker per Conn object */
void ConnPool::Conn::send_data(int fd, int events) {
- if (!(events & Event::WRITE)) return;
+ if (events & Event::ERROR)
+ {
+ worker_terminate();
+ return;
+ }
auto conn = self(); /* pin the connection */
ssize_t ret = seg_buff_size;
for (;;)
@@ -88,7 +99,11 @@ void ConnPool::Conn::send_data(int fd, int events) {
}
void ConnPool::Conn::recv_data(int fd, int events) {
- if (!(events & Event::READ)) return;
+ if (events & Event::ERROR)
+ {
+ worker_terminate();
+ return;
+ }
auto conn = self(); /* pin the connection */
ssize_t ret = seg_buff_size;
while (ret == (ssize_t)seg_buff_size)
@@ -125,24 +140,29 @@ void ConnPool::Conn::stop() {
ev_connect.clear();
ev_socket.clear();
send_buffer.get_queue().unreg_handler();
- ::close(fd);
mode = ConnMode::DEAD;
}
}
void ConnPool::Conn::worker_terminate() {
stop();
- cpool->disp_tcall->call(
- [cpool=this->cpool, fd=this->fd](ThreadCall::Handle &) {
- cpool->remove_conn(fd);
- });
+ if (!worker->is_dispatcher())
+ cpool->disp_tcall->async_call(
+ [cpool=this->cpool, fd=this->fd](ThreadCall::Handle &) {
+ cpool->remove_conn(fd);
+ });
+ else cpool->remove_conn(fd);
}
void ConnPool::Conn::disp_terminate() {
if (worker && !worker->is_dispatcher())
- worker->get_tcall()->call([conn=self()](ThreadCall::Handle &) {
- conn->stop();
- });
+ {
+ auto conn = self();
+ if (conn)
+ worker->get_tcall()->call([conn](ThreadCall::Handle &) {
+ conn->stop();
+ });
+ }
else stop();
cpool->remove_conn(fd);
}
@@ -277,12 +297,13 @@ void ConnPool::remove_conn(int fd) {
/* temporarily pin the conn before it dies */
auto conn = it->second;
//assert(conn->fd == fd);
- conn->fd = -1;
pool.erase(it);
/* inform the upper layer the connection will be destroyed */
conn->on_teardown();
update_conn(conn, false);
conn->self_ref = nullptr; /* remove the self-cycle */
+ ::close(conn->fd);
+ conn->fd = -1;
}
}