diff options
author | Determinant <[email protected]> | 2019-07-06 18:37:11 -0400 |
---|---|---|
committer | Determinant <[email protected]> | 2019-07-06 18:37:11 -0400 |
commit | 24b6cea7be8b78eaa3681d6274d671057ed112b5 (patch) | |
tree | f3e8ae2a279344340085b96aa5622452dd5af624 | |
parent | 346f688916d87ff856a81e9cf3f3e69245101475 (diff) |
finish the simple Round-Robin Pacemakerpacemaker-clean-up
-rw-r--r-- | include/hotstuff/hotstuff.h | 7 | ||||
-rw-r--r-- | include/hotstuff/liveness.h | 285 | ||||
-rw-r--r-- | src/hotstuff.cpp | 25 | ||||
-rw-r--r-- | src/hotstuff_app.cpp | 20 | ||||
-rw-r--r-- | src/hotstuff_client.cpp | 1 |
5 files changed, 38 insertions, 300 deletions
diff --git a/include/hotstuff/hotstuff.h b/include/hotstuff/hotstuff.h index ffc5e3d..291f9a7 100644 --- a/include/hotstuff/hotstuff.h +++ b/include/hotstuff/hotstuff.h @@ -27,7 +27,6 @@ #include "salticidae/msg.h" #include "hotstuff/util.h" #include "hotstuff/consensus.h" -#include "hotstuff/liveness.h" namespace hotstuff { @@ -82,6 +81,7 @@ struct MsgRespBlock { using promise::promise_t; class HotStuffBase; +using pacemaker_bt = BoxObj<class PaceMaker>; template<EntityType ent_type> class FetchContext: public promise_t { @@ -140,6 +140,7 @@ class HotStuffBase: public HotStuffCore { size_t blk_size; /** libevent handle */ EventContext ec; + salticidae::ThreadCall tcall; VeriPool vpool; std::vector<NetAddr> peers; @@ -223,8 +224,10 @@ class HotStuffBase: public HotStuffCore { bool ec_loop = false); size_t size() const { return peers.size(); } - PaceMaker &get_pace_maker() { return *pmaker; } + auto get_decision_waiting() const { return decision_waiting; } + PaceMaker *get_pace_maker() { return pmaker.get(); } void print_stat() const; + virtual void do_elected(); //#ifdef HOTSTUFF_AUTOCLI // virtual void do_demand_commands(size_t) {} //#endif diff --git a/include/hotstuff/liveness.h b/include/hotstuff/liveness.h index cdefdeb..1286c45 100644 --- a/include/hotstuff/liveness.h +++ b/include/hotstuff/liveness.h @@ -19,7 +19,7 @@ #define _HOTSTUFF_LIVENESS_H #include "salticidae/util.h" -#include "hotstuff/consensus.h" +#include "hotstuff/hotstuff.h" namespace hotstuff { @@ -224,280 +224,6 @@ class PaceMakerDummyFixed: public PaceMakerDummy { }; /** - * Simple long-standing proposer liveness gadget (with randomization). - * There are three roles for each replica: proposer, candidate and follower. - * - * For a proposer, it proposes a new block and refrains itself from proposing - * the next block unless it receives the QC for the previous block. It will - * give up the leadership and turn into a candidate when it sees QC for a - * higher block or being impeached. - * - * For a follower, it never proposes any block, but keeps a timer for the QC - * for the block last proposed by the proposer (the proposer it believes to - * be). When it times out without seeing such QC or the proposer is impeached, - * the follower turns into a candidate. - * - * For a candidate, it periodically proposes empty blocks to synchronize the - * preferred branch, with randomized timeout, and check for any new QC. Once it - * sees such new QC, if the QC is given by itself, it becomes the proposer, - * otherwise yields to the creator of the QC as a follower. - * - * CAUTIONS: This pace maker does not guarantee liveness when a Byzantine node - * tries to contend with correct nodes and always proposes higher blocks to - * grab the leadership. If you want to use this for your system, please make - * sure you introduce mechanism to detect and ban such behavior, or use the - * round-robin style pace maker instead. - */ -class PMStickyProposer: virtual public PaceMaker { - enum { - PROPOSER, - FOLLOWER, - CANDIDATE - } role; - double qc_timeout; - double candidate_timeout; - EventContext ec; - /** QC timer or randomized timeout */ - TimerEvent timer; - TimerEvent ev_imp; - block_t last_proposed; - /** the proposer it believes */ - ReplicaID proposer; - - /* extra state needed for a proposer */ - std::queue<promise_t> pending_beats; - bool locked; - - /* extra state needed for a candidate */ - std::unordered_map<ReplicaID, promise_t> last_proposed_by; - - promise_t pm_wait_receive_proposal; - promise_t pm_wait_propose; - promise_t pm_qc_finish; - - void clear_promises() { - pm_wait_receive_proposal.reject(); - pm_wait_propose.reject(); - pm_qc_finish.reject(); - for (auto &p: last_proposed_by) - p.second.reject(); - last_proposed_by.clear(); - } - - /* helper functions for a follower */ - - void reg_follower_receive_proposal() { - pm_wait_receive_proposal.reject(); - (pm_wait_receive_proposal = hsc->async_wait_receive_proposal()) - .then( - salticidae::generic_bind( - &PMStickyProposer::follower_receive_proposal, this, _1)); - } - - void follower_receive_proposal(const Proposal &prop) { - if (prop.proposer == proposer) - { - auto &qc_ref = prop.blk->get_qc_ref(); - if (last_proposed && qc_ref != last_proposed) - { - HOTSTUFF_LOG_INFO("proposer misbehave"); - to_candidate(); /* proposer misbehave */ - return; - } - HOTSTUFF_LOG_PROTO("proposer emits new QC"); - last_proposed = prop.blk; - } - reg_follower_receive_proposal(); - } - - /* helper functions for a proposer */ - - void proposer_schedule_next() { - if (!pending_beats.empty() && !locked) - { - auto pm = pending_beats.front(); - pending_beats.pop(); - pm_qc_finish.reject(); - (pm_qc_finish = hsc->async_qc_finish(last_proposed)) - .then([this, pm]() { - timer.del(); - pm.resolve(proposer); - timer.add(qc_timeout); - HOTSTUFF_LOG_PROTO("QC timer reset"); - }); - locked = true; - } - } - - void reg_proposer_propose() { - pm_wait_propose.reject(); - (pm_wait_propose = hsc->async_wait_proposal()).then( - salticidae::generic_bind( - &PMStickyProposer::proposer_propose, this, _1)); - } - - void proposer_propose(const Proposal &prop) { - last_proposed = prop.blk; - locked = false; - proposer_schedule_next(); - reg_proposer_propose(); - } - - void propose_elect_block() { - DataStream s; - /* FIXME: should extra data be the voter's id? */ - s << hsc->get_id(); - /* propose a block for leader election */ - hsc->on_propose(std::vector<uint256_t>{}, - get_parents(), std::move(s)); - } - - /* helper functions for a candidate */ - - void candidate_qc_timeout() { - pm_qc_finish.reject(); - pm_wait_propose.reject(); - (pm_wait_propose = hsc->async_wait_proposal()).then([this](const Proposal &prop) { - const auto &blk = prop.blk; - (pm_qc_finish = hsc->async_qc_finish(blk)).then([this, blk]() { - HOTSTUFF_LOG_INFO("collected QC for %s", std::string(*blk).c_str()); - /* managed to collect a QC */ - to_proposer(); - propose_elect_block(); - }); - }); - double t = salticidae::gen_rand_timeout(candidate_timeout); - timer.del(); - timer.add(t); - HOTSTUFF_LOG_INFO("candidate next try in %.2fs", t); - propose_elect_block(); - } - - void reg_cp_receive_proposal() { - pm_wait_receive_proposal.reject(); - (pm_wait_receive_proposal = hsc->async_wait_receive_proposal()) - .then( - salticidae::generic_bind( - &PMStickyProposer::cp_receive_proposal, this, _1)); - } - - void cp_receive_proposal(const Proposal &prop) { - auto _proposer = prop.proposer; - auto &p = last_proposed_by[_proposer]; - HOTSTUFF_LOG_PROTO("got block %s from %d", std::string(*prop.blk).c_str(), _proposer); - p.reject(); - (p = hsc->async_qc_finish(prop.blk)).then([this, blk=prop.blk, _proposer]() { - if (hsc->get_hqc() == blk) - to_follower(_proposer); - }); - reg_cp_receive_proposal(); - } - - /* role transitions */ - - void to_follower(ReplicaID new_proposer) { - HOTSTUFF_LOG_INFO("new role: follower"); - clear_promises(); - role = FOLLOWER; - proposer = new_proposer; - last_proposed = nullptr; - hsc->set_vote_disabled(false); - timer.clear(); - /* redirect all pending cmds to the new proposer */ - while (!pending_beats.empty()) - { - pending_beats.front().resolve(proposer); - pending_beats.pop(); - } - reg_follower_receive_proposal(); - } - - void to_proposer() { - HOTSTUFF_LOG_INFO("new role: proposer"); - clear_promises(); - role = PROPOSER; - proposer = hsc->get_id(); - last_proposed = nullptr; - hsc->set_vote_disabled(true); - timer = TimerEvent(ec, [this](TimerEvent &) { - /* proposer unable to get a QC in time */ - to_candidate(); - }); - reg_cp_receive_proposal(); - proposer_propose(Proposal(-1, hsc->get_genesis(), nullptr)); - } - - void to_candidate() { - HOTSTUFF_LOG_INFO("new role: candidate"); - clear_promises(); - role = CANDIDATE; - proposer = hsc->get_id(); - last_proposed = nullptr; - hsc->set_vote_disabled(false); - timer = TimerEvent(ec, [this](TimerEvent &) { - candidate_qc_timeout(); - }); - candidate_timeout = qc_timeout; - reg_cp_receive_proposal(); - candidate_qc_timeout(); - } - - protected: - void impeach() override { - if (role == CANDIDATE) return; - ev_imp = TimerEvent(ec, [this](TimerEvent &) { - to_candidate(); - }); - ev_imp.add(0); - HOTSTUFF_LOG_INFO("schedule to impeach the proposer"); - } - - public: - PMStickyProposer(double qc_timeout, const EventContext &ec): - qc_timeout(qc_timeout), ec(ec) {} - - size_t get_pending_size() override { return pending_beats.size(); } - - void init() { to_candidate(); } - - ReplicaID get_proposer() override { - return proposer; - } - - promise_t beat() override { - if (role != FOLLOWER) - { - promise_t pm; - pending_beats.push(pm); - if (role == PROPOSER) - proposer_schedule_next(); - return std::move(pm); - } - else - return promise_t([proposer=proposer](promise_t &pm) { - pm.resolve(proposer); - }); - } - - promise_t beat_resp(ReplicaID last_proposer) override { - return promise_t([this, last_proposer](promise_t &pm) { - pm.resolve(last_proposer); - }); - } -}; - -struct PaceMakerSticky: public PMHighTail, public PMStickyProposer { - PaceMakerSticky(int32_t parent_limit, double qc_timeout, EventContext eb): - PMHighTail(parent_limit), PMStickyProposer(qc_timeout, eb) {} - - void init(HotStuffCore *hsc) override { - PaceMaker::init(hsc); - PMHighTail::init(); - PMStickyProposer::init(); - } -}; - -/** * Simple long-standing round-robin style proposer liveness gadget. */ class PMRoundRobinProposer: virtual public PaceMaker { @@ -610,15 +336,14 @@ class PMRoundRobinProposer: virtual public PaceMaker { locked = false; last_proposed = hsc->get_genesis(); proposer_update_last_proposed(); + if (proposer == hsc->get_id()) + static_cast<hotstuff::HotStuffBase *>(hsc)->do_elected(); } protected: void on_consensus(const block_t &blk) override { - if (!rotating) - { - timer.del(); - exp_timeout = base_timeout; - } + timer.del(); + exp_timeout = base_timeout; if (prop_blk[proposer] == blk) stop_rotate(); } diff --git a/src/hotstuff.cpp b/src/hotstuff.cpp index 22f0d82..69501c0 100644 --- a/src/hotstuff.cpp +++ b/src/hotstuff.cpp @@ -17,6 +17,7 @@ #include "hotstuff/hotstuff.h" #include "hotstuff/client.h" +#include "hotstuff/liveness.h" using salticidae::static_pointer_cast; @@ -79,6 +80,17 @@ void HotStuffBase::exec_command(uint256_t cmd_hash, commit_cb_t callback) { cmd_pending.enqueue(std::make_pair(cmd_hash, callback)); } +void HotStuffBase::do_elected() { + // TODO: improve this + tcall.async_call([this](salticidae::ThreadCall::Handle &) { + HOTSTUFF_LOG_PROTO("reproposing waiting commands"); + std::vector<uint256_t> cmds; + for (auto &p: decision_waiting) + cmds.push_back(p.first); + on_propose(cmds, pmaker->get_parents()); + }); +} + void HotStuffBase::on_fetch_blk(const block_t &blk) { #ifdef HOTSTUFF_BLK_PROFILE blk_profiler.get_tx(blk->get_hash()); @@ -329,6 +341,7 @@ HotStuffBase::HotStuffBase(uint32_t blk_size, listen_addr(listen_addr), blk_size(blk_size), ec(ec), + tcall(ec), vpool(ec, nworker), pn(ec, netconfig), pmaker(std::move(pmaker)), @@ -420,21 +433,15 @@ void HotStuffBase::start( while (q.try_dequeue(e)) { ReplicaID proposer = pmaker->get_proposer(); - if (proposer != get_id()) continue; const auto &cmd_hash = e.first; - cmd_pending_buffer.push(cmd_hash); - auto it = decision_waiting.find(cmd_hash); if (it == decision_waiting.end()) - { it = decision_waiting.insert(std::make_pair(cmd_hash, e.second)).first; - } else - { - // TODO: duplicate commands - } - + e.second(Finality(id, 0, 0, 0, cmd_hash, uint256_t())); + if (proposer != get_id()) continue; + cmd_pending_buffer.push(cmd_hash); if (cmd_pending_buffer.size() >= blk_size) { std::vector<uint256_t> cmds; diff --git a/src/hotstuff_app.cpp b/src/hotstuff_app.cpp index 778c195..871bafb 100644 --- a/src/hotstuff_app.cpp +++ b/src/hotstuff_app.cpp @@ -35,6 +35,7 @@ #include "hotstuff/util.h" #include "hotstuff/client.h" #include "hotstuff/hotstuff.h" +#include "hotstuff/liveness.h" using salticidae::MsgNetwork; using salticidae::ClientNetwork; @@ -113,6 +114,10 @@ class HotStuffApp: public HotStuff { resp_queue.enqueue(fin); } + void do_elected() override { + HotStuff::do_elected(); + } + //#ifdef HOTSTUFF_AUTOCLI // void do_demand_commands(size_t blk_size) override { // size_t ncli = client_conns.size(); @@ -191,7 +196,7 @@ int main(int argc, char **argv) { config.add_opt("privkey", opt_privkey, Config::SET_VAL); config.add_opt("tls-privkey", opt_tls_privkey, Config::SET_VAL); config.add_opt("tls-cert", opt_tls_cert, Config::SET_VAL); - config.add_opt("pace-maker", opt_pace_maker, Config::SET_VAL, 'p', "specify pace maker (sticky, dummy)"); + config.add_opt("pace-maker", opt_pace_maker, Config::SET_VAL, 'p', "specify pace maker (dummy, rr)"); config.add_opt("proposer", opt_fixed_proposer, Config::SET_VAL, 'l', "set the fixed proposer (for dummy)"); config.add_opt("qc-timeout", opt_qc_timeout, Config::SET_VAL, 't', "set QC timeout (for sticky)"); config.add_opt("imp-timeout", opt_imp_timeout, Config::SET_VAL, 'u', "set impeachment timeout (for sticky)"); @@ -239,12 +244,10 @@ int main(int argc, char **argv) { auto parent_limit = opt_parent_limit->get(); hotstuff::pacemaker_bt pmaker; - if (opt_pace_maker->get() == "sticky") - pmaker = new hotstuff::PaceMakerSticky(parent_limit, opt_qc_timeout->get(), ec); - else if (opt_pace_maker->get() == "rr") - pmaker = new hotstuff::PaceMakerRR(parent_limit, opt_qc_timeout->get(), ec); - else + if (opt_pace_maker->get() == "dummy") pmaker = new hotstuff::PaceMakerDummyFixed(opt_fixed_proposer->get(), parent_limit); + else + pmaker = new hotstuff::PaceMakerRR(parent_limit, opt_qc_timeout->get(), ec); HotStuffApp::Net::Config repnet_config; ClientNetwork<opcode_t>::Config clinet_config; @@ -374,10 +377,11 @@ void HotStuffApp::start(const std::vector<std::tuple<NetAddr, bytearray_t, bytea }); ev_stat_timer.add(stat_period); impeach_timer = TimerEvent(ec, [this](TimerEvent &) { - get_pace_maker().impeach(); + if (get_decision_waiting().size()) + get_pace_maker()->impeach(); reset_imp_timer(); }); - //impeach_timer.add(impeach_timeout); + impeach_timer.add(impeach_timeout); HOTSTUFF_LOG_INFO("** starting the system with parameters **"); HOTSTUFF_LOG_INFO("blk_size = %lu", blk_size); HOTSTUFF_LOG_INFO("conns = %lu", HotStuff::size()); diff --git a/src/hotstuff_client.cpp b/src/hotstuff_client.cpp index 9f7423d..7914125 100644 --- a/src/hotstuff_client.cpp +++ b/src/hotstuff_client.cpp @@ -37,7 +37,6 @@ using hotstuff::EventContext; using hotstuff::MsgReqCmd; using hotstuff::MsgRespCmd; using hotstuff::CommandDummy; -using hotstuff::Finality; using hotstuff::HotStuffError; using hotstuff::uint256_t; using hotstuff::opcode_t; |