aboutsummaryrefslogtreecommitdiff
path: root/include/salticidae/event.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/salticidae/event.h')
-rw-r--r--include/salticidae/event.h218
1 files changed, 109 insertions, 109 deletions
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index 641bb8e..0600109 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -434,107 +434,6 @@ class ThreadNotifier {
}
};
-class ThreadCall {
- int ctl_fd[2];
- EventContext ec;
- FdEvent ev_listen;
-
- public:
- struct Result {
- void *data;
- std::function<void(void *)> deleter;
- Result(): data(nullptr) {}
- Result(void *data, std::function<void(void *)> &&deleter):
- data(data), deleter(std::move(deleter)) {}
- ~Result() { if (data != nullptr) deleter(data); }
- Result(const Result &) = delete;
- Result(Result &&other):
- data(other.data), deleter(std::move(other.deleter)) {
- other.data = nullptr;
- }
- void swap(Result &other) {
- std::swap(data, other.data);
- std::swap(deleter, other.deleter);
- }
- Result &operator=(const Result &other) = delete;
- Result &operator=(Result &&other) {
- if (this != &other)
- {
- Result tmp(std::move(other));
- tmp.swap(*this);
- }
- return *this;
- }
- void *get() { return data; }
- };
- class Handle {
- std::function<void(Handle &)> callback;
- ThreadNotifier<Result> * notifier;
- Result result;
- friend ThreadCall;
- public:
- Handle(): notifier(nullptr) {}
- void exec() {
- callback(*this);
- if (notifier)
- notifier->notify(std::move(result));
- }
- template<typename T>
- void set_result(T &&data) {
- using _T = std::remove_reference_t<T>;
- result = Result(new _T(std::forward<T>(data)),
- [](void *ptr) {delete static_cast<_T *>(ptr);});
- }
- };
-
- ThreadCall() = default;
- ThreadCall(const ThreadCall &) = delete;
- ThreadCall(ThreadCall &&) = delete;
- ThreadCall(EventContext ec): ec(ec) {
- if (pipe2(ctl_fd, O_NONBLOCK))
- throw SalticidaeError(std::string("ThreadCall: failed to create pipe"));
- ev_listen = FdEvent(ec, ctl_fd[0], [this](int fd, int) {
- Handle *h;
- read(fd, &h, sizeof(h));
- std::atomic_thread_fence(std::memory_order_acquire);
- h->exec();
- delete h;
- });
- ev_listen.add(FdEvent::READ);
- }
-
- ~ThreadCall() {
- ev_listen.clear();
- Handle *h;
- while (read(ctl_fd[0], &h, sizeof(h)) == sizeof(h))
- delete h;
- close(ctl_fd[0]);
- close(ctl_fd[1]);
- }
-
- template<typename Func>
- void async_call(Func callback) {
- auto h = new Handle();
- h->callback = callback;
- std::atomic_thread_fence(std::memory_order_release);
- write(ctl_fd[1], &h, sizeof(h));
- }
-
- template<typename Func>
- Result call(Func callback) {
- auto h = new Handle();
- h->callback = callback;
- ThreadNotifier<Result> notifier;
- h->notifier = &notifier;
- std::atomic_thread_fence(std::memory_order_release);
- write(ctl_fd[1], &h, sizeof(h));
- return notifier.wait();
- }
-
- const EventContext &get_ec() const { return ec; }
-};
-
-
template<typename T>
class MPSCQueueEventDriven: public MPSCQueue<T> {
private:
@@ -544,8 +443,7 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
FdEvent ev;
public:
- MPSCQueueEventDriven(size_t capacity = 65536):
- MPSCQueue<T>(capacity),
+ MPSCQueueEventDriven():
wait_sig(true),
fd(eventfd(0, EFD_NONBLOCK)) {}
@@ -577,9 +475,10 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
void unreg_handler() { ev.clear(); }
template<typename U>
- bool enqueue(U &&e) {
+ bool enqueue(U &&e, bool unbounded = true) {
static const uint64_t dummy = 1;
- MPSCQueue<T>::enqueue(std::forward<U>(e));
+ if (!MPSCQueue<T>::enqueue(std::forward<U>(e), unbounded))
+ return false;
// memory barrier here, so any load/store in enqueue must be finialized
if (wait_sig.exchange(false, std::memory_order_acq_rel))
{
@@ -613,8 +512,7 @@ class MPMCQueueEventDriven: public MPMCQueue<T> {
std::vector<FdEvent> evs;
public:
- MPMCQueueEventDriven(size_t capacity = 65536):
- MPMCQueue<T>(capacity),
+ MPMCQueueEventDriven():
wait_sig(true),
fd(eventfd(0, EFD_NONBLOCK)) {}
@@ -642,9 +540,10 @@ class MPMCQueueEventDriven: public MPMCQueue<T> {
void unreg_handlers() { evs.clear(); }
template<typename U>
- bool enqueue(U &&e) {
+ bool enqueue(U &&e, bool unbounded = true) {
static const uint64_t dummy = 1;
- MPMCQueue<T>::enqueue(std::forward<U>(e));
+ if (!MPMCQueue<T>::enqueue(std::forward<U>(e), unbounded))
+ return false;
// memory barrier here, so any load/store in enqueue must be finialized
if (wait_sig.exchange(false, std::memory_order_acq_rel))
{
@@ -655,6 +554,107 @@ class MPMCQueueEventDriven: public MPMCQueue<T> {
}
};
+class ThreadCall {
+ public: class Handle;
+ private:
+ int ctl_fd[2];
+ EventContext ec;
+ const size_t burst_size;
+ using queue_t = MPSCQueueEventDriven<Handle *>;
+ queue_t q;
+
+ public:
+ struct Result {
+ void *data;
+ std::function<void(void *)> deleter;
+ Result(): data(nullptr) {}
+ Result(void *data, std::function<void(void *)> &&deleter):
+ data(data), deleter(std::move(deleter)) {}
+ ~Result() { if (data != nullptr) deleter(data); }
+ Result(const Result &) = delete;
+ Result(Result &&other):
+ data(other.data), deleter(std::move(other.deleter)) {
+ other.data = nullptr;
+ }
+ void swap(Result &other) {
+ std::swap(data, other.data);
+ std::swap(deleter, other.deleter);
+ }
+ Result &operator=(const Result &other) = delete;
+ Result &operator=(Result &&other) {
+ if (this != &other)
+ {
+ Result tmp(std::move(other));
+ tmp.swap(*this);
+ }
+ return *this;
+ }
+ void *get() { return data; }
+ };
+ class Handle {
+ std::function<void(Handle &)> callback;
+ ThreadNotifier<Result> * notifier;
+ Result result;
+ friend ThreadCall;
+ public:
+ Handle(): notifier(nullptr) {}
+ void exec() {
+ callback(*this);
+ if (notifier)
+ notifier->notify(std::move(result));
+ }
+ template<typename T>
+ void set_result(T &&data) {
+ using _T = std::remove_reference_t<T>;
+ result = Result(new _T(std::forward<T>(data)),
+ [](void *ptr) {delete static_cast<_T *>(ptr);});
+ }
+ };
+
+ ThreadCall(size_t burst_size): burst_size(burst_size) {}
+ ThreadCall(const ThreadCall &) = delete;
+ ThreadCall(ThreadCall &&) = delete;
+ ThreadCall(EventContext ec, size_t burst_size = 128): ec(ec), burst_size(burst_size) {
+ q.reg_handler(ec, [this, burst_size=burst_size](queue_t &q) {
+ size_t cnt = 0;
+ Handle *h;
+ while (q.try_dequeue(h))
+ {
+ h->exec();
+ delete h;
+ if (++cnt == burst_size) return true;
+ }
+ return false;
+ });
+ }
+
+ ~ThreadCall() {
+ Handle *h;
+ while (q.try_dequeue(h)) delete h;
+ close(ctl_fd[0]);
+ close(ctl_fd[1]);
+ }
+
+ template<typename Func>
+ void async_call(Func callback) {
+ auto h = new Handle();
+ h->callback = callback;
+ q.enqueue(h);
+ }
+
+ template<typename Func>
+ Result call(Func callback) {
+ auto h = new Handle();
+ h->callback = callback;
+ ThreadNotifier<Result> notifier;
+ h->notifier = &notifier;
+ q.enqueue(h);
+ return notifier.wait();
+ }
+
+ const EventContext &get_ec() const { return ec; }
+};
+
}
#endif