aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/salticidae/queue.h20
-rw-r--r--test/test_queue.cpp2
2 files changed, 16 insertions, 6 deletions
diff --git a/include/salticidae/queue.h b/include/salticidae/queue.h
index 0417750..da11e8a 100644
--- a/include/salticidae/queue.h
+++ b/include/salticidae/queue.h
@@ -15,7 +15,8 @@ class FreeList {
struct Node {
std::atomic<Node *> next;
std::atomic<size_t> refcnt;
- Node(): next(nullptr), refcnt(1) {}
+ std::atomic<bool> freed;
+ Node(): next(nullptr), refcnt(1), freed(false) {}
};
private:
@@ -37,15 +38,16 @@ class FreeList {
// the replacement is ok even if ABA happens
if (top.compare_exchange_weak(t, u, std::memory_order_release))
{
+ // must have seen freed == true
u->refcnt.store(1, std::memory_order_relaxed);
break;
}
}
}
- bool push(Node *u) {
+ inline void push(Node *u) {
+ u->freed.store(true, std::memory_order_relaxed);
release_ref(u);
- return true;
}
bool pop(Node *&r) {
@@ -110,6 +112,11 @@ class MPMCQueue {
if (!tcnt) continue;
if (!t->refcnt.compare_exchange_weak(tcnt, tcnt + 1, std::memory_order_relaxed))
continue;
+ if (t->freed.load(std::memory_order_relaxed))
+ {
+ blks.release_ref(t);
+ continue;
+ }
auto tt = t->tail.load(std::memory_order_relaxed);
if (tt >= MPMCQ_SIZE)
{
@@ -130,9 +137,12 @@ class MPMCQueue {
nblk->next.store(nullptr, std::memory_order_relaxed);
Block *tnext = nullptr;
if (!t->next.compare_exchange_weak(tnext, nblk, std::memory_order_acq_rel))
- blks.push(_nblk);
+ blks.push(nblk);
else
- tail.store(nblk, std::memory_order_relaxed);
+ {
+ tail.store(nblk, std::memory_order_release);
+ nblk->freed.store(false, std::memory_order_release);
+ }
}
blks.release_ref(t);
continue;
diff --git a/test/test_queue.cpp b/test/test_queue.cpp
index 11699c1..5c32dac 100644
--- a/test/test_queue.cpp
+++ b/test/test_queue.cpp
@@ -69,7 +69,7 @@ void test_mpmc(int nproducers, int nconsumers, int nops, size_t burst_size) {
std::vector<salticidae::EventContext> ecs;
std::atomic<size_t> collected(0);
ecs.resize(nconsumers);
- q.set_capacity(65536 * nproducers);
+ q.set_capacity(65536);
for (int i = 0; i < nconsumers; i++)
{
q.reg_handler(ecs[i], [&collected, burst_size](queue_t &q) {