From 4c42ac64c2ff821306609898a58126f2da32c7d4 Mon Sep 17 00:00:00 2001 From: Determinant Date: Thu, 2 Aug 2018 20:26:21 -0400 Subject: WIP: sticky proposer pacemaker --- include/hotstuff/consensus.h | 4 + include/hotstuff/liveness.h | 206 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 210 insertions(+) (limited to 'include') diff --git a/include/hotstuff/consensus.h b/include/hotstuff/consensus.h index 1ff8f79..a373a2b 100644 --- a/include/hotstuff/consensus.h +++ b/include/hotstuff/consensus.h @@ -31,6 +31,7 @@ class HotStuffCore { /* === async event queues === */ std::unordered_map qc_waiting; promise_t propose_waiting; + promise_t receive_proposal_waiting; block_t get_delivered_blk(const uint256_t &blk_hash); void sanity_check_delivered(const block_t &blk); @@ -38,6 +39,7 @@ class HotStuffCore { bool update(const uint256_t &bqc_hash); void on_qc_finish(const block_t &blk); void on_propose_(const block_t &blk); + void on_deliver_blk_(const block_t &blk); protected: ReplicaID id; /**< identity of the replica itself */ @@ -125,6 +127,8 @@ class HotStuffCore { promise_t async_qc_finish(const block_t &blk); /** Get a promise resolved when a new block is proposed. */ promise_t async_wait_propose(); + /** Get a promise resolved when a new proposal is received. */ + promise_t async_wait_receive_proposal(); /* Other useful functions */ block_t get_genesis() { return b0; } diff --git a/include/hotstuff/liveness.h b/include/hotstuff/liveness.h index a625f0d..e71fda6 100644 --- a/include/hotstuff/liveness.h +++ b/include/hotstuff/liveness.h @@ -68,6 +68,7 @@ class PMWaitQC: public virtual PaceMaker { block_t last_proposed; bool locked; + protected: void schedule_next() { if (!pending_beats.empty() && !locked) { @@ -142,6 +143,211 @@ class PaceMakerDummyFixed: public PaceMakerDummy { } }; +/** + * Simple long-standing proposer liveness gadget. + * There are three roles for each replica: proposer, candidate and follower. + * + * For a proposer, it proposes a new block and refrains itself from proposing + * the next block unless it receives the QC for the previous block. It will + * give up the leadership and turn into a candidate when it hasn't seen such QC + * for a while. + * + * For a follower, it never proposes any block, but keeps a timer for the QC + * for the block last proposed by the proposer (the proposer it believes to + * be). When it times out without seeing such QC, the follower turns into a + * candidate. + * + * For a candidate, it periodically proposes empty blocks to synchronize the + * preferred branch, with randomized timeout, and check for any new QC. Once it + * sees such new QC, if the QC is given by itself, it becomes the proposer, + * otherwise yields to the creator of the QC as a follower. + */ +class PMStickyProposer: public PMWaitQC { + enum { + PROPOSER, + FOLLOWER, + CANDIDATE + } role; + double qc_timeout; + double candidate_timeout; + EventContext eb; + /** QC timer or randomized timeout */ + Event timer; + block_t last_proposed; + /** the proposer it believes when it is a follower */ + ReplicaID proposer; + + /* extra state needed for a proposer */ + std::queue pending_beats; + bool locked; + + /* extra state needed for a candidate */ + std::unordered_map> last_proposed_by; + + promise_t pm_wait_receive_proposal; + promise_t pm_wait_propose; + promise_t pm_qc_finish; + + void clear_promises() { + pm_wait_receive_proposal.reject(); + pm_wait_propose.reject(); + pm_qc_finish.reject(); + for (auto &p: last_proposed_by) + p.second.reject(); + last_proposed_by.clear(); + } + + void reg_follower_receive_proposal() { + pm_wait_receive_proposal = + hsc->async_wait_receive_proposal().then( + std::bind(&PMStickyProposer::follower_receive_proposal, this, _1, _2)); + } + + void follower_receive_proposal(const Proposal &prop) { + if (prop.proposer == proposer) + { + auto qc_ref = prop.blk->qc_ref; + if (last_proposed) + { + if (qc_ref != last_proposed) + to_candidate(); /* proposer misbehave */ + } + last_proposed = prop.blk; + /* reset QC timer */ + timer.del(); + timer.add_with_timeout(qc_timeout); + } + reg_follower_receive_proposal(); + } + + void proposer_schedule_next() { + if (!pending_beats.empty() && !locked) + { + auto pm = pending_beats.front(); + pending_beats.pop(); + pm_qc_finish = + hsc->async_qc_finish(last_proposed).then([this, pm]() { + pm.resolve(proposer); + }); + locked = true; + } + } + + void reg_proposer_propose() { + pm_wait_propose = hsc->async_wait_propose().then( + std::bind(&PMStickyProposer::proposer_propose, this, _1, _2)); + } + + void proposer_propose(const block_t &blk) { + last_proposed = blk; + locked = false; + proposer_schedule_next(); + reg_proposer_propose(); + timer.del(); + timer.add_with_timeout(qc_timeout); + } + + void candidate_qc_timeout() { + pm_qc_finish.reject(); + hsc->async_wait_propose().then([this](const block_t &blk) { + pm_qc_finish = hsc->async_qc_finish(blk).then([this]() { + /* managed to collect a QC */ + to_proposer(); + }); + }); + on_propose(std::vector{}, get_parents()); + timer.del(); + timer.add_with_timeout(gen_rand_timeout(candidate_timeout)); + } + + void reg_candidate_receive_proposal() { + pm_wait_receive_proposal = + hsc->async_wait_receive_proposal().then( + std::bind(&PMStickyProposer::candidate_receive_proposal, this, _1, _2)); + } + + void candidate_receive_proposal(const Proposal &prop) { + auto &p = last_proposed_by[prop.proposer]; + p.second.reject(); + p = std::make_pair(prop.blk, hsc->async_qc_finish(prop.blk).then([this]() { + to_follower(prop.proposer); + })); + } + + void to_follower(ReplicaID new_proposer) { + clear_promises(); + role = FOLLOWER; + proposer = new_proposer; + last_proposed = nullptr; + timer = Event(eb, -1, 0, [this](int, short) { + /* unable to get a QC in time */ + to_candidate(); + }); + reg_follower_receive_proposal(); + /* redirect all pending cmds to the new proposer */ + for (auto &pm: pending_beats) + pm.resolve(proposer); + pending_beats.clear(); + } + + void to_proposer() { + clear_promises(); + role = PROPOSER; + proposer = hsc->get_id(); + last_proposed = hsc->get_genesis(); + timer = Event(eb, -1, 0, [this](int, short) { + /* proposer unable to get a QC in time */ + to_candidate(); + }); + /* prepare the variables for the role of a proposer */ + locked = false; + reg_proposer_propose(); + } + + void to_candidate() { + clear_promises(); + role = CANDIDATE; + proposer = hsc->get_id(); + timer = Event(eb, -1, 0, [this](int, short) { + candidate_qc_timeout(); + }); + candidate_timeout = qc_timeout; + candidate_qc_timeout(); + } + + public: + void init(HotStuffCore *hsc) override { + PMWaitQC::init(hsc); + role = CANDIDATE; + proposer = 0; + } + + ReplicaID get_proposer() override { + return proposer; + } + + promise_t beat() override { + if (proposer == hsc->get_id()) + { + promise_t pm; + pending_beats.push(pm); + proposer_schedule_next(); + return std::move(pm); + } + else + return promise_t([](promise_t &pm) { + pm.resolve(proposer); + }); + } + + promise_t next_proposer(ReplicaID last_proposer) override { + return promise_t([last_proposer](promise_t &pm) { + pm.resolve(proposer); + }); + } +}; + } #endif -- cgit v1.2.3