diff options
Diffstat (limited to 'include')
-rw-r--r-- | include/salticidae/event.h | 36 |
1 files changed, 26 insertions, 10 deletions
diff --git a/include/salticidae/event.h b/include/salticidae/event.h index 99fff2f..857518b 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -28,9 +28,7 @@ #include <unistd.h> #include <event2/event.h> #include <sys/eventfd.h> -#ifdef SALTICIDAE_DEBUG_LOG -#include <thread> -#endif +//#include <thread> #include "salticidae/queue.h" #include "salticidae/util.h" @@ -126,24 +124,34 @@ class Event { template<typename T> class MPSCQueueEventDriven: public MPMCQueue<T> { private: + const uint64_t dummy = 1; std::atomic<bool> wait_sig; int fd; Event ev; public: template<typename Func> - MPSCQueueEventDriven(const EventContext &ec, Func &&func, size_t capacity = 65536): + MPSCQueueEventDriven(const EventContext &ec, Func &&func, + size_t burst_size = 128, size_t capacity = 65536): MPMCQueue<T>(capacity), wait_sig(true), fd(eventfd(0, EFD_NONBLOCK)), ev(Event(ec, fd, EV_READ | EV_PERSIST, - [this, func=std::forward<Func>(func)](int, short) { + [this, func=std::forward<Func>(func), burst_size](int, short) { uint64_t t; read(fd, &t, 8); - SALTICIDAE_LOG_DEBUG("%x\n", std::this_thread::get_id()); + //fprintf(stderr, "%x\n", std::this_thread::get_id()); T elem; + size_t cnt = burst_size; while (MPMCQueue<T>::try_dequeue(elem)) + { func(std::move(elem)); + if (!--cnt) + { + write(fd, &dummy, 8); + return; + } + } wait_sig.store(true, std::memory_order_relaxed); })) { ev.add(); } @@ -165,6 +173,7 @@ class MPSCQueueEventDriven: public MPMCQueue<T> { template<typename T> class MPMCQueueEventDriven: public MPMCQueue<T> { private: + const uint64_t dummy = 1; std::atomic<bool> wait_sig; std::vector<std::pair<BoxObj<Event>, int>> evs; @@ -174,16 +183,24 @@ class MPMCQueueEventDriven: public MPMCQueue<T> { wait_sig(true) {} template<typename Func> - void listen(const EventContext &ec, Func &&func) { + void listen(const EventContext &ec, Func &&func, size_t burst_size=128) { int fd = eventfd(0, EFD_NONBLOCK); evs.emplace(evs.end(), std::make_pair(new Event(ec, fd, EV_READ | EV_PERSIST, - [this, func=std::forward<Func>(func)](int fd, short) { + [this, func=std::forward<Func>(func), burst_size](int fd, short) { uint64_t t; read(fd, &t, 8); - SALTICIDAE_LOG_DEBUG("%x\n", std::this_thread::get_id()); + //fprintf(stderr, "%x\n", std::this_thread::get_id()); T elem; + size_t cnt = burst_size; while (MPMCQueue<T>::try_dequeue(elem)) + { func(std::move(elem)); + if (!--cnt) + { + write(fd, &dummy, 8); + return; + } + } wait_sig.store(true, std::memory_order_relaxed); }), fd)); evs.rbegin()->first->add(); @@ -196,7 +213,6 @@ class MPMCQueueEventDriven: public MPMCQueue<T> { template<typename U> bool enqueue(U &&e) { - static const uint64_t dummy = 1; bool ret = MPMCQueue<T>::enqueue(std::forward<U>(e)); if (wait_sig.exchange(false, std::memory_order_relaxed)) { |