diff options
-rw-r--r-- | include/salticidae/event.h | 93 | ||||
-rw-r--r-- | include/salticidae/queue.h | 176 | ||||
-rw-r--r-- | test/.gitignore | 1 | ||||
-rw-r--r-- | test/CMakeLists.txt | 3 | ||||
-rw-r--r-- | test/test_queue.cpp | 92 |
5 files changed, 365 insertions, 0 deletions
diff --git a/include/salticidae/event.h b/include/salticidae/event.h index 5b41d9e..99fff2f 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -25,8 +25,14 @@ #ifndef _SALTICIDAE_EVENT_H #define _SALTICIDAE_EVENT_H +#include <unistd.h> #include <event2/event.h> +#include <sys/eventfd.h> +#ifdef SALTICIDAE_DEBUG_LOG +#include <thread> +#endif +#include "salticidae/queue.h" #include "salticidae/util.h" #include "salticidae/ref.h" @@ -50,6 +56,7 @@ class EventContext: public _event_context_ot { EventContext &operator=(const EventContext &) = default; EventContext &operator=(EventContext &&) = default; void dispatch() { event_base_dispatch(get()); } + void stop() { event_base_loopbreak(get()); } }; class Event { @@ -76,6 +83,7 @@ class Event { ev(event_new(eb.get(), fd, events, Event::_then, this)), callback(callback) {} + Event(const Event &) = delete; Event(Event &&other): eb(std::move(other.eb)), fd(other.fd), events(other.events), callback(std::move(other.callback)) { @@ -115,6 +123,91 @@ class Event { operator bool() const { return ev != nullptr; } }; +template<typename T> +class MPSCQueueEventDriven: public MPMCQueue<T> { + private: + std::atomic<bool> wait_sig; + int fd; + Event ev; + + public: + template<typename Func> + MPSCQueueEventDriven(const EventContext &ec, Func &&func, size_t capacity = 65536): + MPMCQueue<T>(capacity), + wait_sig(true), + fd(eventfd(0, EFD_NONBLOCK)), + ev(Event(ec, fd, EV_READ | EV_PERSIST, + [this, func=std::forward<Func>(func)](int, short) { + uint64_t t; + read(fd, &t, 8); + SALTICIDAE_LOG_DEBUG("%x\n", std::this_thread::get_id()); + T elem; + while (MPMCQueue<T>::try_dequeue(elem)) + func(std::move(elem)); + wait_sig.store(true, std::memory_order_relaxed); + })) { ev.add(); } + + ~MPSCQueueEventDriven() { close(fd); } + + template<typename U> + bool enqueue(U &&e) { + static const uint64_t dummy = 1; + bool ret = MPMCQueue<T>::enqueue(std::forward<U>(e)); + if (wait_sig.exchange(false, std::memory_order_relaxed)) + { + SALTICIDAE_LOG_DEBUG("mpsc notify"); + write(fd, &dummy, 8); + } + return ret; + } +}; + +template<typename T> +class MPMCQueueEventDriven: public MPMCQueue<T> { + private: + std::atomic<bool> wait_sig; + std::vector<std::pair<BoxObj<Event>, int>> evs; + + public: + MPMCQueueEventDriven(size_t capacity = 65536): + MPMCQueue<T>(capacity), + wait_sig(true) {} + + template<typename Func> + void listen(const EventContext &ec, Func &&func) { + int fd = eventfd(0, EFD_NONBLOCK); + evs.emplace(evs.end(), std::make_pair(new Event(ec, fd, EV_READ | EV_PERSIST, + [this, func=std::forward<Func>(func)](int fd, short) { + uint64_t t; + read(fd, &t, 8); + SALTICIDAE_LOG_DEBUG("%x\n", std::this_thread::get_id()); + T elem; + while (MPMCQueue<T>::try_dequeue(elem)) + func(std::move(elem)); + wait_sig.store(true, std::memory_order_relaxed); + }), fd)); + evs.rbegin()->first->add(); + } + + ~MPMCQueueEventDriven() { + for (const auto &p: evs) + close(p.second); + } + + template<typename U> + bool enqueue(U &&e) { + static const uint64_t dummy = 1; + bool ret = MPMCQueue<T>::enqueue(std::forward<U>(e)); + if (wait_sig.exchange(false, std::memory_order_relaxed)) + { + SALTICIDAE_LOG_DEBUG("mpmc notify"); + for (const auto &p: evs) + write(p.second, &dummy, 8); + } + return ret; + } +}; + } #endif diff --git a/include/salticidae/queue.h b/include/salticidae/queue.h new file mode 100644 index 0000000..88b3fc3 --- /dev/null +++ b/include/salticidae/queue.h @@ -0,0 +1,176 @@ +#ifndef _SALTICIDAE_QUEUE_H +#define _SALTICIDAE_QUEUE_H + +#include <atomic> +#include <vector> +#include <cassert> + +namespace salticidae { + +static size_t const cacheline_size = 64; + +class FreeList { + public: + struct Node { + std::atomic<Node *> next; + std::atomic<bool> addback; + std::atomic<size_t> refcnt; + Node(): next(nullptr), addback(false), refcnt(1) {} + }; + + private: + alignas(cacheline_size) std::atomic<Node *> top; + + public: + FreeList(): top(nullptr) {} + FreeList(const FreeList &) = delete; + FreeList(FreeList &&) = delete; + + void release_ref(Node *u) { + if (u->refcnt.fetch_sub(1, std::memory_order_relaxed) != 1) return; + u->addback.store(false, std::memory_order_relaxed); + //if (!u->addback.exchange(false, std::memory_order_relaxed)) return; + for (;;) + { + auto t = top.load(std::memory_order_acquire); + // repair the next pointer before CAS, otherwise u->next == nullptr + // could lead to skipping elements + u->next.store(t, std::memory_order_consume); + // the replacement is ok even if ABA happens + if (top.compare_exchange_weak(t, u, std::memory_order_consume)) + { + u->refcnt.store(1, std::memory_order_relaxed); + break; + } + } + } + + bool push(Node *u) { + assert(u->addback.load() == false); + // attempt to push it + u->addback.store(true, std::memory_order_release); + release_ref(u); + return true; + } + + bool pop(Node *&r) { + bool loop = true; + while (loop) + { + auto u = top.load(std::memory_order_acquire); + /* the list is now empty */ + if (u == nullptr) return false; + auto t = u->refcnt.load(std::memory_order_acquire); + /* let's wait for another round if u is a ghost (already popped) */ + if (!t) continue; + /* otherwise t > 0, so with CAS, the invariant that zero refcnt can + * never be increased is guaranteed */ + if (u->refcnt.compare_exchange_weak(t, t + 1, std::memory_order_consume)) + { + /* here, nobody is able to change v->next (because v->next is + * only changed when pushed) even when ABA happens */ + auto v = u; + auto nv = u->next.load(std::memory_order_acquire); + if (top.compare_exchange_weak(v, nv, std::memory_order_consume)) + { + /* manage to pop the head */ + r = u; + loop = false; + /* do not need to try cas_push here because the current + * thread is the only one who can push u back */ + } + /* release the refcnt and execute the delayed push call if + * necessary */ + release_ref(u); + } + } + return true; + } +}; + +template<typename T> +class MPMCQueue { + struct Block: public FreeList::Node { + T elem; + std::atomic<Block *> next; + }; + + FreeList blks; + + alignas(cacheline_size) std::atomic<Block *> head; + alignas(cacheline_size) std::atomic<Block *> tail; + + template<typename U> + void _enqueue(Block *nblk, U &&e) { + new (&(nblk->elem)) T(std::forward<U>(e)); + nblk->next.store(nullptr, std::memory_order_release); + auto prev = tail.exchange(nblk, std::memory_order_relaxed); + prev->next.store(nblk, std::memory_order_relaxed); + } + + public: + MPMCQueue(const MPMCQueue &) = delete; + MPMCQueue(MPMCQueue &&) = delete; + + MPMCQueue(size_t capacity = 65536): head(new Block()), tail(head.load()) { + head.load()->next = nullptr; + while (capacity--) + blks.push(new Block()); + } + + ~MPMCQueue() { + for (FreeList::Node *ptr; blks.pop(ptr); ) delete ptr; + for (Block *ptr = head.load(), *nptr; ptr; ptr = nptr) + { + nptr = ptr->next; + delete ptr; + } + } + + template<typename U> + bool enqueue(U &&e) { + FreeList::Node * _nblk; + if (!blks.pop(_nblk)) _nblk = new Block(); + _enqueue(static_cast<Block *>(_nblk), std::forward<U>(e)); + return true; + } + + template<typename U> + bool try_enqueue(U &&e) { + FreeList::Node * _nblk; + if (!blks.pop(_nblk)) return false; + _enqueue(static_cast<Block *>(_nblk), std::forward<U>(e)); + return true; + } + + bool try_dequeue(T &e) { + for (;;) + { + auto h = head.load(std::memory_order_acquire); + auto t = h->refcnt.load(std::memory_order_relaxed); + if (!t) continue; + if (h->refcnt.compare_exchange_weak(t, t + 1, std::memory_order_consume)) + { + auto nh = h->next.load(std::memory_order_relaxed); + if (nh == nullptr) + { + blks.release_ref(h); + return false; + } + e = std::move(nh->elem); + auto hh = h; + if (head.compare_exchange_weak(hh, nh, std::memory_order_consume)) + { + blks.release_ref(h); + blks.push(h); + return true; + } + blks.release_ref(h); + } + } + } +}; + +} + +#endif diff --git a/test/.gitignore b/test/.gitignore index 2cd44f2..592f084 100644 --- a/test/.gitignore +++ b/test/.gitignore @@ -1,5 +1,6 @@ test_msg test_bits test_network +test_queue bench_network Makefile diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 9689df3..44c984d 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -29,5 +29,8 @@ target_link_libraries(test_bits salticidae_static) add_executable(test_network test_network.cpp) target_link_libraries(test_network salticidae_static) +add_executable(test_queue test_queue.cpp) +target_link_libraries(test_queue salticidae_static pthread) + add_executable(bench_network bench_network.cpp) target_link_libraries(bench_network salticidae_static pthread) diff --git a/test/test_queue.cpp b/test/test_queue.cpp new file mode 100644 index 0000000..bed88c4 --- /dev/null +++ b/test/test_queue.cpp @@ -0,0 +1,92 @@ +#include <cstdio> +#include <thread> +#include <atomic> + +#include "salticidae/event.h" + +void test_mpsc(int nproducers = 16, int nops = 100000) { + size_t total = nproducers * nops; + salticidae::EventContext ec; + std::atomic<size_t> collected(0); + salticidae::MPSCQueueEventDriven<int> q(ec, [&collected](int x) { + printf("%d\n", x); + collected.fetch_add(1); + }); + std::vector<std::thread> producers; + std::thread consumer([&collected, total, &ec]() { + salticidae::Event timer(ec, -1, EV_TIMEOUT | EV_PERSIST, + [&ec, &collected, total](int, short) { + if (collected.load() == total) ec.stop(); + }); + timer.add_with_timeout(1); + ec.dispatch(); + }); + for (int i = 0; i < nproducers; i++) + { + producers.emplace(producers.end(), std::thread([&q, nops, i, nproducers]() { + int x = i; + for (int j = 0; j < nops; j++) + { + //usleep(rand() / double(RAND_MAX) * 100); + q.enqueue(x); + x += nproducers; + } + })); + } + for (auto &t: producers) t.join(); + fprintf(stderr, "producers terminate\n"); + consumer.join(); + fprintf(stderr, "consumers terminate\n"); +} + +void test_mpmc(int nproducers = 16, int nconsumers = 4, int nops = 100000) { + size_t total = nproducers * nops; + salticidae::MPMCQueueEventDriven<int> q; + std::vector<std::thread> producers; + std::vector<std::thread> consumers; + std::vector<salticidae::EventContext> ecs; + std::atomic<size_t> collected(0); + ecs.resize(nconsumers); + for (int i = 0; i < nconsumers; i++) + { + q.listen(ecs[i], [&collected](int x) { + //usleep(10); + printf("%d\n", x); + collected.fetch_add(1); + }); + } + for (int i = 0; i < nconsumers; i++) + { + consumers.emplace(consumers.end(), std::thread( + [&collected, total, &ec = ecs[i]]() { + salticidae::Event timer(ec, -1, EV_TIMEOUT | EV_PERSIST, + [&ec, &collected, total](int, short) { + if (collected.load() == total) ec.stop(); + }); + timer.add_with_timeout(1); + ec.dispatch(); + })); + } + for (int i = 0; i < nproducers; i++) + { + producers.emplace(producers.end(), std::thread([&q, nops, i, nproducers]() { + int x = i; + for (int j = 0; j < nops; j++) + { + //usleep(rand() / double(RAND_MAX) * 100); + q.enqueue(x); + x += nproducers; + } + })); + } + for (auto &t: producers) t.join(); + fprintf(stderr, "producers terminate\n"); + for (auto &t: consumers) t.join(); + fprintf(stderr, "consumers terminate\n"); +} + +int main() { + //test_mpsc(); + test_mpmc(); + return 0; +} |