From 0f341fe7f092f704e1c1952c72085eb1ebd2086a Mon Sep 17 00:00:00 2001 From: Determinant Date: Wed, 14 Nov 2018 15:19:32 -0500 Subject: use ThreadCall pattern --- include/salticidae/event.h | 86 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) (limited to 'include/salticidae/event.h') diff --git a/include/salticidae/event.h b/include/salticidae/event.h index c21644b..da27902 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -25,10 +25,12 @@ #ifndef _SALTICIDAE_EVENT_H #define _SALTICIDAE_EVENT_H +#include #include #include #include +#include "salticidae/type.h" #include "salticidae/queue.h" #include "salticidae/util.h" #include "salticidae/ref.h" @@ -177,6 +179,90 @@ class Event { operator bool() const { return ev_fd != nullptr || ev_timer != nullptr; } }; +class ThreadNotifier { + std::condition_variable cv; + std::mutex mlock; + mutex_ul_t ul; + bool ready; + void *data; + public: + ThreadNotifier(): ul(mlock), ready(false) {} + void *wait() { + cv.wait(ul, [this]{ return ready; }); + return data; + } + void notify(void *_data) { + { + mutex_lg_t _(mlock); + ready = true; + data = _data; + } + cv.notify_all(); + } +}; + +class ThreadCall { + int ctl_fd[2]; + EventContext ec; + Event ev_listen; + + public: + class Handle { + std::function callback; + ThreadNotifier* notifier; + void *result; + friend ThreadCall; + public: + Handle(): notifier(nullptr), result(nullptr) {} + void exec() { + callback(*this); + if (notifier) notifier->notify(result); + } + void set_result(void *data) { result = data; } + }; + + ThreadCall() = default; + ThreadCall(const ThreadCall &) = delete; + ThreadCall(ThreadCall &&) = delete; + ThreadCall(EventContext ec): ec(ec) { + if (pipe2(ctl_fd, O_NONBLOCK)) + throw SalticidaeError(std::string("ThreadCall: failed to create pipe")); + ev_listen = Event(ec, ctl_fd[0], Event::READ, [this](int fd, int) { + Handle *h; + read(fd, &h, sizeof(h)); + h->exec(); + delete h; + }); + ev_listen.add(); + } + + ~ThreadCall() { + close(ctl_fd[0]); + close(ctl_fd[1]); + } + + template + void *call(Func callback, bool blocking = false) { + auto h = new Handle(); + h->callback = callback; + if (blocking) + { + ThreadNotifier notifier; + h->notifier = ¬ifier; + std::atomic_thread_fence(std::memory_order_release); + write(ctl_fd[1], &h, sizeof(h)); + return notifier.wait(); + } + else + { + std::atomic_thread_fence(std::memory_order_release); + write(ctl_fd[1], &h, sizeof(h)); + return nullptr; + } + } +}; + + template class MPSCQueueEventDriven: public MPSCQueue { private: -- cgit v1.2.3