aboutsummaryrefslogtreecommitdiff
path: root/include/salticidae/event.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/salticidae/event.h')
-rw-r--r--include/salticidae/event.h108
1 files changed, 84 insertions, 24 deletions
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index 2f22e1a..fe94c0a 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -29,7 +29,6 @@
#include <condition_variable>
#include <unistd.h>
#include <uv.h>
-#include <sys/eventfd.h>
#include "salticidae/type.h"
#include "salticidae/queue.h"
@@ -544,29 +543,83 @@ class ThreadNotifier {
}
};
+#if defined(__linux__)
+#include <sys/eventfd.h>
+
+class NotifyFd {
+ int fd;
+ static const uint64_t dummy;
+ public:
+ NotifyFd(): fd(eventfd(0, EFD_NONBLOCK)) {
+ if (fd < 0) throw SalticidaeError(SALTI_ERROR_FD);
+ }
+ bool reset() {
+ uint64_t _;
+ return read(fd, &_, 8) == 8;
+ }
+ void notify() { write(fd, &dummy, 8); }
+ int read_fd() { return fd; }
+ ~NotifyFd() { close(fd); }
+};
+
+#elif defined(__APPLE__)
+// NOTE: using kqueue/kevent with EVFILT_USER is optimal, but libuv doesn't
+// seem to offer such interface for such user level kevent (and its
+// identifier). Thus, we downgrade to pipe-based solution on OSX/BSD system.
+
+class NotifyFd {
+ int fds[0];
+ uint8_t dummy[8];
+ public:
+ NotifyFd() {
+ if (pipe(fds) < 0 ||
+ fcntl(fds[0], F_SETFL, O_NONBLOCK))
+ throw SalticidaeError(SALTI_ERROR_FD);
+ }
+ bool reset() {
+ return read(fds[0], dummy, 8) == 8;
+ }
+ void notify() {
+ write(fds[1], dummy, 8);
+ }
+ int read_fd() { return fds[0]; }
+ ~NotifyFd() {
+ close(fds[0]);
+ close(fds[1]);
+ }
+};
+
+#else
+#warning "platform not supported!"
+#endif
+
template<typename T>
class MPSCQueueEventDriven: public MPSCQueue<T> {
private:
- const uint64_t dummy = 1;
std::atomic<bool> wait_sig;
- int fd;
+ //int fd;
+ NotifyFd nfd;
FdEvent ev;
public:
MPSCQueueEventDriven():
- wait_sig(true),
- fd(eventfd(0, EFD_NONBLOCK)) {
- if (fd == -1) throw SalticidaeError(SALTI_ERROR_FD);
+ wait_sig(true) {
+ //fd(eventfd(0, EFD_NONBLOCK)) {
+ //if (fd == -1) throw SalticidaeError(SALTI_ERROR_FD);
+ }
+ ~MPSCQueueEventDriven() {
+ //close(fd);
+ unreg_handler();
}
- ~MPSCQueueEventDriven() { close(fd); unreg_handler(); }
template<typename Func>
void reg_handler(const EventContext &ec, Func &&func) {
- ev = FdEvent(ec, fd,
+ ev = FdEvent(ec, nfd.read_fd(),
[this, func=std::forward<Func>(func)](int, int) {
//fprintf(stderr, "%x\n", std::this_thread::get_id());
- uint64_t t;
- read(fd, &t, 8);
+ //uint64_t t;
+ //read(fd, &t, 8);
+ nfd.reset();
// the only undesirable case is there are some new items
// enqueued before recovering wait_sig to true, so the consumer
// won't be notified. In this case, no enqueuing thread will
@@ -575,7 +628,8 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
// to see those enqueued values in func()
wait_sig.exchange(true, std::memory_order_acq_rel);
if (func(*this))
- write(fd, &dummy, 8);
+ nfd.notify();
+ //write(fd, &dummy, 8);
});
ev.add(FdEvent::READ);
}
@@ -584,14 +638,14 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
template<typename U>
bool enqueue(U &&e, bool unbounded = true) {
- static const uint64_t dummy = 1;
if (!MPSCQueue<T>::enqueue(std::forward<U>(e), unbounded))
return false;
// memory barrier here, so any load/store in enqueue must be finialized
if (wait_sig.exchange(false, std::memory_order_acq_rel))
{
//SALTICIDAE_LOG_DEBUG("mpsc notify");
- write(fd, &dummy, 8);
+ //write(fd, &dummy, 8);
+ nfd.notify();
}
return true;
}
@@ -599,33 +653,39 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
template<typename U> bool try_enqueue(U &&e) = delete;
};
+// NOTE: the MPMC implementation below hasn't been heavily tested.
template<typename T>
class MPMCQueueEventDriven: public MPMCQueue<T> {
private:
- const uint64_t dummy = 1;
std::atomic<bool> wait_sig;
- int fd;
+ //int fd;
+ NotifyFd nfd;
std::vector<FdEvent> evs;
public:
MPMCQueueEventDriven():
- wait_sig(true),
- fd(eventfd(0, EFD_NONBLOCK)) {
- if (fd == -1) throw SalticidaeError(SALTI_ERROR_FD);
+ wait_sig(true) {
+ //fd(eventfd(0, EFD_NONBLOCK)) {
+ //if (fd == -1) throw SalticidaeError(SALTI_ERROR_FD);
+ }
+ ~MPMCQueueEventDriven() {
+ //close(fd);
+ unreg_handlers();
}
- ~MPMCQueueEventDriven() { close(fd); unreg_handlers(); }
// this function is *NOT* thread-safe
template<typename Func>
void reg_handler(const EventContext &ec, Func &&func) {
- FdEvent ev(ec, fd, [this, func=std::forward<Func>(func)](int, int) {
+ FdEvent ev(ec, nfd.read_fd(), [this, func=std::forward<Func>(func)](int, int) {
//fprintf(stderr, "%x\n", std::this_thread::get_id());
uint64_t t;
- if (read(fd, &t, 8) != 8) return;
+ //if (read(fd, &t, 8) != 8) return;
+ if (!nfd.reset()) return;
// only one consumer should be here a a time
wait_sig.exchange(true, std::memory_order_acq_rel);
if (func(*this))
- write(fd, &dummy, 8);
+ //write(fd, &dummy, 8);
+ nfd.notify();
});
ev.add(FdEvent::READ);
evs.push_back(std::move(ev));
@@ -635,14 +695,14 @@ class MPMCQueueEventDriven: public MPMCQueue<T> {
template<typename U>
bool enqueue(U &&e, bool unbounded = true) {
- static const uint64_t dummy = 1;
if (!MPMCQueue<T>::enqueue(std::forward<U>(e), unbounded))
return false;
// memory barrier here, so any load/store in enqueue must be finialized
if (wait_sig.exchange(false, std::memory_order_acq_rel))
{
//SALTICIDAE_LOG_DEBUG("mpmc notify");
- write(fd, &dummy, 8);
+ //write(fd, &dummy, 8);
+ nfd.notify();
}
return true;
}