aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/salticidae/queue.h173
-rw-r--r--test/test_queue.cpp14
2 files changed, 132 insertions, 55 deletions
diff --git a/include/salticidae/queue.h b/include/salticidae/queue.h
index 0b4dae0..3df1529 100644
--- a/include/salticidae/queue.h
+++ b/include/salticidae/queue.h
@@ -4,6 +4,7 @@
#include <atomic>
#include <vector>
#include <cassert>
+#include <thread>
namespace salticidae {
@@ -82,11 +83,16 @@ class FreeList {
}
};
+const size_t MPMCQ_SIZE = 4096;
+
template<typename T>
class MPMCQueue {
protected:
struct Block: public FreeList::Node {
- T elem;
+ alignas(cacheline_size) std::atomic<uint32_t> head;
+ alignas(cacheline_size) std::atomic<uint32_t> tail;
+ T elem[MPMCQ_SIZE];
+ std::atomic<bool> avail[MPMCQ_SIZE];
std::atomic<Block *> next;
};
@@ -96,11 +102,52 @@ class MPMCQueue {
alignas(cacheline_size) std::atomic<Block *> tail;
template<typename U>
- void _enqueue(Block *nblk, U &&e) {
- new (&(nblk->elem)) T(std::forward<U>(e));
- nblk->next.store(nullptr, std::memory_order_relaxed);
- auto prev = tail.exchange(nblk, std::memory_order_acq_rel);
- prev->next.store(nblk, std::memory_order_relaxed);
+ bool _enqueue(U &&e, bool unbounded = true) {
+ for (;;)
+ {
+ auto t = tail.load(std::memory_order_relaxed);
+ auto tcnt = t->refcnt.load(std::memory_order_relaxed);
+ if (!tcnt) continue;
+ if (!t->refcnt.compare_exchange_weak(tcnt, tcnt + 1, std::memory_order_relaxed))
+ continue;
+ auto tt = t->tail.load(std::memory_order_relaxed);
+ if (tt >= MPMCQ_SIZE)
+ {
+ if (t->next.load(std::memory_order_relaxed) == nullptr)
+ {
+ FreeList::Node * _nblk;
+ if (!blks.pop(_nblk))
+ {
+ if (unbounded) _nblk = new Block();
+ else {
+ blks.release_ref(t);
+ return false;
+ }
+ }
+ auto nblk = static_cast<Block *>(_nblk);
+ nblk->head.store(0, std::memory_order_relaxed);
+ nblk->tail.store(0, std::memory_order_relaxed);
+ nblk->next.store(nullptr, std::memory_order_relaxed);
+ Block *tnext = nullptr;
+ if (!t->next.compare_exchange_weak(tnext, nblk, std::memory_order_acq_rel))
+ blks.push(_nblk);
+ else
+ tail.store(nblk, std::memory_order_relaxed);
+ }
+ blks.release_ref(t);
+ continue;
+ }
+ auto tt2 = tt;
+ if (t->tail.compare_exchange_weak(tt2, tt2 + 1, std::memory_order_relaxed))
+ {
+ new (&(t->elem[tt])) T(std::forward<U>(e));
+ t->avail[tt].store(true, std::memory_order_release);
+ blks.release_ref(t);
+ break;
+ }
+ blks.release_ref(t);
+ }
+ return true;
}
public:
@@ -108,7 +155,10 @@ class MPMCQueue {
MPMCQueue(MPMCQueue &&) = delete;
MPMCQueue(): head(new Block()), tail(head.load()) {
- head.load()->next = nullptr;
+ auto h = head.load();
+ h->head = h->tail = 0;
+ memset(h->avail, 0, sizeof(h->avail));
+ h->next = nullptr;
}
~MPMCQueue() {
@@ -121,54 +171,53 @@ class MPMCQueue {
}
void set_capacity(size_t capacity = 0) {
+ capacity = std::max(capacity / MPMCQ_SIZE, (size_t)1);
while (capacity--) blks.push(new Block());
}
template<typename U>
bool enqueue(U &&e, bool unbounded = true) {
- FreeList::Node * _nblk;
- if (!blks.pop(_nblk))
- {
- if (unbounded) _nblk = new Block();
- else return false;
- }
- _enqueue(static_cast<Block *>(_nblk), std::forward<U>(e));
- return true;
+ return _enqueue(e, unbounded);
}
template<typename U>
bool try_enqueue(U &&e) {
- FreeList::Node * _nblk;
- if (!blks.pop(_nblk)) return false;
- _enqueue(static_cast<Block *>(_nblk), std::forward<U>(e));
- return true;
+ return _enqueue(e, false);
}
bool try_dequeue(T &e) {
for (;;)
{
- auto h = head.load(std::memory_order_relaxed);
- auto t = h->refcnt.load(std::memory_order_relaxed);
- if (!t) continue;
- if (h->refcnt.compare_exchange_weak(t, t + 1, std::memory_order_relaxed))
+ auto h = this->head.load(std::memory_order_relaxed);
+ auto hcnt = h->refcnt.load(std::memory_order_relaxed);
+ if (!hcnt) continue;
+ if (!h->refcnt.compare_exchange_weak(hcnt, hcnt + 1, std::memory_order_relaxed))
+ continue;
+
+ auto hh = h->head.load(std::memory_order_relaxed);
+ auto tt = h->tail.load(std::memory_order_relaxed);
+ if (hh >= tt)
{
- auto nh = h->next.load(std::memory_order_relaxed);
- if (nh == nullptr)
- {
- blks.release_ref(h);
- return false;
- }
- e = std::move(nh->elem);
- auto hh = h;
- if (head.compare_exchange_weak(hh, nh, std::memory_order_relaxed))
- {
- blks.release_ref(h);
- blks.push(h);
- return true;
- }
+ if (tt < MPMCQ_SIZE) { blks.release_ref(h); return false; }
+ auto hnext = h->next.load(std::memory_order_relaxed);
+ if (hnext == nullptr) { blks.release_ref(h); return false; }
+ if (this->head.compare_exchange_weak(h, hnext, std::memory_order_relaxed))
+ this->blks.push(h);
blks.release_ref(h);
+ continue;
}
+ while (!h->avail[hh].load(std::memory_order_acquire))
+ std::this_thread::yield();
+ auto hh2 = hh;
+ if (h->head.compare_exchange_weak(hh2, hh2 + 1, std::memory_order_relaxed))
+ {
+ e = std::move(h->elem[hh]);
+ blks.release_ref(h);
+ break;
+ }
+ blks.release_ref(h);
}
+ return true;
}
};
@@ -178,25 +227,49 @@ struct MPSCQueue: public MPMCQueue<T> {
/* the same thread is calling the following functions */
bool try_dequeue(T &e) {
- auto h = this->head.load(std::memory_order_relaxed);
- auto nh = h->next.load(std::memory_order_relaxed);
- if (nh == nullptr) return false;
- e = std::move(nh->elem);
- this->head.store(nh, std::memory_order_relaxed);
- this->blks.push(h);
+ for (;;)
+ {
+ auto h = this->head.load(std::memory_order_relaxed);
+ auto hh = h->head.load(std::memory_order_relaxed);
+ auto tt = h->tail.load(std::memory_order_relaxed);
+ if (hh >= tt)
+ {
+ if (tt < MPMCQ_SIZE) return false;
+ auto hnext = h->next.load(std::memory_order_relaxed);
+ if (hnext == nullptr) return false;
+ this->head.store(hnext, std::memory_order_relaxed);
+ this->blks.push(h);
+ continue;
+ }
+ while (!h->avail[hh].load(std::memory_order_acquire))
+ std::this_thread::yield();
+ h->head.store(hh + 1, std::memory_order_relaxed);
+ e = std::move(h->elem[hh]);
+ h->avail[hh].store(false, std::memory_order_relaxed);
+ break;
+ }
return true;
}
template<typename U>
bool rewind(U &&e) {
- FreeList::Node * _nblk;
- if (!this->blks.pop(_nblk)) _nblk = new typename MPMCQueue<T>::Block();
-
- auto nblk = static_cast<typename MPMCQueue<T>::Block *>(_nblk);
auto h = this->head.load(std::memory_order_relaxed);
- new (&(h->elem)) T(std::forward<U>(e));
- nblk->next.store(h, std::memory_order_relaxed);
- this->head.store(nblk, std::memory_order_relaxed);
+ auto hh = h->head.load(std::memory_order_relaxed);
+ if (!hh)
+ {
+ FreeList::Node * _nblk;
+ if (!this->blks.pop(_nblk)) _nblk = new typename MPMCQueue<T>::Block();
+ auto nblk = static_cast<typename MPMCQueue<T>::Block *>(_nblk);
+ nblk->head.store(MPMCQ_SIZE, std::memory_order_relaxed);
+ nblk->tail.store(MPMCQ_SIZE, std::memory_order_relaxed);
+ nblk->next.store(h, std::memory_order_relaxed);
+ this->head.store(nblk, std::memory_order_relaxed);
+ }
+ h = this->head.load(std::memory_order_relaxed);
+ hh = h->head.load(std::memory_order_relaxed) - 1;
+ new (&(h->elem[hh])) T(std::forward<U>(e));
+ h->avail[hh].store(true, std::memory_order_relaxed);
+ h->head.store(hh, std::memory_order_relaxed);
return true;
}
};
diff --git a/test/test_queue.cpp b/test/test_queue.cpp
index c565db3..5c56585 100644
--- a/test/test_queue.cpp
+++ b/test/test_queue.cpp
@@ -14,6 +14,7 @@ void test_mpsc(int nproducers, int nops, size_t burst_size) {
std::atomic<size_t> collected(0);
using queue_t = salticidae::MPSCQueueEventDriven<int>;
queue_t q;
+ q.set_capacity(65536);
q.reg_handler(ec, [&collected, burst_size](queue_t &q) {
size_t cnt = burst_size;
int x;
@@ -34,6 +35,7 @@ void test_mpsc(int nproducers, int nops, size_t burst_size) {
timer.add(1);
ec.dispatch();
});
+
for (int i = 0; i < nproducers; i++)
{
producers.emplace(producers.end(), std::thread([&q, nops, i, nproducers]() {
@@ -41,7 +43,8 @@ void test_mpsc(int nproducers, int nops, size_t burst_size) {
for (int j = 0; j < nops; j++)
{
//usleep(rand() / double(RAND_MAX) * 100);
- q.enqueue(x);
+ while (!q.try_enqueue(x))
+ std::this_thread::yield();
x += nproducers;
}
}));
@@ -52,7 +55,7 @@ void test_mpsc(int nproducers, int nops, size_t burst_size) {
SALTICIDAE_LOG_INFO("consumers terminate");
}
-void test_mpmc(int nproducers, int nconsumers, int nops, size_t burst_size) {
+/*void test_mpmc(int nproducers, int nconsumers, int nops, size_t burst_size) {
size_t total = nproducers * nops;
using queue_t = salticidae::MPMCQueueEventDriven<int>;
queue_t q;
@@ -95,7 +98,7 @@ void test_mpmc(int nproducers, int nconsumers, int nops, size_t burst_size) {
for (int j = 0; j < nops; j++)
{
//usleep(rand() / double(RAND_MAX) * 100);
- q.enqueue(x);
+ q.try_enqueue(x);
x += nproducers;
}
}));
@@ -105,6 +108,7 @@ void test_mpmc(int nproducers, int nconsumers, int nops, size_t burst_size) {
for (auto &t: consumers) t.join();
SALTICIDAE_LOG_INFO("consumers terminate");
}
+*/
int main(int argc, char **argv) {
Config config;
@@ -136,8 +140,8 @@ int main(int argc, char **argv) {
else
{
SALTICIDAE_LOG_INFO("testing an MPMC queue...");
- test_mpmc(opt_nproducers->get(), opt_nconsumers->get(),
- opt_nops->get(), opt_burst_size->get());
+ //test_mpmc(opt_nproducers->get(), opt_nconsumers->get(),
+ // opt_nops->get(), opt_burst_size->get());
}
return 0;
}