aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2018-11-21 18:15:14 -0500
committerDeterminant <[email protected]>2018-11-21 18:15:14 -0500
commit27304354844c8baf8c14ef923e0ab65bc4df2dc0 (patch)
tree0910f11ab364f33ac204841e65978c1cfb188936
parent2b1f6791ddfd8ef4fb21cb4b50a3d6bc86945867 (diff)
impl MPMCQueueEventDriven
-rw-r--r--include/salticidae/event.h51
-rw-r--r--test/test_queue.cpp79
2 files changed, 110 insertions, 20 deletions
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index bac707c..649cfdd 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -589,6 +589,57 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
}
};
+template<typename T>
+class MPMCQueueEventDriven: public MPMCQueue<T> {
+ private:
+ const uint64_t dummy = 1;
+ std::atomic<bool> wait_sig;
+ int fd;
+ std::vector<FdEvent> evs;
+
+ public:
+ MPMCQueueEventDriven(size_t capacity = 65536):
+ MPMCQueue<T>(capacity),
+ wait_sig(true),
+ fd(eventfd(0, EFD_NONBLOCK)) {}
+
+ ~MPMCQueueEventDriven() {
+ evs.clear();
+ close(fd);
+ }
+
+ // this function is *NOT* thread-safe
+ template<typename Func>
+ void reg_handler(const EventContext &ec, Func &&func) {
+ FdEvent ev(ec, fd, [this, func=std::forward<Func>(func)](int, int) {
+ //fprintf(stderr, "%x\n", std::this_thread::get_id());
+ uint64_t t;
+ if (read(fd, &t, 8) != 8) return;
+ // only one consumer should be here a a time
+ wait_sig.store(true, std::memory_order_release);
+ if (func(*this))
+ write(fd, &dummy, 8);
+ });
+ ev.add(FdEvent::READ);
+ evs.push_back(std::move(ev));
+ }
+
+ void unreg_handlers() { evs.clear(); }
+
+ template<typename U>
+ bool enqueue(U &&e) {
+ static const uint64_t dummy = 1;
+ bool ret = MPMCQueue<T>::enqueue(std::forward<U>(e));
+ // memory barrier here, so any load/store in enqueue must be finialized
+ if (wait_sig.exchange(false, std::memory_order_acq_rel))
+ {
+ SALTICIDAE_LOG_DEBUG("mpsc notify");
+ write(fd, &dummy, 8);
+ }
+ return ret;
+ }
+};
+
}
#endif
diff --git a/test/test_queue.cpp b/test/test_queue.cpp
index 7b06951..c565db3 100644
--- a/test/test_queue.cpp
+++ b/test/test_queue.cpp
@@ -3,10 +3,12 @@
#include <atomic>
#include "salticidae/event.h"
+#include "salticidae/util.h"
using salticidae::TimerEvent;
+using salticidae::Config;
-void test_mpsc(int nproducers = 16, int nops = 100000, size_t burst_size = 128) {
+void test_mpsc(int nproducers, int nops, size_t burst_size) {
size_t total = nproducers * nops;
salticidae::EventContext ec;
std::atomic<size_t> collected(0);
@@ -25,8 +27,9 @@ void test_mpsc(int nproducers = 16, int nops = 100000, size_t burst_size = 128)
});
std::vector<std::thread> producers;
std::thread consumer([&collected, total, &ec]() {
- TimerEvent timer(ec, [&ec, &collected, total](TimerEvent &) {
+ TimerEvent timer(ec, [&ec, &collected, total](TimerEvent &timer) {
if (collected.load() == total) ec.stop();
+ timer.add(1);
});
timer.add(1);
ec.dispatch();
@@ -44,15 +47,15 @@ void test_mpsc(int nproducers = 16, int nops = 100000, size_t burst_size = 128)
}));
}
for (auto &t: producers) t.join();
- fprintf(stderr, "producers terminate\n");
+ SALTICIDAE_LOG_INFO("producers terminate");
consumer.join();
- fprintf(stderr, "consumers terminate\n");
+ SALTICIDAE_LOG_INFO("consumers terminate");
}
-/*
-void test_mpmc(int nproducers = 16, int nconsumers = 4, int nops = 100000) {
+void test_mpmc(int nproducers, int nconsumers, int nops, size_t burst_size) {
size_t total = nproducers * nops;
- salticidae::MPMCQueueEventDriven<int> q;
+ using queue_t = salticidae::MPMCQueueEventDriven<int>;
+ queue_t q;
std::vector<std::thread> producers;
std::vector<std::thread> consumers;
std::vector<salticidae::EventContext> ecs;
@@ -60,21 +63,28 @@ void test_mpmc(int nproducers = 16, int nconsumers = 4, int nops = 100000) {
ecs.resize(nconsumers);
for (int i = 0; i < nconsumers; i++)
{
- q.listen(ecs[i], [&collected](int x) {
- //usleep(10);
- printf("%d\n", x);
- collected.fetch_add(1);
+ q.reg_handler(ecs[i], [&collected, burst_size](queue_t &q) {
+ size_t cnt = burst_size;
+ int x;
+ while (q.try_dequeue(x))
+ {
+ //usleep(10);
+ printf("%d\n", x);
+ collected.fetch_add(1);
+ if (!--cnt) return true;
+ }
+ return false;
});
}
for (int i = 0; i < nconsumers; i++)
{
consumers.emplace(consumers.end(), std::thread(
[&collected, total, &ec = ecs[i]]() {
- salticidae::Event timer(ec, -1, EV_TIMEOUT | EV_PERSIST,
- [&ec, &collected, total](int, short) {
+ TimerEvent timer(ec, [&ec, &collected, total](TimerEvent &timer) {
if (collected.load() == total) ec.stop();
+ timer.add(1);
});
- timer.add_with_timeout(1);
+ timer.add(1);
ec.dispatch();
}));
}
@@ -91,14 +101,43 @@ void test_mpmc(int nproducers = 16, int nconsumers = 4, int nops = 100000) {
}));
}
for (auto &t: producers) t.join();
- fprintf(stderr, "producers terminate\n");
+ SALTICIDAE_LOG_INFO("producers terminate");
for (auto &t: consumers) t.join();
- fprintf(stderr, "consumers terminate\n");
+ SALTICIDAE_LOG_INFO("consumers terminate");
}
-*/
-int main() {
- test_mpsc();
- //test_mpmc();
+int main(int argc, char **argv) {
+ Config config;
+ auto opt_nproducers = Config::OptValInt::create(16);
+ auto opt_nconsumers = Config::OptValInt::create(4);
+ auto opt_burst_size = Config::OptValInt::create(128);
+ auto opt_nops = Config::OptValInt::create(100000);
+ auto opt_mpmc = Config::OptValFlag::create(false);
+ auto opt_help = Config::OptValFlag::create(false);
+ config.add_opt("nproducers", opt_nproducers, Config::SET_VAL);
+ config.add_opt("nconsumers", opt_nconsumers, Config::SET_VAL);
+ config.add_opt("burst-size", opt_burst_size, Config::SET_VAL);
+ config.add_opt("nops", opt_nops, Config::SET_VAL);
+ config.add_opt("mpmc", opt_mpmc, Config::SWITCH_ON);
+ config.add_opt("help", opt_help, Config::SWITCH_ON, 'h', "show this help info");
+ config.parse(argc, argv);
+ if (opt_help->get())
+ {
+ config.print_help();
+ exit(0);
+ }
+
+ if (!opt_mpmc->get())
+ {
+ SALTICIDAE_LOG_INFO("testing an MPSC queue...");
+ test_mpsc(opt_nproducers->get(), opt_nops->get(),
+ opt_burst_size->get());
+ }
+ else
+ {
+ SALTICIDAE_LOG_INFO("testing an MPMC queue...");
+ test_mpmc(opt_nproducers->get(), opt_nconsumers->get(),
+ opt_nops->get(), opt_burst_size->get());
+ }
return 0;
}