From d15ec0b93def57e5f3832f429a3b948e86a62887 Mon Sep 17 00:00:00 2001 From: Determinant Date: Thu, 27 Jun 2019 20:33:06 -0400 Subject: finish p2p & TLS integration and testing --- test/.gitignore | 1 + test/CMakeLists.txt | 3 + test/test_p2p.cpp | 14 ++- test/test_p2p_stress.cpp | 26 ++-- test/test_p2p_tls.cpp | 317 +++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 345 insertions(+), 16 deletions(-) create mode 100644 test/test_p2p_tls.cpp (limited to 'test') diff --git a/test/.gitignore b/test/.gitignore index 7cebf5d..f50f029 100644 --- a/test/.gitignore +++ b/test/.gitignore @@ -2,6 +2,7 @@ test_msg test_bits test_msgnet test_p2p +test_p2p_tls test_p2p_stress test_queue bench_network diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index d112b7a..0a1d3f1 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -35,6 +35,9 @@ target_link_libraries(test_msgnet_tls salticidae_static) add_executable(test_p2p test_p2p.cpp) target_link_libraries(test_p2p salticidae_static) +add_executable(test_p2p_tls test_p2p_tls.cpp) +target_link_libraries(test_p2p_tls salticidae_static) + add_executable(test_p2p_stress test_p2p_stress.cpp) target_link_libraries(test_p2p_stress salticidae_static) diff --git a/test/test_p2p.cpp b/test/test_p2p.cpp index 14304eb..7f80f85 100644 --- a/test/test_p2p.cpp +++ b/test/test_p2p.cpp @@ -87,8 +87,14 @@ struct Net { this->id, fatal ? "fatal" : "recoverable", err.what()); } }); - net->reg_unknown_peer_handler([this](const NetAddr &addr) { - fprintf(stdout, "net %lu: unknown peer attempts to connnect %s\n", this->id, std::string(addr).c_str()); + net->reg_peer_handler([this](const PeerNetwork::conn_t &conn, bool connected) { + fprintf(stdout, "net %lu: %s peer %s\n", this->id, + connected ? "connected to" : "disconnected from", + std::string(conn->get_peer_addr()).c_str()); + }); + net->reg_unknown_peer_handler([this](const NetAddr &claimed_addr) { + fprintf(stdout, "net %lu: unknown peer %s attempts to connnect\n", + this->id, std::string(claimed_addr).c_str()); }); th = std::thread([=](){ try { @@ -258,8 +264,8 @@ int main(int argc, char **argv) { fprintf(stdout, "add -- start a node (create a PeerNetwork instance)\n" "addpeer -- add a peer to a given node\n" - "rmpeer -- add a peer to a given node\n" - "rm -- remove a node (destroy a PeerNetwork instance)\n" + "delpeer -- add a peer to a given node\n" + "del -- remove a node (destroy a PeerNetwork instance)\n" "msg -- send a text message to a node\n" "ls -- list all node ids\n" "exit -- quit the program\n" diff --git a/test/test_p2p_stress.cpp b/test/test_p2p_stress.cpp index 1cb2ca3..1eb4a0d 100644 --- a/test/test_p2p_stress.cpp +++ b/test/test_p2p_stress.cpp @@ -97,29 +97,29 @@ void install_proto(AppContext &app, const size_t &seg_buff_size) { auto &ec = app.ec; auto &net = *app.net; auto send_rand = [&](int size, const MyNet::conn_t &conn) { - auto &tc = app.tc[conn->get_addr()]; + auto addr = conn->get_peer_addr(); + assert(!addr.is_null()); + auto &tc = app.tc[addr]; MsgRand msg(size); tc.hash = msg.serialized.get_hash(); net.send_msg(std::move(msg), conn); }; - net.reg_conn_handler([&, send_rand](const ConnPool::conn_t &conn, bool connected) { + net.reg_peer_handler([&, send_rand](const MyNet::conn_t &conn, bool connected) { if (connected) { - if (conn->get_mode() == ConnPool::Conn::ACTIVE) - { - auto &tc = app.tc[conn->get_addr()]; - tc.state = 1; - SALTICIDAE_LOG_INFO("increasing phase"); - send_rand(tc.state, static_pointer_cast(conn)); - } + auto addr = conn->get_peer_addr(); + assert(!addr.is_null()); + auto &tc = app.tc[addr]; + tc.state = 1; + SALTICIDAE_LOG_INFO("increasing phase"); + send_rand(tc.state, conn); } - return true; }); net.reg_error_handler([ec](const std::exception_ptr _err, bool fatal) { try { std::rethrow_exception(_err); } catch (const std::exception & err) { - SALTICIDAE_LOG_WARN("main thread captured %s error: %s", + SALTICIDAE_LOG_WARN("captured %s error: %s", fatal ? "fatal" : "recoverable", err.what()); } }); @@ -128,7 +128,9 @@ void install_proto(AppContext &app, const size_t &seg_buff_size) { net.send_msg(MsgAck(hash), conn); }); net.reg_handler([&, send_rand](MsgAck &&msg, const MyNet::conn_t &conn) { - auto &tc = app.tc[conn->get_addr()]; + auto addr = conn->get_peer_addr(); + assert(!addr.is_null()); + auto &tc = app.tc[addr]; if (msg.hash != tc.hash) { SALTICIDAE_LOG_ERROR("corrupted I/O!"); diff --git a/test/test_p2p_tls.cpp b/test/test_p2p_tls.cpp new file mode 100644 index 0000000..9fe0aec --- /dev/null +++ b/test/test_p2p_tls.cpp @@ -0,0 +1,317 @@ +/** + * Copyright (c) 2019 Ava Labs, Inc. + * + * Author: Ted Yin + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is furnished to do + * so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#include +#include +#include +#include +#include + +#include "salticidae/msg.h" +#include "salticidae/event.h" +#include "salticidae/network.h" +#include "salticidae/stream.h" + +using salticidae::NetAddr; +using salticidae::DataStream; +using salticidae::htole; +using salticidae::letoh; +using salticidae::EventContext; +using salticidae::ThreadCall; +using salticidae::PKey; +using std::placeholders::_1; +using std::placeholders::_2; + +using PeerNetwork = salticidae::PeerNetwork; + +struct MsgText { + static const uint8_t opcode = 0x0; + DataStream serialized; + uint64_t id; + std::string text; + + MsgText(uint64_t id, const std::string &text) { + serialized << salticidae::htole(id) << salticidae::htole((uint32_t)text.length()) << text; + } + + MsgText(DataStream &&s) { + uint32_t len; + s >> id; + id = salticidae::letoh(id); + s >> len; + len = salticidae::letoh(len); + text = std::string((const char *)s.get_data_inplace(len), len); + } +}; + +const uint8_t MsgText::opcode; + +struct Net { + uint64_t id; + EventContext ec; + ThreadCall tc; + std::thread th; + PeerNetwork *net; + const std::string listen_addr; + + Net(uint64_t id, uint16_t port): id(id), tc(ec), listen_addr("127.0.0.1:"+ std::to_string(port)) { + auto tls_key = new PKey(PKey::create_privkey_rsa(2048)); + auto tls_cert = new salticidae::X509(salticidae::X509::create_self_signed_from_pubkey(*tls_key)); + tls_key->save_privkey_to_file(std::to_string(port) + "_pkey.pem"); + tls_cert->save_to_file(std::to_string(port) + ".pem"); + net = new PeerNetwork(ec, PeerNetwork::Config(salticidae::ConnPool::Config() + .enable_tls(true) + .tls_key(tls_key) + .tls_cert(tls_cert) + ).conn_timeout(5) + .ping_period(2) + .id_mode(PeerNetwork::IdentityMode::ADDR_BASED)); + net->reg_handler([this](const MsgText &msg, const PeerNetwork::conn_t &) { + fprintf(stdout, "net %lu: peer %lu says %s\n", this->id, msg.id, msg.text.c_str()); + }); + net->reg_conn_handler([this](const salticidae::ConnPool::conn_t &conn, bool connected) { + if (connected) + { + fprintf(stdout, "net %lu: peer's cert is %s\n", this->id, + salticidae::get_hash(conn->get_peer_cert()->get_der()).to_hex().c_str()); + } + return true; + }); + net->reg_error_handler([this](const std::exception_ptr _err, bool fatal) { + try { + std::rethrow_exception(_err); + } catch (const std::exception &err) { + fprintf(stdout, "net %lu: captured %s error during an async call: %s\n", + this->id, fatal ? "fatal" : "recoverable", err.what()); + } + }); + net->reg_peer_handler([this](const PeerNetwork::conn_t &conn, bool connected) { + fprintf(stdout, "net %lu: %s peer %s\n", this->id, + connected ? "connected to" : "disconnected from", + std::string(conn->get_peer_addr()).c_str()); + }); + net->reg_unknown_peer_handler([this](const NetAddr &claimed_addr) { + fprintf(stdout, "net %lu: unknown peer %s attempts to connnect\n", + this->id, std::string(claimed_addr).c_str()); + }); + th = std::thread([=](){ + try { + net->start(); + net->listen(NetAddr(listen_addr)); + fprintf(stdout, "net %lu: listen to %s\n", id, listen_addr.c_str()); + ec.dispatch(); + } catch (std::exception &err) { + fprintf(stdout, "net %lu: got error during a sync call: %s\n", id, err.what()); + } + fprintf(stdout, "net %lu: main loop ended\n", id); + }); + } + + void add_peer(const std::string &listen_addr) { + try { + net->add_peer(NetAddr(listen_addr)); + } catch (std::exception &err) { + fprintf(stdout, "net %lu: got error during a sync call: %s\n", id, err.what()); + } + } + + void del_peer(const std::string &listen_addr) { + try { + net->del_peer(NetAddr(listen_addr)); + } catch (std::exception &err) { + fprintf(stdout, "net %lu: got error during a sync call: %s\n", id, err.what()); + } + } + + void stop_join() { + tc.async_call([ec=this->ec](ThreadCall::Handle &) { ec.stop(); }); + th.join(); + } + + ~Net() { delete net; } +}; + +std::unordered_map nets; +std::unordered_map > cmd_map; + +int read_int(char *buff) { + scanf("%64s", buff); + try { + int t = std::stoi(buff); + if (t < 0) throw std::invalid_argument("negative"); + return t; + } catch (std::invalid_argument) { + fprintf(stdout, "expect a non-negative integer\n"); + return -1; + } +} + +int main(int argc, char **argv) { + int i; + fprintf(stdout, "p2p network library playground (type help for more info)\n"); + fprintf(stdout, "========================================================\n"); + + auto cmd_exit = [](char *) { + for (auto &p: nets) + { + p.second->stop_join(); + delete p.second; + } + exit(0); + }; + + auto cmd_add = [](char *buff) { + int id = read_int(buff); + if (id < 0) return; + if (nets.count(id)) + { + fprintf(stdout, "net id already exists\n"); + return; + } + int port = read_int(buff); + if (port < 0) return; + if (port >= 65536) + { + fprintf(stdout, "port should be < 65536\n"); + return; + } + nets.insert(std::make_pair(id, new Net(id, port))); + }; + + auto cmd_ls = [](char *) { + for (auto &p: nets) + fprintf(stdout, "%d -> %s\n", p.first, p.second->listen_addr.c_str()); + }; + + auto cmd_del = [](char *buff) { + int id = read_int(buff); + if (id < 0) return; + auto it = nets.find(id); + if (it == nets.end()) + { + fprintf(stdout, "net id does not exist\n"); + return; + } + it->second->stop_join(); + delete it->second; + nets.erase(it); + }; + + auto cmd_addpeer = [](char *buff) { + int id = read_int(buff); + if (id < 0) return; + auto it = nets.find(id); + if (it == nets.end()) + { + fprintf(stdout, "net id does not exist\n"); + return; + } + int id2 = read_int(buff); + if (id2 < 0) return; + auto it2 = nets.find(id2); + if (it2 == nets.end()) + { + fprintf(stdout, "net id does not exist\n"); + return; + } + it->second->add_peer(it2->second->listen_addr); + }; + + auto cmd_delpeer = [](char *buff) { + int id = read_int(buff); + if (id < 0) return; + auto it = nets.find(id); + if (it == nets.end()) + { + fprintf(stdout, "net id does not exist\n"); + return; + } + int id2 = read_int(buff); + if (id2 < 0) return; + auto it2 = nets.find(id2); + if (it2 == nets.end()) + { + fprintf(stdout, "net id does not exist\n"); + return; + } + it->second->del_peer(it2->second->listen_addr); + }; + + auto cmd_msg = [](char *buff) { + int id = read_int(buff); + if (id < 0) return; + auto it = nets.find(id); + if (it == nets.end()) + { + fprintf(stdout, "net id does not exist\n"); + return; + } + int id2 = read_int(buff); + if (id2 < 0) return; + auto it2 = nets.find(id2); + if (it2 == nets.end()) + { + fprintf(stdout, "net id does not exist\n"); + return; + } + scanf("%64s", buff); + it->second->net->send_msg(MsgText(id, buff), it2->second->listen_addr); + }; + + auto cmd_help = [](char *) { + fprintf(stdout, + "add -- start a node (create a PeerNetwork instance)\n" + "addpeer -- add a peer to a given node\n" + "delpeer -- add a peer to a given node\n" + "del -- remove a node (destroy a PeerNetwork instance)\n" + "msg -- send a text message to a node\n" + "ls -- list all node ids\n" + "exit -- quit the program\n" + "help -- show this info\n" + ); + }; + + cmd_map.insert(std::make_pair("add", cmd_add)); + cmd_map.insert(std::make_pair("addpeer", cmd_addpeer)); + cmd_map.insert(std::make_pair("del", cmd_del)); + cmd_map.insert(std::make_pair("delpeer", cmd_delpeer)); + cmd_map.insert(std::make_pair("msg", cmd_msg)); + cmd_map.insert(std::make_pair("ls", cmd_ls)); + cmd_map.insert(std::make_pair("exit", cmd_exit)); + cmd_map.insert(std::make_pair("help", cmd_help)); + + for (;;) + { + fprintf(stdout, "> "); + char buff[128]; + if (scanf("%64s", buff) == EOF) break; + auto it = cmd_map.find(buff); + if (it == cmd_map.end()) + fprintf(stdout, "invalid comand \"%s\"\n", buff); + else + (it->second)(buff); + } + + return 0; +} -- cgit v1.2.3