aboutsummaryrefslogtreecommitdiff
path: root/include/salticidae/conn.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/salticidae/conn.h')
-rw-r--r--include/salticidae/conn.h46
1 files changed, 20 insertions, 26 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index 6b9b486..56d2d6a 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -210,6 +210,7 @@ class ConnPool {
class Worker {
EventContext ec;
ThreadCall tcall;
+ BoxObj<ThreadCall> exit_tcall; /** only used by the dispatcher thread */
std::thread handle;
bool disp_flag;
std::atomic<size_t> nconn;
@@ -297,10 +298,18 @@ class ConnPool {
tcall.async_call([this](ThreadCall::Handle &) { ec.stop(); });
}
+ void disp_stop() {
+ assert(disp_flag && exit_tcall);
+ exit_tcall->async_call([this](ThreadCall::Handle &) { ec.stop(); });
+ }
+
std::thread &get_handle() { return handle; }
const EventContext &get_ec() { return ec; }
ThreadCall *get_tcall() { return &tcall; }
- void set_dispatcher() { disp_flag = true; }
+ void set_dispatcher() {
+ disp_flag = true;
+ exit_tcall = new ThreadCall(ec);
+ }
bool is_dispatcher() const { return disp_flag; }
size_t get_nconn() { return nconn; }
void stop_tcall() { tcall.stop(); }
@@ -461,10 +470,9 @@ class ConnPool {
workers[0].set_dispatcher();
disp_error_cb = [this](const std::exception_ptr err) {
workers[0].stop_tcall();
- disp_ec.stop();
user_tcall->async_call([this, err](ThreadCall::Handle &) {
- stop_workers();
- //std::rethrow_exception(err);
+ for (size_t i = 1; i < nworker; i++)
+ workers[i].stop();
if (error_cb) error_cb(err, true, -1);
});
};
@@ -504,7 +512,7 @@ class ConnPool {
system_state = 2;
SALTICIDAE_LOG_INFO("stopping all threads...");
/* stop the dispatcher */
- workers[0].stop();
+ workers[0].disp_stop();
workers[0].get_handle().join();
/* stop all workers */
for (size_t i = 1; i < nworker; i++)
@@ -532,19 +540,13 @@ class ConnPool {
/** Actively connect to remote addr. */
conn_t connect_sync(const NetAddr &addr) {
- auto ret = *(static_cast<std::pair<conn_t, std::exception_ptr> *>(
+ auto ret = *(static_cast<conn_t *>(
disp_tcall->call([this, addr](ThreadCall::Handle &h) {
conn_t conn;
- std::exception_ptr err = nullptr;
- try {
- conn = _connect(addr);
- } catch (...) {
- err = std::current_exception();
- }
- h.set_result(std::make_pair(std::move(conn), err));
+ conn = _connect(addr);
+ h.set_result(conn);
}).get()));
- if (ret.second) std::rethrow_exception(ret.second);
- return std::move(ret.first);
+ return std::move(ret);
}
/** Actively connect to remote addr (async). */
@@ -565,17 +567,9 @@ class ConnPool {
* Does not need to be called if do not want to accept any passive
* connections. */
void listen(NetAddr listen_addr) {
- auto ret = *(static_cast<std::exception_ptr *>(
- disp_tcall->call([this, listen_addr](ThreadCall::Handle &h) {
- std::exception_ptr err = nullptr;
- try {
- _listen(listen_addr);
- } catch (...) {
- err = std::current_exception();
- }
- h.set_result(err);
- }).get()));
- if (ret) std::rethrow_exception(ret);
+ disp_tcall->call([this, listen_addr](ThreadCall::Handle &) {
+ _listen(listen_addr);
+ }).get();
}
template<typename Func>