diff options
Diffstat (limited to 'include/salticidae/network.h')
-rw-r--r-- | include/salticidae/network.h | 195 |
1 files changed, 121 insertions, 74 deletions
diff --git a/include/salticidae/network.h b/include/salticidae/network.h index e7e77f5..9c57749 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -199,8 +199,10 @@ class MsgNetwork: public ConnPool { } template<typename MsgType> - bool send_msg(MsgType &&msg, const conn_t &conn); - inline bool _send_msg(const Msg &msg, const conn_t &conn); + void send_msg(MsgType &&msg, const conn_t &conn); + inline void _send_msg(Msg &&msg, const conn_t &conn); + inline void _send_msg_dispatcher(const Msg &msg, const conn_t &conn); + using ConnPool::listen; conn_t connect(const NetAddr &addr) { return static_pointer_cast<Conn>(ConnPool::connect(addr)); @@ -248,10 +250,6 @@ class ClientNetwork: public MsgNetwork<OpcodeType> { void send_msg(MsgType &&msg, const NetAddr &addr); }; -class PeerNetworkError: public ConnPoolError { - using ConnPoolError::ConnPoolError; -}; - /** Peer-to-peer network where any two nodes could hold a bi-diretional message * channel, established by either side. */ template<typename OpcodeType = uint8_t, @@ -434,9 +432,9 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { template<typename MsgType> void send_msg(MsgType &&msg, const NetAddr &paddr); inline void _send_msg(Msg &&msg, const NetAddr &paddr); - template<typename MsgType> void multicast_msg(MsgType &&msg, const std::vector<NetAddr> &paddrs); + inline void _multicast_msg(Msg &&msg, const std::vector<NetAddr> &paddrs); void listen(NetAddr listen_addr); conn_t connect(const NetAddr &addr) = delete; @@ -480,13 +478,23 @@ void MsgNetwork<OpcodeType>::Conn::on_read() { template<typename OpcodeType> template<typename MsgType> -bool MsgNetwork<OpcodeType>::send_msg(MsgType &&_msg, const conn_t &conn) { - Msg msg(std::forward<MsgType>(_msg)); - return _send_msg(msg, conn); +void MsgNetwork<OpcodeType>::send_msg(MsgType &&msg, const conn_t &conn) { + return _send_msg(MsgType(std::move(msg)), conn); +} + +template<typename OpcodeType> +inline void MsgNetwork<OpcodeType>::_send_msg(Msg &&msg, const conn_t &conn) { + this->disp_tcall->async_call( + [this, msg=std::move(msg), conn](ThreadCall::Handle &) { + try { + this->_send_msg_dispatcher(msg, conn); + throw SalticidaeError("wow"); + } catch (...) { this->disp_error_cb(std::current_exception()); } + }); } template<typename OpcodeType> -inline bool MsgNetwork<OpcodeType>::_send_msg(const Msg &msg, const conn_t &conn) { +inline void MsgNetwork<OpcodeType>::_send_msg_dispatcher(const Msg &msg, const conn_t &conn) { bytearray_t msg_data = msg.serialize(); SALTICIDAE_LOG_DEBUG("wrote message %s to %s", std::string(msg).c_str(), @@ -495,17 +503,19 @@ inline bool MsgNetwork<OpcodeType>::_send_msg(const Msg &msg, const conn_t &conn conn->nsent++; conn->nsentb += msg.get_length(); #endif - return conn->write(std::move(msg_data)); + conn->write(std::move(msg_data)); } template<typename O, O _, O __> void PeerNetwork<O, _, __>::tcall_reset_timeout(ConnPool::Worker *worker, const conn_t &conn, double timeout) { - worker->get_tcall()->async_call([conn, t=timeout](ThreadCall::Handle &) { - if (!conn->ev_timeout) return; - conn->ev_timeout.del(); - conn->ev_timeout.add(t); - SALTICIDAE_LOG_DEBUG("reset connection timeout %.2f", t); + worker->get_tcall()->async_call([worker, conn, t=timeout](ThreadCall::Handle &) { + try { + if (!conn->ev_timeout) return; + conn->ev_timeout.del(); + conn->ev_timeout.add(t); + SALTICIDAE_LOG_DEBUG("reset connection timeout %.2f", t); + } catch (...) { worker->error_callback(std::current_exception()); } }); } @@ -517,9 +527,11 @@ void PeerNetwork<O, _, __>::Conn::on_setup() { auto conn = static_pointer_cast<Conn>(this->self()); auto worker = this->worker; assert(!ev_timeout); - ev_timeout = TimerEvent(worker->get_ec(), [conn](TimerEvent &) { - SALTICIDAE_LOG_INFO("peer ping-pong timeout"); - conn->worker_terminate(); + ev_timeout = TimerEvent(worker->get_ec(), [worker, conn](TimerEvent &) { + try { + SALTICIDAE_LOG_INFO("peer ping-pong timeout"); + conn->worker_terminate(); + } catch (...) { worker->error_callback(std::current_exception()); } }); /* the initial ping-pong to set up the connection */ tcall_reset_timeout(worker, conn, pn->conn_timeout); @@ -541,7 +553,9 @@ void PeerNetwork<O, _, __>::Conn::on_teardown() { // try to reconnect p->ev_retry_timer = TimerEvent(pn->disp_ec, [pn, peer_id = this->peer_id](TimerEvent &) { - pn->start_active_conn(peer_id); + try { + pn->start_active_conn(peer_id); + } catch (...) { pn->disp_error_cb(std::current_exception()); } }); p->ev_retry_timer.add(pn->gen_conn_timeout()); } @@ -647,11 +661,13 @@ template<typename O, O _, O __> void PeerNetwork<O, _, __>::msg_ping(MsgPing &&msg, const conn_t &conn) { uint16_t port = msg.port; this->disp_tcall->async_call([this, conn, port](ThreadCall::Handle &) { - if (conn->get_mode() == ConnPool::Conn::DEAD) return; - SALTICIDAE_LOG_INFO("ping from %s, port %u", - std::string(*conn).c_str(), ntohs(port)); - if (check_new_conn(conn, port)) return; - send_msg(MsgPong(this->listen_port), conn); + try { + if (conn->get_mode() == ConnPool::Conn::DEAD) return; + SALTICIDAE_LOG_INFO("ping from %s, port %u", + std::string(*conn).c_str(), ntohs(port)); + if (check_new_conn(conn, port)) return; + send_msg(MsgPong(this->listen_port), conn); + } catch (...) { this->disp_error_cb(std::current_exception()); } }); } @@ -659,54 +675,74 @@ template<typename O, O _, O __> void PeerNetwork<O, _, __>::msg_pong(MsgPong &&msg, const conn_t &conn) { uint16_t port = msg.port; this->disp_tcall->async_call([this, conn, port](ThreadCall::Handle &) { - if (conn->get_mode() == ConnPool::Conn::DEAD) return; - auto it = id2peer.find(conn->peer_id); - if (it == id2peer.end()) - { - SALTICIDAE_LOG_WARN("pong message discarded"); - return; - } - if (check_new_conn(conn, port)) return; - auto p = it->second.get(); - p->pong_msg_ok = true; - if (p->ping_timer_ok) - { - p->reset_ping_timer(); - p->send_ping(); - } + try { + if (conn->get_mode() == ConnPool::Conn::DEAD) return; + auto it = id2peer.find(conn->peer_id); + if (it == id2peer.end()) + { + SALTICIDAE_LOG_WARN("pong message discarded"); + return; + } + if (check_new_conn(conn, port)) return; + auto p = it->second.get(); + p->pong_msg_ok = true; + if (p->ping_timer_ok) + { + p->reset_ping_timer(); + p->send_ping(); + } + } catch (...) { this->disp_error_cb(std::current_exception()); } }); } template<typename O, O _, O __> void PeerNetwork<O, _, __>::listen(NetAddr listen_addr) { - this->disp_tcall->call([this, listen_addr](ThreadCall::Handle &) { - MsgNet::_listen(listen_addr); - listen_port = listen_addr.port; - }); + auto ret = *(static_cast<SalticidaeError *>( + this->disp_tcall->call([this, listen_addr](ThreadCall::Handle &h) { + SalticidaeError err; + try { + MsgNet::_listen(listen_addr); + listen_port = listen_addr.port; + } catch (SalticidaeError &e) { + err = e; + } + h.set_result(std::move(err)); + }).get())); + if (ret.get_code()) throw ret; } template<typename O, O _, O __> void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) { - this->disp_tcall->call([this, addr](ThreadCall::Handle &) { - auto it = id2peer.find(addr); - if (it != id2peer.end()) - throw PeerNetworkError("peer already exists"); - id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->disp_ec))); - start_active_conn(addr); + this->disp_tcall->async_call([this, addr](ThreadCall::Handle &) { + try { + auto it = id2peer.find(addr); + if (it != id2peer.end()) + throw PeerNetworkError(SALTI_ERROR_PEER_ALREADY_EXISTS); + id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->disp_ec))); + start_active_conn(addr); + } catch (...) { this->disp_error_cb(std::current_exception()); } }); } template<typename O, O _, O __> const typename PeerNetwork<O, _, __>::conn_t PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &paddr) const { - auto ret = *(static_cast<conn_t *>(this->disp_tcall->call( - [this, paddr](ThreadCall::Handle &h) { - auto it = id2peer.find(paddr); - if (it == id2peer.end()) - throw PeerNetworkError("peer does not exist"); - h.set_result(it->second->conn); + auto ret = *(static_cast<std::pair<conn_t, SalticidaeError> *>( + this->disp_tcall->call([this, paddr](ThreadCall::Handle &h) { + conn_t conn; + SalticidaeError err; + try { + auto it = id2peer.find(paddr); + if (it == id2peer.end()) + throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXISTS); + conn = it->second->conn; + } catch (SalticidaeError &e) { + err = e; + } + h.set_result(std::make_pair(std::move(conn), std::move(err))); }).get())); - return std::move(ret); + if (ret.second.get_code()) throw ret.second; + return std::move(ret.first); } template<typename O, O _, O __> @@ -720,32 +756,41 @@ bool PeerNetwork<O, _, __>::has_peer(const NetAddr &paddr) const { template<typename O, O _, O __> template<typename MsgType> void PeerNetwork<O, _, __>::send_msg(MsgType &&msg, const NetAddr &paddr) { - return _send_msg(MsgType(msg), paddr); + return _send_msg(MsgType(std::move(msg)), paddr); } template<typename O, O _, O __> void PeerNetwork<O, _, __>::_send_msg(Msg &&msg, const NetAddr &paddr) { this->disp_tcall->async_call( [this, msg=std::move(msg), paddr](ThreadCall::Handle &) { - auto it = id2peer.find(paddr); - if (it == id2peer.end()) - throw PeerNetworkError("peer does not exist"); - send_msg(std::move(msg), it->second->conn); + try { + auto it = id2peer.find(paddr); + if (it == id2peer.end()) + throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXISTS); + this->_send_msg_dispatcher(msg, it->second->conn); + } catch (...) { this->disp_error_cb(std::current_exception()); } }); } template<typename O, O _, O __> template<typename MsgType> void PeerNetwork<O, _, __>::multicast_msg(MsgType &&msg, const std::vector<NetAddr> &paddrs) { + return _multicast_msg(MsgType(std::move(msg)), paddrs); +} + +template<typename O, O _, O __> +void PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<NetAddr> &paddrs) { this->disp_tcall->async_call( [this, msg=std::move(msg), paddrs](ThreadCall::Handle &) { - for (auto &addr: paddrs) - { - auto it = id2peer.find(addr); - if (it == id2peer.end()) - throw PeerNetworkError("peer does not exist"); - send_msg(std::move(msg), it->second->conn); - } + try { + for (auto &addr: paddrs) + { + auto it = id2peer.find(addr); + if (it == id2peer.end()) + throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXISTS); + this->_send_msg_dispatcher(msg, it->second->conn); + } + } catch (...) { this->disp_error_cb(std::current_exception()); } }); } @@ -774,9 +819,11 @@ template<typename MsgType> void ClientNetwork<OpcodeType>::send_msg(MsgType &&msg, const NetAddr &addr) { this->disp_tcall->async_call( [this, addr, msg=std::forward<MsgType>(msg)](ThreadCall::Handle &) { - auto it = addr2conn.find(addr); - if (it != addr2conn.end()) - send_msg(std::move(msg), it->second); + try { + auto it = addr2conn.find(addr); + if (it != addr2conn.end()) + send_msg(std::move(msg), it->second); + } catch (...) { this->disp_error_cb(std::current_exception()); } }); } @@ -842,7 +889,7 @@ void msgnetwork_config_queue_capacity(msgnetwork_config_t *self, size_t cap); msgnetwork_t *msgnetwork_new(const eventcontext_t *ec, const msgnetwork_config_t *config); void msgnetwork_free(const msgnetwork_t *self); -bool msgnetwork_send_msg(msgnetwork_t *self, const msg_t *msg, const msgnetwork_conn_t *conn); +void msgnetwork_send_msg_by_move(msgnetwork_t *self, msg_t *_moved_msg, const msgnetwork_conn_t *conn); msgnetwork_conn_t *msgnetwork_connect(msgnetwork_t *self, const netaddr_t *addr); msgnetwork_conn_t *msgnetwork_conn_copy(const msgnetwork_conn_t *self); void msgnetwork_conn_free(const msgnetwork_conn_t *self); |