From dd09443b0b3c0b5d1a8c034644d1065dd25bf5a9 Mon Sep 17 00:00:00 2001 From: Determinant Date: Sun, 11 Nov 2018 23:02:59 -0500 Subject: start debugging multiloops design --- include/salticidae/buffer.h | 148 ++++++++++++++++ include/salticidae/conn.h | 392 ++++++++++++++++++++++++++++--------------- include/salticidae/event.h | 60 +++---- include/salticidae/network.h | 64 ++++--- include/salticidae/queue.h | 28 ++++ 5 files changed, 508 insertions(+), 184 deletions(-) create mode 100644 include/salticidae/buffer.h (limited to 'include') diff --git a/include/salticidae/buffer.h b/include/salticidae/buffer.h new file mode 100644 index 0000000..3f415c9 --- /dev/null +++ b/include/salticidae/buffer.h @@ -0,0 +1,148 @@ +#ifndef _SALTICIDAE_BUFFER_H +#define _SALTICIDAE_BUFFER_H + +#include + +namespace salticidae { + +class SegBuffer { + public: + struct buffer_entry_t { + bytearray_t data; + bytearray_t::iterator offset; + buffer_entry_t(): offset(data.begin()) {} + buffer_entry_t(bytearray_t &&_data): + data(std::move(_data)), offset(data.begin()) {} + + buffer_entry_t(buffer_entry_t &&other) { + size_t _offset = other.offset - other.data.begin(); + data = std::move(other.data); + offset = data.begin() + _offset; + } + + buffer_entry_t &operator=(buffer_entry_t &&other) { + size_t _offset = other.offset - other.data.begin(); + data = std::move(other.data); + offset = data.begin() + _offset; + return *this; + } + + buffer_entry_t(const buffer_entry_t &other): data(other.data) { + offset = data.begin() + (other.offset - other.data.begin()); + } + + size_t length() const { return data.end() - offset; } + }; + + private: + std::list buffer; + size_t _size; + + public: + SegBuffer(): _size(0) {} + ~SegBuffer() { clear(); } + + void swap(SegBuffer &other) { + std::swap(buffer, other.buffer); + std::swap(_size, other._size); + } + + SegBuffer(const SegBuffer &other): + buffer(other.buffer), _size(other._size) {} + + SegBuffer(SegBuffer &&other): + buffer(std::move(other.buffer)), _size(other._size) { + other._size = 0; + } + + SegBuffer &operator=(SegBuffer &&other) { + if (this != &other) + { + SegBuffer tmp(std::move(other)); + tmp.swap(*this); + } + return *this; + } + + SegBuffer &operator=(const SegBuffer &other) { + if (this != &other) + { + SegBuffer tmp(other); + tmp.swap(*this); + } + return *this; + } + + void rewind(bytearray_t &&data) { + _size += data.size(); + buffer.push_front(buffer_entry_t(std::move(data))); + } + + void push(bytearray_t &&data) { + _size += data.size(); + buffer.push_back(buffer_entry_t(std::move(data))); + } + + bytearray_t move_pop() { + auto res = std::move(buffer.front().data); + buffer.pop_front(); + _size -= res.size(); + return std::move(res); + } + + bytearray_t pop(size_t len) { + bytearray_t res; + auto i = buffer.begin(); + while (len && i != buffer.end()) + { + size_t copy_len = std::min(i->length(), len); + res.insert(res.end(), i->offset, i->offset + copy_len); + i->offset += copy_len; + len -= copy_len; + if (i->offset == i->data.end()) + i++; + } + buffer.erase(buffer.begin(), i); + _size -= res.size(); + return std::move(res); + } + + size_t size() const { return _size; } + bool empty() const { return buffer.empty(); } + + void clear() { + buffer.clear(); + _size = 0; + } +}; + +struct MPSCWriteBuffer { + using buffer_entry_t = SegBuffer::buffer_entry_t; + using queue_t = MPSCQueueEventDriven; + queue_t buffer; + + MPSCWriteBuffer() {} + + MPSCWriteBuffer(const SegBuffer &other) = delete; + MPSCWriteBuffer(SegBuffer &&other) = delete; + + void rewind(bytearray_t &&data) { + buffer.rewind(buffer_entry_t(std::move(data))); + } + + void push(bytearray_t &&data) { + buffer.enqueue(buffer_entry_t(std::move(data))); + } + + bytearray_t move_pop() { + buffer_entry_t res; + buffer.try_dequeue(res); + return std::move(res.data); + } + + queue_t &get_queue() { return buffer; } +}; + +} + +#endif diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index f290e3d..26d19fe 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -36,6 +36,9 @@ #include #include #include +#include +#include +#include #include "salticidae/type.h" #include "salticidae/ref.h" @@ -43,110 +46,10 @@ #include "salticidae/util.h" #include "salticidae/netaddr.h" #include "salticidae/msg.h" +#include "salticidae/buffer.h" namespace salticidae { -class SegBuffer { - struct buffer_entry_t { - bytearray_t data; - bytearray_t::iterator offset; - buffer_entry_t(bytearray_t &&_data): data(std::move(_data)) { - offset = data.begin(); - } - - buffer_entry_t(buffer_entry_t &&other) { - size_t _offset = other.offset - other.data.begin(); - data = std::move(other.data); - offset = data.begin() + _offset; - } - - buffer_entry_t(const buffer_entry_t &other): data(other.data) { - offset = data.begin() + (other.offset - other.data.begin()); - } - - size_t length() const { return data.end() - offset; } - }; - - std::list buffer; - size_t _size; - - public: - SegBuffer(): _size(0) {} - ~SegBuffer() { clear(); } - - void swap(SegBuffer &other) { - std::swap(buffer, other.buffer); - std::swap(_size, other._size); - } - - SegBuffer(const SegBuffer &other): - buffer(other.buffer), _size(other._size) {} - - SegBuffer(SegBuffer &&other): - buffer(std::move(other.buffer)), _size(other._size) { - other._size = 0; - } - - SegBuffer &operator=(SegBuffer &&other) { - if (this != &other) - { - SegBuffer tmp(std::move(other)); - tmp.swap(*this); - } - return *this; - } - - SegBuffer &operator=(const SegBuffer &other) { - if (this != &other) - { - SegBuffer tmp(other); - tmp.swap(*this); - } - return *this; - } - - void rewind(bytearray_t &&data) { - _size += data.size(); - buffer.push_front(buffer_entry_t(std::move(data))); - } - - void push(bytearray_t &&data) { - _size += data.size(); - buffer.push_back(buffer_entry_t(std::move(data))); - } - - bytearray_t move_pop() { - auto res = std::move(buffer.front().data); - buffer.pop_front(); - return std::move(res); - } - - bytearray_t pop(size_t len) { - bytearray_t res; - auto i = buffer.begin(); - while (len && i != buffer.end()) - { - size_t copy_len = std::min(i->length(), len); - res.insert(res.end(), i->offset, i->offset + copy_len); - i->offset += copy_len; - len -= copy_len; - if (i->offset == i->data.end()) - i++; - } - buffer.erase(buffer.begin(), i); - _size -= res.size(); - return std::move(res); - } - - size_t size() const { return _size; } - bool empty() const { return buffer.empty(); } - - void clear() { - buffer.clear(); - _size = 0; - } -}; - struct ConnPoolError: public SalticidaeError { using SalticidaeError::SalticidaeError; }; @@ -156,7 +59,7 @@ class ConnPool { public: class Conn; /** The handle to a bi-directional connection. */ - using conn_t = RcObj; + using conn_t = ArcObj; /** The type of callback invoked when connection status is changed. */ using conn_callback_t = std::function; @@ -177,7 +80,8 @@ class ConnPool { ConnMode mode; NetAddr addr; - SegBuffer send_buffer; + // TODO: send_buffer should be a thread-safe mpsc queue + MPSCWriteBuffer send_buffer; SegBuffer recv_buffer; Event ev_read; @@ -190,6 +94,9 @@ class ConnPool { void send_data(evutil_socket_t, short); void conn_server(evutil_socket_t, short); + /** Terminate the connection. */ + void terminate(); + public: Conn(): ready_send(false) {} Conn(const Conn &) = delete; @@ -206,7 +113,8 @@ class ConnPool { const NetAddr &get_addr() const { return addr; } ConnMode get_mode() const { return mode; } ConnPool *get_pool() const { return cpool; } - SegBuffer &read() { return recv_buffer; } + SegBuffer &get_recv_buffer() { return recv_buffer; } + MPSCWriteBuffer &get_send_buffer() { return send_buffer; } /** Set the buffer size used for send/receive data. */ void set_seg_buff_size(size_t size) { seg_buff_size = size; } @@ -214,17 +122,12 @@ class ConnPool { * whenever I/O is available. */ void write(bytearray_t &&data) { send_buffer.push(std::move(data)); - if (ready_send) - send_data(fd, EV_WRITE); - } - - /** Move the send buffer from the other (old) connection. */ - void move_send_buffer(conn_t other) { - send_buffer = std::move(other->send_buffer); } - /** Terminate the connection. */ - void terminate(); + ///** Move the send buffer from the other (old) connection. */ + //void move_send_buffer(conn_t other) { + // send_buffer = std::move(other->send_buffer); + //} protected: /** Close the IO and clear all on-going or planned events. */ @@ -238,35 +141,196 @@ class ConnPool { } /** Called when new data is available. */ - virtual void on_read() { - if (cpool->read_cb) cpool->read_cb(*this); - } + virtual void on_read() {} /** Called when the underlying connection is established. */ virtual void on_setup() { - if (cpool->conn_cb) cpool->conn_cb(*this); + cpool->update_conn(self()); } /** Called when the underlying connection breaks. */ virtual void on_teardown() { - if (cpool->conn_cb) cpool->conn_cb(*this); + cpool->update_conn(self()); } }; - + private: - int max_listen_backlog; - double conn_server_timeout; - size_t seg_buff_size; - conn_callback_t read_cb; - conn_callback_t conn_cb; + const int max_listen_backlog; + const double conn_server_timeout; + const size_t seg_buff_size; + + /* owned by user loop */ + int mlisten_fd[2]; /**< for connection events sent to the user loop */ + Event ev_mlisten; + conn_callback_t conn_cb; + /* owned by the dispatcher */ std::unordered_map pool; - int listen_fd; + int listen_fd; /**< for accepting new network connections */ + int dlisten_fd[2]; /**< for control command sent to the dispatcher */ Event ev_listen; + Event ev_dlisten; + std::mutex cp_mlock; + + void update_conn(const conn_t &conn) { + auto ptr = new conn_t(conn); + write(mlisten_fd[1], &ptr, sizeof(ptr)); + } + + struct Worker; + class WorkerFeed; + + class WorkerCmd { + public: + virtual ~WorkerCmd() = default; + virtual void exec(Worker *worker) = 0; + }; + + class Worker { + EventContext ec; + Event ev_ctl; + int ctl_fd[2]; /**< for control messages from dispatcher */ + std::thread handle; + + public: + Worker() { + if (pipe2(ctl_fd, O_NONBLOCK)) + throw ConnPoolError(std::string("failed to create worker pipe")); + ev_ctl = Event(ec, ctl_fd[0], EV_READ | EV_PERSIST, [this](int fd, short) { + WorkerCmd *dcmd; + read(fd, &dcmd, sizeof(dcmd)); + dcmd->exec(this); + delete dcmd; + }); + ev_ctl.add(); + } + + ~Worker() { + close(ctl_fd[0]); + close(ctl_fd[1]); + } + + /* the following functions are called by the dispatcher */ + void start() { + handle = std::thread([this]() { ec.dispatch(); }); + } + + void feed(const conn_t &conn, int client_fd) { + auto dcmd = new WorkerFeed(conn, client_fd); + write(ctl_fd[1], &dcmd, sizeof(dcmd)); + } + + void stop() { + auto dcmd = new WorkerStop(); + write(ctl_fd[1], &dcmd, sizeof(dcmd)); + } + + std::thread &get_handle() { return handle; } + const EventContext &get_ec() { return ec; } + }; + + class WorkerFeed: public WorkerCmd { + conn_t conn; + int client_fd; + + public: + WorkerFeed(const conn_t &conn, int client_fd): + conn(conn), client_fd(client_fd) {} + void exec(Worker *worker) override { + SALTICIDAE_LOG_INFO("worker %x got %s", + std::this_thread::get_id(), + std::string(*conn).c_str()); + auto &ec = worker->get_ec(); + conn->get_send_buffer() + .get_queue() + .reg_handler(ec, [conn=this->conn, + client_fd=this->client_fd](MPSCWriteBuffer::queue_t &) { + if (conn->ready_send) + conn->send_data(client_fd, EV_WRITE); + return false; + }); + auto conn_ptr = conn.get(); + conn->ev_read = Event(ec, client_fd, EV_READ, + std::bind(&Conn::recv_data, conn_ptr, _1, _2)); + conn->ev_write = Event(ec, client_fd, EV_WRITE, + std::bind(&Conn::send_data, conn_ptr, _1, _2)); + conn->ev_read.add(); + conn->ev_write.add(); + } + }; + + class WorkerStop: public WorkerCmd { + public: + void exec(Worker *worker) override { worker->get_ec().stop(); } + }; + + /* related to workers */ + size_t nworker; + salticidae::BoxObj workers; void accept_client(evutil_socket_t, short); - conn_t add_conn(conn_t conn); + conn_t add_conn(const conn_t &conn); + conn_t _connect(const NetAddr &addr); + void _listen(NetAddr listen_addr); + void _post_terminate(int fd); + + class DispatchCmd { + public: + virtual ~DispatchCmd() = default; + virtual void exec(ConnPool *cpool) = 0; + }; + + // TODO: the following two are untested + class DspListen: public DispatchCmd { + const NetAddr addr; + public: + DspListen(const NetAddr &addr): addr(addr) {} + void exec(ConnPool *cpool) override { + cpool->_listen(addr); + } + }; + + class DspConnect: public DispatchCmd { + const NetAddr addr; + public: + DspConnect(const NetAddr &addr): addr(addr) {} + void exec(ConnPool *cpool) override { + cpool->update_conn(cpool->_connect(addr)); + } + }; + + class DspPostTerm: public DispatchCmd { + int fd; + public: + DspPostTerm(int fd): fd(fd) {} + void exec(ConnPool *cpool) override { + cpool->_post_terminate(fd); + } + }; + + class DspMulticast: public DispatchCmd { + std::vector receivers; + bytearray_t data; + public: + DspMulticast(std::vector &&receivers, bytearray_t &&data): + receivers(std::move(receivers)), + data(std::move(data)) {} + void exec(ConnPool *) override { + for (auto &r: receivers) r->write(bytearray_t(data)); + } + }; + + void post_terminate(int fd) { + auto dcmd = new DspPostTerm(fd); + write(dlisten_fd[1], &dcmd, sizeof(dcmd)); + } + + Worker &select_worker() { + return workers[1]; + } protected: EventContext ec; + EventContext dispatcher_ec; + std::mutex dsp_ec_mlock; /** Should be implemented by derived class to return a new Conn object. */ virtual Conn *create_conn() = 0; @@ -274,29 +338,91 @@ class ConnPool { ConnPool(const EventContext &ec, int max_listen_backlog = 10, double conn_server_timeout = 2, - size_t seg_buff_size = 4096): - max_listen_backlog(max_listen_backlog), - conn_server_timeout(conn_server_timeout), - seg_buff_size(seg_buff_size), - ec(ec) {} + size_t seg_buff_size = 4096, + size_t nworker = 2): + max_listen_backlog(max_listen_backlog), + conn_server_timeout(conn_server_timeout), + seg_buff_size(seg_buff_size), + listen_fd(-1), + nworker(std::min((size_t)1, nworker)), + ec(ec) { + if (pipe2(mlisten_fd, O_NONBLOCK)) + throw ConnPoolError(std::string("failed to create main pipe")); + if (pipe2(dlisten_fd, O_NONBLOCK)) + throw ConnPoolError(std::string("failed to create dispatcher pipe")); + + ev_mlisten = Event(ec, mlisten_fd[0], EV_READ | EV_PERSIST, [this](int fd, short) { + conn_t *conn_ptr; + read(fd, &conn_ptr, sizeof(conn_ptr)); + if (conn_cb) + conn_cb(**conn_ptr); + delete conn_ptr; + }); + ev_mlisten.add(); + + workers = new Worker[nworker]; + dispatcher_ec = workers[0].get_ec(); + + ev_dlisten = Event(dispatcher_ec, dlisten_fd[0], EV_READ | EV_PERSIST, [this](int fd, short) { + DispatchCmd *dcmd; + read(fd, &dcmd, sizeof(dcmd)); + dcmd->exec(this); + delete dcmd; + }); + ev_dlisten.add(); + + SALTICIDAE_LOG_INFO("starting all threads..."); + for (size_t i = 0; i < nworker; i++) + workers[i].start(); + } ~ConnPool() { + /* stop all workers */ + for (size_t i = 0; i < nworker; i++) + workers[i].stop(); + /* join all worker threads */ + for (size_t i = 0; i < nworker; i++) + workers[i].get_handle().join(); for (auto it: pool) { conn_t conn = it.second; conn->on_close(); } + if (listen_fd != -1) close(listen_fd); + for (int i = 0; i < 2; i++) + { + close(mlisten_fd[i]); + close(dlisten_fd[i]); + } } ConnPool(const ConnPool &) = delete; ConnPool(ConnPool &&) = delete; /** Actively connect to remote addr. */ - conn_t connect(const NetAddr &addr); + conn_t connect(const NetAddr &addr, bool blocking = true) { + if (blocking) + return _connect(addr); + else + { + auto dcmd = new DspConnect(addr); + write(dlisten_fd[1], &dcmd, sizeof(dcmd)); + return nullptr; + } + } + /** Listen for passive connections (connection initiated from remote). * Does not need to be called if do not want to accept any passive * connections. */ - void listen(NetAddr listen_addr); + void listen(NetAddr listen_addr, bool blocking = true) { + if (blocking) + _listen(listen_addr); + else + { + auto dcmd = new DspListen(listen_addr); + write(dlisten_fd[1], &dcmd, sizeof(dcmd)); + } + } template void reg_conn_handler(Func cb) { conn_cb = cb; } diff --git a/include/salticidae/event.h b/include/salticidae/event.h index 857518b..ddb93fc 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -53,8 +53,8 @@ class EventContext: public _event_context_ot { EventContext(EventContext &&) = default; EventContext &operator=(const EventContext &) = default; EventContext &operator=(EventContext &&) = default; - void dispatch() { event_base_dispatch(get()); } - void stop() { event_base_loopbreak(get()); } + void dispatch() const { event_base_dispatch(get()); } + void stop() const { event_base_loopbreak(get()); } }; class Event { @@ -122,7 +122,7 @@ class Event { }; template -class MPSCQueueEventDriven: public MPMCQueue { +class MPSCQueueEventDriven: public MPSCQueue { private: const uint64_t dummy = 1; std::atomic wait_sig; @@ -130,38 +130,39 @@ class MPSCQueueEventDriven: public MPMCQueue { Event ev; public: - template - MPSCQueueEventDriven(const EventContext &ec, Func &&func, - size_t burst_size = 128, size_t capacity = 65536): - MPMCQueue(capacity), + MPSCQueueEventDriven(size_t capacity = 65536): + MPSCQueue(capacity), wait_sig(true), - fd(eventfd(0, EFD_NONBLOCK)), - ev(Event(ec, fd, EV_READ | EV_PERSIST, - [this, func=std::forward(func), burst_size](int, short) { - uint64_t t; - read(fd, &t, 8); - //fprintf(stderr, "%x\n", std::this_thread::get_id()); - T elem; - size_t cnt = burst_size; - while (MPMCQueue::try_dequeue(elem)) - { - func(std::move(elem)); - if (!--cnt) - { - write(fd, &dummy, 8); - return; - } - } - wait_sig.store(true, std::memory_order_relaxed); - })) { ev.add(); } + fd(eventfd(0, EFD_NONBLOCK)) {} ~MPSCQueueEventDriven() { close(fd); } + template + void reg_handler(const EventContext &ec, Func &&func) { + ev = Event(ec, fd, EV_READ | EV_PERSIST, + [this, func=std::forward(func)](int, short) { + //fprintf(stderr, "%x\n", std::this_thread::get_id()); + uint64_t t; + read(fd, &t, 8); + // the only undesirable case is there are some new items + // enqueued before recovering wait_sig to true, so the consumer + // won't be notified. In this case, no enqueuing thread will + // get to write(fd). Then store(true) must happen after all exchange(false), + // since all enqueue operations are finalized, the dequeue should be able + // to see those enqueued values in func() + wait_sig.store(true, std::memory_order_release); + if (func(*this)) + write(fd, &dummy, 8); + }); + ev.add(); + } + template bool enqueue(U &&e) { static const uint64_t dummy = 1; - bool ret = MPMCQueue::enqueue(std::forward(e)); - if (wait_sig.exchange(false, std::memory_order_relaxed)) + bool ret = MPSCQueue::enqueue(std::forward(e)); + // memory barrier here, so any load/store in enqueue must be finialized + if (wait_sig.exchange(false, std::memory_order_acq_rel)) { SALTICIDAE_LOG_DEBUG("mpsc notify"); write(fd, &dummy, 8); @@ -170,6 +171,8 @@ class MPSCQueueEventDriven: public MPMCQueue { } }; +// TODO: incorrect MPMCQueueEventDriven impl +/* template class MPMCQueueEventDriven: public MPMCQueue { private: @@ -223,6 +226,7 @@ class MPMCQueueEventDriven: public MPMCQueue { return ret; } }; +*/ } diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 4e966d1..d82772f 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -103,7 +103,7 @@ class MsgNetwork: public ConnPool { void on_read() override; }; - using conn_t = RcObj; + using conn_t = ArcObj; #ifdef SALTICIDAE_MSG_STAT class msg_stat_by_opcode_t: public std::unordered_map> handler_map; + using queue_t = MPSCQueueEventDriven>; + queue_t incoming_msgs; protected: #ifdef SALTICIDAE_MSG_STAT @@ -134,10 +136,38 @@ class MsgNetwork: public ConnPool { MsgNetwork(const EventContext &ec, int max_listen_backlog, double conn_server_timeout, - size_t seg_buff_size): + size_t seg_buff_size, + size_t burst_size = 1000): ConnPool(ec, max_listen_backlog, conn_server_timeout, - seg_buff_size) {} + seg_buff_size) { + incoming_msgs.reg_handler(ec, [this, burst_size](queue_t &q) { + std::pair item; + size_t cnt = 0; + while (q.try_dequeue(item)) + { + auto &msg = item.first; + auto &conn = item.second; + auto it = handler_map.find(msg.get_opcode()); + if (it == handler_map.end()) + SALTICIDAE_LOG_WARN("unknown opcode: %s", + get_hex(msg.get_opcode()).c_str()); + else /* call the handler */ + { + SALTICIDAE_LOG_DEBUG("got message %s from %s", + std::string(msg).c_str(), + std::string(*conn).c_str()); + it->second(msg, *conn); +#ifdef SALTICIDAE_MSG_STAT + conn->nrecv++; + recv_by_opcode.add(msg); +#endif + } + if (++cnt == burst_size) return true; + } + return false; + }); + } template typename std::enable_if { void on_teardown() override; }; - using conn_t = RcObj; + using conn_t = ArcObj; protected: ConnPool::Conn *create_conn() override { return new Conn(); } @@ -249,7 +279,7 @@ class PeerNetwork: public MsgNetwork { void on_teardown() override; }; - using conn_t = RcObj; + using conn_t = ArcObj; private: struct Peer { @@ -361,10 +391,11 @@ class PeerNetwork: public MsgNetwork { } }; +/* this callback is run by a worker */ template void MsgNetwork::Conn::on_read() { ConnPool::Conn::on_read(); - auto &recv_buffer = read(); + auto &recv_buffer = get_recv_buffer(); auto mn = get_net(); while (get_fd() != -1) { @@ -389,21 +420,8 @@ void MsgNetwork::Conn::on_read() { return; } #endif - auto it = mn->handler_map.find(msg.get_opcode()); - if (it == mn->handler_map.end()) - SALTICIDAE_LOG_WARN("unknown opcode: %s", - get_hex(msg.get_opcode()).c_str()); - else /* call the handler */ - { - SALTICIDAE_LOG_DEBUG("got message %s from %s", - std::string(msg).c_str(), - std::string(*this).c_str()); - it->second(msg, *this); -#ifdef SALTICIDAE_MSG_STAT - nrecv++; - mn->recv_by_opcode.add(msg); -#endif - } + mn->incoming_msgs.enqueue( + std::make_pair(std::move(msg), static_pointer_cast(self()))); } } } @@ -414,8 +432,8 @@ void PeerNetwork::Peer::reset_conn(conn_t new_conn) { { if (conn) { - SALTICIDAE_LOG_DEBUG("moving send buffer"); - new_conn->move_send_buffer(conn); + //SALTICIDAE_LOG_DEBUG("moving send buffer"); + //new_conn->move_send_buffer(conn); SALTICIDAE_LOG_INFO("terminating old connection %s", std::string(*conn).c_str()); conn->terminate(); } diff --git a/include/salticidae/queue.h b/include/salticidae/queue.h index 88b3fc3..9045e2b 100644 --- a/include/salticidae/queue.h +++ b/include/salticidae/queue.h @@ -90,6 +90,7 @@ class FreeList { template class MPMCQueue { + protected: struct Block: public FreeList::Node { T elem; std::atomic next; @@ -171,6 +172,33 @@ class MPMCQueue { } }; +template +struct MPSCQueue: public MPMCQueue { + using MPMCQueue::MPMCQueue; + bool try_dequeue(T &e) { + auto h = this->head.load(std::memory_order_acquire); + auto nh = h->next.load(std::memory_order_relaxed); + if (nh == nullptr) + return false; + e = std::move(nh->elem); + this->head.store(nh, std::memory_order_release); + this->blks.push(h); + return true; + } + + template + bool rewind(U &&e) { + FreeList::Node * _nblk; + if (!this->blks.pop(_nblk)) return false; + auto nblk = static_cast::Block *>(_nblk); + auto h = this->head.load(std::memory_order_acquire); + nblk->next.store(h, std::memory_order_release); + new (&(h->elem)) T(std::forward(e)); + this->head.store(nblk, std::memory_order_release); + return true; + } +}; + } #endif -- cgit v1.2.3-70-g09d2