aboutsummaryrefslogtreecommitdiff
path: root/include/salticidae/event.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/salticidae/event.h')
-rw-r--r--include/salticidae/event.h67
1 files changed, 48 insertions, 19 deletions
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index da27902..616f598 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -42,7 +42,8 @@ struct _event_context_deleter {
void operator()(uv_loop_t *ptr) {
if (ptr != nullptr)
{
- uv_loop_close(ptr);
+ while (uv_loop_close(ptr) == UV_EBUSY)
+ uv_run(ptr, UV_RUN_NOWAIT);
delete ptr;
}
}
@@ -62,8 +63,7 @@ class EventContext: public _event_context_ot {
EventContext &operator=(EventContext &&) = default;
void dispatch() const {
// TODO: improve this loop
- for (;;)
- uv_run(get(), UV_RUN_ONCE);
+ uv_run(get(), UV_RUN_DEFAULT);
}
void stop() const { uv_stop(get()); }
};
@@ -79,12 +79,15 @@ class Event {
private:
EventContext eb;
int fd;
- int events;
uv_poll_t *ev_fd;
uv_timer_t *ev_timer;
callback_t callback;
static inline void fd_then(uv_poll_t *h, int status, int events) {
- assert(status == 0);
+ if (status != 0)
+ {
+ SALTICIDAE_LOG_WARN("%s", uv_strerror(status));
+ return;
+ }
auto event = static_cast<Event *>(h->data);
event->callback(event->fd, events);
}
@@ -95,10 +98,14 @@ class Event {
event->callback(event->fd, TIMEOUT);
}
+ static void _on_handle_close(uv_handle_t *h) {
+ delete h;
+ }
+
public:
Event(): eb(nullptr), ev_fd(nullptr), ev_timer(nullptr) {}
- Event(const EventContext &eb, int fd, short events, callback_t callback):
- eb(eb), fd(fd), events(events),
+ Event(const EventContext &eb, int fd, callback_t callback):
+ eb(eb), fd(fd),
ev_fd(nullptr),
ev_timer(new uv_timer_t()),
callback(callback) {
@@ -114,7 +121,7 @@ class Event {
Event(const Event &) = delete;
Event(Event &&other):
- eb(std::move(other.eb)), fd(other.fd), events(other.events),
+ eb(std::move(other.eb)), fd(other.fd),
ev_fd(other.ev_fd), ev_timer(other.ev_timer),
callback(std::move(other.callback)) {
other.del();
@@ -132,7 +139,6 @@ class Event {
other.del();
eb = std::move(other.eb);
fd = other.fd;
- events = other.events;
ev_fd = other.ev_fd;
ev_timer = other.ev_timer;
callback = std::move(other.callback);
@@ -153,26 +159,33 @@ class Event {
if (ev_fd != nullptr)
{
uv_poll_stop(ev_fd);
- delete ev_fd;
+ uv_close((uv_handle_t *)ev_fd, Event::_on_handle_close);
ev_fd = nullptr;
}
if (ev_timer != nullptr)
{
uv_timer_stop(ev_timer);
- delete ev_timer;
+ uv_close((uv_handle_t *)ev_timer, Event::_on_handle_close);
ev_timer = nullptr;
}
+ callback = nullptr;
+ }
+
+ void set_callback(callback_t _callback) {
+ callback = _callback;
}
- void add() {
+ void add(int events) {
if (ev_fd) uv_poll_start(ev_fd, events, Event::fd_then);
}
void del() {
if (ev_fd) uv_poll_stop(ev_fd);
+ if (ev_timer == nullptr)
+ assert(ev_timer);
uv_timer_stop(ev_timer);
}
- void add_with_timeout(double t_sec) {
- add();
+ void add_with_timeout(double t_sec, int events) {
+ add(events);
uv_timer_start(ev_timer, Event::timer_then, uint64_t(t_sec * 1000), 0);
}
@@ -209,6 +222,7 @@ class ThreadCall {
public:
class Handle {
std::function<void(Handle &)> callback;
+ std::function<void(void *)> deleter;
ThreadNotifier* notifier;
void *result;
friend ThreadCall;
@@ -219,6 +233,8 @@ class ThreadCall {
if (notifier) notifier->notify(result);
}
void set_result(void *data) { result = data; }
+ template<typename Func>
+ void set_deleter(Func _deleter) { deleter = _deleter; }
};
ThreadCall() = default;
@@ -227,16 +243,24 @@ class ThreadCall {
ThreadCall(EventContext ec): ec(ec) {
if (pipe2(ctl_fd, O_NONBLOCK))
throw SalticidaeError(std::string("ThreadCall: failed to create pipe"));
- ev_listen = Event(ec, ctl_fd[0], Event::READ, [this](int fd, int) {
+ ev_listen = Event(ec, ctl_fd[0], [this](int fd, int) {
Handle *h;
read(fd, &h, sizeof(h));
h->exec();
delete h;
});
- ev_listen.add();
+ ev_listen.add(Event::READ);
}
~ThreadCall() {
+ ev_listen.clear();
+ Handle *h;
+ while (read(ctl_fd[0], &h, sizeof(h)) == sizeof(h))
+ {
+ if (h->result && h->deleter)
+ h->deleter(h->result);
+ delete h;
+ }
close(ctl_fd[0]);
close(ctl_fd[1]);
}
@@ -277,11 +301,14 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
wait_sig(true),
fd(eventfd(0, EFD_NONBLOCK)) {}
- ~MPSCQueueEventDriven() { close(fd); }
+ ~MPSCQueueEventDriven() {
+ ev.clear();
+ close(fd);
+ }
template<typename Func>
void reg_handler(const EventContext &ec, Func &&func) {
- ev = Event(ec, fd, Event::READ,
+ ev = Event(ec, fd,
[this, func=std::forward<Func>(func)](int, short) {
//fprintf(stderr, "%x\n", std::this_thread::get_id());
uint64_t t;
@@ -296,9 +323,11 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
if (func(*this))
write(fd, &dummy, 8);
});
- ev.add();
+ ev.add(Event::READ);
}
+ void unreg_handler() { ev.clear(); }
+
template<typename U>
bool enqueue(U &&e) {
static const uint64_t dummy = 1;