From 2c1e8ec448a1039ab9a46bce4c959e6ec3cefeb8 Mon Sep 17 00:00:00 2001 From: Determinant Date: Tue, 13 Nov 2018 18:20:08 -0500 Subject: working on p2p; switch to libuv (libevent sucks in multi-threading) --- include/salticidae/event.h | 132 ++++++++++++++++++++++++++++++++------------- 1 file changed, 95 insertions(+), 37 deletions(-) (limited to 'include/salticidae/event.h') diff --git a/include/salticidae/event.h b/include/salticidae/event.h index ddb93fc..c21644b 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -26,9 +26,8 @@ #define _SALTICIDAE_EVENT_H #include -#include +#include #include -//#include #include "salticidae/queue.h" #include "salticidae/util.h" @@ -38,87 +37,144 @@ namespace salticidae { struct _event_context_deleter { constexpr _event_context_deleter() = default; - void operator()(struct event_base *ptr) { - if (ptr != nullptr) event_base_free(ptr); + void operator()(uv_loop_t *ptr) { + if (ptr != nullptr) + { + uv_loop_close(ptr); + delete ptr; + } } }; -using _event_context_ot = RcObj; +using _event_context_ot = ArcObj; class EventContext: public _event_context_ot { public: - EventContext(): _event_context_ot(event_base_new()) {} - EventContext(struct event_base *eb): _event_context_ot(eb) {} + EventContext(): _event_context_ot(new uv_loop_t()) { + uv_loop_init(get()); + } + EventContext(uv_loop_t *eb): _event_context_ot(eb) {} EventContext(const EventContext &) = default; EventContext(EventContext &&) = default; EventContext &operator=(const EventContext &) = default; EventContext &operator=(EventContext &&) = default; - void dispatch() const { event_base_dispatch(get()); } - void stop() const { event_base_loopbreak(get()); } + void dispatch() const { + // TODO: improve this loop + for (;;) + uv_run(get(), UV_RUN_ONCE); + } + void stop() const { uv_stop(get()); } }; class Event { public: - using callback_t = std::function; + using callback_t = std::function; + static const int READ = UV_READABLE; + static const int WRITE = UV_WRITABLE; + static const int TIMEOUT = ~(UV_READABLE | UV_WRITABLE | + UV_DISCONNECT | UV_PRIORITIZED); private: EventContext eb; - evutil_socket_t fd; - short events; - struct event *ev; + int fd; + int events; + uv_poll_t *ev_fd; + uv_timer_t *ev_timer; callback_t callback; - static inline void _then(evutil_socket_t fd, short events, void *arg) { - (static_cast(arg))->callback(fd, events); + static inline void fd_then(uv_poll_t *h, int status, int events) { + assert(status == 0); + auto event = static_cast(h->data); + event->callback(event->fd, events); + } + + static inline void timer_then(uv_timer_t *h) { + auto event = static_cast(h->data); + if (event->ev_fd) uv_poll_stop(event->ev_fd); + event->callback(event->fd, TIMEOUT); } public: - Event(): eb(nullptr), ev(nullptr) {} - Event(const EventContext &eb, - evutil_socket_t fd, - short events, - callback_t callback): + Event(): eb(nullptr), ev_fd(nullptr), ev_timer(nullptr) {} + Event(const EventContext &eb, int fd, short events, callback_t callback): eb(eb), fd(fd), events(events), - ev(event_new(eb.get(), fd, events, Event::_then, this)), - callback(callback) {} + ev_fd(nullptr), + ev_timer(new uv_timer_t()), + callback(callback) { + if (fd != -1) + { + ev_fd = new uv_poll_t(); + uv_poll_init(eb.get(), ev_fd, fd); + ev_fd->data = this; + } + uv_timer_init(eb.get(), ev_timer); + ev_timer->data = this; + } Event(const Event &) = delete; Event(Event &&other): eb(std::move(other.eb)), fd(other.fd), events(other.events), + ev_fd(other.ev_fd), ev_timer(other.ev_timer), callback(std::move(other.callback)) { - other.clear(); - ev = event_new(eb.get(), fd, events, Event::_then, this); + other.del(); + if (fd != -1) + { + other.ev_fd = nullptr; + ev_fd->data = this; + } + other.ev_timer = nullptr; + ev_timer->data = this; } Event &operator=(Event &&other) { clear(); - other.clear(); + other.del(); eb = std::move(other.eb); fd = other.fd; events = other.events; - ev = event_new(eb.get(), fd, events, Event::_then, this); + ev_fd = other.ev_fd; + ev_timer = other.ev_timer; callback = std::move(other.callback); + + if (fd != -1) + { + other.ev_fd = nullptr; + ev_fd->data = this; + } + other.ev_timer = nullptr; + ev_timer->data = this; return *this; } ~Event() { clear(); } void clear() { - if (ev != nullptr) + if (ev_fd != nullptr) + { + uv_poll_stop(ev_fd); + delete ev_fd; + ev_fd = nullptr; + } + if (ev_timer != nullptr) { - event_del(ev); - event_free(ev); - ev = nullptr; + uv_timer_stop(ev_timer); + delete ev_timer; + ev_timer = nullptr; } } - void add() { if (ev) event_add(ev, nullptr); } - void del() { if (ev) event_del(ev); } - void add_with_timeout(double timeout) { - if (ev) - event_add_with_timeout(ev, timeout); + void add() { + if (ev_fd) uv_poll_start(ev_fd, events, Event::fd_then); + } + void del() { + if (ev_fd) uv_poll_stop(ev_fd); + uv_timer_stop(ev_timer); + } + void add_with_timeout(double t_sec) { + add(); + uv_timer_start(ev_timer, Event::timer_then, uint64_t(t_sec * 1000), 0); } - operator bool() const { return ev != nullptr; } + operator bool() const { return ev_fd != nullptr || ev_timer != nullptr; } }; template @@ -139,7 +195,7 @@ class MPSCQueueEventDriven: public MPSCQueue { template void reg_handler(const EventContext &ec, Func &&func) { - ev = Event(ec, fd, EV_READ | EV_PERSIST, + ev = Event(ec, fd, Event::READ, [this, func=std::forward(func)](int, short) { //fprintf(stderr, "%x\n", std::this_thread::get_id()); uint64_t t; @@ -171,6 +227,8 @@ class MPSCQueueEventDriven: public MPSCQueue { } }; + + // TODO: incorrect MPMCQueueEventDriven impl /* template -- cgit v1.2.3