aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitmodules3
m---------concurrentqueue0
-rw-r--r--include/hotstuff/crypto.h2
-rw-r--r--include/hotstuff/task.h100
-rw-r--r--include/hotstuff/worker.h92
m---------salticidae0
6 files changed, 101 insertions, 96 deletions
diff --git a/.gitmodules b/.gitmodules
index 5674fe5..834ffa4 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -4,6 +4,3 @@
[submodule "salticidae"]
path = salticidae
url = https://github.com/Determinant/salticidae.git
-[submodule "concurrentqueue"]
- path = concurrentqueue
- url = https://github.com/cameron314/concurrentqueue.git
diff --git a/concurrentqueue b/concurrentqueue
deleted file mode 160000
-Subproject 8f65a8734d77c3cc00d74c0532efca872931d3c
diff --git a/include/hotstuff/crypto.h b/include/hotstuff/crypto.h
index b79c433..a867ae9 100644
--- a/include/hotstuff/crypto.h
+++ b/include/hotstuff/crypto.h
@@ -6,7 +6,7 @@
#include "secp256k1.h"
#include "salticidae/crypto.h"
#include "hotstuff/type.h"
-#include "hotstuff/worker.h"
+#include "hotstuff/task.h"
namespace hotstuff {
diff --git a/include/hotstuff/task.h b/include/hotstuff/task.h
new file mode 100644
index 0000000..2564e7d
--- /dev/null
+++ b/include/hotstuff/task.h
@@ -0,0 +1,100 @@
+#ifndef _HOTSTUFF_WORKER_H
+#define _HOTSTUFF_WORKER_H
+
+#include <thread>
+#include <unordered_map>
+#include <unistd.h>
+
+#include "salticidae/event.h"
+#include "hotstuff/util.h"
+
+namespace hotstuff {
+
+class VeriTask {
+ friend class VeriPool;
+ bool result;
+ public:
+ virtual bool verify() = 0;
+ virtual ~VeriTask() = default;
+};
+
+using salticidae::ThreadCall;
+using veritask_ut = BoxObj<VeriTask>;
+using mpmc_queue_t = salticidae::MPMCQueueEventDriven<VeriTask *>;
+using mpsc_queue_t = salticidae::MPSCQueueEventDriven<VeriTask *>;
+
+class VeriPool {
+ mpmc_queue_t in_queue;
+ mpsc_queue_t out_queue;
+
+ struct Worker {
+ std::thread handle;
+ EventContext ec;
+ BoxObj<ThreadCall> tcall;
+ };
+
+ std::vector<Worker> workers;
+ std::unordered_map<VeriTask *, std::pair<veritask_ut, promise_t>> pms;
+
+ public:
+ VeriPool(EventContext ec, size_t nworker, size_t burst_size = 128) {
+ out_queue.reg_handler(ec, [this, burst_size](mpsc_queue_t &q) {
+ size_t cnt = burst_size;
+ VeriTask *task;
+ while (q.try_dequeue(task))
+ {
+ auto it = pms.find(task);
+ it->second.second.resolve(task->result);
+ pms.erase(it);
+ delete task;
+ if (!--cnt) return true;
+ }
+ return false;
+ });
+
+ workers.resize(nworker);
+ for (size_t i = 0; i < nworker; i++)
+ {
+ in_queue.reg_handler(workers[i].ec, [this, burst_size](mpmc_queue_t &q) {
+ size_t cnt = burst_size;
+ VeriTask *task;
+ while (q.try_dequeue(task))
+ {
+ HOTSTUFF_LOG_DEBUG("%lu working on %u",
+ std::this_thread::get_id(), (uintptr_t)task);
+ task->result = task->verify();
+ out_queue.enqueue(task);
+ if (!--cnt) return true;
+ }
+ return false;
+ });
+ }
+ for (auto &w: workers)
+ {
+ w.tcall = new ThreadCall(w.ec);
+ w.handle = std::thread([ec=w.ec]() { ec.dispatch(); });
+ }
+ }
+
+ ~VeriPool() {
+ for (auto &w: workers)
+ w.tcall->async_call([ec=w.ec](ThreadCall::Handle &) {
+ ec.stop();
+ });
+ for (auto &w: workers)
+ w.handle.join();
+ }
+
+ promise_t verify(veritask_ut &&task) {
+ auto ptr = task.get();
+ auto ret = pms.insert(std::make_pair(ptr,
+ std::make_pair(std::move(task), promise_t([](promise_t &){}))));
+ assert(ret.second);
+ in_queue.enqueue(ptr);
+ return ret.first->second.second;
+ }
+};
+
+}
+
+#endif
diff --git a/include/hotstuff/worker.h b/include/hotstuff/worker.h
deleted file mode 100644
index e39ea66..0000000
--- a/include/hotstuff/worker.h
+++ /dev/null
@@ -1,92 +0,0 @@
-#ifndef _HOTSTUFF_WORKER_H
-#define _HOTSTUFF_WORKER_H
-
-#include <thread>
-#include <unordered_map>
-#include <unistd.h>
-#include "concurrentqueue/blockingconcurrentqueue.h"
-
-namespace hotstuff {
-
-class VeriTask {
- friend class VeriPool;
- bool result;
- public:
- virtual bool verify() = 0;
- virtual ~VeriTask() = default;
-};
-
-using veritask_ut = BoxObj<VeriTask>;
-
-class VeriPool {
- using queue_t = moodycamel::BlockingConcurrentQueue<VeriTask *>;
- int fin_fd[2];
- FdEvent fin_ev;
- queue_t in_queue;
- queue_t out_queue;
- std::thread notifier;
- std::vector<std::thread> workers;
- std::unordered_map<VeriTask *, std::pair<veritask_ut, promise_t>> pms;
- public:
- VeriPool(EventContext ec, size_t nworker) {
- pipe(fin_fd);
- fin_ev = FdEvent(ec, fin_fd[0], [&](int fd, short) {
- VeriTask *task;
- bool result;
- read(fd, &task, sizeof(VeriTask *));
- read(fd, &result, sizeof(bool));
- auto it = pms.find(task);
- it->second.second.resolve(result);
- pms.erase(it);
- fin_ev.add(FdEvent::READ);
- });
- fin_ev.add(FdEvent::READ);
- // finish notifier thread
- notifier = std::thread([this]() {
- while (true)
- {
- VeriTask *task;
- out_queue.wait_dequeue(task);
- write(fin_fd[1], &task, sizeof(VeriTask *));
- write(fin_fd[1], &(task->result), sizeof(bool));
- }
- });
- for (size_t i = 0; i < nworker; i++)
- {
- workers.push_back(std::thread([this]() {
- while (true)
- {
- VeriTask *task;
- in_queue.wait_dequeue(task);
- //fprintf(stderr, "%lu working on %u\n", std::this_thread::get_id(), (uintptr_t)task);
- task->result = task->verify();
- out_queue.enqueue(task);
- }
- }));
- }
- }
-
- ~VeriPool() {
- notifier.detach();
- for (auto &w: workers) w.detach();
- close(fin_fd[0]);
- close(fin_fd[1]);
- }
-
- promise_t verify(veritask_ut &&task) {
- auto ptr = task.get();
- auto ret = pms.insert(std::make_pair(ptr,
- std::make_pair(std::move(task), promise_t([](promise_t &){}))));
- assert(ret.second);
- in_queue.enqueue(ptr);
- return ret.first->second.second;
- }
-
- int get_fd() {
- return fin_fd[0];
- }
-};
-
-}
-
-#endif
diff --git a/salticidae b/salticidae
-Subproject 2b1f6791ddfd8ef4fb21cb4b50a3d6bc8694586
+Subproject 27304354844c8baf8c14ef923e0ab65bc4df2dc