diff options
Diffstat (limited to 'test/test_queue.cpp')
-rw-r--r-- | test/test_queue.cpp | 92 |
1 files changed, 92 insertions, 0 deletions
diff --git a/test/test_queue.cpp b/test/test_queue.cpp new file mode 100644 index 0000000..bed88c4 --- /dev/null +++ b/test/test_queue.cpp @@ -0,0 +1,92 @@ +#include <cstdio> +#include <thread> +#include <atomic> + +#include "salticidae/event.h" + +void test_mpsc(int nproducers = 16, int nops = 100000) { + size_t total = nproducers * nops; + salticidae::EventContext ec; + std::atomic<size_t> collected(0); + salticidae::MPSCQueueEventDriven<int> q(ec, [&collected](int x) { + printf("%d\n", x); + collected.fetch_add(1); + }); + std::vector<std::thread> producers; + std::thread consumer([&collected, total, &ec]() { + salticidae::Event timer(ec, -1, EV_TIMEOUT | EV_PERSIST, + [&ec, &collected, total](int, short) { + if (collected.load() == total) ec.stop(); + }); + timer.add_with_timeout(1); + ec.dispatch(); + }); + for (int i = 0; i < nproducers; i++) + { + producers.emplace(producers.end(), std::thread([&q, nops, i, nproducers]() { + int x = i; + for (int j = 0; j < nops; j++) + { + //usleep(rand() / double(RAND_MAX) * 100); + q.enqueue(x); + x += nproducers; + } + })); + } + for (auto &t: producers) t.join(); + fprintf(stderr, "producers terminate\n"); + consumer.join(); + fprintf(stderr, "consumers terminate\n"); +} + +void test_mpmc(int nproducers = 16, int nconsumers = 4, int nops = 100000) { + size_t total = nproducers * nops; + salticidae::MPMCQueueEventDriven<int> q; + std::vector<std::thread> producers; + std::vector<std::thread> consumers; + std::vector<salticidae::EventContext> ecs; + std::atomic<size_t> collected(0); + ecs.resize(nconsumers); + for (int i = 0; i < nconsumers; i++) + { + q.listen(ecs[i], [&collected](int x) { + //usleep(10); + printf("%d\n", x); + collected.fetch_add(1); + }); + } + for (int i = 0; i < nconsumers; i++) + { + consumers.emplace(consumers.end(), std::thread( + [&collected, total, &ec = ecs[i]]() { + salticidae::Event timer(ec, -1, EV_TIMEOUT | EV_PERSIST, + [&ec, &collected, total](int, short) { + if (collected.load() == total) ec.stop(); + }); + timer.add_with_timeout(1); + ec.dispatch(); + })); + } + for (int i = 0; i < nproducers; i++) + { + producers.emplace(producers.end(), std::thread([&q, nops, i, nproducers]() { + int x = i; + for (int j = 0; j < nops; j++) + { + //usleep(rand() / double(RAND_MAX) * 100); + q.enqueue(x); + x += nproducers; + } + })); + } + for (auto &t: producers) t.join(); + fprintf(stderr, "producers terminate\n"); + for (auto &t: consumers) t.join(); + fprintf(stderr, "consumers terminate\n"); +} + +int main() { + //test_mpsc(); + test_mpmc(); + return 0; +} |