From 27304354844c8baf8c14ef923e0ab65bc4df2dc0 Mon Sep 17 00:00:00 2001 From: Determinant Date: Wed, 21 Nov 2018 18:15:14 -0500 Subject: impl MPMCQueueEventDriven --- include/salticidae/event.h | 51 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) (limited to 'include') diff --git a/include/salticidae/event.h b/include/salticidae/event.h index bac707c..649cfdd 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -589,6 +589,57 @@ class MPSCQueueEventDriven: public MPSCQueue { } }; +template +class MPMCQueueEventDriven: public MPMCQueue { + private: + const uint64_t dummy = 1; + std::atomic wait_sig; + int fd; + std::vector evs; + + public: + MPMCQueueEventDriven(size_t capacity = 65536): + MPMCQueue(capacity), + wait_sig(true), + fd(eventfd(0, EFD_NONBLOCK)) {} + + ~MPMCQueueEventDriven() { + evs.clear(); + close(fd); + } + + // this function is *NOT* thread-safe + template + void reg_handler(const EventContext &ec, Func &&func) { + FdEvent ev(ec, fd, [this, func=std::forward(func)](int, int) { + //fprintf(stderr, "%x\n", std::this_thread::get_id()); + uint64_t t; + if (read(fd, &t, 8) != 8) return; + // only one consumer should be here a a time + wait_sig.store(true, std::memory_order_release); + if (func(*this)) + write(fd, &dummy, 8); + }); + ev.add(FdEvent::READ); + evs.push_back(std::move(ev)); + } + + void unreg_handlers() { evs.clear(); } + + template + bool enqueue(U &&e) { + static const uint64_t dummy = 1; + bool ret = MPMCQueue::enqueue(std::forward(e)); + // memory barrier here, so any load/store in enqueue must be finialized + if (wait_sig.exchange(false, std::memory_order_acq_rel)) + { + SALTICIDAE_LOG_DEBUG("mpsc notify"); + write(fd, &dummy, 8); + } + return ret; + } +}; + } #endif -- cgit v1.2.3-70-g09d2