aboutsummaryrefslogtreecommitdiff
path: root/test/test_concurrent_queue.cpp
blob: 7412213a3cb66753c4a743e3032afa90b041ec52 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#include "salticidae/event.h"
#include "concurrentqueue/blockingconcurrentqueue.h"

#include <signal.h>
#include <unistd.h>

#include <atomic>
#include <cerrno>
#include <cstddef>
#include <cstdio>
#include <system_error>
#include <thread>
#include <vector>

/**
 * Small worker pool used to exercise the blocking concurrent queue.
 *
 * Items passed to submit() are picked up by one of the worker threads, which
 * pushes `item * 1000` onto an output queue. A dedicated notifier thread
 * drains the output queue and writes each result (a raw `int`) into a pipe;
 * the read end of that pipe is exposed through get_fd() so that it can be
 * watched by an event loop.
 *
 * All threads are stopped and joined in the destructor, so no thread touches
 * the object after it has been destroyed. Items still queued at destruction
 * time are discarded.
 */
class VeriPool {
    int fin_fd[2];
    moodycamel::BlockingConcurrentQueue<int> in_queue;
    moodycamel::BlockingConcurrentQueue<int> out_queue;
    // Set once shutdown begins; threads re-check it after every dequeue.
    std::atomic<bool> stopping{false};
    std::thread notifier;
    std::vector<std::thread> workers;

    // Body of the notifier thread: forwards finished items into the pipe.
    void notifier_loop() {
        // Block SIGPIPE in this thread only, so that a write() issued after
        // the read end has been closed fails with EPIPE instead of raising a
        // signal whose default action terminates the process.
        sigset_t mask;
        sigemptyset(&mask);
        sigaddset(&mask, SIGPIPE);
        pthread_sigmask(SIG_BLOCK, &mask, nullptr);

        while (true)
        {
            int item;
            out_queue.wait_dequeue(item);
            if (stopping.load()) break;
            ssize_t nwritten;
            do {
                nwritten = write(fin_fd[1], &item, sizeof(item));
            } while (nwritten < 0 && errno == EINTR);
            if (nwritten != static_cast<ssize_t>(sizeof(item)))
            {
                // EPIPE during shutdown is expected; anything else is reported.
                if (!stopping.load()) perror("write");
                break;
            }
        }
    }

    // Body of each worker thread: consumes inputs and produces results.
    void worker_loop() {
        while (true)
        {
            int item;
            in_queue.wait_dequeue(item);
            if (stopping.load()) break;
            // std::thread::id is a class type and cannot be passed through
            // C varargs; print its hash as a printable identifier instead.
            fprintf(stderr, "%zu working on %d\n",
                    std::hash<std::thread::id>{}(std::this_thread::get_id()),
                    item);
            out_queue.enqueue(item * 1000);
        }
    }

    // Stops and joins every thread that was started, then closes the pipe.
    void shutdown() {
        stopping.store(true);
        // Each worker exits after its first dequeue that follows the flag
        // being set, so one wake-up token per worker is sufficient.
        for (size_t i = 0; i < workers.size(); i++)
            in_queue.enqueue(0);
        for (auto &w: workers)
            if (w.joinable()) w.join();
        // Closing the read end releases the notifier if it is blocked in
        // write() on a full pipe (it then sees EPIPE).
        close(fin_fd[0]);
        if (notifier.joinable())
        {
            // Wake-up token in case the notifier is blocked on an empty queue.
            out_queue.enqueue(0);
            notifier.join();
        }
        close(fin_fd[1]);
    }

    public:
    /**
     * Creates the result pipe and starts the notifier plus `nworker` workers.
     * Throws std::system_error if the pipe cannot be created; rethrows any
     * exception raised while starting threads after stopping those already
     * running.
     */
    VeriPool(size_t nworker) {
        if (pipe(fin_fd) != 0)
            throw std::system_error(errno, std::generic_category(), "pipe");
        try {
            // finish notifier thread
            notifier = std::thread([this]() { notifier_loop(); });
            workers.reserve(nworker);
            for (size_t i = 0; i < nworker; i++)
                workers.emplace_back([this]() { worker_loop(); });
        } catch (...) {
            shutdown();
            throw;
        }
    }

    // The threads hold a pointer to this object, so it must stay put.
    VeriPool(const VeriPool &) = delete;
    VeriPool &operator=(const VeriPool &) = delete;

    /** Stops and joins all threads, then closes both ends of the pipe. */
    ~VeriPool() {
        shutdown();
    }

    /** Queues `item` for processing by one of the workers. */
    void submit(int item) {
        in_queue.enqueue(item);
    }

    /** Returns the read end of the pipe on which results are delivered. */
    int get_fd() {
        return fin_fd[0];
    }
};

/**
 * Submits 10000 items to a two-worker VeriPool and prints each result as it
 * arrives on the pool's notification pipe, using a read event registered on
 * the event context. The event is re-armed after every successful read.
 */
int main() {
    VeriPool p(2);
    salticidae::EventContext ec;
    salticidae::Event ev;
    ev = salticidae::Event(ec, p.get_fd(), EV_READ, [&ev](int fd, short) {
        int item;
        const ssize_t nread = read(fd, &item, sizeof(item));
        if (nread != static_cast<ssize_t>(sizeof(item)))
        {
            // `item` is not valid here, so never print it.
            if (nread < 0 && errno == EINTR)
            {
                // Interrupted before any data was read: simply try again.
                ev.add();
                return;
            }
            if (nread < 0)
                perror("read");
            else
                fprintf(stderr, "unexpected short read (%zd bytes)\n", nread);
            // Do not re-arm: a persistent error or EOF would otherwise spin.
            return;
        }
        printf("finished %d\n", item);
        ev.add();
    });
    for (int i = 0; i < 10000; i++)
        p.submit(i);
    ev.add();
    ec.dispatch();
}