diff options
author | Determinant <[email protected]> | 2018-08-02 20:26:21 -0400 |
---|---|---|
committer | Determinant <[email protected]> | 2018-08-02 20:26:21 -0400 |
commit | 4c42ac64c2ff821306609898a58126f2da32c7d4 (patch) | |
tree | be01d2f18b09fc056fa520b9ea548f1742a9781c | |
parent | 13c4335dc46fc1a4d6757279898347d99483dbfe (diff) |
WIP: sticky proposer pacemaker
-rw-r--r-- | include/hotstuff/consensus.h | 4 | ||||
-rw-r--r-- | include/hotstuff/liveness.h | 206 | ||||
m--------- | salticidae | 0 | ||||
-rw-r--r-- | src/consensus.cpp | 13 |
4 files changed, 223 insertions, 0 deletions
diff --git a/include/hotstuff/consensus.h b/include/hotstuff/consensus.h index 1ff8f79..a373a2b 100644 --- a/include/hotstuff/consensus.h +++ b/include/hotstuff/consensus.h @@ -31,6 +31,7 @@ class HotStuffCore { /* === async event queues === */ std::unordered_map<block_t, promise_t> qc_waiting; promise_t propose_waiting; + promise_t receive_proposal_waiting; block_t get_delivered_blk(const uint256_t &blk_hash); void sanity_check_delivered(const block_t &blk); @@ -38,6 +39,7 @@ class HotStuffCore { bool update(const uint256_t &bqc_hash); void on_qc_finish(const block_t &blk); void on_propose_(const block_t &blk); + void on_deliver_blk_(const block_t &blk); protected: ReplicaID id; /**< identity of the replica itself */ @@ -125,6 +127,8 @@ class HotStuffCore { promise_t async_qc_finish(const block_t &blk); /** Get a promise resolved when a new block is proposed. */ promise_t async_wait_propose(); + /** Get a promise resolved when a new proposal is received. */ + promise_t async_wait_receive_proposal(); /* Other useful functions */ block_t get_genesis() { return b0; } diff --git a/include/hotstuff/liveness.h b/include/hotstuff/liveness.h index a625f0d..e71fda6 100644 --- a/include/hotstuff/liveness.h +++ b/include/hotstuff/liveness.h @@ -68,6 +68,7 @@ class PMWaitQC: public virtual PaceMaker { block_t last_proposed; bool locked; + protected: void schedule_next() { if (!pending_beats.empty() && !locked) { @@ -142,6 +143,211 @@ class PaceMakerDummyFixed: public PaceMakerDummy { } }; +/** + * Simple long-standing proposer liveness gadget. + * There are three roles for each replica: proposer, candidate and follower. + * + * For a proposer, it proposes a new block and refrains itself from proposing + * the next block unless it receives the QC for the previous block. It will + * give up the leadership and turn into a candidate when it hasn't seen such QC + * for a while. + * + * For a follower, it never proposes any block, but keeps a timer for the QC + * for the block last proposed by the proposer (the proposer it believes to + * be). When it times out without seeing such QC, the follower turns into a + * candidate. + * + * For a candidate, it periodically proposes empty blocks to synchronize the + * preferred branch, with randomized timeout, and check for any new QC. Once it + * sees such new QC, if the QC is given by itself, it becomes the proposer, + * otherwise yields to the creator of the QC as a follower. + */ +class PMStickyProposer: public PMWaitQC { + enum { + PROPOSER, + FOLLOWER, + CANDIDATE + } role; + double qc_timeout; + double candidate_timeout; + EventContext eb; + /** QC timer or randomized timeout */ + Event timer; + block_t last_proposed; + /** the proposer it believes when it is a follower */ + ReplicaID proposer; + + /* extra state needed for a proposer */ + std::queue<promise_t> pending_beats; + bool locked; + + /* extra state needed for a candidate */ + std::unordered_map<ReplicaID, + std::pair<block_t, promise_t>> last_proposed_by; + + promise_t pm_wait_receive_proposal; + promise_t pm_wait_propose; + promise_t pm_qc_finish; + + void clear_promises() { + pm_wait_receive_proposal.reject(); + pm_wait_propose.reject(); + pm_qc_finish.reject(); + for (auto &p: last_proposed_by) + p.second.reject(); + last_proposed_by.clear(); + } + + void reg_follower_receive_proposal() { + pm_wait_receive_proposal = + hsc->async_wait_receive_proposal().then( + std::bind(&PMStickyProposer::follower_receive_proposal, this, _1, _2)); + } + + void follower_receive_proposal(const Proposal &prop) { + if (prop.proposer == proposer) + { + auto qc_ref = prop.blk->qc_ref; + if (last_proposed) + { + if (qc_ref != last_proposed) + to_candidate(); /* proposer misbehave */ + } + last_proposed = prop.blk; + /* reset QC timer */ + timer.del(); + timer.add_with_timeout(qc_timeout); + } + reg_follower_receive_proposal(); + } + + void proposer_schedule_next() { + if (!pending_beats.empty() && !locked) + { + auto pm = pending_beats.front(); + pending_beats.pop(); + pm_qc_finish = + hsc->async_qc_finish(last_proposed).then([this, pm]() { + pm.resolve(proposer); + }); + locked = true; + } + } + + void reg_proposer_propose() { + pm_wait_propose = hsc->async_wait_propose().then( + std::bind(&PMStickyProposer::proposer_propose, this, _1, _2)); + } + + void proposer_propose(const block_t &blk) { + last_proposed = blk; + locked = false; + proposer_schedule_next(); + reg_proposer_propose(); + timer.del(); + timer.add_with_timeout(qc_timeout); + } + + void candidate_qc_timeout() { + pm_qc_finish.reject(); + hsc->async_wait_propose().then([this](const block_t &blk) { + pm_qc_finish = hsc->async_qc_finish(blk).then([this]() { + /* managed to collect a QC */ + to_proposer(); + }); + }); + on_propose(std::vector<comman_t>{}, get_parents()); + timer.del(); + timer.add_with_timeout(gen_rand_timeout(candidate_timeout)); + } + + void reg_candidate_receive_proposal() { + pm_wait_receive_proposal = + hsc->async_wait_receive_proposal().then( + std::bind(&PMStickyProposer::candidate_receive_proposal, this, _1, _2)); + } + + void candidate_receive_proposal(const Proposal &prop) { + auto &p = last_proposed_by[prop.proposer]; + p.second.reject(); + p = std::make_pair(prop.blk, hsc->async_qc_finish(prop.blk).then([this]() { + to_follower(prop.proposer); + })); + } + + void to_follower(ReplicaID new_proposer) { + clear_promises(); + role = FOLLOWER; + proposer = new_proposer; + last_proposed = nullptr; + timer = Event(eb, -1, 0, [this](int, short) { + /* unable to get a QC in time */ + to_candidate(); + }); + reg_follower_receive_proposal(); + /* redirect all pending cmds to the new proposer */ + for (auto &pm: pending_beats) + pm.resolve(proposer); + pending_beats.clear(); + } + + void to_proposer() { + clear_promises(); + role = PROPOSER; + proposer = hsc->get_id(); + last_proposed = hsc->get_genesis(); + timer = Event(eb, -1, 0, [this](int, short) { + /* proposer unable to get a QC in time */ + to_candidate(); + }); + /* prepare the variables for the role of a proposer */ + locked = false; + reg_proposer_propose(); + } + + void to_candidate() { + clear_promises(); + role = CANDIDATE; + proposer = hsc->get_id(); + timer = Event(eb, -1, 0, [this](int, short) { + candidate_qc_timeout(); + }); + candidate_timeout = qc_timeout; + candidate_qc_timeout(); + } + + public: + void init(HotStuffCore *hsc) override { + PMWaitQC::init(hsc); + role = CANDIDATE; + proposer = 0; + } + + ReplicaID get_proposer() override { + return proposer; + } + + promise_t beat() override { + if (proposer == hsc->get_id()) + { + promise_t pm; + pending_beats.push(pm); + proposer_schedule_next(); + return std::move(pm); + } + else + return promise_t([](promise_t &pm) { + pm.resolve(proposer); + }); + } + + promise_t next_proposer(ReplicaID last_proposer) override { + return promise_t([last_proposer](promise_t &pm) { + pm.resolve(proposer); + }); + } +}; + } #endif diff --git a/salticidae b/salticidae -Subproject 6a10aa1984e1e58f2238a03c4a6627bca58beb8 +Subproject 37dc1f5bfe6630c224d1b131562166ab5bdd997 diff --git a/src/consensus.cpp b/src/consensus.cpp index cc1d572..40a5e44 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -64,6 +64,7 @@ bool HotStuffCore::on_deliver_blk(const block_t &blk) { blk->delivered = true; LOG_DEBUG("deliver %s", std::string(*blk).c_str()); + on_deliver_blk_(blk); return true; } @@ -173,6 +174,8 @@ void HotStuffCore::on_receive_proposal(const Proposal &prop) { #ifdef HOTSTUFF_PROTO_LOG LOG_INFO("now state: %s", std::string(*this).c_str()); #endif + if (bnew->qc_ref) on_qc_finish(bnew); + on_receive_proposal_(prop); do_vote(prop.proposer, Vote(id, bqc->get_hash(), @@ -272,12 +275,22 @@ promise_t HotStuffCore::async_wait_propose() { return propose_waiting; } +promise_t HotStuffCore::async_wait_receive_proposal() { + return receive_proposal_waiting; +} + void HotStuffCore::on_propose_(const block_t &blk) { auto t = std::move(propose_waiting); propose_waiting = promise_t(); t.resolve(blk); } +void HotStuffCore::on_receive_proposal_(const Proposal &prop) { + auto t = std::move(receive_proposal_waiting); + receive_proposal_waiting = promise_t(); + t.resolve(prop); +} + HotStuffCore::operator std::string () const { DataStream s; s << "<hotstuff " |