From 39f6d6ac46d440aa68e7b1a2f2e4eb629356af34 Mon Sep 17 00:00:00 2001 From: Determinant Date: Sun, 18 Nov 2018 00:51:19 -0500 Subject: fix bugs --- test/bench_network.cpp | 4 +- test/test_network.cpp | 14 ++-- test/test_p2p_stress.cpp | 163 ++++++++++++++++++++++++++--------------------- 3 files changed, 98 insertions(+), 83 deletions(-) (limited to 'test') diff --git a/test/bench_network.cpp b/test/bench_network.cpp index d63d377..61307e1 100644 --- a/test/bench_network.cpp +++ b/test/bench_network.cpp @@ -111,7 +111,7 @@ struct MyNet: public MsgNetworkByteOp { /* send the first message through this connection */ net->ev_period_send = Event(net->ec, -1, [net, conn = self()](int, short) { - net->send_msg(MsgBytes(256), *conn); + net->send_msg(MsgBytes(256), conn); net->ev_period_send.add_with_timeout(0, 0); }); net->ev_period_send.add_with_timeout(0, 0); @@ -134,7 +134,7 @@ struct MyNet: public MsgNetworkByteOp { return new Conn(); } - void on_receive_bytes(MsgBytes &&msg, Conn &conn) { + void on_receive_bytes(MsgBytes &&msg, const conn_t &conn) { nrecv++; } }; diff --git a/test/test_network.cpp b/test/test_network.cpp index e439bca..6a12117 100644 --- a/test/test_network.cpp +++ b/test/test_network.cpp @@ -89,16 +89,16 @@ struct MyNet: public MsgNetworkByteOp { reg_handler(salticidae::generic_bind( &MyNet::on_receive_hello, this, _1, _2)); - reg_conn_handler([this](ConnPool::Conn &conn, bool connected) { + reg_conn_handler([this](const ConnPool::conn_t &conn, bool connected) { if (connected) { - if (conn.get_mode() == ConnPool::Conn::ACTIVE) + if (conn->get_mode() == ConnPool::Conn::ACTIVE) { printf("[%s] Connected, sending hello.\n", this->name.c_str()); /* send the first message through this connection */ send_msg(MsgHello(this->name, "Hello there!"), - static_cast(conn)); + salticidae::static_pointer_cast(conn)); } else printf("[%s] Accepted, waiting for greetings.\n", @@ -108,12 +108,12 @@ struct MyNet: public MsgNetworkByteOp { { printf("[%s] Disconnected, retrying.\n", this->name.c_str()); /* try to reconnect to the same address */ - connect(conn.get_addr()); + connect(conn->get_addr()); } }); } - void on_receive_hello(MsgHello &&msg, MyNet::Conn &conn) { + void on_receive_hello(MsgHello &&msg, const MyNet::conn_t &conn) { printf("[%s] %s says %s\n", name.c_str(), msg.name.c_str(), msg.text.c_str()); @@ -123,8 +123,8 @@ struct MyNet: public MsgNetworkByteOp { }; -void on_receive_ack(MsgAck &&msg, MyNet::Conn &conn) { - auto net = static_cast(conn.get_net()); +void on_receive_ack(MsgAck &&msg, const MyNet::conn_t &conn) { + auto net = static_cast(conn->get_net()); printf("[%s] the peer knows\n", net->name.c_str()); } diff --git a/test/test_p2p_stress.cpp b/test/test_p2p_stress.cpp index 0f479e4..b58bcc7 100644 --- a/test/test_p2p_stress.cpp +++ b/test/test_p2p_stress.cpp @@ -37,44 +37,35 @@ using salticidae::DataStream; using salticidae::MsgNetwork; using salticidae::ConnPool; using salticidae::Event; +using salticidae::EventContext; using salticidae::htole; using salticidae::letoh; using salticidae::bytearray_t; using salticidae::uint256_t; +using salticidae::static_pointer_cast; +using salticidae::Config; using std::placeholders::_1; using std::placeholders::_2; -const size_t SEG_BUFF_SIZE = 4096; - -/** Hello Message. */ struct MsgRand { static const uint8_t opcode = 0x0; DataStream serialized; bytearray_t bytes; - /** Defines how to serialize the msg. */ MsgRand(size_t size) { bytearray_t bytes; bytes.resize(size); RAND_bytes(&bytes[0], size); serialized << std::move(bytes); } - /** Defines how to parse the msg. */ - MsgRand(DataStream &&s) { - bytes = s; - } + MsgRand(DataStream &&s) { bytes = s; } }; -/** Acknowledgement Message. */ struct MsgAck { static const uint8_t opcode = 0x1; uint256_t hash; DataStream serialized; - MsgAck(const uint256_t &hash) { - serialized << hash; - } - MsgAck(DataStream &&s) { - s >> hash; - } + MsgAck(const uint256_t &hash) { serialized << hash; } + MsgAck(DataStream &&s) { s >> hash; } }; const uint8_t MsgRand::opcode; @@ -88,73 +79,97 @@ void signal_handler(int) { throw salticidae::SalticidaeError("got termination signal"); } +struct TestContext { + Event timer; + int state; + uint256_t hash; +}; + +void install_proto(EventContext &ec, MyNet &net, + std::unordered_map &_tc, const size_t &seg_buff_size) { + auto send_rand = [&](int size, const MyNet::conn_t &conn) { + auto &tc = _tc[conn->get_addr()]; + MsgRand msg(size); + tc.hash = msg.serialized.get_hash(); + net.send_msg(std::move(msg), conn); + }; + net.reg_conn_handler([&, send_rand](const ConnPool::conn_t &conn, bool connected) { + if (connected) + { + if (conn->get_mode() == ConnPool::Conn::ACTIVE) + { + auto &tc = _tc[conn->get_addr()]; + tc.state = 1; + SALTICIDAE_LOG_INFO("increasing phase"); + send_rand(tc.state, static_pointer_cast(conn)); + } + } + }); + net.reg_handler([&](MsgRand &&msg, const MyNet::conn_t &conn) { + uint256_t hash = salticidae::get_hash(msg.bytes); + net.send_msg(MsgAck(hash), conn); + }); + net.reg_handler([&, send_rand](MsgAck &&msg, const MyNet::conn_t &conn) { + auto &tc = _tc[conn->get_addr()]; + if (msg.hash != tc.hash) + { + SALTICIDAE_LOG_ERROR("corrupted I/O!"); + exit(1); + } + + if (tc.state == seg_buff_size * 2) + { + send_rand(tc.state, conn); + tc.state = -1; + tc.timer = Event(ec, -1, [&net, conn](int, int) { + net.terminate(conn); + }); + double t = salticidae::gen_rand_timeout(10); + tc.timer.add_with_timeout(t, 0); + SALTICIDAE_LOG_INFO("rand-bomboard phase, ending in %.2f secs", t); + } + else if (tc.state == -1) + send_rand(rand() % (seg_buff_size * 10), conn); + else + send_rand(++tc.state, conn); + }); +} + int main(int argc, char **argv) { signal(SIGTERM, signal_handler); signal(SIGINT, signal_handler); - - int n = argc > 1 ? atoi(argv[1]) : 5; - for (int i = 0; i < n; i++) + Config config; + auto opt_no_msg = Config::OptValFlag::create(false); + auto opt_npeers = Config::OptValInt::create(5); + auto opt_seg_buff_size = Config::OptValInt::create(4096); + auto opt_nworker = Config::OptValInt::create(2); + auto opt_help = Config::OptValFlag::create(false); + config.add_opt("no-msg", opt_no_msg, Config::SWITCH_ON); + config.add_opt("npeers", opt_npeers, Config::SET_VAL); + config.add_opt("seg-buff-size", opt_seg_buff_size, Config::SET_VAL); + config.add_opt("nworker", opt_nworker, Config::SET_VAL); + config.add_opt("help", opt_help, Config::SWITCH_ON, 'h', "show this help info"); + config.parse(argc, argv); + if (opt_help->get()) + { + config.print_help(); + exit(0); + } + size_t seg_buff_size = opt_seg_buff_size->get(); + for (int i = 0; i < opt_npeers->get(); i++) addrs.push_back(NetAddr("127.0.0.1:" + std::to_string(12345 + i))); - std::vector nodes; + std::vector peers; for (auto &addr: addrs) { - nodes.push_back(std::thread([addr]() { - salticidae::EventContext ec; - /* test two nodes */ + peers.push_back(std::thread([&, addr]() { + EventContext ec; + std::unordered_map tc; MyNet net(ec, MyNet::Config( salticidae::ConnPool::Config() - .nworker(2).seg_buff_size(SEG_BUFF_SIZE)) + .nworker(opt_nworker->get()).seg_buff_size(seg_buff_size)) .conn_timeout(5).ping_period(2)); - int state; - uint256_t hash; - auto send_rand = [&net, &hash](int size, MyNet::Conn &conn) { - MsgRand msg(size); - hash = msg.serialized.get_hash(); - net.send_msg(std::move(msg), conn); - }; - Event timer; - net.reg_conn_handler([&state, &net, &send_rand](salticidae::ConnPool::Conn &conn, bool connected) { - if (connected) - { - if (conn.get_mode() == ConnPool::Conn::ACTIVE) - { - state = 1; - SALTICIDAE_LOG_INFO("increasing phase"); - send_rand(state, static_cast(conn)); - } - } - }); - net.reg_handler([&state, &net](MsgRand &&msg, MyNet::Conn &conn) { - uint256_t hash = salticidae::get_hash(msg.bytes); - net.send_msg(MsgAck(hash), conn); - }); - net.reg_handler([&state, &net, &hash, &send_rand, &ec, &timer](MsgAck &&msg, MyNet::Conn &conn) { - if (msg.hash != hash) - { - SALTICIDAE_LOG_ERROR("corrupted I/O!"); - exit(1); - } - - if (state == SEG_BUFF_SIZE * 2) - { - send_rand(state, conn); - state = -1; - timer = Event(ec, -1, [&net, conn=conn.self()](int, int) { - net.terminate(*conn); - }); - double t = salticidae::gen_rand_timeout(10); - timer.add_with_timeout(t, 0); - SALTICIDAE_LOG_INFO("rand-bomboard phase, ending in %.2f secs", t); - } - else if (state == -1) - { - send_rand(rand() % (SEG_BUFF_SIZE * 10), conn); - } - else - { - send_rand(++state, conn); - } - }); + if (!opt_no_msg->get()) + install_proto(ec, net, tc, seg_buff_size); try { net.start(); net.listen(addr); @@ -164,6 +179,6 @@ int main(int argc, char **argv) { } catch (salticidae::SalticidaeError &e) {} })); } - for (auto &t: nodes) t.join(); + for (auto &t: peers) t.join(); return 0; } -- cgit v1.2.3-70-g09d2