diff options
Diffstat (limited to 'include')
-rw-r--r-- | include/salticidae/event.h | 51 |
1 files changed, 51 insertions, 0 deletions
diff --git a/include/salticidae/event.h b/include/salticidae/event.h index bac707c..649cfdd 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -589,6 +589,57 @@ class MPSCQueueEventDriven: public MPSCQueue<T> { } }; +template<typename T> +class MPMCQueueEventDriven: public MPMCQueue<T> { + private: + const uint64_t dummy = 1; + std::atomic<bool> wait_sig; + int fd; + std::vector<FdEvent> evs; + + public: + MPMCQueueEventDriven(size_t capacity = 65536): + MPMCQueue<T>(capacity), + wait_sig(true), + fd(eventfd(0, EFD_NONBLOCK)) {} + + ~MPMCQueueEventDriven() { + evs.clear(); + close(fd); + } + + // this function is *NOT* thread-safe + template<typename Func> + void reg_handler(const EventContext &ec, Func &&func) { + FdEvent ev(ec, fd, [this, func=std::forward<Func>(func)](int, int) { + //fprintf(stderr, "%x\n", std::this_thread::get_id()); + uint64_t t; + if (read(fd, &t, 8) != 8) return; + // only one consumer should be here a a time + wait_sig.store(true, std::memory_order_release); + if (func(*this)) + write(fd, &dummy, 8); + }); + ev.add(FdEvent::READ); + evs.push_back(std::move(ev)); + } + + void unreg_handlers() { evs.clear(); } + + template<typename U> + bool enqueue(U &&e) { + static const uint64_t dummy = 1; + bool ret = MPMCQueue<T>::enqueue(std::forward<U>(e)); + // memory barrier here, so any load/store in enqueue must be finialized + if (wait_sig.exchange(false, std::memory_order_acq_rel)) + { + SALTICIDAE_LOG_DEBUG("mpsc notify"); + write(fd, &dummy, 8); + } + return ret; + } +}; + } #endif |