aboutsummaryrefslogtreecommitdiff
path: root/include
diff options
context:
space:
mode:
Diffstat (limited to 'include')
-rw-r--r--include/salticidae/conn.h40
-rw-r--r--include/salticidae/network.h55
2 files changed, 47 insertions, 48 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index 6fc1288..546df5f 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -148,9 +148,9 @@ class ConnPool {
ThreadCall* disp_tcall;
/** Should be implemented by derived class to return a new Conn object. */
virtual Conn *create_conn() = 0;
- using _error_callback_t = std::function<void(const std::exception_ptr err)>;
- _error_callback_t disp_error_cb;
- _error_callback_t worker_error_cb;
+ using worker_error_callback_t = std::function<void(const std::exception_ptr err)>;
+ worker_error_callback_t disp_error_cb;
+ worker_error_callback_t worker_error_cb;
private:
@@ -176,20 +176,17 @@ class ConnPool {
}
class Worker {
- public:
-
- private:
EventContext ec;
ThreadCall tcall;
std::thread handle;
bool disp_flag;
std::atomic<size_t> nconn;
- ConnPool::_error_callback_t on_fatal_error;
+ ConnPool::worker_error_callback_t on_fatal_error;
public:
Worker(): tcall(ec), disp_flag(false), nconn(0) {}
- void set_error_callback(ConnPool::_error_callback_t _on_error) {
+ void set_error_callback(ConnPool::worker_error_callback_t _on_error) {
on_fatal_error = std::move(_on_error);
}
@@ -269,6 +266,17 @@ class ConnPool {
protected:
conn_t _connect(const NetAddr &addr);
void _listen(NetAddr listen_addr);
+ void recoverable_error(const std::exception_ptr err) {
+ user_tcall->async_call([this, err](ThreadCall::Handle &) {
+ if (error_cb) {
+ try {
+ std::rethrow_exception(err);
+ } catch (const std::exception &e) {
+ error_cb(e, false);
+ }
+ }
+ });
+ }
private:
@@ -287,15 +295,6 @@ class ConnPool {
return workers[idx];
}
- void on_fatal_error(const std::exception &error) {
- stop_workers();
- if (error_cb) error_cb(error, true);
- }
-
- void on_recoverable_error(const std::exception &error) {
- if (error_cb) error_cb(error, false);
- }
-
public:
class Config {
@@ -359,7 +358,8 @@ class ConnPool {
try {
std::rethrow_exception(_err);
} catch (const std::exception &err) {
- on_fatal_error(err);
+ stop_workers();
+ if (error_cb) error_cb(err, true);
}
});
disp_ec.stop();
@@ -430,8 +430,8 @@ class ConnPool {
conn_t connect(const NetAddr &addr, bool blocking = true) {
if (blocking)
{
- auto ret = *(static_cast<std::pair<conn_t, std::exception_ptr> *>(disp_tcall->call(
- [this, addr](ThreadCall::Handle &h) {
+ auto ret = *(static_cast<std::pair<conn_t, std::exception_ptr> *>(
+ disp_tcall->call([this, addr](ThreadCall::Handle &h) {
conn_t conn;
std::exception_ptr err = nullptr;
try {
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index 9c57749..b119e78 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -488,24 +488,10 @@ inline void MsgNetwork<OpcodeType>::_send_msg(Msg &&msg, const conn_t &conn) {
[this, msg=std::move(msg), conn](ThreadCall::Handle &) {
try {
this->_send_msg_dispatcher(msg, conn);
- throw SalticidaeError("wow");
- } catch (...) { this->disp_error_cb(std::current_exception()); }
+ } catch (...) { this->recoverable_error(std::current_exception()); }
});
}
-template<typename OpcodeType>
-inline void MsgNetwork<OpcodeType>::_send_msg_dispatcher(const Msg &msg, const conn_t &conn) {
- bytearray_t msg_data = msg.serialize();
- SALTICIDAE_LOG_DEBUG("wrote message %s to %s",
- std::string(msg).c_str(),
- std::string(*conn).c_str());
-#ifdef SALTICIDAE_MSG_STAT
- conn->nsent++;
- conn->nsentb += msg.get_length();
-#endif
- conn->write(std::move(msg_data));
-}
-
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::tcall_reset_timeout(ConnPool::Worker *worker,
const conn_t &conn, double timeout) {
@@ -654,6 +640,19 @@ void PeerNetwork<O, _, __>::start_active_conn(const NetAddr &addr) {
if (id_mode == IP_BASED)
conn->peer_id.port = 0;
}
+
+template<typename OpcodeType>
+inline void MsgNetwork<OpcodeType>::_send_msg_dispatcher(const Msg &msg, const conn_t &conn) {
+ bytearray_t msg_data = msg.serialize();
+ SALTICIDAE_LOG_DEBUG("wrote message %s to %s",
+ std::string(msg).c_str(),
+ std::string(*conn).c_str());
+#ifdef SALTICIDAE_MSG_STAT
+ conn->nsent++;
+ conn->nsentb += msg.get_length();
+#endif
+ conn->write(std::move(msg_data));
+}
/* end: functions invoked by the dispatcher */
/* begin: functions invoked by the user loop */
@@ -697,18 +696,18 @@ void PeerNetwork<O, _, __>::msg_pong(MsgPong &&msg, const conn_t &conn) {
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::listen(NetAddr listen_addr) {
- auto ret = *(static_cast<SalticidaeError *>(
+ auto ret = *(static_cast<std::exception_ptr *>(
this->disp_tcall->call([this, listen_addr](ThreadCall::Handle &h) {
- SalticidaeError err;
+ std::exception_ptr err = nullptr;
try {
MsgNet::_listen(listen_addr);
listen_port = listen_addr.port;
- } catch (SalticidaeError &e) {
- err = e;
+ } catch (...) {
+ err = std::current_exception();
}
h.set_result(std::move(err));
}).get()));
- if (ret.get_code()) throw ret;
+ if (ret) std::rethrow_exception(ret);
}
template<typename O, O _, O __>
@@ -727,21 +726,21 @@ void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) {
template<typename O, O _, O __>
const typename PeerNetwork<O, _, __>::conn_t
PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &paddr) const {
- auto ret = *(static_cast<std::pair<conn_t, SalticidaeError> *>(
+ auto ret = *(static_cast<std::pair<conn_t, std::exception_ptr> *>(
this->disp_tcall->call([this, paddr](ThreadCall::Handle &h) {
conn_t conn;
- SalticidaeError err;
+ std::exception_ptr err = nullptr;
try {
auto it = id2peer.find(paddr);
if (it == id2peer.end())
throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXISTS);
conn = it->second->conn;
- } catch (SalticidaeError &e) {
- err = e;
+ } catch (...) {
+ err = std::current_exception();
}
- h.set_result(std::make_pair(std::move(conn), std::move(err)));
+ h.set_result(std::make_pair(std::move(conn), err));
}).get()));
- if (ret.second.get_code()) throw ret.second;
+ if (ret.second) std::rethrow_exception(ret.second);
return std::move(ret.first);
}
@@ -768,7 +767,7 @@ void PeerNetwork<O, _, __>::_send_msg(Msg &&msg, const NetAddr &paddr) {
if (it == id2peer.end())
throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXISTS);
this->_send_msg_dispatcher(msg, it->second->conn);
- } catch (...) { this->disp_error_cb(std::current_exception()); }
+ } catch (...) { this->recoverable_error(std::current_exception()); }
});
}
@@ -790,7 +789,7 @@ void PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<NetAddr>
throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXISTS);
this->_send_msg_dispatcher(msg, it->second->conn);
}
- } catch (...) { this->disp_error_cb(std::current_exception()); }
+ } catch (...) { this->recoverable_error(std::current_exception()); }
});
}