diff options
-rw-r--r-- | include/salticidae/conn.h | 9 | ||||
-rw-r--r-- | include/salticidae/event.h | 14 | ||||
-rw-r--r-- | include/salticidae/network.h | 2 | ||||
-rw-r--r-- | src/conn.cpp | 17 | ||||
-rw-r--r-- | test/CMakeLists.txt | 3 |
5 files changed, 28 insertions, 17 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index 53465fc..6462ddc 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -71,6 +71,7 @@ class ConnPool { enum ConnMode { ACTIVE, /**< the connection is established by connect() */ PASSIVE, /**< the connection is established by accept() */ + DEAD, /**< the connection is dead */ }; protected: @@ -178,6 +179,12 @@ class ConnPool { void feed(const conn_t &conn, int client_fd) { tcall.call([this, conn, client_fd](ThreadCall::Handle &) { + if (conn->mode == Conn::ConnMode::DEAD) + { + SALTICIDAE_LOG_INFO("worker %x discarding dead connection", + std::this_thread::get_id()); + return; + } SALTICIDAE_LOG_INFO("worker %x got %s", std::this_thread::get_id(), std::string(*conn).c_str()); @@ -193,7 +200,6 @@ class ConnPool { } return false; }); - //auto conn_ptr = conn.get(); conn->ev_socket = Event(ec, client_fd, [conn=conn](int fd, int what) { if (what & Event::READ) conn->recv_data(fd, what); @@ -317,6 +323,7 @@ class ConnPool { { conn_t conn = it.second; conn->stop(); + conn->self_ref = nullptr; } if (listen_fd != -1) close(listen_fd); } diff --git a/include/salticidae/event.h b/include/salticidae/event.h index 3fd11b6..2cda44f 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -125,7 +125,7 @@ class Event { ev_fd(other.ev_fd), ev_timer(other.ev_timer), callback(std::move(other.callback)) { other.del(); - if (fd != -1) + if (ev_fd != nullptr) { other.ev_fd = nullptr; ev_fd->data = this; @@ -143,7 +143,7 @@ class Event { ev_timer = other.ev_timer; callback = std::move(other.callback); - if (fd != -1) + if (ev_fd != nullptr) { other.ev_fd = nullptr; ev_fd->data = this; @@ -181,7 +181,7 @@ class Event { void del() { if (ev_fd) uv_poll_stop(ev_fd); if (ev_timer == nullptr) - assert(ev_timer); + assert(ev_timer); uv_timer_stop(ev_timer); } void add_with_timeout(double t_sec, int events) { @@ -205,11 +205,9 @@ class ThreadNotifier { return data; } void notify(void *_data) { - { - mutex_lg_t _(mlock); - ready = true; - data = _data; - } + mutex_lg_t _(mlock); + ready = true; + data = _data; cv.notify_all(); } }; diff --git a/include/salticidae/network.h b/include/salticidae/network.h index e5f4406..2d16938 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -483,7 +483,7 @@ template<typename O, O _, O __> void PeerNetwork<O, _, __>::tcall_reset_timeout(ConnPool::Worker *worker, const conn_t &conn, double timeout) { worker->get_tcall()->call([conn, t=timeout](ThreadCall::Handle &) { - assert(conn->ev_timeout); + if (!conn->ev_timeout) return; conn->ev_timeout.del(); conn->ev_timeout.add_with_timeout(t, 0); SALTICIDAE_LOG_INFO("reset timeout %.2f", t); diff --git a/src/conn.cpp b/src/conn.cpp index 841002f..ca13619 100644 --- a/src/conn.cpp +++ b/src/conn.cpp @@ -119,13 +119,15 @@ void ConnPool::Conn::recv_data(int fd, int events) { } void ConnPool::Conn::stop() { - if (!self_ref) return; - if (worker) worker->unfeed(); - ev_connect.clear(); - ev_socket.clear(); - send_buffer.get_queue().unreg_handler(); - ::close(fd); - self_ref = nullptr; /* remove the self-cycle */ + if (mode != ConnMode::DEAD) + { + if (worker) worker->unfeed(); + ev_connect.clear(); + ev_socket.clear(); + send_buffer.get_queue().unreg_handler(); + ::close(fd); + mode = ConnMode::DEAD; + } } void ConnPool::Conn::worker_terminate() { @@ -280,6 +282,7 @@ void ConnPool::remove_conn(int fd) { /* inform the upper layer the connection will be destroyed */ conn->on_teardown(); update_conn(conn, false); + conn->self_ref = nullptr; /* remove the self-cycle */ } } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 3d4dc23..8f31ddb 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -32,6 +32,9 @@ target_link_libraries(test_network salticidae_static) add_executable(test_p2p test_p2p.cpp) target_link_libraries(test_p2p salticidae_static) +add_executable(test_p2p_stress test_p2p_stress.cpp) +target_link_libraries(test_p2p_stress salticidae_static) + add_executable(test_queue test_queue.cpp) target_link_libraries(test_queue salticidae_static pthread) |