diff options
Diffstat (limited to 'include')
-rw-r--r-- | include/salticidae/conn.h | 37 | ||||
-rw-r--r-- | include/salticidae/event.h | 135 | ||||
-rw-r--r-- | include/salticidae/network.h | 2 |
3 files changed, 101 insertions, 73 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index c357875..265a02a 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -238,6 +238,7 @@ class ConnPool { /* related to workers */ size_t nworker; salticidae::BoxObj<Worker[]> workers; + bool worker_running; void accept_client(int, int); conn_t add_conn(const conn_t &conn); @@ -319,7 +320,8 @@ class ConnPool { conn_server_timeout(config._conn_server_timeout), seg_buff_size(config._seg_buff_size), listen_fd(-1), - nworker(config._nworker) { + nworker(config._nworker), + worker_running(false) { workers = new Worker[nworker]; user_tcall = new ThreadCall(ec); disp_ec = workers[0].get_ec(); @@ -327,27 +329,22 @@ class ConnPool { workers[0].set_dispatcher(); } - ~ConnPool() { - stop(); - for (auto it: pool) - { - conn_t conn = it.second; - conn->stop(); - conn->self_ref = nullptr; - } - if (listen_fd != -1) close(listen_fd); - } + ~ConnPool() { stop(); } ConnPool(const ConnPool &) = delete; ConnPool(ConnPool &&) = delete; void start() { + if (worker_running) return; SALTICIDAE_LOG_INFO("starting all threads..."); for (size_t i = 0; i < nworker; i++) workers[i].start(); + worker_running = true; } - void stop() { + void stop_workers() { + if (!worker_running) return; + worker_running = false; SALTICIDAE_LOG_INFO("stopping all threads..."); /* stop all workers */ for (size_t i = 0; i < nworker; i++) @@ -355,7 +352,21 @@ class ConnPool { /* join all worker threads */ for (size_t i = 0; i < nworker; i++) workers[i].get_handle().join(); - nworker = 0; + } + + void stop() { + stop_workers(); + for (auto it: pool) + { + conn_t conn = it.second; + conn->stop(); + conn->self_ref = nullptr; + } + if (listen_fd != -1) + { + close(listen_fd); + listen_fd = -1; + } } /** Actively connect to remote addr. */ diff --git a/include/salticidae/event.h b/include/salticidae/event.h index 021b5dc..d862ce8 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -189,6 +189,80 @@ class Event { operator bool() const { return ev_fd != nullptr || ev_timer != nullptr; } }; +class SigEvent { + public: + using callback_t = std::function<void(int signum)>; + private: + EventContext eb; + uv_signal_t *ev_sig; + callback_t callback; + static inline void sig_then(uv_signal_t *h, int signum) { + auto event = static_cast<SigEvent *>(h->data); + event->callback(signum); + } + + static void _on_handle_close(uv_handle_t *h) { + delete h; + } + + public: + SigEvent(): eb(nullptr), ev_sig(nullptr) {} + SigEvent(const EventContext &eb, callback_t callback): + eb(eb), + ev_sig(new uv_signal_t()), + callback(callback) { + uv_signal_init(eb.get(), ev_sig); + ev_sig->data = this; + } + + SigEvent(const SigEvent &) = delete; + SigEvent(SigEvent &&other): + eb(std::move(other.eb)), + ev_sig(other.ev_sig), + callback(std::move(other.callback)) { + other.del(); + other.ev_sig = nullptr; + ev_sig->data = this; + } + + SigEvent &operator=(SigEvent &&other) { + clear(); + other.del(); + eb = std::move(other.eb); + ev_sig = other.ev_sig; + callback = std::move(other.callback); + + other.ev_sig = nullptr; + ev_sig->data = this; + return *this; + } + + ~SigEvent() { clear(); } + + void clear() { + if (ev_sig != nullptr) + { + uv_signal_stop(ev_sig); + uv_close((uv_handle_t *)ev_sig, SigEvent::_on_handle_close); + ev_sig = nullptr; + } + callback = nullptr; + } + + void set_callback(callback_t _callback) { + callback = _callback; + } + + void add(int signum) { + uv_signal_start(ev_sig, SigEvent::sig_then, signum); + } + void del() { + uv_signal_stop(ev_sig); + } + + operator bool() const { return ev_sig != nullptr; } +}; + template<typename T> class ThreadNotifier { std::condition_variable cv; @@ -304,6 +378,8 @@ class ThreadCall { write(ctl_fd[1], &h, sizeof(h)); return notifier.wait(); } + + const EventContext &get_ec() const { return ec; } }; @@ -362,65 +438,6 @@ class MPSCQueueEventDriven: public MPSCQueue<T> { } }; - - -// TODO: incorrect MPMCQueueEventDriven impl -/* -template<typename T> -class MPMCQueueEventDriven: public MPMCQueue<T> { - private: - const uint64_t dummy = 1; - std::atomic<bool> wait_sig; - std::vector<std::pair<BoxObj<Event>, int>> evs; - - public: - MPMCQueueEventDriven(size_t capacity = 65536): - MPMCQueue<T>(capacity), - wait_sig(true) {} - - template<typename Func> - void listen(const EventContext &ec, Func &&func, size_t burst_size=128) { - int fd = eventfd(0, EFD_NONBLOCK); - evs.emplace(evs.end(), std::make_pair(new Event(ec, fd, EV_READ | EV_PERSIST, - [this, func=std::forward<Func>(func), burst_size](int fd, int) { - uint64_t t; - read(fd, &t, 8); - //fprintf(stderr, "%x\n", std::this_thread::get_id()); - T elem; - size_t cnt = burst_size; - while (MPMCQueue<T>::try_dequeue(elem)) - { - func(std::move(elem)); - if (!--cnt) - { - write(fd, &dummy, 8); - return; - } - } - wait_sig.store(true, std::memory_order_relaxed); - }), fd)); - evs.rbegin()->first->add(); - } - - ~MPMCQueueEventDriven() { - for (const auto &p: evs) - close(p.second); - } - - template<typename U> - bool enqueue(U &&e) { - bool ret = MPMCQueue<T>::enqueue(std::forward<U>(e)); - if (wait_sig.exchange(false, std::memory_order_relaxed)) - { - SALTICIDAE_LOG_DEBUG("mpmc notify"); - for (const auto &p: evs) - write(p.second, &dummy, 8); - } - return ret; - } -}; -*/ - } #endif diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 43638cf..a71c5dd 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -423,7 +423,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { this->reg_handler(generic_bind(&PeerNetwork::msg_pong, this, _1, _2)); } - ~PeerNetwork() { this->stop(); } + ~PeerNetwork() { this->stop_workers(); } void add_peer(const NetAddr &paddr); const conn_t get_peer_conn(const NetAddr &paddr) const; |