diff options
-rw-r--r-- | .gitmodules | 3 | ||||
m--------- | concurrentqueue | 0 | ||||
-rw-r--r-- | include/hotstuff/crypto.h | 2 | ||||
-rw-r--r-- | include/hotstuff/task.h | 100 | ||||
-rw-r--r-- | include/hotstuff/worker.h | 92 | ||||
m--------- | salticidae | 0 |
6 files changed, 101 insertions, 96 deletions
diff --git a/.gitmodules b/.gitmodules index 5674fe5..834ffa4 100644 --- a/.gitmodules +++ b/.gitmodules @@ -4,6 +4,3 @@ [submodule "salticidae"] path = salticidae url = https://github.com/Determinant/salticidae.git -[submodule "concurrentqueue"] - path = concurrentqueue - url = https://github.com/cameron314/concurrentqueue.git diff --git a/concurrentqueue b/concurrentqueue deleted file mode 160000 -Subproject 8f65a8734d77c3cc00d74c0532efca872931d3c diff --git a/include/hotstuff/crypto.h b/include/hotstuff/crypto.h index b79c433..a867ae9 100644 --- a/include/hotstuff/crypto.h +++ b/include/hotstuff/crypto.h @@ -6,7 +6,7 @@ #include "secp256k1.h" #include "salticidae/crypto.h" #include "hotstuff/type.h" -#include "hotstuff/worker.h" +#include "hotstuff/task.h" namespace hotstuff { diff --git a/include/hotstuff/task.h b/include/hotstuff/task.h new file mode 100644 index 0000000..2564e7d --- /dev/null +++ b/include/hotstuff/task.h @@ -0,0 +1,100 @@ +#ifndef _HOTSTUFF_WORKER_H +#define _HOTSTUFF_WORKER_H + +#include <thread> +#include <unordered_map> +#include <unistd.h> + +#include "salticidae/event.h" +#include "hotstuff/util.h" + +namespace hotstuff { + +class VeriTask { + friend class VeriPool; + bool result; + public: + virtual bool verify() = 0; + virtual ~VeriTask() = default; +}; + +using salticidae::ThreadCall; +using veritask_ut = BoxObj<VeriTask>; +using mpmc_queue_t = salticidae::MPMCQueueEventDriven<VeriTask *>; +using mpsc_queue_t = salticidae::MPSCQueueEventDriven<VeriTask *>; + +class VeriPool { + mpmc_queue_t in_queue; + mpsc_queue_t out_queue; + + struct Worker { + std::thread handle; + EventContext ec; + BoxObj<ThreadCall> tcall; + }; + + std::vector<Worker> workers; + std::unordered_map<VeriTask *, std::pair<veritask_ut, promise_t>> pms; + + public: + VeriPool(EventContext ec, size_t nworker, size_t burst_size = 128) { + out_queue.reg_handler(ec, [this, burst_size](mpsc_queue_t &q) { + size_t cnt = burst_size; + VeriTask *task; + while (q.try_dequeue(task)) + { + auto it = pms.find(task); + it->second.second.resolve(task->result); + pms.erase(it); + delete task; + if (!--cnt) return true; + } + return false; + }); + + workers.resize(nworker); + for (size_t i = 0; i < nworker; i++) + { + in_queue.reg_handler(workers[i].ec, [this, burst_size](mpmc_queue_t &q) { + size_t cnt = burst_size; + VeriTask *task; + while (q.try_dequeue(task)) + { + HOTSTUFF_LOG_DEBUG("%lu working on %u", + std::this_thread::get_id(), (uintptr_t)task); + task->result = task->verify(); + out_queue.enqueue(task); + if (!--cnt) return true; + } + return false; + }); + } + for (auto &w: workers) + { + w.tcall = new ThreadCall(w.ec); + w.handle = std::thread([ec=w.ec]() { ec.dispatch(); }); + } + } + + ~VeriPool() { + for (auto &w: workers) + w.tcall->async_call([ec=w.ec](ThreadCall::Handle &) { + ec.stop(); + }); + for (auto &w: workers) + w.handle.join(); + } + + promise_t verify(veritask_ut &&task) { + auto ptr = task.get(); + auto ret = pms.insert(std::make_pair(ptr, + std::make_pair(std::move(task), promise_t([](promise_t &){})))); + assert(ret.second); + in_queue.enqueue(ptr); + return ret.first->second.second; + } +}; + +} + +#endif diff --git a/include/hotstuff/worker.h b/include/hotstuff/worker.h deleted file mode 100644 index e39ea66..0000000 --- a/include/hotstuff/worker.h +++ /dev/null @@ -1,92 +0,0 @@ -#ifndef _HOTSTUFF_WORKER_H -#define _HOTSTUFF_WORKER_H - -#include <thread> -#include <unordered_map> -#include <unistd.h> -#include "concurrentqueue/blockingconcurrentqueue.h" - -namespace hotstuff { - -class VeriTask { - friend class VeriPool; - bool result; - public: - virtual bool verify() = 0; - virtual ~VeriTask() = default; -}; - -using veritask_ut = BoxObj<VeriTask>; - -class VeriPool { - using queue_t = moodycamel::BlockingConcurrentQueue<VeriTask *>; - int fin_fd[2]; - FdEvent fin_ev; - queue_t in_queue; - queue_t out_queue; - std::thread notifier; - std::vector<std::thread> workers; - std::unordered_map<VeriTask *, std::pair<veritask_ut, promise_t>> pms; - public: - VeriPool(EventContext ec, size_t nworker) { - pipe(fin_fd); - fin_ev = FdEvent(ec, fin_fd[0], [&](int fd, short) { - VeriTask *task; - bool result; - read(fd, &task, sizeof(VeriTask *)); - read(fd, &result, sizeof(bool)); - auto it = pms.find(task); - it->second.second.resolve(result); - pms.erase(it); - fin_ev.add(FdEvent::READ); - }); - fin_ev.add(FdEvent::READ); - // finish notifier thread - notifier = std::thread([this]() { - while (true) - { - VeriTask *task; - out_queue.wait_dequeue(task); - write(fin_fd[1], &task, sizeof(VeriTask *)); - write(fin_fd[1], &(task->result), sizeof(bool)); - } - }); - for (size_t i = 0; i < nworker; i++) - { - workers.push_back(std::thread([this]() { - while (true) - { - VeriTask *task; - in_queue.wait_dequeue(task); - //fprintf(stderr, "%lu working on %u\n", std::this_thread::get_id(), (uintptr_t)task); - task->result = task->verify(); - out_queue.enqueue(task); - } - })); - } - } - - ~VeriPool() { - notifier.detach(); - for (auto &w: workers) w.detach(); - close(fin_fd[0]); - close(fin_fd[1]); - } - - promise_t verify(veritask_ut &&task) { - auto ptr = task.get(); - auto ret = pms.insert(std::make_pair(ptr, - std::make_pair(std::move(task), promise_t([](promise_t &){})))); - assert(ret.second); - in_queue.enqueue(ptr); - return ret.first->second.second; - } - - int get_fd() { - return fin_fd[0]; - } -}; - -} - -#endif diff --git a/salticidae b/salticidae -Subproject 2b1f6791ddfd8ef4fb21cb4b50a3d6bc8694586 +Subproject 27304354844c8baf8c14ef923e0ab65bc4df2dc |