1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
|
#ifndef _HOTSTUFF_WORKER_H
#define _HOTSTUFF_WORKER_H
#include <unistd.h>

#include <cassert>
#include <cerrno>
#include <stdexcept>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

#include "concurrentqueue/blockingconcurrentqueue.h"
namespace hotstuff {
/**
 * Abstract unit of verification work.
 *
 * Subclasses implement verify(). VeriPool (a friend) calls verify() on one of
 * its worker threads and stores the returned value in the private `result`
 * field before reporting the task as finished.
 */
class VeriTask {
    public:
    /** Run the check and report whether it passed. */
    virtual bool verify() = 0;
    /** Virtual so that concrete tasks can be destroyed through a VeriTask pointer. */
    virtual ~VeriTask() = default;

    private:
    friend class VeriPool;
    /* Outcome of verify(); written by VeriPool, has no initial value. */
    bool result;
};
// Handle type through which tasks are handed to VeriPool::verify(), which
// moves the handle into its bookkeeping map and reads the raw pointer via
// get(). BoxObj is declared outside this file -- presumably a unique-ownership
// smart pointer; confirm against its definition.
using veritask_ut = BoxObj<VeriTask>;
class VeriPool {
using queue_t = moodycamel::BlockingConcurrentQueue<VeriTask *>;
int fin_fd[2];
Event fin_ev;
queue_t in_queue;
queue_t out_queue;
std::thread notifier;
std::vector<std::thread> workers;
std::unordered_map<VeriTask *, std::pair<veritask_ut, promise_t>> pms;
public:
VeriPool(EventContext ec, size_t nworker) {
pipe(fin_fd);
fin_ev = Event(ec, fin_fd[0], EV_READ, [&](int fd, short) {
VeriTask *task;
bool result;
read(fd, &task, sizeof(VeriTask *));
read(fd, &result, sizeof(bool));
auto it = pms.find(task);
it->second.second.resolve(result);
pms.erase(it);
fin_ev.add();
});
fin_ev.add();
// finish notifier thread
notifier = std::thread([this]() {
while (true)
{
VeriTask *task;
out_queue.wait_dequeue(task);
write(fin_fd[1], &task, sizeof(VeriTask *));
write(fin_fd[1], &(task->result), sizeof(bool));
}
});
for (size_t i = 0; i < nworker; i++)
{
workers.push_back(std::thread([this]() {
while (true)
{
VeriTask *task;
in_queue.wait_dequeue(task);
//fprintf(stderr, "%lu working on %u\n", std::this_thread::get_id(), (uintptr_t)task);
task->result = task->verify();
out_queue.enqueue(task);
}
}));
}
}
~VeriPool() {
notifier.detach();
for (auto &w: workers) w.detach();
close(fin_fd[0]);
close(fin_fd[1]);
}
promise_t verify(veritask_ut &&task) {
auto ptr = task.get();
auto ret = pms.insert(std::make_pair(ptr,
std::make_pair(std::move(task), promise_t([](promise_t &){}))));
assert(ret.second);
in_queue.enqueue(ptr);
return ret.first->second.second;
}
int get_fd() {
return fin_fd[0];
}
};
}
#endif
|