From b7802b3b06511f067719cb845dfd03a223f0e18f Mon Sep 17 00:00:00 2001 From: Determinant Date: Wed, 3 Apr 2019 14:55:26 -0400 Subject: improve non-blocking API --- CMakeLists.txt | 4 +- include/salticidae/buffer.h | 8 +- include/salticidae/conn.h | 15 ++- include/salticidae/event.h | 218 +++++++++++++++++++++---------------------- include/salticidae/network.h | 25 ++++- include/salticidae/queue.h | 16 +++- src/conn.cpp | 2 + test/bench_network.cpp | 2 +- 8 files changed, 163 insertions(+), 127 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index b441591..903a66f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -62,5 +62,5 @@ option(SALTICIDAE_NOCHECKSUM " disable checksum in messages" OFF) configure_file(src/config.h.in include/salticidae/config.h @ONLY) -set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -W -Wall -Wextra -pedantic -Wsuggest-override") -set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -W -Wall -Wextra -pedantic -Wsuggest-override") +set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -faligned-new -W -Wall -Wextra -pedantic -Wsuggest-override") +set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -faligned-new -W -Wall -Wextra -pedantic -Wsuggest-override") diff --git a/include/salticidae/buffer.h b/include/salticidae/buffer.h index 2324d8b..c7b11dd 100644 --- a/include/salticidae/buffer.h +++ b/include/salticidae/buffer.h @@ -126,14 +126,14 @@ struct MPSCWriteBuffer { MPSCWriteBuffer(const SegBuffer &other) = delete; MPSCWriteBuffer(SegBuffer &&other) = delete; + void set_capacity(size_t capacity) { buffer.set_capacity(capacity); } + void rewind(bytearray_t &&data) { buffer.rewind(buffer_entry_t(std::move(data))); } - void push(bytearray_t &&data) { - buffer_entry_t d(std::move(data)); - // TODO: better bounded buffer impl - while (!buffer.try_enqueue(d)) {} + bool push(bytearray_t &&data, bool unbounded) { + return buffer.enqueue(buffer_entry_t(std::move(data)), unbounded); } bytearray_t move_pop() { diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index b4df259..201e574 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -129,8 +129,8 @@ class ConnPool { /** Write data to the connection (non-blocking). The data will be sent * whenever I/O is available. */ - void write(bytearray_t &&data) { - send_buffer.push(std::move(data)); + bool write(bytearray_t &&data) { + return send_buffer.push(std::move(data), !cpool->queue_capacity); } protected: @@ -156,6 +156,7 @@ class ConnPool { const int max_listen_backlog; const double conn_server_timeout; const size_t seg_buff_size; + const size_t queue_capacity; /* owned by user loop */ BoxObj user_tcall; @@ -287,13 +288,15 @@ class ConnPool { double _conn_server_timeout; size_t _seg_buff_size; size_t _nworker; + size_t _queue_capacity; public: Config(): _max_listen_backlog(10), _conn_server_timeout(2), _seg_buff_size(4096), - _nworker(1) {} + _nworker(1), + _queue_capacity(0) {} Config &max_listen_backlog(int x) { _max_listen_backlog = x; @@ -314,6 +317,11 @@ class ConnPool { _nworker = std::max((size_t)1, x); return *this; } + + Config &queue_capacity(size_t x) { + _queue_capacity = x; + return *this; + } }; ConnPool(const EventContext &ec, const Config &config): @@ -321,6 +329,7 @@ class ConnPool { max_listen_backlog(config._max_listen_backlog), conn_server_timeout(config._conn_server_timeout), seg_buff_size(config._seg_buff_size), + queue_capacity(config._queue_capacity), listen_fd(-1), nworker(config._nworker), worker_running(false) { diff --git a/include/salticidae/event.h b/include/salticidae/event.h index 641bb8e..0600109 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -434,107 +434,6 @@ class ThreadNotifier { } }; -class ThreadCall { - int ctl_fd[2]; - EventContext ec; - FdEvent ev_listen; - - public: - struct Result { - void *data; - std::function deleter; - Result(): data(nullptr) {} - Result(void *data, std::function &&deleter): - data(data), deleter(std::move(deleter)) {} - ~Result() { if (data != nullptr) deleter(data); } - Result(const Result &) = delete; - Result(Result &&other): - data(other.data), deleter(std::move(other.deleter)) { - other.data = nullptr; - } - void swap(Result &other) { - std::swap(data, other.data); - std::swap(deleter, other.deleter); - } - Result &operator=(const Result &other) = delete; - Result &operator=(Result &&other) { - if (this != &other) - { - Result tmp(std::move(other)); - tmp.swap(*this); - } - return *this; - } - void *get() { return data; } - }; - class Handle { - std::function callback; - ThreadNotifier * notifier; - Result result; - friend ThreadCall; - public: - Handle(): notifier(nullptr) {} - void exec() { - callback(*this); - if (notifier) - notifier->notify(std::move(result)); - } - template - void set_result(T &&data) { - using _T = std::remove_reference_t; - result = Result(new _T(std::forward(data)), - [](void *ptr) {delete static_cast<_T *>(ptr);}); - } - }; - - ThreadCall() = default; - ThreadCall(const ThreadCall &) = delete; - ThreadCall(ThreadCall &&) = delete; - ThreadCall(EventContext ec): ec(ec) { - if (pipe2(ctl_fd, O_NONBLOCK)) - throw SalticidaeError(std::string("ThreadCall: failed to create pipe")); - ev_listen = FdEvent(ec, ctl_fd[0], [this](int fd, int) { - Handle *h; - read(fd, &h, sizeof(h)); - std::atomic_thread_fence(std::memory_order_acquire); - h->exec(); - delete h; - }); - ev_listen.add(FdEvent::READ); - } - - ~ThreadCall() { - ev_listen.clear(); - Handle *h; - while (read(ctl_fd[0], &h, sizeof(h)) == sizeof(h)) - delete h; - close(ctl_fd[0]); - close(ctl_fd[1]); - } - - template - void async_call(Func callback) { - auto h = new Handle(); - h->callback = callback; - std::atomic_thread_fence(std::memory_order_release); - write(ctl_fd[1], &h, sizeof(h)); - } - - template - Result call(Func callback) { - auto h = new Handle(); - h->callback = callback; - ThreadNotifier notifier; - h->notifier = ¬ifier; - std::atomic_thread_fence(std::memory_order_release); - write(ctl_fd[1], &h, sizeof(h)); - return notifier.wait(); - } - - const EventContext &get_ec() const { return ec; } -}; - - template class MPSCQueueEventDriven: public MPSCQueue { private: @@ -544,8 +443,7 @@ class MPSCQueueEventDriven: public MPSCQueue { FdEvent ev; public: - MPSCQueueEventDriven(size_t capacity = 65536): - MPSCQueue(capacity), + MPSCQueueEventDriven(): wait_sig(true), fd(eventfd(0, EFD_NONBLOCK)) {} @@ -577,9 +475,10 @@ class MPSCQueueEventDriven: public MPSCQueue { void unreg_handler() { ev.clear(); } template - bool enqueue(U &&e) { + bool enqueue(U &&e, bool unbounded = true) { static const uint64_t dummy = 1; - MPSCQueue::enqueue(std::forward(e)); + if (!MPSCQueue::enqueue(std::forward(e), unbounded)) + return false; // memory barrier here, so any load/store in enqueue must be finialized if (wait_sig.exchange(false, std::memory_order_acq_rel)) { @@ -613,8 +512,7 @@ class MPMCQueueEventDriven: public MPMCQueue { std::vector evs; public: - MPMCQueueEventDriven(size_t capacity = 65536): - MPMCQueue(capacity), + MPMCQueueEventDriven(): wait_sig(true), fd(eventfd(0, EFD_NONBLOCK)) {} @@ -642,9 +540,10 @@ class MPMCQueueEventDriven: public MPMCQueue { void unreg_handlers() { evs.clear(); } template - bool enqueue(U &&e) { + bool enqueue(U &&e, bool unbounded = true) { static const uint64_t dummy = 1; - MPMCQueue::enqueue(std::forward(e)); + if (!MPMCQueue::enqueue(std::forward(e), unbounded)) + return false; // memory barrier here, so any load/store in enqueue must be finialized if (wait_sig.exchange(false, std::memory_order_acq_rel)) { @@ -655,6 +554,107 @@ class MPMCQueueEventDriven: public MPMCQueue { } }; +class ThreadCall { + public: class Handle; + private: + int ctl_fd[2]; + EventContext ec; + const size_t burst_size; + using queue_t = MPSCQueueEventDriven; + queue_t q; + + public: + struct Result { + void *data; + std::function deleter; + Result(): data(nullptr) {} + Result(void *data, std::function &&deleter): + data(data), deleter(std::move(deleter)) {} + ~Result() { if (data != nullptr) deleter(data); } + Result(const Result &) = delete; + Result(Result &&other): + data(other.data), deleter(std::move(other.deleter)) { + other.data = nullptr; + } + void swap(Result &other) { + std::swap(data, other.data); + std::swap(deleter, other.deleter); + } + Result &operator=(const Result &other) = delete; + Result &operator=(Result &&other) { + if (this != &other) + { + Result tmp(std::move(other)); + tmp.swap(*this); + } + return *this; + } + void *get() { return data; } + }; + class Handle { + std::function callback; + ThreadNotifier * notifier; + Result result; + friend ThreadCall; + public: + Handle(): notifier(nullptr) {} + void exec() { + callback(*this); + if (notifier) + notifier->notify(std::move(result)); + } + template + void set_result(T &&data) { + using _T = std::remove_reference_t; + result = Result(new _T(std::forward(data)), + [](void *ptr) {delete static_cast<_T *>(ptr);}); + } + }; + + ThreadCall(size_t burst_size): burst_size(burst_size) {} + ThreadCall(const ThreadCall &) = delete; + ThreadCall(ThreadCall &&) = delete; + ThreadCall(EventContext ec, size_t burst_size = 128): ec(ec), burst_size(burst_size) { + q.reg_handler(ec, [this, burst_size=burst_size](queue_t &q) { + size_t cnt = 0; + Handle *h; + while (q.try_dequeue(h)) + { + h->exec(); + delete h; + if (++cnt == burst_size) return true; + } + return false; + }); + } + + ~ThreadCall() { + Handle *h; + while (q.try_dequeue(h)) delete h; + close(ctl_fd[0]); + close(ctl_fd[1]); + } + + template + void async_call(Func callback) { + auto h = new Handle(); + h->callback = callback; + q.enqueue(h); + } + + template + Result call(Func callback) { + auto h = new Handle(); + h->callback = callback; + ThreadNotifier notifier; + h->notifier = ¬ifier; + q.enqueue(h); + return notifier.wait(); + } + + const EventContext &get_ec() const { return ec; } +}; + } #endif diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 7814d56..2cef81d 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -140,6 +140,7 @@ class MsgNetwork: public ConnPool { MsgNetwork(const EventContext &ec, const Config &config): ConnPool(ec, config) { + incoming_msgs.set_capacity(65536); incoming_msgs.reg_handler(ec, [this, burst_size=config._burst_size](queue_t &q) { std::pair item; size_t cnt = 0; @@ -179,7 +180,7 @@ class MsgNetwork: public ConnPool { } template - void send_msg(MsgType &&msg, const conn_t &conn); + bool send_msg(MsgType &&msg, const conn_t &conn); using ConnPool::listen; #ifdef SALTICIDAE_MSG_STAT #endif @@ -413,6 +414,8 @@ class PeerNetwork: public MsgNetwork { using MsgNet::send_msg; template void send_msg(MsgType &&msg, const NetAddr &paddr); + template + void multicast_msg(MsgType &&msg, const std::vector &paddrs); void listen(NetAddr listen_addr); bool has_peer(const NetAddr &paddr) const; conn_t connect(const NetAddr &addr) = delete; @@ -455,16 +458,16 @@ void MsgNetwork::Conn::on_read() { template template -void MsgNetwork::send_msg(MsgType &&_msg, const conn_t &conn) { +bool MsgNetwork::send_msg(MsgType &&_msg, const conn_t &conn) { Msg msg(std::forward(_msg)); bytearray_t msg_data = msg.serialize(); SALTICIDAE_LOG_DEBUG("wrote message %s to %s", std::string(msg).c_str(), std::string(*conn).c_str()); - conn->write(std::move(msg_data)); #ifdef SALTICIDAE_MSG_STAT conn->nsent++; #endif + return conn->write(std::move(msg_data)); } template @@ -697,6 +700,22 @@ void PeerNetwork::send_msg(MsgType &&msg, const NetAddr &paddr) { send_msg(std::move(msg), it->second->conn); }); } + +template +template +void PeerNetwork::multicast_msg(MsgType &&msg, const std::vector &paddrs) { + this->disp_tcall->async_call( + [this, msg=std::forward(msg), paddrs](ThreadCall::Handle &) { + for (auto &paddr: paddrs) + { + auto it = id2peer.find(paddr); + if (it == id2peer.end()) + throw PeerNetworkError("peer does not exist"); + send_msg(std::move(msg), it->second->conn); + } + }); +} + /* end: functions invoked by the user loop */ template diff --git a/include/salticidae/queue.h b/include/salticidae/queue.h index a72a28f..0b4dae0 100644 --- a/include/salticidae/queue.h +++ b/include/salticidae/queue.h @@ -107,10 +107,8 @@ class MPMCQueue { MPMCQueue(const MPMCQueue &) = delete; MPMCQueue(MPMCQueue &&) = delete; - MPMCQueue(size_t capacity = 65536): head(new Block()), tail(head.load()) { + MPMCQueue(): head(new Block()), tail(head.load()) { head.load()->next = nullptr; - while (capacity--) - blks.push(new Block()); } ~MPMCQueue() { @@ -122,10 +120,18 @@ class MPMCQueue { } } + void set_capacity(size_t capacity = 0) { + while (capacity--) blks.push(new Block()); + } + template - bool enqueue(U &&e) { + bool enqueue(U &&e, bool unbounded = true) { FreeList::Node * _nblk; - if (!blks.pop(_nblk)) _nblk = new Block(); + if (!blks.pop(_nblk)) + { + if (unbounded) _nblk = new Block(); + else return false; + } _enqueue(static_cast(_nblk), std::forward(e)); return true; } diff --git a/src/conn.cpp b/src/conn.cpp index 4ab3040..5fc59f3 100644 --- a/src/conn.cpp +++ b/src/conn.cpp @@ -185,6 +185,7 @@ void ConnPool::accept_client(int fd, int) { NetAddr addr((struct sockaddr_in *)&client_addr); conn_t conn = create_conn(); conn->self_ref = conn; + conn->send_buffer.set_capacity(queue_capacity); conn->seg_buff_size = seg_buff_size; conn->fd = client_fd; conn->cpool = this; @@ -264,6 +265,7 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) { throw ConnPoolError(std::string("unable to set nonblocking socket")); conn_t conn = create_conn(); conn->self_ref = conn; + conn->send_buffer.set_capacity(queue_capacity); conn->seg_buff_size = seg_buff_size; conn->fd = fd; conn->cpool = this; diff --git a/test/bench_network.cpp b/test/bench_network.cpp index 89289fe..58f4a64 100644 --- a/test/bench_network.cpp +++ b/test/bench_network.cpp @@ -79,7 +79,7 @@ struct MyNet: public MsgNetworkByteOp { const std::string name, const NetAddr &peer, double stat_timeout = -1): - MsgNetworkByteOp(ec, MsgNetworkByteOp::Config().burst_size(1000)), + MsgNetworkByteOp(ec, MsgNetworkByteOp::Config().burst_size(1000).queue_capacity(65536)), name(name), peer(peer), ev_period_stat(ec, [this, stat_timeout](TimerEvent &) { -- cgit v1.2.3