diff options
author | Determinant <[email protected]> | 2018-11-16 21:40:53 -0500 |
---|---|---|
committer | Determinant <[email protected]> | 2018-11-16 21:40:53 -0500 |
commit | 20669e2b5e956babf888bca577e39a71d83bae79 (patch) | |
tree | 22733578ee8731cebd43f50be33e09c765c61714 /include | |
parent | b27216a50bd6566b6fd9203d3acf191005ae5763 (diff) |
add test_p2p_stress.cpp; fix bugs
Diffstat (limited to 'include')
-rw-r--r-- | include/salticidae/event.h | 15 | ||||
-rw-r--r-- | include/salticidae/network.h | 20 |
2 files changed, 19 insertions, 16 deletions
diff --git a/include/salticidae/event.h b/include/salticidae/event.h index 0498fa5..021b5dc 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -70,11 +70,11 @@ class EventContext: public _event_context_ot { class Event { public: - using callback_t = std::function<void(int fd, short events)>; + using callback_t = std::function<void(int fd, int events)>; static const int READ = UV_READABLE; static const int WRITE = UV_WRITABLE; - static const int TIMEOUT = ~(UV_READABLE | UV_WRITABLE | - UV_DISCONNECT | UV_PRIORITIZED); + static const int ERROR = 1 << 30; + static const int TIMEOUT = 1 << 29; private: EventContext eb; @@ -84,10 +84,7 @@ class Event { callback_t callback; static inline void fd_then(uv_poll_t *h, int status, int events) { if (status != 0) - { - //SALTICIDAE_LOG_WARN("%s", uv_strerror(status)); - return; - } + events |= ERROR; auto event = static_cast<Event *>(h->data); event->callback(event->fd, events); } @@ -332,7 +329,7 @@ class MPSCQueueEventDriven: public MPSCQueue<T> { template<typename Func> void reg_handler(const EventContext &ec, Func &&func) { ev = Event(ec, fd, - [this, func=std::forward<Func>(func)](int, short) { + [this, func=std::forward<Func>(func)](int, int) { //fprintf(stderr, "%x\n", std::this_thread::get_id()); uint64_t t; read(fd, &t, 8); @@ -385,7 +382,7 @@ class MPMCQueueEventDriven: public MPMCQueue<T> { void listen(const EventContext &ec, Func &&func, size_t burst_size=128) { int fd = eventfd(0, EFD_NONBLOCK); evs.emplace(evs.end(), std::make_pair(new Event(ec, fd, EV_READ | EV_PERSIST, - [this, func=std::forward<Func>(func), burst_size](int fd, short) { + [this, func=std::forward<Func>(func), burst_size](int fd, int) { uint64_t t; read(fd, &t, 8); //fprintf(stderr, "%x\n", std::this_thread::get_id()); diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 5e966fe..78449eb 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -140,7 +140,9 @@ class MsgNetwork: public ConnPool { size_t _burst_size; public: - Config(): _burst_size(1000) {} + Config(): Config(ConnPool::Config()) {} + Config(const ConnPool::Config &config): + ConnPool::Config(config), _burst_size(1000) {} Config &burst_size(size_t x) { _burst_size = x; @@ -380,12 +382,16 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { IdentityMode _id_mode; public: - Config(): + Config(): Config(typename MsgNet::Config()) {} + + Config(const typename MsgNet::Config &config): + MsgNet::Config(config), _retry_conn_delay(2), _ping_period(30), _conn_timeout(180), _id_mode(IP_PORT_BASED) {} + Config &retry_conn_delay(double x) { _retry_conn_delay = x; return *this; @@ -518,9 +524,7 @@ void PeerNetwork<O, _, __>::Conn::on_teardown() { p->ev_ping_timer.del(); p->connected = false; p->conn = nullptr; - SALTICIDAE_LOG_INFO("connection lost %s for %s", - std::string(*this).c_str(), - std::string(peer_id).c_str()); + SALTICIDAE_LOG_INFO("connection lost: %s", std::string(*this).c_str()); // try to reconnect p->ev_retry_timer = Event(pn->disp_ec, -1, [pn, peer_id = this->peer_id](int, int) { @@ -618,8 +622,9 @@ void PeerNetwork<O, _, __>::start_active_conn(const NetAddr &addr) { /* begin: functions invoked by the user loop */ template<typename O, O _, O __> void PeerNetwork<O, _, __>::msg_ping(MsgPing &&msg, Conn &_conn) { + if (_conn.get_mode() == ConnPool::Conn::DEAD) return; auto conn = static_pointer_cast<Conn>(_conn.self()); - if (!conn) return; + assert(conn); uint16_t port = msg.port; this->disp_tcall->async_call([this, conn, port](ThreadCall::Handle &msg) { SALTICIDAE_LOG_INFO("ping from %s, port %u", @@ -632,8 +637,9 @@ void PeerNetwork<O, _, __>::msg_ping(MsgPing &&msg, Conn &_conn) { template<typename O, O _, O __> void PeerNetwork<O, _, __>::msg_pong(MsgPong &&msg, Conn &_conn) { + if (_conn.get_mode() == ConnPool::Conn::DEAD) return; auto conn = static_pointer_cast<Conn>(_conn.self()); - if (!conn) return; + assert(conn); uint16_t port = msg.port; this->disp_tcall->async_call([this, conn, port](ThreadCall::Handle &msg) { auto it = id2peer.find(conn->peer_id); |