aboutsummaryrefslogtreecommitdiff
path: root/include/salticidae/network.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/salticidae/network.h')
-rw-r--r--include/salticidae/network.h217
1 files changed, 90 insertions, 127 deletions
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index 18406ea..e5165bf 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -206,7 +206,6 @@ class ClientNetwork: public MsgNetwork<OpcodeType> {
private:
std::unordered_map<NetAddr, typename MsgNet::conn_t> addr2conn;
- std::mutex cn_mlock;
public:
class Conn: public MsgNet::Conn {
@@ -298,7 +297,6 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
bool ping_timer_ok;
bool pong_msg_ok;
bool connected;
- std::mutex mlock;
Peer() = delete;
Peer(NetAddr addr, conn_t conn, const EventContext &ec):
@@ -321,7 +319,6 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
};
std::unordered_map <NetAddr, BoxObj<Peer>> id2peer;
- std::mutex pn_mlock;
const IdentityMode id_mode;
double retry_conn_delay;
@@ -355,30 +352,11 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
}
};
- struct PingCmd: public ConnPool::DispatchCmd {
- conn_t conn;
- uint16_t port;
- PingCmd(const conn_t &conn, uint16_t port):
- conn(conn), port(port) {}
- void exec(ConnPool *cpool) override {
- auto pn = static_cast<PeerNetwork *>(cpool);
- pn->_ping_msg_cb(conn, port);
- }
- };
-
- struct PongCmd: public PingCmd {
- using PingCmd::PingCmd;
- void exec(ConnPool *cpool) override {
- auto pn = static_cast<PeerNetwork *>(cpool);
- pn->_pong_msg_cb(this->conn, this->port);
- }
- };
-
void msg_ping(MsgPing &&msg, Conn &conn);
void msg_pong(MsgPong &&msg, Conn &conn);
void _ping_msg_cb(const conn_t &conn, uint16_t port);
void _pong_msg_cb(const conn_t &conn, uint16_t port);
- bool check_new_conn(Conn &conn, uint16_t port);
+ bool check_new_conn(const conn_t &conn, uint16_t port);
void start_active_conn(const NetAddr &paddr);
protected:
@@ -402,12 +380,14 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
id_mode(id_mode),
retry_conn_delay(retry_conn_delay),
ping_period(ping_period),
- conn_timeout(conn_timeout) {}
+ conn_timeout(conn_timeout) {
+ this->reg_handler(generic_bind(&PeerNetwork::msg_ping, this, _1, _2));
+ this->reg_handler(generic_bind(&PeerNetwork::msg_pong, this, _1, _2));
+ }
void add_peer(const NetAddr &paddr);
const conn_t get_peer_conn(const NetAddr &paddr) const;
- template<typename MsgType>
- void _send_msg(const MsgType &msg, const Peer *peer);
+ using MsgNet::send_msg;
template<typename MsgType>
void send_msg(const MsgType &msg, const NetAddr &paddr);
void listen(NetAddr listen_addr);
@@ -471,7 +451,7 @@ void PeerNetwork<O, _, __>::Conn::on_setup() {
MsgNet::Conn::on_setup();
auto pn = get_net();
assert(!ev_timeout);
- ev_timeout = Event(pn->ec, -1, 0, [this](int, int) {
+ ev_timeout = Event(pn->dispatcher_ec, -1, 0, [this](int, int) {
SALTICIDAE_LOG_INFO("peer ping-pong timeout");
this->terminate();
});
@@ -483,18 +463,16 @@ void PeerNetwork<O, _, __>::Conn::on_setup() {
/* the initial ping-pong to set up the connection */
auto &conn = static_cast<Conn &>(*this);
reset_timeout(pn->conn_timeout);
- pn->MsgNet::send_msg(MsgPing(pn->listen_port), conn);
+ pn->send_msg(MsgPing(pn->listen_port), conn);
}
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::Conn::on_teardown() {
MsgNet::Conn::on_teardown();
auto pn = get_net();
- mutex_lg_t _pn_lg(pn->pn_mlock);
auto it = pn->id2peer.find(peer_id);
if (it == pn->id2peer.end()) return;
auto p = it->second.get();
- mutex_lg_t _p_lg(p->mlock);
if (this != p->conn.get()) return;
p->ev_ping_timer.del();
p->connected = false;
@@ -505,7 +483,6 @@ void PeerNetwork<O, _, __>::Conn::on_teardown() {
// try to reconnect
p->ev_retry_timer = Event(pn->dispatcher_ec, -1, 0,
[pn, peer_id = this->peer_id](int, int) {
- mutex_lg_t _pn_lg(pn->pn_mlock);
pn->start_active_conn(peer_id);
});
p->ev_retry_timer.add_with_timeout(pn->gen_conn_timeout());
@@ -550,12 +527,11 @@ void PeerNetwork<O, _, __>::Peer::send_ping() {
ping_timer_ok = false;
pong_msg_ok = false;
conn->reset_timeout(pn->conn_timeout);
- pn->_send_msg(MsgPing(pn->listen_port), this);
+ pn->send_msg(MsgPing(pn->listen_port), *conn);
}
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::Peer::ping_timer(int, int) {
- mutex_lg_t _p_lg(mlock);
ping_timer_ok = true;
if (pong_msg_ok)
{
@@ -565,149 +541,132 @@ void PeerNetwork<O, _, __>::Peer::ping_timer(int, int) {
}
template<typename O, O _, O __>
-bool PeerNetwork<O, _, __>::check_new_conn(Conn &conn, uint16_t port) {
- if (conn.peer_id.is_null())
+bool PeerNetwork<O, _, __>::check_new_conn(const conn_t &conn, uint16_t port) {
+ if (conn->peer_id.is_null())
{ /* passive connections can eventually have ids after getting the port
number in IP_BASED_PORT mode */
assert(id_mode == IP_PORT_BASED);
- conn.peer_id.ip = conn.get_addr().ip;
- conn.peer_id.port = port;
+ conn->peer_id.ip = conn->get_addr().ip;
+ conn->peer_id.port = port;
}
- auto p = id2peer.find(conn.peer_id)->second.get();
- mutex_lg_t _p_lg(p->mlock);
+ auto p = id2peer.find(conn->peer_id)->second.get();
if (p->connected)
{
- if (conn.self() != p->conn)
+ if (conn != p->conn)
{
- conn.terminate();
+ conn->terminate();
return true;
}
return false;
}
- p->reset_conn(static_pointer_cast<Conn>(conn.self()));
+ p->reset_conn(conn);
p->connected = true;
p->reset_ping_timer();
p->send_ping();
if (p->connected)
SALTICIDAE_LOG_INFO("PeerNetwork: established connection with %s via %s",
- std::string(conn.peer_id).c_str(), std::string(conn).c_str());
+ std::string(conn->peer_id).c_str(), std::string(*conn).c_str());
return false;
}
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::_ping_msg_cb(const conn_t &conn, uint16_t port) {
- mutex_lg_t _pn_lg(pn_mlock);
- if (check_new_conn(*conn, port)) return;
- auto p = id2peer.find(conn->peer_id)->second.get();
- mutex_lg_t _p_lg(p->mlock);
- _send_msg(MsgPong(this->listen_port), p);
-}
-
-template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::_pong_msg_cb(const conn_t &conn, uint16_t port) {
- mutex_lg_t _pn_lg(pn_mlock);
- auto it = id2peer.find(conn->peer_id);
- if (it == id2peer.end())
- {
- SALTICIDAE_LOG_WARN("pong message discarded");
- return;
- }
- if (check_new_conn(*conn, port)) return;
- auto p = it->second.get();
- mutex_lg_t _p_lg(p->mlock);
- p->pong_msg_ok = true;
- if (p->ping_timer_ok)
- {
- p->reset_ping_timer();
- p->send_ping();
- }
-}
-
-/* end: functions invoked by the dispatcher */
-
-/* this function could be both invoked by the dispatcher and the user loop */
-template<typename O, O _, O __>
void PeerNetwork<O, _, __>::start_active_conn(const NetAddr &addr) {
auto p = id2peer.find(addr)->second.get();
- mutex_lg_t _p_lg(p->mlock);
if (p->connected) return;
- auto conn = static_pointer_cast<Conn>(MsgNet::connect(addr));
+ auto conn = static_pointer_cast<Conn>(MsgNet::_connect(addr));
assert(p->conn == nullptr);
p->conn = conn;
}
+/* end: functions invoked by the dispatcher */
/* begin: functions invoked by the user loop */
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::msg_ping(MsgPing &&msg, Conn &conn) {
+void PeerNetwork<O, _, __>::msg_ping(MsgPing &&msg, Conn &_conn) {
+ auto conn = static_pointer_cast<Conn>(_conn.self());
+ if (!conn) return;
uint16_t port = msg.port;
- SALTICIDAE_LOG_INFO("ping from %s, port %u", std::string(conn).c_str(), ntohs(port));
- auto dcmd = new PingCmd(static_pointer_cast<Conn>(conn.self()), port);
- write(this->dlisten_fd[1], &dcmd, sizeof(dcmd));
+ this->disp_tcall->call([this, conn, port](ThreadCall::Handle &msg) {
+ SALTICIDAE_LOG_INFO("ping from %s, port %u",
+ std::string(*conn).c_str(), ntohs(port));
+ if (check_new_conn(conn, port)) return;
+ auto p = id2peer.find(conn->peer_id)->second.get();
+ send_msg(MsgPong(this->listen_port), *conn);
+ });
}
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::msg_pong(MsgPong &&msg, Conn &conn) {
- auto dcmd = new PongCmd(static_pointer_cast<Conn>(conn.self()), msg.port);
- write(this->dlisten_fd[1], &dcmd, sizeof(dcmd));
+void PeerNetwork<O, _, __>::msg_pong(MsgPong &&msg, Conn &_conn) {
+ auto conn = static_pointer_cast<Conn>(_conn.self());
+ if (!conn) return;
+ uint16_t port = msg.port;
+ this->disp_tcall->call([this, conn, port](ThreadCall::Handle &msg) {
+ auto it = id2peer.find(conn->peer_id);
+ if (it == id2peer.end())
+ {
+ SALTICIDAE_LOG_WARN("pong message discarded");
+ return;
+ }
+ if (check_new_conn(conn, port)) return;
+ auto p = it->second.get();
+ p->pong_msg_ok = true;
+ if (p->ping_timer_ok)
+ {
+ p->reset_ping_timer();
+ p->send_ping();
+ }
+ });
}
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::listen(NetAddr listen_addr) {
- MsgNet::listen(listen_addr);
- listen_port = listen_addr.port;
- this->reg_handler(generic_bind(&PeerNetwork::msg_ping, this, _1, _2));
- this->reg_handler(generic_bind(&PeerNetwork::msg_pong, this, _1, _2));
+ this->disp_tcall->call([this, listen_addr](ThreadCall::Handle &msg) {
+ MsgNet::_listen(listen_addr);
+ listen_port = listen_addr.port;
+ }, true);
}
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) {
- mutex_lg_t _pn_lg(pn_mlock);
- auto it = id2peer.find(addr);
- if (it != id2peer.end())
- throw PeerNetworkError("peer already exists");
- id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->dispatcher_ec)));
- start_active_conn(addr);
+ this->disp_tcall->call([this, addr](ThreadCall::Handle &) {
+ auto it = id2peer.find(addr);
+ if (it != id2peer.end())
+ throw PeerNetworkError("peer already exists");
+ id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->dispatcher_ec)));
+ start_active_conn(addr);
+ }, true);
}
template<typename O, O _, O __>
const typename PeerNetwork<O, _, __>::conn_t
PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &paddr) const {
- mutex_lg_t _pn_lg(pn_mlock);
- auto it = id2peer.find(paddr);
- if (it == id2peer.end())
- throw PeerNetworkError("peer does not exist");
- return it->second->conn;
+ auto ret = static_cast<conn_t *>(this->disp_tcall->call(
+ [this, paddr](ThreadCall::Handle &h) {
+ auto it = id2peer.find(paddr);
+ if (it == id2peer.end())
+ throw PeerNetworkError("peer does not exist");
+ auto ptr = new conn_t(it->second->conn);
+ h.set_result(ptr);
+ }));
+ auto conn = *ret;
+ delete ret;
+ return std::move(conn);
}
template<typename O, O _, O __>
bool PeerNetwork<O, _, __>::has_peer(const NetAddr &paddr) const {
- mutex_lg_t _pn_lg(pn_mlock);
- return id2peer.count(paddr);
-}
-
-template<typename O, O _, O __>
-template<typename MsgType>
-void PeerNetwork<O, _, __>::_send_msg(const MsgType &msg, const Peer *peer) {
- if (peer->connected)
- MsgNet::send_msg(msg, *(peer->conn));
- else
- SALTICIDAE_LOG_DEBUG("dropped");
+ auto ret = static_cast<bool *>(this->disp_tcall->call(
+ [this, paddr](ThreadCall::Handle &h) {
+ h.set_result(id2peer.count(paddr));
+ }));
+ auto has = *ret;
+ delete ret;
+ return has;
}
template<typename O, O _, O __>
template<typename MsgType>
void PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const NetAddr &addr) {
- mutex_lg_t _pn_lg(pn_mlock);
- auto it = id2peer.find(addr);
- if (it == id2peer.end())
- {
- SALTICIDAE_LOG_ERROR("sending to non-existing peer: %s",
- std::string(addr).c_str());
- throw PeerNetworkError("peer does not exist");
- }
- auto p = it->second.get();
- mutex_lg_t _p_lg(p->mlock);
- _send_msg(msg, p);
+ send_msg(msg, *get_peer_conn(addr));
}
/* end: functions invoked by the user loop */
@@ -716,7 +675,6 @@ void ClientNetwork<OpcodeType>::Conn::on_setup() {
MsgNet::Conn::on_setup();
assert(this->get_mode() == Conn::PASSIVE);
const auto &addr = this->get_addr();
- mutex_lg_t _cn_lg(cn_mlock);
auto cn = get_net();
cn->addr2conn.erase(addr);
cn->addr2conn.insert(
@@ -728,17 +686,22 @@ template<typename OpcodeType>
void ClientNetwork<OpcodeType>::Conn::on_teardown() {
MsgNet::Conn::on_teardown();
assert(this->get_mode() == Conn::PASSIVE);
- mutex_lg_t _cn_lg(cn_mlock);
get_net()->addr2conn.erase(this->get_addr());
}
template<typename OpcodeType>
template<typename MsgType>
void ClientNetwork<OpcodeType>::send_msg(const MsgType &msg, const NetAddr &addr) {
- mutex_lg_t _cn_lg(cn_mlock);
- auto it = addr2conn.find(addr);
- if (it == addr2conn.end()) return;
- MsgNet::send_msg(msg, *(it->second));
+ auto ret = static_cast<conn_t *>(this->disp_tcall->call(
+ [this, addr](ThreadCall::Handle &h) {
+ auto it = addr2conn.find(addr);
+ if (it == addr2conn.end())
+ throw PeerNetworkError("client does not exist");
+ auto ptr = new conn_t(it->second->conn);
+ h.set_result(ptr);
+ }));
+ send_msg(msg, **ret);
+ delete ret;
}
template<typename O, O OPCODE_PING, O _>