/**
 * Copyright 2018 VMware
 * Copyright 2018 Ted Yin
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "hotstuff/hotstuff.h"
#include "hotstuff/client.h"
#include "hotstuff/liveness.h"

using salticidae::static_pointer_cast;

#define LOG_INFO HOTSTUFF_LOG_INFO
#define LOG_DEBUG HOTSTUFF_LOG_DEBUG
#define LOG_WARN HOTSTUFF_LOG_WARN

namespace hotstuff {

/* === wire messages ===
 * Each Msg* type serializes its payload into `serialized` on construction.
 * Messages whose payload references blocks (Propose/Vote/RespBlock) cannot be
 * fully parsed until a HotStuffCore is available, hence postponed_parse(). */

const opcode_t MsgPropose::opcode;
MsgPropose::MsgPropose(const Proposal &proposal) { serialized << proposal; }
void MsgPropose::postponed_parse(HotStuffCore *hsc) {
    proposal.hsc = hsc;
    serialized >> proposal;
}

const opcode_t MsgVote::opcode;
MsgVote::MsgVote(const Vote &vote) { serialized << vote; }
void MsgVote::postponed_parse(HotStuffCore *hsc) {
    vote.hsc = hsc;
    serialized >> vote;
}

const opcode_t MsgReqBlock::opcode;
MsgReqBlock::MsgReqBlock(const std::vector<uint256_t> &blk_hashes) {
    /* length-prefixed list of block hashes, length in little-endian */
    serialized << htole((uint32_t)blk_hashes.size());
    for (const auto &h: blk_hashes)
        serialized << h;
}

MsgReqBlock::MsgReqBlock(DataStream &&s) {
    uint32_t size;
    s >> size;
    size = letoh(size);
    blk_hashes.resize(size);
    for (auto &h: blk_hashes) s >> h;
}

const opcode_t MsgRespBlock::opcode;
MsgRespBlock::MsgRespBlock(const std::vector<block_t> &blks) {
    serialized << htole((uint32_t)blks.size());
    /* const ref: avoid a shared_ptr refcount bump per block */
    for (const auto &blk: blks) serialized << *blk;
}

void MsgRespBlock::postponed_parse(HotStuffCore *hsc) {
    uint32_t size;
    serialized >> size;
    size = letoh(size);
    blks.resize(size);
    for (auto &blk: blks)
    {
        Block _blk;
        _blk.unserialize(serialized, hsc);
        /* deduplicate through the block storage cache */
        blk = hsc->storage->add_blk(std::move(_blk), hsc->get_config());
    }
}

// TODO: improve this function
/** Enqueue a client command for inclusion in a future proposal; `callback`
 * fires once the command's block is decided (or a dummy Finality is sent if
 * the command is already pending — see the queue handler in start()). */
void HotStuffBase::exec_command(uint256_t cmd_hash, commit_cb_t callback) {
    cmd_pending.enqueue(std::make_pair(cmd_hash, std::move(callback)));
}

/** Called when a requested block body arrives: resolve the matching
 * fetch promise (if any) and update fetch counters. */
void HotStuffBase::on_fetch_blk(const block_t &blk) {
#ifdef HOTSTUFF_BLK_PROFILE
    blk_profiler.get_tx(blk->get_hash());
#endif
    LOG_DEBUG("fetched %.10s", get_hex(blk->get_hash()).c_str());
    part_fetched++;
    fetched++;
    //for (auto cmd: blk->get_cmds()) on_fetch_cmd(cmd);
    const uint256_t &blk_hash = blk->get_hash();
    auto it = blk_fetch_waiting.find(blk_hash);
    if (it != blk_fetch_waiting.end())
    {
        it->second.resolve(blk);
        blk_fetch_waiting.erase(it);
    }
}

/** Deliver a block to the core protocol once all of its ancestry is present.
 * Returns false if the core rejects the block as invalid. */
bool HotStuffBase::on_deliver_blk(const block_t &blk) {
    const uint256_t &blk_hash = blk->get_hash();
    bool valid;
    /* sanity check: all parents must be delivered */
    for (const auto &p: blk->get_parent_hashes())
        assert(storage->is_blk_delivered(p));
    if ((valid = HotStuffCore::on_deliver_blk(blk)))
    {
        LOG_DEBUG("block %.10s delivered",
                get_hex(blk_hash).c_str());
        part_parent_size += blk->get_parent_hashes().size();
        part_delivered++;
        delivered++;
    }
    else
    {
        LOG_WARN("dropping invalid block");
    }
    bool res = true;
    auto it = blk_delivery_waiting.find(blk_hash);
    if (it != blk_delivery_waiting.end())
    {
        auto &pm = it->second;
        if (valid)
        {
            /* record delivery latency stats before resolving waiters */
            pm.elapsed.stop(false);
            auto sec = pm.elapsed.elapsed_sec;
            part_delivery_time += sec;
            part_delivery_time_min = std::min(part_delivery_time_min, sec);
            part_delivery_time_max = std::max(part_delivery_time_max, sec);
            pm.resolve(blk);
        }
        else
        {
            pm.reject(blk);
            res = false;
            // TODO: do we need to also free it from storage?
        }
        blk_delivery_waiting.erase(it);
    }
    return res;
}

/** Return a promise for the block body identified by `blk_hash`, requesting it
 * from `replica` (if non-null) when not already cached locally. */
promise_t HotStuffBase::async_fetch_blk(const uint256_t &blk_hash,
                                        const PeerId *replica,
                                        bool fetch_now) {
    if (storage->is_blk_fetched(blk_hash))
        return promise_t([this, &blk_hash](promise_t pm){
            pm.resolve(storage->find_blk(blk_hash));
        });
    auto it = blk_fetch_waiting.find(blk_hash);
    if (it == blk_fetch_waiting.end())
    {
#ifdef HOTSTUFF_BLK_PROFILE
        blk_profiler.rec_tx(blk_hash, false);
#endif
        it = blk_fetch_waiting.insert(
            std::make_pair(
                blk_hash,
                BlockFetchContext(blk_hash, this))).first;
    }
    if (replica != nullptr)
        it->second.add_replica(*replica, fetch_now);
    return static_cast<promise_t &>(it->second);
}

/** Return a promise resolved once `blk_hash` (and recursively its whole
 * ancestry plus its QC reference) has been fetched, verified and delivered. */
promise_t HotStuffBase::async_deliver_blk(const uint256_t &blk_hash,
                                        const PeerId &replica) {
    if (storage->is_blk_delivered(blk_hash))
        return promise_t([this, &blk_hash](promise_t pm) {
            pm.resolve(storage->find_blk(blk_hash));
        });
    auto it = blk_delivery_waiting.find(blk_hash);
    if (it != blk_delivery_waiting.end())
        return static_cast<promise_t &>(it->second);
    BlockDeliveryContext pm{[](promise_t){}};
    it = blk_delivery_waiting.insert(std::make_pair(blk_hash, pm)).first;
    /* otherwise the on_deliver_batch will resolve */
    async_fetch_blk(blk_hash, &replica).then([this, replica](block_t blk) {
        /* qc_ref should be fetched */
        std::vector<promise_t> pms;
        const auto &qc = blk->get_qc();
        assert(qc);
        /* genesis needs no signature verification */
        if (blk == get_genesis())
            pms.push_back(promise_t([](promise_t &pm){ pm.resolve(true); }));
        else
            pms.push_back(blk->verify(this, vpool));
        pms.push_back(async_fetch_blk(qc->get_obj_hash(), &replica));
        /* the parents should be delivered */
        for (const auto &phash: blk->get_parent_hashes())
            pms.push_back(async_deliver_blk(phash, replica));
        promise::all(pms).then([this, blk](const promise::values_t values) {
            auto ret = promise::any_cast<bool>(values[0]) && this->on_deliver_blk(blk);
            if (!ret)
                HOTSTUFF_LOG_WARN("verification failed during async delivery");
        });
    });
    return static_cast<promise_t &>(pm);
}

/** Network handler: a replica proposed a block. The proposal is only handed
 * to the core once the block (and its ancestry) is delivered. */
void HotStuffBase::propose_handler(MsgPropose &&msg, const Net::conn_t &conn) {
    const PeerId &peer = conn->get_peer_id();
    if (peer.is_null()) return;
    msg.postponed_parse(this);
    auto &prop = msg.proposal;
    block_t blk = prop.blk;
    if (!blk) return;
    /* a proposal must come from the peer it claims as proposer */
    if (peer != get_config().get_peer_id(prop.proposer))
    {
        LOG_WARN("invalid proposal from %d", prop.proposer);
        return;
    }
    promise::all(std::vector<promise_t>{
        async_deliver_blk(blk->get_hash(), peer)
    }).then([this, prop = std::move(prop)]() {
        on_receive_proposal(prop);
    });
}

/** Network handler: a replica voted for a block; verify the signature and the
 * referenced block's delivery concurrently before passing it to the core. */
void HotStuffBase::vote_handler(MsgVote &&msg, const Net::conn_t &conn) {
    const auto &peer = conn->get_peer_id();
    if (peer.is_null()) return;
    msg.postponed_parse(this);
    //auto &vote = msg.vote;
    RcObj<Vote> v(new Vote(std::move(msg.vote)));
    promise::all(std::vector<promise_t>{
        async_deliver_blk(v->blk_hash, peer),
        v->verify(vpool),
    }).then([this, v=std::move(v)](const promise::values_t values) {
        if (!promise::any_cast<bool>(values[1]))
            LOG_WARN("invalid vote from %d", v->voter);
        else
            on_receive_vote(*v);
    });
}

/** Network handler: a peer asked for block bodies; respond once all the
 * requested blocks are available locally. */
void HotStuffBase::req_blk_handler(MsgReqBlock &&msg, const Net::conn_t &conn) {
    const PeerId replica = conn->get_peer_id();
    if (replica.is_null()) return;
    auto &blk_hashes = msg.blk_hashes;
    std::vector<promise_t> pms;
    for (const auto &h: blk_hashes)
        pms.push_back(async_fetch_blk(h, nullptr));
    promise::all(pms).then([replica, this](const promise::values_t values) {
        std::vector<block_t> blks;
        for (auto &v: values)
        {
            auto blk = promise::any_cast<block_t>(v);
            blks.push_back(blk);
        }
        pn.send_msg(MsgRespBlock(blks), replica);
    });
}

/** Network handler: block bodies we requested arrived. */
void HotStuffBase::resp_blk_handler(MsgRespBlock &&msg, const Net::conn_t &) {
    msg.postponed_parse(this);
    for (const auto &blk: msg.blks)
        if (blk) on_fetch_blk(blk);
}

/** Connection gate: with TLS enabled, only peers presenting a certificate
 * registered in start() may connect. */
bool HotStuffBase::conn_handler(const salticidae::ConnPool::conn_t &conn, bool connected) {
    if (connected)
    {
        if (!pn.enable_tls) return true;
        auto cert = conn->get_peer_cert();
        //SALTICIDAE_LOG_INFO("%s", salticidae::get_hash(cert->get_der()).to_hex().c_str());
        return valid_tls_certs.count(salticidae::get_hash(cert->get_der()));
    }
    return true;
}

/** Dump cumulative and windowed (since last call) statistics, then reset the
 * windowed counters. Mutates the `part_*` members despite being const
 * (they are mutable bookkeeping). */
void HotStuffBase::print_stat() const {
    LOG_INFO("===== begin stats =====");
    LOG_INFO("-------- queues -------");
    LOG_INFO("blk_fetch_waiting: %lu", blk_fetch_waiting.size());
    LOG_INFO("blk_delivery_waiting: %lu", blk_delivery_waiting.size());
    LOG_INFO("decision_waiting: %lu", decision_waiting.size());
    LOG_INFO("-------- misc ---------");
    LOG_INFO("fetched: %lu", fetched);
    LOG_INFO("delivered: %lu", delivered);
    LOG_INFO("cmd_cache: %lu", storage->get_cmd_cache_size());
    LOG_INFO("blk_cache: %lu", storage->get_blk_cache_size());
    LOG_INFO("------ misc (10s) -----");
    LOG_INFO("fetched: %lu", part_fetched);
    LOG_INFO("delivered: %lu", part_delivered);
    LOG_INFO("decided: %lu", part_decided);
    LOG_INFO("gened: %lu", part_gened);
    LOG_INFO("avg. parent_size: %.3f",
            part_delivered ? part_parent_size / double(part_delivered) : 0);
    LOG_INFO("delivery time: %.3f avg, %.3f min, %.3f max",
            part_delivered ? part_delivery_time / double(part_delivered) : 0,
            part_delivery_time_min == double_inf ? 0 : part_delivery_time_min,
            part_delivery_time_max);

    /* reset the 10s window */
    part_parent_size = 0;
    part_fetched = 0;
    part_delivered = 0;
    part_decided = 0;
    part_gened = 0;
    part_delivery_time = 0;
    part_delivery_time_min = double_inf;
    part_delivery_time_max = 0;
#ifdef HOTSTUFF_MSG_STAT
    LOG_INFO("--- replica msg. (10s) ---");
    size_t _nsent = 0;
    size_t _nrecv = 0;
    for (const auto &replica: peers)
    {
        auto conn = pn.get_peer_conn(replica);
        if (conn == nullptr) continue;
        size_t ns = conn->get_nsent();
        size_t nr = conn->get_nrecv();
        size_t nsb = conn->get_nsentb();
        size_t nrb = conn->get_nrecvb();
        conn->clear_msgstat();
        LOG_INFO("%s: %u(%u), %u(%u), %u",
            get_hex10(replica).c_str(), ns, nsb, nr, nrb, part_fetched_replica[replica]);
        _nsent += ns;
        _nrecv += nr;
        part_fetched_replica[replica] = 0;
    }
    nsent += _nsent;
    nrecv += _nrecv;
    LOG_INFO("sent: %lu", _nsent);
    LOG_INFO("recv: %lu", _nrecv);
    LOG_INFO("--- replica msg. total ---");
    LOG_INFO("sent: %lu", nsent);
    LOG_INFO("recv: %lu", nrecv);
#endif
    LOG_INFO("====== end stats ======");
}

/** Wire up the network layer: register one handler per message opcode, the
 * connection gate, and an error logger; then start listening. */
HotStuffBase::HotStuffBase(uint32_t blk_size,
                    ReplicaID rid,
                    privkey_bt &&priv_key,
                    NetAddr listen_addr,
                    pacemaker_bt pmaker,
                    EventContext ec,
                    size_t nworker,
                    const Net::Config &netconfig):
        HotStuffCore(rid, std::move(priv_key)),
        listen_addr(listen_addr),
        blk_size(blk_size),
        ec(ec),
        tcall(ec),
        vpool(ec, nworker),
        pn(ec, netconfig),
        pmaker(std::move(pmaker)),

        fetched(0), delivered(0),
        nsent(0), nrecv(0),
        part_parent_size(0),
        part_fetched(0),
        part_delivered(0),
        part_decided(0),
        part_gened(0),
        part_delivery_time(0),
        part_delivery_time_min(double_inf),
        part_delivery_time_max(0) {
    /* register the handlers for msg from replicas */
    pn.reg_handler(salticidae::generic_bind(&HotStuffBase::propose_handler, this, _1, _2));
    pn.reg_handler(salticidae::generic_bind(&HotStuffBase::vote_handler, this, _1, _2));
    pn.reg_handler(salticidae::generic_bind(&HotStuffBase::req_blk_handler, this, _1, _2));
    pn.reg_handler(salticidae::generic_bind(&HotStuffBase::resp_blk_handler, this, _1, _2));
    pn.reg_conn_handler(salticidae::generic_bind(&HotStuffBase::conn_handler, this, _1, _2));
    pn.reg_error_handler([](const std::exception_ptr _err, bool fatal, int32_t async_id) {
        try {
            std::rethrow_exception(_err);
        } catch (const std::exception &err) {
            HOTSTUFF_LOG_WARN("network async error: %s\n", err.what());
        }
    });
    pn.start();
    pn.listen(listen_addr);
}

void HotStuffBase::do_broadcast_proposal(const Proposal &prop) {
    //MsgPropose prop_msg(prop);
    pn.multicast_msg(MsgPropose(prop), peers);
    //for (const auto &replica: peers)
    //    pn.send_msg(prop_msg, replica);
}

/** Send our vote to the next proposer chosen by the pacemaker (or loop it
 * back locally if that proposer is us). */
void HotStuffBase::do_vote(ReplicaID last_proposer, const Vote &vote) {
    pmaker->beat_resp(last_proposer)
            .then([this, vote](ReplicaID proposer) {
        if (proposer == get_id())
        {
            //throw HotStuffError("unreachable line");
            on_receive_vote(vote);
        }
        else
            pn.send_msg(MsgVote(vote), get_config().get_peer_id(proposer));
    });
}

void HotStuffBase::do_consensus(const block_t &blk) {
    pmaker->on_consensus(blk);
}

/** A command reached finality: execute it and fire the client callback. */
void HotStuffBase::do_decide(Finality &&fin) {
    part_decided++;
    state_machine_execute(fin);
    auto it = decision_waiting.find(fin.cmd_hash);
    if (it != decision_waiting.end())
    {
        it->second(std::move(fin));
        decision_waiting.erase(it);
    }
}

HotStuffBase::~HotStuffBase() {}

/** Bootstrap the replica set, initialize the core and pacemaker, and install
 * the pending-command handler that batches commands into proposals. */
void HotStuffBase::start(
        std::vector<std::tuple<NetAddr, pubkey_bt, uint256_t>> &&replicas,
        bool ec_loop) {
    for (size_t i = 0; i < replicas.size(); i++)
    {
        auto &addr = std::get<0>(replicas[i]);
        auto cert_hash = std::move(std::get<2>(replicas[i]));
        valid_tls_certs.insert(cert_hash);
        /* with TLS the peer identity is the cert hash; otherwise the address */
        auto peer = pn.enable_tls ? salticidae::PeerId(cert_hash) : salticidae::PeerId(addr);
        HotStuffCore::add_replica(i, peer, std::move(std::get<1>(replicas[i])));
        if (addr != listen_addr)
        {
            peers.push_back(peer);
            pn.add_peer(peer);
            pn.set_peer_addr(peer, addr);
            pn.conn_peer(peer);
        }
    }

    /* ((n - 1) + 1 - 1) / 3 */
    uint32_t nfaulty = peers.size() / 3;
    if (nfaulty == 0)
        LOG_WARN("too few replicas in the system to tolerate any failure");
    on_init(nfaulty);
    pmaker->init(this);
    if (ec_loop)
        ec.dispatch();

    cmd_pending.reg_handler(ec, [this](cmd_queue_t &q) {
        std::pair<uint256_t, commit_cb_t> e;
        while (q.try_dequeue(e))
        {
            ReplicaID proposer = pmaker->get_proposer();

            const auto &cmd_hash = e.first;
            auto it = decision_waiting.find(cmd_hash);
            if (it == decision_waiting.end())
                it = decision_waiting.insert(std::make_pair(cmd_hash, e.second)).first;
            else
                /* duplicate command: answer immediately with a dummy Finality */
                e.second(Finality(id, 0, 0, 0, cmd_hash, uint256_t()));
            if (proposer != get_id()) continue;
            cmd_pending_buffer.push(cmd_hash);
            if (cmd_pending_buffer.size() >= blk_size)
            {
                std::vector<uint256_t> cmds;
                for (uint32_t i = 0; i < blk_size; i++)
                {
                    cmds.push_back(cmd_pending_buffer.front());
                    cmd_pending_buffer.pop();
                }
                pmaker->beat().then([this, cmds = std::move(cmds)](ReplicaID proposer) {
                    if (proposer == get_id())
                        on_propose(cmds, pmaker->get_parents());
                });
                /* yield after issuing one full batch; re-armed by the queue */
                return true;
            }
        }
        return false;
    });
}

}