diff options
author | Determinant <ted.sybil@gmail.com> | 2018-10-04 18:30:23 -0400 |
---|---|---|
committer | Determinant <ted.sybil@gmail.com> | 2018-10-04 18:30:23 -0400 |
commit | 2b584edbddc782b6436a8e28d4f8b85a11f528a6 (patch) | |
tree | 364aa1067c95976c390b61a260ecbb39d4ff6555 /include/salticidae/event.h | |
parent | 0eea9ddc7cfb2820295dd87aed3dc911a100ecde (diff) |
add lock-free queue impl and event-driven queue
Diffstat (limited to 'include/salticidae/event.h')
-rw-r--r-- | include/salticidae/event.h | 93 |
1 files changed, 93 insertions, 0 deletions
diff --git a/include/salticidae/event.h b/include/salticidae/event.h index 5b41d9e..99fff2f 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -25,8 +25,14 @@ #ifndef _SALTICIDAE_EVENT_H #define _SALTICIDAE_EVENT_H +#include <unistd.h> #include <event2/event.h> +#include <sys/eventfd.h> +#ifdef SALTICIDAE_DEBUG_LOG +#include <thread> +#endif +#include "salticidae/queue.h" #include "salticidae/util.h" #include "salticidae/ref.h" @@ -50,6 +56,7 @@ class EventContext: public _event_context_ot { EventContext &operator=(const EventContext &) = default; EventContext &operator=(EventContext &&) = default; void dispatch() { event_base_dispatch(get()); } + void stop() { event_base_loopbreak(get()); } }; class Event { @@ -76,6 +83,7 @@ class Event { ev(event_new(eb.get(), fd, events, Event::_then, this)), callback(callback) {} + Event(const Event &) = delete; Event(Event &&other): eb(std::move(other.eb)), fd(other.fd), events(other.events), callback(std::move(other.callback)) { @@ -115,6 +123,91 @@ class Event { operator bool() const { return ev != nullptr; } }; +template<typename T> +class MPSCQueueEventDriven: public MPMCQueue<T> { + private: + std::atomic<bool> wait_sig; + int fd; + Event ev; + + public: + template<typename Func> + MPSCQueueEventDriven(const EventContext &ec, Func &&func, size_t capacity = 65536): + MPMCQueue<T>(capacity), + wait_sig(true), + fd(eventfd(0, EFD_NONBLOCK)), + ev(Event(ec, fd, EV_READ | EV_PERSIST, + [this, func=std::forward<Func>(func)](int, short) { + uint64_t t; + read(fd, &t, 8); + SALTICIDAE_LOG_DEBUG("%x\n", std::this_thread::get_id()); + T elem; + while (MPMCQueue<T>::try_dequeue(elem)) + func(std::move(elem)); + wait_sig.store(true, std::memory_order_relaxed); + })) { ev.add(); } + + ~MPSCQueueEventDriven() { close(fd); } + + template<typename U> + bool enqueue(U &&e) { + static const uint64_t dummy = 1; + bool ret = MPMCQueue<T>::enqueue(std::forward<U>(e)); + if (wait_sig.exchange(false, std::memory_order_relaxed)) + { + SALTICIDAE_LOG_DEBUG("mpsc notify"); + write(fd, &dummy, 8); + } + return ret; + } +}; + +template<typename T> +class MPMCQueueEventDriven: public MPMCQueue<T> { + private: + std::atomic<bool> wait_sig; + std::vector<std::pair<BoxObj<Event>, int>> evs; + + public: + MPMCQueueEventDriven(size_t capacity = 65536): + MPMCQueue<T>(capacity), + wait_sig(true) {} + + template<typename Func> + void listen(const EventContext &ec, Func &&func) { + int fd = eventfd(0, EFD_NONBLOCK); + evs.emplace(evs.end(), std::make_pair(new Event(ec, fd, EV_READ | EV_PERSIST, + [this, func=std::forward<Func>(func)](int fd, short) { + uint64_t t; + read(fd, &t, 8); + SALTICIDAE_LOG_DEBUG("%x\n", std::this_thread::get_id()); + T elem; + while (MPMCQueue<T>::try_dequeue(elem)) + func(std::move(elem)); + wait_sig.store(true, std::memory_order_relaxed); + }), fd)); + evs.rbegin()->first->add(); + } + + ~MPMCQueueEventDriven() { + for (const auto &p: evs) + close(p.second); + } + + template<typename U> + bool enqueue(U &&e) { + static const uint64_t dummy = 1; + bool ret = MPMCQueue<T>::enqueue(std::forward<U>(e)); + if (wait_sig.exchange(false, std::memory_order_relaxed)) + { + SALTICIDAE_LOG_DEBUG("mpmc notify"); + for (const auto &p: evs) + write(p.second, &dummy, 8); + } + return ret; + } +}; + } #endif |