aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2019-06-12 19:14:40 -0400
committerDeterminant <[email protected]>2019-06-12 19:14:40 -0400
commite27e529e589ef89fbe010ebf7c5635ec2873f64f (patch)
tree76e7d20589d11f3aabd255ca70201f1288d1d1bc
parent9f6460c7ab774d900f391345bbf3fac8617a3aa3 (diff)
WIP: error handling
-rw-r--r--include/salticidae/conn.h171
-rw-r--r--include/salticidae/event.h15
-rw-r--r--include/salticidae/netaddr.h10
-rw-r--r--include/salticidae/network.h195
-rw-r--r--include/salticidae/queue.h1
-rw-r--r--include/salticidae/util.h48
-rw-r--r--src/conn.cpp114
-rw-r--r--src/network.cpp9
-rw-r--r--src/util.cpp26
-rw-r--r--test/test_msgnet_c.c42
-rw-r--r--test/test_p2p_stress.cpp5
11 files changed, 419 insertions, 217 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index cb637cb..42e87aa 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -50,10 +50,6 @@
namespace salticidae {
-struct ConnPoolError: public SalticidaeError {
- using SalticidaeError::SalticidaeError;
-};
-
/** Abstraction for connection management. */
class ConnPool {
class Worker;
@@ -63,6 +59,7 @@ class ConnPool {
using conn_t = ArcObj<Conn>;
/** The type of callback invoked when connection status is changed. */
using conn_callback_t = std::function<void(const conn_t &, bool)>;
+ using error_callback_t = std::function<void(const std::exception &, bool)>;
/** Abstraction for a bi-directional connection. */
class Conn {
friend ConnPool;
@@ -151,6 +148,10 @@ class ConnPool {
ThreadCall* disp_tcall;
/** Should be implemented by derived class to return a new Conn object. */
virtual Conn *create_conn() = 0;
+ using _error_callback_t = std::function<void(const std::exception_ptr err)>;
+ _error_callback_t disp_error_cb;
+ _error_callback_t worker_error_cb;
+
private:
const int max_listen_backlog;
@@ -161,6 +162,7 @@ class ConnPool {
/* owned by user loop */
BoxObj<ThreadCall> user_tcall;
conn_callback_t conn_cb;
+ error_callback_t error_cb;
/* owned by the dispatcher */
FdEvent ev_listen;
@@ -174,15 +176,27 @@ class ConnPool {
}
class Worker {
+ public:
+
+ private:
EventContext ec;
ThreadCall tcall;
std::thread handle;
bool disp_flag;
std::atomic<size_t> nconn;
+ ConnPool::_error_callback_t on_fatal_error;
public:
Worker(): tcall(ec), disp_flag(false), nconn(0) {}
+ void set_error_callback(ConnPool::_error_callback_t _on_error) {
+ on_fatal_error = std::move(_on_error);
+ }
+
+ void error_callback(const std::exception_ptr err) const {
+ on_fatal_error(err);
+ }
+
/* the following functions are called by the dispatcher */
void start() {
handle = std::thread([this]() { ec.dispatch(); });
@@ -191,36 +205,40 @@ class ConnPool {
void feed(const conn_t &conn, int client_fd) {
/* the caller should finalize all the preparation */
tcall.async_call([this, conn, client_fd](ThreadCall::Handle &) {
- if (conn->mode == Conn::ConnMode::DEAD)
- {
- SALTICIDAE_LOG_INFO("worker %x discarding dead connection",
- std::this_thread::get_id());
- return;
- }
- assert(conn->fd != -1);
- SALTICIDAE_LOG_INFO("worker %x got %s",
- std::this_thread::get_id(),
- std::string(*conn).c_str());
- conn->get_send_buffer()
- .get_queue()
- .reg_handler(this->ec, [conn, client_fd]
- (MPSCWriteBuffer::queue_t &) {
- if (conn->ready_send)
+ try {
+ if (conn->mode == Conn::ConnMode::DEAD)
{
- conn->ev_socket.del();
- conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE);
- conn->send_data(client_fd, FdEvent::WRITE);
+ SALTICIDAE_LOG_INFO("worker %x discarding dead connection",
+ std::this_thread::get_id());
+ return;
}
- return false;
- });
- conn->ev_socket = FdEvent(ec, client_fd, [conn=conn](int fd, int what) {
- if (what & FdEvent::READ)
- conn->recv_data(fd, what);
- else
- conn->send_data(fd, what);
- });
- conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE);
- nconn++;
+ assert(conn->fd != -1);
+ SALTICIDAE_LOG_INFO("worker %x got %s",
+ std::this_thread::get_id(),
+ std::string(*conn).c_str());
+ conn->get_send_buffer()
+ .get_queue()
+ .reg_handler(this->ec, [conn, client_fd]
+ (MPSCWriteBuffer::queue_t &) {
+ if (conn->ready_send)
+ {
+ conn->ev_socket.del();
+ conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE);
+ conn->send_data(client_fd, FdEvent::WRITE);
+ }
+ return false;
+ });
+ conn->ev_socket = FdEvent(ec, client_fd, [this, conn=conn](int fd, int what) {
+ try {
+ if (what & FdEvent::READ)
+ conn->recv_data(fd, what);
+ else
+ conn->send_data(fd, what);
+ } catch (...) { on_fatal_error(std::current_exception()); }
+ });
+ conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE);
+ nconn++;
+ } catch (...) { on_fatal_error(std::current_exception()); }
});
}
@@ -253,18 +271,6 @@ class ConnPool {
private:
- //class DspMulticast: public DispatchCmd {
- // std::vector<conn_t> receivers;
- // bytearray_t data;
- // public:
- // DspMulticast(std::vector<conn_t> &&receivers, bytearray_t &&data):
- // receivers(std::move(receivers)),
- // data(std::move(data)) {}
- // void exec(ConnPool *) override {
- // for (auto &r: receivers) r->write(bytearray_t(data));
- // }
- //};
-
Worker &select_worker() {
size_t idx = 0;
size_t best = workers[idx].get_nconn();
@@ -280,6 +286,15 @@ class ConnPool {
return workers[idx];
}
+ void on_fatal_error(const std::exception &error) {
+ stop_workers();
+ if (error_cb) error_cb(error, true);
+ }
+
+ void on_recoverable_error(const std::exception &error) {
+ if (error_cb) error_cb(error, false);
+ }
+
public:
class Config {
@@ -338,6 +353,30 @@ class ConnPool {
disp_ec = workers[0].get_ec();
disp_tcall = workers[0].get_tcall();
workers[0].set_dispatcher();
+ disp_error_cb = [this](const std::exception_ptr _err) {
+ user_tcall->async_call([this, _err](ThreadCall::Handle &) {
+ try {
+ std::rethrow_exception(_err);
+ } catch (const std::exception &err) {
+ on_fatal_error(err);
+ }
+ });
+ };
+
+ worker_error_cb = [this](const std::exception_ptr err) {
+ disp_tcall->async_call([this, err](ThreadCall::Handle &) {
+ // forward to the dispatcher
+ disp_error_cb(err);
+ });
+ };
+ for (size_t i = 0; i < nworker; i++)
+ {
+ auto &worker = workers[i];
+ if (worker.is_dispatcher())
+ worker.set_error_callback(disp_error_cb);
+ else
+ worker.set_error_callback(worker_error_cb);
+ }
}
~ConnPool() { stop(); }
@@ -388,17 +427,28 @@ class ConnPool {
conn_t connect(const NetAddr &addr, bool blocking = true) {
if (blocking)
{
- auto ret = *(static_cast<conn_t *>(disp_tcall->call(
+ auto ret = *(static_cast<std::pair<conn_t, std::exception_ptr> *>(disp_tcall->call(
[this, addr](ThreadCall::Handle &h) {
- auto conn = _connect(addr);
- h.set_result(std::move(conn));
+ conn_t conn;
+ std::exception_ptr err = nullptr;
+ try {
+ conn = _connect(addr);
+ } catch (...) {
+ err = std::current_exception();
+ }
+ h.set_result(std::make_pair(std::move(conn), err));
}).get()));
- return std::move(ret);
+ if (ret.second) std::rethrow_exception(ret.second);
+ return std::move(ret.first);
}
else
{
disp_tcall->async_call([this, addr](ThreadCall::Handle &) {
- _connect(addr);
+ try {
+ _connect(addr);
+ } catch (...) {
+ disp_error_cb(std::current_exception());
+ }
});
return nullptr;
}
@@ -408,17 +458,32 @@ class ConnPool {
* Does not need to be called if do not want to accept any passive
* connections. */
void listen(NetAddr listen_addr) {
- disp_tcall->call([this, listen_addr](ThreadCall::Handle &) {
- _listen(listen_addr);
- });
+ auto ret = *(static_cast<std::exception_ptr *>(
+ disp_tcall->call([this, listen_addr](ThreadCall::Handle &h) {
+ std::exception_ptr err = nullptr;
+ try {
+ _listen(listen_addr);
+ } catch (...) {
+ err = std::current_exception();
+ }
+ h.set_result(err);
+ }).get()));
+ if (ret) std::rethrow_exception(ret);
}
template<typename Func>
void reg_conn_handler(Func cb) { conn_cb = cb; }
+ template<typename Func>
+ void reg_error_handler(Func cb) { error_cb = cb; }
+
void terminate(const conn_t &conn) {
disp_tcall->async_call([this, conn](ThreadCall::Handle &) {
- conn->disp_terminate();
+ try {
+ conn->disp_terminate();
+ } catch (...) {
+ disp_error_cb(std::current_exception());
+ }
});
}
};
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index 91637e4..5a315dd 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -467,8 +467,14 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
// since all enqueue operations are finalized, the dequeue should be able
// to see those enqueued values in func()
wait_sig.exchange(true, std::memory_order_acq_rel);
- if (func(*this))
+ bool again;
+ try {
+ again = func(*this);
+ } catch (SalticidaeError &err) {
write(fd, &dummy, 8);
+ throw err;
+ }
+ if (again) write(fd, &dummy, 8);
});
ev.add(FdEvent::READ);
}
@@ -610,7 +616,12 @@ class ThreadCall {
Handle *h;
while (q.try_dequeue(h))
{
- h->exec();
+ try {
+ h->exec();
+ } catch (SalticidaeError &err) {
+ delete h;
+ throw err;
+ }
delete h;
if (++cnt == burst_size) return true;
}
diff --git a/include/salticidae/netaddr.h b/include/salticidae/netaddr.h
index 909e092..a25bf6f 100644
--- a/include/salticidae/netaddr.h
+++ b/include/salticidae/netaddr.h
@@ -53,7 +53,7 @@ struct NetAddr {
void set_by_ip_port(const std::string &_addr, uint16_t _port) {
struct hostent *h;
if ((h = gethostbyname(_addr.c_str())) == nullptr)
- throw SalticidaeError("gethostbyname failed");
+ throw SalticidaeError(SALTI_ERROR_NETADDR_INVALID, errno);
memmove(&ip, h->h_addr_list[0], sizeof(in_addr_t));
port = htons(_port);
}
@@ -61,19 +61,19 @@ struct NetAddr {
NetAddr(const std::string &ip_port_addr) {
size_t pos = ip_port_addr.find(":");
if (pos == std::string::npos)
- throw SalticidaeError("invalid port format");
+ throw SalticidaeError(SALTI_ERROR_NETADDR_INVALID);
std::string ip_str = ip_port_addr.substr(0, pos);
std::string port_str = ip_port_addr.substr(pos + 1);
long port;
try {
port = std::stol(port_str.c_str());
} catch (std::logic_error &) {
- throw SalticidaeError("invalid port format");
+ throw SalticidaeError(SALTI_ERROR_NETADDR_INVALID);
}
if (port < 0)
- throw SalticidaeError("negative port number");
+ throw SalticidaeError(SALTI_ERROR_NETADDR_INVALID);
if (port > 0xffff)
- throw SalticidaeError("port number greater than 0xffff");
+ throw SalticidaeError(SALTI_ERROR_NETADDR_INVALID);
set_by_ip_port(ip_str, (uint16_t)port);
}
/* construct from unix socket format */
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index e7e77f5..9c57749 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -199,8 +199,10 @@ class MsgNetwork: public ConnPool {
}
template<typename MsgType>
- bool send_msg(MsgType &&msg, const conn_t &conn);
- inline bool _send_msg(const Msg &msg, const conn_t &conn);
+ void send_msg(MsgType &&msg, const conn_t &conn);
+ inline void _send_msg(Msg &&msg, const conn_t &conn);
+ inline void _send_msg_dispatcher(const Msg &msg, const conn_t &conn);
+
using ConnPool::listen;
conn_t connect(const NetAddr &addr) {
return static_pointer_cast<Conn>(ConnPool::connect(addr));
@@ -248,10 +250,6 @@ class ClientNetwork: public MsgNetwork<OpcodeType> {
void send_msg(MsgType &&msg, const NetAddr &addr);
};
-class PeerNetworkError: public ConnPoolError {
- using ConnPoolError::ConnPoolError;
-};
-
/** Peer-to-peer network where any two nodes could hold a bi-diretional message
* channel, established by either side. */
template<typename OpcodeType = uint8_t,
@@ -434,9 +432,9 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
template<typename MsgType>
void send_msg(MsgType &&msg, const NetAddr &paddr);
inline void _send_msg(Msg &&msg, const NetAddr &paddr);
-
template<typename MsgType>
void multicast_msg(MsgType &&msg, const std::vector<NetAddr> &paddrs);
+ inline void _multicast_msg(Msg &&msg, const std::vector<NetAddr> &paddrs);
void listen(NetAddr listen_addr);
conn_t connect(const NetAddr &addr) = delete;
@@ -480,13 +478,23 @@ void MsgNetwork<OpcodeType>::Conn::on_read() {
template<typename OpcodeType>
template<typename MsgType>
-bool MsgNetwork<OpcodeType>::send_msg(MsgType &&_msg, const conn_t &conn) {
- Msg msg(std::forward<MsgType>(_msg));
- return _send_msg(msg, conn);
+void MsgNetwork<OpcodeType>::send_msg(MsgType &&msg, const conn_t &conn) {
+ return _send_msg(MsgType(std::move(msg)), conn);
+}
+
+template<typename OpcodeType>
+inline void MsgNetwork<OpcodeType>::_send_msg(Msg &&msg, const conn_t &conn) {
+ this->disp_tcall->async_call(
+ [this, msg=std::move(msg), conn](ThreadCall::Handle &) {
+ try {
+ this->_send_msg_dispatcher(msg, conn);
+ throw SalticidaeError("wow");
+ } catch (...) { this->disp_error_cb(std::current_exception()); }
+ });
}
template<typename OpcodeType>
-inline bool MsgNetwork<OpcodeType>::_send_msg(const Msg &msg, const conn_t &conn) {
+inline void MsgNetwork<OpcodeType>::_send_msg_dispatcher(const Msg &msg, const conn_t &conn) {
bytearray_t msg_data = msg.serialize();
SALTICIDAE_LOG_DEBUG("wrote message %s to %s",
std::string(msg).c_str(),
@@ -495,17 +503,19 @@ inline bool MsgNetwork<OpcodeType>::_send_msg(const Msg &msg, const conn_t &conn
conn->nsent++;
conn->nsentb += msg.get_length();
#endif
- return conn->write(std::move(msg_data));
+ conn->write(std::move(msg_data));
}
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::tcall_reset_timeout(ConnPool::Worker *worker,
const conn_t &conn, double timeout) {
- worker->get_tcall()->async_call([conn, t=timeout](ThreadCall::Handle &) {
- if (!conn->ev_timeout) return;
- conn->ev_timeout.del();
- conn->ev_timeout.add(t);
- SALTICIDAE_LOG_DEBUG("reset connection timeout %.2f", t);
+ worker->get_tcall()->async_call([worker, conn, t=timeout](ThreadCall::Handle &) {
+ try {
+ if (!conn->ev_timeout) return;
+ conn->ev_timeout.del();
+ conn->ev_timeout.add(t);
+ SALTICIDAE_LOG_DEBUG("reset connection timeout %.2f", t);
+ } catch (...) { worker->error_callback(std::current_exception()); }
});
}
@@ -517,9 +527,11 @@ void PeerNetwork<O, _, __>::Conn::on_setup() {
auto conn = static_pointer_cast<Conn>(this->self());
auto worker = this->worker;
assert(!ev_timeout);
- ev_timeout = TimerEvent(worker->get_ec(), [conn](TimerEvent &) {
- SALTICIDAE_LOG_INFO("peer ping-pong timeout");
- conn->worker_terminate();
+ ev_timeout = TimerEvent(worker->get_ec(), [worker, conn](TimerEvent &) {
+ try {
+ SALTICIDAE_LOG_INFO("peer ping-pong timeout");
+ conn->worker_terminate();
+ } catch (...) { worker->error_callback(std::current_exception()); }
});
/* the initial ping-pong to set up the connection */
tcall_reset_timeout(worker, conn, pn->conn_timeout);
@@ -541,7 +553,9 @@ void PeerNetwork<O, _, __>::Conn::on_teardown() {
// try to reconnect
p->ev_retry_timer = TimerEvent(pn->disp_ec,
[pn, peer_id = this->peer_id](TimerEvent &) {
- pn->start_active_conn(peer_id);
+ try {
+ pn->start_active_conn(peer_id);
+ } catch (...) { pn->disp_error_cb(std::current_exception()); }
});
p->ev_retry_timer.add(pn->gen_conn_timeout());
}
@@ -647,11 +661,13 @@ template<typename O, O _, O __>
void PeerNetwork<O, _, __>::msg_ping(MsgPing &&msg, const conn_t &conn) {
uint16_t port = msg.port;
this->disp_tcall->async_call([this, conn, port](ThreadCall::Handle &) {
- if (conn->get_mode() == ConnPool::Conn::DEAD) return;
- SALTICIDAE_LOG_INFO("ping from %s, port %u",
- std::string(*conn).c_str(), ntohs(port));
- if (check_new_conn(conn, port)) return;
- send_msg(MsgPong(this->listen_port), conn);
+ try {
+ if (conn->get_mode() == ConnPool::Conn::DEAD) return;
+ SALTICIDAE_LOG_INFO("ping from %s, port %u",
+ std::string(*conn).c_str(), ntohs(port));
+ if (check_new_conn(conn, port)) return;
+ send_msg(MsgPong(this->listen_port), conn);
+ } catch (...) { this->disp_error_cb(std::current_exception()); }
});
}
@@ -659,54 +675,74 @@ template<typename O, O _, O __>
void PeerNetwork<O, _, __>::msg_pong(MsgPong &&msg, const conn_t &conn) {
uint16_t port = msg.port;
this->disp_tcall->async_call([this, conn, port](ThreadCall::Handle &) {
- if (conn->get_mode() == ConnPool::Conn::DEAD) return;
- auto it = id2peer.find(conn->peer_id);
- if (it == id2peer.end())
- {
- SALTICIDAE_LOG_WARN("pong message discarded");
- return;
- }
- if (check_new_conn(conn, port)) return;
- auto p = it->second.get();
- p->pong_msg_ok = true;
- if (p->ping_timer_ok)
- {
- p->reset_ping_timer();
- p->send_ping();
- }
+ try {
+ if (conn->get_mode() == ConnPool::Conn::DEAD) return;
+ auto it = id2peer.find(conn->peer_id);
+ if (it == id2peer.end())
+ {
+ SALTICIDAE_LOG_WARN("pong message discarded");
+ return;
+ }
+ if (check_new_conn(conn, port)) return;
+ auto p = it->second.get();
+ p->pong_msg_ok = true;
+ if (p->ping_timer_ok)
+ {
+ p->reset_ping_timer();
+ p->send_ping();
+ }
+ } catch (...) { this->disp_error_cb(std::current_exception()); }
});
}
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::listen(NetAddr listen_addr) {
- this->disp_tcall->call([this, listen_addr](ThreadCall::Handle &) {
- MsgNet::_listen(listen_addr);
- listen_port = listen_addr.port;
- });
+ auto ret = *(static_cast<SalticidaeError *>(
+ this->disp_tcall->call([this, listen_addr](ThreadCall::Handle &h) {
+ SalticidaeError err;
+ try {
+ MsgNet::_listen(listen_addr);
+ listen_port = listen_addr.port;
+ } catch (SalticidaeError &e) {
+ err = e;
+ }
+ h.set_result(std::move(err));
+ }).get()));
+ if (ret.get_code()) throw ret;
}
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) {
- this->disp_tcall->call([this, addr](ThreadCall::Handle &) {
- auto it = id2peer.find(addr);
- if (it != id2peer.end())
- throw PeerNetworkError("peer already exists");
- id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->disp_ec)));
- start_active_conn(addr);
+ this->disp_tcall->async_call([this, addr](ThreadCall::Handle &) {
+ try {
+ auto it = id2peer.find(addr);
+ if (it != id2peer.end())
+ throw PeerNetworkError(SALTI_ERROR_PEER_ALREADY_EXISTS);
+ id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->disp_ec)));
+ start_active_conn(addr);
+ } catch (...) { this->disp_error_cb(std::current_exception()); }
});
}
template<typename O, O _, O __>
const typename PeerNetwork<O, _, __>::conn_t
PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &paddr) const {
- auto ret = *(static_cast<conn_t *>(this->disp_tcall->call(
- [this, paddr](ThreadCall::Handle &h) {
- auto it = id2peer.find(paddr);
- if (it == id2peer.end())
- throw PeerNetworkError("peer does not exist");
- h.set_result(it->second->conn);
+ auto ret = *(static_cast<std::pair<conn_t, SalticidaeError> *>(
+ this->disp_tcall->call([this, paddr](ThreadCall::Handle &h) {
+ conn_t conn;
+ SalticidaeError err;
+ try {
+ auto it = id2peer.find(paddr);
+ if (it == id2peer.end())
+ throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXISTS);
+ conn = it->second->conn;
+ } catch (SalticidaeError &e) {
+ err = e;
+ }
+ h.set_result(std::make_pair(std::move(conn), std::move(err)));
}).get()));
- return std::move(ret);
+ if (ret.second.get_code()) throw ret.second;
+ return std::move(ret.first);
}
template<typename O, O _, O __>
@@ -720,32 +756,41 @@ bool PeerNetwork<O, _, __>::has_peer(const NetAddr &paddr) const {
template<typename O, O _, O __>
template<typename MsgType>
void PeerNetwork<O, _, __>::send_msg(MsgType &&msg, const NetAddr &paddr) {
- return _send_msg(MsgType(msg), paddr);
+ return _send_msg(MsgType(std::move(msg)), paddr);
}
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::_send_msg(Msg &&msg, const NetAddr &paddr) {
this->disp_tcall->async_call(
[this, msg=std::move(msg), paddr](ThreadCall::Handle &) {
- auto it = id2peer.find(paddr);
- if (it == id2peer.end())
- throw PeerNetworkError("peer does not exist");
- send_msg(std::move(msg), it->second->conn);
+ try {
+ auto it = id2peer.find(paddr);
+ if (it == id2peer.end())
+ throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXISTS);
+ this->_send_msg_dispatcher(msg, it->second->conn);
+ } catch (...) { this->disp_error_cb(std::current_exception()); }
});
}
template<typename O, O _, O __>
template<typename MsgType>
void PeerNetwork<O, _, __>::multicast_msg(MsgType &&msg, const std::vector<NetAddr> &paddrs) {
+ return _multicast_msg(MsgType(std::move(msg)), paddrs);
+}
+
+template<typename O, O _, O __>
+void PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<NetAddr> &paddrs) {
this->disp_tcall->async_call(
[this, msg=std::move(msg), paddrs](ThreadCall::Handle &) {
- for (auto &addr: paddrs)
- {
- auto it = id2peer.find(addr);
- if (it == id2peer.end())
- throw PeerNetworkError("peer does not exist");
- send_msg(std::move(msg), it->second->conn);
- }
+ try {
+ for (auto &addr: paddrs)
+ {
+ auto it = id2peer.find(addr);
+ if (it == id2peer.end())
+ throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXISTS);
+ this->_send_msg_dispatcher(msg, it->second->conn);
+ }
+ } catch (...) { this->disp_error_cb(std::current_exception()); }
});
}
@@ -774,9 +819,11 @@ template<typename MsgType>
void ClientNetwork<OpcodeType>::send_msg(MsgType &&msg, const NetAddr &addr) {
this->disp_tcall->async_call(
[this, addr, msg=std::forward<MsgType>(msg)](ThreadCall::Handle &) {
- auto it = addr2conn.find(addr);
- if (it != addr2conn.end())
- send_msg(std::move(msg), it->second);
+ try {
+ auto it = addr2conn.find(addr);
+ if (it != addr2conn.end())
+ send_msg(std::move(msg), it->second);
+ } catch (...) { this->disp_error_cb(std::current_exception()); }
});
}
@@ -842,7 +889,7 @@ void msgnetwork_config_queue_capacity(msgnetwork_config_t *self, size_t cap);
msgnetwork_t *msgnetwork_new(const eventcontext_t *ec, const msgnetwork_config_t *config);
void msgnetwork_free(const msgnetwork_t *self);
-bool msgnetwork_send_msg(msgnetwork_t *self, const msg_t *msg, const msgnetwork_conn_t *conn);
+void msgnetwork_send_msg_by_move(msgnetwork_t *self, msg_t *_moved_msg, const msgnetwork_conn_t *conn);
msgnetwork_conn_t *msgnetwork_connect(msgnetwork_t *self, const netaddr_t *addr);
msgnetwork_conn_t *msgnetwork_conn_copy(const msgnetwork_conn_t *self);
void msgnetwork_conn_free(const msgnetwork_conn_t *self);
diff --git a/include/salticidae/queue.h b/include/salticidae/queue.h
index da11e8a..9fd11f1 100644
--- a/include/salticidae/queue.h
+++ b/include/salticidae/queue.h
@@ -17,6 +17,7 @@ class FreeList {
std::atomic<size_t> refcnt;
std::atomic<bool> freed;
Node(): next(nullptr), refcnt(1), freed(false) {}
+ virtual ~Node() {}
};
private:
diff --git a/include/salticidae/util.h b/include/salticidae/util.h
index 19779b0..0ddf5be 100644
--- a/include/salticidae/util.h
+++ b/include/salticidae/util.h
@@ -30,6 +30,7 @@
#include <string>
#include <exception>
#include <cstdarg>
+#include <cstring>
#include <vector>
#include <unordered_map>
#include <functional>
@@ -52,18 +53,57 @@ std::vector<std::string> trim_all(const std::vector<std::string> &ss);
std::string vstringprintf(const char *fmt, va_list ap);
std::string stringprintf(const char *fmt, ...);
+enum SalticidaeErrorCode {
+ SALTI_NORMAL,
+ SALTI_ERROR_GENERIC,
+ SALTI_ERROR_ACCEPT,
+ SALTI_ERROR_LISTEN,
+ SALTI_ERROR_CONNECT,
+ SALTI_ERROR_PEER_ALREADY_EXISTS,
+ SALTI_ERROR_PEER_NOT_EXISTS,
+ SALTI_ERROR_NETADDR_INVALID,
+ SALTI_ERROR_OPTVAL_INVALID,
+ SALTI_ERROR_OPTNAME_ALREADY_EXISTS,
+ SALTI_ERROR_OPT_UNKNOWN_ACTION,
+ SALTI_ERROR_CONFIG_LINE_TOO_LONG,
+ SALTI_ERROR_OPT_INVALID
+};
+
+extern const char *SALTICIDAE_ERROR_STRINGS[];
+
class SalticidaeError: public std::exception {
std::string msg;
+ int code;
+ int oscode;
+
public:
- SalticidaeError();
+ SalticidaeError() : code(SALTI_NORMAL) {}
template<typename... Args>
- SalticidaeError(const std::string &fmt, Args... args) {
+ SalticidaeError(const std::string &fmt, Args... args): code(SALTI_ERROR_GENERIC) {
msg = stringprintf(fmt.c_str(), args...);
}
+ SalticidaeError(int code, int oscode = 0): code(code), oscode(oscode) {
+ if (oscode)
+ msg = stringprintf("%s: %s", SALTICIDAE_ERROR_STRINGS[code], strerror(oscode));
+ else
+ msg = SALTICIDAE_ERROR_STRINGS[code];
+ }
+
operator std::string() const { return msg; }
const char *what() const throw() override { return msg.c_str(); }
+ int get_code() const { return code; }
+ int get_oscode() const { return oscode; }
+};
+
+
+struct ConnPoolError: public SalticidaeError {
+ using SalticidaeError::SalticidaeError;
+};
+
+class PeerNetworkError: public ConnPoolError {
+ using ConnPoolError::ConnPoolError;
};
extern const char *TTY_COLOR_RED;
@@ -217,7 +257,7 @@ class Config {
try {
val = stoi(strval, &idx);
} catch (std::invalid_argument &) {
- throw SalticidaeError("invalid integer");
+ throw SalticidaeError(SALTI_ERROR_OPTVAL_INVALID);
}
}
int &get() { return val; }
@@ -237,7 +277,7 @@ class Config {
try {
val = stod(strval, &idx);
} catch (std::invalid_argument &) {
- throw SalticidaeError("invalid double");
+ throw SalticidaeError(SALTI_ERROR_OPTVAL_INVALID);
}
}
double &get() { return val; }
diff --git a/src/conn.cpp b/src/conn.cpp
index 5fc59f3..7f485fd 100644
--- a/src/conn.cpp
+++ b/src/conn.cpp
@@ -170,34 +170,38 @@ void ConnPool::Conn::disp_terminate() {
void ConnPool::accept_client(int fd, int) {
int client_fd;
struct sockaddr client_addr;
- socklen_t addr_size = sizeof(struct sockaddr_in);
- if ((client_fd = accept(fd, &client_addr, &addr_size)) < 0)
- SALTICIDAE_LOG_ERROR("error while accepting the connection: %s",
- strerror(errno));
- else
- {
- int one = 1;
- if (setsockopt(client_fd, SOL_TCP, TCP_NODELAY, (const char *)&one, sizeof(one)) < 0)
- throw ConnPoolError(std::string("setsockopt failed"));
- if (fcntl(client_fd, F_SETFL, O_NONBLOCK) == -1)
- throw ConnPoolError(std::string("unable to set nonblocking socket"));
+ try {
+ socklen_t addr_size = sizeof(struct sockaddr_in);
+ if ((client_fd = accept(fd, &client_addr, &addr_size)) < 0)
+ throw ConnPoolError(SALTI_ERROR_ACCEPT, errno);
+ else
+ {
+ int one = 1;
+ if (setsockopt(client_fd, SOL_TCP, TCP_NODELAY, (const char *)&one, sizeof(one)) < 0)
+ throw ConnPoolError(SALTI_ERROR_ACCEPT, errno);
+ if (fcntl(client_fd, F_SETFL, O_NONBLOCK) == -1)
+ throw ConnPoolError(SALTI_ERROR_ACCEPT, errno);
- NetAddr addr((struct sockaddr_in *)&client_addr);
- conn_t conn = create_conn();
- conn->self_ref = conn;
- conn->send_buffer.set_capacity(queue_capacity);
- conn->seg_buff_size = seg_buff_size;
- conn->fd = client_fd;
- conn->cpool = this;
- conn->mode = Conn::PASSIVE;
- conn->addr = addr;
- add_conn(conn);
- SALTICIDAE_LOG_INFO("accepted %s", std::string(*conn).c_str());
- auto &worker = select_worker();
- conn->worker = &worker;
- conn->on_setup();
- update_conn(conn, true);
- worker.feed(conn, client_fd);
+ NetAddr addr((struct sockaddr_in *)&client_addr);
+ conn_t conn = create_conn();
+ conn->self_ref = conn;
+ conn->send_buffer.set_capacity(queue_capacity);
+ conn->seg_buff_size = seg_buff_size;
+ conn->fd = client_fd;
+ conn->cpool = this;
+ conn->mode = Conn::PASSIVE;
+ conn->addr = addr;
+ add_conn(conn);
+ SALTICIDAE_LOG_INFO("accepted %s", std::string(*conn).c_str());
+ auto &worker = select_worker();
+ conn->worker = &worker;
+ conn->on_setup();
+ update_conn(conn, true);
+ worker.feed(conn, client_fd);
+ }
+ } catch (ConnPoolError &e) {
+ SALTICIDAE_LOG_ERROR("%s", e.what());
+ throw e;
}
}
@@ -229,24 +233,29 @@ void ConnPool::_listen(NetAddr listen_addr) {
ev_listen.clear();
close(listen_fd);
}
- if ((listen_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
- throw ConnPoolError(std::string("cannot create socket for listening"));
- if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, (const char *)&one, sizeof(one)) < 0 ||
- setsockopt(listen_fd, SOL_TCP, TCP_NODELAY, (const char *)&one, sizeof(one)) < 0)
- throw ConnPoolError(std::string("setsockopt failed"));
- if (fcntl(listen_fd, F_SETFL, O_NONBLOCK) == -1)
- throw ConnPoolError(std::string("unable to set nonblocking socket"));
+ try {
+ if ((listen_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
+ throw ConnPoolError(SALTI_ERROR_LISTEN, errno);
+ if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, (const char *)&one, sizeof(one)) < 0 ||
+ setsockopt(listen_fd, SOL_TCP, TCP_NODELAY, (const char *)&one, sizeof(one)) < 0)
+ throw ConnPoolError(SALTI_ERROR_LISTEN, errno);
+ if (fcntl(listen_fd, F_SETFL, O_NONBLOCK) == -1)
+ throw ConnPoolError(SALTI_ERROR_LISTEN, errno);
- struct sockaddr_in sockin;
- memset(&sockin, 0, sizeof(struct sockaddr_in));
- sockin.sin_family = AF_INET;
- sockin.sin_addr.s_addr = INADDR_ANY;
- sockin.sin_port = listen_addr.port;
+ struct sockaddr_in sockin;
+ memset(&sockin, 0, sizeof(struct sockaddr_in));
+ sockin.sin_family = AF_INET;
+ sockin.sin_addr.s_addr = INADDR_ANY;
+ sockin.sin_port = listen_addr.port;
- if (bind(listen_fd, (struct sockaddr *)&sockin, sizeof(sockin)) < 0)
- throw ConnPoolError(std::string("binding error"));
- if (::listen(listen_fd, max_listen_backlog) < 0)
- throw ConnPoolError(std::string("listen error"));
+ if (bind(listen_fd, (struct sockaddr *)&sockin, sizeof(sockin)) < 0)
+ throw ConnPoolError(SALTI_ERROR_LISTEN, errno);
+ if (::listen(listen_fd, max_listen_backlog) < 0)
+ throw ConnPoolError(SALTI_ERROR_LISTEN, errno);
+ } catch (ConnPoolError &e) {
+ SALTICIDAE_LOG_ERROR("%s", e.what());
+ throw e;
+ }
ev_listen = FdEvent(disp_ec, listen_fd,
std::bind(&ConnPool::accept_client, this, _1, _2));
ev_listen.add(FdEvent::READ);
@@ -256,13 +265,18 @@ void ConnPool::_listen(NetAddr listen_addr) {
ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) {
int fd;
int one = 1;
- if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
- throw ConnPoolError(std::string("cannot create socket for remote"));
- if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (const char *)&one, sizeof(one)) < 0 ||
- setsockopt(fd, SOL_TCP, TCP_NODELAY, (const char *)&one, sizeof(one)) < 0)
- throw ConnPoolError(std::string("setsockopt failed"));
- if (fcntl(fd, F_SETFL, O_NONBLOCK) == -1)
- throw ConnPoolError(std::string("unable to set nonblocking socket"));
+ try {
+ if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
+ throw ConnPoolError(SALTI_ERROR_CONNECT, errno);
+ if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (const char *)&one, sizeof(one)) < 0 ||
+ setsockopt(fd, SOL_TCP, TCP_NODELAY, (const char *)&one, sizeof(one)) < 0)
+ throw ConnPoolError(SALTI_ERROR_CONNECT, errno);
+ if (fcntl(fd, F_SETFL, O_NONBLOCK) == -1)
+ throw ConnPoolError(SALTI_ERROR_CONNECT, errno);
+ } catch (ConnPoolError &e) {
+ SALTICIDAE_LOG_ERROR("%s", e.what());
+ throw e;
+ }
conn_t conn = create_conn();
conn->self_ref = conn;
conn->send_buffer.set_capacity(queue_capacity);
diff --git a/src/network.cpp b/src/network.cpp
index 49bad48..30ef0eb 100644
--- a/src/network.cpp
+++ b/src/network.cpp
@@ -44,9 +44,10 @@ void msgnetwork_config_queue_capacity(msgnetwork_config_t *self, size_t cap) {
self->queue_capacity(cap);
}
-bool msgnetwork_send_msg(msgnetwork_t *self,
- const msg_t *msg, const msgnetwork_conn_t *conn) {
- return self->_send_msg(*msg, *conn);
+void msgnetwork_send_msg_by_move(msgnetwork_t *self,
+ msg_t *_moved_msg, const msgnetwork_conn_t *conn) {
+ self->_send_msg(std::move(*_moved_msg), *conn);
+ delete _moved_msg;
}
msgnetwork_conn_t *msgnetwork_connect(msgnetwork_t *self, const netaddr_t *addr) {
@@ -167,7 +168,7 @@ void peernetwork_send_msg_by_move(peernetwork_t *self,
void peernetwork_multicast_msg_by_move(peernetwork_t *self,
msg_t *_moved_msg,
const netaddr_array_t *paddrs) {
- self->multicast_msg(std::move(*_moved_msg), *paddrs);
+ self->_multicast_msg(std::move(*_moved_msg), *paddrs);
delete _moved_msg;
}
diff --git a/src/util.cpp b/src/util.cpp
index a0d5044..f762b4c 100644
--- a/src/util.cpp
+++ b/src/util.cpp
@@ -34,6 +34,22 @@
namespace salticidae {
+const char *SALTICIDAE_ERROR_STRINGS[] = {
+ "",
+ "generic",
+ "unable to accept",
+ "unable to listen",
+ "unable to connect",
+ "peer already exists",
+ "peer does not exist",
+ "invalid NetAddr format",
+ "invalid OptVal format",
+ "option name already exists",
+ "unknown action",
+ "configuration file line too long",
+ "invalid option format"
+};
+
const char *TTY_COLOR_RED = "\x1b[31m";
const char *TTY_COLOR_GREEN = "\x1b[32m";
const char *TTY_COLOR_YELLOW = "\x1b[33m";
@@ -91,8 +107,6 @@ const std::string get_current_datetime() {
return std::string(buf);
}
-SalticidaeError::SalticidaeError() : msg("unknown") {}
-
void Logger::set_color() {
if (is_tty())
{
@@ -182,7 +196,7 @@ void Config::add_opt(const std::string &optname, const optval_t &optval, Action
char short_opt,
const std::string &optdoc) {
if (conf.count(optname))
- throw SalticidaeError("option name already exists");
+ throw SalticidaeError(SALTI_ERROR_OPTNAME_ALREADY_EXISTS);
opts.push_back(new Opt(optname, optdoc,
optval, action, short_opt,
opts.size()));
@@ -199,7 +213,7 @@ void Config::update(Opt &p, const char *optval) {
case SET_VAL: p.optval->set_val(optval); break;
case APPEND: p.optval->append(optval); break;
default:
- throw SalticidaeError("unknown action");
+ throw SalticidaeError(SALTI_ERROR_OPT_UNKNOWN_ACTION);
}
}
@@ -220,7 +234,7 @@ bool Config::load(const std::string &fname) {
if (strlen(buff) == BUFF_SIZE - 1)
{
fclose(conf_f);
- throw SalticidaeError("configuration file line too long");
+ throw SalticidaeError(SALTI_ERROR_CONFIG_LINE_TOO_LONG);
}
std::string line(buff);
size_t pos = line.find("=");
@@ -269,7 +283,7 @@ size_t Config::parse(int argc, char **argv) {
if (id == -1)
break;
if (id == '?')
- throw SalticidaeError("invalid option format");
+ throw SalticidaeError(SALTI_ERROR_OPT_INVALID);
if (id >= 0x100)
update(*(opts[id - 0x100]), optarg);
else
diff --git a/test/test_msgnet_c.c b/test/test_msgnet_c.c
index b717137..656bd80 100644
--- a/test/test_msgnet_c.c
+++ b/test/test_msgnet_c.c
@@ -35,23 +35,25 @@
const uint8_t MSG_OPCODE_HELLO = 0x0;
const uint8_t MSG_OPCODE_ACK = 0x1;
typedef struct MsgHello {
- const char *name;
- const char *text;
+ char *name;
+ char *text;
} MsgHello;
+
/** Defines how to serialize the msg. */
msg_t *msg_hello_serialize(const char *name, const char *text) {
datastream_t *serialized = datastream_new();
size_t name_len = strlen(name);
datastream_put_i32(serialized, (uint32_t)htole32(name_len));
- datastream_put_data(serialized, name, name + name_len);
- datastream_put_data(serialized, text, text + strlen(text));
- msg_t *msg = msg_new(MSG_OPCODE_HELLO, bytearray_new_moved_from_datastream(serialized));
+ datastream_put_data(serialized, name, name_len);
+ datastream_put_data(serialized, text, strlen(text));
+ msg_t *msg = msg_new_moved_from_bytearray(
+ MSG_OPCODE_HELLO, bytearray_new_moved_from_datastream(serialized));
return msg;
}
/** Defines how to parse the msg. */
MsgHello msg_hello_unserialize(const msg_t *msg) {
- datastream_t *s = msg_get_payload(msg);
+ datastream_t *s = msg_consume_payload(msg);
MsgHello res;
uint32_t len;
len = datastream_get_u32(s);
@@ -73,7 +75,7 @@ MsgHello msg_hello_unserialize(const msg_t *msg) {
}
msg_t *msg_ack_serialize() {
- msg_t *msg = msg_new(MSG_OPCODE_ACK, bytearray_new());
+ msg_t *msg = msg_new_moved_from_bytearray(MSG_OPCODE_ACK, bytearray_new());
return msg;
}
@@ -88,10 +90,11 @@ void on_receive_hello(const msg_t *_msg, const msgnetwork_conn_t *conn, void *us
const char *name = ((MyNet *)userdata)->name;
MsgHello msg = msg_hello_unserialize(_msg);
printf("[%s] %s says %s\n", name, msg.name, msg.text);
+ free(msg.name);
+ free(msg.text);
msg_t *ack = msg_ack_serialize();
/* send acknowledgement */
- msgnetwork_send_msg(net, ack, conn);
- msg_free(ack);
+ msgnetwork_send_msg_by_move(net, ack, conn);
}
void on_receive_ack(const msg_t *msg, const msgnetwork_conn_t *conn, void *userdata) {
@@ -110,8 +113,7 @@ void conn_handler(const msgnetwork_conn_t *conn, bool connected, void *userdata)
printf("[%s] Connected, sending hello.", name);
/* send the first message through this connection */
msg_t *hello = msg_hello_serialize(name, "Hello there!");
- msgnetwork_send_msg(n->net, hello, conn);
- msg_free(hello);
+ msgnetwork_send_msg_by_move(n->net, hello, conn);
}
else
printf("[%s] Accepted, waiting for greetings.\n", name);
@@ -120,7 +122,9 @@ void conn_handler(const msgnetwork_conn_t *conn, bool connected, void *userdata)
{
printf("[%s] Disconnected, retrying.\n", name);
/* try to reconnect to the same address */
- msgnetwork_connect(net, msgnetwork_conn_get_addr(conn));
+ netaddr_t *addr = msgnetwork_conn_get_addr(conn);
+ msgnetwork_connect(net, addr);
+ netaddr_free(addr);
}
}
@@ -136,7 +140,7 @@ MyNet gen_mynet(const eventcontext_t *ec,
static eventcontext_t *ec;
-void on_term_signal(int sig) {
+void on_term_signal(int sig, void *userdata) {
eventcontext_stop(ec);
}
@@ -166,25 +170,25 @@ int main() {
msgnetwork_listen(bob.net, bob_addr);
/* try to connect once */
- msgnetwork_connect(alice.net, bob_addr);
- msgnetwork_connect(bob.net, alice_addr);
+ msgnetwork_conn_free(msgnetwork_connect(alice.net, bob_addr));
+ msgnetwork_conn_free(msgnetwork_connect(bob.net, alice_addr));
netaddr_free(alice_addr);
netaddr_free(bob_addr);
/* the main loop can be shutdown by ctrl-c or kill */
- sigev_t *ev_sigint = sigev_new(ec, on_term_signal);
- sigev_t *ev_sigterm = sigev_new(ec, on_term_signal);
+ sigev_t *ev_sigint = sigev_new(ec, on_term_signal, NULL);
+ sigev_t *ev_sigterm = sigev_new(ec, on_term_signal, NULL);
sigev_add(ev_sigint, SIGINT);
sigev_add(ev_sigterm, SIGTERM);
/* enter the main loop */
eventcontext_dispatch(ec);
- sigev_free(ev_sigint);
- sigev_free(ev_sigterm);
msgnetwork_free(alice.net);
msgnetwork_free(bob.net);
+ sigev_free(ev_sigint);
+ sigev_free(ev_sigterm);
eventcontext_free(ec);
return 0;
}
diff --git a/test/test_p2p_stress.cpp b/test/test_p2p_stress.cpp
index 70e3444..7321217 100644
--- a/test/test_p2p_stress.cpp
+++ b/test/test_p2p_stress.cpp
@@ -114,6 +114,11 @@ void install_proto(AppContext &app, const size_t &seg_buff_size) {
}
}
});
+ net.reg_error_handler([ec](const std::exception &err, bool fatal) {
+ SALTICIDAE_LOG_WARN("main thread captured %s error: %s",
+ fatal ? "fatal" : "recoverable", err.what());
+ ec.stop();
+ });
net.reg_handler([&](MsgRand &&msg, const MyNet::conn_t &conn) {
uint256_t hash = salticidae::get_hash(msg.bytes);
net.send_msg(MsgAck(hash), conn);