aboutsummaryrefslogtreecommitdiff
path: root/include/salticidae/conn.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/salticidae/conn.h')
-rw-r--r--include/salticidae/conn.h171
1 files changed, 118 insertions, 53 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index cb637cb..42e87aa 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -50,10 +50,6 @@
namespace salticidae {
-struct ConnPoolError: public SalticidaeError {
- using SalticidaeError::SalticidaeError;
-};
-
/** Abstraction for connection management. */
class ConnPool {
class Worker;
@@ -63,6 +59,7 @@ class ConnPool {
using conn_t = ArcObj<Conn>;
/** The type of callback invoked when connection status is changed. */
using conn_callback_t = std::function<void(const conn_t &, bool)>;
+ using error_callback_t = std::function<void(const std::exception &, bool)>;
/** Abstraction for a bi-directional connection. */
class Conn {
friend ConnPool;
@@ -151,6 +148,10 @@ class ConnPool {
ThreadCall* disp_tcall;
/** Should be implemented by derived class to return a new Conn object. */
virtual Conn *create_conn() = 0;
+ using _error_callback_t = std::function<void(const std::exception_ptr err)>;
+ _error_callback_t disp_error_cb;
+ _error_callback_t worker_error_cb;
+
private:
const int max_listen_backlog;
@@ -161,6 +162,7 @@ class ConnPool {
/* owned by user loop */
BoxObj<ThreadCall> user_tcall;
conn_callback_t conn_cb;
+ error_callback_t error_cb;
/* owned by the dispatcher */
FdEvent ev_listen;
@@ -174,15 +176,27 @@ class ConnPool {
}
class Worker {
+ public:
+
+ private:
EventContext ec;
ThreadCall tcall;
std::thread handle;
bool disp_flag;
std::atomic<size_t> nconn;
+ ConnPool::_error_callback_t on_fatal_error;
public:
Worker(): tcall(ec), disp_flag(false), nconn(0) {}
+ void set_error_callback(ConnPool::_error_callback_t _on_error) {
+ on_fatal_error = std::move(_on_error);
+ }
+
+ void error_callback(const std::exception_ptr err) const {
+ on_fatal_error(err);
+ }
+
/* the following functions are called by the dispatcher */
void start() {
handle = std::thread([this]() { ec.dispatch(); });
@@ -191,36 +205,40 @@ class ConnPool {
void feed(const conn_t &conn, int client_fd) {
/* the caller should finalize all the preparation */
tcall.async_call([this, conn, client_fd](ThreadCall::Handle &) {
- if (conn->mode == Conn::ConnMode::DEAD)
- {
- SALTICIDAE_LOG_INFO("worker %x discarding dead connection",
- std::this_thread::get_id());
- return;
- }
- assert(conn->fd != -1);
- SALTICIDAE_LOG_INFO("worker %x got %s",
- std::this_thread::get_id(),
- std::string(*conn).c_str());
- conn->get_send_buffer()
- .get_queue()
- .reg_handler(this->ec, [conn, client_fd]
- (MPSCWriteBuffer::queue_t &) {
- if (conn->ready_send)
+ try {
+ if (conn->mode == Conn::ConnMode::DEAD)
{
- conn->ev_socket.del();
- conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE);
- conn->send_data(client_fd, FdEvent::WRITE);
+ SALTICIDAE_LOG_INFO("worker %x discarding dead connection",
+ std::this_thread::get_id());
+ return;
}
- return false;
- });
- conn->ev_socket = FdEvent(ec, client_fd, [conn=conn](int fd, int what) {
- if (what & FdEvent::READ)
- conn->recv_data(fd, what);
- else
- conn->send_data(fd, what);
- });
- conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE);
- nconn++;
+ assert(conn->fd != -1);
+ SALTICIDAE_LOG_INFO("worker %x got %s",
+ std::this_thread::get_id(),
+ std::string(*conn).c_str());
+ conn->get_send_buffer()
+ .get_queue()
+ .reg_handler(this->ec, [conn, client_fd]
+ (MPSCWriteBuffer::queue_t &) {
+ if (conn->ready_send)
+ {
+ conn->ev_socket.del();
+ conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE);
+ conn->send_data(client_fd, FdEvent::WRITE);
+ }
+ return false;
+ });
+ conn->ev_socket = FdEvent(ec, client_fd, [this, conn=conn](int fd, int what) {
+ try {
+ if (what & FdEvent::READ)
+ conn->recv_data(fd, what);
+ else
+ conn->send_data(fd, what);
+ } catch (...) { on_fatal_error(std::current_exception()); }
+ });
+ conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE);
+ nconn++;
+ } catch (...) { on_fatal_error(std::current_exception()); }
});
}
@@ -253,18 +271,6 @@ class ConnPool {
private:
- //class DspMulticast: public DispatchCmd {
- // std::vector<conn_t> receivers;
- // bytearray_t data;
- // public:
- // DspMulticast(std::vector<conn_t> &&receivers, bytearray_t &&data):
- // receivers(std::move(receivers)),
- // data(std::move(data)) {}
- // void exec(ConnPool *) override {
- // for (auto &r: receivers) r->write(bytearray_t(data));
- // }
- //};
-
Worker &select_worker() {
size_t idx = 0;
size_t best = workers[idx].get_nconn();
@@ -280,6 +286,15 @@ class ConnPool {
return workers[idx];
}
+ void on_fatal_error(const std::exception &error) {
+ stop_workers();
+ if (error_cb) error_cb(error, true);
+ }
+
+ void on_recoverable_error(const std::exception &error) {
+ if (error_cb) error_cb(error, false);
+ }
+
public:
class Config {
@@ -338,6 +353,30 @@ class ConnPool {
disp_ec = workers[0].get_ec();
disp_tcall = workers[0].get_tcall();
workers[0].set_dispatcher();
+ disp_error_cb = [this](const std::exception_ptr _err) {
+ user_tcall->async_call([this, _err](ThreadCall::Handle &) {
+ try {
+ std::rethrow_exception(_err);
+ } catch (const std::exception &err) {
+ on_fatal_error(err);
+ }
+ });
+ };
+
+ worker_error_cb = [this](const std::exception_ptr err) {
+ disp_tcall->async_call([this, err](ThreadCall::Handle &) {
+ // forward to the dispatcher
+ disp_error_cb(err);
+ });
+ };
+ for (size_t i = 0; i < nworker; i++)
+ {
+ auto &worker = workers[i];
+ if (worker.is_dispatcher())
+ worker.set_error_callback(disp_error_cb);
+ else
+ worker.set_error_callback(worker_error_cb);
+ }
}
~ConnPool() { stop(); }
@@ -388,17 +427,28 @@ class ConnPool {
conn_t connect(const NetAddr &addr, bool blocking = true) {
if (blocking)
{
- auto ret = *(static_cast<conn_t *>(disp_tcall->call(
+ auto ret = *(static_cast<std::pair<conn_t, std::exception_ptr> *>(disp_tcall->call(
[this, addr](ThreadCall::Handle &h) {
- auto conn = _connect(addr);
- h.set_result(std::move(conn));
+ conn_t conn;
+ std::exception_ptr err = nullptr;
+ try {
+ conn = _connect(addr);
+ } catch (...) {
+ err = std::current_exception();
+ }
+ h.set_result(std::make_pair(std::move(conn), err));
}).get()));
- return std::move(ret);
+ if (ret.second) std::rethrow_exception(ret.second);
+ return std::move(ret.first);
}
else
{
disp_tcall->async_call([this, addr](ThreadCall::Handle &) {
- _connect(addr);
+ try {
+ _connect(addr);
+ } catch (...) {
+ disp_error_cb(std::current_exception());
+ }
});
return nullptr;
}
@@ -408,17 +458,32 @@ class ConnPool {
* Does not need to be called if do not want to accept any passive
* connections. */
void listen(NetAddr listen_addr) {
- disp_tcall->call([this, listen_addr](ThreadCall::Handle &) {
- _listen(listen_addr);
- });
+ auto ret = *(static_cast<std::exception_ptr *>(
+ disp_tcall->call([this, listen_addr](ThreadCall::Handle &h) {
+ std::exception_ptr err = nullptr;
+ try {
+ _listen(listen_addr);
+ } catch (...) {
+ err = std::current_exception();
+ }
+ h.set_result(err);
+ }).get()));
+ if (ret) std::rethrow_exception(ret);
}
template<typename Func>
void reg_conn_handler(Func cb) { conn_cb = cb; }
+ template<typename Func>
+ void reg_error_handler(Func cb) { error_cb = cb; }
+
void terminate(const conn_t &conn) {
disp_tcall->async_call([this, conn](ThreadCall::Handle &) {
- conn->disp_terminate();
+ try {
+ conn->disp_terminate();
+ } catch (...) {
+ disp_error_cb(std::current_exception());
+ }
});
}
};