From 7d5b607f5e4efc4ab12a5cce49bd8198d8d03fe6 Mon Sep 17 00:00:00 2001 From: Determinant Date: Fri, 3 Aug 2018 13:55:31 -0400 Subject: finish coding PMStickyProposer --- include/hotstuff/consensus.h | 2 +- include/hotstuff/entity.h | 2 ++ include/hotstuff/liveness.h | 84 ++++++++++++++++++++++++++++---------------- salticidae | 2 +- src/consensus.cpp | 1 - src/hotstuff.cpp | 37 ++++++++++++------- src/hotstuff_app.cpp | 2 +- 7 files changed, 82 insertions(+), 48 deletions(-) diff --git a/include/hotstuff/consensus.h b/include/hotstuff/consensus.h index a373a2b..b176eb4 100644 --- a/include/hotstuff/consensus.h +++ b/include/hotstuff/consensus.h @@ -39,7 +39,7 @@ class HotStuffCore { bool update(const uint256_t &bqc_hash); void on_qc_finish(const block_t &blk); void on_propose_(const block_t &blk); - void on_deliver_blk_(const block_t &blk); + void on_receive_proposal_(const Proposal &prop); protected: ReplicaID id; /**< identity of the replica itself */ diff --git a/include/hotstuff/entity.h b/include/hotstuff/entity.h index 333b66e..09b40aa 100644 --- a/include/hotstuff/entity.h +++ b/include/hotstuff/entity.h @@ -183,6 +183,8 @@ class Block { const quorum_cert_bt &get_qc() const { return qc; } + const block_t &get_qc_ref() const { return qc_ref; } + operator std::string () const { DataStream s; s << "> last_proposed_by; + std::unordered_map last_proposed_by; promise_t pm_wait_receive_proposal; promise_t pm_wait_propose; promise_t pm_qc_finish; + void reset_qc_timer() { + timer.del(); + timer.add_with_timeout(qc_timeout); + HOTSTUFF_LOG_INFO("QC timer reset"); + } + void clear_promises() { pm_wait_receive_proposal.reject(); pm_wait_propose.reject(); @@ -201,22 +210,25 @@ class PMStickyProposer: public PMWaitQC { void reg_follower_receive_proposal() { pm_wait_receive_proposal = hsc->async_wait_receive_proposal().then( - std::bind(&PMStickyProposer::follower_receive_proposal, this, _1, _2)); + salticidae::generic_bind( + &PMStickyProposer::follower_receive_proposal, this, _1)); } void follower_receive_proposal(const Proposal &prop) { if (prop.proposer == proposer) { - auto qc_ref = prop.blk->qc_ref; + auto &qc_ref = prop.blk->get_qc_ref(); if (last_proposed) { if (qc_ref != last_proposed) + { + HOTSTUFF_LOG_INFO("proposer misbehave"); to_candidate(); /* proposer misbehave */ + } } + HOTSTUFF_LOG_INFO("proposer emits new QC"); last_proposed = prop.blk; - /* reset QC timer */ - timer.del(); - timer.add_with_timeout(qc_timeout); + reset_qc_timer(); } reg_follower_receive_proposal(); } @@ -228,6 +240,7 @@ class PMStickyProposer: public PMWaitQC { pending_beats.pop(); pm_qc_finish = hsc->async_qc_finish(last_proposed).then([this, pm]() { + reset_qc_timer(); pm.resolve(proposer); }); locked = true; @@ -236,7 +249,8 @@ class PMStickyProposer: public PMWaitQC { void reg_proposer_propose() { pm_wait_propose = hsc->async_wait_propose().then( - std::bind(&PMStickyProposer::proposer_propose, this, _1, _2)); + salticidae::generic_bind( + &PMStickyProposer::proposer_propose, this, _1)); } void proposer_propose(const block_t &blk) { @@ -244,8 +258,6 @@ class PMStickyProposer: public PMWaitQC { locked = false; proposer_schedule_next(); reg_proposer_propose(); - timer.del(); - timer.add_with_timeout(qc_timeout); } void candidate_qc_timeout() { @@ -256,26 +268,30 @@ class PMStickyProposer: public PMWaitQC { to_proposer(); }); }); - on_propose(std::vector{}, get_parents()); - timer.del(); - timer.add_with_timeout(gen_rand_timeout(candidate_timeout)); + reset_qc_timer(); + hsc->on_propose(std::vector{}, get_parents()); } void reg_candidate_receive_proposal() { pm_wait_receive_proposal = hsc->async_wait_receive_proposal().then( - std::bind(&PMStickyProposer::candidate_receive_proposal, this, _1, _2)); + salticidae::generic_bind( + &PMStickyProposer::candidate_receive_proposal, this, _1)); } void candidate_receive_proposal(const Proposal &prop) { - auto &p = last_proposed_by[prop.proposer]; - p.second.reject(); - p = std::make_pair(prop.blk, hsc->async_qc_finish(prop.blk).then([this]() { - to_follower(prop.proposer); - })); + auto proposer = prop.proposer; + auto &p = last_proposed_by[proposer]; + HOTSTUFF_LOG_INFO("got block from %d", proposer); + p.reject(); + p = hsc->async_qc_finish(prop.blk).then([this, proposer]() { + to_follower(proposer); + }); + reg_candidate_receive_proposal(); } void to_follower(ReplicaID new_proposer) { + HOTSTUFF_LOG_INFO("new role: follower"); clear_promises(); role = FOLLOWER; proposer = new_proposer; @@ -284,14 +300,17 @@ class PMStickyProposer: public PMWaitQC { /* unable to get a QC in time */ to_candidate(); }); - reg_follower_receive_proposal(); /* redirect all pending cmds to the new proposer */ - for (auto &pm: pending_beats) - pm.resolve(proposer); - pending_beats.clear(); + while (!pending_beats.empty()) + { + pending_beats.front().resolve(proposer); + pending_beats.pop(); + } + reg_follower_receive_proposal(); } void to_proposer() { + HOTSTUFF_LOG_INFO("new role: proposer"); clear_promises(); role = PROPOSER; proposer = hsc->get_id(); @@ -306,21 +325,23 @@ class PMStickyProposer: public PMWaitQC { } void to_candidate() { + HOTSTUFF_LOG_INFO("new role: candidate"); clear_promises(); role = CANDIDATE; proposer = hsc->get_id(); + last_proposed = nullptr; timer = Event(eb, -1, 0, [this](int, short) { candidate_qc_timeout(); }); candidate_timeout = qc_timeout; - candidate_qc_timeout(); + timer.add_with_timeout(salticidae::gen_rand_timeout(candidate_timeout)); + reg_candidate_receive_proposal(); } public: void init(HotStuffCore *hsc) override { PMWaitQC::init(hsc); - role = CANDIDATE; - proposer = 0; + to_candidate(); } ReplicaID get_proposer() override { @@ -328,21 +349,22 @@ class PMStickyProposer: public PMWaitQC { } promise_t beat() override { - if (proposer == hsc->get_id()) + if (role != FOLLOWER) { promise_t pm; pending_beats.push(pm); - proposer_schedule_next(); + if (role == PROPOSER) + proposer_schedule_next(); return std::move(pm); } else - return promise_t([](promise_t &pm) { + return promise_t([proposer=proposer](promise_t &pm) { pm.resolve(proposer); }); } - promise_t next_proposer(ReplicaID last_proposer) override { - return promise_t([last_proposer](promise_t &pm) { + promise_t next_proposer(ReplicaID) override { + return promise_t([proposer=proposer](promise_t &pm) { pm.resolve(proposer); }); } diff --git a/salticidae b/salticidae index 37dc1f5..3f1c768 160000 --- a/salticidae +++ b/salticidae @@ -1 +1 @@ -Subproject commit 37dc1f5bfe6630c224d1b131562166ab5bdd9976 +Subproject commit 3f1c768e2d5b5e51dec08499d6a877220f33d7a6 diff --git a/src/consensus.cpp b/src/consensus.cpp index 40a5e44..2cac22c 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -64,7 +64,6 @@ bool HotStuffCore::on_deliver_blk(const block_t &blk) { blk->delivered = true; LOG_DEBUG("deliver %s", std::string(*blk).c_str()); - on_deliver_blk_(blk); return true; } diff --git a/src/hotstuff.cpp b/src/hotstuff.cpp index 9a56626..2bde97e 100644 --- a/src/hotstuff.cpp +++ b/src/hotstuff.cpp @@ -66,7 +66,15 @@ promise_t HotStuffBase::exec_command(command_t cmd) { pm.resolve(Finality(proposer, -1, 0, 0, cmd_hash, uint256_t())); }); - cmd_pending.push(storage->add_cmd(cmd)); + + + auto it = decision_waiting.find(cmd_hash); + if (it == decision_waiting.end()) + { + cmd_pending.push(storage->add_cmd(cmd)); + it = decision_waiting.insert(std::make_pair(cmd_hash, promise_t())).first; + } + if (cmd_pending.size() >= blk_size) { std::vector cmds; @@ -75,16 +83,19 @@ promise_t HotStuffBase::exec_command(command_t cmd) { cmds.push_back(cmd_pending.front()); cmd_pending.pop(); } - pmaker->beat().then([this, cmds = std::move(cmds)]() { - on_propose(cmds, pmaker->get_parents()); + pmaker->beat().then([this, cmds = std::move(cmds)](ReplicaID proposer) { + if (proposer != get_id()) + { + for (auto &cmd: cmds) + decision_waiting + .at(cmd->get_hash()) + .resolve(Finality(proposer, -1, 0, 0, + cmd->get_hash(), uint256_t())); + } + else + on_propose(cmds, pmaker->get_parents()); }); } - auto it = decision_waiting.find(cmd_hash); - if (it == decision_waiting.end()) - { - promise_t pm{[](promise_t){}}; - it = decision_waiting.insert(std::make_pair(cmd_hash, pm)).first; - } return it->second; } @@ -385,10 +396,10 @@ HotStuffBase::HotStuffBase(uint32_t blk_size, part_delivery_time_max(0) { /* register the handlers for msg from replicas */ - pn.reg_handler(salticidae::handler_bind(&HotStuffBase::propose_handler, this, _1, _2)); - pn.reg_handler(salticidae::handler_bind(&HotStuffBase::vote_handler, this, _1, _2)); - pn.reg_handler(salticidae::handler_bind(&HotStuffBase::req_blk_handler, this, _1, _2)); - pn.reg_handler(salticidae::handler_bind(&HotStuffBase::resp_blk_handler, this, _1, _2)); + pn.reg_handler(salticidae::generic_bind(&HotStuffBase::propose_handler, this, _1, _2)); + pn.reg_handler(salticidae::generic_bind(&HotStuffBase::vote_handler, this, _1, _2)); + pn.reg_handler(salticidae::generic_bind(&HotStuffBase::req_blk_handler, this, _1, _2)); + pn.reg_handler(salticidae::generic_bind(&HotStuffBase::resp_blk_handler, this, _1, _2)); pn.listen(listen_addr); } diff --git a/src/hotstuff_app.cpp b/src/hotstuff_app.cpp index 9a6e5fd..f7270f2 100644 --- a/src/hotstuff_app.cpp +++ b/src/hotstuff_app.cpp @@ -211,7 +211,7 @@ HotStuffApp::HotStuffApp(uint32_t blk_size, clisten_addr(clisten_addr), parent_limit(parent_limit) { /* register the handlers for msg from clients */ - cn.reg_handler(salticidae::handler_bind(&HotStuffApp::client_request_cmd_handler, this, _1, _2)); + cn.reg_handler(salticidae::generic_bind(&HotStuffApp::client_request_cmd_handler, this, _1, _2)); cn.listen(clisten_addr); } -- cgit v1.2.3-70-g09d2