aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/salticidae/conn.h9
-rw-r--r--include/salticidae/event.h14
-rw-r--r--include/salticidae/network.h2
-rw-r--r--src/conn.cpp17
-rw-r--r--test/CMakeLists.txt3
5 files changed, 28 insertions, 17 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index 53465fc..6462ddc 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -71,6 +71,7 @@ class ConnPool {
enum ConnMode {
ACTIVE, /**< the connection is established by connect() */
PASSIVE, /**< the connection is established by accept() */
+ DEAD, /**< the connection is dead */
};
protected:
@@ -178,6 +179,12 @@ class ConnPool {
void feed(const conn_t &conn, int client_fd) {
tcall.call([this, conn, client_fd](ThreadCall::Handle &) {
+ if (conn->mode == Conn::ConnMode::DEAD)
+ {
+ SALTICIDAE_LOG_INFO("worker %x discarding dead connection",
+ std::this_thread::get_id());
+ return;
+ }
SALTICIDAE_LOG_INFO("worker %x got %s",
std::this_thread::get_id(),
std::string(*conn).c_str());
@@ -193,7 +200,6 @@ class ConnPool {
}
return false;
});
- //auto conn_ptr = conn.get();
conn->ev_socket = Event(ec, client_fd, [conn=conn](int fd, int what) {
if (what & Event::READ)
conn->recv_data(fd, what);
@@ -317,6 +323,7 @@ class ConnPool {
{
conn_t conn = it.second;
conn->stop();
+ conn->self_ref = nullptr;
}
if (listen_fd != -1) close(listen_fd);
}
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index 3fd11b6..2cda44f 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -125,7 +125,7 @@ class Event {
ev_fd(other.ev_fd), ev_timer(other.ev_timer),
callback(std::move(other.callback)) {
other.del();
- if (fd != -1)
+ if (ev_fd != nullptr)
{
other.ev_fd = nullptr;
ev_fd->data = this;
@@ -143,7 +143,7 @@ class Event {
ev_timer = other.ev_timer;
callback = std::move(other.callback);
- if (fd != -1)
+ if (ev_fd != nullptr)
{
other.ev_fd = nullptr;
ev_fd->data = this;
@@ -181,7 +181,7 @@ class Event {
void del() {
if (ev_fd) uv_poll_stop(ev_fd);
if (ev_timer == nullptr)
- assert(ev_timer);
+ assert(ev_timer);
uv_timer_stop(ev_timer);
}
void add_with_timeout(double t_sec, int events) {
@@ -205,11 +205,9 @@ class ThreadNotifier {
return data;
}
void notify(void *_data) {
- {
- mutex_lg_t _(mlock);
- ready = true;
- data = _data;
- }
+ mutex_lg_t _(mlock);
+ ready = true;
+ data = _data;
cv.notify_all();
}
};
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index e5f4406..2d16938 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -483,7 +483,7 @@ template<typename O, O _, O __>
void PeerNetwork<O, _, __>::tcall_reset_timeout(ConnPool::Worker *worker,
const conn_t &conn, double timeout) {
worker->get_tcall()->call([conn, t=timeout](ThreadCall::Handle &) {
- assert(conn->ev_timeout);
+ if (!conn->ev_timeout) return;
conn->ev_timeout.del();
conn->ev_timeout.add_with_timeout(t, 0);
SALTICIDAE_LOG_INFO("reset timeout %.2f", t);
diff --git a/src/conn.cpp b/src/conn.cpp
index 841002f..ca13619 100644
--- a/src/conn.cpp
+++ b/src/conn.cpp
@@ -119,13 +119,15 @@ void ConnPool::Conn::recv_data(int fd, int events) {
}
void ConnPool::Conn::stop() {
- if (!self_ref) return;
- if (worker) worker->unfeed();
- ev_connect.clear();
- ev_socket.clear();
- send_buffer.get_queue().unreg_handler();
- ::close(fd);
- self_ref = nullptr; /* remove the self-cycle */
+ if (mode != ConnMode::DEAD)
+ {
+ if (worker) worker->unfeed();
+ ev_connect.clear();
+ ev_socket.clear();
+ send_buffer.get_queue().unreg_handler();
+ ::close(fd);
+ mode = ConnMode::DEAD;
+ }
}
void ConnPool::Conn::worker_terminate() {
@@ -280,6 +282,7 @@ void ConnPool::remove_conn(int fd) {
/* inform the upper layer the connection will be destroyed */
conn->on_teardown();
update_conn(conn, false);
+ conn->self_ref = nullptr; /* remove the self-cycle */
}
}
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 3d4dc23..8f31ddb 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -32,6 +32,9 @@ target_link_libraries(test_network salticidae_static)
add_executable(test_p2p test_p2p.cpp)
target_link_libraries(test_p2p salticidae_static)
+add_executable(test_p2p_stress test_p2p_stress.cpp)
+target_link_libraries(test_p2p_stress salticidae_static)
+
add_executable(test_queue test_queue.cpp)
target_link_libraries(test_queue salticidae_static pthread)