From 6261c95184b86c43755071b351e6928f89e2343c Mon Sep 17 00:00:00 2001 From: Determinant Date: Mon, 10 Sep 2018 16:06:27 -0400 Subject: finish simple multithreaded verifier --- src/consensus.cpp | 10 +++++----- src/crypto.cpp | 20 ++++++++++++++++++++ src/hotstuff.cpp | 28 +++++++++++++++++++++------- 3 files changed, 46 insertions(+), 12 deletions(-) (limited to 'src') diff --git a/src/consensus.cpp b/src/consensus.cpp index d40f488..c80de59 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -196,11 +196,11 @@ void HotStuffCore::on_receive_vote(const Vote &vote) { block_t blk = get_delivered_blk(vote.blk_hash); if (vote.cert == nullptr) return; /* otherwise the vote is positive */ - if (!vote.verify()) - { - LOG_WARN("invalid vote from %d", vote.voter); - return; - } + //if (!vote.verify()) + //{ + // LOG_WARN("invalid vote from %d", vote.voter); + // return; + //} if (!blk->voted.insert(vote.voter).second) { LOG_WARN("duplicate vote from %d", vote.voter); diff --git a/src/crypto.cpp b/src/crypto.cpp index 88c2f57..0932652 100644 --- a/src/crypto.cpp +++ b/src/crypto.cpp @@ -27,4 +27,24 @@ bool QuorumCertSecp256k1::verify(const ReplicaConfig &config) { return true; } +promise_t QuorumCertSecp256k1::verify(const ReplicaConfig &config, VeriPool &vpool) { + if (sigs.size() < config.nmajority) + return promise_t([](promise_t &pm) { pm.resolve(false); }); + std::vector vpm; + for (size_t i = 0; i < rids.size(); i++) + if (rids.get(i)) + { + HOTSTUFF_LOG_DEBUG("checking cert(%d), blk_hash=%s", + i, get_hex10(blk_hash).c_str()); + vpm.push_back(vpool.verify(new Secp256k1VeriTask(blk_hash, + static_cast(config.get_pubkey(i)), + sigs[i]))); + } + return promise::all(vpm).then([](const promise::values_t &values) { + for (const auto &v: values) + if (!promise::any_cast(v)) return false; + return true; + }); +} + } diff --git a/src/hotstuff.cpp b/src/hotstuff.cpp index e235bd8..98c2237 100644 --- a/src/hotstuff.cpp +++ b/src/hotstuff.cpp @@ -243,6 +243,7 @@ promise_t HotStuffBase::async_deliver_blk(const uint256_t &blk_hash, /* the parents should be delivered */ for (const auto &phash: blk->get_parent_hashes()) pms.push_back(async_deliver_blk(phash, replica_id)); + pms.push_back(blk->verify(get_config(), vpool)); promise::all(pms).then([this, blk]() { on_deliver_blk(blk); }); @@ -267,12 +268,20 @@ void HotStuffBase::propose_handler(MsgPropose &&msg, Conn &conn) { void HotStuffBase::vote_handler(MsgVote &&msg, Conn &conn) { const NetAddr &peer = conn.get_peer(); msg.postponed_parse(this); - auto &vote = msg.vote; + //auto &vote = msg.vote; + RcObj v(new Vote(std::move(msg.vote))); promise::all(std::vector{ - async_deliver_blk(vote.bqc_hash, peer), - async_deliver_blk(vote.blk_hash, peer) - }).then([this, vote = std::move(vote)]() { - on_receive_vote(vote); + async_deliver_blk(v->bqc_hash, peer), + async_deliver_blk(v->blk_hash, peer) + }).then([this, v=std::move(v)]() { + //bool result = vote->verify(); + auto pm = v->verify(vpool); + pm.then([this, v=std::move(v)](bool result) { + if (!result) + LOG_WARN("invalid vote from %d", v->voter); + else + on_receive_vote(*v); + }); }); } @@ -383,11 +392,13 @@ HotStuffBase::HotStuffBase(uint32_t blk_size, privkey_bt &&priv_key, NetAddr listen_addr, pacemaker_bt pmaker, - EventContext eb): + EventContext eb, + size_t nworker): HotStuffCore(rid, std::move(priv_key)), listen_addr(listen_addr), blk_size(blk_size), eb(eb), + vpool(eb, nworker), pn(eb), pmaker(std::move(pmaker)), @@ -420,7 +431,10 @@ void HotStuffBase::do_vote(ReplicaID last_proposer, const Vote &vote) { pmaker->beat_resp(last_proposer) .then([this, vote](ReplicaID proposer) { if (proposer == get_id()) - on_receive_vote(vote); + { + throw HotStuffError("unreachable line"); + //on_receive_vote(vote); + } else pn.send_msg(MsgVote(vote), get_config().get_addr(proposer)); }); -- cgit v1.2.3