aboutsummaryrefslogtreecommitdiff
path: root/include
diff options
context:
space:
mode:
authorDeterminant <ted.sybil@gmail.com>2019-06-18 01:48:18 -0400
committerDeterminant <ted.sybil@gmail.com>2019-06-18 01:48:18 -0400
commitd91fc3e873d4bddd5cdd69fda7f67bd780a0ac55 (patch)
treea6c2e4c18c47b8dd595f2a115e5514ddd1ee7ab4 /include
parenta154cb399a6fcbd3d4fd19ab46aa2c107128d344 (diff)
fix bugs in the benchmark example; keep both send_msg APIs
Diffstat (limited to 'include')
-rw-r--r--include/salticidae/event.h79
-rw-r--r--include/salticidae/network.h93
2 files changed, 135 insertions, 37 deletions
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index c4f65ba..d3625d5 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -418,6 +418,85 @@ class SigEvent {
const EventContext &get_ec() const { return ec; }
};
+class CheckEvent {
+ public:
+ using callback_t = std::function<void()>;
+ private:
+ EventContext ec;
+ uv_check_t *ev_check;
+ callback_t callback;
+ static inline void check_then(uv_check_t *h) {
+ auto event = static_cast<CheckEvent *>(h->data);
+ event->callback();
+ }
+
+ public:
+ CheckEvent(): ec(nullptr), ev_check(nullptr) {}
+ CheckEvent(const EventContext &ec, callback_t callback):
+ ec(ec), ev_check(new uv_check_t()),
+ callback(std::move(callback)) {
+ uv_check_init(ec.get(), ev_check);
+ ev_check->data = this;
+ }
+
+ CheckEvent(const CheckEvent &) = delete;
+ CheckEvent(CheckEvent &&other):
+ ec(std::move(other.ec)), ev_check(other.ev_check),
+ callback(std::move(other.callback)) {
+ other.ev_check = nullptr;
+ if (ev_check != nullptr)
+ ev_check->data = this;
+ }
+
+ void swap(CheckEvent &other) {
+ std::swap(ec, other.ec);
+ std::swap(ev_check, other.ev_check);
+ std::swap(callback, other.callback);
+ if (ev_check != nullptr)
+ ev_check->data = this;
+ if (other.ev_check != nullptr)
+ other.ev_check->data = &other;
+ }
+
+ CheckEvent &operator=(CheckEvent &&other) {
+ if (this != &other)
+ {
+ CheckEvent tmp(std::move(other));
+ tmp.swap(*this);
+ }
+ return *this;
+ }
+
+ ~CheckEvent() { clear(); }
+
+ void clear() {
+ if (ev_check != nullptr)
+ {
+ uv_check_stop(ev_check);
+ uv_close((uv_handle_t *)ev_check, _on_uv_handle_close);
+ ev_check = nullptr;
+ }
+ callback = nullptr;
+ }
+
+ void set_callback(callback_t _callback) {
+ callback = _callback;
+ }
+
+ void add() {
+ assert(ev_check != nullptr);
+ uv_check_start(ev_check, CheckEvent::check_then);
+ }
+
+ void del() {
+ if (ev_check != nullptr) uv_check_stop(ev_check);
+ }
+
+ operator bool() const { return ev_check != nullptr; }
+
+ const EventContext &get_ec() const { return ec; }
+};
+
template<typename T>
class ThreadNotifier {
std::condition_variable cv;
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index a09f8ac..e9fdae6 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -199,14 +199,16 @@ class MsgNetwork: public ConnPool {
}
template<typename MsgType>
- void send_msg(MsgType &&msg, const conn_t &conn);
- inline void _send_msg(Msg &&msg, const conn_t &conn);
- inline void _send_msg_dispatcher(const Msg &msg, const conn_t &conn);
+ inline void send_msg(const MsgType &msg, const conn_t &conn);
+ inline void _send_msg(const Msg &msg, const conn_t &conn);
+ template<typename MsgType>
+ inline void send_msg_deferred(MsgType &&msg, const conn_t &conn);
+ inline void _send_msg_deferred(Msg &&msg, const conn_t &conn);
void stop() { stop_workers(); }
using ConnPool::listen;
- conn_t connect(const NetAddr &addr) {
- return static_pointer_cast<Conn>(ConnPool::connect(addr));
+ conn_t connect(const NetAddr &addr, bool blocking = true) {
+ return static_pointer_cast<Conn>(ConnPool::connect(addr, blocking));
}
};
@@ -446,8 +448,11 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
const conn_t get_peer_conn(const NetAddr &paddr) const;
using MsgNet::send_msg;
template<typename MsgType>
- void send_msg(MsgType &&msg, const NetAddr &paddr);
- inline void _send_msg(Msg &&msg, const NetAddr &paddr);
+ inline void send_msg(const MsgType &msg, const NetAddr &paddr);
+ inline void _send_msg(const Msg &msg, const NetAddr &paddr);
+ template<typename MsgType>
+ inline void send_msg_deferred(MsgType &&msg, const NetAddr &paddr);
+ inline void _send_msg_deferred(Msg &&msg, const NetAddr &paddr);
template<typename MsgType>
void multicast_msg(MsgType &&msg, const std::vector<NetAddr> &paddrs);
inline void _multicast_msg(Msg &&msg, const std::vector<NetAddr> &paddrs);
@@ -496,20 +501,39 @@ void MsgNetwork<OpcodeType>::Conn::on_read() {
template<typename OpcodeType>
template<typename MsgType>
-void MsgNetwork<OpcodeType>::send_msg(MsgType &&msg, const conn_t &conn) {
- return _send_msg(MsgType(std::move(msg)), conn);
+inline void MsgNetwork<OpcodeType>::send_msg_deferred(MsgType &&msg, const conn_t &conn) {
+ return _send_msg_deferred(MsgType(std::move(msg)), conn);
}
template<typename OpcodeType>
-inline void MsgNetwork<OpcodeType>::_send_msg(Msg &&msg, const conn_t &conn) {
+inline void MsgNetwork<OpcodeType>::_send_msg_deferred(Msg &&msg, const conn_t &conn) {
this->disp_tcall->async_call(
[this, msg=std::move(msg), conn](ThreadCall::Handle &) {
try {
- this->_send_msg_dispatcher(msg, conn);
+ this->send_msg(msg, conn);
} catch (...) { this->recoverable_error(std::current_exception()); }
});
}
+template<typename OpcodeType>
+template<typename MsgType>
+inline void MsgNetwork<OpcodeType>::send_msg(const MsgType &msg, const conn_t &conn) {
+ return _send_msg(msg, conn);
+}
+
+template<typename OpcodeType>
+inline void MsgNetwork<OpcodeType>::_send_msg(const Msg &msg, const conn_t &conn) {
+ bytearray_t msg_data = msg.serialize();
+ SALTICIDAE_LOG_DEBUG("wrote message %s to %s",
+ std::string(msg).c_str(),
+ std::string(*conn).c_str());
+#ifdef SALTICIDAE_MSG_STAT
+ conn->nsent++;
+ conn->nsentb += msg.get_length();
+#endif
+ conn->write(std::move(msg_data));
+}
+
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::tcall_reset_timeout(ConnPool::Worker *worker,
const conn_t &conn, double timeout) {
@@ -686,19 +710,6 @@ typename PeerNetwork<O, _, __>::Peer *PeerNetwork<O, _, __>::get_peer(const NetA
if (it != id2upeer.end()) return it->second.get();
return nullptr;
}
-
-template<typename OpcodeType>
-inline void MsgNetwork<OpcodeType>::_send_msg_dispatcher(const Msg &msg, const conn_t &conn) {
- bytearray_t msg_data = msg.serialize();
- SALTICIDAE_LOG_DEBUG("wrote message %s to %s",
- std::string(msg).c_str(),
- std::string(*conn).c_str());
-#ifdef SALTICIDAE_MSG_STAT
- conn->nsent++;
- conn->nsentb += msg.get_length();
-#endif
- conn->write(std::move(msg_data));
-}
/* end: functions invoked by the dispatcher */
/* begin: functions invoked by the user loop */
@@ -826,33 +837,41 @@ bool PeerNetwork<O, _, __>::has_peer(const NetAddr &paddr) const {
template<typename O, O _, O __>
template<typename MsgType>
-void PeerNetwork<O, _, __>::send_msg(MsgType &&msg, const NetAddr &paddr) {
- return _send_msg(MsgType(std::move(msg)), paddr);
+inline void PeerNetwork<O, _, __>::send_msg_deferred(MsgType &&msg, const NetAddr &paddr) {
+ return _send_msg_deferred(MsgType(std::move(msg)), paddr);
}
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::_send_msg(Msg &&msg, const NetAddr &paddr) {
+inline void PeerNetwork<O, _, __>::_send_msg_deferred(Msg &&msg, const NetAddr &paddr) {
this->disp_tcall->async_call(
- [this, msg=std::move(msg), paddr](ThreadCall::Handle &) {
+ [this, msg=std::move(msg), paddr](ThreadCall::Handle &) {
try {
- auto p = get_peer(paddr);
- if (!p)
- throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
- this->_send_msg_dispatcher(msg, p->conn);
- } catch (const PeerNetworkError &) {
- this->recoverable_error(std::current_exception());
+ send_msg(msg, paddr);
} catch (...) { this->recoverable_error(std::current_exception()); }
});
}
template<typename O, O _, O __>
template<typename MsgType>
-void PeerNetwork<O, _, __>::multicast_msg(MsgType &&msg, const std::vector<NetAddr> &paddrs) {
+inline void PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const NetAddr &paddr) {
+ return _send_msg(msg, paddr);
+}
+
+template<typename O, O _, O __>
+inline void PeerNetwork<O, _, __>::_send_msg(const Msg &msg, const NetAddr &paddr) {
+ auto p = get_peer(paddr);
+ if (!p) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
+ this->send_msg(msg, p->conn);
+}
+
+template<typename O, O _, O __>
+template<typename MsgType>
+inline void PeerNetwork<O, _, __>::multicast_msg(MsgType &&msg, const std::vector<NetAddr> &paddrs) {
return _multicast_msg(MsgType(std::move(msg)), paddrs);
}
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<NetAddr> &paddrs) {
+inline void PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<NetAddr> &paddrs) {
this->disp_tcall->async_call(
[this, msg=std::move(msg), paddrs](ThreadCall::Handle &) {
try {
@@ -861,7 +880,7 @@ void PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<NetAddr>
auto p = get_peer(addr);
if (!p)
throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
- this->_send_msg_dispatcher(msg, p->conn);
+ this->send_msg(msg, p->conn);
}
} catch (const PeerNetworkError &) {
this->recoverable_error(std::current_exception());