aboutsummaryrefslogtreecommitdiff
path: root/include/salticidae/event.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/salticidae/event.h')
-rw-r--r--include/salticidae/event.h60
1 files changed, 32 insertions, 28 deletions
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index 857518b..ddb93fc 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -53,8 +53,8 @@ class EventContext: public _event_context_ot {
EventContext(EventContext &&) = default;
EventContext &operator=(const EventContext &) = default;
EventContext &operator=(EventContext &&) = default;
- void dispatch() { event_base_dispatch(get()); }
- void stop() { event_base_loopbreak(get()); }
+ void dispatch() const { event_base_dispatch(get()); }
+ void stop() const { event_base_loopbreak(get()); }
};
class Event {
@@ -122,7 +122,7 @@ class Event {
};
template<typename T>
-class MPSCQueueEventDriven: public MPMCQueue<T> {
+class MPSCQueueEventDriven: public MPSCQueue<T> {
private:
const uint64_t dummy = 1;
std::atomic<bool> wait_sig;
@@ -130,38 +130,39 @@ class MPSCQueueEventDriven: public MPMCQueue<T> {
Event ev;
public:
- template<typename Func>
- MPSCQueueEventDriven(const EventContext &ec, Func &&func,
- size_t burst_size = 128, size_t capacity = 65536):
- MPMCQueue<T>(capacity),
+ MPSCQueueEventDriven(size_t capacity = 65536):
+ MPSCQueue<T>(capacity),
wait_sig(true),
- fd(eventfd(0, EFD_NONBLOCK)),
- ev(Event(ec, fd, EV_READ | EV_PERSIST,
- [this, func=std::forward<Func>(func), burst_size](int, short) {
- uint64_t t;
- read(fd, &t, 8);
- //fprintf(stderr, "%x\n", std::this_thread::get_id());
- T elem;
- size_t cnt = burst_size;
- while (MPMCQueue<T>::try_dequeue(elem))
- {
- func(std::move(elem));
- if (!--cnt)
- {
- write(fd, &dummy, 8);
- return;
- }
- }
- wait_sig.store(true, std::memory_order_relaxed);
- })) { ev.add(); }
+ fd(eventfd(0, EFD_NONBLOCK)) {}
~MPSCQueueEventDriven() { close(fd); }
+ template<typename Func>
+ void reg_handler(const EventContext &ec, Func &&func) {
+ ev = Event(ec, fd, EV_READ | EV_PERSIST,
+ [this, func=std::forward<Func>(func)](int, short) {
+ //fprintf(stderr, "%x\n", std::this_thread::get_id());
+ uint64_t t;
+ read(fd, &t, 8);
+ // the only undesirable case is there are some new items
+ // enqueued before recovering wait_sig to true, so the consumer
+ // won't be notified. In this case, no enqueuing thread will
+ // get to write(fd). Then store(true) must happen after all exchange(false),
+ // since all enqueue operations are finalized, the dequeue should be able
+ // to see those enqueued values in func()
+ wait_sig.store(true, std::memory_order_release);
+ if (func(*this))
+ write(fd, &dummy, 8);
+ });
+ ev.add();
+ }
+
template<typename U>
bool enqueue(U &&e) {
static const uint64_t dummy = 1;
- bool ret = MPMCQueue<T>::enqueue(std::forward<U>(e));
- if (wait_sig.exchange(false, std::memory_order_relaxed))
+ bool ret = MPSCQueue<T>::enqueue(std::forward<U>(e));
+ // memory barrier here, so any load/store in enqueue must be finialized
+ if (wait_sig.exchange(false, std::memory_order_acq_rel))
{
SALTICIDAE_LOG_DEBUG("mpsc notify");
write(fd, &dummy, 8);
@@ -170,6 +171,8 @@ class MPSCQueueEventDriven: public MPMCQueue<T> {
}
};
+// TODO: incorrect MPMCQueueEventDriven impl
+/*
template<typename T>
class MPMCQueueEventDriven: public MPMCQueue<T> {
private:
@@ -223,6 +226,7 @@ class MPMCQueueEventDriven: public MPMCQueue<T> {
return ret;
}
};
+*/
}