aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDeterminant <ted.sybil@gmail.com>2019-06-28 01:22:43 -0400
committerDeterminant <ted.sybil@gmail.com>2019-06-28 01:22:43 -0400
commit1d49e46bfaeb7a6bbd38960c0bfd08643aac7c12 (patch)
treecdb820d432950254e1fc6cbefcce00b307cd907b
parentb886c6e00ab986f3f22d6e77f77fa8f559532e38 (diff)
...
-rw-r--r--include/salticidae/conn.h6
-rw-r--r--include/salticidae/network.h38
-rw-r--r--src/conn.cpp6
3 files changed, 41 insertions, 9 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index e39d31d..b693b4f 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -267,8 +267,10 @@ class ConnPool {
conn->send_data_func = Conn::_send_data;
conn->recv_data_func = Conn::_recv_data;
enable_send_buffer(conn, client_fd);
- cpool->on_setup(conn);
- cpool->update_conn(conn, true);
+ cpool->disp_tcall->async_call([cpool, conn](ThreadCall::Handle &) {
+ cpool->on_setup(conn);
+ cpool->update_conn(conn, true);
+ });
}
assert(conn->fd != -1);
assert(conn->worker == this);
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index e28e3df..092c969 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -33,6 +33,7 @@
#ifdef __cplusplus
#include <unordered_set>
+#include <shared_mutex>
#include <openssl/rand.h>
namespace salticidae {
/** Network of nodes who can send async messages. */
@@ -349,6 +350,13 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
std::unordered_map<NetAddr, conn_t> pending_peers;
std::unordered_map<NetAddr, std::pair<uint256_t, TimerEvent>> known_peers;
std::unordered_map<uint256_t, BoxObj<Peer>> pid2peer;
+
+ using pinfo_slock_t = std::shared_lock<std::shared_timed_mutex>;
+ using pinfo_ulock_t = std::shared_lock<std::shared_timed_mutex>;
+
+ mutable std::shared_timed_mutex known_peers_lock;
+ mutable std::shared_timed_mutex pid2peer_lock;
+
peer_callback_t peer_cb;
unknown_peer_callback_t unknown_peer_cb;
@@ -590,7 +598,8 @@ void PeerNetwork<O, _, __>::on_setup(const ConnPool::conn_t &_conn) {
auto worker = conn->worker;
auto &ev_timeout = conn->ev_timeout;
assert(!ev_timeout);
- ev_timeout = TimerEvent(worker->get_ec(), [listen_addr=this->listen_addr, worker, conn](TimerEvent &) {
+ ev_timeout = TimerEvent(worker->get_ec(),
+ [listen_addr=this->listen_addr, worker, conn](TimerEvent &) {
try {
SALTICIDAE_LOG_INFO("peer ping-pong timeout %s <-> %s",
std::string(listen_addr).c_str(),
@@ -620,6 +629,7 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) {
start_active_conn(addr);
} catch (...) { this->disp_error_cb(std::current_exception()); }
});
+ pinfo_ulock_t _g(known_peers_lock);
auto it = known_peers.find(addr);
if (it == known_peers.end()) return;
if (p)
@@ -632,7 +642,10 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) {
p->outbound_handshake = false;
p->inbound_handshake = false;
known_peers[p->peer_addr] = std::make_pair(uint256_t(), TimerEvent());
- pid2peer.erase(p->peer_id);
+ {
+ pinfo_ulock_t __g(pid2peer_lock);
+ pid2peer.erase(p->peer_id);
+ }
this->user_tcall->async_call([this, conn](ThreadCall::Handle &) {
if (peer_cb) peer_cb(conn, false);
});
@@ -686,7 +699,10 @@ bool PeerNetwork<O, _, __>::check_handshake(Peer *p) {
p->connected = true;
p->reset_ping_timer();
p->send_ping();
- known_peers[p->peer_addr] = std::make_pair(p->peer_id, TimerEvent());
+ {
+ pinfo_ulock_t _g(known_peers_lock);
+ known_peers[p->peer_addr] = std::make_pair(p->peer_id, TimerEvent());
+ }
pending_peers.erase(p->conn->get_addr());
if (p->connected)
{
@@ -738,7 +754,7 @@ inline typename PeerNetwork<O, _, __>::conn_t PeerNetwork<O, _, __>::_get_peer_c
auto it = known_peers.find(addr);
if (it == known_peers.end())
throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
- auto &peer_id = it->second.first;
+ const auto &peer_id = it->second.first;
if (peer_id.is_null())
throw PeerNetworkError(SALTI_ERROR_PEER_NOT_READY);
auto it2 = pid2peer.find(peer_id);
@@ -759,6 +775,8 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) {
auto peer_id = gen_peer_id(conn, msg.claimed_addr, msg.nonce);
if (conn_mode == Conn::ConnMode::PASSIVE)
{
+ pinfo_slock_t _g(known_peers_lock);
+ pinfo_ulock_t __g(pid2peer_lock);
if (!known_peers.count(msg.claimed_addr))
{
this->user_tcall->async_call([this, addr=msg.claimed_addr](ThreadCall::Handle &) {
@@ -828,6 +846,8 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) {
auto peer_id = gen_peer_id(conn, msg.claimed_addr, msg.nonce);
if (conn_mode == Conn::ConnMode::ACTIVE)
{
+ pinfo_ulock_t _g(known_peers_lock);
+ pinfo_ulock_t __g(pid2peer_lock);
SALTICIDAE_LOG_INFO("%s outbound handshake to %s",
std::string(listen_addr).c_str(),
std::string(*conn).c_str());
@@ -891,7 +911,7 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) {
auto p = conn->peer;
if (!p)
{
- SALTICIDAE_LOG_WARN("unexpected poing mesage");
+ SALTICIDAE_LOG_WARN("unexpected pong mesage");
return;
}
p->pong_msg_ok = true;
@@ -929,6 +949,7 @@ template<typename O, O _, O __>
void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) {
this->disp_tcall->async_call([this, addr](ThreadCall::Handle &) {
try {
+ pinfo_ulock_t _g(known_peers_lock);
if (!known_peers.insert(std::make_pair(addr,
std::make_pair(uint256_t(), TimerEvent()))).second)
throw PeerNetworkError(SALTI_ERROR_PEER_ALREADY_EXISTS);
@@ -944,6 +965,8 @@ template<typename O, O _, O __>
void PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) {
this->disp_tcall->async_call([this, addr](ThreadCall::Handle &) {
try {
+ pinfo_ulock_t _g(known_peers_lock);
+ pinfo_ulock_t __g(pid2peer_lock);
auto it = known_peers.find(addr);
if (it == known_peers.end())
throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
@@ -977,6 +1000,8 @@ PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &addr) const {
conn_t conn;
std::exception_ptr err = nullptr;
try {
+ pinfo_slock_t _g(known_peers_lock);
+ pinfo_slock_t __g(pid2peer_lock);
auto it = known_peers.find(addr);
if (it == known_peers.end())
throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
@@ -1003,6 +1028,7 @@ template<typename O, O _, O __>
bool PeerNetwork<O, _, __>::has_peer(const NetAddr &addr) const {
return *(static_cast<bool *>(this->disp_tcall->call(
[this, addr](ThreadCall::Handle &h) {
+ pinfo_slock_t _g(known_peers_lock);
h.set_result(known_peers.count(addr));
}).get()));
}
@@ -1039,6 +1065,8 @@ inline void PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const NetAddr &a
template<typename O, O _, O __>
inline void PeerNetwork<O, _, __>::_send_msg(const Msg &msg, const NetAddr &addr) {
+ pinfo_slock_t _g(known_peers_lock);
+ pinfo_slock_t __g(pid2peer_lock);
MsgNet::_send_msg(msg, _get_peer_conn(addr));
}
diff --git a/src/conn.cpp b/src/conn.cpp
index 84f08a4..e01690d 100644
--- a/src/conn.cpp
+++ b/src/conn.cpp
@@ -228,8 +228,10 @@ void ConnPool::Conn::_recv_data_tls_handshake(const conn_t &conn, int, int) {
conn->peer_cert = new X509(conn->tls->get_peer_cert());
conn->worker->enable_send_buffer(conn, conn->fd);
auto cpool = conn->cpool;
- cpool->on_setup(conn);
- cpool->update_conn(conn, true);
+ cpool->disp_tcall->async_call([cpool, conn](ThreadCall::Handle &) {
+ cpool->on_setup(conn);
+ cpool->update_conn(conn, true);
+ });
}
else
{