From 76e6831168a25bc3aba8802f1586d0a0ab0dc4a2 Mon Sep 17 00:00:00 2001 From: Determinant Date: Thu, 13 Jun 2019 16:51:35 -0400 Subject: add del_peer --- include/salticidae/conn.h | 2 +- include/salticidae/network.h | 31 +++++++++++-- include/salticidae/util.h | 2 +- test/test_p2p.cpp | 106 +++++++++++++++++++++++++++++++++---------- 4 files changed, 111 insertions(+), 30 deletions(-) diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index 546df5f..62825db 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -266,7 +266,7 @@ class ConnPool { protected: conn_t _connect(const NetAddr &addr); void _listen(NetAddr listen_addr); - void recoverable_error(const std::exception_ptr err) { + void recoverable_error(const std::exception_ptr err) const { user_tcall->async_call([this, err](ThreadCall::Handle &) { if (error_cb) { try { diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 14d270f..1095c89 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -426,6 +426,7 @@ class PeerNetwork: public MsgNetwork { ~PeerNetwork() { this->stop_workers(); } void add_peer(const NetAddr &paddr); + void del_peer(const NetAddr &paddr); bool has_peer(const NetAddr &paddr) const; const conn_t get_peer_conn(const NetAddr &paddr) const; using MsgNet::send_msg; @@ -725,6 +726,23 @@ void PeerNetwork::add_peer(const NetAddr &addr) { throw PeerNetworkError(SALTI_ERROR_PEER_ALREADY_EXISTS); id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->disp_ec))); start_active_conn(addr); + } catch (const PeerNetworkError &) { + this->recoverable_error(std::current_exception()); + } catch (...) { this->disp_error_cb(std::current_exception()); } + }); +} + +template +void PeerNetwork::del_peer(const NetAddr &addr) { + this->disp_tcall->async_call([this, addr](ThreadCall::Handle &) { + try { + auto it = id2peer.find(addr); + if (it == id2peer.end()) + throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); + it->second->conn->disp_terminate(); + id2peer.erase(it); + } catch (const PeerNetworkError &) { + this->recoverable_error(std::current_exception()); } catch (...) { this->disp_error_cb(std::current_exception()); } }); } @@ -739,8 +757,10 @@ PeerNetwork::get_peer_conn(const NetAddr &paddr) const { try { auto it = id2peer.find(paddr); if (it == id2peer.end()) - throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXISTS); + throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); conn = it->second->conn; + } catch (const PeerNetworkError &) { + this->recoverable_error(std::current_exception()); } catch (...) { err = std::current_exception(); } @@ -771,8 +791,10 @@ void PeerNetwork::_send_msg(Msg &&msg, const NetAddr &paddr) { try { auto it = id2peer.find(paddr); if (it == id2peer.end()) - throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXISTS); + throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); this->_send_msg_dispatcher(msg, it->second->conn); + } catch (const PeerNetworkError &) { + this->recoverable_error(std::current_exception()); } catch (...) { this->recoverable_error(std::current_exception()); } }); } @@ -792,9 +814,11 @@ void PeerNetwork::_multicast_msg(Msg &&msg, const std::vector { auto it = id2peer.find(addr); if (it == id2peer.end()) - throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXISTS); + throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); this->_send_msg_dispatcher(msg, it->second->conn); } + } catch (const PeerNetworkError &) { + this->recoverable_error(std::current_exception()); } catch (...) { this->recoverable_error(std::current_exception()); } }); } @@ -925,6 +949,7 @@ msgnetwork_config_t *peernetwork_config_as_msgnetwork_config(peernetwork_config_ peernetwork_t *peernetwork_new(const eventcontext_t *ec, const peernetwork_config_t *config); void peernetwork_free(const peernetwork_t *self); void peernetwork_add_peer(peernetwork_t *self, const netaddr_t *paddr); +void peernetwork_del_peer(peernetwork_t *self, const netaddr_t *paddr); bool peernetwork_has_peer(const peernetwork_t *self, const netaddr_t *paddr); const peernetwork_conn_t *peernetwork_get_peer_conn(const peernetwork_t *self, const netaddr_t *paddr); msgnetwork_t *peernetwork_as_msgnetwork(peernetwork_t *self); diff --git a/include/salticidae/util.h b/include/salticidae/util.h index 0ddf5be..41f681c 100644 --- a/include/salticidae/util.h +++ b/include/salticidae/util.h @@ -60,7 +60,7 @@ enum SalticidaeErrorCode { SALTI_ERROR_LISTEN, SALTI_ERROR_CONNECT, SALTI_ERROR_PEER_ALREADY_EXISTS, - SALTI_ERROR_PEER_NOT_EXISTS, + SALTI_ERROR_PEER_NOT_EXIST, SALTI_ERROR_NETADDR_INVALID, SALTI_ERROR_OPTVAL_INVALID, SALTI_ERROR_OPTNAME_ALREADY_EXISTS, diff --git a/test/test_p2p.cpp b/test/test_p2p.cpp index 1510f8e..28bab10 100644 --- a/test/test_p2p.cpp +++ b/test/test_p2p.cpp @@ -49,24 +49,25 @@ struct Net { ThreadCall tc; std::thread th; PeerNetwork *net; + const std::string listen_addr; - Net(uint64_t id, const std::string &listen_addr): id(id), tc(ec) { + Net(uint64_t id, uint16_t port): id(id), tc(ec), listen_addr("127.0.0.1:"+ std::to_string(port)) { net = new salticidae::PeerNetwork( ec, salticidae::PeerNetwork::Config().conn_timeout(5).ping_period(2)); net->reg_error_handler([this](const std::exception &err, bool fatal) { - SALTICIDAE_LOG_WARN("net %lu: captured %s error: %s", this->id, fatal ? "fatal" : "recoverable", err.what()); + fprintf(stdout, "net %lu: captured %s error during an async call: %s\n", this->id, fatal ? "fatal" : "recoverable", err.what()); }); th = std::thread([=](){ try { net->start(); net->listen(NetAddr(listen_addr)); - SALTICIDAE_LOG_INFO("net %lu: listen to %s\n", id, listen_addr.c_str()); + fprintf(stdout, "net %lu: listen to %s\n", id, listen_addr.c_str()); ec.dispatch(); } catch (std::exception &err) { - SALTICIDAE_LOG_WARN("net %lu: got error during a sync call: %s", id, err.what()); + fprintf(stdout, "net %lu: got error during a sync call: %s\n", id, err.what()); } - SALTICIDAE_LOG_INFO("net %lu: main loop ended\n", id); + fprintf(stdout, "net %lu: main loop ended\n", id); }); } @@ -74,7 +75,15 @@ struct Net { try { net->add_peer(NetAddr(listen_addr)); } catch (std::exception &err) { - fprintf(stderr, "net %lu: got error during a sync call: %s\n", id, err.what()); + fprintf(stdout, "net %lu: got error during a sync call: %s\n", id, err.what()); + } + } + + void del_peer(const std::string &listen_addr) { + try { + net->del_peer(NetAddr(listen_addr)); + } catch (std::exception &err) { + fprintf(stdout, "net %lu: got error during a sync call: %s\n", id, err.what()); } } @@ -96,15 +105,15 @@ int read_int(char *buff) { if (t < 0) throw std::invalid_argument("negative"); return t; } catch (std::invalid_argument) { - fprintf(stderr, "expect a non-negative integer\n"); + fprintf(stdout, "expect a non-negative integer\n"); return -1; } } int main(int argc, char **argv) { int i; - fprintf(stderr, "p2p network library playground (type h for help)\n"); - fprintf(stderr, "================================================\n"); + fprintf(stdout, "p2p network library playground (type help for more info)\n"); + fprintf(stdout, "========================================================\n"); auto cmd_exit = [](char *) { for (auto &p: nets) @@ -112,30 +121,36 @@ int main(int argc, char **argv) { exit(0); }; - auto cmd_net = [](char *buff) { + auto cmd_add = [](char *buff) { int id = read_int(buff); if (id < 0) return; if (nets.count(id)) { - fprintf(stderr, "net id already exists"); + fprintf(stdout, "net id already exists\n"); + return; + } + int port = read_int(buff); + if (port < 0) return; + if (port >= 65536) + { + fprintf(stdout, "port should be < 65536\n"); return; } - scanf("%64s", buff); - nets.insert(std::make_pair(id, new Net(id, buff))); + nets.insert(std::make_pair(id, new Net(id, port))); }; auto cmd_ls = [](char *) { for (auto &p: nets) - fprintf(stderr, "%d\n", p.first); + fprintf(stdout, "%d\n", p.first); }; - auto cmd_rm = [](char *buff) { + auto cmd_del = [](char *buff) { int id = read_int(buff); if (id < 0) return; auto it = nets.find(id); if (it == nets.end()) { - fprintf(stderr, "net id does not exist\n"); + fprintf(stdout, "net id does not exist\n"); return; } it->second->stop_join(); @@ -149,27 +164,68 @@ int main(int argc, char **argv) { auto it = nets.find(id); if (it == nets.end()) { - fprintf(stderr, "net id does not exist\n"); + fprintf(stdout, "net id does not exist\n"); return; } - scanf("%64s", buff); - it->second->add_peer(buff); + int id2 = read_int(buff); + if (id2 < 0) return; + auto it2 = nets.find(id2); + if (it2 == nets.end()) + { + fprintf(stdout, "net id does not exist\n"); + return; + } + it->second->add_peer(it2->second->listen_addr); }; - cmd_map.insert(std::make_pair("exit", cmd_exit)); - cmd_map.insert(std::make_pair("net", cmd_net)); - cmd_map.insert(std::make_pair("ls", cmd_ls)); - cmd_map.insert(std::make_pair("rm", cmd_rm)); + auto cmd_delpeer = [](char *buff) { + int id = read_int(buff); + if (id < 0) return; + auto it = nets.find(id); + if (it == nets.end()) + { + fprintf(stdout, "net id does not exist\n"); + return; + } + int id2 = read_int(buff); + if (id2 < 0) return; + auto it2 = nets.find(id2); + if (it2 == nets.end()) + { + fprintf(stdout, "net id does not exist\n"); + return; + } + it->second->del_peer(it2->second->listen_addr); + }; + + auto cmd_help = [](char *) { + fprintf(stdout, + "add -- start a node (create a PeerNetwork instance)\n" + "addpeer -- add a peer to a given node\n" + "rmpeer -- add a peer to a given node\n" + "rm -- remove a node (destroy a PeerNetwork instance)\n" + "ls -- list all node ids\n" + "exit -- quit the program\n" + "help -- show this info\n" + ); + }; + + cmd_map.insert(std::make_pair("add", cmd_add)); cmd_map.insert(std::make_pair("addpeer", cmd_addpeer)); + cmd_map.insert(std::make_pair("del", cmd_del)); + cmd_map.insert(std::make_pair("delpeer", cmd_delpeer)); + cmd_map.insert(std::make_pair("ls", cmd_ls)); + cmd_map.insert(std::make_pair("exit", cmd_exit)); + cmd_map.insert(std::make_pair("help", cmd_help)); for (;;) { - fprintf(stderr, "> "); + fprintf(stdout, "> "); char buff[128]; if (scanf("%64s", buff) == EOF) break; auto it = cmd_map.find(buff); if (it == cmd_map.end()) - fprintf(stderr, "invalid comand \"%s\"\n", buff); + fprintf(stdout, "invalid comand \"%s\"\n", buff); else (it->second)(buff); } -- cgit v1.2.3-70-g09d2