aboutsummaryrefslogtreecommitdiff
path: root/include
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2018-11-11 23:02:59 -0500
committerDeterminant <[email protected]>2018-11-11 23:02:59 -0500
commitdd09443b0b3c0b5d1a8c034644d1065dd25bf5a9 (patch)
treef770ccffa4ae89bfec00c65e162f4f0d0613c3ae /include
parentb7d65ee96221e63c865b1bf8cda79b3021cba412 (diff)
start debugging multiloops design
Diffstat (limited to 'include')
-rw-r--r--include/salticidae/buffer.h148
-rw-r--r--include/salticidae/conn.h392
-rw-r--r--include/salticidae/event.h60
-rw-r--r--include/salticidae/network.h64
-rw-r--r--include/salticidae/queue.h28
5 files changed, 508 insertions, 184 deletions
diff --git a/include/salticidae/buffer.h b/include/salticidae/buffer.h
new file mode 100644
index 0000000..3f415c9
--- /dev/null
+++ b/include/salticidae/buffer.h
@@ -0,0 +1,148 @@
+#ifndef _SALTICIDAE_BUFFER_H
+#define _SALTICIDAE_BUFFER_H
+
+#include <list>
+
+namespace salticidae {
+
+class SegBuffer {
+ public:
+ struct buffer_entry_t {
+ bytearray_t data;
+ bytearray_t::iterator offset;
+ buffer_entry_t(): offset(data.begin()) {}
+ buffer_entry_t(bytearray_t &&_data):
+ data(std::move(_data)), offset(data.begin()) {}
+
+ buffer_entry_t(buffer_entry_t &&other) {
+ size_t _offset = other.offset - other.data.begin();
+ data = std::move(other.data);
+ offset = data.begin() + _offset;
+ }
+
+ buffer_entry_t &operator=(buffer_entry_t &&other) {
+ size_t _offset = other.offset - other.data.begin();
+ data = std::move(other.data);
+ offset = data.begin() + _offset;
+ return *this;
+ }
+
+ buffer_entry_t(const buffer_entry_t &other): data(other.data) {
+ offset = data.begin() + (other.offset - other.data.begin());
+ }
+
+ size_t length() const { return data.end() - offset; }
+ };
+
+ private:
+ std::list<buffer_entry_t> buffer;
+ size_t _size;
+
+ public:
+ SegBuffer(): _size(0) {}
+ ~SegBuffer() { clear(); }
+
+ void swap(SegBuffer &other) {
+ std::swap(buffer, other.buffer);
+ std::swap(_size, other._size);
+ }
+
+ SegBuffer(const SegBuffer &other):
+ buffer(other.buffer), _size(other._size) {}
+
+ SegBuffer(SegBuffer &&other):
+ buffer(std::move(other.buffer)), _size(other._size) {
+ other._size = 0;
+ }
+
+ SegBuffer &operator=(SegBuffer &&other) {
+ if (this != &other)
+ {
+ SegBuffer tmp(std::move(other));
+ tmp.swap(*this);
+ }
+ return *this;
+ }
+
+ SegBuffer &operator=(const SegBuffer &other) {
+ if (this != &other)
+ {
+ SegBuffer tmp(other);
+ tmp.swap(*this);
+ }
+ return *this;
+ }
+
+ void rewind(bytearray_t &&data) {
+ _size += data.size();
+ buffer.push_front(buffer_entry_t(std::move(data)));
+ }
+
+ void push(bytearray_t &&data) {
+ _size += data.size();
+ buffer.push_back(buffer_entry_t(std::move(data)));
+ }
+
+ bytearray_t move_pop() {
+ auto res = std::move(buffer.front().data);
+ buffer.pop_front();
+ _size -= res.size();
+ return std::move(res);
+ }
+
+ bytearray_t pop(size_t len) {
+ bytearray_t res;
+ auto i = buffer.begin();
+ while (len && i != buffer.end())
+ {
+ size_t copy_len = std::min(i->length(), len);
+ res.insert(res.end(), i->offset, i->offset + copy_len);
+ i->offset += copy_len;
+ len -= copy_len;
+ if (i->offset == i->data.end())
+ i++;
+ }
+ buffer.erase(buffer.begin(), i);
+ _size -= res.size();
+ return std::move(res);
+ }
+
+ size_t size() const { return _size; }
+ bool empty() const { return buffer.empty(); }
+
+ void clear() {
+ buffer.clear();
+ _size = 0;
+ }
+};
+
+struct MPSCWriteBuffer {
+ using buffer_entry_t = SegBuffer::buffer_entry_t;
+ using queue_t = MPSCQueueEventDriven<buffer_entry_t>;
+ queue_t buffer;
+
+ MPSCWriteBuffer() {}
+
+ MPSCWriteBuffer(const SegBuffer &other) = delete;
+ MPSCWriteBuffer(SegBuffer &&other) = delete;
+
+ void rewind(bytearray_t &&data) {
+ buffer.rewind(buffer_entry_t(std::move(data)));
+ }
+
+ void push(bytearray_t &&data) {
+ buffer.enqueue(buffer_entry_t(std::move(data)));
+ }
+
+ bytearray_t move_pop() {
+ buffer_entry_t res;
+ buffer.try_dequeue(res);
+ return std::move(res.data);
+ }
+
+ queue_t &get_queue() { return buffer; }
+};
+
+}
+
+#endif
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index f290e3d..26d19fe 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -36,6 +36,9 @@
#include <list>
#include <algorithm>
#include <exception>
+#include <mutex>
+#include <thread>
+#include <fcntl.h>
#include "salticidae/type.h"
#include "salticidae/ref.h"
@@ -43,110 +46,10 @@
#include "salticidae/util.h"
#include "salticidae/netaddr.h"
#include "salticidae/msg.h"
+#include "salticidae/buffer.h"
namespace salticidae {
-class SegBuffer {
- struct buffer_entry_t {
- bytearray_t data;
- bytearray_t::iterator offset;
- buffer_entry_t(bytearray_t &&_data): data(std::move(_data)) {
- offset = data.begin();
- }
-
- buffer_entry_t(buffer_entry_t &&other) {
- size_t _offset = other.offset - other.data.begin();
- data = std::move(other.data);
- offset = data.begin() + _offset;
- }
-
- buffer_entry_t(const buffer_entry_t &other): data(other.data) {
- offset = data.begin() + (other.offset - other.data.begin());
- }
-
- size_t length() const { return data.end() - offset; }
- };
-
- std::list<buffer_entry_t> buffer;
- size_t _size;
-
- public:
- SegBuffer(): _size(0) {}
- ~SegBuffer() { clear(); }
-
- void swap(SegBuffer &other) {
- std::swap(buffer, other.buffer);
- std::swap(_size, other._size);
- }
-
- SegBuffer(const SegBuffer &other):
- buffer(other.buffer), _size(other._size) {}
-
- SegBuffer(SegBuffer &&other):
- buffer(std::move(other.buffer)), _size(other._size) {
- other._size = 0;
- }
-
- SegBuffer &operator=(SegBuffer &&other) {
- if (this != &other)
- {
- SegBuffer tmp(std::move(other));
- tmp.swap(*this);
- }
- return *this;
- }
-
- SegBuffer &operator=(const SegBuffer &other) {
- if (this != &other)
- {
- SegBuffer tmp(other);
- tmp.swap(*this);
- }
- return *this;
- }
-
- void rewind(bytearray_t &&data) {
- _size += data.size();
- buffer.push_front(buffer_entry_t(std::move(data)));
- }
-
- void push(bytearray_t &&data) {
- _size += data.size();
- buffer.push_back(buffer_entry_t(std::move(data)));
- }
-
- bytearray_t move_pop() {
- auto res = std::move(buffer.front().data);
- buffer.pop_front();
- return std::move(res);
- }
-
- bytearray_t pop(size_t len) {
- bytearray_t res;
- auto i = buffer.begin();
- while (len && i != buffer.end())
- {
- size_t copy_len = std::min(i->length(), len);
- res.insert(res.end(), i->offset, i->offset + copy_len);
- i->offset += copy_len;
- len -= copy_len;
- if (i->offset == i->data.end())
- i++;
- }
- buffer.erase(buffer.begin(), i);
- _size -= res.size();
- return std::move(res);
- }
-
- size_t size() const { return _size; }
- bool empty() const { return buffer.empty(); }
-
- void clear() {
- buffer.clear();
- _size = 0;
- }
-};
-
struct ConnPoolError: public SalticidaeError {
using SalticidaeError::SalticidaeError;
};
@@ -156,7 +59,7 @@ class ConnPool {
public:
class Conn;
/** The handle to a bi-directional connection. */
- using conn_t = RcObj<Conn>;
+ using conn_t = ArcObj<Conn>;
/** The type of callback invoked when connection status is changed. */
using conn_callback_t = std::function<void(Conn &)>;
@@ -177,7 +80,8 @@ class ConnPool {
ConnMode mode;
NetAddr addr;
- SegBuffer send_buffer;
+ // TODO: send_buffer should be a thread-safe mpsc queue
+ MPSCWriteBuffer send_buffer;
SegBuffer recv_buffer;
Event ev_read;
@@ -190,6 +94,9 @@ class ConnPool {
void send_data(evutil_socket_t, short);
void conn_server(evutil_socket_t, short);
+ /** Terminate the connection. */
+ void terminate();
+
public:
Conn(): ready_send(false) {}
Conn(const Conn &) = delete;
@@ -206,7 +113,8 @@ class ConnPool {
const NetAddr &get_addr() const { return addr; }
ConnMode get_mode() const { return mode; }
ConnPool *get_pool() const { return cpool; }
- SegBuffer &read() { return recv_buffer; }
+ SegBuffer &get_recv_buffer() { return recv_buffer; }
+ MPSCWriteBuffer &get_send_buffer() { return send_buffer; }
/** Set the buffer size used for send/receive data. */
void set_seg_buff_size(size_t size) { seg_buff_size = size; }
@@ -214,17 +122,12 @@ class ConnPool {
* whenever I/O is available. */
void write(bytearray_t &&data) {
send_buffer.push(std::move(data));
- if (ready_send)
- send_data(fd, EV_WRITE);
- }
-
- /** Move the send buffer from the other (old) connection. */
- void move_send_buffer(conn_t other) {
- send_buffer = std::move(other->send_buffer);
}
- /** Terminate the connection. */
- void terminate();
+ ///** Move the send buffer from the other (old) connection. */
+ //void move_send_buffer(conn_t other) {
+ // send_buffer = std::move(other->send_buffer);
+ //}
protected:
/** Close the IO and clear all on-going or planned events. */
@@ -238,35 +141,196 @@ class ConnPool {
}
/** Called when new data is available. */
- virtual void on_read() {
- if (cpool->read_cb) cpool->read_cb(*this);
- }
+ virtual void on_read() {}
/** Called when the underlying connection is established. */
virtual void on_setup() {
- if (cpool->conn_cb) cpool->conn_cb(*this);
+ cpool->update_conn(self());
}
/** Called when the underlying connection breaks. */
virtual void on_teardown() {
- if (cpool->conn_cb) cpool->conn_cb(*this);
+ cpool->update_conn(self());
}
};
-
+
private:
- int max_listen_backlog;
- double conn_server_timeout;
- size_t seg_buff_size;
- conn_callback_t read_cb;
- conn_callback_t conn_cb;
+ const int max_listen_backlog;
+ const double conn_server_timeout;
+ const size_t seg_buff_size;
+
+ /* owned by user loop */
+ int mlisten_fd[2]; /**< for connection events sent to the user loop */
+ Event ev_mlisten;
+ conn_callback_t conn_cb;
+ /* owned by the dispatcher */
std::unordered_map<int, conn_t> pool;
- int listen_fd;
+ int listen_fd; /**< for accepting new network connections */
+ int dlisten_fd[2]; /**< for control command sent to the dispatcher */
Event ev_listen;
+ Event ev_dlisten;
+ std::mutex cp_mlock;
+
+ void update_conn(const conn_t &conn) {
+ auto ptr = new conn_t(conn);
+ write(mlisten_fd[1], &ptr, sizeof(ptr));
+ }
+
+ struct Worker;
+ class WorkerFeed;
+
+ class WorkerCmd {
+ public:
+ virtual ~WorkerCmd() = default;
+ virtual void exec(Worker *worker) = 0;
+ };
+
+ class Worker {
+ EventContext ec;
+ Event ev_ctl;
+ int ctl_fd[2]; /**< for control messages from dispatcher */
+ std::thread handle;
+
+ public:
+ Worker() {
+ if (pipe2(ctl_fd, O_NONBLOCK))
+ throw ConnPoolError(std::string("failed to create worker pipe"));
+ ev_ctl = Event(ec, ctl_fd[0], EV_READ | EV_PERSIST, [this](int fd, short) {
+ WorkerCmd *dcmd;
+ read(fd, &dcmd, sizeof(dcmd));
+ dcmd->exec(this);
+ delete dcmd;
+ });
+ ev_ctl.add();
+ }
+
+ ~Worker() {
+ close(ctl_fd[0]);
+ close(ctl_fd[1]);
+ }
+
+ /* the following functions are called by the dispatcher */
+ void start() {
+ handle = std::thread([this]() { ec.dispatch(); });
+ }
+
+ void feed(const conn_t &conn, int client_fd) {
+ auto dcmd = new WorkerFeed(conn, client_fd);
+ write(ctl_fd[1], &dcmd, sizeof(dcmd));
+ }
+
+ void stop() {
+ auto dcmd = new WorkerStop();
+ write(ctl_fd[1], &dcmd, sizeof(dcmd));
+ }
+
+ std::thread &get_handle() { return handle; }
+ const EventContext &get_ec() { return ec; }
+ };
+
+ class WorkerFeed: public WorkerCmd {
+ conn_t conn;
+ int client_fd;
+
+ public:
+ WorkerFeed(const conn_t &conn, int client_fd):
+ conn(conn), client_fd(client_fd) {}
+ void exec(Worker *worker) override {
+ SALTICIDAE_LOG_INFO("worker %x got %s",
+ std::this_thread::get_id(),
+ std::string(*conn).c_str());
+ auto &ec = worker->get_ec();
+ conn->get_send_buffer()
+ .get_queue()
+ .reg_handler(ec, [conn=this->conn,
+ client_fd=this->client_fd](MPSCWriteBuffer::queue_t &) {
+ if (conn->ready_send)
+ conn->send_data(client_fd, EV_WRITE);
+ return false;
+ });
+ auto conn_ptr = conn.get();
+ conn->ev_read = Event(ec, client_fd, EV_READ,
+ std::bind(&Conn::recv_data, conn_ptr, _1, _2));
+ conn->ev_write = Event(ec, client_fd, EV_WRITE,
+ std::bind(&Conn::send_data, conn_ptr, _1, _2));
+ conn->ev_read.add();
+ conn->ev_write.add();
+ }
+ };
+
+ class WorkerStop: public WorkerCmd {
+ public:
+ void exec(Worker *worker) override { worker->get_ec().stop(); }
+ };
+
+ /* related to workers */
+ size_t nworker;
+ salticidae::BoxObj<Worker[]> workers;
void accept_client(evutil_socket_t, short);
- conn_t add_conn(conn_t conn);
+ conn_t add_conn(const conn_t &conn);
+ conn_t _connect(const NetAddr &addr);
+ void _listen(NetAddr listen_addr);
+ void _post_terminate(int fd);
+
+ class DispatchCmd {
+ public:
+ virtual ~DispatchCmd() = default;
+ virtual void exec(ConnPool *cpool) = 0;
+ };
+
+ // TODO: the following two are untested
+ class DspListen: public DispatchCmd {
+ const NetAddr addr;
+ public:
+ DspListen(const NetAddr &addr): addr(addr) {}
+ void exec(ConnPool *cpool) override {
+ cpool->_listen(addr);
+ }
+ };
+
+ class DspConnect: public DispatchCmd {
+ const NetAddr addr;
+ public:
+ DspConnect(const NetAddr &addr): addr(addr) {}
+ void exec(ConnPool *cpool) override {
+ cpool->update_conn(cpool->_connect(addr));
+ }
+ };
+
+ class DspPostTerm: public DispatchCmd {
+ int fd;
+ public:
+ DspPostTerm(int fd): fd(fd) {}
+ void exec(ConnPool *cpool) override {
+ cpool->_post_terminate(fd);
+ }
+ };
+
+ class DspMulticast: public DispatchCmd {
+ std::vector<conn_t> receivers;
+ bytearray_t data;
+ public:
+ DspMulticast(std::vector<conn_t> &&receivers, bytearray_t &&data):
+ receivers(std::move(receivers)),
+ data(std::move(data)) {}
+ void exec(ConnPool *) override {
+ for (auto &r: receivers) r->write(bytearray_t(data));
+ }
+ };
+
+ void post_terminate(int fd) {
+ auto dcmd = new DspPostTerm(fd);
+ write(dlisten_fd[1], &dcmd, sizeof(dcmd));
+ }
+
+ Worker &select_worker() {
+ return workers[1];
+ }
protected:
EventContext ec;
+ EventContext dispatcher_ec;
+ std::mutex dsp_ec_mlock;
/** Should be implemented by derived class to return a new Conn object. */
virtual Conn *create_conn() = 0;
@@ -274,29 +338,91 @@ class ConnPool {
ConnPool(const EventContext &ec,
int max_listen_backlog = 10,
double conn_server_timeout = 2,
- size_t seg_buff_size = 4096):
- max_listen_backlog(max_listen_backlog),
- conn_server_timeout(conn_server_timeout),
- seg_buff_size(seg_buff_size),
- ec(ec) {}
+ size_t seg_buff_size = 4096,
+ size_t nworker = 2):
+ max_listen_backlog(max_listen_backlog),
+ conn_server_timeout(conn_server_timeout),
+ seg_buff_size(seg_buff_size),
+ listen_fd(-1),
+ nworker(std::min((size_t)1, nworker)),
+ ec(ec) {
+ if (pipe2(mlisten_fd, O_NONBLOCK))
+ throw ConnPoolError(std::string("failed to create main pipe"));
+ if (pipe2(dlisten_fd, O_NONBLOCK))
+ throw ConnPoolError(std::string("failed to create dispatcher pipe"));
+
+ ev_mlisten = Event(ec, mlisten_fd[0], EV_READ | EV_PERSIST, [this](int fd, short) {
+ conn_t *conn_ptr;
+ read(fd, &conn_ptr, sizeof(conn_ptr));
+ if (conn_cb)
+ conn_cb(**conn_ptr);
+ delete conn_ptr;
+ });
+ ev_mlisten.add();
+
+ workers = new Worker[nworker];
+ dispatcher_ec = workers[0].get_ec();
+
+ ev_dlisten = Event(dispatcher_ec, dlisten_fd[0], EV_READ | EV_PERSIST, [this](int fd, short) {
+ DispatchCmd *dcmd;
+ read(fd, &dcmd, sizeof(dcmd));
+ dcmd->exec(this);
+ delete dcmd;
+ });
+ ev_dlisten.add();
+
+ SALTICIDAE_LOG_INFO("starting all threads...");
+ for (size_t i = 0; i < nworker; i++)
+ workers[i].start();
+ }
~ConnPool() {
+ /* stop all workers */
+ for (size_t i = 0; i < nworker; i++)
+ workers[i].stop();
+ /* join all worker threads */
+ for (size_t i = 0; i < nworker; i++)
+ workers[i].get_handle().join();
for (auto it: pool)
{
conn_t conn = it.second;
conn->on_close();
}
+ if (listen_fd != -1) close(listen_fd);
+ for (int i = 0; i < 2; i++)
+ {
+ close(mlisten_fd[i]);
+ close(dlisten_fd[i]);
+ }
}
ConnPool(const ConnPool &) = delete;
ConnPool(ConnPool &&) = delete;
/** Actively connect to remote addr. */
- conn_t connect(const NetAddr &addr);
+ conn_t connect(const NetAddr &addr, bool blocking = true) {
+ if (blocking)
+ return _connect(addr);
+ else
+ {
+ auto dcmd = new DspConnect(addr);
+ write(dlisten_fd[1], &dcmd, sizeof(dcmd));
+ return nullptr;
+ }
+ }
+
/** Listen for passive connections (connection initiated from remote).
* Does not need to be called if do not want to accept any passive
* connections. */
- void listen(NetAddr listen_addr);
+ void listen(NetAddr listen_addr, bool blocking = true) {
+ if (blocking)
+ _listen(listen_addr);
+ else
+ {
+ auto dcmd = new DspListen(listen_addr);
+ write(dlisten_fd[1], &dcmd, sizeof(dcmd));
+ }
+ }
template<typename Func>
void reg_conn_handler(Func cb) { conn_cb = cb; }
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index 857518b..ddb93fc 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -53,8 +53,8 @@ class EventContext: public _event_context_ot {
EventContext(EventContext &&) = default;
EventContext &operator=(const EventContext &) = default;
EventContext &operator=(EventContext &&) = default;
- void dispatch() { event_base_dispatch(get()); }
- void stop() { event_base_loopbreak(get()); }
+ void dispatch() const { event_base_dispatch(get()); }
+ void stop() const { event_base_loopbreak(get()); }
};
class Event {
@@ -122,7 +122,7 @@ class Event {
};
template<typename T>
-class MPSCQueueEventDriven: public MPMCQueue<T> {
+class MPSCQueueEventDriven: public MPSCQueue<T> {
private:
const uint64_t dummy = 1;
std::atomic<bool> wait_sig;
@@ -130,38 +130,39 @@ class MPSCQueueEventDriven: public MPMCQueue<T> {
Event ev;
public:
- template<typename Func>
- MPSCQueueEventDriven(const EventContext &ec, Func &&func,
- size_t burst_size = 128, size_t capacity = 65536):
- MPMCQueue<T>(capacity),
+ MPSCQueueEventDriven(size_t capacity = 65536):
+ MPSCQueue<T>(capacity),
wait_sig(true),
- fd(eventfd(0, EFD_NONBLOCK)),
- ev(Event(ec, fd, EV_READ | EV_PERSIST,
- [this, func=std::forward<Func>(func), burst_size](int, short) {
- uint64_t t;
- read(fd, &t, 8);
- //fprintf(stderr, "%x\n", std::this_thread::get_id());
- T elem;
- size_t cnt = burst_size;
- while (MPMCQueue<T>::try_dequeue(elem))
- {
- func(std::move(elem));
- if (!--cnt)
- {
- write(fd, &dummy, 8);
- return;
- }
- }
- wait_sig.store(true, std::memory_order_relaxed);
- })) { ev.add(); }
+ fd(eventfd(0, EFD_NONBLOCK)) {}
~MPSCQueueEventDriven() { close(fd); }
+ template<typename Func>
+ void reg_handler(const EventContext &ec, Func &&func) {
+ ev = Event(ec, fd, EV_READ | EV_PERSIST,
+ [this, func=std::forward<Func>(func)](int, short) {
+ //fprintf(stderr, "%x\n", std::this_thread::get_id());
+ uint64_t t;
+ read(fd, &t, 8);
+ // the only undesirable case is there are some new items
+ // enqueued before recovering wait_sig to true, so the consumer
+ // won't be notified. In this case, no enqueuing thread will
+ // get to write(fd). Then store(true) must happen after all exchange(false),
+ // since all enqueue operations are finalized, the dequeue should be able
+ // to see those enqueued values in func()
+ wait_sig.store(true, std::memory_order_release);
+ if (func(*this))
+ write(fd, &dummy, 8);
+ });
+ ev.add();
+ }
+
template<typename U>
bool enqueue(U &&e) {
static const uint64_t dummy = 1;
- bool ret = MPMCQueue<T>::enqueue(std::forward<U>(e));
- if (wait_sig.exchange(false, std::memory_order_relaxed))
+ bool ret = MPSCQueue<T>::enqueue(std::forward<U>(e));
+ // memory barrier here, so any load/store in enqueue must be finialized
+ if (wait_sig.exchange(false, std::memory_order_acq_rel))
{
SALTICIDAE_LOG_DEBUG("mpsc notify");
write(fd, &dummy, 8);
@@ -170,6 +171,8 @@ class MPSCQueueEventDriven: public MPMCQueue<T> {
}
};
+// TODO: incorrect MPMCQueueEventDriven impl
+/*
template<typename T>
class MPMCQueueEventDriven: public MPMCQueue<T> {
private:
@@ -223,6 +226,7 @@ class MPMCQueueEventDriven: public MPMCQueue<T> {
return ret;
}
};
+*/
}
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index 4e966d1..d82772f 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -103,7 +103,7 @@ class MsgNetwork: public ConnPool {
void on_read() override;
};
- using conn_t = RcObj<Conn>;
+ using conn_t = ArcObj<Conn>;
#ifdef SALTICIDAE_MSG_STAT
class msg_stat_by_opcode_t:
public std::unordered_map<typename Msg::opcode_t,
@@ -121,6 +121,8 @@ class MsgNetwork: public ConnPool {
std::unordered_map<
typename Msg::opcode_t,
std::function<void(const Msg &msg, Conn &)>> handler_map;
+ using queue_t = MPSCQueueEventDriven<std::pair<Msg, conn_t>>;
+ queue_t incoming_msgs;
protected:
#ifdef SALTICIDAE_MSG_STAT
@@ -134,10 +136,38 @@ class MsgNetwork: public ConnPool {
MsgNetwork(const EventContext &ec,
int max_listen_backlog,
double conn_server_timeout,
- size_t seg_buff_size):
+ size_t seg_buff_size,
+ size_t burst_size = 1000):
ConnPool(ec, max_listen_backlog,
conn_server_timeout,
- seg_buff_size) {}
+ seg_buff_size) {
+ incoming_msgs.reg_handler(ec, [this, burst_size](queue_t &q) {
+ std::pair<Msg, conn_t> item;
+ size_t cnt = 0;
+ while (q.try_dequeue(item))
+ {
+ auto &msg = item.first;
+ auto &conn = item.second;
+ auto it = handler_map.find(msg.get_opcode());
+ if (it == handler_map.end())
+ SALTICIDAE_LOG_WARN("unknown opcode: %s",
+ get_hex(msg.get_opcode()).c_str());
+ else /* call the handler */
+ {
+ SALTICIDAE_LOG_DEBUG("got message %s from %s",
+ std::string(msg).c_str(),
+ std::string(*conn).c_str());
+ it->second(msg, *conn);
+#ifdef SALTICIDAE_MSG_STAT
+ conn->nrecv++;
+ recv_by_opcode.add(msg);
+#endif
+ }
+ if (++cnt == burst_size) return true;
+ }
+ return false;
+ });
+ }
template<typename Func>
typename std::enable_if<std::is_constructible<
@@ -189,7 +219,7 @@ class ClientNetwork: public MsgNetwork<OpcodeType> {
void on_teardown() override;
};
- using conn_t = RcObj<Conn>;
+ using conn_t = ArcObj<Conn>;
protected:
ConnPool::Conn *create_conn() override { return new Conn(); }
@@ -249,7 +279,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
void on_teardown() override;
};
- using conn_t = RcObj<Conn>;
+ using conn_t = ArcObj<Conn>;
private:
struct Peer {
@@ -361,10 +391,11 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
}
};
+/* this callback is run by a worker */
template<typename OpcodeType>
void MsgNetwork<OpcodeType>::Conn::on_read() {
ConnPool::Conn::on_read();
- auto &recv_buffer = read();
+ auto &recv_buffer = get_recv_buffer();
auto mn = get_net();
while (get_fd() != -1)
{
@@ -389,21 +420,8 @@ void MsgNetwork<OpcodeType>::Conn::on_read() {
return;
}
#endif
- auto it = mn->handler_map.find(msg.get_opcode());
- if (it == mn->handler_map.end())
- SALTICIDAE_LOG_WARN("unknown opcode: %s",
- get_hex(msg.get_opcode()).c_str());
- else /* call the handler */
- {
- SALTICIDAE_LOG_DEBUG("got message %s from %s",
- std::string(msg).c_str(),
- std::string(*this).c_str());
- it->second(msg, *this);
-#ifdef SALTICIDAE_MSG_STAT
- nrecv++;
- mn->recv_by_opcode.add(msg);
-#endif
- }
+ mn->incoming_msgs.enqueue(
+ std::make_pair(std::move(msg), static_pointer_cast<Conn>(self())));
}
}
}
@@ -414,8 +432,8 @@ void PeerNetwork<O, _, __>::Peer::reset_conn(conn_t new_conn) {
{
if (conn)
{
- SALTICIDAE_LOG_DEBUG("moving send buffer");
- new_conn->move_send_buffer(conn);
+ //SALTICIDAE_LOG_DEBUG("moving send buffer");
+ //new_conn->move_send_buffer(conn);
SALTICIDAE_LOG_INFO("terminating old connection %s", std::string(*conn).c_str());
conn->terminate();
}
diff --git a/include/salticidae/queue.h b/include/salticidae/queue.h
index 88b3fc3..9045e2b 100644
--- a/include/salticidae/queue.h
+++ b/include/salticidae/queue.h
@@ -90,6 +90,7 @@ class FreeList {
template<typename T>
class MPMCQueue {
+ protected:
struct Block: public FreeList::Node {
T elem;
std::atomic<Block *> next;
@@ -171,6 +172,33 @@ class MPMCQueue {
}
};
+template<typename T>
+struct MPSCQueue: public MPMCQueue<T> {
+ using MPMCQueue<T>::MPMCQueue;
+ bool try_dequeue(T &e) {
+ auto h = this->head.load(std::memory_order_acquire);
+ auto nh = h->next.load(std::memory_order_relaxed);
+ if (nh == nullptr)
+ return false;
+ e = std::move(nh->elem);
+ this->head.store(nh, std::memory_order_release);
+ this->blks.push(h);
+ return true;
+ }
+
+ template<typename U>
+ bool rewind(U &&e) {
+ FreeList::Node * _nblk;
+ if (!this->blks.pop(_nblk)) return false;
+ auto nblk = static_cast<typename MPMCQueue<T>::Block *>(_nblk);
+ auto h = this->head.load(std::memory_order_acquire);
+ nblk->next.store(h, std::memory_order_release);
+ new (&(h->elem)) T(std::forward<U>(e));
+ this->head.store(nblk, std::memory_order_release);
+ return true;
+ }
+};
+
}
#endif