aboutsummaryrefslogtreecommitdiff
path: root/include
diff options
context:
space:
mode:
Diffstat (limited to 'include')
-rw-r--r--include/salticidae/conn.h37
-rw-r--r--include/salticidae/network.h55
2 files changed, 72 insertions, 20 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index 56d2d6a..a594da6 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -69,10 +69,11 @@ class ConnPool {
ACTIVE, /**< the connection is established by connect() */
PASSIVE, /**< the connection is established by accept() */
};
-
+
protected:
std::atomic<bool> terminated;
size_t seg_buff_size;
+ size_t max_recv_buff_size;
int fd;
Worker *worker;
ConnPool *cpool;
@@ -86,13 +87,14 @@ class ConnPool {
FdEvent ev_socket;
/** does not need to wait if true */
bool ready_send;
+ bool ready_recv;
typedef void (socket_io_func)(const conn_t &, int, int);
socket_io_func *send_data_func;
socket_io_func *recv_data_func;
BoxObj<TLS> tls;
BoxObj<const X509> peer_cert;
-
+
static socket_io_func _recv_data;
static socket_io_func _send_data;
@@ -107,12 +109,13 @@ class ConnPool {
virtual void stop();
public:
- Conn(): terminated(false), worker(nullptr), ready_send(false),
+ Conn(): terminated(false), worker(nullptr),
+ ready_send(false), ready_recv(false),
send_data_func(nullptr), recv_data_func(nullptr),
tls(nullptr), peer_cert(nullptr) {}
Conn(const Conn &) = delete;
Conn(Conn &&other) = delete;
-
+
virtual ~Conn() {
SALTICIDAE_LOG_INFO("destroyed %s", std::string(*this).c_str());
}
@@ -139,6 +142,7 @@ class ConnPool {
};
protected:
+ int system_state;
EventContext ec;
EventContext disp_ec;
ThreadCall* disp_tcall;
@@ -178,6 +182,7 @@ class ConnPool {
const int max_listen_backlog;
const double conn_server_timeout;
const size_t seg_buff_size;
+ const size_t max_recv_buff_size;
const size_t queue_capacity;
tls_context_t tls_ctx;
@@ -230,7 +235,12 @@ class ConnPool {
/* the following functions are called by the dispatcher */
void start() {
- handle = std::thread([this]() { ec.dispatch(); });
+ handle = std::thread([this]() {
+ sigset_t mask;
+ sigfillset(&mask);
+ pthread_sigmask(SIG_BLOCK, &mask, NULL);
+ ec.dispatch();
+ });
}
void enable_send_buffer(const conn_t &conn, int client_fd) {
@@ -240,7 +250,8 @@ class ConnPool {
if (conn->ready_send)
{
conn->ev_socket.del();
- conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE);
+ conn->ev_socket.add((conn->ready_recv ? 0 : FdEvent::READ) |
+ FdEvent::WRITE);
conn->send_data_func(conn, client_fd, FdEvent::WRITE);
}
return false;
@@ -318,7 +329,6 @@ class ConnPool {
/* related to workers */
size_t nworker;
salticidae::BoxObj<Worker[]> workers;
- int system_state;
void accept_client(int, int);
void conn_server(const conn_t &conn, int, int);
@@ -348,6 +358,7 @@ class ConnPool {
int _max_listen_backlog;
double _conn_server_timeout;
size_t _seg_buff_size;
+ size_t _max_recv_buff_size;
size_t _nworker;
size_t _queue_capacity;
bool _enable_tls;
@@ -363,6 +374,7 @@ class ConnPool {
_max_listen_backlog(10),
_conn_server_timeout(2),
_seg_buff_size(4096),
+ _max_recv_buff_size(4096),
_nworker(1),
_queue_capacity(0),
_enable_tls(false),
@@ -388,6 +400,11 @@ class ConnPool {
return *this;
}
+ Config &max_recv_buff_size(size_t x) {
+ _max_recv_buff_size = x;
+ return *this;
+ }
+
Config &nworker(size_t x) {
_nworker = std::max((size_t)1, x);
return *this;
@@ -435,17 +452,17 @@ class ConnPool {
};
ConnPool(const EventContext &ec, const Config &config):
- ec(ec),
+ system_state(0), ec(ec),
enable_tls(config._enable_tls),
async_id(0),
max_listen_backlog(config._max_listen_backlog),
conn_server_timeout(config._conn_server_timeout),
seg_buff_size(config._seg_buff_size),
+ max_recv_buff_size(config._max_recv_buff_size),
queue_capacity(config._queue_capacity),
tls_ctx(nullptr),
listen_fd(-1),
- nworker(config._nworker),
- system_state(0) {
+ nworker(config._nworker) {
if (enable_tls)
{
tls_ctx = new TLSContext();
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index fd69b58..6f7a034 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -45,7 +45,7 @@ class MsgNetwork: public ConnPool {
template<typename T>
struct callback_traits:
public callback_traits<decltype(&T::operator())> {};
-
+
/* match plain functions */
template<typename ReturnType, typename MsgType, typename ConnType>
struct callback_traits<ReturnType(MsgType, ConnType)> {
@@ -53,17 +53,17 @@ class MsgNetwork: public ConnPool {
using conn_type = typename std::remove_reference<ConnType>::type::type;
using msg_type = typename std::remove_reference<MsgType>::type;
};
-
+
/* match function pointers */
template<typename ReturnType, typename... Args>
struct callback_traits<ReturnType(*)(Args...)>:
public callback_traits<ReturnType(Args...)> {};
-
+
/* match const member functions */
template<typename ClassType, typename ReturnType, typename... Args>
struct callback_traits<ReturnType(ClassType::*)(Args...) const>:
public callback_traits<ReturnType(Args...)> {};
-
+
/* match member functions */
template<typename ClassType, typename ReturnType, typename... Args>
struct callback_traits<ReturnType(ClassType::*)(Args...)>:
@@ -78,6 +78,8 @@ class MsgNetwork: public ConnPool {
Msg msg;
MsgState msg_state;
+ bool msg_sleep;
+ TimerEvent ev_enqueue_poll;
protected:
#ifdef SALTICIDAE_MSG_STAT
@@ -88,7 +90,7 @@ class MsgNetwork: public ConnPool {
#endif
public:
- Conn(): msg_state(HEADER)
+ Conn(): msg_state(HEADER), msg_sleep(false)
#ifdef SALTICIDAE_MSG_STAT
, nsent(0), nrecv(0), nsentb(0), nrecvb(0)
#endif
@@ -128,6 +130,26 @@ class MsgNetwork: public ConnPool {
ConnPool::Conn *create_conn() override { return new Conn(); }
void on_read(const ConnPool::conn_t &) override;
+ void on_setup(const ConnPool::conn_t &_conn) override {
+ auto conn = static_pointer_cast<Conn>(_conn);
+ conn->ev_enqueue_poll = TimerEvent(conn->worker->get_ec(),
+ [this, conn](TimerEvent &) {
+ if (!incoming_msgs.enqueue(std::make_pair(conn->msg, conn), false))
+ {
+ conn->msg_sleep = true;
+ conn->ev_enqueue_poll.add(0);
+ return;
+ }
+ conn->msg_sleep = false;
+ on_read(conn);
+ });
+ }
+
+ void on_teardown(const ConnPool::conn_t &_conn) override {
+ auto conn = static_pointer_cast<Conn>(_conn);
+ conn->ev_enqueue_poll.clear();
+ }
+
public:
class Config: public ConnPool::Config {
@@ -153,7 +175,7 @@ class MsgNetwork: public ConnPool {
incoming_msgs.reg_handler(ec, [this, burst_size=config._burst_size](queue_t &q) {
std::pair<Msg, conn_t> item;
size_t cnt = 0;
- while (q.try_dequeue(item))
+ while (q.try_dequeue(item) && this->system_state == 1)
{
auto &msg = item.first;
auto &conn = item.second;
@@ -525,8 +547,9 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
/* this callback is run by a worker */
template<typename OpcodeType>
void MsgNetwork<OpcodeType>::on_read(const ConnPool::conn_t &_conn) {
- ConnPool::on_read(_conn);
auto conn = static_pointer_cast<Conn>(_conn);
+ if (conn->msg_sleep) return;
+ ConnPool::on_read(_conn);
auto &recv_buffer = conn->recv_buffer;
auto &msg = conn->msg;
auto &msg_state = conn->msg_state;
@@ -550,13 +573,25 @@ void MsgNetwork<OpcodeType>::on_read(const ConnPool::conn_t &_conn) {
if (!msg.verify_checksum())
{
SALTICIDAE_LOG_WARN("checksums do not match, dropping the message");
- return;
+ break;
}
#endif
- while (!incoming_msgs.enqueue(std::make_pair(msg, conn), true))
- std::this_thread::yield();
+ if (!incoming_msgs.enqueue(std::make_pair(msg, conn), false))
+ {
+ conn->msg_sleep = true;
+ conn->ev_enqueue_poll.add(0);
+ return;
+ }
}
}
+ if (conn->ready_recv && recv_buffer.size() < conn->max_recv_buff_size)
+ {
+ /* resume reading from socket */
+ conn->ev_socket.del();
+ conn->ev_socket.add(FdEvent::READ |
+ (conn->ready_send ? 0: FdEvent::WRITE));
+ conn->send_data_func(conn, conn->fd, FdEvent::READ);
+ }
}
template<typename OpcodeType>