aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2018-10-05 10:20:11 -0400
committerDeterminant <[email protected]>2018-10-05 10:20:11 -0400
commitb7d65ee96221e63c865b1bf8cda79b3021cba412 (patch)
tree4eb90f463c634934e916060c0ceb86d8e72cb297
parent2b584edbddc782b6436a8e28d4f8b85a11f528a6 (diff)
add burst_size; consumer can proceed to its normal event loop
-rw-r--r--include/salticidae/event.h36
1 files changed, 26 insertions, 10 deletions
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index 99fff2f..857518b 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -28,9 +28,7 @@
#include <unistd.h>
#include <event2/event.h>
#include <sys/eventfd.h>
-#ifdef SALTICIDAE_DEBUG_LOG
-#include <thread>
-#endif
+//#include <thread>
#include "salticidae/queue.h"
#include "salticidae/util.h"
@@ -126,24 +124,34 @@ class Event {
template<typename T>
class MPSCQueueEventDriven: public MPMCQueue<T> {
private:
+ const uint64_t dummy = 1;
std::atomic<bool> wait_sig;
int fd;
Event ev;
public:
template<typename Func>
- MPSCQueueEventDriven(const EventContext &ec, Func &&func, size_t capacity = 65536):
+ MPSCQueueEventDriven(const EventContext &ec, Func &&func,
+ size_t burst_size = 128, size_t capacity = 65536):
MPMCQueue<T>(capacity),
wait_sig(true),
fd(eventfd(0, EFD_NONBLOCK)),
ev(Event(ec, fd, EV_READ | EV_PERSIST,
- [this, func=std::forward<Func>(func)](int, short) {
+ [this, func=std::forward<Func>(func), burst_size](int, short) {
uint64_t t;
read(fd, &t, 8);
- SALTICIDAE_LOG_DEBUG("%x\n", std::this_thread::get_id());
+ //fprintf(stderr, "%x\n", std::this_thread::get_id());
T elem;
+ size_t cnt = burst_size;
while (MPMCQueue<T>::try_dequeue(elem))
+ {
func(std::move(elem));
+ if (!--cnt)
+ {
+ write(fd, &dummy, 8);
+ return;
+ }
+ }
wait_sig.store(true, std::memory_order_relaxed);
})) { ev.add(); }
@@ -165,6 +173,7 @@ class MPSCQueueEventDriven: public MPMCQueue<T> {
template<typename T>
class MPMCQueueEventDriven: public MPMCQueue<T> {
private:
+ const uint64_t dummy = 1;
std::atomic<bool> wait_sig;
std::vector<std::pair<BoxObj<Event>, int>> evs;
@@ -174,16 +183,24 @@ class MPMCQueueEventDriven: public MPMCQueue<T> {
wait_sig(true) {}
template<typename Func>
- void listen(const EventContext &ec, Func &&func) {
+ void listen(const EventContext &ec, Func &&func, size_t burst_size=128) {
int fd = eventfd(0, EFD_NONBLOCK);
evs.emplace(evs.end(), std::make_pair(new Event(ec, fd, EV_READ | EV_PERSIST,
- [this, func=std::forward<Func>(func)](int fd, short) {
+ [this, func=std::forward<Func>(func), burst_size](int fd, short) {
uint64_t t;
read(fd, &t, 8);
- SALTICIDAE_LOG_DEBUG("%x\n", std::this_thread::get_id());
+ //fprintf(stderr, "%x\n", std::this_thread::get_id());
T elem;
+ size_t cnt = burst_size;
while (MPMCQueue<T>::try_dequeue(elem))
+ {
func(std::move(elem));
+ if (!--cnt)
+ {
+ write(fd, &dummy, 8);
+ return;
+ }
+ }
wait_sig.store(true, std::memory_order_relaxed);
}), fd));
evs.rbegin()->first->add();
@@ -196,7 +213,6 @@ class MPMCQueueEventDriven: public MPMCQueue<T> {
template<typename U>
bool enqueue(U &&e) {
- static const uint64_t dummy = 1;
bool ret = MPMCQueue<T>::enqueue(std::forward<U>(e));
if (wait_sig.exchange(false, std::memory_order_relaxed))
{