From ecc163f98e434b557768560d00ee2f9755d6d950 Mon Sep 17 00:00:00 2001 From: Determinant Date: Wed, 14 Nov 2018 22:18:59 -0500 Subject: major bug fix --- include/salticidae/conn.h | 91 ++++++++++++++++++++++++++------------------ include/salticidae/event.h | 67 +++++++++++++++++++++++--------- include/salticidae/network.h | 63 +++++++++++++++++++----------- 3 files changed, 142 insertions(+), 79 deletions(-) (limited to 'include') diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index 2ef6b50..73b3022 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -57,13 +57,13 @@ struct ConnPoolError: public SalticidaeError { /** Abstraction for connection management. */ class ConnPool { + class Worker; public: class Conn; /** The handle to a bi-directional connection. */ using conn_t = ArcObj; /** The type of callback invoked when connection status is changed. */ using conn_callback_t = std::function; - /** Abstraction for a bi-directional connection. */ class Conn { friend ConnPool; @@ -77,17 +77,16 @@ class ConnPool { size_t seg_buff_size; conn_t self_ref; int fd; + Worker *worker; ConnPool *cpool; ConnMode mode; NetAddr addr; - // TODO: send_buffer should be a thread-safe mpsc queue MPSCWriteBuffer send_buffer; SegBuffer recv_buffer; - Event ev_read; - Event ev_write; Event ev_connect; + Event ev_socket; /** does not need to wait if true */ bool ready_send; @@ -95,8 +94,6 @@ class ConnPool { void send_data(int, int); void conn_server(int, int); - /** Stop the worker and I/O events. */ - void stop(); /** Terminate the connection. */ void terminate(); @@ -125,10 +122,11 @@ class ConnPool { protected: /** Close the IO and clear all on-going or planned events. */ - virtual void on_close() { - ev_read.clear(); - ev_write.clear(); + virtual void stop() { + if (fd == -1) return; ev_connect.clear(); + ev_socket.clear(); + send_buffer.get_queue().unreg_handler(); ::close(fd); fd = -1; self_ref = nullptr; /* remove the self-cycle */ @@ -142,6 +140,13 @@ class ConnPool { virtual void on_teardown() {} }; + protected: + EventContext ec; + EventContext disp_ec; + ThreadCall* disp_tcall; + /** Should be implemented by derived class to return a new Conn object. */ + virtual Conn *create_conn() = 0; + private: const int max_listen_backlog; const double conn_server_timeout; @@ -164,11 +169,11 @@ class ConnPool { class Worker { EventContext ec; - ThreadCall msgr; + ThreadCall tcall; std::thread handle; public: - Worker(): msgr(ec) {} + Worker(): tcall(ec) {} /* the following functions are called by the dispatcher */ void start() { @@ -176,7 +181,7 @@ class ConnPool { } void feed(const conn_t &conn, int client_fd) { - msgr.call([this, conn, client_fd](ThreadCall::Handle &) { + tcall.call([this, conn, client_fd](ThreadCall::Handle &) { SALTICIDAE_LOG_INFO("worker %x got %s", std::this_thread::get_id(), std::string(*conn).c_str()); @@ -185,11 +190,15 @@ class ConnPool { .reg_handler(this->ec, [conn, client_fd] (MPSCWriteBuffer::queue_t &) { if (conn->ready_send && conn->fd != -1) + { + conn->ev_socket.del(); + conn->ev_socket.add(Event::READ | Event::WRITE); conn->send_data(client_fd, Event::WRITE); + } return false; }); //auto conn_ptr = conn.get(); - conn->ev_read = Event(ec, client_fd, Event::READ | Event::WRITE, [conn=conn](int fd, int what) { + conn->ev_socket = Event(ec, client_fd, [conn=conn](int fd, int what) { if (what & Event::READ) conn->recv_data(fd, what); else @@ -199,19 +208,18 @@ class ConnPool { // std::bind(&Conn::recv_data, conn_ptr, _1, _2)); //conn->ev_write = Event(ec, client_fd, Event::WRITE, // std::bind(&Conn::send_data, conn_ptr, _1, _2)); - conn->ev_read.add(); + conn->ev_socket.add(Event::READ | Event::WRITE); //conn->ev_write.add(); }); } void stop() { - msgr.call([this](ThreadCall::Handle &) { - ec.stop(); - }); + tcall.call([this](ThreadCall::Handle &) { ec.stop(); }); } std::thread &get_handle() { return handle; } const EventContext &get_ec() { return ec; } + ThreadCall *get_tcall() { return &tcall; } }; /* related to workers */ @@ -220,7 +228,7 @@ class ConnPool { void accept_client(int, int); conn_t add_conn(const conn_t &conn); - void terminate(int fd); + void remove_conn(int fd); protected: conn_t _connect(const NetAddr &addr); @@ -244,43 +252,30 @@ class ConnPool { return workers[1]; } - protected: - EventContext ec; - EventContext dispatcher_ec; - BoxObj disp_tcall; - /** Should be implemented by derived class to return a new Conn object. */ - virtual Conn *create_conn() = 0; - public: ConnPool(const EventContext &ec, int max_listen_backlog = 10, double conn_server_timeout = 2, size_t seg_buff_size = 4096, size_t nworker = 2): + ec(ec), max_listen_backlog(max_listen_backlog), conn_server_timeout(conn_server_timeout), seg_buff_size(seg_buff_size), listen_fd(-1), - nworker(std::max((size_t)1, nworker)), - ec(ec) { + nworker(std::max((size_t)1, nworker)) { workers = new Worker[nworker]; - dispatcher_ec = workers[0].get_ec(); - user_tcall = new ThreadCall(ec); - disp_tcall = new ThreadCall(dispatcher_ec); + disp_ec = workers[0].get_ec(); + disp_tcall = workers[0].get_tcall(); } ~ConnPool() { - /* stop all workers */ - for (size_t i = 0; i < nworker; i++) - workers[i].stop(); - /* join all worker threads */ - for (size_t i = 0; i < nworker; i++) - workers[i].get_handle().join(); + stop(); for (auto it: pool) { conn_t conn = it.second; - conn->on_close(); + conn->stop(); } if (listen_fd != -1) close(listen_fd); } @@ -294,6 +289,17 @@ class ConnPool { workers[i].start(); } + void stop() { + SALTICIDAE_LOG_INFO("stopping all threads..."); + /* stop all workers */ + for (size_t i = 0; i < nworker; i++) + workers[i].stop(); + /* join all worker threads */ + for (size_t i = 0; i < nworker; i++) + workers[i].get_handle().join(); + nworker = 0; + } + /** Actively connect to remote addr. */ conn_t connect(const NetAddr &addr, bool blocking = true) { if (blocking) @@ -303,6 +309,9 @@ class ConnPool { auto ptr = new conn_t(_connect(addr)); std::atomic_thread_fence(std::memory_order_release); h.set_result(ptr); + h.set_deleter([](void *data) { + delete static_cast(data); + }); }, true)); auto conn = *ret; delete ret; @@ -328,6 +337,14 @@ class ConnPool { template void reg_conn_handler(Func cb) { conn_cb = cb; } + + void terminate(const conn_t &conn, bool blocking = true) { + int fd = conn->fd; + conn->worker->get_tcall()->call([conn](ThreadCall::Handle &) { + conn->stop(); + }, blocking); + remove_conn(fd); + } }; } diff --git a/include/salticidae/event.h b/include/salticidae/event.h index da27902..616f598 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -42,7 +42,8 @@ struct _event_context_deleter { void operator()(uv_loop_t *ptr) { if (ptr != nullptr) { - uv_loop_close(ptr); + while (uv_loop_close(ptr) == UV_EBUSY) + uv_run(ptr, UV_RUN_NOWAIT); delete ptr; } } @@ -62,8 +63,7 @@ class EventContext: public _event_context_ot { EventContext &operator=(EventContext &&) = default; void dispatch() const { // TODO: improve this loop - for (;;) - uv_run(get(), UV_RUN_ONCE); + uv_run(get(), UV_RUN_DEFAULT); } void stop() const { uv_stop(get()); } }; @@ -79,12 +79,15 @@ class Event { private: EventContext eb; int fd; - int events; uv_poll_t *ev_fd; uv_timer_t *ev_timer; callback_t callback; static inline void fd_then(uv_poll_t *h, int status, int events) { - assert(status == 0); + if (status != 0) + { + SALTICIDAE_LOG_WARN("%s", uv_strerror(status)); + return; + } auto event = static_cast(h->data); event->callback(event->fd, events); } @@ -95,10 +98,14 @@ class Event { event->callback(event->fd, TIMEOUT); } + static void _on_handle_close(uv_handle_t *h) { + delete h; + } + public: Event(): eb(nullptr), ev_fd(nullptr), ev_timer(nullptr) {} - Event(const EventContext &eb, int fd, short events, callback_t callback): - eb(eb), fd(fd), events(events), + Event(const EventContext &eb, int fd, callback_t callback): + eb(eb), fd(fd), ev_fd(nullptr), ev_timer(new uv_timer_t()), callback(callback) { @@ -114,7 +121,7 @@ class Event { Event(const Event &) = delete; Event(Event &&other): - eb(std::move(other.eb)), fd(other.fd), events(other.events), + eb(std::move(other.eb)), fd(other.fd), ev_fd(other.ev_fd), ev_timer(other.ev_timer), callback(std::move(other.callback)) { other.del(); @@ -132,7 +139,6 @@ class Event { other.del(); eb = std::move(other.eb); fd = other.fd; - events = other.events; ev_fd = other.ev_fd; ev_timer = other.ev_timer; callback = std::move(other.callback); @@ -153,26 +159,33 @@ class Event { if (ev_fd != nullptr) { uv_poll_stop(ev_fd); - delete ev_fd; + uv_close((uv_handle_t *)ev_fd, Event::_on_handle_close); ev_fd = nullptr; } if (ev_timer != nullptr) { uv_timer_stop(ev_timer); - delete ev_timer; + uv_close((uv_handle_t *)ev_timer, Event::_on_handle_close); ev_timer = nullptr; } + callback = nullptr; + } + + void set_callback(callback_t _callback) { + callback = _callback; } - void add() { + void add(int events) { if (ev_fd) uv_poll_start(ev_fd, events, Event::fd_then); } void del() { if (ev_fd) uv_poll_stop(ev_fd); + if (ev_timer == nullptr) + assert(ev_timer); uv_timer_stop(ev_timer); } - void add_with_timeout(double t_sec) { - add(); + void add_with_timeout(double t_sec, int events) { + add(events); uv_timer_start(ev_timer, Event::timer_then, uint64_t(t_sec * 1000), 0); } @@ -209,6 +222,7 @@ class ThreadCall { public: class Handle { std::function callback; + std::function deleter; ThreadNotifier* notifier; void *result; friend ThreadCall; @@ -219,6 +233,8 @@ class ThreadCall { if (notifier) notifier->notify(result); } void set_result(void *data) { result = data; } + template + void set_deleter(Func _deleter) { deleter = _deleter; } }; ThreadCall() = default; @@ -227,16 +243,24 @@ class ThreadCall { ThreadCall(EventContext ec): ec(ec) { if (pipe2(ctl_fd, O_NONBLOCK)) throw SalticidaeError(std::string("ThreadCall: failed to create pipe")); - ev_listen = Event(ec, ctl_fd[0], Event::READ, [this](int fd, int) { + ev_listen = Event(ec, ctl_fd[0], [this](int fd, int) { Handle *h; read(fd, &h, sizeof(h)); h->exec(); delete h; }); - ev_listen.add(); + ev_listen.add(Event::READ); } ~ThreadCall() { + ev_listen.clear(); + Handle *h; + while (read(ctl_fd[0], &h, sizeof(h)) == sizeof(h)) + { + if (h->result && h->deleter) + h->deleter(h->result); + delete h; + } close(ctl_fd[0]); close(ctl_fd[1]); } @@ -277,11 +301,14 @@ class MPSCQueueEventDriven: public MPSCQueue { wait_sig(true), fd(eventfd(0, EFD_NONBLOCK)) {} - ~MPSCQueueEventDriven() { close(fd); } + ~MPSCQueueEventDriven() { + ev.clear(); + close(fd); + } template void reg_handler(const EventContext &ec, Func &&func) { - ev = Event(ec, fd, Event::READ, + ev = Event(ec, fd, [this, func=std::forward(func)](int, short) { //fprintf(stderr, "%x\n", std::this_thread::get_id()); uint64_t t; @@ -296,9 +323,11 @@ class MPSCQueueEventDriven: public MPSCQueue { if (func(*this)) write(fd, &dummy, 8); }); - ev.add(); + ev.add(Event::READ); } + void unreg_handler() { ev.clear(); } + template bool enqueue(U &&e) { static const uint64_t dummy = 1; diff --git a/include/salticidae/network.h b/include/salticidae/network.h index e5165bf..a63976b 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -275,9 +275,9 @@ class PeerNetwork: public MsgNetwork { const NetAddr &get_peer() { return peer_id; } protected: - void on_close() override { + void stop() override { ev_timeout.clear(); - MsgNet::Conn::on_close(); + MsgNet::Conn::stop(); } void on_setup() override; @@ -302,7 +302,7 @@ class PeerNetwork: public MsgNetwork { Peer(NetAddr addr, conn_t conn, const EventContext &ec): addr(addr), conn(conn), ev_ping_timer( - Event(ec, -1, 0, std::bind(&Peer::ping_timer, this, _1, _2))), + Event(ec, -1, std::bind(&Peer::ping_timer, this, _1, _2))), connected(false) {} ~Peer() {} Peer &operator=(const Peer &) = delete; @@ -358,6 +358,8 @@ class PeerNetwork: public MsgNetwork { void _pong_msg_cb(const conn_t &conn, uint16_t port); bool check_new_conn(const conn_t &conn, uint16_t port); void start_active_conn(const NetAddr &paddr); + static void tcall_reset_timeout(ConnPool::Worker *worker, + const conn_t &conn, double timeout); protected: ConnPool::Conn *create_conn() override { return new Conn(); } @@ -385,6 +387,8 @@ class PeerNetwork: public MsgNetwork { this->reg_handler(generic_bind(&PeerNetwork::msg_pong, this, _1, _2)); } + ~PeerNetwork() { this->stop(); } + void add_peer(const NetAddr &paddr); const conn_t get_peer_conn(const NetAddr &paddr) const; using MsgNet::send_msg; @@ -445,15 +449,28 @@ void MsgNetwork::send_msg(const MsgType &_msg, Conn &conn) { #endif } +template +void PeerNetwork::tcall_reset_timeout(ConnPool::Worker *worker, + const conn_t &conn, double timeout) { + worker->get_tcall()->call([conn, t=timeout](ThreadCall::Handle &) { + assert(conn->ev_timeout); + conn->ev_timeout.del(); + conn->ev_timeout.add_with_timeout(t, 0); + SALTICIDAE_LOG_INFO("reset timeout %.2f", t); + }); +} + /* begin: functions invoked by the dispatcher */ template void PeerNetwork::Conn::on_setup() { MsgNet::Conn::on_setup(); auto pn = get_net(); + auto conn = static_pointer_cast(this->self()); + auto worker = this->worker; assert(!ev_timeout); - ev_timeout = Event(pn->dispatcher_ec, -1, 0, [this](int, int) { + ev_timeout = Event(worker->get_ec(), -1, [conn](int, int) { SALTICIDAE_LOG_INFO("peer ping-pong timeout"); - this->terminate(); + conn->terminate(); }); if (this->get_mode() == Conn::ConnMode::ACTIVE) { @@ -461,9 +478,8 @@ void PeerNetwork::Conn::on_setup() { if (pn->id_mode == IP_BASED) peer_id.port = 0; } /* the initial ping-pong to set up the connection */ - auto &conn = static_cast(*this); - reset_timeout(pn->conn_timeout); - pn->send_msg(MsgPing(pn->listen_port), conn); + tcall_reset_timeout(worker, conn, pn->conn_timeout); + pn->send_msg(MsgPing(pn->listen_port), *conn); } template @@ -481,11 +497,11 @@ void PeerNetwork::Conn::on_teardown() { std::string(*this).c_str(), std::string(peer_id).c_str()); // try to reconnect - p->ev_retry_timer = Event(pn->dispatcher_ec, -1, 0, + p->ev_retry_timer = Event(pn->disp_ec, -1, [pn, peer_id = this->peer_id](int, int) { pn->start_active_conn(peer_id); }); - p->ev_retry_timer.add_with_timeout(pn->gen_conn_timeout()); + p->ev_retry_timer.add_with_timeout(pn->gen_conn_timeout(), 0); } template @@ -497,7 +513,7 @@ void PeerNetwork::Peer::reset_conn(conn_t new_conn) { //SALTICIDAE_LOG_DEBUG("moving send buffer"); //new_conn->move_send_buffer(conn); SALTICIDAE_LOG_INFO("terminating old connection %s", std::string(*conn).c_str()); - conn->terminate(); + conn->cpool->terminate(conn); } addr = new_conn->get_addr(); conn = new_conn; @@ -505,20 +521,12 @@ void PeerNetwork::Peer::reset_conn(conn_t new_conn) { clear_all_events(); } -template -void PeerNetwork::Conn::reset_timeout(double timeout) { - assert(ev_timeout); - ev_timeout.del(); - ev_timeout.add_with_timeout(timeout); - SALTICIDAE_LOG_INFO("reset timeout %.2f", timeout); -} - template void PeerNetwork::Peer::reset_ping_timer() { assert(ev_ping_timer); ev_ping_timer.del(); ev_ping_timer.add_with_timeout( - gen_rand_timeout(conn->get_net()->ping_period)); + gen_rand_timeout(conn->get_net()->ping_period), 0); } template @@ -526,7 +534,7 @@ void PeerNetwork::Peer::send_ping() { auto pn = conn->get_net(); ping_timer_ok = false; pong_msg_ok = false; - conn->reset_timeout(pn->conn_timeout); + tcall_reset_timeout(conn->worker, conn, pn->conn_timeout); pn->send_msg(MsgPing(pn->listen_port), *conn); } @@ -554,7 +562,7 @@ bool PeerNetwork::check_new_conn(const conn_t &conn, uint16_t port) { { if (conn != p->conn) { - conn->terminate(); + conn->cpool->terminate(conn); return true; } return false; @@ -631,7 +639,7 @@ void PeerNetwork::add_peer(const NetAddr &addr) { auto it = id2peer.find(addr); if (it != id2peer.end()) throw PeerNetworkError("peer already exists"); - id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->dispatcher_ec))); + id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->disp_ec))); start_active_conn(addr); }, true); } @@ -646,6 +654,9 @@ PeerNetwork::get_peer_conn(const NetAddr &paddr) const { throw PeerNetworkError("peer does not exist"); auto ptr = new conn_t(it->second->conn); h.set_result(ptr); + h.set_deleter([](void *data) { + delete static_cast(data); + }); })); auto conn = *ret; delete ret; @@ -657,6 +668,9 @@ bool PeerNetwork::has_peer(const NetAddr &paddr) const { auto ret = static_cast(this->disp_tcall->call( [this, paddr](ThreadCall::Handle &h) { h.set_result(id2peer.count(paddr)); + h.set_deleter([](void *data) { + delete static_cast(data); + }); })); auto has = *ret; delete ret; @@ -699,6 +713,9 @@ void ClientNetwork::send_msg(const MsgType &msg, const NetAddr &addr throw PeerNetworkError("client does not exist"); auto ptr = new conn_t(it->second->conn); h.set_result(ptr); + h.set_deleter([](void *data) { + delete static_cast(data); + }); })); send_msg(msg, **ret); delete ret; -- cgit v1.2.3