aboutsummaryrefslogtreecommitdiff
path: root/include
diff options
context:
space:
mode:
Diffstat (limited to 'include')
-rw-r--r--include/salticidae/event.h37
1 files changed, 8 insertions, 29 deletions
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index fe94c0a..52cb058 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -577,7 +577,10 @@ class NotifyFd {
throw SalticidaeError(SALTI_ERROR_FD);
}
bool reset() {
- return read(fds[0], dummy, 8) == 8;
+ // clear the pipe buffer (not atomically)
+ while (read(fds[0], dummy, 8) > 0);
+ // may not work for MPMC, but salticidae currently doesn't use that
+ return true;
}
void notify() {
write(fds[1], dummy, 8);
@@ -597,28 +600,18 @@ template<typename T>
class MPSCQueueEventDriven: public MPSCQueue<T> {
private:
std::atomic<bool> wait_sig;
- //int fd;
NotifyFd nfd;
FdEvent ev;
public:
MPSCQueueEventDriven():
- wait_sig(true) {
- //fd(eventfd(0, EFD_NONBLOCK)) {
- //if (fd == -1) throw SalticidaeError(SALTI_ERROR_FD);
- }
- ~MPSCQueueEventDriven() {
- //close(fd);
- unreg_handler();
- }
+ wait_sig(true) {}
+ ~MPSCQueueEventDriven() { unreg_handler(); }
template<typename Func>
void reg_handler(const EventContext &ec, Func &&func) {
ev = FdEvent(ec, nfd.read_fd(),
[this, func=std::forward<Func>(func)](int, int) {
- //fprintf(stderr, "%x\n", std::this_thread::get_id());
- //uint64_t t;
- //read(fd, &t, 8);
nfd.reset();
// the only undesirable case is there are some new items
// enqueued before recovering wait_sig to true, so the consumer
@@ -629,7 +622,6 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
wait_sig.exchange(true, std::memory_order_acq_rel);
if (func(*this))
nfd.notify();
- //write(fd, &dummy, 8);
});
ev.add(FdEvent::READ);
}
@@ -644,7 +636,6 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
if (wait_sig.exchange(false, std::memory_order_acq_rel))
{
//SALTICIDAE_LOG_DEBUG("mpsc notify");
- //write(fd, &dummy, 8);
nfd.notify();
}
return true;
@@ -658,33 +649,22 @@ template<typename T>
class MPMCQueueEventDriven: public MPMCQueue<T> {
private:
std::atomic<bool> wait_sig;
- //int fd;
NotifyFd nfd;
std::vector<FdEvent> evs;
public:
MPMCQueueEventDriven():
- wait_sig(true) {
- //fd(eventfd(0, EFD_NONBLOCK)) {
- //if (fd == -1) throw SalticidaeError(SALTI_ERROR_FD);
- }
- ~MPMCQueueEventDriven() {
- //close(fd);
- unreg_handlers();
- }
+ wait_sig(true) {}
+ ~MPMCQueueEventDriven() { unreg_handlers(); }
// this function is *NOT* thread-safe
template<typename Func>
void reg_handler(const EventContext &ec, Func &&func) {
FdEvent ev(ec, nfd.read_fd(), [this, func=std::forward<Func>(func)](int, int) {
- //fprintf(stderr, "%x\n", std::this_thread::get_id());
- uint64_t t;
- //if (read(fd, &t, 8) != 8) return;
if (!nfd.reset()) return;
// only one consumer should be here a a time
wait_sig.exchange(true, std::memory_order_acq_rel);
if (func(*this))
- //write(fd, &dummy, 8);
nfd.notify();
});
ev.add(FdEvent::READ);
@@ -701,7 +681,6 @@ class MPMCQueueEventDriven: public MPMCQueue<T> {
if (wait_sig.exchange(false, std::memory_order_acq_rel))
{
//SALTICIDAE_LOG_DEBUG("mpmc notify");
- //write(fd, &dummy, 8);
nfd.notify();
}
return true;