aboutsummaryrefslogtreecommitdiff
path: root/include/salticidae/network.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/salticidae/network.h')
-rw-r--r--include/salticidae/network.h236
1 files changed, 157 insertions, 79 deletions
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index dd05ed3..152ca60 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -345,12 +345,26 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
using peer_callback_t = std::function<void(const conn_t &peer_conn, bool connected)>;
using unknown_peer_callback_t = std::function<void(const NetAddr &claimed_addr, const X509 *cert)>;
+ struct PeerId: public uint256_t {
+ PeerId(const NetAddr &addr) {
+ DataStream tmp;
+ tmp << addr;
+ *this = tmp.get_hash();
+ }
+
+ PeerId(const X509 &cert) {
+ DataStream tmp;
+ tmp << cert.get_der();
+ *this = tmp.get_hash();
+ }
+ };
+
private:
class PeerConn {
friend PeerNetwork;
/** connection addr, may be different due to passive mode */
- uint256_t peer_id;
- NetAddr peer_addr;
+ uint256_t session_id;
+ PeerId peer_id;
/** the underlying connection, may be invalid when connected = false */
conn_t conn;
conn_t inbound_conn;
@@ -365,10 +379,10 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
double ping_period;
PeerConn() = delete;
- PeerConn(const uint256_t &peer_id,
+ PeerConn(const uint256_t &session_id, const PeerId &peer_id,
conn_t conn, conn_t inbound_conn, conn_t outbound_conn,
const PeerNetwork *pn):
- peer_id(peer_id),
+ session_id(session_id),
conn(conn),
inbound_conn(inbound_conn),
outbound_conn(outbound_conn),
@@ -396,33 +410,41 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
};
struct Peer {
- uint256_t peer_id;
+ uint256_t session_id;
+ NetAddr addr;
+ double retry_delay;
+ ssize_t ntry;
TimerEvent ev_retry_timer;
conn_t peer_conn;
- Peer(const conn_t &conn):
- peer_id(), ev_retry_timer(), peer_conn(conn) {}
+ Peer():
+ session_id(),
+ retry_delay(0), ntry(0),
+ ev_retry_timer() {}
};
+ /* connections whose PeerId is unknown */
std::unordered_map<NetAddr, conn_t> pending_peers;
- std::unordered_map<NetAddr, BoxObj<Peer>> known_peers;
- std::unordered_map<uint256_t, BoxObj<PeerConn>> pid2peer;
+ /* registered peers */
+ std::unordered_map<PeerId, BoxObj<Peer>> known_peers;
+ /* peer connection deduplication map */
+ std::unordered_map<uint256_t, BoxObj<PeerConn>> sid2peer;
using pinfo_slock_t = std::shared_lock<std::shared_timed_mutex>;
using pinfo_ulock_t = std::shared_lock<std::shared_timed_mutex>;
mutable std::shared_timed_mutex known_peers_lock;
- mutable std::shared_timed_mutex pid2peer_lock;
+ mutable std::shared_timed_mutex sid2peer_lock;
peer_callback_t peer_cb;
unknown_peer_callback_t unknown_peer_cb;
const IdentityMode id_mode;
- double retry_conn_delay;
double ping_period;
double conn_timeout;
NetAddr listen_addr;
bool allow_unknown_peer;
+ bool force_two_way_handshake;
uint256_t my_nonce;
struct MsgPing {
@@ -464,12 +486,10 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
protected:
ConnPool::Conn *create_conn() override { return new Conn(); }
- virtual double gen_conn_timeout() {
- return gen_rand_timeout(retry_conn_delay);
- }
void on_setup(const ConnPool::conn_t &) override;
void on_teardown(const ConnPool::conn_t &) override;
- uint256_t gen_peer_id(const conn_t &conn, const NetAddr &claimed_addr, const uint256_t &nonce) {
+
+ uint256_t get_session_id(const conn_t &conn, const NetAddr &claimed_addr, const uint256_t &nonce) {
DataStream tmp;
if (!this->enable_tls || id_mode == ADDR_BASED)
tmp << nonce << claimed_addr;
@@ -478,14 +498,22 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
return tmp.get_hash();
}
+ PeerId get_peer_id(const conn_t &conn, const NetAddr &addr) {
+ DataStream tmp;
+ if (!this->enable_tls || id_mode == ADDR_BASED)
+ return PeerId(addr);
+ else
+ return PeerId(*conn->get_peer_cert());
+ }
+
public:
class Config: public MsgNet::Config {
friend PeerNetwork;
- double _retry_conn_delay;
double _ping_period;
double _conn_timeout;
bool _allow_unknown_peer;
+ bool _force_two_way_handshake;
IdentityMode _id_mode;
public:
@@ -493,18 +521,13 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
Config(const typename MsgNet::Config &config):
MsgNet::Config(config),
- _retry_conn_delay(2),
_ping_period(30),
_conn_timeout(180),
_allow_unknown_peer(false),
+ _force_two_way_handshake(false),
_id_mode(CERT_BASED) {}
- Config &retry_conn_delay(double x) {
- _retry_conn_delay = x;
- return *this;
- }
-
Config &ping_period(double x) {
_ping_period = x;
return *this;
@@ -524,26 +547,37 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
_allow_unknown_peer = x;
return *this;
}
+
+ Config &force_two_way_handshake(bool x) {
+ _force_two_way_handshake = x;
+ return *this;
+ }
};
PeerNetwork(const EventContext &ec, const Config &config):
MsgNet(ec, config),
id_mode(config._id_mode),
- retry_conn_delay(config._retry_conn_delay),
ping_period(config._ping_period),
conn_timeout(config._conn_timeout),
- allow_unknown_peer(config._allow_unknown_peer) {
+ allow_unknown_peer(config._allow_unknown_peer),
+ force_two_way_handshake(config._force_two_way_handshake) {
this->reg_handler(generic_bind(&PeerNetwork::ping_handler, this, _1, _2));
this->reg_handler(generic_bind(&PeerNetwork::pong_handler, this, _1, _2));
}
virtual ~PeerNetwork() { this->stop(); }
- int32_t add_peer(const NetAddr &addr);
- int32_t del_peer(const NetAddr &addr);
- bool has_peer(const NetAddr &addr) const;
+ /* register a peer as known */
+ int32_t add_peer(const PeerId &pid);
+ /* set the peer's public IP */
+ int32_t set_peer_addr(const PeerId &pid, const NetAddr &addr);
+ /* try to connect to the peer: once (ntry = 1), indefinitely (ntry = -1), give up retry (ntry = 0) */
+ int32_t try_conn_peer(const PeerId &pid, ssize_t ntry = -1, double retry_delay = 2);
+ /* unregister the peer */
+ int32_t del_peer(const PeerId &pid);
+ bool has_peer(const PeerId &pid) const;
size_t get_npending() const;
- conn_t get_peer_conn(const NetAddr &addr) const;
+ conn_t get_peer_conn(const PeerId &addr) const;
using MsgNet::send_msg;
template<typename MsgType>
inline bool send_msg(const MsgType &msg, const NetAddr &addr);
@@ -703,16 +737,16 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) {
pending_peers.erase(addr);
SALTICIDAE_LOG_INFO("connection lost: %s", std::string(*conn).c_str());
auto p = conn->peer;
- if (p) addr = p->peer_addr;
+ auto pid = p ? p->peer_id : PeerId();
pinfo_ulock_t _g(known_peers_lock);
- auto it = known_peers.find(addr);
+ auto it = known_peers.find(pid);
if (it == known_peers.end()) return;
auto &pp = it->second;
if (conn == pp->peer_conn)
{
- pinfo_ulock_t __g(pid2peer_lock);
- auto it2 = pid2peer.find(pp->peer_id);
- if (it2 != pid2peer.end())
+ pinfo_ulock_t __g(sid2peer_lock);
+ auto it2 = sid2peer.find(pp->session_id);
+ if (it2 != sid2peer.end())
{
auto &p = it2->second;
if (p->connected && p->conn != conn)
@@ -728,13 +762,13 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) {
p->connected = false;
p->outbound_handshake = false;
p->inbound_handshake = false;
- pp->peer_id = uint256_t();
+ pp->session_id = uint256_t();
PeerConn *peer = nullptr;
{
- pinfo_ulock_t __g(pid2peer_lock);
- auto it2 = pid2peer.find(p->peer_id);
+ pinfo_ulock_t __g(sid2peer_lock);
+ auto it2 = sid2peer.find(p->session_id);
peer = it2->second.unwrap();
- pid2peer.erase(it2);
+ sid2peer.erase(it2);
}
peer->conn = nullptr;
conn->_dead_peer = peer;
@@ -744,7 +778,7 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) {
}
else
{
- if (!it->second->peer_id.is_null()) return;
+ if (!it->second->session_id.is_null()) return;
}
auto &ev_retry_timer = it->second->ev_retry_timer;
ev_retry_timer = TimerEvent(this->disp_ec, [this, addr](TimerEvent &) {
@@ -752,7 +786,10 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) {
start_active_conn(addr);
} catch (...) { this->disp_error_cb(std::current_exception()); }
});
- ev_retry_timer.add(gen_conn_timeout());
+ /* auto retry the connection */
+ if (pp->ntry > 0) pp->ntry--;
+ if (pp->ntry)
+ ev_retry_timer.add(gen_rand_timeout(pp->retry_delay));
}
template<typename O, O _, O __>
@@ -798,8 +835,10 @@ void PeerNetwork<O, _, __>::move_peer_buffer(conn_t &old_conn, const conn_t &new
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::check_handshake(PeerConn *p) {
- if (!(p->inbound_handshake && p->outbound_handshake) ||
- p->connected)
+ if ((force_two_way_handshake &&
+ !(p->inbound_handshake && p->outbound_handshake)) ||
+ !(p->inbound_handshake || p->outbound_handshake) ||
+ p->connected)
return;
p->clear_all_events();
if (p->inbound_conn && p->inbound_conn != p->conn)
@@ -813,7 +852,7 @@ void PeerNetwork<O, _, __>::check_handshake(PeerConn *p) {
{
pinfo_ulock_t _g(known_peers_lock);
auto &pp = known_peers[p->peer_addr];
- pp->peer_id = p->peer_id;
+ pp->session_id = p->session_id;
pp->ev_retry_timer.del();
auto &old_conn = pp->peer_conn;
auto &conn = p->conn;
@@ -885,11 +924,11 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) {
if (conn->is_terminated()) return;
if (!msg.claimed_addr.is_null())
{
- auto peer_id = gen_peer_id(conn, msg.claimed_addr, msg.nonce);
+ auto session_id = get_session_id(conn, msg.claimed_addr, msg.nonce);
if (conn->get_mode() == Conn::ConnMode::PASSIVE)
{
pinfo_slock_t _g(known_peers_lock);
- pinfo_ulock_t __g(pid2peer_lock);
+ pinfo_ulock_t __g(sid2peer_lock);
if (!known_peers.count(msg.claimed_addr))
{
this->user_tcall->async_call([this, addr=msg.claimed_addr, conn](ThreadCall::Handle &) {
@@ -902,8 +941,8 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) {
std::string(listen_addr).c_str(),
std::string(*conn).c_str());
send_msg(MsgPong(listen_addr, my_nonce), conn);
- auto it = pid2peer.find(peer_id);
- if (it != pid2peer.end())
+ auto it = sid2peer.find(session_id);
+ if (it != sid2peer.end())
{
auto p = it->second.get();
if (p->connected)
@@ -928,8 +967,9 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) {
}
else
{
- it = pid2peer.insert(std::make_pair(peer_id,
- new PeerConn(peer_id, conn, conn, nullptr, this))).first;
+ it = sid2peer.insert(std::make_pair(session_id,
+ new PeerConn(session_id, get_peer_id(conn, msg.claimed_addr),
+ conn, conn, nullptr, this))).first;
}
auto p = it->second.get();
p->inbound_handshake = true;
@@ -955,16 +995,16 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) {
if (conn->is_terminated()) return;
if (!msg.claimed_addr.is_null())
{
- auto peer_id = gen_peer_id(conn, msg.claimed_addr, msg.nonce);
+ auto session_id = get_session_id(conn, msg.claimed_addr, msg.nonce);
if (conn->get_mode() == Conn::ConnMode::ACTIVE)
{
pinfo_ulock_t _g(known_peers_lock);
- pinfo_ulock_t __g(pid2peer_lock);
+ pinfo_ulock_t __g(sid2peer_lock);
SALTICIDAE_LOG_INFO("%s outbound handshake to %s",
std::string(listen_addr).c_str(),
std::string(*conn).c_str());
- auto it = pid2peer.find(peer_id);
- if (it != pid2peer.end())
+ auto it = sid2peer.find(session_id);
+ if (it != sid2peer.end())
{
auto p = it->second.get();
if (p->connected)
@@ -996,8 +1036,9 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) {
}
else
{
- it = pid2peer.insert(std::make_pair(peer_id,
- new PeerConn(peer_id, conn, nullptr, conn, this))).first;
+ it = sid2peer.insert(std::make_pair(session_id,
+ new PeerConn(session_id, get_peer_id(conn, msg.claimed_addr),
+ conn, nullptr, conn, this))).first;
}
auto p = it->second.get();
p->outbound_handshake = true;
@@ -1050,20 +1091,14 @@ void PeerNetwork<O, _, __>::listen(NetAddr _listen_addr) {
}
template<typename O, O _, O __>
-int32_t PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) {
+int32_t PeerNetwork<O, _, __>::add_peer(const PeerId &pid) {
auto id = this->gen_async_id();
- this->disp_tcall->async_call([this, addr, id](ThreadCall::Handle &) {
+ this->disp_tcall->async_call([this, pid, id](ThreadCall::Handle &) {
try {
pinfo_ulock_t _g(known_peers_lock);
- if (known_peers.count(addr))
+ if (known_peers.count(pid))
throw PeerNetworkError(SALTI_ERROR_PEER_ALREADY_EXISTS);
- auto it = pending_peers.find(addr);
- conn_t conn;
- if (it == pending_peers.end())
- conn = start_active_conn(addr);
- else
- conn = it->second;
- known_peers.insert(std::make_pair(addr, new Peer(conn)));
+ known_peers.insert(std::make_pair(pid, new Peer()));
} catch (const PeerNetworkError &) {
this->recoverable_error(std::current_exception(), id);
} catch (...) { this->disp_error_cb(std::current_exception()); }
@@ -1072,16 +1107,60 @@ int32_t PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) {
}
template<typename O, O _, O __>
-int32_t PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) {
+int32_t PeerNetwork<O, _, __>::try_conn_peer(const PeerId &pid, ssize_t ntry, double retry_delay) {
+ auto id = this->gen_async_id();
+ this->disp_tcall->async_call([this, pid, ntry, retry_delay, id](ThreadCall::Handle &) {
+ try {
+ pinfo_ulock_t _g(known_peers_lock);
+ auto it = known_peers.find(pid);
+ if (it == known_peers.end())
+ throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
+ auto &p = it->second;
+ if (p.addr.is_null())
+ throw PeerNetworkError(SALTI_ERROR_PEER_NOT_READY);
+ // FIXME: ???
+ if (p.peer_conn)
+ p.peer_conn->disp_terminate();
+ p.retry_delay = retry_delay;
+ p.peer_conn = ntry ? start_active_conn(p.addr) : conn_t();
+ } catch (const PeerNetworkError &) {
+ this->recoverable_error(std::current_exception(), id);
+ } catch (...) { this->disp_error_cb(std::current_exception()); }
+ });
+ return id;
+}
+
+template<typename O, O _, O __>
+int32_t PeerNetwork<O, _, __>::set_peer_addr(const PeerId &pid, const NetAddr &addr) {
+ auto id = this->gen_async_id();
+ this->disp_tcall->async_call([this, pid, addr, id](ThreadCall::Handle &) {
+ try {
+ pinfo_ulock_t _g(known_peers_lock);
+ auto it = known_peers.find(pid);
+ if (it == known_peers.end())
+ throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
+ auto it2 = pending_peers.find(addr);
+ if (it2 != pending_peers.end())
+ it->second.peer_conn = it2->second;
+ } catch (const PeerNetworkError &) {
+ this->recoverable_error(std::current_exception(), id);
+ } catch (...) { this->disp_error_cb(std::current_exception()); }
+ });
+ return id;
+}
+
+
+template<typename O, O _, O __>
+int32_t PeerNetwork<O, _, __>::del_peer(const PeerId &pid) {
auto id = this->gen_async_id();
- this->disp_tcall->async_call([this, addr, id](ThreadCall::Handle &) {
+ this->disp_tcall->async_call([this, pid, id](ThreadCall::Handle &) {
try {
pinfo_ulock_t _g(known_peers_lock);
- pinfo_ulock_t __g(pid2peer_lock);
- auto it = known_peers.find(addr);
+ pinfo_ulock_t __g(sid2peer_lock);
+ auto it = known_peers.find(pid);
if (it == known_peers.end())
throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
- auto peer_id = it->second->peer_id;
+ auto addr = it->second->addr;
known_peers.erase(it);
auto it2 = pending_peers.find(addr);
if (it2 != pending_peers.end())
@@ -1090,12 +1169,12 @@ int32_t PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) {
this->disp_terminate(it2->second);
pending_peers.erase(it2);
}
- auto it3 = pid2peer.find(peer_id);
- if (it3 != pid2peer.end())
+ auto it3 = sid2peer.find(pid);
+ if (it3 != sid2peer.end())
{
auto &p = it3->second;
this->disp_terminate(p->conn);
- pid2peer.erase(it3);
+ sid2peer.erase(it3);
}
} catch (const PeerNetworkError &) {
this->recoverable_error(std::current_exception(), id);
@@ -1106,21 +1185,21 @@ int32_t PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) {
template<typename O, O _, O __>
typename PeerNetwork<O, _, __>::conn_t
-PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &addr) const {
+PeerNetwork<O, _, __>::get_peer_conn(const PeerId &pid) const {
auto ret = *(static_cast<conn_t *>(
this->disp_tcall->call([this, addr](ThreadCall::Handle &h) {
conn_t conn;
pinfo_slock_t _g(known_peers_lock);
- pinfo_slock_t __g(pid2peer_lock);
+ pinfo_slock_t __g(sid2peer_lock);
auto it = known_peers.find(addr);
if (it == known_peers.end())
throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
- if (it->second->peer_id.is_null())
+ if (it->second->session_id.is_null())
conn = nullptr;
else
{
- auto it2 = pid2peer.find(it->second->peer_id);
- assert(it2 != pid2peer.end());
+ auto it2 = sid2peer.find(it->second->session_id);
+ assert(it2 != sid2peer.end());
conn = it2->second->conn;
}
h.set_result(std::move(conn));
@@ -1313,7 +1392,7 @@ void msgnetwork_config_max_msg_queue_size(msgnetwork_config_t *self, size_t size
void msgnetwork_config_burst_size(msgnetwork_config_t *self, size_t burst_size);
void msgnetwork_config_max_listen_backlog(msgnetwork_config_t *self, int backlog);
void msgnetwork_config_conn_server_timeout(msgnetwork_config_t *self, double timeout);
-void msgnetwork_config_seg_buff_size(msgnetwork_config_t *self, size_t size);
+void msgnetwork_config_recv_chunk_size(msgnetwork_config_t *self, size_t size);
void msgnetwork_config_nworker(msgnetwork_config_t *self, size_t nworker);
void msgnetwork_config_max_recv_buff_size(msgnetwork_config_t *self, size_t size);
void msgnetwork_config_max_send_buff_size(msgnetwork_config_t *self, size_t size);
@@ -1356,7 +1435,6 @@ bool msgnetwork_conn_is_terminated(const msgnetwork_conn_t *conn);
peernetwork_config_t *peernetwork_config_new();
void peernetwork_config_free(const peernetwork_config_t *self);
-void peernetwork_config_retry_conn_delay(peernetwork_config_t *self, double t);
void peernetwork_config_ping_period(peernetwork_config_t *self, double t);
void peernetwork_config_conn_timeout(peernetwork_config_t *self, double t);
void peernetwork_config_id_mode(peernetwork_config_t *self, peernetwork_id_mode_t mode);