From b7d65ee96221e63c865b1bf8cda79b3021cba412 Mon Sep 17 00:00:00 2001 From: Determinant Date: Fri, 5 Oct 2018 10:20:11 -0400 Subject: add burst_size; consumer can proceed to its normal event loop --- include/salticidae/event.h | 36 ++++++++++++++++++++++++++---------- 1 file changed, 26 insertions(+), 10 deletions(-) (limited to 'include') diff --git a/include/salticidae/event.h b/include/salticidae/event.h index 99fff2f..857518b 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -28,9 +28,7 @@ #include #include #include -#ifdef SALTICIDAE_DEBUG_LOG -#include -#endif +//#include #include "salticidae/queue.h" #include "salticidae/util.h" @@ -126,24 +124,34 @@ class Event { template class MPSCQueueEventDriven: public MPMCQueue { private: + const uint64_t dummy = 1; std::atomic wait_sig; int fd; Event ev; public: template - MPSCQueueEventDriven(const EventContext &ec, Func &&func, size_t capacity = 65536): + MPSCQueueEventDriven(const EventContext &ec, Func &&func, + size_t burst_size = 128, size_t capacity = 65536): MPMCQueue(capacity), wait_sig(true), fd(eventfd(0, EFD_NONBLOCK)), ev(Event(ec, fd, EV_READ | EV_PERSIST, - [this, func=std::forward(func)](int, short) { + [this, func=std::forward(func), burst_size](int, short) { uint64_t t; read(fd, &t, 8); - SALTICIDAE_LOG_DEBUG("%x\n", std::this_thread::get_id()); + //fprintf(stderr, "%x\n", std::this_thread::get_id()); T elem; + size_t cnt = burst_size; while (MPMCQueue::try_dequeue(elem)) + { func(std::move(elem)); + if (!--cnt) + { + write(fd, &dummy, 8); + return; + } + } wait_sig.store(true, std::memory_order_relaxed); })) { ev.add(); } @@ -165,6 +173,7 @@ class MPSCQueueEventDriven: public MPMCQueue { template class MPMCQueueEventDriven: public MPMCQueue { private: + const uint64_t dummy = 1; std::atomic wait_sig; std::vector, int>> evs; @@ -174,16 +183,24 @@ class MPMCQueueEventDriven: public MPMCQueue { wait_sig(true) {} template - void listen(const EventContext &ec, Func &&func) { + void listen(const EventContext &ec, Func &&func, size_t burst_size=128) { int fd = eventfd(0, EFD_NONBLOCK); evs.emplace(evs.end(), std::make_pair(new Event(ec, fd, EV_READ | EV_PERSIST, - [this, func=std::forward(func)](int fd, short) { + [this, func=std::forward(func), burst_size](int fd, short) { uint64_t t; read(fd, &t, 8); - SALTICIDAE_LOG_DEBUG("%x\n", std::this_thread::get_id()); + //fprintf(stderr, "%x\n", std::this_thread::get_id()); T elem; + size_t cnt = burst_size; while (MPMCQueue::try_dequeue(elem)) + { func(std::move(elem)); + if (!--cnt) + { + write(fd, &dummy, 8); + return; + } + } wait_sig.store(true, std::memory_order_relaxed); }), fd)); evs.rbegin()->first->add(); @@ -196,7 +213,6 @@ class MPMCQueueEventDriven: public MPMCQueue { template bool enqueue(U &&e) { - static const uint64_t dummy = 1; bool ret = MPMCQueue::enqueue(std::forward(e)); if (wait_sig.exchange(false, std::memory_order_relaxed)) { -- cgit v1.2.3-70-g09d2