From 2b584edbddc782b6436a8e28d4f8b85a11f528a6 Mon Sep 17 00:00:00 2001 From: Determinant Date: Thu, 4 Oct 2018 18:30:23 -0400 Subject: add lock-free queue impl and event-driven queue --- test/test_queue.cpp | 92 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) create mode 100644 test/test_queue.cpp (limited to 'test/test_queue.cpp') diff --git a/test/test_queue.cpp b/test/test_queue.cpp new file mode 100644 index 0000000..bed88c4 --- /dev/null +++ b/test/test_queue.cpp @@ -0,0 +1,92 @@ +#include +#include +#include + +#include "salticidae/event.h" + +void test_mpsc(int nproducers = 16, int nops = 100000) { + size_t total = nproducers * nops; + salticidae::EventContext ec; + std::atomic collected(0); + salticidae::MPSCQueueEventDriven q(ec, [&collected](int x) { + printf("%d\n", x); + collected.fetch_add(1); + }); + std::vector producers; + std::thread consumer([&collected, total, &ec]() { + salticidae::Event timer(ec, -1, EV_TIMEOUT | EV_PERSIST, + [&ec, &collected, total](int, short) { + if (collected.load() == total) ec.stop(); + }); + timer.add_with_timeout(1); + ec.dispatch(); + }); + for (int i = 0; i < nproducers; i++) + { + producers.emplace(producers.end(), std::thread([&q, nops, i, nproducers]() { + int x = i; + for (int j = 0; j < nops; j++) + { + //usleep(rand() / double(RAND_MAX) * 100); + q.enqueue(x); + x += nproducers; + } + })); + } + for (auto &t: producers) t.join(); + fprintf(stderr, "producers terminate\n"); + consumer.join(); + fprintf(stderr, "consumers terminate\n"); +} + +void test_mpmc(int nproducers = 16, int nconsumers = 4, int nops = 100000) { + size_t total = nproducers * nops; + salticidae::MPMCQueueEventDriven q; + std::vector producers; + std::vector consumers; + std::vector ecs; + std::atomic collected(0); + ecs.resize(nconsumers); + for (int i = 0; i < nconsumers; i++) + { + q.listen(ecs[i], [&collected](int x) { + //usleep(10); + printf("%d\n", x); + collected.fetch_add(1); + }); + } + for (int i = 0; i < nconsumers; i++) + { + consumers.emplace(consumers.end(), std::thread( + [&collected, total, &ec = ecs[i]]() { + salticidae::Event timer(ec, -1, EV_TIMEOUT | EV_PERSIST, + [&ec, &collected, total](int, short) { + if (collected.load() == total) ec.stop(); + }); + timer.add_with_timeout(1); + ec.dispatch(); + })); + } + for (int i = 0; i < nproducers; i++) + { + producers.emplace(producers.end(), std::thread([&q, nops, i, nproducers]() { + int x = i; + for (int j = 0; j < nops; j++) + { + //usleep(rand() / double(RAND_MAX) * 100); + q.enqueue(x); + x += nproducers; + } + })); + } + for (auto &t: producers) t.join(); + fprintf(stderr, "producers terminate\n"); + for (auto &t: consumers) t.join(); + fprintf(stderr, "consumers terminate\n"); +} + +int main() { + //test_mpsc(); + test_mpmc(); + return 0; +} -- cgit v1.2.3