diff options
-rw-r--r-- | include/salticidae/conn.h | 10 | ||||
-rw-r--r-- | include/salticidae/event.h | 2 | ||||
-rw-r--r-- | include/salticidae/network.h | 35 | ||||
-rw-r--r-- | src/conn.cpp | 17 | ||||
-rw-r--r-- | test/bench_network.cpp | 2 | ||||
-rw-r--r-- | test/bench_network_tls.cpp | 2 |
6 files changed, 31 insertions, 37 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index c34521a..a1b5633 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -68,7 +68,6 @@ class ConnPool { enum ConnMode { ACTIVE, /**< the connection is established by connect() */ PASSIVE, /**< the connection is established by accept() */ - DEAD, /**< the connection is dead */ }; protected: @@ -77,7 +76,7 @@ class ConnPool { int fd; Worker *worker; ConnPool *cpool; - std::atomic<ConnMode> mode; + ConnMode mode; NetAddr addr; MPSCWriteBuffer send_buffer; @@ -115,7 +114,7 @@ class ConnPool { Conn(Conn &&other) = delete; virtual ~Conn() { - std::atomic_thread_fence(std::memory_order_acquire); + //std::atomic_thread_fence(std::memory_order_acquire); SALTICIDAE_LOG_INFO("destroyed %s", std::string(*this).c_str()); } @@ -132,7 +131,6 @@ class ConnPool { const X509 *get_peer_cert() const { return peer_cert.get(); } ConnMode get_mode() const { return mode; } ConnPool *get_pool() const { return cpool; } - MPSCWriteBuffer &get_send_buffer() { return send_buffer; } /** Write data to the connection (non-blocking). The data will be sent * whenever I/O is available. */ @@ -234,8 +232,7 @@ class ConnPool { } void enable_send_buffer(const conn_t &conn, int client_fd) { - conn->get_send_buffer() - .get_queue() + conn->send_buffer.get_queue() .reg_handler(this->ec, [conn, client_fd] (MPSCWriteBuffer::queue_t &) { if (conn->ready_send) @@ -252,7 +249,6 @@ class ConnPool { /* the caller should finalize all the preparation */ tcall.async_call([this, conn, client_fd](ThreadCall::Handle &) { try { - assert(conn->mode != Conn::ConnMode::DEAD); auto cpool = conn->cpool; if (cpool->enable_tls) { diff --git a/include/salticidae/event.h b/include/salticidae/event.h index 9b8bea1..78ae12d 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -305,6 +305,8 @@ class TimedFdEvent: public FdEvent, public TimerEvent { return *this; } + ~TimedFdEvent() { clear(); } + void clear() { TimerEvent::clear(); FdEvent::clear(); diff --git a/include/salticidae/network.h b/include/salticidae/network.h index f5fba90..2b0c5b3 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -282,7 +282,11 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { public: Conn(): MsgNet::Conn(), peer(nullptr) {} NetAddr get_peer_addr() { - return peer ? peer->peer_addr : NetAddr(); + auto ret = *(static_cast<NetAddr *>( + get_net()->disp_tcall->call([this](ThreadCall::Handle &h) { + h.set_result(peer ? peer->peer_addr : NetAddr()); + }).get())); + return ret; } PeerNetwork *get_net() { @@ -344,7 +348,10 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { ev_ping_timer.del(); } public: - ~Peer() {} + ~Peer() { + if (inbound_conn) inbound_conn->peer = nullptr; + if (outbound_conn) outbound_conn->peer = nullptr; + } }; std::unordered_map<NetAddr, conn_t> pending_peers; @@ -601,9 +608,7 @@ void PeerNetwork<O, _, __>::on_setup(const ConnPool::conn_t &_conn) { assert(!ev_timeout); ev_timeout = TimerEvent(worker->get_ec(), [=](TimerEvent &) { try { - SALTICIDAE_LOG_INFO("peer ping-pong timeout %s <-> %s", - std::string(listen_addr).c_str(), - std::string(conn->get_peer_addr()).c_str()); + SALTICIDAE_LOG_INFO("peer ping-pong timeout"); this->worker_terminate(conn); } catch (...) { worker->error_callback(std::current_exception()); } }); @@ -735,8 +740,7 @@ void PeerNetwork<O, _, __>::replace_conn(const conn_t &conn) { auto &old_conn = it->second; if (old_conn != conn) { - if (old_conn->get_mode() != Conn::ConnMode::DEAD) - this->disp_terminate(old_conn); + this->disp_terminate(old_conn); pending_peers.erase(it); } } @@ -768,12 +772,11 @@ template<typename O, O _, O __> void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) { this->disp_tcall->async_call([this, conn, msg=std::move(msg)](ThreadCall::Handle &) { try { - auto conn_mode = conn->get_mode(); - if (conn_mode == ConnPool::Conn::DEAD) return; + if (conn->is_terminated()) return; if (!msg.claimed_addr.is_null()) { auto peer_id = gen_peer_id(conn, msg.claimed_addr, msg.nonce); - if (conn_mode == Conn::ConnMode::PASSIVE) + if (conn->get_mode() == Conn::ConnMode::PASSIVE) { pinfo_slock_t _g(known_peers_lock); pinfo_ulock_t __g(pid2peer_lock); @@ -799,7 +802,7 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) { return; } auto &old_conn = p->inbound_conn; - if (old_conn && old_conn->get_mode() != Conn::ConnMode::DEAD) + if (old_conn && !old_conn->is_terminated()) { SALTICIDAE_LOG_DEBUG("%s terminating old connection %s", std::string(listen_addr).c_str(), @@ -839,12 +842,11 @@ template<typename O, O _, O __> void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) { this->disp_tcall->async_call([this, conn, msg=std::move(msg)](ThreadCall::Handle &) { try { - auto conn_mode = conn->get_mode(); - if (conn_mode == ConnPool::Conn::DEAD) return; + if (conn->is_terminated()) return; if (!msg.claimed_addr.is_null()) { auto peer_id = gen_peer_id(conn, msg.claimed_addr, msg.nonce); - if (conn_mode == Conn::ConnMode::ACTIVE) + if (conn->get_mode() == Conn::ConnMode::ACTIVE) { pinfo_ulock_t _g(known_peers_lock); pinfo_ulock_t __g(pid2peer_lock); @@ -861,7 +863,7 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) { return; } auto &old_conn = p->outbound_conn; - if (old_conn && old_conn->get_mode() != Conn::ConnMode::DEAD) + if (old_conn && !old_conn->is_terminated()) { SALTICIDAE_LOG_DEBUG("%s terminating old connection %s", std::string(listen_addr).c_str(), @@ -895,7 +897,7 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) { { SALTICIDAE_LOG_WARN("multiple peer addresses share the same identity"); known_peers.erase(old_peer_addr); - if (p->conn && p->conn->get_mode() != Conn::ConnMode::DEAD) + if (p->conn && !p->conn->is_terminated()) this->disp_terminate(p->conn); } old_peer_addr = peer_addr; @@ -1182,7 +1184,6 @@ typedef struct clientnetwork_conn_t clientnetwork_conn_t; typedef enum msgnetwork_conn_mode_t { CONN_MODE_ACTIVE, CONN_MODE_PASSIVE, - CONN_MODE_DEAD } msgnetwork_conn_mode_t; typedef enum peernetwork_id_mode_t { diff --git a/src/conn.cpp b/src/conn.cpp index b6c6c71..f26c480 100644 --- a/src/conn.cpp +++ b/src/conn.cpp @@ -45,7 +45,6 @@ ConnPool::Conn::operator std::string() const { { case Conn::ACTIVE: s << "active"; break; case Conn::PASSIVE: s << "passive"; break; - case Conn::DEAD: s << "dead"; break; } s << ">"; return std::move(s); @@ -244,15 +243,10 @@ void ConnPool::Conn::_recv_data_tls_handshake(const conn_t &conn, int, int) { void ConnPool::Conn::_recv_data_dummy(const conn_t &, int, int) {} void ConnPool::Conn::stop() { - if (mode != ConnMode::DEAD) - { - if (worker) worker->unfeed(); - if (tls) tls->shutdown(); - ev_socket.clear(); - ev_connect.clear(); - send_buffer.get_queue().unreg_handler(); - mode = ConnMode::DEAD; - } + if (worker) worker->unfeed(); + if (tls) tls->shutdown(); + ev_socket.clear(); + send_buffer.get_queue().unreg_handler(); } void ConnPool::worker_terminate(const conn_t &conn) { @@ -414,13 +408,14 @@ void ConnPool::del_conn(const conn_t &conn) { pool.erase(it); update_conn(conn, false); release_conn(conn); + //std::atomic_thread_fence(std::memory_order_release); } void ConnPool::release_conn(const conn_t &conn) { /* inform the upper layer the connection will be destroyed */ + conn->ev_connect.clear(); on_teardown(conn); ::close(conn->fd); - std::atomic_thread_fence(std::memory_order_release); } ConnPool::conn_t ConnPool::add_conn(const conn_t &conn) { diff --git a/test/bench_network.cpp b/test/bench_network.cpp index 64e53c2..ed2642a 100644 --- a/test/bench_network.cpp +++ b/test/bench_network.cpp @@ -106,7 +106,7 @@ struct MyNet: public MsgNetworkByteOp { /* send the first message through this connection */ trigger = [this, conn](ThreadCall::Handle &) { send_msg(MsgBytes(256), salticidae::static_pointer_cast<Conn>(conn)); - if (conn->get_mode() != MyNet::Conn::DEAD) + if (!conn->is_terminated()) tcall.async_call(trigger); }; tcall.async_call(trigger); diff --git a/test/bench_network_tls.cpp b/test/bench_network_tls.cpp index cb466ad..89e39a8 100644 --- a/test/bench_network_tls.cpp +++ b/test/bench_network_tls.cpp @@ -110,7 +110,7 @@ struct MyNet: public MsgNetworkByteOp { /* send the first message through this connection */ trigger = [this, conn](ThreadCall::Handle &) { send_msg(MsgBytes(256), salticidae::static_pointer_cast<Conn>(conn)); - if (conn->get_mode() != MyNet::Conn::DEAD) + if (!conn->is_terminated()) tcall.async_call(trigger); }; tcall.async_call(trigger); |