aboutsummaryrefslogtreecommitdiff
path: root/include/salticidae/event.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/salticidae/event.h')
-rw-r--r--include/salticidae/event.h86
1 files changed, 86 insertions, 0 deletions
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index c21644b..da27902 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -25,10 +25,12 @@
#ifndef _SALTICIDAE_EVENT_H
#define _SALTICIDAE_EVENT_H
+#include <condition_variable>
#include <unistd.h>
#include <uv.h>
#include <sys/eventfd.h>
+#include "salticidae/type.h"
#include "salticidae/queue.h"
#include "salticidae/util.h"
#include "salticidae/ref.h"
@@ -177,6 +179,90 @@ class Event {
operator bool() const { return ev_fd != nullptr || ev_timer != nullptr; }
};
+class ThreadNotifier {
+ std::condition_variable cv;
+ std::mutex mlock;
+ mutex_ul_t ul;
+ bool ready;
+ void *data;
+ public:
+ ThreadNotifier(): ul(mlock), ready(false) {}
+ void *wait() {
+ cv.wait(ul, [this]{ return ready; });
+ return data;
+ }
+ void notify(void *_data) {
+ {
+ mutex_lg_t _(mlock);
+ ready = true;
+ data = _data;
+ }
+ cv.notify_all();
+ }
+};
+
+class ThreadCall {
+ int ctl_fd[2];
+ EventContext ec;
+ Event ev_listen;
+
+ public:
+ class Handle {
+ std::function<void(Handle &)> callback;
+ ThreadNotifier* notifier;
+ void *result;
+ friend ThreadCall;
+ public:
+ Handle(): notifier(nullptr), result(nullptr) {}
+ void exec() {
+ callback(*this);
+ if (notifier) notifier->notify(result);
+ }
+ void set_result(void *data) { result = data; }
+ };
+
+ ThreadCall() = default;
+ ThreadCall(const ThreadCall &) = delete;
+ ThreadCall(ThreadCall &&) = delete;
+ ThreadCall(EventContext ec): ec(ec) {
+ if (pipe2(ctl_fd, O_NONBLOCK))
+ throw SalticidaeError(std::string("ThreadCall: failed to create pipe"));
+ ev_listen = Event(ec, ctl_fd[0], Event::READ, [this](int fd, int) {
+ Handle *h;
+ read(fd, &h, sizeof(h));
+ h->exec();
+ delete h;
+ });
+ ev_listen.add();
+ }
+
+ ~ThreadCall() {
+ close(ctl_fd[0]);
+ close(ctl_fd[1]);
+ }
+
+ template<typename Func>
+ void *call(Func callback, bool blocking = false) {
+ auto h = new Handle();
+ h->callback = callback;
+ if (blocking)
+ {
+ ThreadNotifier notifier;
+ h->notifier = &notifier;
+ std::atomic_thread_fence(std::memory_order_release);
+ write(ctl_fd[1], &h, sizeof(h));
+ return notifier.wait();
+ }
+ else
+ {
+ std::atomic_thread_fence(std::memory_order_release);
+ write(ctl_fd[1], &h, sizeof(h));
+ return nullptr;
+ }
+ }
+};
+
+
template<typename T>
class MPSCQueueEventDriven: public MPSCQueue<T> {
private: