aboutsummaryrefslogtreecommitdiff
path: root/include/salticidae/conn.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/salticidae/conn.h')
-rw-r--r--include/salticidae/conn.h73
1 files changed, 38 insertions, 35 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index 7f74a87..0058274 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -60,7 +60,7 @@ class ConnPool {
/** The type of callback invoked when connection status is changed. */
using conn_callback_t = std::function<bool(const conn_t &, bool)>;
/** The type of callback invoked when an error occured (during async execution). */
- using error_callback_t = std::function<void(const std::exception_ptr, bool)>;
+ using error_callback_t = std::function<void(const std::exception_ptr, bool, int32_t)>;
/** Abstraction for a bi-directional connection. */
class Conn {
friend ConnPool;
@@ -149,12 +149,14 @@ class ConnPool {
using worker_error_callback_t = std::function<void(const std::exception_ptr err)>;
worker_error_callback_t disp_error_cb;
worker_error_callback_t worker_error_cb;
+ std::atomic<uint16_t> async_id;
+ int32_t gen_async_id() { return async_id.fetch_add(1, std::memory_order_relaxed); }
conn_t _connect(const NetAddr &addr);
void _listen(NetAddr listen_addr);
- void recoverable_error(const std::exception_ptr err) const {
- user_tcall->async_call([this, err](ThreadCall::Handle &) {
- if (error_cb) error_cb(err, false);
+ void recoverable_error(const std::exception_ptr err, int32_t id) const {
+ user_tcall->async_call([this, err, id](ThreadCall::Handle &) {
+ if (error_cb) error_cb(err, false, id);
});
}
@@ -279,7 +281,7 @@ class ConnPool {
else
conn->send_data_func(conn, fd, what);
} catch (...) {
- conn->cpool->recoverable_error(std::current_exception());
+ conn->cpool->recoverable_error(std::current_exception(), -1);
conn->cpool->worker_terminate(conn);
}
});
@@ -426,6 +428,7 @@ class ConnPool {
ConnPool(const EventContext &ec, const Config &config):
ec(ec),
enable_tls(config._enable_tls),
+ async_id(0),
max_listen_backlog(config._max_listen_backlog),
conn_server_timeout(config._conn_server_timeout),
seg_buff_size(config._seg_buff_size),
@@ -459,8 +462,8 @@ class ConnPool {
disp_error_cb = [this](const std::exception_ptr err) {
user_tcall->async_call([this, err](ThreadCall::Handle &) {
stop_workers();
- std::rethrow_exception(err);
- //if (error_cb) error_cb(err, true);
+ //std::rethrow_exception(err);
+ if (error_cb) error_cb(err, true, -1);
});
disp_ec.stop();
workers[0].stop_tcall();
@@ -528,36 +531,36 @@ class ConnPool {
}
/** Actively connect to remote addr. */
- conn_t connect(const NetAddr &addr, bool blocking = true) {
- if (blocking)
- {
- auto ret = *(static_cast<std::pair<conn_t, std::exception_ptr> *>(
- disp_tcall->call([this, addr](ThreadCall::Handle &h) {
- conn_t conn;
- std::exception_ptr err = nullptr;
- try {
- conn = _connect(addr);
- } catch (...) {
- err = std::current_exception();
- }
- h.set_result(std::make_pair(std::move(conn), err));
- }).get()));
- if (ret.second) std::rethrow_exception(ret.second);
- return std::move(ret.first);
- }
- else
- {
- disp_tcall->async_call([this, addr](ThreadCall::Handle &) {
- try {
- _connect(addr);
- } catch (...) {
- disp_error_cb(std::current_exception());
- }
- });
- return nullptr;
- }
+ conn_t connect_sync(const NetAddr &addr) {
+ auto ret = *(static_cast<std::pair<conn_t, std::exception_ptr> *>(
+ disp_tcall->call([this, addr](ThreadCall::Handle &h) {
+ conn_t conn;
+ std::exception_ptr err = nullptr;
+ try {
+ conn = _connect(addr);
+ } catch (...) {
+ err = std::current_exception();
+ }
+ h.set_result(std::make_pair(std::move(conn), err));
+ }).get()));
+ if (ret.second) std::rethrow_exception(ret.second);
+ return std::move(ret.first);
}
+ /** Actively connect to remote addr (async). */
+ int32_t connect(const NetAddr &addr) {
+ auto id = gen_async_id();
+ disp_tcall->async_call([this, addr, id](ThreadCall::Handle &) {
+ try {
+ _connect(addr);
+ } catch (...) {
+ this->recoverable_error(std::current_exception(), id);
+ }
+ });
+ return id;
+ }
+
+
/** Listen for passive connections (connection initiated from remote).
* Does not need to be called if do not want to accept any passive
* connections. */