From ecc163f98e434b557768560d00ee2f9755d6d950 Mon Sep 17 00:00:00 2001 From: Determinant Date: Wed, 14 Nov 2018 22:18:59 -0500 Subject: major bug fix --- include/salticidae/conn.h | 91 ++++++++++++++++++++++++++------------------ include/salticidae/event.h | 67 +++++++++++++++++++++++--------- include/salticidae/network.h | 63 +++++++++++++++++++----------- src/conn.cpp | 37 +++++++++--------- test/bench_network.cpp | 12 +++--- test/test_network.cpp | 25 ++++++++---- test/test_p2p.cpp | 25 ++++++++---- test/test_queue.cpp | 5 +-- 8 files changed, 201 insertions(+), 124 deletions(-) diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index 2ef6b50..73b3022 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -57,13 +57,13 @@ struct ConnPoolError: public SalticidaeError { /** Abstraction for connection management. */ class ConnPool { + class Worker; public: class Conn; /** The handle to a bi-directional connection. */ using conn_t = ArcObj; /** The type of callback invoked when connection status is changed. */ using conn_callback_t = std::function; - /** Abstraction for a bi-directional connection. */ class Conn { friend ConnPool; @@ -77,17 +77,16 @@ class ConnPool { size_t seg_buff_size; conn_t self_ref; int fd; + Worker *worker; ConnPool *cpool; ConnMode mode; NetAddr addr; - // TODO: send_buffer should be a thread-safe mpsc queue MPSCWriteBuffer send_buffer; SegBuffer recv_buffer; - Event ev_read; - Event ev_write; Event ev_connect; + Event ev_socket; /** does not need to wait if true */ bool ready_send; @@ -95,8 +94,6 @@ class ConnPool { void send_data(int, int); void conn_server(int, int); - /** Stop the worker and I/O events. */ - void stop(); /** Terminate the connection. */ void terminate(); @@ -125,10 +122,11 @@ class ConnPool { protected: /** Close the IO and clear all on-going or planned events. */ - virtual void on_close() { - ev_read.clear(); - ev_write.clear(); + virtual void stop() { + if (fd == -1) return; ev_connect.clear(); + ev_socket.clear(); + send_buffer.get_queue().unreg_handler(); ::close(fd); fd = -1; self_ref = nullptr; /* remove the self-cycle */ @@ -142,6 +140,13 @@ class ConnPool { virtual void on_teardown() {} }; + protected: + EventContext ec; + EventContext disp_ec; + ThreadCall* disp_tcall; + /** Should be implemented by derived class to return a new Conn object. */ + virtual Conn *create_conn() = 0; + private: const int max_listen_backlog; const double conn_server_timeout; @@ -164,11 +169,11 @@ class ConnPool { class Worker { EventContext ec; - ThreadCall msgr; + ThreadCall tcall; std::thread handle; public: - Worker(): msgr(ec) {} + Worker(): tcall(ec) {} /* the following functions are called by the dispatcher */ void start() { @@ -176,7 +181,7 @@ class ConnPool { } void feed(const conn_t &conn, int client_fd) { - msgr.call([this, conn, client_fd](ThreadCall::Handle &) { + tcall.call([this, conn, client_fd](ThreadCall::Handle &) { SALTICIDAE_LOG_INFO("worker %x got %s", std::this_thread::get_id(), std::string(*conn).c_str()); @@ -185,11 +190,15 @@ class ConnPool { .reg_handler(this->ec, [conn, client_fd] (MPSCWriteBuffer::queue_t &) { if (conn->ready_send && conn->fd != -1) + { + conn->ev_socket.del(); + conn->ev_socket.add(Event::READ | Event::WRITE); conn->send_data(client_fd, Event::WRITE); + } return false; }); //auto conn_ptr = conn.get(); - conn->ev_read = Event(ec, client_fd, Event::READ | Event::WRITE, [conn=conn](int fd, int what) { + conn->ev_socket = Event(ec, client_fd, [conn=conn](int fd, int what) { if (what & Event::READ) conn->recv_data(fd, what); else @@ -199,19 +208,18 @@ class ConnPool { // std::bind(&Conn::recv_data, conn_ptr, _1, _2)); //conn->ev_write = Event(ec, client_fd, Event::WRITE, // std::bind(&Conn::send_data, conn_ptr, _1, _2)); - conn->ev_read.add(); + conn->ev_socket.add(Event::READ | Event::WRITE); //conn->ev_write.add(); }); } void stop() { - msgr.call([this](ThreadCall::Handle &) { - ec.stop(); - }); + tcall.call([this](ThreadCall::Handle &) { ec.stop(); }); } std::thread &get_handle() { return handle; } const EventContext &get_ec() { return ec; } + ThreadCall *get_tcall() { return &tcall; } }; /* related to workers */ @@ -220,7 +228,7 @@ class ConnPool { void accept_client(int, int); conn_t add_conn(const conn_t &conn); - void terminate(int fd); + void remove_conn(int fd); protected: conn_t _connect(const NetAddr &addr); @@ -244,43 +252,30 @@ class ConnPool { return workers[1]; } - protected: - EventContext ec; - EventContext dispatcher_ec; - BoxObj disp_tcall; - /** Should be implemented by derived class to return a new Conn object. */ - virtual Conn *create_conn() = 0; - public: ConnPool(const EventContext &ec, int max_listen_backlog = 10, double conn_server_timeout = 2, size_t seg_buff_size = 4096, size_t nworker = 2): + ec(ec), max_listen_backlog(max_listen_backlog), conn_server_timeout(conn_server_timeout), seg_buff_size(seg_buff_size), listen_fd(-1), - nworker(std::max((size_t)1, nworker)), - ec(ec) { + nworker(std::max((size_t)1, nworker)) { workers = new Worker[nworker]; - dispatcher_ec = workers[0].get_ec(); - user_tcall = new ThreadCall(ec); - disp_tcall = new ThreadCall(dispatcher_ec); + disp_ec = workers[0].get_ec(); + disp_tcall = workers[0].get_tcall(); } ~ConnPool() { - /* stop all workers */ - for (size_t i = 0; i < nworker; i++) - workers[i].stop(); - /* join all worker threads */ - for (size_t i = 0; i < nworker; i++) - workers[i].get_handle().join(); + stop(); for (auto it: pool) { conn_t conn = it.second; - conn->on_close(); + conn->stop(); } if (listen_fd != -1) close(listen_fd); } @@ -294,6 +289,17 @@ class ConnPool { workers[i].start(); } + void stop() { + SALTICIDAE_LOG_INFO("stopping all threads..."); + /* stop all workers */ + for (size_t i = 0; i < nworker; i++) + workers[i].stop(); + /* join all worker threads */ + for (size_t i = 0; i < nworker; i++) + workers[i].get_handle().join(); + nworker = 0; + } + /** Actively connect to remote addr. */ conn_t connect(const NetAddr &addr, bool blocking = true) { if (blocking) @@ -303,6 +309,9 @@ class ConnPool { auto ptr = new conn_t(_connect(addr)); std::atomic_thread_fence(std::memory_order_release); h.set_result(ptr); + h.set_deleter([](void *data) { + delete static_cast(data); + }); }, true)); auto conn = *ret; delete ret; @@ -328,6 +337,14 @@ class ConnPool { template void reg_conn_handler(Func cb) { conn_cb = cb; } + + void terminate(const conn_t &conn, bool blocking = true) { + int fd = conn->fd; + conn->worker->get_tcall()->call([conn](ThreadCall::Handle &) { + conn->stop(); + }, blocking); + remove_conn(fd); + } }; } diff --git a/include/salticidae/event.h b/include/salticidae/event.h index da27902..616f598 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -42,7 +42,8 @@ struct _event_context_deleter { void operator()(uv_loop_t *ptr) { if (ptr != nullptr) { - uv_loop_close(ptr); + while (uv_loop_close(ptr) == UV_EBUSY) + uv_run(ptr, UV_RUN_NOWAIT); delete ptr; } } @@ -62,8 +63,7 @@ class EventContext: public _event_context_ot { EventContext &operator=(EventContext &&) = default; void dispatch() const { // TODO: improve this loop - for (;;) - uv_run(get(), UV_RUN_ONCE); + uv_run(get(), UV_RUN_DEFAULT); } void stop() const { uv_stop(get()); } }; @@ -79,12 +79,15 @@ class Event { private: EventContext eb; int fd; - int events; uv_poll_t *ev_fd; uv_timer_t *ev_timer; callback_t callback; static inline void fd_then(uv_poll_t *h, int status, int events) { - assert(status == 0); + if (status != 0) + { + SALTICIDAE_LOG_WARN("%s", uv_strerror(status)); + return; + } auto event = static_cast(h->data); event->callback(event->fd, events); } @@ -95,10 +98,14 @@ class Event { event->callback(event->fd, TIMEOUT); } + static void _on_handle_close(uv_handle_t *h) { + delete h; + } + public: Event(): eb(nullptr), ev_fd(nullptr), ev_timer(nullptr) {} - Event(const EventContext &eb, int fd, short events, callback_t callback): - eb(eb), fd(fd), events(events), + Event(const EventContext &eb, int fd, callback_t callback): + eb(eb), fd(fd), ev_fd(nullptr), ev_timer(new uv_timer_t()), callback(callback) { @@ -114,7 +121,7 @@ class Event { Event(const Event &) = delete; Event(Event &&other): - eb(std::move(other.eb)), fd(other.fd), events(other.events), + eb(std::move(other.eb)), fd(other.fd), ev_fd(other.ev_fd), ev_timer(other.ev_timer), callback(std::move(other.callback)) { other.del(); @@ -132,7 +139,6 @@ class Event { other.del(); eb = std::move(other.eb); fd = other.fd; - events = other.events; ev_fd = other.ev_fd; ev_timer = other.ev_timer; callback = std::move(other.callback); @@ -153,26 +159,33 @@ class Event { if (ev_fd != nullptr) { uv_poll_stop(ev_fd); - delete ev_fd; + uv_close((uv_handle_t *)ev_fd, Event::_on_handle_close); ev_fd = nullptr; } if (ev_timer != nullptr) { uv_timer_stop(ev_timer); - delete ev_timer; + uv_close((uv_handle_t *)ev_timer, Event::_on_handle_close); ev_timer = nullptr; } + callback = nullptr; + } + + void set_callback(callback_t _callback) { + callback = _callback; } - void add() { + void add(int events) { if (ev_fd) uv_poll_start(ev_fd, events, Event::fd_then); } void del() { if (ev_fd) uv_poll_stop(ev_fd); + if (ev_timer == nullptr) + assert(ev_timer); uv_timer_stop(ev_timer); } - void add_with_timeout(double t_sec) { - add(); + void add_with_timeout(double t_sec, int events) { + add(events); uv_timer_start(ev_timer, Event::timer_then, uint64_t(t_sec * 1000), 0); } @@ -209,6 +222,7 @@ class ThreadCall { public: class Handle { std::function callback; + std::function deleter; ThreadNotifier* notifier; void *result; friend ThreadCall; @@ -219,6 +233,8 @@ class ThreadCall { if (notifier) notifier->notify(result); } void set_result(void *data) { result = data; } + template + void set_deleter(Func _deleter) { deleter = _deleter; } }; ThreadCall() = default; @@ -227,16 +243,24 @@ class ThreadCall { ThreadCall(EventContext ec): ec(ec) { if (pipe2(ctl_fd, O_NONBLOCK)) throw SalticidaeError(std::string("ThreadCall: failed to create pipe")); - ev_listen = Event(ec, ctl_fd[0], Event::READ, [this](int fd, int) { + ev_listen = Event(ec, ctl_fd[0], [this](int fd, int) { Handle *h; read(fd, &h, sizeof(h)); h->exec(); delete h; }); - ev_listen.add(); + ev_listen.add(Event::READ); } ~ThreadCall() { + ev_listen.clear(); + Handle *h; + while (read(ctl_fd[0], &h, sizeof(h)) == sizeof(h)) + { + if (h->result && h->deleter) + h->deleter(h->result); + delete h; + } close(ctl_fd[0]); close(ctl_fd[1]); } @@ -277,11 +301,14 @@ class MPSCQueueEventDriven: public MPSCQueue { wait_sig(true), fd(eventfd(0, EFD_NONBLOCK)) {} - ~MPSCQueueEventDriven() { close(fd); } + ~MPSCQueueEventDriven() { + ev.clear(); + close(fd); + } template void reg_handler(const EventContext &ec, Func &&func) { - ev = Event(ec, fd, Event::READ, + ev = Event(ec, fd, [this, func=std::forward(func)](int, short) { //fprintf(stderr, "%x\n", std::this_thread::get_id()); uint64_t t; @@ -296,9 +323,11 @@ class MPSCQueueEventDriven: public MPSCQueue { if (func(*this)) write(fd, &dummy, 8); }); - ev.add(); + ev.add(Event::READ); } + void unreg_handler() { ev.clear(); } + template bool enqueue(U &&e) { static const uint64_t dummy = 1; diff --git a/include/salticidae/network.h b/include/salticidae/network.h index e5165bf..a63976b 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -275,9 +275,9 @@ class PeerNetwork: public MsgNetwork { const NetAddr &get_peer() { return peer_id; } protected: - void on_close() override { + void stop() override { ev_timeout.clear(); - MsgNet::Conn::on_close(); + MsgNet::Conn::stop(); } void on_setup() override; @@ -302,7 +302,7 @@ class PeerNetwork: public MsgNetwork { Peer(NetAddr addr, conn_t conn, const EventContext &ec): addr(addr), conn(conn), ev_ping_timer( - Event(ec, -1, 0, std::bind(&Peer::ping_timer, this, _1, _2))), + Event(ec, -1, std::bind(&Peer::ping_timer, this, _1, _2))), connected(false) {} ~Peer() {} Peer &operator=(const Peer &) = delete; @@ -358,6 +358,8 @@ class PeerNetwork: public MsgNetwork { void _pong_msg_cb(const conn_t &conn, uint16_t port); bool check_new_conn(const conn_t &conn, uint16_t port); void start_active_conn(const NetAddr &paddr); + static void tcall_reset_timeout(ConnPool::Worker *worker, + const conn_t &conn, double timeout); protected: ConnPool::Conn *create_conn() override { return new Conn(); } @@ -385,6 +387,8 @@ class PeerNetwork: public MsgNetwork { this->reg_handler(generic_bind(&PeerNetwork::msg_pong, this, _1, _2)); } + ~PeerNetwork() { this->stop(); } + void add_peer(const NetAddr &paddr); const conn_t get_peer_conn(const NetAddr &paddr) const; using MsgNet::send_msg; @@ -445,15 +449,28 @@ void MsgNetwork::send_msg(const MsgType &_msg, Conn &conn) { #endif } +template +void PeerNetwork::tcall_reset_timeout(ConnPool::Worker *worker, + const conn_t &conn, double timeout) { + worker->get_tcall()->call([conn, t=timeout](ThreadCall::Handle &) { + assert(conn->ev_timeout); + conn->ev_timeout.del(); + conn->ev_timeout.add_with_timeout(t, 0); + SALTICIDAE_LOG_INFO("reset timeout %.2f", t); + }); +} + /* begin: functions invoked by the dispatcher */ template void PeerNetwork::Conn::on_setup() { MsgNet::Conn::on_setup(); auto pn = get_net(); + auto conn = static_pointer_cast(this->self()); + auto worker = this->worker; assert(!ev_timeout); - ev_timeout = Event(pn->dispatcher_ec, -1, 0, [this](int, int) { + ev_timeout = Event(worker->get_ec(), -1, [conn](int, int) { SALTICIDAE_LOG_INFO("peer ping-pong timeout"); - this->terminate(); + conn->terminate(); }); if (this->get_mode() == Conn::ConnMode::ACTIVE) { @@ -461,9 +478,8 @@ void PeerNetwork::Conn::on_setup() { if (pn->id_mode == IP_BASED) peer_id.port = 0; } /* the initial ping-pong to set up the connection */ - auto &conn = static_cast(*this); - reset_timeout(pn->conn_timeout); - pn->send_msg(MsgPing(pn->listen_port), conn); + tcall_reset_timeout(worker, conn, pn->conn_timeout); + pn->send_msg(MsgPing(pn->listen_port), *conn); } template @@ -481,11 +497,11 @@ void PeerNetwork::Conn::on_teardown() { std::string(*this).c_str(), std::string(peer_id).c_str()); // try to reconnect - p->ev_retry_timer = Event(pn->dispatcher_ec, -1, 0, + p->ev_retry_timer = Event(pn->disp_ec, -1, [pn, peer_id = this->peer_id](int, int) { pn->start_active_conn(peer_id); }); - p->ev_retry_timer.add_with_timeout(pn->gen_conn_timeout()); + p->ev_retry_timer.add_with_timeout(pn->gen_conn_timeout(), 0); } template @@ -497,7 +513,7 @@ void PeerNetwork::Peer::reset_conn(conn_t new_conn) { //SALTICIDAE_LOG_DEBUG("moving send buffer"); //new_conn->move_send_buffer(conn); SALTICIDAE_LOG_INFO("terminating old connection %s", std::string(*conn).c_str()); - conn->terminate(); + conn->cpool->terminate(conn); } addr = new_conn->get_addr(); conn = new_conn; @@ -505,20 +521,12 @@ void PeerNetwork::Peer::reset_conn(conn_t new_conn) { clear_all_events(); } -template -void PeerNetwork::Conn::reset_timeout(double timeout) { - assert(ev_timeout); - ev_timeout.del(); - ev_timeout.add_with_timeout(timeout); - SALTICIDAE_LOG_INFO("reset timeout %.2f", timeout); -} - template void PeerNetwork::Peer::reset_ping_timer() { assert(ev_ping_timer); ev_ping_timer.del(); ev_ping_timer.add_with_timeout( - gen_rand_timeout(conn->get_net()->ping_period)); + gen_rand_timeout(conn->get_net()->ping_period), 0); } template @@ -526,7 +534,7 @@ void PeerNetwork::Peer::send_ping() { auto pn = conn->get_net(); ping_timer_ok = false; pong_msg_ok = false; - conn->reset_timeout(pn->conn_timeout); + tcall_reset_timeout(conn->worker, conn, pn->conn_timeout); pn->send_msg(MsgPing(pn->listen_port), *conn); } @@ -554,7 +562,7 @@ bool PeerNetwork::check_new_conn(const conn_t &conn, uint16_t port) { { if (conn != p->conn) { - conn->terminate(); + conn->cpool->terminate(conn); return true; } return false; @@ -631,7 +639,7 @@ void PeerNetwork::add_peer(const NetAddr &addr) { auto it = id2peer.find(addr); if (it != id2peer.end()) throw PeerNetworkError("peer already exists"); - id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->dispatcher_ec))); + id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->disp_ec))); start_active_conn(addr); }, true); } @@ -646,6 +654,9 @@ PeerNetwork::get_peer_conn(const NetAddr &paddr) const { throw PeerNetworkError("peer does not exist"); auto ptr = new conn_t(it->second->conn); h.set_result(ptr); + h.set_deleter([](void *data) { + delete static_cast(data); + }); })); auto conn = *ret; delete ret; @@ -657,6 +668,9 @@ bool PeerNetwork::has_peer(const NetAddr &paddr) const { auto ret = static_cast(this->disp_tcall->call( [this, paddr](ThreadCall::Handle &h) { h.set_result(id2peer.count(paddr)); + h.set_deleter([](void *data) { + delete static_cast(data); + }); })); auto has = *ret; delete ret; @@ -699,6 +713,9 @@ void ClientNetwork::send_msg(const MsgType &msg, const NetAddr &addr throw PeerNetworkError("client does not exist"); auto ptr = new conn_t(it->second->conn); h.set_result(ptr); + h.set_deleter([](void *data) { + delete static_cast(data); + }); })); send_msg(msg, **ret); delete ret; diff --git a/src/conn.cpp b/src/conn.cpp index 6b2e3aa..da8086c 100644 --- a/src/conn.cpp +++ b/src/conn.cpp @@ -81,6 +81,8 @@ void ConnPool::Conn::send_data(int fd, int events) { return; } } + ev_socket.del(); + ev_socket.add(Event::READ); /* consumed the buffer but endpoint still seems to be writable */ ready_send = true; } @@ -116,20 +118,14 @@ void ConnPool::Conn::recv_data(int fd, int events) { on_read(); } -void ConnPool::Conn::stop() { - ev_read.clear(); - ev_write.clear(); -} - void ConnPool::Conn::terminate() { stop(); cpool->disp_tcall->call( [cpool=this->cpool, fd=this->fd](ThreadCall::Handle &) { - cpool->terminate(fd); + cpool->remove_conn(fd); }); } - void ConnPool::accept_client(int fd, int) { int client_fd; struct sockaddr client_addr; @@ -154,9 +150,11 @@ void ConnPool::accept_client(int fd, int) { conn->addr = addr; add_conn(conn); SALTICIDAE_LOG_INFO("accepted %s", std::string(*conn).c_str()); + auto &worker = select_worker(); + conn->worker = &worker; conn->on_setup(); update_conn(conn, true); - select_worker().feed(conn, client_fd); + worker.feed(conn, client_fd); } } @@ -166,15 +164,16 @@ void ConnPool::Conn::conn_server(int fd, int events) { { ev_connect.clear(); SALTICIDAE_LOG_INFO("connected to remote %s", std::string(*this).c_str()); + worker = &(cpool->select_worker()); on_setup(); cpool->update_conn(conn, true); - cpool->select_worker().feed(conn, fd); + worker->feed(conn, fd); } else { if (events & Event::TIMEOUT) SALTICIDAE_LOG_INFO("%s connect timeout", std::string(*this).c_str()); - terminate(); + stop(); return; } } @@ -204,9 +203,9 @@ void ConnPool::_listen(NetAddr listen_addr) { throw ConnPoolError(std::string("binding error")); if (::listen(listen_fd, max_listen_backlog) < 0) throw ConnPoolError(std::string("listen error")); - ev_listen = Event(dispatcher_ec, listen_fd, Event::READ, + ev_listen = Event(disp_ec, listen_fd, std::bind(&ConnPool::accept_client, this, _1, _2)); - ev_listen.add(); + ev_listen.add(Event::READ); SALTICIDAE_LOG_INFO("listening to %u", ntohs(listen_addr.port)); } @@ -238,28 +237,26 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) { sizeof(struct sockaddr_in)) < 0 && errno != EINPROGRESS) { SALTICIDAE_LOG_INFO("cannot connect to %s", std::string(addr).c_str()); - conn->terminate(); + conn->stop(); } else { - conn->ev_connect = Event(dispatcher_ec, conn->fd, Event::WRITE, - std::bind(&Conn::conn_server, conn.get(), _1, _2)); - conn->ev_connect.add_with_timeout(conn_server_timeout); + conn->ev_connect = Event(disp_ec, conn->fd, std::bind(&Conn::conn_server, conn.get(), _1, _2)); + conn->ev_connect.add_with_timeout(conn_server_timeout, Event::WRITE); add_conn(conn); SALTICIDAE_LOG_INFO("created %s", std::string(*conn).c_str()); } return conn; } -void ConnPool::terminate(int fd) { +void ConnPool::remove_conn(int fd) { auto it = pool.find(fd); if (it != pool.end()) { /* temporarily pin the conn before it dies */ auto conn = it->second; - assert(conn->fd == fd); + //assert(conn->fd == fd); pool.erase(it); - conn->on_close(); /* inform the upper layer the connection will be destroyed */ conn->on_teardown(); update_conn(conn, false); @@ -267,7 +264,7 @@ void ConnPool::terminate(int fd) { } ConnPool::conn_t ConnPool::add_conn(const conn_t &conn) { - assert(pool.find(conn->fd) == pool.end()); + //assert(pool.find(conn->fd) == pool.end()); return pool.insert(std::make_pair(conn->fd, conn)).first->second; } diff --git a/test/bench_network.cpp b/test/bench_network.cpp index 8ff9ab2..40ba17a 100644 --- a/test/bench_network.cpp +++ b/test/bench_network.cpp @@ -82,17 +82,17 @@ struct MyNet: public MsgNetworkByteOp { MsgNetwork(ec, 10, 1.0, 4096), name(name), peer(peer), - ev_period_stat(ec, -1, 0, [this, stat_timeout](int, short) { + ev_period_stat(ec, -1, [this, stat_timeout](int, short) { printf("%.2f mps\n", nrecv / (double)stat_timeout); nrecv = 0; - ev_period_stat.add_with_timeout(stat_timeout); + ev_period_stat.add_with_timeout(stat_timeout, 0); }), nrecv(0) { /* message handler could be a bound method */ reg_handler(salticidae::generic_bind( &MyNet::on_receive_bytes, this, _1, _2)); if (stat_timeout > 0) - ev_period_stat.add_with_timeout(0); + ev_period_stat.add_with_timeout(0, 0); } struct Conn: public MsgNetworkByteOp::Conn { @@ -109,12 +109,12 @@ struct MyNet: public MsgNetworkByteOp { printf("[%s] Connected, sending hello.\n", net->name.c_str()); /* send the first message through this connection */ - net->ev_period_send = Event(net->ec, -1, 0, + net->ev_period_send = Event(net->ec, -1, [net, conn = self()](int, short) { net->send_msg(MsgBytes(256), *conn); - net->ev_period_send.add_with_timeout(0); + net->ev_period_send.add_with_timeout(0, 0); }); - net->ev_period_send.add_with_timeout(0); + net->ev_period_send.add_with_timeout(0, 0); } else diff --git a/test/test_network.cpp b/test/test_network.cpp index d93d0ff..d52b6c2 100644 --- a/test/test_network.cpp +++ b/test/test_network.cpp @@ -132,7 +132,14 @@ salticidae::EventContext ec; NetAddr alice_addr("127.0.0.1:12345"); NetAddr bob_addr("127.0.0.1:12346"); +void signal_handler(int) { + throw salticidae::SalticidaeError("got termination signal"); +} + int main() { + signal(SIGTERM, signal_handler); + signal(SIGINT, signal_handler); + /* test two nodes */ MyNet alice(ec, "Alice", bob_addr); MyNet bob(ec, "Bob", alice_addr); @@ -141,16 +148,18 @@ int main() { alice.reg_handler(on_receive_ack); bob.reg_handler(on_receive_ack); - alice.start(); - bob.start(); + try { + alice.start(); + bob.start(); - alice.listen(alice_addr); - bob.listen(bob_addr); + alice.listen(alice_addr); + bob.listen(bob_addr); - /* first attempt */ - alice.connect(bob_addr); - bob.connect(alice_addr); + /* first attempt */ + alice.connect(bob_addr); + bob.connect(alice_addr); - ec.dispatch(); + ec.dispatch(); + } catch (salticidae::SalticidaeError &e) {} return 0; } diff --git a/test/test_p2p.cpp b/test/test_p2p.cpp index f52f48f..558be5c 100644 --- a/test/test_p2p.cpp +++ b/test/test_p2p.cpp @@ -130,7 +130,14 @@ salticidae::EventContext ec; NetAddr alice_addr("127.0.0.1:12345"); NetAddr bob_addr("127.0.0.1:12346"); +void signal_handler(int) { + throw salticidae::SalticidaeError("got termination signal"); +} + int main() { + signal(SIGTERM, signal_handler); + signal(SIGINT, signal_handler); + /* test two nodes */ MyNet alice(ec, "Alice", bob_addr); MyNet bob(ec, "Bob", alice_addr); @@ -139,16 +146,18 @@ int main() { alice.reg_handler(on_receive_ack); bob.reg_handler(on_receive_ack); - alice.start(); - bob.start(); + try { + alice.start(); + bob.start(); - alice.listen(alice_addr); - bob.listen(bob_addr); + alice.listen(alice_addr); + bob.listen(bob_addr); - /* first attempt */ - alice.add_peer(bob_addr); - bob.add_peer(alice_addr); + /* first attempt */ + alice.add_peer(bob_addr); + bob.add_peer(alice_addr); - ec.dispatch(); + ec.dispatch(); + } catch (salticidae::SalticidaeError &e) {} return 0; } diff --git a/test/test_queue.cpp b/test/test_queue.cpp index a2444d3..5a7b548 100644 --- a/test/test_queue.cpp +++ b/test/test_queue.cpp @@ -23,11 +23,10 @@ void test_mpsc(int nproducers = 16, int nops = 100000, size_t burst_size = 128) }); std::vector producers; std::thread consumer([&collected, total, &ec]() { - salticidae::Event timer(ec, -1, EV_TIMEOUT | EV_PERSIST, - [&ec, &collected, total](int, short) { + salticidae::Event timer(ec, -1, [&ec, &collected, total](int, short) { if (collected.load() == total) ec.stop(); }); - timer.add_with_timeout(1); + timer.add_with_timeout(1, EV_TIMEOUT | EV_PERSIST); ec.dispatch(); }); for (int i = 0; i < nproducers; i++) -- cgit v1.2.3-70-g09d2