From 27304354844c8baf8c14ef923e0ab65bc4df2dc0 Mon Sep 17 00:00:00 2001 From: Determinant Date: Wed, 21 Nov 2018 18:15:14 -0500 Subject: impl MPMCQueueEventDriven --- test/test_queue.cpp | 79 +++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 59 insertions(+), 20 deletions(-) (limited to 'test') diff --git a/test/test_queue.cpp b/test/test_queue.cpp index 7b06951..c565db3 100644 --- a/test/test_queue.cpp +++ b/test/test_queue.cpp @@ -3,10 +3,12 @@ #include #include "salticidae/event.h" +#include "salticidae/util.h" using salticidae::TimerEvent; +using salticidae::Config; -void test_mpsc(int nproducers = 16, int nops = 100000, size_t burst_size = 128) { +void test_mpsc(int nproducers, int nops, size_t burst_size) { size_t total = nproducers * nops; salticidae::EventContext ec; std::atomic collected(0); @@ -25,8 +27,9 @@ void test_mpsc(int nproducers = 16, int nops = 100000, size_t burst_size = 128) }); std::vector producers; std::thread consumer([&collected, total, &ec]() { - TimerEvent timer(ec, [&ec, &collected, total](TimerEvent &) { + TimerEvent timer(ec, [&ec, &collected, total](TimerEvent &timer) { if (collected.load() == total) ec.stop(); + timer.add(1); }); timer.add(1); ec.dispatch(); @@ -44,15 +47,15 @@ void test_mpsc(int nproducers = 16, int nops = 100000, size_t burst_size = 128) })); } for (auto &t: producers) t.join(); - fprintf(stderr, "producers terminate\n"); + SALTICIDAE_LOG_INFO("producers terminate"); consumer.join(); - fprintf(stderr, "consumers terminate\n"); + SALTICIDAE_LOG_INFO("consumers terminate"); } -/* -void test_mpmc(int nproducers = 16, int nconsumers = 4, int nops = 100000) { +void test_mpmc(int nproducers, int nconsumers, int nops, size_t burst_size) { size_t total = nproducers * nops; - salticidae::MPMCQueueEventDriven q; + using queue_t = salticidae::MPMCQueueEventDriven; + queue_t q; std::vector producers; std::vector consumers; std::vector ecs; @@ -60,21 +63,28 @@ void test_mpmc(int nproducers = 16, int nconsumers = 4, int nops = 100000) { ecs.resize(nconsumers); for (int i = 0; i < nconsumers; i++) { - q.listen(ecs[i], [&collected](int x) { - //usleep(10); - printf("%d\n", x); - collected.fetch_add(1); + q.reg_handler(ecs[i], [&collected, burst_size](queue_t &q) { + size_t cnt = burst_size; + int x; + while (q.try_dequeue(x)) + { + //usleep(10); + printf("%d\n", x); + collected.fetch_add(1); + if (!--cnt) return true; + } + return false; }); } for (int i = 0; i < nconsumers; i++) { consumers.emplace(consumers.end(), std::thread( [&collected, total, &ec = ecs[i]]() { - salticidae::Event timer(ec, -1, EV_TIMEOUT | EV_PERSIST, - [&ec, &collected, total](int, short) { + TimerEvent timer(ec, [&ec, &collected, total](TimerEvent &timer) { if (collected.load() == total) ec.stop(); + timer.add(1); }); - timer.add_with_timeout(1); + timer.add(1); ec.dispatch(); })); } @@ -91,14 +101,43 @@ void test_mpmc(int nproducers = 16, int nconsumers = 4, int nops = 100000) { })); } for (auto &t: producers) t.join(); - fprintf(stderr, "producers terminate\n"); + SALTICIDAE_LOG_INFO("producers terminate"); for (auto &t: consumers) t.join(); - fprintf(stderr, "consumers terminate\n"); + SALTICIDAE_LOG_INFO("consumers terminate"); } -*/ -int main() { - test_mpsc(); - //test_mpmc(); +int main(int argc, char **argv) { + Config config; + auto opt_nproducers = Config::OptValInt::create(16); + auto opt_nconsumers = Config::OptValInt::create(4); + auto opt_burst_size = Config::OptValInt::create(128); + auto opt_nops = Config::OptValInt::create(100000); + auto opt_mpmc = Config::OptValFlag::create(false); + auto opt_help = Config::OptValFlag::create(false); + config.add_opt("nproducers", opt_nproducers, Config::SET_VAL); + config.add_opt("nconsumers", opt_nconsumers, Config::SET_VAL); + config.add_opt("burst-size", opt_burst_size, Config::SET_VAL); + config.add_opt("nops", opt_nops, Config::SET_VAL); + config.add_opt("mpmc", opt_mpmc, Config::SWITCH_ON); + config.add_opt("help", opt_help, Config::SWITCH_ON, 'h', "show this help info"); + config.parse(argc, argv); + if (opt_help->get()) + { + config.print_help(); + exit(0); + } + + if (!opt_mpmc->get()) + { + SALTICIDAE_LOG_INFO("testing an MPSC queue..."); + test_mpsc(opt_nproducers->get(), opt_nops->get(), + opt_burst_size->get()); + } + else + { + SALTICIDAE_LOG_INFO("testing an MPMC queue..."); + test_mpmc(opt_nproducers->get(), opt_nconsumers->get(), + opt_nops->get(), opt_burst_size->get()); + } return 0; } -- cgit v1.2.3-70-g09d2