diff options
author | Determinant <[email protected]> | 2018-09-10 16:07:21 -0400 |
---|---|---|
committer | Determinant <[email protected]> | 2018-09-10 16:07:21 -0400 |
commit | 073f33b2bdbef4fb711174033707d7b164036b6e (patch) | |
tree | f351d7119a3764fe72d6ad5a0b66f0a22ada9fb0 /test/test_concurrent_queue.cpp | |
parent | d959b9c8db4e9ba9695c08ae6c2f06edb6e82fdc (diff) | |
parent | 6261c95184b86c43755071b351e6928f89e2343c (diff) |
Merge branch 'multithreaded-verifier'
Diffstat (limited to 'test/test_concurrent_queue.cpp')
-rw-r--r-- | test/test_concurrent_queue.cpp | 68 |
1 files changed, 68 insertions, 0 deletions
diff --git a/test/test_concurrent_queue.cpp b/test/test_concurrent_queue.cpp new file mode 100644 index 0000000..7412213 --- /dev/null +++ b/test/test_concurrent_queue.cpp @@ -0,0 +1,68 @@ +#include "salticidae/event.h" +#include "concurrentqueue/blockingconcurrentqueue.h" +#include <thread> +#include <unistd.h> + +class VeriPool { + int fin_fd[2]; + moodycamel::BlockingConcurrentQueue<int> in_queue; + moodycamel::BlockingConcurrentQueue<int> out_queue; + std::thread notifier; + std::vector<std::thread> workers; + public: + VeriPool(size_t nworker) { + pipe(fin_fd); + // finish notifier thread + notifier = std::thread([this]() { + while (true) + { + int item; + out_queue.wait_dequeue(item); + write(fin_fd[1], &item, sizeof(item)); + } + }); + for (size_t i = 0; i < nworker; i++) + { + workers.push_back(std::thread([this]() { + while (true) + { + int item; + in_queue.wait_dequeue(item); + fprintf(stderr, "%lu working on %d\n", std::this_thread::get_id(), item); + out_queue.enqueue(item * 1000); + } + })); + } + } + + ~VeriPool() { + notifier.detach(); + for (auto &w: workers) w.detach(); + close(fin_fd[0]); + close(fin_fd[1]); + } + + void submit(int item) { + in_queue.enqueue(item); + } + + int get_fd() { + return fin_fd[0]; + } +}; + +int main() { + VeriPool p(2); + salticidae::EventContext ec; + salticidae::Event ev; + ev = salticidae::Event(ec, p.get_fd(), EV_READ, [&ev](int fd, short) { + int item; + read(fd, &item, sizeof(item)); + printf("finished %d\n", item); + ev.add(); + }); + for (int i = 0; i < 10000; i++) + p.submit(i); + ev.add(); + ec.dispatch(); +} |