From 1d49e46bfaeb7a6bbd38960c0bfd08643aac7c12 Mon Sep 17 00:00:00 2001 From: Determinant Date: Fri, 28 Jun 2019 01:22:43 -0400 Subject: ... --- include/salticidae/conn.h | 6 ++++-- include/salticidae/network.h | 38 +++++++++++++++++++++++++++++++++----- src/conn.cpp | 6 ++++-- 3 files changed, 41 insertions(+), 9 deletions(-) diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index e39d31d..b693b4f 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -267,8 +267,10 @@ class ConnPool { conn->send_data_func = Conn::_send_data; conn->recv_data_func = Conn::_recv_data; enable_send_buffer(conn, client_fd); - cpool->on_setup(conn); - cpool->update_conn(conn, true); + cpool->disp_tcall->async_call([cpool, conn](ThreadCall::Handle &) { + cpool->on_setup(conn); + cpool->update_conn(conn, true); + }); } assert(conn->fd != -1); assert(conn->worker == this); diff --git a/include/salticidae/network.h b/include/salticidae/network.h index e28e3df..092c969 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -33,6 +33,7 @@ #ifdef __cplusplus #include +#include #include namespace salticidae { /** Network of nodes who can send async messages. */ @@ -349,6 +350,13 @@ class PeerNetwork: public MsgNetwork { std::unordered_map pending_peers; std::unordered_map> known_peers; std::unordered_map> pid2peer; + + using pinfo_slock_t = std::shared_lock; + using pinfo_ulock_t = std::shared_lock; + + mutable std::shared_timed_mutex known_peers_lock; + mutable std::shared_timed_mutex pid2peer_lock; + peer_callback_t peer_cb; unknown_peer_callback_t unknown_peer_cb; @@ -590,7 +598,8 @@ void PeerNetwork::on_setup(const ConnPool::conn_t &_conn) { auto worker = conn->worker; auto &ev_timeout = conn->ev_timeout; assert(!ev_timeout); - ev_timeout = TimerEvent(worker->get_ec(), [listen_addr=this->listen_addr, worker, conn](TimerEvent &) { + ev_timeout = TimerEvent(worker->get_ec(), + [listen_addr=this->listen_addr, worker, conn](TimerEvent &) { try { SALTICIDAE_LOG_INFO("peer ping-pong timeout %s <-> %s", std::string(listen_addr).c_str(), @@ -620,6 +629,7 @@ void PeerNetwork::on_teardown(const ConnPool::conn_t &_conn) { start_active_conn(addr); } catch (...) { this->disp_error_cb(std::current_exception()); } }); + pinfo_ulock_t _g(known_peers_lock); auto it = known_peers.find(addr); if (it == known_peers.end()) return; if (p) @@ -632,7 +642,10 @@ void PeerNetwork::on_teardown(const ConnPool::conn_t &_conn) { p->outbound_handshake = false; p->inbound_handshake = false; known_peers[p->peer_addr] = std::make_pair(uint256_t(), TimerEvent()); - pid2peer.erase(p->peer_id); + { + pinfo_ulock_t __g(pid2peer_lock); + pid2peer.erase(p->peer_id); + } this->user_tcall->async_call([this, conn](ThreadCall::Handle &) { if (peer_cb) peer_cb(conn, false); }); @@ -686,7 +699,10 @@ bool PeerNetwork::check_handshake(Peer *p) { p->connected = true; p->reset_ping_timer(); p->send_ping(); - known_peers[p->peer_addr] = std::make_pair(p->peer_id, TimerEvent()); + { + pinfo_ulock_t _g(known_peers_lock); + known_peers[p->peer_addr] = std::make_pair(p->peer_id, TimerEvent()); + } pending_peers.erase(p->conn->get_addr()); if (p->connected) { @@ -738,7 +754,7 @@ inline typename PeerNetwork::conn_t PeerNetwork::_get_peer_c auto it = known_peers.find(addr); if (it == known_peers.end()) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); - auto &peer_id = it->second.first; + const auto &peer_id = it->second.first; if (peer_id.is_null()) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_READY); auto it2 = pid2peer.find(peer_id); @@ -759,6 +775,8 @@ void PeerNetwork::ping_handler(MsgPing &&msg, const conn_t &conn) { auto peer_id = gen_peer_id(conn, msg.claimed_addr, msg.nonce); if (conn_mode == Conn::ConnMode::PASSIVE) { + pinfo_slock_t _g(known_peers_lock); + pinfo_ulock_t __g(pid2peer_lock); if (!known_peers.count(msg.claimed_addr)) { this->user_tcall->async_call([this, addr=msg.claimed_addr](ThreadCall::Handle &) { @@ -828,6 +846,8 @@ void PeerNetwork::pong_handler(MsgPong &&msg, const conn_t &conn) { auto peer_id = gen_peer_id(conn, msg.claimed_addr, msg.nonce); if (conn_mode == Conn::ConnMode::ACTIVE) { + pinfo_ulock_t _g(known_peers_lock); + pinfo_ulock_t __g(pid2peer_lock); SALTICIDAE_LOG_INFO("%s outbound handshake to %s", std::string(listen_addr).c_str(), std::string(*conn).c_str()); @@ -891,7 +911,7 @@ void PeerNetwork::pong_handler(MsgPong &&msg, const conn_t &conn) { auto p = conn->peer; if (!p) { - SALTICIDAE_LOG_WARN("unexpected poing mesage"); + SALTICIDAE_LOG_WARN("unexpected pong mesage"); return; } p->pong_msg_ok = true; @@ -929,6 +949,7 @@ template void PeerNetwork::add_peer(const NetAddr &addr) { this->disp_tcall->async_call([this, addr](ThreadCall::Handle &) { try { + pinfo_ulock_t _g(known_peers_lock); if (!known_peers.insert(std::make_pair(addr, std::make_pair(uint256_t(), TimerEvent()))).second) throw PeerNetworkError(SALTI_ERROR_PEER_ALREADY_EXISTS); @@ -944,6 +965,8 @@ template void PeerNetwork::del_peer(const NetAddr &addr) { this->disp_tcall->async_call([this, addr](ThreadCall::Handle &) { try { + pinfo_ulock_t _g(known_peers_lock); + pinfo_ulock_t __g(pid2peer_lock); auto it = known_peers.find(addr); if (it == known_peers.end()) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); @@ -977,6 +1000,8 @@ PeerNetwork::get_peer_conn(const NetAddr &addr) const { conn_t conn; std::exception_ptr err = nullptr; try { + pinfo_slock_t _g(known_peers_lock); + pinfo_slock_t __g(pid2peer_lock); auto it = known_peers.find(addr); if (it == known_peers.end()) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); @@ -1003,6 +1028,7 @@ template bool PeerNetwork::has_peer(const NetAddr &addr) const { return *(static_cast(this->disp_tcall->call( [this, addr](ThreadCall::Handle &h) { + pinfo_slock_t _g(known_peers_lock); h.set_result(known_peers.count(addr)); }).get())); } @@ -1039,6 +1065,8 @@ inline void PeerNetwork::send_msg(const MsgType &msg, const NetAddr &a template inline void PeerNetwork::_send_msg(const Msg &msg, const NetAddr &addr) { + pinfo_slock_t _g(known_peers_lock); + pinfo_slock_t __g(pid2peer_lock); MsgNet::_send_msg(msg, _get_peer_conn(addr)); } diff --git a/src/conn.cpp b/src/conn.cpp index 84f08a4..e01690d 100644 --- a/src/conn.cpp +++ b/src/conn.cpp @@ -228,8 +228,10 @@ void ConnPool::Conn::_recv_data_tls_handshake(const conn_t &conn, int, int) { conn->peer_cert = new X509(conn->tls->get_peer_cert()); conn->worker->enable_send_buffer(conn, conn->fd); auto cpool = conn->cpool; - cpool->on_setup(conn); - cpool->update_conn(conn, true); + cpool->disp_tcall->async_call([cpool, conn](ThreadCall::Handle &) { + cpool->on_setup(conn); + cpool->update_conn(conn, true); + }); } else { -- cgit v1.2.3