aboutsummaryrefslogtreecommitdiff
path: root/include
diff options
context:
space:
mode:
Diffstat (limited to 'include')
-rw-r--r--include/salticidae/queue.h173
1 files changed, 123 insertions, 50 deletions
diff --git a/include/salticidae/queue.h b/include/salticidae/queue.h
index 0b4dae0..3df1529 100644
--- a/include/salticidae/queue.h
+++ b/include/salticidae/queue.h
@@ -4,6 +4,7 @@
#include <atomic>
#include <vector>
#include <cassert>
+#include <thread>
namespace salticidae {
@@ -82,11 +83,16 @@ class FreeList {
}
};
+const size_t MPMCQ_SIZE = 4096;
+
template<typename T>
class MPMCQueue {
protected:
struct Block: public FreeList::Node {
- T elem;
+ alignas(cacheline_size) std::atomic<uint32_t> head;
+ alignas(cacheline_size) std::atomic<uint32_t> tail;
+ T elem[MPMCQ_SIZE];
+ std::atomic<bool> avail[MPMCQ_SIZE];
std::atomic<Block *> next;
};
@@ -96,11 +102,52 @@ class MPMCQueue {
alignas(cacheline_size) std::atomic<Block *> tail;
template<typename U>
- void _enqueue(Block *nblk, U &&e) {
- new (&(nblk->elem)) T(std::forward<U>(e));
- nblk->next.store(nullptr, std::memory_order_relaxed);
- auto prev = tail.exchange(nblk, std::memory_order_acq_rel);
- prev->next.store(nblk, std::memory_order_relaxed);
+ bool _enqueue(U &&e, bool unbounded = true) {
+ for (;;)
+ {
+ auto t = tail.load(std::memory_order_relaxed);
+ auto tcnt = t->refcnt.load(std::memory_order_relaxed);
+ if (!tcnt) continue;
+ if (!t->refcnt.compare_exchange_weak(tcnt, tcnt + 1, std::memory_order_relaxed))
+ continue;
+ auto tt = t->tail.load(std::memory_order_relaxed);
+ if (tt >= MPMCQ_SIZE)
+ {
+ if (t->next.load(std::memory_order_relaxed) == nullptr)
+ {
+ FreeList::Node * _nblk;
+ if (!blks.pop(_nblk))
+ {
+ if (unbounded) _nblk = new Block();
+ else {
+ blks.release_ref(t);
+ return false;
+ }
+ }
+ auto nblk = static_cast<Block *>(_nblk);
+ nblk->head.store(0, std::memory_order_relaxed);
+ nblk->tail.store(0, std::memory_order_relaxed);
+ nblk->next.store(nullptr, std::memory_order_relaxed);
+ Block *tnext = nullptr;
+ if (!t->next.compare_exchange_weak(tnext, nblk, std::memory_order_acq_rel))
+ blks.push(_nblk);
+ else
+ tail.store(nblk, std::memory_order_relaxed);
+ }
+ blks.release_ref(t);
+ continue;
+ }
+ auto tt2 = tt;
+ if (t->tail.compare_exchange_weak(tt2, tt2 + 1, std::memory_order_relaxed))
+ {
+ new (&(t->elem[tt])) T(std::forward<U>(e));
+ t->avail[tt].store(true, std::memory_order_release);
+ blks.release_ref(t);
+ break;
+ }
+ blks.release_ref(t);
+ }
+ return true;
}
public:
@@ -108,7 +155,10 @@ class MPMCQueue {
MPMCQueue(MPMCQueue &&) = delete;
MPMCQueue(): head(new Block()), tail(head.load()) {
- head.load()->next = nullptr;
+ auto h = head.load();
+ h->head = h->tail = 0;
+ memset(h->avail, 0, sizeof(h->avail));
+ h->next = nullptr;
}
~MPMCQueue() {
@@ -121,54 +171,53 @@ class MPMCQueue {
}
void set_capacity(size_t capacity = 0) {
+ capacity = std::max(capacity / MPMCQ_SIZE, (size_t)1);
while (capacity--) blks.push(new Block());
}
template<typename U>
bool enqueue(U &&e, bool unbounded = true) {
- FreeList::Node * _nblk;
- if (!blks.pop(_nblk))
- {
- if (unbounded) _nblk = new Block();
- else return false;
- }
- _enqueue(static_cast<Block *>(_nblk), std::forward<U>(e));
- return true;
+ return _enqueue(e, unbounded);
}
template<typename U>
bool try_enqueue(U &&e) {
- FreeList::Node * _nblk;
- if (!blks.pop(_nblk)) return false;
- _enqueue(static_cast<Block *>(_nblk), std::forward<U>(e));
- return true;
+ return _enqueue(e, false);
}
bool try_dequeue(T &e) {
for (;;)
{
- auto h = head.load(std::memory_order_relaxed);
- auto t = h->refcnt.load(std::memory_order_relaxed);
- if (!t) continue;
- if (h->refcnt.compare_exchange_weak(t, t + 1, std::memory_order_relaxed))
+ auto h = this->head.load(std::memory_order_relaxed);
+ auto hcnt = h->refcnt.load(std::memory_order_relaxed);
+ if (!hcnt) continue;
+ if (!h->refcnt.compare_exchange_weak(hcnt, hcnt + 1, std::memory_order_relaxed))
+ continue;
+
+ auto hh = h->head.load(std::memory_order_relaxed);
+ auto tt = h->tail.load(std::memory_order_relaxed);
+ if (hh >= tt)
{
- auto nh = h->next.load(std::memory_order_relaxed);
- if (nh == nullptr)
- {
- blks.release_ref(h);
- return false;
- }
- e = std::move(nh->elem);
- auto hh = h;
- if (head.compare_exchange_weak(hh, nh, std::memory_order_relaxed))
- {
- blks.release_ref(h);
- blks.push(h);
- return true;
- }
+ if (tt < MPMCQ_SIZE) { blks.release_ref(h); return false; }
+ auto hnext = h->next.load(std::memory_order_relaxed);
+ if (hnext == nullptr) { blks.release_ref(h); return false; }
+ if (this->head.compare_exchange_weak(h, hnext, std::memory_order_relaxed))
+ this->blks.push(h);
blks.release_ref(h);
+ continue;
}
+ while (!h->avail[hh].load(std::memory_order_acquire))
+ std::this_thread::yield();
+ auto hh2 = hh;
+ if (h->head.compare_exchange_weak(hh2, hh2 + 1, std::memory_order_relaxed))
+ {
+ e = std::move(h->elem[hh]);
+ blks.release_ref(h);
+ break;
+ }
+ blks.release_ref(h);
}
+ return true;
}
};
@@ -178,25 +227,49 @@ struct MPSCQueue: public MPMCQueue<T> {
/* the same thread is calling the following functions */
bool try_dequeue(T &e) {
- auto h = this->head.load(std::memory_order_relaxed);
- auto nh = h->next.load(std::memory_order_relaxed);
- if (nh == nullptr) return false;
- e = std::move(nh->elem);
- this->head.store(nh, std::memory_order_relaxed);
- this->blks.push(h);
+ for (;;)
+ {
+ auto h = this->head.load(std::memory_order_relaxed);
+ auto hh = h->head.load(std::memory_order_relaxed);
+ auto tt = h->tail.load(std::memory_order_relaxed);
+ if (hh >= tt)
+ {
+ if (tt < MPMCQ_SIZE) return false;
+ auto hnext = h->next.load(std::memory_order_relaxed);
+ if (hnext == nullptr) return false;
+ this->head.store(hnext, std::memory_order_relaxed);
+ this->blks.push(h);
+ continue;
+ }
+ while (!h->avail[hh].load(std::memory_order_acquire))
+ std::this_thread::yield();
+ h->head.store(hh + 1, std::memory_order_relaxed);
+ e = std::move(h->elem[hh]);
+ h->avail[hh].store(false, std::memory_order_relaxed);
+ break;
+ }
return true;
}
template<typename U>
bool rewind(U &&e) {
- FreeList::Node * _nblk;
- if (!this->blks.pop(_nblk)) _nblk = new typename MPMCQueue<T>::Block();
-
- auto nblk = static_cast<typename MPMCQueue<T>::Block *>(_nblk);
auto h = this->head.load(std::memory_order_relaxed);
- new (&(h->elem)) T(std::forward<U>(e));
- nblk->next.store(h, std::memory_order_relaxed);
- this->head.store(nblk, std::memory_order_relaxed);
+ auto hh = h->head.load(std::memory_order_relaxed);
+ if (!hh)
+ {
+ FreeList::Node * _nblk;
+ if (!this->blks.pop(_nblk)) _nblk = new typename MPMCQueue<T>::Block();
+ auto nblk = static_cast<typename MPMCQueue<T>::Block *>(_nblk);
+ nblk->head.store(MPMCQ_SIZE, std::memory_order_relaxed);
+ nblk->tail.store(MPMCQ_SIZE, std::memory_order_relaxed);
+ nblk->next.store(h, std::memory_order_relaxed);
+ this->head.store(nblk, std::memory_order_relaxed);
+ }
+ h = this->head.load(std::memory_order_relaxed);
+ hh = h->head.load(std::memory_order_relaxed) - 1;
+ new (&(h->elem[hh])) T(std::forward<U>(e));
+ h->avail[hh].store(true, std::memory_order_relaxed);
+ h->head.store(hh, std::memory_order_relaxed);
return true;
}
};