diff options
author | Determinant <[email protected]> | 2019-06-23 02:00:19 -0400 |
---|---|---|
committer | Determinant <[email protected]> | 2019-06-23 02:00:19 -0400 |
commit | 13e2643356bfcdeffd9f6e854f07ee68f29dc23b (patch) | |
tree | 8fae57e59a8a82676ef17d321b03cf4525933b87 | |
parent | da9410b41cf56340fd4a3e5148df704c9c0e139c (diff) |
get rid of `self()` and `release_self()`; fix hidden bugs
-rw-r--r-- | include/salticidae/conn.h | 63 | ||||
-rw-r--r-- | include/salticidae/event.h | 5 | ||||
-rw-r--r-- | include/salticidae/network.h | 79 | ||||
-rw-r--r-- | src/conn.cpp | 122 | ||||
-rw-r--r-- | test/bench_network.cpp | 1 | ||||
-rw-r--r-- | test/bench_network_tls.cpp | 1 | ||||
-rw-r--r-- | test/test_msgnet.cpp | 4 |
7 files changed, 136 insertions, 139 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index ceec176..9e2408f 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -59,6 +59,7 @@ class ConnPool { using conn_t = ArcObj<Conn>; /** The type of callback invoked when connection status is changed. */ using conn_callback_t = std::function<bool(const conn_t &, bool)>; + /** The type of callback invoked when an error occured (during async execution). */ using error_callback_t = std::function<void(const std::exception_ptr, bool)>; /** Abstraction for a bi-directional connection. */ class Conn { @@ -71,9 +72,8 @@ class ConnPool { }; protected: + std::atomic<bool> terminated; size_t seg_buff_size; - conn_t self_ref; - std::mutex ref_mlock; int fd; Worker *worker; ConnPool *cpool; @@ -85,7 +85,6 @@ class ConnPool { TimedFdEvent ev_connect; FdEvent ev_socket; - TimerEvent ev_send_wait; /** does not need to wait if true */ bool ready_send; @@ -104,15 +103,8 @@ class ConnPool { static socket_io_func _send_data_tls_handshake; static socket_io_func _recv_data_dummy; - void conn_server(int, int); - - /** Terminate the connection (from the worker thread). */ - void worker_terminate(); - /** Terminate the connection (from the dispatcher thread). */ - void disp_terminate(); - public: - Conn(): worker(nullptr), ready_send(false), + Conn(): terminated(false), worker(nullptr), ready_send(false), send_data_func(nullptr), recv_data_func(nullptr), tls(nullptr), peer_cert(nullptr) {} Conn(const Conn &) = delete; @@ -122,15 +114,12 @@ class ConnPool { SALTICIDAE_LOG_INFO("destroyed %s", std::string(*this).c_str()); } - /** Get the handle to itself. */ - conn_t self() { - mutex_lg_t _(ref_mlock); - return self_ref; + bool is_terminated() { + return terminated.load(std::memory_order_acquire); } - void release_self() { - mutex_lg_t _(ref_mlock); - self_ref = nullptr; + bool set_terminated() { + return !terminated.exchange(true, std::memory_order_acq_rel); } operator std::string() const; @@ -150,12 +139,6 @@ class ConnPool { /** Close the IO and clear all on-going or planned events. Remove the * connection from a Worker. */ virtual void stop(); - /** Called when new data is available. */ - virtual void on_read() {} - /** Called when the underlying connection is established. */ - virtual void on_setup() {} - /** Called when the underlying connection breaks. */ - virtual void on_teardown() {} }; protected: @@ -168,6 +151,18 @@ class ConnPool { worker_error_callback_t disp_error_cb; worker_error_callback_t worker_error_cb; + /** Terminate the connection (from the worker thread). */ + void worker_terminate(const conn_t &conn); + /** Terminate the connection (from the dispatcher thread). */ + void disp_terminate(const conn_t &conn); + + void conn_server(const conn_t &conn, int, int); + /** Called when new data is available. */ + virtual void on_read(const conn_t &) {} + /** Called when the underlying connection is established. */ + virtual void on_setup(const conn_t &) {} + /** Called when the underlying connection breaks. */ + virtual void on_teardown(const conn_t &) {} private: const int max_listen_backlog; @@ -195,11 +190,11 @@ class ConnPool { bool ret = !conn_cb || conn_cb(conn, connected); if (enable_tls && connected) { - conn->worker->get_tcall()->async_call([conn, ret](ThreadCall::Handle &) { + conn->worker->get_tcall()->async_call([this, conn, ret](ThreadCall::Handle &) { if (ret) conn->recv_data_func = Conn::_recv_data_tls; else - conn->worker_terminate(); + worker_terminate(conn); }); } }); @@ -214,6 +209,7 @@ class ConnPool { ConnPool::worker_error_callback_t on_fatal_error; public: + Worker(): tcall(ec), disp_flag(false), nconn(0) {} void set_error_callback(ConnPool::worker_error_callback_t _on_error) { @@ -233,6 +229,7 @@ class ConnPool { /* the caller should finalize all the preparation */ tcall.async_call([this, conn, client_fd](ThreadCall::Handle &) { try { + conn->ev_connect.clear(); if (conn->mode == Conn::ConnMode::DEAD) { SALTICIDAE_LOG_INFO("worker %x discarding dead connection", @@ -258,6 +255,7 @@ class ConnPool { SALTICIDAE_LOG_INFO("worker %x got %s", std::this_thread::get_id(), std::string(*conn).c_str()); + assert(conn->worker == this); conn->get_send_buffer() .get_queue() .reg_handler(this->ec, [conn, client_fd] @@ -270,7 +268,7 @@ class ConnPool { } return false; }); - conn->ev_socket = FdEvent(ec, client_fd, [this, conn=conn](int fd, int what) { + conn->ev_socket = FdEvent(ec, client_fd, [this, conn](int fd, int what) { try { if (what & FdEvent::READ) conn->recv_data_func(conn, fd, what); @@ -278,7 +276,7 @@ class ConnPool { conn->send_data_func(conn, fd, what); } catch (...) { conn->cpool->recoverable_error(std::current_exception()); - conn->worker_terminate(); + conn->cpool->worker_terminate(conn); } }); conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE); @@ -310,6 +308,7 @@ class ConnPool { void accept_client(int, int); conn_t add_conn(const conn_t &conn); void del_conn(const conn_t &conn); + void release_conn(const conn_t &conn); protected: conn_t _connect(const NetAddr &addr); @@ -494,6 +493,7 @@ class ConnPool { ConnPool(ConnPool &&) = delete; void start() { + std::atomic_thread_fence(std::memory_order_acq_rel); if (system_state) return; SALTICIDAE_LOG_INFO("starting all threads..."); for (size_t i = 0; i < nworker; i++) @@ -516,10 +516,9 @@ class ConnPool { workers[i].get_handle().join(); for (auto it: pool) { - conn_t conn = it.second; + auto &conn = it.second; conn->stop(); - conn->self_ref = nullptr; - ::close(conn->fd); + release_conn(conn); } } @@ -589,7 +588,7 @@ class ConnPool { void terminate(const conn_t &conn) { disp_tcall->async_call([this, conn](ThreadCall::Handle &) { try { - conn->disp_terminate(); + disp_terminate(conn); } catch (...) { disp_error_cb(std::current_exception()); } diff --git a/include/salticidae/event.h b/include/salticidae/event.h index b243865..ad78a6e 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -308,6 +308,7 @@ class TimedFdEvent: public FdEvent, public TimerEvent { void clear() { TimerEvent::clear(); FdEvent::clear(); + callback = nullptr; } using FdEvent::set_callback; @@ -532,7 +533,7 @@ class MPSCQueueEventDriven: public MPSCQueue<T> { fd(eventfd(0, EFD_NONBLOCK)) { if (fd == -1) throw SalticidaeError(SALTI_ERROR_FD); } - ~MPSCQueueEventDriven() { close(fd); } + ~MPSCQueueEventDriven() { close(fd); unreg_handler(); } template<typename Func> void reg_handler(const EventContext &ec, Func &&func) { @@ -587,7 +588,7 @@ class MPMCQueueEventDriven: public MPMCQueue<T> { fd(eventfd(0, EFD_NONBLOCK)) { if (fd == -1) throw SalticidaeError(SALTI_ERROR_FD); } - ~MPMCQueueEventDriven() { close(fd); } + ~MPMCQueueEventDriven() { close(fd); unreg_handlers(); } // this function is *NOT* thread-safe template<typename Func> diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 07c6ba5..20dc696 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -109,7 +109,6 @@ class MsgNetwork: public ConnPool { #endif protected: - void on_read() override; }; using conn_t = ArcObj<Conn>; @@ -127,6 +126,7 @@ class MsgNetwork: public ConnPool { protected: ConnPool::Conn *create_conn() override { return new Conn(); } + void on_read(const ConnPool::conn_t &) override; public: @@ -287,12 +287,9 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { protected: void stop() override { - ev_timeout.clear(); + ev_timeout.del(); MsgNet::Conn::stop(); } - - void on_setup() override; - void on_teardown() override; }; using conn_t = ArcObj<Conn>; @@ -326,7 +323,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { if (ev_ping_timer) ev_ping_timer.del(); } - void reset_conn(conn_t conn); + void reset_conn(const conn_t &conn); }; std::unordered_map<NetAddr, BoxObj<Peer>> id2peer; @@ -381,6 +378,8 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { virtual double gen_conn_timeout() { return gen_rand_timeout(retry_conn_delay); } + void on_setup(const ConnPool::conn_t &) override; + void on_teardown(const ConnPool::conn_t &) override; public: @@ -466,11 +465,13 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { /* this callback is run by a worker */ template<typename OpcodeType> -void MsgNetwork<OpcodeType>::Conn::on_read() { - ConnPool::Conn::on_read(); - auto &recv_buffer = this->recv_buffer; - auto mn = get_net(); - while (self_ref) +void MsgNetwork<OpcodeType>::on_read(const ConnPool::conn_t &_conn) { + ConnPool::on_read(_conn); + auto conn = static_pointer_cast<Conn>(_conn); + auto &recv_buffer = conn->recv_buffer; + auto &msg = conn->msg; + auto &msg_state = conn->msg_state; + while (true) //(!conn->is_terminated()) { if (msg_state == Conn::HEADER) { @@ -493,8 +494,7 @@ void MsgNetwork<OpcodeType>::Conn::on_read() { return; } #endif - auto conn = static_pointer_cast<Conn>(self()); - while (!mn->incoming_msgs.enqueue(std::make_pair(msg, conn), false)) + while (!incoming_msgs.enqueue(std::make_pair(msg, conn), false)) std::this_thread::yield(); } } @@ -550,46 +550,47 @@ void PeerNetwork<O, _, __>::tcall_reset_timeout(ConnPool::Worker *worker, /* begin: functions invoked by the dispatcher */ template<typename O, O _, O __> -void PeerNetwork<O, _, __>::Conn::on_setup() { - MsgNet::Conn::on_setup(); - auto pn = get_net(); - auto conn = static_pointer_cast<Conn>(this->self()); - auto worker = this->worker; +void PeerNetwork<O, _, __>::on_setup(const ConnPool::conn_t &_conn) { + MsgNet::on_setup(_conn); + auto conn = static_pointer_cast<Conn>(_conn); + auto worker = conn->worker; + auto &ev_timeout = conn->ev_timeout; assert(!ev_timeout); ev_timeout = TimerEvent(worker->get_ec(), [worker, conn](TimerEvent &) { try { SALTICIDAE_LOG_INFO("peer ping-pong timeout"); - conn->worker_terminate(); + conn->get_net()->worker_terminate(conn); } catch (...) { worker->error_callback(std::current_exception()); } }); /* the initial ping-pong to set up the connection */ - tcall_reset_timeout(worker, conn, pn->conn_timeout); - pn->send_msg(MsgPing(pn->listen_port), conn); + tcall_reset_timeout(worker, conn, conn_timeout); + send_msg(MsgPing(listen_port), conn); } template<typename O, O _, O __> -void PeerNetwork<O, _, __>::Conn::on_teardown() { - MsgNet::Conn::on_teardown(); - auto pn = get_net(); - auto p = pn->get_peer(peer_id); +void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) { + MsgNet::on_teardown(_conn); + auto conn = static_pointer_cast<Conn>(_conn); + conn->ev_timeout.clear(); + const auto &peer_id = conn->peer_id; + auto p = get_peer(peer_id); if (!p) return; - if (this != p->conn.get()) return; + if (conn != p->conn) return; p->ev_ping_timer.del(); p->connected = false; //p->conn = nullptr; - SALTICIDAE_LOG_INFO("connection lost: %s", std::string(*this).c_str()); + SALTICIDAE_LOG_INFO("connection lost: %s", std::string(*conn).c_str()); // try to reconnect - p->ev_retry_timer = TimerEvent(pn->disp_ec, - [pn, peer_id = this->peer_id](TimerEvent &) { + p->ev_retry_timer = TimerEvent(this->disp_ec, [this, peer_id](TimerEvent &) { try { - pn->start_active_conn(peer_id); - } catch (...) { pn->disp_error_cb(std::current_exception()); } + start_active_conn(peer_id); + } catch (...) { this->disp_error_cb(std::current_exception()); } }); - p->ev_retry_timer.add(pn->gen_conn_timeout()); + p->ev_retry_timer.add(gen_conn_timeout()); } template<typename O, O _, O __> -void PeerNetwork<O, _, __>::Peer::reset_conn(conn_t new_conn) { +void PeerNetwork<O, _, __>::Peer::reset_conn(const conn_t &new_conn) { if (conn != new_conn) { if (conn) @@ -597,7 +598,8 @@ void PeerNetwork<O, _, __>::Peer::reset_conn(conn_t new_conn) { //SALTICIDAE_LOG_DEBUG("moving send buffer"); //new_conn->move_send_buffer(conn); SALTICIDAE_LOG_INFO("terminating old connection %s", std::string(*conn).c_str()); - conn->disp_terminate(); + auto net = conn->get_net(); + net->disp_terminate(conn); } addr = new_conn->get_addr(); conn = new_conn; @@ -656,7 +658,7 @@ bool PeerNetwork<O, _, __>::check_new_conn(const conn_t &conn, uint16_t port) { } else { - conn->disp_terminate(); + this->disp_terminate(conn); return true; } } @@ -665,7 +667,7 @@ bool PeerNetwork<O, _, __>::check_new_conn(const conn_t &conn, uint16_t port) { { if (conn != p->conn) { - conn->disp_terminate(); + this->disp_terminate(conn); return true; } return false; @@ -797,7 +799,7 @@ void PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) { auto it = id2peer.find(addr); if (it == id2peer.end()) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); - it->second->conn->disp_terminate(); + this->disp_terminate(it->second->conn); id2peer.erase(it); } catch (const PeerNetworkError &) { this->recoverable_error(std::current_exception()); @@ -899,8 +901,7 @@ void ClientNetwork<OpcodeType>::Conn::on_setup() { auto cn = get_net(); cn->addr2conn.erase(addr); cn->addr2conn.insert( - std::make_pair(addr, - static_pointer_cast<Conn>(this->self()))); + std::make_pair(addr, static_pointer_cast<Conn>(this->self()))); } template<typename OpcodeType> diff --git a/src/conn.cpp b/src/conn.cpp index 931e915..1f5e324 100644 --- a/src/conn.cpp +++ b/src/conn.cpp @@ -56,7 +56,7 @@ ConnPool::Conn::operator std::string() const { void ConnPool::Conn::_send_data(const ConnPool::conn_t &conn, int fd, int events) { if (events & FdEvent::ERROR) { - conn->worker_terminate(); + conn->cpool->worker_terminate(conn); return; } ssize_t ret = conn->seg_buff_size; @@ -77,7 +77,7 @@ void ConnPool::Conn::_send_data(const ConnPool::conn_t &conn, int fd, int events if (ret < 0 && errno != EWOULDBLOCK) { SALTICIDAE_LOG_INFO("send(%d) failure: %s", fd, strerror(errno)); - conn->worker_terminate(); + conn->cpool->worker_terminate(conn); return; } } @@ -100,7 +100,7 @@ void ConnPool::Conn::_send_data(const ConnPool::conn_t &conn, int fd, int events void ConnPool::Conn::_recv_data(const ConnPool::conn_t &conn, int fd, int events) { if (events & FdEvent::ERROR) { - conn->worker_terminate(); + conn->cpool->worker_terminate(conn); return; } const size_t seg_buff_size = conn->seg_buff_size; @@ -116,27 +116,27 @@ void ConnPool::Conn::_recv_data(const ConnPool::conn_t &conn, int fd, int events if (errno == EWOULDBLOCK) break; SALTICIDAE_LOG_INFO("recv(%d) failure: %s", fd, strerror(errno)); /* connection err or half-opened connection */ - conn->worker_terminate(); + conn->cpool->worker_terminate(conn); return; } if (ret == 0) { //SALTICIDAE_LOG_INFO("recv(%d) terminates", fd, strerror(errno)); - conn->worker_terminate(); + conn->cpool->worker_terminate(conn); return; } buff_seg.resize(ret); conn->recv_buffer.push(std::move(buff_seg)); } //ev_read.add(); - conn->on_read(); + conn->cpool->on_read(conn); } void ConnPool::Conn::_send_data_tls(const ConnPool::conn_t &conn, int fd, int events) { if (events & FdEvent::ERROR) { - conn->worker_terminate(); + conn->cpool->worker_terminate(conn); return; } ssize_t ret = conn->seg_buff_size; @@ -158,7 +158,7 @@ void ConnPool::Conn::_send_data_tls(const ConnPool::conn_t &conn, int fd, int ev if (ret < 0 && tls->get_error(ret) != SSL_ERROR_WANT_WRITE) { SALTICIDAE_LOG_INFO("send(%d) failure: %s", fd, strerror(errno)); - conn->worker_terminate(); + conn->cpool->worker_terminate(conn); return; } } @@ -180,7 +180,7 @@ void ConnPool::Conn::_send_data_tls(const ConnPool::conn_t &conn, int fd, int ev void ConnPool::Conn::_recv_data_tls(const ConnPool::conn_t &conn, int fd, int events) { if (events & FdEvent::ERROR) { - conn->worker_terminate(); + conn->cpool->worker_terminate(conn); return; } const size_t seg_buff_size = conn->seg_buff_size; @@ -197,18 +197,18 @@ void ConnPool::Conn::_recv_data_tls(const ConnPool::conn_t &conn, int fd, int ev if (tls->get_error(ret) == SSL_ERROR_WANT_READ) break; SALTICIDAE_LOG_INFO("recv(%d) failure: %s", fd, strerror(errno)); /* connection err or half-opened connection */ - conn->worker_terminate(); + conn->cpool->worker_terminate(conn); return; } if (ret == 0) { - conn->worker_terminate(); + conn->cpool->worker_terminate(conn); return; } buff_seg.resize(ret); conn->recv_buffer.push(std::move(buff_seg)); } - conn->on_read(); + conn->cpool->on_read(conn); } void ConnPool::Conn::_send_data_tls_handshake(const ConnPool::conn_t &conn, int fd, int events) { @@ -234,44 +234,43 @@ void ConnPool::Conn::_recv_data_tls_handshake(const ConnPool::conn_t &conn, int, } } - void ConnPool::Conn::_recv_data_dummy(const ConnPool::conn_t &, int, int) { } -/****/ void ConnPool::Conn::stop() { if (mode != ConnMode::DEAD) { if (worker) worker->unfeed(); if (tls) tls->shutdown(); - ev_connect.clear(); - ev_socket.clear(); + ev_connect.del(); + ev_socket.del(); send_buffer.get_queue().unreg_handler(); mode = ConnMode::DEAD; } } -void ConnPool::Conn::worker_terminate() { - auto conn = self(); - if (!conn) return; - stop(); - if (!worker->is_dispatcher()) - cpool->disp_tcall->async_call( - [cpool=this->cpool, conn](ThreadCall::Handle &) { - cpool->del_conn(conn); +void ConnPool::worker_terminate(const conn_t &conn) { + conn->worker->get_tcall()->async_call([this, conn](ThreadCall::Handle &) { + if (!conn->set_terminated()) return; + conn->stop(); + disp_tcall->async_call([this, conn](ThreadCall::Handle &) { + del_conn(conn); }); - else cpool->del_conn(conn); + }); } -void ConnPool::Conn::disp_terminate() { - auto conn = self(); - if (!conn) return; - if (worker && !worker->is_dispatcher()) - worker->get_tcall()->call([conn](ThreadCall::Handle &) { +/****/ + +void ConnPool::disp_terminate(const conn_t &conn) { + auto worker = conn->worker; + if (worker) + worker_terminate(conn); + else + disp_tcall->async_call([this, conn](ThreadCall::Handle &) { + if (!conn->set_terminated()) return; conn->stop(); + del_conn(conn); }); - else stop(); - cpool->del_conn(conn); } void ConnPool::accept_client(int fd, int) { @@ -291,7 +290,6 @@ void ConnPool::accept_client(int fd, int) { NetAddr addr((struct sockaddr_in *)&client_addr); conn_t conn = create_conn(); - conn->self_ref = conn; conn->send_buffer.set_capacity(queue_capacity); conn->seg_buff_size = seg_buff_size; conn->fd = client_fd; @@ -299,31 +297,31 @@ void ConnPool::accept_client(int fd, int) { conn->mode = Conn::PASSIVE; conn->addr = addr; add_conn(conn); + //SALTICIDAE_LOG_INFO("new %x", (uintptr_t)conn.get()); SALTICIDAE_LOG_INFO("accepted %s", std::string(*conn).c_str()); auto &worker = select_worker(); conn->worker = &worker; - conn->on_setup(); + on_setup(conn); worker.feed(conn, client_fd); } } catch (...) { recoverable_error(std::current_exception()); } } -void ConnPool::Conn::conn_server(int fd, int events) { - auto conn = self(); /* pin the connection */ - if (!conn) return; +void ConnPool::conn_server(const conn_t &conn, int fd, int events) { if (send(fd, "", 0, MSG_NOSIGNAL) == 0) { - ev_connect.clear(); - SALTICIDAE_LOG_INFO("connected to remote %s", std::string(*this).c_str()); - worker = &(cpool->select_worker()); - on_setup(); - worker->feed(conn, fd); + conn->ev_connect.del(); + SALTICIDAE_LOG_INFO("connected to remote %s", std::string(*conn).c_str()); + auto &worker = select_worker(); + conn->worker = &worker; + on_setup(conn); + worker.feed(conn, fd); } else { if (events & TimedFdEvent::TIMEOUT) - SALTICIDAE_LOG_INFO("%s connect timeout", std::string(*this).c_str()); - conn->disp_terminate(); + SALTICIDAE_LOG_INFO("%s connect timeout", std::string(*conn).c_str()); + disp_terminate(conn); return; } } @@ -370,14 +368,14 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) { if (fcntl(fd, F_SETFL, O_NONBLOCK) == -1) throw ConnPoolError(SALTI_ERROR_CONNECT, errno); conn_t conn = create_conn(); - conn->self_ref = conn; conn->send_buffer.set_capacity(queue_capacity); conn->seg_buff_size = seg_buff_size; conn->fd = fd; - conn->worker = nullptr; conn->cpool = this; conn->mode = Conn::ACTIVE; conn->addr = addr; + add_conn(conn); + //SALTICIDAE_LOG_INFO("new %x", (uintptr_t)conn.get()); struct sockaddr_in sockin; memset(&sockin, 0, sizeof(struct sockaddr_in)); @@ -389,13 +387,14 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) { sizeof(struct sockaddr_in)) < 0 && errno != EINPROGRESS) { SALTICIDAE_LOG_INFO("cannot connect to %s", std::string(addr).c_str()); - conn->disp_terminate(); + disp_terminate(conn); } else { - conn->ev_connect = TimedFdEvent(disp_ec, conn->fd, std::bind(&Conn::conn_server, conn.get(), _1, _2)); + conn->ev_connect = TimedFdEvent(disp_ec, conn->fd, [this, conn](int fd, int events) { + conn_server(conn, fd, events); + }); conn->ev_connect.add(FdEvent::WRITE, conn_server_timeout); - add_conn(conn); SALTICIDAE_LOG_INFO("created %s", std::string(*conn).c_str()); } return conn; @@ -403,19 +402,18 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) { void ConnPool::del_conn(const conn_t &conn) { auto it = pool.find(conn->fd); - if (it != pool.end()) - { - /* temporarily pin the conn before it dies */ - auto conn = it->second; - //assert(conn->fd == fd); - pool.erase(it); - /* inform the upper layer the connection will be destroyed */ - conn->on_teardown(); - update_conn(conn, false); - conn->release_self(); /* remove the self-cycle */ - ::close(conn->fd); - conn->fd = -1; - } + assert(it != pool.end()); + pool.erase(it); + update_conn(conn, false); + release_conn(conn); +} + +void ConnPool::release_conn(const conn_t &conn) { + /* inform the upper layer the connection will be destroyed */ + on_teardown(conn); + conn->ev_connect.clear(); + conn->ev_socket.clear(); + ::close(conn->fd); } ConnPool::conn_t ConnPool::add_conn(const conn_t &conn) { diff --git a/test/bench_network.cpp b/test/bench_network.cpp index f8d3070..64e53c2 100644 --- a/test/bench_network.cpp +++ b/test/bench_network.cpp @@ -146,7 +146,6 @@ int main() { try { tec.dispatch(); } catch (std::exception &) {} - SALTICIDAE_LOG_INFO("thread exiting"); }); auto shutdown = [&](int) { tcall->async_call([&](salticidae::ThreadCall::Handle &) { diff --git a/test/bench_network_tls.cpp b/test/bench_network_tls.cpp index 1143877..cb466ad 100644 --- a/test/bench_network_tls.cpp +++ b/test/bench_network_tls.cpp @@ -150,7 +150,6 @@ int main() { try { tec.dispatch(); } catch (std::exception &) {} - SALTICIDAE_LOG_INFO("thread exiting"); }); auto shutdown = [&](int) { tcall->async_call([&](salticidae::ThreadCall::Handle &) { diff --git a/test/test_msgnet.cpp b/test/test_msgnet.cpp index 7635af8..f6dbe1b 100644 --- a/test/test_msgnet.cpp +++ b/test/test_msgnet.cpp @@ -151,8 +151,8 @@ int main() { bob.listen(bob_addr); /* try to connect once */ - alice.connect(bob_addr); - bob.connect(alice_addr); + alice.connect(bob_addr, false); + bob.connect(alice_addr, false); /* the main loop can be shutdown by ctrl-c or kill */ auto shutdown = [&](int) {ec.stop();}; |