aboutsummaryrefslogtreecommitdiff
path: root/include/hotstuff/worker.h
blob: 229b1bf9674ddd994641646f579082c4770ae697 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#ifndef _HOTSTUFF_WORKER_H
#define _HOTSTUFF_WORKER_H

#include <atomic>
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <system_error>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

#include <fcntl.h>
#include <unistd.h>

#include "concurrentqueue/blockingconcurrentqueue.h"

namespace hotstuff {

/**
 * Abstract unit of verification work.
 *
 * Subclasses implement verify(). VeriPool is a friend so that it can record
 * the value returned by verify() in the private `result` member.
 */
class VeriTask {
    friend class VeriPool;
    /** Value of verify() as recorded by VeriPool. Initialised so that it never
     * holds an indeterminate value before a verification has been recorded. */
    bool result = false;
    public:
    /** Run the check and return its outcome. */
    virtual bool verify() = 0;
    /** Virtual so that tasks can be destroyed through a VeriTask pointer. */
    virtual ~VeriTask() = default;
};

// Handle type wrapping a VeriTask in BoxObj; VeriPool::verify() takes tasks in
// this form and keeps them in its pending map.
// NOTE(review): BoxObj is declared elsewhere -- presumably a unique-ownership
// smart pointer; confirm against its definition.
using veritask_ut = BoxObj<VeriTask>;

class VeriPool {
    using queue_t = moodycamel::BlockingConcurrentQueue<VeriTask *>;
    int fin_fd[2];
    Event fin_ev;
    queue_t in_queue;
    queue_t out_queue;
    std::thread notifier;
    std::vector<std::thread> workers;
    std::unordered_map<VeriTask *, std::pair<veritask_ut, promise_t>> pms;
    public:
    VeriPool(EventContext ec, size_t nworker) {
        pipe(fin_fd);
        fin_ev = Event(ec, fin_fd[0], EV_READ, [&](int fd, short) {
            VeriTask *task;
            bool result;
            read(fd, &task, sizeof(VeriTask *));
            read(fd, &result, sizeof(bool));
            auto it = pms.find(task);
            it->second.second.resolve(result);
            pms.erase(it);
            fin_ev.add();
        });
        fin_ev.add();
        // finish notifier thread
        notifier = std::thread([this]() {
            while (true)
            {
                VeriTask *task;
                out_queue.wait_dequeue(task);
                write(fin_fd[1], &task, sizeof(VeriTask *));
                write(fin_fd[1], &(task->result), sizeof(bool));
            }
        });
        for (size_t i = 0; i < nworker; i++)
        {
            workers.push_back(std::thread([this]() {
                while (true)
                {
                    VeriTask *task;
                    in_queue.wait_dequeue(task);
                    //fprintf(stderr, "%lu working on %u\n", std::this_thread::get_id(), (uintptr_t)task);
                    task->result = task->verify();
                    out_queue.enqueue(task);
                }
            }));
        }
    }

    ~VeriPool() {
        notifier.detach();
        for (auto &w: workers) w.detach();
        close(fin_fd[0]);
        close(fin_fd[1]);
    }

    promise_t verify(veritask_ut &&task) {
        auto ptr = task.get();
        auto ret = pms.insert(std::make_pair(ptr,
                std::make_pair(std::move(task), promise_t([](promise_t &){}))));
        assert(ret.second);
        in_queue.enqueue(ptr);
        return ret.first->second.second;
    }

    int get_fd() {
        return fin_fd[0];
    }
};

}

#endif