From aaa5ab0a55c0b9758e59883dcffae248c6c52564 Mon Sep 17 00:00:00 2001 From: Determinant Date: Wed, 3 Apr 2019 01:58:28 -0400 Subject: fix bugs; clean up memory order for atomic ops --- include/salticidae/queue.h | 42 +++++++++++++++++------------------------- 1 file changed, 17 insertions(+), 25 deletions(-) (limited to 'include/salticidae/queue.h') diff --git a/include/salticidae/queue.h b/include/salticidae/queue.h index 598303c..a72a28f 100644 --- a/include/salticidae/queue.h +++ b/include/salticidae/queue.h @@ -13,9 +13,8 @@ class FreeList { public: struct Node { std::atomic next; - std::atomic addback; std::atomic refcnt; - Node(): next(nullptr), addback(false), refcnt(1) {} + Node(): next(nullptr), refcnt(1) {} }; private: @@ -28,16 +27,14 @@ class FreeList { void release_ref(Node *u) { if (u->refcnt.fetch_sub(1, std::memory_order_relaxed) != 1) return; - u->addback.store(false, std::memory_order_relaxed); - //if (!u->addback.exchange(false, std::memory_order_relaxed)) return; for (;;) { - auto t = top.load(std::memory_order_acquire); + auto t = top.load(std::memory_order_relaxed); // repair the next pointer before CAS, otherwise u->next == nullptr // could lead to skipping elements - u->next.store(t, std::memory_order_consume); + u->next.store(t, std::memory_order_relaxed); // the replacement is ok even if ABA happens - if (top.compare_exchange_weak(t, u, std::memory_order_consume)) + if (top.compare_exchange_weak(t, u, std::memory_order_release)) { u->refcnt.store(1, std::memory_order_relaxed); break; @@ -46,9 +43,6 @@ class FreeList { } bool push(Node *u) { - assert(u->addback.load() == false); - // attempt to push it - u->addback.store(true, std::memory_order_release); release_ref(u); return true; } @@ -60,18 +54,18 @@ class FreeList { auto u = top.load(std::memory_order_acquire); /* the list is now empty */ if (u == nullptr) return false; - auto t = u->refcnt.load(std::memory_order_acquire); + auto t = u->refcnt.load(std::memory_order_relaxed); /* let's wait for another round if u is a ghost (already popped) */ if (!t) continue; /* otherwise t > 0, so with CAS, the invariant that zero refcnt can * never be increased is guaranteed */ - if (u->refcnt.compare_exchange_weak(t, t + 1, std::memory_order_consume)) + if (u->refcnt.compare_exchange_weak(t, t + 1, std::memory_order_relaxed)) { /* here, nobody is able to change v->next (because v->next is * only changed when pushed) even when ABA happens */ auto v = u; - auto nv = u->next.load(std::memory_order_acquire); - if (top.compare_exchange_weak(v, nv, std::memory_order_consume)) + auto nv = u->next.load(std::memory_order_relaxed); + if (top.compare_exchange_weak(v, nv, std::memory_order_relaxed)) { /* manage to pop the head */ r = u; @@ -104,7 +98,6 @@ class MPMCQueue { template void _enqueue(Block *nblk, U &&e) { new (&(nblk->elem)) T(std::forward(e)); - std::atomic_thread_fence(std::memory_order_release); nblk->next.store(nullptr, std::memory_order_relaxed); auto prev = tail.exchange(nblk, std::memory_order_acq_rel); prev->next.store(nblk, std::memory_order_relaxed); @@ -151,7 +144,7 @@ class MPMCQueue { auto h = head.load(std::memory_order_relaxed); auto t = h->refcnt.load(std::memory_order_relaxed); if (!t) continue; - if (h->refcnt.compare_exchange_weak(t, t + 1, std::memory_order_consume)) + if (h->refcnt.compare_exchange_weak(t, t + 1, std::memory_order_relaxed)) { auto nh = h->next.load(std::memory_order_relaxed); if (nh == nullptr) @@ -159,10 +152,9 @@ class MPMCQueue { blks.release_ref(h); return false; } - std::atomic_thread_fence(std::memory_order_acquire); e = std::move(nh->elem); auto hh = h; - if (head.compare_exchange_weak(hh, nh, std::memory_order_consume)) + if (head.compare_exchange_weak(hh, nh, std::memory_order_relaxed)) { blks.release_ref(h); blks.push(h); @@ -177,13 +169,13 @@ class MPMCQueue { template struct MPSCQueue: public MPMCQueue { using MPMCQueue::MPMCQueue; + /* the same thread is calling the following functions */ + bool try_dequeue(T &e) { auto h = this->head.load(std::memory_order_relaxed); auto nh = h->next.load(std::memory_order_relaxed); - std::atomic_thread_fence(std::memory_order_acquire); if (nh == nullptr) return false; e = std::move(nh->elem); - std::atomic_thread_fence(std::memory_order_release); this->head.store(nh, std::memory_order_relaxed); this->blks.push(h); return true; @@ -192,13 +184,13 @@ struct MPSCQueue: public MPMCQueue { template bool rewind(U &&e) { FreeList::Node * _nblk; - if (!this->blks.pop(_nblk)) return false; + if (!this->blks.pop(_nblk)) _nblk = new typename MPMCQueue::Block(); + auto nblk = static_cast::Block *>(_nblk); - auto h = this->head.load(std::memory_order_acquire); - nblk->next.store(h, std::memory_order_release); + auto h = this->head.load(std::memory_order_relaxed); new (&(h->elem)) T(std::forward(e)); - std::atomic_thread_fence(std::memory_order_release); - this->head.store(nblk, std::memory_order_release); + nblk->next.store(h, std::memory_order_relaxed); + this->head.store(nblk, std::memory_order_relaxed); return true; } }; -- cgit v1.2.3