aboutsummaryrefslogtreecommitdiff
path: root/include/salticidae/event.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/salticidae/event.h')
-rw-r--r--include/salticidae/event.h93
1 files changed, 93 insertions, 0 deletions
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index 5b41d9e..99fff2f 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -25,8 +25,14 @@
#ifndef _SALTICIDAE_EVENT_H
#define _SALTICIDAE_EVENT_H
+#include <unistd.h>
#include <event2/event.h>
+#include <sys/eventfd.h>
+#ifdef SALTICIDAE_DEBUG_LOG
+#include <thread>
+#endif
+#include "salticidae/queue.h"
#include "salticidae/util.h"
#include "salticidae/ref.h"
@@ -50,6 +56,7 @@ class EventContext: public _event_context_ot {
EventContext &operator=(const EventContext &) = default;
EventContext &operator=(EventContext &&) = default;
void dispatch() { event_base_dispatch(get()); }
+ void stop() { event_base_loopbreak(get()); }
};
class Event {
@@ -76,6 +83,7 @@ class Event {
ev(event_new(eb.get(), fd, events, Event::_then, this)),
callback(callback) {}
+ Event(const Event &) = delete;
Event(Event &&other):
eb(std::move(other.eb)), fd(other.fd), events(other.events),
callback(std::move(other.callback)) {
@@ -115,6 +123,91 @@ class Event {
operator bool() const { return ev != nullptr; }
};
+template<typename T>
+class MPSCQueueEventDriven: public MPMCQueue<T> {
+ private:
+ std::atomic<bool> wait_sig;
+ int fd;
+ Event ev;
+
+ public:
+ template<typename Func>
+ MPSCQueueEventDriven(const EventContext &ec, Func &&func, size_t capacity = 65536):
+ MPMCQueue<T>(capacity),
+ wait_sig(true),
+ fd(eventfd(0, EFD_NONBLOCK)),
+ ev(Event(ec, fd, EV_READ | EV_PERSIST,
+ [this, func=std::forward<Func>(func)](int, short) {
+ uint64_t t;
+ read(fd, &t, 8);
+ SALTICIDAE_LOG_DEBUG("%x\n", std::this_thread::get_id());
+ T elem;
+ while (MPMCQueue<T>::try_dequeue(elem))
+ func(std::move(elem));
+ wait_sig.store(true, std::memory_order_relaxed);
+ })) { ev.add(); }
+
+ ~MPSCQueueEventDriven() { close(fd); }
+
+ template<typename U>
+ bool enqueue(U &&e) {
+ static const uint64_t dummy = 1;
+ bool ret = MPMCQueue<T>::enqueue(std::forward<U>(e));
+ if (wait_sig.exchange(false, std::memory_order_relaxed))
+ {
+ SALTICIDAE_LOG_DEBUG("mpsc notify");
+ write(fd, &dummy, 8);
+ }
+ return ret;
+ }
+};
+
+template<typename T>
+class MPMCQueueEventDriven: public MPMCQueue<T> {
+ private:
+ std::atomic<bool> wait_sig;
+ std::vector<std::pair<BoxObj<Event>, int>> evs;
+
+ public:
+ MPMCQueueEventDriven(size_t capacity = 65536):
+ MPMCQueue<T>(capacity),
+ wait_sig(true) {}
+
+ template<typename Func>
+ void listen(const EventContext &ec, Func &&func) {
+ int fd = eventfd(0, EFD_NONBLOCK);
+ evs.emplace(evs.end(), std::make_pair(new Event(ec, fd, EV_READ | EV_PERSIST,
+ [this, func=std::forward<Func>(func)](int fd, short) {
+ uint64_t t;
+ read(fd, &t, 8);
+ SALTICIDAE_LOG_DEBUG("%x\n", std::this_thread::get_id());
+ T elem;
+ while (MPMCQueue<T>::try_dequeue(elem))
+ func(std::move(elem));
+ wait_sig.store(true, std::memory_order_relaxed);
+ }), fd));
+ evs.rbegin()->first->add();
+ }
+
+ ~MPMCQueueEventDriven() {
+ for (const auto &p: evs)
+ close(p.second);
+ }
+
+ template<typename U>
+ bool enqueue(U &&e) {
+ static const uint64_t dummy = 1;
+ bool ret = MPMCQueue<T>::enqueue(std::forward<U>(e));
+ if (wait_sig.exchange(false, std::memory_order_relaxed))
+ {
+ SALTICIDAE_LOG_DEBUG("mpmc notify");
+ for (const auto &p: evs)
+ write(p.second, &dummy, 8);
+ }
+ return ret;
+ }
+};
+
}
#endif