aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/salticidae/conn.h13
-rw-r--r--include/salticidae/event.h2
-rw-r--r--include/salticidae/network.h10
-rw-r--r--src/conn.cpp5
-rw-r--r--test/test_p2p.cpp84
5 files changed, 29 insertions, 85 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index 73b3022..480809f 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -123,12 +123,11 @@ class ConnPool {
protected:
/** Close the IO and clear all on-going or planned events. */
virtual void stop() {
- if (fd == -1) return;
+ if (!self_ref) return;
ev_connect.clear();
ev_socket.clear();
send_buffer.get_queue().unreg_handler();
::close(fd);
- fd = -1;
self_ref = nullptr; /* remove the self-cycle */
}
@@ -189,7 +188,7 @@ class ConnPool {
.get_queue()
.reg_handler(this->ec, [conn, client_fd]
(MPSCWriteBuffer::queue_t &) {
- if (conn->ready_send && conn->fd != -1)
+ if (conn->ready_send && conn->self_ref)
{
conn->ev_socket.del();
conn->ev_socket.add(Event::READ | Event::WRITE);
@@ -340,9 +339,13 @@ class ConnPool {
void terminate(const conn_t &conn, bool blocking = true) {
int fd = conn->fd;
- conn->worker->get_tcall()->call([conn](ThreadCall::Handle &) {
+ auto worker = conn->worker;
+ if (worker)
+ worker->get_tcall()->call([conn](ThreadCall::Handle &) {
+ conn->stop();
+ }, blocking);
+ else
conn->stop();
- }, blocking);
remove_conn(fd);
}
};
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index 616f598..3fd11b6 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -85,7 +85,7 @@ class Event {
static inline void fd_then(uv_poll_t *h, int status, int events) {
if (status != 0)
{
- SALTICIDAE_LOG_WARN("%s", uv_strerror(status));
+ //SALTICIDAE_LOG_WARN("%s", uv_strerror(status));
return;
}
auto event = static_cast<Event *>(h->data);
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index a63976b..60d8f20 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -405,7 +405,7 @@ void MsgNetwork<OpcodeType>::Conn::on_read() {
ConnPool::Conn::on_read();
auto &recv_buffer = this->recv_buffer;
auto mn = get_net();
- while (fd != -1)
+ while (self_ref)
{
if (msg_state == Conn::HEADER)
{
@@ -472,11 +472,6 @@ void PeerNetwork<O, _, __>::Conn::on_setup() {
SALTICIDAE_LOG_INFO("peer ping-pong timeout");
conn->terminate();
});
- if (this->get_mode() == Conn::ConnMode::ACTIVE)
- {
- peer_id = this->get_addr();
- if (pn->id_mode == IP_BASED) peer_id.port = 0;
- }
/* the initial ping-pong to set up the connection */
tcall_reset_timeout(worker, conn, pn->conn_timeout);
pn->send_msg(MsgPing(pn->listen_port), *conn);
@@ -584,6 +579,9 @@ void PeerNetwork<O, _, __>::start_active_conn(const NetAddr &addr) {
auto conn = static_pointer_cast<Conn>(MsgNet::_connect(addr));
assert(p->conn == nullptr);
p->conn = conn;
+ conn->peer_id = addr;
+ if (id_mode == IP_BASED)
+ conn->peer_id.port = 0;
}
/* end: functions invoked by the dispatcher */
diff --git a/src/conn.cpp b/src/conn.cpp
index da8086c..f170d0a 100644
--- a/src/conn.cpp
+++ b/src/conn.cpp
@@ -173,7 +173,7 @@ void ConnPool::Conn::conn_server(int fd, int events) {
{
if (events & Event::TIMEOUT)
SALTICIDAE_LOG_INFO("%s connect timeout", std::string(*this).c_str());
- stop();
+ cpool->terminate(conn);
return;
}
}
@@ -237,7 +237,7 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) {
sizeof(struct sockaddr_in)) < 0 && errno != EINPROGRESS)
{
SALTICIDAE_LOG_INFO("cannot connect to %s", std::string(addr).c_str());
- conn->stop();
+ terminate(conn);
}
else
{
@@ -256,6 +256,7 @@ void ConnPool::remove_conn(int fd) {
/* temporarily pin the conn before it dies */
auto conn = it->second;
//assert(conn->fd == fd);
+ conn->fd = -1;
pool.erase(it);
/* inform the upper layer the connection will be destroyed */
conn->on_teardown();
diff --git a/test/test_p2p.cpp b/test/test_p2p.cpp
index 558be5c..f902afa 100644
--- a/test/test_p2p.cpp
+++ b/test/test_p2p.cpp
@@ -73,90 +73,32 @@ struct MsgAck {
const uint8_t MsgHello::opcode;
const uint8_t MsgAck::opcode;
-using PeerNetworkByteOp = salticidae::PeerNetwork<uint8_t>;
+using MyNet = salticidae::PeerNetwork<uint8_t>;
-struct MyNet: public PeerNetworkByteOp {
- const std::string name;
- const NetAddr peer;
-
- MyNet(const salticidae::EventContext &ec,
- const std::string name,
- const NetAddr &peer):
- PeerNetwork<uint8_t>(ec, 10, 2, 2, 4096, 2, 10),
- name(name),
- peer(peer) {
- /* message handler could be a bound method */
- reg_handler(salticidae::generic_bind(
- &MyNet::on_receive_hello, this, _1, _2));
-
- reg_conn_handler([this](ConnPool::Conn &conn, bool connected) {
- if (connected)
- {
- if (conn.get_mode() == ConnPool::Conn::ACTIVE)
- {
- printf("[%s] Connected, sending hello.\n",
- this->name.c_str());
- /* send the first message through this connection */
- MsgNet::send_msg(MsgHello(this->name, "Hello there!"),
- static_cast<MsgNet::Conn &>(conn));
- }
- else
- printf("[%s] Accepted, waiting for greetings.\n",
- this->name.c_str());
- }
- else
- {
- printf("[%s] Disconnected, retrying.\n", this->name.c_str());
- }
- });
- }
-
- void on_receive_hello(MsgHello &&msg, MyNet::Conn &conn) {
- printf("[%s] %s says %s\n",
- name.c_str(),
- msg.name.c_str(), msg.text.c_str());
- /* send acknowledgement */
- MsgNet::send_msg(MsgAck(), conn);
- }
+NetAddr addrs[] = {
+ NetAddr("127.0.0.1:12345"),
+ NetAddr("127.0.0.1:12346"),
+ NetAddr("127.0.0.1:12347")
};
-
-void on_receive_ack(MsgAck &&msg, MyNet::Conn &conn) {
- auto net = static_cast<MyNet *>(conn.get_net());
- printf("[%s] the peer knows\n", net->name.c_str());
-}
-
-salticidae::EventContext ec;
-NetAddr alice_addr("127.0.0.1:12345");
-NetAddr bob_addr("127.0.0.1:12346");
-
void signal_handler(int) {
throw salticidae::SalticidaeError("got termination signal");
}
-int main() {
+int main(int argc, char **argv) {
signal(SIGTERM, signal_handler);
signal(SIGINT, signal_handler);
+ salticidae::EventContext ec;
/* test two nodes */
- MyNet alice(ec, "Alice", bob_addr);
- MyNet bob(ec, "Bob", alice_addr);
-
- /* message handler could be a normal function */
- alice.reg_handler(on_receive_ack);
- bob.reg_handler(on_receive_ack);
+ MyNet net(ec, 10, 2, 2, 4096, 3, 5);
try {
- alice.start();
- bob.start();
-
- alice.listen(alice_addr);
- bob.listen(bob_addr);
-
- /* first attempt */
- alice.add_peer(bob_addr);
- bob.add_peer(alice_addr);
-
+ int i;
+ net.start();
+ net.listen(addrs[i = atoi(argv[1])]);
+ for (int j = 0; j < 3; j++)
+ if (i != j) net.add_peer(addrs[j]);
ec.dispatch();
} catch (salticidae::SalticidaeError &e) {}
return 0;