From d164d2d4535468bf695ff6c0f277486e6016e586 Mon Sep 17 00:00:00 2001 From: Determinant Date: Thu, 13 Jun 2019 15:41:55 -0400 Subject: change test_p2p example; fix bugs --- test/test_p2p.cpp | 177 +++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 127 insertions(+), 50 deletions(-) (limited to 'test') diff --git a/test/test_p2p.cpp b/test/test_p2p.cpp index e884930..1510f8e 100644 --- a/test/test_p2p.cpp +++ b/test/test_p2p.cpp @@ -23,8 +23,10 @@ */ #include +#include #include #include +#include #include "salticidae/msg.h" #include "salticidae/event.h" @@ -33,69 +35,144 @@ using salticidae::NetAddr; using salticidae::DataStream; -using salticidae::MsgNetwork; +using salticidae::PeerNetwork; using salticidae::htole; using salticidae::letoh; +using salticidae::EventContext; +using salticidae::ThreadCall; using std::placeholders::_1; using std::placeholders::_2; -/** Hello Message. */ -struct MsgHello { - static const uint8_t opcode = 0x0; - DataStream serialized; - std::string name; - std::string text; - /** Defines how to serialize the msg. */ - MsgHello(const std::string &name, - const std::string &text) { - serialized << htole((uint32_t)name.length()); - serialized << name << text; - } - /** Defines how to parse the msg. */ - MsgHello(DataStream &&s) { - uint32_t len; - s >> len; - len = letoh(len); - name = std::string((const char *)s.get_data_inplace(len), len); - len = s.size(); - text = std::string((const char *)s.get_data_inplace(len), len); - } -}; +struct Net { + uint64_t id; + EventContext ec; + ThreadCall tc; + std::thread th; + PeerNetwork *net; -/** Acknowledgement Message. */ -struct MsgAck { - static const uint8_t opcode = 0x1; - DataStream serialized; - MsgAck() {} - MsgAck(DataStream &&s) {} -}; + Net(uint64_t id, const std::string &listen_addr): id(id), tc(ec) { + net = new salticidae::PeerNetwork( + ec, + salticidae::PeerNetwork::Config().conn_timeout(5).ping_period(2)); + net->reg_error_handler([this](const std::exception &err, bool fatal) { + SALTICIDAE_LOG_WARN("net %lu: captured %s error: %s", this->id, fatal ? "fatal" : "recoverable", err.what()); + }); + th = std::thread([=](){ + try { + net->start(); + net->listen(NetAddr(listen_addr)); + SALTICIDAE_LOG_INFO("net %lu: listen to %s\n", id, listen_addr.c_str()); + ec.dispatch(); + } catch (std::exception &err) { + SALTICIDAE_LOG_WARN("net %lu: got error during a sync call: %s", id, err.what()); + } + SALTICIDAE_LOG_INFO("net %lu: main loop ended\n", id); + }); + } -const uint8_t MsgHello::opcode; -const uint8_t MsgAck::opcode; + void add_peer(const std::string &listen_addr) { + try { + net->add_peer(NetAddr(listen_addr)); + } catch (std::exception &err) { + fprintf(stderr, "net %lu: got error during a sync call: %s\n", id, err.what()); + } + } -using MyNet = salticidae::PeerNetwork; + void stop_join() { + tc.async_call([ec=this->ec](ThreadCall::Handle &) { ec.stop(); }); + th.join(); + } -std::vector addrs = { - NetAddr("127.0.0.1:12345"), - NetAddr("127.0.0.1:12346"), - NetAddr("127.0.0.1:12347"), - NetAddr("127.0.0.1:12348") + ~Net() { delete net; } }; -salticidae::EventContext ec; -MyNet net(ec, MyNet::Config().conn_timeout(5).ping_period(2)); +std::unordered_map nets; +std::unordered_map > cmd_map; + +int read_int(char *buff) { + scanf("%64s", buff); + try { + int t = std::stoi(buff); + if (t < 0) throw std::invalid_argument("negative"); + return t; + } catch (std::invalid_argument) { + fprintf(stderr, "expect a non-negative integer\n"); + return -1; + } +} int main(int argc, char **argv) { int i; - net.start(); - net.listen(addrs[i = atoi(argv[1])]); - for (int j = 0; j < addrs.size(); j++) - if (i != j) net.add_peer(addrs[j]); - auto shutdown = [&](int) {ec.stop();}; - salticidae::SigEvent ev_sigint(ec, shutdown); - salticidae::SigEvent ev_sigterm(ec, shutdown); - ev_sigint.add(SIGINT); - ev_sigterm.add(SIGTERM); - ec.dispatch(); + fprintf(stderr, "p2p network library playground (type h for help)\n"); + fprintf(stderr, "================================================\n"); + + auto cmd_exit = [](char *) { + for (auto &p: nets) + p.second->stop_join(); + exit(0); + }; + + auto cmd_net = [](char *buff) { + int id = read_int(buff); + if (id < 0) return; + if (nets.count(id)) + { + fprintf(stderr, "net id already exists"); + return; + } + scanf("%64s", buff); + nets.insert(std::make_pair(id, new Net(id, buff))); + }; + + auto cmd_ls = [](char *) { + for (auto &p: nets) + fprintf(stderr, "%d\n", p.first); + }; + + auto cmd_rm = [](char *buff) { + int id = read_int(buff); + if (id < 0) return; + auto it = nets.find(id); + if (it == nets.end()) + { + fprintf(stderr, "net id does not exist\n"); + return; + } + it->second->stop_join(); + delete it->second; + nets.erase(it); + }; + + auto cmd_addpeer = [](char *buff) { + int id = read_int(buff); + if (id < 0) return; + auto it = nets.find(id); + if (it == nets.end()) + { + fprintf(stderr, "net id does not exist\n"); + return; + } + scanf("%64s", buff); + it->second->add_peer(buff); + }; + + cmd_map.insert(std::make_pair("exit", cmd_exit)); + cmd_map.insert(std::make_pair("net", cmd_net)); + cmd_map.insert(std::make_pair("ls", cmd_ls)); + cmd_map.insert(std::make_pair("rm", cmd_rm)); + cmd_map.insert(std::make_pair("addpeer", cmd_addpeer)); + + for (;;) + { + fprintf(stderr, "> "); + char buff[128]; + if (scanf("%64s", buff) == EOF) break; + auto it = cmd_map.find(buff); + if (it == cmd_map.end()) + fprintf(stderr, "invalid comand \"%s\"\n", buff); + else + (it->second)(buff); + } + return 0; } -- cgit v1.2.3-70-g09d2