From 161d969e0eabfecccd69a6b9ed2d03919cf89cb5 Mon Sep 17 00:00:00 2001 From: Determinant Date: Sun, 18 Nov 2018 16:24:30 -0500 Subject: improve the test programs --- include/salticidae/event.h | 135 +++++++++++++++++++++++++-------------------- 1 file changed, 76 insertions(+), 59 deletions(-) (limited to 'include/salticidae/event.h') diff --git a/include/salticidae/event.h b/include/salticidae/event.h index 021b5dc..d862ce8 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -189,6 +189,80 @@ class Event { operator bool() const { return ev_fd != nullptr || ev_timer != nullptr; } }; +class SigEvent { + public: + using callback_t = std::function; + private: + EventContext eb; + uv_signal_t *ev_sig; + callback_t callback; + static inline void sig_then(uv_signal_t *h, int signum) { + auto event = static_cast(h->data); + event->callback(signum); + } + + static void _on_handle_close(uv_handle_t *h) { + delete h; + } + + public: + SigEvent(): eb(nullptr), ev_sig(nullptr) {} + SigEvent(const EventContext &eb, callback_t callback): + eb(eb), + ev_sig(new uv_signal_t()), + callback(callback) { + uv_signal_init(eb.get(), ev_sig); + ev_sig->data = this; + } + + SigEvent(const SigEvent &) = delete; + SigEvent(SigEvent &&other): + eb(std::move(other.eb)), + ev_sig(other.ev_sig), + callback(std::move(other.callback)) { + other.del(); + other.ev_sig = nullptr; + ev_sig->data = this; + } + + SigEvent &operator=(SigEvent &&other) { + clear(); + other.del(); + eb = std::move(other.eb); + ev_sig = other.ev_sig; + callback = std::move(other.callback); + + other.ev_sig = nullptr; + ev_sig->data = this; + return *this; + } + + ~SigEvent() { clear(); } + + void clear() { + if (ev_sig != nullptr) + { + uv_signal_stop(ev_sig); + uv_close((uv_handle_t *)ev_sig, SigEvent::_on_handle_close); + ev_sig = nullptr; + } + callback = nullptr; + } + + void set_callback(callback_t _callback) { + callback = _callback; + } + + void add(int signum) { + uv_signal_start(ev_sig, SigEvent::sig_then, signum); + } + void del() { + uv_signal_stop(ev_sig); + } + + operator bool() const { return ev_sig != nullptr; } +}; + template class ThreadNotifier { std::condition_variable cv; @@ -304,6 +378,8 @@ class ThreadCall { write(ctl_fd[1], &h, sizeof(h)); return notifier.wait(); } + + const EventContext &get_ec() const { return ec; } }; @@ -362,65 +438,6 @@ class MPSCQueueEventDriven: public MPSCQueue { } }; - - -// TODO: incorrect MPMCQueueEventDriven impl -/* -template -class MPMCQueueEventDriven: public MPMCQueue { - private: - const uint64_t dummy = 1; - std::atomic wait_sig; - std::vector, int>> evs; - - public: - MPMCQueueEventDriven(size_t capacity = 65536): - MPMCQueue(capacity), - wait_sig(true) {} - - template - void listen(const EventContext &ec, Func &&func, size_t burst_size=128) { - int fd = eventfd(0, EFD_NONBLOCK); - evs.emplace(evs.end(), std::make_pair(new Event(ec, fd, EV_READ | EV_PERSIST, - [this, func=std::forward(func), burst_size](int fd, int) { - uint64_t t; - read(fd, &t, 8); - //fprintf(stderr, "%x\n", std::this_thread::get_id()); - T elem; - size_t cnt = burst_size; - while (MPMCQueue::try_dequeue(elem)) - { - func(std::move(elem)); - if (!--cnt) - { - write(fd, &dummy, 8); - return; - } - } - wait_sig.store(true, std::memory_order_relaxed); - }), fd)); - evs.rbegin()->first->add(); - } - - ~MPMCQueueEventDriven() { - for (const auto &p: evs) - close(p.second); - } - - template - bool enqueue(U &&e) { - bool ret = MPMCQueue::enqueue(std::forward(e)); - if (wait_sig.exchange(false, std::memory_order_relaxed)) - { - SALTICIDAE_LOG_DEBUG("mpmc notify"); - for (const auto &p: evs) - write(p.second, &dummy, 8); - } - return ret; - } -}; -*/ - } #endif -- cgit v1.2.3