aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/salticidae/queue.h16
-rw-r--r--test/test_queue.cpp18
2 files changed, 22 insertions, 12 deletions
diff --git a/include/salticidae/queue.h b/include/salticidae/queue.h
index 3df1529..0417750 100644
--- a/include/salticidae/queue.h
+++ b/include/salticidae/queue.h
@@ -105,7 +105,7 @@ class MPMCQueue {
bool _enqueue(U &&e, bool unbounded = true) {
for (;;)
{
- auto t = tail.load(std::memory_order_relaxed);
+ auto t = tail.load(std::memory_order_acquire);
auto tcnt = t->refcnt.load(std::memory_order_relaxed);
if (!tcnt) continue;
if (!t->refcnt.compare_exchange_weak(tcnt, tcnt + 1, std::memory_order_relaxed))
@@ -188,7 +188,7 @@ class MPMCQueue {
bool try_dequeue(T &e) {
for (;;)
{
- auto h = this->head.load(std::memory_order_relaxed);
+ auto h = this->head.load(std::memory_order_acquire);
auto hcnt = h->refcnt.load(std::memory_order_relaxed);
if (!hcnt) continue;
if (!h->refcnt.compare_exchange_weak(hcnt, hcnt + 1, std::memory_order_relaxed))
@@ -199,19 +199,21 @@ class MPMCQueue {
if (hh >= tt)
{
if (tt < MPMCQ_SIZE) { blks.release_ref(h); return false; }
- auto hnext = h->next.load(std::memory_order_relaxed);
+ auto hnext = h->next.load(std::memory_order_acquire);
if (hnext == nullptr) { blks.release_ref(h); return false; }
- if (this->head.compare_exchange_weak(h, hnext, std::memory_order_relaxed))
+ auto h2 = h;
+ if (this->head.compare_exchange_weak(h2, hnext, std::memory_order_acq_rel))
this->blks.push(h);
blks.release_ref(h);
continue;
}
- while (!h->avail[hh].load(std::memory_order_acquire))
- std::this_thread::yield();
auto hh2 = hh;
if (h->head.compare_exchange_weak(hh2, hh2 + 1, std::memory_order_relaxed))
{
+ while (!h->avail[hh].load(std::memory_order_acquire))
+ std::this_thread::yield();
e = std::move(h->elem[hh]);
+ h->avail[hh].store(false, std::memory_order_relaxed);
blks.release_ref(h);
break;
}
@@ -241,9 +243,9 @@ struct MPSCQueue: public MPMCQueue<T> {
this->blks.push(h);
continue;
}
+ h->head.store(hh + 1, std::memory_order_relaxed);
while (!h->avail[hh].load(std::memory_order_acquire))
std::this_thread::yield();
- h->head.store(hh + 1, std::memory_order_relaxed);
e = std::move(h->elem[hh]);
h->avail[hh].store(false, std::memory_order_relaxed);
break;
diff --git a/test/test_queue.cpp b/test/test_queue.cpp
index fac3bc7..11699c1 100644
--- a/test/test_queue.cpp
+++ b/test/test_queue.cpp
@@ -8,20 +8,25 @@
using salticidae::TimerEvent;
using salticidae::Config;
-void test_mpsc(int nproducers, int nops, size_t burst_size) {
+void test_mpsc(int nproducers, int nops, size_t burst_size, bool test_rewind) {
size_t total = nproducers * nops;
salticidae::EventContext ec;
std::atomic<size_t> collected(0);
using queue_t = salticidae::MPSCQueueEventDriven<int>;
queue_t q;
q.set_capacity(65536);
- q.reg_handler(ec, [&collected, burst_size](queue_t &q) {
+ q.reg_handler(ec, [&collected, burst_size, test_rewind](queue_t &q) {
size_t cnt = burst_size;
int x;
while (q.try_dequeue(x))
{
- printf("%d\n", x);
- collected.fetch_add(1);
+ if (test_rewind && (rand() & 1))
+ q.rewind(x);
+ else
+ {
+ collected.fetch_add(1);
+ printf("%d\n", x);
+ }
if (!--cnt) return true;
}
return false;
@@ -119,13 +124,16 @@ int main(int argc, char **argv) {
auto opt_nops = Config::OptValInt::create(100000);
auto opt_mpmc = Config::OptValFlag::create(false);
auto opt_help = Config::OptValFlag::create(false);
+ auto opt_rewind = Config::OptValFlag::create(false);
config.add_opt("nproducers", opt_nproducers, Config::SET_VAL);
config.add_opt("nconsumers", opt_nconsumers, Config::SET_VAL);
config.add_opt("burst-size", opt_burst_size, Config::SET_VAL);
config.add_opt("nops", opt_nops, Config::SET_VAL);
config.add_opt("mpmc", opt_mpmc, Config::SWITCH_ON);
+ config.add_opt("rewind", opt_rewind, Config::SWITCH_ON);
config.add_opt("help", opt_help, Config::SWITCH_ON, 'h', "show this help info");
config.parse(argc, argv);
+ srand(time(0));
if (opt_help->get())
{
config.print_help();
@@ -136,7 +144,7 @@ int main(int argc, char **argv) {
{
SALTICIDAE_LOG_INFO("testing an MPSC queue...");
test_mpsc(opt_nproducers->get(), opt_nops->get(),
- opt_burst_size->get());
+ opt_burst_size->get(), opt_rewind->get());
}
else
{