aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/salticidae/conn.h3
-rw-r--r--src/conn.cpp28
-rw-r--r--test/bench_network.cpp4
-rw-r--r--test/test_p2p_stress.cpp42
4 files changed, 45 insertions, 32 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index 777c480..ea5ccf2 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -196,6 +196,7 @@ class ConnPool {
std::this_thread::get_id());
return;
}
+ assert(conn->fd != -1);
SALTICIDAE_LOG_INFO("worker %x got %s",
std::this_thread::get_id(),
std::string(*conn).c_str());
@@ -243,7 +244,7 @@ class ConnPool {
void accept_client(int, int);
conn_t add_conn(const conn_t &conn);
- void remove_conn(int fd);
+ void del_conn(const conn_t &conn);
protected:
conn_t _connect(const NetAddr &addr);
diff --git a/src/conn.cpp b/src/conn.cpp
index 413a582..b131684 100644
--- a/src/conn.cpp
+++ b/src/conn.cpp
@@ -145,26 +145,26 @@ void ConnPool::Conn::stop() {
}
void ConnPool::Conn::worker_terminate() {
+ auto conn = self();
+ if (!conn) return;
stop();
if (!worker->is_dispatcher())
cpool->disp_tcall->async_call(
- [cpool=this->cpool, fd=this->fd](ThreadCall::Handle &) {
- cpool->remove_conn(fd);
+ [cpool=this->cpool, conn](ThreadCall::Handle &) {
+ cpool->del_conn(conn);
});
- else cpool->remove_conn(fd);
+ else cpool->del_conn(conn);
}
void ConnPool::Conn::disp_terminate() {
+ auto conn = self();
+ if (!conn) return;
if (worker && !worker->is_dispatcher())
- {
- auto conn = self();
- if (conn)
- worker->get_tcall()->call([conn](ThreadCall::Handle &) {
- conn->stop();
- });
- }
+ worker->get_tcall()->call([conn](ThreadCall::Handle &) {
+ conn->stop();
+ });
else stop();
- cpool->remove_conn(fd);
+ cpool->del_conn(conn);
}
void ConnPool::accept_client(int fd, int) {
@@ -201,6 +201,7 @@ void ConnPool::accept_client(int fd, int) {
void ConnPool::Conn::conn_server(int fd, int events) {
auto conn = self(); /* pin the connection */
+ if (!conn) return;
if (send(fd, "", 0, MSG_NOSIGNAL) == 0)
{
ev_connect.clear();
@@ -290,8 +291,8 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) {
return conn;
}
-void ConnPool::remove_conn(int fd) {
- auto it = pool.find(fd);
+void ConnPool::del_conn(const conn_t &conn) {
+ auto it = pool.find(conn->fd);
if (it != pool.end())
{
/* temporarily pin the conn before it dies */
@@ -303,6 +304,7 @@ void ConnPool::remove_conn(int fd) {
update_conn(conn, false);
conn->release_self(); /* remove the self-cycle */
::close(conn->fd);
+ SALTICIDAE_LOG_INFO("remove_conn: %s", std::string(*conn).c_str());
conn->fd = -1;
}
}
diff --git a/test/bench_network.cpp b/test/bench_network.cpp
index 61307e1..b0856b8 100644
--- a/test/bench_network.cpp
+++ b/test/bench_network.cpp
@@ -83,7 +83,7 @@ struct MyNet: public MsgNetworkByteOp {
name(name),
peer(peer),
ev_period_stat(ec, -1, [this, stat_timeout](int, short) {
- printf("%.2f mps\n", nrecv / (double)stat_timeout);
+ SALTICIDAE_LOG_INFO("%.2f mps\n", nrecv / (double)stat_timeout);
nrecv = 0;
ev_period_stat.add_with_timeout(stat_timeout, 0);
}),
@@ -154,10 +154,12 @@ int main() {
sigaction(SIGINT, &sa, NULL);
/* test two nodes */
MyNet alice(ec, "Alice", bob_addr, 10);
+ alice.start();
alice.listen(alice_addr);
std::thread bob_thread([]() {
salticidae::EventContext ec;
MyNet bob(ec, "Bob", alice_addr);
+ bob.start();
bob.connect(alice_addr);
try {
ec.dispatch();
diff --git a/test/test_p2p_stress.cpp b/test/test_p2p_stress.cpp
index cc61948..318e344 100644
--- a/test/test_p2p_stress.cpp
+++ b/test/test_p2p_stress.cpp
@@ -81,12 +81,23 @@ struct TestContext {
Event timer;
int state;
uint256_t hash;
+ size_t ncompleted;
+ TestContext(): ncompleted(0) {}
};
-void install_proto(EventContext &ec, MyNet &net,
- std::unordered_map<NetAddr, TestContext> &_tc, const size_t &seg_buff_size) {
+struct AppContext {
+ NetAddr addr;
+ EventContext ec;
+ BoxObj<MyNet> net;
+ BoxObj<ThreadCall> tcall;
+ std::unordered_map<NetAddr, TestContext> tc;
+};
+
+void install_proto(AppContext &app, const size_t &seg_buff_size) {
+ auto &ec = app.ec;
+ auto &net = *app.net;
auto send_rand = [&](int size, const MyNet::conn_t &conn) {
- auto &tc = _tc[conn->get_addr()];
+ auto &tc = app.tc[conn->get_addr()];
MsgRand msg(size);
tc.hash = msg.serialized.get_hash();
net.send_msg(std::move(msg), conn);
@@ -96,7 +107,7 @@ void install_proto(EventContext &ec, MyNet &net,
{
if (conn->get_mode() == ConnPool::Conn::ACTIVE)
{
- auto &tc = _tc[conn->get_addr()];
+ auto &tc = app.tc[conn->get_addr()];
tc.state = 1;
SALTICIDAE_LOG_INFO("increasing phase");
send_rand(tc.state, static_pointer_cast<MyNet::Conn>(conn));
@@ -108,7 +119,7 @@ void install_proto(EventContext &ec, MyNet &net,
net.send_msg(MsgAck(hash), conn);
});
net.reg_handler([&, send_rand](MsgAck &&msg, const MyNet::conn_t &conn) {
- auto &tc = _tc[conn->get_addr()];
+ auto &tc = app.tc[conn->get_addr()];
if (msg.hash != tc.hash)
{
SALTICIDAE_LOG_ERROR("corrupted I/O!");
@@ -119,8 +130,13 @@ void install_proto(EventContext &ec, MyNet &net,
{
send_rand(tc.state, conn);
tc.state = -1;
- tc.timer = Event(ec, -1, [&net, conn](int, int) {
+ tc.timer = Event(ec, -1, [&, conn](int, int) {
+ tc.ncompleted++;
net.terminate(conn);
+ std::string s;
+ for (const auto &p: app.tc)
+ s += salticidae::stringprintf(" %d(%d)", p.first.port, p.second.ncompleted);
+ SALTICIDAE_LOG_INFO("%d completed:%s", app.addr.port, s.c_str());
});
double t = salticidae::gen_rand_timeout(10);
tc.timer.add_with_timeout(t, 0);
@@ -133,14 +149,6 @@ void install_proto(EventContext &ec, MyNet &net,
});
}
-struct AppContext {
- NetAddr addr;
- EventContext ec;
- BoxObj<MyNet> net;
- BoxObj<ThreadCall> tcall;
- std::unordered_map<NetAddr, TestContext> tc;
-};
-
int main(int argc, char **argv) {
Config config;
auto opt_no_msg = Config::OptValFlag::create(false);
@@ -148,7 +156,7 @@ int main(int argc, char **argv) {
auto opt_seg_buff_size = Config::OptValInt::create(4096);
auto opt_nworker = Config::OptValInt::create(2);
auto opt_conn_timeout = Config::OptValDouble::create(5);
- auto opt_ping_peroid = Config::OptValDouble::create(5);
+ auto opt_ping_peroid = Config::OptValDouble::create(2);
auto opt_help = Config::OptValFlag::create(false);
config.add_opt("no-msg", opt_no_msg, Config::SWITCH_ON);
config.add_opt("npeers", opt_npeers, Config::SET_VAL);
@@ -181,7 +189,7 @@ int main(int argc, char **argv) {
.ping_period(opt_ping_peroid->get()));
a.tcall = new ThreadCall(a.ec);
if (!opt_no_msg->get())
- install_proto(a.ec, *a.net, a.tc, seg_buff_size);
+ install_proto(a, seg_buff_size);
a.net->start();
}
@@ -197,7 +205,7 @@ int main(int argc, char **argv) {
for (auto &a: apps)
{
auto &tc = a.tcall;
- tc->async_call([ec=tc->get_ec()](ThreadCall::Handle &) {
+ tc->async_call([ec=a.ec](ThreadCall::Handle &) {
ec.stop();
});
}