From 02e347dae1a01172dbcc2efe054014c015d96507 Mon Sep 17 00:00:00 2001 From: Determinant Date: Mon, 16 Jul 2018 19:26:36 -0400 Subject: ... --- src/core.cpp | 723 ----------------------------------------------------------- 1 file changed, 723 deletions(-) delete mode 100644 src/core.cpp (limited to 'src/core.cpp') diff --git a/src/core.cpp b/src/core.cpp deleted file mode 100644 index 125e168..0000000 --- a/src/core.cpp +++ /dev/null @@ -1,723 +0,0 @@ -#include -#include "hotstuff/core.h" - -using salticidae::DataStream; -using salticidae::static_pointer_cast; -using salticidae::get_hash; - -#define LOG_INFO HOTSTUFF_LOG_INFO -#define LOG_DEBUG HOTSTUFF_LOG_DEBUG -#define LOG_WARN HOTSTUFF_LOG_WARN - -namespace hotstuff { - -void MsgHotStuff::gen_propose(const Proposal &proposal) { - DataStream s; - set_opcode(PROPOSE); - s << proposal; - set_payload(std::move(s)); -} - -void MsgHotStuff::parse_propose(Proposal &proposal) const { - DataStream(get_payload()) >> proposal; -} - -void MsgHotStuff::gen_vote(const Vote &vote) { - DataStream s; - set_opcode(VOTE); - s << vote; - set_payload(std::move(s)); -} - -void MsgHotStuff::parse_vote(Vote &vote) const { - DataStream(get_payload()) >> vote; -} - -void MsgHotStuff::gen_qfetchblk(const std::vector &blk_hashes) { - DataStream s; - set_opcode(QUERY_FETCH_BLK); - gen_hash_list(s, blk_hashes); - set_payload(std::move(s)); -} - -void MsgHotStuff::parse_qfetchblk(std::vector &blk_hashes) const { - DataStream s(get_payload()); - parse_hash_list(s, blk_hashes); -} - -void MsgHotStuff::gen_rfetchblk(const std::vector &blks) { - DataStream s; - set_opcode(RESP_FETCH_BLK); - s << htole((uint32_t)blks.size()); - for (auto blk: blks) s << *blk; - set_payload(std::move(s)); -} - -void MsgHotStuff::parse_rfetchblk(std::vector &blks, HotStuffCore *hsc) const { - DataStream s; - uint32_t size; - s >> size; - size = letoh(size); - blks.resize(size); - for (auto &blk: blks) - { - Block _blk; - _blk.unserialize(s, hsc); - if (!_blk.verify(hsc->get_config())) - blk = hsc->storage->add_blk(std::move(_blk)); - else - { - blk = nullptr; - LOG_WARN("block is invalid"); - } - } -} - -/* The core logic of HotStuff, is farily simple :) */ -/*** begin HotStuff protocol logic ***/ -HotStuffCore::HotStuffCore(ReplicaID id, - privkey_bt &&priv_key, - int32_t parent_limit): - b0(new Block(true, 1)), - bqc(b0), - bexec(b0), - vheight(0), - priv_key(std::move(priv_key)), - tails{bqc}, - id(id), - parent_limit(parent_limit), - storage(new EntityStorage()) { - storage->add_blk(b0); - b0->qc_ref = b0; -} - -void HotStuffCore::sanity_check_delivered(const block_t &blk) { - if (!blk->delivered) - throw std::runtime_error("block not delivered"); -} - -block_t HotStuffCore::sanity_check_delivered(const uint256_t &blk_hash) { - block_t blk = storage->find_blk(blk_hash); - if (blk == nullptr || !blk->delivered) - throw std::runtime_error("block not delivered"); - return std::move(blk); -} - -bool HotStuffCore::on_deliver_blk(const block_t &blk) { - if (blk->delivered) - { - LOG_WARN("attempt to deliver a block twice"); - return false; - } - blk->parents.clear(); - for (const auto &hash: blk->parent_hashes) - { - block_t p = sanity_check_delivered(hash); - blk->parents.push_back(p); - } - blk->height = blk->parents[0]->height + 1; - for (const auto &cmd: blk->cmds) - cmd->container = blk; - - if (blk->qc) - { - block_t _blk = storage->find_blk(blk->qc->get_blk_hash()); - if (_blk == nullptr) - throw std::runtime_error("block referred by qc not fetched"); - blk->qc_ref = std::move(_blk); - } // otherwise blk->qc_ref remains null - - for (auto pblk: blk->parents) tails.erase(pblk); - tails.insert(blk); - - blk->delivered = true; - LOG_DEBUG("delivered %.10s", get_hex(blk->get_hash()).c_str()); - return true; -} - -void HotStuffCore::check_commit(const block_t &_blk) { - const block_t &blk = _blk->qc_ref; - if (blk->qc_ref == nullptr) return; - if (blk->decision) return; - block_t p = blk->parents[0]; - if (p == blk->qc_ref) - { /* commit */ - std::vector commit_queue; - block_t b; - for (b = p; b->height > bexec->height; b = b->parents[0]) - { /* todo: also commit the uncles/aunts */ - commit_queue.push_back(b); - } - if (b != bexec) - throw std::runtime_error("safety breached :("); - for (auto it = commit_queue.rbegin(); it != commit_queue.rend(); it++) - { - const block_t &blk = *it; - blk->decision = 1; -#ifdef HOTSTUFF_ENABLE_LOG_PROTO - LOG_INFO("commit blk %.10s", get_hex10(blk->get_hash()).c_str()); -#endif - for (auto cmd: blk->cmds) - do_decide(cmd); - } - bexec = p; - } -} - -bool HotStuffCore::update(const uint256_t &bqc_hash) { - block_t _bqc = sanity_check_delivered(bqc_hash); - if (_bqc->qc_ref == nullptr) return false; - check_commit(_bqc); - if (_bqc->qc_ref->height > bqc->qc_ref->height) - bqc = _bqc; - return true; -} - -void HotStuffCore::on_propose(const std::vector &cmds) { - size_t nparents = parent_limit < 1 ? tails.size() : parent_limit; - assert(tails.size() > 0); - block_t p = *tails.rbegin(); - std::vector parents{p}; - tails.erase(p); - nparents--; - /* add the rest of tails as "uncles/aunts" */ - while (nparents--) - { - auto it = tails.begin(); - parents.push_back(*it); - tails.erase(it); - } - quorum_cert_bt qc = nullptr; - block_t qc_ref = nullptr; - if (p != b0 && p->voted.size() >= config.nmajority) - { - qc = p->self_qc->clone(); - qc->compute(); - qc_ref = p; - } - /* create a new block */ - block_t bnew = storage->add_blk( - Block( - parents, - cmds, - p->height + 1, - std::move(qc), qc_ref, - nullptr - )); - const uint256_t bnew_hash = bnew->get_hash(); - bnew->self_qc = create_quorum_cert(bnew_hash); - on_deliver_blk(bnew); - update(bnew_hash); - Proposal prop(id, bqc->get_hash(), bnew, nullptr); -#ifdef HOTSTUFF_ENABLE_LOG_PROTO - LOG_INFO("propose %s", std::string(*bnew).c_str()); -#endif - /* self-vote */ - on_receive_vote( - Vote(id, bqc->get_hash(), bnew_hash, - create_part_cert(*priv_key, bnew_hash), this)); - on_propose_(bnew); - /* boradcast to other replicas */ - do_broadcast_proposal(prop); -} - -void HotStuffCore::on_receive_proposal(const Proposal &prop) { - if (!update(prop.bqc_hash)) return; -#ifdef HOTSTUFF_ENABLE_LOG_PROTO - LOG_INFO("got %s", std::string(prop).c_str()); -#endif - block_t bnew = prop.blk; - sanity_check_delivered(bnew); - bool opinion = false; - if (bnew->height > vheight) - { - block_t pref = bqc->qc_ref; - block_t b; - for (b = bnew; - b->height > pref->height; - b = b->parents[0]); - opinion = b == pref; - vheight = bnew->height; - } -#ifdef HOTSTUFF_ENABLE_LOG_PROTO - LOG_INFO("now state: %s", std::string(*this).c_str()); -#endif - do_vote(prop.proposer, - Vote(id, - bqc->get_hash(), - bnew->get_hash(), - (opinion ? - create_part_cert(*priv_key, bnew->get_hash()) : - nullptr), - nullptr)); -} - -void HotStuffCore::on_receive_vote(const Vote &vote) { - if (!update(vote.bqc_hash)) return; -#ifdef HOTSTUFF_ENABLE_LOG_PROTO - LOG_INFO("got %s", std::string(vote).c_str()); - LOG_INFO("now state: %s", std::string(*this).c_str()); -#endif - - block_t blk = sanity_check_delivered(vote.blk_hash); - if (vote.cert == nullptr) return; - if (!vote.verify()) - { - LOG_WARN("invalid vote"); - return; - } - if (!blk->voted.insert(vote.voter).second) - { - LOG_WARN("duplicate votes"); - return; - } - size_t qsize = blk->voted.size(); - if (qsize <= config.nmajority) - { - blk->self_qc->add_part(vote.voter, *vote.cert); - if (qsize == config.nmajority) - on_qc_finish(blk); - } -} -/*** end HotStuff protocol logic ***/ - -void HotStuffCore::prune(uint32_t staleness) { - block_t start; - /* skip the blocks */ - for (start = bexec; staleness; staleness--, start = start->parents[0]) - if (!start->parents.size()) return; - std::stack s; - start->qc_ref = nullptr; - s.push(start); - while (!s.empty()) - { - auto &blk = s.top(); - if (blk->parents.empty()) - { - storage->try_release_blk(blk); - s.pop(); - continue; - } - blk->qc_ref = nullptr; - s.push(blk->parents.back()); - blk->parents.pop_back(); - } -} - -int8_t HotStuffCore::get_cmd_decision(const uint256_t &cmd_hash) { - auto cmd = storage->find_cmd(cmd_hash); - return cmd != nullptr ? cmd->get_decision() : 0; -} - -void HotStuffCore::add_replica(ReplicaID rid, const NetAddr &addr, - pubkey_bt &&pub_key) { - config.add_replica(rid, - ReplicaInfo(rid, addr, std::move(pub_key))); - b0->voted.insert(rid); -} - -promise_t HotStuffCore::async_qc_finish(const block_t &blk) { - if (blk->voted.size() >= config.nmajority) - return promise_t([](promise_t &pm) { - pm.resolve(); - }); - auto it = qc_waiting.find(blk); - if (it == qc_waiting.end()) - it = qc_waiting.insert(std::make_pair(blk, promise_t())).first; - return it->second; -} - -void HotStuffCore::on_qc_finish(const block_t &blk) { - auto it = qc_waiting.find(blk); - if (it != qc_waiting.end()) - { - it->second.resolve(); - qc_waiting.erase(it); - } -} - -promise_t HotStuffCore::async_wait_propose() { - return propose_waiting; -} - -void HotStuffCore::on_propose_(const block_t &blk) { - auto t = std::move(propose_waiting); - propose_waiting = promise_t(); - t.resolve(blk); -} - -HotStuffCore::operator std::string () const { - DataStream s; - s << ""; - return std::string(std::move(s)); -} - -void HotStuffBase::add_replica(ReplicaID idx, const NetAddr &addr, - pubkey_bt &&pub_key) { - HotStuffCore::add_replica(idx, addr, std::move(pub_key)); - if (addr != listen_addr) - pn.add_peer(addr); -} - -void HotStuffBase::on_fetch_blk(const block_t &blk) { -#ifdef HOTSTUFF_ENABLE_TX_PROFILE - blk_profiler.get_tx(blk->get_hash()); -#endif - LOG_DEBUG("fetched %.10s", get_hex(blk->get_hash()).c_str()); - part_fetched++; - fetched++; - for (auto cmd: blk->get_cmds()) on_fetch_cmd(cmd); - const uint256_t &blk_hash = blk->get_hash(); - auto it = blk_fetch_waiting.find(blk_hash); - if (it != blk_fetch_waiting.end()) - { - it->second.resolve(blk); - blk_fetch_waiting.erase(it); - } -} - -void HotStuffBase::on_fetch_cmd(const command_t &cmd) { - const uint256_t &cmd_hash = cmd->get_hash(); - auto it = cmd_fetch_waiting.find(cmd_hash); - if (it != cmd_fetch_waiting.end()) - { - it->second.resolve(cmd); - cmd_fetch_waiting.erase(it); - } -} - -void HotStuffBase::on_deliver_blk(const block_t &blk) { - const uint256_t &blk_hash = blk->get_hash(); - bool valid; - /* sanity check: all parents must be delivered */ - for (const auto &p: blk->get_parent_hashes()) - assert(storage->is_blk_delivered(p)); - if ((valid = HotStuffCore::on_deliver_blk(blk))) - { - LOG_DEBUG("block %.10s delivered", - get_hex(blk_hash).c_str()); - part_parent_size += blk->get_parent_hashes().size(); - part_delivered++; - delivered++; - } - else - { - LOG_WARN("dropping invalid block"); - } - - auto it = blk_delivery_waiting.find(blk_hash); - if (it != blk_delivery_waiting.end()) - { - auto &pm = it->second; - if (valid) - { - pm.elapsed.stop(false); - auto sec = pm.elapsed.elapsed_sec; - part_delivery_time += sec; - part_delivery_time_min = std::min(part_delivery_time_min, sec); - part_delivery_time_max = std::max(part_delivery_time_max, sec); - - pm.resolve(blk); - } - else - { - pm.reject(blk); - // TODO: do we need to also free it from storage? - } - blk_delivery_waiting.erase(it); - } -} - -promise_t HotStuffBase::async_fetch_blk(const uint256_t &blk_hash, - const NetAddr *replica_id, - bool fetch_now) { - if (storage->is_blk_fetched(blk_hash)) - return promise_t([this, &blk_hash](promise_t pm){ - pm.resolve(storage->find_blk(blk_hash)); - }); - auto it = blk_fetch_waiting.find(blk_hash); - if (it == blk_fetch_waiting.end()) - { -#ifdef HOTSTUFF_ENABLE_TX_PROFILE - blk_profiler.rec_tx(blk_hash, false); -#endif - it = blk_fetch_waiting.insert( - std::make_pair( - blk_hash, - BlockFetchContext(blk_hash, this))).first; - } - if (replica_id != nullptr) - it->second.add_replica(*replica_id, fetch_now); - return static_cast(it->second); -} - -promise_t HotStuffBase::async_fetch_cmd(const uint256_t &cmd_hash, - const NetAddr *replica_id, - bool fetch_now) { - if (storage->is_cmd_fetched(cmd_hash)) - return promise_t([this, &cmd_hash](promise_t pm){ - pm.resolve(storage->find_cmd(cmd_hash)); - }); - auto it = cmd_fetch_waiting.find(cmd_hash); - if (it == cmd_fetch_waiting.end()) - { - it = cmd_fetch_waiting.insert( - std::make_pair(cmd_hash, CmdFetchContext(cmd_hash, this))).first; - } - if (replica_id != nullptr) - it->second.add_replica(*replica_id, fetch_now); - return static_cast(it->second); -} - -promise_t HotStuffBase::async_deliver_blk(const uint256_t &blk_hash, - const NetAddr &replica_id) { - if (storage->is_blk_delivered(blk_hash)) - return promise_t([this, &blk_hash](promise_t pm) { - pm.resolve(storage->find_blk(blk_hash)); - }); - auto it = blk_delivery_waiting.find(blk_hash); - if (it != blk_delivery_waiting.end()) - return static_cast(it->second); - BlockDeliveryContext pm{[](promise_t){}}; - it = blk_delivery_waiting.insert(std::make_pair(blk_hash, pm)).first; - /* otherwise the on_deliver_batch will resolve */ - async_fetch_blk(blk_hash, &replica_id).then([this, replica_id](block_t blk) { - /* qc_ref should be fetched */ - std::vector pms; - const auto &qc = blk->get_qc(); - if (qc) - pms.push_back(async_fetch_blk(qc->get_blk_hash(), &replica_id)); - /* the parents should be delivered */ - for (const auto &phash: blk->get_parent_hashes()) - pms.push_back(async_deliver_blk(phash, replica_id)); - promise::all(pms).then([this, blk]() { - on_deliver_blk(blk); - }); - }); - return static_cast(pm); -} - -void HotStuffBase::propose_handler(const MsgHotStuff &msg, conn_t conn_) { - auto conn = static_pointer_cast::Conn>(conn_); - const NetAddr &peer = conn->get_peer(); - Proposal prop(this); - msg.parse_propose(prop); - block_t blk = prop.blk; - promise::all(std::vector{ - async_deliver_blk(prop.bqc_hash, peer), - async_deliver_blk(blk->get_hash(), peer), - }).then([this, prop = std::move(prop)]() { - on_receive_proposal(prop); - }); -} - -void HotStuffBase::vote_handler(const MsgHotStuff &msg, conn_t conn_) { - auto conn = static_pointer_cast::Conn>(conn_); - const NetAddr &peer = conn->get_peer(); - Vote vote(this); - msg.parse_vote(vote); - promise::all(std::vector{ - async_deliver_blk(vote.bqc_hash, peer), - async_deliver_blk(vote.blk_hash, peer) - }).then([this, vote = std::move(vote)]() { - on_receive_vote(vote); - }); -} - -void HotStuffBase::query_fetch_blk_handler(const MsgHotStuff &msg, conn_t conn_) { - auto conn = static_pointer_cast::Conn>(conn_); - const NetAddr replica = conn->get_peer(); - std::vector blk_hashes; - msg.parse_qfetchblk(blk_hashes); - - std::vector pms; - for (const auto &h: blk_hashes) - pms.push_back(async_fetch_blk(h, nullptr)); - promise::all(pms).then([replica, this](const promise::values_t values) { - MsgHotStuff resp; - std::vector blks; - for (auto &v: values) - { - auto blk = promise::any_cast(v); - blks.push_back(blk); - } - resp.gen_rfetchblk(blks); - pn.send_msg(resp, replica); - }); -} - -void HotStuffBase::resp_fetch_blk_handler(const MsgHotStuff &msg, conn_t) { - std::vector blks; - msg.parse_rfetchblk(blks, this); - for (const auto &blk: blks) - if (blk) on_fetch_blk(blk); -} - -void HotStuffBase::print_stat() const { - LOG_INFO("===== begin stats ====="); - LOG_INFO("-------- queues -------"); - LOG_INFO("blk_fetch_waiting: %lu", blk_fetch_waiting.size()); - LOG_INFO("blk_delivery_waiting: %lu", blk_delivery_waiting.size()); - LOG_INFO("cmd_fetch_waiting: %lu", cmd_fetch_waiting.size()); - LOG_INFO("decision_waiting: %lu", decision_waiting.size()); - LOG_INFO("-------- misc ---------"); - LOG_INFO("fetched: %lu", fetched); - LOG_INFO("delivered: %lu", delivered); - LOG_INFO("cmd_cache: %lu", storage->get_cmd_cache_size()); - LOG_INFO("blk_cache: %lu", storage->get_blk_cache_size()); - LOG_INFO("------ misc (10s) -----"); - LOG_INFO("fetched: %lu", part_fetched); - LOG_INFO("delivered: %lu", part_delivered); - LOG_INFO("decided: %lu", part_decided); - LOG_INFO("gened: %lu", part_gened); - LOG_INFO("avg. parent_size: %.3f", - part_delivered ? part_parent_size / double(part_delivered) : 0); - LOG_INFO("delivery time: %.3f avg, %.3f min, %.3f max", - part_delivered ? part_delivery_time / double(part_delivered) : 0, - part_delivery_time_min == double_inf ? 0 : part_delivery_time_min, - part_delivery_time_max); - - part_parent_size = 0; - part_fetched = 0; - part_delivered = 0; - part_decided = 0; - part_gened = 0; - part_delivery_time = 0; - part_delivery_time_min = double_inf; - part_delivery_time_max = 0; - LOG_INFO("-- sent opcode (10s) --"); - auto &sent_op = pn.get_sent_by_opcode(); - for (auto &op: sent_op) - { - auto &val = op.second; - LOG_INFO("%02x: %lu, %.2fBpm", op.first, - val.first, val.first ? val.second / double(val.first) : 0); - val.first = val.second = 0; - } - LOG_INFO("-- recv opcode (10s) --"); - auto &recv_op = pn.get_recv_by_opcode(); - for (auto &op: recv_op) - { - auto &val = op.second; - LOG_INFO("%02x: %lu, %.2fBpm", op.first, - val.first, val.first ? val.second / double(val.first) : 0); - val.first = val.second = 0; - } - LOG_INFO("--- replica msg. (10s) ---"); - size_t _nsent = 0; - size_t _nrecv = 0; - for (const auto &replica: pn.all_peers()) - { - auto conn = pn.get_peer_conn(replica); - size_t ns = conn->get_nsent(); - size_t nr = conn->get_nrecv(); - conn->clear_nsent(); - conn->clear_nrecv(); - LOG_INFO("%s: %u, %u, %u", - std::string(replica).c_str(), ns, nr, part_fetched_replica[replica]); - _nsent += ns; - _nrecv += nr; - part_fetched_replica[replica] = 0; - } - nsent += _nsent; - nrecv += _nrecv; - LOG_INFO("sent: %lu", _nsent); - LOG_INFO("recv: %lu", _nrecv); - LOG_INFO("--- replica msg. total ---"); - LOG_INFO("sent: %lu", nsent); - LOG_INFO("recv: %lu", nrecv); - LOG_INFO("====== end stats ======"); -} - -promise_t HotStuffBase::async_decide(const uint256_t &cmd_hash) { - if (get_cmd_decision(cmd_hash)) - return promise_t([this, cmd_hash](promise_t pm){ - pm.resolve(storage->find_cmd(cmd_hash)); - }); - /* otherwise the do_decide will resolve the promise */ - auto it = decision_waiting.find(cmd_hash); - if (it == decision_waiting.end()) - { - promise_t pm{[](promise_t){}}; - it = decision_waiting.insert(std::make_pair(cmd_hash, pm)).first; - } - return it->second; -} - -HotStuffBase::HotStuffBase(uint32_t blk_size, - int32_t parent_limit, - ReplicaID rid, - privkey_bt &&priv_key, - NetAddr listen_addr, - EventContext eb, - pacemaker_bt pmaker): - HotStuffCore(rid, std::move(priv_key), parent_limit), - listen_addr(listen_addr), - blk_size(blk_size), - eb(eb), - pmaker(std::move(pmaker)), - pn(eb), - - fetched(0), delivered(0), - nsent(0), nrecv(0), - part_parent_size(0), - part_fetched(0), - part_delivered(0), - part_decided(0), - part_gened(0), - part_delivery_time(0), - part_delivery_time_min(double_inf), - part_delivery_time_max(0) -{ - if (pmaker == nullptr) - this->pmaker = new PaceMakerDummy(this); - /* register the handlers for msg from replicas */ - pn.reg_handler(PROPOSE, std::bind(&HotStuffBase::propose_handler, this, _1, _2)); - pn.reg_handler(VOTE, std::bind(&HotStuffBase::vote_handler, this, _1, _2)); - pn.reg_handler(QUERY_FETCH_BLK, std::bind(&HotStuffBase::query_fetch_blk_handler, this, _1, _2)); - pn.reg_handler(RESP_FETCH_BLK, std::bind(&HotStuffBase::resp_fetch_blk_handler, this, _1, _2)); - pn.init(listen_addr); -} - -void HotStuffBase::do_broadcast_proposal(const Proposal &prop) { - MsgHotStuff prop_msg; - prop_msg.gen_propose(prop); - for (const auto &replica: pn.all_peers()) - pn.send_msg(prop_msg, replica); -} - -void HotStuffBase::do_vote(ReplicaID last_proposer, const Vote &vote) { - MsgHotStuff vote_msg; - vote_msg.gen_vote(vote); - pmaker->next_proposer(last_proposer) - .then([this, vote_msg](ReplicaID proposer) { - pn.send_msg(vote_msg, get_config().get_addr(proposer)); - }); -} - -void HotStuffBase::do_decide(const command_t &cmd) { - auto it = decision_waiting.find(cmd->get_hash()); - if (it != decision_waiting.end()) - { - it->second.resolve(cmd); - decision_waiting.erase(it); - } -} - -HotStuffBase::~HotStuffBase() {} - -void HotStuffBase::start(bool eb_loop) { - /* ((n - 1) + 1 - 1) / 3 */ - uint32_t nfaulty = pn.all_peers().size() / 3; - if (nfaulty == 0) - LOG_WARN("too few replicas in the system to tolerate any failure"); - on_init(nfaulty); - if (eb_loop) - eb.dispatch(); -} - -} -- cgit v1.2.3