From 2b584edbddc782b6436a8e28d4f8b85a11f528a6 Mon Sep 17 00:00:00 2001 From: Determinant Date: Thu, 4 Oct 2018 18:30:23 -0400 Subject: add lock-free queue impl and event-driven queue --- include/salticidae/event.h | 93 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) (limited to 'include/salticidae/event.h') diff --git a/include/salticidae/event.h b/include/salticidae/event.h index 5b41d9e..99fff2f 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -25,8 +25,14 @@ #ifndef _SALTICIDAE_EVENT_H #define _SALTICIDAE_EVENT_H +#include #include +#include +#ifdef SALTICIDAE_DEBUG_LOG +#include +#endif +#include "salticidae/queue.h" #include "salticidae/util.h" #include "salticidae/ref.h" @@ -50,6 +56,7 @@ class EventContext: public _event_context_ot { EventContext &operator=(const EventContext &) = default; EventContext &operator=(EventContext &&) = default; void dispatch() { event_base_dispatch(get()); } + void stop() { event_base_loopbreak(get()); } }; class Event { @@ -76,6 +83,7 @@ class Event { ev(event_new(eb.get(), fd, events, Event::_then, this)), callback(callback) {} + Event(const Event &) = delete; Event(Event &&other): eb(std::move(other.eb)), fd(other.fd), events(other.events), callback(std::move(other.callback)) { @@ -115,6 +123,91 @@ class Event { operator bool() const { return ev != nullptr; } }; +template +class MPSCQueueEventDriven: public MPMCQueue { + private: + std::atomic wait_sig; + int fd; + Event ev; + + public: + template + MPSCQueueEventDriven(const EventContext &ec, Func &&func, size_t capacity = 65536): + MPMCQueue(capacity), + wait_sig(true), + fd(eventfd(0, EFD_NONBLOCK)), + ev(Event(ec, fd, EV_READ | EV_PERSIST, + [this, func=std::forward(func)](int, short) { + uint64_t t; + read(fd, &t, 8); + SALTICIDAE_LOG_DEBUG("%x\n", std::this_thread::get_id()); + T elem; + while (MPMCQueue::try_dequeue(elem)) + func(std::move(elem)); + wait_sig.store(true, std::memory_order_relaxed); + })) { ev.add(); } + + ~MPSCQueueEventDriven() { close(fd); } + + template + bool enqueue(U &&e) { + static const uint64_t dummy = 1; + bool ret = MPMCQueue::enqueue(std::forward(e)); + if (wait_sig.exchange(false, std::memory_order_relaxed)) + { + SALTICIDAE_LOG_DEBUG("mpsc notify"); + write(fd, &dummy, 8); + } + return ret; + } +}; + +template +class MPMCQueueEventDriven: public MPMCQueue { + private: + std::atomic wait_sig; + std::vector, int>> evs; + + public: + MPMCQueueEventDriven(size_t capacity = 65536): + MPMCQueue(capacity), + wait_sig(true) {} + + template + void listen(const EventContext &ec, Func &&func) { + int fd = eventfd(0, EFD_NONBLOCK); + evs.emplace(evs.end(), std::make_pair(new Event(ec, fd, EV_READ | EV_PERSIST, + [this, func=std::forward(func)](int fd, short) { + uint64_t t; + read(fd, &t, 8); + SALTICIDAE_LOG_DEBUG("%x\n", std::this_thread::get_id()); + T elem; + while (MPMCQueue::try_dequeue(elem)) + func(std::move(elem)); + wait_sig.store(true, std::memory_order_relaxed); + }), fd)); + evs.rbegin()->first->add(); + } + + ~MPMCQueueEventDriven() { + for (const auto &p: evs) + close(p.second); + } + + template + bool enqueue(U &&e) { + static const uint64_t dummy = 1; + bool ret = MPMCQueue::enqueue(std::forward(e)); + if (wait_sig.exchange(false, std::memory_order_relaxed)) + { + SALTICIDAE_LOG_DEBUG("mpmc notify"); + for (const auto &p: evs) + write(p.second, &dummy, 8); + } + return ret; + } +}; + } #endif -- cgit v1.2.3