aboutsummaryrefslogtreecommitdiff
path: root/include/salticidae/event.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/salticidae/event.h')
-rw-r--r--include/salticidae/event.h132
1 files changed, 95 insertions, 37 deletions
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index ddb93fc..c21644b 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -26,9 +26,8 @@
#define _SALTICIDAE_EVENT_H
#include <unistd.h>
-#include <event2/event.h>
+#include <uv.h>
#include <sys/eventfd.h>
-//#include <thread>
#include "salticidae/queue.h"
#include "salticidae/util.h"
@@ -38,87 +37,144 @@ namespace salticidae {
struct _event_context_deleter {
constexpr _event_context_deleter() = default;
- void operator()(struct event_base *ptr) {
- if (ptr != nullptr) event_base_free(ptr);
+ void operator()(uv_loop_t *ptr) {
+ if (ptr != nullptr)
+ {
+ uv_loop_close(ptr);
+ delete ptr;
+ }
}
};
-using _event_context_ot = RcObj<struct event_base, _event_context_deleter>;
+using _event_context_ot = ArcObj<uv_loop_t, _event_context_deleter>;
class EventContext: public _event_context_ot {
public:
- EventContext(): _event_context_ot(event_base_new()) {}
- EventContext(struct event_base *eb): _event_context_ot(eb) {}
+ EventContext(): _event_context_ot(new uv_loop_t()) {
+ uv_loop_init(get());
+ }
+ EventContext(uv_loop_t *eb): _event_context_ot(eb) {}
EventContext(const EventContext &) = default;
EventContext(EventContext &&) = default;
EventContext &operator=(const EventContext &) = default;
EventContext &operator=(EventContext &&) = default;
- void dispatch() const { event_base_dispatch(get()); }
- void stop() const { event_base_loopbreak(get()); }
+ void dispatch() const {
+ // TODO: improve this loop
+ for (;;)
+ uv_run(get(), UV_RUN_ONCE);
+ }
+ void stop() const { uv_stop(get()); }
};
class Event {
public:
- using callback_t = std::function<void(evutil_socket_t fd, short events)>;
+ using callback_t = std::function<void(int fd, short events)>;
+ static const int READ = UV_READABLE;
+ static const int WRITE = UV_WRITABLE;
+ static const int TIMEOUT = ~(UV_READABLE | UV_WRITABLE |
+ UV_DISCONNECT | UV_PRIORITIZED);
private:
EventContext eb;
- evutil_socket_t fd;
- short events;
- struct event *ev;
+ int fd;
+ int events;
+ uv_poll_t *ev_fd;
+ uv_timer_t *ev_timer;
callback_t callback;
- static inline void _then(evutil_socket_t fd, short events, void *arg) {
- (static_cast<Event *>(arg))->callback(fd, events);
+ static inline void fd_then(uv_poll_t *h, int status, int events) {
+ assert(status == 0);
+ auto event = static_cast<Event *>(h->data);
+ event->callback(event->fd, events);
+ }
+
+ static inline void timer_then(uv_timer_t *h) {
+ auto event = static_cast<Event *>(h->data);
+ if (event->ev_fd) uv_poll_stop(event->ev_fd);
+ event->callback(event->fd, TIMEOUT);
}
public:
- Event(): eb(nullptr), ev(nullptr) {}
- Event(const EventContext &eb,
- evutil_socket_t fd,
- short events,
- callback_t callback):
+ Event(): eb(nullptr), ev_fd(nullptr), ev_timer(nullptr) {}
+ Event(const EventContext &eb, int fd, short events, callback_t callback):
eb(eb), fd(fd), events(events),
- ev(event_new(eb.get(), fd, events, Event::_then, this)),
- callback(callback) {}
+ ev_fd(nullptr),
+ ev_timer(new uv_timer_t()),
+ callback(callback) {
+ if (fd != -1)
+ {
+ ev_fd = new uv_poll_t();
+ uv_poll_init(eb.get(), ev_fd, fd);
+ ev_fd->data = this;
+ }
+ uv_timer_init(eb.get(), ev_timer);
+ ev_timer->data = this;
+ }
Event(const Event &) = delete;
Event(Event &&other):
eb(std::move(other.eb)), fd(other.fd), events(other.events),
+ ev_fd(other.ev_fd), ev_timer(other.ev_timer),
callback(std::move(other.callback)) {
- other.clear();
- ev = event_new(eb.get(), fd, events, Event::_then, this);
+ other.del();
+ if (fd != -1)
+ {
+ other.ev_fd = nullptr;
+ ev_fd->data = this;
+ }
+ other.ev_timer = nullptr;
+ ev_timer->data = this;
}
Event &operator=(Event &&other) {
clear();
- other.clear();
+ other.del();
eb = std::move(other.eb);
fd = other.fd;
events = other.events;
- ev = event_new(eb.get(), fd, events, Event::_then, this);
+ ev_fd = other.ev_fd;
+ ev_timer = other.ev_timer;
callback = std::move(other.callback);
+
+ if (fd != -1)
+ {
+ other.ev_fd = nullptr;
+ ev_fd->data = this;
+ }
+ other.ev_timer = nullptr;
+ ev_timer->data = this;
return *this;
}
~Event() { clear(); }
void clear() {
- if (ev != nullptr)
+ if (ev_fd != nullptr)
+ {
+ uv_poll_stop(ev_fd);
+ delete ev_fd;
+ ev_fd = nullptr;
+ }
+ if (ev_timer != nullptr)
{
- event_del(ev);
- event_free(ev);
- ev = nullptr;
+ uv_timer_stop(ev_timer);
+ delete ev_timer;
+ ev_timer = nullptr;
}
}
- void add() { if (ev) event_add(ev, nullptr); }
- void del() { if (ev) event_del(ev); }
- void add_with_timeout(double timeout) {
- if (ev)
- event_add_with_timeout(ev, timeout);
+ void add() {
+ if (ev_fd) uv_poll_start(ev_fd, events, Event::fd_then);
+ }
+ void del() {
+ if (ev_fd) uv_poll_stop(ev_fd);
+ uv_timer_stop(ev_timer);
+ }
+ void add_with_timeout(double t_sec) {
+ add();
+ uv_timer_start(ev_timer, Event::timer_then, uint64_t(t_sec * 1000), 0);
}
- operator bool() const { return ev != nullptr; }
+ operator bool() const { return ev_fd != nullptr || ev_timer != nullptr; }
};
template<typename T>
@@ -139,7 +195,7 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
template<typename Func>
void reg_handler(const EventContext &ec, Func &&func) {
- ev = Event(ec, fd, EV_READ | EV_PERSIST,
+ ev = Event(ec, fd, Event::READ,
[this, func=std::forward<Func>(func)](int, short) {
//fprintf(stderr, "%x\n", std::this_thread::get_id());
uint64_t t;
@@ -171,6 +227,8 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
}
};
+
+
// TODO: incorrect MPMCQueueEventDriven impl
/*
template<typename T>