aboutsummaryrefslogtreecommitdiff
path: root/include/salticidae/network.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/salticidae/network.h')
-rw-r--r--include/salticidae/network.h278
1 files changed, 163 insertions, 115 deletions
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index d82772f..8f8940b 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -77,8 +77,8 @@ class MsgNetwork: public ConnPool {
protected:
#ifdef SALTICIDAE_MSG_STAT
- mutable size_t nsent;
- mutable size_t nrecv;
+ mutable std::atomic<size_t> nsent;
+ mutable std::atomic<size_t> nrecv;
#endif
public:
@@ -107,7 +107,8 @@ class MsgNetwork: public ConnPool {
#ifdef SALTICIDAE_MSG_STAT
class msg_stat_by_opcode_t:
public std::unordered_map<typename Msg::opcode_t,
- std::pair<uint32_t, size_t>> {
+ std::pair<std::atomic<uint32_t>,
+ std::atomic<size_t>>> {
public:
void add(const Msg &msg) {
auto &p = this->operator[](msg.get_opcode());
@@ -259,6 +260,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
friend PeerNetwork;
NetAddr peer_id;
Event ev_timeout;
+ void reset_timeout(double timeout);
public:
Conn() = default;
@@ -292,6 +294,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
bool ping_timer_ok;
bool pong_msg_ok;
bool connected;
+ std::mutex mlock;
Peer() = delete;
Peer(NetAddr addr, conn_t conn, const EventContext &ec):
@@ -314,7 +317,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
};
std::unordered_map <NetAddr, BoxObj<Peer>> id2peer;
- std::vector<NetAddr> peer_list;
+ std::mutex pn_mlock;
IdentityMode id_mode;
double retry_conn_delay;
@@ -350,7 +353,6 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
void msg_ping(MsgPing &&msg, Conn &conn);
void msg_pong(MsgPong &&msg, Conn &conn);
- void reset_conn_timeout(Conn &conn);
bool check_new_conn(Conn &conn, uint16_t port);
void start_active_conn(const NetAddr &paddr);
@@ -380,12 +382,11 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
void add_peer(const NetAddr &paddr);
const conn_t get_peer_conn(const NetAddr &paddr) const;
template<typename MsgType>
- void send_msg(const MsgType &msg, const Peer *peer);
+ void _send_msg(const MsgType &msg, const Peer *peer);
template<typename MsgType>
void send_msg(const MsgType &msg, const NetAddr &paddr);
void listen(NetAddr listen_addr);
bool has_peer(const NetAddr &paddr) const;
- const std::vector<NetAddr> &all_peers() const;
conn_t connect(const NetAddr &addr) {
return static_pointer_cast<Conn>(ConnPool::connect(addr));
}
@@ -395,9 +396,9 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
template<typename OpcodeType>
void MsgNetwork<OpcodeType>::Conn::on_read() {
ConnPool::Conn::on_read();
- auto &recv_buffer = get_recv_buffer();
+ auto &recv_buffer = this->recv_buffer;
auto mn = get_net();
- while (get_fd() != -1)
+ while (fd != -1)
{
if (msg_state == Conn::HEADER)
{
@@ -426,23 +427,22 @@ void MsgNetwork<OpcodeType>::Conn::on_read() {
}
}
-template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::Peer::reset_conn(conn_t new_conn) {
- if (conn != new_conn)
- {
- if (conn)
- {
- //SALTICIDAE_LOG_DEBUG("moving send buffer");
- //new_conn->move_send_buffer(conn);
- SALTICIDAE_LOG_INFO("terminating old connection %s", std::string(*conn).c_str());
- conn->terminate();
- }
- addr = new_conn->get_addr();
- conn = new_conn;
- }
- clear_all_events();
+template<typename OpcodeType>
+template<typename MsgType>
+void MsgNetwork<OpcodeType>::send_msg(const MsgType &_msg, Conn &conn) {
+ Msg msg(_msg);
+ bytearray_t msg_data = msg.serialize();
+ SALTICIDAE_LOG_DEBUG("wrote message %s to %s",
+ std::string(msg).c_str(),
+ std::string(conn).c_str());
+ conn.write(std::move(msg_data));
+#ifdef SALTICIDAE_MSG_STAT
+ conn.nsent++;
+ sent_by_opcode.add(msg);
+#endif
}
+/* begin: functions invoked by the dispatcher */
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::Conn::on_setup() {
MsgNet::Conn::on_setup();
@@ -452,9 +452,14 @@ void PeerNetwork<O, _, __>::Conn::on_setup() {
SALTICIDAE_LOG_INFO("peer ping-pong timeout");
this->terminate();
});
+ if (this->get_mode() == Conn::ConnMode::ACTIVE)
+ {
+ peer_id = this->get_addr();
+ if (id_mode == IP_BASED) peer_id.port = 0;
+ }
/* the initial ping-pong to set up the connection */
auto &conn = static_cast<Conn &>(*this);
- pn->reset_conn_timeout(conn);
+ conn->reset_timeout(pn->conn_timeout);
pn->MsgNet::send_msg(MsgPing(pn->listen_port), conn);
}
@@ -462,9 +467,11 @@ template<typename O, O _, O __>
void PeerNetwork<O, _, __>::Conn::on_teardown() {
MsgNet::Conn::on_teardown();
auto pn = get_net();
+ mutex_lg_t _pn_lg(pn->pn_mlock);
auto it = pn->id2peer.find(peer_id);
if (it == pn->id2peer.end()) return;
auto p = it->second.get();
+ mutex_lg_t _p_lg(p->mlock);
if (this != p->conn.get()) return;
p->ev_ping_timer.del();
p->connected = false;
@@ -472,14 +479,69 @@ void PeerNetwork<O, _, __>::Conn::on_teardown() {
SALTICIDAE_LOG_INFO("connection lost %s for %s",
std::string(*this).c_str(),
std::string(peer_id).c_str());
- p->ev_retry_timer = Event(pn->ec, -1, 0,
+ // try to reconnect
+ p->ev_retry_timer = Event(pn->dispatcher_ec, -1, 0,
[pn, peer_id = this->peer_id](evutil_socket_t, short) {
+ mutex_lg_t _pn_lg(pn->pn_mlock);
pn->start_active_conn(peer_id);
});
p->ev_retry_timer.add_with_timeout(pn->gen_conn_timeout());
}
template<typename O, O _, O __>
+void PeerNetwork<O, _, __>::Peer::reset_conn(conn_t new_conn) {
+ if (conn != new_conn)
+ {
+ if (conn)
+ {
+ //SALTICIDAE_LOG_DEBUG("moving send buffer");
+ //new_conn->move_send_buffer(conn);
+ SALTICIDAE_LOG_INFO("terminating old connection %s", std::string(*conn).c_str());
+ conn->terminate();
+ }
+ addr = new_conn->get_addr();
+ conn = new_conn;
+ }
+ clear_all_events();
+}
+
+template<typename O, O _, O __>
+void PeerNetwork<O, _, __>::Conn::reset_timeout(double timeout) {
+ assert(ev_timeout);
+ ev_timeout.del();
+ ev_timeout.add_with_timeout(timeout);
+ SALTICIDAE_LOG_INFO("reset timeout %.2f", timeout);
+}
+
+template<typename O, O _, O __>
+void PeerNetwork<O, _, __>::Peer::reset_ping_timer() {
+ assert(ev_ping_timer);
+ ev_ping_timer.del();
+ ev_ping_timer.add_with_timeout(
+ gen_rand_timeout(conn->get_net()->ping_period));
+}
+
+template<typename O, O _, O __>
+void PeerNetwork<O, _, __>::Peer::send_ping() {
+ auto pn = conn->get_net();
+ ping_timer_ok = false;
+ pong_msg_ok = false;
+ conn->reset_timeout(pn->conn_timeout);
+ pn->_send_msg(MsgPing(pn->listen_port), this);
+}
+
+template<typename O, O _, O __>
+void PeerNetwork<O, _, __>::Peer::ping_timer(evutil_socket_t, short) {
+ mutex_lg_t _p_lg(mlock);
+ ping_timer_ok = true;
+ if (pong_msg_ok)
+ {
+ reset_ping_timer();
+ send_ping();
+ }
+}
+
+template<typename O, O _, O __>
bool PeerNetwork<O, _, __>::check_new_conn(Conn &conn, uint16_t port) {
if (conn.peer_id.is_null())
{ /* passive connections can eventually have ids after getting the port
@@ -489,6 +551,7 @@ bool PeerNetwork<O, _, __>::check_new_conn(Conn &conn, uint16_t port) {
conn.peer_id.port = port;
}
auto p = id2peer.find(conn.peer_id)->second.get();
+ mutex_lg_t _p_lg(p->mlock);
if (p->connected)
{
if (conn.self() != p->conn)
@@ -509,31 +572,77 @@ bool PeerNetwork<O, _, __>::check_new_conn(Conn &conn, uint16_t port) {
}
template<typename O, O _, O __>
+class PeerNetworkPing: public ConnPool::DispatchCmd {
+ using conn_t = typename PeerNetwork<O, _, __>::conn_t;
+ conn_t conn;
+ uint16_t port;
+ public:
+ PeerNetworkPing(const conn_t &conn, uint16_t port):
+ conn(conn), port(port) {}
+ void exec(ConnPool *cpool) override {
+ auto pn = static_cast<PeerNetwork<O, _, __> *>(cpool);
+ mutex_lg_t _pn_lg(pn->pn_mlock);
+ if (pn->check_new_conn(conn, port)) return;
+ auto p = pn->id2peer.find(conn.peer_id)->second.get();
+ mutex_lg_t _p_lg(p->mlock);
+ pn->_send_msg(MsgPong(this->listen_port), p);
+ }
+};
+
+template<typename O, O _, O __>
+class PeerNetworkPong: public ConnPool::DispatchCmd {
+ using conn_t = typename PeerNetwork<O, _, __>::conn_t;
+ conn_t conn;
+ uint16_t port;
+ public:
+ PeerNetworkPong(const conn_t &conn, uint16_t port):
+ conn(conn), port(port) {}
+ void exec(ConnPool *cpool) override {
+ auto pn = static_cast<PeerNetwork<O, _, __> *>(cpool);
+ mutex_lg_t _pn_lg(pn->pn_mlock);
+ auto it = pn->id2peer.find(conn->peer_id);
+ if (it == pn->id2peer.end())
+ {
+ SALTICIDAE_LOG_WARN("pong message discarded");
+ return;
+ }
+ if (pn->check_new_conn(conn, port)) return;
+ auto p = it->second.get();
+ mutex_lg_t _p_lg(p->mlock);
+ p->pong_msg_ok = true;
+ if (p->ping_timer_ok)
+ {
+ p->reset_ping_timer();
+ p->send_ping();
+ }
+ }
+};
+/* end: functions invoked by the dispatcher */
+
+/* this function could be both invoked by the dispatcher and the user loop */
+template<typename O, O _, O __>
+void PeerNetwork<O, _, __>::start_active_conn(const NetAddr &addr) {
+ auto p = id2peer.find(addr)->second.get();
+ mutex_lg_t _p_lg(p->mlock);
+ if (p->connected) return;
+ auto conn = static_pointer_cast<Conn>(connect(addr));
+ assert(p->conn == nullptr);
+ p->conn = conn;
+}
+
+/* begin: functions invoked by the user loop */
+template<typename O, O _, O __>
void PeerNetwork<O, _, __>::msg_ping(MsgPing &&msg, Conn &conn) {
uint16_t port = msg.port;
SALTICIDAE_LOG_INFO("ping from %s, port %u", std::string(conn).c_str(), ntohs(port));
- if (check_new_conn(conn, port)) return;
- auto p = id2peer.find(conn.peer_id)->second.get();
- send_msg(MsgPong(this->listen_port), p);
+ auto dcmd = new PeerNetworkPing<O, _, __>(conn.self(), port);
+ write(this->dlisten_fd[1], &dcmd, sizeof(dcmd));
}
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::msg_pong(MsgPong &&msg, Conn &conn) {
- auto it = id2peer.find(conn.peer_id);
- if (it == id2peer.end())
- {
- SALTICIDAE_LOG_WARN("pong message discarded");
- return;
- }
- auto p = it->second.get();
- uint16_t port = msg.port;
- if (check_new_conn(conn, port)) return;
- p->pong_msg_ok = true;
- if (p->ping_timer_ok)
- {
- p->reset_ping_timer();
- p->send_ping();
- }
+ auto dcmd = new PeerNetworkPong<O, _, __>(conn.self(), msg.port);
+ write(this->dlisten_fd[1], &dcmd, sizeof(dcmd));
}
template<typename O, O _, O __>
@@ -545,30 +654,19 @@ void PeerNetwork<O, _, __>::listen(NetAddr listen_addr) {
}
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::start_active_conn(const NetAddr &addr) {
- auto p = id2peer.find(addr)->second.get();
- if (p->connected) return;
- auto conn = static_pointer_cast<Conn>(connect(addr));
- assert(p->conn == nullptr);
- p->conn = conn;
- conn->peer_id = addr;
- if (id_mode == IP_BASED)
- conn->peer_id.port = 0;
-}
-
-template<typename O, O _, O __>
void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) {
+ mutex_lg_t _pn_lg(pn_mlock);
auto it = id2peer.find(addr);
if (it != id2peer.end())
throw PeerNetworkError("peer already exists");
- id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->ec)));
- peer_list.push_back(addr);
+ id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->dispatcher_ec)));
start_active_conn(addr);
}
template<typename O, O _, O __>
const typename PeerNetwork<O, _, __>::conn_t
PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &paddr) const {
+ mutex_lg_t _pn_lg(pn_mlock);
auto it = id2peer.find(paddr);
if (it == id2peer.end())
throw PeerNetworkError("peer does not exist");
@@ -577,27 +675,13 @@ PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &paddr) const {
template<typename O, O _, O __>
bool PeerNetwork<O, _, __>::has_peer(const NetAddr &paddr) const {
+ mutex_lg_t _pn_lg(pn_mlock);
return id2peer.count(paddr);
}
-template<typename OpcodeType>
-template<typename MsgType>
-void MsgNetwork<OpcodeType>::send_msg(const MsgType &_msg, Conn &conn) {
- Msg msg(_msg);
- bytearray_t msg_data = msg.serialize();
- SALTICIDAE_LOG_DEBUG("wrote message %s to %s",
- std::string(msg).c_str(),
- std::string(conn).c_str());
- conn.write(std::move(msg_data));
-#ifdef SALTICIDAE_MSG_STAT
- conn.nsent++;
- sent_by_opcode.add(msg);
-#endif
-}
-
template<typename O, O _, O __>
template<typename MsgType>
-void PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const Peer *peer) {
+void PeerNetwork<O, _, __>::_send_msg(const MsgType &msg, const Peer *peer) {
if (peer->connected)
MsgNet::send_msg(msg, *(peer->conn));
else
@@ -607,6 +691,7 @@ void PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const Peer *peer) {
template<typename O, O _, O __>
template<typename MsgType>
void PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const NetAddr &addr) {
+ mutex_lg_t _pn_lg(pn_mlock);
auto it = id2peer.find(addr);
if (it == id2peer.end())
{
@@ -614,48 +699,11 @@ void PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const NetAddr &addr) {
std::string(addr).c_str());
throw PeerNetworkError("peer does not exist");
}
- send_msg(msg, it->second.get());
-}
-
-template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::Peer::reset_ping_timer() {
- assert(ev_ping_timer);
- ev_ping_timer.del();
- ev_ping_timer.add_with_timeout(
- gen_rand_timeout(conn->get_net()->ping_period));
-}
-
-template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::reset_conn_timeout(Conn &conn) {
- assert(conn.ev_timeout);
- conn.ev_timeout.del();
- conn.ev_timeout.add_with_timeout(conn_timeout);
- SALTICIDAE_LOG_INFO("reset timeout %.2f", conn_timeout);
-}
-
-template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::Peer::send_ping() {
- auto pn = conn->get_net();
- ping_timer_ok = false;
- pong_msg_ok = false;
- pn->reset_conn_timeout(*conn);
- pn->send_msg(MsgPing(pn->listen_port), this);
-}
-
-template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::Peer::ping_timer(evutil_socket_t, short) {
- ping_timer_ok = true;
- if (pong_msg_ok)
- {
- reset_ping_timer();
- send_ping();
- }
-}
-
-template<typename O, O _, O __>
-const std::vector<NetAddr> &PeerNetwork<O, _, __>::all_peers() const {
- return peer_list;
+ auto p = it->second.get();
+ mutex_lg_t _p_lg(p->mlock);
+ _send_msg(msg, p);
}
+/* end: functions invoked by the user loop */
template<typename OpcodeType>
void ClientNetwork<OpcodeType>::Conn::on_setup() {