From 4388716ccf1cea6f691b96f0873f4d40312e3037 Mon Sep 17 00:00:00 2001 From: Determinant Date: Thu, 23 Aug 2018 11:41:20 -0400 Subject: add options for the arguments to pace makers --- src/hotstuff_app.cpp | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) (limited to 'src/hotstuff_app.cpp') diff --git a/src/hotstuff_app.cpp b/src/hotstuff_app.cpp index ead4e0b..a917976 100644 --- a/src/hotstuff_app.cpp +++ b/src/hotstuff_app.cpp @@ -124,15 +124,19 @@ int main(int argc, char **argv) { auto opt_privkey = Config::OptValStr::create(); auto opt_help = Config::OptValFlag::create(false); auto opt_pace_maker = Config::OptValStr::create("dummy"); + auto opt_fixed_proposer = Config::OptValInt::create(1); + auto opt_qc_timeout = Config::OptValDouble::create(0.5); config.add_opt("block-size", opt_blk_size, Config::SET_VAL); config.add_opt("parent-limit", opt_parent_limit, Config::SET_VAL); config.add_opt("stat-period", opt_stat_period, Config::SET_VAL); - config.add_opt("replica", opt_replicas, Config::APPEND); - config.add_opt("idx", opt_idx, Config::SET_VAL); - config.add_opt("cport", opt_client_port, Config::SET_VAL); + config.add_opt("replica", opt_replicas, Config::APPEND, 'a', "add an replica to the list"); + config.add_opt("idx", opt_idx, Config::SET_VAL, 'i', "specify the index in the replica list"); + config.add_opt("cport", opt_client_port, Config::SET_VAL, 'c', "specify the port listening for clients"); config.add_opt("privkey", opt_privkey, Config::SET_VAL); config.add_opt("pace-maker", opt_pace_maker, Config::SET_VAL, 'p', "specify pace maker (sticky, dummy)"); + config.add_opt("proposer", opt_fixed_proposer, Config::SET_VAL, 'l', "set the fixed proposer (for dummy)"); + config.add_opt("qc-timeout", opt_qc_timeout, Config::SET_VAL, 't', "set QC timeout (for sticky)"); config.add_opt("help", opt_help, Config::SWITCH_ON, 'h', "show this help info"); EventContext ec; @@ -175,9 +179,9 @@ int main(int argc, char **argv) { auto parent_limit = opt_parent_limit->get(); hotstuff::pacemaker_bt pmaker; if (opt_pace_maker->get() == "sticky") - pmaker = new hotstuff::PaceMakerSticky(parent_limit, 0.5, ec); + pmaker = new hotstuff::PaceMakerSticky(parent_limit, opt_qc_timeout->get(), ec); else - pmaker = new hotstuff::PaceMakerDummyFixed(1, parent_limit); + pmaker = new hotstuff::PaceMakerDummyFixed(opt_fixed_proposer->get(), parent_limit); papp = new HotStuffApp(opt_blk_size->get(), opt_stat_period->get(), -- cgit v1.2.3-70-g09d2 From b5e207ed22ca7fcd924ee9545283cb2431fe8ef8 Mon Sep 17 00:00:00 2001 From: Determinant Date: Thu, 23 Aug 2018 14:21:54 -0400 Subject: add impeach() --- include/hotstuff/hotstuff.h | 1 + include/hotstuff/liveness.h | 27 +++++++++++++++++------ src/hotstuff_app.cpp | 52 +++++++++++++++++++++++++++------------------ 3 files changed, 52 insertions(+), 28 deletions(-) (limited to 'src/hotstuff_app.cpp') diff --git a/include/hotstuff/hotstuff.h b/include/hotstuff/hotstuff.h index f3f3f18..f9aad3d 100644 --- a/include/hotstuff/hotstuff.h +++ b/include/hotstuff/hotstuff.h @@ -195,6 +195,7 @@ class HotStuffBase: public HotStuffCore { void start(bool eb_loop = false); size_t size() const { return pn.all_peers().size(); } + PaceMaker &get_pace_maker() { return *pmaker; } void print_stat() const; /* Helper functions */ diff --git a/include/hotstuff/liveness.h b/include/hotstuff/liveness.h index 488143d..77d4232 100644 --- a/include/hotstuff/liveness.h +++ b/include/hotstuff/liveness.h @@ -32,6 +32,8 @@ class PaceMaker { * to vote for a block. The promise is resolved with the next proposer's ID * */ virtual promise_t beat_resp(ReplicaID last_proposer) = 0; + /** Impeach the current proposer. */ + virtual void impeach() {} }; using pacemaker_bt = BoxObj; @@ -285,7 +287,7 @@ class PMStickyProposer: virtual public PaceMaker { } HOTSTUFF_LOG_INFO("proposer emits new QC"); last_proposed = prop.blk; - reset_qc_timer(); + //reset_qc_timer(); } reg_follower_receive_proposal(); } @@ -300,7 +302,7 @@ class PMStickyProposer: virtual public PaceMaker { pm_qc_finish.reject(); (pm_qc_finish = hsc->async_qc_finish(last_proposed)) .then([this, pm]() { - reset_qc_timer(); + timer.del(); pm.resolve(proposer); }); locked = true; @@ -315,6 +317,7 @@ class PMStickyProposer: virtual public PaceMaker { } void proposer_propose(const Proposal &prop) { + reset_qc_timer(); last_proposed = prop.blk; locked = false; proposer_schedule_next(); @@ -379,10 +382,11 @@ class PMStickyProposer: virtual public PaceMaker { proposer = new_proposer; last_proposed = nullptr; hsc->set_neg_vote(false); - timer = Event(ec, -1, 0, [this](int, short) { - /* unable to get a QC in time */ - to_candidate(); - }); + timer.clear(); + //timer = Event(ec, -1, 0, [this](int, short) { + // /* unable to get a QC in time */ + // to_candidate(); + //}); /* redirect all pending cmds to the new proposer */ while (!pending_beats.empty()) { @@ -404,7 +408,6 @@ class PMStickyProposer: virtual public PaceMaker { to_candidate(); }); proposer_propose(Proposal(-1, uint256_t(), hsc->get_genesis(), nullptr)); - reset_qc_timer(); } void to_candidate() { @@ -422,6 +425,16 @@ class PMStickyProposer: virtual public PaceMaker { candidate_qc_timeout(); } + protected: + void impeach() override { + if (role == CANDIDATE) return; + timer = Event(ec, -1, 0, [this](int, short) { + to_candidate(); + }); + timer.add_with_timeout(0); + HOTSTUFF_LOG_INFO("schedule to impeach the proposer"); + } + public: PMStickyProposer(double qc_timeout, const EventContext &ec): qc_timeout(qc_timeout), ec(ec) {} diff --git a/src/hotstuff_app.cpp b/src/hotstuff_app.cpp index a917976..210badc 100644 --- a/src/hotstuff_app.cpp +++ b/src/hotstuff_app.cpp @@ -48,25 +48,22 @@ using hotstuff::promise_t; using HotStuff = hotstuff::HotStuffSecp256k1; -#define LOG_INFO HOTSTUFF_LOG_INFO -#define LOG_DEBUG HOTSTUFF_LOG_DEBUG -#define LOG_WARN HOTSTUFF_LOG_WARN -#define LOG_ERROR HOTSTUFF_LOG_ERROR - class HotStuffApp: public HotStuff { double stat_period; + double impeach_timeout; EventContext ec; /** Network messaging between a replica and its client. */ ClientNetwork cn; /** Timer object to schedule a periodic printing of system statistics */ Event ev_stat_timer; + /** Timer object to monitor the progress for simple impeachment */ + Event impeach_timer; /** The listen address for client RPC */ NetAddr clisten_addr; using Conn = ClientNetwork::Conn; void client_request_cmd_handler(MsgReqCmd &&, Conn &); - void print_stat_cb(evutil_socket_t, short); command_t parse_cmd(DataStream &s) override { auto cmd = new CommandDummy(); @@ -74,15 +71,22 @@ class HotStuffApp: public HotStuff { return cmd; } + void reset_imp_timer() { + impeach_timer.del(); + impeach_timer.add_with_timeout(impeach_timeout); + } + void state_machine_execute(const Finality &fin) override { + reset_imp_timer(); #ifndef HOTSTUFF_ENABLE_BENCHMARK - LOG_INFO("replicated %s", std::string(fin).c_str()); + HOTSTUFF_LOG_INFO("replicated %s", std::string(fin).c_str()); #endif } public: HotStuffApp(uint32_t blk_size, double stat_period, + double impeach_timeout, ReplicaID idx, const bytearray_t &raw_privkey, NetAddr plisten_addr, @@ -126,6 +130,7 @@ int main(int argc, char **argv) { auto opt_pace_maker = Config::OptValStr::create("dummy"); auto opt_fixed_proposer = Config::OptValInt::create(1); auto opt_qc_timeout = Config::OptValDouble::create(0.5); + auto opt_imp_timeout = Config::OptValDouble::create(11); config.add_opt("block-size", opt_blk_size, Config::SET_VAL); config.add_opt("parent-limit", opt_parent_limit, Config::SET_VAL); @@ -137,6 +142,7 @@ int main(int argc, char **argv) { config.add_opt("pace-maker", opt_pace_maker, Config::SET_VAL, 'p', "specify pace maker (sticky, dummy)"); config.add_opt("proposer", opt_fixed_proposer, Config::SET_VAL, 'l', "set the fixed proposer (for dummy)"); config.add_opt("qc-timeout", opt_qc_timeout, Config::SET_VAL, 't', "set QC timeout (for sticky)"); + config.add_opt("imp-timeout", opt_imp_timeout, Config::SET_VAL, 'u', "set impeachment timeout (for sticky)"); config.add_opt("help", opt_help, Config::SWITCH_ON, 'h', "show this help info"); EventContext ec; @@ -185,6 +191,7 @@ int main(int argc, char **argv) { papp = new HotStuffApp(opt_blk_size->get(), opt_stat_period->get(), + opt_imp_timeout->get(), idx, hotstuff::from_hex(opt_privkey->get()), plisten_addr, @@ -209,6 +216,7 @@ int main(int argc, char **argv) { HotStuffApp::HotStuffApp(uint32_t blk_size, double stat_period, + double impeach_timeout, ReplicaID idx, const bytearray_t &raw_privkey, NetAddr plisten_addr, @@ -218,6 +226,7 @@ HotStuffApp::HotStuffApp(uint32_t blk_size, HotStuff(blk_size, idx, raw_privkey, plisten_addr, std::move(pmaker), ec), stat_period(stat_period), + impeach_timeout(impeach_timeout), ec(ec), cn(ec), clisten_addr(clisten_addr) { @@ -231,28 +240,29 @@ void HotStuffApp::client_request_cmd_handler(MsgReqCmd &&msg, Conn &conn) { msg.postponed_parse(this); auto cmd = msg.cmd; std::vector pms; - LOG_DEBUG("processing %s", std::string(*cmd).c_str()); + HOTSTUFF_LOG_DEBUG("processing %s", std::string(*cmd).c_str()); exec_command(cmd).then([this, addr](Finality fin) { cn.send_msg(MsgRespCmd(fin), addr); }); } void HotStuffApp::start() { - ev_stat_timer = Event(ec, -1, 0, - std::bind(&HotStuffApp::print_stat_cb, this, _1, _2)); + ev_stat_timer = Event(ec, -1, 0, [this](int, short) { + HotStuff::print_stat(); + //HotStuffCore::prune(100); + ev_stat_timer.add_with_timeout(stat_period); + }); ev_stat_timer.add_with_timeout(stat_period); - LOG_INFO("** starting the system with parameters **"); - LOG_INFO("blk_size = %lu", blk_size); - LOG_INFO("conns = %lu", HotStuff::size()); - LOG_INFO("** starting the event loop..."); + impeach_timer = Event(ec, -1, 0, [this](int, short) { + get_pace_maker().impeach(); + reset_imp_timer(); + }); + impeach_timer.add_with_timeout(impeach_timeout); + HOTSTUFF_LOG_INFO("** starting the system with parameters **"); + HOTSTUFF_LOG_INFO("blk_size = %lu", blk_size); + HOTSTUFF_LOG_INFO("conns = %lu", HotStuff::size()); + HOTSTUFF_LOG_INFO("** starting the event loop..."); HotStuff::start(); /* enter the event main loop */ ec.dispatch(); } - - -void HotStuffApp::print_stat_cb(evutil_socket_t, short) { - HotStuff::print_stat(); - //HotStuffCore::prune(100); - ev_stat_timer.add_with_timeout(stat_period); -} -- cgit v1.2.3-70-g09d2 From 75b5b9c9fe6a847510dfad452104fc6987567453 Mon Sep 17 00:00:00 2001 From: Determinant Date: Fri, 24 Aug 2018 09:18:54 -0400 Subject: WIP: RR pacemaker --- include/hotstuff/liveness.h | 269 +++++++++++++++++++++++++++++++++++++++++++- src/hotstuff_app.cpp | 2 + 2 files changed, 266 insertions(+), 5 deletions(-) (limited to 'src/hotstuff_app.cpp') diff --git a/include/hotstuff/liveness.h b/include/hotstuff/liveness.h index 404972d..8c9c9ab 100644 --- a/include/hotstuff/liveness.h +++ b/include/hotstuff/liveness.h @@ -206,23 +206,29 @@ class PaceMakerDummyFixed: public PaceMakerDummy { }; /** - * Simple long-standing proposer liveness gadget. + * Simple long-standing proposer liveness gadget (with randomization). * There are three roles for each replica: proposer, candidate and follower. * * For a proposer, it proposes a new block and refrains itself from proposing * the next block unless it receives the QC for the previous block. It will - * give up the leadership and turn into a candidate when it hasn't seen such QC - * for a while. + * give up the leadership and turn into a candidate when it sees QC for a + * higher block or being impeached. * * For a follower, it never proposes any block, but keeps a timer for the QC * for the block last proposed by the proposer (the proposer it believes to - * be). When it times out without seeing such QC, the follower turns into a - * candidate. + * be). When it times out without seeing such QC or the proposer is impeached, + * the follower turns into a candidate. * * For a candidate, it periodically proposes empty blocks to synchronize the * preferred branch, with randomized timeout, and check for any new QC. Once it * sees such new QC, if the QC is given by itself, it becomes the proposer, * otherwise yields to the creator of the QC as a follower. + * + * CAUTIONS: This pace maker does not guarantee liveness when a Byzantine node + * tries to contend with correct nodes and always proposes higher blocks to + * grab the leadership. If you want to use this for your system, please make + * sure you introduce mechanism to detect and ban such behavior, or use the + * round-robin style pace maker instead. */ class PMStickyProposer: virtual public PaceMaker { enum { @@ -471,6 +477,259 @@ struct PaceMakerSticky: public PMAllParents, public PMStickyProposer { } }; +/** + * Simple long-standing round-robin style proposer liveness gadget. + */ +class PMRoundRobinProposer: virtual public PaceMaker { + enum { + PROPOSER, + FOLLOWER, + CANDIDATE /* rotating */ + } role; + double qc_timeout; + double candidate_timeout; + EventContext ec; + /** QC timer or randomized timeout */ + Event timer; + Event ev_imp; + block_t last_proposed; + /** the proposer it believes */ + ReplicaID proposer; + + /* extra state needed for a proposer */ + std::queue pending_beats; + bool locked; + + /* extra state needed for a candidate */ + std::unordered_map last_proposed_by; + + promise_t pm_wait_receive_proposal; + promise_t pm_wait_propose; + promise_t pm_qc_finish; + + void clear_promises() { + pm_wait_receive_proposal.reject(); + pm_wait_propose.reject(); + pm_qc_finish.reject(); + for (auto &p: last_proposed_by) + p.second.reject(); + last_proposed_by.clear(); + } + + /* helper functions for a follower */ + + void reg_follower_receive_proposal() { + pm_wait_receive_proposal.reject(); + (pm_wait_receive_proposal = hsc->async_wait_receive_proposal()) + .then( + salticidae::generic_bind( + &PMRoundRobinProposer::follower_receive_proposal, this, _1)); + } + + void follower_receive_proposal(const Proposal &prop) { + if (prop.proposer == proposer) + { + auto &qc_ref = prop.blk->get_qc_ref(); + if (last_proposed && qc_ref != last_proposed) + { + HOTSTUFF_LOG_INFO("proposer misbehave"); + to_candidate(); /* proposer misbehave */ + return; + } + HOTSTUFF_LOG_PROTO("proposer emits new QC"); + last_proposed = prop.blk; + } + reg_follower_receive_proposal(); + } + + /* helper functions for a proposer */ + + void proposer_schedule_next() { + if (!pending_beats.empty() && !locked) + { + auto pm = pending_beats.front(); + pending_beats.pop(); + pm_qc_finish.reject(); + (pm_qc_finish = hsc->async_qc_finish(last_proposed)) + .then([this, pm]() { + timer.del(); + pm.resolve(proposer); + timer.add_with_timeout(qc_timeout); + HOTSTUFF_LOG_PROTO("QC timer reset"); + }); + locked = true; + } + } + + void reg_proposer_propose() { + pm_wait_propose.reject(); + (pm_wait_propose = hsc->async_wait_proposal()).then( + salticidae::generic_bind( + &PMRoundRobinProposer::proposer_propose, this, _1)); + } + + void proposer_propose(const Proposal &prop) { + last_proposed = prop.blk; + locked = false; + proposer_schedule_next(); + reg_proposer_propose(); + } + + void propose_elect_block() { + DataStream s; + /* FIXME: should extra data be the voter's id? */ + s << hsc->get_id(); + /* propose a block for leader election */ + hsc->on_propose(std::vector{}, + get_parents(), std::move(s)); + } + + /* helper functions for a candidate */ + + void reg_cp_receive_proposal() { + pm_wait_receive_proposal.reject(); + (pm_wait_receive_proposal = hsc->async_wait_receive_proposal()) + .then( + salticidae::generic_bind( + &PMRoundRobinProposer::cp_receive_proposal, this, _1)); + } + + void cp_receive_proposal(const Proposal &prop) { + auto _proposer = prop.proposer; + auto &p = last_proposed_by[_proposer]; + HOTSTUFF_LOG_PROTO("got block %s from %d", std::string(*prop.blk).c_str(), _proposer); + p.reject(); + (p = hsc->async_qc_finish(prop.blk)).then([this, blk=prop.blk, _proposer]() { + if (_proposer == proposer) + to_follower(); + }); + reg_cp_receive_proposal(); + } + + void candidate_qc_timeout() { + timer.del(); + timer.add_with_timeout(candidate_timeout); + candidate_timeout *= 1.01; + proposer = (proposer + 1) % hsc->get_config().nreplicas; + if (proposer == hsc->get_id()) + { + pm_qc_finish.reject(); + pm_wait_propose.reject(); + (pm_wait_propose = hsc->async_wait_proposal()).then([this](const Proposal &prop) { + const auto &blk = prop.blk; + (pm_qc_finish = hsc->async_qc_finish(blk)).then([this, blk]() { + HOTSTUFF_LOG_INFO("collected QC for %s", std::string(*blk).c_str()); + /* managed to collect a QC */ + to_proposer(); + propose_elect_block(); + }); + }); + propose_elect_block(); + } + HOTSTUFF_LOG_INFO("candidate rotates to %d, next try in %.2fs", + proposer, candidate_timeout); + } + + /* role transitions */ + + void to_follower() { + HOTSTUFF_LOG_INFO("new role: follower"); + clear_promises(); + role = FOLLOWER; + last_proposed = nullptr; + hsc->set_neg_vote(false); + timer.clear(); + /* redirect all pending cmds to the new proposer */ + while (!pending_beats.empty()) + { + pending_beats.front().resolve(proposer); + pending_beats.pop(); + } + reg_follower_receive_proposal(); + } + + void to_proposer() { + HOTSTUFF_LOG_INFO("new role: proposer"); + clear_promises(); + role = PROPOSER; + last_proposed = nullptr; + hsc->set_neg_vote(true); + timer = Event(ec, -1, 0, [this](int, short) { + /* proposer unable to get a QC in time */ + to_candidate(); + }); + proposer_propose(Proposal(-1, uint256_t(), hsc->get_genesis(), nullptr)); + } + + void to_candidate() { + HOTSTUFF_LOG_INFO("new role: candidate"); + clear_promises(); + role = CANDIDATE; + last_proposed = nullptr; + hsc->set_neg_vote(false); + timer = Event(ec, -1, 0, [this](int, short) { + candidate_qc_timeout(); + }); + candidate_timeout = qc_timeout * 0.1; + reg_cp_receive_proposal(); + candidate_qc_timeout(); + } + + protected: + void impeach() override { + if (role == CANDIDATE) return; + ev_imp = Event(ec, -1, 0, [this](int, short) { + to_candidate(); + }); + ev_imp.add_with_timeout(0); + HOTSTUFF_LOG_INFO("schedule to impeach the proposer"); + } + + public: + PMRoundRobinProposer(double qc_timeout, const EventContext &ec): + qc_timeout(qc_timeout), ec(ec), proposer(0) {} + + void init() { + to_candidate(); + } + + ReplicaID get_proposer() override { + return proposer; + } + + promise_t beat() override { + if (role != FOLLOWER) + { + promise_t pm; + pending_beats.push(pm); + if (role == PROPOSER) + proposer_schedule_next(); + return std::move(pm); + } + else + return promise_t([proposer=proposer](promise_t &pm) { + pm.resolve(proposer); + }); + } + + promise_t beat_resp(ReplicaID last_proposer) override { + return promise_t([this, last_proposer](promise_t &pm) { + pm.resolve(last_proposer); + }); + } +}; + +struct PaceMakerRR: public PMAllParents, public PMRoundRobinProposer { + PaceMakerRR(int32_t parent_limit, double qc_timeout, EventContext eb): + PMAllParents(parent_limit), PMRoundRobinProposer(qc_timeout, eb) {} + + void init(HotStuffCore *hsc) override { + PaceMaker::init(hsc); + PMAllParents::init(); + PMRoundRobinProposer::init(); + } +}; + } #endif diff --git a/src/hotstuff_app.cpp b/src/hotstuff_app.cpp index 210badc..768e81e 100644 --- a/src/hotstuff_app.cpp +++ b/src/hotstuff_app.cpp @@ -186,6 +186,8 @@ int main(int argc, char **argv) { hotstuff::pacemaker_bt pmaker; if (opt_pace_maker->get() == "sticky") pmaker = new hotstuff::PaceMakerSticky(parent_limit, opt_qc_timeout->get(), ec); + else if (opt_pace_maker->get() == "rr") + pmaker = new hotstuff::PaceMakerRR(parent_limit, opt_qc_timeout->get(), ec); else pmaker = new hotstuff::PaceMakerDummyFixed(opt_fixed_proposer->get(), parent_limit); -- cgit v1.2.3-70-g09d2