From 6e60955da1225447625f179f8117787c0c579302 Mon Sep 17 00:00:00 2001 From: Determinant Date: Wed, 21 Nov 2018 18:39:30 -0500 Subject: drop the dependency on concurrentqueue (use native impl from salticidae) --- .gitmodules | 3 -- concurrentqueue | 1 - include/hotstuff/crypto.h | 2 +- include/hotstuff/task.h | 100 ++++++++++++++++++++++++++++++++++++++++++++++ include/hotstuff/worker.h | 92 ------------------------------------------ salticidae | 2 +- 6 files changed, 102 insertions(+), 98 deletions(-) delete mode 160000 concurrentqueue create mode 100644 include/hotstuff/task.h delete mode 100644 include/hotstuff/worker.h diff --git a/.gitmodules b/.gitmodules index 5674fe5..834ffa4 100644 --- a/.gitmodules +++ b/.gitmodules @@ -4,6 +4,3 @@ [submodule "salticidae"] path = salticidae url = https://github.com/Determinant/salticidae.git -[submodule "concurrentqueue"] - path = concurrentqueue - url = https://github.com/cameron314/concurrentqueue.git diff --git a/concurrentqueue b/concurrentqueue deleted file mode 160000 index 8f65a87..0000000 --- a/concurrentqueue +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 8f65a8734d77c3cc00d74c0532efca872931d3ce diff --git a/include/hotstuff/crypto.h b/include/hotstuff/crypto.h index b79c433..a867ae9 100644 --- a/include/hotstuff/crypto.h +++ b/include/hotstuff/crypto.h @@ -6,7 +6,7 @@ #include "secp256k1.h" #include "salticidae/crypto.h" #include "hotstuff/type.h" -#include "hotstuff/worker.h" +#include "hotstuff/task.h" namespace hotstuff { diff --git a/include/hotstuff/task.h b/include/hotstuff/task.h new file mode 100644 index 0000000..2564e7d --- /dev/null +++ b/include/hotstuff/task.h @@ -0,0 +1,100 @@ +#ifndef _HOTSTUFF_WORKER_H +#define _HOTSTUFF_WORKER_H + +#include +#include +#include + +#include "salticidae/event.h" +#include "hotstuff/util.h" + +namespace hotstuff { + +class VeriTask { + friend class VeriPool; + bool result; + public: + virtual bool verify() = 0; + virtual ~VeriTask() = default; +}; + +using salticidae::ThreadCall; +using veritask_ut = BoxObj; +using mpmc_queue_t = salticidae::MPMCQueueEventDriven; +using mpsc_queue_t = salticidae::MPSCQueueEventDriven; + +class VeriPool { + mpmc_queue_t in_queue; + mpsc_queue_t out_queue; + + struct Worker { + std::thread handle; + EventContext ec; + BoxObj tcall; + }; + + std::vector workers; + std::unordered_map> pms; + + public: + VeriPool(EventContext ec, size_t nworker, size_t burst_size = 128) { + out_queue.reg_handler(ec, [this, burst_size](mpsc_queue_t &q) { + size_t cnt = burst_size; + VeriTask *task; + while (q.try_dequeue(task)) + { + auto it = pms.find(task); + it->second.second.resolve(task->result); + pms.erase(it); + delete task; + if (!--cnt) return true; + } + return false; + }); + + workers.resize(nworker); + for (size_t i = 0; i < nworker; i++) + { + in_queue.reg_handler(workers[i].ec, [this, burst_size](mpmc_queue_t &q) { + size_t cnt = burst_size; + VeriTask *task; + while (q.try_dequeue(task)) + { + HOTSTUFF_LOG_DEBUG("%lu working on %u", + std::this_thread::get_id(), (uintptr_t)task); + task->result = task->verify(); + out_queue.enqueue(task); + if (!--cnt) return true; + } + return false; + }); + } + for (auto &w: workers) + { + w.tcall = new ThreadCall(w.ec); + w.handle = std::thread([ec=w.ec]() { ec.dispatch(); }); + } + } + + ~VeriPool() { + for (auto &w: workers) + w.tcall->async_call([ec=w.ec](ThreadCall::Handle &) { + ec.stop(); + }); + for (auto &w: workers) + w.handle.join(); + } + + promise_t verify(veritask_ut &&task) { + auto ptr = task.get(); + auto ret = pms.insert(std::make_pair(ptr, + std::make_pair(std::move(task), promise_t([](promise_t &){})))); + assert(ret.second); + in_queue.enqueue(ptr); + return ret.first->second.second; + } +}; + +} + +#endif diff --git a/include/hotstuff/worker.h b/include/hotstuff/worker.h deleted file mode 100644 index e39ea66..0000000 --- a/include/hotstuff/worker.h +++ /dev/null @@ -1,92 +0,0 @@ -#ifndef _HOTSTUFF_WORKER_H -#define _HOTSTUFF_WORKER_H - -#include -#include -#include -#include "concurrentqueue/blockingconcurrentqueue.h" - -namespace hotstuff { - -class VeriTask { - friend class VeriPool; - bool result; - public: - virtual bool verify() = 0; - virtual ~VeriTask() = default; -}; - -using veritask_ut = BoxObj; - -class VeriPool { - using queue_t = moodycamel::BlockingConcurrentQueue; - int fin_fd[2]; - FdEvent fin_ev; - queue_t in_queue; - queue_t out_queue; - std::thread notifier; - std::vector workers; - std::unordered_map> pms; - public: - VeriPool(EventContext ec, size_t nworker) { - pipe(fin_fd); - fin_ev = FdEvent(ec, fin_fd[0], [&](int fd, short) { - VeriTask *task; - bool result; - read(fd, &task, sizeof(VeriTask *)); - read(fd, &result, sizeof(bool)); - auto it = pms.find(task); - it->second.second.resolve(result); - pms.erase(it); - fin_ev.add(FdEvent::READ); - }); - fin_ev.add(FdEvent::READ); - // finish notifier thread - notifier = std::thread([this]() { - while (true) - { - VeriTask *task; - out_queue.wait_dequeue(task); - write(fin_fd[1], &task, sizeof(VeriTask *)); - write(fin_fd[1], &(task->result), sizeof(bool)); - } - }); - for (size_t i = 0; i < nworker; i++) - { - workers.push_back(std::thread([this]() { - while (true) - { - VeriTask *task; - in_queue.wait_dequeue(task); - //fprintf(stderr, "%lu working on %u\n", std::this_thread::get_id(), (uintptr_t)task); - task->result = task->verify(); - out_queue.enqueue(task); - } - })); - } - } - - ~VeriPool() { - notifier.detach(); - for (auto &w: workers) w.detach(); - close(fin_fd[0]); - close(fin_fd[1]); - } - - promise_t verify(veritask_ut &&task) { - auto ptr = task.get(); - auto ret = pms.insert(std::make_pair(ptr, - std::make_pair(std::move(task), promise_t([](promise_t &){})))); - assert(ret.second); - in_queue.enqueue(ptr); - return ret.first->second.second; - } - - int get_fd() { - return fin_fd[0]; - } -}; - -} - -#endif diff --git a/salticidae b/salticidae index 2b1f679..2730435 160000 --- a/salticidae +++ b/salticidae @@ -1 +1 @@ -Subproject commit 2b1f6791ddfd8ef4fb21cb4b50a3d6bc86945867 +Subproject commit 27304354844c8baf8c14ef923e0ab65bc4df2dc0 -- cgit v1.2.3