From 7d5b607f5e4efc4ab12a5cce49bd8198d8d03fe6 Mon Sep 17 00:00:00 2001 From: Determinant Date: Fri, 3 Aug 2018 13:55:31 -0400 Subject: finish coding PMStickyProposer --- include/hotstuff/consensus.h | 2 +- include/hotstuff/entity.h | 2 ++ include/hotstuff/liveness.h | 84 ++++++++++++++++++++++++++++---------------- 3 files changed, 56 insertions(+), 32 deletions(-) (limited to 'include/hotstuff') diff --git a/include/hotstuff/consensus.h b/include/hotstuff/consensus.h index a373a2b..b176eb4 100644 --- a/include/hotstuff/consensus.h +++ b/include/hotstuff/consensus.h @@ -39,7 +39,7 @@ class HotStuffCore { bool update(const uint256_t &bqc_hash); void on_qc_finish(const block_t &blk); void on_propose_(const block_t &blk); - void on_deliver_blk_(const block_t &blk); + void on_receive_proposal_(const Proposal &prop); protected: ReplicaID id; /**< identity of the replica itself */ diff --git a/include/hotstuff/entity.h b/include/hotstuff/entity.h index 333b66e..09b40aa 100644 --- a/include/hotstuff/entity.h +++ b/include/hotstuff/entity.h @@ -183,6 +183,8 @@ class Block { const quorum_cert_bt &get_qc() const { return qc; } + const block_t &get_qc_ref() const { return qc_ref; } + operator std::string () const { DataStream s; s << "> last_proposed_by; + std::unordered_map last_proposed_by; promise_t pm_wait_receive_proposal; promise_t pm_wait_propose; promise_t pm_qc_finish; + void reset_qc_timer() { + timer.del(); + timer.add_with_timeout(qc_timeout); + HOTSTUFF_LOG_INFO("QC timer reset"); + } + void clear_promises() { pm_wait_receive_proposal.reject(); pm_wait_propose.reject(); @@ -201,22 +210,25 @@ class PMStickyProposer: public PMWaitQC { void reg_follower_receive_proposal() { pm_wait_receive_proposal = hsc->async_wait_receive_proposal().then( - std::bind(&PMStickyProposer::follower_receive_proposal, this, _1, _2)); + salticidae::generic_bind( + &PMStickyProposer::follower_receive_proposal, this, _1)); } void follower_receive_proposal(const Proposal &prop) { if (prop.proposer == proposer) { - auto qc_ref = prop.blk->qc_ref; + auto &qc_ref = prop.blk->get_qc_ref(); if (last_proposed) { if (qc_ref != last_proposed) + { + HOTSTUFF_LOG_INFO("proposer misbehave"); to_candidate(); /* proposer misbehave */ + } } + HOTSTUFF_LOG_INFO("proposer emits new QC"); last_proposed = prop.blk; - /* reset QC timer */ - timer.del(); - timer.add_with_timeout(qc_timeout); + reset_qc_timer(); } reg_follower_receive_proposal(); } @@ -228,6 +240,7 @@ class PMStickyProposer: public PMWaitQC { pending_beats.pop(); pm_qc_finish = hsc->async_qc_finish(last_proposed).then([this, pm]() { + reset_qc_timer(); pm.resolve(proposer); }); locked = true; @@ -236,7 +249,8 @@ class PMStickyProposer: public PMWaitQC { void reg_proposer_propose() { pm_wait_propose = hsc->async_wait_propose().then( - std::bind(&PMStickyProposer::proposer_propose, this, _1, _2)); + salticidae::generic_bind( + &PMStickyProposer::proposer_propose, this, _1)); } void proposer_propose(const block_t &blk) { @@ -244,8 +258,6 @@ class PMStickyProposer: public PMWaitQC { locked = false; proposer_schedule_next(); reg_proposer_propose(); - timer.del(); - timer.add_with_timeout(qc_timeout); } void candidate_qc_timeout() { @@ -256,26 +268,30 @@ class PMStickyProposer: public PMWaitQC { to_proposer(); }); }); - on_propose(std::vector{}, get_parents()); - timer.del(); - timer.add_with_timeout(gen_rand_timeout(candidate_timeout)); + reset_qc_timer(); + hsc->on_propose(std::vector{}, get_parents()); } void reg_candidate_receive_proposal() { pm_wait_receive_proposal = hsc->async_wait_receive_proposal().then( - std::bind(&PMStickyProposer::candidate_receive_proposal, this, _1, _2)); + salticidae::generic_bind( + &PMStickyProposer::candidate_receive_proposal, this, _1)); } void candidate_receive_proposal(const Proposal &prop) { - auto &p = last_proposed_by[prop.proposer]; - p.second.reject(); - p = std::make_pair(prop.blk, hsc->async_qc_finish(prop.blk).then([this]() { - to_follower(prop.proposer); - })); + auto proposer = prop.proposer; + auto &p = last_proposed_by[proposer]; + HOTSTUFF_LOG_INFO("got block from %d", proposer); + p.reject(); + p = hsc->async_qc_finish(prop.blk).then([this, proposer]() { + to_follower(proposer); + }); + reg_candidate_receive_proposal(); } void to_follower(ReplicaID new_proposer) { + HOTSTUFF_LOG_INFO("new role: follower"); clear_promises(); role = FOLLOWER; proposer = new_proposer; @@ -284,14 +300,17 @@ class PMStickyProposer: public PMWaitQC { /* unable to get a QC in time */ to_candidate(); }); - reg_follower_receive_proposal(); /* redirect all pending cmds to the new proposer */ - for (auto &pm: pending_beats) - pm.resolve(proposer); - pending_beats.clear(); + while (!pending_beats.empty()) + { + pending_beats.front().resolve(proposer); + pending_beats.pop(); + } + reg_follower_receive_proposal(); } void to_proposer() { + HOTSTUFF_LOG_INFO("new role: proposer"); clear_promises(); role = PROPOSER; proposer = hsc->get_id(); @@ -306,21 +325,23 @@ class PMStickyProposer: public PMWaitQC { } void to_candidate() { + HOTSTUFF_LOG_INFO("new role: candidate"); clear_promises(); role = CANDIDATE; proposer = hsc->get_id(); + last_proposed = nullptr; timer = Event(eb, -1, 0, [this](int, short) { candidate_qc_timeout(); }); candidate_timeout = qc_timeout; - candidate_qc_timeout(); + timer.add_with_timeout(salticidae::gen_rand_timeout(candidate_timeout)); + reg_candidate_receive_proposal(); } public: void init(HotStuffCore *hsc) override { PMWaitQC::init(hsc); - role = CANDIDATE; - proposer = 0; + to_candidate(); } ReplicaID get_proposer() override { @@ -328,21 +349,22 @@ class PMStickyProposer: public PMWaitQC { } promise_t beat() override { - if (proposer == hsc->get_id()) + if (role != FOLLOWER) { promise_t pm; pending_beats.push(pm); - proposer_schedule_next(); + if (role == PROPOSER) + proposer_schedule_next(); return std::move(pm); } else - return promise_t([](promise_t &pm) { + return promise_t([proposer=proposer](promise_t &pm) { pm.resolve(proposer); }); } - promise_t next_proposer(ReplicaID last_proposer) override { - return promise_t([last_proposer](promise_t &pm) { + promise_t next_proposer(ReplicaID) override { + return promise_t([proposer=proposer](promise_t &pm) { pm.resolve(proposer); }); } -- cgit v1.2.3