diff options
-rw-r--r-- | include/salticidae/queue.h | 16 | ||||
-rw-r--r-- | test/test_queue.cpp | 18 |
2 files changed, 22 insertions, 12 deletions
diff --git a/include/salticidae/queue.h b/include/salticidae/queue.h index 3df1529..0417750 100644 --- a/include/salticidae/queue.h +++ b/include/salticidae/queue.h @@ -105,7 +105,7 @@ class MPMCQueue { bool _enqueue(U &&e, bool unbounded = true) { for (;;) { - auto t = tail.load(std::memory_order_relaxed); + auto t = tail.load(std::memory_order_acquire); auto tcnt = t->refcnt.load(std::memory_order_relaxed); if (!tcnt) continue; if (!t->refcnt.compare_exchange_weak(tcnt, tcnt + 1, std::memory_order_relaxed)) @@ -188,7 +188,7 @@ class MPMCQueue { bool try_dequeue(T &e) { for (;;) { - auto h = this->head.load(std::memory_order_relaxed); + auto h = this->head.load(std::memory_order_acquire); auto hcnt = h->refcnt.load(std::memory_order_relaxed); if (!hcnt) continue; if (!h->refcnt.compare_exchange_weak(hcnt, hcnt + 1, std::memory_order_relaxed)) @@ -199,19 +199,21 @@ class MPMCQueue { if (hh >= tt) { if (tt < MPMCQ_SIZE) { blks.release_ref(h); return false; } - auto hnext = h->next.load(std::memory_order_relaxed); + auto hnext = h->next.load(std::memory_order_acquire); if (hnext == nullptr) { blks.release_ref(h); return false; } - if (this->head.compare_exchange_weak(h, hnext, std::memory_order_relaxed)) + auto h2 = h; + if (this->head.compare_exchange_weak(h2, hnext, std::memory_order_acq_rel)) this->blks.push(h); blks.release_ref(h); continue; } - while (!h->avail[hh].load(std::memory_order_acquire)) - std::this_thread::yield(); auto hh2 = hh; if (h->head.compare_exchange_weak(hh2, hh2 + 1, std::memory_order_relaxed)) { + while (!h->avail[hh].load(std::memory_order_acquire)) + std::this_thread::yield(); e = std::move(h->elem[hh]); + h->avail[hh].store(false, std::memory_order_relaxed); blks.release_ref(h); break; } @@ -241,9 +243,9 @@ struct MPSCQueue: public MPMCQueue<T> { this->blks.push(h); continue; } + h->head.store(hh + 1, std::memory_order_relaxed); while (!h->avail[hh].load(std::memory_order_acquire)) std::this_thread::yield(); - h->head.store(hh + 1, std::memory_order_relaxed); e = std::move(h->elem[hh]); h->avail[hh].store(false, std::memory_order_relaxed); break; diff --git a/test/test_queue.cpp b/test/test_queue.cpp index fac3bc7..11699c1 100644 --- a/test/test_queue.cpp +++ b/test/test_queue.cpp @@ -8,20 +8,25 @@ using salticidae::TimerEvent; using salticidae::Config; -void test_mpsc(int nproducers, int nops, size_t burst_size) { +void test_mpsc(int nproducers, int nops, size_t burst_size, bool test_rewind) { size_t total = nproducers * nops; salticidae::EventContext ec; std::atomic<size_t> collected(0); using queue_t = salticidae::MPSCQueueEventDriven<int>; queue_t q; q.set_capacity(65536); - q.reg_handler(ec, [&collected, burst_size](queue_t &q) { + q.reg_handler(ec, [&collected, burst_size, test_rewind](queue_t &q) { size_t cnt = burst_size; int x; while (q.try_dequeue(x)) { - printf("%d\n", x); - collected.fetch_add(1); + if (test_rewind && (rand() & 1)) + q.rewind(x); + else + { + collected.fetch_add(1); + printf("%d\n", x); + } if (!--cnt) return true; } return false; @@ -119,13 +124,16 @@ int main(int argc, char **argv) { auto opt_nops = Config::OptValInt::create(100000); auto opt_mpmc = Config::OptValFlag::create(false); auto opt_help = Config::OptValFlag::create(false); + auto opt_rewind = Config::OptValFlag::create(false); config.add_opt("nproducers", opt_nproducers, Config::SET_VAL); config.add_opt("nconsumers", opt_nconsumers, Config::SET_VAL); config.add_opt("burst-size", opt_burst_size, Config::SET_VAL); config.add_opt("nops", opt_nops, Config::SET_VAL); config.add_opt("mpmc", opt_mpmc, Config::SWITCH_ON); + config.add_opt("rewind", opt_rewind, Config::SWITCH_ON); config.add_opt("help", opt_help, Config::SWITCH_ON, 'h', "show this help info"); config.parse(argc, argv); + srand(time(0)); if (opt_help->get()) { config.print_help(); @@ -136,7 +144,7 @@ int main(int argc, char **argv) { { SALTICIDAE_LOG_INFO("testing an MPSC queue..."); test_mpsc(opt_nproducers->get(), opt_nops->get(), - opt_burst_size->get()); + opt_burst_size->get(), opt_rewind->get()); } else { |