aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2018-11-16 23:23:01 -0500
committerDeterminant <[email protected]>2018-11-16 23:23:01 -0500
commite12e559e85b0c2615fb550cc11560350517efca6 (patch)
tree7ce9f5952d918213e3883032a7bc851e1f32aeb7
parent20669e2b5e956babf888bca577e39a71d83bae79 (diff)
finish stress test coding
-rw-r--r--include/salticidae/conn.h22
-rw-r--r--include/salticidae/network.h8
-rw-r--r--src/conn.cpp2
-rw-r--r--test/test_p2p_stress.cpp84
4 files changed, 95 insertions, 21 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index 3144a68..69402d3 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -77,10 +77,11 @@ class ConnPool {
protected:
size_t seg_buff_size;
conn_t self_ref;
+ std::mutex ref_mlock;
int fd;
Worker *worker;
ConnPool *cpool;
- ConnMode mode;
+ std::atomic<ConnMode> mode;
NetAddr addr;
MPSCWriteBuffer send_buffer;
@@ -110,7 +111,16 @@ class ConnPool {
}
/** Get the handle to itself. */
- conn_t self() { return self_ref; }
+ conn_t self() {
+ mutex_lg_t _(ref_mlock);
+ return self_ref;
+ }
+
+ void release_self() {
+ mutex_lg_t _(ref_mlock);
+ self_ref = nullptr;
+ }
+
operator std::string() const;
const NetAddr &get_addr() const { return addr; }
ConnMode get_mode() const { return mode; }
@@ -380,6 +390,14 @@ class ConnPool {
template<typename Func>
void reg_conn_handler(Func cb) { conn_cb = cb; }
+
+ void terminate(Conn &_conn) {
+ auto conn = _conn.self();
+ if (!conn) return;
+ disp_tcall->async_call([this, conn](ThreadCall::Handle &) {
+ conn->disp_terminate();
+ });
+ }
};
}
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index 78449eb..4e182fc 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -622,11 +622,11 @@ void PeerNetwork<O, _, __>::start_active_conn(const NetAddr &addr) {
/* begin: functions invoked by the user loop */
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::msg_ping(MsgPing &&msg, Conn &_conn) {
- if (_conn.get_mode() == ConnPool::Conn::DEAD) return;
auto conn = static_pointer_cast<Conn>(_conn.self());
- assert(conn);
+ if (!conn) return;
uint16_t port = msg.port;
this->disp_tcall->async_call([this, conn, port](ThreadCall::Handle &msg) {
+ if (conn->get_mode() == ConnPool::Conn::DEAD) return;
SALTICIDAE_LOG_INFO("ping from %s, port %u",
std::string(*conn).c_str(), ntohs(port));
if (check_new_conn(conn, port)) return;
@@ -637,11 +637,11 @@ void PeerNetwork<O, _, __>::msg_ping(MsgPing &&msg, Conn &_conn) {
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::msg_pong(MsgPong &&msg, Conn &_conn) {
- if (_conn.get_mode() == ConnPool::Conn::DEAD) return;
auto conn = static_pointer_cast<Conn>(_conn.self());
- assert(conn);
+ if (!conn) return;
uint16_t port = msg.port;
this->disp_tcall->async_call([this, conn, port](ThreadCall::Handle &msg) {
+ if (conn->get_mode() == ConnPool::Conn::DEAD) return;
auto it = id2peer.find(conn->peer_id);
if (it == id2peer.end())
{
diff --git a/src/conn.cpp b/src/conn.cpp
index c29dee9..413a582 100644
--- a/src/conn.cpp
+++ b/src/conn.cpp
@@ -301,7 +301,7 @@ void ConnPool::remove_conn(int fd) {
/* inform the upper layer the connection will be destroyed */
conn->on_teardown();
update_conn(conn, false);
- conn->self_ref = nullptr; /* remove the self-cycle */
+ conn->release_self(); /* remove the self-cycle */
::close(conn->fd);
conn->fd = -1;
}
diff --git a/test/test_p2p_stress.cpp b/test/test_p2p_stress.cpp
index 3546329..0f479e4 100644
--- a/test/test_p2p_stress.cpp
+++ b/test/test_p2p_stress.cpp
@@ -35,12 +35,17 @@
using salticidae::NetAddr;
using salticidae::DataStream;
using salticidae::MsgNetwork;
+using salticidae::ConnPool;
+using salticidae::Event;
using salticidae::htole;
using salticidae::letoh;
using salticidae::bytearray_t;
+using salticidae::uint256_t;
using std::placeholders::_1;
using std::placeholders::_2;
+const size_t SEG_BUFF_SIZE = 4096;
+
/** Hello Message. */
struct MsgRand {
static const uint8_t opcode = 0x0;
@@ -51,22 +56,25 @@ struct MsgRand {
bytearray_t bytes;
bytes.resize(size);
RAND_bytes(&bytes[0], size);
- serialized << htole((uint32_t)size) << std::move(bytes);
+ serialized << std::move(bytes);
}
/** Defines how to parse the msg. */
MsgRand(DataStream &&s) {
- uint32_t len;
- s >> len;
- bytes = std::move(s);
+ bytes = s;
}
};
/** Acknowledgement Message. */
struct MsgAck {
static const uint8_t opcode = 0x1;
+ uint256_t hash;
DataStream serialized;
- MsgAck() {}
- MsgAck(DataStream &&s) {}
+ MsgAck(const uint256_t &hash) {
+ serialized << hash;
+ }
+ MsgAck(DataStream &&s) {
+ s >> hash;
+ }
};
const uint8_t MsgRand::opcode;
@@ -74,12 +82,7 @@ const uint8_t MsgAck::opcode;
using MyNet = salticidae::PeerNetwork<uint8_t>;
-std::vector<NetAddr> addrs = {
- NetAddr("127.0.0.1:12345"),
- NetAddr("127.0.0.1:12346"),
- NetAddr("127.0.0.1:12347"),
- NetAddr("127.0.0.1:12348")
-};
+std::vector<NetAddr> addrs;
void signal_handler(int) {
throw salticidae::SalticidaeError("got termination signal");
@@ -89,8 +92,10 @@ int main(int argc, char **argv) {
signal(SIGTERM, signal_handler);
signal(SIGINT, signal_handler);
+ int n = argc > 1 ? atoi(argv[1]) : 5;
+ for (int i = 0; i < n; i++)
+ addrs.push_back(NetAddr("127.0.0.1:" + std::to_string(12345 + i)));
std::vector<std::thread> nodes;
-
for (auto &addr: addrs)
{
nodes.push_back(std::thread([addr]() {
@@ -98,7 +103,58 @@ int main(int argc, char **argv) {
/* test two nodes */
MyNet net(ec, MyNet::Config(
salticidae::ConnPool::Config()
- .nworker(2)).conn_timeout(5).ping_period(2));
+ .nworker(2).seg_buff_size(SEG_BUFF_SIZE))
+ .conn_timeout(5).ping_period(2));
+ int state;
+ uint256_t hash;
+ auto send_rand = [&net, &hash](int size, MyNet::Conn &conn) {
+ MsgRand msg(size);
+ hash = msg.serialized.get_hash();
+ net.send_msg(std::move(msg), conn);
+ };
+ Event timer;
+ net.reg_conn_handler([&state, &net, &send_rand](salticidae::ConnPool::Conn &conn, bool connected) {
+ if (connected)
+ {
+ if (conn.get_mode() == ConnPool::Conn::ACTIVE)
+ {
+ state = 1;
+ SALTICIDAE_LOG_INFO("increasing phase");
+ send_rand(state, static_cast<MyNet::Conn &>(conn));
+ }
+ }
+ });
+ net.reg_handler([&state, &net](MsgRand &&msg, MyNet::Conn &conn) {
+ uint256_t hash = salticidae::get_hash(msg.bytes);
+ net.send_msg(MsgAck(hash), conn);
+ });
+ net.reg_handler([&state, &net, &hash, &send_rand, &ec, &timer](MsgAck &&msg, MyNet::Conn &conn) {
+ if (msg.hash != hash)
+ {
+ SALTICIDAE_LOG_ERROR("corrupted I/O!");
+ exit(1);
+ }
+
+ if (state == SEG_BUFF_SIZE * 2)
+ {
+ send_rand(state, conn);
+ state = -1;
+ timer = Event(ec, -1, [&net, conn=conn.self()](int, int) {
+ net.terminate(*conn);
+ });
+ double t = salticidae::gen_rand_timeout(10);
+ timer.add_with_timeout(t, 0);
+ SALTICIDAE_LOG_INFO("rand-bomboard phase, ending in %.2f secs", t);
+ }
+ else if (state == -1)
+ {
+ send_rand(rand() % (SEG_BUFF_SIZE * 10), conn);
+ }
+ else
+ {
+ send_rand(++state, conn);
+ }
+ });
try {
net.start();
net.listen(addr);