aboutsummaryrefslogtreecommitdiff
path: root/include/salticidae/network.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/salticidae/network.h')
-rw-r--r--include/salticidae/network.h63
1 files changed, 40 insertions, 23 deletions
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index e5165bf..a63976b 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -275,9 +275,9 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
const NetAddr &get_peer() { return peer_id; }
protected:
- void on_close() override {
+ void stop() override {
ev_timeout.clear();
- MsgNet::Conn::on_close();
+ MsgNet::Conn::stop();
}
void on_setup() override;
@@ -302,7 +302,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
Peer(NetAddr addr, conn_t conn, const EventContext &ec):
addr(addr), conn(conn),
ev_ping_timer(
- Event(ec, -1, 0, std::bind(&Peer::ping_timer, this, _1, _2))),
+ Event(ec, -1, std::bind(&Peer::ping_timer, this, _1, _2))),
connected(false) {}
~Peer() {}
Peer &operator=(const Peer &) = delete;
@@ -358,6 +358,8 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
void _pong_msg_cb(const conn_t &conn, uint16_t port);
bool check_new_conn(const conn_t &conn, uint16_t port);
void start_active_conn(const NetAddr &paddr);
+ static void tcall_reset_timeout(ConnPool::Worker *worker,
+ const conn_t &conn, double timeout);
protected:
ConnPool::Conn *create_conn() override { return new Conn(); }
@@ -385,6 +387,8 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
this->reg_handler(generic_bind(&PeerNetwork::msg_pong, this, _1, _2));
}
+ ~PeerNetwork() { this->stop(); }
+
void add_peer(const NetAddr &paddr);
const conn_t get_peer_conn(const NetAddr &paddr) const;
using MsgNet::send_msg;
@@ -445,15 +449,28 @@ void MsgNetwork<OpcodeType>::send_msg(const MsgType &_msg, Conn &conn) {
#endif
}
+template<typename O, O _, O __>
+void PeerNetwork<O, _, __>::tcall_reset_timeout(ConnPool::Worker *worker,
+ const conn_t &conn, double timeout) {
+ worker->get_tcall()->call([conn, t=timeout](ThreadCall::Handle &) {
+ assert(conn->ev_timeout);
+ conn->ev_timeout.del();
+ conn->ev_timeout.add_with_timeout(t, 0);
+ SALTICIDAE_LOG_INFO("reset timeout %.2f", t);
+ });
+}
+
/* begin: functions invoked by the dispatcher */
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::Conn::on_setup() {
MsgNet::Conn::on_setup();
auto pn = get_net();
+ auto conn = static_pointer_cast<Conn>(this->self());
+ auto worker = this->worker;
assert(!ev_timeout);
- ev_timeout = Event(pn->dispatcher_ec, -1, 0, [this](int, int) {
+ ev_timeout = Event(worker->get_ec(), -1, [conn](int, int) {
SALTICIDAE_LOG_INFO("peer ping-pong timeout");
- this->terminate();
+ conn->terminate();
});
if (this->get_mode() == Conn::ConnMode::ACTIVE)
{
@@ -461,9 +478,8 @@ void PeerNetwork<O, _, __>::Conn::on_setup() {
if (pn->id_mode == IP_BASED) peer_id.port = 0;
}
/* the initial ping-pong to set up the connection */
- auto &conn = static_cast<Conn &>(*this);
- reset_timeout(pn->conn_timeout);
- pn->send_msg(MsgPing(pn->listen_port), conn);
+ tcall_reset_timeout(worker, conn, pn->conn_timeout);
+ pn->send_msg(MsgPing(pn->listen_port), *conn);
}
template<typename O, O _, O __>
@@ -481,11 +497,11 @@ void PeerNetwork<O, _, __>::Conn::on_teardown() {
std::string(*this).c_str(),
std::string(peer_id).c_str());
// try to reconnect
- p->ev_retry_timer = Event(pn->dispatcher_ec, -1, 0,
+ p->ev_retry_timer = Event(pn->disp_ec, -1,
[pn, peer_id = this->peer_id](int, int) {
pn->start_active_conn(peer_id);
});
- p->ev_retry_timer.add_with_timeout(pn->gen_conn_timeout());
+ p->ev_retry_timer.add_with_timeout(pn->gen_conn_timeout(), 0);
}
template<typename O, O _, O __>
@@ -497,7 +513,7 @@ void PeerNetwork<O, _, __>::Peer::reset_conn(conn_t new_conn) {
//SALTICIDAE_LOG_DEBUG("moving send buffer");
//new_conn->move_send_buffer(conn);
SALTICIDAE_LOG_INFO("terminating old connection %s", std::string(*conn).c_str());
- conn->terminate();
+ conn->cpool->terminate(conn);
}
addr = new_conn->get_addr();
conn = new_conn;
@@ -506,19 +522,11 @@ void PeerNetwork<O, _, __>::Peer::reset_conn(conn_t new_conn) {
}
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::Conn::reset_timeout(double timeout) {
- assert(ev_timeout);
- ev_timeout.del();
- ev_timeout.add_with_timeout(timeout);
- SALTICIDAE_LOG_INFO("reset timeout %.2f", timeout);
-}
-
-template<typename O, O _, O __>
void PeerNetwork<O, _, __>::Peer::reset_ping_timer() {
assert(ev_ping_timer);
ev_ping_timer.del();
ev_ping_timer.add_with_timeout(
- gen_rand_timeout(conn->get_net()->ping_period));
+ gen_rand_timeout(conn->get_net()->ping_period), 0);
}
template<typename O, O _, O __>
@@ -526,7 +534,7 @@ void PeerNetwork<O, _, __>::Peer::send_ping() {
auto pn = conn->get_net();
ping_timer_ok = false;
pong_msg_ok = false;
- conn->reset_timeout(pn->conn_timeout);
+ tcall_reset_timeout(conn->worker, conn, pn->conn_timeout);
pn->send_msg(MsgPing(pn->listen_port), *conn);
}
@@ -554,7 +562,7 @@ bool PeerNetwork<O, _, __>::check_new_conn(const conn_t &conn, uint16_t port) {
{
if (conn != p->conn)
{
- conn->terminate();
+ conn->cpool->terminate(conn);
return true;
}
return false;
@@ -631,7 +639,7 @@ void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) {
auto it = id2peer.find(addr);
if (it != id2peer.end())
throw PeerNetworkError("peer already exists");
- id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->dispatcher_ec)));
+ id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->disp_ec)));
start_active_conn(addr);
}, true);
}
@@ -646,6 +654,9 @@ PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &paddr) const {
throw PeerNetworkError("peer does not exist");
auto ptr = new conn_t(it->second->conn);
h.set_result(ptr);
+ h.set_deleter([](void *data) {
+ delete static_cast<conn_t *>(data);
+ });
}));
auto conn = *ret;
delete ret;
@@ -657,6 +668,9 @@ bool PeerNetwork<O, _, __>::has_peer(const NetAddr &paddr) const {
auto ret = static_cast<bool *>(this->disp_tcall->call(
[this, paddr](ThreadCall::Handle &h) {
h.set_result(id2peer.count(paddr));
+ h.set_deleter([](void *data) {
+ delete static_cast<bool *>(data);
+ });
}));
auto has = *ret;
delete ret;
@@ -699,6 +713,9 @@ void ClientNetwork<OpcodeType>::send_msg(const MsgType &msg, const NetAddr &addr
throw PeerNetworkError("client does not exist");
auto ptr = new conn_t(it->second->conn);
h.set_result(ptr);
+ h.set_deleter([](void *data) {
+ delete static_cast<conn_t *>(data);
+ });
}));
send_msg(msg, **ret);
delete ret;