diff options
author | Determinant <[email protected]> | 2018-10-04 18:30:23 -0400 |
---|---|---|
committer | Determinant <[email protected]> | 2018-10-04 18:30:23 -0400 |
commit | 2b584edbddc782b6436a8e28d4f8b85a11f528a6 (patch) | |
tree | 364aa1067c95976c390b61a260ecbb39d4ff6555 /include | |
parent | 0eea9ddc7cfb2820295dd87aed3dc911a100ecde (diff) |
add lock-free queue impl and event-driven queue
Diffstat (limited to 'include')
-rw-r--r-- | include/salticidae/event.h | 93 | ||||
-rw-r--r-- | include/salticidae/queue.h | 176 |
2 files changed, 269 insertions, 0 deletions
diff --git a/include/salticidae/event.h b/include/salticidae/event.h index 5b41d9e..99fff2f 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -25,8 +25,14 @@ #ifndef _SALTICIDAE_EVENT_H #define _SALTICIDAE_EVENT_H +#include <unistd.h> #include <event2/event.h> +#include <sys/eventfd.h> +#ifdef SALTICIDAE_DEBUG_LOG +#include <thread> +#endif +#include "salticidae/queue.h" #include "salticidae/util.h" #include "salticidae/ref.h" @@ -50,6 +56,7 @@ class EventContext: public _event_context_ot { EventContext &operator=(const EventContext &) = default; EventContext &operator=(EventContext &&) = default; void dispatch() { event_base_dispatch(get()); } + void stop() { event_base_loopbreak(get()); } }; class Event { @@ -76,6 +83,7 @@ class Event { ev(event_new(eb.get(), fd, events, Event::_then, this)), callback(callback) {} + Event(const Event &) = delete; Event(Event &&other): eb(std::move(other.eb)), fd(other.fd), events(other.events), callback(std::move(other.callback)) { @@ -115,6 +123,91 @@ class Event { operator bool() const { return ev != nullptr; } }; +template<typename T> +class MPSCQueueEventDriven: public MPMCQueue<T> { + private: + std::atomic<bool> wait_sig; + int fd; + Event ev; + + public: + template<typename Func> + MPSCQueueEventDriven(const EventContext &ec, Func &&func, size_t capacity = 65536): + MPMCQueue<T>(capacity), + wait_sig(true), + fd(eventfd(0, EFD_NONBLOCK)), + ev(Event(ec, fd, EV_READ | EV_PERSIST, + [this, func=std::forward<Func>(func)](int, short) { + uint64_t t; + read(fd, &t, 8); + SALTICIDAE_LOG_DEBUG("%x\n", std::this_thread::get_id()); + T elem; + while (MPMCQueue<T>::try_dequeue(elem)) + func(std::move(elem)); + wait_sig.store(true, std::memory_order_relaxed); + })) { ev.add(); } + + ~MPSCQueueEventDriven() { close(fd); } + + template<typename U> + bool enqueue(U &&e) { + static const uint64_t dummy = 1; + bool ret = MPMCQueue<T>::enqueue(std::forward<U>(e)); + if (wait_sig.exchange(false, std::memory_order_relaxed)) + { + SALTICIDAE_LOG_DEBUG("mpsc notify"); + write(fd, &dummy, 8); + } + return ret; + } +}; + +template<typename T> +class MPMCQueueEventDriven: public MPMCQueue<T> { + private: + std::atomic<bool> wait_sig; + std::vector<std::pair<BoxObj<Event>, int>> evs; + + public: + MPMCQueueEventDriven(size_t capacity = 65536): + MPMCQueue<T>(capacity), + wait_sig(true) {} + + template<typename Func> + void listen(const EventContext &ec, Func &&func) { + int fd = eventfd(0, EFD_NONBLOCK); + evs.emplace(evs.end(), std::make_pair(new Event(ec, fd, EV_READ | EV_PERSIST, + [this, func=std::forward<Func>(func)](int fd, short) { + uint64_t t; + read(fd, &t, 8); + SALTICIDAE_LOG_DEBUG("%x\n", std::this_thread::get_id()); + T elem; + while (MPMCQueue<T>::try_dequeue(elem)) + func(std::move(elem)); + wait_sig.store(true, std::memory_order_relaxed); + }), fd)); + evs.rbegin()->first->add(); + } + + ~MPMCQueueEventDriven() { + for (const auto &p: evs) + close(p.second); + } + + template<typename U> + bool enqueue(U &&e) { + static const uint64_t dummy = 1; + bool ret = MPMCQueue<T>::enqueue(std::forward<U>(e)); + if (wait_sig.exchange(false, std::memory_order_relaxed)) + { + SALTICIDAE_LOG_DEBUG("mpmc notify"); + for (const auto &p: evs) + write(p.second, &dummy, 8); + } + return ret; + } +}; + } #endif diff --git a/include/salticidae/queue.h b/include/salticidae/queue.h new file mode 100644 index 0000000..88b3fc3 --- /dev/null +++ b/include/salticidae/queue.h @@ -0,0 +1,176 @@ +#ifndef _SALTICIDAE_QUEUE_H +#define _SALTICIDAE_QUEUE_H + +#include <atomic> +#include <vector> +#include <cassert> + +namespace salticidae { + +static size_t const cacheline_size = 64; + +class FreeList { + public: + struct Node { + std::atomic<Node *> next; + std::atomic<bool> addback; + std::atomic<size_t> refcnt; + Node(): next(nullptr), addback(false), refcnt(1) {} + }; + + private: + alignas(cacheline_size) std::atomic<Node *> top; + + public: + FreeList(): top(nullptr) {} + FreeList(const FreeList &) = delete; + FreeList(FreeList &&) = delete; + + void release_ref(Node *u) { + if (u->refcnt.fetch_sub(1, std::memory_order_relaxed) != 1) return; + u->addback.store(false, std::memory_order_relaxed); + //if (!u->addback.exchange(false, std::memory_order_relaxed)) return; + for (;;) + { + auto t = top.load(std::memory_order_acquire); + // repair the next pointer before CAS, otherwise u->next == nullptr + // could lead to skipping elements + u->next.store(t, std::memory_order_consume); + // the replacement is ok even if ABA happens + if (top.compare_exchange_weak(t, u, std::memory_order_consume)) + { + u->refcnt.store(1, std::memory_order_relaxed); + break; + } + } + } + + bool push(Node *u) { + assert(u->addback.load() == false); + // attempt to push it + u->addback.store(true, std::memory_order_release); + release_ref(u); + return true; + } + + bool pop(Node *&r) { + bool loop = true; + while (loop) + { + auto u = top.load(std::memory_order_acquire); + /* the list is now empty */ + if (u == nullptr) return false; + auto t = u->refcnt.load(std::memory_order_acquire); + /* let's wait for another round if u is a ghost (already popped) */ + if (!t) continue; + /* otherwise t > 0, so with CAS, the invariant that zero refcnt can + * never be increased is guaranteed */ + if (u->refcnt.compare_exchange_weak(t, t + 1, std::memory_order_consume)) + { + /* here, nobody is able to change v->next (because v->next is + * only changed when pushed) even when ABA happens */ + auto v = u; + auto nv = u->next.load(std::memory_order_acquire); + if (top.compare_exchange_weak(v, nv, std::memory_order_consume)) + { + /* manage to pop the head */ + r = u; + loop = false; + /* do not need to try cas_push here because the current + * thread is the only one who can push u back */ + } + /* release the refcnt and execute the delayed push call if + * necessary */ + release_ref(u); + } + } + return true; + } +}; + +template<typename T> +class MPMCQueue { + struct Block: public FreeList::Node { + T elem; + std::atomic<Block *> next; + }; + + FreeList blks; + + alignas(cacheline_size) std::atomic<Block *> head; + alignas(cacheline_size) std::atomic<Block *> tail; + + template<typename U> + void _enqueue(Block *nblk, U &&e) { + new (&(nblk->elem)) T(std::forward<U>(e)); + nblk->next.store(nullptr, std::memory_order_release); + auto prev = tail.exchange(nblk, std::memory_order_relaxed); + prev->next.store(nblk, std::memory_order_relaxed); + } + + public: + MPMCQueue(const MPMCQueue &) = delete; + MPMCQueue(MPMCQueue &&) = delete; + + MPMCQueue(size_t capacity = 65536): head(new Block()), tail(head.load()) { + head.load()->next = nullptr; + while (capacity--) + blks.push(new Block()); + } + + ~MPMCQueue() { + for (FreeList::Node *ptr; blks.pop(ptr); ) delete ptr; + for (Block *ptr = head.load(), *nptr; ptr; ptr = nptr) + { + nptr = ptr->next; + delete ptr; + } + } + + template<typename U> + bool enqueue(U &&e) { + FreeList::Node * _nblk; + if (!blks.pop(_nblk)) _nblk = new Block(); + _enqueue(static_cast<Block *>(_nblk), std::forward<U>(e)); + return true; + } + + template<typename U> + bool try_enqueue(U &&e) { + FreeList::Node * _nblk; + if (!blks.pop(_nblk)) return false; + _enqueue(static_cast<Block *>(_nblk), std::forward<U>(e)); + return true; + } + + bool try_dequeue(T &e) { + for (;;) + { + auto h = head.load(std::memory_order_acquire); + auto t = h->refcnt.load(std::memory_order_relaxed); + if (!t) continue; + if (h->refcnt.compare_exchange_weak(t, t + 1, std::memory_order_consume)) + { + auto nh = h->next.load(std::memory_order_relaxed); + if (nh == nullptr) + { + blks.release_ref(h); + return false; + } + e = std::move(nh->elem); + auto hh = h; + if (head.compare_exchange_weak(hh, nh, std::memory_order_consume)) + { + blks.release_ref(h); + blks.push(h); + return true; + } + blks.release_ref(h); + } + } + } +}; + +} + +#endif |