diff options
-rw-r--r-- | include/salticidae/conn.h | 13 | ||||
-rw-r--r-- | include/salticidae/event.h | 2 | ||||
-rw-r--r-- | include/salticidae/network.h | 10 | ||||
-rw-r--r-- | src/conn.cpp | 5 | ||||
-rw-r--r-- | test/test_p2p.cpp | 84 |
5 files changed, 29 insertions, 85 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index 73b3022..480809f 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -123,12 +123,11 @@ class ConnPool { protected: /** Close the IO and clear all on-going or planned events. */ virtual void stop() { - if (fd == -1) return; + if (!self_ref) return; ev_connect.clear(); ev_socket.clear(); send_buffer.get_queue().unreg_handler(); ::close(fd); - fd = -1; self_ref = nullptr; /* remove the self-cycle */ } @@ -189,7 +188,7 @@ class ConnPool { .get_queue() .reg_handler(this->ec, [conn, client_fd] (MPSCWriteBuffer::queue_t &) { - if (conn->ready_send && conn->fd != -1) + if (conn->ready_send && conn->self_ref) { conn->ev_socket.del(); conn->ev_socket.add(Event::READ | Event::WRITE); @@ -340,9 +339,13 @@ class ConnPool { void terminate(const conn_t &conn, bool blocking = true) { int fd = conn->fd; - conn->worker->get_tcall()->call([conn](ThreadCall::Handle &) { + auto worker = conn->worker; + if (worker) + worker->get_tcall()->call([conn](ThreadCall::Handle &) { + conn->stop(); + }, blocking); + else conn->stop(); - }, blocking); remove_conn(fd); } }; diff --git a/include/salticidae/event.h b/include/salticidae/event.h index 616f598..3fd11b6 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -85,7 +85,7 @@ class Event { static inline void fd_then(uv_poll_t *h, int status, int events) { if (status != 0) { - SALTICIDAE_LOG_WARN("%s", uv_strerror(status)); + //SALTICIDAE_LOG_WARN("%s", uv_strerror(status)); return; } auto event = static_cast<Event *>(h->data); diff --git a/include/salticidae/network.h b/include/salticidae/network.h index a63976b..60d8f20 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -405,7 +405,7 @@ void MsgNetwork<OpcodeType>::Conn::on_read() { ConnPool::Conn::on_read(); auto &recv_buffer = this->recv_buffer; auto mn = get_net(); - while (fd != -1) + while (self_ref) { if (msg_state == Conn::HEADER) { @@ -472,11 +472,6 @@ void PeerNetwork<O, _, __>::Conn::on_setup() { SALTICIDAE_LOG_INFO("peer ping-pong timeout"); conn->terminate(); }); - if (this->get_mode() == Conn::ConnMode::ACTIVE) - { - peer_id = this->get_addr(); - if (pn->id_mode == IP_BASED) peer_id.port = 0; - } /* the initial ping-pong to set up the connection */ tcall_reset_timeout(worker, conn, pn->conn_timeout); pn->send_msg(MsgPing(pn->listen_port), *conn); @@ -584,6 +579,9 @@ void PeerNetwork<O, _, __>::start_active_conn(const NetAddr &addr) { auto conn = static_pointer_cast<Conn>(MsgNet::_connect(addr)); assert(p->conn == nullptr); p->conn = conn; + conn->peer_id = addr; + if (id_mode == IP_BASED) + conn->peer_id.port = 0; } /* end: functions invoked by the dispatcher */ diff --git a/src/conn.cpp b/src/conn.cpp index da8086c..f170d0a 100644 --- a/src/conn.cpp +++ b/src/conn.cpp @@ -173,7 +173,7 @@ void ConnPool::Conn::conn_server(int fd, int events) { { if (events & Event::TIMEOUT) SALTICIDAE_LOG_INFO("%s connect timeout", std::string(*this).c_str()); - stop(); + cpool->terminate(conn); return; } } @@ -237,7 +237,7 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) { sizeof(struct sockaddr_in)) < 0 && errno != EINPROGRESS) { SALTICIDAE_LOG_INFO("cannot connect to %s", std::string(addr).c_str()); - conn->stop(); + terminate(conn); } else { @@ -256,6 +256,7 @@ void ConnPool::remove_conn(int fd) { /* temporarily pin the conn before it dies */ auto conn = it->second; //assert(conn->fd == fd); + conn->fd = -1; pool.erase(it); /* inform the upper layer the connection will be destroyed */ conn->on_teardown(); diff --git a/test/test_p2p.cpp b/test/test_p2p.cpp index 558be5c..f902afa 100644 --- a/test/test_p2p.cpp +++ b/test/test_p2p.cpp @@ -73,90 +73,32 @@ struct MsgAck { const uint8_t MsgHello::opcode; const uint8_t MsgAck::opcode; -using PeerNetworkByteOp = salticidae::PeerNetwork<uint8_t>; +using MyNet = salticidae::PeerNetwork<uint8_t>; -struct MyNet: public PeerNetworkByteOp { - const std::string name; - const NetAddr peer; - - MyNet(const salticidae::EventContext &ec, - const std::string name, - const NetAddr &peer): - PeerNetwork<uint8_t>(ec, 10, 2, 2, 4096, 2, 10), - name(name), - peer(peer) { - /* message handler could be a bound method */ - reg_handler(salticidae::generic_bind( - &MyNet::on_receive_hello, this, _1, _2)); - - reg_conn_handler([this](ConnPool::Conn &conn, bool connected) { - if (connected) - { - if (conn.get_mode() == ConnPool::Conn::ACTIVE) - { - printf("[%s] Connected, sending hello.\n", - this->name.c_str()); - /* send the first message through this connection */ - MsgNet::send_msg(MsgHello(this->name, "Hello there!"), - static_cast<MsgNet::Conn &>(conn)); - } - else - printf("[%s] Accepted, waiting for greetings.\n", - this->name.c_str()); - } - else - { - printf("[%s] Disconnected, retrying.\n", this->name.c_str()); - } - }); - } - - void on_receive_hello(MsgHello &&msg, MyNet::Conn &conn) { - printf("[%s] %s says %s\n", - name.c_str(), - msg.name.c_str(), msg.text.c_str()); - /* send acknowledgement */ - MsgNet::send_msg(MsgAck(), conn); - } +NetAddr addrs[] = { + NetAddr("127.0.0.1:12345"), + NetAddr("127.0.0.1:12346"), + NetAddr("127.0.0.1:12347") }; - -void on_receive_ack(MsgAck &&msg, MyNet::Conn &conn) { - auto net = static_cast<MyNet *>(conn.get_net()); - printf("[%s] the peer knows\n", net->name.c_str()); -} - -salticidae::EventContext ec; -NetAddr alice_addr("127.0.0.1:12345"); -NetAddr bob_addr("127.0.0.1:12346"); - void signal_handler(int) { throw salticidae::SalticidaeError("got termination signal"); } -int main() { +int main(int argc, char **argv) { signal(SIGTERM, signal_handler); signal(SIGINT, signal_handler); + salticidae::EventContext ec; /* test two nodes */ - MyNet alice(ec, "Alice", bob_addr); - MyNet bob(ec, "Bob", alice_addr); - - /* message handler could be a normal function */ - alice.reg_handler(on_receive_ack); - bob.reg_handler(on_receive_ack); + MyNet net(ec, 10, 2, 2, 4096, 3, 5); try { - alice.start(); - bob.start(); - - alice.listen(alice_addr); - bob.listen(bob_addr); - - /* first attempt */ - alice.add_peer(bob_addr); - bob.add_peer(alice_addr); - + int i; + net.start(); + net.listen(addrs[i = atoi(argv[1])]); + for (int j = 0; j < 3; j++) + if (i != j) net.add_peer(addrs[j]); ec.dispatch(); } catch (salticidae::SalticidaeError &e) {} return 0; |