diff options
author | Determinant <[email protected]> | 2018-11-21 18:15:14 -0500 |
---|---|---|
committer | Determinant <[email protected]> | 2018-11-21 18:15:14 -0500 |
commit | 27304354844c8baf8c14ef923e0ab65bc4df2dc0 (patch) | |
tree | 0910f11ab364f33ac204841e65978c1cfb188936 | |
parent | 2b1f6791ddfd8ef4fb21cb4b50a3d6bc86945867 (diff) |
impl MPMCQueueEventDriven
-rw-r--r-- | include/salticidae/event.h | 51 | ||||
-rw-r--r-- | test/test_queue.cpp | 79 |
2 files changed, 110 insertions, 20 deletions
diff --git a/include/salticidae/event.h b/include/salticidae/event.h index bac707c..649cfdd 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -589,6 +589,57 @@ class MPSCQueueEventDriven: public MPSCQueue<T> { } }; +template<typename T> +class MPMCQueueEventDriven: public MPMCQueue<T> { + private: + const uint64_t dummy = 1; + std::atomic<bool> wait_sig; + int fd; + std::vector<FdEvent> evs; + + public: + MPMCQueueEventDriven(size_t capacity = 65536): + MPMCQueue<T>(capacity), + wait_sig(true), + fd(eventfd(0, EFD_NONBLOCK)) {} + + ~MPMCQueueEventDriven() { + evs.clear(); + close(fd); + } + + // this function is *NOT* thread-safe + template<typename Func> + void reg_handler(const EventContext &ec, Func &&func) { + FdEvent ev(ec, fd, [this, func=std::forward<Func>(func)](int, int) { + //fprintf(stderr, "%x\n", std::this_thread::get_id()); + uint64_t t; + if (read(fd, &t, 8) != 8) return; + // only one consumer should be here a a time + wait_sig.store(true, std::memory_order_release); + if (func(*this)) + write(fd, &dummy, 8); + }); + ev.add(FdEvent::READ); + evs.push_back(std::move(ev)); + } + + void unreg_handlers() { evs.clear(); } + + template<typename U> + bool enqueue(U &&e) { + static const uint64_t dummy = 1; + bool ret = MPMCQueue<T>::enqueue(std::forward<U>(e)); + // memory barrier here, so any load/store in enqueue must be finialized + if (wait_sig.exchange(false, std::memory_order_acq_rel)) + { + SALTICIDAE_LOG_DEBUG("mpsc notify"); + write(fd, &dummy, 8); + } + return ret; + } +}; + } #endif diff --git a/test/test_queue.cpp b/test/test_queue.cpp index 7b06951..c565db3 100644 --- a/test/test_queue.cpp +++ b/test/test_queue.cpp @@ -3,10 +3,12 @@ #include <atomic> #include "salticidae/event.h" +#include "salticidae/util.h" using salticidae::TimerEvent; +using salticidae::Config; -void test_mpsc(int nproducers = 16, int nops = 100000, size_t burst_size = 128) { +void test_mpsc(int nproducers, int nops, size_t burst_size) { size_t total = nproducers * nops; salticidae::EventContext ec; std::atomic<size_t> collected(0); @@ -25,8 +27,9 @@ void test_mpsc(int nproducers = 16, int nops = 100000, size_t burst_size = 128) }); std::vector<std::thread> producers; std::thread consumer([&collected, total, &ec]() { - TimerEvent timer(ec, [&ec, &collected, total](TimerEvent &) { + TimerEvent timer(ec, [&ec, &collected, total](TimerEvent &timer) { if (collected.load() == total) ec.stop(); + timer.add(1); }); timer.add(1); ec.dispatch(); @@ -44,15 +47,15 @@ void test_mpsc(int nproducers = 16, int nops = 100000, size_t burst_size = 128) })); } for (auto &t: producers) t.join(); - fprintf(stderr, "producers terminate\n"); + SALTICIDAE_LOG_INFO("producers terminate"); consumer.join(); - fprintf(stderr, "consumers terminate\n"); + SALTICIDAE_LOG_INFO("consumers terminate"); } -/* -void test_mpmc(int nproducers = 16, int nconsumers = 4, int nops = 100000) { +void test_mpmc(int nproducers, int nconsumers, int nops, size_t burst_size) { size_t total = nproducers * nops; - salticidae::MPMCQueueEventDriven<int> q; + using queue_t = salticidae::MPMCQueueEventDriven<int>; + queue_t q; std::vector<std::thread> producers; std::vector<std::thread> consumers; std::vector<salticidae::EventContext> ecs; @@ -60,21 +63,28 @@ void test_mpmc(int nproducers = 16, int nconsumers = 4, int nops = 100000) { ecs.resize(nconsumers); for (int i = 0; i < nconsumers; i++) { - q.listen(ecs[i], [&collected](int x) { - //usleep(10); - printf("%d\n", x); - collected.fetch_add(1); + q.reg_handler(ecs[i], [&collected, burst_size](queue_t &q) { + size_t cnt = burst_size; + int x; + while (q.try_dequeue(x)) + { + //usleep(10); + printf("%d\n", x); + collected.fetch_add(1); + if (!--cnt) return true; + } + return false; }); } for (int i = 0; i < nconsumers; i++) { consumers.emplace(consumers.end(), std::thread( [&collected, total, &ec = ecs[i]]() { - salticidae::Event timer(ec, -1, EV_TIMEOUT | EV_PERSIST, - [&ec, &collected, total](int, short) { + TimerEvent timer(ec, [&ec, &collected, total](TimerEvent &timer) { if (collected.load() == total) ec.stop(); + timer.add(1); }); - timer.add_with_timeout(1); + timer.add(1); ec.dispatch(); })); } @@ -91,14 +101,43 @@ void test_mpmc(int nproducers = 16, int nconsumers = 4, int nops = 100000) { })); } for (auto &t: producers) t.join(); - fprintf(stderr, "producers terminate\n"); + SALTICIDAE_LOG_INFO("producers terminate"); for (auto &t: consumers) t.join(); - fprintf(stderr, "consumers terminate\n"); + SALTICIDAE_LOG_INFO("consumers terminate"); } -*/ -int main() { - test_mpsc(); - //test_mpmc(); +int main(int argc, char **argv) { + Config config; + auto opt_nproducers = Config::OptValInt::create(16); + auto opt_nconsumers = Config::OptValInt::create(4); + auto opt_burst_size = Config::OptValInt::create(128); + auto opt_nops = Config::OptValInt::create(100000); + auto opt_mpmc = Config::OptValFlag::create(false); + auto opt_help = Config::OptValFlag::create(false); + config.add_opt("nproducers", opt_nproducers, Config::SET_VAL); + config.add_opt("nconsumers", opt_nconsumers, Config::SET_VAL); + config.add_opt("burst-size", opt_burst_size, Config::SET_VAL); + config.add_opt("nops", opt_nops, Config::SET_VAL); + config.add_opt("mpmc", opt_mpmc, Config::SWITCH_ON); + config.add_opt("help", opt_help, Config::SWITCH_ON, 'h', "show this help info"); + config.parse(argc, argv); + if (opt_help->get()) + { + config.print_help(); + exit(0); + } + + if (!opt_mpmc->get()) + { + SALTICIDAE_LOG_INFO("testing an MPSC queue..."); + test_mpsc(opt_nproducers->get(), opt_nops->get(), + opt_burst_size->get()); + } + else + { + SALTICIDAE_LOG_INFO("testing an MPMC queue..."); + test_mpmc(opt_nproducers->get(), opt_nconsumers->get(), + opt_nops->get(), opt_burst_size->get()); + } return 0; } |