aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDeterminant <tederminant@gmail.com>2018-08-02 20:26:21 -0400
committerDeterminant <tederminant@gmail.com>2018-08-02 20:26:21 -0400
commit4c42ac64c2ff821306609898a58126f2da32c7d4 (patch)
treebe01d2f18b09fc056fa520b9ea548f1742a9781c
parent13c4335dc46fc1a4d6757279898347d99483dbfe (diff)
WIP: sticky proposer pacemaker
-rw-r--r--include/hotstuff/consensus.h4
-rw-r--r--include/hotstuff/liveness.h206
m---------salticidae0
-rw-r--r--src/consensus.cpp13
4 files changed, 223 insertions, 0 deletions
diff --git a/include/hotstuff/consensus.h b/include/hotstuff/consensus.h
index 1ff8f79..a373a2b 100644
--- a/include/hotstuff/consensus.h
+++ b/include/hotstuff/consensus.h
@@ -31,6 +31,7 @@ class HotStuffCore {
/* === async event queues === */
std::unordered_map<block_t, promise_t> qc_waiting;
promise_t propose_waiting;
+ promise_t receive_proposal_waiting;
block_t get_delivered_blk(const uint256_t &blk_hash);
void sanity_check_delivered(const block_t &blk);
@@ -38,6 +39,7 @@ class HotStuffCore {
bool update(const uint256_t &bqc_hash);
void on_qc_finish(const block_t &blk);
void on_propose_(const block_t &blk);
+ void on_deliver_blk_(const block_t &blk);
protected:
ReplicaID id; /**< identity of the replica itself */
@@ -125,6 +127,8 @@ class HotStuffCore {
promise_t async_qc_finish(const block_t &blk);
/** Get a promise resolved when a new block is proposed. */
promise_t async_wait_propose();
+ /** Get a promise resolved when a new proposal is received. */
+ promise_t async_wait_receive_proposal();
/* Other useful functions */
block_t get_genesis() { return b0; }
diff --git a/include/hotstuff/liveness.h b/include/hotstuff/liveness.h
index a625f0d..e71fda6 100644
--- a/include/hotstuff/liveness.h
+++ b/include/hotstuff/liveness.h
@@ -68,6 +68,7 @@ class PMWaitQC: public virtual PaceMaker {
block_t last_proposed;
bool locked;
+ protected:
void schedule_next() {
if (!pending_beats.empty() && !locked)
{
@@ -142,6 +143,211 @@ class PaceMakerDummyFixed: public PaceMakerDummy {
}
};
+/**
+ * Simple long-standing proposer liveness gadget.
+ * There are three roles for each replica: proposer, candidate and follower.
+ *
+ * For a proposer, it proposes a new block and refrains itself from proposing
+ * the next block unless it receives the QC for the previous block. It will
+ * give up the leadership and turn into a candidate when it hasn't seen such QC
+ * for a while.
+ *
+ * For a follower, it never proposes any block, but keeps a timer for the QC
+ * for the block last proposed by the proposer (the proposer it believes to
+ * be). When it times out without seeing such QC, the follower turns into a
+ * candidate.
+ *
+ * For a candidate, it periodically proposes empty blocks to synchronize the
+ * preferred branch, with randomized timeout, and check for any new QC. Once it
+ * sees such new QC, if the QC is given by itself, it becomes the proposer,
+ * otherwise yields to the creator of the QC as a follower.
+ */
+class PMStickyProposer: public PMWaitQC {
+ enum {
+ PROPOSER,
+ FOLLOWER,
+ CANDIDATE
+ } role;
+ double qc_timeout;
+ double candidate_timeout;
+ EventContext eb;
+ /** QC timer or randomized timeout */
+ Event timer;
+ block_t last_proposed;
+ /** the proposer it believes when it is a follower */
+ ReplicaID proposer;
+
+ /* extra state needed for a proposer */
+ std::queue<promise_t> pending_beats;
+ bool locked;
+
+ /* extra state needed for a candidate */
+ std::unordered_map<ReplicaID,
+ std::pair<block_t, promise_t>> last_proposed_by;
+
+ promise_t pm_wait_receive_proposal;
+ promise_t pm_wait_propose;
+ promise_t pm_qc_finish;
+
+ void clear_promises() {
+ pm_wait_receive_proposal.reject();
+ pm_wait_propose.reject();
+ pm_qc_finish.reject();
+ for (auto &p: last_proposed_by)
+ p.second.reject();
+ last_proposed_by.clear();
+ }
+
+ void reg_follower_receive_proposal() {
+ pm_wait_receive_proposal =
+ hsc->async_wait_receive_proposal().then(
+ std::bind(&PMStickyProposer::follower_receive_proposal, this, _1, _2));
+ }
+
+ void follower_receive_proposal(const Proposal &prop) {
+ if (prop.proposer == proposer)
+ {
+ auto qc_ref = prop.blk->qc_ref;
+ if (last_proposed)
+ {
+ if (qc_ref != last_proposed)
+ to_candidate(); /* proposer misbehave */
+ }
+ last_proposed = prop.blk;
+ /* reset QC timer */
+ timer.del();
+ timer.add_with_timeout(qc_timeout);
+ }
+ reg_follower_receive_proposal();
+ }
+
+ void proposer_schedule_next() {
+ if (!pending_beats.empty() && !locked)
+ {
+ auto pm = pending_beats.front();
+ pending_beats.pop();
+ pm_qc_finish =
+ hsc->async_qc_finish(last_proposed).then([this, pm]() {
+ pm.resolve(proposer);
+ });
+ locked = true;
+ }
+ }
+
+ void reg_proposer_propose() {
+ pm_wait_propose = hsc->async_wait_propose().then(
+ std::bind(&PMStickyProposer::proposer_propose, this, _1, _2));
+ }
+
+ void proposer_propose(const block_t &blk) {
+ last_proposed = blk;
+ locked = false;
+ proposer_schedule_next();
+ reg_proposer_propose();
+ timer.del();
+ timer.add_with_timeout(qc_timeout);
+ }
+
+ void candidate_qc_timeout() {
+ pm_qc_finish.reject();
+ hsc->async_wait_propose().then([this](const block_t &blk) {
+ pm_qc_finish = hsc->async_qc_finish(blk).then([this]() {
+ /* managed to collect a QC */
+ to_proposer();
+ });
+ });
+ on_propose(std::vector<comman_t>{}, get_parents());
+ timer.del();
+ timer.add_with_timeout(gen_rand_timeout(candidate_timeout));
+ }
+
+ void reg_candidate_receive_proposal() {
+ pm_wait_receive_proposal =
+ hsc->async_wait_receive_proposal().then(
+ std::bind(&PMStickyProposer::candidate_receive_proposal, this, _1, _2));
+ }
+
+ void candidate_receive_proposal(const Proposal &prop) {
+ auto &p = last_proposed_by[prop.proposer];
+ p.second.reject();
+ p = std::make_pair(prop.blk, hsc->async_qc_finish(prop.blk).then([this]() {
+ to_follower(prop.proposer);
+ }));
+ }
+
+ void to_follower(ReplicaID new_proposer) {
+ clear_promises();
+ role = FOLLOWER;
+ proposer = new_proposer;
+ last_proposed = nullptr;
+ timer = Event(eb, -1, 0, [this](int, short) {
+ /* unable to get a QC in time */
+ to_candidate();
+ });
+ reg_follower_receive_proposal();
+ /* redirect all pending cmds to the new proposer */
+ for (auto &pm: pending_beats)
+ pm.resolve(proposer);
+ pending_beats.clear();
+ }
+
+ void to_proposer() {
+ clear_promises();
+ role = PROPOSER;
+ proposer = hsc->get_id();
+ last_proposed = hsc->get_genesis();
+ timer = Event(eb, -1, 0, [this](int, short) {
+ /* proposer unable to get a QC in time */
+ to_candidate();
+ });
+ /* prepare the variables for the role of a proposer */
+ locked = false;
+ reg_proposer_propose();
+ }
+
+ void to_candidate() {
+ clear_promises();
+ role = CANDIDATE;
+ proposer = hsc->get_id();
+ timer = Event(eb, -1, 0, [this](int, short) {
+ candidate_qc_timeout();
+ });
+ candidate_timeout = qc_timeout;
+ candidate_qc_timeout();
+ }
+
+ public:
+ void init(HotStuffCore *hsc) override {
+ PMWaitQC::init(hsc);
+ role = CANDIDATE;
+ proposer = 0;
+ }
+
+ ReplicaID get_proposer() override {
+ return proposer;
+ }
+
+ promise_t beat() override {
+ if (proposer == hsc->get_id())
+ {
+ promise_t pm;
+ pending_beats.push(pm);
+ proposer_schedule_next();
+ return std::move(pm);
+ }
+ else
+ return promise_t([](promise_t &pm) {
+ pm.resolve(proposer);
+ });
+ }
+
+ promise_t next_proposer(ReplicaID last_proposer) override {
+ return promise_t([last_proposer](promise_t &pm) {
+ pm.resolve(proposer);
+ });
+ }
+};
+
}
#endif
diff --git a/salticidae b/salticidae
-Subproject 6a10aa1984e1e58f2238a03c4a6627bca58beb8
+Subproject 37dc1f5bfe6630c224d1b131562166ab5bdd997
diff --git a/src/consensus.cpp b/src/consensus.cpp
index cc1d572..40a5e44 100644
--- a/src/consensus.cpp
+++ b/src/consensus.cpp
@@ -64,6 +64,7 @@ bool HotStuffCore::on_deliver_blk(const block_t &blk) {
blk->delivered = true;
LOG_DEBUG("deliver %s", std::string(*blk).c_str());
+ on_deliver_blk_(blk);
return true;
}
@@ -173,6 +174,8 @@ void HotStuffCore::on_receive_proposal(const Proposal &prop) {
#ifdef HOTSTUFF_PROTO_LOG
LOG_INFO("now state: %s", std::string(*this).c_str());
#endif
+ if (bnew->qc_ref) on_qc_finish(bnew);
+ on_receive_proposal_(prop);
do_vote(prop.proposer,
Vote(id,
bqc->get_hash(),
@@ -272,12 +275,22 @@ promise_t HotStuffCore::async_wait_propose() {
return propose_waiting;
}
+promise_t HotStuffCore::async_wait_receive_proposal() {
+ return receive_proposal_waiting;
+}
+
void HotStuffCore::on_propose_(const block_t &blk) {
auto t = std::move(propose_waiting);
propose_waiting = promise_t();
t.resolve(blk);
}
+void HotStuffCore::on_receive_proposal_(const Proposal &prop) {
+ auto t = std::move(receive_proposal_waiting);
+ receive_proposal_waiting = promise_t();
+ t.resolve(prop);
+}
+
HotStuffCore::operator std::string () const {
DataStream s;
s << "<hotstuff "