aboutsummaryrefslogtreecommitdiff
path: root/include/salticidae/network.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/salticidae/network.h')
-rw-r--r--include/salticidae/network.h81
1 files changed, 43 insertions, 38 deletions
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index 0db69e4..fb55f83 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -388,7 +388,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
struct Peer {
PeerId id;
NetAddr addr; /** remote address (if set) */
- uint32_t my_nonce;
+ uint32_t nonce;
conn_t conn;
double retry_delay;
@@ -403,22 +403,21 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
TimerEvent ev_ping_timer;
bool ping_timer_ok;
bool pong_msg_ok;
+ double ping_period;
enum State {
DISCONNECTED,
CONNECTED,
RESET
} state;
- double ping_period;
-
Peer(const PeerId &pid, const PeerNetwork *pn):
id(pid),
retry_delay(0), ntry(0),
ev_ping_timer(
TimerEvent(pn->disp_ec, std::bind(&Peer::ping_timer, this, _1))),
- state(DISCONNECTED),
- ping_period(pn->ping_period) {}
+ ping_period(pn->ping_period),
+ state(DISCONNECTED) {}
Peer &operator=(const Peer &) = delete;
Peer(const Peer &) = delete;
@@ -431,14 +430,14 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
ev_ping_timer.del();
}
uint32_t get_nonce() {
- if (my_nonce == 0)
+ if (nonce == 0)
{
uint16_t n;
if (!RAND_bytes((uint8_t *)&n, 2))
throw PeerNetworkError(SALTI_ERROR_RAND_SOURCE);
- my_nonce = n + 1;
+ nonce = n + 1;
}
- return my_nonce;
+ return nonce;
}
public:
@@ -454,7 +453,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
std::unordered_map<PeerId, BoxObj<Peer>> known_peers;
using pinfo_slock_t = std::shared_lock<std::shared_timed_mutex>;
- using pinfo_ulock_t = std::shared_lock<std::shared_timed_mutex>;
+ using pinfo_ulock_t = std::unique_lock<std::shared_timed_mutex>;
mutable std::shared_timed_mutex known_peers_lock;
@@ -500,7 +499,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
void _pong_msg_cb(const conn_t &conn, uint16_t port);
void finish_handshake(Peer *peer);
void replace_pending_conn(const conn_t &conn);
- conn_t start_active_conn(const NetAddr &addr);
+ void start_active_conn(Peer *peer);
static void tcall_reset_timeout(ConnPool::Worker *worker,
const conn_t &conn, double timeout);
inline conn_t _get_peer_conn(const PeerId &peer) const;
@@ -519,7 +518,6 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
}
public:
-
class Config: public MsgNet::Config {
friend PeerNetwork;
double _ping_period;
@@ -574,13 +572,15 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
/* register a peer as known */
int32_t add_peer(const PeerId &peer);
+ /* unregister the peer */
+ int32_t del_peer(const PeerId &peer);
/* set the peer's public IP */
int32_t set_peer_addr(const PeerId &peer, const NetAddr &addr);
/* try to connect to the peer: once (ntry = 1), indefinitely (ntry = -1), give up retry (ntry = 0) */
int32_t conn_peer(const PeerId &peer, ssize_t ntry = -1, double retry_delay = 2);
- /* unregister the peer */
- int32_t del_peer(const PeerId &peer);
+ /* check if a peer is registered */
bool has_peer(const PeerId &peer) const;
+
size_t get_npending() const;
conn_t get_peer_conn(const PeerId &addr) const;
using MsgNet::send_msg;
@@ -662,7 +662,7 @@ void MsgNetwork<OpcodeType>::on_read(const ConnPool::conn_t &_conn) {
template<typename OpcodeType>
template<typename MsgType>
inline int32_t MsgNetwork<OpcodeType>::send_msg_deferred(MsgType &&msg, const conn_t &conn) {
- return _send_msg_deferred(std::move(msg), conn);
+ return _send_msg_deferred(Msg(std::move(msg), msg_magic), conn);
}
template<typename OpcodeType>
@@ -757,7 +757,7 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) {
p->inbound_conn = nullptr;
p->outbound_conn = nullptr;
p->ev_ping_timer.del();
- p->my_nonce = 0;
+ p->nonce = 0;
this->user_tcall->async_call([this, conn](ThreadCall::Handle &) {
if (peer_cb) peer_cb(conn, false);
});
@@ -768,7 +768,8 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) {
{
p->ev_retry_timer = TimerEvent(this->disp_ec, [this, addr=p->addr, p](TimerEvent &) {
try {
- start_active_conn(addr)->peer = p;
+ start_active_conn(p);
+ p->ev_retry_timer.add(gen_rand_timeout(p->retry_delay));
} catch (...) { this->disp_error_cb(std::current_exception()); }
});
p->ev_retry_timer.add(reset ? 0 : gen_rand_timeout(p->retry_delay));
@@ -865,10 +866,11 @@ void PeerNetwork<O, _, __>::replace_pending_conn(const conn_t &conn) {
}
template<typename O, O _, O __>
-typename PeerNetwork<O, _, __>::conn_t PeerNetwork<O, _, __>::start_active_conn(const NetAddr &addr) {
- auto conn = static_pointer_cast<Conn>(MsgNet::_connect(addr));
+void PeerNetwork<O, _, __>::start_active_conn(Peer *p) {
+ assert(!p->addr.is_null());
+ auto conn = static_pointer_cast<Conn>(MsgNet::_connect(p->addr));
+ p->outbound_conn = conn;
replace_pending_conn(conn);
- return conn;
}
template<typename O, O _, O __>
@@ -911,7 +913,7 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) {
listen_addr,
p->addr.is_null() ? passive_nonce : p->get_nonce()), conn);
auto &old_conn = p->inbound_conn;
- if (old_conn && !old_conn->is_terminated())
+ if (old_conn && old_conn != conn)
{
SALTICIDAE_LOG_DEBUG("%s terminating stale handshake connection %s",
std::string(listen_addr).c_str(),
@@ -955,7 +957,7 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) {
if (conn->get_mode() == Conn::ConnMode::ACTIVE)
{
auto pid = get_peer_id(conn, conn->get_addr());
- pinfo_ulock_t _g(known_peers_lock);
+ pinfo_slock_t _g(known_peers_lock);
auto pit = known_peers.find(pid);
if (pit == known_peers.end())
{
@@ -971,7 +973,7 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) {
std::string(listen_addr).c_str(),
std::string(*conn).c_str());
auto &old_conn = p->outbound_conn;
- if (old_conn && !old_conn->is_terminated())
+ if (old_conn && old_conn != conn)
{
SALTICIDAE_LOG_DEBUG("%s terminating stale handshake connection %s",
std::string(listen_addr).c_str(),
@@ -992,7 +994,7 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) {
SALTICIDAE_LOG_DEBUG(
"%04x >= %04x, terminating and resetting",
p->get_nonce(), msg.nonce);
- p->my_nonce = 0;
+ p->nonce = 0;
this->disp_terminate(conn);
}
}
@@ -1048,7 +1050,7 @@ int32_t PeerNetwork<O, _, __>::conn_peer(const PeerId &pid, ssize_t ntry, double
auto id = this->gen_async_id();
this->disp_tcall->async_call([this, pid, ntry, retry_delay, id](ThreadCall::Handle &) {
try {
- pinfo_ulock_t _g(known_peers_lock);
+ pinfo_slock_t _g(known_peers_lock);
auto it = known_peers.find(pid);
if (it == known_peers.end())
throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
@@ -1060,11 +1062,11 @@ int32_t PeerNetwork<O, _, __>::conn_peer(const PeerId &pid, ssize_t ntry, double
p->inbound_conn = nullptr;
p->outbound_conn = nullptr;
p->ev_ping_timer.del();
- p->my_nonce = 0;
+ p->nonce = 0;
/* has to terminate established connection *before* making the next
* attempt */
if (!p->conn || p->state == Peer::State::DISCONNECTED)
- start_active_conn(p->addr);
+ start_active_conn(p.get());
else if (p->state == Peer::State::CONNECTED)
{
p->state = Peer::State::RESET;
@@ -1082,7 +1084,7 @@ int32_t PeerNetwork<O, _, __>::set_peer_addr(const PeerId &pid, const NetAddr &a
auto id = this->gen_async_id();
this->disp_tcall->async_call([this, pid, addr, id](ThreadCall::Handle &) {
try {
- pinfo_ulock_t _g(known_peers_lock);
+ pinfo_slock_t _g(known_peers_lock);
auto it = known_peers.find(pid);
if (it == known_peers.end())
throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
@@ -1110,8 +1112,9 @@ int32_t PeerNetwork<O, _, __>::del_peer(const PeerId &pid) {
auto it2 = pending_peers.find(addr);
if (it2 != pending_peers.end())
{
- if (!it2->second->peer)
- this->disp_terminate(it2->second);
+ auto &conn = it2->second;
+ if (!conn->peer)
+ this->disp_terminate(conn);
pending_peers.erase(it2);
}
} catch (const PeerNetworkError &) {
@@ -1156,7 +1159,7 @@ size_t PeerNetwork<O, _, __>::get_npending() const {
template<typename O, O _, O __>
template<typename MsgType>
inline int32_t PeerNetwork<O, _, __>::send_msg_deferred(MsgType &&msg, const PeerId &pid) {
- return _send_msg_deferred(std::move(msg), pid);
+ return _send_msg_deferred(Msg(std::move(msg), this->msg_magic), pid);
}
template<typename O, O _, O __>
@@ -1187,7 +1190,7 @@ inline bool PeerNetwork<O, _, __>::_send_msg(const Msg &msg, const PeerId &pid)
template<typename O, O _, O __>
template<typename MsgType>
inline int32_t PeerNetwork<O, _, __>::multicast_msg(MsgType &&msg, const std::vector<PeerId> &pids) {
- return _multicast_msg(MsgType(std::move(msg), this->msg_magic), pids);
+ return _multicast_msg(Msg(std::move(msg), this->msg_magic), pids);
}
template<typename O, O _, O __>
@@ -1196,6 +1199,7 @@ inline int32_t PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vecto
this->disp_tcall->async_call(
[this, msg=std::move(msg), pids, id](ThreadCall::Handle &) {
try {
+ pinfo_slock_t _g(known_peers_lock);
bool succ = true;
for (auto &pid: pids)
succ &= MsgNet::_send_msg(msg, _get_peer_conn(pid));
@@ -1228,7 +1232,7 @@ void ClientNetwork<OpcodeType>::on_teardown(const ConnPool::conn_t &_conn) {
template<typename OpcodeType>
template<typename MsgType>
inline int32_t ClientNetwork<OpcodeType>::send_msg_deferred(MsgType &&msg, const NetAddr &addr) {
- return _send_msg_deferred(std::move(msg), addr);
+ return _send_msg_deferred(Msg(std::move(msg), this->msg_magic), addr);
}
template<typename OpcodeType>
@@ -1246,7 +1250,7 @@ inline int32_t ClientNetwork<OpcodeType>::_send_msg_deferred(Msg &&msg, const Ne
template<typename OpcodeType>
template<typename MsgType>
inline bool ClientNetwork<OpcodeType>::send_msg(const MsgType &msg, const NetAddr &addr) {
- return _send_msg(msg, addr);
+ return _send_msg(Msg(msg, this->msg_magic), addr);
}
template<typename OpcodeType>
@@ -1368,7 +1372,6 @@ bool msgnetwork_conn_is_terminated(const msgnetwork_conn_t *conn);
/* PeerNetwork */
-//peerid_t *peerid_new();
void peerid_free(const peerid_t *self);
peerid_t *peerid_new_from_netaddr(const netaddr_t *addr);
peerid_t *peerid_new_from_x509(const x509_t *cert);
@@ -1385,10 +1388,12 @@ msgnetwork_config_t *peernetwork_config_as_msgnetwork_config(peernetwork_config_
peernetwork_t *peernetwork_new(const eventcontext_t *ec, const peernetwork_config_t *config, SalticidaeCError *err);
void peernetwork_free(const peernetwork_t *self);
-int32_t peernetwork_add_peer(peernetwork_t *self, const peerid_t *pid);
-int32_t peernetwork_del_peer(peernetwork_t *self, const peerid_t *pid);
-bool peernetwork_has_peer(const peernetwork_t *self, const peerid_t *pid);
-const peernetwork_conn_t *peernetwork_get_peer_conn(const peernetwork_t *self, const peerid_t *pid, SalticidaeCError *cerror);
+int32_t peernetwork_add_peer(peernetwork_t *self, const peerid_t *peer);
+int32_t peernetwork_del_peer(peernetwork_t *self, const peerid_t *peer);
+int32_t peernetwork_conn_peer(peernetwork_t *self, const peerid_t *peer, ssize_t ntry, double retry_delay);
+bool peernetwork_has_peer(const peernetwork_t *self, const peerid_t *peer);
+const peernetwork_conn_t *peernetwork_get_peer_conn(const peernetwork_t *self, const peerid_t *peer, SalticidaeCError *cerror);
+int32_t peernetwork_set_peer_addr(peernetwork_t *self, const peerid_t *peer, const netaddr_t *addr);
msgnetwork_t *peernetwork_as_msgnetwork(peernetwork_t *self);
peernetwork_t *msgnetwork_as_peernetwork_unsafe(msgnetwork_t *self);
msgnetwork_conn_t *msgnetwork_conn_new_from_peernetwork_conn(const peernetwork_conn_t *conn);