From 2b584edbddc782b6436a8e28d4f8b85a11f528a6 Mon Sep 17 00:00:00 2001 From: Determinant Date: Thu, 4 Oct 2018 18:30:23 -0400 Subject: add lock-free queue impl and event-driven queue --- test/.gitignore | 1 + test/CMakeLists.txt | 3 ++ test/test_queue.cpp | 92 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 96 insertions(+) create mode 100644 test/test_queue.cpp (limited to 'test') diff --git a/test/.gitignore b/test/.gitignore index 2cd44f2..592f084 100644 --- a/test/.gitignore +++ b/test/.gitignore @@ -1,5 +1,6 @@ test_msg test_bits test_network +test_queue bench_network Makefile diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 9689df3..44c984d 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -29,5 +29,8 @@ target_link_libraries(test_bits salticidae_static) add_executable(test_network test_network.cpp) target_link_libraries(test_network salticidae_static) +add_executable(test_queue test_queue.cpp) +target_link_libraries(test_queue salticidae_static pthread) + add_executable(bench_network bench_network.cpp) target_link_libraries(bench_network salticidae_static pthread) diff --git a/test/test_queue.cpp b/test/test_queue.cpp new file mode 100644 index 0000000..bed88c4 --- /dev/null +++ b/test/test_queue.cpp @@ -0,0 +1,92 @@ +#include +#include +#include + +#include "salticidae/event.h" + +void test_mpsc(int nproducers = 16, int nops = 100000) { + size_t total = nproducers * nops; + salticidae::EventContext ec; + std::atomic collected(0); + salticidae::MPSCQueueEventDriven q(ec, [&collected](int x) { + printf("%d\n", x); + collected.fetch_add(1); + }); + std::vector producers; + std::thread consumer([&collected, total, &ec]() { + salticidae::Event timer(ec, -1, EV_TIMEOUT | EV_PERSIST, + [&ec, &collected, total](int, short) { + if (collected.load() == total) ec.stop(); + }); + timer.add_with_timeout(1); + ec.dispatch(); + }); + for (int i = 0; i < nproducers; i++) + { + producers.emplace(producers.end(), std::thread([&q, nops, i, nproducers]() { + int x = i; + for (int j = 0; j < nops; j++) + { + //usleep(rand() / double(RAND_MAX) * 100); + q.enqueue(x); + x += nproducers; + } + })); + } + for (auto &t: producers) t.join(); + fprintf(stderr, "producers terminate\n"); + consumer.join(); + fprintf(stderr, "consumers terminate\n"); +} + +void test_mpmc(int nproducers = 16, int nconsumers = 4, int nops = 100000) { + size_t total = nproducers * nops; + salticidae::MPMCQueueEventDriven q; + std::vector producers; + std::vector consumers; + std::vector ecs; + std::atomic collected(0); + ecs.resize(nconsumers); + for (int i = 0; i < nconsumers; i++) + { + q.listen(ecs[i], [&collected](int x) { + //usleep(10); + printf("%d\n", x); + collected.fetch_add(1); + }); + } + for (int i = 0; i < nconsumers; i++) + { + consumers.emplace(consumers.end(), std::thread( + [&collected, total, &ec = ecs[i]]() { + salticidae::Event timer(ec, -1, EV_TIMEOUT | EV_PERSIST, + [&ec, &collected, total](int, short) { + if (collected.load() == total) ec.stop(); + }); + timer.add_with_timeout(1); + ec.dispatch(); + })); + } + for (int i = 0; i < nproducers; i++) + { + producers.emplace(producers.end(), std::thread([&q, nops, i, nproducers]() { + int x = i; + for (int j = 0; j < nops; j++) + { + //usleep(rand() / double(RAND_MAX) * 100); + q.enqueue(x); + x += nproducers; + } + })); + } + for (auto &t: producers) t.join(); + fprintf(stderr, "producers terminate\n"); + for (auto &t: consumers) t.join(); + fprintf(stderr, "consumers terminate\n"); +} + +int main() { + //test_mpsc(); + test_mpmc(); + return 0; +} -- cgit v1.2.3