aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2019-06-18 01:48:18 -0400
committerDeterminant <[email protected]>2019-06-18 01:48:18 -0400
commitd91fc3e873d4bddd5cdd69fda7f67bd780a0ac55 (patch)
treea6c2e4c18c47b8dd595f2a115e5514ddd1ee7ab4
parenta154cb399a6fcbd3d4fd19ab46aa2c107128d344 (diff)
fix bugs in the benchmark example; keep both send_msg APIs
-rw-r--r--include/salticidae/event.h79
-rw-r--r--include/salticidae/network.h93
-rw-r--r--src/network.cpp4
-rw-r--r--test/bench_network.cpp70
4 files changed, 166 insertions, 80 deletions
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index c4f65ba..d3625d5 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -418,6 +418,85 @@ class SigEvent {
const EventContext &get_ec() const { return ec; }
};
+class CheckEvent {
+ public:
+ using callback_t = std::function<void()>;
+ private:
+ EventContext ec;
+ uv_check_t *ev_check;
+ callback_t callback;
+ static inline void check_then(uv_check_t *h) {
+ auto event = static_cast<CheckEvent *>(h->data);
+ event->callback();
+ }
+
+ public:
+ CheckEvent(): ec(nullptr), ev_check(nullptr) {}
+ CheckEvent(const EventContext &ec, callback_t callback):
+ ec(ec), ev_check(new uv_check_t()),
+ callback(std::move(callback)) {
+ uv_check_init(ec.get(), ev_check);
+ ev_check->data = this;
+ }
+
+ CheckEvent(const CheckEvent &) = delete;
+ CheckEvent(CheckEvent &&other):
+ ec(std::move(other.ec)), ev_check(other.ev_check),
+ callback(std::move(other.callback)) {
+ other.ev_check = nullptr;
+ if (ev_check != nullptr)
+ ev_check->data = this;
+ }
+
+ void swap(CheckEvent &other) {
+ std::swap(ec, other.ec);
+ std::swap(ev_check, other.ev_check);
+ std::swap(callback, other.callback);
+ if (ev_check != nullptr)
+ ev_check->data = this;
+ if (other.ev_check != nullptr)
+ other.ev_check->data = &other;
+ }
+
+ CheckEvent &operator=(CheckEvent &&other) {
+ if (this != &other)
+ {
+ CheckEvent tmp(std::move(other));
+ tmp.swap(*this);
+ }
+ return *this;
+ }
+
+ ~CheckEvent() { clear(); }
+
+ void clear() {
+ if (ev_check != nullptr)
+ {
+ uv_check_stop(ev_check);
+ uv_close((uv_handle_t *)ev_check, _on_uv_handle_close);
+ ev_check = nullptr;
+ }
+ callback = nullptr;
+ }
+
+ void set_callback(callback_t _callback) {
+ callback = _callback;
+ }
+
+ void add() {
+ assert(ev_check != nullptr);
+ uv_check_start(ev_check, CheckEvent::check_then);
+ }
+
+ void del() {
+ if (ev_check != nullptr) uv_check_stop(ev_check);
+ }
+
+ operator bool() const { return ev_check != nullptr; }
+
+ const EventContext &get_ec() const { return ec; }
+};
+
template<typename T>
class ThreadNotifier {
std::condition_variable cv;
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index a09f8ac..e9fdae6 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -199,14 +199,16 @@ class MsgNetwork: public ConnPool {
}
template<typename MsgType>
- void send_msg(MsgType &&msg, const conn_t &conn);
- inline void _send_msg(Msg &&msg, const conn_t &conn);
- inline void _send_msg_dispatcher(const Msg &msg, const conn_t &conn);
+ inline void send_msg(const MsgType &msg, const conn_t &conn);
+ inline void _send_msg(const Msg &msg, const conn_t &conn);
+ template<typename MsgType>
+ inline void send_msg_deferred(MsgType &&msg, const conn_t &conn);
+ inline void _send_msg_deferred(Msg &&msg, const conn_t &conn);
void stop() { stop_workers(); }
using ConnPool::listen;
- conn_t connect(const NetAddr &addr) {
- return static_pointer_cast<Conn>(ConnPool::connect(addr));
+ conn_t connect(const NetAddr &addr, bool blocking = true) {
+ return static_pointer_cast<Conn>(ConnPool::connect(addr, blocking));
}
};
@@ -446,8 +448,11 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
const conn_t get_peer_conn(const NetAddr &paddr) const;
using MsgNet::send_msg;
template<typename MsgType>
- void send_msg(MsgType &&msg, const NetAddr &paddr);
- inline void _send_msg(Msg &&msg, const NetAddr &paddr);
+ inline void send_msg(const MsgType &msg, const NetAddr &paddr);
+ inline void _send_msg(const Msg &msg, const NetAddr &paddr);
+ template<typename MsgType>
+ inline void send_msg_deferred(MsgType &&msg, const NetAddr &paddr);
+ inline void _send_msg_deferred(Msg &&msg, const NetAddr &paddr);
template<typename MsgType>
void multicast_msg(MsgType &&msg, const std::vector<NetAddr> &paddrs);
inline void _multicast_msg(Msg &&msg, const std::vector<NetAddr> &paddrs);
@@ -496,20 +501,39 @@ void MsgNetwork<OpcodeType>::Conn::on_read() {
template<typename OpcodeType>
template<typename MsgType>
-void MsgNetwork<OpcodeType>::send_msg(MsgType &&msg, const conn_t &conn) {
- return _send_msg(MsgType(std::move(msg)), conn);
+inline void MsgNetwork<OpcodeType>::send_msg_deferred(MsgType &&msg, const conn_t &conn) {
+ return _send_msg_deferred(MsgType(std::move(msg)), conn);
}
template<typename OpcodeType>
-inline void MsgNetwork<OpcodeType>::_send_msg(Msg &&msg, const conn_t &conn) {
+inline void MsgNetwork<OpcodeType>::_send_msg_deferred(Msg &&msg, const conn_t &conn) {
this->disp_tcall->async_call(
[this, msg=std::move(msg), conn](ThreadCall::Handle &) {
try {
- this->_send_msg_dispatcher(msg, conn);
+ this->send_msg(msg, conn);
} catch (...) { this->recoverable_error(std::current_exception()); }
});
}
+template<typename OpcodeType>
+template<typename MsgType>
+inline void MsgNetwork<OpcodeType>::send_msg(const MsgType &msg, const conn_t &conn) {
+ return _send_msg(msg, conn);
+}
+
+template<typename OpcodeType>
+inline void MsgNetwork<OpcodeType>::_send_msg(const Msg &msg, const conn_t &conn) {
+ bytearray_t msg_data = msg.serialize();
+ SALTICIDAE_LOG_DEBUG("wrote message %s to %s",
+ std::string(msg).c_str(),
+ std::string(*conn).c_str());
+#ifdef SALTICIDAE_MSG_STAT
+ conn->nsent++;
+ conn->nsentb += msg.get_length();
+#endif
+ conn->write(std::move(msg_data));
+}
+
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::tcall_reset_timeout(ConnPool::Worker *worker,
const conn_t &conn, double timeout) {
@@ -686,19 +710,6 @@ typename PeerNetwork<O, _, __>::Peer *PeerNetwork<O, _, __>::get_peer(const NetA
if (it != id2upeer.end()) return it->second.get();
return nullptr;
}
-
-template<typename OpcodeType>
-inline void MsgNetwork<OpcodeType>::_send_msg_dispatcher(const Msg &msg, const conn_t &conn) {
- bytearray_t msg_data = msg.serialize();
- SALTICIDAE_LOG_DEBUG("wrote message %s to %s",
- std::string(msg).c_str(),
- std::string(*conn).c_str());
-#ifdef SALTICIDAE_MSG_STAT
- conn->nsent++;
- conn->nsentb += msg.get_length();
-#endif
- conn->write(std::move(msg_data));
-}
/* end: functions invoked by the dispatcher */
/* begin: functions invoked by the user loop */
@@ -826,33 +837,41 @@ bool PeerNetwork<O, _, __>::has_peer(const NetAddr &paddr) const {
template<typename O, O _, O __>
template<typename MsgType>
-void PeerNetwork<O, _, __>::send_msg(MsgType &&msg, const NetAddr &paddr) {
- return _send_msg(MsgType(std::move(msg)), paddr);
+inline void PeerNetwork<O, _, __>::send_msg_deferred(MsgType &&msg, const NetAddr &paddr) {
+ return _send_msg_deferred(MsgType(std::move(msg)), paddr);
}
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::_send_msg(Msg &&msg, const NetAddr &paddr) {
+inline void PeerNetwork<O, _, __>::_send_msg_deferred(Msg &&msg, const NetAddr &paddr) {
this->disp_tcall->async_call(
- [this, msg=std::move(msg), paddr](ThreadCall::Handle &) {
+ [this, msg=std::move(msg), paddr](ThreadCall::Handle &) {
try {
- auto p = get_peer(paddr);
- if (!p)
- throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
- this->_send_msg_dispatcher(msg, p->conn);
- } catch (const PeerNetworkError &) {
- this->recoverable_error(std::current_exception());
+ send_msg(msg, paddr);
} catch (...) { this->recoverable_error(std::current_exception()); }
});
}
template<typename O, O _, O __>
template<typename MsgType>
-void PeerNetwork<O, _, __>::multicast_msg(MsgType &&msg, const std::vector<NetAddr> &paddrs) {
+inline void PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const NetAddr &paddr) {
+ return _send_msg(msg, paddr);
+}
+
+template<typename O, O _, O __>
+inline void PeerNetwork<O, _, __>::_send_msg(const Msg &msg, const NetAddr &paddr) {
+ auto p = get_peer(paddr);
+ if (!p) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
+ this->send_msg(msg, p->conn);
+}
+
+template<typename O, O _, O __>
+template<typename MsgType>
+inline void PeerNetwork<O, _, __>::multicast_msg(MsgType &&msg, const std::vector<NetAddr> &paddrs) {
return _multicast_msg(MsgType(std::move(msg)), paddrs);
}
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<NetAddr> &paddrs) {
+inline void PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<NetAddr> &paddrs) {
this->disp_tcall->async_call(
[this, msg=std::move(msg), paddrs](ThreadCall::Handle &) {
try {
@@ -861,7 +880,7 @@ void PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<NetAddr>
auto p = get_peer(addr);
if (!p)
throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
- this->_send_msg_dispatcher(msg, p->conn);
+ this->send_msg(msg, p->conn);
}
} catch (const PeerNetworkError &) {
this->recoverable_error(std::current_exception());
diff --git a/src/network.cpp b/src/network.cpp
index 938c105..b8d058a 100644
--- a/src/network.cpp
+++ b/src/network.cpp
@@ -46,7 +46,7 @@ void msgnetwork_config_queue_capacity(msgnetwork_config_t *self, size_t cap) {
void msgnetwork_send_msg_by_move(msgnetwork_t *self,
msg_t *_moved_msg, const msgnetwork_conn_t *conn) {
- self->_send_msg(std::move(*_moved_msg), *conn);
+ self->_send_msg_deferred(std::move(*_moved_msg), *conn);
//delete _moved_msg;
}
@@ -213,7 +213,7 @@ void peernetwork_conn_free(const peernetwork_conn_t *self) { delete self; }
void peernetwork_send_msg_by_move(peernetwork_t *self,
msg_t * _moved_msg, const netaddr_t *paddr) {
- self->_send_msg(std::move(*_moved_msg), *paddr);
+ self->_send_msg_deferred(std::move(*_moved_msg), *paddr);
//delete _moved_msg;
}
diff --git a/test/bench_network.cpp b/test/bench_network.cpp
index 58f4a64..a498954 100644
--- a/test/bench_network.cpp
+++ b/test/bench_network.cpp
@@ -43,6 +43,7 @@ using salticidae::htole;
using salticidae::letoh;
using salticidae::bytearray_t;
using salticidae::TimerEvent;
+using salticidae::ThreadCall;
using std::placeholders::_1;
using std::placeholders::_2;
using opcode_t = uint8_t;
@@ -71,9 +72,10 @@ using MsgNetworkByteOp = MsgNetwork<opcode_t>;
struct MyNet: public MsgNetworkByteOp {
const std::string name;
const NetAddr peer;
- TimerEvent ev_period_send;
TimerEvent ev_period_stat;
+ ThreadCall tcall;
size_t nrecv;
+ std::function<void(ThreadCall::Handle &)> trigger;
MyNet(const salticidae::EventContext &ec,
const std::string name,
@@ -88,50 +90,36 @@ struct MyNet: public MsgNetworkByteOp {
nrecv = 0;
ev_period_stat.add(stat_timeout);
}),
+ tcall(ec),
nrecv(0) {
/* message handler could be a bound method */
- reg_handler(salticidae::generic_bind(
- &MyNet::on_receive_bytes, this, _1, _2));
+ reg_handler(salticidae::generic_bind(&MyNet::on_receive_bytes, this, _1, _2));
if (stat_timeout > 0)
ev_period_stat.add(0);
- }
-
- struct Conn: public MsgNetworkByteOp::Conn {
- MyNet *get_net() { return static_cast<MyNet *>(get_pool()); }
- salticidae::ArcObj<Conn> self() {
- return salticidae::static_pointer_cast<Conn>(
- MsgNetworkByteOp::Conn::self());
- }
-
- void on_setup() override {
- auto net = get_net();
- if (get_mode() == ACTIVE)
+ reg_conn_handler([this, ec](const ConnPool::conn_t &conn, bool connected) {
+ if (connected)
{
- printf("[%s] Connected, sending hello.\n",
- net->name.c_str());
- /* send the first message through this connection */
- net->ev_period_send = TimerEvent(net->ec,
- [net, conn = self()](TimerEvent &) {
- net->send_msg(MsgBytes(256), conn);
- net->ev_period_send.add(0);
- });
- net->ev_period_send.add(0);
+ if (conn->get_mode() == MyNet::Conn::ACTIVE)
+ {
+ printf("[%s] Connected, sending hello.\n", this->name.c_str());
+ /* send the first message through this connection */
+ trigger = [this, conn](ThreadCall::Handle &) {
+ send_msg(MsgBytes(256), salticidae::static_pointer_cast<Conn>(conn));
+ if (conn->get_mode() != MyNet::Conn::DEAD)
+ tcall.async_call(trigger);
+ };
+ tcall.async_call(trigger);
+ }
+ else
+ printf("[%s] Passively connected, waiting for greetings.\n", this->name.c_str());
}
else
- printf("[%s] Passively connected, waiting for greetings.\n",
- net->name.c_str());
- }
- void on_teardown() override {
- auto net = get_net();
- net->ev_period_send.clear();
- printf("[%s] Disconnected, retrying.\n", net->name.c_str());
- /* try to reconnect to the same address */
- net->connect(get_addr());
- }
- };
-
- salticidae::ConnPool::Conn *create_conn() override {
- return new Conn();
+ {
+ printf("[%s] Disconnected, retrying.\n", this->name.c_str());
+ /* try to reconnect to the same address */
+ connect(conn->get_addr(), false);
+ }
+ });
}
void on_receive_bytes(MsgBytes &&msg, const conn_t &conn) {
@@ -148,7 +136,7 @@ int main() {
alice->start();
alice->listen(alice_addr);
salticidae::EventContext tec;
- salticidae::BoxObj<salticidae::ThreadCall> tcall = new salticidae::ThreadCall(tec);
+ salticidae::BoxObj<ThreadCall> tcall = new ThreadCall(tec);
std::thread bob_thread([&tec]() {
MyNet bob(tec, "Bob", alice_addr);
bob.start();
@@ -163,8 +151,8 @@ int main() {
tec.stop();
});
alice = nullptr;
- //ec.stop();
- //bob_thread.join();
+ ec.stop();
+ bob_thread.join();
};
salticidae::SigEvent ev_sigint(ec, shutdown);
salticidae::SigEvent ev_sigterm(ec, shutdown);