aboutsummaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2018-11-21 18:15:14 -0500
committerDeterminant <[email protected]>2018-11-21 18:15:14 -0500
commit27304354844c8baf8c14ef923e0ab65bc4df2dc0 (patch)
tree0910f11ab364f33ac204841e65978c1cfb188936 /test
parent2b1f6791ddfd8ef4fb21cb4b50a3d6bc86945867 (diff)
impl MPMCQueueEventDriven
Diffstat (limited to 'test')
-rw-r--r--test/test_queue.cpp79
1 files changed, 59 insertions, 20 deletions
diff --git a/test/test_queue.cpp b/test/test_queue.cpp
index 7b06951..c565db3 100644
--- a/test/test_queue.cpp
+++ b/test/test_queue.cpp
@@ -3,10 +3,12 @@
#include <atomic>
#include "salticidae/event.h"
+#include "salticidae/util.h"
using salticidae::TimerEvent;
+using salticidae::Config;
-void test_mpsc(int nproducers = 16, int nops = 100000, size_t burst_size = 128) {
+void test_mpsc(int nproducers, int nops, size_t burst_size) {
size_t total = nproducers * nops;
salticidae::EventContext ec;
std::atomic<size_t> collected(0);
@@ -25,8 +27,9 @@ void test_mpsc(int nproducers = 16, int nops = 100000, size_t burst_size = 128)
});
std::vector<std::thread> producers;
std::thread consumer([&collected, total, &ec]() {
- TimerEvent timer(ec, [&ec, &collected, total](TimerEvent &) {
+ TimerEvent timer(ec, [&ec, &collected, total](TimerEvent &timer) {
if (collected.load() == total) ec.stop();
+ timer.add(1);
});
timer.add(1);
ec.dispatch();
@@ -44,15 +47,15 @@ void test_mpsc(int nproducers = 16, int nops = 100000, size_t burst_size = 128)
}));
}
for (auto &t: producers) t.join();
- fprintf(stderr, "producers terminate\n");
+ SALTICIDAE_LOG_INFO("producers terminate");
consumer.join();
- fprintf(stderr, "consumers terminate\n");
+ SALTICIDAE_LOG_INFO("consumers terminate");
}
-/*
-void test_mpmc(int nproducers = 16, int nconsumers = 4, int nops = 100000) {
+void test_mpmc(int nproducers, int nconsumers, int nops, size_t burst_size) {
size_t total = nproducers * nops;
- salticidae::MPMCQueueEventDriven<int> q;
+ using queue_t = salticidae::MPMCQueueEventDriven<int>;
+ queue_t q;
std::vector<std::thread> producers;
std::vector<std::thread> consumers;
std::vector<salticidae::EventContext> ecs;
@@ -60,21 +63,28 @@ void test_mpmc(int nproducers = 16, int nconsumers = 4, int nops = 100000) {
ecs.resize(nconsumers);
for (int i = 0; i < nconsumers; i++)
{
- q.listen(ecs[i], [&collected](int x) {
- //usleep(10);
- printf("%d\n", x);
- collected.fetch_add(1);
+ q.reg_handler(ecs[i], [&collected, burst_size](queue_t &q) {
+ size_t cnt = burst_size;
+ int x;
+ while (q.try_dequeue(x))
+ {
+ //usleep(10);
+ printf("%d\n", x);
+ collected.fetch_add(1);
+ if (!--cnt) return true;
+ }
+ return false;
});
}
for (int i = 0; i < nconsumers; i++)
{
consumers.emplace(consumers.end(), std::thread(
[&collected, total, &ec = ecs[i]]() {
- salticidae::Event timer(ec, -1, EV_TIMEOUT | EV_PERSIST,
- [&ec, &collected, total](int, short) {
+ TimerEvent timer(ec, [&ec, &collected, total](TimerEvent &timer) {
if (collected.load() == total) ec.stop();
+ timer.add(1);
});
- timer.add_with_timeout(1);
+ timer.add(1);
ec.dispatch();
}));
}
@@ -91,14 +101,43 @@ void test_mpmc(int nproducers = 16, int nconsumers = 4, int nops = 100000) {
}));
}
for (auto &t: producers) t.join();
- fprintf(stderr, "producers terminate\n");
+ SALTICIDAE_LOG_INFO("producers terminate");
for (auto &t: consumers) t.join();
- fprintf(stderr, "consumers terminate\n");
+ SALTICIDAE_LOG_INFO("consumers terminate");
}
-*/
-int main() {
- test_mpsc();
- //test_mpmc();
+int main(int argc, char **argv) {
+ Config config;
+ auto opt_nproducers = Config::OptValInt::create(16);
+ auto opt_nconsumers = Config::OptValInt::create(4);
+ auto opt_burst_size = Config::OptValInt::create(128);
+ auto opt_nops = Config::OptValInt::create(100000);
+ auto opt_mpmc = Config::OptValFlag::create(false);
+ auto opt_help = Config::OptValFlag::create(false);
+ config.add_opt("nproducers", opt_nproducers, Config::SET_VAL);
+ config.add_opt("nconsumers", opt_nconsumers, Config::SET_VAL);
+ config.add_opt("burst-size", opt_burst_size, Config::SET_VAL);
+ config.add_opt("nops", opt_nops, Config::SET_VAL);
+ config.add_opt("mpmc", opt_mpmc, Config::SWITCH_ON);
+ config.add_opt("help", opt_help, Config::SWITCH_ON, 'h', "show this help info");
+ config.parse(argc, argv);
+ if (opt_help->get())
+ {
+ config.print_help();
+ exit(0);
+ }
+
+ if (!opt_mpmc->get())
+ {
+ SALTICIDAE_LOG_INFO("testing an MPSC queue...");
+ test_mpsc(opt_nproducers->get(), opt_nops->get(),
+ opt_burst_size->get());
+ }
+ else
+ {
+ SALTICIDAE_LOG_INFO("testing an MPMC queue...");
+ test_mpmc(opt_nproducers->get(), opt_nconsumers->get(),
+ opt_nops->get(), opt_burst_size->get());
+ }
return 0;
}