aboutsummaryrefslogtreecommitdiff
path: root/include/salticidae/network.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/salticidae/network.h')
-rw-r--r--include/salticidae/network.h195
1 files changed, 121 insertions, 74 deletions
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index e7e77f5..9c57749 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -199,8 +199,10 @@ class MsgNetwork: public ConnPool {
}
template<typename MsgType>
- bool send_msg(MsgType &&msg, const conn_t &conn);
- inline bool _send_msg(const Msg &msg, const conn_t &conn);
+ void send_msg(MsgType &&msg, const conn_t &conn);
+ inline void _send_msg(Msg &&msg, const conn_t &conn);
+ inline void _send_msg_dispatcher(const Msg &msg, const conn_t &conn);
+
using ConnPool::listen;
conn_t connect(const NetAddr &addr) {
return static_pointer_cast<Conn>(ConnPool::connect(addr));
@@ -248,10 +250,6 @@ class ClientNetwork: public MsgNetwork<OpcodeType> {
void send_msg(MsgType &&msg, const NetAddr &addr);
};
-class PeerNetworkError: public ConnPoolError {
- using ConnPoolError::ConnPoolError;
-};
-
/** Peer-to-peer network where any two nodes could hold a bi-diretional message
* channel, established by either side. */
template<typename OpcodeType = uint8_t,
@@ -434,9 +432,9 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
template<typename MsgType>
void send_msg(MsgType &&msg, const NetAddr &paddr);
inline void _send_msg(Msg &&msg, const NetAddr &paddr);
-
template<typename MsgType>
void multicast_msg(MsgType &&msg, const std::vector<NetAddr> &paddrs);
+ inline void _multicast_msg(Msg &&msg, const std::vector<NetAddr> &paddrs);
void listen(NetAddr listen_addr);
conn_t connect(const NetAddr &addr) = delete;
@@ -480,13 +478,23 @@ void MsgNetwork<OpcodeType>::Conn::on_read() {
template<typename OpcodeType>
template<typename MsgType>
-bool MsgNetwork<OpcodeType>::send_msg(MsgType &&_msg, const conn_t &conn) {
- Msg msg(std::forward<MsgType>(_msg));
- return _send_msg(msg, conn);
+void MsgNetwork<OpcodeType>::send_msg(MsgType &&msg, const conn_t &conn) {
+ return _send_msg(MsgType(std::move(msg)), conn);
+}
+
+template<typename OpcodeType>
+inline void MsgNetwork<OpcodeType>::_send_msg(Msg &&msg, const conn_t &conn) {
+ this->disp_tcall->async_call(
+ [this, msg=std::move(msg), conn](ThreadCall::Handle &) {
+ try {
+ this->_send_msg_dispatcher(msg, conn);
+ throw SalticidaeError("wow");
+ } catch (...) { this->disp_error_cb(std::current_exception()); }
+ });
}
template<typename OpcodeType>
-inline bool MsgNetwork<OpcodeType>::_send_msg(const Msg &msg, const conn_t &conn) {
+inline void MsgNetwork<OpcodeType>::_send_msg_dispatcher(const Msg &msg, const conn_t &conn) {
bytearray_t msg_data = msg.serialize();
SALTICIDAE_LOG_DEBUG("wrote message %s to %s",
std::string(msg).c_str(),
@@ -495,17 +503,19 @@ inline bool MsgNetwork<OpcodeType>::_send_msg(const Msg &msg, const conn_t &conn
conn->nsent++;
conn->nsentb += msg.get_length();
#endif
- return conn->write(std::move(msg_data));
+ conn->write(std::move(msg_data));
}
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::tcall_reset_timeout(ConnPool::Worker *worker,
const conn_t &conn, double timeout) {
- worker->get_tcall()->async_call([conn, t=timeout](ThreadCall::Handle &) {
- if (!conn->ev_timeout) return;
- conn->ev_timeout.del();
- conn->ev_timeout.add(t);
- SALTICIDAE_LOG_DEBUG("reset connection timeout %.2f", t);
+ worker->get_tcall()->async_call([worker, conn, t=timeout](ThreadCall::Handle &) {
+ try {
+ if (!conn->ev_timeout) return;
+ conn->ev_timeout.del();
+ conn->ev_timeout.add(t);
+ SALTICIDAE_LOG_DEBUG("reset connection timeout %.2f", t);
+ } catch (...) { worker->error_callback(std::current_exception()); }
});
}
@@ -517,9 +527,11 @@ void PeerNetwork<O, _, __>::Conn::on_setup() {
auto conn = static_pointer_cast<Conn>(this->self());
auto worker = this->worker;
assert(!ev_timeout);
- ev_timeout = TimerEvent(worker->get_ec(), [conn](TimerEvent &) {
- SALTICIDAE_LOG_INFO("peer ping-pong timeout");
- conn->worker_terminate();
+ ev_timeout = TimerEvent(worker->get_ec(), [worker, conn](TimerEvent &) {
+ try {
+ SALTICIDAE_LOG_INFO("peer ping-pong timeout");
+ conn->worker_terminate();
+ } catch (...) { worker->error_callback(std::current_exception()); }
});
/* the initial ping-pong to set up the connection */
tcall_reset_timeout(worker, conn, pn->conn_timeout);
@@ -541,7 +553,9 @@ void PeerNetwork<O, _, __>::Conn::on_teardown() {
// try to reconnect
p->ev_retry_timer = TimerEvent(pn->disp_ec,
[pn, peer_id = this->peer_id](TimerEvent &) {
- pn->start_active_conn(peer_id);
+ try {
+ pn->start_active_conn(peer_id);
+ } catch (...) { pn->disp_error_cb(std::current_exception()); }
});
p->ev_retry_timer.add(pn->gen_conn_timeout());
}
@@ -647,11 +661,13 @@ template<typename O, O _, O __>
void PeerNetwork<O, _, __>::msg_ping(MsgPing &&msg, const conn_t &conn) {
uint16_t port = msg.port;
this->disp_tcall->async_call([this, conn, port](ThreadCall::Handle &) {
- if (conn->get_mode() == ConnPool::Conn::DEAD) return;
- SALTICIDAE_LOG_INFO("ping from %s, port %u",
- std::string(*conn).c_str(), ntohs(port));
- if (check_new_conn(conn, port)) return;
- send_msg(MsgPong(this->listen_port), conn);
+ try {
+ if (conn->get_mode() == ConnPool::Conn::DEAD) return;
+ SALTICIDAE_LOG_INFO("ping from %s, port %u",
+ std::string(*conn).c_str(), ntohs(port));
+ if (check_new_conn(conn, port)) return;
+ send_msg(MsgPong(this->listen_port), conn);
+ } catch (...) { this->disp_error_cb(std::current_exception()); }
});
}
@@ -659,54 +675,74 @@ template<typename O, O _, O __>
void PeerNetwork<O, _, __>::msg_pong(MsgPong &&msg, const conn_t &conn) {
uint16_t port = msg.port;
this->disp_tcall->async_call([this, conn, port](ThreadCall::Handle &) {
- if (conn->get_mode() == ConnPool::Conn::DEAD) return;
- auto it = id2peer.find(conn->peer_id);
- if (it == id2peer.end())
- {
- SALTICIDAE_LOG_WARN("pong message discarded");
- return;
- }
- if (check_new_conn(conn, port)) return;
- auto p = it->second.get();
- p->pong_msg_ok = true;
- if (p->ping_timer_ok)
- {
- p->reset_ping_timer();
- p->send_ping();
- }
+ try {
+ if (conn->get_mode() == ConnPool::Conn::DEAD) return;
+ auto it = id2peer.find(conn->peer_id);
+ if (it == id2peer.end())
+ {
+ SALTICIDAE_LOG_WARN("pong message discarded");
+ return;
+ }
+ if (check_new_conn(conn, port)) return;
+ auto p = it->second.get();
+ p->pong_msg_ok = true;
+ if (p->ping_timer_ok)
+ {
+ p->reset_ping_timer();
+ p->send_ping();
+ }
+ } catch (...) { this->disp_error_cb(std::current_exception()); }
});
}
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::listen(NetAddr listen_addr) {
- this->disp_tcall->call([this, listen_addr](ThreadCall::Handle &) {
- MsgNet::_listen(listen_addr);
- listen_port = listen_addr.port;
- });
+ auto ret = *(static_cast<SalticidaeError *>(
+ this->disp_tcall->call([this, listen_addr](ThreadCall::Handle &h) {
+ SalticidaeError err;
+ try {
+ MsgNet::_listen(listen_addr);
+ listen_port = listen_addr.port;
+ } catch (SalticidaeError &e) {
+ err = e;
+ }
+ h.set_result(std::move(err));
+ }).get()));
+ if (ret.get_code()) throw ret;
}
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) {
- this->disp_tcall->call([this, addr](ThreadCall::Handle &) {
- auto it = id2peer.find(addr);
- if (it != id2peer.end())
- throw PeerNetworkError("peer already exists");
- id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->disp_ec)));
- start_active_conn(addr);
+ this->disp_tcall->async_call([this, addr](ThreadCall::Handle &) {
+ try {
+ auto it = id2peer.find(addr);
+ if (it != id2peer.end())
+ throw PeerNetworkError(SALTI_ERROR_PEER_ALREADY_EXISTS);
+ id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->disp_ec)));
+ start_active_conn(addr);
+ } catch (...) { this->disp_error_cb(std::current_exception()); }
});
}
template<typename O, O _, O __>
const typename PeerNetwork<O, _, __>::conn_t
PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &paddr) const {
- auto ret = *(static_cast<conn_t *>(this->disp_tcall->call(
- [this, paddr](ThreadCall::Handle &h) {
- auto it = id2peer.find(paddr);
- if (it == id2peer.end())
- throw PeerNetworkError("peer does not exist");
- h.set_result(it->second->conn);
+ auto ret = *(static_cast<std::pair<conn_t, SalticidaeError> *>(
+ this->disp_tcall->call([this, paddr](ThreadCall::Handle &h) {
+ conn_t conn;
+ SalticidaeError err;
+ try {
+ auto it = id2peer.find(paddr);
+ if (it == id2peer.end())
+ throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXISTS);
+ conn = it->second->conn;
+ } catch (SalticidaeError &e) {
+ err = e;
+ }
+ h.set_result(std::make_pair(std::move(conn), std::move(err)));
}).get()));
- return std::move(ret);
+ if (ret.second.get_code()) throw ret.second;
+ return std::move(ret.first);
}
template<typename O, O _, O __>
@@ -720,32 +756,41 @@ bool PeerNetwork<O, _, __>::has_peer(const NetAddr &paddr) const {
template<typename O, O _, O __>
template<typename MsgType>
void PeerNetwork<O, _, __>::send_msg(MsgType &&msg, const NetAddr &paddr) {
- return _send_msg(MsgType(msg), paddr);
+ return _send_msg(MsgType(std::move(msg)), paddr);
}
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::_send_msg(Msg &&msg, const NetAddr &paddr) {
this->disp_tcall->async_call(
[this, msg=std::move(msg), paddr](ThreadCall::Handle &) {
- auto it = id2peer.find(paddr);
- if (it == id2peer.end())
- throw PeerNetworkError("peer does not exist");
- send_msg(std::move(msg), it->second->conn);
+ try {
+ auto it = id2peer.find(paddr);
+ if (it == id2peer.end())
+ throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXISTS);
+ this->_send_msg_dispatcher(msg, it->second->conn);
+ } catch (...) { this->disp_error_cb(std::current_exception()); }
});
}
template<typename O, O _, O __>
template<typename MsgType>
void PeerNetwork<O, _, __>::multicast_msg(MsgType &&msg, const std::vector<NetAddr> &paddrs) {
+ return _multicast_msg(MsgType(std::move(msg)), paddrs);
+}
+
+template<typename O, O _, O __>
+void PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<NetAddr> &paddrs) {
this->disp_tcall->async_call(
[this, msg=std::move(msg), paddrs](ThreadCall::Handle &) {
- for (auto &addr: paddrs)
- {
- auto it = id2peer.find(addr);
- if (it == id2peer.end())
- throw PeerNetworkError("peer does not exist");
- send_msg(std::move(msg), it->second->conn);
- }
+ try {
+ for (auto &addr: paddrs)
+ {
+ auto it = id2peer.find(addr);
+ if (it == id2peer.end())
+ throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXISTS);
+ this->_send_msg_dispatcher(msg, it->second->conn);
+ }
+ } catch (...) { this->disp_error_cb(std::current_exception()); }
});
}
@@ -774,9 +819,11 @@ template<typename MsgType>
void ClientNetwork<OpcodeType>::send_msg(MsgType &&msg, const NetAddr &addr) {
this->disp_tcall->async_call(
[this, addr, msg=std::forward<MsgType>(msg)](ThreadCall::Handle &) {
- auto it = addr2conn.find(addr);
- if (it != addr2conn.end())
- send_msg(std::move(msg), it->second);
+ try {
+ auto it = addr2conn.find(addr);
+ if (it != addr2conn.end())
+ send_msg(std::move(msg), it->second);
+ } catch (...) { this->disp_error_cb(std::current_exception()); }
});
}
@@ -842,7 +889,7 @@ void msgnetwork_config_queue_capacity(msgnetwork_config_t *self, size_t cap);
msgnetwork_t *msgnetwork_new(const eventcontext_t *ec, const msgnetwork_config_t *config);
void msgnetwork_free(const msgnetwork_t *self);
-bool msgnetwork_send_msg(msgnetwork_t *self, const msg_t *msg, const msgnetwork_conn_t *conn);
+void msgnetwork_send_msg_by_move(msgnetwork_t *self, msg_t *_moved_msg, const msgnetwork_conn_t *conn);
msgnetwork_conn_t *msgnetwork_connect(msgnetwork_t *self, const netaddr_t *addr);
msgnetwork_conn_t *msgnetwork_conn_copy(const msgnetwork_conn_t *self);
void msgnetwork_conn_free(const msgnetwork_conn_t *self);