#include <cstdio>
#include <thread>
#include <atomic>
#include "salticidae/event.h"
#include "salticidae/util.h"
using salticidae::TimerEvent;
using salticidae::Config;
void test_mpsc(int nproducers, int nops, size_t burst_size, bool test_rewind) {
size_t total = nproducers * nops;
salticidae::EventContext ec;
std::atomic<size_t> collected(0);
using queue_t = salticidae::MPSCQueueEventDriven<int>;
queue_t q;
q.set_capacity(65536);
q.reg_handler(ec, [&collected, burst_size, test_rewind](queue_t &q) {
size_t cnt = burst_size;
int x;
while (q.try_dequeue(x))
{
if (test_rewind && (rand() & 1))
q.rewind(x);
else
{
collected.fetch_add(1);
printf("%d\n", x);
}
if (!--cnt) return true;
}
return false;
});
std::vector<std::thread> producers;
std::thread consumer([&collected, total, &ec]() {
TimerEvent timer(ec, [&ec, &collected, total](TimerEvent &timer) {
if (collected.load() == total) ec.stop();
timer.add(1);
});
timer.add(1);
ec.dispatch();
});
for (int i = 0; i < nproducers; i++)
{
producers.emplace(producers.end(), std::thread([&q, nops, i, nproducers]() {
int x = i;
for (int j = 0; j < nops; j++)
{
//usleep(rand() / double(RAND_MAX) * 100);
while (!q.enqueue(x, false))
std::this_thread::yield();
x += nproducers;
}
}));
}
for (auto &t: producers) t.join();
SALTICIDAE_LOG_INFO("producers terminate");
consumer.join();
SALTICIDAE_LOG_INFO("consumers terminate");
}
void test_mpmc(int nproducers, int nconsumers, int nops, size_t burst_size) {
size_t total = nproducers * nops;
using queue_t = salticidae::MPMCQueueEventDriven<int>;
queue_t q;
std::vector<std::thread> producers;
std::vector<std::thread> consumers;
std::vector<salticidae::EventContext> ecs;
std::atomic<size_t> collected(0);
ecs.resize(nconsumers);
q.set_capacity(65536);
for (int i = 0; i < nconsumers; i++)
{
q.reg_handler(ecs[i], [&collected, burst_size](queue_t &q) {
size_t cnt = burst_size;
int x;
while (q.try_dequeue(x))
{
//usleep(10);
printf("%d\n", x);
collected.fetch_add(1);
if (!--cnt) return true;
}
return false;
});
}
for (int i = 0; i < nconsumers; i++)
{
consumers.emplace(consumers.end(), std::thread(
[&collected, total, &ec = ecs[i]]() {
TimerEvent timer(ec, [&ec, &collected, total](TimerEvent &timer) {
if (collected.load() == total) ec.stop();
timer.add(1);
});
timer.add(1);
ec.dispatch();
}));
}
for (int i = 0; i < nproducers; i++)
{
producers.emplace(producers.end(), std::thread([&