aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDeterminant <tederminant@gmail.com>2018-07-16 19:26:36 -0400
committerDeterminant <tederminant@gmail.com>2018-07-16 19:26:36 -0400
commit02e347dae1a01172dbcc2efe054014c015d96507 (patch)
tree4b49650ba8d5dac0ab689cdc59867da3127f2bb7 /src
parenta7cfb274d651e858ab06eff5b28a6f77e0178cf1 (diff)
...
Diffstat (limited to 'src')
-rw-r--r--src/consensus.cpp296
-rw-r--r--src/core.cpp723
-rw-r--r--src/entity.cpp2
-rw-r--r--src/hotstuff.cpp647
-rw-r--r--src/hotstuff_app.cpp287
-rw-r--r--src/hotstuff_client.cpp8
6 files changed, 986 insertions, 977 deletions
diff --git a/src/consensus.cpp b/src/consensus.cpp
new file mode 100644
index 0000000..7749558
--- /dev/null
+++ b/src/consensus.cpp
@@ -0,0 +1,296 @@
+#include <cassert>
+#include <stack>
+
+#include "hotstuff/util.h"
+#include "hotstuff/consensus.h"
+
+#define LOG_INFO HOTSTUFF_LOG_INFO
+#define LOG_DEBUG HOTSTUFF_LOG_DEBUG
+#define LOG_WARN HOTSTUFF_LOG_WARN
+
+namespace hotstuff {
+
+/* The core logic of HotStuff, is farily simple :) */
+/*** begin HotStuff protocol logic ***/
+HotStuffCore::HotStuffCore(ReplicaID id,
+ privkey_bt &&priv_key,
+ int32_t parent_limit):
+ b0(new Block(true, 1)),
+ bqc(b0),
+ bexec(b0),
+ vheight(0),
+ priv_key(std::move(priv_key)),
+ tails{bqc},
+ id(id),
+ parent_limit(parent_limit),
+ storage(new EntityStorage()) {
+ storage->add_blk(b0);
+ b0->qc_ref = b0;
+}
+
+void HotStuffCore::sanity_check_delivered(const block_t &blk) {
+ if (!blk->delivered)
+ throw std::runtime_error("block not delivered");
+}
+
+block_t HotStuffCore::sanity_check_delivered(const uint256_t &blk_hash) {
+ block_t blk = storage->find_blk(blk_hash);
+ if (blk == nullptr || !blk->delivered)
+ throw std::runtime_error("block not delivered");
+ return std::move(blk);
+}
+
+bool HotStuffCore::on_deliver_blk(const block_t &blk) {
+ if (blk->delivered)
+ {
+ LOG_WARN("attempt to deliver a block twice");
+ return false;
+ }
+ blk->parents.clear();
+ for (const auto &hash: blk->parent_hashes)
+ {
+ block_t p = sanity_check_delivered(hash);
+ blk->parents.push_back(p);
+ }
+ blk->height = blk->parents[0]->height + 1;
+ for (const auto &cmd: blk->cmds)
+ cmd->container = blk;
+
+ if (blk->qc)
+ {
+ block_t _blk = storage->find_blk(blk->qc->get_blk_hash());
+ if (_blk == nullptr)
+ throw std::runtime_error("block referred by qc not fetched");
+ blk->qc_ref = std::move(_blk);
+ } // otherwise blk->qc_ref remains null
+
+ for (auto pblk: blk->parents) tails.erase(pblk);
+ tails.insert(blk);
+
+ blk->delivered = true;
+ LOG_DEBUG("delivered %.10s", get_hex(blk->get_hash()).c_str());
+ return true;
+}
+
+void HotStuffCore::check_commit(const block_t &_blk) {
+ const block_t &blk = _blk->qc_ref;
+ if (blk->qc_ref == nullptr) return;
+ if (blk->decision) return;
+ block_t p = blk->parents[0];
+ if (p == blk->qc_ref)
+ { /* commit */
+ std::vector<block_t> commit_queue;
+ block_t b;
+ for (b = p; b->height > bexec->height; b = b->parents[0])
+ { /* todo: also commit the uncles/aunts */
+ commit_queue.push_back(b);
+ }
+ if (b != bexec)
+ throw std::runtime_error("safety breached :(");
+ for (auto it = commit_queue.rbegin(); it != commit_queue.rend(); it++)
+ {
+ const block_t &blk = *it;
+ blk->decision = 1;
+#ifdef HOTSTUFF_ENABLE_LOG_PROTO
+ LOG_INFO("commit blk %.10s", get_hex10(blk->get_hash()).c_str());
+#endif
+ for (auto cmd: blk->cmds)
+ do_decide(cmd);
+ }
+ bexec = p;
+ }
+}
+
+bool HotStuffCore::update(const uint256_t &bqc_hash) {
+ block_t _bqc = sanity_check_delivered(bqc_hash);
+ if (_bqc->qc_ref == nullptr) return false;
+ check_commit(_bqc);
+ if (_bqc->qc_ref->height > bqc->qc_ref->height)
+ bqc = _bqc;
+ return true;
+}
+
+void HotStuffCore::on_propose(const std::vector<command_t> &cmds) {
+ size_t nparents = parent_limit < 1 ? tails.size() : parent_limit;
+ assert(tails.size() > 0);
+ block_t p = *tails.rbegin();
+ std::vector<block_t> parents{p};
+ tails.erase(p);
+ nparents--;
+ /* add the rest of tails as "uncles/aunts" */
+ while (nparents--)
+ {
+ auto it = tails.begin();
+ parents.push_back(*it);
+ tails.erase(it);
+ }
+ quorum_cert_bt qc = nullptr;
+ block_t qc_ref = nullptr;
+ if (p != b0 && p->voted.size() >= config.nmajority)
+ {
+ qc = p->self_qc->clone();
+ qc->compute();
+ qc_ref = p;
+ }
+ /* create a new block */
+ block_t bnew = storage->add_blk(
+ Block(
+ parents,
+ cmds,
+ p->height + 1,
+ std::move(qc), qc_ref,
+ nullptr
+ ));
+ const uint256_t bnew_hash = bnew->get_hash();
+ bnew->self_qc = create_quorum_cert(bnew_hash);
+ on_deliver_blk(bnew);
+ update(bnew_hash);
+ Proposal prop(id, bqc->get_hash(), bnew, nullptr);
+#ifdef HOTSTUFF_ENABLE_LOG_PROTO
+ LOG_INFO("propose %s", std::string(*bnew).c_str());
+#endif
+ /* self-vote */
+ on_receive_vote(
+ Vote(id, bqc->get_hash(), bnew_hash,
+ create_part_cert(*priv_key, bnew_hash), this));
+ on_propose_(bnew);
+ /* boradcast to other replicas */
+ do_broadcast_proposal(prop);
+}
+
+void HotStuffCore::on_receive_proposal(const Proposal &prop) {
+ if (!update(prop.bqc_hash)) return;
+#ifdef HOTSTUFF_ENABLE_LOG_PROTO
+ LOG_INFO("got %s", std::string(prop).c_str());
+#endif
+ block_t bnew = prop.blk;
+ sanity_check_delivered(bnew);
+ bool opinion = false;
+ if (bnew->height > vheight)
+ {
+ block_t pref = bqc->qc_ref;
+ block_t b;
+ for (b = bnew;
+ b->height > pref->height;
+ b = b->parents[0]);
+ opinion = b == pref;
+ vheight = bnew->height;
+ }
+#ifdef HOTSTUFF_ENABLE_LOG_PROTO
+ LOG_INFO("now state: %s", std::string(*this).c_str());
+#endif
+ do_vote(prop.proposer,
+ Vote(id,
+ bqc->get_hash(),
+ bnew->get_hash(),
+ (opinion ?
+ create_part_cert(*priv_key, bnew->get_hash()) :
+ nullptr),
+ nullptr));
+}
+
+void HotStuffCore::on_receive_vote(const Vote &vote) {
+ if (!update(vote.bqc_hash)) return;
+#ifdef HOTSTUFF_ENABLE_LOG_PROTO
+ LOG_INFO("got %s", std::string(vote).c_str());
+ LOG_INFO("now state: %s", std::string(*this).c_str());
+#endif
+
+ block_t blk = sanity_check_delivered(vote.blk_hash);
+ if (vote.cert == nullptr) return;
+ if (!vote.verify())
+ {
+ LOG_WARN("invalid vote");
+ return;
+ }
+ if (!blk->voted.insert(vote.voter).second)
+ {
+ LOG_WARN("duplicate votes");
+ return;
+ }
+ size_t qsize = blk->voted.size();
+ if (qsize <= config.nmajority)
+ {
+ blk->self_qc->add_part(vote.voter, *vote.cert);
+ if (qsize == config.nmajority)
+ on_qc_finish(blk);
+ }
+}
+/*** end HotStuff protocol logic ***/
+
+void HotStuffCore::prune(uint32_t staleness) {
+ block_t start;
+ /* skip the blocks */
+ for (start = bexec; staleness; staleness--, start = start->parents[0])
+ if (!start->parents.size()) return;
+ std::stack<block_t> s;
+ start->qc_ref = nullptr;
+ s.push(start);
+ while (!s.empty())
+ {
+ auto &blk = s.top();
+ if (blk->parents.empty())
+ {
+ storage->try_release_blk(blk);
+ s.pop();
+ continue;
+ }
+ blk->qc_ref = nullptr;
+ s.push(blk->parents.back());
+ blk->parents.pop_back();
+ }
+}
+
+int8_t HotStuffCore::get_cmd_decision(const uint256_t &cmd_hash) {
+ auto cmd = storage->find_cmd(cmd_hash);
+ return cmd != nullptr ? cmd->get_decision() : 0;
+}
+
+void HotStuffCore::add_replica(ReplicaID rid, const NetAddr &addr,
+ pubkey_bt &&pub_key) {
+ config.add_replica(rid,
+ ReplicaInfo(rid, addr, std::move(pub_key)));
+ b0->voted.insert(rid);
+}
+
+promise_t HotStuffCore::async_qc_finish(const block_t &blk) {
+ if (blk->voted.size() >= config.nmajority)
+ return promise_t([](promise_t &pm) {
+ pm.resolve();
+ });
+ auto it = qc_waiting.find(blk);
+ if (it == qc_waiting.end())
+ it = qc_waiting.insert(std::make_pair(blk, promise_t())).first;
+ return it->second;
+}
+
+void HotStuffCore::on_qc_finish(const block_t &blk) {
+ auto it = qc_waiting.find(blk);
+ if (it != qc_waiting.end())
+ {
+ it->second.resolve();
+ qc_waiting.erase(it);
+ }
+}
+
+promise_t HotStuffCore::async_wait_propose() {
+ return propose_waiting;
+}
+
+void HotStuffCore::on_propose_(const block_t &blk) {
+ auto t = std::move(propose_waiting);
+ propose_waiting = promise_t();
+ t.resolve(blk);
+}
+
+HotStuffCore::operator std::string () const {
+ DataStream s;
+ s << "<hotstuff "
+ << "bqc=" << get_hex10(bqc->get_hash()) << " "
+ << "bexec=" << get_hex10(bqc->get_hash()) << " "
+ << "vheight=" << std::to_string(vheight) << " "
+ << "tails=" << std::to_string(tails.size()) << ">";
+ return std::string(std::move(s));
+}
+
+}
diff --git a/src/core.cpp b/src/core.cpp
deleted file mode 100644
index 125e168..0000000
--- a/src/core.cpp
+++ /dev/null
@@ -1,723 +0,0 @@
-#include <stack>
-#include "hotstuff/core.h"
-
-using salticidae::DataStream;
-using salticidae::static_pointer_cast;
-using salticidae::get_hash;
-
-#define LOG_INFO HOTSTUFF_LOG_INFO
-#define LOG_DEBUG HOTSTUFF_LOG_DEBUG
-#define LOG_WARN HOTSTUFF_LOG_WARN
-
-namespace hotstuff {
-
-void MsgHotStuff::gen_propose(const Proposal &proposal) {
- DataStream s;
- set_opcode(PROPOSE);
- s << proposal;
- set_payload(std::move(s));
-}
-
-void MsgHotStuff::parse_propose(Proposal &proposal) const {
- DataStream(get_payload()) >> proposal;
-}
-
-void MsgHotStuff::gen_vote(const Vote &vote) {
- DataStream s;
- set_opcode(VOTE);
- s << vote;
- set_payload(std::move(s));
-}
-
-void MsgHotStuff::parse_vote(Vote &vote) const {
- DataStream(get_payload()) >> vote;
-}
-
-void MsgHotStuff::gen_qfetchblk(const std::vector<uint256_t> &blk_hashes) {
- DataStream s;
- set_opcode(QUERY_FETCH_BLK);
- gen_hash_list(s, blk_hashes);
- set_payload(std::move(s));
-}
-
-void MsgHotStuff::parse_qfetchblk(std::vector<uint256_t> &blk_hashes) const {
- DataStream s(get_payload());
- parse_hash_list(s, blk_hashes);
-}
-
-void MsgHotStuff::gen_rfetchblk(const std::vector<block_t> &blks) {
- DataStream s;
- set_opcode(RESP_FETCH_BLK);
- s << htole((uint32_t)blks.size());
- for (auto blk: blks) s << *blk;
- set_payload(std::move(s));
-}
-
-void MsgHotStuff::parse_rfetchblk(std::vector<block_t> &blks, HotStuffCore *hsc) const {
- DataStream s;
- uint32_t size;
- s >> size;
- size = letoh(size);
- blks.resize(size);
- for (auto &blk: blks)
- {
- Block _blk;
- _blk.unserialize(s, hsc);
- if (!_blk.verify(hsc->get_config()))
- blk = hsc->storage->add_blk(std::move(_blk));
- else
- {
- blk = nullptr;
- LOG_WARN("block is invalid");
- }
- }
-}
-
-/* The core logic of HotStuff, is farily simple :) */
-/*** begin HotStuff protocol logic ***/
-HotStuffCore::HotStuffCore(ReplicaID id,
- privkey_bt &&priv_key,
- int32_t parent_limit):
- b0(new Block(true, 1)),
- bqc(b0),
- bexec(b0),
- vheight(0),
- priv_key(std::move(priv_key)),
- tails{bqc},
- id(id),
- parent_limit(parent_limit),
- storage(new EntityStorage()) {
- storage->add_blk(b0);
- b0->qc_ref = b0;
-}
-
-void HotStuffCore::sanity_check_delivered(const block_t &blk) {
- if (!blk->delivered)
- throw std::runtime_error("block not delivered");
-}
-
-block_t HotStuffCore::sanity_check_delivered(const uint256_t &blk_hash) {
- block_t blk = storage->find_blk(blk_hash);
- if (blk == nullptr || !blk->delivered)
- throw std::runtime_error("block not delivered");
- return std::move(blk);
-}
-
-bool HotStuffCore::on_deliver_blk(const block_t &blk) {
- if (blk->delivered)
- {
- LOG_WARN("attempt to deliver a block twice");
- return false;
- }
- blk->parents.clear();
- for (const auto &hash: blk->parent_hashes)
- {
- block_t p = sanity_check_delivered(hash);
- blk->parents.push_back(p);
- }
- blk->height = blk->parents[0]->height + 1;
- for (const auto &cmd: blk->cmds)
- cmd->container = blk;
-
- if (blk->qc)
- {
- block_t _blk = storage->find_blk(blk->qc->get_blk_hash());
- if (_blk == nullptr)
- throw std::runtime_error("block referred by qc not fetched");
- blk->qc_ref = std::move(_blk);
- } // otherwise blk->qc_ref remains null
-
- for (auto pblk: blk->parents) tails.erase(pblk);
- tails.insert(blk);
-
- blk->delivered = true;
- LOG_DEBUG("delivered %.10s", get_hex(blk->get_hash()).c_str());
- return true;
-}
-
-void HotStuffCore::check_commit(const block_t &_blk) {
- const block_t &blk = _blk->qc_ref;
- if (blk->qc_ref == nullptr) return;
- if (blk->decision) return;
- block_t p = blk->parents[0];
- if (p == blk->qc_ref)
- { /* commit */
- std::vector<block_t> commit_queue;
- block_t b;
- for (b = p; b->height > bexec->height; b = b->parents[0])
- { /* todo: also commit the uncles/aunts */
- commit_queue.push_back(b);
- }
- if (b != bexec)
- throw std::runtime_error("safety breached :(");
- for (auto it = commit_queue.rbegin(); it != commit_queue.rend(); it++)
- {
- const block_t &blk = *it;
- blk->decision = 1;
-#ifdef HOTSTUFF_ENABLE_LOG_PROTO
- LOG_INFO("commit blk %.10s", get_hex10(blk->get_hash()).c_str());
-#endif
- for (auto cmd: blk->cmds)
- do_decide(cmd);
- }
- bexec = p;
- }
-}
-
-bool HotStuffCore::update(const uint256_t &bqc_hash) {
- block_t _bqc = sanity_check_delivered(bqc_hash);
- if (_bqc->qc_ref == nullptr) return false;
- check_commit(_bqc);
- if (_bqc->qc_ref->height > bqc->qc_ref->height)
- bqc = _bqc;
- return true;
-}
-
-void HotStuffCore::on_propose(const std::vector<command_t> &cmds) {
- size_t nparents = parent_limit < 1 ? tails.size() : parent_limit;
- assert(tails.size() > 0);
- block_t p = *tails.rbegin();
- std::vector<block_t> parents{p};
- tails.erase(p);
- nparents--;
- /* add the rest of tails as "uncles/aunts" */
- while (nparents--)
- {
- auto it = tails.begin();
- parents.push_back(*it);
- tails.erase(it);
- }
- quorum_cert_bt qc = nullptr;
- block_t qc_ref = nullptr;
- if (p != b0 && p->voted.size() >= config.nmajority)
- {
- qc = p->self_qc->clone();
- qc->compute();
- qc_ref = p;
- }
- /* create a new block */
- block_t bnew = storage->add_blk(
- Block(
- parents,
- cmds,
- p->height + 1,
- std::move(qc), qc_ref,
- nullptr
- ));
- const uint256_t bnew_hash = bnew->get_hash();
- bnew->self_qc = create_quorum_cert(bnew_hash);
- on_deliver_blk(bnew);
- update(bnew_hash);
- Proposal prop(id, bqc->get_hash(), bnew, nullptr);
-#ifdef HOTSTUFF_ENABLE_LOG_PROTO
- LOG_INFO("propose %s", std::string(*bnew).c_str());
-#endif
- /* self-vote */
- on_receive_vote(
- Vote(id, bqc->get_hash(), bnew_hash,
- create_part_cert(*priv_key, bnew_hash), this));
- on_propose_(bnew);
- /* boradcast to other replicas */
- do_broadcast_proposal(prop);
-}
-
-void HotStuffCore::on_receive_proposal(const Proposal &prop) {
- if (!update(prop.bqc_hash)) return;
-#ifdef HOTSTUFF_ENABLE_LOG_PROTO
- LOG_INFO("got %s", std::string(prop).c_str());
-#endif
- block_t bnew = prop.blk;
- sanity_check_delivered(bnew);
- bool opinion = false;
- if (bnew->height > vheight)
- {
- block_t pref = bqc->qc_ref;
- block_t b;
- for (b = bnew;
- b->height > pref->height;
- b = b->parents[0]);
- opinion = b == pref;
- vheight = bnew->height;
- }
-#ifdef HOTSTUFF_ENABLE_LOG_PROTO
- LOG_INFO("now state: %s", std::string(*this).c_str());
-#endif
- do_vote(prop.proposer,
- Vote(id,
- bqc->get_hash(),
- bnew->get_hash(),
- (opinion ?
- create_part_cert(*priv_key, bnew->get_hash()) :
- nullptr),
- nullptr));
-}
-
-void HotStuffCore::on_receive_vote(const Vote &vote) {
- if (!update(vote.bqc_hash)) return;
-#ifdef HOTSTUFF_ENABLE_LOG_PROTO
- LOG_INFO("got %s", std::string(vote).c_str());
- LOG_INFO("now state: %s", std::string(*this).c_str());
-#endif
-
- block_t blk = sanity_check_delivered(vote.blk_hash);
- if (vote.cert == nullptr) return;
- if (!vote.verify())
- {
- LOG_WARN("invalid vote");
- return;
- }
- if (!blk->voted.insert(vote.voter).second)
- {
- LOG_WARN("duplicate votes");
- return;
- }
- size_t qsize = blk->voted.size();
- if (qsize <= config.nmajority)
- {
- blk->self_qc->add_part(vote.voter, *vote.cert);
- if (qsize == config.nmajority)
- on_qc_finish(blk);
- }
-}
-/*** end HotStuff protocol logic ***/
-
-void HotStuffCore::prune(uint32_t staleness) {
- block_t start;
- /* skip the blocks */
- for (start = bexec; staleness; staleness--, start = start->parents[0])
- if (!start->parents.size()) return;
- std::stack<block_t> s;
- start->qc_ref = nullptr;
- s.push(start);
- while (!s.empty())
- {
- auto &blk = s.top();
- if (blk->parents.empty())
- {
- storage->try_release_blk(blk);
- s.pop();
- continue;
- }
- blk->qc_ref = nullptr;
- s.push(blk->parents.back());
- blk->parents.pop_back();
- }
-}
-
-int8_t HotStuffCore::get_cmd_decision(const uint256_t &cmd_hash) {
- auto cmd = storage->find_cmd(cmd_hash);
- return cmd != nullptr ? cmd->get_decision() : 0;
-}
-
-void HotStuffCore::add_replica(ReplicaID rid, const NetAddr &addr,
- pubkey_bt &&pub_key) {
- config.add_replica(rid,
- ReplicaInfo(rid, addr, std::move(pub_key)));
- b0->voted.insert(rid);
-}
-
-promise_t HotStuffCore::async_qc_finish(const block_t &blk) {
- if (blk->voted.size() >= config.nmajority)
- return promise_t([](promise_t &pm) {
- pm.resolve();
- });
- auto it = qc_waiting.find(blk);
- if (it == qc_waiting.end())
- it = qc_waiting.insert(std::make_pair(blk, promise_t())).first;
- return it->second;
-}
-
-void HotStuffCore::on_qc_finish(const block_t &blk) {
- auto it = qc_waiting.find(blk);
- if (it != qc_waiting.end())
- {
- it->second.resolve();
- qc_waiting.erase(it);
- }
-}
-
-promise_t HotStuffCore::async_wait_propose() {
- return propose_waiting;
-}
-
-void HotStuffCore::on_propose_(const block_t &blk) {
- auto t = std::move(propose_waiting);
- propose_waiting = promise_t();
- t.resolve(blk);
-}
-
-HotStuffCore::operator std::string () const {
- DataStream s;
- s << "<hotstuff "
- << "bqc=" << get_hex10(bqc->get_hash()) << " "
- << "bexec=" << get_hex10(bqc->get_hash()) << " "
- << "vheight=" << std::to_string(vheight) << " "
- << "tails=" << std::to_string(tails.size()) << ">";
- return std::string(std::move(s));
-}
-
-void HotStuffBase::add_replica(ReplicaID idx, const NetAddr &addr,
- pubkey_bt &&pub_key) {
- HotStuffCore::add_replica(idx, addr, std::move(pub_key));
- if (addr != listen_addr)
- pn.add_peer(addr);
-}
-
-void HotStuffBase::on_fetch_blk(const block_t &blk) {
-#ifdef HOTSTUFF_ENABLE_TX_PROFILE
- blk_profiler.get_tx(blk->get_hash());
-#endif
- LOG_DEBUG("fetched %.10s", get_hex(blk->get_hash()).c_str());
- part_fetched++;
- fetched++;
- for (auto cmd: blk->get_cmds()) on_fetch_cmd(cmd);
- const uint256_t &blk_hash = blk->get_hash();
- auto it = blk_fetch_waiting.find(blk_hash);
- if (it != blk_fetch_waiting.end())
- {
- it->second.resolve(blk);
- blk_fetch_waiting.erase(it);
- }
-}
-
-void HotStuffBase::on_fetch_cmd(const command_t &cmd) {
- const uint256_t &cmd_hash = cmd->get_hash();
- auto it = cmd_fetch_waiting.find(cmd_hash);
- if (it != cmd_fetch_waiting.end())
- {
- it->second.resolve(cmd);
- cmd_fetch_waiting.erase(it);
- }
-}
-
-void HotStuffBase::on_deliver_blk(const block_t &blk) {
- const uint256_t &blk_hash = blk->get_hash();
- bool valid;
- /* sanity check: all parents must be delivered */
- for (const auto &p: blk->get_parent_hashes())
- assert(storage->is_blk_delivered(p));
- if ((valid = HotStuffCore::on_deliver_blk(blk)))
- {
- LOG_DEBUG("block %.10s delivered",
- get_hex(blk_hash).c_str());
- part_parent_size += blk->get_parent_hashes().size();
- part_delivered++;
- delivered++;
- }
- else
- {
- LOG_WARN("dropping invalid block");
- }
-
- auto it = blk_delivery_waiting.find(blk_hash);
- if (it != blk_delivery_waiting.end())
- {
- auto &pm = it->second;
- if (valid)
- {
- pm.elapsed.stop(false);
- auto sec = pm.elapsed.elapsed_sec;
- part_delivery_time += sec;
- part_delivery_time_min = std::min(part_delivery_time_min, sec);
- part_delivery_time_max = std::max(part_delivery_time_max, sec);
-
- pm.resolve(blk);
- }
- else
- {
- pm.reject(blk);
- // TODO: do we need to also free it from storage?
- }
- blk_delivery_waiting.erase(it);
- }
-}
-
-promise_t HotStuffBase::async_fetch_blk(const uint256_t &blk_hash,
- const NetAddr *replica_id,
- bool fetch_now) {
- if (storage->is_blk_fetched(blk_hash))
- return promise_t([this, &blk_hash](promise_t pm){
- pm.resolve(storage->find_blk(blk_hash));
- });
- auto it = blk_fetch_waiting.find(blk_hash);
- if (it == blk_fetch_waiting.end())
- {
-#ifdef HOTSTUFF_ENABLE_TX_PROFILE
- blk_profiler.rec_tx(blk_hash, false);
-#endif
- it = blk_fetch_waiting.insert(
- std::make_pair(
- blk_hash,
- BlockFetchContext(blk_hash, this))).first;
- }
- if (replica_id != nullptr)
- it->second.add_replica(*replica_id, fetch_now);
- return static_cast<promise_t &>(it->second);
-}
-
-promise_t HotStuffBase::async_fetch_cmd(const uint256_t &cmd_hash,
- const NetAddr *replica_id,
- bool fetch_now) {
- if (storage->is_cmd_fetched(cmd_hash))
- return promise_t([this, &cmd_hash](promise_t pm){
- pm.resolve(storage->find_cmd(cmd_hash));
- });
- auto it = cmd_fetch_waiting.find(cmd_hash);
- if (it == cmd_fetch_waiting.end())
- {
- it = cmd_fetch_waiting.insert(
- std::make_pair(cmd_hash, CmdFetchContext(cmd_hash, this))).first;
- }
- if (replica_id != nullptr)
- it->second.add_replica(*replica_id, fetch_now);
- return static_cast<promise_t &>(it->second);
-}
-
-promise_t HotStuffBase::async_deliver_blk(const uint256_t &blk_hash,
- const NetAddr &replica_id) {
- if (storage->is_blk_delivered(blk_hash))
- return promise_t([this, &blk_hash](promise_t pm) {
- pm.resolve(storage->find_blk(blk_hash));
- });
- auto it = blk_delivery_waiting.find(blk_hash);
- if (it != blk_delivery_waiting.end())
- return static_cast<promise_t &>(it->second);
- BlockDeliveryContext pm{[](promise_t){}};
- it = blk_delivery_waiting.insert(std::make_pair(blk_hash, pm)).first;
- /* otherwise the on_deliver_batch will resolve */
- async_fetch_blk(blk_hash, &replica_id).then([this, replica_id](block_t blk) {
- /* qc_ref should be fetched */
- std::vector<promise_t> pms;
- const auto &qc = blk->get_qc();
- if (qc)
- pms.push_back(async_fetch_blk(qc->get_blk_hash(), &replica_id));
- /* the parents should be delivered */
- for (const auto &phash: blk->get_parent_hashes())
- pms.push_back(async_deliver_blk(phash, replica_id));
- promise::all(pms).then([this, blk]() {
- on_deliver_blk(blk);
- });
- });
- return static_cast<promise_t &>(pm);
-}
-
-void HotStuffBase::propose_handler(const MsgHotStuff &msg, conn_t conn_) {
- auto conn = static_pointer_cast<PeerNetwork<MsgHotStuff>::Conn>(conn_);
- const NetAddr &peer = conn->get_peer();
- Proposal prop(this);
- msg.parse_propose(prop);
- block_t blk = prop.blk;
- promise::all(std::vector<promise_t>{
- async_deliver_blk(prop.bqc_hash, peer),
- async_deliver_blk(blk->get_hash(), peer),
- }).then([this, prop = std::move(prop)]() {
- on_receive_proposal(prop);
- });
-}
-
-void HotStuffBase::vote_handler(const MsgHotStuff &msg, conn_t conn_) {
- auto conn = static_pointer_cast<PeerNetwork<MsgHotStuff>::Conn>(conn_);
- const NetAddr &peer = conn->get_peer();
- Vote vote(this);
- msg.parse_vote(vote);
- promise::all(std::vector<promise_t>{
- async_deliver_blk(vote.bqc_hash, peer),
- async_deliver_blk(vote.blk_hash, peer)
- }).then([this, vote = std::move(vote)]() {
- on_receive_vote(vote);
- });
-}
-
-void HotStuffBase::query_fetch_blk_handler(const MsgHotStuff &msg, conn_t conn_) {
- auto conn = static_pointer_cast<PeerNetwork<MsgHotStuff>::Conn>(conn_);
- const NetAddr replica = conn->get_peer();
- std::vector<uint256_t> blk_hashes;
- msg.parse_qfetchblk(blk_hashes);
-
- std::vector<promise_t> pms;
- for (const auto &h: blk_hashes)
- pms.push_back(async_fetch_blk(h, nullptr));
- promise::all(pms).then([replica, this](const promise::values_t values) {
- MsgHotStuff resp;
- std::vector<block_t> blks;
- for (auto &v: values)
- {
- auto blk = promise::any_cast<block_t>(v);
- blks.push_back(blk);
- }
- resp.gen_rfetchblk(blks);
- pn.send_msg(resp, replica);
- });
-}
-
-void HotStuffBase::resp_fetch_blk_handler(const MsgHotStuff &msg, conn_t) {
- std::vector<block_t> blks;
- msg.parse_rfetchblk(blks, this);
- for (const auto &blk: blks)
- if (blk) on_fetch_blk(blk);
-}
-
-void HotStuffBase::print_stat() const {
- LOG_INFO("===== begin stats =====");
- LOG_INFO("-------- queues -------");
- LOG_INFO("blk_fetch_waiting: %lu", blk_fetch_waiting.size());
- LOG_INFO("blk_delivery_waiting: %lu", blk_delivery_waiting.size());
- LOG_INFO("cmd_fetch_waiting: %lu", cmd_fetch_waiting.size());
- LOG_INFO("decision_waiting: %lu", decision_waiting.size());
- LOG_INFO("-------- misc ---------");
- LOG_INFO("fetched: %lu", fetched);
- LOG_INFO("delivered: %lu", delivered);
- LOG_INFO("cmd_cache: %lu", storage->get_cmd_cache_size());
- LOG_INFO("blk_cache: %lu", storage->get_blk_cache_size());
- LOG_INFO("------ misc (10s) -----");
- LOG_INFO("fetched: %lu", part_fetched);
- LOG_INFO("delivered: %lu", part_delivered);
- LOG_INFO("decided: %lu", part_decided);
- LOG_INFO("gened: %lu", part_gened);
- LOG_INFO("avg. parent_size: %.3f",
- part_delivered ? part_parent_size / double(part_delivered) : 0);
- LOG_INFO("delivery time: %.3f avg, %.3f min, %.3f max",
- part_delivered ? part_delivery_time / double(part_delivered) : 0,
- part_delivery_time_min == double_inf ? 0 : part_delivery_time_min,
- part_delivery_time_max);
-
- part_parent_size = 0;
- part_fetched = 0;
- part_delivered = 0;
- part_decided = 0;
- part_gened = 0;
- part_delivery_time = 0;
- part_delivery_time_min = double_inf;
- part_delivery_time_max = 0;
- LOG_INFO("-- sent opcode (10s) --");
- auto &sent_op = pn.get_sent_by_opcode();
- for (auto &op: sent_op)
- {
- auto &val = op.second;
- LOG_INFO("%02x: %lu, %.2fBpm", op.first,
- val.first, val.first ? val.second / double(val.first) : 0);
- val.first = val.second = 0;
- }
- LOG_INFO("-- recv opcode (10s) --");
- auto &recv_op = pn.get_recv_by_opcode();
- for (auto &op: recv_op)
- {
- auto &val = op.second;
- LOG_INFO("%02x: %lu, %.2fBpm", op.first,
- val.first, val.first ? val.second / double(val.first) : 0);
- val.first = val.second = 0;
- }
- LOG_INFO("--- replica msg. (10s) ---");
- size_t _nsent = 0;
- size_t _nrecv = 0;
- for (const auto &replica: pn.all_peers())
- {
- auto conn = pn.get_peer_conn(replica);
- size_t ns = conn->get_nsent();
- size_t nr = conn->get_nrecv();
- conn->clear_nsent();
- conn->clear_nrecv();
- LOG_INFO("%s: %u, %u, %u",
- std::string(replica).c_str(), ns, nr, part_fetched_replica[replica]);
- _nsent += ns;
- _nrecv += nr;
- part_fetched_replica[replica] = 0;
- }
- nsent += _nsent;
- nrecv += _nrecv;
- LOG_INFO("sent: %lu", _nsent);
- LOG_INFO("recv: %lu", _nrecv);
- LOG_INFO("--- replica msg. total ---");
- LOG_INFO("sent: %lu", nsent);
- LOG_INFO("recv: %lu", nrecv);
- LOG_INFO("====== end stats ======");
-}
-
-promise_t HotStuffBase::async_decide(const uint256_t &cmd_hash) {
- if (get_cmd_decision(cmd_hash))
- return promise_t([this, cmd_hash](promise_t pm){
- pm.resolve(storage->find_cmd(cmd_hash));
- });
- /* otherwise the do_decide will resolve the promise */
- auto it = decision_waiting.find(cmd_hash);
- if (it == decision_waiting.end())
- {
- promise_t pm{[](promise_t){}};
- it = decision_waiting.insert(std::make_pair(cmd_hash, pm)).first;
- }
- return it->second;
-}
-
-HotStuffBase::HotStuffBase(uint32_t blk_size,
- int32_t parent_limit,
- ReplicaID rid,
- privkey_bt &&priv_key,
- NetAddr listen_addr,
- EventContext eb,
- pacemaker_bt pmaker):
- HotStuffCore(rid, std::move(priv_key), parent_limit),
- listen_addr(listen_addr),
- blk_size(blk_size),
- eb(eb),
- pmaker(std::move(pmaker)),
- pn(eb),
-
- fetched(0), delivered(0),
- nsent(0), nrecv(0),
- part_parent_size(0),
- part_fetched(0),
- part_delivered(0),
- part_decided(0),
- part_gened(0),
- part_delivery_time(0),
- part_delivery_time_min(double_inf),
- part_delivery_time_max(0)
-{
- if (pmaker == nullptr)
- this->pmaker = new PaceMakerDummy(this);
- /* register the handlers for msg from replicas */
- pn.reg_handler(PROPOSE, std::bind(&HotStuffBase::propose_handler, this, _1, _2));
- pn.reg_handler(VOTE, std::bind(&HotStuffBase::vote_handler, this, _1, _2));
- pn.reg_handler(QUERY_FETCH_BLK, std::bind(&HotStuffBase::query_fetch_blk_handler, this, _1, _2));
- pn.reg_handler(RESP_FETCH_BLK, std::bind(&HotStuffBase::resp_fetch_blk_handler, this, _1, _2));
- pn.init(listen_addr);
-}
-
-void HotStuffBase::do_broadcast_proposal(const Proposal &prop) {
- MsgHotStuff prop_msg;
- prop_msg.gen_propose(prop);
- for (const auto &replica: pn.all_peers())
- pn.send_msg(prop_msg, replica);
-}
-