From dd09443b0b3c0b5d1a8c034644d1065dd25bf5a9 Mon Sep 17 00:00:00 2001 From: Determinant Date: Sun, 11 Nov 2018 23:02:59 -0500 Subject: start debugging multiloops design --- include/salticidae/event.h | 60 ++++++++++++++++++++++++---------------------- 1 file changed, 32 insertions(+), 28 deletions(-) (limited to 'include/salticidae/event.h') diff --git a/include/salticidae/event.h b/include/salticidae/event.h index 857518b..ddb93fc 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -53,8 +53,8 @@ class EventContext: public _event_context_ot { EventContext(EventContext &&) = default; EventContext &operator=(const EventContext &) = default; EventContext &operator=(EventContext &&) = default; - void dispatch() { event_base_dispatch(get()); } - void stop() { event_base_loopbreak(get()); } + void dispatch() const { event_base_dispatch(get()); } + void stop() const { event_base_loopbreak(get()); } }; class Event { @@ -122,7 +122,7 @@ class Event { }; template -class MPSCQueueEventDriven: public MPMCQueue { +class MPSCQueueEventDriven: public MPSCQueue { private: const uint64_t dummy = 1; std::atomic wait_sig; @@ -130,38 +130,39 @@ class MPSCQueueEventDriven: public MPMCQueue { Event ev; public: - template - MPSCQueueEventDriven(const EventContext &ec, Func &&func, - size_t burst_size = 128, size_t capacity = 65536): - MPMCQueue(capacity), + MPSCQueueEventDriven(size_t capacity = 65536): + MPSCQueue(capacity), wait_sig(true), - fd(eventfd(0, EFD_NONBLOCK)), - ev(Event(ec, fd, EV_READ | EV_PERSIST, - [this, func=std::forward(func), burst_size](int, short) { - uint64_t t; - read(fd, &t, 8); - //fprintf(stderr, "%x\n", std::this_thread::get_id()); - T elem; - size_t cnt = burst_size; - while (MPMCQueue::try_dequeue(elem)) - { - func(std::move(elem)); - if (!--cnt) - { - write(fd, &dummy, 8); - return; - } - } - wait_sig.store(true, std::memory_order_relaxed); - })) { ev.add(); } + fd(eventfd(0, EFD_NONBLOCK)) {} ~MPSCQueueEventDriven() { close(fd); } + template + void reg_handler(const EventContext &ec, Func &&func) { + ev = Event(ec, fd, EV_READ | EV_PERSIST, + [this, func=std::forward(func)](int, short) { + //fprintf(stderr, "%x\n", std::this_thread::get_id()); + uint64_t t; + read(fd, &t, 8); + // the only undesirable case is there are some new items + // enqueued before recovering wait_sig to true, so the consumer + // won't be notified. In this case, no enqueuing thread will + // get to write(fd). Then store(true) must happen after all exchange(false), + // since all enqueue operations are finalized, the dequeue should be able + // to see those enqueued values in func() + wait_sig.store(true, std::memory_order_release); + if (func(*this)) + write(fd, &dummy, 8); + }); + ev.add(); + } + template bool enqueue(U &&e) { static const uint64_t dummy = 1; - bool ret = MPMCQueue::enqueue(std::forward(e)); - if (wait_sig.exchange(false, std::memory_order_relaxed)) + bool ret = MPSCQueue::enqueue(std::forward(e)); + // memory barrier here, so any load/store in enqueue must be finialized + if (wait_sig.exchange(false, std::memory_order_acq_rel)) { SALTICIDAE_LOG_DEBUG("mpsc notify"); write(fd, &dummy, 8); @@ -170,6 +171,8 @@ class MPSCQueueEventDriven: public MPMCQueue { } }; +// TODO: incorrect MPMCQueueEventDriven impl +/* template class MPMCQueueEventDriven: public MPMCQueue { private: @@ -223,6 +226,7 @@ class MPMCQueueEventDriven: public MPMCQueue { return ret; } }; +*/ } -- cgit v1.2.3