From 4fdac38ad5796cae03f827670655efd79d953699 Mon Sep 17 00:00:00 2001 From: Determinant Date: Mon, 19 Nov 2018 00:54:56 -0500 Subject: fix bug in remove_conn --- include/salticidae/conn.h | 3 ++- src/conn.cpp | 28 +++++++++++++++------------- test/bench_network.cpp | 4 +++- test/test_p2p_stress.cpp | 42 +++++++++++++++++++++++++----------------- 4 files changed, 45 insertions(+), 32 deletions(-) diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index 777c480..ea5ccf2 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -196,6 +196,7 @@ class ConnPool { std::this_thread::get_id()); return; } + assert(conn->fd != -1); SALTICIDAE_LOG_INFO("worker %x got %s", std::this_thread::get_id(), std::string(*conn).c_str()); @@ -243,7 +244,7 @@ class ConnPool { void accept_client(int, int); conn_t add_conn(const conn_t &conn); - void remove_conn(int fd); + void del_conn(const conn_t &conn); protected: conn_t _connect(const NetAddr &addr); diff --git a/src/conn.cpp b/src/conn.cpp index 413a582..b131684 100644 --- a/src/conn.cpp +++ b/src/conn.cpp @@ -145,26 +145,26 @@ void ConnPool::Conn::stop() { } void ConnPool::Conn::worker_terminate() { + auto conn = self(); + if (!conn) return; stop(); if (!worker->is_dispatcher()) cpool->disp_tcall->async_call( - [cpool=this->cpool, fd=this->fd](ThreadCall::Handle &) { - cpool->remove_conn(fd); + [cpool=this->cpool, conn](ThreadCall::Handle &) { + cpool->del_conn(conn); }); - else cpool->remove_conn(fd); + else cpool->del_conn(conn); } void ConnPool::Conn::disp_terminate() { + auto conn = self(); + if (!conn) return; if (worker && !worker->is_dispatcher()) - { - auto conn = self(); - if (conn) - worker->get_tcall()->call([conn](ThreadCall::Handle &) { - conn->stop(); - }); - } + worker->get_tcall()->call([conn](ThreadCall::Handle &) { + conn->stop(); + }); else stop(); - cpool->remove_conn(fd); + cpool->del_conn(conn); } void ConnPool::accept_client(int fd, int) { @@ -201,6 +201,7 @@ void ConnPool::accept_client(int fd, int) { void ConnPool::Conn::conn_server(int fd, int events) { auto conn = self(); /* pin the connection */ + if (!conn) return; if (send(fd, "", 0, MSG_NOSIGNAL) == 0) { ev_connect.clear(); @@ -290,8 +291,8 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) { return conn; } -void ConnPool::remove_conn(int fd) { - auto it = pool.find(fd); +void ConnPool::del_conn(const conn_t &conn) { + auto it = pool.find(conn->fd); if (it != pool.end()) { /* temporarily pin the conn before it dies */ @@ -303,6 +304,7 @@ void ConnPool::remove_conn(int fd) { update_conn(conn, false); conn->release_self(); /* remove the self-cycle */ ::close(conn->fd); + SALTICIDAE_LOG_INFO("remove_conn: %s", std::string(*conn).c_str()); conn->fd = -1; } } diff --git a/test/bench_network.cpp b/test/bench_network.cpp index 61307e1..b0856b8 100644 --- a/test/bench_network.cpp +++ b/test/bench_network.cpp @@ -83,7 +83,7 @@ struct MyNet: public MsgNetworkByteOp { name(name), peer(peer), ev_period_stat(ec, -1, [this, stat_timeout](int, short) { - printf("%.2f mps\n", nrecv / (double)stat_timeout); + SALTICIDAE_LOG_INFO("%.2f mps\n", nrecv / (double)stat_timeout); nrecv = 0; ev_period_stat.add_with_timeout(stat_timeout, 0); }), @@ -154,10 +154,12 @@ int main() { sigaction(SIGINT, &sa, NULL); /* test two nodes */ MyNet alice(ec, "Alice", bob_addr, 10); + alice.start(); alice.listen(alice_addr); std::thread bob_thread([]() { salticidae::EventContext ec; MyNet bob(ec, "Bob", alice_addr); + bob.start(); bob.connect(alice_addr); try { ec.dispatch(); diff --git a/test/test_p2p_stress.cpp b/test/test_p2p_stress.cpp index cc61948..318e344 100644 --- a/test/test_p2p_stress.cpp +++ b/test/test_p2p_stress.cpp @@ -81,12 +81,23 @@ struct TestContext { Event timer; int state; uint256_t hash; + size_t ncompleted; + TestContext(): ncompleted(0) {} }; -void install_proto(EventContext &ec, MyNet &net, - std::unordered_map &_tc, const size_t &seg_buff_size) { +struct AppContext { + NetAddr addr; + EventContext ec; + BoxObj net; + BoxObj tcall; + std::unordered_map tc; +}; + +void install_proto(AppContext &app, const size_t &seg_buff_size) { + auto &ec = app.ec; + auto &net = *app.net; auto send_rand = [&](int size, const MyNet::conn_t &conn) { - auto &tc = _tc[conn->get_addr()]; + auto &tc = app.tc[conn->get_addr()]; MsgRand msg(size); tc.hash = msg.serialized.get_hash(); net.send_msg(std::move(msg), conn); @@ -96,7 +107,7 @@ void install_proto(EventContext &ec, MyNet &net, { if (conn->get_mode() == ConnPool::Conn::ACTIVE) { - auto &tc = _tc[conn->get_addr()]; + auto &tc = app.tc[conn->get_addr()]; tc.state = 1; SALTICIDAE_LOG_INFO("increasing phase"); send_rand(tc.state, static_pointer_cast(conn)); @@ -108,7 +119,7 @@ void install_proto(EventContext &ec, MyNet &net, net.send_msg(MsgAck(hash), conn); }); net.reg_handler([&, send_rand](MsgAck &&msg, const MyNet::conn_t &conn) { - auto &tc = _tc[conn->get_addr()]; + auto &tc = app.tc[conn->get_addr()]; if (msg.hash != tc.hash) { SALTICIDAE_LOG_ERROR("corrupted I/O!"); @@ -119,8 +130,13 @@ void install_proto(EventContext &ec, MyNet &net, { send_rand(tc.state, conn); tc.state = -1; - tc.timer = Event(ec, -1, [&net, conn](int, int) { + tc.timer = Event(ec, -1, [&, conn](int, int) { + tc.ncompleted++; net.terminate(conn); + std::string s; + for (const auto &p: app.tc) + s += salticidae::stringprintf(" %d(%d)", p.first.port, p.second.ncompleted); + SALTICIDAE_LOG_INFO("%d completed:%s", app.addr.port, s.c_str()); }); double t = salticidae::gen_rand_timeout(10); tc.timer.add_with_timeout(t, 0); @@ -133,14 +149,6 @@ void install_proto(EventContext &ec, MyNet &net, }); } -struct AppContext { - NetAddr addr; - EventContext ec; - BoxObj net; - BoxObj tcall; - std::unordered_map tc; -}; - int main(int argc, char **argv) { Config config; auto opt_no_msg = Config::OptValFlag::create(false); @@ -148,7 +156,7 @@ int main(int argc, char **argv) { auto opt_seg_buff_size = Config::OptValInt::create(4096); auto opt_nworker = Config::OptValInt::create(2); auto opt_conn_timeout = Config::OptValDouble::create(5); - auto opt_ping_peroid = Config::OptValDouble::create(5); + auto opt_ping_peroid = Config::OptValDouble::create(2); auto opt_help = Config::OptValFlag::create(false); config.add_opt("no-msg", opt_no_msg, Config::SWITCH_ON); config.add_opt("npeers", opt_npeers, Config::SET_VAL); @@ -181,7 +189,7 @@ int main(int argc, char **argv) { .ping_period(opt_ping_peroid->get())); a.tcall = new ThreadCall(a.ec); if (!opt_no_msg->get()) - install_proto(a.ec, *a.net, a.tc, seg_buff_size); + install_proto(a, seg_buff_size); a.net->start(); } @@ -197,7 +205,7 @@ int main(int argc, char **argv) { for (auto &a: apps) { auto &tc = a.tcall; - tc->async_call([ec=tc->get_ec()](ThreadCall::Handle &) { + tc->async_call([ec=a.ec](ThreadCall::Handle &) { ec.stop(); }); } -- cgit v1.2.3