diff options
-rw-r--r-- | include/salticidae/conn.h | 11 | ||||
-rw-r--r-- | include/salticidae/network.h | 60 | ||||
-rw-r--r-- | src/network.cpp | 4 | ||||
-rw-r--r-- | test/test_p2p.cpp | 3 | ||||
-rw-r--r-- | test/test_p2p_tls.cpp | 60 |
5 files changed, 106 insertions, 32 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index ea216c5..e0bf009 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -111,14 +111,13 @@ class ConnPool { virtual void stop(); public: - Conn(): terminated(false), worker(nullptr), + Conn(): terminated(false), // recv_chunk_size initialized later // max_recv_buff_size initialized later - // fd initialized later - // worker initialized later - // cpool initialized later - // mode initialized later - // addr initialized later + fd(-1), + worker(nullptr), + cpool(nullptr), + mode(ConnMode::PASSIVE), ready_send(false), ready_recv(false), send_data_func(nullptr), recv_data_func(nullptr), tls(nullptr), peer_cert(nullptr) {} diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 9cc1883..46112d2 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -394,7 +394,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { std::string id_hex; double retry_delay; - ssize_t ntry; + int32_t ntry; TimerEvent ev_retry_timer; /** the underlying connection, may be invalid when connected = false */ @@ -596,7 +596,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { /* set the peer's public IP */ int32_t set_peer_addr(const PeerId &peer, const NetAddr &addr); /* try to connect to the peer: once (ntry = 1), indefinitely (ntry = -1), give up retry (ntry = 0) */ - int32_t conn_peer(const PeerId &peer, ssize_t ntry = -1, double retry_delay = 2); + int32_t conn_peer(const PeerId &peer, int32_t ntry = -1, double retry_delay = 2); /* check if a peer is registered */ bool has_peer(const PeerId &peer) const; @@ -780,11 +780,13 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) { std::string(*conn).c_str()); auto p = conn->peer; if (!p) return; - /* if this connect was the active peer connection */ - bool reset = p->state == Peer::State::RESET; - if (p->conn == conn) + /* there are only two possible cases where p != nullptr: + * 1. p2p is connected + * 2. p2p is disconnected, but it is trying to do an active connection by + * start_active_conn() */ + if (p->state != Peer::State::DISCONNECTED) { - assert(p->state == Peer::State::CONNECTED); + assert(p->conn == conn); p->state = Peer::State::DISCONNECTED; p->inbound_conn = nullptr; p->outbound_conn = nullptr; @@ -797,15 +799,9 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) { /* auto retry the connection */ if (p->ntry > 0) p->ntry--; if (p->ntry) - { - p->ev_retry_timer = TimerEvent(this->disp_ec, [this, addr=p->addr, p](TimerEvent &) { - try { - start_active_conn(p); - p->ev_retry_timer.add(gen_rand_timeout(p->retry_delay)); - } catch (...) { this->disp_error_cb(std::current_exception()); } - }); - p->ev_retry_timer.add(reset ? 0 : gen_rand_timeout(p->retry_delay)); - } + p->ev_retry_timer.add( + p->state == Peer::State::RESET ? + 0 : gen_rand_timeout(p->retry_delay)); } template<typename O, O _, O __> @@ -898,8 +894,10 @@ template<typename O, O _, O __> void PeerNetwork<O, _, __>::start_active_conn(Peer *p) { assert(!p->addr.is_null()); auto conn = static_pointer_cast<Conn>(MsgNet::_connect(p->addr)); + conn->peer = p; p->outbound_conn = conn; - replace_pending_conn(conn); + assert(pending_peers.count(conn->get_addr()) == 0); + //replace_pending_conn(conn); } template<typename O, O _, O __> @@ -1079,7 +1077,14 @@ int32_t PeerNetwork<O, _, __>::add_peer(const PeerId &pid) { pinfo_ulock_t _g(known_peers_lock); if (known_peers.count(pid)) throw PeerNetworkError(SALTI_ERROR_PEER_ALREADY_EXISTS); - known_peers.insert(std::make_pair(pid, new Peer(pid, this))); + auto p = new Peer(pid, this); + conn_t conn{new Conn()}; + conn->cpool = this; + conn->set_terminated(); + conn->peer = p; + p->conn = conn; + p->state = Peer::State::DISCONNECTED; + known_peers.insert(std::make_pair(pid, p)); } catch (const PeerNetworkError &) { this->recoverable_error(std::current_exception(), id); } catch (...) { this->disp_error_cb(std::current_exception()); } @@ -1088,7 +1093,7 @@ int32_t PeerNetwork<O, _, __>::add_peer(const PeerId &pid) { } template<typename O, O _, O __> -int32_t PeerNetwork<O, _, __>::conn_peer(const PeerId &pid, ssize_t ntry, double retry_delay) { +int32_t PeerNetwork<O, _, __>::conn_peer(const PeerId &pid, int32_t ntry, double retry_delay) { auto id = this->gen_async_id(); this->disp_tcall->async_call([this, pid, ntry, retry_delay, id](ThreadCall::Handle &) { try { @@ -1105,15 +1110,26 @@ int32_t PeerNetwork<O, _, __>::conn_peer(const PeerId &pid, ssize_t ntry, double p->outbound_conn = nullptr; p->ev_ping_timer.del(); p->nonce = 0; + p->ev_retry_timer = TimerEvent(this->disp_ec, + [this, addr=p->addr, p=p.get()](TimerEvent &) { + try { + start_active_conn(p); + p->ev_retry_timer.add(gen_rand_timeout(p->retry_delay)); + } catch (...) { this->disp_error_cb(std::current_exception()); } + }); + /* has to terminate established connection *before* making the next * attempt */ - if (!p->conn || p->state == Peer::State::DISCONNECTED) + if (p->state == Peer::State::DISCONNECTED && ntry) start_active_conn(p.get()); else if (p->state == Peer::State::CONNECTED) { p->state = Peer::State::RESET; this->disp_terminate(p->conn); } + // else ntry == 0 but state is not connected + // or state is RESET + // then it does nothing } catch (const PeerNetworkError &) { this->recoverable_error(std::current_exception(), id); } catch (...) { this->disp_error_cb(std::current_exception()); } @@ -1333,7 +1349,7 @@ using clientnetwork_conn_t = clientnetwork_t::conn_t; #ifdef SALTICIDAE_CBINDINGS typedef struct peerid_t peerid_t; -typedef struct peerid_t_array_t peerid_array_t; +typedef struct peerid_array_t peerid_array_t; typedef struct msgnetwork_t msgnetwork_t; typedef struct msgnetwork_config_t msgnetwork_config_t; @@ -1420,7 +1436,7 @@ void peerid_free(const peerid_t *self); peerid_t *peerid_new_from_netaddr(const netaddr_t *addr); peerid_t *peerid_new_from_x509(const x509_t *cert); peerid_array_t *peerid_array_new(); -peerid_array_t *peerid_array_new_from_peerids(const peerid_t * const *pids, size_t npids); +peerid_array_t *peerid_array_new_from_peers(const peerid_t * const *peers, size_t npeers); void peerid_array_free(peerid_array_t *self); peernetwork_config_t *peernetwork_config_new(); @@ -1434,7 +1450,7 @@ peernetwork_t *peernetwork_new(const eventcontext_t *ec, const peernetwork_confi void peernetwork_free(const peernetwork_t *self); int32_t peernetwork_add_peer(peernetwork_t *self, const peerid_t *peer); int32_t peernetwork_del_peer(peernetwork_t *self, const peerid_t *peer); -int32_t peernetwork_conn_peer(peernetwork_t *self, const peerid_t *peer, ssize_t ntry, double retry_delay); +int32_t peernetwork_conn_peer(peernetwork_t *self, const peerid_t *peer, int32_t ntry, double retry_delay); bool peernetwork_has_peer(const peernetwork_t *self, const peerid_t *peer); const peernetwork_conn_t *peernetwork_get_peer_conn(const peernetwork_t *self, const peerid_t *peer, SalticidaeCError *cerror); int32_t peernetwork_set_peer_addr(peernetwork_t *self, const peerid_t *peer, const netaddr_t *addr); diff --git a/src/network.cpp b/src/network.cpp index 4182a41..0edc407 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -187,7 +187,7 @@ peerid_t *peerid_new_from_x509(const x509_t *cert) { } peerid_array_t *peerid_array_new() { return new peerid_array_t(); } -peerid_array_t *peerid_array_new_from_peerids(const peerid_t * const *peers, size_t npeers) { +peerid_array_t *peerid_array_new_from_peers(const peerid_t * const *peers, size_t npeers) { auto res = new peerid_array_t(); res->resize(npeers); for (size_t i = 0; i < npeers; i++) @@ -235,7 +235,7 @@ int32_t peernetwork_del_peer(peernetwork_t *self, const peerid_t *peer) { return self->del_peer(*peer); } -int32_t peernetwork_conn_peer(peernetwork_t *self, const peerid_t *peer, ssize_t ntry, double retry_delay) { +int32_t peernetwork_conn_peer(peernetwork_t *self, const peerid_t *peer, int32_t ntry, double retry_delay) { return self->conn_peer(*peer, ntry, retry_delay); } diff --git a/test/test_p2p.cpp b/test/test_p2p.cpp index 4b6d451..e289b45 100644 --- a/test/test_p2p.cpp +++ b/test/test_p2p.cpp @@ -120,9 +120,8 @@ struct Net { } void add_peer(const std::string &listen_addr) { - NetAddr addr(listen_addr); try { - net->add_peer(addr); + net->add_peer(NetAddr(listen_addr)); } catch (std::exception &err) { fprintf(stdout, "net %lu: got error during a sync call: %s\n", id, err.what()); } diff --git a/test/test_p2p_tls.cpp b/test/test_p2p_tls.cpp index 10dacff..59a7859 100644 --- a/test/test_p2p_tls.cpp +++ b/test/test_p2p_tls.cpp @@ -153,6 +153,22 @@ struct Net { } } + void set_peer_addr(const NetAddr &addr) { + try { + net->set_peer_addr(addr, addr); + } catch (std::exception &err) { + fprintf(stdout, "net %lu: got error during a sync call: %s\n", id, err.what()); + } + } + + void conn_peer(const NetAddr &addr) { + try { + net->conn_peer(addr); + } catch (std::exception &err) { + fprintf(stdout, "net %lu: got error during a sync call: %s\n", id, err.what()); + } + } + void del_peer(const std::string &listen_addr) { try { net->del_peer(NetAddr(listen_addr)); @@ -278,6 +294,46 @@ int main(int argc, char **argv) { it->second->del_peer(it2->second->listen_addr); }; + auto cmd_setpeeraddr = [](char *buff) { + int id = read_int(buff); + if (id < 0) return; + auto it = nets.find(id); + if (it == nets.end()) + { + fprintf(stdout, "net id does not exist\n"); + return; + } + int id2 = read_int(buff); + if (id2 < 0) return; + auto it2 = nets.find(id2); + if (it2 == nets.end()) + { + fprintf(stdout, "net id does not exist\n"); + return; + } + it->second->set_peer_addr(it2->second->listen_addr); + }; + + auto cmd_connpeer = [](char *buff) { + int id = read_int(buff); + if (id < 0) return; + auto it = nets.find(id); + if (it == nets.end()) + { + fprintf(stdout, "net id does not exist\n"); + return; + } + int id2 = read_int(buff); + if (id2 < 0) return; + auto it2 = nets.find(id2); + if (it2 == nets.end()) + { + fprintf(stdout, "net id does not exist\n"); + return; + } + it->second->conn_peer(it2->second->listen_addr); + }; + auto cmd_msg = [](char *buff) { int id = read_int(buff); if (id < 0) return; @@ -309,6 +365,8 @@ int main(int argc, char **argv) { fprintf(stdout, "add <node-id> <port> -- start a node (create a PeerNetwork instance)\n" "addpeer <node-id> <peer-id> -- add a peer to a given node\n" + "setpeeraddr <node-id> <peer-id> -- set the peer addr\n" + "connpeer <node-id> <peer-id> -- try to connect to the peer\n" "delpeer <node-id> <peer-id> -- add a peer to a given node\n" "del <node-id> -- remove a node (destroy a PeerNetwork instance)\n" "msg <node-id> <peer-id> <msg> -- send a text message to a node\n" @@ -321,6 +379,8 @@ int main(int argc, char **argv) { cmd_map.insert(std::make_pair("add", cmd_add)); cmd_map.insert(std::make_pair("addpeer", cmd_addpeer)); + cmd_map.insert(std::make_pair("setpeeraddr", cmd_setpeeraddr)); + cmd_map.insert(std::make_pair("connpeer", cmd_connpeer)); cmd_map.insert(std::make_pair("del", cmd_del)); cmd_map.insert(std::make_pair("delpeer", cmd_delpeer)); cmd_map.insert(std::make_pair("msg", cmd_msg)); |