From d2fe5eb74bdf40afc5cacd052f40b56aa3e57eaf Mon Sep 17 00:00:00 2001 From: Determinant Date: Tue, 20 Nov 2018 20:43:57 -0500 Subject: refactor libuv wrapper classes --- include/salticidae/conn.h | 16 +-- include/salticidae/event.h | 307 ++++++++++++++++++++++++++++++------------- include/salticidae/network.h | 25 ++-- 3 files changed, 232 insertions(+), 116 deletions(-) (limited to 'include') diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index a60cd8f..e7d6295 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -86,8 +86,8 @@ class ConnPool { MPSCWriteBuffer send_buffer; SegBuffer recv_buffer; - Event ev_connect; - Event ev_socket; + TimedFdEvent ev_connect; + FdEvent ev_socket; /** does not need to wait if true */ bool ready_send; @@ -161,7 +161,7 @@ class ConnPool { conn_callback_t conn_cb; /* owned by the dispatcher */ - Event ev_listen; + FdEvent ev_listen; std::unordered_map pool; int listen_fd; /**< for accepting new network connections */ @@ -207,18 +207,18 @@ class ConnPool { if (conn->ready_send) { conn->ev_socket.del(); - conn->ev_socket.add(Event::READ | Event::WRITE); - conn->send_data(client_fd, Event::WRITE); + conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE); + conn->send_data(client_fd, FdEvent::WRITE); } return false; }); - conn->ev_socket = Event(ec, client_fd, [conn=conn](int fd, int what) { - if (what & Event::READ) + conn->ev_socket = FdEvent(ec, client_fd, [conn=conn](int fd, int what) { + if (what & FdEvent::READ) conn->recv_data(fd, what); else conn->send_data(fd, what); }); - conn->ev_socket.add(Event::READ | Event::WRITE); + conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE); nconn++; }); } diff --git a/include/salticidae/event.h b/include/salticidae/event.h index ca48e69..2365a71 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -68,101 +68,153 @@ class EventContext: public _event_context_ot { void stop() const { uv_stop(get()); } }; -class Event { +static void _on_uv_handle_close(uv_handle_t *h) { delete h; } + +class FdEvent { public: using callback_t = std::function; static const int READ = UV_READABLE; static const int WRITE = UV_WRITABLE; static const int ERROR = 1 << 30; - static const int TIMEOUT = 1 << 29; - private: - EventContext eb; + protected: + EventContext ec; int fd; uv_poll_t *ev_fd; - uv_timer_t *ev_timer; callback_t callback; + static inline void fd_then(uv_poll_t *h, int status, int events) { if (status != 0) events |= ERROR; - auto event = static_cast(h->data); + auto event = static_cast(h->data); event->callback(event->fd, events); } - static inline void timer_then(uv_timer_t *h) { - auto event = static_cast(h->data); - if (event->ev_fd) uv_poll_stop(event->ev_fd); - event->callback(event->fd, TIMEOUT); + public: + FdEvent(): ec(nullptr), ev_fd(nullptr) {} + FdEvent(const EventContext &ec, int fd, callback_t callback): + ec(ec), fd(fd), ev_fd(new uv_poll_t()), + callback(callback) { + uv_poll_init(ec.get(), ev_fd, fd); + ev_fd->data = this; } - static void _on_handle_close(uv_handle_t *h) { - delete h; + FdEvent(const FdEvent &) = delete; + FdEvent(FdEvent &&other): + ec(std::move(other.ec)), fd(other.fd), ev_fd(other.ev_fd), + callback(std::move(other.callback)) { + other.ev_fd = nullptr; + if (ev_fd != nullptr) + ev_fd->data = this; + } + + void swap(FdEvent &other) { + std::swap(ec, other.ec); + std::swap(fd, other.fd); + std::swap(ev_fd, other.ev_fd); + std::swap(callback, other.callback); + if (ev_fd != nullptr) + ev_fd->data = this; + if (other.ev_fd != nullptr) + other.ev_fd->data = &other; } - public: - Event(): eb(nullptr), ev_fd(nullptr), ev_timer(nullptr) {} - Event(const EventContext &eb, int fd, callback_t callback): - eb(eb), fd(fd), - ev_fd(nullptr), - ev_timer(new uv_timer_t()), - callback(callback) { - if (fd != -1) + FdEvent &operator=(FdEvent &&other) { + if (this != &other) { - ev_fd = new uv_poll_t(); - uv_poll_init(eb.get(), ev_fd, fd); - ev_fd->data = this; + FdEvent tmp(std::move(other)); + tmp.swap(*this); } - uv_timer_init(eb.get(), ev_timer); - ev_timer->data = this; + return *this; } - Event(const Event &) = delete; - Event(Event &&other): - eb(std::move(other.eb)), fd(other.fd), - ev_fd(other.ev_fd), ev_timer(other.ev_timer), - callback(std::move(other.callback)) { - other.del(); + ~FdEvent() { clear(); } + + void clear() { if (ev_fd != nullptr) { - other.ev_fd = nullptr; - ev_fd->data = this; + uv_poll_stop(ev_fd); + uv_close((uv_handle_t *)ev_fd, _on_uv_handle_close); + ev_fd = nullptr; } - other.ev_timer = nullptr; + callback = nullptr; + } + + void set_callback(callback_t _callback) { + callback = _callback; + } + + void add(int events) { + assert(ev_fd != nullptr); + uv_poll_start(ev_fd, events, FdEvent::fd_then); + } + + void del() { + if (ev_fd != nullptr) uv_poll_stop(ev_fd); + } + + operator bool() const { return ev_fd != nullptr; } +}; + + +class TimerEvent { + public: + using callback_t = std::function; + + protected: + EventContext ec; + uv_timer_t *ev_timer; + callback_t callback; + + static inline void timer_then(uv_timer_t *h) { + auto event = static_cast(h->data); + event->callback(*event); + } + + public: + TimerEvent(): ec(nullptr), ev_timer(nullptr) {} + TimerEvent(const EventContext &ec, callback_t callback): + ec(ec), ev_timer(new uv_timer_t()), + callback(callback) { + uv_timer_init(ec.get(), ev_timer); ev_timer->data = this; } - Event &operator=(Event &&other) { - clear(); - other.del(); - eb = std::move(other.eb); - fd = other.fd; - ev_fd = other.ev_fd; - ev_timer = other.ev_timer; - callback = std::move(other.callback); + TimerEvent(const TimerEvent &) = delete; + TimerEvent(TimerEvent &&other): + ec(std::move(other.ec)), ev_timer(other.ev_timer), + callback(std::move(other.callback)) { + other.ev_timer = nullptr; + if (ev_timer != nullptr) + ev_timer->data = this; + } - if (ev_fd != nullptr) + void swap(TimerEvent &other) { + std::swap(ec, other.ec); + std::swap(ev_timer, other.ev_timer); + std::swap(callback, other.callback); + if (ev_timer != nullptr) + ev_timer->data = this; + if (other.ev_timer != nullptr) + other.ev_timer->data = &other; + } + + TimerEvent &operator=(TimerEvent &&other) { + if (this != &other) { - other.ev_fd = nullptr; - ev_fd->data = this; + TimerEvent tmp(std::move(other)); + tmp.swap(*this); } - other.ev_timer = nullptr; - ev_timer->data = this; return *this; } - ~Event() { clear(); } + ~TimerEvent() { clear(); } void clear() { - if (ev_fd != nullptr) - { - uv_poll_stop(ev_fd); - uv_close((uv_handle_t *)ev_fd, Event::_on_handle_close); - ev_fd = nullptr; - } if (ev_timer != nullptr) { uv_timer_stop(ev_timer); - uv_close((uv_handle_t *)ev_timer, Event::_on_handle_close); + uv_close((uv_handle_t *)ev_timer, _on_uv_handle_close); ev_timer = nullptr; } callback = nullptr; @@ -172,28 +224,90 @@ class Event { callback = _callback; } - void add(int events) { - if (ev_fd) uv_poll_start(ev_fd, events, Event::fd_then); + void add(double t_sec) { + assert(ev_timer != nullptr); + uv_timer_start(ev_timer, TimerEvent::timer_then, uint64_t(t_sec * 1000), 0); } + void del() { - if (ev_fd) uv_poll_stop(ev_fd); - if (ev_timer == nullptr) - assert(ev_timer); - uv_timer_stop(ev_timer); + if (ev_timer != nullptr) uv_timer_stop(ev_timer); + } + + operator bool() const { return ev_timer != nullptr; } +}; + +class TimedFdEvent: public FdEvent, public TimerEvent { + public: + static const int TIMEOUT = 1 << 29; + + private: + static inline void timer_then(uv_timer_t *h) { + auto event = static_cast(h->data); + event->FdEvent::del(); + event->FdEvent::callback(event->fd, TIMEOUT); } - void add_with_timeout(double t_sec, int events) { - add(events); - uv_timer_start(ev_timer, Event::timer_then, uint64_t(t_sec * 1000), 0); + + public: + TimedFdEvent() = default; + TimedFdEvent(const EventContext &ec, int fd, FdEvent::callback_t callback): + FdEvent(ec, fd, callback), + TimerEvent(ec, TimerEvent::callback_t()) { + ev_timer->data = this; } - operator bool() const { return ev_fd != nullptr || ev_timer != nullptr; } + TimedFdEvent(TimedFdEvent &&other): + FdEvent(static_cast(other)), + TimerEvent(static_cast(other)) { + if (ev_timer != nullptr) + ev_timer->data = this; + } + + void swap(TimedFdEvent &other) { + std::swap(static_cast(*this), static_cast(other)); + std::swap(static_cast(*this), static_cast(other)); + if (ev_timer != nullptr) + ev_timer->data = this; + } + + TimedFdEvent &operator=(TimedFdEvent &&other) { + if (this != &other) + { + TimedFdEvent tmp(std::move(other)); + tmp.swap(*this); + } + return *this; + } + + void clear() { + TimerEvent::clear(); + FdEvent::clear(); + } + + using FdEvent::set_callback; + + void add(int events) = delete; + void add(double t_sec) = delete; + + void add(int events, double t_sec) { + assert(ev_fd != nullptr && ev_timer != nullptr); + uv_timer_start(ev_timer, TimedFdEvent::timer_then, + uint64_t(t_sec * 1000), 0); + FdEvent::add(events); + } + + void del() { + TimerEvent::del(); + FdEvent::del(); + } + + operator bool() const { return ev_fd != nullptr; } }; class SigEvent { public: using callback_t = std::function; private: - EventContext eb; + EventContext ec; uv_signal_t *ev_sig; callback_t callback; static inline void sig_then(uv_signal_t *h, int signum) { @@ -201,39 +315,40 @@ class SigEvent { event->callback(signum); } - static void _on_handle_close(uv_handle_t *h) { - delete h; - } - public: - SigEvent(): eb(nullptr), ev_sig(nullptr) {} - SigEvent(const EventContext &eb, callback_t callback): - eb(eb), - ev_sig(new uv_signal_t()), + SigEvent(): ec(nullptr), ev_sig(nullptr) {} + SigEvent(const EventContext &ec, callback_t callback): + ec(ec), ev_sig(new uv_signal_t()), callback(callback) { - uv_signal_init(eb.get(), ev_sig); + uv_signal_init(ec.get(), ev_sig); ev_sig->data = this; } SigEvent(const SigEvent &) = delete; SigEvent(SigEvent &&other): - eb(std::move(other.eb)), - ev_sig(other.ev_sig), + ec(std::move(other.ec)), ev_sig(other.ev_sig), callback(std::move(other.callback)) { - other.del(); other.ev_sig = nullptr; - ev_sig->data = this; + if (ev_sig != nullptr) + ev_sig->data = this; } - SigEvent &operator=(SigEvent &&other) { - clear(); - other.del(); - eb = std::move(other.eb); - ev_sig = other.ev_sig; - callback = std::move(other.callback); + void swap(SigEvent &other) { + std::swap(ec, other.ec); + std::swap(ev_sig, other.ev_sig); + std::swap(callback, other.callback); + if (ev_sig != nullptr) + ev_sig->data = this; + if (other.ev_sig != nullptr) + other.ev_sig->data = &other; + } - other.ev_sig = nullptr; - ev_sig->data = this; + SigEvent &operator=(SigEvent &&other) { + if (this != &other) + { + SigEvent tmp(std::move(other)); + tmp.swap(*this); + } return *this; } @@ -243,7 +358,7 @@ class SigEvent { if (ev_sig != nullptr) { uv_signal_stop(ev_sig); - uv_close((uv_handle_t *)ev_sig, SigEvent::_on_handle_close); + uv_close((uv_handle_t *)ev_sig, _on_uv_handle_close); ev_sig = nullptr; } callback = nullptr; @@ -254,15 +369,17 @@ class SigEvent { } void add(int signum) { + assert(ev_sig != nullptr); uv_signal_start(ev_sig, SigEvent::sig_then, signum); } void add_once(int signum) { + assert(ev_sig != nullptr); uv_signal_start_oneshot(ev_sig, SigEvent::sig_then, signum); } void del() { - uv_signal_stop(ev_sig); + if (ev_sig != nullptr) uv_signal_stop(ev_sig); } operator bool() const { return ev_sig != nullptr; } @@ -292,7 +409,7 @@ class ThreadNotifier { class ThreadCall { int ctl_fd[2]; EventContext ec; - Event ev_listen; + FdEvent ev_listen; public: struct Result { @@ -347,13 +464,13 @@ class ThreadCall { ThreadCall(EventContext ec): ec(ec) { if (pipe2(ctl_fd, O_NONBLOCK)) throw SalticidaeError(std::string("ThreadCall: failed to create pipe")); - ev_listen = Event(ec, ctl_fd[0], [this](int fd, int) { + ev_listen = FdEvent(ec, ctl_fd[0], [this](int fd, int) { Handle *h; read(fd, &h, sizeof(h)); h->exec(); delete h; }); - ev_listen.add(Event::READ); + ev_listen.add(FdEvent::READ); } ~ThreadCall() { @@ -394,7 +511,7 @@ class MPSCQueueEventDriven: public MPSCQueue { const uint64_t dummy = 1; std::atomic wait_sig; int fd; - Event ev; + FdEvent ev; public: MPSCQueueEventDriven(size_t capacity = 65536): @@ -409,7 +526,7 @@ class MPSCQueueEventDriven: public MPSCQueue { template void reg_handler(const EventContext &ec, Func &&func) { - ev = Event(ec, fd, + ev = FdEvent(ec, fd, [this, func=std::forward(func)](int, int) { //fprintf(stderr, "%x\n", std::this_thread::get_id()); uint64_t t; @@ -424,7 +541,7 @@ class MPSCQueueEventDriven: public MPSCQueue { if (func(*this)) write(fd, &dummy, 8); }); - ev.add(Event::READ); + ev.add(FdEvent::READ); } void unreg_handler() { ev.clear(); } diff --git a/include/salticidae/network.h b/include/salticidae/network.h index f2b99b3..51effe0 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -267,7 +267,7 @@ class PeerNetwork: public MsgNetwork { class Conn: public MsgNet::Conn { friend PeerNetwork; NetAddr peer_id; - Event ev_timeout; + TimerEvent ev_timeout; void reset_timeout(double timeout); public: @@ -297,8 +297,8 @@ class PeerNetwork: public MsgNetwork { NetAddr addr; /** the underlying connection, may be invalid when connected = false */ conn_t conn; - Event ev_ping_timer; - Event ev_retry_timer; + TimerEvent ev_ping_timer; + TimerEvent ev_retry_timer; bool ping_timer_ok; bool pong_msg_ok; bool connected; @@ -307,13 +307,13 @@ class PeerNetwork: public MsgNetwork { Peer(NetAddr addr, conn_t conn, const EventContext &ec): addr(addr), conn(conn), ev_ping_timer( - Event(ec, -1, std::bind(&Peer::ping_timer, this, _1, _2))), + TimerEvent(ec, std::bind(&Peer::ping_timer, this, _1))), connected(false) {} ~Peer() {} Peer &operator=(const Peer &) = delete; Peer(const Peer &) = delete; - void ping_timer(int, int); + void ping_timer(TimerEvent &); void reset_ping_timer(); void send_ping(); void clear_all_events() { @@ -491,7 +491,7 @@ void PeerNetwork::tcall_reset_timeout(ConnPool::Worker *worker, worker->get_tcall()->async_call([conn, t=timeout](ThreadCall::Handle &) { if (!conn->ev_timeout) return; conn->ev_timeout.del(); - conn->ev_timeout.add_with_timeout(t, 0); + conn->ev_timeout.add(t); SALTICIDAE_LOG_DEBUG("reset connection timeout %.2f", t); }); } @@ -504,7 +504,7 @@ void PeerNetwork::Conn::on_setup() { auto conn = static_pointer_cast(this->self()); auto worker = this->worker; assert(!ev_timeout); - ev_timeout = Event(worker->get_ec(), -1, [conn](int, int) { + ev_timeout = TimerEvent(worker->get_ec(), [conn](TimerEvent &) { SALTICIDAE_LOG_INFO("peer ping-pong timeout"); conn->worker_terminate(); }); @@ -526,11 +526,11 @@ void PeerNetwork::Conn::on_teardown() { p->conn = nullptr; SALTICIDAE_LOG_INFO("connection lost: %s", std::string(*this).c_str()); // try to reconnect - p->ev_retry_timer = Event(pn->disp_ec, -1, - [pn, peer_id = this->peer_id](int, int) { + p->ev_retry_timer = TimerEvent(pn->disp_ec, + [pn, peer_id = this->peer_id](TimerEvent &) { pn->start_active_conn(peer_id); }); - p->ev_retry_timer.add_with_timeout(pn->gen_conn_timeout(), 0); + p->ev_retry_timer.add(pn->gen_conn_timeout()); } template @@ -554,8 +554,7 @@ template void PeerNetwork::Peer::reset_ping_timer() { assert(ev_ping_timer); ev_ping_timer.del(); - ev_ping_timer.add_with_timeout( - gen_rand_timeout(conn->get_net()->ping_period), 0); + ev_ping_timer.add(gen_rand_timeout(conn->get_net()->ping_period)); } template @@ -568,7 +567,7 @@ void PeerNetwork::Peer::send_ping() { } template -void PeerNetwork::Peer::ping_timer(int, int) { +void PeerNetwork::Peer::ping_timer(TimerEvent &) { ping_timer_ok = true; if (pong_msg_ok) { -- cgit v1.2.3