aboutsummaryrefslogtreecommitdiff
path: root/include/salticidae/network.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/salticidae/network.h')
-rw-r--r--include/salticidae/network.h79
1 files changed, 40 insertions, 39 deletions
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index 07c6ba5..20dc696 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -109,7 +109,6 @@ class MsgNetwork: public ConnPool {
#endif
protected:
- void on_read() override;
};
using conn_t = ArcObj<Conn>;
@@ -127,6 +126,7 @@ class MsgNetwork: public ConnPool {
protected:
ConnPool::Conn *create_conn() override { return new Conn(); }
+ void on_read(const ConnPool::conn_t &) override;
public:
@@ -287,12 +287,9 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
protected:
void stop() override {
- ev_timeout.clear();
+ ev_timeout.del();
MsgNet::Conn::stop();
}
-
- void on_setup() override;
- void on_teardown() override;
};
using conn_t = ArcObj<Conn>;
@@ -326,7 +323,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
if (ev_ping_timer)
ev_ping_timer.del();
}
- void reset_conn(conn_t conn);
+ void reset_conn(const conn_t &conn);
};
std::unordered_map<NetAddr, BoxObj<Peer>> id2peer;
@@ -381,6 +378,8 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
virtual double gen_conn_timeout() {
return gen_rand_timeout(retry_conn_delay);
}
+ void on_setup(const ConnPool::conn_t &) override;
+ void on_teardown(const ConnPool::conn_t &) override;
public:
@@ -466,11 +465,13 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
/* this callback is run by a worker */
template<typename OpcodeType>
-void MsgNetwork<OpcodeType>::Conn::on_read() {
- ConnPool::Conn::on_read();
- auto &recv_buffer = this->recv_buffer;
- auto mn = get_net();
- while (self_ref)
+void MsgNetwork<OpcodeType>::on_read(const ConnPool::conn_t &_conn) {
+ ConnPool::on_read(_conn);
+ auto conn = static_pointer_cast<Conn>(_conn);
+ auto &recv_buffer = conn->recv_buffer;
+ auto &msg = conn->msg;
+ auto &msg_state = conn->msg_state;
+ while (true) //(!conn->is_terminated())
{
if (msg_state == Conn::HEADER)
{
@@ -493,8 +494,7 @@ void MsgNetwork<OpcodeType>::Conn::on_read() {
return;
}
#endif
- auto conn = static_pointer_cast<Conn>(self());
- while (!mn->incoming_msgs.enqueue(std::make_pair(msg, conn), false))
+ while (!incoming_msgs.enqueue(std::make_pair(msg, conn), false))
std::this_thread::yield();
}
}
@@ -550,46 +550,47 @@ void PeerNetwork<O, _, __>::tcall_reset_timeout(ConnPool::Worker *worker,
/* begin: functions invoked by the dispatcher */
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::Conn::on_setup() {
- MsgNet::Conn::on_setup();
- auto pn = get_net();
- auto conn = static_pointer_cast<Conn>(this->self());
- auto worker = this->worker;
+void PeerNetwork<O, _, __>::on_setup(const ConnPool::conn_t &_conn) {
+ MsgNet::on_setup(_conn);
+ auto conn = static_pointer_cast<Conn>(_conn);
+ auto worker = conn->worker;
+ auto &ev_timeout = conn->ev_timeout;
assert(!ev_timeout);
ev_timeout = TimerEvent(worker->get_ec(), [worker, conn](TimerEvent &) {
try {
SALTICIDAE_LOG_INFO("peer ping-pong timeout");
- conn->worker_terminate();
+ conn->get_net()->worker_terminate(conn);
} catch (...) { worker->error_callback(std::current_exception()); }
});
/* the initial ping-pong to set up the connection */
- tcall_reset_timeout(worker, conn, pn->conn_timeout);
- pn->send_msg(MsgPing(pn->listen_port), conn);
+ tcall_reset_timeout(worker, conn, conn_timeout);
+ send_msg(MsgPing(listen_port), conn);
}
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::Conn::on_teardown() {
- MsgNet::Conn::on_teardown();
- auto pn = get_net();
- auto p = pn->get_peer(peer_id);
+void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) {
+ MsgNet::on_teardown(_conn);
+ auto conn = static_pointer_cast<Conn>(_conn);
+ conn->ev_timeout.clear();
+ const auto &peer_id = conn->peer_id;
+ auto p = get_peer(peer_id);
if (!p) return;
- if (this != p->conn.get()) return;
+ if (conn != p->conn) return;
p->ev_ping_timer.del();
p->connected = false;
//p->conn = nullptr;
- SALTICIDAE_LOG_INFO("connection lost: %s", std::string(*this).c_str());
+ SALTICIDAE_LOG_INFO("connection lost: %s", std::string(*conn).c_str());
// try to reconnect
- p->ev_retry_timer = TimerEvent(pn->disp_ec,
- [pn, peer_id = this->peer_id](TimerEvent &) {
+ p->ev_retry_timer = TimerEvent(this->disp_ec, [this, peer_id](TimerEvent &) {
try {
- pn->start_active_conn(peer_id);
- } catch (...) { pn->disp_error_cb(std::current_exception()); }
+ start_active_conn(peer_id);
+ } catch (...) { this->disp_error_cb(std::current_exception()); }
});
- p->ev_retry_timer.add(pn->gen_conn_timeout());
+ p->ev_retry_timer.add(gen_conn_timeout());
}
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::Peer::reset_conn(conn_t new_conn) {
+void PeerNetwork<O, _, __>::Peer::reset_conn(const conn_t &new_conn) {
if (conn != new_conn)
{
if (conn)
@@ -597,7 +598,8 @@ void PeerNetwork<O, _, __>::Peer::reset_conn(conn_t new_conn) {
//SALTICIDAE_LOG_DEBUG("moving send buffer");
//new_conn->move_send_buffer(conn);
SALTICIDAE_LOG_INFO("terminating old connection %s", std::string(*conn).c_str());
- conn->disp_terminate();
+ auto net = conn->get_net();
+ net->disp_terminate(conn);
}
addr = new_conn->get_addr();
conn = new_conn;
@@ -656,7 +658,7 @@ bool PeerNetwork<O, _, __>::check_new_conn(const conn_t &conn, uint16_t port) {
}
else
{
- conn->disp_terminate();
+ this->disp_terminate(conn);
return true;
}
}
@@ -665,7 +667,7 @@ bool PeerNetwork<O, _, __>::check_new_conn(const conn_t &conn, uint16_t port) {
{
if (conn != p->conn)
{
- conn->disp_terminate();
+ this->disp_terminate(conn);
return true;
}
return false;
@@ -797,7 +799,7 @@ void PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) {
auto it = id2peer.find(addr);
if (it == id2peer.end())
throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
- it->second->conn->disp_terminate();
+ this->disp_terminate(it->second->conn);
id2peer.erase(it);
} catch (const PeerNetworkError &) {
this->recoverable_error(std::current_exception());
@@ -899,8 +901,7 @@ void ClientNetwork<OpcodeType>::Conn::on_setup() {
auto cn = get_net();
cn->addr2conn.erase(addr);
cn->addr2conn.insert(
- std::make_pair(addr,
- static_pointer_cast<Conn>(this->self())));
+ std::make_pair(addr, static_pointer_cast<Conn>(this->self())));
}
template<typename OpcodeType>