diff options
-rw-r--r-- | CMakeLists.txt | 33 | ||||
-rw-r--r-- | include/hotstuff/client.h | 4 | ||||
-rw-r--r-- | include/hotstuff/consensus.h | 250 | ||||
-rw-r--r-- | include/hotstuff/core.h | 631 | ||||
-rw-r--r-- | include/hotstuff/crypto.h | 9 | ||||
-rw-r--r-- | include/hotstuff/entity.h | 6 | ||||
-rw-r--r-- | include/hotstuff/hotstuff.h | 320 | ||||
-rw-r--r-- | include/hotstuff/liveness.h | 78 | ||||
-rw-r--r-- | include/hotstuff/type.h | 13 | ||||
m--------- | salticidae | 0 | ||||
-rw-r--r-- | src/consensus.cpp | 296 | ||||
-rw-r--r-- | src/core.cpp | 723 | ||||
-rw-r--r-- | src/entity.cpp | 2 | ||||
-rw-r--r-- | src/hotstuff.cpp | 647 | ||||
-rw-r--r-- | src/hotstuff_app.cpp | 287 | ||||
-rw-r--r-- | src/hotstuff_client.cpp | 8 | ||||
-rw-r--r-- | test/CMakeLists.txt | 2 |
17 files changed, 1677 insertions, 1632 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index d5246c6..d032ae0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -30,12 +30,25 @@ add_dependencies(secp256k1 libsecp256k1) include_directories(include) add_library(hotstuff - src/entity.cpp - src/core.cpp + OBJECT + src/util.cpp src/client.cpp src/crypto.cpp - src/util.cpp) -target_link_libraries(hotstuff salticidae secp256k1 crypto) + src/entity.cpp + src/consensus.cpp + src/hotstuff.cpp + ) + +option(BUILD_SHARED "build shared library." OFF) +if(BUILD_SHARED) + set_property(TARGET hotstuff PROPERTY POSITION_INDEPENDENT_CODE 1) + add_library(hotstuff_shared SHARED $<TARGET_OBJECTS:hotstuff>) + set_target_properties(hotstuff_shared PROPERTIES OUTPUT_NAME "hotstuff") + target_link_libraries(hotstuff_shared salticidae_static secp256k1 crypto) +endif() +add_library(hotstuff_static STATIC $<TARGET_OBJECTS:hotstuff>) +set_target_properties(hotstuff_static PROPERTIES OUTPUT_NAME "hotstuff") +target_link_libraries(hotstuff_static salticidae_static secp256k1 crypto) add_subdirectory(test) @@ -54,16 +67,16 @@ endif() # add executables add_executable(hotstuff-app - src/hotstuff.cpp) -target_link_libraries(hotstuff-app hotstuff) + src/hotstuff_app.cpp) +target_link_libraries(hotstuff-app hotstuff_static) add_executable(hotstuff-client src/hotstuff_client.cpp) -target_link_libraries(hotstuff-client hotstuff) +target_link_libraries(hotstuff-client hotstuff_static) add_executable(hotstuff-keygen src/hotstuff_keygen.cpp) -target_link_libraries(hotstuff-keygen hotstuff) +target_link_libraries(hotstuff-keygen hotstuff_static) find_package(Doxygen) if (DOXYGEN_FOUND) @@ -80,7 +93,3 @@ macro(remove_cxx_flag flag) endmacro() remove_cxx_flag("-DNDEBUG") -#message(${CMAKE_CXX_FLAGS_RELEASE}) -#remove_cxx_flag("-O3") -#set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O2") -#message(${CMAKE_CXX_FLAGS_RELEASE}) diff --git a/include/hotstuff/client.h b/include/hotstuff/client.h index dd1cfee..00ec77d 100644 --- a/include/hotstuff/client.h +++ b/include/hotstuff/client.h @@ -1,9 +1,9 @@ #ifndef _HOTSTUFF_CLIENT_H #define _HOTSTUFF_CLIENT_H -#include "type.h" #include "salticidae/msg.h" -#include "entity.h" +#include "hotstuff/type.h" +#include "hotstuff/entity.h" namespace hotstuff { diff --git a/include/hotstuff/consensus.h b/include/hotstuff/consensus.h new file mode 100644 index 0000000..18e891e --- /dev/null +++ b/include/hotstuff/consensus.h @@ -0,0 +1,250 @@ +#ifndef _HOTSTUFF_CONSENSUS_H +#define _HOTSTUFF_CONSENSUS_H + +#include <set> +#include <unordered_map> + +#include "hotstuff/promise.hpp" +#include "hotstuff/type.h" +#include "hotstuff/entity.h" +#include "hotstuff/crypto.h" + +namespace hotstuff { + +struct Proposal; +struct Vote; + +/** Abstraction for HotStuff protocol state machine (without network implementation). */ +class HotStuffCore { + block_t b0; /** the genesis block */ + /* === state variables === */ + /** block containing the QC for the highest block having one */ + block_t bqc; + block_t bexec; /**< last executed block */ + uint32_t vheight; /**< height of the block last voted for */ + /* === auxilliary variables === */ + privkey_bt priv_key; /**< private key for signing votes */ + std::set<block_t, BlockHeightCmp> tails; /**< set of tail blocks */ + ReplicaConfig config; /**< replica configuration */ + /* === async event queues === */ + std::unordered_map<block_t, promise_t> qc_waiting; + promise_t propose_waiting; + + block_t sanity_check_delivered(const uint256_t &blk_hash); + void sanity_check_delivered(const block_t &blk); + void check_commit(const block_t &_bqc); + bool update(const uint256_t &bqc_hash); + void on_qc_finish(const block_t &blk); + void on_propose_(const block_t &blk); + + protected: + ReplicaID id; /**< identity of the replica itself */ + const int32_t parent_limit; /**< maximum number of parents */ + + public: + BoxObj<EntityStorage> storage; + + HotStuffCore(ReplicaID id, + privkey_bt &&priv_key, + int32_t parent_limit); + virtual ~HotStuffCore() = default; + + /* Inputs of the state machine triggered by external events, should called + * by the class user, with proper invariants. */ + + /** Call to initialize the protocol, should be called once before all other + * functions. */ + void on_init(uint32_t nfaulty) { config.nmajority = 2 * nfaulty + 1; } + + /** Call to deliver a block. + * A block is only delivered if itself is fetched, the block for the + * contained qc is fetched and all parents are delivered. The user should + * always ensure this invariant. The invalid blocks will be dropped by this + * function. + * @return true if valid */ + bool on_deliver_blk(const block_t &blk); + + /** Call upon the delivery of a proposal message. + * The block mentioned in the message should be already delivered. */ + void on_receive_proposal(const Proposal &prop); + + /** Call upon the delivery of a vote message. + * The block mentioned in the message should be already delivered. */ + void on_receive_vote(const Vote &vote); + + /** Call to submit new commands to be decided (executed). */ + void on_propose(const std::vector<command_t> &cmds); + + /* Functions required to construct concrete instances for abstract classes. + * */ + + /* Outputs of the state machine triggering external events. The virtual + * functions should be implemented by the user to specify the behavior upon + * the events. */ + protected: + /** Called by HotStuffCore upon the decision being made for cmd. */ + virtual void do_decide(const command_t &cmd) = 0; + /** Called by HotStuffCore upon broadcasting a new proposal. + * The user should send the proposal message to all replicas except for + * itself. */ + virtual void do_broadcast_proposal(const Proposal &prop) = 0; + /** Called upon sending out a new vote to the next proposer. The user + * should send the vote message to a *good* proposer to have good liveness, + * while safety is always guaranteed by HotStuffCore. */ + virtual void do_vote(ReplicaID last_proposer, const Vote &vote) = 0; + + /* The user plugs in the detailed instances for those + * polymorphic data types. */ + public: + /** Create a partial certificate that proves the vote for a block. */ + virtual part_cert_bt create_part_cert(const PrivKey &priv_key, const uint256_t &blk_hash) = 0; + /** Create a partial certificate from its seralized form. */ + virtual part_cert_bt parse_part_cert(DataStream &s) = 0; + /** Create a quorum certificate that proves 2f+1 votes for a block. */ + virtual quorum_cert_bt create_quorum_cert(const uint256_t &blk_hash) = 0; + /** Create a quorum certificate from its serialized form. */ + virtual quorum_cert_bt parse_quorum_cert(DataStream &s) = 0; + /** Create a command object from its serialized form. */ + virtual command_t parse_cmd(DataStream &s) = 0; + + public: + /** Add a replica to the current configuration. This should only be called + * before running HotStuffCore protocol. */ + void add_replica(ReplicaID rid, const NetAddr &addr, pubkey_bt &&pub_key); + /** Try to prune blocks lower than last committed height - staleness. */ + void prune(uint32_t staleness); + + /* PaceMaker can use these functions to monitor the core protocol state + * transition */ + /** Get a promise resolved when the block gets a QC. */ + promise_t async_qc_finish(const block_t &blk); + /** Get a promise resolved when a new block is proposed. */ + promise_t async_wait_propose(); + + /* Other useful functions */ + block_t get_genesis() { return b0; } + const ReplicaConfig &get_config() { return config; } + int8_t get_cmd_decision(const uint256_t &cmd_hash); + ReplicaID get_id() { return id; } + operator std::string () const; +}; + +/** Abstraction for proposal messages. */ +struct Proposal: public Serializable { + ReplicaID proposer; + /** hash for the block containing the highest QC */ + uint256_t bqc_hash; + /** block being proposed */ + block_t blk; + + /** handle of the core object to allow polymorphism. The user should use + * a pointer to the object of the class derived from HotStuffCore */ + HotStuffCore *hsc; + + Proposal(HotStuffCore *hsc): blk(nullptr), hsc(hsc) {} + Proposal(ReplicaID proposer, + const uint256_t &bqc_hash, + block_t &blk, + HotStuffCore *hsc): + proposer(proposer), + bqc_hash(bqc_hash), + blk(blk), hsc(hsc) {} + + void serialize(DataStream &s) const override { + s << proposer + << bqc_hash + << *blk; + } + + void unserialize(DataStream &s) override { + assert(hsc != nullptr); + s >> proposer + >> bqc_hash; + Block _blk; + _blk.unserialize(s, hsc); + blk = hsc->storage->add_blk(std::move(_blk)); + } + + operator std::string () const { + DataStream s; + s << "<proposal " + << "rid=" << std::to_string(proposer) << " " + << "bqc=" << get_hex10(bqc_hash) << " " + << "blk=" << get_hex10(blk->get_hash()) << ">"; + return std::string(std::move(s)); + } +}; + +/** Abstraction for vote messages. */ +struct Vote: public Serializable { + ReplicaID voter; + /** hash for the block containing the highest QC */ + uint256_t bqc_hash; + /** block being voted */ + uint256_t blk_hash; + /** proof of validity for the vote (nullptr for a negative vote) */ + part_cert_bt cert; + + /** handle of the core object to allow polymorphism */ + HotStuffCore *hsc; + + Vote(HotStuffCore *hsc): cert(nullptr), hsc(hsc) {} + Vote(ReplicaID voter, + const uint256_t &bqc_hash, + const uint256_t &blk_hash, + part_cert_bt &&cert, + HotStuffCore *hsc): + voter(voter), + bqc_hash(bqc_hash), + blk_hash(blk_hash), + cert(std::move(cert)), hsc(hsc) {} + + Vote(const Vote &other): + voter(other.voter), + bqc_hash(other.bqc_hash), + blk_hash(other.blk_hash), + cert(other.cert->clone()), + hsc(other.hsc) {} + + Vote(Vote &&other) = default; + + void serialize(DataStream &s) const override { + s << voter + << bqc_hash + << blk_hash; + if (cert == nullptr) + s << (uint8_t)0; + else + s << (uint8_t)1 << *cert; + } + + void unserialize(DataStream &s) override { + assert(hsc != nullptr); + uint8_t has_cert; + s >> voter + >> bqc_hash + >> blk_hash + >> has_cert; + cert = has_cert ? hsc->parse_part_cert(s) : nullptr; + } + + bool verify() const { + assert(hsc != nullptr); + return cert->verify(hsc->get_config().get_pubkey(voter)) && + cert->get_blk_hash() == blk_hash; + } + + operator std::string () const { + DataStream s; + s << "<vote " + << "rid=" << std::to_string(voter) << " " + << "bqc=" << get_hex10(bqc_hash) << " " + << "blk=" << get_hex10(blk_hash) << " " + << "cert=" << (cert ? "yes" : "no") << ">"; + return std::string(std::move(s)); + } +}; + +} + +#endif diff --git a/include/hotstuff/core.h b/include/hotstuff/core.h deleted file mode 100644 index c7e1fe6..0000000 --- a/include/hotstuff/core.h +++ /dev/null @@ -1,631 +0,0 @@ -#ifndef _HOTSTUFF_CORE_H -#define _HOTSTUFF_CORE_H - -#include <queue> -#include <set> -#include <unordered_map> -#include <unordered_set> - -#include "salticidae/stream.h" -#include "salticidae/util.h" -#include "salticidae/network.h" -#include "salticidae/msg.h" - -#include "promise.hpp" -#include "util.h" -#include "entity.h" -#include "crypto.h" - -using salticidae::EventContext; -using salticidae::Event; -using salticidae::NetAddr; -using salticidae::MsgNetwork; -using salticidae::PeerNetwork; -using salticidae::ElapsedTime; -using salticidae::_1; -using salticidae::_2; - -namespace hotstuff { - -const double ent_waiting_timeout = 10; -const double double_inf = 1e10; - -enum { - PROPOSE = 0x0, - VOTE = 0x1, - QUERY_FETCH_BLK = 0x2, - RESP_FETCH_BLK = 0x3, -}; - -using promise::promise_t; - -struct Proposal; -struct Vote; - -/** Abstraction for HotStuff protocol state machine (without network implementation). */ -class HotStuffCore { - block_t b0; /** the genesis block */ - /* === state variables === */ - /** block containing the QC for the highest block having one */ - block_t bqc; - block_t bexec; /**< last executed block */ - uint32_t vheight; /**< height of the block last voted for */ - /* === auxilliary variables === */ - privkey_bt priv_key; /**< private key for signing votes */ - std::set<block_t, BlockHeightCmp> tails; /**< set of tail blocks */ - ReplicaConfig config; /**< replica configuration */ - /* === async event queues === */ - std::unordered_map<block_t, promise_t> qc_waiting; - promise_t propose_waiting; - - block_t sanity_check_delivered(const uint256_t &blk_hash); - void sanity_check_delivered(const block_t &blk); - void check_commit(const block_t &_bqc); - bool update(const uint256_t &bqc_hash); - void on_qc_finish(const block_t &blk); - void on_propose_(const block_t &blk); - - protected: - ReplicaID id; /**< identity of the replica itself */ - const int32_t parent_limit; /**< maximum number of parents */ - - public: - BoxObj<EntityStorage> storage; - - HotStuffCore(ReplicaID id, - privkey_bt &&priv_key, - int32_t parent_limit); - virtual ~HotStuffCore() = default; - - /* Inputs of the state machine triggered by external events, should called - * by the class user, with proper invariants. */ - - /** Call to initialize the protocol, should be called once before all other - * functions. */ - void on_init(uint32_t nfaulty) { config.nmajority = 2 * nfaulty + 1; } - - /** Call to deliver a block. - * A block is only delivered if itself is fetched, the block for the - * contained qc is fetched and all parents are delivered. The user should - * always ensure this invariant. The invalid blocks will be dropped by this - * function. - * @return true if valid */ - bool on_deliver_blk(const block_t &blk); - - /** Call upon the delivery of a proposal message. - * The block mentioned in the message should be already delivered. */ - void on_receive_proposal(const Proposal &prop); - - /** Call upon the delivery of a vote message. - * The block mentioned in the message should be already delivered. */ - void on_receive_vote(const Vote &vote); - - /** Call to submit new commands to be decided (executed). */ - void on_propose(const std::vector<command_t> &cmds); - - /* Functions required to construct concrete instances for abstract classes. - * */ - - /* Outputs of the state machine triggering external events. The virtual - * functions should be implemented by the user to specify the behavior upon - * the events. */ - protected: - /** Called by HotStuffCore upon the decision being made for cmd. */ - virtual void do_decide(const command_t &cmd) = 0; - /** Called by HotStuffCore upon broadcasting a new proposal. - * The user should send the proposal message to all replicas except for - * itself. */ - virtual void do_broadcast_proposal(const Proposal &prop) = 0; - /** Called upon sending out a new vote to the next proposer. The user - * should send the vote message to a *good* proposer to have good liveness, - * while safety is always guaranteed by HotStuffCore. */ - virtual void do_vote(ReplicaID last_proposer, const Vote &vote) = 0; - - /* The user plugs in the detailed instances for those - * polymorphic data types. */ - public: - /** Create a partial certificate that proves the vote for a block. */ - virtual part_cert_bt create_part_cert(const PrivKey &priv_key, const uint256_t &blk_hash) = 0; - /** Create a partial certificate from its seralized form. */ - virtual part_cert_bt parse_part_cert(DataStream &s) = 0; - /** Create a quorum certificate that proves 2f+1 votes for a block. */ - virtual quorum_cert_bt create_quorum_cert(const uint256_t &blk_hash) = 0; - /** Create a quorum certificate from its serialized form. */ - virtual quorum_cert_bt parse_quorum_cert(DataStream &s) = 0; - /** Create a command object from its serialized form. */ - virtual command_t parse_cmd(DataStream &s) = 0; - - public: - /** Add a replica to the current configuration. This should only be called - * before running HotStuffCore protocol. */ - void add_replica(ReplicaID rid, const NetAddr &addr, pubkey_bt &&pub_key); - /** Try to prune blocks lower than last committed height - staleness. */ - void prune(uint32_t staleness); - - /* PaceMaker can use these functions to monitor the core protocol state - * transition */ - /** Get a promise resolved when the block gets a QC. */ - promise_t async_qc_finish(const block_t &blk); - /** Get a promise resolved when a new block is proposed. */ - promise_t async_wait_propose(); - - /* Other useful functions */ - block_t get_genesis() { return b0; } - const ReplicaConfig &get_config() { return config; } - int8_t get_cmd_decision(const uint256_t &cmd_hash); - ReplicaID get_id() { return id; } - operator std::string () const; -}; - -/** Abstraction for proposal messages. */ -struct Proposal: public Serializable { - ReplicaID proposer; - /** hash for the block containing the highest QC */ - uint256_t bqc_hash; - /** block being proposed */ - block_t blk; - - /** handle of the core object to allow polymorphism. The user should use - * a pointer to the object of the class derived from HotStuffCore */ - HotStuffCore *hsc; - - Proposal(HotStuffCore *hsc): blk(nullptr), hsc(hsc) {} - Proposal(ReplicaID proposer, - const uint256_t &bqc_hash, - block_t &blk, - HotStuffCore *hsc): - proposer(proposer), - bqc_hash(bqc_hash), - blk(blk), hsc(hsc) {} - - void serialize(DataStream &s) const override { - s << proposer - << bqc_hash - << *blk; - } - - void unserialize(DataStream &s) override { - assert(hsc != nullptr); - s >> proposer - >> bqc_hash; - Block _blk; - _blk.unserialize(s, hsc); - blk = hsc->storage->add_blk(std::move(_blk)); - } - - operator std::string () const { - DataStream s; - s << "<proposal " - << "rid=" << std::to_string(proposer) << " " - << "bqc=" << get_hex10(bqc_hash) << " " - << "blk=" << get_hex10(blk->get_hash()) << ">"; - return std::string(std::move(s)); - } -}; - -/** Abstraction for vote messages. */ -struct Vote: public Serializable { - ReplicaID voter; - /** hash for the block containing the highest QC */ - uint256_t bqc_hash; - /** block being voted */ - uint256_t blk_hash; - /** proof of validity for the vote (nullptr for a negative vote) */ - part_cert_bt cert; - - /** handle of the core object to allow polymorphism */ - HotStuffCore *hsc; - - Vote(HotStuffCore *hsc): cert(nullptr), hsc(hsc) {} - Vote(ReplicaID voter, - const uint256_t &bqc_hash, - const uint256_t &blk_hash, - part_cert_bt &&cert, - HotStuffCore *hsc): - voter(voter), - bqc_hash(bqc_hash), - blk_hash(blk_hash), - cert(std::move(cert)), hsc(hsc) {} - - Vote(const Vote &other): - voter(other.voter), - bqc_hash(other.bqc_hash), - blk_hash(other.blk_hash), - cert(other.cert->clone()), - hsc(other.hsc) {} - - Vote(Vote &&other) = default; - - void serialize(DataStream &s) const override { - s << voter - << bqc_hash - << blk_hash; - if (cert == nullptr) - s << (uint8_t)0; - else - s << (uint8_t)1 << *cert; - } - - void unserialize(DataStream &s) override { - assert(hsc != nullptr); - uint8_t has_cert; - s >> voter - >> bqc_hash - >> blk_hash - >> has_cert; - cert = has_cert ? hsc->parse_part_cert(s) : nullptr; - } - - bool verify() const { - assert(hsc != nullptr); - return cert->verify(hsc->get_config().get_pubkey(voter)) && - cert->get_blk_hash() == blk_hash; - } - - operator std::string () const { - DataStream s; - s << "<vote " - << "rid=" << std::to_string(voter) << " " - << "bqc=" << get_hex10(bqc_hash) << " " - << "blk=" << get_hex10(blk_hash) << " " - << "cert=" << (cert ? "yes" : "no") << ">"; - return std::string(std::move(s)); - } -}; - -/** Abstraction for liveness gadget (oracle). */ -class PaceMaker { - public: - virtual ~PaceMaker() = default; - /** Get a promise resolved when the pace maker thinks it is a *good* time - * to issue new commands. When promise is resolved with the ID of itself, - * the replica should propose the command, otherwise it will forward the - * command to the proposer indicated by the ID. */ - virtual promise_t beat() = 0; - /** Get a promise resolved when the pace maker thinks it is a *good* time - * to vote for a block. The promise is resolved with the next proposer's ID - * */ - virtual promise_t next_proposer(ReplicaID last_proposer) = 0; -}; - -using pacemaker_bt = BoxObj<PaceMaker>; - -/** A pace maker that waits for the qc of the last proposed block. */ -class PaceMakerDummy: public PaceMaker { - HotStuffCore *hsc; - std::queue<promise_t> pending_beats; - block_t last_proposed; - bool locked; - - void schedule_next() { - if (!pending_beats.empty() && !locked) - { - auto pm = pending_beats.front(); - pending_beats.pop(); - hsc->async_qc_finish(last_proposed).then( - [id = hsc->get_id(), pm]() { - pm.resolve(id); - }); - locked = true; - } - } - - void update_last_proposed() { - hsc->async_wait_propose().then([this](block_t blk) { - update_last_proposed(); - last_proposed = blk; - locked = false; - schedule_next(); - }); - } - - public: - PaceMakerDummy(HotStuffCore *hsc): - hsc(hsc), - last_proposed(hsc->get_genesis()), - locked(false) { - update_last_proposed(); - } - - promise_t beat() override { - promise_t pm; - pending_beats.push(pm); - schedule_next(); - return pm; - } - - promise_t next_proposer(ReplicaID last_proposer) override { - return promise_t([last_proposer](promise_t &pm) { - pm.resolve(last_proposer); - }); - } -}; - -/** Network message format for HotStuff. */ -struct MsgHotStuff: public salticidae::MsgBase<> { - using MsgBase::MsgBase; - void gen_propose(const Proposal &); - void parse_propose(Proposal &) const; - - void gen_vote(const Vote &); - void parse_vote(Vote &) const; - - void gen_qfetchblk(const std::vector<uint256_t> &blk_hashes); - void parse_qfetchblk(std::vector<uint256_t> &blk_hashes) const; - - void gen_rfetchblk(const std::vector<block_t> &blks); - void parse_rfetchblk(std::vector<block_t> &blks, HotStuffCore *hsc) const; -}; - -using promise::promise_t; - -class HotStuffBase; - -template<EntityType ent_type> -class FetchContext: public promise_t { - Event timeout; - HotStuffBase *hs; - MsgHotStuff fetch_msg; - const uint256_t ent_hash; - std::unordered_set<NetAddr> replica_ids; - inline void timeout_cb(evutil_socket_t, short); - public: - FetchContext(const FetchContext &) = delete; - FetchContext &operator=(const FetchContext &) = delete; - FetchContext(FetchContext &&other); - - FetchContext(const uint256_t &ent_hash, HotStuffBase *hs); - ~FetchContext() {} - - inline void send(const NetAddr &replica_id); - inline void reset_timeout(); - inline void add_replica(const NetAddr &replica_id, bool fetch_now = true); -}; - -class BlockDeliveryContext: public promise_t { - public: - ElapsedTime elapsed; - BlockDeliveryContext &operator=(const BlockDeliveryContext &) = delete; - BlockDeliveryContext(const BlockDeliveryContext &other): - promise_t(static_cast<const promise_t &>(other)), - elapsed(other.elapsed) {} - BlockDeliveryContext(BlockDeliveryContext &&other): - promise_t(static_cast<const promise_t &>(other)), - elapsed(std::move(other.elapsed)) {} - template<typename Func> - BlockDeliveryContext(Func callback): promise_t(callback) { - elapsed.start(); - } -}; - - -/** HotStuff protocol (with network implementation). */ -class HotStuffBase: public HotStuffCore { - using BlockFetchContext = FetchContext<ENT_TYPE_BLK>; - using CmdFetchContext = FetchContext<ENT_TYPE_CMD>; - using conn_t = MsgNetwork<MsgHotStuff>::conn_t; - - friend BlockFetchContext; - friend CmdFetchContext; - - protected: - /** the binding address in replica network */ - NetAddr listen_addr; - /** the block size */ - size_t blk_size; - /** libevent handle */ - EventContext eb; - pacemaker_bt pmaker; - - private: - /** whether libevent handle is owned by itself */ - bool eb_loop; - /** network stack */ - PeerNetwork<MsgHotStuff> pn; -#ifdef HOTSTUFF_ENABLE_BLK_PROFILE - BlockProfiler blk_profiler; -#endif - /* queues for async tasks */ - std::unordered_map<const uint256_t, BlockFetchContext> blk_fetch_waiting; - std::unordered_map<const uint256_t, BlockDeliveryContext> blk_delivery_waiting; - std::unordered_map<const uint256_t, CmdFetchContext> cmd_fetch_waiting; - std::unordered_map<const uint256_t, promise_t> decision_waiting; - std::queue<command_t> cmd_pending; - - /* statistics */ - uint64_t fetched; - uint64_t delivered; - mutable uint64_t nsent; - mutable uint64_t nrecv; - - mutable uint32_t part_parent_size; - mutable uint32_t part_fetched; - mutable uint32_t part_delivered; - mutable uint32_t part_decided; - mutable uint32_t part_gened; - mutable double part_delivery_time; - mutable double part_delivery_time_min; - mutable double part_delivery_time_max; - mutable std::unordered_map<const NetAddr, uint32_t> part_fetched_replica; - - void on_fetch_cmd(const command_t &cmd); - void on_fetch_blk(const block_t &blk); - void on_deliver_blk(const block_t &blk); - - /** deliver consensus message: <propose> */ - inline void propose_handler(const MsgHotStuff &, conn_t); - /** deliver consensus message: <vote> */ - inline void vote_handler(const MsgHotStuff &, conn_t); - /** fetches full block data */ - inline void query_fetch_blk_handler(const MsgHotStuff &, conn_t); - /** receives a block */ - inline void resp_fetch_blk_handler(const MsgHotStuff &, conn_t); - - void do_broadcast_proposal(const Proposal &) override; - void do_vote(ReplicaID, const Vote &) override; - void do_decide(const command_t &) override; - - public: - HotStuffBase(uint32_t blk_size, - int32_t parent_limit, - ReplicaID rid, - privkey_bt &&priv_key, - NetAddr listen_addr, - EventContext eb = EventContext(), - pacemaker_bt pmaker = nullptr); - - ~HotStuffBase(); - - /* the API for HotStuffBase */ - - /* Submit the command to be decided. */ - void add_command(command_t cmd) { - cmd_pending.push(storage->add_cmd(cmd)); - if (cmd_pending.size() >= blk_size) - { - std::vector<command_t> cmds; - for (uint32_t i = 0; i < blk_size; i++) - { - cmds.push_back(cmd_pending.front()); - cmd_pending.pop(); - } - pmaker->beat().then([this, cmds = std::move(cmds)]() { - on_propose(cmds); - }); - } - } - - void add_replica(ReplicaID idx, const NetAddr &addr, pubkey_bt &&pub_key); - void start(bool eb_loop = false); - - size_t size() const { return pn.all_peers().size(); } - void print_stat() const; - - /* Helper functions */ - /** Returns a promise resolved (with command_t cmd) when Command is fetched. */ - promise_t async_fetch_cmd(const uint256_t &cmd_hash, const NetAddr *replica_id, bool fetch_now = true); - /** Returns a promise resolved (with block_t blk) when Block is fetched. */ - promise_t async_fetch_blk(const uint256_t &blk_hash, const NetAddr *replica_id, bool fetch_now = true); - /** Returns a promise resolved (with block_t blk) when Block is delivered (i.e. prefix is fetched). */ - promise_t async_deliver_blk(const uint256_t &blk_hash, const NetAddr &replica_id); - /** Returns a promise resolved (with command_t cmd) when Command is decided. */ - promise_t async_decide(const uint256_t &cmd_hash); -}; - -/** HotStuff protocol (templated by cryptographic implementation). */ -template<typename PrivKeyType = PrivKeyDummy, - typename PubKeyType = PubKeyDummy, - typename PartCertType = PartCertDummy, - typename QuorumCertType = QuorumCertDummy> -class HotStuff: public HotStuffBase { - using HotStuffBase::HotStuffBase; - protected: - - part_cert_bt create_part_cert(const PrivKey &priv_key, const uint256_t &blk_hash) override { - return new PartCertType( - static_cast<const PrivKeyType &>(priv_key), - blk_hash); - } - - part_cert_bt parse_part_cert(DataStream &s) override { - PartCert *pc = new PartCertType(); - s >> *pc; - return pc; - } - - quorum_cert_bt create_quorum_cert(const uint256_t &blk_hash) override { - return new QuorumCertType(get_config(), blk_hash); - } - - quorum_cert_bt parse_quorum_cert(DataStream &s) override { - QuorumCert *qc = new QuorumCertType(); - s >> *qc; - return qc; - } - - public: - HotStuff(uint32_t blk_size, - int32_t parent_limit, - ReplicaID rid, - const bytearray_t &raw_privkey, - NetAddr listen_addr, - EventContext eb = nullptr): - HotStuffBase(blk_size, - parent_limit, - rid, - new PrivKeyType(raw_privkey), - listen_addr, - eb) {} - - void add_replica(ReplicaID idx, const NetAddr &addr, const bytearray_t &pubkey_raw) { - DataStream s(pubkey_raw); - HotStuffBase::add_replica(idx, addr, new PubKeyType(pubkey_raw)); - } -}; - -using HotStuffNoSig = HotStuff<>; -using HotStuffSecp256k1 = HotStuff<PrivKeySecp256k1, PubKeySecp256k1, - PartCertSecp256k1, QuorumCertSecp256k1>; - -template<EntityType ent_type> -FetchContext<ent_type>::FetchContext(FetchContext && other): - promise_t(static_cast<const promise_t &>(other)), - hs(other.hs), - fetch_msg(std::move(other.fetch_msg)), - ent_hash(other.ent_hash), - replica_ids(std::move(other.replica_ids)) { - other.timeout.del(); - timeout = Event(hs->eb, -1, 0, - std::bind(&FetchContext::timeout_cb, this, _1, _2)); - reset_timeout(); -} - -template<> -inline void FetchContext<ENT_TYPE_CMD>::timeout_cb(evutil_socket_t, short) { - HOTSTUFF_LOG_WARN("cmd fetching %.10s timeout", get_hex(ent_hash).c_str()); - for (const auto &replica_id: replica_ids) - send(replica_id); - reset_timeout(); -} - -template<> -inline void FetchContext<ENT_TYPE_BLK>::timeout_cb(evutil_socket_t, short) { - HOTSTUFF_LOG_WARN("block fetching %.10s timeout", get_hex(ent_hash).c_str()); - for (const auto &replica_id: replica_ids) - send(replica_id); - reset_timeout(); -} - -template<EntityType ent_type> -FetchContext<ent_type>::FetchContext( - const uint256_t &ent_hash, HotStuffBase *hs): - promise_t([](promise_t){}), - hs(hs), ent_hash(ent_hash) { - fetch_msg.gen_qfetchblk(std::vector<uint256_t>{ent_hash}); - - timeout = Event(hs->eb, -1, 0, - std::bind(&FetchContext::timeout_cb, this, _1, _2)); - reset_timeout(); -} - -template<EntityType ent_type> -void FetchContext<ent_type>::send(const NetAddr &replica_id) { - hs->part_fetched_replica[replica_id]++; - hs->pn.send_msg(fetch_msg, replica_id); -} - -template<EntityType ent_type> -void FetchContext<ent_type>::reset_timeout() { - timeout.add_with_timeout(salticidae::gen_rand_timeout(ent_waiting_timeout)); -} - -template<EntityType ent_type> -void FetchContext<ent_type>::add_replica(const NetAddr &replica_id, bool fetch_now) { - if (replica_ids.empty() && fetch_now) - send(replica_id); - replica_ids.insert(replica_id); -} - -} - -#endif diff --git a/include/hotstuff/crypto.h b/include/hotstuff/crypto.h index 2fbf745..32997c8 100644 --- a/include/hotstuff/crypto.h +++ b/include/hotstuff/crypto.h @@ -1,14 +1,11 @@ #ifndef _HOTSTUFF_CRYPTO_H #define _HOTSTUFF_CRYPTO_H -#include "salticidae/crypto.h" -#include "salticidae/ref.h" -#include "secp256k1.h" #include <openssl/rand.h> -#include "type.h" -using salticidae::RcObj; -using salticidae::BoxObj; +#include "secp256k1.h" +#include "salticidae/crypto.h" +#include "hotstuff/type.h" namespace hotstuff { diff --git a/include/hotstuff/entity.h b/include/hotstuff/entity.h index b3a0df4..00c64a6 100644 --- a/include/hotstuff/entity.h +++ b/include/hotstuff/entity.h @@ -10,9 +10,9 @@ #include "salticidae/netaddr.h" #include "salticidae/ref.h" -#include "type.h" -#include "util.h" -#include "crypto.h" +#include "hotstuff/type.h" +#include "hotstuff/util.h" +#include "hotstuff/crypto.h" namespace hotstuff { diff --git a/include/hotstuff/hotstuff.h b/include/hotstuff/hotstuff.h new file mode 100644 index 0000000..9546216 --- /dev/null +++ b/include/hotstuff/hotstuff.h @@ -0,0 +1,320 @@ +#ifndef _HOTSTUFF_CORE_H +#define _HOTSTUFF_CORE_H + +#include <queue> +#include <unordered_map> +#include <unordered_set> + +#include "salticidae/util.h" +#include "salticidae/network.h" +#include "salticidae/msg.h" +#include "hotstuff/util.h" +#include "hotstuff/consensus.h" +#include "hotstuff/liveness.h" + +namespace hotstuff { + +using salticidae::MsgNetwork; +using salticidae::PeerNetwork; +using salticidae::ElapsedTime; +using salticidae::_1; +using salticidae::_2; + +const double ent_waiting_timeout = 10; +const double double_inf = 1e10; + +enum { + PROPOSE = 0x0, + VOTE = 0x1, + QUERY_FETCH_BLK = 0x2, + RESP_FETCH_BLK = 0x3, +}; + +/** Network message format for HotStuff. */ +struct MsgHotStuff: public salticidae::MsgBase<> { + using MsgBase::MsgBase; + void gen_propose(const Proposal &); + void parse_propose(Proposal &) const; + + void gen_vote(const Vote &); + void parse_vote(Vote &) const; + + void gen_qfetchblk(const std::vector<uint256_t> &blk_hashes); + void parse_qfetchblk(std::vector<uint256_t> &blk_hashes) const; + + void gen_rfetchblk(const std::vector<block_t> &blks); + void parse_rfetchblk(std::vector<block_t> &blks, HotStuffCore *hsc) const; +}; + +using promise::promise_t; + +class HotStuffBase; + +template<EntityType ent_type> +class FetchContext: public promise_t { + Event timeout; + HotStuffBase *hs; + MsgHotStuff fetch_msg; + const uint256_t ent_hash; + std::unordered_set<NetAddr> replica_ids; + inline void timeout_cb(evutil_socket_t, short); + public: + FetchContext(const FetchContext &) = delete; + FetchContext &operator=(const FetchContext &) = delete; + FetchContext(FetchContext &&other); + + FetchContext(const uint256_t &ent_hash, HotStuffBase *hs); + ~FetchContext() {} + + inline void send(const NetAddr &replica_id); + inline void reset_timeout(); + inline void add_replica(const NetAddr &replica_id, bool fetch_now = true); +}; + +class BlockDeliveryContext: public promise_t { + public: + ElapsedTime elapsed; + BlockDeliveryContext &operator=(const BlockDeliveryContext &) = delete; + BlockDeliveryContext(const BlockDeliveryContext &other): + promise_t(static_cast<const promise_t &>(other)), + elapsed(other.elapsed) {} + BlockDeliveryContext(BlockDeliveryContext &&other): + promise_t(static_cast<const promise_t &>(other)), + elapsed(std::move(other.elapsed)) {} + template<typename Func> + BlockDeliveryContext(Func callback): promise_t(callback) { + elapsed.start(); + } +}; + + +/** HotStuff protocol (with network implementation). */ +class HotStuffBase: public HotStuffCore { + using BlockFetchContext = FetchContext<ENT_TYPE_BLK>; + using CmdFetchContext = FetchContext<ENT_TYPE_CMD>; + using conn_t = MsgNetwork<MsgHotStuff>::conn_t; + + friend BlockFetchContext; + friend CmdFetchContext; + + protected: + /** the binding address in replica network */ + NetAddr listen_addr; + /** the block size */ + size_t blk_size; + /** libevent handle */ + EventContext eb; + pacemaker_bt pmaker; + + private: + /** whether libevent handle is owned by itself */ + bool eb_loop; + /** network stack */ + PeerNetwork<MsgHotStuff> pn; +#ifdef HOTSTUFF_ENABLE_BLK_PROFILE + BlockProfiler blk_profiler; +#endif + /* queues for async tasks */ + std::unordered_map<const uint256_t, BlockFetchContext> blk_fetch_waiting; + std::unordered_map<const uint256_t, BlockDeliveryContext> blk_delivery_waiting; + std::unordered_map<const uint256_t, CmdFetchContext> cmd_fetch_waiting; + std::unordered_map<const uint256_t, promise_t> decision_waiting; + std::queue<command_t> cmd_pending; + + /* statistics */ + uint64_t fetched; + uint64_t delivered; + mutable uint64_t nsent; + mutable uint64_t nrecv; + + mutable uint32_t part_parent_size; + mutable uint32_t part_fetched; + mutable uint32_t part_delivered; + mutable uint32_t part_decided; + mutable uint32_t part_gened; + mutable double part_delivery_time; + mutable double part_delivery_time_min; + mutable double part_delivery_time_max; + mutable std::unordered_map<const NetAddr, uint32_t> part_fetched_replica; + + void on_fetch_cmd(const command_t &cmd); + void on_fetch_blk(const block_t &blk); + void on_deliver_blk(const block_t &blk); + + /** deliver consensus message: <propose> */ + inline void propose_handler(const MsgHotStuff &, conn_t); + /** deliver consensus message: <vote> */ + inline void vote_handler(const MsgHotStuff &, conn_t); + /** fetches full block data */ + inline void query_fetch_blk_handler(const MsgHotStuff &, conn_t); + /** receives a block */ + inline void resp_fetch_blk_handler(const MsgHotStuff &, conn_t); + + void do_broadcast_proposal(const Proposal &) override; + void do_vote(ReplicaID, const Vote &) override; + void do_decide(const command_t &) override; + + public: + HotStuffBase(uint32_t blk_size, + int32_t parent_limit, + ReplicaID rid, + privkey_bt &&priv_key, + NetAddr listen_addr, + EventContext eb = EventContext(), + pacemaker_bt pmaker = nullptr); + + ~HotStuffBase(); + + /* the API for HotStuffBase */ + + /* Submit the command to be decided. */ + void add_command(command_t cmd) { + cmd_pending.push(storage->add_cmd(cmd)); + if (cmd_pending.size() >= blk_size) + { + std::vector<command_t> cmds; + for (uint32_t i = 0; i < blk_size; i++) + { + cmds.push_back(cmd_pending.front()); + cmd_pending.pop(); + } + pmaker->beat().then([this, cmds = std::move(cmds)]() { + on_propose(cmds); + }); + } + } + + void add_replica(ReplicaID idx, const NetAddr &addr, pubkey_bt &&pub_key); + void start(bool eb_loop = false); + + size_t size() const { return pn.all_peers().size(); } + void print_stat() const; + + /* Helper functions */ + /** Returns a promise resolved (with command_t cmd) when Command is fetched. */ + promise_t async_fetch_cmd(const uint256_t &cmd_hash, const NetAddr *replica_id, bool fetch_now = true); + /** Returns a promise resolved (with block_t blk) when Block is fetched. */ + promise_t async_fetch_blk(const uint256_t &blk_hash, const NetAddr *replica_id, bool fetch_now = true); + /** Returns a promise resolved (with block_t blk) when Block is delivered (i.e. prefix is fetched). */ + promise_t async_deliver_blk(const uint256_t &blk_hash, const NetAddr &replica_id); + /** Returns a promise resolved (with command_t cmd) when Command is decided. */ + promise_t async_decide(const uint256_t &cmd_hash); +}; + +/** HotStuff protocol (templated by cryptographic implementation). */ +template<typename PrivKeyType = PrivKeyDummy, + typename PubKeyType = PubKeyDummy, + typename PartCertType = PartCertDummy, + typename QuorumCertType = QuorumCertDummy> +class HotStuff: public HotStuffBase { + using HotStuffBase::HotStuffBase; + protected: + + part_cert_bt create_part_cert(const PrivKey &priv_key, const uint256_t &blk_hash) override { + return new PartCertType( + static_cast<const PrivKeyType &>(priv_key), + blk_hash); + } + + part_cert_bt parse_part_cert(DataStream &s) override { + PartCert *pc = new PartCertType(); + s >> *pc; + return pc; + } + + quorum_cert_bt create_quorum_cert(const uint256_t &blk_hash) override { + return new QuorumCertType(get_config(), blk_hash); + } + + quorum_cert_bt parse_quorum_cert(DataStream &s) override { + QuorumCert *qc = new QuorumCertType(); + s >> *qc; + return qc; + } + + public: + HotStuff(uint32_t blk_size, + int32_t parent_limit, + ReplicaID rid, + const bytearray_t &raw_privkey, + NetAddr listen_addr, + EventContext eb = nullptr): + HotStuffBase(blk_size, + parent_limit, + rid, + new PrivKeyType(raw_privkey), + listen_addr, + eb) {} + + void add_replica(ReplicaID idx, const NetAddr &addr, const bytearray_t &pubkey_raw) { + DataStream s(pubkey_raw); + HotStuffBase::add_replica(idx, addr, new PubKeyType(pubkey_raw)); + } +}; + +using HotStuffNoSig = HotStuff<>; +using HotStuffSecp256k1 = HotStuff<PrivKeySecp256k1, PubKeySecp256k1, + PartCertSecp256k1, QuorumCertSecp256k1>; + +template<EntityType ent_type> +FetchContext<ent_type>::FetchContext(FetchContext && other): + promise_t(static_cast<const promise_t &>(other)), + hs(other.hs), + fetch_msg(std::move(other.fetch_msg)), + ent_hash(other.ent_hash), + replica_ids(std::move(other.replica_ids)) { + other.timeout.del(); + timeout = Event(hs->eb, -1, 0, + std::bind(&FetchContext::timeout_cb, this, _1, _2)); + reset_timeout(); +} + +template<> +inline void FetchContext<ENT_TYPE_CMD>::timeout_cb(evutil_socket_t, short) { + HOTSTUFF_LOG_WARN("cmd fetching %.10s timeout", get_hex(ent_hash).c_str()); + for (const auto &replica_id: replica_ids) + send(replica_id); + reset_timeout(); +} + +template<> +inline void FetchContext<ENT_TYPE_BLK>::timeout_cb(evutil_socket_t, short) { + HOTSTUFF_LOG_WARN("block fetching %.10s timeout", get_hex(ent_hash).c_str()); + for (const auto &replica_id: replica_ids) + send(replica_id); + reset_timeout(); +} + +template<EntityType ent_type> +FetchContext<ent_type>::FetchContext( + const uint256_t &ent_hash, HotStuffBase *hs): + promise_t([](promise_t){}), + hs(hs), ent_hash(ent_hash) { + fetch_msg.gen_qfetchblk(std::vector<uint256_t>{ent_hash}); + + timeout = Event(hs->eb, -1, 0, + std::bind(&FetchContext::timeout_cb, this, _1, _2)); + reset_timeout(); +} + +template<EntityType ent_type> +void FetchContext<ent_type>::send(const NetAddr &replica_id) { + hs->part_fetched_replica[replica_id]++; + hs->pn.send_msg(fetch_msg, replica_id); +} + +template<EntityType ent_type> +void FetchContext<ent_type>::reset_timeout() { + timeout.add_with_timeout(salticidae::gen_rand_timeout(ent_waiting_timeout)); +} + +template<EntityType ent_type> +void FetchContext<ent_type>::add_replica(const NetAddr &replica_id, bool fetch_now) { + if (replica_ids.empty() && fetch_now) + send(replica_id); + replica_ids.insert(replica_id); +} + +} + +#endif diff --git a/include/hotstuff/liveness.h b/include/hotstuff/liveness.h new file mode 100644 index 0000000..f8d3c50 --- /dev/null +++ b/include/hotstuff/liveness.h @@ -0,0 +1,78 @@ +#ifndef _HOTSTUFF_LIVENESS_H +#define _HOTSTUFF_LIVENESS_H + +#include "hotstuff/consensus.h" + +namespace hotstuff { + +/** Abstraction for liveness gadget (oracle). */ +class PaceMaker { + public: + virtual ~PaceMaker() = default; + /** Get a promise resolved when the pace maker thinks it is a *good* time + * to issue new commands. When promise is resolved with the ID of itself, + * the replica should propose the command, otherwise it will forward the + * command to the proposer indicated by the ID. */ + virtual promise_t beat() = 0; + /** Get a promise resolved when the pace maker thinks it is a *good* time + * to vote for a block. The promise is resolved with the next proposer's ID + * */ + virtual promise_t next_proposer(ReplicaID last_proposer) = 0; +}; + +using pacemaker_bt = BoxObj<PaceMaker>; + +/** A pace maker that waits for the qc of the last proposed block. */ +class PaceMakerDummy: public PaceMaker { + HotStuffCore *hsc; + std::queue<promise_t> pending_beats; + block_t last_proposed; + bool locked; + + void schedule_next() { + if (!pending_beats.empty() && !locked) + { + auto pm = pending_beats.front(); + pending_beats.pop(); + hsc->async_qc_finish(last_proposed).then( + [id = hsc->get_id(), pm]() { + pm.resolve(id); + }); + locked = true; + } + } + + void update_last_proposed() { + hsc->async_wait_propose().then([this](block_t blk) { + update_last_proposed(); + last_proposed = blk; + locked = false; + schedule_next(); + }); + } + + public: + PaceMakerDummy(HotStuffCore *hsc): + hsc(hsc), + last_proposed(hsc->get_genesis()), + locked(false) { + update_last_proposed(); + } + + promise_t beat() override { + promise_t pm; + pending_beats.push(pm); + schedule_next(); + return pm; + } + + promise_t next_proposer(ReplicaID last_proposer) override { + return promise_t([last_proposer](promise_t &pm) { + pm.resolve(last_proposer); + }); + } +}; + +} + +#endif diff --git a/include/hotstuff/type.h b/include/hotstuff/type.h index 4665979..670ee6c 100644 --- a/include/hotstuff/type.h +++ b/include/hotstuff/type.h @@ -1,12 +1,19 @@ #ifndef _HOTSTUFF_TYPE_H #define _HOTSTUFF_TYPE_H +#include "promise.hpp" +#include "salticidae/event.h" +#include "salticidae/ref.h" +#include "salticidae/netaddr.h" #include "salticidae/stream.h" #include "salticidae/type.h" #include "salticidae/util.h" namespace hotstuff { +using salticidae::RcObj; +using salticidae::BoxObj; + using salticidae::uint256_t; using salticidae::DataStream; using salticidae::htole; @@ -14,6 +21,12 @@ using salticidae::letoh; using salticidae::get_hex; using salticidae::from_hex; using salticidae::bytearray_t; +using salticidae::get_hash; + +using salticidae::NetAddr; +using salticidae::Event; +using salticidae::EventContext; +using promise::promise_t; inline std::string get_hex10(const uint256_t &x) { return get_hex(x).substr(0, 10); diff --git a/salticidae b/salticidae -Subproject fd881223556fc608b24626a7bc6e78bf576d3ed +Subproject 12bf781e762705f2bbabe5102148ac699e20ef1 diff --git a/src/consensus.cpp b/src/consensus.cpp new file mode 100644 index 0000000..7749558 --- /dev/null +++ b/src/consensus.cpp @@ -0,0 +1,296 @@ +#include <cassert> +#include <stack> + +#include "hotstuff/util.h" +#include "hotstuff/consensus.h" + +#define LOG_INFO HOTSTUFF_LOG_INFO +#define LOG_DEBUG HOTSTUFF_LOG_DEBUG +#define LOG_WARN HOTSTUFF_LOG_WARN + +namespace hotstuff { + +/* The core logic of HotStuff, is farily simple :) */ +/*** begin HotStuff protocol logic ***/ +HotStuffCore::HotStuffCore(ReplicaID id, + privkey_bt &&priv_key, + int32_t parent_limit): + b0(new Block(true, 1)), + bqc(b0), + bexec(b0), + vheight(0), + priv_key(std::move(priv_key)), + tails{bqc}, + id(id), + parent_limit(parent_limit), + storage(new EntityStorage()) { + storage->add_blk(b0); + b0->qc_ref = b0; +} + +void HotStuffCore::sanity_check_delivered(const block_t &blk) { + if (!blk->delivered) + throw std::runtime_error("block not delivered"); +} + +block_t HotStuffCore::sanity_check_delivered(const uint256_t &blk_hash) { + block_t blk = storage->find_blk(blk_hash); + if (blk == nullptr || !blk->delivered) + throw std::runtime_error("block not delivered"); + return std::move(blk); +} + +bool HotStuffCore::on_deliver_blk(const block_t &blk) { + if (blk->delivered) + { + LOG_WARN("attempt to deliver a block twice"); + return false; + } + blk->parents.clear(); + for (const auto &hash: blk->parent_hashes) + { + block_t p = sanity_check_delivered(hash); + blk->parents.push_back(p); + } + blk->height = blk->parents[0]->height + 1; + for (const auto &cmd: blk->cmds) + cmd->container = blk; + + if (blk->qc) + { + block_t _blk = storage->find_blk(blk->qc->get_blk_hash()); + if (_blk == nullptr) + throw std::runtime_error("block referred by qc not fetched"); + blk->qc_ref = std::move(_blk); + } // otherwise blk->qc_ref remains null + + for (auto pblk: blk->parents) tails.erase(pblk); + tails.insert(blk); + + blk->delivered = true; + LOG_DEBUG("delivered %.10s", get_hex(blk->get_hash()).c_str()); + return true; +} + +void HotStuffCore::check_commit(const block_t &_blk) { + const block_t &blk = _blk->qc_ref; + if (blk->qc_ref == nullptr) return; + if (blk->decision) return; + block_t p = blk->parents[0]; + if (p == blk->qc_ref) + { /* commit */ + std::vector<block_t> commit_queue; + block_t b; + for (b = p; b->height > bexec->height; b = b->parents[0]) + { /* todo: also commit the uncles/aunts */ + commit_queue.push_back(b); + } + if (b != bexec) + throw std::runtime_error("safety breached :("); + for (auto it = commit_queue.rbegin(); it != commit_queue.rend(); it++) + { + const block_t &blk = *it; + blk->decision = 1; +#ifdef HOTSTUFF_ENABLE_LOG_PROTO + LOG_INFO("commit blk %.10s", get_hex10(blk->get_hash()).c_str()); +#endif + for (auto cmd: blk->cmds) + do_decide(cmd); + } + bexec = p; + } +} + +bool HotStuffCore::update(const uint256_t &bqc_hash) { + block_t _bqc = sanity_check_delivered(bqc_hash); + if (_bqc->qc_ref == nullptr) return false; + check_commit(_bqc); + if (_bqc->qc_ref->height > bqc->qc_ref->height) + bqc = _bqc; + return true; +} + +void HotStuffCore::on_propose(const std::vector<command_t> &cmds) { + size_t nparents = parent_limit < 1 ? tails.size() : parent_limit; + assert(tails.size() > 0); + block_t p = *tails.rbegin(); + std::vector<block_t> parents{p}; + tails.erase(p); + nparents--; + /* add the rest of tails as "uncles/aunts" */ + while (nparents--) + { + auto it = tails.begin(); + parents.push_back(*it); + tails.erase(it); + } + quorum_cert_bt qc = nullptr; + block_t qc_ref = nullptr; + if (p != b0 && p->voted.size() >= config.nmajority) + { + qc = p->self_qc->clone(); + qc->compute(); + qc_ref = p; + } + /* create a new block */ + block_t bnew = storage->add_blk( + Block( + parents, + cmds, + p->height + 1, + std::move(qc), qc_ref, + nullptr + )); + const uint256_t bnew_hash = bnew->get_hash(); + bnew->self_qc = create_quorum_cert(bnew_hash); + on_deliver_blk(bnew); + update(bnew_hash); + Proposal prop(id, bqc->get_hash(), bnew, nullptr); +#ifdef HOTSTUFF_ENABLE_LOG_PROTO + LOG_INFO("propose %s", std::string(*bnew).c_str()); +#endif + /* self-vote */ + on_receive_vote( + Vote(id, bqc->get_hash(), bnew_hash, + create_part_cert(*priv_key, bnew_hash), this)); + on_propose_(bnew); + /* boradcast to other replicas */ + do_broadcast_proposal(prop); +} + +void HotStuffCore::on_receive_proposal(const Proposal &prop) { + if (!update(prop.bqc_hash)) return; +#ifdef HOTSTUFF_ENABLE_LOG_PROTO + LOG_INFO("got %s", std::string(prop).c_str()); +#endif + block_t bnew = prop.blk; + sanity_check_delivered(bnew); + bool opinion = false; + if (bnew->height > vheight) + { + block_t pref = bqc->qc_ref; + block_t b; + for (b = bnew; + b->height > pref->height; + b = b->parents[0]); + opinion = b == pref; + vheight = bnew->height; + } +#ifdef HOTSTUFF_ENABLE_LOG_PROTO + LOG_INFO("now state: %s", std::string(*this).c_str()); +#endif + do_vote(prop.proposer, + Vote(id, + bqc->get_hash(), + bnew->get_hash(), + (opinion ? + create_part_cert(*priv_key, bnew->get_hash()) : + nullptr), + nullptr)); +} + +void HotStuffCore::on_receive_vote(const Vote &vote) { + if (!update(vote.bqc_hash)) return; +#ifdef HOTSTUFF_ENABLE_LOG_PROTO + LOG_INFO("got %s", std::string(vote).c_str()); + LOG_INFO("now state: %s", std::string(*this).c_str()); +#endif + + block_t blk = sanity_check_delivered(vote.blk_hash); + if (vote.cert == nullptr) return; + if (!vote.verify()) + { + LOG_WARN("invalid vote"); + return; + } + if (!blk->voted.insert(vote.voter).second) + { + LOG_WARN("duplicate votes"); + return; + } + size_t qsize = blk->voted.size(); + if (qsize <= config.nmajority) + { + blk->self_qc->add_part(vote.voter, *vote.cert); + if (qsize == config.nmajority) + on_qc_finish(blk); + } +} +/*** end HotStuff protocol logic ***/ + +void HotStuffCore::prune(uint32_t staleness) { + block_t start; + /* skip the blocks */ + for (start = bexec; staleness; staleness--, start = start->parents[0]) + if (!start->parents.size()) return; + std::stack<block_t> s; + start->qc_ref = nullptr; + s.push(start); + while (!s.empty()) + { + auto &blk = s.top(); + if (blk->parents.empty()) + { + storage->try_release_blk(blk); + s.pop(); + continue; + } + blk->qc_ref = nullptr; + s.push(blk->parents.back()); + blk->parents.pop_back(); + } +} + +int8_t HotStuffCore::get_cmd_decision(const uint256_t &cmd_hash) { + auto cmd = storage->find_cmd(cmd_hash); + return cmd != nullptr ? cmd->get_decision() : 0; +} + +void HotStuffCore::add_replica(ReplicaID rid, const NetAddr &addr, + pubkey_bt &&pub_key) { + config.add_replica(rid, + ReplicaInfo(rid, addr, std::move(pub_key))); + b0->voted.insert(rid); +} + +promise_t HotStuffCore::async_qc_finish(const block_t &blk) { + if (blk->voted.size() >= config.nmajority) + return promise_t([](promise_t &pm) { + pm.resolve(); + }); + auto it = qc_waiting.find(blk); + if (it == qc_waiting.end()) + it = qc_waiting.insert(std::make_pair(blk, promise_t())).first; + return it->second; +} + +void HotStuffCore::on_qc_finish(const block_t &blk) { + auto it = qc_waiting.find(blk); + if (it != qc_waiting.end()) + { + it->second.resolve(); + qc_waiting.erase(it); + } +} + +promise_t HotStuffCore::async_wait_propose() { + return propose_waiting; +} + +void HotStuffCore::on_propose_(const block_t &blk) { + auto t = std::move(propose_waiting); + propose_waiting = promise_t(); + t.resolve(blk); +} + +HotStuffCore::operator std::string () const { + DataStream s; + s << "<hotstuff " + << "bqc=" << get_hex10(bqc->get_hash()) << " " + << "bexec=" << get_hex10(bqc->get_hash()) << " " + << "vheight=" << std::to_string(vheight) << " " + << "tails=" << std::to_string(tails.size()) << ">"; + return std::string(std::move(s)); +} + +} diff --git a/src/core.cpp b/src/core.cpp deleted file mode 100644 index 125e168..0000000 --- a/src/core.cpp +++ /dev/null @@ -1,723 +0,0 @@ -#include <stack> -#include "hotstuff/core.h" - -using salticidae::DataStream; -using salticidae::static_pointer_cast; -using salticidae::get_hash; - -#define LOG_INFO HOTSTUFF_LOG_INFO -#define LOG_DEBUG HOTSTUFF_LOG_DEBUG -#define LOG_WARN HOTSTUFF_LOG_WARN - -namespace hotstuff { - -void MsgHotStuff::gen_propose(const Proposal &proposal) { - DataStream s; - set_opcode(PROPOSE); - s << proposal; - set_payload(std::move(s)); -} - -void MsgHotStuff::parse_propose(Proposal &proposal) const { - DataStream(get_payload()) >> proposal; -} - -void MsgHotStuff::gen_vote(const Vote &vote) { - DataStream s; - set_opcode(VOTE); - s << vote; - set_payload(std::move(s)); -} - -void MsgHotStuff::parse_vote(Vote &vote) const { - DataStream(get_payload()) >> vote; -} - -void MsgHotStuff::gen_qfetchblk(const std::vector<uint256_t> &blk_hashes) { - DataStream s; - set_opcode(QUERY_FETCH_BLK); - gen_hash_list(s, blk_hashes); - set_payload(std::move(s)); -} - -void MsgHotStuff::parse_qfetchblk(std::vector<uint256_t> &blk_hashes) const { - DataStream s(get_payload()); - parse_hash_list(s, blk_hashes); -} - -void MsgHotStuff::gen_rfetchblk(const std::vector<block_t> &blks) { - DataStream s; - set_opcode(RESP_FETCH_BLK); - s << htole((uint32_t)blks.size()); - for (auto blk: blks) s << *blk; - set_payload(std::move(s)); -} - -void MsgHotStuff::parse_rfetchblk(std::vector<block_t> &blks, HotStuffCore *hsc) const { - DataStream s; - uint32_t size; - s >> size; - size = letoh(size); - blks.resize(size); - for (auto &blk: blks) - { - Block _blk; - _blk.unserialize(s, hsc); - if (!_blk.verify(hsc->get_config())) - blk = hsc->storage->add_blk(std::move(_blk)); - else - { - blk = nullptr; - LOG_WARN("block is invalid"); - } - } -} - -/* The core logic of HotStuff, is farily simple :) */ -/*** begin HotStuff protocol logic ***/ -HotStuffCore::HotStuffCore(ReplicaID id, - privkey_bt &&priv_key, - int32_t parent_limit): - b0(new Block(true, 1)), - bqc(b0), - bexec(b0), - vheight(0), - priv_key(std::move(priv_key)), - tails{bqc}, - id(id), - parent_limit(parent_limit), - storage(new EntityStorage()) { - storage->add_blk(b0); - b0->qc_ref = b0; -} - -void HotStuffCore::sanity_check_delivered(const block_t &blk) { - if (!blk->delivered) - throw std::runtime_error("block not delivered"); -} - -block_t HotStuffCore::sanity_check_delivered(const uint256_t &blk_hash) { - block_t blk = storage->find_blk(blk_hash); - if (blk == nullptr || !blk->delivered) - throw std::runtime_error("block not delivered"); - return std::move(blk); -} - -bool HotStuffCore::on_deliver_blk(const block_t &blk) { - if (blk->delivered) - { - LOG_WARN("attempt to deliver a block twice"); - return false; - } - blk->parents.clear(); - for (const auto &hash: blk->parent_hashes) - { - block_t p = sanity_check_delivered(hash); - blk->parents.push_back(p); - } - blk->height = blk->parents[0]->height + 1; - for (const auto &cmd: blk->cmds) - cmd->container = blk; - - if (blk->qc) - { - block_t _blk = storage->find_blk(blk->qc->get_blk_hash()); - if (_blk == nullptr) - throw std::runtime_error("block referred by qc not fetched"); - blk->qc_ref = std::move(_blk); - } // otherwise blk->qc_ref remains null - - for (auto pblk: blk->parents) tails.erase(pblk); - tails.insert(blk); - - blk->delivered = true; - LOG_DEBUG("delivered %.10s", get_hex(blk->get_hash()).c_str()); - return true; -} - -void HotStuffCore::check_commit(const block_t &_blk) { - const block_t &blk = _blk->qc_ref; - if (blk->qc_ref == nullptr) return; - if (blk->decision) return; - block_t p = blk->parents[0]; - if (p == blk->qc_ref) - { /* commit */ - std::vector<block_t> commit_queue; - block_t b; - for (b = p; b->height > bexec->height; b = b->parents[0]) - { /* todo: also commit the uncles/aunts */ - commit_queue.push_back(b); - } - if (b != bexec) - throw std::runtime_error("safety breached :("); - for (auto it = commit_queue.rbegin(); it != commit_queue.rend(); it++) - { - const block_t &blk = *it; - blk->decision = 1; -#ifdef HOTSTUFF_ENABLE_LOG_PROTO - LOG_INFO("commit blk %.10s", get_hex10(blk->get_hash()).c_str()); -#endif - for (auto cmd: blk->cmds) - do_decide(cmd); - } - bexec = p; - } -} - -bool HotStuffCore::update(const uint256_t &bqc_hash) { - block_t _bqc = sanity_check_delivered(bqc_hash); - if (_bqc->qc_ref == nullptr) return false; - check_commit(_bqc); - if (_bqc->qc_ref->height > bqc->qc_ref->height) - bqc = _bqc; - return true; -} - -void HotStuffCore::on_propose(const std::vector<command_t> &cmds) { - size_t nparents = parent_limit < 1 ? tails.size() : parent_limit; - assert(tails.size() > 0); - block_t p = *tails.rbegin(); - std::vector<block_t> parents{p}; - tails.erase(p); - nparents--; - /* add the rest of tails as "uncles/aunts" */ - while (nparents--) - { - auto it = tails.begin(); - parents.push_back(*it); - tails.erase(it); - } - quorum_cert_bt qc = nullptr; - block_t qc_ref = nullptr; - if (p != b0 && p->voted.size() >= config.nmajority) - { - qc = p->self_qc->clone(); - qc->compute(); - qc_ref = p; - } - /* create a new block */ - block_t bnew = storage->add_blk( - Block( - parents, - cmds, - p->height + 1, - std::move(qc), qc_ref, - nullptr - )); - const uint256_t bnew_hash = bnew->get_hash(); - bnew->self_qc = create_quorum_cert(bnew_hash); - on_deliver_blk(bnew); - update(bnew_hash); - Proposal prop(id, bqc->get_hash(), bnew, nullptr); -#ifdef HOTSTUFF_ENABLE_LOG_PROTO - LOG_INFO("propose %s", std::string(*bnew).c_str()); -#endif - /* self-vote */ - on_receive_vote( - Vote(id, bqc->get_hash(), bnew_hash, - create_part_cert(*priv_key, bnew_hash), this)); - on_propose_(bnew); - /* boradcast to other replicas */ - do_broadcast_proposal(prop); -} - -void HotStuffCore::on_receive_proposal(const Proposal &prop) { - if (!update(prop.bqc_hash)) return; -#ifdef HOTSTUFF_ENABLE_LOG_PROTO - LOG_INFO("got %s", std::string(prop).c_str()); -#endif - block_t bnew = prop.blk; - sanity_check_delivered(bnew); - bool opinion = false; - if (bnew->height > vheight) - { - block_t pref = bqc->qc_ref; - block_t b; - for (b = bnew; - b->height > pref->height; - b = b->parents[0]); - opinion = b == pref; - vheight = bnew->height; - } -#ifdef HOTSTUFF_ENABLE_LOG_PROTO - LOG_INFO("now state: %s", std::string(*this).c_str()); -#endif - do_vote(prop.proposer, - Vote(id, - bqc->get_hash(), - bnew->get_hash(), - (opinion ? - create_part_cert(*priv_key, bnew->get_hash()) : - nullptr), - nullptr)); -} - -void HotStuffCore::on_receive_vote(const Vote &vote) { - if (!update(vote.bqc_hash)) return; -#ifdef HOTSTUFF_ENABLE_LOG_PROTO - LOG_INFO("got %s", std::string(vote).c_str()); - LOG_INFO("now state: %s", std::string(*this).c_str()); -#endif - - block_t blk = sanity_check_delivered(vote.blk_hash); - if (vote.cert == nullptr) return; - if (!vote.verify()) - { - LOG_WARN("invalid vote"); - return; - } - if (!blk->voted.insert(vote.voter).second) - { - LOG_WARN("duplicate votes"); - return; - } - size_t qsize = blk->voted.size(); - if (qsize <= config.nmajority) - { - blk->self_qc->add_part(vote.voter, *vote.cert); - if (qsize == config.nmajority) - on_qc_finish(blk); - } -} -/*** end HotStuff protocol logic ***/ - -void HotStuffCore::prune(uint32_t staleness) { - block_t start; - /* skip the blocks */ - for (start = bexec; staleness; staleness--, start = start->parents[0]) - if (!start->parents.size()) return; - std::stack<block_t> s; - start->qc_ref = nullptr; - s.push(start); - while (!s.empty()) - { - auto &blk = s.top(); - if (blk->parents.empty()) - { - storage->try_release_blk(blk); - s.pop(); - continue; - } - blk->qc_ref = nullptr; - s.push(blk->parents.back()); - blk->parents.pop_back(); - } -} - -int8_t HotStuffCore::get_cmd_decision(const uint256_t &cmd_hash) { - auto cmd = storage->find_cmd(cmd_hash); - return cmd != nullptr ? cmd->get_decision() : 0; -} - -void HotStuffCore::add_replica(ReplicaID rid, const NetAddr &addr, - pubkey_bt &&pub_key) { - config.add_replica(rid, - ReplicaInfo(rid, addr, std::move(pub_key))); - b0->voted.insert(rid); -} - -promise_t HotStuffCore::async_qc_finish(const block_t &blk) { - if (blk->voted.size() >= config.nmajority) - return promise_t([](promise_t &pm) { - pm.resolve(); - }); - auto it = qc_waiting.find(blk); - if (it == qc_waiting.end()) - it = qc_waiting.insert(std::make_pair(blk, promise_t())).first; - return it->second; -} - -void HotStuffCore::on_qc_finish(const block_t &blk) { - auto it = qc_waiting.find(blk); - if (it != qc_waiting.end()) - { - it->second.resolve(); - qc_waiting.erase(it); - } -} - -promise_t HotStuffCore::async_wait_propose() { - return propose_waiting; -} - -void HotStuffCore::on_propose_(const block_t &blk) { - auto t = std::move(propose_waiting); - propose_waiting = promise_t(); - t.resolve(blk); -} - -HotStuffCore::operator std::string () const { - DataStream s; - s << "<hotstuff " - << "bqc=" << get_hex10(bqc->get_hash()) << " " - << "bexec=" << get_hex10(bqc->get_hash()) << " " - << "vheight=" << std::to_string(vheight) << " " - << "tails=" << std::to_string(tails.size()) << ">"; - return std::string(std::move(s)); -} - -void HotStuffBase::add_replica(ReplicaID idx, const NetAddr &addr, - pubkey_bt &&pub_key) { - HotStuffCore::add_replica(idx, addr, std::move(pub_key)); - if (addr != listen_addr) - pn.add_peer(addr); -} - -void HotStuffBase::on_fetch_blk(const block_t &blk) { -#ifdef HOTSTUFF_ENABLE_TX_PROFILE - blk_profiler.get_tx(blk->get_hash()); -#endif - LOG_DEBUG("fetched %.10s", get_hex(blk->get_hash()).c_str()); - part_fetched++; - fetched++; - for (auto cmd: blk->get_cmds()) on_fetch_cmd(cmd); - const uint256_t &blk_hash = blk->get_hash(); - auto it = blk_fetch_waiting.find(blk_hash); - if (it != blk_fetch_waiting.end()) - { - it->second.resolve(blk); - blk_fetch_waiting.erase(it); - } -} - -void HotStuffBase::on_fetch_cmd(const command_t &cmd) { - const uint256_t &cmd_hash = cmd->get_hash(); - auto it = cmd_fetch_waiting.find(cmd_hash); - if (it != cmd_fetch_waiting.end()) - { - it->second.resolve(cmd); - cmd_fetch_waiting.erase(it); - } -} - -void HotStuffBase::on_deliver_blk(const block_t &blk) { - const uint256_t &blk_hash = blk->get_hash(); - bool valid; - /* sanity check: all parents must be delivered */ - for (const auto &p: blk->get_parent_hashes()) - assert(storage->is_blk_delivered(p)); - if ((valid = HotStuffCore::on_deliver_blk(blk))) - { - LOG_DEBUG("block %.10s delivered", - get_hex(blk_hash).c_str()); - part_parent_size += blk->get_parent_hashes().size(); - part_delivered++; - delivered++; - } - else - { - LOG_WARN("dropping invalid block"); - } - - auto it = blk_delivery_waiting.find(blk_hash); - if (it != blk_delivery_waiting.end()) - { - auto &pm = it->second; - if (valid) - { - pm.elapsed.stop(false); - auto sec = pm.elapsed.elapsed_sec; - part_delivery_time += sec; - part_delivery_time_min = std::min(part_delivery_time_min, sec); - part_delivery_time_max = std::max(part_delivery_time_max, sec); - - pm.resolve(blk); - } - else - { - pm.reject(blk); - // TODO: do we need to also free it from storage? - } - blk_delivery_waiting.erase(it); - } -} - -promise_t HotStuffBase::async_fetch_blk(const uint256_t &blk_hash, - const NetAddr *replica_id, - bool fetch_now) { - if (storage->is_blk_fetched(blk_hash)) - return promise_t([this, &blk_hash](promise_t pm){ - pm.resolve(storage->find_blk(blk_hash)); - }); - auto it = blk_fetch_waiting.find(blk_hash); - if (it == blk_fetch_waiting.end()) - { -#ifdef HOTSTUFF_ENABLE_TX_PROFILE - blk_profiler.rec_tx(blk_hash, false); -#endif - it = blk_fetch_waiting.insert( - std::make_pair( - blk_hash, - BlockFetchContext(blk_hash, this))).first; - } - if (replica_id != nullptr) - it->second.add_replica(*replica_id, fetch_now); - return static_cast<promise_t &>(it->second); -} - -promise_t HotStuffBase::async_fetch_cmd(const uint256_t &cmd_hash, - const NetAddr *replica_id, - bool fetch_now) { - if (storage->is_cmd_fetched(cmd_hash)) - return promise_t([this, &cmd_hash](promise_t pm){ - pm.resolve(storage->find_cmd(cmd_hash)); - }); - auto it = cmd_fetch_waiting.find(cmd_hash); - if (it == cmd_fetch_waiting.end()) - { - it = cmd_fetch_waiting.insert( - std::make_pair(cmd_hash, CmdFetchContext(cmd_hash, this))).first; - } - if (replica_id != nullptr) - it->second.add_replica(*replica_id, fetch_now); - return static_cast<promise_t &>(it->second); -} - -promise_t HotStuffBase::async_deliver_blk(const uint256_t &blk_hash, - const NetAddr &replica_id) { - if (storage->is_blk_delivered(blk_hash)) - return promise_t([this, &blk_hash](promise_t pm) { - pm.resolve(storage->find_blk(blk_hash)); - }); - auto it = blk_delivery_waiting.find(blk_hash); - if (it != blk_delivery_waiting.end()) - return static_cast<promise_t &>(it->second); - BlockDeliveryContext pm{[](promise_t){}}; - it = blk_delivery_waiting.insert(std::make_pair(blk_hash, pm)).first; - /* otherwise the on_deliver_batch will resolve */ - async_fetch_blk(blk_hash, &replica_id).then([this, replica_id](block_t blk) { - /* qc_ref should be fetched */ - std::vector<promise_t> pms; - const auto &qc = blk->get_qc(); - if (qc) - pms.push_back(async_fetch_blk(qc->get_blk_hash(), &replica_id)); - /* the parents should be delivered */ - for (const auto &phash: blk->get_parent_hashes()) - pms.push_back(async_deliver_blk(phash, replica_id)); - promise::all(pms).then([this, blk]() { - on_deliver_blk(blk); - }); - }); - return static_cast<promise_t &>(pm); -} - -void HotStuffBase::propose_handler(const MsgHotStuff &msg, conn_t conn_) { - auto conn = static_pointer_cast<PeerNetwork<MsgHotStuff>::Conn>(conn_); - const NetAddr &peer = conn->get_peer(); - Proposal prop(this); - msg.parse_propose(prop); - block_t blk = prop.blk; - promise::all(std::vector<promise_t>{ - async_deliver_blk(prop.bqc_hash, peer), - async_deliver_blk(blk->get_hash(), peer), - }).then([this, prop = std::move(prop)]() { - on_receive_proposal(prop); - }); -} - -void HotStuffBase::vote_handler(const MsgHotStuff &msg, conn_t conn_) { - auto conn = static_pointer_cast<PeerNetwork<MsgHotStuff>::Conn>(conn_); - const NetAddr &peer = conn->get_peer(); - Vote vote(this); - msg.parse_vote(vote); - promise::all(std::vector<promise_t>{ - async_deliver_blk(vote.bqc_hash, peer), - async_deliver_blk(vote.blk_hash, peer) - }).then([this, vote = std::move(vote)]() { - on_receive_vote(vote); - }); -} - -void HotStuffBase::query_fetch_blk_handler(const MsgHotStuff &msg, conn_t conn_) { - auto conn = static_pointer_cast<PeerNetwork<MsgHotStuff>::Conn>(conn_); - const NetAddr replica = conn->get_peer(); - std::vector<uint256_t> blk_hashes; - msg.parse_qfetchblk(blk_hashes); - - std::vector<promise_t> pms; - for (const auto &h: blk_hashes) - pms.push_back(async_fetch_blk(h, nullptr)); - promise::all(pms).then([replica, this](const promise::values_t values) { - MsgHotStuff resp; - std::vector<block_t> blks; - for (auto &v: values) - { - auto blk = promise::any_cast<block_t>(v); - blks.push_back(blk); - } - resp.gen_rfetchblk(blks); - pn.send_msg(resp, replica); - }); -} - -void HotStuffBase::resp_fetch_blk_handler(const MsgHotStuff &msg, conn_t) { - std::vector<block_t> blks; - msg.parse_rfetchblk(blks, this); - for (const auto &blk: blks) - if (blk) on_fetch_blk(blk); -} - -void HotStuffBase::print_stat() const { - LOG_INFO("===== begin stats ====="); - LOG_INFO("-------- queues -------"); - LOG_INFO("blk_fetch_waiting: %lu", blk_fetch_waiting.size()); - LOG_INFO("blk_delivery_waiting: %lu", blk_delivery_waiting.size()); - LOG_INFO("cmd_fetch_waiting: %lu", cmd_fetch_waiting.size()); - LOG_INFO("decision_waiting: %lu", decision_waiting.size()); - LOG_INFO("-------- misc ---------"); - LOG_INFO("fetched: %lu", fetched); - LOG_INFO("delivered: %lu", delivered); - LOG_INFO("cmd_cache: %lu", storage->get_cmd_cache_size()); - LOG_INFO("blk_cache: %lu", storage->get_blk_cache_size()); - LOG_INFO("------ misc (10s) -----"); - LOG_INFO("fetched: %lu", part_fetched); - LOG_INFO("delivered: %lu", part_delivered); - LOG_INFO("decided: %lu", part_decided); - LOG_INFO("gened: %lu", part_gened); - LOG_INFO("avg. parent_size: %.3f", - part_delivered ? part_parent_size / double(part_delivered) : 0); - LOG_INFO("delivery time: %.3f avg, %.3f min, %.3f max", - part_delivered ? part_delivery_time / double(part_delivered) : 0, - part_delivery_time_min == double_inf ? 0 : part_delivery_time_min, - part_delivery_time_max); - - part_parent_size = 0; - part_fetched = 0; - part_delivered = 0; - part_decided = 0; - part_gened = 0; - part_delivery_time = 0; - part_delivery_time_min = double_inf; - part_delivery_time_max = 0; - LOG_INFO("-- sent opcode (10s) --"); - auto &sent_op = pn.get_sent_by_opcode(); - for (auto &op: sent_op) - { - auto &val = op.second; - LOG_INFO("%02x: %lu, %.2fBpm", op.first, - val.first, val.first ? val.second / double(val.first) : 0); - val.first = val.second = 0; - } - LOG_INFO("-- recv opcode (10s) --"); - auto &recv_op = pn.get_recv_by_opcode(); - for (auto &op: recv_op) - { - auto &val = op.second; - LOG_INFO("%02x: %lu, %.2fBpm", op.first, - val.first, val.first ? val.second / double(val.first) : 0); - val.first = val.second = 0; - } - LOG_INFO("--- replica msg. (10s) ---"); - size_t _nsent = 0; - size_t _nrecv = 0; - for (const auto &replica: pn.all_peers()) - { - auto conn = pn.get_peer_conn(replica); - size_t ns = conn->get_nsent(); - size_t nr = conn->get_nrecv(); - conn->clear_nsent(); - conn->clear_nrecv(); - LOG_INFO("%s: %u, %u, %u", - std::string(replica).c_str(), ns, nr, part_fetched_replica[replica]); - _nsent += ns; - _nrecv += nr; - part_fetched_replica[replica] = 0; - } - nsent += _nsent; - nrecv += _nrecv; - LOG_INFO("sent: %lu", _nsent); - LOG_INFO("recv: %lu", _nrecv); - LOG_INFO("--- replica msg. total ---"); - LOG_INFO("sent: %lu", nsent); - LOG_INFO("recv: %lu", nrecv); - LOG_INFO("====== end stats ======"); -} - -promise_t HotStuffBase::async_decide(const uint256_t &cmd_hash) { - if (get_cmd_decision(cmd_hash)) - return promise_t([this, cmd_hash](promise_t pm){ - pm.resolve(storage->find_cmd(cmd_hash)); - }); - /* otherwise the do_decide will resolve the promise */ - auto it = decision_waiting.find(cmd_hash); - if (it == decision_waiting.end()) - { - promise_t pm{[](promise_t){}}; - it = decision_waiting.insert(std::make_pair(cmd_hash, pm)).first; - } - return it->second; -} - -HotStuffBase::HotStuffBase(uint32_t blk_size, - int32_t parent_limit, - ReplicaID rid, - privkey_bt &&priv_key, - NetAddr listen_addr, - EventContext eb, - pacemaker_bt pmaker): - HotStuffCore(rid, std::move(priv_key), parent_limit), - listen_addr(listen_addr), - blk_size(blk_size), - eb(eb), - pmaker(std::move(pmaker)), - pn(eb), - - fetched(0), delivered(0), - nsent(0), nrecv(0), - part_parent_size(0), - part_fetched(0), - part_delivered(0), - part_decided(0), - part_gened(0), - part_delivery_time(0), - part_delivery_time_min(double_inf), - part_delivery_time_max(0) -{ - if (pmaker == nullptr) - this->pmaker = new PaceMakerDummy(this); - /* register the handlers for msg from replicas */ - pn.reg_handler(PROPOSE, std::bind(&HotStuffBase::propose_handler, this, _1, _2)); - pn.reg_handler(VOTE, std::bind(&HotStuffBase::vote_handler, this, _1, _2)); - pn.reg_handler(QUERY_FETCH_BLK, std::bind(&HotStuffBase::query_fetch_blk_handler, this, _1, _2)); - pn.reg_handler(RESP_FETCH_BLK, std::bind(&HotStuffBase::resp_fetch_blk_handler, this, _1, _2)); - pn.init(listen_addr); -} - -void HotStuffBase::do_broadcast_proposal(const Proposal &prop) { - MsgHotStuff prop_msg; - prop_msg.gen_propose(prop); - for (const auto &replica: pn.all_peers()) - pn.send_msg(prop_msg, replica); -} - -void HotStuffBase::do_vote(ReplicaID last_proposer, const Vote &vote) { - MsgHotStuff vote_msg; - vote_msg.gen_vote(vote); - pmaker->next_proposer(last_proposer) - .then([this, vote_msg](ReplicaID proposer) { - pn.send_msg(vote_msg, get_config().get_addr(proposer)); - }); -} - -void HotStuffBase::do_decide(const command_t &cmd) { - auto it = decision_waiting.find(cmd->get_hash()); - if (it != decision_waiting.end()) - { - it->second.resolve(cmd); - decision_waiting.erase(it); - } -} - -HotStuffBase::~HotStuffBase() {} - -void HotStuffBase::start(bool eb_loop) { - /* ((n - 1) + 1 - 1) / 3 */ - uint32_t nfaulty = pn.all_peers().size() / 3; - if (nfaulty == 0) - LOG_WARN("too few replicas in the system to tolerate any failure"); - on_init(nfaulty); - if (eb_loop) - eb.dispatch(); -} - -} diff --git a/src/entity.cpp b/src/entity.cpp index 1d2e926..80c9cf9 100644 --- a/src/entity.cpp +++ b/src/entity.cpp @@ -1,5 +1,5 @@ #include "hotstuff/entity.h" -#include "hotstuff/core.h" +#include "hotstuff/hotstuff.h" namespace hotstuff { diff --git a/src/hotstuff.cpp b/src/hotstuff.cpp index 74d2f04..9582531 100644 --- a/src/hotstuff.cpp +++ b/src/hotstuff.cpp @@ -1,288 +1,437 @@ -#include <iostream> -#include <cstring> -#include <cassert> -#include <algorithm> -#include <random> -#include <unistd.h> -#include <signal.h> -#include <event2/event.h> - -#include "salticidae/stream.h" -#include "salticidae/util.h" -#include "salticidae/network.h" -#include "salticidae/msg.h" - -#include "hotstuff/promise.hpp" -#include "hotstuff/type.h" -#include "hotstuff/core.h" -#include "hotstuff/entity.h" -#include "hotstuff/util.h" -#include "hotstuff/client.h" - -using promise::promise_t; -using salticidae::NetAddr; -using salticidae::MsgNetwork; -using salticidae::ClientNetwork; -using salticidae::Event; -using salticidae::ElapsedTime; -using salticidae::Config; -using salticidae::_1; -using salticidae::_2; +#include "hotstuff/hotstuff.h" + using salticidae::static_pointer_cast; -using salticidae::get_hash; -using salticidae::trim_all; -using salticidae::split; - -using hotstuff::HotStuffError; -using hotstuff::CommandDummy; -using hotstuff::Finality; -using hotstuff::command_t; -using hotstuff::uint256_t; -using hotstuff::bytearray_t; -using hotstuff::DataStream; -using hotstuff::ReplicaID; -using hotstuff::MsgClient; - -using HotStuff = hotstuff::HotStuffSecp256k1; #define LOG_INFO HOTSTUFF_LOG_INFO #define LOG_DEBUG HOTSTUFF_LOG_DEBUG #define LOG_WARN HOTSTUFF_LOG_WARN -#define LOG_ERROR HOTSTUFF_LOG_ERROR - -class HotStuffApp; - -class HotStuffApp: public HotStuff { - double stat_period; - /** libevent handle */ - EventContext eb; - /** network messaging between a replica and its client */ - ClientNetwork<MsgClient> cn; - /** timer object to schedule a periodic printing of system statistics */ - Event ev_stat_timer; - /** the binding address for client RPC */ - NetAddr clisten_addr; - - using conn_client_t = MsgNetwork<MsgClient>::conn_t; - - /** Client */ - /** submits a new command */ - inline void client_request_cmd_handler(const MsgClient &, conn_client_t); - /** checks if a cmd is decided */ - inline void client_check_cmd_handler(const MsgClient &, conn_client_t); - - /** The callback function to print stat */ - inline void print_stat_cb(evutil_socket_t, short); - - command_t parse_cmd(DataStream &s) override { - auto cmd = new CommandDummy(); - s >> *cmd; - return cmd; - } - public: - HotStuffApp(uint32_t blk_size, - int32_t parent_limit, - double stat_period, - ReplicaID idx, - const bytearray_t &raw_privkey, - NetAddr plisten_addr, - NetAddr clisten_addr, - const EventContext &eb); - - void start(); -}; - - -std::pair<std::string, std::string> split_ip_port_cport(const std::string &s) { - auto ret = trim_all(split(s, ";")); - if (ret.size() != 2) - throw std::invalid_argument("invalid cport format"); - return std::make_pair(ret[0], ret[1]); +namespace hotstuff { + +void MsgHotStuff::gen_propose(const Proposal &proposal) { + DataStream s; + set_opcode(PROPOSE); + s << proposal; + set_payload(std::move(s)); } -void signal_handler(int) { - throw HotStuffError("got terminal signal"); +void MsgHotStuff::parse_propose(Proposal &proposal) const { + DataStream(get_payload()) >> proposal; } -BoxObj<HotStuffApp> papp = nullptr; - -int main(int argc, char **argv) { - Config config("hotstuff.conf"); - - ElapsedTime elapsed; - elapsed.start(); - - signal(SIGTERM, signal_handler); - signal(SIGINT, signal_handler); - - auto opt_blk_size = Config::OptValInt::create(1); - auto opt_parent_limit = Config::OptValInt::create(-1); - auto opt_stat_period = Config::OptValDouble::create(10); - auto opt_replicas = Config::OptValStrVec::create(); - auto opt_idx = Config::OptValInt::create(0); - auto opt_client_port = Config::OptValInt::create(-1); - auto opt_privkey = Config::OptValStr::create(); - auto opt_help = Config::OptValFlag::create(false); - - config.add_opt("block-size", opt_blk_size, Config::SET_VAL); - config.add_opt("parent-limit", opt_parent_limit, Config::SET_VAL); - config.add_opt("stat-period", opt_stat_period, Config::SET_VAL); - config.add_opt("replica", opt_replicas, Config::APPEND); - config.add_opt("idx", opt_idx, Config::SET_VAL); - config.add_opt("cport", opt_client_port, Config::SET_VAL); - config.add_opt("privkey", opt_privkey, Config::SET_VAL); - config.add_opt("help", opt_help, Config::SWITCH_ON, 'h', "show this help info"); - - EventContext eb; -#ifndef HOTSTUFF_ENABLE_LOG_DEBUG - try { -#endif - config.parse(argc, argv); - if (opt_help->get()) - { - config.print_help(); - exit(0); - } - auto idx = opt_idx->get(); - auto client_port = opt_client_port->get(); - std::vector<std::pair<std::string, std::string>> replicas; - for (const auto &s: opt_replicas->get()) +void MsgHotStuff::gen_vote(const Vote &vote) { + DataStream s; + set_opcode(VOTE); + s << vote; + set_payload(std::move(s)); +} + +void MsgHotStuff::parse_vote(Vote &vote) const { + DataStream(get_payload()) >> vote; +} + +void MsgHotStuff::gen_qfetchblk(const std::vector<uint256_t> &blk_hashes) { + DataStream s; + set_opcode(QUERY_FETCH_BLK); + gen_hash_list(s, blk_hashes); + set_payload(std::move(s)); +} + +void MsgHotStuff::parse_qfetchblk(std::vector<uint256_t> &blk_hashes) const { + DataStream s(get_payload()); + parse_hash_list(s, blk_hashes); +} + +void MsgHotStuff::gen_rfetchblk(const std::vector<block_t> &blks) { + DataStream s; + set_opcode(RESP_FETCH_BLK); + s << htole((uint32_t)blks.size()); + for (auto blk: blks) s << *blk; + set_payload(std::move(s)); +} + +void MsgHotStuff::parse_rfetchblk(std::vector<block_t> &blks, HotStuffCore *hsc) const { + DataStream s; + uint32_t size; + s >> size; + size = letoh(size); + blks.resize(size); + for (auto &blk: blks) + { + Block _blk; + _blk.unserialize(s, hsc); + if (!_blk.verify(hsc->get_config())) + blk = hsc->storage->add_blk(std::move(_blk)); + else { - auto res = trim_all(split(s, ",")); - if (res.size() != 2) - throw HotStuffError("invalid replica info"); - replicas.push_back(std::make_pair(res[0], res[1])); + blk = nullptr; + LOG_WARN("block is invalid"); } + } +} - if (!(0 <= idx && (size_t)idx < replicas.size())) - throw HotStuffError("replica idx out of range"); - std::string binding_addr = replicas[idx].first; - if (client_port == -1) +void HotStuffBase::add_replica(ReplicaID idx, const NetAddr &addr, + pubkey_bt &&pub_key) { + HotStuffCore::add_replica(idx, addr, std::move(pub_key)); + if (addr != listen_addr) + pn.add_peer(addr); +} + +void HotStuffBase::on_fetch_blk(const block_t &blk) { +#ifdef HOTSTUFF_ENABLE_TX_PROFILE + blk_profiler.get_tx(blk->get_hash()); +#endif + LOG_DEBUG("fetched %.10s", get_hex(blk->get_hash()).c_str()); + part_fetched++; + fetched++; + for (auto cmd: blk->get_cmds()) on_fetch_cmd(cmd); + const uint256_t &blk_hash = blk->get_hash(); + auto it = blk_fetch_waiting.find(blk_hash); + if (it != blk_fetch_waiting.end()) + { + it->second.resolve(blk); + blk_fetch_waiting.erase(it); + } +} + +void HotStuffBase::on_fetch_cmd(const command_t &cmd) { + const uint256_t &cmd_hash = cmd->get_hash(); + auto it = cmd_fetch_waiting.find(cmd_hash); + if (it != cmd_fetch_waiting.end()) + { + it->second.resolve(cmd); + cmd_fetch_waiting.erase(it); + } +} + +void HotStuffBase::on_deliver_blk(const block_t &blk) { + const uint256_t &blk_hash = blk->get_hash(); + bool valid; + /* sanity check: all parents must be delivered */ + for (const auto &p: blk->get_parent_hashes()) + assert(storage->is_blk_delivered(p)); + if ((valid = HotStuffCore::on_deliver_blk(blk))) + { + LOG_DEBUG("block %.10s delivered", + get_hex(blk_hash).c_str()); + part_parent_size += blk->get_parent_hashes().size(); + part_delivered++; + delivered++; + } + else + { + LOG_WARN("dropping invalid block"); + } + + auto it = blk_delivery_waiting.find(blk_hash); + if (it != blk_delivery_waiting.end()) + { + auto &pm = it->second; + if (valid) { - auto p = split_ip_port_cport(binding_addr); - size_t idx; - try { - client_port = stoi(p.second, &idx); - } catch (std::invalid_argument &) { - throw HotStuffError("client port not specified"); - } - } + pm.elapsed.stop(false); + auto sec = pm.elapsed.elapsed_sec; + part_delivery_time += sec; + part_delivery_time_min = std::min(part_delivery_time_min, sec); + part_delivery_time_max = std::max(part_delivery_time_max, sec); - NetAddr plisten_addr{split_ip_port_cport(binding_addr).first}; - - papp = new HotStuffApp(opt_blk_size->get(), - opt_parent_limit->get(), - opt_stat_period->get(), - idx, - hotstuff::from_hex(opt_privkey->get()), - plisten_addr, - NetAddr("0.0.0.0", client_port), - eb); - for (size_t i = 0; i < replicas.size(); i++) + pm.resolve(blk); + } + else { - auto p = split_ip_port_cport(replicas[i].first); - papp->add_replica(i, NetAddr(p.first), - hotstuff::from_hex(replicas[i].second)); + pm.reject(blk); + // TODO: do we need to also free it from storage? } - papp->start(); -#ifndef HOTSTUFF_ENABLE_LOG_DEBUG - } catch (std::exception &e) { - HOTSTUFF_LOG_INFO("exception: %s", e.what()); - elapsed.stop(true); + blk_delivery_waiting.erase(it); } +} + +promise_t HotStuffBase::async_fetch_blk(const uint256_t &blk_hash, + const NetAddr *replica_id, + bool fetch_now) { + if (storage->is_blk_fetched(blk_hash)) + return promise_t([this, &blk_hash](promise_t pm){ + pm.resolve(storage->find_blk(blk_hash)); + }); + auto it = blk_fetch_waiting.find(blk_hash); + if (it == blk_fetch_waiting.end()) + { +#ifdef HOTSTUFF_ENABLE_TX_PROFILE + blk_profiler.rec_tx(blk_hash, false); #endif - return 0; + it = blk_fetch_waiting.insert( + std::make_pair( + blk_hash, + BlockFetchContext(blk_hash, this))).first; + } + if (replica_id != nullptr) + it->second.add_replica(*replica_id, fetch_now); + return static_cast<promise_t &>(it->second); } -HotStuffApp::HotStuffApp(uint32_t blk_size, - int32_t parent_limit, - double stat_period, - ReplicaID idx, - const bytearray_t &raw_privkey, - NetAddr plisten_addr, - NetAddr clisten_addr, - const EventContext &eb): - HotStuff(blk_size, parent_limit, idx, raw_privkey, - plisten_addr, eb), - stat_period(stat_period), - eb(eb), - cn(eb), - clisten_addr(clisten_addr) { - /* register the handlers for msg from clients */ - cn.reg_handler(hotstuff::REQ_CMD, std::bind(&HotStuffApp::client_request_cmd_handler, this, _1, _2)); - cn.reg_handler(hotstuff::CHK_CMD, std::bind(&HotStuffApp::client_check_cmd_handler, this, _1, _2)); - cn.init(clisten_addr); +promise_t HotStuffBase::async_fetch_cmd(const uint256_t &cmd_hash, + const NetAddr *replica_id, + bool fetch_now) { + if (storage->is_cmd_fetched(cmd_hash)) + return promise_t([this, &cmd_hash](promise_t pm){ + pm.resolve(storage->find_cmd(cmd_hash)); + }); + auto it = cmd_fetch_waiting.find(cmd_hash); + if (it == cmd_fetch_waiting.end()) + { + it = cmd_fetch_waiting.insert( + std::make_pair(cmd_hash, CmdFetchContext(cmd_hash, this))).first; + } + if (replica_id != nullptr) + it->second.add_replica(*replica_id, fetch_now); + return static_cast<promise_t &>(it->second); } -void HotStuffApp::client_request_cmd_handler(const MsgClient &msg, conn_client_t conn_) { - auto conn = static_pointer_cast<ClientNetwork<MsgClient>::Conn>(conn_); - const NetAddr addr = conn->get_addr(); - command_t cmd = new CommandDummy(); +promise_t HotStuffBase::async_deliver_blk(const uint256_t &blk_hash, + const NetAddr &replica_id) { + if (storage->is_blk_delivered(blk_hash)) + return promise_t([this, &blk_hash](promise_t pm) { + pm.resolve(storage->find_blk(blk_hash)); + }); + auto it = blk_delivery_waiting.find(blk_hash); + if (it != blk_delivery_waiting.end()) + return static_cast<promise_t &>(it->second); + BlockDeliveryContext pm{[](promise_t){}}; + it = blk_delivery_waiting.insert(std::make_pair(blk_hash, pm)).first; + /* otherwise the on_deliver_batch will resolve */ + async_fetch_blk(blk_hash, &replica_id).then([this, replica_id](block_t blk) { + /* qc_ref should be fetched */ + std::vector<promise_t> pms; + const auto &qc = blk->get_qc(); + if (qc) + pms.push_back(async_fetch_blk(qc->get_blk_hash(), &replica_id)); + /* the parents should be delivered */ + for (const auto &phash: blk->get_parent_hashes()) + pms.push_back(async_deliver_blk(phash, replica_id)); + promise::all(pms).then([this, blk]() { + on_deliver_blk(blk); + }); + }); + return static_cast<promise_t &>(pm); +} + +void HotStuffBase::propose_handler(const MsgHotStuff &msg, conn_t conn_) { + auto conn = static_pointer_cast<PeerNetwork<MsgHotStuff>::Conn>(conn_); + const NetAddr &peer = conn->get_peer(); + Proposal prop(this); + msg.parse_propose(prop); + block_t blk = prop.blk; + promise::all(std::vector<promise_t>{ + async_deliver_blk(prop.bqc_hash, peer), + async_deliver_blk(blk->get_hash(), peer), + }).then([this, prop = std::move(prop)]() { + on_receive_proposal(prop); + }); +} + +void HotStuffBase::vote_handler(const MsgHotStuff &msg, conn_t conn_) { + auto conn = static_pointer_cast<PeerNetwork<MsgHotStuff>::Conn>(conn_); + const NetAddr &peer = conn->get_peer(); + Vote vote(this); + msg.parse_vote(vote); + promise::all(std::vector<promise_t>{ + async_deliver_blk(vote.bqc_hash, peer), + async_deliver_blk(vote.blk_hash, peer) + }).then([this, vote = std::move(vote)]() { + on_receive_vote(vote); + }); +} + +void HotStuffBase::query_fetch_blk_handler(const MsgHotStuff &msg, conn_t conn_) { + auto conn = static_pointer_cast<PeerNetwork<MsgHotStuff>::Conn>(conn_); + const NetAddr replica = conn->get_peer(); + std::vector<uint256_t> blk_hashes; + msg.parse_qfetchblk(blk_hashes); + std::vector<promise_t> pms; - msg.parse_reqcmd(static_cast<CommandDummy &>(*cmd)); + for (const auto &h: blk_hashes) + pms.push_back(async_fetch_blk(h, nullptr)); + promise::all(pms).then([replica, this](const promise::values_t values) { + MsgHotStuff resp; + std::vector<block_t> blks; + for (auto &v: values) + { + auto blk = promise::any_cast<block_t>(v); + blks.push_back(blk); + } + resp.gen_rfetchblk(blks); + pn.send_msg(resp, replica); + }); +} - bool flag = true; -#ifndef HOTSTUFF_DISABLE_TX_VERIFY - flag &= cmd->verify(); -#endif - if (!flag) +void HotStuffBase::resp_fetch_blk_handler(const MsgHotStuff &msg, conn_t) { + std::vector<block_t> blks; + msg.parse_rfetchblk(blks, this); + for (const auto &blk: blks) + if (blk) on_fetch_blk(blk); +} + +void HotStuffBase::print_stat() const { + LOG_INFO("===== begin stats ====="); + LOG_INFO("-------- queues -------"); + LOG_INFO("blk_fetch_waiting: %lu", blk_fetch_waiting.size()); + LOG_INFO("blk_delivery_waiting: %lu", blk_delivery_waiting.size()); + LOG_INFO("cmd_fetch_waiting: %lu", cmd_fetch_waiting.size()); + LOG_INFO("decision_waiting: %lu", decision_waiting.size()); + LOG_INFO("-------- misc ---------"); + LOG_INFO("fetched: %lu", fetched); + LOG_INFO("delivered: %lu", delivered); + LOG_INFO("cmd_cache: %lu", storage->get_cmd_cache_size()); + LOG_INFO("blk_cache: %lu", storage->get_blk_cache_size()); + LOG_INFO("------ misc (10s) -----"); + LOG_INFO("fetched: %lu", part_fetched); + LOG_INFO("delivered: %lu", part_delivered); + LOG_INFO("decided: %lu", part_decided); + LOG_INFO("gened: %lu", part_gened); + LOG_INFO("avg. parent_size: %.3f", + part_delivered ? part_parent_size / double(part_delivered) : 0); + LOG_INFO("delivery time: %.3f avg, %.3f min, %.3f max", + part_delivered ? part_delivery_time / double(part_delivered) : 0, + part_delivery_time_min == double_inf ? 0 : part_delivery_time_min, + part_delivery_time_max); + + part_parent_size = 0; + part_fetched = 0; + part_delivered = 0; + part_decided = 0; + part_gened = 0; + part_delivery_time = 0; + part_delivery_time_min = double_inf; + part_delivery_time_max = 0; + LOG_INFO("-- sent opcode (10s) --"); + auto &sent_op = pn.get_sent_by_opcode(); + for (auto &op: sent_op) { - LOG_WARN("invalid client cmd"); - MsgClient resp; - resp.gen_respcmd(cmd->get_hash(), Finality(-1, uint256_t())); - cn.send_msg(resp, addr); + auto &val = op.second; + LOG_INFO("%02x: %lu, %.2fBpm", op.first, + val.first, val.first ? val.second / double(val.first) : 0); + val.first = val.second = 0; } - else + LOG_INFO("-- recv opcode (10s) --"); + auto &recv_op = pn.get_recv_by_opcode(); + for (auto &op: recv_op) { - const uint256_t cmd_hash = cmd->get_hash(); - add_command(cmd); - /** wait for the decision of tx */ - LOG_DEBUG("processing client cmd %.10s", get_hex(cmd_hash).c_str()); - async_decide(cmd_hash).then([this, addr](command_t cmd) { - MsgClient resp; - resp.gen_respcmd(cmd->get_hash(), cmd->get_finality()); - cn.send_msg(resp, addr); + auto &val = op.second; + LOG_INFO("%02x: %lu, %.2fBpm", op.first, + val.first, val.first ? val.second / double(val.first) : 0); + val.first = val.second = 0; + } + LOG_INFO("--- replica msg. (10s) ---"); + size_t _nsent = 0; + size_t _nrecv = 0; + for (const auto &replica: pn.all_peers()) + { + auto conn = pn.get_peer_conn(replica); + size_t ns = conn->get_nsent(); + size_t nr = conn->get_nrecv(); + conn->clear_nsent(); + conn->clear_nrecv(); + LOG_INFO("%s: %u, %u, %u", + std::string(replica).c_str(), ns, nr, part_fetched_replica[replica]); + _nsent += ns; + _nrecv += nr; + part_fetched_replica[replica] = 0; + } + nsent += _nsent; + nrecv += _nrecv; + LOG_INFO("sent: %lu", _nsent); + LOG_INFO("recv: %lu", _nrecv); + LOG_INFO("--- replica msg. total ---"); + LOG_INFO("sent: %lu", nsent); + LOG_INFO("recv: %lu", nrecv); + LOG_INFO("====== end stats ======"); +} + +promise_t HotStuffBase::async_decide(const uint256_t &cmd_hash) { + if (get_cmd_decision(cmd_hash)) + return promise_t([this, cmd_hash](promise_t pm){ + pm.resolve(storage->find_cmd(cmd_hash)); }); + /* otherwise the do_decide will resolve the promise */ + auto it = decision_waiting.find(cmd_hash); + if (it == decision_waiting.end()) + { + promise_t pm{[](promise_t){}}; + it = decision_waiting.insert(std::make_pair(cmd_hash, pm)).first; } + return it->second; } -void HotStuffApp::client_check_cmd_handler(const MsgClient &msg, conn_client_t conn_) { - auto conn = static_pointer_cast<ClientNetwork<MsgClient>::Conn>(conn_); - const NetAddr addr = conn->get_addr(); - uint256_t cmd_hash; - msg.parse_chkcmd(cmd_hash); - MsgClient resp; - command_t cmd = storage->find_cmd(cmd_hash); - Finality fin; - if (cmd) fin = cmd->get_finality(); - resp.gen_respcmd(cmd_hash, fin); - cn.send_msg(resp, addr); +HotStuffBase::HotStuffBase(uint32_t blk_size, + int32_t parent_limit, + ReplicaID rid, + privkey_bt &&priv_key, + NetAddr listen_addr, + EventContext eb, + pacemaker_bt pmaker): + HotStuffCore(rid, std::move(priv_key), parent_limit), + listen_addr(listen_addr), + blk_size(blk_size), + eb(eb), + pmaker(std::move(pmaker)), + pn(eb), + + fetched(0), delivered(0), + nsent(0), nrecv(0), + part_parent_size(0), + part_fetched(0), + part_delivered(0), + part_decided(0), + part_gened(0), + part_delivery_time(0), + part_delivery_time_min(double_inf), + part_delivery_time_max(0) +{ + if (pmaker == nullptr) + this->pmaker = new PaceMakerDummy(this); + /* register the handlers for msg from replicas */ + pn.reg_handler(PROPOSE, std::bind(&HotStuffBase::propose_handler, this, _1, _2)); + pn.reg_handler(VOTE, std::bind(&HotStuffBase::vote_handler, this, _1, _2)); + pn.reg_handler(QUERY_FETCH_BLK, std::bind(&HotStuffBase::query_fetch_blk_handler, this, _1, _2)); + pn.reg_handler(RESP_FETCH_BLK, std::bind(&HotStuffBase::resp_fetch_blk_handler, this, _1, _2)); + pn.init(listen_addr); } +void HotStuffBase::do_broadcast_proposal(const Proposal &prop) { + MsgHotStuff prop_msg; + prop_msg.gen_propose(prop); + for (const auto &replica: pn.all_peers()) + pn.send_msg(prop_msg, replica); +} -void HotStuffApp::start() { - ev_stat_timer = Event(eb, -1, 0, - std::bind(&HotStuffApp::print_stat_cb, this, _1, _2)); - ev_stat_timer.add_with_timeout(stat_period); - LOG_INFO("** starting the system with parameters **"); - LOG_INFO("blk_size = %lu", blk_size); - LOG_INFO("parent_limit = %d", parent_limit); - LOG_INFO("conns = %lu", HotStuff::size()); - LOG_INFO("** starting the event loop..."); -#ifdef HOTSTUFF_DISABLE_TX_VERIFY - LOG_INFO("!! verification disabled !!"); -#else - LOG_INFO("** verification enabled **"); -#endif - HotStuff::start(); - /* enter the event main loop */ - eb.dispatch(); +void HotStuffBase::do_vote(ReplicaID last_proposer, const Vote &vote) { + MsgHotStuff vote_msg; + vote_msg.gen_vote(vote); + pmaker->next_proposer(last_proposer) + .then([this, vote_msg](ReplicaID proposer) { + pn.send_msg(vote_msg, get_config().get_addr(proposer)); + }); } +void HotStuffBase::do_decide(const command_t &cmd) { + auto it = decision_waiting.find(cmd->get_hash()); + if (it != decision_waiting.end()) + { + it->second.resolve(cmd); + decision_waiting.erase(it); + } +} + +HotStuffBase::~HotStuffBase() {} + +void HotStuffBase::start(bool eb_loop) { + /* ((n - 1) + 1 - 1) / 3 */ + uint32_t nfaulty = pn.all_peers().size() / 3; + if (nfaulty == 0) + LOG_WARN("too few replicas in the system to tolerate any failure"); + on_init(nfaulty); + if (eb_loop) + eb.dispatch(); +} -void HotStuffApp::print_stat_cb(evutil_socket_t, short) { - HotStuff::print_stat(); - HotStuffCore::prune(100); - ev_stat_timer.add_with_timeout(stat_period); } diff --git a/src/hotstuff_app.cpp b/src/hotstuff_app.cpp new file mode 100644 index 0000000..e0c9c3c --- /dev/null +++ b/src/hotstuff_app.cpp @@ -0,0 +1,287 @@ +#include <iostream> +#include <cstring> +#include <cassert> +#include <algorithm> +#include <random> +#include <unistd.h> +#include <signal.h> +#include <event2/event.h> + +#include "salticidae/stream.h" +#include "salticidae/util.h" +#include "salticidae/network.h" +#include "salticidae/msg.h" + +#include "hotstuff/promise.hpp" +#include "hotstuff/type.h" +#include "hotstuff/entity.h" +#include "hotstuff/util.h" +#include "hotstuff/client.h" +#include "hotstuff/hotstuff.h" + +using salticidae::MsgNetwork; +using salticidae::ClientNetwork; +using salticidae::ElapsedTime; +using salticidae::Config; +using salticidae::_1; +using salticidae::_2; +using salticidae::static_pointer_cast; +using salticidae::trim_all; +using salticidae::split; + +using hotstuff::Event; +using hotstuff::EventContext; +using hotstuff::NetAddr; +using hotstuff::HotStuffError; +using hotstuff::CommandDummy; +using hotstuff::Finality; +using hotstuff::command_t; +using hotstuff::uint256_t; +using hotstuff::bytearray_t; +using hotstuff::DataStream; +using hotstuff::ReplicaID; +using hotstuff::MsgClient; +using hotstuff::get_hash; +using hotstuff::promise_t; + +using HotStuff = hotstuff::HotStuffSecp256k1; + +#define LOG_INFO HOTSTUFF_LOG_INFO +#define LOG_DEBUG HOTSTUFF_LOG_DEBUG +#define LOG_WARN HOTSTUFF_LOG_WARN +#define LOG_ERROR HOTSTUFF_LOG_ERROR + +class HotStuffApp: public HotStuff { + double stat_period; + /** libevent handle */ + EventContext eb; + /** network messaging between a replica and its client */ + ClientNetwork<MsgClient> cn; + /** timer object to schedule a periodic printing of system statistics */ + Event ev_stat_timer; + /** the binding address for client RPC */ + NetAddr clisten_addr; + + using conn_client_t = MsgNetwork<MsgClient>::conn_t; + + /** Client */ + /** submits a new command */ + inline void client_request_cmd_handler(const MsgClient &, conn_client_t); + /** checks if a cmd is decided */ + inline void client_check_cmd_handler(const MsgClient &, conn_client_t); + + /** The callback function to print stat */ + inline void print_stat_cb(evutil_socket_t, short); + + command_t parse_cmd(DataStream &s) override { + auto cmd = new CommandDummy(); + s >> *cmd; + return cmd; + } + + public: + HotStuffApp(uint32_t blk_size, + int32_t parent_limit, + double stat_period, + ReplicaID idx, + const bytearray_t &raw_privkey, + NetAddr plisten_addr, + NetAddr clisten_addr, + const EventContext &eb); + + void start(); +}; + + +std::pair<std::string, std::string> split_ip_port_cport(const std::string &s) { + auto ret = trim_all(split(s, ";")); + if (ret.size() != 2) + throw std::invalid_argument("invalid cport format"); + return std::make_pair(ret[0], ret[1]); +} + +void signal_handler(int) { + throw HotStuffError("got terminal signal"); +} + +salticidae::BoxObj<HotStuffApp> papp = nullptr; + +int main(int argc, char **argv) { + Config config("hotstuff.conf"); + + ElapsedTime elapsed; + elapsed.start(); + + signal(SIGTERM, signal_handler); + signal(SIGINT, signal_handler); + + auto opt_blk_size = Config::OptValInt::create(1); + auto opt_parent_limit = Config::OptValInt::create(-1); + auto opt_stat_period = Config::OptValDouble::create(10); + auto opt_replicas = Config::OptValStrVec::create(); + auto opt_idx = Config::OptValInt::create(0); + auto opt_client_port = Config::OptValInt::create(-1); + auto opt_privkey = Config::OptValStr::create(); + auto opt_help = Config::OptValFlag::create(false); + + config.add_opt("block-size", opt_blk_size, Config::SET_VAL); + config.add_opt("parent-limit", opt_parent_limit, Config::SET_VAL); + config.add_opt("stat-period", opt_stat_period, Config::SET_VAL); + config.add_opt("replica", opt_replicas, Config::APPEND); + config.add_opt("idx", opt_idx, Config::SET_VAL); + config.add_opt("cport", opt_client_port, Config::SET_VAL); + config.add_opt("privkey", opt_privkey, Config::SET_VAL); + config.add_opt("help", opt_help, Config::SWITCH_ON, 'h', "show this help info"); + + EventContext eb; +#ifndef HOTSTUFF_ENABLE_LOG_DEBUG + try { +#endif + config.parse(argc, argv); + if (opt_help->get()) + { + config.print_help(); + exit(0); + } + auto idx = opt_idx->get(); + auto client_port = opt_client_port->get(); + std::vector<std::pair<std::string, std::string>> replicas; + for (const auto &s: opt_replicas->get()) + { + auto res = trim_all(split(s, ",")); + if (res.size() != 2) + throw HotStuffError("invalid replica info"); + replicas.push_back(std::make_pair(res[0], res[1])); + } + + if (!(0 <= idx && (size_t)idx < replicas.size())) + throw HotStuffError("replica idx out of range"); + std::string binding_addr = replicas[idx].first; + if (client_port == -1) + { + auto p = split_ip_port_cport(binding_addr); + size_t idx; + try { + client_port = stoi(p.second, &idx); + } catch (std::invalid_argument &) { + throw HotStuffError("client port not specified"); + } + } + + NetAddr plisten_addr{split_ip_port_cport(binding_addr).first}; + + papp = new HotStuffApp(opt_blk_size->get(), + opt_parent_limit->get(), + opt_stat_period->get(), + idx, + hotstuff::from_hex(opt_privkey->get()), + plisten_addr, + NetAddr("0.0.0.0", client_port), + eb); + for (size_t i = 0; i < replicas.size(); i++) + { + auto p = split_ip_port_cport(replicas[i].first); + papp->add_replica(i, NetAddr(p.first), + hotstuff::from_hex(replicas[i].second)); + } + papp->start(); +#ifndef HOTSTUFF_ENABLE_LOG_DEBUG + } catch (std::exception &e) { + HOTSTUFF_LOG_INFO("exception: %s", e.what()); + elapsed.stop(true); + } +#endif + return 0; +} + +HotStuffApp::HotStuffApp(uint32_t blk_size, + int32_t parent_limit, + double stat_period, + ReplicaID idx, + const bytearray_t &raw_privkey, + NetAddr plisten_addr, + NetAddr clisten_addr, + const EventContext &eb): + HotStuff(blk_size, parent_limit, idx, raw_privkey, + plisten_addr, eb), + stat_period(stat_period), + eb(eb), + cn(eb), + clisten_addr(clisten_addr) { + /* register the handlers for msg from clients */ + cn.reg_handler(hotstuff::REQ_CMD, std::bind(&HotStuffApp::client_request_cmd_handler, this, _1, _2)); + cn.reg_handler(hotstuff::CHK_CMD, std::bind(&HotStuffApp::client_check_cmd_handler, this, _1, _2)); + cn.init(clisten_addr); +} + +void HotStuffApp::client_request_cmd_handler(const MsgClient &msg, conn_client_t conn_) { + auto conn = static_pointer_cast<ClientNetwork<MsgClient>::Conn>(conn_); + const NetAddr addr = conn->get_addr(); + command_t cmd = new CommandDummy(); + std::vector<promise_t> pms; + msg.parse_reqcmd(static_cast<CommandDummy &>(*cmd)); + + bool flag = true; +#ifndef HOTSTUFF_DISABLE_TX_VERIFY + flag &= cmd->verify(); +#endif + if (!flag) + { + LOG_WARN("invalid client cmd"); + MsgClient resp; + resp.gen_respcmd(cmd->get_hash(), Finality(-1, uint256_t())); + cn.send_msg(resp, addr); + } + else + { + const uint256_t cmd_hash = cmd->get_hash(); + add_command(cmd); + /** wait for the decision of tx */ + LOG_DEBUG("processing client cmd %.10s", get_hex(cmd_hash).c_str()); + async_decide(cmd_hash).then([this, addr](command_t cmd) { + MsgClient resp; + resp.gen_respcmd(cmd->get_hash(), cmd->get_finality()); + cn.send_msg(resp, addr); + }); + } +} + +void HotStuffApp::client_check_cmd_handler(const MsgClient &msg, conn_client_t conn_) { + auto conn = static_pointer_cast<ClientNetwork<MsgClient>::Conn>(conn_); + const NetAddr addr = conn->get_addr(); + uint256_t cmd_hash; + msg.parse_chkcmd(cmd_hash); + MsgClient resp; + command_t cmd = storage->find_cmd(cmd_hash); + Finality fin; + if (cmd) fin = cmd->get_finality(); + resp.gen_respcmd(cmd_hash, fin); + cn.send_msg(resp, addr); +} + + +void HotStuffApp::start() { + ev_stat_timer = Event(eb, -1, 0, + std::bind(&HotStuffApp::print_stat_cb, this, _1, _2)); + ev_stat_timer.add_with_timeout(stat_period); + LOG_INFO("** starting the system with parameters **"); + LOG_INFO("blk_size = %lu", blk_size); + LOG_INFO("parent_limit = %d", parent_limit); + LOG_INFO("conns = %lu", HotStuff::size()); + LOG_INFO("** starting the event loop..."); +#ifdef HOTSTUFF_DISABLE_TX_VERIFY + LOG_INFO("!! verification disabled !!"); +#else + LOG_INFO("** verification enabled **"); +#endif + HotStuff::start(); + /* enter the event main loop */ + eb.dispatch(); +} + + +void HotStuffApp::print_stat_cb(evutil_socket_t, short) { + HotStuff::print_stat(); + HotStuffCore::prune(100); + ev_stat_timer.add_with_timeout(stat_period); +} diff --git a/src/hotstuff_client.cpp b/src/hotstuff_client.cpp index 6b26c9c..8ee90d7 100644 --- a/src/hotstuff_client.cpp +++ b/src/hotstuff_client.cpp @@ -8,16 +8,16 @@ #include "hotstuff/type.h" #include "hotstuff/client.h" -using salticidae::NetAddr; using salticidae::Config; using salticidae::ElapsedTime; -using salticidae::EventContext; -using salticidae::Event; -using salticidae::bytearray_t; using salticidae::trim_all; using salticidae::split; +using hotstuff::NetAddr; +using hotstuff::EventContext; +using hotstuff::Event; using hotstuff::uint256_t; +using hotstuff::bytearray_t; using hotstuff::MsgClient; using hotstuff::CommandDummy; using hotstuff::Finality; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 60ffdc3..9e44cbe 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -1,4 +1,4 @@ include_directories(../src/ ../salticidae/include/) add_executable(test_secp256k1 test_secp256k1.cpp) -target_link_libraries(test_secp256k1 hotstuff) +target_link_libraries(test_secp256k1 hotstuff_static) |