From 75b5b9c9fe6a847510dfad452104fc6987567453 Mon Sep 17 00:00:00 2001
From: Determinant <ted.sybil@gmail.com>
Date: Fri, 24 Aug 2018 09:18:54 -0400
Subject: WIP: RR pacemaker

---
 include/hotstuff/liveness.h | 269 +++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 264 insertions(+), 5 deletions(-)

(limited to 'include')

diff --git a/include/hotstuff/liveness.h b/include/hotstuff/liveness.h
index 404972d..8c9c9ab 100644
--- a/include/hotstuff/liveness.h
+++ b/include/hotstuff/liveness.h
@@ -206,23 +206,29 @@ class PaceMakerDummyFixed: public PaceMakerDummy {
 };
 
 /**
- * Simple long-standing proposer liveness gadget.
+ * Simple long-standing proposer liveness gadget (with randomization).
  * There are three roles for each replica: proposer, candidate and follower.
  *
  * For a proposer, it proposes a new block and refrains itself from proposing
  * the next block unless it receives the QC for the previous block. It will
- * give up the leadership and turn into a candidate when it hasn't seen such QC
- * for a while.
+ * give up the leadership and turn into a candidate when it sees QC for a
+ * higher block or being impeached.
  *
  * For a follower, it never proposes any block, but keeps a timer for the QC
  * for the block last proposed by the proposer (the proposer it believes to
- * be). When it times out without seeing such QC, the follower turns into a
- * candidate.
+ * be). When it times out without seeing such QC or the proposer is impeached,
+ * the follower turns into a candidate.
  *
  * For a candidate, it periodically proposes empty blocks to synchronize the
  * preferred branch, with randomized timeout, and check for any new QC. Once it
  * sees such new QC, if the QC is given by itself, it becomes the proposer,
  * otherwise yields to the creator of the QC as a follower.
+ *
+ * CAUTIONS: This pace maker does not guarantee liveness when a Byzantine node
+ * tries to contend with correct nodes and always proposes higher blocks to
+ * grab the leadership. If you want to use this for your system, please make
+ * sure you introduce mechanism to detect and ban such behavior, or use the
+ * round-robin style pace maker instead.
  */
 class PMStickyProposer: virtual public PaceMaker {
     enum {
@@ -471,6 +477,259 @@ struct PaceMakerSticky: public PMAllParents, public PMStickyProposer {
     }
 };
 
+/**
+ * Simple long-standing round-robin style proposer liveness gadget.
+ */
+class PMRoundRobinProposer: virtual public PaceMaker {
+    enum {
+        PROPOSER,
+        FOLLOWER,
+        CANDIDATE /* rotating */
+    } role;
+    double qc_timeout;
+    double candidate_timeout;
+    EventContext ec;
+    /** QC timer or randomized timeout */
+    Event timer;
+    Event ev_imp;
+    block_t last_proposed;
+    /** the proposer it believes */
+    ReplicaID proposer;
+
+    /* extra state needed for a proposer */
+    std::queue<promise_t> pending_beats;
+    bool locked;
+
+    /* extra state needed for a candidate */
+    std::unordered_map<ReplicaID, promise_t> last_proposed_by;
+
+    promise_t pm_wait_receive_proposal;
+    promise_t pm_wait_propose;
+    promise_t pm_qc_finish;
+
+    void clear_promises() {
+        pm_wait_receive_proposal.reject();
+        pm_wait_propose.reject();
+        pm_qc_finish.reject();
+        for (auto &p: last_proposed_by)
+            p.second.reject();
+        last_proposed_by.clear();
+    }
+
+    /* helper functions for a follower */
+
+    void reg_follower_receive_proposal() {
+        pm_wait_receive_proposal.reject();
+        (pm_wait_receive_proposal = hsc->async_wait_receive_proposal())
+            .then(
+                salticidae::generic_bind(
+                    &PMRoundRobinProposer::follower_receive_proposal, this, _1));
+    }
+
+    void follower_receive_proposal(const Proposal &prop) {
+        if (prop.proposer == proposer)
+        {
+            auto &qc_ref = prop.blk->get_qc_ref();
+            if (last_proposed && qc_ref != last_proposed)
+            {
+                HOTSTUFF_LOG_INFO("proposer misbehave");
+                to_candidate(); /* proposer misbehave */
+                return;
+            }
+            HOTSTUFF_LOG_PROTO("proposer emits new QC");
+            last_proposed = prop.blk;
+        }
+        reg_follower_receive_proposal();
+    }
+
+    /* helper functions for a proposer */
+
+    void proposer_schedule_next() {
+        if (!pending_beats.empty() && !locked)
+        {
+            auto pm = pending_beats.front();
+            pending_beats.pop();
+            pm_qc_finish.reject();
+            (pm_qc_finish = hsc->async_qc_finish(last_proposed))
+                .then([this, pm]() {
+                    timer.del();
+                    pm.resolve(proposer);
+                    timer.add_with_timeout(qc_timeout);
+                    HOTSTUFF_LOG_PROTO("QC timer reset");
+                });
+            locked = true;
+        }
+    }
+
+    void reg_proposer_propose() {
+        pm_wait_propose.reject();
+        (pm_wait_propose = hsc->async_wait_proposal()).then(
+            salticidae::generic_bind(
+                &PMRoundRobinProposer::proposer_propose, this, _1));
+    }
+
+    void proposer_propose(const Proposal &prop) {
+        last_proposed = prop.blk;
+        locked = false;
+        proposer_schedule_next();
+        reg_proposer_propose();
+    }
+
+    void propose_elect_block() {
+        DataStream s;
+        /* FIXME: should extra data be the voter's id? */
+        s << hsc->get_id();
+        /* propose a block for leader election */
+        hsc->on_propose(std::vector<command_t>{},
+                get_parents(), std::move(s));
+    }
+
+    /* helper functions for a candidate */
+
+    void reg_cp_receive_proposal() {
+        pm_wait_receive_proposal.reject();
+        (pm_wait_receive_proposal = hsc->async_wait_receive_proposal())
+            .then(
+                salticidae::generic_bind(
+                    &PMRoundRobinProposer::cp_receive_proposal, this, _1));
+    }
+
+    void cp_receive_proposal(const Proposal &prop) {
+        auto _proposer = prop.proposer;
+        auto &p = last_proposed_by[_proposer];
+        HOTSTUFF_LOG_PROTO("got block %s from %d", std::string(*prop.blk).c_str(), _proposer);
+        p.reject();
+        (p = hsc->async_qc_finish(prop.blk)).then([this, blk=prop.blk, _proposer]() {
+            if (_proposer == proposer)
+                to_follower();
+        });
+        reg_cp_receive_proposal();
+    }
+
+    void candidate_qc_timeout() {
+        timer.del();
+        timer.add_with_timeout(candidate_timeout);
+        candidate_timeout *= 1.01;
+        proposer = (proposer + 1) % hsc->get_config().nreplicas;
+        if (proposer == hsc->get_id())
+        {
+            pm_qc_finish.reject();
+            pm_wait_propose.reject();
+            (pm_wait_propose = hsc->async_wait_proposal()).then([this](const Proposal &prop) {
+                const auto &blk = prop.blk;
+                (pm_qc_finish = hsc->async_qc_finish(blk)).then([this, blk]() {
+                    HOTSTUFF_LOG_INFO("collected QC for %s", std::string(*blk).c_str());
+                    /* managed to collect a QC */
+                    to_proposer();
+                    propose_elect_block();
+                });
+            });
+            propose_elect_block();
+        }
+        HOTSTUFF_LOG_INFO("candidate rotates to %d, next try in %.2fs",
+                            proposer, candidate_timeout);
+    }
+
+    /* role transitions */
+
+    void to_follower() {
+        HOTSTUFF_LOG_INFO("new role: follower");
+        clear_promises();
+        role = FOLLOWER;
+        last_proposed = nullptr;
+        hsc->set_neg_vote(false);
+        timer.clear();
+        /* redirect all pending cmds to the new proposer */
+        while (!pending_beats.empty())
+        {
+            pending_beats.front().resolve(proposer);
+            pending_beats.pop();
+        }
+        reg_follower_receive_proposal();
+    }
+
+    void to_proposer() {
+        HOTSTUFF_LOG_INFO("new role: proposer");
+        clear_promises();
+        role = PROPOSER;
+        last_proposed = nullptr;
+        hsc->set_neg_vote(true);
+        timer = Event(ec, -1, 0, [this](int, short) {
+            /* proposer unable to get a QC in time */
+            to_candidate();
+        });
+        proposer_propose(Proposal(-1, uint256_t(), hsc->get_genesis(), nullptr));
+    }
+
+    void to_candidate() {
+        HOTSTUFF_LOG_INFO("new role: candidate");
+        clear_promises();
+        role = CANDIDATE;
+        last_proposed = nullptr;
+        hsc->set_neg_vote(false);
+        timer = Event(ec, -1, 0, [this](int, short) {
+            candidate_qc_timeout();
+        });
+        candidate_timeout = qc_timeout * 0.1;
+        reg_cp_receive_proposal();
+        candidate_qc_timeout();
+    }
+
+    protected:
+    void impeach() override {
+        if (role == CANDIDATE) return;
+        ev_imp = Event(ec, -1, 0, [this](int, short) {
+            to_candidate();
+        });
+        ev_imp.add_with_timeout(0);
+        HOTSTUFF_LOG_INFO("schedule to impeach the proposer");
+    }
+
+    public:
+    PMRoundRobinProposer(double qc_timeout, const EventContext &ec):
+        qc_timeout(qc_timeout), ec(ec), proposer(0) {}
+
+    void init() {
+        to_candidate();
+    }
+
+    ReplicaID get_proposer() override {
+        return proposer;
+    }
+
+    promise_t beat() override {
+        if (role != FOLLOWER)
+        {
+            promise_t pm;
+            pending_beats.push(pm);
+            if (role == PROPOSER)
+                proposer_schedule_next();
+            return std::move(pm);
+        }
+        else
+            return promise_t([proposer=proposer](promise_t &pm) {
+                pm.resolve(proposer);
+            });
+    }
+
+    promise_t beat_resp(ReplicaID last_proposer) override {
+        return promise_t([this, last_proposer](promise_t &pm) {
+            pm.resolve(last_proposer);
+        });
+    }
+};
+
+struct PaceMakerRR: public PMAllParents, public PMRoundRobinProposer {
+    PaceMakerRR(int32_t parent_limit, double qc_timeout, EventContext eb):
+        PMAllParents(parent_limit), PMRoundRobinProposer(qc_timeout, eb) {}
+
+    void init(HotStuffCore *hsc) override {
+        PaceMaker::init(hsc);
+        PMAllParents::init();
+        PMRoundRobinProposer::init();
+    }
+};
+
 }
 
 #endif
-- 
cgit v1.2.3-70-g09d2