From ecc163f98e434b557768560d00ee2f9755d6d950 Mon Sep 17 00:00:00 2001 From: Determinant Date: Wed, 14 Nov 2018 22:18:59 -0500 Subject: major bug fix --- include/salticidae/event.h | 67 +++++++++++++++++++++++++++++++++------------- 1 file changed, 48 insertions(+), 19 deletions(-) (limited to 'include/salticidae/event.h') diff --git a/include/salticidae/event.h b/include/salticidae/event.h index da27902..616f598 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -42,7 +42,8 @@ struct _event_context_deleter { void operator()(uv_loop_t *ptr) { if (ptr != nullptr) { - uv_loop_close(ptr); + while (uv_loop_close(ptr) == UV_EBUSY) + uv_run(ptr, UV_RUN_NOWAIT); delete ptr; } } @@ -62,8 +63,7 @@ class EventContext: public _event_context_ot { EventContext &operator=(EventContext &&) = default; void dispatch() const { // TODO: improve this loop - for (;;) - uv_run(get(), UV_RUN_ONCE); + uv_run(get(), UV_RUN_DEFAULT); } void stop() const { uv_stop(get()); } }; @@ -79,12 +79,15 @@ class Event { private: EventContext eb; int fd; - int events; uv_poll_t *ev_fd; uv_timer_t *ev_timer; callback_t callback; static inline void fd_then(uv_poll_t *h, int status, int events) { - assert(status == 0); + if (status != 0) + { + SALTICIDAE_LOG_WARN("%s", uv_strerror(status)); + return; + } auto event = static_cast(h->data); event->callback(event->fd, events); } @@ -95,10 +98,14 @@ class Event { event->callback(event->fd, TIMEOUT); } + static void _on_handle_close(uv_handle_t *h) { + delete h; + } + public: Event(): eb(nullptr), ev_fd(nullptr), ev_timer(nullptr) {} - Event(const EventContext &eb, int fd, short events, callback_t callback): - eb(eb), fd(fd), events(events), + Event(const EventContext &eb, int fd, callback_t callback): + eb(eb), fd(fd), ev_fd(nullptr), ev_timer(new uv_timer_t()), callback(callback) { @@ -114,7 +121,7 @@ class Event { Event(const Event &) = delete; Event(Event &&other): - eb(std::move(other.eb)), fd(other.fd), events(other.events), + eb(std::move(other.eb)), fd(other.fd), ev_fd(other.ev_fd), ev_timer(other.ev_timer), callback(std::move(other.callback)) { other.del(); @@ -132,7 +139,6 @@ class Event { other.del(); eb = std::move(other.eb); fd = other.fd; - events = other.events; ev_fd = other.ev_fd; ev_timer = other.ev_timer; callback = std::move(other.callback); @@ -153,26 +159,33 @@ class Event { if (ev_fd != nullptr) { uv_poll_stop(ev_fd); - delete ev_fd; + uv_close((uv_handle_t *)ev_fd, Event::_on_handle_close); ev_fd = nullptr; } if (ev_timer != nullptr) { uv_timer_stop(ev_timer); - delete ev_timer; + uv_close((uv_handle_t *)ev_timer, Event::_on_handle_close); ev_timer = nullptr; } + callback = nullptr; + } + + void set_callback(callback_t _callback) { + callback = _callback; } - void add() { + void add(int events) { if (ev_fd) uv_poll_start(ev_fd, events, Event::fd_then); } void del() { if (ev_fd) uv_poll_stop(ev_fd); + if (ev_timer == nullptr) + assert(ev_timer); uv_timer_stop(ev_timer); } - void add_with_timeout(double t_sec) { - add(); + void add_with_timeout(double t_sec, int events) { + add(events); uv_timer_start(ev_timer, Event::timer_then, uint64_t(t_sec * 1000), 0); } @@ -209,6 +222,7 @@ class ThreadCall { public: class Handle { std::function callback; + std::function deleter; ThreadNotifier* notifier; void *result; friend ThreadCall; @@ -219,6 +233,8 @@ class ThreadCall { if (notifier) notifier->notify(result); } void set_result(void *data) { result = data; } + template + void set_deleter(Func _deleter) { deleter = _deleter; } }; ThreadCall() = default; @@ -227,16 +243,24 @@ class ThreadCall { ThreadCall(EventContext ec): ec(ec) { if (pipe2(ctl_fd, O_NONBLOCK)) throw SalticidaeError(std::string("ThreadCall: failed to create pipe")); - ev_listen = Event(ec, ctl_fd[0], Event::READ, [this](int fd, int) { + ev_listen = Event(ec, ctl_fd[0], [this](int fd, int) { Handle *h; read(fd, &h, sizeof(h)); h->exec(); delete h; }); - ev_listen.add(); + ev_listen.add(Event::READ); } ~ThreadCall() { + ev_listen.clear(); + Handle *h; + while (read(ctl_fd[0], &h, sizeof(h)) == sizeof(h)) + { + if (h->result && h->deleter) + h->deleter(h->result); + delete h; + } close(ctl_fd[0]); close(ctl_fd[1]); } @@ -277,11 +301,14 @@ class MPSCQueueEventDriven: public MPSCQueue { wait_sig(true), fd(eventfd(0, EFD_NONBLOCK)) {} - ~MPSCQueueEventDriven() { close(fd); } + ~MPSCQueueEventDriven() { + ev.clear(); + close(fd); + } template void reg_handler(const EventContext &ec, Func &&func) { - ev = Event(ec, fd, Event::READ, + ev = Event(ec, fd, [this, func=std::forward(func)](int, short) { //fprintf(stderr, "%x\n", std::this_thread::get_id()); uint64_t t; @@ -296,9 +323,11 @@ class MPSCQueueEventDriven: public MPSCQueue { if (func(*this)) write(fd, &dummy, 8); }); - ev.add(); + ev.add(Event::READ); } + void unreg_handler() { ev.clear(); } + template bool enqueue(U &&e) { static const uint64_t dummy = 1; -- cgit v1.2.3