aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2018-09-10 16:06:27 -0400
committerDeterminant <[email protected]>2018-09-10 16:06:27 -0400
commit6261c95184b86c43755071b351e6928f89e2343c (patch)
tree7a33a5d8ad3b252f629d52c5183eab90efb3438a
parent05f2c56b909a2cd05a200ad663001696e4a23261 (diff)
finish simple multithreaded verifier
-rw-r--r--.gitignore3
-rw-r--r--.gitmodules3
-rw-r--r--CMakeLists.txt7
m---------concurrentqueue0
-rw-r--r--include/hotstuff/consensus.h7
-rw-r--r--include/hotstuff/crypto.h36
-rw-r--r--include/hotstuff/entity.h20
-rw-r--r--include/hotstuff/hotstuff.h4
-rw-r--r--include/hotstuff/worker.h92
-rw-r--r--src/consensus.cpp10
-rw-r--r--src/crypto.cpp20
-rw-r--r--src/hotstuff.cpp28
-rw-r--r--test/CMakeLists.txt7
-rw-r--r--test/test_concurrent_queue.cpp68
14 files changed, 281 insertions, 24 deletions
diff --git a/.gitignore b/.gitignore
index 85d0a04..68646ab 100644
--- a/.gitignore
+++ b/.gitignore
@@ -20,5 +20,6 @@ include/hotstuff/config.h
*.gch
/Makefile
/test/Makefile
-test_secp256k1
+/test/test_secp256k1
+/test/test_concurrent_queue
core
diff --git a/.gitmodules b/.gitmodules
index 834ffa4..5674fe5 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -4,3 +4,6 @@
[submodule "salticidae"]
path = salticidae
url = https://github.com/Determinant/salticidae.git
+[submodule "concurrentqueue"]
+ path = concurrentqueue
+ url = https://github.com/cameron314/concurrentqueue.git
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 63e92f7..a443e8f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1,12 +1,14 @@
cmake_minimum_required(VERSION 3.9)
project(hotstuff)
set(CMAKE_CXX_STANDARD 17)
+set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/salticidae/cmake/Modules/")
add_subdirectory(salticidae)
include_directories(salticidae/include)
find_package(OpenSSL REQUIRED)
+find_package(Threads REQUIRED)
include(ExternalProject)
include_directories(secp256k1/include)
@@ -27,6 +29,7 @@ add_dependencies(secp256k1 libsecp256k1)
# add libraries
+include_directories(./)
include_directories(include)
add_library(hotstuff
OBJECT
@@ -43,11 +46,11 @@ if(BUILD_SHARED)
set_property(TARGET hotstuff PROPERTY POSITION_INDEPENDENT_CODE 1)
add_library(hotstuff_shared SHARED $<TARGET_OBJECTS:hotstuff>)
set_target_properties(hotstuff_shared PROPERTIES OUTPUT_NAME "hotstuff")
- target_link_libraries(hotstuff_shared salticidae_static secp256k1 crypto)
+ target_link_libraries(hotstuff_shared salticidae_static secp256k1 crypto ${CMAKE_THREAD_LIBS_INIT})
endif()
add_library(hotstuff_static STATIC $<TARGET_OBJECTS:hotstuff>)
set_target_properties(hotstuff_static PROPERTIES OUTPUT_NAME "hotstuff")
-target_link_libraries(hotstuff_static salticidae_static secp256k1 crypto)
+target_link_libraries(hotstuff_static salticidae_static secp256k1 crypto ${CMAKE_THREAD_LIBS_INIT})
add_subdirectory(test)
diff --git a/concurrentqueue b/concurrentqueue
new file mode 160000
+Subproject 8f65a8734d77c3cc00d74c0532efca872931d3c
diff --git a/include/hotstuff/consensus.h b/include/hotstuff/consensus.h
index 65ffff2..9e2558c 100644
--- a/include/hotstuff/consensus.h
+++ b/include/hotstuff/consensus.h
@@ -253,6 +253,13 @@ struct Vote: public Serializable {
cert->get_blk_hash() == blk_hash;
}
+ promise_t verify(VeriPool &vpool) const {
+ assert(hsc != nullptr);
+ return cert->verify(hsc->get_config().get_pubkey(voter), vpool).then([this](bool result) {
+ return result && cert->get_blk_hash() == blk_hash;
+ });
+ }
+
operator std::string () const {
DataStream s;
s << "<vote "
diff --git a/include/hotstuff/crypto.h b/include/hotstuff/crypto.h
index 40c9140..b79c433 100644
--- a/include/hotstuff/crypto.h
+++ b/include/hotstuff/crypto.h
@@ -6,6 +6,7 @@
#include "secp256k1.h"
#include "salticidae/crypto.h"
#include "hotstuff/type.h"
+#include "hotstuff/worker.h"
namespace hotstuff {
@@ -31,6 +32,7 @@ using privkey_bt = BoxObj<PrivKey>;
class PartCert: public Serializable, public Cloneable {
public:
virtual ~PartCert() = default;
+ virtual promise_t verify(const PubKey &pubkey, VeriPool &vpool) = 0;
virtual bool verify(const PubKey &pubkey) = 0;
virtual const uint256_t &get_blk_hash() const = 0;
virtual PartCert *clone() override = 0;
@@ -43,6 +45,7 @@ class QuorumCert: public Serializable, public Cloneable {
virtual ~QuorumCert() = default;
virtual void add_part(ReplicaID replica, const PartCert &pc) = 0;
virtual void compute() = 0;
+ virtual promise_t verify(const ReplicaConfig &config, VeriPool &vpool) = 0;
virtual bool verify(const ReplicaConfig &config) = 0;
virtual const uint256_t &get_blk_hash() const = 0;
virtual QuorumCert *clone() override = 0;
@@ -85,6 +88,9 @@ class PartCertDummy: public PartCert {
}
bool verify(const PubKey &) override { return true; }
+ promise_t verify(const PubKey &, VeriPool &) override {
+ return promise_t([](promise_t &pm){ pm.resolve(true); });
+ }
const uint256_t &get_blk_hash() const override { return blk_hash; }
};
@@ -112,6 +118,9 @@ class QuorumCertDummy: public QuorumCert {
void add_part(ReplicaID, const PartCert &) override {}
void compute() override {}
bool verify(const ReplicaConfig &) override { return true; }
+ promise_t verify(const ReplicaConfig &, VeriPool &) override {
+ return promise_t([](promise_t &pm) { pm.resolve(true); });
+ }
const uint256_t &get_blk_hash() const override { return blk_hash; }
};
@@ -243,7 +252,7 @@ class SigSecp256k1: public Serializable {
secp256k1_ecdsa_signature data;
secp256k1_context_t ctx;
- void check_msg_length(const bytearray_t &msg) {
+ static void check_msg_length(const bytearray_t &msg) {
if (msg.size() != 32)
throw std::invalid_argument("the message should be 32-bytes");
}
@@ -291,7 +300,7 @@ class SigSecp256k1: public Serializable {
}
bool verify(const bytearray_t &msg, const PubKeySecp256k1 &pub_key,
- const secp256k1_context_t &_ctx) {
+ const secp256k1_context_t &_ctx) const {
check_msg_length(msg);
return secp256k1_ecdsa_verify(
_ctx->ctx, &data,
@@ -304,6 +313,22 @@ class SigSecp256k1: public Serializable {
}
};
+class Secp256k1VeriTask: public VeriTask {
+ uint256_t msg;
+ PubKeySecp256k1 pubkey;
+ SigSecp256k1 sig;
+ public:
+ Secp256k1VeriTask(const uint256_t &msg,
+ const PubKeySecp256k1 &pubkey,
+ const SigSecp256k1 &sig):
+ msg(msg), pubkey(pubkey), sig(sig) {}
+ virtual ~Secp256k1VeriTask() = default;
+
+ bool verify() override {
+ return sig.verify(msg, pubkey, secp256k1_default_verify_ctx);
+ }
+};
+
class PartCertSecp256k1: public SigSecp256k1, public PartCert {
uint256_t blk_hash;
@@ -320,6 +345,12 @@ class PartCertSecp256k1: public SigSecp256k1, public PartCert {
secp256k1_default_verify_ctx);
}
+ promise_t verify(const PubKey &pub_key, VeriPool &vpool) override {
+ return vpool.verify(new Secp256k1VeriTask(blk_hash,
+ static_cast<const PubKeySecp256k1 &>(pub_key),
+ static_cast<const SigSecp256k1 &>(*this)));
+ }
+
const uint256_t &get_blk_hash() const override { return blk_hash; }
PartCertSecp256k1 *clone() override {
@@ -357,6 +388,7 @@ class QuorumCertSecp256k1: public QuorumCert {
void compute() override {}
bool verify(const ReplicaConfig &config) override;
+ promise_t verify(const ReplicaConfig &config, VeriPool &vpool) override;
const uint256_t &get_blk_hash() const override { return blk_hash; }
diff --git a/include/hotstuff/entity.h b/include/hotstuff/entity.h
index 6f73db8..6327dfe 100644
--- a/include/hotstuff/entity.h
+++ b/include/hotstuff/entity.h
@@ -179,6 +179,16 @@ class Block {
return true;
}
+ promise_t verify(const ReplicaConfig &config, VeriPool &vpool) const {
+ return (qc ? qc->verify(config, vpool) :
+ promise_t([](promise_t &pm) { pm.resolve(true); })).then([this](bool result) {
+ if (!result) return false;
+ for (auto cmd: cmds)
+ if (!cmd->verify()) return false;
+ return true;
+ });
+ }
+
int8_t get_decision() const { return decision; }
bool is_delivered() const { return delivered; }
@@ -223,11 +233,11 @@ class EntityStorage {
}
block_t add_blk(Block &&_blk, const ReplicaConfig &config) {
- if (!_blk.verify(config))
- {
- HOTSTUFF_LOG_WARN("invalid %s", std::string(_blk).c_str());
- return nullptr;
- }
+ //if (!_blk.verify(config))
+ //{
+ // HOTSTUFF_LOG_WARN("invalid %s", std::string(_blk).c_str());
+ // return nullptr;
+ //}
block_t blk = new Block(std::move(_blk));
return blk_cache.insert(std::make_pair(blk->get_hash(), blk)).first->second;
}
diff --git a/include/hotstuff/hotstuff.h b/include/hotstuff/hotstuff.h
index f9aad3d..983a7b3 100644
--- a/include/hotstuff/hotstuff.h
+++ b/include/hotstuff/hotstuff.h
@@ -121,6 +121,7 @@ class HotStuffBase: public HotStuffCore {
size_t blk_size;
/** libevent handle */
EventContext eb;
+ VeriPool vpool;
private:
/** whether libevent handle is owned by itself */
@@ -183,7 +184,8 @@ class HotStuffBase: public HotStuffCore {
privkey_bt &&priv_key,
NetAddr listen_addr,
pacemaker_bt pmaker,
- EventContext eb);
+ EventContext eb,
+ size_t nworker = 4);
~HotStuffBase();
diff --git a/include/hotstuff/worker.h b/include/hotstuff/worker.h
new file mode 100644
index 0000000..229b1bf
--- /dev/null
+++ b/include/hotstuff/worker.h
@@ -0,0 +1,92 @@
+#ifndef _HOTSTUFF_WORKER_H
+#define _HOTSTUFF_WORKER_H
+
+#include <thread>
+#include <unordered_map>
+#include <unistd.h>
+#include "concurrentqueue/blockingconcurrentqueue.h"
+
+namespace hotstuff {
+
+class VeriTask {
+ friend class VeriPool;
+ bool result;
+ public:
+ virtual bool verify() = 0;
+ virtual ~VeriTask() = default;
+};
+
+using veritask_ut = BoxObj<VeriTask>;
+
+class VeriPool {
+ using queue_t = moodycamel::BlockingConcurrentQueue<VeriTask *>;
+ int fin_fd[2];
+ Event fin_ev;
+ queue_t in_queue;
+ queue_t out_queue;
+ std::thread notifier;
+ std::vector<std::thread> workers;
+ std::unordered_map<VeriTask *, std::pair<veritask_ut, promise_t>> pms;
+ public:
+ VeriPool(EventContext ec, size_t nworker) {
+ pipe(fin_fd);
+ fin_ev = Event(ec, fin_fd[0], EV_READ, [&](int fd, short) {
+ VeriTask *task;
+ bool result;
+ read(fd, &task, sizeof(VeriTask *));
+ read(fd, &result, sizeof(bool));
+ auto it = pms.find(task);
+ it->second.second.resolve(result);
+ pms.erase(it);
+ fin_ev.add();
+ });
+ fin_ev.add();
+ // finish notifier thread
+ notifier = std::thread([this]() {
+ while (true)
+ {
+ VeriTask *task;
+ out_queue.wait_dequeue(task);
+ write(fin_fd[1], &task, sizeof(VeriTask *));
+ write(fin_fd[1], &(task->result), sizeof(bool));
+ }
+ });
+ for (size_t i = 0; i < nworker; i++)
+ {
+ workers.push_back(std::thread([this]() {
+ while (true)
+ {
+ VeriTask *task;
+ in_queue.wait_dequeue(task);
+ //fprintf(stderr, "%lu working on %u\n", std::this_thread::get_id(), (uintptr_t)task);
+ task->result = task->verify();
+ out_queue.enqueue(task);
+ }
+ }));
+ }
+ }
+
+ ~VeriPool() {
+ notifier.detach();
+ for (auto &w: workers) w.detach();
+ close(fin_fd[0]);
+ close(fin_fd[1]);
+ }
+
+ promise_t verify(veritask_ut &&task) {
+ auto ptr = task.get();
+ auto ret = pms.insert(std::make_pair(ptr,
+ std::make_pair(std::move(task), promise_t([](promise_t &){}))));
+ assert(ret.second);
+ in_queue.enqueue(ptr);
+ return ret.first->second.second;
+ }
+
+ int get_fd() {
+ return fin_fd[0];
+ }
+};
+
+}
+
+#endif
diff --git a/src/consensus.cpp b/src/consensus.cpp
index d40f488..c80de59 100644
--- a/src/consensus.cpp
+++ b/src/consensus.cpp
@@ -196,11 +196,11 @@ void HotStuffCore::on_receive_vote(const Vote &vote) {
block_t blk = get_delivered_blk(vote.blk_hash);
if (vote.cert == nullptr) return;
/* otherwise the vote is positive */
- if (!vote.verify())
- {
- LOG_WARN("invalid vote from %d", vote.voter);
- return;
- }
+ //if (!vote.verify())
+ //{
+ // LOG_WARN("invalid vote from %d", vote.voter);
+ // return;
+ //}
if (!blk->voted.insert(vote.voter).second)
{
LOG_WARN("duplicate vote from %d", vote.voter);
diff --git a/src/crypto.cpp b/src/crypto.cpp
index 88c2f57..0932652 100644
--- a/src/crypto.cpp
+++ b/src/crypto.cpp
@@ -27,4 +27,24 @@ bool QuorumCertSecp256k1::verify(const ReplicaConfig &config) {
return true;
}
+promise_t QuorumCertSecp256k1::verify(const ReplicaConfig &config, VeriPool &vpool) {
+ if (sigs.size() < config.nmajority)
+ return promise_t([](promise_t &pm) { pm.resolve(false); });
+ std::vector<promise_t> vpm;
+ for (size_t i = 0; i < rids.size(); i++)
+ if (rids.get(i))
+ {
+ HOTSTUFF_LOG_DEBUG("checking cert(%d), blk_hash=%s",
+ i, get_hex10(blk_hash).c_str());
+ vpm.push_back(vpool.verify(new Secp256k1VeriTask(blk_hash,
+ static_cast<const PubKeySecp256k1 &>(config.get_pubkey(i)),
+ sigs[i])));
+ }
+ return promise::all(vpm).then([](const promise::values_t &values) {
+ for (const auto &v: values)
+ if (!promise::any_cast<bool>(v)) return false;
+ return true;
+ });
+}
+
}
diff --git a/src/hotstuff.cpp b/src/hotstuff.cpp
index e235bd8..98c2237 100644
--- a/src/hotstuff.cpp
+++ b/src/hotstuff.cpp
@@ -243,6 +243,7 @@ promise_t HotStuffBase::async_deliver_blk(const uint256_t &blk_hash,
/* the parents should be delivered */
for (const auto &phash: blk->get_parent_hashes())
pms.push_back(async_deliver_blk(phash, replica_id));
+ pms.push_back(blk->verify(get_config(), vpool));
promise::all(pms).then([this, blk]() {
on_deliver_blk(blk);
});
@@ -267,12 +268,20 @@ void HotStuffBase::propose_handler(MsgPropose &&msg, Conn &conn) {
void HotStuffBase::vote_handler(MsgVote &&msg, Conn &conn) {
const NetAddr &peer = conn.get_peer();
msg.postponed_parse(this);
- auto &vote = msg.vote;
+ //auto &vote = msg.vote;
+ RcObj<Vote> v(new Vote(std::move(msg.vote)));
promise::all(std::vector<promise_t>{
- async_deliver_blk(vote.bqc_hash, peer),
- async_deliver_blk(vote.blk_hash, peer)
- }).then([this, vote = std::move(vote)]() {
- on_receive_vote(vote);
+ async_deliver_blk(v->bqc_hash, peer),
+ async_deliver_blk(v->blk_hash, peer)
+ }).then([this, v=std::move(v)]() {
+ //bool result = vote->verify();
+ auto pm = v->verify(vpool);
+ pm.then([this, v=std::move(v)](bool result) {
+ if (!result)
+ LOG_WARN("invalid vote from %d", v->voter);
+ else
+ on_receive_vote(*v);
+ });
});
}
@@ -383,11 +392,13 @@ HotStuffBase::HotStuffBase(uint32_t blk_size,
privkey_bt &&priv_key,
NetAddr listen_addr,
pacemaker_bt pmaker,
- EventContext eb):
+ EventContext eb,
+ size_t nworker):
HotStuffCore(rid, std::move(priv_key)),
listen_addr(listen_addr),
blk_size(blk_size),
eb(eb),
+ vpool(eb, nworker),
pn(eb),
pmaker(std::move(pmaker)),
@@ -420,7 +431,10 @@ void HotStuffBase::do_vote(ReplicaID last_proposer, const Vote &vote) {
pmaker->beat_resp(last_proposer)
.then([this, vote](ReplicaID proposer) {
if (proposer == get_id())
- on_receive_vote(vote);
+ {
+ throw HotStuffError("unreachable line");
+ //on_receive_vote(vote);
+ }
else
pn.send_msg(MsgVote(vote), get_config().get_addr(proposer));
});
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 9e44cbe..e4c7a1b 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -1,4 +1,9 @@
include_directories(../src/
- ../salticidae/include/)
+ ../salticidae/include/
+ ../)
+
add_executable(test_secp256k1 test_secp256k1.cpp)
target_link_libraries(test_secp256k1 hotstuff_static)
+
+add_executable(test_concurrent_queue test_concurrent_queue.cpp)
+target_link_libraries(test_concurrent_queue salticidae_static pthread)
diff --git a/test/test_concurrent_queue.cpp b/test/test_concurrent_queue.cpp
new file mode 100644
index 0000000..7412213
--- /dev/null
+++ b/test/test_concurrent_queue.cpp
@@ -0,0 +1,68 @@
+#include "salticidae/event.h"
+#include "concurrentqueue/blockingconcurrentqueue.h"
+#include <thread>
+#include <unistd.h>
+
+class VeriPool {
+ int fin_fd[2];
+ moodycamel::BlockingConcurrentQueue<int> in_queue;
+ moodycamel::BlockingConcurrentQueue<int> out_queue;
+ std::thread notifier;
+ std::vector<std::thread> workers;
+ public:
+ VeriPool(size_t nworker) {
+ pipe(fin_fd);
+ // finish notifier thread
+ notifier = std::thread([this]() {
+ while (true)
+ {
+ int item;
+ out_queue.wait_dequeue(item);
+ write(fin_fd[1], &item, sizeof(item));
+ }
+ });
+ for (size_t i = 0; i < nworker; i++)
+ {
+ workers.push_back(std::thread([this]() {
+ while (true)
+ {
+ int item;
+ in_queue.wait_dequeue(item);
+ fprintf(stderr, "%lu working on %d\n", std::this_thread::get_id(), item);
+ out_queue.enqueue(item * 1000);
+ }
+ }));
+ }
+ }
+
+ ~VeriPool() {
+ notifier.detach();
+ for (auto &w: workers) w.detach();
+ close(fin_fd[0]);
+ close(fin_fd[1]);
+ }
+
+ void submit(int item) {
+ in_queue.enqueue(item);
+ }
+
+ int get_fd() {
+ return fin_fd[0];
+ }
+};
+
+int main() {
+ VeriPool p(2);
+ salticidae::EventContext ec;
+ salticidae::Event ev;
+ ev = salticidae::Event(ec, p.get_fd(), EV_READ, [&ev](int fd, short) {
+ int item;
+ read(fd, &item, sizeof(item));
+ printf("finished %d\n", item);
+ ev.add();
+ });
+ for (int i = 0; i < 10000; i++)
+ p.submit(i);
+ ev.add();
+ ec.dispatch();
+}