/** * Copyright 2018 VMware * Copyright 2018 Ted Yin * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef _HOTSTUFF_CORE_H #define _HOTSTUFF_CORE_H #include #include #include #include "salticidae/util.h" #include "salticidae/network.h" #include "salticidae/msg.h" #include "hotstuff/util.h" #include "hotstuff/consensus.h" namespace hotstuff { using salticidae::PeerNetwork; using salticidae::ElapsedTime; using salticidae::_1; using salticidae::_2; const double ent_waiting_timeout = 10; const double double_inf = 1e10; /** Network message format for HotStuff. */ struct MsgPropose { static const opcode_t opcode = 0x0; DataStream serialized; Proposal proposal; MsgPropose(const Proposal &); /** Only move the data to serialized, do not parse immediately. */ MsgPropose(DataStream &&s): serialized(std::move(s)) {} /** Parse the serialized data to blks now, with `hsc->storage`. */ void postponed_parse(HotStuffCore *hsc); }; struct MsgVote { static const opcode_t opcode = 0x1; DataStream serialized; Vote vote; MsgVote(const Vote &); MsgVote(DataStream &&s): serialized(std::move(s)) {} void postponed_parse(HotStuffCore *hsc); }; struct MsgReqBlock { static const opcode_t opcode = 0x2; DataStream serialized; std::vector blk_hashes; MsgReqBlock() = default; MsgReqBlock(const std::vector &blk_hashes); MsgReqBlock(DataStream &&s); }; struct MsgRespBlock { static const opcode_t opcode = 0x3; DataStream serialized; std::vector blks; MsgRespBlock(const std::vector &blks); MsgRespBlock(DataStream &&s): serialized(std::move(s)) {} void postponed_parse(HotStuffCore *hsc); }; using promise::promise_t; class HotStuffBase; using pacemaker_bt = BoxObj; template class FetchContext: public promise_t { TimerEvent timeout; HotStuffBase *hs; MsgReqBlock fetch_msg; const uint256_t ent_hash; std::unordered_set replicas; inline void timeout_cb(TimerEvent &); public: FetchContext(const FetchContext &) = delete; FetchContext &operator=(const FetchContext &) = delete; FetchContext(FetchContext &&other); FetchContext(const uint256_t &ent_hash, HotStuffBase *hs); ~FetchContext() {} inline void send(const PeerId &replica); inline void reset_timeout(); inline void add_replica(const PeerId &replica, bool fetch_now = true); }; class BlockDeliveryContext: public promise_t { public: ElapsedTime elapsed; BlockDeliveryContext &operator=(const BlockDeliveryContext &) = delete; BlockDeliveryContext(const BlockDeliveryContext &other): promise_t(static_cast(other)), elapsed(other.elapsed) {} BlockDeliveryContext(BlockDeliveryContext &&other): promise_t(static_cast(other)), elapsed(std::move(other.elapsed)) {} template BlockDeliveryContext(Func callback): promise_t(callback) { elapsed.start(); } }; /** HotStuff protocol (with network implementation). */ class HotStuffBase: public HotStuffCore { using BlockFetchContext = FetchContext; using CmdFetchContext = FetchContext; friend BlockFetchContext; friend CmdFetchContext; public: using Net = PeerNetwork; using commit_cb_t = std::function; protected: /** the binding address in replica network */ NetAddr listen_addr; /** the block size */ size_t blk_size; /** libevent handle */ EventContext ec; salticidae::ThreadCall tcall; VeriPool vpool; std::vector peers; private: /** whether libevent handle is owned by itself */ bool ec_loop; /** network stack */ Net pn; std::unordered_set valid_tls_certs; #ifdef HOTSTUFF_BLK_PROFILE BlockProfiler blk_profiler; #endif pacemaker_bt pmaker; /* queues for async tasks */ std::unordered_map blk_fetch_waiting; std::unordered_map blk_delivery_waiting; std::unordered_map decision_waiting; using cmd_queue_t = salticidae::MPSCQueueEventDriven>; cmd_queue_t cmd_pending; std::queue cmd_pending_buffer; /* statistics */ uint64_t fetched; uint64_t delivered; mutable uint64_t nsent; mutable uint64_t nrecv; mutable uint32_t part_parent_size; mutable uint32_t part_fetched; mutable uint32_t part_delivered; mutable uint32_t part_decided; mutable uint32_t part_gened; mutable double part_delivery_time; mutable double part_delivery_time_min; mutable double part_delivery_time_max; mutable std::unordered_map part_fetched_replica; void on_fetch_cmd(const command_t &cmd); void on_fetch_blk(const block_t &blk); bool on_deliver_blk(const block_t &blk); /** deliver consensus message: */ inline void propose_handler(MsgPropose &&, const Net::conn_t &); /** deliver consensus message: */ inline void vote_handler(MsgVote &&, const Net::conn_t &); /** fetches full block data */ inline void req_blk_handler(MsgReqBlock &&, const Net::conn_t &); /** receives a block */ inline void resp_blk_handler(MsgRespBlock &&, const Net::conn_t &); inline bool conn_handler(const salticidae::ConnPool::conn_t &, bool); void do_broadcast_proposal(const Proposal &) override; void do_vote(ReplicaID, const Vote &) override; void do_decide(Finality &&) override; void do_consensus(const block_t &blk) override; protected: /** Called to replicate the execution of a command, the application should * implement this to make transition for the application state. */ virtual void state_machine_execute(const Finality &) = 0; public: HotStuffBase(uint32_t blk_size, ReplicaID rid, privkey_bt &&priv_key, NetAddr listen_addr, pacemaker_bt pmaker, EventContext ec, size_t nworker, const Net::Config &netconfig); ~HotStuffBase(); /* the API for HotStuffBase */ /* Submit the command to be decided. */ void exec_command(uint256_t cmd_hash, commit_cb_t callback); void start(std::vector> &&replicas, bool ec_loop = false); size_t size() const { return peers.size(); } const auto &get_decision_waiting() const { return decision_waiting; } ThreadCall &get_tcall() { return tcall; } PaceMaker *get_pace_maker() { return pmaker.get(); } void print_stat() const; virtual void do_elected() {} //#ifdef HOTSTUFF_AUTOCLI // virtual void do_demand_commands(size_t) {} //#endif /* Helper functions */ /** Returns a promise resolved (with command_t cmd) when Command is fetched. */ promise_t async_fetch_cmd(const uint256_t &cmd_hash, const PeerId *replica, bool fetch_now = true); /** Returns a promise resolved (with block_t blk) when Block is fetched. */ promise_t async_fetch_blk(const uint256_t &blk_hash, const PeerId *replica, bool fetch_now = true); /** Returns a promise resolved (with block_t blk) when Block is delivered (i.e. prefix is fetched). */ promise_t async_deliver_blk(const uint256_t &blk_hash, const PeerId &replica); }; /** HotStuff protocol (templated by cryptographic implementation). */ template class HotStuff: public HotStuffBase { using HotStuffBase::HotStuffBase; protected: part_cert_bt create_part_cert(const PrivKey &priv_key, const uint256_t &blk_hash) override { HOTSTUFF_LOG_DEBUG("create part cert with priv=%s, blk_hash=%s", get_hex10(priv_key).c_str(), get_hex10(blk_hash).c_str()); return new PartCertType( static_cast(priv_key), blk_hash); } part_cert_bt parse_part_cert(DataStream &s) override { PartCert *pc = new PartCertType(); s >> *pc; return pc; } quorum_cert_bt create_quorum_cert(const uint256_t &blk_hash) override { return new QuorumCertType(get_config(), blk_hash); } quorum_cert_bt parse_quorum_cert(DataStream &s) override { QuorumCert *qc = new QuorumCertType(); s >> *qc; return qc; } public: HotStuff(uint32_t blk_size, ReplicaID rid, const bytearray_t &raw_privkey, NetAddr listen_addr, pacemaker_bt pmaker, EventContext ec = EventContext(), size_t nworker = 4, const Net::Config &netconfig = Net::Config()): HotStuffBase(blk_size, rid, new PrivKeyType(raw_privkey), listen_addr, std::move(pmaker), ec, nworker, netconfig) {} void start(const std::vector> &replicas, bool ec_loop = false) { std::vector> reps; for (auto &r: replicas) reps.push_back( std::make_tuple( std::get<0>(r), new PubKeyType(std::get<1>(r)), uint256_t(std::get<2>(r)) )); HotStuffBase::start(std::move(reps), ec_loop); } }; using HotStuffNoSig = HotStuff<>; using HotStuffSecp256k1 = HotStuff; template FetchContext::FetchContext(FetchContext && other): promise_t(static_cast(other)), hs(other.hs), fetch_msg(std::move(other.fetch_msg)), ent_hash(other.ent_hash), replicas(std::move(other.replicas)) { other.timeout.del(); timeout = TimerEvent(hs->ec, std::bind(&FetchContext::timeout_cb, this, _1)); reset_timeout(); } template<> inline void FetchContext::timeout_cb(TimerEvent &) { HOTSTUFF_LOG_WARN("cmd fetching %.10s timeout", get_hex(ent_hash).c_str()); for (const auto &replica: replicas) send(replica); reset_timeout(); } template<> inline void FetchContext::timeout_cb(TimerEvent &) { HOTSTUFF_LOG_WARN("block fetching %.10s timeout", get_hex(ent_hash).c_str()); for (const auto &replica: replicas) send(replica); reset_timeout(); } template FetchContext::FetchContext( const uint256_t &ent_hash, HotStuffBase *hs): promise_t([](promise_t){}), hs(hs), ent_hash(ent_hash) { fetch_msg = std::vector{ent_hash}; timeout = TimerEvent(hs->ec, std::bind(&FetchContext::timeout_cb, this, _1)); reset_timeout(); } template void FetchContext::send(const PeerId &replica) { hs->part_fetched_replica[replica]++; hs->pn.send_msg(fetch_msg, replica); } template void FetchContext::reset_timeout() { timeout.add(salticidae::gen_rand_timeout(ent_waiting_timeout)); } template void FetchContext::add_replica(const PeerId &replica, bool fetch_now) { if (replicas.empty() && fetch_now) send(replica); replicas.insert(replica); } } #endif