From 69a9bed21f18728483320e88530045180796e2ac Mon Sep 17 00:00:00 2001 From: Determinant Date: Thu, 4 Jul 2019 00:28:30 -0400 Subject: improve dispatcher shutdown --- include/salticidae/conn.h | 46 ++++++++++++++++++++-------------------------- 1 file changed, 20 insertions(+), 26 deletions(-) (limited to 'include/salticidae/conn.h') diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index 6b9b486..56d2d6a 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -210,6 +210,7 @@ class ConnPool { class Worker { EventContext ec; ThreadCall tcall; + BoxObj exit_tcall; /** only used by the dispatcher thread */ std::thread handle; bool disp_flag; std::atomic nconn; @@ -297,10 +298,18 @@ class ConnPool { tcall.async_call([this](ThreadCall::Handle &) { ec.stop(); }); } + void disp_stop() { + assert(disp_flag && exit_tcall); + exit_tcall->async_call([this](ThreadCall::Handle &) { ec.stop(); }); + } + std::thread &get_handle() { return handle; } const EventContext &get_ec() { return ec; } ThreadCall *get_tcall() { return &tcall; } - void set_dispatcher() { disp_flag = true; } + void set_dispatcher() { + disp_flag = true; + exit_tcall = new ThreadCall(ec); + } bool is_dispatcher() const { return disp_flag; } size_t get_nconn() { return nconn; } void stop_tcall() { tcall.stop(); } @@ -461,10 +470,9 @@ class ConnPool { workers[0].set_dispatcher(); disp_error_cb = [this](const std::exception_ptr err) { workers[0].stop_tcall(); - disp_ec.stop(); user_tcall->async_call([this, err](ThreadCall::Handle &) { - stop_workers(); - //std::rethrow_exception(err); + for (size_t i = 1; i < nworker; i++) + workers[i].stop(); if (error_cb) error_cb(err, true, -1); }); }; @@ -504,7 +512,7 @@ class ConnPool { system_state = 2; SALTICIDAE_LOG_INFO("stopping all threads..."); /* stop the dispatcher */ - workers[0].stop(); + workers[0].disp_stop(); workers[0].get_handle().join(); /* stop all workers */ for (size_t i = 1; i < nworker; i++) @@ -532,19 +540,13 @@ class ConnPool { /** Actively connect to remote addr. */ conn_t connect_sync(const NetAddr &addr) { - auto ret = *(static_cast *>( + auto ret = *(static_cast( disp_tcall->call([this, addr](ThreadCall::Handle &h) { conn_t conn; - std::exception_ptr err = nullptr; - try { - conn = _connect(addr); - } catch (...) { - err = std::current_exception(); - } - h.set_result(std::make_pair(std::move(conn), err)); + conn = _connect(addr); + h.set_result(conn); }).get())); - if (ret.second) std::rethrow_exception(ret.second); - return std::move(ret.first); + return std::move(ret); } /** Actively connect to remote addr (async). */ @@ -565,17 +567,9 @@ class ConnPool { * Does not need to be called if do not want to accept any passive * connections. */ void listen(NetAddr listen_addr) { - auto ret = *(static_cast( - disp_tcall->call([this, listen_addr](ThreadCall::Handle &h) { - std::exception_ptr err = nullptr; - try { - _listen(listen_addr); - } catch (...) { - err = std::current_exception(); - } - h.set_result(err); - }).get())); - if (ret) std::rethrow_exception(ret); + disp_tcall->call([this, listen_addr](ThreadCall::Handle &) { + _listen(listen_addr); + }).get(); } template -- cgit v1.2.3