From b7802b3b06511f067719cb845dfd03a223f0e18f Mon Sep 17 00:00:00 2001 From: Determinant Date: Wed, 3 Apr 2019 14:55:26 -0400 Subject: improve non-blocking API --- include/salticidae/event.h | 218 ++++++++++++++++++++++----------------------- 1 file changed, 109 insertions(+), 109 deletions(-) (limited to 'include/salticidae/event.h') diff --git a/include/salticidae/event.h b/include/salticidae/event.h index 641bb8e..0600109 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -434,107 +434,6 @@ class ThreadNotifier { } }; -class ThreadCall { - int ctl_fd[2]; - EventContext ec; - FdEvent ev_listen; - - public: - struct Result { - void *data; - std::function deleter; - Result(): data(nullptr) {} - Result(void *data, std::function &&deleter): - data(data), deleter(std::move(deleter)) {} - ~Result() { if (data != nullptr) deleter(data); } - Result(const Result &) = delete; - Result(Result &&other): - data(other.data), deleter(std::move(other.deleter)) { - other.data = nullptr; - } - void swap(Result &other) { - std::swap(data, other.data); - std::swap(deleter, other.deleter); - } - Result &operator=(const Result &other) = delete; - Result &operator=(Result &&other) { - if (this != &other) - { - Result tmp(std::move(other)); - tmp.swap(*this); - } - return *this; - } - void *get() { return data; } - }; - class Handle { - std::function callback; - ThreadNotifier * notifier; - Result result; - friend ThreadCall; - public: - Handle(): notifier(nullptr) {} - void exec() { - callback(*this); - if (notifier) - notifier->notify(std::move(result)); - } - template - void set_result(T &&data) { - using _T = std::remove_reference_t; - result = Result(new _T(std::forward(data)), - [](void *ptr) {delete static_cast<_T *>(ptr);}); - } - }; - - ThreadCall() = default; - ThreadCall(const ThreadCall &) = delete; - ThreadCall(ThreadCall &&) = delete; - ThreadCall(EventContext ec): ec(ec) { - if (pipe2(ctl_fd, O_NONBLOCK)) - throw SalticidaeError(std::string("ThreadCall: failed to create pipe")); - ev_listen = FdEvent(ec, ctl_fd[0], [this](int fd, int) { - Handle *h; - read(fd, &h, sizeof(h)); - std::atomic_thread_fence(std::memory_order_acquire); - h->exec(); - delete h; - }); - ev_listen.add(FdEvent::READ); - } - - ~ThreadCall() { - ev_listen.clear(); - Handle *h; - while (read(ctl_fd[0], &h, sizeof(h)) == sizeof(h)) - delete h; - close(ctl_fd[0]); - close(ctl_fd[1]); - } - - template - void async_call(Func callback) { - auto h = new Handle(); - h->callback = callback; - std::atomic_thread_fence(std::memory_order_release); - write(ctl_fd[1], &h, sizeof(h)); - } - - template - Result call(Func callback) { - auto h = new Handle(); - h->callback = callback; - ThreadNotifier notifier; - h->notifier = ¬ifier; - std::atomic_thread_fence(std::memory_order_release); - write(ctl_fd[1], &h, sizeof(h)); - return notifier.wait(); - } - - const EventContext &get_ec() const { return ec; } -}; - - template class MPSCQueueEventDriven: public MPSCQueue { private: @@ -544,8 +443,7 @@ class MPSCQueueEventDriven: public MPSCQueue { FdEvent ev; public: - MPSCQueueEventDriven(size_t capacity = 65536): - MPSCQueue(capacity), + MPSCQueueEventDriven(): wait_sig(true), fd(eventfd(0, EFD_NONBLOCK)) {} @@ -577,9 +475,10 @@ class MPSCQueueEventDriven: public MPSCQueue { void unreg_handler() { ev.clear(); } template - bool enqueue(U &&e) { + bool enqueue(U &&e, bool unbounded = true) { static const uint64_t dummy = 1; - MPSCQueue::enqueue(std::forward(e)); + if (!MPSCQueue::enqueue(std::forward(e), unbounded)) + return false; // memory barrier here, so any load/store in enqueue must be finialized if (wait_sig.exchange(false, std::memory_order_acq_rel)) { @@ -613,8 +512,7 @@ class MPMCQueueEventDriven: public MPMCQueue { std::vector evs; public: - MPMCQueueEventDriven(size_t capacity = 65536): - MPMCQueue(capacity), + MPMCQueueEventDriven(): wait_sig(true), fd(eventfd(0, EFD_NONBLOCK)) {} @@ -642,9 +540,10 @@ class MPMCQueueEventDriven: public MPMCQueue { void unreg_handlers() { evs.clear(); } template - bool enqueue(U &&e) { + bool enqueue(U &&e, bool unbounded = true) { static const uint64_t dummy = 1; - MPMCQueue::enqueue(std::forward(e)); + if (!MPMCQueue::enqueue(std::forward(e), unbounded)) + return false; // memory barrier here, so any load/store in enqueue must be finialized if (wait_sig.exchange(false, std::memory_order_acq_rel)) { @@ -655,6 +554,107 @@ class MPMCQueueEventDriven: public MPMCQueue { } }; +class ThreadCall { + public: class Handle; + private: + int ctl_fd[2]; + EventContext ec; + const size_t burst_size; + using queue_t = MPSCQueueEventDriven; + queue_t q; + + public: + struct Result { + void *data; + std::function deleter; + Result(): data(nullptr) {} + Result(void *data, std::function &&deleter): + data(data), deleter(std::move(deleter)) {} + ~Result() { if (data != nullptr) deleter(data); } + Result(const Result &) = delete; + Result(Result &&other): + data(other.data), deleter(std::move(other.deleter)) { + other.data = nullptr; + } + void swap(Result &other) { + std::swap(data, other.data); + std::swap(deleter, other.deleter); + } + Result &operator=(const Result &other) = delete; + Result &operator=(Result &&other) { + if (this != &other) + { + Result tmp(std::move(other)); + tmp.swap(*this); + } + return *this; + } + void *get() { return data; } + }; + class Handle { + std::function callback; + ThreadNotifier * notifier; + Result result; + friend ThreadCall; + public: + Handle(): notifier(nullptr) {} + void exec() { + callback(*this); + if (notifier) + notifier->notify(std::move(result)); + } + template + void set_result(T &&data) { + using _T = std::remove_reference_t; + result = Result(new _T(std::forward(data)), + [](void *ptr) {delete static_cast<_T *>(ptr);}); + } + }; + + ThreadCall(size_t burst_size): burst_size(burst_size) {} + ThreadCall(const ThreadCall &) = delete; + ThreadCall(ThreadCall &&) = delete; + ThreadCall(EventContext ec, size_t burst_size = 128): ec(ec), burst_size(burst_size) { + q.reg_handler(ec, [this, burst_size=burst_size](queue_t &q) { + size_t cnt = 0; + Handle *h; + while (q.try_dequeue(h)) + { + h->exec(); + delete h; + if (++cnt == burst_size) return true; + } + return false; + }); + } + + ~ThreadCall() { + Handle *h; + while (q.try_dequeue(h)) delete h; + close(ctl_fd[0]); + close(ctl_fd[1]); + } + + template + void async_call(Func callback) { + auto h = new Handle(); + h->callback = callback; + q.enqueue(h); + } + + template + Result call(Func callback) { + auto h = new Handle(); + h->callback = callback; + ThreadNotifier notifier; + h->notifier = ¬ifier; + q.enqueue(h); + return notifier.wait(); + } + + const EventContext &get_ec() const { return ec; } +}; + } #endif -- cgit v1.2.3