aboutsummaryrefslogtreecommitdiff
path: root/include/salticidae/network.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/salticidae/network.h')
-rw-r--r--include/salticidae/network.h217
1 files changed, 132 insertions, 85 deletions
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index 37ecc75..eff5369 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -129,6 +129,7 @@ class MsgNetwork: public ConnPool {
queue_t incoming_msgs;
protected:
+ const uint32_t msg_magic;
ConnPool::Conn *create_conn() override { return new Conn(); }
void on_read(const ConnPool::conn_t &) override;
@@ -159,6 +160,7 @@ class MsgNetwork: public ConnPool {
size_t _max_msg_size;
size_t _max_msg_queue_size;
size_t _burst_size;
+ uint32_t _msg_magic;
public:
Config(): Config(ConnPool::Config()) {}
@@ -166,7 +168,8 @@ class MsgNetwork: public ConnPool {
ConnPool::Config(config),
_max_msg_size(1024),
_max_msg_queue_size(65536),
- _burst_size(1000) {}
+ _burst_size(1000),
+ _msg_magic(0x0) {}
Config &max_msg_size(size_t x) {
_max_msg_size = x;
@@ -182,6 +185,10 @@ class MsgNetwork: public ConnPool {
_burst_size = x;
return *this;
}
+
+ Config &msg_magic(uint32_t x) {
+ _msg_magic = x;
+ }
};
virtual ~MsgNetwork() { stop(); }
@@ -189,7 +196,8 @@ class MsgNetwork: public ConnPool {
MsgNetwork(const EventContext &ec, const Config &config):
ConnPool(ec, config),
max_msg_size(config._max_msg_size),
- max_msg_queue_size(config._max_msg_queue_size) {
+ max_msg_queue_size(config._max_msg_queue_size),
+ msg_magic(config._msg_magic) {
incoming_msgs.set_capacity(max_msg_queue_size);
incoming_msgs.reg_handler(ec, [this, burst_size=config._burst_size](queue_t &q) {
std::pair<Msg, conn_t> item;
@@ -293,6 +301,37 @@ class ClientNetwork: public MsgNetwork<OpcodeType> {
inline int32_t _send_msg_deferred(Msg &&msg, const NetAddr &addr);
};
+struct PeerId: public uint256_t {
+ using uint256_t::uint256_t;
+ PeerId(const NetAddr &addr) {
+ *(static_cast<uint256_t *>(this)) = salticidae::get_hash(addr);
+ }
+
+ PeerId(const X509 &cert) {
+ *(static_cast<uint256_t *>(this)) = salticidae::get_hash(cert.get_der());
+ }
+};
+
+}
+
+namespace std {
+ template <>
+ struct hash<salticidae::PeerId> {
+ size_t operator()(const salticidae::PeerId &p) const {
+ return hash<salticidae::uint256_t>()(p);
+ }
+ };
+
+ template <>
+ struct hash<const salticidae::PeerId> {
+ size_t operator()(const salticidae::PeerId &p) const {
+ return hash<salticidae::uint256_t>()(p);
+ }
+ };
+}
+
+namespace salticidae {
+
/** Peer-to-peer network where any two nodes could hold a bi-diretional message
* channel, established by either side. */
template<typename OpcodeType = uint8_t,
@@ -344,20 +383,6 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
using peer_callback_t = std::function<void(const conn_t &peer_conn, bool connected)>;
using unknown_peer_callback_t = std::function<void(const NetAddr &claimed_addr, const X509 *cert)>;
- struct PeerId: public uint256_t {
- PeerId(const NetAddr &addr) {
- DataStream tmp;
- tmp << addr;
- *(static_cast<uint256_t *>(this)) = tmp.get_hash();
- }
-
- PeerId(const X509 &cert) {
- DataStream tmp;
- tmp << cert.get_der();
- *(static_cast<uint256_t *>(this)) = tmp.get_hash();
- }
- };
-
private:
struct Peer {
@@ -425,6 +450,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
NetAddr listen_addr;
bool allow_unknown_peer;
uint256_t my_nonce;
+ uint256_t passive_nonce;
struct MsgPing {
static const OpcodeType opcode;
@@ -460,7 +486,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
conn_t start_active_conn(const NetAddr &addr);
static void tcall_reset_timeout(ConnPool::Worker *worker,
const conn_t &conn, double timeout);
- inline conn_t _get_peer_conn(const NetAddr &addr) const;
+ inline conn_t _get_peer_conn(const PeerId &peer) const;
protected:
ConnPool::Conn *create_conn() override { return new Conn(); }
@@ -522,6 +548,9 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
ping_period(config._ping_period),
conn_timeout(config._conn_timeout),
allow_unknown_peer(config._allow_unknown_peer) {
+ uint64_t ones[4];
+ memset(ones, 0xff, 32);
+ passive_nonce.load((uint8_t *)ones);
this->reg_handler(generic_bind(&PeerNetwork::ping_handler, this, _1, _2));
this->reg_handler(generic_bind(&PeerNetwork::pong_handler, this, _1, _2));
}
@@ -529,26 +558,26 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
virtual ~PeerNetwork() { this->stop(); }
/* register a peer as known */
- int32_t add_peer(const PeerId &pid);
+ int32_t add_peer(const PeerId &peer);
/* set the peer's public IP */
- int32_t set_peer_addr(const PeerId &pid, const NetAddr &addr);
+ int32_t set_peer_addr(const PeerId &peer, const NetAddr &addr);
/* try to connect to the peer: once (ntry = 1), indefinitely (ntry = -1), give up retry (ntry = 0) */
- int32_t try_conn_peer(const PeerId &pid, ssize_t ntry = -1, double retry_delay = 2);
+ int32_t conn_peer(const PeerId &peer, ssize_t ntry = -1, double retry_delay = 2);
/* unregister the peer */
- int32_t del_peer(const PeerId &pid);
- bool has_peer(const PeerId &pid) const;
+ int32_t del_peer(const PeerId &peer);
+ bool has_peer(const PeerId &peer) const;
size_t get_npending() const;
conn_t get_peer_conn(const PeerId &addr) const;
using MsgNet::send_msg;
template<typename MsgType>
- inline bool send_msg(const MsgType &msg, const NetAddr &addr);
- inline bool _send_msg(const Msg &msg, const NetAddr &addr);
+ inline bool send_msg(const MsgType &msg, const PeerId &peer);
+ inline bool _send_msg(const Msg &msg, const PeerId &peer);
template<typename MsgType>
- inline int32_t send_msg_deferred(MsgType &&msg, const NetAddr &addr);
- inline int32_t _send_msg_deferred(Msg &&msg, const NetAddr &addr);
+ inline int32_t send_msg_deferred(MsgType &&msg, const PeerId &peer);
+ inline int32_t _send_msg_deferred(Msg &&msg, const PeerId &peer);
template<typename MsgType>
- inline int32_t multicast_msg(MsgType &&msg, const std::vector<NetAddr> &addrs);
- inline int32_t _multicast_msg(Msg &&msg, const std::vector<NetAddr> &addrs);
+ inline int32_t multicast_msg(MsgType &&msg, const std::vector<PeerId> &peers);
+ inline int32_t _multicast_msg(Msg &&msg, const std::vector<PeerId> &peers);
void listen(NetAddr listen_addr);
conn_t connect(const NetAddr &addr) = delete;
@@ -637,7 +666,7 @@ inline int32_t MsgNetwork<OpcodeType>::_send_msg_deferred(Msg &&msg, const conn_
template<typename OpcodeType>
template<typename MsgType>
inline bool MsgNetwork<OpcodeType>::send_msg(const MsgType &msg, const conn_t &conn) {
- return _send_msg(msg, conn);
+ return _send_msg(Msg(msg, msg_magic), conn);
}
template<typename OpcodeType>
@@ -700,11 +729,11 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) {
auto p = conn->peer;
if (!p) return;
assert(conn == p->conn);
+ p->connected = false;
+ p->conn = nullptr;
p->inbound_conn = nullptr;
p->outbound_conn = nullptr;
p->ev_ping_timer.del();
- p->connected = false;
- p->conn = nullptr;
this->user_tcall->async_call([this, conn](ThreadCall::Handle &) {
if (peer_cb) peer_cb(conn, false);
});
@@ -725,18 +754,21 @@ template<typename O, O _, O __>
void PeerNetwork<O, _, __>::Peer::update_conn(const conn_t &new_conn) {
if (conn != new_conn)
{
- conn->peer = nullptr;
- if (conn->is_terminated())
+ if (conn)
{
- for (;;)
+ conn->peer = nullptr;
+ if (conn->is_terminated())
{
- bytearray_t buff_seg = conn->send_buffer.move_pop();
- if (!buff_seg.size()) break;
- new_conn->write(std::move(buff_seg));
+ for (;;)
+ {
+ bytearray_t buff_seg = conn->send_buffer.move_pop();
+ if (!buff_seg.size()) break;
+ new_conn->write(std::move(buff_seg));
+ }
}
+ else
+ conn->get_net()->disp_terminate(conn);
}
- else
- this->disp_terminate(conn);
conn = new_conn;
}
}
@@ -791,10 +823,10 @@ void PeerNetwork<O, _, __>::finish_handshake(Peer *p) {
color_begin = TTY_COLOR_BLUE;
color_end = TTY_COLOR_RESET;
}
- SALTICIDAE_LOG_INFO("%sPeerNetwork: established connection %s <-> %s via %s",
+ SALTICIDAE_LOG_INFO("%sPeerNetwork: established connection %s <-> %s(%s)",
color_begin,
std::string(listen_addr).c_str(),
- std::string(p->addr).c_str(),
+ get_hex10(p->id).c_str(),
std::string(*(p->conn)).c_str(),
color_end);
}
@@ -823,11 +855,11 @@ typename PeerNetwork<O, _, __>::conn_t PeerNetwork<O, _, __>::start_active_conn(
}
template<typename O, O _, O __>
-inline typename PeerNetwork<O, _, __>::conn_t PeerNetwork<O, _, __>::_get_peer_conn(const NetAddr &addr) const {
- auto it = known_peers.find(addr);
+inline typename PeerNetwork<O, _, __>::conn_t PeerNetwork<O, _, __>::_get_peer_conn(const PeerId &pid) const {
+ auto it = known_peers.find(pid);
if (it == known_peers.end())
throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
- return it->second->peer_conn;
+ return it->second->conn;
}
/* end: functions invoked by the dispatcher */
@@ -843,7 +875,7 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) {
if (conn->get_mode() == Conn::ConnMode::PASSIVE)
{
pinfo_slock_t _g(known_peers_lock);
- auto pit = known_peers.find(msg.claimed_addr);
+ auto pit = known_peers.find(pid);
if (pit == known_peers.end())
{
this->user_tcall->async_call([this, addr=msg.claimed_addr, conn](ThreadCall::Handle &) {
@@ -856,7 +888,7 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) {
SALTICIDAE_LOG_INFO("%s inbound handshake from %s",
std::string(listen_addr).c_str(),
std::string(*conn).c_str());
- send_msg(MsgPong(listen_addr, my_nonce), conn);
+ send_msg(MsgPong(listen_addr, p->addr.is_null() ? passive_nonce : my_nonce), conn);
if (p->connected)
{
//conn->get_net()->disp_terminate(conn);
@@ -879,7 +911,7 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) {
this->disp_terminate(conn);
return;
}
- finish_handshake(p);
+ finish_handshake(p.get());
}
else
SALTICIDAE_LOG_WARN("unexpected inbound handshake from %s",
@@ -905,17 +937,16 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) {
if (conn->get_mode() == Conn::ConnMode::ACTIVE)
{
pinfo_ulock_t _g(known_peers_lock);
- SALTICIDAE_LOG_INFO("%s outbound handshake to %s",
- std::string(listen_addr).c_str(),
- std::string(*conn).c_str());
auto pit = known_peers.find(pid);
- assert(pit != known_peers.end());
- auto &p = pit->second;
- if (p->connected)
+ if (pit == known_peers.end())
{
- conn->get_net()->disp_terminate(conn);
+ this->disp_terminate(conn);
return;
}
+ SALTICIDAE_LOG_INFO("%s outbound handshake to %s",
+ std::string(listen_addr).c_str(),
+ std::string(*conn).c_str());
+ auto &p = pit->second;
auto &old_conn = p->outbound_conn;
if (old_conn && !old_conn->is_terminated())
{
@@ -940,7 +971,7 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) {
SALTICIDAE_LOG_WARN("multiple peer addresses share the same identity");
old_peer_addr = peer_addr;
p->reset_ping_timer();
- finish_handshake(p);
+ finish_handshake(p.get());
}
else
SALTICIDAE_LOG_WARN("unexpected outbound handshake from %s",
@@ -994,7 +1025,7 @@ int32_t PeerNetwork<O, _, __>::add_peer(const PeerId &pid) {
}
template<typename O, O _, O __>
-int32_t PeerNetwork<O, _, __>::try_conn_peer(const PeerId &pid, ssize_t ntry, double retry_delay) {
+int32_t PeerNetwork<O, _, __>::conn_peer(const PeerId &pid, ssize_t ntry, double retry_delay) {
auto id = this->gen_async_id();
this->disp_tcall->async_call([this, pid, ntry, retry_delay, id](ThreadCall::Handle &) {
try {
@@ -1003,13 +1034,15 @@ int32_t PeerNetwork<O, _, __>::try_conn_peer(const PeerId &pid, ssize_t ntry, do
if (it == known_peers.end())
throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
auto &p = it->second;
- if (p.addr.is_null())
+ if (p->addr.is_null())
throw PeerNetworkError(SALTI_ERROR_PEER_NOT_READY);
- // FIXME: ???
- if (p.peer_conn)
- p.peer_conn->disp_terminate();
- p.retry_delay = retry_delay;
- p.peer_conn = ntry ? start_active_conn(p.addr) : conn_t();
+ p->ntry = ntry;
+ p->retry_delay = retry_delay;
+ p->connected = false;
+ p->inbound_conn = nullptr;
+ p->outbound_conn = nullptr;
+ p->ev_ping_timer.del();
+ p->update_conn(ntry ? start_active_conn(p->addr) : conn_t());
} catch (const PeerNetworkError &) {
this->recoverable_error(std::current_exception(), id);
} catch (...) { this->disp_error_cb(std::current_exception()); }
@@ -1026,7 +1059,7 @@ int32_t PeerNetwork<O, _, __>::set_peer_addr(const PeerId &pid, const NetAddr &a
auto it = known_peers.find(pid);
if (it == known_peers.end())
throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
- it->addr = addr;
+ it->second->addr = addr;
} catch (const PeerNetworkError &) {
this->recoverable_error(std::current_exception(), id);
} catch (...) { this->disp_error_cb(std::current_exception()); }
@@ -1094,17 +1127,17 @@ size_t PeerNetwork<O, _, __>::get_npending() const {
template<typename O, O _, O __>
template<typename MsgType>
-inline int32_t PeerNetwork<O, _, __>::send_msg_deferred(MsgType &&msg, const NetAddr &addr) {
- return _send_msg_deferred(std::move(msg), addr);
+inline int32_t PeerNetwork<O, _, __>::send_msg_deferred(MsgType &&msg, const PeerId &pid) {
+ return _send_msg_deferred(std::move(msg), pid);
}
template<typename O, O _, O __>
-inline int32_t PeerNetwork<O, _, __>::_send_msg_deferred(Msg &&msg, const NetAddr &addr) {
+inline int32_t PeerNetwork<O, _, __>::_send_msg_deferred(Msg &&msg, const PeerId &pid) {
auto id = this->gen_async_id();
this->disp_tcall->async_call(
- [this, msg=std::move(msg), addr, id](ThreadCall::Handle &) {
+ [this, msg=std::move(msg), pid, id](ThreadCall::Handle &) {
try {
- if (!_send_msg(msg, addr))
+ if (!_send_msg(msg, pid))
throw PeerNetworkError(SALTI_ERROR_CONN_NOT_READY);
} catch (...) { this->recoverable_error(std::current_exception(), id); }
});
@@ -1113,31 +1146,31 @@ inline int32_t PeerNetwork<O, _, __>::_send_msg_deferred(Msg &&msg, const NetAdd
template<typename O, O _, O __>
template<typename MsgType>
-inline bool PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const NetAddr &addr) {
- return _send_msg(msg, addr);
+inline bool PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const PeerId &pid) {
+ return _send_msg(Msg(msg, this->msg_magic), pid);
}
template<typename O, O _, O __>
-inline bool PeerNetwork<O, _, __>::_send_msg(const Msg &msg, const NetAddr &addr) {
+inline bool PeerNetwork<O, _, __>::_send_msg(const Msg &msg, const PeerId &pid) {
pinfo_slock_t _g(known_peers_lock);
- return MsgNet::_send_msg(msg, _get_peer_conn(addr));
+ return MsgNet::_send_msg(msg, _get_peer_conn(pid));
}
template<typename O, O _, O __>
template<typename MsgType>
-inline int32_t PeerNetwork<O, _, __>::multicast_msg(MsgType &&msg, const std::vector<NetAddr> &addrs) {
- return _multicast_msg(MsgType(std::move(msg)), addrs);
+inline int32_t PeerNetwork<O, _, __>::multicast_msg(MsgType &&msg, const std::vector<PeerId> &pids) {
+ return _multicast_msg(MsgType(std::move(msg), this->msg_magic), pids);
}
template<typename O, O _, O __>
-inline int32_t PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<NetAddr> &addrs) {
+inline int32_t PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<PeerId> &pids) {
auto id = this->gen_async_id();
this->disp_tcall->async_call(
- [this, msg=std::move(msg), addrs, id](ThreadCall::Handle &) {
+ [this, msg=std::move(msg), pids, id](ThreadCall::Handle &) {
try {
bool succ = true;
- for (auto &addr: addrs)
- succ &= MsgNet::_send_msg(msg, _get_peer_conn(addr));
+ for (auto &pid: pids)
+ succ &= MsgNet::_send_msg(msg, _get_peer_conn(pid));
if (!succ) throw PeerNetworkError(SALTI_ERROR_CONN_NOT_READY);
} catch (...) { this->recoverable_error(std::current_exception(), id); }
});
@@ -1205,6 +1238,9 @@ const O PeerNetwork<O, _, OPCODE_PONG>::MsgPong::opcode = OPCODE_PONG;
}
#ifdef SALTICIDAE_CBINDINGS
+using peerid_t = salticidae::PeerId;
+using peerid_array_t = std::vector<peerid_t>;
+
using msgnetwork_t = salticidae::MsgNetwork<_opcode_t>;
using msgnetwork_config_t = msgnetwork_t::Config;
using msgnetwork_conn_t = msgnetwork_t::conn_t;
@@ -1220,6 +1256,9 @@ using clientnetwork_conn_t = clientnetwork_t::conn_t;
#else
#ifdef SALTICIDAE_CBINDINGS
+typedef struct peerid_t peerid_t;
+typedef struct peerid_t_array_t peerid_array_t;
+
typedef struct msgnetwork_t msgnetwork_t;
typedef struct msgnetwork_config_t msgnetwork_config_t;
typedef struct msgnetwork_conn_t msgnetwork_conn_t;
@@ -1301,6 +1340,14 @@ bool msgnetwork_conn_is_terminated(const msgnetwork_conn_t *conn);
/* PeerNetwork */
+//peerid_t *peerid_new();
+void peerid_free(const peerid_t *self);
+peerid_t *peerid_new_from_netaddr(const netaddr_t *addr);
+peerid_t *peerid_new_from_x509(const x509_t *cert);
+peerid_array_t *peerid_array_new();
+peerid_array_t *peerid_array_new_from_peerids(const peerid_t * const *pids, size_t npids);
+void peerid_array_free(peerid_array_t *self);
+
peernetwork_config_t *peernetwork_config_new();
void peernetwork_config_free(const peernetwork_config_t *self);
void peernetwork_config_ping_period(peernetwork_config_t *self, double t);
@@ -1310,10 +1357,10 @@ msgnetwork_config_t *peernetwork_config_as_msgnetwork_config(peernetwork_config_
peernetwork_t *peernetwork_new(const eventcontext_t *ec, const peernetwork_config_t *config, SalticidaeCError *err);
void peernetwork_free(const peernetwork_t *self);
-int32_t peernetwork_add_peer(peernetwork_t *self, const netaddr_t *addr);
-int32_t peernetwork_del_peer(peernetwork_t *self, const netaddr_t *addr);
-bool peernetwork_has_peer(const peernetwork_t *self, const netaddr_t *addr);
-const peernetwork_conn_t *peernetwork_get_peer_conn(const peernetwork_t *self, const netaddr_t *addr, SalticidaeCError *cerror);
+int32_t peernetwork_add_peer(peernetwork_t *self, const peerid_t *pid);
+int32_t peernetwork_del_peer(peernetwork_t *self, const peerid_t *pid);
+bool peernetwork_has_peer(const peernetwork_t *self, const peerid_t *pid);
+const peernetwork_conn_t *peernetwork_get_peer_conn(const peernetwork_t *self, const peerid_t *pid, SalticidaeCError *cerror);
msgnetwork_t *peernetwork_as_msgnetwork(peernetwork_t *self);
peernetwork_t *msgnetwork_as_peernetwork_unsafe(msgnetwork_t *self);
msgnetwork_conn_t *msgnetwork_conn_new_from_peernetwork_conn(const peernetwork_conn_t *conn);
@@ -1321,9 +1368,9 @@ peernetwork_conn_t *peernetwork_conn_new_from_msgnetwork_conn_unsafe(const msgne
peernetwork_conn_t *peernetwork_conn_copy(const peernetwork_conn_t *self);
netaddr_t *peernetwork_conn_get_peer_addr(const peernetwork_conn_t *self);
void peernetwork_conn_free(const peernetwork_conn_t *self);
-bool peernetwork_send_msg(peernetwork_t *self, const msg_t * msg, const netaddr_t *addr);
-int32_t peernetwork_send_msg_deferred_by_move(peernetwork_t *self, msg_t * _moved_msg, const netaddr_t *addr);
-int32_t peernetwork_multicast_msg_by_move(peernetwork_t *self, msg_t *_moved_msg, const netaddr_array_t *addrs);
+bool peernetwork_send_msg(peernetwork_t *self, const msg_t * msg, const peerid_t *peer);
+int32_t peernetwork_send_msg_deferred_by_move(peernetwork_t *self, msg_t * _moved_msg, const peerid_t *peer);
+int32_t peernetwork_multicast_msg_by_move(peernetwork_t *self, msg_t *_moved_msg, const peerid_array_t *peers);
void peernetwork_listen(peernetwork_t *self, const netaddr_t *listen_addr, SalticidaeCError *err);
typedef void (*peernetwork_peer_callback_t)(const peernetwork_conn_t *, bool connected, void *userdata);