diff options
Diffstat (limited to 'include/hotstuff/worker.h')
-rw-r--r-- | include/hotstuff/worker.h | 92 |
1 files changed, 92 insertions, 0 deletions
diff --git a/include/hotstuff/worker.h b/include/hotstuff/worker.h new file mode 100644 index 0000000..229b1bf --- /dev/null +++ b/include/hotstuff/worker.h @@ -0,0 +1,92 @@ +#ifndef _HOTSTUFF_WORKER_H +#define _HOTSTUFF_WORKER_H + +#include <thread> +#include <unordered_map> +#include <unistd.h> +#include "concurrentqueue/blockingconcurrentqueue.h" + +namespace hotstuff { + +class VeriTask { + friend class VeriPool; + bool result; + public: + virtual bool verify() = 0; + virtual ~VeriTask() = default; +}; + +using veritask_ut = BoxObj<VeriTask>; + +class VeriPool { + using queue_t = moodycamel::BlockingConcurrentQueue<VeriTask *>; + int fin_fd[2]; + Event fin_ev; + queue_t in_queue; + queue_t out_queue; + std::thread notifier; + std::vector<std::thread> workers; + std::unordered_map<VeriTask *, std::pair<veritask_ut, promise_t>> pms; + public: + VeriPool(EventContext ec, size_t nworker) { + pipe(fin_fd); + fin_ev = Event(ec, fin_fd[0], EV_READ, [&](int fd, short) { + VeriTask *task; + bool result; + read(fd, &task, sizeof(VeriTask *)); + read(fd, &result, sizeof(bool)); + auto it = pms.find(task); + it->second.second.resolve(result); + pms.erase(it); + fin_ev.add(); + }); + fin_ev.add(); + // finish notifier thread + notifier = std::thread([this]() { + while (true) + { + VeriTask *task; + out_queue.wait_dequeue(task); + write(fin_fd[1], &task, sizeof(VeriTask *)); + write(fin_fd[1], &(task->result), sizeof(bool)); + } + }); + for (size_t i = 0; i < nworker; i++) + { + workers.push_back(std::thread([this]() { + while (true) + { + VeriTask *task; + in_queue.wait_dequeue(task); + //fprintf(stderr, "%lu working on %u\n", std::this_thread::get_id(), (uintptr_t)task); + task->result = task->verify(); + out_queue.enqueue(task); + } + })); + } + } + + ~VeriPool() { + notifier.detach(); + for (auto &w: workers) w.detach(); + close(fin_fd[0]); + close(fin_fd[1]); + } + + promise_t verify(veritask_ut &&task) { + auto ptr = task.get(); + auto ret = pms.insert(std::make_pair(ptr, + std::make_pair(std::move(task), promise_t([](promise_t &){})))); + assert(ret.second); + in_queue.enqueue(ptr); + return ret.first->second.second; + } + + int get_fd() { + return fin_fd[0]; + } +}; + +} + +#endif |