aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDeterminant <ted.sybil@gmail.com>2019-06-26 19:13:43 -0400
committerDeterminant <ted.sybil@gmail.com>2019-06-26 19:13:43 -0400
commit85552ce1b0bc997f58341f21ab8bbcf7d937ab4b (patch)
treef1f58dbfcb4f485d97781a011785e3e031fd499f
parent051fb69b7e1bd321e4c780e35f92ea93828b520f (diff)
change to new (more flexible) p2p design
-rw-r--r--include/salticidae/conn.h11
-rw-r--r--include/salticidae/network.h507
-rw-r--r--include/salticidae/stream.h7
-rw-r--r--include/salticidae/util.h1
-rw-r--r--src/util.cpp1
5 files changed, 306 insertions, 221 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index ff75e34..87966ac 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -144,8 +144,9 @@ class ConnPool {
EventContext ec;
EventContext disp_ec;
ThreadCall* disp_tcall;
- /* owned by user loop */
BoxObj<ThreadCall> user_tcall;
+ const bool enable_tls;
+ RcObj<const X509> tls_cert;
using worker_error_callback_t = std::function<void(const std::exception_ptr err)>;
worker_error_callback_t disp_error_cb;
@@ -178,7 +179,6 @@ class ConnPool {
const double conn_server_timeout;
const size_t seg_buff_size;
const size_t queue_capacity;
- const bool enable_tls;
tls_context_t tls_ctx;
conn_callback_t conn_cb;
@@ -422,11 +422,11 @@ class ConnPool {
ConnPool(const EventContext &ec, const Config &config):
ec(ec),
+ enable_tls(config._enable_tls),
max_listen_backlog(config._max_listen_backlog),
conn_server_timeout(config._conn_server_timeout),
seg_buff_size(config._seg_buff_size),
queue_capacity(config._queue_capacity),
- enable_tls(config._enable_tls),
tls_ctx(nullptr),
listen_fd(-1),
nworker(config._nworker),
@@ -435,9 +435,10 @@ class ConnPool {
{
tls_ctx = new TLSContext();
if (config._tls_cert)
- tls_ctx->use_cert(*config._tls_cert);
+ tls_cert = config._tls_cert;
else
- tls_ctx->use_cert_file(config._tls_cert_file);
+ tls_cert = new X509(X509::create_from_pem_file(config._tls_cert_file));
+ tls_ctx->use_cert(*tls_cert);
if (config._tls_key)
tls_ctx->use_privkey(*config._tls_key);
else
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index 7f0964d..975084f 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -32,6 +32,8 @@
#include "salticidae/conn.h"
#ifdef __cplusplus
+#include <unordered_set>
+#include <openssl/rand.h>
namespace salticidae {
/** Network of nodes who can send async messages. */
template<typename OpcodeType>
@@ -157,9 +159,6 @@ class MsgNetwork: public ConnPool {
{
auto &msg = item.first;
auto &conn = item.second;
-#ifdef SALTICIDAE_CBINDINGS_INJECT_CALLBACK
- salticidae_injected_msg_callback(&msg, conn.get());
-#else
auto it = handler_map.find(msg.get_opcode());
if (it == handler_map.end())
SALTICIDAE_LOG_WARN("unknown opcode: %s",
@@ -175,7 +174,6 @@ class MsgNetwork: public ConnPool {
#endif
it->second(msg, conn);
}
-#endif
if (++cnt == burst_size) return true;
}
return false;
@@ -266,25 +264,29 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
using unknown_callback_t = std::function<void(const NetAddr &)>;
enum IdentityMode {
- IP_BASED,
- IP_PORT_BASED
+ ADDR_BASED,
+ CERT_BASED
};
+ private:
+ struct Peer;
+
+ public:
class Conn: public MsgNet::Conn {
friend PeerNetwork;
- NetAddr peer_id;
+ Peer *peer;
TimerEvent ev_timeout;
+ TimerEvent ev_retry_timer;
+
void reset_timeout(double timeout);
public:
- Conn() = default;
+ Conn(): MsgNet::Conn(), peer(nullptr) {}
PeerNetwork *get_net() {
return static_cast<PeerNetwork *>(ConnPool::Conn::get_pool());
}
- const NetAddr &get_peer() { return peer_id; }
-
protected:
void stop() override {
ev_timeout.del();
@@ -297,81 +299,99 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
private:
struct Peer {
/** connection addr, may be different due to passive mode */
- NetAddr addr;
+ uint256_t nonce;
+ uint256_t peer_id;
+ NetAddr peer_addr;
/** the underlying connection, may be invalid when connected = false */
conn_t conn;
+ conn_t inbound_conn;
+ conn_t outbound_conn;
+
TimerEvent ev_ping_timer;
- TimerEvent ev_retry_timer;
bool ping_timer_ok;
bool pong_msg_ok;
bool connected;
+ bool outbound_handshake;
+ bool inbound_handshake;
+ double ping_period;
Peer() = delete;
- Peer(NetAddr addr, conn_t conn, const EventContext &ec):
- addr(addr), conn(conn),
+ Peer(conn_t conn, conn_t inbound_conn, conn_t outbound_conn, const PeerNetwork *pn):
+ conn(conn),
+ inbound_conn(inbound_conn),
+ outbound_conn(outbound_conn),
ev_ping_timer(
- TimerEvent(ec, std::bind(&Peer::ping_timer, this, _1))),
- connected(false) {}
+ TimerEvent(pn->disp_ec, std::bind(&Peer::ping_timer, this, _1))),
+ connected(false),
+ outbound_handshake(false),
+ inbound_handshake(false),
+ ping_period(pn->ping_period) {}
~Peer() {}
Peer &operator=(const Peer &) = delete;
Peer(const Peer &) = delete;
- void ping_timer(TimerEvent &);
void reset_ping_timer();
void send_ping();
+ void ping_timer(TimerEvent &);
void clear_all_events() {
if (ev_ping_timer)
ev_ping_timer.del();
}
- void reset_conn(const conn_t &conn);
};
- std::unordered_map<NetAddr, BoxObj<Peer>> id2peer;
- std::unordered_map<NetAddr, BoxObj<Peer>> id2upeer;
+ std::unordered_map<NetAddr, conn_t> pending_peers;
+ std::unordered_map<NetAddr, uint256_t> known_peers;
+ std::unordered_map<uint256_t, BoxObj<Peer>> pid2peer;
unknown_callback_t unknown_peer_cb;
const IdentityMode id_mode;
double retry_conn_delay;
double ping_period;
double conn_timeout;
- uint16_t listen_port;
+ NetAddr listen_addr;
bool allow_unknown_peer;
+ uint256_t my_pname;
+ uint256_t my_nonce;
struct MsgPing {
static const OpcodeType opcode;
DataStream serialized;
- uint16_t port;
- MsgPing(uint16_t port) {
- serialized << htole(port);
+ uint256_t pname;
+ uint256_t nonce;
+ uint256_t peer_id;
+ MsgPing() { serialized << false; }
+ MsgPing(const uint256_t &_pname, const uint256_t &_nonce) {
+ serialized << true << _pname << _nonce;
}
MsgPing(DataStream &&s) {
- s >> port;
- port = letoh(port);
+ uint8_t flag;
+ s >> flag;
+ if (flag)
+ {
+ s >> pname >> nonce;
+ DataStream tmp;
+ tmp << pname << nonce;
+ peer_id = tmp.get_hash();
+ }
}
};
- struct MsgPong {
+ struct MsgPong: public MsgPing {
static const OpcodeType opcode;
- DataStream serialized;
- uint16_t port;
- MsgPong(uint16_t port) {
- serialized << htole(port);
- }
- MsgPong(DataStream &&s) {
- s >> port;
- port = letoh(port);
- }
+ MsgPong(): MsgPing() {}
+ MsgPong(const uint256_t &_pname, const uint256_t _nonce): MsgPing(_pname, _nonce) {}
+ MsgPong(DataStream &&s): MsgPing(std::move(s)) {}
};
void msg_ping(MsgPing &&msg, const conn_t &conn);
void msg_pong(MsgPong &&msg, const conn_t &conn);
void _ping_msg_cb(const conn_t &conn, uint16_t port);
void _pong_msg_cb(const conn_t &conn, uint16_t port);
- bool check_new_conn(const conn_t &conn, uint16_t port);
- void start_active_conn(const NetAddr &paddr);
+ bool check_handshake(Peer *peer);
+ void start_active_conn(const NetAddr &addr);
static void tcall_reset_timeout(ConnPool::Worker *worker,
const conn_t &conn, double timeout);
- Peer *get_peer(const NetAddr &id) const;
+ inline conn_t _get_peer_conn(const NetAddr &addr) const;
protected:
ConnPool::Conn *create_conn() override { return new Conn(); }
@@ -400,7 +420,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
_ping_period(30),
_conn_timeout(180),
_allow_unknown_peer(false),
- _id_mode(IP_PORT_BASED) {}
+ _id_mode(ADDR_BASED) {}
Config &retry_conn_delay(double x) {
@@ -442,20 +462,20 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
virtual ~PeerNetwork() { this->stop(); }
- void add_peer(const NetAddr &paddr);
- void del_peer(const NetAddr &paddr);
- bool has_peer(const NetAddr &paddr) const;
- const conn_t get_peer_conn(const NetAddr &paddr) const;
+ void add_peer(const NetAddr &addr);
+ void del_peer(const NetAddr &addr);
+ bool has_peer(const NetAddr &addr) const;
+ conn_t get_peer_conn(const NetAddr &addr) const;
using MsgNet::send_msg;
template<typename MsgType>
- inline void send_msg(const MsgType &msg, const NetAddr &paddr);
- inline void _send_msg(const Msg &msg, const NetAddr &paddr);
+ inline void send_msg(const MsgType &msg, const NetAddr &addr);
+ inline void _send_msg(const Msg &msg, const NetAddr &addr);
template<typename MsgType>
- inline void send_msg_deferred(MsgType &&msg, const NetAddr &paddr);
- inline void _send_msg_deferred(Msg &&msg, const NetAddr &paddr);
+ inline void send_msg_deferred(MsgType &&msg, const NetAddr &addr);
+ inline void _send_msg_deferred(Msg &&msg, const NetAddr &addr);
template<typename MsgType>
- void multicast_msg(MsgType &&msg, const std::vector<NetAddr> &paddrs);
- inline void _multicast_msg(Msg &&msg, const std::vector<NetAddr> &paddrs);
+ void multicast_msg(MsgType &&msg, const std::vector<NetAddr> &addrs);
+ inline void _multicast_msg(Msg &&msg, const std::vector<NetAddr> &addrs);
void listen(NetAddr listen_addr);
conn_t connect(const NetAddr &addr) = delete;
@@ -555,6 +575,7 @@ void PeerNetwork<O, _, __>::on_setup(const ConnPool::conn_t &_conn) {
auto conn = static_pointer_cast<Conn>(_conn);
auto worker = conn->worker;
auto &ev_timeout = conn->ev_timeout;
+ conn->ev_retry_timer.del();
assert(!ev_timeout);
ev_timeout = TimerEvent(worker->get_ec(), [worker, conn](TimerEvent &) {
try {
@@ -564,54 +585,42 @@ void PeerNetwork<O, _, __>::on_setup(const ConnPool::conn_t &_conn) {
});
/* the initial ping-pong to set up the connection */
tcall_reset_timeout(worker, conn, conn_timeout);
- send_msg(MsgPing(listen_port), conn);
+ pending_peers[conn->get_addr()] = conn;
+ if (conn->get_mode() == Conn::ConnMode::ACTIVE)
+ send_msg(MsgPing(my_pname, my_nonce), conn);
}
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) {
MsgNet::on_teardown(_conn);
auto conn = static_pointer_cast<Conn>(_conn);
+ const auto &addr = conn->get_addr();
+ conn->ev_retry_timer.clear();
conn->ev_timeout.clear();
- const auto &peer_id = conn->peer_id;
- auto p = get_peer(peer_id);
- if (!p) return;
- if (conn != p->conn) return;
- p->ev_ping_timer.del();
- p->connected = false;
- //p->conn = nullptr;
- SALTICIDAE_LOG_INFO("connection lost: %s", std::string(*conn).c_str());
- // try to reconnect
- p->ev_retry_timer = TimerEvent(this->disp_ec, [this, peer_id](TimerEvent &) {
- try {
- start_active_conn(peer_id);
- } catch (...) { this->disp_error_cb(std::current_exception()); }
- });
- p->ev_retry_timer.add(gen_conn_timeout());
-}
-
-template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::Peer::reset_conn(const conn_t &new_conn) {
- if (conn != new_conn)
+ pending_peers.erase(addr);
+ auto p = conn->peer;
+ if (p)
{
- if (conn)
- {
- //SALTICIDAE_LOG_DEBUG("moving send buffer");
- //new_conn->move_send_buffer(conn);
- SALTICIDAE_LOG_INFO("terminating old connection %s", std::string(*conn).c_str());
- auto net = conn->get_net();
- net->disp_terminate(conn);
- }
- addr = new_conn->get_addr();
- conn = new_conn;
+ if (conn != p->conn) return;
+ p->ev_ping_timer.del();
+ p->connected = false;
+ known_peers[p->peer_addr] = uint256_t();
+ // try to reconnect
+ conn->ev_retry_timer = TimerEvent(this->disp_ec, [this, addr](TimerEvent &) {
+ try {
+ start_active_conn(addr);
+ } catch (...) { this->disp_error_cb(std::current_exception()); }
+ });
+ conn->ev_retry_timer.add(gen_conn_timeout());
}
- clear_all_events();
+ SALTICIDAE_LOG_INFO("connection lost: %s", std::string(*conn).c_str());
}
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::Peer::reset_ping_timer() {
assert(ev_ping_timer);
ev_ping_timer.del();
- ev_ping_timer.add(gen_rand_timeout(conn->get_net()->ping_period));
+ ev_ping_timer.add(gen_rand_timeout(ping_period));
}
template<typename O, O _, O __>
@@ -620,7 +629,7 @@ void PeerNetwork<O, _, __>::Peer::send_ping() {
ping_timer_ok = false;
pong_msg_ok = false;
tcall_reset_timeout(conn->worker, conn, pn->conn_timeout);
- pn->send_msg(MsgPing(pn->listen_port), conn);
+ pn->send_msg(MsgPing(), conn);
}
template<typename O, O _, O __>
@@ -634,47 +643,20 @@ void PeerNetwork<O, _, __>::Peer::ping_timer(TimerEvent &) {
}
template<typename O, O _, O __>
-bool PeerNetwork<O, _, __>::check_new_conn(const conn_t &conn, uint16_t port) {
- if (conn->peer_id.is_null())
- { /* passive connections can eventually have ids after getting the port
- number in IP_BASED_PORT mode */
- conn->peer_id.ip = conn->get_addr().ip;
- conn->peer_id.port = id_mode == IP_BASED ? 0: port;
- }
- const auto &id = conn->peer_id;
- auto it = id2peer.find(id);
- if (it == id2peer.end())
- { /* found an unknown peer */
- const auto &addr = conn->get_addr();
- this->user_tcall->async_call([this, id](ThreadCall::Handle &) {
- if (unknown_peer_cb) unknown_peer_cb(id);
- });
- if (allow_unknown_peer)
- {
- auto it2 = id2upeer.find(id);
- if (it2 == id2upeer.end())
- it = id2upeer.insert(std::make_pair(id, new Peer(addr, nullptr, this->disp_ec))).first;
- }
- else
- {
- this->disp_terminate(conn);
- return true;
- }
- }
- auto p = it->second.get();
- if (p->connected)
- {
- if (conn != p->conn)
- {
- this->disp_terminate(conn);
- return true;
- }
+bool PeerNetwork<O, _, __>::check_handshake(Peer *p) {
+ if (!(p->inbound_handshake && p->outbound_handshake) ||
+ p->connected)
return false;
- }
- p->reset_conn(conn);
+ p->clear_all_events();
+ if (p->inbound_conn && p->inbound_conn != p->conn)
+ p->inbound_conn->peer = nullptr;
+ if (p->outbound_conn && p->outbound_conn != p->conn)
+ p->outbound_conn->peer = nullptr;
+ p->conn->peer = p;
p->connected = true;
p->reset_ping_timer();
p->send_ping();
+ known_peers[p->peer_addr] = p->peer_id;
if (p->connected)
{
auto color_begin = "";
@@ -684,82 +666,182 @@ bool PeerNetwork<O, _, __>::check_new_conn(const conn_t &conn, uint16_t port) {
color_begin = TTY_COLOR_BLUE;
color_end = TTY_COLOR_RESET;
}
- SALTICIDAE_LOG_INFO("%sPeerNetwork: established connection with %s via %s%s",
+ SALTICIDAE_LOG_INFO("%sPeerNetwork: established connection with %s <-> %s via %s",
color_begin,
- std::string(conn->peer_id).c_str(), std::string(*conn).c_str(),
+ std::string(listen_addr).c_str(),
+ std::string(p->peer_addr).c_str(),
+ std::string(*(p->conn)).c_str(),
color_end);
}
- return false;
+ return true;
}
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::start_active_conn(const NetAddr &addr) {
- auto p = get_peer(addr);
- if (p->connected) return;
auto conn = static_pointer_cast<Conn>(MsgNet::_connect(addr));
- //assert(p->conn == nullptr);
- p->conn = conn;
- conn->peer_id = addr;
- if (id_mode == IP_BASED)
- conn->peer_id.port = 0;
+ pending_peers[addr] = conn;
}
template<typename O, O _, O __>
-typename PeerNetwork<O, _, __>::Peer *PeerNetwork<O, _, __>::get_peer(const NetAddr &addr) const {
- auto it = id2peer.find(addr);
- if (it != id2peer.end()) return it->second.get();
- it = id2upeer.find(addr);
- if (it != id2upeer.end()) return it->second.get();
- return nullptr;
+inline typename PeerNetwork<O, _, __>::conn_t PeerNetwork<O, _, __>::_get_peer_conn(const NetAddr &addr) const {
+ auto it = pending_peers.find(addr);
+ if (it == pending_peers.end())
+ throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
+ return it->second;
}
/* end: functions invoked by the dispatcher */
/* begin: functions invoked by the user loop */
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::msg_ping(MsgPing &&msg, const conn_t &conn) {
- uint16_t port = msg.port;
- this->disp_tcall->async_call([this, conn, port](ThreadCall::Handle &) {
+ this->disp_tcall->async_call([this, conn, msg=std::move(msg)](ThreadCall::Handle &) {
try {
- if (conn->get_mode() == ConnPool::Conn::DEAD) return;
- SALTICIDAE_LOG_INFO("ping from %s, port %u",
- std::string(*conn).c_str(), ntohs(port));
- if (check_new_conn(conn, port)) return;
- send_msg(MsgPong(this->listen_port), conn);
+ auto conn_mode = conn->get_mode();
+ if (conn_mode == ConnPool::Conn::DEAD) return;
+ if (!msg.peer_id.is_null())
+ {
+ if (conn_mode == Conn::ConnMode::PASSIVE)
+ {
+ send_msg(MsgPong(my_pname, my_nonce), conn);
+ SALTICIDAE_LOG_INFO("%s inbound handshake from %s",
+ std::string(listen_addr).c_str(),
+ std::string(*conn).c_str());
+ auto it = pid2peer.find(msg.peer_id);
+ if (it != pid2peer.end())
+ {
+ if (msg.nonce < my_nonce)
+ {
+ auto p = it->second.get();
+ auto &old_conn = p->inbound_conn;
+ if (old_conn && old_conn->get_mode() != Conn::ConnMode::DEAD)
+ {
+ SALTICIDAE_LOG_INFO("%s terminating old connection %s",
+ std::string(listen_addr).c_str(),
+ std::string(*old_conn).c_str());
+ old_conn->peer = nullptr;
+ old_conn->get_net()->disp_terminate(old_conn);
+ }
+ old_conn = conn;
+ p->conn = conn;
+ }
+ }
+ else
+ {
+ it = pid2peer.insert(std::make_pair(
+ msg.peer_id,
+ new Peer(conn, conn, nullptr, this))).first;
+ }
+ auto p = it->second.get();
+ p->inbound_handshake = true;
+ check_handshake(p);
+ }
+ else
+ SALTICIDAE_LOG_WARN("unexpected inbound handshake from %s",
+ std::string(*conn).c_str());
+ }
+ else
+ {
+ SALTICIDAE_LOG_INFO("ping from %s", std::string(*conn).c_str());
+ send_msg(MsgPong(), conn);
+ }
} catch (...) { this->disp_error_cb(std::current_exception()); }
});
}
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::msg_pong(MsgPong &&msg, const conn_t &conn) {
- uint16_t port = msg.port;
- this->disp_tcall->async_call([this, conn, port](ThreadCall::Handle &) {
+ this->disp_tcall->async_call([this, conn, msg=std::move(msg)](ThreadCall::Handle &) {
try {
- if (conn->get_mode() == ConnPool::Conn::DEAD) return;
- auto p = get_peer(conn->peer_id);
- if (!p)
+ auto conn_mode = conn->get_mode();
+ if (conn_mode == ConnPool::Conn::DEAD) return;
+ if (!msg.peer_id.is_null())
{
- SALTICIDAE_LOG_WARN("pong message discarded");
- return;
+ if (conn_mode == Conn::ConnMode::ACTIVE)
+ {
+ SALTICIDAE_LOG_INFO("%s outbound handshake to %s",
+ std::string(listen_addr).c_str(),
+ std::string(*conn).c_str());
+ auto it = pid2peer.find(msg.peer_id);
+ if (it != pid2peer.end())
+ {
+ if (my_nonce < msg.nonce)
+ {
+ auto p = it->second.get();
+ auto &old_conn = p->outbound_conn;
+ if (old_conn && old_conn->get_mode() != Conn::ConnMode::DEAD)
+ {
+ SALTICIDAE_LOG_INFO("%s terminating old connection %s",
+ std::string(listen_addr).c_str(),
+ std::string(*old_conn).c_str());
+ old_conn->peer = nullptr;
+ old_conn->get_net()->disp_terminate(old_conn);
+ }
+ old_conn = conn;
+ p->conn = conn;
+ }
+ else
+ {
+ SALTICIDAE_LOG_INFO("%s terminating low connection %s",
+ std::string(listen_addr).c_str(),
+ std::string(*conn).c_str());
+ conn->get_net()->disp_terminate(conn);
+ }
+ }
+ else
+ {
+ it = pid2peer.insert(std::make_pair(
+ msg.peer_id,
+ new Peer(conn, nullptr, conn, this))).first;
+ }
+ auto p = it->second.get();
+ p->outbound_handshake = true;
+ p->peer_addr = conn->get_addr();
+ p->reset_ping_timer();
+ check_handshake(p);
+ }
+ else
+ SALTICIDAE_LOG_WARN("unexpected outbound handshake from %s",
+ std::string(*conn).c_str());
}
- if (check_new_conn(conn, port)) return;
- p->pong_msg_ok = true;
- if (p->ping_timer_ok)
+ else
{
- p->reset_ping_timer();
- p->send_ping();
+ auto p = conn->peer;
+ if (!p) return;
+ p->pong_msg_ok = true;
+ if (p->ping_timer_ok)
+ {
+ p->reset_ping_timer();
+ p->send_ping();
+ }
}
} catch (...) { this->disp_error_cb(std::current_exception()); }
});
}
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::listen(NetAddr listen_addr) {
+void PeerNetwork<O, _, __>::listen(NetAddr _listen_addr) {
auto ret = *(static_cast<std::exception_ptr *>(
- this->disp_tcall->call([this, listen_addr](ThreadCall::Handle &h) {
+ this->disp_tcall->call([this, _listen_addr](ThreadCall::Handle &h) {
std::exception_ptr err = nullptr;
try {
- MsgNet::_listen(listen_addr);
- listen_port = listen_addr.port;
+ MsgNet::_listen(_listen_addr);
+ listen_addr = _listen_addr;
+ DataStream pid;
+ if (id_mode == CERT_BASED)
+ {
+ if (!this->enable_tls)
+ throw PeerNetworkError(SALTI_ERROR_TLS_LOAD_CERT);
+ pid << this->tls_cert->get_der();
+ }
+ else
+ {
+ pid << listen_addr;
+ }
+ my_pname = pid.get_hash();
+ uint8_t rand_bytes[32];
+ if (!RAND_bytes(rand_bytes, 32))
+ throw PeerNetworkError(SALTI_ERROR_RAND_SOURCE);
+ my_nonce.load(rand_bytes);
} catch (...) {
err = std::current_exception();
}
@@ -772,19 +854,10 @@ template<typename O, O _, O __>
void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) {
this->disp_tcall->async_call([this, addr](ThreadCall::Handle &) {
try {
- auto it = id2peer.find(addr);
- if (it != id2peer.end())
+ if (!known_peers.insert(std::make_pair(addr, uint256_t())).second)
throw PeerNetworkError(SALTI_ERROR_PEER_ALREADY_EXISTS);
- auto it2 = id2upeer.find(addr);
- if (it2 != id2upeer.end())
- { /* move to the known peer set */
- auto p = std::move(it2->second);
- id2upeer.erase(it2);
- id2peer.insert(std::make_pair(addr, std::move(p)));
- }
- else
- id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->disp_ec)));
- start_active_conn(addr);
+ if (!pending_peers.count(addr))
+ start_active_conn(addr);
} catch (const PeerNetworkError &) {
this->recoverable_error(std::current_exception());
} catch (...) { this->disp_error_cb(std::current_exception()); }
@@ -795,11 +868,14 @@ template<typename O, O _, O __>
void PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) {
this->disp_tcall->async_call([this, addr](ThreadCall::Handle &) {
try {
- auto it = id2peer.find(addr);
- if (it == id2peer.end())
+ if (!known_peers.erase(addr))
throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
- this->disp_terminate(it->second->conn);
- id2peer.erase(it);
+ auto it = pending_peers.find(addr);
+ assert(it != pending_peers.end());
+ auto conn = it->second;
+ auto p = conn->peer;
+ if (p) pid2peer.erase(p->peer_id);
+ this->disp_terminate(conn);
} catch (const PeerNetworkError &) {
this->recoverable_error(std::current_exception());
} catch (...) { this->disp_error_cb(std::current_exception()); }
@@ -807,17 +883,24 @@ void PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) {
}
template<typename O, O _, O __>
-const typename PeerNetwork<O, _, __>::conn_t
-PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &paddr) const {
+typename PeerNetwork<O, _, __>::conn_t
+PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &addr) const {
auto ret = *(static_cast<std::pair<conn_t, std::exception_ptr> *>(
- this->disp_tcall->call([this, paddr](ThreadCall::Handle &h) {
+ this->disp_tcall->call([this, addr](ThreadCall::Handle &h) {
conn_t conn;
std::exception_ptr err = nullptr;
try {
- auto p = get_peer(paddr);
- if (!p)
+ auto it = known_peers.find(addr);
+ if (it == known_peers.end())
throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
- conn = p->conn;
+ if (it->second.is_null())
+ {
+ conn = nullptr;
+ return;
+ }
+ auto it2 = pid2peer.find(it->second);
+ assert(it2 != pid2peer.end());
+ conn = it2->second->conn;
} catch (const PeerNetworkError &) {
this->recoverable_error(std::current_exception());
} catch (...) {
@@ -830,60 +913,53 @@ PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &paddr) const {
}
template<typename O, O _, O __>
-bool PeerNetwork<O, _, __>::has_peer(const NetAddr &paddr) const {
+bool PeerNetwork<O, _, __>::has_peer(const NetAddr &addr) const {
return *(static_cast<bool *>(this->disp_tcall->call(
- [this, paddr](ThreadCall::Handle &h) {
- h.set_result(id2peer.count(paddr));
+ [this, addr](ThreadCall::Handle &h) {
+ h.set_result(known_peers.count(addr));
}).get()));
}
template<typename O, O _, O __>
template<typename MsgType>
-inline void PeerNetwork<O, _, __>::send_msg_deferred(MsgType &&msg, const NetAddr &paddr) {
- return _send_msg_deferred(std::move(msg), paddr);
+inline void PeerNetwork<O, _, __>::send_msg_deferred(MsgType &&msg, const NetAddr &addr) {
+ return _send_msg_deferred(std::move(msg), addr);
}
template<typename O, O _, O __>
-inline void PeerNetwork<O, _, __>::_send_msg_deferred(Msg &&msg, const NetAddr &paddr) {
+inline void PeerNetwork<O, _, __>::_send_msg_deferred(Msg &&msg, const NetAddr &addr) {
this->disp_tcall->async_call(
- [this, msg=std::move(msg), paddr](ThreadCall::Handle &) {
+ [this, msg=std::move(msg), addr](ThreadCall::Handle &) {
try {
- _send_msg(msg, paddr);
+ _send_msg(msg, addr);
} catch (...) { this->recoverable_error(std::current_exception()); }
});
}
template<typename O, O _, O __>
template<typename MsgType>
-inline void PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const NetAddr &paddr) {
- return _send_msg(msg, paddr);
+inline void PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const NetAddr &addr) {
+ return _send_msg(msg, addr);
}
template<typename O, O _, O __>
-inline void PeerNetwork<O, _, __>::_send_msg(const Msg &msg, const NetAddr &paddr) {
- auto p = get_peer(paddr);
- if (!p) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
- MsgNet::_send_msg(msg, p->conn);
+inline void PeerNetwork<O, _, __>::_send_msg(const Msg &msg, const NetAddr &addr) {
+ MsgNet::_send_msg(msg, _get_peer_conn(addr));
}
template<typename O, O _, O __>
template<typename MsgType>
-inline void PeerNetwork<O, _, __>::multicast_msg(MsgType &&msg, const std::vector<NetAddr> &paddrs) {
- return _multicast_msg(MsgType(std::move(msg)), paddrs);
+inline void PeerNetwork<O, _, __>::multicast_msg(MsgType &&msg, const std::vector<NetAddr> &addrs) {
+ return _multicast_msg(MsgType(std::move(msg)), addrs);
}
template<typename O, O _, O __>
-inline void PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<NetAddr> &paddrs) {
+inline void PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<NetAddr> &addrs) {
this->disp_tcall->async_call(
- [this, msg=std::move(msg), paddrs](ThreadCall::Handle &) {
+ [this, msg=std::move(msg), addrs](ThreadCall::Handle &) {
try {
- for (auto &addr: paddrs)
- {
- auto p = get_peer(addr);
- if (!p)
- throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
- MsgNet::_send_msg(msg, p->conn);
- }
+ for (auto &addr: addrs)
+ MsgNet::_send_msg(msg, _get_peer_conn(addr));
} catch (const PeerNetworkError &) {
this->recoverable_error(std::current_exception());
} catch (...) { this->recoverable_error(std::current_exception()); }
@@ -987,8 +1063,7 @@ typedef enum msgnetwork_conn_mode_t {
} msgnetwork_conn_mode_t;
typedef enum peernetwork_id_mode_t {
- ID_MODE_IP_BASED,
- ID_MODE_IP_PORT_BASED
+ ID_MODE_ADDR_BASED
} peernetwork_id_mode_t;
#ifdef __cplusplus
@@ -1052,19 +1127,19 @@ msgnetwork_config_t *peernetwork_config_as_msgnetwork_config(peernetwork_config_
peernetwork_t *peernetwork_new(const eventcontext_t *ec, const peernetwork_config_t *config, SalticidaeCError *err);
void peernetwork_free(const peernetwork_t *self);
-void peernetwork_add_peer(peernetwork_t *self, const netaddr_t *paddr);
-void peernetwork_del_peer(peernetwork_t *self, const netaddr_t *paddr);
-bool peernetwork_has_peer(const peernetwork_t *self, const netaddr_t *paddr);
-const peernetwork_conn_t *peernetwork_get_peer_conn(const peernetwork_t *self, const netaddr_t *paddr, SalticidaeCError *cerror);
+void peernetwork_add_peer(peernetwork_t *self, const netaddr_t *addr);
+void peernetwork_del_peer(peernetwork_t *self, const netaddr_t *addr);
+bool peernetwork_has_peer(const peernetwork_t *self, const netaddr_t *addr);
+const peernetwork_conn_t *peernetwork_get_peer_conn(const peernetwork_t *self, const netaddr_t *addr, SalticidaeCError *cerror);
msgnetwork_t *peernetwork_as_msgnetwork(peernetwork_t *self);
peernetwork_t *msgnetwork_as_peernetwork_unsafe(msgnetwork_t *self);
msgnetwork_conn_t *msgnetwork_conn_new_from_peernetwork_conn(const peernetwork_conn_t *conn);
peernetwork_conn_t *peernetwork_conn_new_from_msgnetwork_conn_unsafe(const msgnetwork_conn_t *conn);
peernetwork_conn_t *peernetwork_conn_copy(const peernetwork_conn_t *self);
void peernetwork_conn_free(const peernetwork_conn_t *self);
-void peernetwork_send_msg(peernetwork_t *self, const msg_t * msg, const netaddr_t *paddr);
-void peernetwork_send_msg_deferred_by_move(peernetwork_t *self, msg_t * _moved_msg, const netaddr_t *paddr);
-void peernetwork_multicast_msg_by_move(peernetwork_t *self, msg_t *_moved_msg, const netaddr_array_t *paddrs);
+void peernetwork_send_msg(peernetwork_t *self, const msg_t * msg, const netaddr_t *addr);
+void peernetwork_send_msg_deferred_by_move(peernetwork_t *self, msg_t * _moved_msg, const netaddr_t *addr);
+void peernetwork_multicast_msg_by_move(peernetwork_t *self, msg_t *_moved_msg, const netaddr_array_t *addrs);
void peernetwork_listen(peernetwork_t *self, const netaddr_t *listen_addr, SalticidaeCError *err);
typedef void