From 20669e2b5e956babf888bca577e39a71d83bae79 Mon Sep 17 00:00:00 2001 From: Determinant Date: Fri, 16 Nov 2018 21:40:53 -0500 Subject: add test_p2p_stress.cpp; fix bugs --- include/salticidae/event.h | 15 ++++++--------- include/salticidae/network.h | 20 +++++++++++++------- 2 files changed, 19 insertions(+), 16 deletions(-) (limited to 'include/salticidae') diff --git a/include/salticidae/event.h b/include/salticidae/event.h index 0498fa5..021b5dc 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -70,11 +70,11 @@ class EventContext: public _event_context_ot { class Event { public: - using callback_t = std::function; + using callback_t = std::function; static const int READ = UV_READABLE; static const int WRITE = UV_WRITABLE; - static const int TIMEOUT = ~(UV_READABLE | UV_WRITABLE | - UV_DISCONNECT | UV_PRIORITIZED); + static const int ERROR = 1 << 30; + static const int TIMEOUT = 1 << 29; private: EventContext eb; @@ -84,10 +84,7 @@ class Event { callback_t callback; static inline void fd_then(uv_poll_t *h, int status, int events) { if (status != 0) - { - //SALTICIDAE_LOG_WARN("%s", uv_strerror(status)); - return; - } + events |= ERROR; auto event = static_cast(h->data); event->callback(event->fd, events); } @@ -332,7 +329,7 @@ class MPSCQueueEventDriven: public MPSCQueue { template void reg_handler(const EventContext &ec, Func &&func) { ev = Event(ec, fd, - [this, func=std::forward(func)](int, short) { + [this, func=std::forward(func)](int, int) { //fprintf(stderr, "%x\n", std::this_thread::get_id()); uint64_t t; read(fd, &t, 8); @@ -385,7 +382,7 @@ class MPMCQueueEventDriven: public MPMCQueue { void listen(const EventContext &ec, Func &&func, size_t burst_size=128) { int fd = eventfd(0, EFD_NONBLOCK); evs.emplace(evs.end(), std::make_pair(new Event(ec, fd, EV_READ | EV_PERSIST, - [this, func=std::forward(func), burst_size](int fd, short) { + [this, func=std::forward(func), burst_size](int fd, int) { uint64_t t; read(fd, &t, 8); //fprintf(stderr, "%x\n", std::this_thread::get_id()); diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 5e966fe..78449eb 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -140,7 +140,9 @@ class MsgNetwork: public ConnPool { size_t _burst_size; public: - Config(): _burst_size(1000) {} + Config(): Config(ConnPool::Config()) {} + Config(const ConnPool::Config &config): + ConnPool::Config(config), _burst_size(1000) {} Config &burst_size(size_t x) { _burst_size = x; @@ -380,12 +382,16 @@ class PeerNetwork: public MsgNetwork { IdentityMode _id_mode; public: - Config(): + Config(): Config(typename MsgNet::Config()) {} + + Config(const typename MsgNet::Config &config): + MsgNet::Config(config), _retry_conn_delay(2), _ping_period(30), _conn_timeout(180), _id_mode(IP_PORT_BASED) {} + Config &retry_conn_delay(double x) { _retry_conn_delay = x; return *this; @@ -518,9 +524,7 @@ void PeerNetwork::Conn::on_teardown() { p->ev_ping_timer.del(); p->connected = false; p->conn = nullptr; - SALTICIDAE_LOG_INFO("connection lost %s for %s", - std::string(*this).c_str(), - std::string(peer_id).c_str()); + SALTICIDAE_LOG_INFO("connection lost: %s", std::string(*this).c_str()); // try to reconnect p->ev_retry_timer = Event(pn->disp_ec, -1, [pn, peer_id = this->peer_id](int, int) { @@ -618,8 +622,9 @@ void PeerNetwork::start_active_conn(const NetAddr &addr) { /* begin: functions invoked by the user loop */ template void PeerNetwork::msg_ping(MsgPing &&msg, Conn &_conn) { + if (_conn.get_mode() == ConnPool::Conn::DEAD) return; auto conn = static_pointer_cast(_conn.self()); - if (!conn) return; + assert(conn); uint16_t port = msg.port; this->disp_tcall->async_call([this, conn, port](ThreadCall::Handle &msg) { SALTICIDAE_LOG_INFO("ping from %s, port %u", @@ -632,8 +637,9 @@ void PeerNetwork::msg_ping(MsgPing &&msg, Conn &_conn) { template void PeerNetwork::msg_pong(MsgPong &&msg, Conn &_conn) { + if (_conn.get_mode() == ConnPool::Conn::DEAD) return; auto conn = static_pointer_cast(_conn.self()); - if (!conn) return; + assert(conn); uint16_t port = msg.port; this->disp_tcall->async_call([this, conn, port](ThreadCall::Handle &msg) { auto it = id2peer.find(conn->peer_id); -- cgit v1.2.3