From 6e60955da1225447625f179f8117787c0c579302 Mon Sep 17 00:00:00 2001 From: Determinant Date: Wed, 21 Nov 2018 18:39:30 -0500 Subject: drop the dependency on concurrentqueue (use native impl from salticidae) --- include/hotstuff/task.h | 100 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 100 insertions(+) create mode 100644 include/hotstuff/task.h (limited to 'include/hotstuff/task.h') diff --git a/include/hotstuff/task.h b/include/hotstuff/task.h new file mode 100644 index 0000000..2564e7d --- /dev/null +++ b/include/hotstuff/task.h @@ -0,0 +1,100 @@ +#ifndef _HOTSTUFF_WORKER_H +#define _HOTSTUFF_WORKER_H + +#include +#include +#include + +#include "salticidae/event.h" +#include "hotstuff/util.h" + +namespace hotstuff { + +class VeriTask { + friend class VeriPool; + bool result; + public: + virtual bool verify() = 0; + virtual ~VeriTask() = default; +}; + +using salticidae::ThreadCall; +using veritask_ut = BoxObj; +using mpmc_queue_t = salticidae::MPMCQueueEventDriven; +using mpsc_queue_t = salticidae::MPSCQueueEventDriven; + +class VeriPool { + mpmc_queue_t in_queue; + mpsc_queue_t out_queue; + + struct Worker { + std::thread handle; + EventContext ec; + BoxObj tcall; + }; + + std::vector workers; + std::unordered_map> pms; + + public: + VeriPool(EventContext ec, size_t nworker, size_t burst_size = 128) { + out_queue.reg_handler(ec, [this, burst_size](mpsc_queue_t &q) { + size_t cnt = burst_size; + VeriTask *task; + while (q.try_dequeue(task)) + { + auto it = pms.find(task); + it->second.second.resolve(task->result); + pms.erase(it); + delete task; + if (!--cnt) return true; + } + return false; + }); + + workers.resize(nworker); + for (size_t i = 0; i < nworker; i++) + { + in_queue.reg_handler(workers[i].ec, [this, burst_size](mpmc_queue_t &q) { + size_t cnt = burst_size; + VeriTask *task; + while (q.try_dequeue(task)) + { + HOTSTUFF_LOG_DEBUG("%lu working on %u", + std::this_thread::get_id(), (uintptr_t)task); + task->result = task->verify(); + out_queue.enqueue(task); + if (!--cnt) return true; + } + return false; + }); + } + for (auto &w: workers) + { + w.tcall = new ThreadCall(w.ec); + w.handle = std::thread([ec=w.ec]() { ec.dispatch(); }); + } + } + + ~VeriPool() { + for (auto &w: workers) + w.tcall->async_call([ec=w.ec](ThreadCall::Handle &) { + ec.stop(); + }); + for (auto &w: workers) + w.handle.join(); + } + + promise_t verify(veritask_ut &&task) { + auto ptr = task.get(); + auto ret = pms.insert(std::make_pair(ptr, + std::make_pair(std::move(task), promise_t([](promise_t &){})))); + assert(ret.second); + in_queue.enqueue(ptr); + return ret.first->second.second; + } +}; + +} + +#endif -- cgit v1.2.3