From 53f776997d0e92650b9f3a16224cef1c0c76b716 Mon Sep 17 00:00:00 2001 From: Determinant Date: Mon, 1 Jul 2019 23:35:17 -0400 Subject: add async id for error handling --- include/salticidae/conn.h | 73 ++++++++++++++++++++++++----------------------- 1 file changed, 38 insertions(+), 35 deletions(-) (limited to 'include/salticidae/conn.h') diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index 7f74a87..0058274 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -60,7 +60,7 @@ class ConnPool { /** The type of callback invoked when connection status is changed. */ using conn_callback_t = std::function; /** The type of callback invoked when an error occured (during async execution). */ - using error_callback_t = std::function; + using error_callback_t = std::function; /** Abstraction for a bi-directional connection. */ class Conn { friend ConnPool; @@ -149,12 +149,14 @@ class ConnPool { using worker_error_callback_t = std::function; worker_error_callback_t disp_error_cb; worker_error_callback_t worker_error_cb; + std::atomic async_id; + int32_t gen_async_id() { return async_id.fetch_add(1, std::memory_order_relaxed); } conn_t _connect(const NetAddr &addr); void _listen(NetAddr listen_addr); - void recoverable_error(const std::exception_ptr err) const { - user_tcall->async_call([this, err](ThreadCall::Handle &) { - if (error_cb) error_cb(err, false); + void recoverable_error(const std::exception_ptr err, int32_t id) const { + user_tcall->async_call([this, err, id](ThreadCall::Handle &) { + if (error_cb) error_cb(err, false, id); }); } @@ -279,7 +281,7 @@ class ConnPool { else conn->send_data_func(conn, fd, what); } catch (...) { - conn->cpool->recoverable_error(std::current_exception()); + conn->cpool->recoverable_error(std::current_exception(), -1); conn->cpool->worker_terminate(conn); } }); @@ -426,6 +428,7 @@ class ConnPool { ConnPool(const EventContext &ec, const Config &config): ec(ec), enable_tls(config._enable_tls), + async_id(0), max_listen_backlog(config._max_listen_backlog), conn_server_timeout(config._conn_server_timeout), seg_buff_size(config._seg_buff_size), @@ -459,8 +462,8 @@ class ConnPool { disp_error_cb = [this](const std::exception_ptr err) { user_tcall->async_call([this, err](ThreadCall::Handle &) { stop_workers(); - std::rethrow_exception(err); - //if (error_cb) error_cb(err, true); + //std::rethrow_exception(err); + if (error_cb) error_cb(err, true, -1); }); disp_ec.stop(); workers[0].stop_tcall(); @@ -528,36 +531,36 @@ class ConnPool { } /** Actively connect to remote addr. */ - conn_t connect(const NetAddr &addr, bool blocking = true) { - if (blocking) - { - auto ret = *(static_cast *>( - disp_tcall->call([this, addr](ThreadCall::Handle &h) { - conn_t conn; - std::exception_ptr err = nullptr; - try { - conn = _connect(addr); - } catch (...) { - err = std::current_exception(); - } - h.set_result(std::make_pair(std::move(conn), err)); - }).get())); - if (ret.second) std::rethrow_exception(ret.second); - return std::move(ret.first); - } - else - { - disp_tcall->async_call([this, addr](ThreadCall::Handle &) { - try { - _connect(addr); - } catch (...) { - disp_error_cb(std::current_exception()); - } - }); - return nullptr; - } + conn_t connect_sync(const NetAddr &addr) { + auto ret = *(static_cast *>( + disp_tcall->call([this, addr](ThreadCall::Handle &h) { + conn_t conn; + std::exception_ptr err = nullptr; + try { + conn = _connect(addr); + } catch (...) { + err = std::current_exception(); + } + h.set_result(std::make_pair(std::move(conn), err)); + }).get())); + if (ret.second) std::rethrow_exception(ret.second); + return std::move(ret.first); } + /** Actively connect to remote addr (async). */ + int32_t connect(const NetAddr &addr) { + auto id = gen_async_id(); + disp_tcall->async_call([this, addr, id](ThreadCall::Handle &) { + try { + _connect(addr); + } catch (...) { + this->recoverable_error(std::current_exception(), id); + } + }); + return id; + } + + /** Listen for passive connections (connection initiated from remote). * Does not need to be called if do not want to accept any passive * connections. */ -- cgit v1.2.3