aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/salticidae/event.h93
-rw-r--r--include/salticidae/queue.h176
-rw-r--r--test/.gitignore1
-rw-r--r--test/CMakeLists.txt3
-rw-r--r--test/test_queue.cpp92
5 files changed, 365 insertions, 0 deletions
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index 5b41d9e..99fff2f 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -25,8 +25,14 @@
#ifndef _SALTICIDAE_EVENT_H
#define _SALTICIDAE_EVENT_H
+#include <unistd.h>
#include <event2/event.h>
+#include <sys/eventfd.h>
+#ifdef SALTICIDAE_DEBUG_LOG
+#include <thread>
+#endif
+#include "salticidae/queue.h"
#include "salticidae/util.h"
#include "salticidae/ref.h"
@@ -50,6 +56,7 @@ class EventContext: public _event_context_ot {
EventContext &operator=(const EventContext &) = default;
EventContext &operator=(EventContext &&) = default;
void dispatch() { event_base_dispatch(get()); }
+ void stop() { event_base_loopbreak(get()); }
};
class Event {
@@ -76,6 +83,7 @@ class Event {
ev(event_new(eb.get(), fd, events, Event::_then, this)),
callback(callback) {}
+ Event(const Event &) = delete;
Event(Event &&other):
eb(std::move(other.eb)), fd(other.fd), events(other.events),
callback(std::move(other.callback)) {
@@ -115,6 +123,91 @@ class Event {
operator bool() const { return ev != nullptr; }
};
+template<typename T>
+class MPSCQueueEventDriven: public MPMCQueue<T> {
+ private:
+ std::atomic<bool> wait_sig;
+ int fd;
+ Event ev;
+
+ public:
+ template<typename Func>
+ MPSCQueueEventDriven(const EventContext &ec, Func &&func, size_t capacity = 65536):
+ MPMCQueue<T>(capacity),
+ wait_sig(true),
+ fd(eventfd(0, EFD_NONBLOCK)),
+ ev(Event(ec, fd, EV_READ | EV_PERSIST,
+ [this, func=std::forward<Func>(func)](int, short) {
+ uint64_t t;
+ read(fd, &t, 8);
+ SALTICIDAE_LOG_DEBUG("%x\n", std::this_thread::get_id());
+ T elem;
+ while (MPMCQueue<T>::try_dequeue(elem))
+ func(std::move(elem));
+ wait_sig.store(true, std::memory_order_relaxed);
+ })) { ev.add(); }
+
+ ~MPSCQueueEventDriven() { close(fd); }
+
+ template<typename U>
+ bool enqueue(U &&e) {
+ static const uint64_t dummy = 1;
+ bool ret = MPMCQueue<T>::enqueue(std::forward<U>(e));
+ if (wait_sig.exchange(false, std::memory_order_relaxed))
+ {
+ SALTICIDAE_LOG_DEBUG("mpsc notify");
+ write(fd, &dummy, 8);
+ }
+ return ret;
+ }
+};
+
+template<typename T>
+class MPMCQueueEventDriven: public MPMCQueue<T> {
+ private:
+ std::atomic<bool> wait_sig;
+ std::vector<std::pair<BoxObj<Event>, int>> evs;
+
+ public:
+ MPMCQueueEventDriven(size_t capacity = 65536):
+ MPMCQueue<T>(capacity),
+ wait_sig(true) {}
+
+ template<typename Func>
+ void listen(const EventContext &ec, Func &&func) {
+ int fd = eventfd(0, EFD_NONBLOCK);
+ evs.emplace(evs.end(), std::make_pair(new Event(ec, fd, EV_READ | EV_PERSIST,
+ [this, func=std::forward<Func>(func)](int fd, short) {
+ uint64_t t;
+ read(fd, &t, 8);
+ SALTICIDAE_LOG_DEBUG("%x\n", std::this_thread::get_id());
+ T elem;
+ while (MPMCQueue<T>::try_dequeue(elem))
+ func(std::move(elem));
+ wait_sig.store(true, std::memory_order_relaxed);
+ }), fd));
+ evs.rbegin()->first->add();
+ }
+
+ ~MPMCQueueEventDriven() {
+ for (const auto &p: evs)
+ close(p.second);
+ }
+
+ template<typename U>
+ bool enqueue(U &&e) {
+ static const uint64_t dummy = 1;
+ bool ret = MPMCQueue<T>::enqueue(std::forward<U>(e));
+ if (wait_sig.exchange(false, std::memory_order_relaxed))
+ {
+ SALTICIDAE_LOG_DEBUG("mpmc notify");
+ for (const auto &p: evs)
+ write(p.second, &dummy, 8);
+ }
+ return ret;
+ }
+};
+
}
#endif
diff --git a/include/salticidae/queue.h b/include/salticidae/queue.h
new file mode 100644
index 0000000..88b3fc3
--- /dev/null
+++ b/include/salticidae/queue.h
@@ -0,0 +1,176 @@
+#ifndef _SALTICIDAE_QUEUE_H
+#define _SALTICIDAE_QUEUE_H
+
+#include <atomic>
+#include <vector>
+#include <cassert>
+
+namespace salticidae {
+
+static size_t const cacheline_size = 64;
+
+class FreeList {
+ public:
+ struct Node {
+ std::atomic<Node *> next;
+ std::atomic<bool> addback;
+ std::atomic<size_t> refcnt;
+ Node(): next(nullptr), addback(false), refcnt(1) {}
+ };
+
+ private:
+ alignas(cacheline_size) std::atomic<Node *> top;
+
+ public:
+ FreeList(): top(nullptr) {}
+ FreeList(const FreeList &) = delete;
+ FreeList(FreeList &&) = delete;
+
+ void release_ref(Node *u) {
+ if (u->refcnt.fetch_sub(1, std::memory_order_relaxed) != 1) return;
+ u->addback.store(false, std::memory_order_relaxed);
+ //if (!u->addback.exchange(false, std::memory_order_relaxed)) return;
+ for (;;)
+ {
+ auto t = top.load(std::memory_order_acquire);
+ // repair the next pointer before CAS, otherwise u->next == nullptr
+ // could lead to skipping elements
+ u->next.store(t, std::memory_order_consume);
+ // the replacement is ok even if ABA happens
+ if (top.compare_exchange_weak(t, u, std::memory_order_consume))
+ {
+ u->refcnt.store(1, std::memory_order_relaxed);
+ break;
+ }
+ }
+ }
+
+ bool push(Node *u) {
+ assert(u->addback.load() == false);
+ // attempt to push it
+ u->addback.store(true, std::memory_order_release);
+ release_ref(u);
+ return true;
+ }
+
+ bool pop(Node *&r) {
+ bool loop = true;
+ while (loop)
+ {
+ auto u = top.load(std::memory_order_acquire);
+ /* the list is now empty */
+ if (u == nullptr) return false;
+ auto t = u->refcnt.load(std::memory_order_acquire);
+ /* let's wait for another round if u is a ghost (already popped) */
+ if (!t) continue;
+ /* otherwise t > 0, so with CAS, the invariant that zero refcnt can
+ * never be increased is guaranteed */
+ if (u->refcnt.compare_exchange_weak(t, t + 1, std::memory_order_consume))
+ {
+ /* here, nobody is able to change v->next (because v->next is
+ * only changed when pushed) even when ABA happens */
+ auto v = u;
+ auto nv = u->next.load(std::memory_order_acquire);
+ if (top.compare_exchange_weak(v, nv, std::memory_order_consume))
+ {
+ /* manage to pop the head */
+ r = u;
+ loop = false;
+ /* do not need to try cas_push here because the current
+ * thread is the only one who can push u back */
+ }
+ /* release the refcnt and execute the delayed push call if
+ * necessary */
+ release_ref(u);
+ }
+ }
+ return true;
+ }
+};
+
+template<typename T>
+class MPMCQueue {
+ struct Block: public FreeList::Node {
+ T elem;
+ std::atomic<Block *> next;
+ };
+
+ FreeList blks;
+
+ alignas(cacheline_size) std::atomic<Block *> head;
+ alignas(cacheline_size) std::atomic<Block *> tail;
+
+ template<typename U>
+ void _enqueue(Block *nblk, U &&e) {
+ new (&(nblk->elem)) T(std::forward<U>(e));
+ nblk->next.store(nullptr, std::memory_order_release);
+ auto prev = tail.exchange(nblk, std::memory_order_relaxed);
+ prev->next.store(nblk, std::memory_order_relaxed);
+ }
+
+ public:
+ MPMCQueue(const MPMCQueue &) = delete;
+ MPMCQueue(MPMCQueue &&) = delete;
+
+ MPMCQueue(size_t capacity = 65536): head(new Block()), tail(head.load()) {
+ head.load()->next = nullptr;
+ while (capacity--)
+ blks.push(new Block());
+ }
+
+ ~MPMCQueue() {
+ for (FreeList::Node *ptr; blks.pop(ptr); ) delete ptr;
+ for (Block *ptr = head.load(), *nptr; ptr; ptr = nptr)
+ {
+ nptr = ptr->next;
+ delete ptr;
+ }
+ }
+
+ template<typename U>
+ bool enqueue(U &&e) {
+ FreeList::Node * _nblk;
+ if (!blks.pop(_nblk)) _nblk = new Block();
+ _enqueue(static_cast<Block *>(_nblk), std::forward<U>(e));
+ return true;
+ }
+
+ template<typename U>
+ bool try_enqueue(U &&e) {
+ FreeList::Node * _nblk;
+ if (!blks.pop(_nblk)) return false;
+ _enqueue(static_cast<Block *>(_nblk), std::forward<U>(e));
+ return true;
+ }
+
+ bool try_dequeue(T &e) {
+ for (;;)
+ {
+ auto h = head.load(std::memory_order_acquire);
+ auto t = h->refcnt.load(std::memory_order_relaxed);
+ if (!t) continue;
+ if (h->refcnt.compare_exchange_weak(t, t + 1, std::memory_order_consume))
+ {
+ auto nh = h->next.load(std::memory_order_relaxed);
+ if (nh == nullptr)
+ {
+ blks.release_ref(h);
+ return false;
+ }
+ e = std::move(nh->elem);
+ auto hh = h;
+ if (head.compare_exchange_weak(hh, nh, std::memory_order_consume))
+ {
+ blks.release_ref(h);
+ blks.push(h);
+ return true;
+ }
+ blks.release_ref(h);
+ }
+ }
+ }
+};
+
+}
+
+#endif
diff --git a/test/.gitignore b/test/.gitignore
index 2cd44f2..592f084 100644
--- a/test/.gitignore
+++ b/test/.gitignore
@@ -1,5 +1,6 @@
test_msg
test_bits
test_network
+test_queue
bench_network
Makefile
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 9689df3..44c984d 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -29,5 +29,8 @@ target_link_libraries(test_bits salticidae_static)
add_executable(test_network test_network.cpp)
target_link_libraries(test_network salticidae_static)
+add_executable(test_queue test_queue.cpp)
+target_link_libraries(test_queue salticidae_static pthread)
+
add_executable(bench_network bench_network.cpp)
target_link_libraries(bench_network salticidae_static pthread)
diff --git a/test/test_queue.cpp b/test/test_queue.cpp
new file mode 100644
index 0000000..bed88c4
--- /dev/null
+++ b/test/test_queue.cpp
@@ -0,0 +1,92 @@
+#include <cstdio>
+#include <thread>
+#include <atomic>
+
+#include "salticidae/event.h"
+
+void test_mpsc(int nproducers = 16, int nops = 100000) {
+ size_t total = nproducers * nops;
+ salticidae::EventContext ec;
+ std::atomic<size_t> collected(0);
+ salticidae::MPSCQueueEventDriven<int> q(ec, [&collected](int x) {
+ printf("%d\n", x);
+ collected.fetch_add(1);
+ });
+ std::vector<std::thread> producers;
+ std::thread consumer([&collected, total, &ec]() {
+ salticidae::Event timer(ec, -1, EV_TIMEOUT | EV_PERSIST,
+ [&ec, &collected, total](int, short) {
+ if (collected.load() == total) ec.stop();
+ });
+ timer.add_with_timeout(1);
+ ec.dispatch();
+ });
+ for (int i = 0; i < nproducers; i++)
+ {
+ producers.emplace(producers.end(), std::thread([&q, nops, i, nproducers]() {
+ int x = i;
+ for (int j = 0; j < nops; j++)
+ {
+ //usleep(rand() / double(RAND_MAX) * 100);
+ q.enqueue(x);
+ x += nproducers;
+ }
+ }));
+ }
+ for (auto &t: producers) t.join();
+ fprintf(stderr, "producers terminate\n");
+ consumer.join();
+ fprintf(stderr, "consumers terminate\n");
+}
+
+void test_mpmc(int nproducers = 16, int nconsumers = 4, int nops = 100000) {
+ size_t total = nproducers * nops;
+ salticidae::MPMCQueueEventDriven<int> q;
+ std::vector<std::thread> producers;
+ std::vector<std::thread> consumers;
+ std::vector<salticidae::EventContext> ecs;
+ std::atomic<size_t> collected(0);
+ ecs.resize(nconsumers);
+ for (int i = 0; i < nconsumers; i++)
+ {
+ q.listen(ecs[i], [&collected](int x) {
+ //usleep(10);
+ printf("%d\n", x);
+ collected.fetch_add(1);
+ });
+ }
+ for (int i = 0; i < nconsumers; i++)
+ {
+ consumers.emplace(consumers.end(), std::thread(
+ [&collected, total, &ec = ecs[i]]() {
+ salticidae::Event timer(ec, -1, EV_TIMEOUT | EV_PERSIST,
+ [&ec, &collected, total](int, short) {
+ if (collected.load() == total) ec.stop();
+ });
+ timer.add_with_timeout(1);
+ ec.dispatch();
+ }));
+ }
+ for (int i = 0; i < nproducers; i++)
+ {
+ producers.emplace(producers.end(), std::thread([&q, nops, i, nproducers]() {
+ int x = i;
+ for (int j = 0; j < nops; j++)
+ {
+ //usleep(rand() / double(RAND_MAX) * 100);
+ q.enqueue(x);
+ x += nproducers;
+ }
+ }));
+ }
+ for (auto &t: producers) t.join();
+ fprintf(stderr, "producers terminate\n");
+ for (auto &t: consumers) t.join();
+ fprintf(stderr, "consumers terminate\n");
+}
+
+int main() {
+ //test_mpsc();
+ test_mpmc();
+ return 0;
+}