aboutsummaryrefslogtreecommitdiff
path: root/include/salticidae/event.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/salticidae/event.h')
-rw-r--r--include/salticidae/event.h135
1 files changed, 76 insertions, 59 deletions
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index 021b5dc..d862ce8 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -189,6 +189,80 @@ class Event {
operator bool() const { return ev_fd != nullptr || ev_timer != nullptr; }
};
+class SigEvent {
+ public:
+ using callback_t = std::function<void(int signum)>;
+ private:
+ EventContext eb;
+ uv_signal_t *ev_sig;
+ callback_t callback;
+ static inline void sig_then(uv_signal_t *h, int signum) {
+ auto event = static_cast<SigEvent *>(h->data);
+ event->callback(signum);
+ }
+
+ static void _on_handle_close(uv_handle_t *h) {
+ delete h;
+ }
+
+ public:
+ SigEvent(): eb(nullptr), ev_sig(nullptr) {}
+ SigEvent(const EventContext &eb, callback_t callback):
+ eb(eb),
+ ev_sig(new uv_signal_t()),
+ callback(callback) {
+ uv_signal_init(eb.get(), ev_sig);
+ ev_sig->data = this;
+ }
+
+ SigEvent(const SigEvent &) = delete;
+ SigEvent(SigEvent &&other):
+ eb(std::move(other.eb)),
+ ev_sig(other.ev_sig),
+ callback(std::move(other.callback)) {
+ other.del();
+ other.ev_sig = nullptr;
+ ev_sig->data = this;
+ }
+
+ SigEvent &operator=(SigEvent &&other) {
+ clear();
+ other.del();
+ eb = std::move(other.eb);
+ ev_sig = other.ev_sig;
+ callback = std::move(other.callback);
+
+ other.ev_sig = nullptr;
+ ev_sig->data = this;
+ return *this;
+ }
+
+ ~SigEvent() { clear(); }
+
+ void clear() {
+ if (ev_sig != nullptr)
+ {
+ uv_signal_stop(ev_sig);
+ uv_close((uv_handle_t *)ev_sig, SigEvent::_on_handle_close);
+ ev_sig = nullptr;
+ }
+ callback = nullptr;
+ }
+
+ void set_callback(callback_t _callback) {
+ callback = _callback;
+ }
+
+ void add(int signum) {
+ uv_signal_start(ev_sig, SigEvent::sig_then, signum);
+ }
+ void del() {
+ uv_signal_stop(ev_sig);
+ }
+
+ operator bool() const { return ev_sig != nullptr; }
+};
+
template<typename T>
class ThreadNotifier {
std::condition_variable cv;
@@ -304,6 +378,8 @@ class ThreadCall {
write(ctl_fd[1], &h, sizeof(h));
return notifier.wait();
}
+
+ const EventContext &get_ec() const { return ec; }
};
@@ -362,65 +438,6 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
}
};
-
-
-// TODO: incorrect MPMCQueueEventDriven impl
-/*
-template<typename T>
-class MPMCQueueEventDriven: public MPMCQueue<T> {
- private:
- const uint64_t dummy = 1;
- std::atomic<bool> wait_sig;
- std::vector<std::pair<BoxObj<Event>, int>> evs;
-
- public:
- MPMCQueueEventDriven(size_t capacity = 65536):
- MPMCQueue<T>(capacity),
- wait_sig(true) {}
-
- template<typename Func>
- void listen(const EventContext &ec, Func &&func, size_t burst_size=128) {
- int fd = eventfd(0, EFD_NONBLOCK);
- evs.emplace(evs.end(), std::make_pair(new Event(ec, fd, EV_READ | EV_PERSIST,
- [this, func=std::forward<Func>(func), burst_size](int fd, int) {
- uint64_t t;
- read(fd, &t, 8);
- //fprintf(stderr, "%x\n", std::this_thread::get_id());
- T elem;
- size_t cnt = burst_size;
- while (MPMCQueue<T>::try_dequeue(elem))
- {
- func(std::move(elem));
- if (!--cnt)
- {
- write(fd, &dummy, 8);
- return;
- }
- }
- wait_sig.store(true, std::memory_order_relaxed);
- }), fd));
- evs.rbegin()->first->add();
- }
-
- ~MPMCQueueEventDriven() {
- for (const auto &p: evs)
- close(p.second);
- }
-
- template<typename U>
- bool enqueue(U &&e) {
- bool ret = MPMCQueue<T>::enqueue(std::forward<U>(e));
- if (wait_sig.exchange(false, std::memory_order_relaxed))
- {
- SALTICIDAE_LOG_DEBUG("mpmc notify");
- for (const auto &p: evs)
- write(p.second, &dummy, 8);
- }
- return ret;
- }
-};
-*/
-
}
#endif