aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDeterminant <tederminant@gmail.com>2019-06-13 15:41:55 -0400
committerDeterminant <tederminant@gmail.com>2019-06-13 15:41:55 -0400
commitd164d2d4535468bf695ff6c0f277486e6016e586 (patch)
treeebc02013c56dd791a422120966a1147c4a4fd09e
parent4de5d8f054a6a34efe70f1e01297136e8a84c08b (diff)
change test_p2p example; fix bugs
-rw-r--r--include/salticidae/network.h8
-rw-r--r--test/test_p2p.cpp177
2 files changed, 134 insertions, 51 deletions
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index b119e78..14d270f 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -598,7 +598,13 @@ bool PeerNetwork<O, _, __>::check_new_conn(const conn_t &conn, uint16_t port) {
conn->peer_id.ip = conn->get_addr().ip;
conn->peer_id.port = port;
}
- auto p = id2peer.find(conn->peer_id)->second.get();
+ auto it = id2peer.find(conn->peer_id);
+ if (it == id2peer.end())
+ {
+ conn->disp_terminate();
+ return true;
+ }
+ auto p = it->second.get();
if (p->connected)
{
if (conn != p->conn)
diff --git a/test/test_p2p.cpp b/test/test_p2p.cpp
index e884930..1510f8e 100644
--- a/test/test_p2p.cpp
+++ b/test/test_p2p.cpp
@@ -23,8 +23,10 @@
*/
#include <cstdio>
+#include <cstdint>
#include <string>
#include <functional>
+#include <unordered_map>
#include "salticidae/msg.h"
#include "salticidae/event.h"
@@ -33,69 +35,144 @@
using salticidae::NetAddr;
using salticidae::DataStream;
-using salticidae::MsgNetwork;
+using salticidae::PeerNetwork;
using salticidae::htole;
using salticidae::letoh;
+using salticidae::EventContext;
+using salticidae::ThreadCall;
using std::placeholders::_1;
using std::placeholders::_2;
-/** Hello Message. */
-struct MsgHello {
- static const uint8_t opcode = 0x0;
- DataStream serialized;
- std::string name;
- std::string text;
- /** Defines how to serialize the msg. */
- MsgHello(const std::string &name,
- const std::string &text) {
- serialized << htole((uint32_t)name.length());
- serialized << name << text;
- }
- /** Defines how to parse the msg. */
- MsgHello(DataStream &&s) {
- uint32_t len;
- s >> len;
- len = letoh(len);
- name = std::string((const char *)s.get_data_inplace(len), len);
- len = s.size();
- text = std::string((const char *)s.get_data_inplace(len), len);
- }
-};
+struct Net {
+ uint64_t id;
+ EventContext ec;
+ ThreadCall tc;
+ std::thread th;
+ PeerNetwork<uint8_t> *net;
-/** Acknowledgement Message. */
-struct MsgAck {
- static const uint8_t opcode = 0x1;
- DataStream serialized;
- MsgAck() {}
- MsgAck(DataStream &&s) {}
-};
+ Net(uint64_t id, const std::string &listen_addr): id(id), tc(ec) {
+ net = new salticidae::PeerNetwork<uint8_t>(
+ ec,
+ salticidae::PeerNetwork<uint8_t>::Config().conn_timeout(5).ping_period(2));
+ net->reg_error_handler([this](const std::exception &err, bool fatal) {
+ SALTICIDAE_LOG_WARN("net %lu: captured %s error: %s", this->id, fatal ? "fatal" : "recoverable", err.what());
+ });
+ th = std::thread([=](){
+ try {
+ net->start();
+ net->listen(NetAddr(listen_addr));
+ SALTICIDAE_LOG_INFO("net %lu: listen to %s\n", id, listen_addr.c_str());
+ ec.dispatch();
+ } catch (std::exception &err) {
+ SALTICIDAE_LOG_WARN("net %lu: got error during a sync call: %s", id, err.what());
+ }
+ SALTICIDAE_LOG_INFO("net %lu: main loop ended\n", id);
+ });
+ }
-const uint8_t MsgHello::opcode;
-const uint8_t MsgAck::opcode;
+ void add_peer(const std::string &listen_addr) {
+ try {
+ net->add_peer(NetAddr(listen_addr));
+ } catch (std::exception &err) {
+ fprintf(stderr, "net %lu: got error during a sync call: %s\n", id, err.what());
+ }
+ }
-using MyNet = salticidae::PeerNetwork<uint8_t>;
+ void stop_join() {
+ tc.async_call([ec=this->ec](ThreadCall::Handle &) { ec.stop(); });
+ th.join();
+ }
-std::vector<NetAddr> addrs = {
- NetAddr("127.0.0.1:12345"),
- NetAddr("127.0.0.1:12346"),
- NetAddr("127.0.0.1:12347"),
- NetAddr("127.0.0.1:12348")
+ ~Net() { delete net; }
};
-salticidae::EventContext ec;
-MyNet net(ec, MyNet::Config().conn_timeout(5).ping_period(2));
+std::unordered_map<uint64_t, Net *> nets;
+std::unordered_map<std::string, std::function<void(char *)> > cmd_map;
+
+int read_int(char *buff) {
+ scanf("%64s", buff);
+ try {
+ int t = std::stoi(buff);
+ if (t < 0) throw std::invalid_argument("negative");
+ return t;
+ } catch (std::invalid_argument) {
+ fprintf(stderr, "expect a non-negative integer\n");
+ return -1;
+ }
+}
int main(int argc, char **argv) {
int i;
- net.start();
- net.listen(addrs[i = atoi(argv[1])]);
- for (int j = 0; j < addrs.size(); j++)
- if (i != j) net.add_peer(addrs[j]);
- auto shutdown = [&](int) {ec.stop();};
- salticidae::SigEvent ev_sigint(ec, shutdown);
- salticidae::SigEvent ev_sigterm(ec, shutdown);
- ev_sigint.add(SIGINT);
- ev_sigterm.add(SIGTERM);
- ec.dispatch();
+ fprintf(stderr, "p2p network library playground (type h for help)\n");
+ fprintf(stderr, "================================================\n");
+
+ auto cmd_exit = [](char *) {
+ for (auto &p: nets)
+ p.second->stop_join();
+ exit(0);
+ };
+
+ auto cmd_net = [](char *buff) {
+ int id = read_int(buff);
+ if (id < 0) return;
+ if (nets.count(id))
+ {
+ fprintf(stderr, "net id already exists");
+ return;
+ }
+ scanf("%64s", buff);
+ nets.insert(std::make_pair(id, new Net(id, buff)));
+ };
+
+ auto cmd_ls = [](char *) {
+ for (auto &p: nets)
+ fprintf(stderr, "%d\n", p.first);
+ };
+
+ auto cmd_rm = [](char *buff) {
+ int id = read_int(buff);
+ if (id < 0) return;
+ auto it = nets.find(id);
+ if (it == nets.end())
+ {
+ fprintf(stderr, "net id does not exist\n");
+ return;
+ }
+ it->second->stop_join();
+ delete it->second;
+ nets.erase(it);
+ };
+
+ auto cmd_addpeer = [](char *buff) {
+ int id = read_int(buff);
+ if (id < 0) return;
+ auto it = nets.find(id);
+ if (it == nets.end())
+ {
+ fprintf(stderr, "net id does not exist\n");
+ return;
+ }
+ scanf("%64s", buff);
+ it->second->add_peer(buff);
+ };
+
+ cmd_map.insert(std::make_pair("exit", cmd_exit));
+ cmd_map.insert(std::make_pair("net", cmd_net));
+ cmd_map.insert(std::make_pair("ls", cmd_ls));
+ cmd_map.insert(std::make_pair("rm", cmd_rm));
+ cmd_map.insert(std::make_pair("addpeer", cmd_addpeer));
+
+ for (;;)
+ {
+ fprintf(stderr, "> ");
+ char buff[128];
+ if (scanf("%64s", buff) == EOF) break;
+ auto it = cmd_map.find(buff);
+ if (it == cmd_map.end())
+ fprintf(stderr, "invalid comand \"%s\"\n", buff);
+ else
+ (it->second)(buff);
+ }
+
return 0;
}