diff options
-rw-r--r-- | include/salticidae/event.h | 15 | ||||
-rw-r--r-- | include/salticidae/network.h | 20 | ||||
-rw-r--r-- | src/conn.cpp | 45 | ||||
-rw-r--r-- | test/test_p2p_stress.cpp | 113 |
4 files changed, 165 insertions, 28 deletions
diff --git a/include/salticidae/event.h b/include/salticidae/event.h index 0498fa5..021b5dc 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -70,11 +70,11 @@ class EventContext: public _event_context_ot { class Event { public: - using callback_t = std::function<void(int fd, short events)>; + using callback_t = std::function<void(int fd, int events)>; static const int READ = UV_READABLE; static const int WRITE = UV_WRITABLE; - static const int TIMEOUT = ~(UV_READABLE | UV_WRITABLE | - UV_DISCONNECT | UV_PRIORITIZED); + static const int ERROR = 1 << 30; + static const int TIMEOUT = 1 << 29; private: EventContext eb; @@ -84,10 +84,7 @@ class Event { callback_t callback; static inline void fd_then(uv_poll_t *h, int status, int events) { if (status != 0) - { - //SALTICIDAE_LOG_WARN("%s", uv_strerror(status)); - return; - } + events |= ERROR; auto event = static_cast<Event *>(h->data); event->callback(event->fd, events); } @@ -332,7 +329,7 @@ class MPSCQueueEventDriven: public MPSCQueue<T> { template<typename Func> void reg_handler(const EventContext &ec, Func &&func) { ev = Event(ec, fd, - [this, func=std::forward<Func>(func)](int, short) { + [this, func=std::forward<Func>(func)](int, int) { //fprintf(stderr, "%x\n", std::this_thread::get_id()); uint64_t t; read(fd, &t, 8); @@ -385,7 +382,7 @@ class MPMCQueueEventDriven: public MPMCQueue<T> { void listen(const EventContext &ec, Func &&func, size_t burst_size=128) { int fd = eventfd(0, EFD_NONBLOCK); evs.emplace(evs.end(), std::make_pair(new Event(ec, fd, EV_READ | EV_PERSIST, - [this, func=std::forward<Func>(func), burst_size](int fd, short) { + [this, func=std::forward<Func>(func), burst_size](int fd, int) { uint64_t t; read(fd, &t, 8); //fprintf(stderr, "%x\n", std::this_thread::get_id()); diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 5e966fe..78449eb 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -140,7 +140,9 @@ class MsgNetwork: public ConnPool { size_t _burst_size; public: - Config(): _burst_size(1000) {} + Config(): Config(ConnPool::Config()) {} + Config(const ConnPool::Config &config): + ConnPool::Config(config), _burst_size(1000) {} Config &burst_size(size_t x) { _burst_size = x; @@ -380,12 +382,16 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { IdentityMode _id_mode; public: - Config(): + Config(): Config(typename MsgNet::Config()) {} + + Config(const typename MsgNet::Config &config): + MsgNet::Config(config), _retry_conn_delay(2), _ping_period(30), _conn_timeout(180), _id_mode(IP_PORT_BASED) {} + Config &retry_conn_delay(double x) { _retry_conn_delay = x; return *this; @@ -518,9 +524,7 @@ void PeerNetwork<O, _, __>::Conn::on_teardown() { p->ev_ping_timer.del(); p->connected = false; p->conn = nullptr; - SALTICIDAE_LOG_INFO("connection lost %s for %s", - std::string(*this).c_str(), - std::string(peer_id).c_str()); + SALTICIDAE_LOG_INFO("connection lost: %s", std::string(*this).c_str()); // try to reconnect p->ev_retry_timer = Event(pn->disp_ec, -1, [pn, peer_id = this->peer_id](int, int) { @@ -618,8 +622,9 @@ void PeerNetwork<O, _, __>::start_active_conn(const NetAddr &addr) { /* begin: functions invoked by the user loop */ template<typename O, O _, O __> void PeerNetwork<O, _, __>::msg_ping(MsgPing &&msg, Conn &_conn) { + if (_conn.get_mode() == ConnPool::Conn::DEAD) return; auto conn = static_pointer_cast<Conn>(_conn.self()); - if (!conn) return; + assert(conn); uint16_t port = msg.port; this->disp_tcall->async_call([this, conn, port](ThreadCall::Handle &msg) { SALTICIDAE_LOG_INFO("ping from %s, port %u", @@ -632,8 +637,9 @@ void PeerNetwork<O, _, __>::msg_ping(MsgPing &&msg, Conn &_conn) { template<typename O, O _, O __> void PeerNetwork<O, _, __>::msg_pong(MsgPong &&msg, Conn &_conn) { + if (_conn.get_mode() == ConnPool::Conn::DEAD) return; auto conn = static_pointer_cast<Conn>(_conn.self()); - if (!conn) return; + assert(conn); uint16_t port = msg.port; this->disp_tcall->async_call([this, conn, port](ThreadCall::Handle &msg) { auto it = id2peer.find(conn->peer_id); diff --git a/src/conn.cpp b/src/conn.cpp index 5967339..c29dee9 100644 --- a/src/conn.cpp +++ b/src/conn.cpp @@ -40,14 +40,25 @@ ConnPool::Conn::operator std::string() const { s << "<Conn " << "fd=" << std::to_string(fd) << " " << "addr=" << std::string(addr) << " " - << "mode=" << ((mode == Conn::ACTIVE) ? "active" : "passive") << ">"; + << "mode="; + switch (mode) + { + case Conn::ACTIVE: s << "active"; break; + case Conn::PASSIVE: s << "passive"; break; + case Conn::DEAD: s << "dead"; break; + } + s << ">"; return std::move(s); } /* the following two functions are executed by exactly one worker per Conn object */ void ConnPool::Conn::send_data(int fd, int events) { - if (!(events & Event::WRITE)) return; + if (events & Event::ERROR) + { + worker_terminate(); + return; + } auto conn = self(); /* pin the connection */ ssize_t ret = seg_buff_size; for (;;) @@ -88,7 +99,11 @@ void ConnPool::Conn::send_data(int fd, int events) { } void ConnPool::Conn::recv_data(int fd, int events) { - if (!(events & Event::READ)) return; + if (events & Event::ERROR) + { + worker_terminate(); + return; + } auto conn = self(); /* pin the connection */ ssize_t ret = seg_buff_size; while (ret == (ssize_t)seg_buff_size) @@ -125,24 +140,29 @@ void ConnPool::Conn::stop() { ev_connect.clear(); ev_socket.clear(); send_buffer.get_queue().unreg_handler(); - ::close(fd); mode = ConnMode::DEAD; } } void ConnPool::Conn::worker_terminate() { stop(); - cpool->disp_tcall->call( - [cpool=this->cpool, fd=this->fd](ThreadCall::Handle &) { - cpool->remove_conn(fd); - }); + if (!worker->is_dispatcher()) + cpool->disp_tcall->async_call( + [cpool=this->cpool, fd=this->fd](ThreadCall::Handle &) { + cpool->remove_conn(fd); + }); + else cpool->remove_conn(fd); } void ConnPool::Conn::disp_terminate() { if (worker && !worker->is_dispatcher()) - worker->get_tcall()->call([conn=self()](ThreadCall::Handle &) { - conn->stop(); - }); + { + auto conn = self(); + if (conn) + worker->get_tcall()->call([conn](ThreadCall::Handle &) { + conn->stop(); + }); + } else stop(); cpool->remove_conn(fd); } @@ -277,12 +297,13 @@ void ConnPool::remove_conn(int fd) { /* temporarily pin the conn before it dies */ auto conn = it->second; //assert(conn->fd == fd); - conn->fd = -1; pool.erase(it); /* inform the upper layer the connection will be destroyed */ conn->on_teardown(); update_conn(conn, false); conn->self_ref = nullptr; /* remove the self-cycle */ + ::close(conn->fd); + conn->fd = -1; } } diff --git a/test/test_p2p_stress.cpp b/test/test_p2p_stress.cpp new file mode 100644 index 0000000..3546329 --- /dev/null +++ b/test/test_p2p_stress.cpp @@ -0,0 +1,113 @@ +/** + * Copyright (c) 2018 Cornell University. + * + * Author: Ted Yin <tederminant@gmail.com> + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is furnished to do + * so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#include <cstdio> +#include <string> +#include <functional> +#include <openssl/rand.h> + +#include "salticidae/msg.h" +#include "salticidae/event.h" +#include "salticidae/network.h" +#include "salticidae/stream.h" + +using salticidae::NetAddr; +using salticidae::DataStream; +using salticidae::MsgNetwork; +using salticidae::htole; +using salticidae::letoh; +using salticidae::bytearray_t; +using std::placeholders::_1; +using std::placeholders::_2; + +/** Hello Message. */ +struct MsgRand { + static const uint8_t opcode = 0x0; + DataStream serialized; + bytearray_t bytes; + /** Defines how to serialize the msg. */ + MsgRand(size_t size) { + bytearray_t bytes; + bytes.resize(size); + RAND_bytes(&bytes[0], size); + serialized << htole((uint32_t)size) << std::move(bytes); + } + /** Defines how to parse the msg. */ + MsgRand(DataStream &&s) { + uint32_t len; + s >> len; + bytes = std::move(s); + } +}; + +/** Acknowledgement Message. */ +struct MsgAck { + static const uint8_t opcode = 0x1; + DataStream serialized; + MsgAck() {} + MsgAck(DataStream &&s) {} +}; + +const uint8_t MsgRand::opcode; +const uint8_t MsgAck::opcode; + +using MyNet = salticidae::PeerNetwork<uint8_t>; + +std::vector<NetAddr> addrs = { + NetAddr("127.0.0.1:12345"), + NetAddr("127.0.0.1:12346"), + NetAddr("127.0.0.1:12347"), + NetAddr("127.0.0.1:12348") +}; + +void signal_handler(int) { + throw salticidae::SalticidaeError("got termination signal"); +} + +int main(int argc, char **argv) { + signal(SIGTERM, signal_handler); + signal(SIGINT, signal_handler); + + std::vector<std::thread> nodes; + + for (auto &addr: addrs) + { + nodes.push_back(std::thread([addr]() { + salticidae::EventContext ec; + /* test two nodes */ + MyNet net(ec, MyNet::Config( + salticidae::ConnPool::Config() + .nworker(2)).conn_timeout(5).ping_period(2)); + try { + net.start(); + net.listen(addr); + for (auto &paddr: addrs) + if (paddr != addr) net.add_peer(paddr); + ec.dispatch(); + } catch (salticidae::SalticidaeError &e) {} + })); + } + for (auto &t: nodes) t.join(); + return 0; +} |