aboutsummaryrefslogtreecommitdiff
path: root/include
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2018-11-18 16:24:30 -0500
committerDeterminant <[email protected]>2018-11-18 16:24:30 -0500
commit161d969e0eabfecccd69a6b9ed2d03919cf89cb5 (patch)
tree60008191c4320a8b4c5ae31773d70f996d3548f7 /include
parent39f6d6ac46d440aa68e7b1a2f2e4eb629356af34 (diff)
improve the test programsmultiloops
Diffstat (limited to 'include')
-rw-r--r--include/salticidae/conn.h37
-rw-r--r--include/salticidae/event.h135
-rw-r--r--include/salticidae/network.h2
3 files changed, 101 insertions, 73 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index c357875..265a02a 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -238,6 +238,7 @@ class ConnPool {
/* related to workers */
size_t nworker;
salticidae::BoxObj<Worker[]> workers;
+ bool worker_running;
void accept_client(int, int);
conn_t add_conn(const conn_t &conn);
@@ -319,7 +320,8 @@ class ConnPool {
conn_server_timeout(config._conn_server_timeout),
seg_buff_size(config._seg_buff_size),
listen_fd(-1),
- nworker(config._nworker) {
+ nworker(config._nworker),
+ worker_running(false) {
workers = new Worker[nworker];
user_tcall = new ThreadCall(ec);
disp_ec = workers[0].get_ec();
@@ -327,27 +329,22 @@ class ConnPool {
workers[0].set_dispatcher();
}
- ~ConnPool() {
- stop();
- for (auto it: pool)
- {
- conn_t conn = it.second;
- conn->stop();
- conn->self_ref = nullptr;
- }
- if (listen_fd != -1) close(listen_fd);
- }
+ ~ConnPool() { stop(); }
ConnPool(const ConnPool &) = delete;
ConnPool(ConnPool &&) = delete;
void start() {
+ if (worker_running) return;
SALTICIDAE_LOG_INFO("starting all threads...");
for (size_t i = 0; i < nworker; i++)
workers[i].start();
+ worker_running = true;
}
- void stop() {
+ void stop_workers() {
+ if (!worker_running) return;
+ worker_running = false;
SALTICIDAE_LOG_INFO("stopping all threads...");
/* stop all workers */
for (size_t i = 0; i < nworker; i++)
@@ -355,7 +352,21 @@ class ConnPool {
/* join all worker threads */
for (size_t i = 0; i < nworker; i++)
workers[i].get_handle().join();
- nworker = 0;
+ }
+
+ void stop() {
+ stop_workers();
+ for (auto it: pool)
+ {
+ conn_t conn = it.second;
+ conn->stop();
+ conn->self_ref = nullptr;
+ }
+ if (listen_fd != -1)
+ {
+ close(listen_fd);
+ listen_fd = -1;
+ }
}
/** Actively connect to remote addr. */
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index 021b5dc..d862ce8 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -189,6 +189,80 @@ class Event {
operator bool() const { return ev_fd != nullptr || ev_timer != nullptr; }
};
+class SigEvent {
+ public:
+ using callback_t = std::function<void(int signum)>;
+ private:
+ EventContext eb;
+ uv_signal_t *ev_sig;
+ callback_t callback;
+ static inline void sig_then(uv_signal_t *h, int signum) {
+ auto event = static_cast<SigEvent *>(h->data);
+ event->callback(signum);
+ }
+
+ static void _on_handle_close(uv_handle_t *h) {
+ delete h;
+ }
+
+ public:
+ SigEvent(): eb(nullptr), ev_sig(nullptr) {}
+ SigEvent(const EventContext &eb, callback_t callback):
+ eb(eb),
+ ev_sig(new uv_signal_t()),
+ callback(callback) {
+ uv_signal_init(eb.get(), ev_sig);
+ ev_sig->data = this;
+ }
+
+ SigEvent(const SigEvent &) = delete;
+ SigEvent(SigEvent &&other):
+ eb(std::move(other.eb)),
+ ev_sig(other.ev_sig),
+ callback(std::move(other.callback)) {
+ other.del();
+ other.ev_sig = nullptr;
+ ev_sig->data = this;
+ }
+
+ SigEvent &operator=(SigEvent &&other) {
+ clear();
+ other.del();
+ eb = std::move(other.eb);
+ ev_sig = other.ev_sig;
+ callback = std::move(other.callback);
+
+ other.ev_sig = nullptr;
+ ev_sig->data = this;
+ return *this;
+ }
+
+ ~SigEvent() { clear(); }
+
+ void clear() {
+ if (ev_sig != nullptr)
+ {
+ uv_signal_stop(ev_sig);
+ uv_close((uv_handle_t *)ev_sig, SigEvent::_on_handle_close);
+ ev_sig = nullptr;
+ }
+ callback = nullptr;
+ }
+
+ void set_callback(callback_t _callback) {
+ callback = _callback;
+ }
+
+ void add(int signum) {
+ uv_signal_start(ev_sig, SigEvent::sig_then, signum);
+ }
+ void del() {
+ uv_signal_stop(ev_sig);
+ }
+
+ operator bool() const { return ev_sig != nullptr; }
+};
+
template<typename T>
class ThreadNotifier {
std::condition_variable cv;
@@ -304,6 +378,8 @@ class ThreadCall {
write(ctl_fd[1], &h, sizeof(h));
return notifier.wait();
}
+
+ const EventContext &get_ec() const { return ec; }
};
@@ -362,65 +438,6 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
}
};
-
-
-// TODO: incorrect MPMCQueueEventDriven impl
-/*
-template<typename T>
-class MPMCQueueEventDriven: public MPMCQueue<T> {
- private:
- const uint64_t dummy = 1;
- std::atomic<bool> wait_sig;
- std::vector<std::pair<BoxObj<Event>, int>> evs;
-
- public:
- MPMCQueueEventDriven(size_t capacity = 65536):
- MPMCQueue<T>(capacity),
- wait_sig(true) {}
-
- template<typename Func>
- void listen(const EventContext &ec, Func &&func, size_t burst_size=128) {
- int fd = eventfd(0, EFD_NONBLOCK);
- evs.emplace(evs.end(), std::make_pair(new Event(ec, fd, EV_READ | EV_PERSIST,
- [this, func=std::forward<Func>(func), burst_size](int fd, int) {
- uint64_t t;
- read(fd, &t, 8);
- //fprintf(stderr, "%x\n", std::this_thread::get_id());
- T elem;
- size_t cnt = burst_size;
- while (MPMCQueue<T>::try_dequeue(elem))
- {
- func(std::move(elem));
- if (!--cnt)
- {
- write(fd, &dummy, 8);
- return;
- }
- }
- wait_sig.store(true, std::memory_order_relaxed);
- }), fd));
- evs.rbegin()->first->add();
- }
-
- ~MPMCQueueEventDriven() {
- for (const auto &p: evs)
- close(p.second);
- }
-
- template<typename U>
- bool enqueue(U &&e) {
- bool ret = MPMCQueue<T>::enqueue(std::forward<U>(e));
- if (wait_sig.exchange(false, std::memory_order_relaxed))
- {
- SALTICIDAE_LOG_DEBUG("mpmc notify");
- for (const auto &p: evs)
- write(p.second, &dummy, 8);
- }
- return ret;
- }
-};
-*/
-
}
#endif
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index 43638cf..a71c5dd 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -423,7 +423,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
this->reg_handler(generic_bind(&PeerNetwork::msg_pong, this, _1, _2));
}
- ~PeerNetwork() { this->stop(); }
+ ~PeerNetwork() { this->stop_workers(); }
void add_peer(const NetAddr &paddr);
const conn_t get_peer_conn(const NetAddr &paddr) const;