aboutsummaryrefslogtreecommitdiff
path: root/include
diff options
context:
space:
mode:
Diffstat (limited to 'include')
-rw-r--r--include/salticidae/event.h51
1 files changed, 51 insertions, 0 deletions
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index bac707c..649cfdd 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -589,6 +589,57 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
}
};
+template<typename T>
+class MPMCQueueEventDriven: public MPMCQueue<T> {
+ private:
+ const uint64_t dummy = 1;
+ std::atomic<bool> wait_sig;
+ int fd;
+ std::vector<FdEvent> evs;
+
+ public:
+ MPMCQueueEventDriven(size_t capacity = 65536):
+ MPMCQueue<T>(capacity),
+ wait_sig(true),
+ fd(eventfd(0, EFD_NONBLOCK)) {}
+
+ ~MPMCQueueEventDriven() {
+ evs.clear();
+ close(fd);
+ }
+
+ // this function is *NOT* thread-safe
+ template<typename Func>
+ void reg_handler(const EventContext &ec, Func &&func) {
+ FdEvent ev(ec, fd, [this, func=std::forward<Func>(func)](int, int) {
+ //fprintf(stderr, "%x\n", std::this_thread::get_id());
+ uint64_t t;
+ if (read(fd, &t, 8) != 8) return;
+ // only one consumer should be here a a time
+ wait_sig.store(true, std::memory_order_release);
+ if (func(*this))
+ write(fd, &dummy, 8);
+ });
+ ev.add(FdEvent::READ);
+ evs.push_back(std::move(ev));
+ }
+
+ void unreg_handlers() { evs.clear(); }
+
+ template<typename U>
+ bool enqueue(U &&e) {
+ static const uint64_t dummy = 1;
+ bool ret = MPMCQueue<T>::enqueue(std::forward<U>(e));
+ // memory barrier here, so any load/store in enqueue must be finialized
+ if (wait_sig.exchange(false, std::memory_order_acq_rel))
+ {
+ SALTICIDAE_LOG_DEBUG("mpsc notify");
+ write(fd, &dummy, 8);
+ }
+ return ret;
+ }
+};
+
}
#endif