aboutsummaryrefslogtreecommitdiff
path: root/include/salticidae/network.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/salticidae/network.h')
-rw-r--r--include/salticidae/network.h55
1 files changed, 45 insertions, 10 deletions
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index fd69b58..6f7a034 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -45,7 +45,7 @@ class MsgNetwork: public ConnPool {
template<typename T>
struct callback_traits:
public callback_traits<decltype(&T::operator())> {};
-
+
/* match plain functions */
template<typename ReturnType, typename MsgType, typename ConnType>
struct callback_traits<ReturnType(MsgType, ConnType)> {
@@ -53,17 +53,17 @@ class MsgNetwork: public ConnPool {
using conn_type = typename std::remove_reference<ConnType>::type::type;
using msg_type = typename std::remove_reference<MsgType>::type;
};
-
+
/* match function pointers */
template<typename ReturnType, typename... Args>
struct callback_traits<ReturnType(*)(Args...)>:
public callback_traits<ReturnType(Args...)> {};
-
+
/* match const member functions */
template<typename ClassType, typename ReturnType, typename... Args>
struct callback_traits<ReturnType(ClassType::*)(Args...) const>:
public callback_traits<ReturnType(Args...)> {};
-
+
/* match member functions */
template<typename ClassType, typename ReturnType, typename... Args>
struct callback_traits<ReturnType(ClassType::*)(Args...)>:
@@ -78,6 +78,8 @@ class MsgNetwork: public ConnPool {
Msg msg;
MsgState msg_state;
+ bool msg_sleep;
+ TimerEvent ev_enqueue_poll;
protected:
#ifdef SALTICIDAE_MSG_STAT
@@ -88,7 +90,7 @@ class MsgNetwork: public ConnPool {
#endif
public:
- Conn(): msg_state(HEADER)
+ Conn(): msg_state(HEADER), msg_sleep(false)
#ifdef SALTICIDAE_MSG_STAT
, nsent(0), nrecv(0), nsentb(0), nrecvb(0)
#endif
@@ -128,6 +130,26 @@ class MsgNetwork: public ConnPool {
ConnPool::Conn *create_conn() override { return new Conn(); }
void on_read(const ConnPool::conn_t &) override;
+ void on_setup(const ConnPool::conn_t &_conn) override {
+ auto conn = static_pointer_cast<Conn>(_conn);
+ conn->ev_enqueue_poll = TimerEvent(conn->worker->get_ec(),
+ [this, conn](TimerEvent &) {
+ if (!incoming_msgs.enqueue(std::make_pair(conn->msg, conn), false))
+ {
+ conn->msg_sleep = true;
+ conn->ev_enqueue_poll.add(0);
+ return;
+ }
+ conn->msg_sleep = false;
+ on_read(conn);
+ });
+ }
+
+ void on_teardown(const ConnPool::conn_t &_conn) override {
+ auto conn = static_pointer_cast<Conn>(_conn);
+ conn->ev_enqueue_poll.clear();
+ }
+
public:
class Config: public ConnPool::Config {
@@ -153,7 +175,7 @@ class MsgNetwork: public ConnPool {
incoming_msgs.reg_handler(ec, [this, burst_size=config._burst_size](queue_t &q) {
std::pair<Msg, conn_t> item;
size_t cnt = 0;
- while (q.try_dequeue(item))
+ while (q.try_dequeue(item) && this->system_state == 1)
{
auto &msg = item.first;
auto &conn = item.second;
@@ -525,8 +547,9 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
/* this callback is run by a worker */
template<typename OpcodeType>
void MsgNetwork<OpcodeType>::on_read(const ConnPool::conn_t &_conn) {
- ConnPool::on_read(_conn);
auto conn = static_pointer_cast<Conn>(_conn);
+ if (conn->msg_sleep) return;
+ ConnPool::on_read(_conn);
auto &recv_buffer = conn->recv_buffer;
auto &msg = conn->msg;
auto &msg_state = conn->msg_state;
@@ -550,13 +573,25 @@ void MsgNetwork<OpcodeType>::on_read(const ConnPool::conn_t &_conn) {
if (!msg.verify_checksum())
{
SALTICIDAE_LOG_WARN("checksums do not match, dropping the message");
- return;
+ break;
}
#endif
- while (!incoming_msgs.enqueue(std::make_pair(msg, conn), true))
- std::this_thread::yield();
+ if (!incoming_msgs.enqueue(std::make_pair(msg, conn), false))
+ {
+ conn->msg_sleep = true;
+ conn->ev_enqueue_poll.add(0);
+ return;
+ }
}
}
+ if (conn->ready_recv && recv_buffer.size() < conn->max_recv_buff_size)
+ {
+ /* resume reading from socket */
+ conn->ev_socket.del();
+ conn->ev_socket.add(FdEvent::READ |
+ (conn->ready_send ? 0: FdEvent::WRITE));
+ conn->send_data_func(conn, conn->fd, FdEvent::READ);
+ }
}
template<typename OpcodeType>