diff options
author | Determinant <[email protected]> | 2018-11-20 20:43:57 -0500 |
---|---|---|
committer | Determinant <[email protected]> | 2018-11-20 20:43:57 -0500 |
commit | d2fe5eb74bdf40afc5cacd052f40b56aa3e57eaf (patch) | |
tree | 30e042013d87d10cf9e0db95fd9398878b305915 | |
parent | 60c4af2602e18933f2b795500f44c6613c852f45 (diff) |
refactor libuv wrapper classes
-rw-r--r-- | include/salticidae/conn.h | 16 | ||||
-rw-r--r-- | include/salticidae/event.h | 307 | ||||
-rw-r--r-- | include/salticidae/network.h | 25 | ||||
-rw-r--r-- | src/conn.cpp | 19 | ||||
-rw-r--r-- | src/util.cpp | 7 | ||||
-rw-r--r-- | test/bench_network.cpp | 20 | ||||
-rw-r--r-- | test/test_p2p_stress.cpp | 8 | ||||
-rw-r--r-- | test/test_queue.cpp | 6 |
8 files changed, 265 insertions, 143 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index a60cd8f..e7d6295 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -86,8 +86,8 @@ class ConnPool { MPSCWriteBuffer send_buffer; SegBuffer recv_buffer; - Event ev_connect; - Event ev_socket; + TimedFdEvent ev_connect; + FdEvent ev_socket; /** does not need to wait if true */ bool ready_send; @@ -161,7 +161,7 @@ class ConnPool { conn_callback_t conn_cb; /* owned by the dispatcher */ - Event ev_listen; + FdEvent ev_listen; std::unordered_map<int, conn_t> pool; int listen_fd; /**< for accepting new network connections */ @@ -207,18 +207,18 @@ class ConnPool { if (conn->ready_send) { conn->ev_socket.del(); - conn->ev_socket.add(Event::READ | Event::WRITE); - conn->send_data(client_fd, Event::WRITE); + conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE); + conn->send_data(client_fd, FdEvent::WRITE); } return false; }); - conn->ev_socket = Event(ec, client_fd, [conn=conn](int fd, int what) { - if (what & Event::READ) + conn->ev_socket = FdEvent(ec, client_fd, [conn=conn](int fd, int what) { + if (what & FdEvent::READ) conn->recv_data(fd, what); else conn->send_data(fd, what); }); - conn->ev_socket.add(Event::READ | Event::WRITE); + conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE); nconn++; }); } diff --git a/include/salticidae/event.h b/include/salticidae/event.h index ca48e69..2365a71 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -68,101 +68,153 @@ class EventContext: public _event_context_ot { void stop() const { uv_stop(get()); } }; -class Event { +static void _on_uv_handle_close(uv_handle_t *h) { delete h; } + +class FdEvent { public: using callback_t = std::function<void(int fd, int events)>; static const int READ = UV_READABLE; static const int WRITE = UV_WRITABLE; static const int ERROR = 1 << 30; - static const int TIMEOUT = 1 << 29; - private: - EventContext eb; + protected: + EventContext ec; int fd; uv_poll_t *ev_fd; - uv_timer_t *ev_timer; callback_t callback; + static inline void fd_then(uv_poll_t *h, int status, int events) { if (status != 0) events |= ERROR; - auto event = static_cast<Event *>(h->data); + auto event = static_cast<FdEvent *>(h->data); event->callback(event->fd, events); } - static inline void timer_then(uv_timer_t *h) { - auto event = static_cast<Event *>(h->data); - if (event->ev_fd) uv_poll_stop(event->ev_fd); - event->callback(event->fd, TIMEOUT); + public: + FdEvent(): ec(nullptr), ev_fd(nullptr) {} + FdEvent(const EventContext &ec, int fd, callback_t callback): + ec(ec), fd(fd), ev_fd(new uv_poll_t()), + callback(callback) { + uv_poll_init(ec.get(), ev_fd, fd); + ev_fd->data = this; } - static void _on_handle_close(uv_handle_t *h) { - delete h; + FdEvent(const FdEvent &) = delete; + FdEvent(FdEvent &&other): + ec(std::move(other.ec)), fd(other.fd), ev_fd(other.ev_fd), + callback(std::move(other.callback)) { + other.ev_fd = nullptr; + if (ev_fd != nullptr) + ev_fd->data = this; + } + + void swap(FdEvent &other) { + std::swap(ec, other.ec); + std::swap(fd, other.fd); + std::swap(ev_fd, other.ev_fd); + std::swap(callback, other.callback); + if (ev_fd != nullptr) + ev_fd->data = this; + if (other.ev_fd != nullptr) + other.ev_fd->data = &other; } - public: - Event(): eb(nullptr), ev_fd(nullptr), ev_timer(nullptr) {} - Event(const EventContext &eb, int fd, callback_t callback): - eb(eb), fd(fd), - ev_fd(nullptr), - ev_timer(new uv_timer_t()), - callback(callback) { - if (fd != -1) + FdEvent &operator=(FdEvent &&other) { + if (this != &other) { - ev_fd = new uv_poll_t(); - uv_poll_init(eb.get(), ev_fd, fd); - ev_fd->data = this; + FdEvent tmp(std::move(other)); + tmp.swap(*this); } - uv_timer_init(eb.get(), ev_timer); - ev_timer->data = this; + return *this; } - Event(const Event &) = delete; - Event(Event &&other): - eb(std::move(other.eb)), fd(other.fd), - ev_fd(other.ev_fd), ev_timer(other.ev_timer), - callback(std::move(other.callback)) { - other.del(); + ~FdEvent() { clear(); } + + void clear() { if (ev_fd != nullptr) { - other.ev_fd = nullptr; - ev_fd->data = this; + uv_poll_stop(ev_fd); + uv_close((uv_handle_t *)ev_fd, _on_uv_handle_close); + ev_fd = nullptr; } - other.ev_timer = nullptr; + callback = nullptr; + } + + void set_callback(callback_t _callback) { + callback = _callback; + } + + void add(int events) { + assert(ev_fd != nullptr); + uv_poll_start(ev_fd, events, FdEvent::fd_then); + } + + void del() { + if (ev_fd != nullptr) uv_poll_stop(ev_fd); + } + + operator bool() const { return ev_fd != nullptr; } +}; + + +class TimerEvent { + public: + using callback_t = std::function<void(TimerEvent &)>; + + protected: + EventContext ec; + uv_timer_t *ev_timer; + callback_t callback; + + static inline void timer_then(uv_timer_t *h) { + auto event = static_cast<TimerEvent *>(h->data); + event->callback(*event); + } + + public: + TimerEvent(): ec(nullptr), ev_timer(nullptr) {} + TimerEvent(const EventContext &ec, callback_t callback): + ec(ec), ev_timer(new uv_timer_t()), + callback(callback) { + uv_timer_init(ec.get(), ev_timer); ev_timer->data = this; } - Event &operator=(Event &&other) { - clear(); - other.del(); - eb = std::move(other.eb); - fd = other.fd; - ev_fd = other.ev_fd; - ev_timer = other.ev_timer; - callback = std::move(other.callback); + TimerEvent(const TimerEvent &) = delete; + TimerEvent(TimerEvent &&other): + ec(std::move(other.ec)), ev_timer(other.ev_timer), + callback(std::move(other.callback)) { + other.ev_timer = nullptr; + if (ev_timer != nullptr) + ev_timer->data = this; + } - if (ev_fd != nullptr) + void swap(TimerEvent &other) { + std::swap(ec, other.ec); + std::swap(ev_timer, other.ev_timer); + std::swap(callback, other.callback); + if (ev_timer != nullptr) + ev_timer->data = this; + if (other.ev_timer != nullptr) + other.ev_timer->data = &other; + } + + TimerEvent &operator=(TimerEvent &&other) { + if (this != &other) { - other.ev_fd = nullptr; - ev_fd->data = this; + TimerEvent tmp(std::move(other)); + tmp.swap(*this); } - other.ev_timer = nullptr; - ev_timer->data = this; return *this; } - ~Event() { clear(); } + ~TimerEvent() { clear(); } void clear() { - if (ev_fd != nullptr) - { - uv_poll_stop(ev_fd); - uv_close((uv_handle_t *)ev_fd, Event::_on_handle_close); - ev_fd = nullptr; - } if (ev_timer != nullptr) { uv_timer_stop(ev_timer); - uv_close((uv_handle_t *)ev_timer, Event::_on_handle_close); + uv_close((uv_handle_t *)ev_timer, _on_uv_handle_close); ev_timer = nullptr; } callback = nullptr; @@ -172,28 +224,90 @@ class Event { callback = _callback; } - void add(int events) { - if (ev_fd) uv_poll_start(ev_fd, events, Event::fd_then); + void add(double t_sec) { + assert(ev_timer != nullptr); + uv_timer_start(ev_timer, TimerEvent::timer_then, uint64_t(t_sec * 1000), 0); } + void del() { - if (ev_fd) uv_poll_stop(ev_fd); - if (ev_timer == nullptr) - assert(ev_timer); - uv_timer_stop(ev_timer); + if (ev_timer != nullptr) uv_timer_stop(ev_timer); + } + + operator bool() const { return ev_timer != nullptr; } +}; + +class TimedFdEvent: public FdEvent, public TimerEvent { + public: + static const int TIMEOUT = 1 << 29; + + private: + static inline void timer_then(uv_timer_t *h) { + auto event = static_cast<TimedFdEvent *>(h->data); + event->FdEvent::del(); + event->FdEvent::callback(event->fd, TIMEOUT); } - void add_with_timeout(double t_sec, int events) { - add(events); - uv_timer_start(ev_timer, Event::timer_then, uint64_t(t_sec * 1000), 0); + + public: + TimedFdEvent() = default; + TimedFdEvent(const EventContext &ec, int fd, FdEvent::callback_t callback): + FdEvent(ec, fd, callback), + TimerEvent(ec, TimerEvent::callback_t()) { + ev_timer->data = this; } - operator bool() const { return ev_fd != nullptr || ev_timer != nullptr; } + TimedFdEvent(TimedFdEvent &&other): + FdEvent(static_cast<FdEvent &&>(other)), + TimerEvent(static_cast<TimerEvent &&>(other)) { + if (ev_timer != nullptr) + ev_timer->data = this; + } + + void swap(TimedFdEvent &other) { + std::swap(static_cast<FdEvent &>(*this), static_cast<FdEvent &>(other)); + std::swap(static_cast<TimerEvent &>(*this), static_cast<TimerEvent &>(other)); + if (ev_timer != nullptr) + ev_timer->data = this; + } + + TimedFdEvent &operator=(TimedFdEvent &&other) { + if (this != &other) + { + TimedFdEvent tmp(std::move(other)); + tmp.swap(*this); + } + return *this; + } + + void clear() { + TimerEvent::clear(); + FdEvent::clear(); + } + + using FdEvent::set_callback; + + void add(int events) = delete; + void add(double t_sec) = delete; + + void add(int events, double t_sec) { + assert(ev_fd != nullptr && ev_timer != nullptr); + uv_timer_start(ev_timer, TimedFdEvent::timer_then, + uint64_t(t_sec * 1000), 0); + FdEvent::add(events); + } + + void del() { + TimerEvent::del(); + FdEvent::del(); + } + + operator bool() const { return ev_fd != nullptr; } }; class SigEvent { public: using callback_t = std::function<void(int signum)>; private: - EventContext eb; + EventContext ec; uv_signal_t *ev_sig; callback_t callback; static inline void sig_then(uv_signal_t *h, int signum) { @@ -201,39 +315,40 @@ class SigEvent { event->callback(signum); } - static void _on_handle_close(uv_handle_t *h) { - delete h; - } - public: - SigEvent(): eb(nullptr), ev_sig(nullptr) {} - SigEvent(const EventContext &eb, callback_t callback): - eb(eb), - ev_sig(new uv_signal_t()), + SigEvent(): ec(nullptr), ev_sig(nullptr) {} + SigEvent(const EventContext &ec, callback_t callback): + ec(ec), ev_sig(new uv_signal_t()), callback(callback) { - uv_signal_init(eb.get(), ev_sig); + uv_signal_init(ec.get(), ev_sig); ev_sig->data = this; } SigEvent(const SigEvent &) = delete; SigEvent(SigEvent &&other): - eb(std::move(other.eb)), - ev_sig(other.ev_sig), + ec(std::move(other.ec)), ev_sig(other.ev_sig), callback(std::move(other.callback)) { - other.del(); other.ev_sig = nullptr; - ev_sig->data = this; + if (ev_sig != nullptr) + ev_sig->data = this; } - SigEvent &operator=(SigEvent &&other) { - clear(); - other.del(); - eb = std::move(other.eb); - ev_sig = other.ev_sig; - callback = std::move(other.callback); + void swap(SigEvent &other) { + std::swap(ec, other.ec); + std::swap(ev_sig, other.ev_sig); + std::swap(callback, other.callback); + if (ev_sig != nullptr) + ev_sig->data = this; + if (other.ev_sig != nullptr) + other.ev_sig->data = &other; + } - other.ev_sig = nullptr; - ev_sig->data = this; + SigEvent &operator=(SigEvent &&other) { + if (this != &other) + { + SigEvent tmp(std::move(other)); + tmp.swap(*this); + } return *this; } @@ -243,7 +358,7 @@ class SigEvent { if (ev_sig != nullptr) { uv_signal_stop(ev_sig); - uv_close((uv_handle_t *)ev_sig, SigEvent::_on_handle_close); + uv_close((uv_handle_t *)ev_sig, _on_uv_handle_close); ev_sig = nullptr; } callback = nullptr; @@ -254,15 +369,17 @@ class SigEvent { } void add(int signum) { + assert(ev_sig != nullptr); uv_signal_start(ev_sig, SigEvent::sig_then, signum); } void add_once(int signum) { + assert(ev_sig != nullptr); uv_signal_start_oneshot(ev_sig, SigEvent::sig_then, signum); } void del() { - uv_signal_stop(ev_sig); + if (ev_sig != nullptr) uv_signal_stop(ev_sig); } operator bool() const { return ev_sig != nullptr; } @@ -292,7 +409,7 @@ class ThreadNotifier { class ThreadCall { int ctl_fd[2]; EventContext ec; - Event ev_listen; + FdEvent ev_listen; public: struct Result { @@ -347,13 +464,13 @@ class ThreadCall { ThreadCall(EventContext ec): ec(ec) { if (pipe2(ctl_fd, O_NONBLOCK)) throw SalticidaeError(std::string("ThreadCall: failed to create pipe")); - ev_listen = Event(ec, ctl_fd[0], [this](int fd, int) { + ev_listen = FdEvent(ec, ctl_fd[0], [this](int fd, int) { Handle *h; read(fd, &h, sizeof(h)); h->exec(); delete h; }); - ev_listen.add(Event::READ); + ev_listen.add(FdEvent::READ); } ~ThreadCall() { @@ -394,7 +511,7 @@ class MPSCQueueEventDriven: public MPSCQueue<T> { const uint64_t dummy = 1; std::atomic<bool> wait_sig; int fd; - Event ev; + FdEvent ev; public: MPSCQueueEventDriven(size_t capacity = 65536): @@ -409,7 +526,7 @@ class MPSCQueueEventDriven: public MPSCQueue<T> { template<typename Func> void reg_handler(const EventContext &ec, Func &&func) { - ev = Event(ec, fd, + ev = FdEvent(ec, fd, [this, func=std::forward<Func>(func)](int, int) { //fprintf(stderr, "%x\n", std::this_thread::get_id()); uint64_t t; @@ -424,7 +541,7 @@ class MPSCQueueEventDriven: public MPSCQueue<T> { if (func(*this)) write(fd, &dummy, 8); }); - ev.add(Event::READ); + ev.add(FdEvent::READ); } void unreg_handler() { ev.clear(); } diff --git a/include/salticidae/network.h b/include/salticidae/network.h index f2b99b3..51effe0 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -267,7 +267,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { class Conn: public MsgNet::Conn { friend PeerNetwork; NetAddr peer_id; - Event ev_timeout; + TimerEvent ev_timeout; void reset_timeout(double timeout); public: @@ -297,8 +297,8 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { NetAddr addr; /** the underlying connection, may be invalid when connected = false */ conn_t conn; - Event ev_ping_timer; - Event ev_retry_timer; + TimerEvent ev_ping_timer; + TimerEvent ev_retry_timer; bool ping_timer_ok; bool pong_msg_ok; bool connected; @@ -307,13 +307,13 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { Peer(NetAddr addr, conn_t conn, const EventContext &ec): addr(addr), conn(conn), ev_ping_timer( - Event(ec, -1, std::bind(&Peer::ping_timer, this, _1, _2))), + TimerEvent(ec, std::bind(&Peer::ping_timer, this, _1))), connected(false) {} ~Peer() {} Peer &operator=(const Peer &) = delete; Peer(const Peer &) = delete; - void ping_timer(int, int); + void ping_timer(TimerEvent &); void reset_ping_timer(); void send_ping(); void clear_all_events() { @@ -491,7 +491,7 @@ void PeerNetwork<O, _, __>::tcall_reset_timeout(ConnPool::Worker *worker, worker->get_tcall()->async_call([conn, t=timeout](ThreadCall::Handle &) { if (!conn->ev_timeout) return; conn->ev_timeout.del(); - conn->ev_timeout.add_with_timeout(t, 0); + conn->ev_timeout.add(t); SALTICIDAE_LOG_DEBUG("reset connection timeout %.2f", t); }); } @@ -504,7 +504,7 @@ void PeerNetwork<O, _, __>::Conn::on_setup() { auto conn = static_pointer_cast<Conn>(this->self()); auto worker = this->worker; assert(!ev_timeout); - ev_timeout = Event(worker->get_ec(), -1, [conn](int, int) { + ev_timeout = TimerEvent(worker->get_ec(), [conn](TimerEvent &) { SALTICIDAE_LOG_INFO("peer ping-pong timeout"); conn->worker_terminate(); }); @@ -526,11 +526,11 @@ void PeerNetwork<O, _, __>::Conn::on_teardown() { p->conn = nullptr; SALTICIDAE_LOG_INFO("connection lost: %s", std::string(*this).c_str()); // try to reconnect - p->ev_retry_timer = Event(pn->disp_ec, -1, - [pn, peer_id = this->peer_id](int, int) { + p->ev_retry_timer = TimerEvent(pn->disp_ec, + [pn, peer_id = this->peer_id](TimerEvent &) { pn->start_active_conn(peer_id); }); - p->ev_retry_timer.add_with_timeout(pn->gen_conn_timeout(), 0); + p->ev_retry_timer.add(pn->gen_conn_timeout()); } template<typename O, O _, O __> @@ -554,8 +554,7 @@ template<typename O, O _, O __> void PeerNetwork<O, _, __>::Peer::reset_ping_timer() { assert(ev_ping_timer); ev_ping_timer.del(); - ev_ping_timer.add_with_timeout( - gen_rand_timeout(conn->get_net()->ping_period), 0); + ev_ping_timer.add(gen_rand_timeout(conn->get_net()->ping_period)); } template<typename O, O _, O __> @@ -568,7 +567,7 @@ void PeerNetwork<O, _, __>::Peer::send_ping() { } template<typename O, O _, O __> -void PeerNetwork<O, _, __>::Peer::ping_timer(int, int) { +void PeerNetwork<O, _, __>::Peer::ping_timer(TimerEvent &) { ping_timer_ok = true; if (pong_msg_ok) { diff --git a/src/conn.cpp b/src/conn.cpp index 51a5346..4ab3040 100644 --- a/src/conn.cpp +++ b/src/conn.cpp @@ -54,7 +54,7 @@ ConnPool::Conn::operator std::string() const { /* the following two functions are executed by exactly one worker per Conn object */ void ConnPool::Conn::send_data(int fd, int events) { - if (events & Event::ERROR) + if (events & FdEvent::ERROR) { worker_terminate(); return; @@ -93,13 +93,13 @@ void ConnPool::Conn::send_data(int fd, int events) { } } ev_socket.del(); - ev_socket.add(Event::READ); + ev_socket.add(FdEvent::READ); /* consumed the buffer but endpoint still seems to be writable */ ready_send = true; } void ConnPool::Conn::recv_data(int fd, int events) { - if (events & Event::ERROR) + if (events & FdEvent::ERROR) { worker_terminate(); return; @@ -172,7 +172,8 @@ void ConnPool::accept_client(int fd, int) { struct sockaddr client_addr; socklen_t addr_size = sizeof(struct sockaddr_in); if ((client_fd = accept(fd, &client_addr, &addr_size)) < 0) - SALTICIDAE_LOG_ERROR("error while accepting the connection"); + SALTICIDAE_LOG_ERROR("error while accepting the connection: %s", + strerror(errno)); else { int one = 1; @@ -213,7 +214,7 @@ void ConnPool::Conn::conn_server(int fd, int events) { } else { - if (events & Event::TIMEOUT) + if (events & TimedFdEvent::TIMEOUT) SALTICIDAE_LOG_INFO("%s connect timeout", std::string(*this).c_str()); conn->disp_terminate(); return; @@ -245,9 +246,9 @@ void ConnPool::_listen(NetAddr listen_addr) { throw ConnPoolError(std::string("binding error")); if (::listen(listen_fd, max_listen_backlog) < 0) throw ConnPoolError(std::string("listen error")); - ev_listen = Event(disp_ec, listen_fd, + ev_listen = FdEvent(disp_ec, listen_fd, std::bind(&ConnPool::accept_client, this, _1, _2)); - ev_listen.add(Event::READ); + ev_listen.add(FdEvent::READ); SALTICIDAE_LOG_INFO("listening to %u", ntohs(listen_addr.port)); } @@ -283,8 +284,8 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) { } else { - conn->ev_connect = Event(disp_ec, conn->fd, std::bind(&Conn::conn_server, conn.get(), _1, _2)); - conn->ev_connect.add_with_timeout(conn_server_timeout, Event::WRITE); + conn->ev_connect = TimedFdEvent(disp_ec, conn->fd, std::bind(&Conn::conn_server, conn.get(), _1, _2)); + conn->ev_connect.add(FdEvent::WRITE, conn_server_timeout); add_conn(conn); SALTICIDAE_LOG_INFO("created %s", std::string(*conn).c_str()); } diff --git a/src/util.cpp b/src/util.cpp index c51e191..6a8c652 100644 --- a/src/util.cpp +++ b/src/util.cpp @@ -51,11 +51,13 @@ double gen_rand_timeout(double base_timeout, double alpha) { return base_timeout + rand() / (double)RAND_MAX * alpha * base_timeout; } -std::string vstringprintf(const char *fmt, va_list ap) { +std::string vstringprintf(const char *fmt, va_list _ap) { int guessed_size = 1024; std::string buff; + va_list ap; + va_copy(ap, _ap); buff.resize(guessed_size); - int nwrote = vsnprintf(&buff[0], guessed_size, fmt, ap); + int nwrote = vsnprintf(&buff[0], guessed_size, fmt, _ap); if (nwrote < 0) buff = ""; else { @@ -66,6 +68,7 @@ std::string vstringprintf(const char *fmt, va_list ap) { buff = ""; } } + va_end(ap); return std::move(buff); } diff --git a/test/bench_network.cpp b/test/bench_network.cpp index b0856b8..be90e2a 100644 --- a/test/bench_network.cpp +++ b/test/bench_network.cpp @@ -42,7 +42,7 @@ using salticidae::MsgNetwork; using salticidae::htole; using salticidae::letoh; using salticidae::bytearray_t; -using salticidae::Event; +using salticidae::TimerEvent; using std::placeholders::_1; using std::placeholders::_2; using opcode_t = uint8_t; @@ -71,8 +71,8 @@ using MsgNetworkByteOp = MsgNetwork<opcode_t>; struct MyNet: public MsgNetworkByteOp { const std::string name; const NetAddr peer; - Event ev_period_send; - Event ev_period_stat; + TimerEvent ev_period_send; + TimerEvent ev_period_stat; size_t nrecv; MyNet(const salticidae::EventContext &ec, @@ -82,17 +82,17 @@ struct MyNet: public MsgNetworkByteOp { MsgNetworkByteOp(ec, MsgNetworkByteOp::Config()), name(name), peer(peer), - ev_period_stat(ec, -1, [this, stat_timeout](int, short) { + ev_period_stat(ec, [this, stat_timeout](TimerEvent &) { SALTICIDAE_LOG_INFO("%.2f mps\n", nrecv / (double)stat_timeout); nrecv = 0; - ev_period_stat.add_with_timeout(stat_timeout, 0); + ev_period_stat.add(stat_timeout); }), nrecv(0) { /* message handler could be a bound method */ reg_handler(salticidae::generic_bind( &MyNet::on_receive_bytes, this, _1, _2)); if (stat_timeout > 0) - ev_period_stat.add_with_timeout(0, 0); + ev_period_stat.add(0); } struct Conn: public MsgNetworkByteOp::Conn { @@ -109,12 +109,12 @@ struct MyNet: public MsgNetworkByteOp { printf("[%s] Connected, sending hello.\n", net->name.c_str()); /* send the first message through this connection */ - net->ev_period_send = Event(net->ec, -1, - [net, conn = self()](int, short) { + net->ev_period_send = TimerEvent(net->ec, + [net, conn = self()](TimerEvent &) { net->send_msg(MsgBytes(256), conn); - net->ev_period_send.add_with_timeout(0, 0); + net->ev_period_send.add(0); }); - net->ev_period_send.add_with_timeout(0, 0); + net->ev_period_send.add(0); } else diff --git a/test/test_p2p_stress.cpp b/test/test_p2p_stress.cpp index b272742..70e3444 100644 --- a/test/test_p2p_stress.cpp +++ b/test/test_p2p_stress.cpp @@ -36,7 +36,7 @@ using salticidae::NetAddr; using salticidae::DataStream; using salticidae::MsgNetwork; using salticidae::ConnPool; -using salticidae::Event; +using salticidae::TimerEvent; using salticidae::EventContext; using salticidae::htole; using salticidae::letoh; @@ -78,7 +78,7 @@ using MyNet = salticidae::PeerNetwork<uint8_t>; std::vector<NetAddr> addrs; struct TestContext { - Event timer; + TimerEvent timer; int state; uint256_t hash; size_t ncompleted; @@ -130,7 +130,7 @@ void install_proto(AppContext &app, const size_t &seg_buff_size) { { send_rand(tc.state, conn); tc.state = -1; - tc.timer = Event(ec, -1, [&, conn](int, int) { + tc.timer = TimerEvent(ec, [&, conn](TimerEvent &) { tc.ncompleted++; net.terminate(conn); std::string s; @@ -139,7 +139,7 @@ void install_proto(AppContext &app, const size_t &seg_buff_size) { SALTICIDAE_LOG_INFO("%d completed:%s", ntohs(app.addr.port), s.c_str()); }); double t = salticidae::gen_rand_timeout(10); - tc.timer.add_with_timeout(t, 0); + tc.timer.add(t); SALTICIDAE_LOG_INFO("rand-bomboard phase, ending in %.2f secs", t); } else if (tc.state == -1) diff --git a/test/test_queue.cpp b/test/test_queue.cpp index 41c6f88..7b06951 100644 --- a/test/test_queue.cpp +++ b/test/test_queue.cpp @@ -4,6 +4,8 @@ #include "salticidae/event.h" +using salticidae::TimerEvent; + void test_mpsc(int nproducers = 16, int nops = 100000, size_t burst_size = 128) { size_t total = nproducers * nops; salticidae::EventContext ec; @@ -23,10 +25,10 @@ void test_mpsc(int nproducers = 16, int nops = 100000, size_t burst_size = 128) }); std::vector<std::thread> producers; std::thread consumer([&collected, total, &ec]() { - salticidae::Event timer(ec, -1, [&ec, &collected, total](int, short) { + TimerEvent timer(ec, [&ec, &collected, total](TimerEvent &) { if (collected.load() == total) ec.stop(); }); - timer.add_with_timeout(1, 0); + timer.add(1); ec.dispatch(); }); for (int i = 0; i < nproducers; i++) |