aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/salticidae/conn.h10
-rw-r--r--include/salticidae/event.h2
-rw-r--r--include/salticidae/network.h35
-rw-r--r--src/conn.cpp17
-rw-r--r--test/bench_network.cpp2
-rw-r--r--test/bench_network_tls.cpp2
6 files changed, 31 insertions, 37 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index c34521a..a1b5633 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -68,7 +68,6 @@ class ConnPool {
enum ConnMode {
ACTIVE, /**< the connection is established by connect() */
PASSIVE, /**< the connection is established by accept() */
- DEAD, /**< the connection is dead */
};
protected:
@@ -77,7 +76,7 @@ class ConnPool {
int fd;
Worker *worker;
ConnPool *cpool;
- std::atomic<ConnMode> mode;
+ ConnMode mode;
NetAddr addr;
MPSCWriteBuffer send_buffer;
@@ -115,7 +114,7 @@ class ConnPool {
Conn(Conn &&other) = delete;
virtual ~Conn() {
- std::atomic_thread_fence(std::memory_order_acquire);
+ //std::atomic_thread_fence(std::memory_order_acquire);
SALTICIDAE_LOG_INFO("destroyed %s", std::string(*this).c_str());
}
@@ -132,7 +131,6 @@ class ConnPool {
const X509 *get_peer_cert() const { return peer_cert.get(); }
ConnMode get_mode() const { return mode; }
ConnPool *get_pool() const { return cpool; }
- MPSCWriteBuffer &get_send_buffer() { return send_buffer; }
/** Write data to the connection (non-blocking). The data will be sent
* whenever I/O is available. */
@@ -234,8 +232,7 @@ class ConnPool {
}
void enable_send_buffer(const conn_t &conn, int client_fd) {
- conn->get_send_buffer()
- .get_queue()
+ conn->send_buffer.get_queue()
.reg_handler(this->ec, [conn, client_fd]
(MPSCWriteBuffer::queue_t &) {
if (conn->ready_send)
@@ -252,7 +249,6 @@ class ConnPool {
/* the caller should finalize all the preparation */
tcall.async_call([this, conn, client_fd](ThreadCall::Handle &) {
try {
- assert(conn->mode != Conn::ConnMode::DEAD);
auto cpool = conn->cpool;
if (cpool->enable_tls)
{
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index 9b8bea1..78ae12d 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -305,6 +305,8 @@ class TimedFdEvent: public FdEvent, public TimerEvent {
return *this;
}
+ ~TimedFdEvent() { clear(); }
+
void clear() {
TimerEvent::clear();
FdEvent::clear();
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index f5fba90..2b0c5b3 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -282,7 +282,11 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
public:
Conn(): MsgNet::Conn(), peer(nullptr) {}
NetAddr get_peer_addr() {
- return peer ? peer->peer_addr : NetAddr();
+ auto ret = *(static_cast<NetAddr *>(
+ get_net()->disp_tcall->call([this](ThreadCall::Handle &h) {
+ h.set_result(peer ? peer->peer_addr : NetAddr());
+ }).get()));
+ return ret;
}
PeerNetwork *get_net() {
@@ -344,7 +348,10 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
ev_ping_timer.del();
}
public:
- ~Peer() {}
+ ~Peer() {
+ if (inbound_conn) inbound_conn->peer = nullptr;
+ if (outbound_conn) outbound_conn->peer = nullptr;
+ }
};
std::unordered_map<NetAddr, conn_t> pending_peers;
@@ -601,9 +608,7 @@ void PeerNetwork<O, _, __>::on_setup(const ConnPool::conn_t &_conn) {
assert(!ev_timeout);
ev_timeout = TimerEvent(worker->get_ec(), [=](TimerEvent &) {
try {
- SALTICIDAE_LOG_INFO("peer ping-pong timeout %s <-> %s",
- std::string(listen_addr).c_str(),
- std::string(conn->get_peer_addr()).c_str());
+ SALTICIDAE_LOG_INFO("peer ping-pong timeout");
this->worker_terminate(conn);
} catch (...) { worker->error_callback(std::current_exception()); }
});
@@ -735,8 +740,7 @@ void PeerNetwork<O, _, __>::replace_conn(const conn_t &conn) {
auto &old_conn = it->second;
if (old_conn != conn)
{
- if (old_conn->get_mode() != Conn::ConnMode::DEAD)
- this->disp_terminate(old_conn);
+ this->disp_terminate(old_conn);
pending_peers.erase(it);
}
}
@@ -768,12 +772,11 @@ template<typename O, O _, O __>
void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) {
this->disp_tcall->async_call([this, conn, msg=std::move(msg)](ThreadCall::Handle &) {
try {
- auto conn_mode = conn->get_mode();
- if (conn_mode == ConnPool::Conn::DEAD) return;
+ if (conn->is_terminated()) return;
if (!msg.claimed_addr.is_null())
{
auto peer_id = gen_peer_id(conn, msg.claimed_addr, msg.nonce);
- if (conn_mode == Conn::ConnMode::PASSIVE)
+ if (conn->get_mode() == Conn::ConnMode::PASSIVE)
{
pinfo_slock_t _g(known_peers_lock);
pinfo_ulock_t __g(pid2peer_lock);
@@ -799,7 +802,7 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) {
return;
}
auto &old_conn = p->inbound_conn;
- if (old_conn && old_conn->get_mode() != Conn::ConnMode::DEAD)
+ if (old_conn && !old_conn->is_terminated())
{
SALTICIDAE_LOG_DEBUG("%s terminating old connection %s",
std::string(listen_addr).c_str(),
@@ -839,12 +842,11 @@ template<typename O, O _, O __>
void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) {
this->disp_tcall->async_call([this, conn, msg=std::move(msg)](ThreadCall::Handle &) {
try {
- auto conn_mode = conn->get_mode();
- if (conn_mode == ConnPool::Conn::DEAD) return;
+ if (conn->is_terminated()) return;
if (!msg.claimed_addr.is_null())
{
auto peer_id = gen_peer_id(conn, msg.claimed_addr, msg.nonce);
- if (conn_mode == Conn::ConnMode::ACTIVE)
+ if (conn->get_mode() == Conn::ConnMode::ACTIVE)
{
pinfo_ulock_t _g(known_peers_lock);
pinfo_ulock_t __g(pid2peer_lock);
@@ -861,7 +863,7 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) {
return;
}
auto &old_conn = p->outbound_conn;
- if (old_conn && old_conn->get_mode() != Conn::ConnMode::DEAD)
+ if (old_conn && !old_conn->is_terminated())
{
SALTICIDAE_LOG_DEBUG("%s terminating old connection %s",
std::string(listen_addr).c_str(),
@@ -895,7 +897,7 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) {
{
SALTICIDAE_LOG_WARN("multiple peer addresses share the same identity");
known_peers.erase(old_peer_addr);
- if (p->conn && p->conn->get_mode() != Conn::ConnMode::DEAD)
+ if (p->conn && !p->conn->is_terminated())
this->disp_terminate(p->conn);
}
old_peer_addr = peer_addr;
@@ -1182,7 +1184,6 @@ typedef struct clientnetwork_conn_t clientnetwork_conn_t;
typedef enum msgnetwork_conn_mode_t {
CONN_MODE_ACTIVE,
CONN_MODE_PASSIVE,
- CONN_MODE_DEAD
} msgnetwork_conn_mode_t;
typedef enum peernetwork_id_mode_t {
diff --git a/src/conn.cpp b/src/conn.cpp
index b6c6c71..f26c480 100644
--- a/src/conn.cpp
+++ b/src/conn.cpp
@@ -45,7 +45,6 @@ ConnPool::Conn::operator std::string() const {
{
case Conn::ACTIVE: s << "active"; break;
case Conn::PASSIVE: s << "passive"; break;
- case Conn::DEAD: s << "dead"; break;
}
s << ">";
return std::move(s);
@@ -244,15 +243,10 @@ void ConnPool::Conn::_recv_data_tls_handshake(const conn_t &conn, int, int) {
void ConnPool::Conn::_recv_data_dummy(const conn_t &, int, int) {}
void ConnPool::Conn::stop() {
- if (mode != ConnMode::DEAD)
- {
- if (worker) worker->unfeed();
- if (tls) tls->shutdown();
- ev_socket.clear();
- ev_connect.clear();
- send_buffer.get_queue().unreg_handler();
- mode = ConnMode::DEAD;
- }
+ if (worker) worker->unfeed();
+ if (tls) tls->shutdown();
+ ev_socket.clear();
+ send_buffer.get_queue().unreg_handler();
}
void ConnPool::worker_terminate(const conn_t &conn) {
@@ -414,13 +408,14 @@ void ConnPool::del_conn(const conn_t &conn) {
pool.erase(it);
update_conn(conn, false);
release_conn(conn);
+ //std::atomic_thread_fence(std::memory_order_release);
}
void ConnPool::release_conn(const conn_t &conn) {
/* inform the upper layer the connection will be destroyed */
+ conn->ev_connect.clear();
on_teardown(conn);
::close(conn->fd);
- std::atomic_thread_fence(std::memory_order_release);
}
ConnPool::conn_t ConnPool::add_conn(const conn_t &conn) {
diff --git a/test/bench_network.cpp b/test/bench_network.cpp
index 64e53c2..ed2642a 100644
--- a/test/bench_network.cpp
+++ b/test/bench_network.cpp
@@ -106,7 +106,7 @@ struct MyNet: public MsgNetworkByteOp {
/* send the first message through this connection */
trigger = [this, conn](ThreadCall::Handle &) {
send_msg(MsgBytes(256), salticidae::static_pointer_cast<Conn>(conn));
- if (conn->get_mode() != MyNet::Conn::DEAD)
+ if (!conn->is_terminated())
tcall.async_call(trigger);
};
tcall.async_call(trigger);
diff --git a/test/bench_network_tls.cpp b/test/bench_network_tls.cpp
index cb466ad..89e39a8 100644
--- a/test/bench_network_tls.cpp
+++ b/test/bench_network_tls.cpp
@@ -110,7 +110,7 @@ struct MyNet: public MsgNetworkByteOp {
/* send the first message through this connection */
trigger = [this, conn](ThreadCall::Handle &) {
send_msg(MsgBytes(256), salticidae::static_pointer_cast<Conn>(conn));
- if (conn->get_mode() != MyNet::Conn::DEAD)
+ if (!conn->is_terminated())
tcall.async_call(trigger);
};
tcall.async_call(trigger);