aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2018-09-03 00:13:58 -0400
committerDeterminant <[email protected]>2018-09-03 00:13:58 -0400
commitc3368b286fbb1d6b8c22af8ce21e57b5a5720445 (patch)
tree061de96c58873f642a24a151af1bf7ed937fb1c3
parent2535cd89c13485cc4a8e68145c7cb5e8e9398e5c (diff)
parent17f7fd821cf71717a158e2c38699baa6ab2f2af8 (diff)
Merge branch 'master' of github.com:Determinant/hot-stuff
-rw-r--r--README.rst8
-rw-r--r--include/hotstuff/client.h16
-rw-r--r--include/hotstuff/hotstuff.h8
-rw-r--r--include/hotstuff/liveness.h345
-rw-r--r--scripts/gen_conf.py20
-rwxr-xr-xscripts/run.sh457
-rwxr-xr-xscripts/run_client.sh354
-rwxr-xr-xscripts/run_demo.sh (renamed from scripts/run_replicas.sh)0
-rwxr-xr-xscripts/run_demo_client.sh2
-rw-r--r--scripts/thr_hist.py5
-rw-r--r--src/client.cpp2
-rw-r--r--src/hotstuff.cpp2
-rw-r--r--src/hotstuff_app.cpp68
-rw-r--r--src/hotstuff_client.cpp39
14 files changed, 1220 insertions, 106 deletions
diff --git a/README.rst b/README.rst
index 7c526d5..ad93287 100644
--- a/README.rst
+++ b/README.rst
@@ -38,14 +38,14 @@ section may be incomplete and subject to changes.
::
- # clone from the repo
+ # install from the repo
git clone https://github.com/Determinant/hot-stuff.git
cd hot-stuff/
- git submodule update --recursive
+ git submodule update --init --recursive
# ensure openssl and libevent are installed on your machine
cmake -DCMAKE_BUILD_TYPE=Release -DBUILD_SHARED=ON -DHOTSTUFF_PROTO_LOG=ON
make
- # start 4 demo replicas with scripts/run_replicas.sh
- # start the demo client with scripts/run_client.sh
+ # start 4 demo replicas with scripts/run_demo.sh
+ # start the demo client with scripts/run_demo_client.sh
diff --git a/include/hotstuff/client.h b/include/hotstuff/client.h
index 92b4eec..447a9db 100644
--- a/include/hotstuff/client.h
+++ b/include/hotstuff/client.h
@@ -26,8 +26,8 @@ struct MsgRespCmd {
};
class CommandDummy: public Command {
- static uint64_t cnt;
- uint64_t n;
+ uint32_t cid;
+ uint32_t n;
uint256_t hash;
public:
@@ -36,19 +36,15 @@ class CommandDummy: public Command {
~CommandDummy() override {}
- CommandDummy(uint64_t n):
- n(n), hash(salticidae::get_hash(*this)) {}
-
- static command_t make_cmd() {
- return new CommandDummy(cnt++);
- }
+ CommandDummy(uint32_t cid, uint32_t n):
+ cid(cid), n(n), hash(salticidae::get_hash(*this)) {}
void serialize(DataStream &s) const override {
- s << n;
+ s << cid << n;
}
void unserialize(DataStream &s) override {
- s >> n;
+ s >> cid >> n;
hash = salticidae::get_hash(*this);
}
diff --git a/include/hotstuff/hotstuff.h b/include/hotstuff/hotstuff.h
index b0a6827..f9aad3d 100644
--- a/include/hotstuff/hotstuff.h
+++ b/include/hotstuff/hotstuff.h
@@ -23,13 +23,6 @@ using salticidae::_2;
const double ent_waiting_timeout = 10;
const double double_inf = 1e10;
-enum {
- PROPOSE = 0x0,
- VOTE = 0x1,
- QUERY_FETCH_BLK = 0x2,
- RESP_FETCH_BLK = 0x3,
-};
-
/** Network message format for HotStuff. */
struct MsgPropose {
static const opcode_t opcode = 0x0;
@@ -202,6 +195,7 @@ class HotStuffBase: public HotStuffCore {
void start(bool eb_loop = false);
size_t size() const { return pn.all_peers().size(); }
+ PaceMaker &get_pace_maker() { return *pmaker; }
void print_stat() const;
/* Helper functions */
diff --git a/include/hotstuff/liveness.h b/include/hotstuff/liveness.h
index 59306ab..8c9c9ab 100644
--- a/include/hotstuff/liveness.h
+++ b/include/hotstuff/liveness.h
@@ -32,6 +32,8 @@ class PaceMaker {
* to vote for a block. The promise is resolved with the next proposer's ID
* */
virtual promise_t beat_resp(ReplicaID last_proposer) = 0;
+ /** Impeach the current proposer. */
+ virtual void impeach() {}
};
using pacemaker_bt = BoxObj<PaceMaker>;
@@ -117,6 +119,8 @@ class PMWaitQC: public virtual PaceMaker {
std::queue<promise_t> pending_beats;
block_t last_proposed;
bool locked;
+ promise_t pm_qc_finish;
+ promise_t pm_wait_propose;
protected:
void schedule_next() {
@@ -124,19 +128,23 @@ class PMWaitQC: public virtual PaceMaker {
{
auto pm = pending_beats.front();
pending_beats.pop();
- hsc->async_qc_finish(last_proposed).then([this, pm]() {
- pm.resolve(get_proposer());
- });
+ pm_qc_finish.reject();
+ (pm_qc_finish = hsc->async_qc_finish(last_proposed))
+ .then([this, pm]() {
+ pm.resolve(get_proposer());
+ });
locked = true;
}
}
void update_last_proposed() {
- hsc->async_wait_proposal().then([this](const Proposal &prop) {
- update_last_proposed();
+ pm_wait_propose.reject();
+ (pm_wait_propose = hsc->async_wait_proposal()).then(
+ [this](const Proposal &prop) {
last_proposed = prop.blk;
locked = false;
schedule_next();
+ update_last_proposed();
});
}
@@ -155,7 +163,7 @@ class PMWaitQC: public virtual PaceMaker {
promise_t pm;
pending_beats.push(pm);
schedule_next();
- return pm;
+ return std::move(pm);
}
promise_t beat_resp(ReplicaID last_proposer) override {
@@ -198,23 +206,29 @@ class PaceMakerDummyFixed: public PaceMakerDummy {
};
/**
- * Simple long-standing proposer liveness gadget.
+ * Simple long-standing proposer liveness gadget (with randomization).
* There are three roles for each replica: proposer, candidate and follower.
*
* For a proposer, it proposes a new block and refrains itself from proposing
* the next block unless it receives the QC for the previous block. It will
- * give up the leadership and turn into a candidate when it hasn't seen such QC
- * for a while.
+ * give up the leadership and turn into a candidate when it sees QC for a
+ * higher block or being impeached.
*
* For a follower, it never proposes any block, but keeps a timer for the QC
* for the block last proposed by the proposer (the proposer it believes to
- * be). When it times out without seeing such QC, the follower turns into a
- * candidate.
+ * be). When it times out without seeing such QC or the proposer is impeached,
+ * the follower turns into a candidate.
*
* For a candidate, it periodically proposes empty blocks to synchronize the
* preferred branch, with randomized timeout, and check for any new QC. Once it
* sees such new QC, if the QC is given by itself, it becomes the proposer,
* otherwise yields to the creator of the QC as a follower.
+ *
+ * CAUTIONS: This pace maker does not guarantee liveness when a Byzantine node
+ * tries to contend with correct nodes and always proposes higher blocks to
+ * grab the leadership. If you want to use this for your system, please make
+ * sure you introduce mechanism to detect and ban such behavior, or use the
+ * round-robin style pace maker instead.
*/
class PMStickyProposer: virtual public PaceMaker {
enum {
@@ -227,6 +241,7 @@ class PMStickyProposer: virtual public PaceMaker {
EventContext ec;
/** QC timer or randomized timeout */
Event timer;
+ Event ev_imp;
block_t last_proposed;
/** the proposer it believes */
ReplicaID proposer;
@@ -242,12 +257,6 @@ class PMStickyProposer: virtual public PaceMaker {
promise_t pm_wait_propose;
promise_t pm_qc_finish;
- void reset_qc_timer() {
- timer.del();
- timer.add_with_timeout(qc_timeout);
- HOTSTUFF_LOG_PROTO("QC timer reset");
- }
-
void clear_promises() {
pm_wait_receive_proposal.reject();
pm_wait_propose.reject();
@@ -273,13 +282,12 @@ class PMStickyProposer: virtual public PaceMaker {
auto &qc_ref = prop.blk->get_qc_ref();
if (last_proposed && qc_ref != last_proposed)
{
- HOTSTUFF_LOG_PROTO("proposer misbehave");
+ HOTSTUFF_LOG_INFO("proposer misbehave");
to_candidate(); /* proposer misbehave */
return;
}
HOTSTUFF_LOG_PROTO("proposer emits new QC");
last_proposed = prop.blk;
- reset_qc_timer();
}
reg_follower_receive_proposal();
}
@@ -294,8 +302,10 @@ class PMStickyProposer: virtual public PaceMaker {
pm_qc_finish.reject();
(pm_qc_finish = hsc->async_qc_finish(last_proposed))
.then([this, pm]() {
- reset_qc_timer();
+ timer.del();
pm.resolve(proposer);
+ timer.add_with_timeout(qc_timeout);
+ HOTSTUFF_LOG_PROTO("QC timer reset");
});
locked = true;
}
@@ -332,7 +342,7 @@ class PMStickyProposer: virtual public PaceMaker {
(pm_wait_propose = hsc->async_wait_proposal()).then([this](const Proposal &prop) {
const auto &blk = prop.blk;
(pm_qc_finish = hsc->async_qc_finish(blk)).then([this, blk]() {
- HOTSTUFF_LOG_PROTO("collected QC for %s", std::string(*blk).c_str());
+ HOTSTUFF_LOG_INFO("collected QC for %s", std::string(*blk).c_str());
/* managed to collect a QC */
to_proposer();
propose_elect_block();
@@ -341,42 +351,40 @@ class PMStickyProposer: virtual public PaceMaker {
double t = salticidae::gen_rand_timeout(candidate_timeout);
timer.del();
timer.add_with_timeout(t);
- HOTSTUFF_LOG_PROTO("candidate next try in %.2fs", t);
+ HOTSTUFF_LOG_INFO("candidate next try in %.2fs", t);
propose_elect_block();
}
- void reg_candidate_receive_proposal() {
+ void reg_cp_receive_proposal() {
pm_wait_receive_proposal.reject();
(pm_wait_receive_proposal = hsc->async_wait_receive_proposal())
.then(
salticidae::generic_bind(
- &PMStickyProposer::candidate_receive_proposal, this, _1));
+ &PMStickyProposer::cp_receive_proposal, this, _1));
}
- void candidate_receive_proposal(const Proposal &prop) {
+ void cp_receive_proposal(const Proposal &prop) {
auto _proposer = prop.proposer;
auto &p = last_proposed_by[_proposer];
HOTSTUFF_LOG_PROTO("got block %s from %d", std::string(*prop.blk).c_str(), _proposer);
p.reject();
- (p = hsc->async_qc_finish(prop.blk)).then([this, _proposer]() {
- to_follower(_proposer);
+ (p = hsc->async_qc_finish(prop.blk)).then([this, blk=prop.blk, _proposer]() {
+ if (hsc->get_bqc()->get_qc_ref() == blk)
+ to_follower(_proposer);
});
- reg_candidate_receive_proposal();
+ reg_cp_receive_proposal();
}
/* role transitions */
void to_follower(ReplicaID new_proposer) {
- HOTSTUFF_LOG_PROTO("new role: follower");
+ HOTSTUFF_LOG_INFO("new role: follower");
clear_promises();
role = FOLLOWER;
proposer = new_proposer;
last_proposed = nullptr;
hsc->set_neg_vote(false);
- timer = Event(ec, -1, 0, [this](int, short) {
- /* unable to get a QC in time */
- to_candidate();
- });
+ timer.clear();
/* redirect all pending cmds to the new proposer */
while (!pending_beats.empty())
{
@@ -387,7 +395,7 @@ class PMStickyProposer: virtual public PaceMaker {
}
void to_proposer() {
- HOTSTUFF_LOG_PROTO("new role: proposer");
+ HOTSTUFF_LOG_INFO("new role: proposer");
clear_promises();
role = PROPOSER;
proposer = hsc->get_id();
@@ -397,12 +405,12 @@ class PMStickyProposer: virtual public PaceMaker {
/* proposer unable to get a QC in time */
to_candidate();
});
+ reg_cp_receive_proposal();
proposer_propose(Proposal(-1, uint256_t(), hsc->get_genesis(), nullptr));
- reset_qc_timer();
}
void to_candidate() {
- HOTSTUFF_LOG_PROTO("new role: candidate");
+ HOTSTUFF_LOG_INFO("new role: candidate");
clear_promises();
role = CANDIDATE;
proposer = hsc->get_id();
@@ -412,10 +420,20 @@ class PMStickyProposer: virtual public PaceMaker {
candidate_qc_timeout();
});
candidate_timeout = qc_timeout;
- reg_candidate_receive_proposal();
+ reg_cp_receive_proposal();
candidate_qc_timeout();
}
+ protected:
+ void impeach() override {
+ if (role == CANDIDATE) return;
+ ev_imp = Event(ec, -1, 0, [this](int, short) {
+ to_candidate();
+ });
+ ev_imp.add_with_timeout(0);
+ HOTSTUFF_LOG_INFO("schedule to impeach the proposer");
+ }
+
public:
PMStickyProposer(double qc_timeout, const EventContext &ec):
qc_timeout(qc_timeout), ec(ec) {}
@@ -459,6 +477,259 @@ struct PaceMakerSticky: public PMAllParents, public PMStickyProposer {
}
};
+/**
+ * Simple long-standing round-robin style proposer liveness gadget.
+ */
+class PMRoundRobinProposer: virtual public PaceMaker {
+ enum {
+ PROPOSER,
+ FOLLOWER,
+ CANDIDATE /* rotating */
+ } role;
+ double qc_timeout;
+ double candidate_timeout;
+ EventContext ec;
+ /** QC timer or randomized timeout */
+ Event timer;
+ Event ev_imp;
+ block_t last_proposed;
+ /** the proposer it believes */
+ ReplicaID proposer;
+
+ /* extra state needed for a proposer */
+ std::queue<promise_t> pending_beats;
+ bool locked;
+
+ /* extra state needed for a candidate */
+ std::unordered_map<ReplicaID, promise_t> last_proposed_by;
+
+ promise_t pm_wait_receive_proposal;
+ promise_t pm_wait_propose;
+ promise_t pm_qc_finish;
+
+ void clear_promises() {
+ pm_wait_receive_proposal.reject();
+ pm_wait_propose.reject();
+ pm_qc_finish.reject();
+ for (auto &p: last_proposed_by)
+ p.second.reject();
+ last_proposed_by.clear();
+ }
+
+ /* helper functions for a follower */
+
+ void reg_follower_receive_proposal() {
+ pm_wait_receive_proposal.reject();
+ (pm_wait_receive_proposal = hsc->async_wait_receive_proposal())
+ .then(
+ salticidae::generic_bind(
+ &PMRoundRobinProposer::follower_receive_proposal, this, _1));
+ }
+
+ void follower_receive_proposal(const Proposal &prop) {
+ if (prop.proposer == proposer)
+ {
+ auto &qc_ref = prop.blk->get_qc_ref();
+ if (last_proposed && qc_ref != last_proposed)
+ {
+ HOTSTUFF_LOG_INFO("proposer misbehave");
+ to_candidate(); /* proposer misbehave */
+ return;
+ }
+ HOTSTUFF_LOG_PROTO("proposer emits new QC");
+ last_proposed = prop.blk;
+ }
+ reg_follower_receive_proposal();
+ }
+
+ /* helper functions for a proposer */
+
+ void proposer_schedule_next() {
+ if (!pending_beats.empty() && !locked)
+ {
+ auto pm = pending_beats.front();
+ pending_beats.pop();
+ pm_qc_finish.reject();
+ (pm_qc_finish = hsc->async_qc_finish(last_proposed))
+ .then([this, pm]() {
+ timer.del();
+ pm.resolve(proposer);
+ timer.add_with_timeout(qc_timeout);
+ HOTSTUFF_LOG_PROTO("QC timer reset");
+ });
+ locked = true;
+ }
+ }
+
+ void reg_proposer_propose() {
+ pm_wait_propose.reject();
+ (pm_wait_propose = hsc->async_wait_proposal()).then(
+ salticidae::generic_bind(
+ &PMRoundRobinProposer::proposer_propose, this, _1));
+ }
+
+ void proposer_propose(const Proposal &prop) {
+ last_proposed = prop.blk;
+ locked = false;
+ proposer_schedule_next();
+ reg_proposer_propose();
+ }
+
+ void propose_elect_block() {
+ DataStream s;
+ /* FIXME: should extra data be the voter's id? */
+ s << hsc->get_id();
+ /* propose a block for leader election */
+ hsc->on_propose(std::vector<command_t>{},
+ get_parents(), std::move(s));
+ }
+
+ /* helper functions for a candidate */
+
+ void reg_cp_receive_proposal() {
+ pm_wait_receive_proposal.reject();
+ (pm_wait_receive_proposal = hsc->async_wait_receive_proposal())
+ .then(
+ salticidae::generic_bind(
+ &PMRoundRobinProposer::cp_receive_proposal, this, _1));
+ }
+
+ void cp_receive_proposal(const Proposal &prop) {
+ auto _proposer = prop.proposer;
+ auto &p = last_proposed_by[_proposer];
+ HOTSTUFF_LOG_PROTO("got block %s from %d", std::string(*prop.blk).c_str(), _proposer);
+ p.reject();
+ (p = hsc->async_qc_finish(prop.blk)).then([this, blk=prop.blk, _proposer]() {
+ if (_proposer == proposer)
+ to_follower();
+ });
+ reg_cp_receive_proposal();
+ }
+
+ void candidate_qc_timeout() {
+ timer.del();
+ timer.add_with_timeout(candidate_timeout);
+ candidate_timeout *= 1.01;
+ proposer = (proposer + 1) % hsc->get_config().nreplicas;
+ if (proposer == hsc->get_id())
+ {
+ pm_qc_finish.reject();
+ pm_wait_propose.reject();
+ (pm_wait_propose = hsc->async_wait_proposal()).then([this](const Proposal &prop) {
+ const auto &blk = prop.blk;
+ (pm_qc_finish = hsc->async_qc_finish(blk)).then([this, blk]() {
+ HOTSTUFF_LOG_INFO("collected QC for %s", std::string(*blk).c_str());
+ /* managed to collect a QC */
+ to_proposer();
+ propose_elect_block();
+ });
+ });
+ propose_elect_block();
+ }
+ HOTSTUFF_LOG_INFO("candidate rotates to %d, next try in %.2fs",
+ proposer, candidate_timeout);
+ }
+
+ /* role transitions */
+
+ void to_follower() {
+ HOTSTUFF_LOG_INFO("new role: follower");
+ clear_promises();
+ role = FOLLOWER;
+ last_proposed = nullptr;
+ hsc->set_neg_vote(false);
+ timer.clear();
+ /* redirect all pending cmds to the new proposer */
+ while (!pending_beats.empty())
+ {
+ pending_beats.front().resolve(proposer);
+ pending_beats.pop();
+ }
+ reg_follower_receive_proposal();
+ }
+
+ void to_proposer() {
+ HOTSTUFF_LOG_INFO("new role: proposer");
+ clear_promises();
+ role = PROPOSER;
+ last_proposed = nullptr;
+ hsc->set_neg_vote(true);
+ timer = Event(ec, -1, 0, [this](int, short) {
+ /* proposer unable to get a QC in time */
+ to_candidate();
+ });
+ proposer_propose(Proposal(-1, uint256_t(), hsc->get_genesis(), nullptr));
+ }
+
+ void to_candidate() {
+ HOTSTUFF_LOG_INFO("new role: candidate");
+ clear_promises();
+ role = CANDIDATE;
+ last_proposed = nullptr;
+ hsc->set_neg_vote(false);
+ timer = Event(ec, -1, 0, [this](int, short) {
+ candidate_qc_timeout();
+ });
+ candidate_timeout = qc_timeout * 0.1;
+ reg_cp_receive_proposal();
+ candidate_qc_timeout();
+ }
+
+ protected:
+ void impeach() override {
+ if (role == CANDIDATE) return;
+ ev_imp = Event(ec, -1, 0, [this](int, short) {
+ to_candidate();
+ });
+ ev_imp.add_with_timeout(0);
+ HOTSTUFF_LOG_INFO("schedule to impeach the proposer");
+ }
+
+ public:
+ PMRoundRobinProposer(double qc_timeout, const EventContext &ec):
+ qc_timeout(qc_timeout), ec(ec), proposer(0) {}
+
+ void init() {
+ to_candidate();
+ }
+
+ ReplicaID get_proposer() override {
+ return proposer;
+ }
+
+ promise_t beat() override {
+ if (role != FOLLOWER)
+ {
+ promise_t pm;
+ pending_beats.push(pm);
+ if (role == PROPOSER)
+ proposer_schedule_next();
+ return std::move(pm);
+ }
+ else
+ return promise_t([proposer=proposer](promise_t &pm) {
+ pm.resolve(proposer);
+ });
+ }
+
+ promise_t beat_resp(ReplicaID last_proposer) override {
+ return promise_t([this, last_proposer](promise_t &pm) {
+ pm.resolve(last_proposer);
+ });
+ }
+};
+
+struct PaceMakerRR: public PMAllParents, public PMRoundRobinProposer {
+ PaceMakerRR(int32_t parent_limit, double qc_timeout, EventContext eb):
+ PMAllParents(parent_limit), PMRoundRobinProposer(qc_timeout, eb) {}
+
+ void init(HotStuffCore *hsc) override {
+ PaceMaker::init(hsc);
+ PMAllParents::init();
+ PMRoundRobinProposer::init();
+ }
+};
+
}
#endif
diff --git a/scripts/gen_conf.py b/scripts/gen_conf.py
index bc45540..391e0d6 100644
--- a/scripts/gen_conf.py
+++ b/scripts/gen_conf.py
@@ -6,33 +6,43 @@ import argparse
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Generate configuration file for a batch of replicas')
parser.add_argument('--prefix', type=str, default='hotstuff.gen')
- parser.add_argument('--iplist', type=str, default=None)
+ parser.add_argument('--ips', type=str, default=None)
parser.add_argument('--iter', type=int, default=10)
parser.add_argument('--pport', type=int, default=10000)
parser.add_argument('--cport', type=int, default=20000)
parser.add_argument('--keygen', type=str, default='./hotstuff-keygen')
+ parser.add_argument('--nodes', type=str, default='nodes.txt')
+ parser.add_argument('--block-size', type=int, default=1)
+ parser.add_argument('--pace-maker', type=str, default='dummy')
args = parser.parse_args()
- if args.iplist is None:
+ if args.ips is None:
ips = ['127.0.0.1']
else:
- ips = [l.strip() for l in open(args.iplist, 'r').readlines()]
+ ips = [l.strip() for l in open(args.ips, 'r').readlines()]
prefix = args.prefix
iter = args.iter
base_pport = args.pport
base_cport = args.cport
- keygen_bin= args.keygen
+ keygen_bin = args.keygen
main_conf = open("{}.conf".format(prefix), 'w')
+ nodes = open(args.nodes, 'w')
replicas = ["{}:{};{}".format(ip, base_pport + i, base_cport + i)
for ip in ips
for i in range(iter)]
p = subprocess.Popen([keygen_bin, '--num', str(len(replicas))],
stdout=subprocess.PIPE, stderr=open(os.devnull, 'w'))
keys = [[t[4:] for t in l.decode('ascii').split()] for l in p.stdout]
+ if not (args.block_size is None):
+ main_conf.write("block-size = {}\n".format(args.block_size))
+ if not (args.pace_maker is None):
+ main_conf.write("pace-maker = {}\n".format(args.pace_maker))
for r in zip(replicas, keys, itertools.count(0)):
main_conf.write("replica = {}, {}\n".format(r[0], r[1][0]))
- r_conf = open("{}-sec{}.conf".format(prefix, r[2]), 'w')
+ r_conf_name = "{}-sec{}.conf".format(prefix, r[2])
+ nodes.write("{}:{}\t{}\n".format(r[2], r[0], r_conf_name))
+ r_conf = open(r_conf_name, 'w')
r_conf.write("privkey = {}\n".format(r[1][1]))
r_conf.write("idx = {}\n".format(r[2]))
diff --git a/scripts/run.sh b/scripts/run.sh
new file mode 100755
index 0000000..53d9923
--- /dev/null
+++ b/scripts/run.sh
@@ -0,0 +1,457 @@
+#!/bin/bash
+
+proj_server_bin="hotstuff-app"
+proj_server_path="/home/ted/hot-stuff/$proj_server_bin"
+proj_conf_name="hotstuff.conf"
+
+peer_list="./nodes.txt" # the list of nodes
+conf_src="./hotstuff.gen.conf"
+server_map="./server_map.txt" # optional mapping from node ip to server ip
+template_dir="template" # the dir that keeps the content shared among all nodes
+remote_base="/home/ted/testbed" # remote dir used to keep files for the experiment
+#remote_base="/tmp/" # remote dir used to keep files for the experiment
+remote_log="log" # log filename
+remote_user="ted"
+copy_to_remote_pat="rsync -avz <local_path> <remote_user>@<remote_ip>:<remote_path>"
+copy_from_remote_pat="rsync -avz <remote_user>@<remote_ip>:<remote_path> <local_path>"
+exe_remote_pat="ssh <remote_user>@<remote_ip> bash"
+run_remote_pat="cd \"<rworkdir>\"; gdb -ex r -ex bt -ex generate-core-file -ex q --args '$proj_server_path' --conf \"hotstuff.gen-sec<node_id>.conf\""
+reset_remote_pat="pgrep -f '$proj_server_bin' | xargs kill -9"
+
+fin_keyword="error:" # the keyword indicating completion of execution
+fin_chk_period=1
+fin_chk_skip_pat='^([A-O][0-9]*)|(_ctl)$'
+force_peer_list=0
+
+function join { local IFS="$1"; shift; echo "$*"; }
+function split {
+ local IFS="$1"
+ local arr=($2)
+ echo "${arr[@]}"
+}
+
+function die { echo "$1"; exit 1; }
+
+declare -A nodes
+declare -A node_confs
+nodes_cnt=0
+function get_node_info {
+ pl="$1"
+ if [[ "$force_peer_list" == 1 ]]; then
+ pl="$peer_list"
+ fi
+ OIFS="$IFS"
+ IFS=$'\n'
+ node_list=($(cat "$pl"))
+ IFS="$OIFS"
+ for tuple in "${node_list[@]}"; do
+ tup0=($(split $'\t' "$tuple"))
+ tup=($(split : "${tup0[0]}"))
+ nodes[${tup[0]}]="${tup[1]}:${tup[2]}"
+ node_confs[${tup[0]}]="${tup0[@]:1}"
+ echo "${tup[0]} => ${nodes[${tup[0]}]} & ${node_confs[${tup[0]}]}"
+ let nodes_cnt++
+ done
+}
+
+declare -A server_map
+function get_server_map {
+ {
+ IFS=$'\n'
+ map_list=($(cat "$1"))
+ }
+ IFS=$'\n \t'
+ for pair in "${map_list[@]}"; do
+ p=($pair)
+ server_map[${p[0]}]="${p[1]}"
+ echo "mapping ${p[0]} => ${p[1]}"
+ done
+}
+
+
+function get_addr {
+ tup=($(split ';' $1))
+ echo "${tup[0]}"
+}
+
+function get_ip {
+ tup=($(split : $1))
+ echo "${tup[0]}"
+}
+
+function get_peer_port {
+ tup=($(split : $1))
+ tup2=($(split ';' ${tup[1]}))
+ echo "${tup2[0]}"
+}
+
+
+function get_client_port {
+ tup=($(split : $1))
+ tup2=($(split ';' ${tup[1]}))
+ echo "${tup2[1]}"
+}
+
+
+function get_ip_by_id {
+ get_ip "${nodes[$1]}"
+}
+
+function get_peer_port_by_id {
+ get_peer_port "${nodes[$1]}"
+}
+
+
+function get_client_port_by_id {
+ get_client_port "${nodes[$1]}"
+}
+
+function copy_file {
+ local pat="$1"
+ local cmd="${pat//<local_path>/$2}"
+ cmd="${cmd//<remote_ip>/$3}"
+ cmd="${cmd//<remote_user>/$remote_user}"
+ cmd="${cmd//<remote_path>/$4}"
+ echo $cmd
+ eval "$cmd"
+} >> log 2>&1
+
+function execute_remote_cmd_pid {
+ local node_ip="$1"
+ local c="$2"
+ local l="$3"
+ local cmd="${exe_remote_pat//<remote_ip>/$node_ip}"
+ cmd="${cmd//<remote_user>/$remote_user}"
+ eval $cmd << EOF
+$c > $l 2>&1 & echo \$!
+EOF
+}
+
+
+
+function execute_remote_cmd_stat {
+ local node_ip="$1"
+ local c="$2"
+ local l="$3"
+ local cmd="${exe_remote_pat//<remote_ip>/$node_ip}"
+ cmd="${cmd//<remote_user>/$remote_user}"
+ eval $cmd << EOF
+$c > $l 2>&1 ; echo \$?
+EOF
+}
+
+
+function _remote_load {
+ local workdir="$1"
+ local rworkdir="$2"
+ local node_ip="$3"
+ local rid="$4"
+ local extra_conf=($5)
+ local tmpldir="$workdir/$template_dir/"
+ local node_tmpldir="$workdir/$rid"
+ [[ $(execute_remote_cmd_stat "$node_ip" \
+ "mkdir -p \"$rworkdir\"" \
+ /dev/null) == 0 ]] || die "failed to create directory $rworkdir"
+ copy_file "$copy_to_remote_pat" "$tmpldir" "$node_ip" "$rworkdir"
+ for conf in "${extra_conf[@]}"; do
+ copy_file "$copy_to_remote_pat" "$node_tmpldir/$conf" "$node_ip" "$rworkdir"
+ done
+}
+
+function _remote_start {
+ local workdir="$1"
+ local rworkdir="$2"
+ local node_id="$3"
+ local node_ip="$4"
+ local client_port="$5"
+ local cmd="${run_remote_pat//<rworkdir>/$rworkdir}"
+ cmd="${cmd//<node_id>/$node_id}"
+ cmd="${cmd//<cport>/$client_port}"
+ execute_remote_cmd_pid "$node_ip" "$cmd" \
+ "\"$rworkdir/$remote_log\"" > "$workdir/${node_id}.pid"
+}
+
+function _remote_exec {
+ local workdir="$1"
+ local rworkdir="$2"
+ local node_ip="$3"
+ local cmd="$4"
+ [[ $(execute_remote_cmd_stat "$node_ip" "$cmd" /dev/null) == 0 ]]
+}
+
+function _remote_stop {
+ local node_pid="$4"
+ _remote_exec "$1" "$2" "$3" "kill $node_pid"
+}
+
+function _remote_status {
+ local node_pid="$4"
+ _remote_exec "$1" "$2" "$3" "kill -0 $node_pid"
+}
+
+function _remote_finished {
+ _remote_exec "$1" "$2" "$3" "grep \"$fin_keyword\" \"$rworkdir/$remote_log\""
+}
+
+function _remote_fetch {
+ local workdir="$1"
+ local rworkdir="$2"
+ local node_id="$3"
+ local node_ip="$4"
+ copy_file "$copy_from_remote_pat" "$workdir/${node_id}.log" "$node_ip" "$rworkdir/$remote_log"
+}
+
+function start_all {
+ local workdir="$1"
+ local tmpldir="$workdir/$template_dir/"
+ mkdir "$workdir" > /dev/null 2>&1 || die "workdir already exists"
+ rm -rf "$tmpldir"
+ mkdir "$tmpldir"
+ cp "$peer_list" "$workdir/peer_list.txt"
+ cp "$server_map" "$workdir/server_map.txt"
+ get_node_info "$workdir/peer_list.txt"
+ get_server_map "$workdir/server_map.txt"
+ echo "copying configuration file"
+ cp "$conf_src" "$tmpldir/$proj_conf_name"
+ for rid in "${!nodes[@]}"; do
+ local node_tmpldir="$workdir/$rid"
+ local ip="$(get_ip_by_id $rid)"
+ ip="${server_map[$ip]:-$ip}"
+ local pport="$(get_peer_port_by_id $rid)"
+ local cport="$(get_client_port_by_id $rid)"
+ local rworkdir="$remote_base/$workdir/${rid}"
+ local extra_conf_=(${node_confs[$rid]})
+ rm -rf "$node_tmpldir"
+ mkdir "$node_tmpldir"
+ (
+ local extra_conf=()
+ for conf in "${extra_conf_[@]}"; do
+ cp "$conf" "$node_tmpldir/"
+ extra_conf+=($(basename "$conf"))
+ copy_file "$copy_to_remote_pat" "$tmpldir/$conf" "$node_ip" "$rworkdir"
+ done
+ echo "Starting $rid @ $ip, $pport and $cport"
+ _remote_load "$workdir" "$rworkdir" "$ip" "$rid" "${extra_conf[@]}"
+ echo "$rid loaded"
+ ) &
+ done
+ wait
+ for rid in "${!nodes[@]}"; do
+ local ip="$(get_ip_by_id $rid)"
+ ip="${server_map[$ip]:-$ip}"
+ local pport="$(get_peer_port_by_id $rid)"
+ local cport="$(get_client_port_by_id $rid)"
+ local rworkdir="$remote_base/$workdir/${rid}"
+ (
+ echo "Starting $rid @ $ip, $pport and $cport"
+ _remote_start "$workdir" "$rworkdir" "$rid" "$ip" "$cport"
+ echo "$rid started"
+ ) &
+ done
+ wait
+}
+
+function fetch_all {
+ local workdir="$1"
+ get_node_info "$workdir/peer_list.txt"
+ get_server_map "$workdir/server_map.txt"
+ for rid in "${!nodes[@]}"; do
+ local ip="$(get_ip_by_id $rid)"
+ ip="${server_map[$ip]:-$ip}"
+ local port="$(get_peer_port_by_id $rid)"
+ local rworkdir="$remote_base/$workdir/${rid}"
+ local pid="$(cat $workdir/${rid}.pid)"
+ local msg="Fetching $rid @ $ip, $port "
+ _remote_fetch "$workdir" "$rworkdir" "$rid" "$ip" && echo "$msg: copied" || echo "$msg: failed" &
+ done
+ wait
+}
+
+function exec_all {
+ local workdir="$1"
+ local cmd="$2"
+ get_node_info "$workdir/peer_list.txt"
+ get_server_map "$workdir/server_map.txt"
+ for rid in "${!nodes[@]}"; do
+ local ip="$(get_ip_by_id $rid)"
+ ip="${server_map[$ip]:-$ip}"
+ local port="$(get_peer_port_by_id $rid)"
+ local rworkdir="$remote_base/$workdir/${rid}"
+ local msg="Executing $rid @ $ip, $port "
+ _remote_exec "$workdir" "$rworkdir" "$ip" "$cmd" && echo "$msg: succeeded" || echo "$msg: failed" &
+ done
+ wait
+}
+
+function reset_all {
+ exec_all "$1" "$reset_remote_pat"
+}
+
+function stop_all {
+ local workdir="$1"
+ get_node_info "$workdir/peer_list.txt"
+ get_server_map "$workdir/server_map.txt"
+ for rid in "${!nodes[@]}"; do
+ local ip="$(get_ip_by_id $rid)"
+ ip="${server_map[$ip]:-$ip}"
+ local port="$(get_peer_port_by_id $rid)"
+ local rworkdir="$remote_base/$workdir/${rid}"
+ local pid="$(cat $workdir/${rid}.pid)"
+ local msg="Killing $rid @ $ip, $port "
+ _remote_stop "$workdir" "$rworkdir" "$ip" "$pid" && echo "$msg: stopped" || echo "$msg: failed" &
+ done
+ wait
+}
+
+function status_all {
+ local workdir="$1"
+ get_node_info "$workdir/peer_list.txt"
+ get_server_map "$workdir/server_map.txt"
+ for rid in "${!nodes[@]}"; do
+ local ip="$(get_ip_by_id $rid)"
+ ip="${server_map[$ip]:-$ip}"
+ local port="$(get_peer_port_by_id $rid)"
+ local rworkdir="$remote_base/$workdir/${rid}"
+ local pid="$(cat $workdir/${rid}.pid)"
+ local msg="$rid @ $ip, $port "
+ _remote_status "$workdir" "$rworkdir" "$ip" "$pid" && echo "$msg: running" || echo "$msg: dead" &
+ done
+ wait
+}
+
+function finished_all {
+ local workdir="$1"
+ get_node_info "$workdir/peer_list.txt"
+ get_server_map "$workdir/server_map.txt"
+ for rid in "${!nodes[@]}"; do
+ local ip="$(get_ip_by_id $rid)"
+ ip="${server_map[$ip]:-$ip}"
+ local port="$(get_peer_port_by_id $rid)"
+ local rworkdir="$remote_base/$workdir/${rid}"
+ if [[ "$rid" =~ $fin_chk_skip_pat ]]; then
+ continue
+ fi
+ printf "$rid @ $ip, $port "
+ _remote_finished "$workdir" "$rworkdir" "$ip" && echo "finished" || echo "in-progress"
+ done
+}
+
+function wait_all {
+ local workdir="$1"
+ get_node_info "$workdir/peer_list.txt"
+ get_server_map "$workdir/server_map.txt"
+ while true; do
+ finished=1
+ printf "checking the nodes..."
+ for rid in "${!nodes[@]}"; do
+ local ip="$(get_ip_by_id $rid)"
+ ip="${server_map[$ip]:-$ip}"
+ local port="$(get_peer_port_by_id $rid)"
+ local rworkdir="$remote_base/$workdir/${rid}"
+ if [[ "$rid" =~ $fin_chk_skip_pat ]]; then
+ continue
+ fi
+ if ! _remote_finished "$workdir" "$rworkdir" "$ip"; then
+ finished=0
+ break
+ fi
+ done
+ if [[ $finished == 1 ]]; then
+ break
+ fi
+ echo "not finished yet, wait for $fin_chk_period secs"
+ sleep "$fin_chk_period"
+ done
+ echo "finished"
+}
+
+function check_all {
+ status_all "$1" | grep dead -q
+ [[ "$?" -eq 0 ]] && die "some nodes are dead"
+ echo "ok"
+}
+
+function print_help {
+echo "Usage: $0 [--bin] [--path] [--conf] [--conf-src] [--peer-list] [--server-map] [--user] [--force-peer-list] [--help] COMMAND WORKDIR
+
+ --help show this help and exit
+ --bin name of binary executable
+ --path path to the binary
+ --conf shared configuration filename
+ --conf-src shared configuration source file
+ --peer-list FILE read peer list from FILE (default: $peer_list)
+ --server-map FILE read server map from FILE (default: $server_map)
+ --user USER the username to login the remote machines
+ --force-peer-list force the use of FILE specified by --peer-list
+ instead of the peer list in WORKDIR"
+ exit 0
+}
+
+function check_argnum {
+ argnum=$(($# - 1))
+ [[ "$1" -eq "$argnum" ]] || die "incorrect argnum: got $argnum, $1 expected"
+}
+
+getopt --test > /dev/null
+[[ $? -ne 4 ]] && die "getopt unsupported"
+
+SHORT=
+LONG='\
+bin:,path:,conf:,conf-src:,\
+peer-list:,\
+server-map:,\
+remote-base:,\
+remote-user:,\
+copy-to-remote-pat:,\
+copy-from-remote-pat:,\
+exe-remote-pat:,\
+run-remote-pat:,\
+reset-remote-pat:,\
+fin-keyword:,\
+fin-chk-period:,\
+fin-chk-skip-pat:,\
+force-peer-list,\
+help'
+
+PARSED=$(getopt --options "$SHORT" --longoptions "$LONG" --name "$0" -- "$@")
+[[ $? -ne 0 ]] && exit 1
+eval set -- "$PARSED"
+
+while true; do
+ case "$1" in
+ --bin) proj_server_bin="$2"; shift 2;;
+ --path) proj_server_path="$2"; shift 2;;
+ --conf) proj_conf_name="$2"; shift 2;;
+ --conf-src) conf_src="$2"; shift 2;;
+ --peer-list) peer_list="$2"; shift 2;;
+ --server-map) server_map="$2"; shift 2;;
+ --remote-base) remote_base="$2"; shift 2;;
+ --remote-user) remote_user="$2"; shift 2;;
+ --copy-to-remote-pat) copy_to_remote_pat="$2"; shift 2;;
+ --copy-from-remote-pat) copy_from_remote_pat="$2"; shift 2;;
+ --exe-remote-pat) exe_remote_pat="$2"; shift 2;;
+ --run-remote-pat) run_remote_pat="$2"; shift 2;;
+ --reset-remote-pat) reset_remote_pat="$2"; shift 2;;
+ --fin-keyword) fin_keyword="$2"; shift 2;;
+ --fin-chk-period) fin_chk_period="$2"; shift 2;;
+ --fin-chk-skip-pat) fin_chk_skip_pat="$2"; shift 2;;
+ --force-peer-list) force_peer_list=1; shift 1;;
+ --help) print_help; shift 1;;
+ --) shift; break;;
+ *) die "internal error";;
+ esac
+done
+cmd="$1"
+shift 1
+case "$cmd" in
+ start) check_argnum 1 "$@" && start_all "$1" ;;
+ stop) check_argnum 1 "$@" && stop_all "$1" ;;
+ status) check_argnum 1 "$@" && status_all "$1" ;;
+ check) check_argnum 1 "$@" && check_all "$1" ;;
+ finished) check_argnum 1 "$@" && finished_all "$1" ;;
+ fetch) check_argnum 1 "$@" && fetch_all "$1" ;;
+ wait) check_argnum 1 "$@" && wait_all "$1" ;;
+ reset) check_argnum 1 "$@" && reset_all "$1" ;;
+ exec) check_argnum 2 "$@" && exec_all "$1" "$2" ;;
+ *) print_help ;;
+esac
diff --git a/scripts/run_client.sh b/scripts/run_client.sh
index 93a9148..090dce1 100755
--- a/scripts/run_client.sh
+++ b/scripts/run_client.sh
@@ -1,2 +1,354 @@
#!/bin/bash
-./hotstuff-client --idx 0 --iter -1 --max-async 3
+
+proj_client_bin="hotstuff-client"
+proj_client_path="/home/ted/hot-stuff/$proj_client_bin"
+proj_conf_name="hotstuff.conf"
+
+peer_list="./nodes.txt" # the list of nodes
+client_list="./clients.txt" # the list of clients
+conf_src="./hotstuff.gen.conf"
+template_dir="template" # the dir that keeps the content shared among all nodes
+remote_base="/home/ted/testbed" # remote dir used to keep files for the experiment
+#remote_base="/tmp/" # remote dir used to keep files for the experiment
+remote_log="log" # log filename
+remote_user="ted"
+copy_to_remote_pat="rsync -avz <local_path> <remote_user>@<remote_ip>:<remote_path>"
+copy_from_remote_pat="rsync -avz <remote_user>@<remote_ip>:<remote_path> <local_path>"
+exe_remote_pat="ssh <remote_user>@<remote_ip> bash"
+run_remote_pat="cd \"<rworkdir>\"; '$proj_client_path' --idx \"<node_id>\" --iter -1 --max-async 3"
+reset_remote_pat="pgrep -f '$proj_client_bin' | xargs kill -9"
+
+function join { local IFS="$1"; shift; echo "$*"; }
+function split {
+ local IFS="$1"
+ local arr=($2)
+ echo "${arr[@]}"
+}
+
+function die { echo "$1"; exit 1; }
+
+declare -A nodes
+nodes_cnt=0
+function get_node_info {
+ pl="$1"
+ if [[ "$force_peer_list" == 1 ]]; then
+ pl="$peer_list"
+ fi
+ OIFS="$IFS"
+ IFS=$'\n'
+ node_list=($(cat "$pl"))
+ IFS="$OIFS"
+ for tuple in "${node_list[@]}"; do
+ tup0=($(split $'\t' "$tuple"))
+ tup=($(split : "${tup0[0]}"))
+ nodes[${tup[0]}]="${tup[1]}:${tup[2]}"
+ echo "${tup[0]} => ${nodes[${tup[0]}]}"
+ let nodes_cnt++
+ done
+}
+
+function get_client_info {
+ cip_list=($(cat "$1"))
+}
+
+
+function get_addr {
+ tup=($(split ';' $1))
+ echo "${tup[0]}"
+}
+
+function get_ip {
+ tup=($(split : $1))
+ echo "${tup[0]}"
+}
+
+function get_peer_port {
+ tup=($(split : $1))
+ tup2=($(split ';' ${tup[1]}))
+ echo "${tup2[0]}"
+}
+
+
+function get_client_port {
+ tup=($(split : $1))
+ tup2=($(split ';' ${tup[1]}))
+ echo "${tup2[1]}"
+}
+
+
+function get_ip_by_id {
+ get_ip "${nodes[$1]}"
+}
+
+function get_peer_port_by_id {
+ get_peer_port "${nodes[$1]}"
+}
+
+
+function get_client_port_by_id {
+ get_client_port "${nodes[$1]}"
+}
+
+function copy_file {
+ local pat="$1"
+ local cmd="${pat//<local_path>/$2}"
+ cmd="${cmd//<remote_ip>/$3}"
+ cmd="${cmd//<remote_user>/$remote_user}"
+ cmd="${cmd//<remote_path>/$4}"
+ echo $cmd
+ eval "$cmd"
+} >> log 2>&1
+
+function execute_remote_cmd_pid {
+ local node_ip="$1"
+ local c="$2"
+ local l="$3"
+ local cmd="${exe_remote_pat//<remote_ip>/$node_ip}"
+ cmd="${cmd//<remote_user>/$remote_user}"
+ eval $cmd << EOF
+$c > $l 2>&1 & echo \$!
+EOF
+}
+
+
+
+function execute_remote_cmd_stat {
+ local node_ip="$1"
+ local c="$2"
+ local l="$3"
+ local cmd="${exe_remote_pat//<remote_ip>/$node_ip}"
+ cmd="${cmd//<remote_user>/$remote_user}"
+ eval $cmd << EOF
+$c > $l 2>&1 ; echo \$?
+EOF
+}
+
+
+function _remote_load {
+ local workdir="$1"
+ local rworkdir="$2"
+ local node_ip="$3"
+ local tmpldir="$workdir/$template_dir/"
+ [[ $(execute_remote_cmd_stat "$node_ip" \
+ "mkdir -p \"$rworkdir\"" \
+ /dev/null) == 0 ]] || die "failed to create directory $rworkdir"
+ copy_file "$copy_to_remote_pat" "$tmpldir" "$node_ip" "$rworkdir"
+}
+
+function _remote_start {
+ local workdir="$1"
+ local rworkdir="$2"
+ local node_id="$3"
+ local node_ip="$4"
+ local client_port="$5"
+ local client_ip="$6"
+ local cmd="${run_remote_pat//<rworkdir>/$rworkdir}"
+ cmd="${cmd//<node_id>/$node_id}"
+ cmd="${cmd//<server>/$node_ip:$client_port}"
+ execute_remote_cmd_pid "$client_ip" "$cmd" \
+ "\"$rworkdir/$remote_log\"" > "$workdir/${node_id}.pid"
+}
+
+function _remote_exec {
+ local workdir="$1"
+ local rworkdir="$2"
+ local node_ip="$3"
+ local cmd="$4"
+ [[ $(execute_remote_cmd_stat "$node_ip" "$cmd" /dev/null) == 0 ]]
+}
+
+function _remote_stop {
+ local node_pid="$4"
+ _remote_exec "$1" "$2" "$3" "kill $node_pid"
+}
+
+function _remote_status {
+ local node_pid="$4"
+ _remote_exec "$1" "$2" "$3" "kill -0 $node_pid"
+}
+
+function _remote_fetch {
+ local workdir="$1"
+ local rworkdir="$2"
+ local node_id="$3"
+ local node_ip="$4"
+ copy_file "$copy_from_remote_pat" "$workdir/${node_id}.log" "$node_ip" "$rworkdir/$remote_log"
+}
+
+function start_all {
+ local workdir="$1"
+ local tmpldir="$workdir/$template_dir/"
+ mkdir "$workdir" > /dev/null 2>&1 || die "workdir already exists"
+ rm -rf "$tmpldir"
+ mkdir "$tmpldir"
+ cp "$peer_list" "$workdir/peer_list.txt"
+ cp "$client_list" "$workdir/client_list.txt"
+ get_node_info "$workdir/peer_list.txt"
+ get_client_info "$workdir/client_list.txt"
+ echo "coyping configuration file"
+ cp "$conf_src" "$tmpldir/$proj_conf_name"
+ local nclient="${#cip_list[@]}"
+ local i=0
+ for tuple in "${node_list[@]}"; do
+ local cip="${cip_list[$i]}"
+ local tup=($(split : "$tuple"))
+ local rid="${tup[0]}"
+ local ip="$(get_ip_by_id $rid)"
+ local pport="$(get_peer_port_by_id $rid)"
+ local cport="$(get_client_port_by_id $rid)"
+ local rworkdir="$remote_base/$workdir/${i}"
+ (
+ echo "Starting a client @ $cip, connecting to server #$rid @ $ip:$cport"
+ _remote_load "$workdir" "$rworkdir" "$cip"
+ _remote_start "$workdir" "$rworkdir" "$i" "$ip" "$cport" "$cip"
+ echo "client #$i started"
+ ) &
+ let i++
+ if [[ "$i" -eq "$nclient" ]]; then
+ break
+ fi
+ done
+ wait
+}
+
+function fetch_all {
+ local workdir="$1"
+ get_client_info "$workdir/client_list.txt"
+ local i=0
+ for cip in "${cip_list[@]}"; do
+ local rworkdir="$remote_base/$workdir/${i}"
+ local pid="$(cat $workdir/${i}.pid)"
+ local msg="Fetching $i @ $cip"
+ _remote_fetch "$workdir" "$rworkdir" "$i" "$cip" && echo "$msg: copied" || echo "$msg: failed" &
+ let i++
+ done
+ wait
+}
+
+function exec_all {
+ local workdir="$1"
+ local cmd="$2"
+ get_client_info "$workdir/client_list.txt"
+ local i=0
+ for cip in "${cip_list[@]}"; do
+ local rworkdir="$remote_base/$workdir/${i}"
+ local msg="Executing $i @ $cip"
+ _remote_exec "$workdir" "$rworkdir" "$cip" "$cmd" && echo "$msg: succeeded" || echo "$msg: failed" &
+ let i++
+ done
+ wait
+}
+
+function reset_all {
+ exec_all "$1" "$reset_remote_pat"
+}
+
+function stop_all {
+ local workdir="$1"
+ get_client_info "$workdir/client_list.txt"
+ local i=0
+ for cip in "${cip_list[@]}"; do
+ local rworkdir="$remote_base/$workdir/${i}"
+ local pid="$(cat $workdir/${i}.pid)"
+ local msg="Killing $i @ $cip"
+ _remote_stop "$workdir" "$rworkdir" "$cip" "$pid" && echo "$msg: stopped" || echo "$msg: failed" &
+ let i++
+ done
+ wait
+}
+
+function status_all {
+ local workdir="$1"
+ get_client_info "$workdir/client_list.txt"
+ local i=0
+ for cip in "${cip_list[@]}"; do
+ local rworkdir="$remote_base/$workdir/${i}"
+ local pid="$(cat $workdir/${i}.pid)"
+ local msg="$i @ $cip"
+ _remote_status "$workdir" "$rworkdir" "$cip" "$pid" && echo "$msg: running" || echo "$msg: dead" &
+ let i++
+ done
+ wait
+}
+
+function check_all {
+ status_all "$1" | grep dead -q
+ [[ "$?" -eq 0 ]] && die "some nodes are dead"
+ echo "ok"
+}
+
+function print_help {
+echo "Usage: $0 [--bin] [--path] [--conf] [--conf-src] [--peer-list] [--client-list] [--user] [--force-peer-list] [--help] COMMAND WORKDIR
+
+ --help show this help and exit
+ --bin name of binary executable
+ --path path to the binary
+ --conf shared configuration filename
+ --conf-src shared configuration source file
+ --peer-list FILE read peer list from FILE (default: $peer_list)
+ --client-list FILE read client list from FILE (default: $client_list)
+ --user USER the username to login the remote machines
+ --force-peer-list force the use of FILE specified by --peer-list
+ instead of the peer list in WORKDIR"
+ exit 0
+}
+
+function check_argnum {
+ argnum=$(($# - 1))
+ [[ "$1" -eq "$argnum" ]] || die "incorrect argnum: got $argnum, $1 expected"
+}
+
+getopt --test > /dev/null
+[[ $? -ne 4 ]] && die "getopt unsupported"
+
+SHORT=
+LONG='\
+bin:,path:,conf:,conf-src:,\
+peer-list:,\
+client-list:,\
+remote-base:,\
+remote-user:,\
+copy-to-remote-pat:,\
+copy-from-remote-pat:,\
+exe-remote-pat:,\
+run-remote-pat:,\
+reset-remote-pat:,\
+force-peer-list,\
+help'
+
+PARSED=$(getopt --options "$SHORT" --longoptions "$LONG" --name "$0" -- "$@")
+[[ $? -ne 0 ]] && exit 1
+eval set -- "$PARSED"
+
+while true; do
+ case "$1" in
+ --bin) proj_client_bin="$2"; shift 2;;
+ --path) proj_client_path="$2"; shift 2;;
+ --conf) proj_conf_name="$2"; shift 2;;
+ --conf-src) conf_src="$2"; shift 2;;
+ --peer-list) peer_list="$2"; shift 2;;
+ --client-list) client_list="$2"; shift 2;;
+ --remote-base) remote_base="$2"; shift 2;;
+ --remote-user) remote_user="$2"; shift 2;;
+ --copy-to-remote-pat) copy_to_remote_pat="$2"; shift 2;;
+ --copy-from-remote-pat) copy_from_remote_pat="$2"; shift 2;;
+ --exe-remote-pat) exe_remote_pat="$2"; shift 2;;
+ --run-remote-pat) run_remote_pat="$2"; shift 2;;
+ --reset-remote-pat) reset_remote_pat="$2"; shift 2;;
+ --help) print_help; shift 1;;
+ --) shift; break;;
+ *) die "internal error";;
+ esac
+done
+cmd="$1"
+shift 1
+case "$cmd" in
+ start) check_argnum 1 "$@" && start_all "$1" ;;
+ stop) check_argnum 1 "$@" && stop_all "$1" ;;
+ status) check_argnum 1 "$@" && status_all "$1" ;;
+ check) check_argnum 1 "$@" && check_all "$1" ;;
+ fetch) check_argnum 1 "$@" && fetch_all "$1" ;;
+ reset) check_argnum 1 "$@" && reset_all "$1" ;;
+ exec) check_argnum 2 "$@" && exec_all "$1" "$2" ;;
+ *) print_help ;;
+esac
diff --git a/scripts/run_replicas.sh b/scripts/run_demo.sh
index 5f54787..5f54787 100755
--- a/scripts/run_replicas.sh
+++ b/scripts/run_demo.sh
diff --git a/scripts/run_demo_client.sh b/scripts/run_demo_client.sh
new file mode 100755
index 0000000..93a9148
--- /dev/null
+++ b/scripts/run_demo_client.sh
@@ -0,0 +1,2 @@
+#!/bin/bash
+./hotstuff-client --idx 0 --iter -1 --max-async 3
diff --git a/scripts/thr_hist.py b/scripts/thr_hist.py
index 6f6a43f..c5f2a72 100644
--- a/scripts/thr_hist.py
+++ b/scripts/thr_hist.py
@@ -24,17 +24,19 @@ if __name__ == '__main__':
parser.add_argument('--interval', type=float, default=1, required=False)
parser.add_argument('--output', type=str, default="hist.png", required=False)
args = parser.parse_args()
- commit_pat = re.compile('([^[].*) \[hotstuff info\].*got <fin decision=1')
+ commit_pat = re.compile('([^[].*) \[hotstuff info\] ([0-9.]*) [0-9.]*$')
interval = args.interval
begin_time = None
next_begin_time = None
cnt = 0
+ lat = 0
timestamps = []
values = []
for line in sys.stdin:
m = commit_pat.match(line)
if m:
timestamps.append(str2datetime(m.group(1)))
+ lat += float(m.group(2))
timestamps.sort()
for timestamp in timestamps:
if begin_time and timestamp < next_begin_time:
@@ -47,4 +49,5 @@ if __name__ == '__main__':
cnt = 1
values.append(cnt)
print(values)
+ print("lat = {:.3f}ms".format(lat / len(timestamps) * 1e3))
plot_thr(args.output)
diff --git a/src/client.cpp b/src/client.cpp
index 562fab5..7827b7c 100644
--- a/src/client.cpp
+++ b/src/client.cpp
@@ -2,8 +2,6 @@
namespace hotstuff {
-uint64_t CommandDummy::cnt = 0;
-
const opcode_t MsgReqCmd::opcode;
MsgReqCmd::MsgReqCmd(const Command &cmd) { serialized << cmd; }
void MsgReqCmd::postponed_parse(HotStuffCore *hsc) {
diff --git a/src/hotstuff.cpp b/src/hotstuff.cpp
index e1e2f81..e235bd8 100644
--- a/src/hotstuff.cpp
+++ b/src/hotstuff.cpp
@@ -25,7 +25,7 @@ void MsgVote::postponed_parse(HotStuffCore *hsc) {
const opcode_t MsgReqBlock::opcode;
MsgReqBlock::MsgReqBlock(const std::vector<uint256_t> &blk_hashes) {
- serialized << (uint32_t)htole(blk_hashes.size());
+ serialized << htole((uint32_t)blk_hashes.size());
for (const auto &h: blk_hashes)
serialized << h;
}
diff --git a/src/hotstuff_app.cpp b/src/hotstuff_app.cpp
index ead4e0b..768e81e 100644
--- a/src/hotstuff_app.cpp
+++ b/src/hotstuff_app.cpp
@@ -48,25 +48,22 @@ using hotstuff::promise_t;
using HotStuff = hotstuff::HotStuffSecp256k1;
-#define LOG_INFO HOTSTUFF_LOG_INFO
-#define LOG_DEBUG HOTSTUFF_LOG_DEBUG
-#define LOG_WARN HOTSTUFF_LOG_WARN
-#define LOG_ERROR HOTSTUFF_LOG_ERROR
-
class HotStuffApp: public HotStuff {
double stat_period;
+ double impeach_timeout;
EventContext ec;
/** Network messaging between a replica and its client. */
ClientNetwork<opcode_t> cn;
/** Timer object to schedule a periodic printing of system statistics */
Event ev_stat_timer;
+ /** Timer object to monitor the progress for simple impeachment */
+ Event impeach_timer;
/** The listen address for client RPC */
NetAddr clisten_addr;
using Conn = ClientNetwork<opcode_t>::Conn;
void client_request_cmd_handler(MsgReqCmd &&, Conn &);
- void print_stat_cb(evutil_socket_t, short);
command_t parse_cmd(DataStream &s) override {
auto cmd = new CommandDummy();
@@ -74,15 +71,22 @@ class HotStuffApp: public HotStuff {
return cmd;
}
+ void reset_imp_timer() {
+ impeach_timer.del();
+ impeach_timer.add_with_timeout(impeach_timeout);
+ }
+
void state_machine_execute(const Finality &fin) override {
+ reset_imp_timer();
#ifndef HOTSTUFF_ENABLE_BENCHMARK
- LOG_INFO("replicated %s", std::string(fin).c_str());
+ HOTSTUFF_LOG_INFO("replicated %s", std::string(fin).c_str());
#endif
}
public:
HotStuffApp(uint32_t blk_size,
double stat_period,
+ double impeach_timeout,
ReplicaID idx,
const bytearray_t &raw_privkey,
NetAddr plisten_addr,
@@ -124,15 +128,21 @@ int main(int argc, char **argv) {
auto opt_privkey = Config::OptValStr::create();
auto opt_help = Config::OptValFlag::create(false);
auto opt_pace_maker = Config::OptValStr::create("dummy");
+ auto opt_fixed_proposer = Config::OptValInt::create(1);
+ auto opt_qc_timeout = Config::OptValDouble::create(0.5);
+ auto opt_imp_timeout = Config::OptValDouble::create(11);
config.add_opt("block-size", opt_blk_size, Config::SET_VAL);
config.add_opt("parent-limit", opt_parent_limit, Config::SET_VAL);
config.add_opt("stat-period", opt_stat_period, Config::SET_VAL);
- config.add_opt("replica", opt_replicas, Config::APPEND);
- config.add_opt("idx", opt_idx, Config::SET_VAL);
- config.add_opt("cport", opt_client_port, Config::SET_VAL);
+ config.add_opt("replica", opt_replicas, Config::APPEND, 'a', "add an replica to the list");
+ config.add_opt("idx", opt_idx, Config::SET_VAL, 'i', "specify the index in the replica list");
+ config.add_opt("cport", opt_client_port, Config::SET_VAL, 'c', "specify the port listening for clients");
config.add_opt("privkey", opt_privkey, Config::SET_VAL);
config.add_opt("pace-maker", opt_pace_maker, Config::SET_VAL, 'p', "specify pace maker (sticky, dummy)");
+ config.add_opt("proposer", opt_fixed_proposer, Config::SET_VAL, 'l', "set the fixed proposer (for dummy)");
+ config.add_opt("qc-timeout", opt_qc_timeout, Config::SET_VAL, 't', "set QC timeout (for sticky)");
+ config.add_opt("imp-timeout", opt_imp_timeout, Config::SET_VAL, 'u', "set impeachment timeout (for sticky)");
config.add_opt("help", opt_help, Config::SWITCH_ON, 'h', "show this help info");
EventContext ec;
@@ -175,12 +185,15 @@ int main(int argc, char **argv) {
auto parent_limit = opt_parent_limit->get();
hotstuff::pacemaker_bt pmaker;
if (opt_pace_maker->get() == "sticky")
- pmaker = new hotstuff::PaceMakerSticky(parent_limit, 0.5, ec);
+ pmaker = new hotstuff::PaceMakerSticky(parent_limit, opt_qc_timeout->get(), ec);
+ else if (opt_pace_maker->get() == "rr")
+ pmaker = new hotstuff::PaceMakerRR(parent_limit, opt_qc_timeout->get(), ec);
else
- pmaker = new hotstuff::PaceMakerDummyFixed(1, parent_limit);
+ pmaker = new hotstuff::PaceMakerDummyFixed(opt_fixed_proposer->get(), parent_limit);
papp = new HotStuffApp(opt_blk_size->get(),
opt_stat_period->get(),
+ opt_imp_timeout->get(),
idx,
hotstuff::from_hex(opt_privkey->get()),
plisten_addr,
@@ -205,6 +218,7 @@ int main(int argc, char **argv) {
HotStuffApp::HotStuffApp(uint32_t blk_size,
double stat_period,
+ double impeach_timeout,
ReplicaID idx,
const bytearray_t &raw_privkey,
NetAddr plisten_addr,
@@ -214,6 +228,7 @@ HotStuffApp::HotStuffApp(uint32_t blk_size,
HotStuff(blk_size, idx, raw_privkey,
plisten_addr, std::move(pmaker), ec),
stat_period(stat_period),
+ impeach_timeout(impeach_timeout),
ec(ec),
cn(ec),
clisten_addr(clisten_addr) {
@@ -227,28 +242,29 @@ void HotStuffApp::client_request_cmd_handler(MsgReqCmd &&msg, Conn &conn) {
msg.postponed_parse(this);
auto cmd = msg.cmd;
std::vector<promise_t> pms;
- LOG_DEBUG("processing %s", std::string(*cmd).c_str());
+ HOTSTUFF_LOG_DEBUG("processing %s", std::string(*cmd).c_str());
exec_command(cmd).then([this, addr](Finality fin) {
cn.send_msg(MsgRespCmd(fin), addr);
});
}
void HotStuffApp::start() {
- ev_stat_timer = Event(ec, -1, 0,
- std::bind(&HotStuffApp::print_stat_cb, this, _1, _2));
+ ev_stat_timer = Event(ec, -1, 0, [this](int, short) {
+ HotStuff::print_stat();
+ //HotStuffCore::prune(100);
+ ev_stat_timer.add_with_timeout(stat_period);
+ });
ev_stat_timer.add_with_timeout(stat_period);
- LOG_INFO("** starting the system with parameters **");
- LOG_INFO("blk_size = %lu", blk_size);
- LOG_INFO("conns = %lu", HotStuff::size());
- LOG_INFO("** starting the event loop...");
+ impeach_timer = Event(ec, -1, 0, [this](int, short) {
+ get_pace_maker().impeach();
+ reset_imp_timer();
+ });
+ impeach_timer.add_with_timeout(impeach_timeout);
+ HOTSTUFF_LOG_INFO("** starting the system with parameters **");
+ HOTSTUFF_LOG_INFO("blk_size = %lu", blk_size);
+ HOTSTUFF_LOG_INFO("conns = %lu", HotStuff::size());
+ HOTSTUFF_LOG_INFO("** starting the event loop...");
HotStuff::start();
/* enter the event main loop */
ec.dispatch();
}
-
-
-void HotStuffApp::print_stat_cb(evutil_socket_t, short) {
- HotStuff::print_stat();
- //HotStuffCore::prune(100);
- ev_stat_timer.add_with_timeout(stat_period);
-}
diff --git a/src/hotstuff_client.cpp b/src/hotstuff_client.cpp
index bee8abd..62b13ed 100644
--- a/src/hotstuff_client.cpp
+++ b/src/hotstuff_client.cpp
@@ -1,4 +1,5 @@
#include <cassert>
+#include <random>
#include "salticidae/type.h"
#include "salticidae/netaddr.h"
#include "salticidae/network.h"
@@ -9,32 +10,31 @@
#include "hotstuff/client.h"
using salticidae::Config;
-using salticidae::ElapsedTime;
-using salticidae::trim_all;
-using salticidae::split;
using salticidae::MsgNetwork;
using hotstuff::ReplicaID;
using hotstuff::NetAddr;
using hotstuff::EventContext;
-using hotstuff::uint256_t;
using hotstuff::MsgReqCmd;
using hotstuff::MsgRespCmd;
using hotstuff::CommandDummy;
-using hotstuff::command_t;
using hotstuff::Finality;
using hotstuff::HotStuffError;
+using hotstuff::uint256_t;
using hotstuff::opcode_t;
+using hotstuff::command_t;
EventContext eb;
ReplicaID proposer;
size_t max_async_num;
int max_iter_num;
+uint32_t cid;
+uint32_t cnt = 0;
struct Request {
ReplicaID rid;
command_t cmd;
- ElapsedTime et;
+ salticidae::ElapsedTime et;
Request(ReplicaID rid, const command_t &cmd):
rid(rid), cmd(cmd) { et.start(); }
};
@@ -54,10 +54,12 @@ void set_proposer(ReplicaID rid) {
void try_send() {
while (waiting.size() < max_async_num && max_iter_num)
{
- auto cmd = CommandDummy::make_cmd();
+ auto cmd = new CommandDummy(cid, cnt++);
mn.send_msg(MsgReqCmd(*cmd), *conns.at(proposer));
+#ifndef HOTSTUFF_ENABLE_BENCHMARK
HOTSTUFF_LOG_INFO("send new cmd %.10s",
get_hex(cmd->get_hash()).c_str());
+#endif
waiting.insert(std::make_pair(
cmd->get_hash(), Request(proposer, cmd)));
if (max_iter_num > 0)
@@ -70,6 +72,9 @@ void client_resp_cmd_handler(MsgRespCmd &&msg, MsgNetwork<opcode_t>::Conn &) {
HOTSTUFF_LOG_DEBUG("got %s", std::string(msg.fin).c_str());
const uint256_t &cmd_hash = fin.cmd_hash;
auto it = waiting.find(cmd_hash);
+ auto &et = it->second.et;
+ if (it == waiting.end()) return;
+ et.stop();
if (fin.rid != proposer)
{
HOTSTUFF_LOG_INFO("reconnect to the new proposer");
@@ -79,20 +84,27 @@ void client_resp_cmd_handler(MsgRespCmd &&msg, MsgNetwork<opcode_t>::Conn &) {
{
mn.send_msg(MsgReqCmd(*(waiting.find(cmd_hash)->second.cmd)),
*conns.at(proposer));
+#ifndef HOTSTUFF_ENABLE_BENCHMARK
HOTSTUFF_LOG_INFO("resend cmd %.10s",
get_hex(cmd_hash).c_str());
- it->second.et.start();
+#endif
+ et.start();
it->second.rid = proposer;
return;
}
- HOTSTUFF_LOG_INFO("got %s", std::string(fin).c_str());
- if (it == waiting.end()) return;
+#ifndef HOTSTUFF_ENABLE_BENCHMARK
+ HOTSTUFF_LOG_INFO("got %s, wall: %.3f, cpu: %.3f",
+ std::string(fin).c_str(),
+ et.elapsed_sec, et.cpu_elapsed_sec);
+#else
+ HOTSTUFF_LOG_INFO("%.6f %.6f", et.elapsed_sec, et.cpu_elapsed_sec);
+#endif
waiting.erase(it);
try_send();
}
std::pair<std::string, std::string> split_ip_port_cport(const std::string &s) {
- auto ret = trim_all(split(s, ";"));
+ auto ret = salticidae::trim_all(salticidae::split(s, ";"));
return std::make_pair(ret[0], ret[1]);
}
@@ -102,11 +114,13 @@ int main(int argc, char **argv) {
auto opt_replicas = Config::OptValStrVec::create();
auto opt_max_iter_num = Config::OptValInt::create(100);
auto opt_max_async_num = Config::OptValInt::create(10);
+ auto opt_cid = Config::OptValInt::create(-1);
mn.reg_handler(client_resp_cmd_handler);
try {
config.add_opt("idx", opt_idx, Config::SET_VAL);
+ config.add_opt("cid", opt_cid, Config::SET_VAL);
config.add_opt("replica", opt_replicas, Config::APPEND);
config.add_opt("iter", opt_max_iter_num, Config::SET_VAL);
config.add_opt("max-async", opt_max_async_num, Config::SET_VAL);
@@ -117,7 +131,7 @@ int main(int argc, char **argv) {
std::vector<std::pair<std::string, std::string>> raw;
for (const auto &s: opt_replicas->get())
{
- auto res = trim_all(split(s, ","));
+ auto res = salticidae::trim_all(salticidae::split(s, ","));
if (res.size() != 2)
throw HotStuffError("format error");
raw.push_back(std::make_pair(res[0], res[1]));
@@ -125,6 +139,7 @@ int main(int argc, char **argv) {
if (!(0 <= idx && (size_t)idx < raw.size() && raw.size() > 0))
throw std::invalid_argument("out of range");
+ cid = opt_cid->get() != -1 ? opt_cid->get() : idx;
for (const auto &p: raw)
{
auto _p = split_ip_port_cport(p.first);