diff options
author | Determinant <[email protected]> | 2019-06-12 19:14:40 -0400 |
---|---|---|
committer | Determinant <[email protected]> | 2019-06-12 19:14:40 -0400 |
commit | e27e529e589ef89fbe010ebf7c5635ec2873f64f (patch) | |
tree | 76e7d20589d11f3aabd255ca70201f1288d1d1bc | |
parent | 9f6460c7ab774d900f391345bbf3fac8617a3aa3 (diff) |
WIP: error handling
-rw-r--r-- | include/salticidae/conn.h | 171 | ||||
-rw-r--r-- | include/salticidae/event.h | 15 | ||||
-rw-r--r-- | include/salticidae/netaddr.h | 10 | ||||
-rw-r--r-- | include/salticidae/network.h | 195 | ||||
-rw-r--r-- | include/salticidae/queue.h | 1 | ||||
-rw-r--r-- | include/salticidae/util.h | 48 | ||||
-rw-r--r-- | src/conn.cpp | 114 | ||||
-rw-r--r-- | src/network.cpp | 9 | ||||
-rw-r--r-- | src/util.cpp | 26 | ||||
-rw-r--r-- | test/test_msgnet_c.c | 42 | ||||
-rw-r--r-- | test/test_p2p_stress.cpp | 5 |
11 files changed, 419 insertions, 217 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index cb637cb..42e87aa 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -50,10 +50,6 @@ namespace salticidae { -struct ConnPoolError: public SalticidaeError { - using SalticidaeError::SalticidaeError; -}; - /** Abstraction for connection management. */ class ConnPool { class Worker; @@ -63,6 +59,7 @@ class ConnPool { using conn_t = ArcObj<Conn>; /** The type of callback invoked when connection status is changed. */ using conn_callback_t = std::function<void(const conn_t &, bool)>; + using error_callback_t = std::function<void(const std::exception &, bool)>; /** Abstraction for a bi-directional connection. */ class Conn { friend ConnPool; @@ -151,6 +148,10 @@ class ConnPool { ThreadCall* disp_tcall; /** Should be implemented by derived class to return a new Conn object. */ virtual Conn *create_conn() = 0; + using _error_callback_t = std::function<void(const std::exception_ptr err)>; + _error_callback_t disp_error_cb; + _error_callback_t worker_error_cb; + private: const int max_listen_backlog; @@ -161,6 +162,7 @@ class ConnPool { /* owned by user loop */ BoxObj<ThreadCall> user_tcall; conn_callback_t conn_cb; + error_callback_t error_cb; /* owned by the dispatcher */ FdEvent ev_listen; @@ -174,15 +176,27 @@ class ConnPool { } class Worker { + public: + + private: EventContext ec; ThreadCall tcall; std::thread handle; bool disp_flag; std::atomic<size_t> nconn; + ConnPool::_error_callback_t on_fatal_error; public: Worker(): tcall(ec), disp_flag(false), nconn(0) {} + void set_error_callback(ConnPool::_error_callback_t _on_error) { + on_fatal_error = std::move(_on_error); + } + + void error_callback(const std::exception_ptr err) const { + on_fatal_error(err); + } + /* the following functions are called by the dispatcher */ void start() { handle = std::thread([this]() { ec.dispatch(); }); @@ -191,36 +205,40 @@ class ConnPool { void feed(const conn_t &conn, int client_fd) { /* the caller should finalize all the preparation */ tcall.async_call([this, conn, client_fd](ThreadCall::Handle &) { - if (conn->mode == Conn::ConnMode::DEAD) - { - SALTICIDAE_LOG_INFO("worker %x discarding dead connection", - std::this_thread::get_id()); - return; - } - assert(conn->fd != -1); - SALTICIDAE_LOG_INFO("worker %x got %s", - std::this_thread::get_id(), - std::string(*conn).c_str()); - conn->get_send_buffer() - .get_queue() - .reg_handler(this->ec, [conn, client_fd] - (MPSCWriteBuffer::queue_t &) { - if (conn->ready_send) + try { + if (conn->mode == Conn::ConnMode::DEAD) { - conn->ev_socket.del(); - conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE); - conn->send_data(client_fd, FdEvent::WRITE); + SALTICIDAE_LOG_INFO("worker %x discarding dead connection", + std::this_thread::get_id()); + return; } - return false; - }); - conn->ev_socket = FdEvent(ec, client_fd, [conn=conn](int fd, int what) { - if (what & FdEvent::READ) - conn->recv_data(fd, what); - else - conn->send_data(fd, what); - }); - conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE); - nconn++; + assert(conn->fd != -1); + SALTICIDAE_LOG_INFO("worker %x got %s", + std::this_thread::get_id(), + std::string(*conn).c_str()); + conn->get_send_buffer() + .get_queue() + .reg_handler(this->ec, [conn, client_fd] + (MPSCWriteBuffer::queue_t &) { + if (conn->ready_send) + { + conn->ev_socket.del(); + conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE); + conn->send_data(client_fd, FdEvent::WRITE); + } + return false; + }); + conn->ev_socket = FdEvent(ec, client_fd, [this, conn=conn](int fd, int what) { + try { + if (what & FdEvent::READ) + conn->recv_data(fd, what); + else + conn->send_data(fd, what); + } catch (...) { on_fatal_error(std::current_exception()); } + }); + conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE); + nconn++; + } catch (...) { on_fatal_error(std::current_exception()); } }); } @@ -253,18 +271,6 @@ class ConnPool { private: - //class DspMulticast: public DispatchCmd { - // std::vector<conn_t> receivers; - // bytearray_t data; - // public: - // DspMulticast(std::vector<conn_t> &&receivers, bytearray_t &&data): - // receivers(std::move(receivers)), - // data(std::move(data)) {} - // void exec(ConnPool *) override { - // for (auto &r: receivers) r->write(bytearray_t(data)); - // } - //}; - Worker &select_worker() { size_t idx = 0; size_t best = workers[idx].get_nconn(); @@ -280,6 +286,15 @@ class ConnPool { return workers[idx]; } + void on_fatal_error(const std::exception &error) { + stop_workers(); + if (error_cb) error_cb(error, true); + } + + void on_recoverable_error(const std::exception &error) { + if (error_cb) error_cb(error, false); + } + public: class Config { @@ -338,6 +353,30 @@ class ConnPool { disp_ec = workers[0].get_ec(); disp_tcall = workers[0].get_tcall(); workers[0].set_dispatcher(); + disp_error_cb = [this](const std::exception_ptr _err) { + user_tcall->async_call([this, _err](ThreadCall::Handle &) { + try { + std::rethrow_exception(_err); + } catch (const std::exception &err) { + on_fatal_error(err); + } + }); + }; + + worker_error_cb = [this](const std::exception_ptr err) { + disp_tcall->async_call([this, err](ThreadCall::Handle &) { + // forward to the dispatcher + disp_error_cb(err); + }); + }; + for (size_t i = 0; i < nworker; i++) + { + auto &worker = workers[i]; + if (worker.is_dispatcher()) + worker.set_error_callback(disp_error_cb); + else + worker.set_error_callback(worker_error_cb); + } } ~ConnPool() { stop(); } @@ -388,17 +427,28 @@ class ConnPool { conn_t connect(const NetAddr &addr, bool blocking = true) { if (blocking) { - auto ret = *(static_cast<conn_t *>(disp_tcall->call( + auto ret = *(static_cast<std::pair<conn_t, std::exception_ptr> *>(disp_tcall->call( [this, addr](ThreadCall::Handle &h) { - auto conn = _connect(addr); - h.set_result(std::move(conn)); + conn_t conn; + std::exception_ptr err = nullptr; + try { + conn = _connect(addr); + } catch (...) { + err = std::current_exception(); + } + h.set_result(std::make_pair(std::move(conn), err)); }).get())); - return std::move(ret); + if (ret.second) std::rethrow_exception(ret.second); + return std::move(ret.first); } else { disp_tcall->async_call([this, addr](ThreadCall::Handle &) { - _connect(addr); + try { + _connect(addr); + } catch (...) { + disp_error_cb(std::current_exception()); + } }); return nullptr; } @@ -408,17 +458,32 @@ class ConnPool { * Does not need to be called if do not want to accept any passive * connections. */ void listen(NetAddr listen_addr) { - disp_tcall->call([this, listen_addr](ThreadCall::Handle &) { - _listen(listen_addr); - }); + auto ret = *(static_cast<std::exception_ptr *>( + disp_tcall->call([this, listen_addr](ThreadCall::Handle &h) { + std::exception_ptr err = nullptr; + try { + _listen(listen_addr); + } catch (...) { + err = std::current_exception(); + } + h.set_result(err); + }).get())); + if (ret) std::rethrow_exception(ret); } template<typename Func> void reg_conn_handler(Func cb) { conn_cb = cb; } + template<typename Func> + void reg_error_handler(Func cb) { error_cb = cb; } + void terminate(const conn_t &conn) { disp_tcall->async_call([this, conn](ThreadCall::Handle &) { - conn->disp_terminate(); + try { + conn->disp_terminate(); + } catch (...) { + disp_error_cb(std::current_exception()); + } }); } }; diff --git a/include/salticidae/event.h b/include/salticidae/event.h index 91637e4..5a315dd 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -467,8 +467,14 @@ class MPSCQueueEventDriven: public MPSCQueue<T> { // since all enqueue operations are finalized, the dequeue should be able // to see those enqueued values in func() wait_sig.exchange(true, std::memory_order_acq_rel); - if (func(*this)) + bool again; + try { + again = func(*this); + } catch (SalticidaeError &err) { write(fd, &dummy, 8); + throw err; + } + if (again) write(fd, &dummy, 8); }); ev.add(FdEvent::READ); } @@ -610,7 +616,12 @@ class ThreadCall { Handle *h; while (q.try_dequeue(h)) { - h->exec(); + try { + h->exec(); + } catch (SalticidaeError &err) { + delete h; + throw err; + } delete h; if (++cnt == burst_size) return true; } diff --git a/include/salticidae/netaddr.h b/include/salticidae/netaddr.h index 909e092..a25bf6f 100644 --- a/include/salticidae/netaddr.h +++ b/include/salticidae/netaddr.h @@ -53,7 +53,7 @@ struct NetAddr { void set_by_ip_port(const std::string &_addr, uint16_t _port) { struct hostent *h; if ((h = gethostbyname(_addr.c_str())) == nullptr) - throw SalticidaeError("gethostbyname failed"); + throw SalticidaeError(SALTI_ERROR_NETADDR_INVALID, errno); memmove(&ip, h->h_addr_list[0], sizeof(in_addr_t)); port = htons(_port); } @@ -61,19 +61,19 @@ struct NetAddr { NetAddr(const std::string &ip_port_addr) { size_t pos = ip_port_addr.find(":"); if (pos == std::string::npos) - throw SalticidaeError("invalid port format"); + throw SalticidaeError(SALTI_ERROR_NETADDR_INVALID); std::string ip_str = ip_port_addr.substr(0, pos); std::string port_str = ip_port_addr.substr(pos + 1); long port; try { port = std::stol(port_str.c_str()); } catch (std::logic_error &) { - throw SalticidaeError("invalid port format"); + throw SalticidaeError(SALTI_ERROR_NETADDR_INVALID); } if (port < 0) - throw SalticidaeError("negative port number"); + throw SalticidaeError(SALTI_ERROR_NETADDR_INVALID); if (port > 0xffff) - throw SalticidaeError("port number greater than 0xffff"); + throw SalticidaeError(SALTI_ERROR_NETADDR_INVALID); set_by_ip_port(ip_str, (uint16_t)port); } /* construct from unix socket format */ diff --git a/include/salticidae/network.h b/include/salticidae/network.h index e7e77f5..9c57749 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -199,8 +199,10 @@ class MsgNetwork: public ConnPool { } template<typename MsgType> - bool send_msg(MsgType &&msg, const conn_t &conn); - inline bool _send_msg(const Msg &msg, const conn_t &conn); + void send_msg(MsgType &&msg, const conn_t &conn); + inline void _send_msg(Msg &&msg, const conn_t &conn); + inline void _send_msg_dispatcher(const Msg &msg, const conn_t &conn); + using ConnPool::listen; conn_t connect(const NetAddr &addr) { return static_pointer_cast<Conn>(ConnPool::connect(addr)); @@ -248,10 +250,6 @@ class ClientNetwork: public MsgNetwork<OpcodeType> { void send_msg(MsgType &&msg, const NetAddr &addr); }; -class PeerNetworkError: public ConnPoolError { - using ConnPoolError::ConnPoolError; -}; - /** Peer-to-peer network where any two nodes could hold a bi-diretional message * channel, established by either side. */ template<typename OpcodeType = uint8_t, @@ -434,9 +432,9 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { template<typename MsgType> void send_msg(MsgType &&msg, const NetAddr &paddr); inline void _send_msg(Msg &&msg, const NetAddr &paddr); - template<typename MsgType> void multicast_msg(MsgType &&msg, const std::vector<NetAddr> &paddrs); + inline void _multicast_msg(Msg &&msg, const std::vector<NetAddr> &paddrs); void listen(NetAddr listen_addr); conn_t connect(const NetAddr &addr) = delete; @@ -480,13 +478,23 @@ void MsgNetwork<OpcodeType>::Conn::on_read() { template<typename OpcodeType> template<typename MsgType> -bool MsgNetwork<OpcodeType>::send_msg(MsgType &&_msg, const conn_t &conn) { - Msg msg(std::forward<MsgType>(_msg)); - return _send_msg(msg, conn); +void MsgNetwork<OpcodeType>::send_msg(MsgType &&msg, const conn_t &conn) { + return _send_msg(MsgType(std::move(msg)), conn); +} + +template<typename OpcodeType> +inline void MsgNetwork<OpcodeType>::_send_msg(Msg &&msg, const conn_t &conn) { + this->disp_tcall->async_call( + [this, msg=std::move(msg), conn](ThreadCall::Handle &) { + try { + this->_send_msg_dispatcher(msg, conn); + throw SalticidaeError("wow"); + } catch (...) { this->disp_error_cb(std::current_exception()); } + }); } template<typename OpcodeType> -inline bool MsgNetwork<OpcodeType>::_send_msg(const Msg &msg, const conn_t &conn) { +inline void MsgNetwork<OpcodeType>::_send_msg_dispatcher(const Msg &msg, const conn_t &conn) { bytearray_t msg_data = msg.serialize(); SALTICIDAE_LOG_DEBUG("wrote message %s to %s", std::string(msg).c_str(), @@ -495,17 +503,19 @@ inline bool MsgNetwork<OpcodeType>::_send_msg(const Msg &msg, const conn_t &conn conn->nsent++; conn->nsentb += msg.get_length(); #endif - return conn->write(std::move(msg_data)); + conn->write(std::move(msg_data)); } template<typename O, O _, O __> void PeerNetwork<O, _, __>::tcall_reset_timeout(ConnPool::Worker *worker, const conn_t &conn, double timeout) { - worker->get_tcall()->async_call([conn, t=timeout](ThreadCall::Handle &) { - if (!conn->ev_timeout) return; - conn->ev_timeout.del(); - conn->ev_timeout.add(t); - SALTICIDAE_LOG_DEBUG("reset connection timeout %.2f", t); + worker->get_tcall()->async_call([worker, conn, t=timeout](ThreadCall::Handle &) { + try { + if (!conn->ev_timeout) return; + conn->ev_timeout.del(); + conn->ev_timeout.add(t); + SALTICIDAE_LOG_DEBUG("reset connection timeout %.2f", t); + } catch (...) { worker->error_callback(std::current_exception()); } }); } @@ -517,9 +527,11 @@ void PeerNetwork<O, _, __>::Conn::on_setup() { auto conn = static_pointer_cast<Conn>(this->self()); auto worker = this->worker; assert(!ev_timeout); - ev_timeout = TimerEvent(worker->get_ec(), [conn](TimerEvent &) { - SALTICIDAE_LOG_INFO("peer ping-pong timeout"); - conn->worker_terminate(); + ev_timeout = TimerEvent(worker->get_ec(), [worker, conn](TimerEvent &) { + try { + SALTICIDAE_LOG_INFO("peer ping-pong timeout"); + conn->worker_terminate(); + } catch (...) { worker->error_callback(std::current_exception()); } }); /* the initial ping-pong to set up the connection */ tcall_reset_timeout(worker, conn, pn->conn_timeout); @@ -541,7 +553,9 @@ void PeerNetwork<O, _, __>::Conn::on_teardown() { // try to reconnect p->ev_retry_timer = TimerEvent(pn->disp_ec, [pn, peer_id = this->peer_id](TimerEvent &) { - pn->start_active_conn(peer_id); + try { + pn->start_active_conn(peer_id); + } catch (...) { pn->disp_error_cb(std::current_exception()); } }); p->ev_retry_timer.add(pn->gen_conn_timeout()); } @@ -647,11 +661,13 @@ template<typename O, O _, O __> void PeerNetwork<O, _, __>::msg_ping(MsgPing &&msg, const conn_t &conn) { uint16_t port = msg.port; this->disp_tcall->async_call([this, conn, port](ThreadCall::Handle &) { - if (conn->get_mode() == ConnPool::Conn::DEAD) return; - SALTICIDAE_LOG_INFO("ping from %s, port %u", - std::string(*conn).c_str(), ntohs(port)); - if (check_new_conn(conn, port)) return; - send_msg(MsgPong(this->listen_port), conn); + try { + if (conn->get_mode() == ConnPool::Conn::DEAD) return; + SALTICIDAE_LOG_INFO("ping from %s, port %u", + std::string(*conn).c_str(), ntohs(port)); + if (check_new_conn(conn, port)) return; + send_msg(MsgPong(this->listen_port), conn); + } catch (...) { this->disp_error_cb(std::current_exception()); } }); } @@ -659,54 +675,74 @@ template<typename O, O _, O __> void PeerNetwork<O, _, __>::msg_pong(MsgPong &&msg, const conn_t &conn) { uint16_t port = msg.port; this->disp_tcall->async_call([this, conn, port](ThreadCall::Handle &) { - if (conn->get_mode() == ConnPool::Conn::DEAD) return; - auto it = id2peer.find(conn->peer_id); - if (it == id2peer.end()) - { - SALTICIDAE_LOG_WARN("pong message discarded"); - return; - } - if (check_new_conn(conn, port)) return; - auto p = it->second.get(); - p->pong_msg_ok = true; - if (p->ping_timer_ok) - { - p->reset_ping_timer(); - p->send_ping(); - } + try { + if (conn->get_mode() == ConnPool::Conn::DEAD) return; + auto it = id2peer.find(conn->peer_id); + if (it == id2peer.end()) + { + SALTICIDAE_LOG_WARN("pong message discarded"); + return; + } + if (check_new_conn(conn, port)) return; + auto p = it->second.get(); + p->pong_msg_ok = true; + if (p->ping_timer_ok) + { + p->reset_ping_timer(); + p->send_ping(); + } + } catch (...) { this->disp_error_cb(std::current_exception()); } }); } template<typename O, O _, O __> void PeerNetwork<O, _, __>::listen(NetAddr listen_addr) { - this->disp_tcall->call([this, listen_addr](ThreadCall::Handle &) { - MsgNet::_listen(listen_addr); - listen_port = listen_addr.port; - }); + auto ret = *(static_cast<SalticidaeError *>( + this->disp_tcall->call([this, listen_addr](ThreadCall::Handle &h) { + SalticidaeError err; + try { + MsgNet::_listen(listen_addr); + listen_port = listen_addr.port; + } catch (SalticidaeError &e) { + err = e; + } + h.set_result(std::move(err)); + }).get())); + if (ret.get_code()) throw ret; } template<typename O, O _, O __> void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) { - this->disp_tcall->call([this, addr](ThreadCall::Handle &) { - auto it = id2peer.find(addr); - if (it != id2peer.end()) - throw PeerNetworkError("peer already exists"); - id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->disp_ec))); - start_active_conn(addr); + this->disp_tcall->async_call([this, addr](ThreadCall::Handle &) { + try { + auto it = id2peer.find(addr); + if (it != id2peer.end()) + throw PeerNetworkError(SALTI_ERROR_PEER_ALREADY_EXISTS); + id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->disp_ec))); + start_active_conn(addr); + } catch (...) { this->disp_error_cb(std::current_exception()); } }); } template<typename O, O _, O __> const typename PeerNetwork<O, _, __>::conn_t PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &paddr) const { - auto ret = *(static_cast<conn_t *>(this->disp_tcall->call( - [this, paddr](ThreadCall::Handle &h) { - auto it = id2peer.find(paddr); - if (it == id2peer.end()) - throw PeerNetworkError("peer does not exist"); - h.set_result(it->second->conn); + auto ret = *(static_cast<std::pair<conn_t, SalticidaeError> *>( + this->disp_tcall->call([this, paddr](ThreadCall::Handle &h) { + conn_t conn; + SalticidaeError err; + try { + auto it = id2peer.find(paddr); + if (it == id2peer.end()) + throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXISTS); + conn = it->second->conn; + } catch (SalticidaeError &e) { + err = e; + } + h.set_result(std::make_pair(std::move(conn), std::move(err))); }).get())); - return std::move(ret); + if (ret.second.get_code()) throw ret.second; + return std::move(ret.first); } template<typename O, O _, O __> @@ -720,32 +756,41 @@ bool PeerNetwork<O, _, __>::has_peer(const NetAddr &paddr) const { template<typename O, O _, O __> template<typename MsgType> void PeerNetwork<O, _, __>::send_msg(MsgType &&msg, const NetAddr &paddr) { - return _send_msg(MsgType(msg), paddr); + return _send_msg(MsgType(std::move(msg)), paddr); } template<typename O, O _, O __> void PeerNetwork<O, _, __>::_send_msg(Msg &&msg, const NetAddr &paddr) { this->disp_tcall->async_call( [this, msg=std::move(msg), paddr](ThreadCall::Handle &) { - auto it = id2peer.find(paddr); - if (it == id2peer.end()) - throw PeerNetworkError("peer does not exist"); - send_msg(std::move(msg), it->second->conn); + try { + auto it = id2peer.find(paddr); + if (it == id2peer.end()) + throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXISTS); + this->_send_msg_dispatcher(msg, it->second->conn); + } catch (...) { this->disp_error_cb(std::current_exception()); } }); } template<typename O, O _, O __> template<typename MsgType> void PeerNetwork<O, _, __>::multicast_msg(MsgType &&msg, const std::vector<NetAddr> &paddrs) { + return _multicast_msg(MsgType(std::move(msg)), paddrs); +} + +template<typename O, O _, O __> +void PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<NetAddr> &paddrs) { this->disp_tcall->async_call( [this, msg=std::move(msg), paddrs](ThreadCall::Handle &) { - for (auto &addr: paddrs) - { - auto it = id2peer.find(addr); - if (it == id2peer.end()) - throw PeerNetworkError("peer does not exist"); - send_msg(std::move(msg), it->second->conn); - } + try { + for (auto &addr: paddrs) + { + auto it = id2peer.find(addr); + if (it == id2peer.end()) + throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXISTS); + this->_send_msg_dispatcher(msg, it->second->conn); + } + } catch (...) { this->disp_error_cb(std::current_exception()); } }); } @@ -774,9 +819,11 @@ template<typename MsgType> void ClientNetwork<OpcodeType>::send_msg(MsgType &&msg, const NetAddr &addr) { this->disp_tcall->async_call( [this, addr, msg=std::forward<MsgType>(msg)](ThreadCall::Handle &) { - auto it = addr2conn.find(addr); - if (it != addr2conn.end()) - send_msg(std::move(msg), it->second); + try { + auto it = addr2conn.find(addr); + if (it != addr2conn.end()) + send_msg(std::move(msg), it->second); + } catch (...) { this->disp_error_cb(std::current_exception()); } }); } @@ -842,7 +889,7 @@ void msgnetwork_config_queue_capacity(msgnetwork_config_t *self, size_t cap); msgnetwork_t *msgnetwork_new(const eventcontext_t *ec, const msgnetwork_config_t *config); void msgnetwork_free(const msgnetwork_t *self); -bool msgnetwork_send_msg(msgnetwork_t *self, const msg_t *msg, const msgnetwork_conn_t *conn); +void msgnetwork_send_msg_by_move(msgnetwork_t *self, msg_t *_moved_msg, const msgnetwork_conn_t *conn); msgnetwork_conn_t *msgnetwork_connect(msgnetwork_t *self, const netaddr_t *addr); msgnetwork_conn_t *msgnetwork_conn_copy(const msgnetwork_conn_t *self); void msgnetwork_conn_free(const msgnetwork_conn_t *self); diff --git a/include/salticidae/queue.h b/include/salticidae/queue.h index da11e8a..9fd11f1 100644 --- a/include/salticidae/queue.h +++ b/include/salticidae/queue.h @@ -17,6 +17,7 @@ class FreeList { std::atomic<size_t> refcnt; std::atomic<bool> freed; Node(): next(nullptr), refcnt(1), freed(false) {} + virtual ~Node() {} }; private: diff --git a/include/salticidae/util.h b/include/salticidae/util.h index 19779b0..0ddf5be 100644 --- a/include/salticidae/util.h +++ b/include/salticidae/util.h @@ -30,6 +30,7 @@ #include <string> #include <exception> #include <cstdarg> +#include <cstring> #include <vector> #include <unordered_map> #include <functional> @@ -52,18 +53,57 @@ std::vector<std::string> trim_all(const std::vector<std::string> &ss); std::string vstringprintf(const char *fmt, va_list ap); std::string stringprintf(const char *fmt, ...); +enum SalticidaeErrorCode { + SALTI_NORMAL, + SALTI_ERROR_GENERIC, + SALTI_ERROR_ACCEPT, + SALTI_ERROR_LISTEN, + SALTI_ERROR_CONNECT, + SALTI_ERROR_PEER_ALREADY_EXISTS, + SALTI_ERROR_PEER_NOT_EXISTS, + SALTI_ERROR_NETADDR_INVALID, + SALTI_ERROR_OPTVAL_INVALID, + SALTI_ERROR_OPTNAME_ALREADY_EXISTS, + SALTI_ERROR_OPT_UNKNOWN_ACTION, + SALTI_ERROR_CONFIG_LINE_TOO_LONG, + SALTI_ERROR_OPT_INVALID +}; + +extern const char *SALTICIDAE_ERROR_STRINGS[]; + class SalticidaeError: public std::exception { std::string msg; + int code; + int oscode; + public: - SalticidaeError(); + SalticidaeError() : code(SALTI_NORMAL) {} template<typename... Args> - SalticidaeError(const std::string &fmt, Args... args) { + SalticidaeError(const std::string &fmt, Args... args): code(SALTI_ERROR_GENERIC) { msg = stringprintf(fmt.c_str(), args...); } + SalticidaeError(int code, int oscode = 0): code(code), oscode(oscode) { + if (oscode) + msg = stringprintf("%s: %s", SALTICIDAE_ERROR_STRINGS[code], strerror(oscode)); + else + msg = SALTICIDAE_ERROR_STRINGS[code]; + } + operator std::string() const { return msg; } const char *what() const throw() override { return msg.c_str(); } + int get_code() const { return code; } + int get_oscode() const { return oscode; } +}; + + +struct ConnPoolError: public SalticidaeError { + using SalticidaeError::SalticidaeError; +}; + +class PeerNetworkError: public ConnPoolError { + using ConnPoolError::ConnPoolError; }; extern const char *TTY_COLOR_RED; @@ -217,7 +257,7 @@ class Config { try { val = stoi(strval, &idx); } catch (std::invalid_argument &) { - throw SalticidaeError("invalid integer"); + throw SalticidaeError(SALTI_ERROR_OPTVAL_INVALID); } } int &get() { return val; } @@ -237,7 +277,7 @@ class Config { try { val = stod(strval, &idx); } catch (std::invalid_argument &) { - throw SalticidaeError("invalid double"); + throw SalticidaeError(SALTI_ERROR_OPTVAL_INVALID); } } double &get() { return val; } diff --git a/src/conn.cpp b/src/conn.cpp index 5fc59f3..7f485fd 100644 --- a/src/conn.cpp +++ b/src/conn.cpp @@ -170,34 +170,38 @@ void ConnPool::Conn::disp_terminate() { void ConnPool::accept_client(int fd, int) { int client_fd; struct sockaddr client_addr; - socklen_t addr_size = sizeof(struct sockaddr_in); - if ((client_fd = accept(fd, &client_addr, &addr_size)) < 0) - SALTICIDAE_LOG_ERROR("error while accepting the connection: %s", - strerror(errno)); - else - { - int one = 1; - if (setsockopt(client_fd, SOL_TCP, TCP_NODELAY, (const char *)&one, sizeof(one)) < 0) - throw ConnPoolError(std::string("setsockopt failed")); - if (fcntl(client_fd, F_SETFL, O_NONBLOCK) == -1) - throw ConnPoolError(std::string("unable to set nonblocking socket")); + try { + socklen_t addr_size = sizeof(struct sockaddr_in); + if ((client_fd = accept(fd, &client_addr, &addr_size)) < 0) + throw ConnPoolError(SALTI_ERROR_ACCEPT, errno); + else + { + int one = 1; + if (setsockopt(client_fd, SOL_TCP, TCP_NODELAY, (const char *)&one, sizeof(one)) < 0) + throw ConnPoolError(SALTI_ERROR_ACCEPT, errno); + if (fcntl(client_fd, F_SETFL, O_NONBLOCK) == -1) + throw ConnPoolError(SALTI_ERROR_ACCEPT, errno); - NetAddr addr((struct sockaddr_in *)&client_addr); - conn_t conn = create_conn(); - conn->self_ref = conn; - conn->send_buffer.set_capacity(queue_capacity); - conn->seg_buff_size = seg_buff_size; - conn->fd = client_fd; - conn->cpool = this; - conn->mode = Conn::PASSIVE; - conn->addr = addr; - add_conn(conn); - SALTICIDAE_LOG_INFO("accepted %s", std::string(*conn).c_str()); - auto &worker = select_worker(); - conn->worker = &worker; - conn->on_setup(); - update_conn(conn, true); - worker.feed(conn, client_fd); + NetAddr addr((struct sockaddr_in *)&client_addr); + conn_t conn = create_conn(); + conn->self_ref = conn; + conn->send_buffer.set_capacity(queue_capacity); + conn->seg_buff_size = seg_buff_size; + conn->fd = client_fd; + conn->cpool = this; + conn->mode = Conn::PASSIVE; + conn->addr = addr; + add_conn(conn); + SALTICIDAE_LOG_INFO("accepted %s", std::string(*conn).c_str()); + auto &worker = select_worker(); + conn->worker = &worker; + conn->on_setup(); + update_conn(conn, true); + worker.feed(conn, client_fd); + } + } catch (ConnPoolError &e) { + SALTICIDAE_LOG_ERROR("%s", e.what()); + throw e; } } @@ -229,24 +233,29 @@ void ConnPool::_listen(NetAddr listen_addr) { ev_listen.clear(); close(listen_fd); } - if ((listen_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) - throw ConnPoolError(std::string("cannot create socket for listening")); - if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, (const char *)&one, sizeof(one)) < 0 || - setsockopt(listen_fd, SOL_TCP, TCP_NODELAY, (const char *)&one, sizeof(one)) < 0) - throw ConnPoolError(std::string("setsockopt failed")); - if (fcntl(listen_fd, F_SETFL, O_NONBLOCK) == -1) - throw ConnPoolError(std::string("unable to set nonblocking socket")); + try { + if ((listen_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) + throw ConnPoolError(SALTI_ERROR_LISTEN, errno); + if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, (const char *)&one, sizeof(one)) < 0 || + setsockopt(listen_fd, SOL_TCP, TCP_NODELAY, (const char *)&one, sizeof(one)) < 0) + throw ConnPoolError(SALTI_ERROR_LISTEN, errno); + if (fcntl(listen_fd, F_SETFL, O_NONBLOCK) == -1) + throw ConnPoolError(SALTI_ERROR_LISTEN, errno); - struct sockaddr_in sockin; - memset(&sockin, 0, sizeof(struct sockaddr_in)); - sockin.sin_family = AF_INET; - sockin.sin_addr.s_addr = INADDR_ANY; - sockin.sin_port = listen_addr.port; + struct sockaddr_in sockin; + memset(&sockin, 0, sizeof(struct sockaddr_in)); + sockin.sin_family = AF_INET; + sockin.sin_addr.s_addr = INADDR_ANY; + sockin.sin_port = listen_addr.port; - if (bind(listen_fd, (struct sockaddr *)&sockin, sizeof(sockin)) < 0) - throw ConnPoolError(std::string("binding error")); - if (::listen(listen_fd, max_listen_backlog) < 0) - throw ConnPoolError(std::string("listen error")); + if (bind(listen_fd, (struct sockaddr *)&sockin, sizeof(sockin)) < 0) + throw ConnPoolError(SALTI_ERROR_LISTEN, errno); + if (::listen(listen_fd, max_listen_backlog) < 0) + throw ConnPoolError(SALTI_ERROR_LISTEN, errno); + } catch (ConnPoolError &e) { + SALTICIDAE_LOG_ERROR("%s", e.what()); + throw e; + } ev_listen = FdEvent(disp_ec, listen_fd, std::bind(&ConnPool::accept_client, this, _1, _2)); ev_listen.add(FdEvent::READ); @@ -256,13 +265,18 @@ void ConnPool::_listen(NetAddr listen_addr) { ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) { int fd; int one = 1; - if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) - throw ConnPoolError(std::string("cannot create socket for remote")); - if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (const char *)&one, sizeof(one)) < 0 || - setsockopt(fd, SOL_TCP, TCP_NODELAY, (const char *)&one, sizeof(one)) < 0) - throw ConnPoolError(std::string("setsockopt failed")); - if (fcntl(fd, F_SETFL, O_NONBLOCK) == -1) - throw ConnPoolError(std::string("unable to set nonblocking socket")); + try { + if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) + throw ConnPoolError(SALTI_ERROR_CONNECT, errno); + if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (const char *)&one, sizeof(one)) < 0 || + setsockopt(fd, SOL_TCP, TCP_NODELAY, (const char *)&one, sizeof(one)) < 0) + throw ConnPoolError(SALTI_ERROR_CONNECT, errno); + if (fcntl(fd, F_SETFL, O_NONBLOCK) == -1) + throw ConnPoolError(SALTI_ERROR_CONNECT, errno); + } catch (ConnPoolError &e) { + SALTICIDAE_LOG_ERROR("%s", e.what()); + throw e; + } conn_t conn = create_conn(); conn->self_ref = conn; conn->send_buffer.set_capacity(queue_capacity); diff --git a/src/network.cpp b/src/network.cpp index 49bad48..30ef0eb 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -44,9 +44,10 @@ void msgnetwork_config_queue_capacity(msgnetwork_config_t *self, size_t cap) { self->queue_capacity(cap); } -bool msgnetwork_send_msg(msgnetwork_t *self, - const msg_t *msg, const msgnetwork_conn_t *conn) { - return self->_send_msg(*msg, *conn); +void msgnetwork_send_msg_by_move(msgnetwork_t *self, + msg_t *_moved_msg, const msgnetwork_conn_t *conn) { + self->_send_msg(std::move(*_moved_msg), *conn); + delete _moved_msg; } msgnetwork_conn_t *msgnetwork_connect(msgnetwork_t *self, const netaddr_t *addr) { @@ -167,7 +168,7 @@ void peernetwork_send_msg_by_move(peernetwork_t *self, void peernetwork_multicast_msg_by_move(peernetwork_t *self, msg_t *_moved_msg, const netaddr_array_t *paddrs) { - self->multicast_msg(std::move(*_moved_msg), *paddrs); + self->_multicast_msg(std::move(*_moved_msg), *paddrs); delete _moved_msg; } diff --git a/src/util.cpp b/src/util.cpp index a0d5044..f762b4c 100644 --- a/src/util.cpp +++ b/src/util.cpp @@ -34,6 +34,22 @@ namespace salticidae { +const char *SALTICIDAE_ERROR_STRINGS[] = { + "", + "generic", + "unable to accept", + "unable to listen", + "unable to connect", + "peer already exists", + "peer does not exist", + "invalid NetAddr format", + "invalid OptVal format", + "option name already exists", + "unknown action", + "configuration file line too long", + "invalid option format" +}; + const char *TTY_COLOR_RED = "\x1b[31m"; const char *TTY_COLOR_GREEN = "\x1b[32m"; const char *TTY_COLOR_YELLOW = "\x1b[33m"; @@ -91,8 +107,6 @@ const std::string get_current_datetime() { return std::string(buf); } -SalticidaeError::SalticidaeError() : msg("unknown") {} - void Logger::set_color() { if (is_tty()) { @@ -182,7 +196,7 @@ void Config::add_opt(const std::string &optname, const optval_t &optval, Action char short_opt, const std::string &optdoc) { if (conf.count(optname)) - throw SalticidaeError("option name already exists"); + throw SalticidaeError(SALTI_ERROR_OPTNAME_ALREADY_EXISTS); opts.push_back(new Opt(optname, optdoc, optval, action, short_opt, opts.size())); @@ -199,7 +213,7 @@ void Config::update(Opt &p, const char *optval) { case SET_VAL: p.optval->set_val(optval); break; case APPEND: p.optval->append(optval); break; default: - throw SalticidaeError("unknown action"); + throw SalticidaeError(SALTI_ERROR_OPT_UNKNOWN_ACTION); } } @@ -220,7 +234,7 @@ bool Config::load(const std::string &fname) { if (strlen(buff) == BUFF_SIZE - 1) { fclose(conf_f); - throw SalticidaeError("configuration file line too long"); + throw SalticidaeError(SALTI_ERROR_CONFIG_LINE_TOO_LONG); } std::string line(buff); size_t pos = line.find("="); @@ -269,7 +283,7 @@ size_t Config::parse(int argc, char **argv) { if (id == -1) break; if (id == '?') - throw SalticidaeError("invalid option format"); + throw SalticidaeError(SALTI_ERROR_OPT_INVALID); if (id >= 0x100) update(*(opts[id - 0x100]), optarg); else diff --git a/test/test_msgnet_c.c b/test/test_msgnet_c.c index b717137..656bd80 100644 --- a/test/test_msgnet_c.c +++ b/test/test_msgnet_c.c @@ -35,23 +35,25 @@ const uint8_t MSG_OPCODE_HELLO = 0x0; const uint8_t MSG_OPCODE_ACK = 0x1; typedef struct MsgHello { - const char *name; - const char *text; + char *name; + char *text; } MsgHello; + /** Defines how to serialize the msg. */ msg_t *msg_hello_serialize(const char *name, const char *text) { datastream_t *serialized = datastream_new(); size_t name_len = strlen(name); datastream_put_i32(serialized, (uint32_t)htole32(name_len)); - datastream_put_data(serialized, name, name + name_len); - datastream_put_data(serialized, text, text + strlen(text)); - msg_t *msg = msg_new(MSG_OPCODE_HELLO, bytearray_new_moved_from_datastream(serialized)); + datastream_put_data(serialized, name, name_len); + datastream_put_data(serialized, text, strlen(text)); + msg_t *msg = msg_new_moved_from_bytearray( + MSG_OPCODE_HELLO, bytearray_new_moved_from_datastream(serialized)); return msg; } /** Defines how to parse the msg. */ MsgHello msg_hello_unserialize(const msg_t *msg) { - datastream_t *s = msg_get_payload(msg); + datastream_t *s = msg_consume_payload(msg); MsgHello res; uint32_t len; len = datastream_get_u32(s); @@ -73,7 +75,7 @@ MsgHello msg_hello_unserialize(const msg_t *msg) { } msg_t *msg_ack_serialize() { - msg_t *msg = msg_new(MSG_OPCODE_ACK, bytearray_new()); + msg_t *msg = msg_new_moved_from_bytearray(MSG_OPCODE_ACK, bytearray_new()); return msg; } @@ -88,10 +90,11 @@ void on_receive_hello(const msg_t *_msg, const msgnetwork_conn_t *conn, void *us const char *name = ((MyNet *)userdata)->name; MsgHello msg = msg_hello_unserialize(_msg); printf("[%s] %s says %s\n", name, msg.name, msg.text); + free(msg.name); + free(msg.text); msg_t *ack = msg_ack_serialize(); /* send acknowledgement */ - msgnetwork_send_msg(net, ack, conn); - msg_free(ack); + msgnetwork_send_msg_by_move(net, ack, conn); } void on_receive_ack(const msg_t *msg, const msgnetwork_conn_t *conn, void *userdata) { @@ -110,8 +113,7 @@ void conn_handler(const msgnetwork_conn_t *conn, bool connected, void *userdata) printf("[%s] Connected, sending hello.", name); /* send the first message through this connection */ msg_t *hello = msg_hello_serialize(name, "Hello there!"); - msgnetwork_send_msg(n->net, hello, conn); - msg_free(hello); + msgnetwork_send_msg_by_move(n->net, hello, conn); } else printf("[%s] Accepted, waiting for greetings.\n", name); @@ -120,7 +122,9 @@ void conn_handler(const msgnetwork_conn_t *conn, bool connected, void *userdata) { printf("[%s] Disconnected, retrying.\n", name); /* try to reconnect to the same address */ - msgnetwork_connect(net, msgnetwork_conn_get_addr(conn)); + netaddr_t *addr = msgnetwork_conn_get_addr(conn); + msgnetwork_connect(net, addr); + netaddr_free(addr); } } @@ -136,7 +140,7 @@ MyNet gen_mynet(const eventcontext_t *ec, static eventcontext_t *ec; -void on_term_signal(int sig) { +void on_term_signal(int sig, void *userdata) { eventcontext_stop(ec); } @@ -166,25 +170,25 @@ int main() { msgnetwork_listen(bob.net, bob_addr); /* try to connect once */ - msgnetwork_connect(alice.net, bob_addr); - msgnetwork_connect(bob.net, alice_addr); + msgnetwork_conn_free(msgnetwork_connect(alice.net, bob_addr)); + msgnetwork_conn_free(msgnetwork_connect(bob.net, alice_addr)); netaddr_free(alice_addr); netaddr_free(bob_addr); /* the main loop can be shutdown by ctrl-c or kill */ - sigev_t *ev_sigint = sigev_new(ec, on_term_signal); - sigev_t *ev_sigterm = sigev_new(ec, on_term_signal); + sigev_t *ev_sigint = sigev_new(ec, on_term_signal, NULL); + sigev_t *ev_sigterm = sigev_new(ec, on_term_signal, NULL); sigev_add(ev_sigint, SIGINT); sigev_add(ev_sigterm, SIGTERM); /* enter the main loop */ eventcontext_dispatch(ec); - sigev_free(ev_sigint); - sigev_free(ev_sigterm); msgnetwork_free(alice.net); msgnetwork_free(bob.net); + sigev_free(ev_sigint); + sigev_free(ev_sigterm); eventcontext_free(ec); return 0; } diff --git a/test/test_p2p_stress.cpp b/test/test_p2p_stress.cpp index 70e3444..7321217 100644 --- a/test/test_p2p_stress.cpp +++ b/test/test_p2p_stress.cpp @@ -114,6 +114,11 @@ void install_proto(AppContext &app, const size_t &seg_buff_size) { } } }); + net.reg_error_handler([ec](const std::exception &err, bool fatal) { + SALTICIDAE_LOG_WARN("main thread captured %s error: %s", + fatal ? "fatal" : "recoverable", err.what()); + ec.stop(); + }); net.reg_handler([&](MsgRand &&msg, const MyNet::conn_t &conn) { uint256_t hash = salticidae::get_hash(msg.bytes); net.send_msg(MsgAck(hash), conn); |