From d91fc3e873d4bddd5cdd69fda7f67bd780a0ac55 Mon Sep 17 00:00:00 2001 From: Determinant Date: Tue, 18 Jun 2019 01:48:18 -0400 Subject: fix bugs in the benchmark example; keep both send_msg APIs --- include/salticidae/event.h | 79 +++++++++++++++++++++++++++++++++++++ include/salticidae/network.h | 93 ++++++++++++++++++++++++++------------------ src/network.cpp | 4 +- test/bench_network.cpp | 70 ++++++++++++++------------------- 4 files changed, 166 insertions(+), 80 deletions(-) diff --git a/include/salticidae/event.h b/include/salticidae/event.h index c4f65ba..d3625d5 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -418,6 +418,85 @@ class SigEvent { const EventContext &get_ec() const { return ec; } }; +class CheckEvent { + public: + using callback_t = std::function; + private: + EventContext ec; + uv_check_t *ev_check; + callback_t callback; + static inline void check_then(uv_check_t *h) { + auto event = static_cast(h->data); + event->callback(); + } + + public: + CheckEvent(): ec(nullptr), ev_check(nullptr) {} + CheckEvent(const EventContext &ec, callback_t callback): + ec(ec), ev_check(new uv_check_t()), + callback(std::move(callback)) { + uv_check_init(ec.get(), ev_check); + ev_check->data = this; + } + + CheckEvent(const CheckEvent &) = delete; + CheckEvent(CheckEvent &&other): + ec(std::move(other.ec)), ev_check(other.ev_check), + callback(std::move(other.callback)) { + other.ev_check = nullptr; + if (ev_check != nullptr) + ev_check->data = this; + } + + void swap(CheckEvent &other) { + std::swap(ec, other.ec); + std::swap(ev_check, other.ev_check); + std::swap(callback, other.callback); + if (ev_check != nullptr) + ev_check->data = this; + if (other.ev_check != nullptr) + other.ev_check->data = &other; + } + + CheckEvent &operator=(CheckEvent &&other) { + if (this != &other) + { + CheckEvent tmp(std::move(other)); + tmp.swap(*this); + } + return *this; + } + + ~CheckEvent() { clear(); } + + void clear() { + if (ev_check != nullptr) + { + uv_check_stop(ev_check); + uv_close((uv_handle_t *)ev_check, _on_uv_handle_close); + ev_check = nullptr; + } + callback = nullptr; + } + + void set_callback(callback_t _callback) { + callback = _callback; + } + + void add() { + assert(ev_check != nullptr); + uv_check_start(ev_check, CheckEvent::check_then); + } + + void del() { + if (ev_check != nullptr) uv_check_stop(ev_check); + } + + operator bool() const { return ev_check != nullptr; } + + const EventContext &get_ec() const { return ec; } +}; + template class ThreadNotifier { std::condition_variable cv; diff --git a/include/salticidae/network.h b/include/salticidae/network.h index a09f8ac..e9fdae6 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -199,14 +199,16 @@ class MsgNetwork: public ConnPool { } template - void send_msg(MsgType &&msg, const conn_t &conn); - inline void _send_msg(Msg &&msg, const conn_t &conn); - inline void _send_msg_dispatcher(const Msg &msg, const conn_t &conn); + inline void send_msg(const MsgType &msg, const conn_t &conn); + inline void _send_msg(const Msg &msg, const conn_t &conn); + template + inline void send_msg_deferred(MsgType &&msg, const conn_t &conn); + inline void _send_msg_deferred(Msg &&msg, const conn_t &conn); void stop() { stop_workers(); } using ConnPool::listen; - conn_t connect(const NetAddr &addr) { - return static_pointer_cast(ConnPool::connect(addr)); + conn_t connect(const NetAddr &addr, bool blocking = true) { + return static_pointer_cast(ConnPool::connect(addr, blocking)); } }; @@ -446,8 +448,11 @@ class PeerNetwork: public MsgNetwork { const conn_t get_peer_conn(const NetAddr &paddr) const; using MsgNet::send_msg; template - void send_msg(MsgType &&msg, const NetAddr &paddr); - inline void _send_msg(Msg &&msg, const NetAddr &paddr); + inline void send_msg(const MsgType &msg, const NetAddr &paddr); + inline void _send_msg(const Msg &msg, const NetAddr &paddr); + template + inline void send_msg_deferred(MsgType &&msg, const NetAddr &paddr); + inline void _send_msg_deferred(Msg &&msg, const NetAddr &paddr); template void multicast_msg(MsgType &&msg, const std::vector &paddrs); inline void _multicast_msg(Msg &&msg, const std::vector &paddrs); @@ -496,20 +501,39 @@ void MsgNetwork::Conn::on_read() { template template -void MsgNetwork::send_msg(MsgType &&msg, const conn_t &conn) { - return _send_msg(MsgType(std::move(msg)), conn); +inline void MsgNetwork::send_msg_deferred(MsgType &&msg, const conn_t &conn) { + return _send_msg_deferred(MsgType(std::move(msg)), conn); } template -inline void MsgNetwork::_send_msg(Msg &&msg, const conn_t &conn) { +inline void MsgNetwork::_send_msg_deferred(Msg &&msg, const conn_t &conn) { this->disp_tcall->async_call( [this, msg=std::move(msg), conn](ThreadCall::Handle &) { try { - this->_send_msg_dispatcher(msg, conn); + this->send_msg(msg, conn); } catch (...) { this->recoverable_error(std::current_exception()); } }); } +template +template +inline void MsgNetwork::send_msg(const MsgType &msg, const conn_t &conn) { + return _send_msg(msg, conn); +} + +template +inline void MsgNetwork::_send_msg(const Msg &msg, const conn_t &conn) { + bytearray_t msg_data = msg.serialize(); + SALTICIDAE_LOG_DEBUG("wrote message %s to %s", + std::string(msg).c_str(), + std::string(*conn).c_str()); +#ifdef SALTICIDAE_MSG_STAT + conn->nsent++; + conn->nsentb += msg.get_length(); +#endif + conn->write(std::move(msg_data)); +} + template void PeerNetwork::tcall_reset_timeout(ConnPool::Worker *worker, const conn_t &conn, double timeout) { @@ -686,19 +710,6 @@ typename PeerNetwork::Peer *PeerNetwork::get_peer(const NetA if (it != id2upeer.end()) return it->second.get(); return nullptr; } - -template -inline void MsgNetwork::_send_msg_dispatcher(const Msg &msg, const conn_t &conn) { - bytearray_t msg_data = msg.serialize(); - SALTICIDAE_LOG_DEBUG("wrote message %s to %s", - std::string(msg).c_str(), - std::string(*conn).c_str()); -#ifdef SALTICIDAE_MSG_STAT - conn->nsent++; - conn->nsentb += msg.get_length(); -#endif - conn->write(std::move(msg_data)); -} /* end: functions invoked by the dispatcher */ /* begin: functions invoked by the user loop */ @@ -826,33 +837,41 @@ bool PeerNetwork::has_peer(const NetAddr &paddr) const { template template -void PeerNetwork::send_msg(MsgType &&msg, const NetAddr &paddr) { - return _send_msg(MsgType(std::move(msg)), paddr); +inline void PeerNetwork::send_msg_deferred(MsgType &&msg, const NetAddr &paddr) { + return _send_msg_deferred(MsgType(std::move(msg)), paddr); } template -void PeerNetwork::_send_msg(Msg &&msg, const NetAddr &paddr) { +inline void PeerNetwork::_send_msg_deferred(Msg &&msg, const NetAddr &paddr) { this->disp_tcall->async_call( - [this, msg=std::move(msg), paddr](ThreadCall::Handle &) { + [this, msg=std::move(msg), paddr](ThreadCall::Handle &) { try { - auto p = get_peer(paddr); - if (!p) - throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); - this->_send_msg_dispatcher(msg, p->conn); - } catch (const PeerNetworkError &) { - this->recoverable_error(std::current_exception()); + send_msg(msg, paddr); } catch (...) { this->recoverable_error(std::current_exception()); } }); } template template -void PeerNetwork::multicast_msg(MsgType &&msg, const std::vector &paddrs) { +inline void PeerNetwork::send_msg(const MsgType &msg, const NetAddr &paddr) { + return _send_msg(msg, paddr); +} + +template +inline void PeerNetwork::_send_msg(const Msg &msg, const NetAddr &paddr) { + auto p = get_peer(paddr); + if (!p) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); + this->send_msg(msg, p->conn); +} + +template +template +inline void PeerNetwork::multicast_msg(MsgType &&msg, const std::vector &paddrs) { return _multicast_msg(MsgType(std::move(msg)), paddrs); } template -void PeerNetwork::_multicast_msg(Msg &&msg, const std::vector &paddrs) { +inline void PeerNetwork::_multicast_msg(Msg &&msg, const std::vector &paddrs) { this->disp_tcall->async_call( [this, msg=std::move(msg), paddrs](ThreadCall::Handle &) { try { @@ -861,7 +880,7 @@ void PeerNetwork::_multicast_msg(Msg &&msg, const std::vector auto p = get_peer(addr); if (!p) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); - this->_send_msg_dispatcher(msg, p->conn); + this->send_msg(msg, p->conn); } } catch (const PeerNetworkError &) { this->recoverable_error(std::current_exception()); diff --git a/src/network.cpp b/src/network.cpp index 938c105..b8d058a 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -46,7 +46,7 @@ void msgnetwork_config_queue_capacity(msgnetwork_config_t *self, size_t cap) { void msgnetwork_send_msg_by_move(msgnetwork_t *self, msg_t *_moved_msg, const msgnetwork_conn_t *conn) { - self->_send_msg(std::move(*_moved_msg), *conn); + self->_send_msg_deferred(std::move(*_moved_msg), *conn); //delete _moved_msg; } @@ -213,7 +213,7 @@ void peernetwork_conn_free(const peernetwork_conn_t *self) { delete self; } void peernetwork_send_msg_by_move(peernetwork_t *self, msg_t * _moved_msg, const netaddr_t *paddr) { - self->_send_msg(std::move(*_moved_msg), *paddr); + self->_send_msg_deferred(std::move(*_moved_msg), *paddr); //delete _moved_msg; } diff --git a/test/bench_network.cpp b/test/bench_network.cpp index 58f4a64..a498954 100644 --- a/test/bench_network.cpp +++ b/test/bench_network.cpp @@ -43,6 +43,7 @@ using salticidae::htole; using salticidae::letoh; using salticidae::bytearray_t; using salticidae::TimerEvent; +using salticidae::ThreadCall; using std::placeholders::_1; using std::placeholders::_2; using opcode_t = uint8_t; @@ -71,9 +72,10 @@ using MsgNetworkByteOp = MsgNetwork; struct MyNet: public MsgNetworkByteOp { const std::string name; const NetAddr peer; - TimerEvent ev_period_send; TimerEvent ev_period_stat; + ThreadCall tcall; size_t nrecv; + std::function trigger; MyNet(const salticidae::EventContext &ec, const std::string name, @@ -88,50 +90,36 @@ struct MyNet: public MsgNetworkByteOp { nrecv = 0; ev_period_stat.add(stat_timeout); }), + tcall(ec), nrecv(0) { /* message handler could be a bound method */ - reg_handler(salticidae::generic_bind( - &MyNet::on_receive_bytes, this, _1, _2)); + reg_handler(salticidae::generic_bind(&MyNet::on_receive_bytes, this, _1, _2)); if (stat_timeout > 0) ev_period_stat.add(0); - } - - struct Conn: public MsgNetworkByteOp::Conn { - MyNet *get_net() { return static_cast(get_pool()); } - salticidae::ArcObj self() { - return salticidae::static_pointer_cast( - MsgNetworkByteOp::Conn::self()); - } - - void on_setup() override { - auto net = get_net(); - if (get_mode() == ACTIVE) + reg_conn_handler([this, ec](const ConnPool::conn_t &conn, bool connected) { + if (connected) { - printf("[%s] Connected, sending hello.\n", - net->name.c_str()); - /* send the first message through this connection */ - net->ev_period_send = TimerEvent(net->ec, - [net, conn = self()](TimerEvent &) { - net->send_msg(MsgBytes(256), conn); - net->ev_period_send.add(0); - }); - net->ev_period_send.add(0); + if (conn->get_mode() == MyNet::Conn::ACTIVE) + { + printf("[%s] Connected, sending hello.\n", this->name.c_str()); + /* send the first message through this connection */ + trigger = [this, conn](ThreadCall::Handle &) { + send_msg(MsgBytes(256), salticidae::static_pointer_cast(conn)); + if (conn->get_mode() != MyNet::Conn::DEAD) + tcall.async_call(trigger); + }; + tcall.async_call(trigger); + } + else + printf("[%s] Passively connected, waiting for greetings.\n", this->name.c_str()); } else - printf("[%s] Passively connected, waiting for greetings.\n", - net->name.c_str()); - } - void on_teardown() override { - auto net = get_net(); - net->ev_period_send.clear(); - printf("[%s] Disconnected, retrying.\n", net->name.c_str()); - /* try to reconnect to the same address */ - net->connect(get_addr()); - } - }; - - salticidae::ConnPool::Conn *create_conn() override { - return new Conn(); + { + printf("[%s] Disconnected, retrying.\n", this->name.c_str()); + /* try to reconnect to the same address */ + connect(conn->get_addr(), false); + } + }); } void on_receive_bytes(MsgBytes &&msg, const conn_t &conn) { @@ -148,7 +136,7 @@ int main() { alice->start(); alice->listen(alice_addr); salticidae::EventContext tec; - salticidae::BoxObj tcall = new salticidae::ThreadCall(tec); + salticidae::BoxObj tcall = new ThreadCall(tec); std::thread bob_thread([&tec]() { MyNet bob(tec, "Bob", alice_addr); bob.start(); @@ -163,8 +151,8 @@ int main() { tec.stop(); }); alice = nullptr; - //ec.stop(); - //bob_thread.join(); + ec.stop(); + bob_thread.join(); }; salticidae::SigEvent ev_sigint(ec, shutdown); salticidae::SigEvent ev_sigterm(ec, shutdown); -- cgit v1.2.3