diff options
author | Determinant <[email protected]> | 2018-09-10 16:06:27 -0400 |
---|---|---|
committer | Determinant <[email protected]> | 2018-09-10 16:06:27 -0400 |
commit | 6261c95184b86c43755071b351e6928f89e2343c (patch) | |
tree | 7a33a5d8ad3b252f629d52c5183eab90efb3438a | |
parent | 05f2c56b909a2cd05a200ad663001696e4a23261 (diff) |
finish simple multithreaded verifier
-rw-r--r-- | .gitignore | 3 | ||||
-rw-r--r-- | .gitmodules | 3 | ||||
-rw-r--r-- | CMakeLists.txt | 7 | ||||
m--------- | concurrentqueue | 0 | ||||
-rw-r--r-- | include/hotstuff/consensus.h | 7 | ||||
-rw-r--r-- | include/hotstuff/crypto.h | 36 | ||||
-rw-r--r-- | include/hotstuff/entity.h | 20 | ||||
-rw-r--r-- | include/hotstuff/hotstuff.h | 4 | ||||
-rw-r--r-- | include/hotstuff/worker.h | 92 | ||||
-rw-r--r-- | src/consensus.cpp | 10 | ||||
-rw-r--r-- | src/crypto.cpp | 20 | ||||
-rw-r--r-- | src/hotstuff.cpp | 28 | ||||
-rw-r--r-- | test/CMakeLists.txt | 7 | ||||
-rw-r--r-- | test/test_concurrent_queue.cpp | 68 |
14 files changed, 281 insertions, 24 deletions
@@ -20,5 +20,6 @@ include/hotstuff/config.h *.gch /Makefile /test/Makefile -test_secp256k1 +/test/test_secp256k1 +/test/test_concurrent_queue core diff --git a/.gitmodules b/.gitmodules index 834ffa4..5674fe5 100644 --- a/.gitmodules +++ b/.gitmodules @@ -4,3 +4,6 @@ [submodule "salticidae"] path = salticidae url = https://github.com/Determinant/salticidae.git +[submodule "concurrentqueue"] + path = concurrentqueue + url = https://github.com/cameron314/concurrentqueue.git diff --git a/CMakeLists.txt b/CMakeLists.txt index 63e92f7..a443e8f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,12 +1,14 @@ cmake_minimum_required(VERSION 3.9) project(hotstuff) set(CMAKE_CXX_STANDARD 17) +set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/salticidae/cmake/Modules/") add_subdirectory(salticidae) include_directories(salticidae/include) find_package(OpenSSL REQUIRED) +find_package(Threads REQUIRED) include(ExternalProject) include_directories(secp256k1/include) @@ -27,6 +29,7 @@ add_dependencies(secp256k1 libsecp256k1) # add libraries +include_directories(./) include_directories(include) add_library(hotstuff OBJECT @@ -43,11 +46,11 @@ if(BUILD_SHARED) set_property(TARGET hotstuff PROPERTY POSITION_INDEPENDENT_CODE 1) add_library(hotstuff_shared SHARED $<TARGET_OBJECTS:hotstuff>) set_target_properties(hotstuff_shared PROPERTIES OUTPUT_NAME "hotstuff") - target_link_libraries(hotstuff_shared salticidae_static secp256k1 crypto) + target_link_libraries(hotstuff_shared salticidae_static secp256k1 crypto ${CMAKE_THREAD_LIBS_INIT}) endif() add_library(hotstuff_static STATIC $<TARGET_OBJECTS:hotstuff>) set_target_properties(hotstuff_static PROPERTIES OUTPUT_NAME "hotstuff") -target_link_libraries(hotstuff_static salticidae_static secp256k1 crypto) +target_link_libraries(hotstuff_static salticidae_static secp256k1 crypto ${CMAKE_THREAD_LIBS_INIT}) add_subdirectory(test) diff --git a/concurrentqueue b/concurrentqueue new file mode 160000 +Subproject 8f65a8734d77c3cc00d74c0532efca872931d3c diff --git a/include/hotstuff/consensus.h b/include/hotstuff/consensus.h index 65ffff2..9e2558c 100644 --- a/include/hotstuff/consensus.h +++ b/include/hotstuff/consensus.h @@ -253,6 +253,13 @@ struct Vote: public Serializable { cert->get_blk_hash() == blk_hash; } + promise_t verify(VeriPool &vpool) const { + assert(hsc != nullptr); + return cert->verify(hsc->get_config().get_pubkey(voter), vpool).then([this](bool result) { + return result && cert->get_blk_hash() == blk_hash; + }); + } + operator std::string () const { DataStream s; s << "<vote " diff --git a/include/hotstuff/crypto.h b/include/hotstuff/crypto.h index 40c9140..b79c433 100644 --- a/include/hotstuff/crypto.h +++ b/include/hotstuff/crypto.h @@ -6,6 +6,7 @@ #include "secp256k1.h" #include "salticidae/crypto.h" #include "hotstuff/type.h" +#include "hotstuff/worker.h" namespace hotstuff { @@ -31,6 +32,7 @@ using privkey_bt = BoxObj<PrivKey>; class PartCert: public Serializable, public Cloneable { public: virtual ~PartCert() = default; + virtual promise_t verify(const PubKey &pubkey, VeriPool &vpool) = 0; virtual bool verify(const PubKey &pubkey) = 0; virtual const uint256_t &get_blk_hash() const = 0; virtual PartCert *clone() override = 0; @@ -43,6 +45,7 @@ class QuorumCert: public Serializable, public Cloneable { virtual ~QuorumCert() = default; virtual void add_part(ReplicaID replica, const PartCert &pc) = 0; virtual void compute() = 0; + virtual promise_t verify(const ReplicaConfig &config, VeriPool &vpool) = 0; virtual bool verify(const ReplicaConfig &config) = 0; virtual const uint256_t &get_blk_hash() const = 0; virtual QuorumCert *clone() override = 0; @@ -85,6 +88,9 @@ class PartCertDummy: public PartCert { } bool verify(const PubKey &) override { return true; } + promise_t verify(const PubKey &, VeriPool &) override { + return promise_t([](promise_t &pm){ pm.resolve(true); }); + } const uint256_t &get_blk_hash() const override { return blk_hash; } }; @@ -112,6 +118,9 @@ class QuorumCertDummy: public QuorumCert { void add_part(ReplicaID, const PartCert &) override {} void compute() override {} bool verify(const ReplicaConfig &) override { return true; } + promise_t verify(const ReplicaConfig &, VeriPool &) override { + return promise_t([](promise_t &pm) { pm.resolve(true); }); + } const uint256_t &get_blk_hash() const override { return blk_hash; } }; @@ -243,7 +252,7 @@ class SigSecp256k1: public Serializable { secp256k1_ecdsa_signature data; secp256k1_context_t ctx; - void check_msg_length(const bytearray_t &msg) { + static void check_msg_length(const bytearray_t &msg) { if (msg.size() != 32) throw std::invalid_argument("the message should be 32-bytes"); } @@ -291,7 +300,7 @@ class SigSecp256k1: public Serializable { } bool verify(const bytearray_t &msg, const PubKeySecp256k1 &pub_key, - const secp256k1_context_t &_ctx) { + const secp256k1_context_t &_ctx) const { check_msg_length(msg); return secp256k1_ecdsa_verify( _ctx->ctx, &data, @@ -304,6 +313,22 @@ class SigSecp256k1: public Serializable { } }; +class Secp256k1VeriTask: public VeriTask { + uint256_t msg; + PubKeySecp256k1 pubkey; + SigSecp256k1 sig; + public: + Secp256k1VeriTask(const uint256_t &msg, + const PubKeySecp256k1 &pubkey, + const SigSecp256k1 &sig): + msg(msg), pubkey(pubkey), sig(sig) {} + virtual ~Secp256k1VeriTask() = default; + + bool verify() override { + return sig.verify(msg, pubkey, secp256k1_default_verify_ctx); + } +}; + class PartCertSecp256k1: public SigSecp256k1, public PartCert { uint256_t blk_hash; @@ -320,6 +345,12 @@ class PartCertSecp256k1: public SigSecp256k1, public PartCert { secp256k1_default_verify_ctx); } + promise_t verify(const PubKey &pub_key, VeriPool &vpool) override { + return vpool.verify(new Secp256k1VeriTask(blk_hash, + static_cast<const PubKeySecp256k1 &>(pub_key), + static_cast<const SigSecp256k1 &>(*this))); + } + const uint256_t &get_blk_hash() const override { return blk_hash; } PartCertSecp256k1 *clone() override { @@ -357,6 +388,7 @@ class QuorumCertSecp256k1: public QuorumCert { void compute() override {} bool verify(const ReplicaConfig &config) override; + promise_t verify(const ReplicaConfig &config, VeriPool &vpool) override; const uint256_t &get_blk_hash() const override { return blk_hash; } diff --git a/include/hotstuff/entity.h b/include/hotstuff/entity.h index 6f73db8..6327dfe 100644 --- a/include/hotstuff/entity.h +++ b/include/hotstuff/entity.h @@ -179,6 +179,16 @@ class Block { return true; } + promise_t verify(const ReplicaConfig &config, VeriPool &vpool) const { + return (qc ? qc->verify(config, vpool) : + promise_t([](promise_t &pm) { pm.resolve(true); })).then([this](bool result) { + if (!result) return false; + for (auto cmd: cmds) + if (!cmd->verify()) return false; + return true; + }); + } + int8_t get_decision() const { return decision; } bool is_delivered() const { return delivered; } @@ -223,11 +233,11 @@ class EntityStorage { } block_t add_blk(Block &&_blk, const ReplicaConfig &config) { - if (!_blk.verify(config)) - { - HOTSTUFF_LOG_WARN("invalid %s", std::string(_blk).c_str()); - return nullptr; - } + //if (!_blk.verify(config)) + //{ + // HOTSTUFF_LOG_WARN("invalid %s", std::string(_blk).c_str()); + // return nullptr; + //} block_t blk = new Block(std::move(_blk)); return blk_cache.insert(std::make_pair(blk->get_hash(), blk)).first->second; } diff --git a/include/hotstuff/hotstuff.h b/include/hotstuff/hotstuff.h index f9aad3d..983a7b3 100644 --- a/include/hotstuff/hotstuff.h +++ b/include/hotstuff/hotstuff.h @@ -121,6 +121,7 @@ class HotStuffBase: public HotStuffCore { size_t blk_size; /** libevent handle */ EventContext eb; + VeriPool vpool; private: /** whether libevent handle is owned by itself */ @@ -183,7 +184,8 @@ class HotStuffBase: public HotStuffCore { privkey_bt &&priv_key, NetAddr listen_addr, pacemaker_bt pmaker, - EventContext eb); + EventContext eb, + size_t nworker = 4); ~HotStuffBase(); diff --git a/include/hotstuff/worker.h b/include/hotstuff/worker.h new file mode 100644 index 0000000..229b1bf --- /dev/null +++ b/include/hotstuff/worker.h @@ -0,0 +1,92 @@ +#ifndef _HOTSTUFF_WORKER_H +#define _HOTSTUFF_WORKER_H + +#include <thread> +#include <unordered_map> +#include <unistd.h> +#include "concurrentqueue/blockingconcurrentqueue.h" + +namespace hotstuff { + +class VeriTask { + friend class VeriPool; + bool result; + public: + virtual bool verify() = 0; + virtual ~VeriTask() = default; +}; + +using veritask_ut = BoxObj<VeriTask>; + +class VeriPool { + using queue_t = moodycamel::BlockingConcurrentQueue<VeriTask *>; + int fin_fd[2]; + Event fin_ev; + queue_t in_queue; + queue_t out_queue; + std::thread notifier; + std::vector<std::thread> workers; + std::unordered_map<VeriTask *, std::pair<veritask_ut, promise_t>> pms; + public: + VeriPool(EventContext ec, size_t nworker) { + pipe(fin_fd); + fin_ev = Event(ec, fin_fd[0], EV_READ, [&](int fd, short) { + VeriTask *task; + bool result; + read(fd, &task, sizeof(VeriTask *)); + read(fd, &result, sizeof(bool)); + auto it = pms.find(task); + it->second.second.resolve(result); + pms.erase(it); + fin_ev.add(); + }); + fin_ev.add(); + // finish notifier thread + notifier = std::thread([this]() { + while (true) + { + VeriTask *task; + out_queue.wait_dequeue(task); + write(fin_fd[1], &task, sizeof(VeriTask *)); + write(fin_fd[1], &(task->result), sizeof(bool)); + } + }); + for (size_t i = 0; i < nworker; i++) + { + workers.push_back(std::thread([this]() { + while (true) + { + VeriTask *task; + in_queue.wait_dequeue(task); + //fprintf(stderr, "%lu working on %u\n", std::this_thread::get_id(), (uintptr_t)task); + task->result = task->verify(); + out_queue.enqueue(task); + } + })); + } + } + + ~VeriPool() { + notifier.detach(); + for (auto &w: workers) w.detach(); + close(fin_fd[0]); + close(fin_fd[1]); + } + + promise_t verify(veritask_ut &&task) { + auto ptr = task.get(); + auto ret = pms.insert(std::make_pair(ptr, + std::make_pair(std::move(task), promise_t([](promise_t &){})))); + assert(ret.second); + in_queue.enqueue(ptr); + return ret.first->second.second; + } + + int get_fd() { + return fin_fd[0]; + } +}; + +} + +#endif diff --git a/src/consensus.cpp b/src/consensus.cpp index d40f488..c80de59 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -196,11 +196,11 @@ void HotStuffCore::on_receive_vote(const Vote &vote) { block_t blk = get_delivered_blk(vote.blk_hash); if (vote.cert == nullptr) return; /* otherwise the vote is positive */ - if (!vote.verify()) - { - LOG_WARN("invalid vote from %d", vote.voter); - return; - } + //if (!vote.verify()) + //{ + // LOG_WARN("invalid vote from %d", vote.voter); + // return; + //} if (!blk->voted.insert(vote.voter).second) { LOG_WARN("duplicate vote from %d", vote.voter); diff --git a/src/crypto.cpp b/src/crypto.cpp index 88c2f57..0932652 100644 --- a/src/crypto.cpp +++ b/src/crypto.cpp @@ -27,4 +27,24 @@ bool QuorumCertSecp256k1::verify(const ReplicaConfig &config) { return true; } +promise_t QuorumCertSecp256k1::verify(const ReplicaConfig &config, VeriPool &vpool) { + if (sigs.size() < config.nmajority) + return promise_t([](promise_t &pm) { pm.resolve(false); }); + std::vector<promise_t> vpm; + for (size_t i = 0; i < rids.size(); i++) + if (rids.get(i)) + { + HOTSTUFF_LOG_DEBUG("checking cert(%d), blk_hash=%s", + i, get_hex10(blk_hash).c_str()); + vpm.push_back(vpool.verify(new Secp256k1VeriTask(blk_hash, + static_cast<const PubKeySecp256k1 &>(config.get_pubkey(i)), + sigs[i]))); + } + return promise::all(vpm).then([](const promise::values_t &values) { + for (const auto &v: values) + if (!promise::any_cast<bool>(v)) return false; + return true; + }); +} + } diff --git a/src/hotstuff.cpp b/src/hotstuff.cpp index e235bd8..98c2237 100644 --- a/src/hotstuff.cpp +++ b/src/hotstuff.cpp @@ -243,6 +243,7 @@ promise_t HotStuffBase::async_deliver_blk(const uint256_t &blk_hash, /* the parents should be delivered */ for (const auto &phash: blk->get_parent_hashes()) pms.push_back(async_deliver_blk(phash, replica_id)); + pms.push_back(blk->verify(get_config(), vpool)); promise::all(pms).then([this, blk]() { on_deliver_blk(blk); }); @@ -267,12 +268,20 @@ void HotStuffBase::propose_handler(MsgPropose &&msg, Conn &conn) { void HotStuffBase::vote_handler(MsgVote &&msg, Conn &conn) { const NetAddr &peer = conn.get_peer(); msg.postponed_parse(this); - auto &vote = msg.vote; + //auto &vote = msg.vote; + RcObj<Vote> v(new Vote(std::move(msg.vote))); promise::all(std::vector<promise_t>{ - async_deliver_blk(vote.bqc_hash, peer), - async_deliver_blk(vote.blk_hash, peer) - }).then([this, vote = std::move(vote)]() { - on_receive_vote(vote); + async_deliver_blk(v->bqc_hash, peer), + async_deliver_blk(v->blk_hash, peer) + }).then([this, v=std::move(v)]() { + //bool result = vote->verify(); + auto pm = v->verify(vpool); + pm.then([this, v=std::move(v)](bool result) { + if (!result) + LOG_WARN("invalid vote from %d", v->voter); + else + on_receive_vote(*v); + }); }); } @@ -383,11 +392,13 @@ HotStuffBase::HotStuffBase(uint32_t blk_size, privkey_bt &&priv_key, NetAddr listen_addr, pacemaker_bt pmaker, - EventContext eb): + EventContext eb, + size_t nworker): HotStuffCore(rid, std::move(priv_key)), listen_addr(listen_addr), blk_size(blk_size), eb(eb), + vpool(eb, nworker), pn(eb), pmaker(std::move(pmaker)), @@ -420,7 +431,10 @@ void HotStuffBase::do_vote(ReplicaID last_proposer, const Vote &vote) { pmaker->beat_resp(last_proposer) .then([this, vote](ReplicaID proposer) { if (proposer == get_id()) - on_receive_vote(vote); + { + throw HotStuffError("unreachable line"); + //on_receive_vote(vote); + } else pn.send_msg(MsgVote(vote), get_config().get_addr(proposer)); }); diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 9e44cbe..e4c7a1b 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -1,4 +1,9 @@ include_directories(../src/ - ../salticidae/include/) + ../salticidae/include/ + ../) + add_executable(test_secp256k1 test_secp256k1.cpp) target_link_libraries(test_secp256k1 hotstuff_static) + +add_executable(test_concurrent_queue test_concurrent_queue.cpp) +target_link_libraries(test_concurrent_queue salticidae_static pthread) diff --git a/test/test_concurrent_queue.cpp b/test/test_concurrent_queue.cpp new file mode 100644 index 0000000..7412213 --- /dev/null +++ b/test/test_concurrent_queue.cpp @@ -0,0 +1,68 @@ +#include "salticidae/event.h" +#include "concurrentqueue/blockingconcurrentqueue.h" +#include <thread> +#include <unistd.h> + +class VeriPool { + int fin_fd[2]; + moodycamel::BlockingConcurrentQueue<int> in_queue; + moodycamel::BlockingConcurrentQueue<int> out_queue; + std::thread notifier; + std::vector<std::thread> workers; + public: + VeriPool(size_t nworker) { + pipe(fin_fd); + // finish notifier thread + notifier = std::thread([this]() { + while (true) + { + int item; + out_queue.wait_dequeue(item); + write(fin_fd[1], &item, sizeof(item)); + } + }); + for (size_t i = 0; i < nworker; i++) + { + workers.push_back(std::thread([this]() { + while (true) + { + int item; + in_queue.wait_dequeue(item); + fprintf(stderr, "%lu working on %d\n", std::this_thread::get_id(), item); + out_queue.enqueue(item * 1000); + } + })); + } + } + + ~VeriPool() { + notifier.detach(); + for (auto &w: workers) w.detach(); + close(fin_fd[0]); + close(fin_fd[1]); + } + + void submit(int item) { + in_queue.enqueue(item); + } + + int get_fd() { + return fin_fd[0]; + } +}; + +int main() { + VeriPool p(2); + salticidae::EventContext ec; + salticidae::Event ev; + ev = salticidae::Event(ec, p.get_fd(), EV_READ, [&ev](int fd, short) { + int item; + read(fd, &item, sizeof(item)); + printf("finished %d\n", item); + ev.add(); + }); + for (int i = 0; i < 10000; i++) + p.submit(i); + ev.add(); + ec.dispatch(); +} |