From 39f6d6ac46d440aa68e7b1a2f2e4eb629356af34 Mon Sep 17 00:00:00 2001 From: Determinant Date: Sun, 18 Nov 2018 00:51:19 -0500 Subject: fix bugs --- include/salticidae/conn.h | 8 +-- include/salticidae/network.h | 38 +++++----- include/salticidae/util.h | 1 + test/bench_network.cpp | 4 +- test/test_network.cpp | 14 ++-- test/test_p2p_stress.cpp | 163 +++++++++++++++++++++++-------------------- 6 files changed, 119 insertions(+), 109 deletions(-) diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index 69402d3..c357875 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -63,7 +63,7 @@ class ConnPool { /** The handle to a bi-directional connection. */ using conn_t = ArcObj; /** The type of callback invoked when connection status is changed. */ - using conn_callback_t = std::function; + using conn_callback_t = std::function; /** Abstraction for a bi-directional connection. */ class Conn { friend ConnPool; @@ -168,7 +168,7 @@ class ConnPool { void update_conn(const conn_t &conn, bool connected) { user_tcall->async_call([this, conn, connected](ThreadCall::Handle &) { - if (conn_cb) conn_cb(*conn, connected); + if (conn_cb) conn_cb(conn, connected); }); } @@ -391,9 +391,7 @@ class ConnPool { template void reg_conn_handler(Func cb) { conn_cb = cb; } - void terminate(Conn &_conn) { - auto conn = _conn.self(); - if (!conn) return; + void terminate(const conn_t &conn) { disp_tcall->async_call([this, conn](ThreadCall::Handle &) { conn->disp_terminate(); }); diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 4e182fc..43638cf 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -46,7 +46,7 @@ class MsgNetwork: public ConnPool { template struct callback_traits { using ret_type = ReturnType; - using conn_type = ConnType; + using conn_type = typename std::remove_reference::type::type; using msg_type = typename std::remove_reference::type; }; @@ -121,7 +121,7 @@ class MsgNetwork: public ConnPool { private: std::unordered_map< typename Msg::opcode_t, - std::function> handler_map; + std::function> handler_map; using queue_t = MPSCQueueEventDriven>; queue_t incoming_msgs; @@ -168,7 +168,7 @@ class MsgNetwork: public ConnPool { SALTICIDAE_LOG_DEBUG("got message %s from %s", std::string(msg).c_str(), std::string(*conn).c_str()); - it->second(msg, *conn); + it->second(msg, conn); #ifdef SALTICIDAE_MSG_STAT conn->nrecv++; recv_by_opcode.add(msg); @@ -185,14 +185,14 @@ class MsgNetwork: public ConnPool { typename callback_traits::msg_type, DataStream &&>::value>::type reg_handler(Func handler) { using callback_t = callback_traits; - handler_map[callback_t::msg_type::opcode] = [handler](const Msg &msg, Conn &conn) { + handler_map[callback_t::msg_type::opcode] = [handler](const Msg &msg, const conn_t &conn) { handler(typename callback_t::msg_type(msg.get_payload()), - static_cast(conn)); + static_pointer_cast(conn)); }; } template - void send_msg(const MsgType &msg, Conn &conn); + void send_msg(const MsgType &msg, const conn_t &conn); using ConnPool::listen; #ifdef SALTICIDAE_MSG_STAT msg_stat_by_opcode_t &get_sent_by_opcode() const { @@ -357,8 +357,8 @@ class PeerNetwork: public MsgNetwork { } }; - void msg_ping(MsgPing &&msg, Conn &conn); - void msg_pong(MsgPong &&msg, Conn &conn); + void msg_ping(MsgPing &&msg, const conn_t &conn); + void msg_pong(MsgPong &&msg, const conn_t &conn); void _ping_msg_cb(const conn_t &conn, uint16_t port); void _pong_msg_cb(const conn_t &conn, uint16_t port); bool check_new_conn(const conn_t &conn, uint16_t port); @@ -472,15 +472,15 @@ void MsgNetwork::Conn::on_read() { template template -void MsgNetwork::send_msg(const MsgType &_msg, Conn &conn) { +void MsgNetwork::send_msg(const MsgType &_msg, const conn_t &conn) { Msg msg(_msg); bytearray_t msg_data = msg.serialize(); SALTICIDAE_LOG_DEBUG("wrote message %s to %s", std::string(msg).c_str(), - std::string(conn).c_str()); - conn.write(std::move(msg_data)); + std::string(*conn).c_str()); + conn->write(std::move(msg_data)); #ifdef SALTICIDAE_MSG_STAT - conn.nsent++; + conn->nsent++; sent_by_opcode.add(msg); #endif } @@ -510,7 +510,7 @@ void PeerNetwork::Conn::on_setup() { }); /* the initial ping-pong to set up the connection */ tcall_reset_timeout(worker, conn, pn->conn_timeout); - pn->send_msg(MsgPing(pn->listen_port), *conn); + pn->send_msg(MsgPing(pn->listen_port), conn); } template @@ -564,7 +564,7 @@ void PeerNetwork::Peer::send_ping() { ping_timer_ok = false; pong_msg_ok = false; tcall_reset_timeout(conn->worker, conn, pn->conn_timeout); - pn->send_msg(MsgPing(pn->listen_port), *conn); + pn->send_msg(MsgPing(pn->listen_port), conn); } template @@ -621,9 +621,7 @@ void PeerNetwork::start_active_conn(const NetAddr &addr) { /* begin: functions invoked by the user loop */ template -void PeerNetwork::msg_ping(MsgPing &&msg, Conn &_conn) { - auto conn = static_pointer_cast(_conn.self()); - if (!conn) return; +void PeerNetwork::msg_ping(MsgPing &&msg, const conn_t &conn) { uint16_t port = msg.port; this->disp_tcall->async_call([this, conn, port](ThreadCall::Handle &msg) { if (conn->get_mode() == ConnPool::Conn::DEAD) return; @@ -631,14 +629,12 @@ void PeerNetwork::msg_ping(MsgPing &&msg, Conn &_conn) { std::string(*conn).c_str(), ntohs(port)); if (check_new_conn(conn, port)) return; auto p = id2peer.find(conn->peer_id)->second.get(); - send_msg(MsgPong(this->listen_port), *conn); + send_msg(MsgPong(this->listen_port), conn); }); } template -void PeerNetwork::msg_pong(MsgPong &&msg, Conn &_conn) { - auto conn = static_pointer_cast(_conn.self()); - if (!conn) return; +void PeerNetwork::msg_pong(MsgPong &&msg, const conn_t &conn) { uint16_t port = msg.port; this->disp_tcall->async_call([this, conn, port](ThreadCall::Handle &msg) { if (conn->get_mode() == ConnPool::Conn::DEAD) return; diff --git a/include/salticidae/util.h b/include/salticidae/util.h index 4e6825a..7552f8e 100644 --- a/include/salticidae/util.h +++ b/include/salticidae/util.h @@ -302,6 +302,7 @@ class Config { void update(Opt &opt, const char *optval); public: + Config() {} Config(const std::string &conf_fname): conf_fname(conf_fname), opt_val_conf(OptValConf::create(this)) { diff --git a/test/bench_network.cpp b/test/bench_network.cpp index d63d377..61307e1 100644 --- a/test/bench_network.cpp +++ b/test/bench_network.cpp @@ -111,7 +111,7 @@ struct MyNet: public MsgNetworkByteOp { /* send the first message through this connection */ net->ev_period_send = Event(net->ec, -1, [net, conn = self()](int, short) { - net->send_msg(MsgBytes(256), *conn); + net->send_msg(MsgBytes(256), conn); net->ev_period_send.add_with_timeout(0, 0); }); net->ev_period_send.add_with_timeout(0, 0); @@ -134,7 +134,7 @@ struct MyNet: public MsgNetworkByteOp { return new Conn(); } - void on_receive_bytes(MsgBytes &&msg, Conn &conn) { + void on_receive_bytes(MsgBytes &&msg, const conn_t &conn) { nrecv++; } }; diff --git a/test/test_network.cpp b/test/test_network.cpp index e439bca..6a12117 100644 --- a/test/test_network.cpp +++ b/test/test_network.cpp @@ -89,16 +89,16 @@ struct MyNet: public MsgNetworkByteOp { reg_handler(salticidae::generic_bind( &MyNet::on_receive_hello, this, _1, _2)); - reg_conn_handler([this](ConnPool::Conn &conn, bool connected) { + reg_conn_handler([this](const ConnPool::conn_t &conn, bool connected) { if (connected) { - if (conn.get_mode() == ConnPool::Conn::ACTIVE) + if (conn->get_mode() == ConnPool::Conn::ACTIVE) { printf("[%s] Connected, sending hello.\n", this->name.c_str()); /* send the first message through this connection */ send_msg(MsgHello(this->name, "Hello there!"), - static_cast(conn)); + salticidae::static_pointer_cast(conn)); } else printf("[%s] Accepted, waiting for greetings.\n", @@ -108,12 +108,12 @@ struct MyNet: public MsgNetworkByteOp { { printf("[%s] Disconnected, retrying.\n", this->name.c_str()); /* try to reconnect to the same address */ - connect(conn.get_addr()); + connect(conn->get_addr()); } }); } - void on_receive_hello(MsgHello &&msg, MyNet::Conn &conn) { + void on_receive_hello(MsgHello &&msg, const MyNet::conn_t &conn) { printf("[%s] %s says %s\n", name.c_str(), msg.name.c_str(), msg.text.c_str()); @@ -123,8 +123,8 @@ struct MyNet: public MsgNetworkByteOp { }; -void on_receive_ack(MsgAck &&msg, MyNet::Conn &conn) { - auto net = static_cast(conn.get_net()); +void on_receive_ack(MsgAck &&msg, const MyNet::conn_t &conn) { + auto net = static_cast(conn->get_net()); printf("[%s] the peer knows\n", net->name.c_str()); } diff --git a/test/test_p2p_stress.cpp b/test/test_p2p_stress.cpp index 0f479e4..b58bcc7 100644 --- a/test/test_p2p_stress.cpp +++ b/test/test_p2p_stress.cpp @@ -37,44 +37,35 @@ using salticidae::DataStream; using salticidae::MsgNetwork; using salticidae::ConnPool; using salticidae::Event; +using salticidae::EventContext; using salticidae::htole; using salticidae::letoh; using salticidae::bytearray_t; using salticidae::uint256_t; +using salticidae::static_pointer_cast; +using salticidae::Config; using std::placeholders::_1; using std::placeholders::_2; -const size_t SEG_BUFF_SIZE = 4096; - -/** Hello Message. */ struct MsgRand { static const uint8_t opcode = 0x0; DataStream serialized; bytearray_t bytes; - /** Defines how to serialize the msg. */ MsgRand(size_t size) { bytearray_t bytes; bytes.resize(size); RAND_bytes(&bytes[0], size); serialized << std::move(bytes); } - /** Defines how to parse the msg. */ - MsgRand(DataStream &&s) { - bytes = s; - } + MsgRand(DataStream &&s) { bytes = s; } }; -/** Acknowledgement Message. */ struct MsgAck { static const uint8_t opcode = 0x1; uint256_t hash; DataStream serialized; - MsgAck(const uint256_t &hash) { - serialized << hash; - } - MsgAck(DataStream &&s) { - s >> hash; - } + MsgAck(const uint256_t &hash) { serialized << hash; } + MsgAck(DataStream &&s) { s >> hash; } }; const uint8_t MsgRand::opcode; @@ -88,73 +79,97 @@ void signal_handler(int) { throw salticidae::SalticidaeError("got termination signal"); } +struct TestContext { + Event timer; + int state; + uint256_t hash; +}; + +void install_proto(EventContext &ec, MyNet &net, + std::unordered_map &_tc, const size_t &seg_buff_size) { + auto send_rand = [&](int size, const MyNet::conn_t &conn) { + auto &tc = _tc[conn->get_addr()]; + MsgRand msg(size); + tc.hash = msg.serialized.get_hash(); + net.send_msg(std::move(msg), conn); + }; + net.reg_conn_handler([&, send_rand](const ConnPool::conn_t &conn, bool connected) { + if (connected) + { + if (conn->get_mode() == ConnPool::Conn::ACTIVE) + { + auto &tc = _tc[conn->get_addr()]; + tc.state = 1; + SALTICIDAE_LOG_INFO("increasing phase"); + send_rand(tc.state, static_pointer_cast(conn)); + } + } + }); + net.reg_handler([&](MsgRand &&msg, const MyNet::conn_t &conn) { + uint256_t hash = salticidae::get_hash(msg.bytes); + net.send_msg(MsgAck(hash), conn); + }); + net.reg_handler([&, send_rand](MsgAck &&msg, const MyNet::conn_t &conn) { + auto &tc = _tc[conn->get_addr()]; + if (msg.hash != tc.hash) + { + SALTICIDAE_LOG_ERROR("corrupted I/O!"); + exit(1); + } + + if (tc.state == seg_buff_size * 2) + { + send_rand(tc.state, conn); + tc.state = -1; + tc.timer = Event(ec, -1, [&net, conn](int, int) { + net.terminate(conn); + }); + double t = salticidae::gen_rand_timeout(10); + tc.timer.add_with_timeout(t, 0); + SALTICIDAE_LOG_INFO("rand-bomboard phase, ending in %.2f secs", t); + } + else if (tc.state == -1) + send_rand(rand() % (seg_buff_size * 10), conn); + else + send_rand(++tc.state, conn); + }); +} + int main(int argc, char **argv) { signal(SIGTERM, signal_handler); signal(SIGINT, signal_handler); - - int n = argc > 1 ? atoi(argv[1]) : 5; - for (int i = 0; i < n; i++) + Config config; + auto opt_no_msg = Config::OptValFlag::create(false); + auto opt_npeers = Config::OptValInt::create(5); + auto opt_seg_buff_size = Config::OptValInt::create(4096); + auto opt_nworker = Config::OptValInt::create(2); + auto opt_help = Config::OptValFlag::create(false); + config.add_opt("no-msg", opt_no_msg, Config::SWITCH_ON); + config.add_opt("npeers", opt_npeers, Config::SET_VAL); + config.add_opt("seg-buff-size", opt_seg_buff_size, Config::SET_VAL); + config.add_opt("nworker", opt_nworker, Config::SET_VAL); + config.add_opt("help", opt_help, Config::SWITCH_ON, 'h', "show this help info"); + config.parse(argc, argv); + if (opt_help->get()) + { + config.print_help(); + exit(0); + } + size_t seg_buff_size = opt_seg_buff_size->get(); + for (int i = 0; i < opt_npeers->get(); i++) addrs.push_back(NetAddr("127.0.0.1:" + std::to_string(12345 + i))); - std::vector nodes; + std::vector peers; for (auto &addr: addrs) { - nodes.push_back(std::thread([addr]() { - salticidae::EventContext ec; - /* test two nodes */ + peers.push_back(std::thread([&, addr]() { + EventContext ec; + std::unordered_map tc; MyNet net(ec, MyNet::Config( salticidae::ConnPool::Config() - .nworker(2).seg_buff_size(SEG_BUFF_SIZE)) + .nworker(opt_nworker->get()).seg_buff_size(seg_buff_size)) .conn_timeout(5).ping_period(2)); - int state; - uint256_t hash; - auto send_rand = [&net, &hash](int size, MyNet::Conn &conn) { - MsgRand msg(size); - hash = msg.serialized.get_hash(); - net.send_msg(std::move(msg), conn); - }; - Event timer; - net.reg_conn_handler([&state, &net, &send_rand](salticidae::ConnPool::Conn &conn, bool connected) { - if (connected) - { - if (conn.get_mode() == ConnPool::Conn::ACTIVE) - { - state = 1; - SALTICIDAE_LOG_INFO("increasing phase"); - send_rand(state, static_cast(conn)); - } - } - }); - net.reg_handler([&state, &net](MsgRand &&msg, MyNet::Conn &conn) { - uint256_t hash = salticidae::get_hash(msg.bytes); - net.send_msg(MsgAck(hash), conn); - }); - net.reg_handler([&state, &net, &hash, &send_rand, &ec, &timer](MsgAck &&msg, MyNet::Conn &conn) { - if (msg.hash != hash) - { - SALTICIDAE_LOG_ERROR("corrupted I/O!"); - exit(1); - } - - if (state == SEG_BUFF_SIZE * 2) - { - send_rand(state, conn); - state = -1; - timer = Event(ec, -1, [&net, conn=conn.self()](int, int) { - net.terminate(*conn); - }); - double t = salticidae::gen_rand_timeout(10); - timer.add_with_timeout(t, 0); - SALTICIDAE_LOG_INFO("rand-bomboard phase, ending in %.2f secs", t); - } - else if (state == -1) - { - send_rand(rand() % (SEG_BUFF_SIZE * 10), conn); - } - else - { - send_rand(++state, conn); - } - }); + if (!opt_no_msg->get()) + install_proto(ec, net, tc, seg_buff_size); try { net.start(); net.listen(addr); @@ -164,6 +179,6 @@ int main(int argc, char **argv) { } catch (salticidae::SalticidaeError &e) {} })); } - for (auto &t: nodes) t.join(); + for (auto &t: peers) t.join(); return 0; } -- cgit v1.2.3