From efd49718dfbb6c1f329e72026770b07a67348168 Mon Sep 17 00:00:00 2001 From: Determinant Date: Mon, 16 Jul 2018 04:13:57 -0400 Subject: init --- src/core.cpp | 723 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 723 insertions(+) create mode 100644 src/core.cpp (limited to 'src/core.cpp') diff --git a/src/core.cpp b/src/core.cpp new file mode 100644 index 0000000..d6a4cc7 --- /dev/null +++ b/src/core.cpp @@ -0,0 +1,723 @@ +#include +#include "core.h" + +using salticidae::DataStream; +using salticidae::static_pointer_cast; +using salticidae::get_hash; + +#define LOG_INFO HOTSTUFF_LOG_INFO +#define LOG_DEBUG HOTSTUFF_LOG_DEBUG +#define LOG_WARN HOTSTUFF_LOG_WARN + +namespace hotstuff { + +void MsgHotStuff::gen_propose(const Proposal &proposal) { + DataStream s; + set_opcode(PROPOSE); + s << proposal; + set_payload(std::move(s)); +} + +void MsgHotStuff::parse_propose(Proposal &proposal) const { + DataStream(get_payload()) >> proposal; +} + +void MsgHotStuff::gen_vote(const Vote &vote) { + DataStream s; + set_opcode(VOTE); + s << vote; + set_payload(std::move(s)); +} + +void MsgHotStuff::parse_vote(Vote &vote) const { + DataStream(get_payload()) >> vote; +} + +void MsgHotStuff::gen_qfetchblk(const std::vector &blk_hashes) { + DataStream s; + set_opcode(QUERY_FETCH_BLK); + gen_hash_list(s, blk_hashes); + set_payload(std::move(s)); +} + +void MsgHotStuff::parse_qfetchblk(std::vector &blk_hashes) const { + DataStream s(get_payload()); + parse_hash_list(s, blk_hashes); +} + +void MsgHotStuff::gen_rfetchblk(const std::vector &blks) { + DataStream s; + set_opcode(RESP_FETCH_BLK); + s << htole((uint32_t)blks.size()); + for (auto blk: blks) s << *blk; + set_payload(std::move(s)); +} + +void MsgHotStuff::parse_rfetchblk(std::vector &blks, HotStuffCore *hsc) const { + DataStream s; + uint32_t size; + s >> size; + size = letoh(size); + blks.resize(size); + for (auto &blk: blks) + { + Block _blk; + _blk.unserialize(s, hsc); + if (!_blk.verify(hsc->get_config())) + blk = hsc->storage->add_blk(std::move(_blk)); + else + { + blk = nullptr; + LOG_WARN("block is invalid"); + } + } +} + +/* The core logic of HotStuff, is farily simple :) */ +/*** begin HotStuff protocol logic ***/ +HotStuffCore::HotStuffCore(ReplicaID id, + privkey_bt &&priv_key, + int32_t parent_limit): + b0(new Block(true, 1)), + bqc(b0), + bexec(b0), + vheight(0), + priv_key(std::move(priv_key)), + tails{bqc}, + id(id), + parent_limit(parent_limit), + storage(new EntityStorage()) { + storage->add_blk(b0); + b0->qc_ref = b0; +} + +void HotStuffCore::sanity_check_delivered(const block_t &blk) { + if (!blk->delivered) + throw std::runtime_error("block not delivered"); +} + +block_t HotStuffCore::sanity_check_delivered(const uint256_t &blk_hash) { + block_t blk = storage->find_blk(blk_hash); + if (blk == nullptr || !blk->delivered) + throw std::runtime_error("block not delivered"); + return std::move(blk); +} + +bool HotStuffCore::on_deliver_blk(const block_t &blk) { + if (blk->delivered) + { + LOG_WARN("attempt to deliver a block twice"); + return false; + } + blk->parents.clear(); + for (const auto &hash: blk->parent_hashes) + { + block_t p = sanity_check_delivered(hash); + blk->parents.push_back(p); + } + blk->height = blk->parents[0]->height + 1; + for (const auto &cmd: blk->cmds) + cmd->container = blk; + + if (blk->qc) + { + block_t _blk = storage->find_blk(blk->qc->get_blk_hash()); + if (_blk == nullptr) + throw std::runtime_error("block referred by qc not fetched"); + blk->qc_ref = std::move(_blk); + } // otherwise blk->qc_ref remains null + + for (auto pblk: blk->parents) tails.erase(pblk); + tails.insert(blk); + + blk->delivered = true; + LOG_DEBUG("delivered %.10s", get_hex(blk->get_hash()).c_str()); + return true; +} + +void HotStuffCore::check_commit(const block_t &_blk) { + const block_t &blk = _blk->qc_ref; + if (blk->qc_ref == nullptr) return; + if (blk->decision) return; + block_t p = blk->parents[0]; + if (p == blk->qc_ref) + { /* commit */ + std::vector commit_queue; + block_t b; + for (b = p; b->height > bexec->height; b = b->parents[0]) + { /* todo: also commit the uncles/aunts */ + commit_queue.push_back(b); + } + if (b != bexec) + throw std::runtime_error("safety breached :("); + for (auto it = commit_queue.rbegin(); it != commit_queue.rend(); it++) + { + const block_t &blk = *it; + blk->decision = 1; +#ifdef HOTSTUFF_ENABLE_LOG_PROTO + LOG_INFO("commit blk %.10s", get_hex10(blk->get_hash()).c_str()); +#endif + for (auto cmd: blk->cmds) + do_decide(cmd); + } + bexec = p; + } +} + +bool HotStuffCore::update(const uint256_t &bqc_hash) { + block_t _bqc = sanity_check_delivered(bqc_hash); + if (_bqc->qc_ref == nullptr) return false; + check_commit(_bqc); + if (_bqc->qc_ref->height > bqc->qc_ref->height) + bqc = _bqc; + return true; +} + +void HotStuffCore::on_propose(const std::vector &cmds) { + size_t nparents = parent_limit < 1 ? tails.size() : parent_limit; + assert(tails.size() > 0); + block_t p = *tails.rbegin(); + std::vector parents{p}; + tails.erase(p); + nparents--; + /* add the rest of tails as "uncles/aunts" */ + while (nparents--) + { + auto it = tails.begin(); + parents.push_back(*it); + tails.erase(it); + } + quorum_cert_bt qc = nullptr; + block_t qc_ref = nullptr; + if (p != b0 && p->voted.size() >= config.nmajority) + { + qc = p->self_qc->clone(); + qc->compute(); + qc_ref = p; + } + /* create a new block */ + block_t bnew = storage->add_blk( + Block( + parents, + cmds, + p->height + 1, + std::move(qc), qc_ref, + nullptr + )); + const uint256_t bnew_hash = bnew->get_hash(); + bnew->self_qc = create_quorum_cert(bnew_hash); + on_deliver_blk(bnew); + update(bnew_hash); + Proposal prop(id, bqc->get_hash(), bnew, nullptr); +#ifdef HOTSTUFF_ENABLE_LOG_PROTO + LOG_INFO("propose %s", std::string(*bnew).c_str()); +#endif + /* self-vote */ + on_receive_vote( + Vote(id, bqc->get_hash(), bnew_hash, + create_part_cert(*priv_key, bnew_hash), this)); + on_propose_(bnew); + /* boradcast to other replicas */ + do_broadcast_proposal(prop); +} + +void HotStuffCore::on_receive_proposal(const Proposal &prop) { + if (!update(prop.bqc_hash)) return; +#ifdef HOTSTUFF_ENABLE_LOG_PROTO + LOG_INFO("got %s", std::string(prop).c_str()); +#endif + block_t bnew = prop.blk; + sanity_check_delivered(bnew); + bool opinion = false; + if (bnew->height > vheight) + { + block_t pref = bqc->qc_ref; + block_t b; + for (b = bnew; + b->height > pref->height; + b = b->parents[0]); + opinion = b == pref; + vheight = bnew->height; + } +#ifdef HOTSTUFF_ENABLE_LOG_PROTO + LOG_INFO("now state: %s", std::string(*this).c_str()); +#endif + do_vote(prop.proposer, + Vote(id, + bqc->get_hash(), + bnew->get_hash(), + (opinion ? + create_part_cert(*priv_key, bnew->get_hash()) : + nullptr), + nullptr)); +} + +void HotStuffCore::on_receive_vote(const Vote &vote) { + if (!update(vote.bqc_hash)) return; +#ifdef HOTSTUFF_ENABLE_LOG_PROTO + LOG_INFO("got %s", std::string(vote).c_str()); + LOG_INFO("now state: %s", std::string(*this).c_str()); +#endif + + block_t blk = sanity_check_delivered(vote.blk_hash); + if (vote.cert == nullptr) return; + if (!vote.verify()) + { + LOG_WARN("invalid vote"); + return; + } + if (!blk->voted.insert(vote.voter).second) + { + LOG_WARN("duplicate votes"); + return; + } + size_t qsize = blk->voted.size(); + if (qsize <= config.nmajority) + { + blk->self_qc->add_part(vote.voter, *vote.cert); + if (qsize == config.nmajority) + on_qc_finish(blk); + } +} +/*** end HotStuff protocol logic ***/ + +void HotStuffCore::prune(uint32_t staleness) { + block_t start; + /* skip the blocks */ + for (start = bexec; staleness; staleness--, start = start->parents[0]) + if (!start->parents.size()) return; + std::stack s; + start->qc_ref = nullptr; + s.push(start); + while (!s.empty()) + { + auto &blk = s.top(); + if (blk->parents.empty()) + { + storage->try_release_blk(blk); + s.pop(); + continue; + } + blk->qc_ref = nullptr; + s.push(blk->parents.back()); + blk->parents.pop_back(); + } +} + +int8_t HotStuffCore::get_cmd_decision(const uint256_t &cmd_hash) { + auto cmd = storage->find_cmd(cmd_hash); + return cmd != nullptr ? cmd->get_decision() : 0; +} + +void HotStuffCore::add_replica(ReplicaID rid, const NetAddr &addr, + pubkey_bt &&pub_key) { + config.add_replica(rid, + ReplicaInfo(rid, addr, std::move(pub_key))); + b0->voted.insert(rid); +} + +promise_t HotStuffCore::async_qc_finish(const block_t &blk) { + if (blk->voted.size() >= config.nmajority) + return promise_t([](promise_t &pm) { + pm.resolve(); + }); + auto it = qc_waiting.find(blk); + if (it == qc_waiting.end()) + it = qc_waiting.insert(std::make_pair(blk, promise_t())).first; + return it->second; +} + +void HotStuffCore::on_qc_finish(const block_t &blk) { + auto it = qc_waiting.find(blk); + if (it != qc_waiting.end()) + { + it->second.resolve(); + qc_waiting.erase(it); + } +} + +promise_t HotStuffCore::async_wait_propose() { + return propose_waiting; +} + +void HotStuffCore::on_propose_(const block_t &blk) { + auto t = std::move(propose_waiting); + propose_waiting = promise_t(); + t.resolve(blk); +} + +HotStuffCore::operator std::string () const { + DataStream s; + s << ""; + return std::string(std::move(s)); +} + +void HotStuffBase::add_replica(ReplicaID idx, const NetAddr &addr, + pubkey_bt &&pub_key) { + HotStuffCore::add_replica(idx, addr, std::move(pub_key)); + if (addr != listen_addr) + pn.add_peer(addr); +} + +void HotStuffBase::on_fetch_blk(const block_t &blk) { +#ifdef HOTSTUFF_ENABLE_TX_PROFILE + blk_profiler.get_tx(blk->get_hash()); +#endif + LOG_DEBUG("fetched %.10s", get_hex(blk->get_hash()).c_str()); + part_fetched++; + fetched++; + for (auto cmd: blk->get_cmds()) on_fetch_cmd(cmd); + const uint256_t &blk_hash = blk->get_hash(); + auto it = blk_fetch_waiting.find(blk_hash); + if (it != blk_fetch_waiting.end()) + { + it->second.resolve(blk); + blk_fetch_waiting.erase(it); + } +} + +void HotStuffBase::on_fetch_cmd(const command_t &cmd) { + const uint256_t &cmd_hash = cmd->get_hash(); + auto it = cmd_fetch_waiting.find(cmd_hash); + if (it != cmd_fetch_waiting.end()) + { + it->second.resolve(cmd); + cmd_fetch_waiting.erase(it); + } +} + +void HotStuffBase::on_deliver_blk(const block_t &blk) { + const uint256_t &blk_hash = blk->get_hash(); + bool valid; + /* sanity check: all parents must be delivered */ + for (const auto &p: blk->get_parent_hashes()) + assert(storage->is_blk_delivered(p)); + if ((valid = HotStuffCore::on_deliver_blk(blk))) + { + LOG_DEBUG("block %.10s delivered", + get_hex(blk_hash).c_str()); + part_parent_size += blk->get_parent_hashes().size(); + part_delivered++; + delivered++; + } + else + { + LOG_WARN("dropping invalid block"); + } + + auto it = blk_delivery_waiting.find(blk_hash); + if (it != blk_delivery_waiting.end()) + { + auto &pm = it->second; + if (valid) + { + pm.elapsed.stop(false); + auto sec = pm.elapsed.elapsed_sec; + part_delivery_time += sec; + part_delivery_time_min = std::min(part_delivery_time_min, sec); + part_delivery_time_max = std::max(part_delivery_time_max, sec); + + pm.resolve(blk); + } + else + { + pm.reject(blk); + // TODO: do we need to also free it from storage? + } + blk_delivery_waiting.erase(it); + } +} + +promise_t HotStuffBase::async_fetch_blk(const uint256_t &blk_hash, + const NetAddr *replica_id, + bool fetch_now) { + if (storage->is_blk_fetched(blk_hash)) + return promise_t([this, &blk_hash](promise_t pm){ + pm.resolve(storage->find_blk(blk_hash)); + }); + auto it = blk_fetch_waiting.find(blk_hash); + if (it == blk_fetch_waiting.end()) + { +#ifdef HOTSTUFF_ENABLE_TX_PROFILE + blk_profiler.rec_tx(blk_hash, false); +#endif + it = blk_fetch_waiting.insert( + std::make_pair( + blk_hash, + BlockFetchContext(blk_hash, this))).first; + } + if (replica_id != nullptr) + it->second.add_replica(*replica_id, fetch_now); + return static_cast(it->second); +} + +promise_t HotStuffBase::async_fetch_cmd(const uint256_t &cmd_hash, + const NetAddr *replica_id, + bool fetch_now) { + if (storage->is_cmd_fetched(cmd_hash)) + return promise_t([this, &cmd_hash](promise_t pm){ + pm.resolve(storage->find_cmd(cmd_hash)); + }); + auto it = cmd_fetch_waiting.find(cmd_hash); + if (it == cmd_fetch_waiting.end()) + { + it = cmd_fetch_waiting.insert( + std::make_pair(cmd_hash, CmdFetchContext(cmd_hash, this))).first; + } + if (replica_id != nullptr) + it->second.add_replica(*replica_id, fetch_now); + return static_cast(it->second); +} + +promise_t HotStuffBase::async_deliver_blk(const uint256_t &blk_hash, + const NetAddr &replica_id) { + if (storage->is_blk_delivered(blk_hash)) + return promise_t([this, &blk_hash](promise_t pm) { + pm.resolve(storage->find_blk(blk_hash)); + }); + auto it = blk_delivery_waiting.find(blk_hash); + if (it != blk_delivery_waiting.end()) + return static_cast(it->second); + BlockDeliveryContext pm{[](promise_t){}}; + it = blk_delivery_waiting.insert(std::make_pair(blk_hash, pm)).first; + /* otherwise the on_deliver_batch will resolve */ + async_fetch_blk(blk_hash, &replica_id).then([this, replica_id](block_t blk) { + /* qc_ref should be fetched */ + std::vector pms; + const auto &qc = blk->get_qc(); + if (qc) + pms.push_back(async_fetch_blk(qc->get_blk_hash(), &replica_id)); + /* the parents should be delivered */ + for (const auto &phash: blk->get_parent_hashes()) + pms.push_back(async_deliver_blk(phash, replica_id)); + promise::all(pms).then([this, blk]() { + on_deliver_blk(blk); + }); + }); + return static_cast(pm); +} + +void HotStuffBase::propose_handler(const MsgHotStuff &msg, conn_t conn_) { + auto conn = static_pointer_cast::Conn>(conn_); + const NetAddr &peer = conn->get_peer(); + Proposal prop(this); + msg.parse_propose(prop); + block_t blk = prop.blk; + promise::all(std::vector{ + async_deliver_blk(prop.bqc_hash, peer), + async_deliver_blk(blk->get_hash(), peer), + }).then([this, prop = std::move(prop)]() { + on_receive_proposal(prop); + }); +} + +void HotStuffBase::vote_handler(const MsgHotStuff &msg, conn_t conn_) { + auto conn = static_pointer_cast::Conn>(conn_); + const NetAddr &peer = conn->get_peer(); + Vote vote(this); + msg.parse_vote(vote); + promise::all(std::vector{ + async_deliver_blk(vote.bqc_hash, peer), + async_deliver_blk(vote.blk_hash, peer) + }).then([this, vote = std::move(vote)]() { + on_receive_vote(vote); + }); +} + +void HotStuffBase::query_fetch_blk_handler(const MsgHotStuff &msg, conn_t conn_) { + auto conn = static_pointer_cast::Conn>(conn_); + const NetAddr replica = conn->get_peer(); + std::vector blk_hashes; + msg.parse_qfetchblk(blk_hashes); + + std::vector pms; + for (const auto &h: blk_hashes) + pms.push_back(async_fetch_blk(h, nullptr)); + promise::all(pms).then([replica, this](const promise::values_t values) { + MsgHotStuff resp; + std::vector blks; + for (auto &v: values) + { + auto blk = promise::any_cast(v); + blks.push_back(blk); + } + resp.gen_rfetchblk(blks); + pn.send_msg(resp, replica); + }); +} + +void HotStuffBase::resp_fetch_blk_handler(const MsgHotStuff &msg, conn_t) { + std::vector blks; + msg.parse_rfetchblk(blks, this); + for (const auto &blk: blks) + if (blk) on_fetch_blk(blk); +} + +void HotStuffBase::print_stat() const { + LOG_INFO("===== begin stats ====="); + LOG_INFO("-------- queues -------"); + LOG_INFO("blk_fetch_waiting: %lu", blk_fetch_waiting.size()); + LOG_INFO("blk_delivery_waiting: %lu", blk_delivery_waiting.size()); + LOG_INFO("cmd_fetch_waiting: %lu", cmd_fetch_waiting.size()); + LOG_INFO("decision_waiting: %lu", decision_waiting.size()); + LOG_INFO("-------- misc ---------"); + LOG_INFO("fetched: %lu", fetched); + LOG_INFO("delivered: %lu", delivered); + LOG_INFO("cmd_cache: %lu", storage->get_cmd_cache_size()); + LOG_INFO("blk_cache: %lu", storage->get_blk_cache_size()); + LOG_INFO("------ misc (10s) -----"); + LOG_INFO("fetched: %lu", part_fetched); + LOG_INFO("delivered: %lu", part_delivered); + LOG_INFO("decided: %lu", part_decided); + LOG_INFO("gened: %lu", part_gened); + LOG_INFO("avg. parent_size: %.3f", + part_delivered ? part_parent_size / double(part_delivered) : 0); + LOG_INFO("delivery time: %.3f avg, %.3f min, %.3f max", + part_delivered ? part_delivery_time / double(part_delivered) : 0, + part_delivery_time_min == double_inf ? 0 : part_delivery_time_min, + part_delivery_time_max); + + part_parent_size = 0; + part_fetched = 0; + part_delivered = 0; + part_decided = 0; + part_gened = 0; + part_delivery_time = 0; + part_delivery_time_min = double_inf; + part_delivery_time_max = 0; + LOG_INFO("-- sent opcode (10s) --"); + auto &sent_op = pn.get_sent_by_opcode(); + for (auto &op: sent_op) + { + auto &val = op.second; + LOG_INFO("%02x: %lu, %.2fBpm", op.first, + val.first, val.first ? val.second / double(val.first) : 0); + val.first = val.second = 0; + } + LOG_INFO("-- recv opcode (10s) --"); + auto &recv_op = pn.get_recv_by_opcode(); + for (auto &op: recv_op) + { + auto &val = op.second; + LOG_INFO("%02x: %lu, %.2fBpm", op.first, + val.first, val.first ? val.second / double(val.first) : 0); + val.first = val.second = 0; + } + LOG_INFO("--- replica msg. (10s) ---"); + size_t _nsent = 0; + size_t _nrecv = 0; + for (const auto &replica: pn.all_peers()) + { + auto conn = pn.get_peer_conn(replica); + size_t ns = conn->get_nsent(); + size_t nr = conn->get_nrecv(); + conn->clear_nsent(); + conn->clear_nrecv(); + LOG_INFO("%s: %u, %u, %u", + std::string(replica).c_str(), ns, nr, part_fetched_replica[replica]); + _nsent += ns; + _nrecv += nr; + part_fetched_replica[replica] = 0; + } + nsent += _nsent; + nrecv += _nrecv; + LOG_INFO("sent: %lu", _nsent); + LOG_INFO("recv: %lu", _nrecv); + LOG_INFO("--- replica msg. total ---"); + LOG_INFO("sent: %lu", nsent); + LOG_INFO("recv: %lu", nrecv); + LOG_INFO("====== end stats ======"); +} + +promise_t HotStuffBase::async_decide(const uint256_t &cmd_hash) { + if (get_cmd_decision(cmd_hash)) + return promise_t([this, cmd_hash](promise_t pm){ + pm.resolve(storage->find_cmd(cmd_hash)); + }); + /* otherwise the do_decide will resolve the promise */ + auto it = decision_waiting.find(cmd_hash); + if (it == decision_waiting.end()) + { + promise_t pm{[](promise_t){}}; + it = decision_waiting.insert(std::make_pair(cmd_hash, pm)).first; + } + return it->second; +} + +HotStuffBase::HotStuffBase(uint32_t blk_size, + int32_t parent_limit, + ReplicaID rid, + privkey_bt &&priv_key, + NetAddr listen_addr, + EventContext eb, + pacemaker_bt pmaker): + HotStuffCore(rid, std::move(priv_key), parent_limit), + listen_addr(listen_addr), + blk_size(blk_size), + eb(eb), + pmaker(std::move(pmaker)), + pn(eb), + + fetched(0), delivered(0), + nsent(0), nrecv(0), + part_parent_size(0), + part_fetched(0), + part_delivered(0), + part_decided(0), + part_gened(0), + part_delivery_time(0), + part_delivery_time_min(double_inf), + part_delivery_time_max(0) +{ + if (pmaker == nullptr) + this->pmaker = new PaceMakerDummy(this); + /* register the handlers for msg from replicas */ + pn.reg_handler(PROPOSE, std::bind(&HotStuffBase::propose_handler, this, _1, _2)); + pn.reg_handler(VOTE, std::bind(&HotStuffBase::vote_handler, this, _1, _2)); + pn.reg_handler(QUERY_FETCH_BLK, std::bind(&HotStuffBase::query_fetch_blk_handler, this, _1, _2)); + pn.reg_handler(RESP_FETCH_BLK, std::bind(&HotStuffBase::resp_fetch_blk_handler, this, _1, _2)); + pn.init(listen_addr); +} + +void HotStuffBase::do_broadcast_proposal(const Proposal &prop) { + MsgHotStuff prop_msg; + prop_msg.gen_propose(prop); + for (const auto &replica: pn.all_peers()) + pn.send_msg(prop_msg, replica); +} + +void HotStuffBase::do_vote(ReplicaID last_proposer, const Vote &vote) { + MsgHotStuff vote_msg; + vote_msg.gen_vote(vote); + pmaker->next_proposer(last_proposer) + .then([this, vote_msg](ReplicaID proposer) { + pn.send_msg(vote_msg, get_config().get_addr(proposer)); + }); +} + +void HotStuffBase::do_decide(const command_t &cmd) { + auto it = decision_waiting.find(cmd->get_hash()); + if (it != decision_waiting.end()) + { + it->second.resolve(cmd); + decision_waiting.erase(it); + } +} + +HotStuffBase::~HotStuffBase() {} + +void HotStuffBase::start(bool eb_loop) { + /* ((n - 1) + 1 - 1) / 3 */ + uint32_t nfaulty = pn.all_peers().size() / 3; + if (nfaulty == 0) + LOG_WARN("too few replicas in the system to tolerate any failure"); + on_init(nfaulty); + if (eb_loop) + eb.dispatch(); +} + +} -- cgit v1.2.3