From e27e529e589ef89fbe010ebf7c5635ec2873f64f Mon Sep 17 00:00:00 2001 From: Determinant Date: Wed, 12 Jun 2019 19:14:40 -0400 Subject: WIP: error handling --- include/salticidae/conn.h | 171 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 118 insertions(+), 53 deletions(-) (limited to 'include/salticidae/conn.h') diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index cb637cb..42e87aa 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -50,10 +50,6 @@ namespace salticidae { -struct ConnPoolError: public SalticidaeError { - using SalticidaeError::SalticidaeError; -}; - /** Abstraction for connection management. */ class ConnPool { class Worker; @@ -63,6 +59,7 @@ class ConnPool { using conn_t = ArcObj; /** The type of callback invoked when connection status is changed. */ using conn_callback_t = std::function; + using error_callback_t = std::function; /** Abstraction for a bi-directional connection. */ class Conn { friend ConnPool; @@ -151,6 +148,10 @@ class ConnPool { ThreadCall* disp_tcall; /** Should be implemented by derived class to return a new Conn object. */ virtual Conn *create_conn() = 0; + using _error_callback_t = std::function; + _error_callback_t disp_error_cb; + _error_callback_t worker_error_cb; + private: const int max_listen_backlog; @@ -161,6 +162,7 @@ class ConnPool { /* owned by user loop */ BoxObj user_tcall; conn_callback_t conn_cb; + error_callback_t error_cb; /* owned by the dispatcher */ FdEvent ev_listen; @@ -174,15 +176,27 @@ class ConnPool { } class Worker { + public: + + private: EventContext ec; ThreadCall tcall; std::thread handle; bool disp_flag; std::atomic nconn; + ConnPool::_error_callback_t on_fatal_error; public: Worker(): tcall(ec), disp_flag(false), nconn(0) {} + void set_error_callback(ConnPool::_error_callback_t _on_error) { + on_fatal_error = std::move(_on_error); + } + + void error_callback(const std::exception_ptr err) const { + on_fatal_error(err); + } + /* the following functions are called by the dispatcher */ void start() { handle = std::thread([this]() { ec.dispatch(); }); @@ -191,36 +205,40 @@ class ConnPool { void feed(const conn_t &conn, int client_fd) { /* the caller should finalize all the preparation */ tcall.async_call([this, conn, client_fd](ThreadCall::Handle &) { - if (conn->mode == Conn::ConnMode::DEAD) - { - SALTICIDAE_LOG_INFO("worker %x discarding dead connection", - std::this_thread::get_id()); - return; - } - assert(conn->fd != -1); - SALTICIDAE_LOG_INFO("worker %x got %s", - std::this_thread::get_id(), - std::string(*conn).c_str()); - conn->get_send_buffer() - .get_queue() - .reg_handler(this->ec, [conn, client_fd] - (MPSCWriteBuffer::queue_t &) { - if (conn->ready_send) + try { + if (conn->mode == Conn::ConnMode::DEAD) { - conn->ev_socket.del(); - conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE); - conn->send_data(client_fd, FdEvent::WRITE); + SALTICIDAE_LOG_INFO("worker %x discarding dead connection", + std::this_thread::get_id()); + return; } - return false; - }); - conn->ev_socket = FdEvent(ec, client_fd, [conn=conn](int fd, int what) { - if (what & FdEvent::READ) - conn->recv_data(fd, what); - else - conn->send_data(fd, what); - }); - conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE); - nconn++; + assert(conn->fd != -1); + SALTICIDAE_LOG_INFO("worker %x got %s", + std::this_thread::get_id(), + std::string(*conn).c_str()); + conn->get_send_buffer() + .get_queue() + .reg_handler(this->ec, [conn, client_fd] + (MPSCWriteBuffer::queue_t &) { + if (conn->ready_send) + { + conn->ev_socket.del(); + conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE); + conn->send_data(client_fd, FdEvent::WRITE); + } + return false; + }); + conn->ev_socket = FdEvent(ec, client_fd, [this, conn=conn](int fd, int what) { + try { + if (what & FdEvent::READ) + conn->recv_data(fd, what); + else + conn->send_data(fd, what); + } catch (...) { on_fatal_error(std::current_exception()); } + }); + conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE); + nconn++; + } catch (...) { on_fatal_error(std::current_exception()); } }); } @@ -253,18 +271,6 @@ class ConnPool { private: - //class DspMulticast: public DispatchCmd { - // std::vector receivers; - // bytearray_t data; - // public: - // DspMulticast(std::vector &&receivers, bytearray_t &&data): - // receivers(std::move(receivers)), - // data(std::move(data)) {} - // void exec(ConnPool *) override { - // for (auto &r: receivers) r->write(bytearray_t(data)); - // } - //}; - Worker &select_worker() { size_t idx = 0; size_t best = workers[idx].get_nconn(); @@ -280,6 +286,15 @@ class ConnPool { return workers[idx]; } + void on_fatal_error(const std::exception &error) { + stop_workers(); + if (error_cb) error_cb(error, true); + } + + void on_recoverable_error(const std::exception &error) { + if (error_cb) error_cb(error, false); + } + public: class Config { @@ -338,6 +353,30 @@ class ConnPool { disp_ec = workers[0].get_ec(); disp_tcall = workers[0].get_tcall(); workers[0].set_dispatcher(); + disp_error_cb = [this](const std::exception_ptr _err) { + user_tcall->async_call([this, _err](ThreadCall::Handle &) { + try { + std::rethrow_exception(_err); + } catch (const std::exception &err) { + on_fatal_error(err); + } + }); + }; + + worker_error_cb = [this](const std::exception_ptr err) { + disp_tcall->async_call([this, err](ThreadCall::Handle &) { + // forward to the dispatcher + disp_error_cb(err); + }); + }; + for (size_t i = 0; i < nworker; i++) + { + auto &worker = workers[i]; + if (worker.is_dispatcher()) + worker.set_error_callback(disp_error_cb); + else + worker.set_error_callback(worker_error_cb); + } } ~ConnPool() { stop(); } @@ -388,17 +427,28 @@ class ConnPool { conn_t connect(const NetAddr &addr, bool blocking = true) { if (blocking) { - auto ret = *(static_cast(disp_tcall->call( + auto ret = *(static_cast *>(disp_tcall->call( [this, addr](ThreadCall::Handle &h) { - auto conn = _connect(addr); - h.set_result(std::move(conn)); + conn_t conn; + std::exception_ptr err = nullptr; + try { + conn = _connect(addr); + } catch (...) { + err = std::current_exception(); + } + h.set_result(std::make_pair(std::move(conn), err)); }).get())); - return std::move(ret); + if (ret.second) std::rethrow_exception(ret.second); + return std::move(ret.first); } else { disp_tcall->async_call([this, addr](ThreadCall::Handle &) { - _connect(addr); + try { + _connect(addr); + } catch (...) { + disp_error_cb(std::current_exception()); + } }); return nullptr; } @@ -408,17 +458,32 @@ class ConnPool { * Does not need to be called if do not want to accept any passive * connections. */ void listen(NetAddr listen_addr) { - disp_tcall->call([this, listen_addr](ThreadCall::Handle &) { - _listen(listen_addr); - }); + auto ret = *(static_cast( + disp_tcall->call([this, listen_addr](ThreadCall::Handle &h) { + std::exception_ptr err = nullptr; + try { + _listen(listen_addr); + } catch (...) { + err = std::current_exception(); + } + h.set_result(err); + }).get())); + if (ret) std::rethrow_exception(ret); } template void reg_conn_handler(Func cb) { conn_cb = cb; } + template + void reg_error_handler(Func cb) { error_cb = cb; } + void terminate(const conn_t &conn) { disp_tcall->async_call([this, conn](ThreadCall::Handle &) { - conn->disp_terminate(); + try { + conn->disp_terminate(); + } catch (...) { + disp_error_cb(std::current_exception()); + } }); } }; -- cgit v1.2.3