diff options
-rw-r--r-- | README.rst | 9 | ||||
-rw-r--r-- | include/hotstuff/consensus.h | 8 | ||||
-rw-r--r-- | include/hotstuff/entity.h | 22 | ||||
-rw-r--r-- | include/hotstuff/hotstuff.h | 42 | ||||
-rw-r--r-- | include/hotstuff/liveness.h | 6 | ||||
-rw-r--r-- | include/hotstuff/type.h | 8 | ||||
m--------- | salticidae | 0 | ||||
-rwxr-xr-x | scripts/run_demo_client.sh | 5 | ||||
-rw-r--r-- | src/consensus.cpp | 10 | ||||
-rw-r--r-- | src/entity.cpp | 16 | ||||
-rw-r--r-- | src/hotstuff.cpp | 55 |
11 files changed, 98 insertions, 83 deletions
@@ -63,7 +63,14 @@ section may be incomplete and subject to changes. make # start 4 demo replicas with scripts/run_demo.sh - # start the demo client with scripts/run_demo_client.sh + # then, start the demo client with scripts/run_demo_client.sh + + + # Fault tolerance: + # Try to run run_demo.sh first and then run_demo_client.sh, then use Ctrl-C + # to terminate the proposing replica (e.g. replica 0). Leader rotation will + # be scheduled. Try to kill and run run_demo_client.sh again, new commands + # should still get through (be replicated) once the new leader becomes stable. TODO ==== diff --git a/include/hotstuff/consensus.h b/include/hotstuff/consensus.h index 8a4f5f0..e2f7cfc 100644 --- a/include/hotstuff/consensus.h +++ b/include/hotstuff/consensus.h @@ -142,7 +142,7 @@ class HotStuffCore { public: /** Add a replica to the current configuration. This should only be called * before running HotStuffCore protocol. */ - void add_replica(ReplicaID rid, const NetAddr &addr, pubkey_bt &&pub_key); + void add_replica(ReplicaID rid, const PeerId &peer_id, pubkey_bt &&pub_key); /** Try to prune blocks lower than last committed height - staleness. */ void prune(uint32_t staleness); @@ -201,7 +201,7 @@ struct Proposal: public Serializable { s << "<proposal " << "rid=" << std::to_string(proposer) << " " << "blk=" << get_hex10(blk->get_hash()) << ">"; - return std::move(s); + return s; } }; @@ -261,7 +261,7 @@ struct Vote: public Serializable { s << "<vote " << "rid=" << std::to_string(voter) << " " << "blk=" << get_hex10(blk_hash) << ">"; - return std::move(s); + return s; } }; @@ -307,7 +307,7 @@ struct Finality: public Serializable { << "cmd_height=" << std::to_string(cmd_height) << " " << "cmd=" << get_hex10(cmd_hash) << " " << "blk=" << get_hex10(blk_hash) << ">"; - return std::move(s); + return s; } }; diff --git a/include/hotstuff/entity.h b/include/hotstuff/entity.h index b3bf6a8..dea980d 100644 --- a/include/hotstuff/entity.h +++ b/include/hotstuff/entity.h @@ -39,20 +39,20 @@ enum EntityType { struct ReplicaInfo { ReplicaID id; - salticidae::NetAddr addr; + salticidae::PeerId peer_id; pubkey_bt pubkey; ReplicaInfo(ReplicaID id, - const salticidae::NetAddr &addr, + const salticidae::PeerId &peer_id, pubkey_bt &&pubkey): - id(id), addr(addr), pubkey(std::move(pubkey)) {} + id(id), peer_id(peer_id), pubkey(std::move(pubkey)) {} ReplicaInfo(const ReplicaInfo &other): - id(other.id), addr(other.addr), + id(other.id), peer_id(other.peer_id), pubkey(other.pubkey->clone()) {} ReplicaInfo(ReplicaInfo &&other): - id(other.id), addr(other.addr), + id(other.id), peer_id(other.peer_id), pubkey(std::move(other.pubkey)) {} }; @@ -82,8 +82,8 @@ class ReplicaConfig { return *(get_info(rid).pubkey); } - const salticidae::NetAddr &get_addr(ReplicaID rid) const { - return get_info(rid).addr; + const salticidae::PeerId &get_peer_id(ReplicaID rid) const { + return get_info(rid).peer_id; } }; @@ -101,7 +101,7 @@ class Command: public Serializable { virtual operator std::string () const { DataStream s; s << "<cmd id=" << get_hex10(get_hash()) << ">"; - return std::move(s); + return s; } }; @@ -113,7 +113,7 @@ get_hashes(const std::vector<Hashable> &plist) { std::vector<uint256_t> hashes; for (const auto &p: plist) hashes.push_back(p->get_hash()); - return std::move(hashes); + return hashes; } class Block { @@ -142,7 +142,7 @@ class Block { delivered(false), decision(0) {} Block(bool delivered, int8_t decision): - qc(nullptr), + qc(new QuorumCertDummy()), hash(salticidae::get_hash(*this)), qc_ref(nullptr), self_qc(nullptr), height(0), @@ -209,7 +209,7 @@ class Block { << "height=" << std::to_string(height) << " " << "parent=" << get_hex10(parent_hashes[0]) << " " << "qc_ref=" << (qc_ref ? get_hex10(qc_ref->get_hash()) : "null") << ">"; - return std::move(s); + return s; } }; diff --git a/include/hotstuff/hotstuff.h b/include/hotstuff/hotstuff.h index 07f69d9..33b673f 100644 --- a/include/hotstuff/hotstuff.h +++ b/include/hotstuff/hotstuff.h @@ -89,7 +89,7 @@ class FetchContext: public promise_t { HotStuffBase *hs; MsgReqBlock fetch_msg; const uint256_t ent_hash; - std::unordered_set<NetAddr> replica_ids; + std::unordered_set<PeerId> replicas; inline void timeout_cb(TimerEvent &); public: FetchContext(const FetchContext &) = delete; @@ -99,9 +99,9 @@ class FetchContext: public promise_t { FetchContext(const uint256_t &ent_hash, HotStuffBase *hs); ~FetchContext() {} - inline void send(const NetAddr &replica_id); + inline void send(const PeerId &replica); inline void reset_timeout(); - inline void add_replica(const NetAddr &replica_id, bool fetch_now = true); + inline void add_replica(const PeerId &replica, bool fetch_now = true); }; class BlockDeliveryContext: public promise_t { @@ -142,7 +142,7 @@ class HotStuffBase: public HotStuffCore { EventContext ec; salticidae::ThreadCall tcall; VeriPool vpool; - std::vector<NetAddr> peers; + std::vector<PeerId> peers; private: /** whether libevent handle is owned by itself */ @@ -176,11 +176,11 @@ class HotStuffBase: public HotStuffCore { mutable double part_delivery_time; mutable double part_delivery_time_min; mutable double part_delivery_time_max; - mutable std::unordered_map<const NetAddr, uint32_t> part_fetched_replica; + mutable std::unordered_map<const PeerId, uint32_t> part_fetched_replica; void on_fetch_cmd(const command_t &cmd); void on_fetch_blk(const block_t &blk); - void on_deliver_blk(const block_t &blk); + bool on_deliver_blk(const block_t &blk); /** deliver consensus message: <propose> */ inline void propose_handler(MsgPropose &&, const Net::conn_t &); @@ -235,11 +235,11 @@ class HotStuffBase: public HotStuffCore { /* Helper functions */ /** Returns a promise resolved (with command_t cmd) when Command is fetched. */ - promise_t async_fetch_cmd(const uint256_t &cmd_hash, const NetAddr *replica_id, bool fetch_now = true); + promise_t async_fetch_cmd(const uint256_t &cmd_hash, const PeerId *replica, bool fetch_now = true); /** Returns a promise resolved (with block_t blk) when Block is fetched. */ - promise_t async_fetch_blk(const uint256_t &blk_hash, const NetAddr *replica_id, bool fetch_now = true); + promise_t async_fetch_blk(const uint256_t &blk_hash, const PeerId *replica, bool fetch_now = true); /** Returns a promise resolved (with block_t blk) when Block is delivered (i.e. prefix is fetched). */ - promise_t async_deliver_blk(const uint256_t &blk_hash, const NetAddr &replica_id); + promise_t async_deliver_blk(const uint256_t &blk_hash, const PeerId &replica); }; /** HotStuff protocol (templated by cryptographic implementation). */ @@ -316,7 +316,7 @@ FetchContext<ent_type>::FetchContext(FetchContext && other): hs(other.hs), fetch_msg(std::move(other.fetch_msg)), ent_hash(other.ent_hash), - replica_ids(std::move(other.replica_ids)) { + replicas(std::move(other.replicas)) { other.timeout.del(); timeout = TimerEvent(hs->ec, std::bind(&FetchContext::timeout_cb, this, _1)); @@ -326,16 +326,16 @@ FetchContext<ent_type>::FetchContext(FetchContext && other): template<> inline void FetchContext<ENT_TYPE_CMD>::timeout_cb(TimerEvent &) { HOTSTUFF_LOG_WARN("cmd fetching %.10s timeout", get_hex(ent_hash).c_str()); - for (const auto &replica_id: replica_ids) - send(replica_id); + for (const auto &replica: replicas) + send(replica); reset_timeout(); } template<> inline void FetchContext<ENT_TYPE_BLK>::timeout_cb(TimerEvent &) { HOTSTUFF_LOG_WARN("block fetching %.10s timeout", get_hex(ent_hash).c_str()); - for (const auto &replica_id: replica_ids) - send(replica_id); + for (const auto &replica: replicas) + send(replica); reset_timeout(); } @@ -352,9 +352,9 @@ FetchContext<ent_type>::FetchContext( } template<EntityType ent_type> -void FetchContext<ent_type>::send(const NetAddr &replica_id) { - hs->part_fetched_replica[replica_id]++; - hs->pn.send_msg(fetch_msg, replica_id); +void FetchContext<ent_type>::send(const PeerId &replica) { + hs->part_fetched_replica[replica]++; + hs->pn.send_msg(fetch_msg, replica); } template<EntityType ent_type> @@ -363,10 +363,10 @@ void FetchContext<ent_type>::reset_timeout() { } template<EntityType ent_type> -void FetchContext<ent_type>::add_replica(const NetAddr &replica_id, bool fetch_now) { - if (replica_ids.empty() && fetch_now) - send(replica_id); - replica_ids.insert(replica_id); +void FetchContext<ent_type>::add_replica(const PeerId &replica, bool fetch_now) { + if (replicas.empty() && fetch_now) + send(replica); + replicas.insert(replica); } } diff --git a/include/hotstuff/liveness.h b/include/hotstuff/liveness.h index 452ecdc..eff1406 100644 --- a/include/hotstuff/liveness.h +++ b/include/hotstuff/liveness.h @@ -124,7 +124,7 @@ class PMHighTail: public virtual PaceMaker { // if (!--nparents) break; // } // } - return std::move(parents); + return parents; } }; @@ -182,7 +182,7 @@ class PMWaitQC: public virtual PaceMaker { promise_t pm; pending_beats.push(pm); schedule_next(); - return std::move(pm); + return pm; } promise_t beat_resp(ReplicaID last_proposer) override { @@ -393,7 +393,7 @@ class PMRoundRobinProposer: virtual public PaceMaker { promise_t pm; pending_beats.push(pm); proposer_schedule_next(); - return std::move(pm); + return pm; } else return promise_t([proposer=proposer](promise_t &pm) { diff --git a/include/hotstuff/type.h b/include/hotstuff/type.h index 07c1e72..d895b73 100644 --- a/include/hotstuff/type.h +++ b/include/hotstuff/type.h @@ -21,6 +21,7 @@ #include "salticidae/event.h" #include "salticidae/ref.h" #include "salticidae/netaddr.h" +#include "salticidae/network.h" #include "salticidae/stream.h" #include "salticidae/type.h" #include "salticidae/util.h" @@ -38,19 +39,16 @@ using salticidae::letoh; using salticidae::get_hex; using salticidae::from_hex; using salticidae::bytearray_t; +using salticidae::get_hex10; using salticidae::get_hash; using salticidae::NetAddr; +using salticidae::PeerId; using salticidae::TimerEvent; using salticidae::FdEvent; using salticidae::EventContext; using promise::promise_t; -template<typename SerialType> -inline std::string get_hex10(const SerialType &x) { - return get_hex(x).substr(0, 10); -} - class HotStuffError: public salticidae::SalticidaeError { public: template<typename... Args> diff --git a/salticidae b/salticidae -Subproject 2a99baafdcac46931b00a9ef9e77340dbc319b5 +Subproject 53c1165645ad4fa29fbab68440a69881922c28f diff --git a/scripts/run_demo_client.sh b/scripts/run_demo_client.sh index f27b44d..c124d5d 100755 --- a/scripts/run_demo_client.sh +++ b/scripts/run_demo_client.sh @@ -1,2 +1,7 @@ #!/bin/bash +# Try to run run_demo.sh first and then this script, then use Ctrl-C to +# terminate the proposing replica (e.g. replica 0). Leader rotation will be +# scheduled. Try to kill and run this script again, new commands should still +# get through (be replicated) once the new leader becomes stable. + ./examples/hotstuff-client --idx 0 --iter -1 --max-async 4 diff --git a/src/consensus.cpp b/src/consensus.cpp index 7a577c5..9de7cc2 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -53,7 +53,7 @@ block_t HotStuffCore::get_delivered_blk(const uint256_t &blk_hash) { block_t blk = storage->find_blk(blk_hash); if (blk == nullptr || !blk->delivered) throw std::runtime_error("block not delivered"); - return std::move(blk); + return blk; } bool HotStuffCore::on_deliver_blk(const block_t &blk) { @@ -279,10 +279,10 @@ void HotStuffCore::prune(uint32_t staleness) { } } -void HotStuffCore::add_replica(ReplicaID rid, const NetAddr &addr, +void HotStuffCore::add_replica(ReplicaID rid, const PeerId &peer_id, pubkey_bt &&pub_key) { - config.add_replica(rid, - ReplicaInfo(rid, addr, std::move(pub_key))); + config.add_replica(rid, + ReplicaInfo(rid, peer_id, std::move(pub_key))); b0->voted.insert(rid); } @@ -351,7 +351,7 @@ HotStuffCore::operator std::string () const { << "b_exec=" << get_hex10(b_exec->get_hash()) << " " << "vheight=" << std::to_string(vheight) << " " << "tails=" << std::to_string(tails.size()) << ">"; - return std::move(s); + return s; } } diff --git a/src/entity.cpp b/src/entity.cpp index 59febac..649ce19 100644 --- a/src/entity.cpp +++ b/src/entity.cpp @@ -26,16 +26,11 @@ void Block::serialize(DataStream &s) const { s << htole((uint32_t)cmds.size()); for (auto cmd: cmds) s << cmd; - if (qc) - s << (uint8_t)1 << *qc; - else - s << (uint8_t)0; - s << htole((uint32_t)extra.size()) << extra; + s << *qc << htole((uint32_t)extra.size()) << extra; } void Block::unserialize(DataStream &s, HotStuffCore *hsc) { uint32_t n; - uint8_t flag; s >> n; n = letoh(n); parent_hashes.resize(n); @@ -48,8 +43,7 @@ void Block::unserialize(DataStream &s, HotStuffCore *hsc) { s >> cmd; // for (auto &cmd: cmds) // cmd = hsc->parse_cmd(s); - s >> flag; - qc = flag ? hsc->parse_quorum_cert(s) : nullptr; + qc = hsc->parse_quorum_cert(s); s >> n; n = letoh(n); if (n == 0) @@ -63,12 +57,12 @@ void Block::unserialize(DataStream &s, HotStuffCore *hsc) { } bool Block::verify(const HotStuffCore *hsc) const { - return qc && qc->verify(hsc->get_config()); + if (qc->get_obj_hash() == hsc->get_genesis()->get_hash()) + return true; + return qc->verify(hsc->get_config()); } promise_t Block::verify(const HotStuffCore *hsc, VeriPool &vpool) const { - if (!qc) - return promise_t([](promise_t &pm) { pm.resolve(false); }); if (qc->get_obj_hash() == hsc->get_genesis()->get_hash()) return promise_t([](promise_t &pm) { pm.resolve(true); }); return qc->verify(hsc->get_config(), vpool); diff --git a/src/hotstuff.cpp b/src/hotstuff.cpp index b584af0..ad2ed55 100644 --- a/src/hotstuff.cpp +++ b/src/hotstuff.cpp @@ -97,7 +97,7 @@ void HotStuffBase::on_fetch_blk(const block_t &blk) { } } -void HotStuffBase::on_deliver_blk(const block_t &blk) { +bool HotStuffBase::on_deliver_blk(const block_t &blk) { const uint256_t &blk_hash = blk->get_hash(); bool valid; /* sanity check: all parents must be delivered */ @@ -116,6 +116,7 @@ void HotStuffBase::on_deliver_blk(const block_t &blk) { LOG_WARN("dropping invalid block"); } + bool res = true; auto it = blk_delivery_waiting.find(blk_hash); if (it != blk_delivery_waiting.end()) { @@ -133,14 +134,16 @@ void HotStuffBase::on_deliver_blk(const block_t &blk) { else { pm.reject(blk); + res = false; // TODO: do we need to also free it from storage? } blk_delivery_waiting.erase(it); } + return res; } promise_t HotStuffBase::async_fetch_blk(const uint256_t &blk_hash, - const NetAddr *replica_id, + const PeerId *replica, bool fetch_now) { if (storage->is_blk_fetched(blk_hash)) return promise_t([this, &blk_hash](promise_t pm){ @@ -157,13 +160,13 @@ promise_t HotStuffBase::async_fetch_blk(const uint256_t &blk_hash, blk_hash, BlockFetchContext(blk_hash, this))).first; } - if (replica_id != nullptr) - it->second.add_replica(*replica_id, fetch_now); + if (replica != nullptr) + it->second.add_replica(*replica, fetch_now); return static_cast<promise_t &>(it->second); } promise_t HotStuffBase::async_deliver_blk(const uint256_t &blk_hash, - const NetAddr &replica_id) { + const PeerId &replica) { if (storage->is_blk_delivered(blk_hash)) return promise_t([this, &blk_hash](promise_t pm) { pm.resolve(storage->find_blk(blk_hash)); @@ -174,26 +177,30 @@ promise_t HotStuffBase::async_deliver_blk(const uint256_t &blk_hash, BlockDeliveryContext pm{[](promise_t){}}; it = blk_delivery_waiting.insert(std::make_pair(blk_hash, pm)).first; /* otherwise the on_deliver_batch will resolve */ - async_fetch_blk(blk_hash, &replica_id).then([this, replica_id](block_t blk) { + async_fetch_blk(blk_hash, &replica).then([this, replica](block_t blk) { /* qc_ref should be fetched */ std::vector<promise_t> pms; const auto &qc = blk->get_qc(); - if (qc) - pms.push_back(async_fetch_blk(qc->get_obj_hash(), &replica_id)); + assert(qc); + if (blk == get_genesis()) + pms.push_back(promise_t([](promise_t &pm){ pm.resolve(true); })); + else + pms.push_back(blk->verify(this, vpool)); + pms.push_back(async_fetch_blk(qc->get_obj_hash(), &replica)); /* the parents should be delivered */ for (const auto &phash: blk->get_parent_hashes()) - pms.push_back(async_deliver_blk(phash, replica_id)); - if (blk != get_genesis()) - pms.push_back(blk->verify(this, vpool)); - promise::all(pms).then([this, blk]() { - on_deliver_blk(blk); + pms.push_back(async_deliver_blk(phash, replica)); + promise::all(pms).then([this, blk](const promise::values_t values) { + auto ret = promise::any_cast<bool>(values[0]) && this->on_deliver_blk(blk); + if (!ret) + HOTSTUFF_LOG_WARN("verification failed during async delivery"); }); }); return static_cast<promise_t &>(pm); } void HotStuffBase::propose_handler(MsgPropose &&msg, const Net::conn_t &conn) { - const NetAddr &peer = conn->get_peer_addr(); + const PeerId &peer = conn->get_peer_id(); if (peer.is_null()) return; msg.postponed_parse(this); auto &prop = msg.proposal; @@ -207,7 +214,7 @@ void HotStuffBase::propose_handler(MsgPropose &&msg, const Net::conn_t &conn) { } void HotStuffBase::vote_handler(MsgVote &&msg, const Net::conn_t &conn) { - const NetAddr &peer = conn->get_peer_addr(); + const auto &peer = conn->get_peer_id(); if (peer.is_null()) return; msg.postponed_parse(this); //auto &vote = msg.vote; @@ -224,7 +231,7 @@ void HotStuffBase::vote_handler(MsgVote &&msg, const Net::conn_t &conn) { } void HotStuffBase::req_blk_handler(MsgReqBlock &&msg, const Net::conn_t &conn) { - const NetAddr replica = conn->get_peer_addr(); + const PeerId replica = conn->get_peer_id(); if (replica.is_null()) return; auto &blk_hashes = msg.blk_hashes; std::vector<promise_t> pms; @@ -302,7 +309,7 @@ void HotStuffBase::print_stat() const { size_t nrb = conn->get_nrecvb(); conn->clear_msgstat(); LOG_INFO("%s: %u(%u), %u(%u), %u", - std::string(replica).c_str(), ns, nsb, nr, nrb, part_fetched_replica[replica]); + get_hex10(replica).c_str(), ns, nsb, nr, nrb, part_fetched_replica[replica]); _nsent += ns; _nrecv += nr; part_fetched_replica[replica] = 0; @@ -372,7 +379,7 @@ void HotStuffBase::do_vote(ReplicaID last_proposer, const Vote &vote) { //on_receive_vote(vote); } else - pn.send_msg(MsgVote(vote), get_config().get_addr(proposer)); + pn.send_msg(MsgVote(vote), get_config().get_peer_id(proposer)); }); } @@ -399,12 +406,16 @@ void HotStuffBase::start( for (size_t i = 0; i < replicas.size(); i++) { auto &addr = std::get<0>(replicas[i]); - HotStuffCore::add_replica(i, addr, std::move(std::get<1>(replicas[i]))); - valid_tls_certs.insert(std::move(std::get<2>(replicas[i]))); + auto cert_hash = std::move(std::get<2>(replicas[i])); + valid_tls_certs.insert(cert_hash); + salticidae::PeerId peer{cert_hash}; + HotStuffCore::add_replica(i, peer, std::move(std::get<1>(replicas[i]))); if (addr != listen_addr) { - peers.push_back(addr); - pn.add_peer(addr); + peers.push_back(peer); + pn.add_peer(peer); + pn.set_peer_addr(peer, addr); + pn.conn_peer(peer); } } |