diff options
-rw-r--r-- | include/salticidae/queue.h | 20 | ||||
-rw-r--r-- | test/test_queue.cpp | 2 |
2 files changed, 16 insertions, 6 deletions
diff --git a/include/salticidae/queue.h b/include/salticidae/queue.h index 0417750..da11e8a 100644 --- a/include/salticidae/queue.h +++ b/include/salticidae/queue.h @@ -15,7 +15,8 @@ class FreeList { struct Node { std::atomic<Node *> next; std::atomic<size_t> refcnt; - Node(): next(nullptr), refcnt(1) {} + std::atomic<bool> freed; + Node(): next(nullptr), refcnt(1), freed(false) {} }; private: @@ -37,15 +38,16 @@ class FreeList { // the replacement is ok even if ABA happens if (top.compare_exchange_weak(t, u, std::memory_order_release)) { + // must have seen freed == true u->refcnt.store(1, std::memory_order_relaxed); break; } } } - bool push(Node *u) { + inline void push(Node *u) { + u->freed.store(true, std::memory_order_relaxed); release_ref(u); - return true; } bool pop(Node *&r) { @@ -110,6 +112,11 @@ class MPMCQueue { if (!tcnt) continue; if (!t->refcnt.compare_exchange_weak(tcnt, tcnt + 1, std::memory_order_relaxed)) continue; + if (t->freed.load(std::memory_order_relaxed)) + { + blks.release_ref(t); + continue; + } auto tt = t->tail.load(std::memory_order_relaxed); if (tt >= MPMCQ_SIZE) { @@ -130,9 +137,12 @@ class MPMCQueue { nblk->next.store(nullptr, std::memory_order_relaxed); Block *tnext = nullptr; if (!t->next.compare_exchange_weak(tnext, nblk, std::memory_order_acq_rel)) - blks.push(_nblk); + blks.push(nblk); else - tail.store(nblk, std::memory_order_relaxed); + { + tail.store(nblk, std::memory_order_release); + nblk->freed.store(false, std::memory_order_release); + } } blks.release_ref(t); continue; diff --git a/test/test_queue.cpp b/test/test_queue.cpp index 11699c1..5c32dac 100644 --- a/test/test_queue.cpp +++ b/test/test_queue.cpp @@ -69,7 +69,7 @@ void test_mpmc(int nproducers, int nconsumers, int nops, size_t burst_size) { std::vector<salticidae::EventContext> ecs; std::atomic<size_t> collected(0); ecs.resize(nconsumers); - q.set_capacity(65536 * nproducers); + q.set_capacity(65536); for (int i = 0; i < nconsumers; i++) { q.reg_handler(ecs[i], [&collected, burst_size](queue_t &q) { |