From 69a9bed21f18728483320e88530045180796e2ac Mon Sep 17 00:00:00 2001 From: Determinant Date: Thu, 4 Jul 2019 00:28:30 -0400 Subject: improve dispatcher shutdown --- include/salticidae/event.h | 33 ++++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 9 deletions(-) (limited to 'include/salticidae/event.h') diff --git a/include/salticidae/event.h b/include/salticidae/event.h index 960588c..420c073 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -656,6 +656,7 @@ class ThreadCall { public: struct Result { void *data; + std::exception_ptr error; std::function deleter; Result(): data(nullptr) {} Result(void *data, std::function &&deleter): @@ -663,11 +664,14 @@ class ThreadCall { ~Result() { if (data != nullptr) deleter(data); } Result(const Result &) = delete; Result(Result &&other): - data(other.data), deleter(std::move(other.deleter)) { + data(other.data), + error(std::move(other.error)), + deleter(std::move(other.deleter)) { other.data = nullptr; } void swap(Result &other) { std::swap(data, other.data); + std::swap(error, other.error); std::swap(deleter, other.deleter); } Result &operator=(const Result &other) = delete; @@ -679,7 +683,10 @@ class ThreadCall { } return *this; } - void *get() { return data; } + void *get() { + if (error) std::rethrow_exception(error); + return data; + } }; class Handle { std::function callback; @@ -689,15 +696,18 @@ class ThreadCall { public: Handle(): notifier(nullptr) {} Handle(const Handle &) = delete; - void exec() { - callback(*this); + void return_sync() { if (notifier) notifier->notify(std::move(result)); } + void exec() { + callback(*this); + return_sync(); + } template - void set_result(T &&data) { + Result &set_result(T &&data) { using _T = std::remove_reference_t; - result = Result(new _T(std::forward(data)), + return result = Result(new _T(std::forward(data)), [](void *ptr) {delete static_cast<_T *>(ptr);}); } }; @@ -711,7 +721,13 @@ class ThreadCall { Handle *h; while (q.try_dequeue(h)) { - if (!stopped) h->exec(); + try { + if (!stopped) h->exec(); + else throw SalticidaeError(SALTI_ERROR_NOT_AVAIL); + } catch (...) { + h->set_result(0).error = std::current_exception(); + h->return_sync(); + } delete h; if (++cnt == burst_size) return true; } @@ -726,7 +742,6 @@ class ThreadCall { template bool async_call(Func &&callback) { - if (stopped) return false; auto h = new Handle(); h->callback = std::forward(callback); q.enqueue(h); @@ -735,7 +750,6 @@ class ThreadCall { template Result call(Func &&callback) { - if (stopped) throw SalticidaeError(SALTI_ERROR_NOT_AVAIL); auto h = new Handle(); h->callback = std::forward(callback); ThreadNotifier notifier; @@ -746,6 +760,7 @@ class ThreadCall { const EventContext &get_ec() const { return ec; } void stop() { stopped = true; } + bool is_stopped() { return stopped; } }; } -- cgit v1.2.3