aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2018-09-10 20:49:34 -0400
committerDeterminant <[email protected]>2018-09-10 20:49:34 -0400
commitc4d3e5fe66568ccd0732edf7cf80d37959d6abda (patch)
treed21fffc5209fad03bc5ac81f4fee3d3e887dac22
parent95b193bb3b0bba62f168c7e2c3f45c54d4849e27 (diff)
let client send cmd data, the replicas should only work on the hash
-rw-r--r--include/hotstuff/client.h27
-rw-r--r--include/hotstuff/consensus.h4
-rw-r--r--include/hotstuff/entity.h22
-rw-r--r--include/hotstuff/hotstuff.h5
-rw-r--r--include/hotstuff/liveness.h4
-rw-r--r--src/client.cpp7
-rw-r--r--src/consensus.cpp6
-rw-r--r--src/entity.cpp6
-rw-r--r--src/hotstuff.cpp41
-rw-r--r--src/hotstuff_app.cpp33
-rw-r--r--src/hotstuff_client.cpp57
11 files changed, 101 insertions, 111 deletions
diff --git a/include/hotstuff/client.h b/include/hotstuff/client.h
index d61d9e7..ca18062 100644
--- a/include/hotstuff/client.h
+++ b/include/hotstuff/client.h
@@ -12,25 +12,32 @@ struct MsgReqCmd {
static const opcode_t opcode = 0x4;
DataStream serialized;
command_t cmd;
- MsgReqCmd(const Command &cmd);
+ MsgReqCmd(const Command &cmd) { serialized << cmd; }
MsgReqCmd(DataStream &&s): serialized(std::move(s)) {}
- void postponed_parse(HotStuffCore *hsc);
};
struct MsgRespCmd {
static const opcode_t opcode = 0x5;
DataStream serialized;
+#if HOTSTUFF_CMD_RESPSIZE > 0
+ uint8_t payload[HOTSTUFF_CMD_RESPSIZE];
+#endif
Finality fin;
- MsgRespCmd(const Finality &fin);
- MsgRespCmd(DataStream &&s);
+ MsgRespCmd(const Finality &fin) {
+ serialized << fin;
+#if HOTSTUFF_CMD_RESPSIZE > 0
+ serialized.put_data(payload, payload + HOTSTUFF_CMD_RESPSIZE);
+#endif
+ }
+ MsgRespCmd(DataStream &&s) { s >> fin; }
};
class CommandDummy: public Command {
uint32_t cid;
uint32_t n;
uint256_t hash;
-#if HOTSTUFF_CMD_DMSIZE > 0
- uint8_t payload[HOTSTUFF_CMD_DMSIZE];
+#if HOTSTUFF_CMD_REQSIZE > 0
+ uint8_t payload[HOTSTUFF_CMD_REQSIZE];
#endif
uint256_t compute_hash() {
DataStream s;
@@ -47,17 +54,13 @@ class CommandDummy: public Command {
void serialize(DataStream &s) const override {
s << cid << n;
-#if HOTSTUFF_CMD_DMSIZE > 0
- s.put_data(payload, payload + HOTSTUFF_CMD_DMSIZE);
+#if HOTSTUFF_CMD_REQSIZE > 0
+ s.put_data(payload, payload + HOTSTUFF_CMD_REQSIZE);
#endif
}
void unserialize(DataStream &s) override {
s >> cid >> n;
-#if HOTSTUFF_CMD_DMSIZE > 0
- auto base = s.get_data_inplace(HOTSTUFF_CMD_DMSIZE);
- memmove(payload, base, HOTSTUFF_CMD_DMSIZE);
-#endif
hash = compute_hash();
}
diff --git a/include/hotstuff/consensus.h b/include/hotstuff/consensus.h
index 9e2558c..e8f9765 100644
--- a/include/hotstuff/consensus.h
+++ b/include/hotstuff/consensus.h
@@ -84,7 +84,7 @@ class HotStuffCore {
/** Call to submit new commands to be decided (executed). "Parents" must
* contain at least one block, and the first block is the actual parent,
* while the others are uncles/aunts */
- void on_propose(const std::vector<command_t> &cmds,
+ void on_propose(const std::vector<uint256_t> &cmds,
const std::vector<block_t> &parents,
bytearray_t &&extra = bytearray_t());
@@ -118,7 +118,7 @@ class HotStuffCore {
/** Create a quorum certificate from its serialized form. */
virtual quorum_cert_bt parse_quorum_cert(DataStream &s) = 0;
/** Create a command object from its serialized form. */
- virtual command_t parse_cmd(DataStream &s) = 0;
+ //virtual command_t parse_cmd(DataStream &s) = 0;
public:
/** Add a replica to the current configuration. This should only be called
diff --git a/include/hotstuff/entity.h b/include/hotstuff/entity.h
index 6327dfe..f9e7bc8 100644
--- a/include/hotstuff/entity.h
+++ b/include/hotstuff/entity.h
@@ -79,7 +79,6 @@ using block_weak_t = salticidae::WeakObj<Block>;
class Command: public Serializable {
friend HotStuffCore;
- block_weak_t container;
public:
virtual ~Command() = default;
virtual const uint256_t &get_hash() const = 0;
@@ -105,7 +104,7 @@ get_hashes(const std::vector<Hashable> &plist) {
class Block {
friend HotStuffCore;
std::vector<uint256_t> parent_hashes;
- std::vector<command_t> cmds;
+ std::vector<uint256_t> cmds;
quorum_cert_bt qc;
bytearray_t extra;
@@ -135,7 +134,7 @@ class Block {
delivered(delivered), decision(decision) {}
Block(const std::vector<block_t> &parents,
- const std::vector<command_t> &cmds,
+ const std::vector<uint256_t> &cmds,
quorum_cert_bt &&qc,
bytearray_t &&extra,
uint32_t height,
@@ -158,7 +157,7 @@ class Block {
void unserialize(DataStream &s, HotStuffCore *hsc);
- const std::vector<command_t> &get_cmds() const {
+ const std::vector<uint256_t> &get_cmds() const {
return cmds;
}
@@ -174,19 +173,12 @@ class Block {
bool verify(const ReplicaConfig &config) const {
if (qc && !qc->verify(config)) return false;
- for (auto cmd: cmds)
- if (!cmd->verify()) return false;
return true;
}
promise_t verify(const ReplicaConfig &config, VeriPool &vpool) const {
return (qc ? qc->verify(config, vpool) :
- promise_t([](promise_t &pm) { pm.resolve(true); })).then([this](bool result) {
- if (!result) return false;
- for (auto cmd: cmds)
- if (!cmd->verify()) return false;
- return true;
- });
+ promise_t([](promise_t &pm) { pm.resolve(true); }));
}
int8_t get_decision() const { return decision; }
@@ -232,7 +224,7 @@ class EntityStorage {
return blk_cache.count(blk_hash);
}
- block_t add_blk(Block &&_blk, const ReplicaConfig &config) {
+ block_t add_blk(Block &&_blk, const ReplicaConfig &/*config*/) {
//if (!_blk.verify(config))
//{
// HOTSTUFF_LOG_WARN("invalid %s", std::string(_blk).c_str());
@@ -288,8 +280,8 @@ class EntityStorage {
#ifdef HOTSTUFF_PROTO_LOG
HOTSTUFF_LOG_INFO("releasing blk %.10s", get_hex(blk_hash).c_str());
#endif
- for (const auto &cmd: blk->get_cmds())
- try_release_cmd(cmd);
+// for (const auto &cmd: blk->get_cmds())
+// try_release_cmd(cmd);
blk_cache.erase(blk_hash);
return true;
}
diff --git a/include/hotstuff/hotstuff.h b/include/hotstuff/hotstuff.h
index eeffaab..3d1c7b6 100644
--- a/include/hotstuff/hotstuff.h
+++ b/include/hotstuff/hotstuff.h
@@ -135,9 +135,8 @@ class HotStuffBase: public HotStuffCore {
/* queues for async tasks */
std::unordered_map<const uint256_t, BlockFetchContext> blk_fetch_waiting;
std::unordered_map<const uint256_t, BlockDeliveryContext> blk_delivery_waiting;
- std::unordered_map<const uint256_t, CmdFetchContext> cmd_fetch_waiting;
std::unordered_map<const uint256_t, promise_t> decision_waiting;
- std::queue<command_t> cmd_pending;
+ std::queue<uint256_t> cmd_pending;
/* statistics */
uint64_t fetched;
@@ -192,7 +191,7 @@ class HotStuffBase: public HotStuffCore {
/* the API for HotStuffBase */
/* Submit the command to be decided. */
- promise_t exec_command(command_t cmd);
+ promise_t exec_command(uint256_t cmd);
void add_replica(ReplicaID idx, const NetAddr &addr, pubkey_bt &&pub_key);
void start(bool eb_loop = false);
diff --git a/include/hotstuff/liveness.h b/include/hotstuff/liveness.h
index 8c9c9ab..c88a0a1 100644
--- a/include/hotstuff/liveness.h
+++ b/include/hotstuff/liveness.h
@@ -330,7 +330,7 @@ class PMStickyProposer: virtual public PaceMaker {
/* FIXME: should extra data be the voter's id? */
s << hsc->get_id();
/* propose a block for leader election */
- hsc->on_propose(std::vector<command_t>{},
+ hsc->on_propose(std::vector<uint256_t>{},
get_parents(), std::move(s));
}
@@ -580,7 +580,7 @@ class PMRoundRobinProposer: virtual public PaceMaker {
/* FIXME: should extra data be the voter's id? */
s << hsc->get_id();
/* propose a block for leader election */
- hsc->on_propose(std::vector<command_t>{},
+ hsc->on_propose(std::vector<uint256_t>{},
get_parents(), std::move(s));
}
diff --git a/src/client.cpp b/src/client.cpp
index 7827b7c..368f746 100644
--- a/src/client.cpp
+++ b/src/client.cpp
@@ -3,13 +3,6 @@
namespace hotstuff {
const opcode_t MsgReqCmd::opcode;
-MsgReqCmd::MsgReqCmd(const Command &cmd) { serialized << cmd; }
-void MsgReqCmd::postponed_parse(HotStuffCore *hsc) {
- cmd = hsc->parse_cmd(serialized);
-}
-
const opcode_t MsgRespCmd::opcode;
-MsgRespCmd::MsgRespCmd(const Finality &fin) { serialized << fin; }
-MsgRespCmd::MsgRespCmd(DataStream &&s) { s >> fin; }
}
diff --git a/src/consensus.cpp b/src/consensus.cpp
index c80de59..dbe79b0 100644
--- a/src/consensus.cpp
+++ b/src/consensus.cpp
@@ -50,8 +50,6 @@ bool HotStuffCore::on_deliver_blk(const block_t &blk) {
for (const auto &hash: blk->parent_hashes)
blk->parents.push_back(get_delivered_blk(hash));
blk->height = blk->parents[0]->height + 1;
- for (const auto &cmd: blk->cmds)
- cmd->container = blk;
if (blk->qc)
{
@@ -97,7 +95,7 @@ void HotStuffCore::check_commit(const block_t &_blk) {
size_t idx = 0;
for (auto cmd: blk->cmds)
do_decide(Finality(id, 1, idx, blk->height,
- cmd->get_hash(), blk->get_hash()));
+ cmd, blk->get_hash()));
}
bexec = p;
}
@@ -114,7 +112,7 @@ bool HotStuffCore::update(const uint256_t &bqc_hash) {
return true;
}
-void HotStuffCore::on_propose(const std::vector<command_t> &cmds,
+void HotStuffCore::on_propose(const std::vector<uint256_t> &cmds,
const std::vector<block_t> &parents,
bytearray_t &&extra) {
if (parents.empty())
diff --git a/src/entity.cpp b/src/entity.cpp
index 594fdbe..94b896a 100644
--- a/src/entity.cpp
+++ b/src/entity.cpp
@@ -9,7 +9,7 @@ void Block::serialize(DataStream &s) const {
s << hash;
s << htole((uint32_t)cmds.size());
for (auto cmd: cmds)
- s << *cmd;
+ s << cmd;
if (qc)
s << (uint8_t)1 << *qc;
else
@@ -29,7 +29,9 @@ void Block::unserialize(DataStream &s, HotStuffCore *hsc) {
n = letoh(n);
cmds.resize(n);
for (auto &cmd: cmds)
- cmd = hsc->parse_cmd(s);
+ s >> cmd;
+// for (auto &cmd: cmds)
+// cmd = hsc->parse_cmd(s);
s >> flag;
qc = flag ? hsc->parse_quorum_cert(s) : nullptr;
s >> n;
diff --git a/src/hotstuff.cpp b/src/hotstuff.cpp
index 98c2237..99ab80f 100644
--- a/src/hotstuff.cpp
+++ b/src/hotstuff.cpp
@@ -58,8 +58,7 @@ void MsgRespBlock::postponed_parse(HotStuffCore *hsc) {
}
// TODO: improve this function
-promise_t HotStuffBase::exec_command(command_t cmd) {
- const uint256_t &cmd_hash = cmd->get_hash();
+promise_t HotStuffBase::exec_command(uint256_t cmd_hash) {
ReplicaID proposer = pmaker->get_proposer();
if (proposer != get_id())
@@ -71,13 +70,13 @@ promise_t HotStuffBase::exec_command(command_t cmd) {
auto it = decision_waiting.find(cmd_hash);
if (it == decision_waiting.end())
{
- cmd_pending.push(storage->add_cmd(cmd));
+ cmd_pending.push(cmd_hash);
it = decision_waiting.insert(std::make_pair(cmd_hash, promise_t())).first;
}
if (cmd_pending.size() >= blk_size)
{
- std::vector<command_t> cmds;
+ std::vector<uint256_t> cmds;
for (uint32_t i = 0; i < blk_size; i++)
{
cmds.push_back(cmd_pending.front());
@@ -86,9 +85,8 @@ promise_t HotStuffBase::exec_command(command_t cmd) {
pmaker->beat().then([this, cmds = std::move(cmds)](ReplicaID proposer) {
if (proposer != get_id())
{
- for (auto &cmd: cmds)
+ for (auto &cmd_hash: cmds)
{
- const auto &cmd_hash = cmd->get_hash();
auto it = decision_waiting.find(cmd_hash);
if (it != decision_waiting.end())
{
@@ -119,7 +117,7 @@ void HotStuffBase::on_fetch_blk(const block_t &blk) {
LOG_DEBUG("fetched %.10s", get_hex(blk->get_hash()).c_str());
part_fetched++;
fetched++;
- for (auto cmd: blk->get_cmds()) on_fetch_cmd(cmd);
+ //for (auto cmd: blk->get_cmds()) on_fetch_cmd(cmd);
const uint256_t &blk_hash = blk->get_hash();
auto it = blk_fetch_waiting.find(blk_hash);
if (it != blk_fetch_waiting.end())
@@ -129,16 +127,6 @@ void HotStuffBase::on_fetch_blk(const block_t &blk) {
}
}
-void HotStuffBase::on_fetch_cmd(const command_t &cmd) {
- const uint256_t &cmd_hash = cmd->get_hash();
- auto it = cmd_fetch_waiting.find(cmd_hash);
- if (it != cmd_fetch_waiting.end())
- {
- it->second.resolve(cmd);
- cmd_fetch_waiting.erase(it);
- }
-}
-
void HotStuffBase::on_deliver_blk(const block_t &blk) {
const uint256_t &blk_hash = blk->get_hash();
bool valid;
@@ -204,24 +192,6 @@ promise_t HotStuffBase::async_fetch_blk(const uint256_t &blk_hash,
return static_cast<promise_t &>(it->second);
}
-promise_t HotStuffBase::async_fetch_cmd(const uint256_t &cmd_hash,
- const NetAddr *replica_id,
- bool fetch_now) {
- if (storage->is_cmd_fetched(cmd_hash))
- return promise_t([this, &cmd_hash](promise_t pm){
- pm.resolve(storage->find_cmd(cmd_hash));
- });
- auto it = cmd_fetch_waiting.find(cmd_hash);
- if (it == cmd_fetch_waiting.end())
- {
- it = cmd_fetch_waiting.insert(
- std::make_pair(cmd_hash, CmdFetchContext(cmd_hash, this))).first;
- }
- if (replica_id != nullptr)
- it->second.add_replica(*replica_id, fetch_now);
- return static_cast<promise_t &>(it->second);
-}
-
promise_t HotStuffBase::async_deliver_blk(const uint256_t &blk_hash,
const NetAddr &replica_id) {
if (storage->is_blk_delivered(blk_hash))
@@ -313,7 +283,6 @@ void HotStuffBase::print_stat() const {
LOG_INFO("-------- queues -------");
LOG_INFO("blk_fetch_waiting: %lu", blk_fetch_waiting.size());
LOG_INFO("blk_delivery_waiting: %lu", blk_delivery_waiting.size());
- LOG_INFO("cmd_fetch_waiting: %lu", cmd_fetch_waiting.size());
LOG_INFO("decision_waiting: %lu", decision_waiting.size());
LOG_INFO("-------- misc ---------");
LOG_INFO("fetched: %lu", fetched);
diff --git a/src/hotstuff_app.cpp b/src/hotstuff_app.cpp
index 87c80e3..014fe16 100644
--- a/src/hotstuff_app.cpp
+++ b/src/hotstuff_app.cpp
@@ -61,11 +61,13 @@ class HotStuffApp: public HotStuff {
/** The listen address for client RPC */
NetAddr clisten_addr;
+ std::unordered_map<const uint256_t, promise_t> unconfirmed;
+
using Conn = ClientNetwork<opcode_t>::Conn;
void client_request_cmd_handler(MsgReqCmd &&, Conn &);
- command_t parse_cmd(DataStream &s) override {
+ static command_t parse_cmd(DataStream &s) {
auto cmd = new CommandDummy();
s >> *cmd;
return cmd;
@@ -81,6 +83,12 @@ class HotStuffApp: public HotStuff {
#ifndef HOTSTUFF_ENABLE_BENCHMARK
HOTSTUFF_LOG_INFO("replicated %s", std::string(fin).c_str());
#endif
+ auto it = unconfirmed.find(fin.cmd_hash);
+ if (it != unconfirmed.end())
+ {
+ it->second.resolve(fin);
+ unconfirmed.erase(it);
+ }
}
public:
@@ -244,13 +252,26 @@ HotStuffApp::HotStuffApp(uint32_t blk_size,
void HotStuffApp::client_request_cmd_handler(MsgReqCmd &&msg, Conn &conn) {
const NetAddr addr = conn.get_addr();
- msg.postponed_parse(this);
- auto cmd = msg.cmd;
+ auto cmd = parse_cmd(msg.serialized);
+ const auto &cmd_hash = cmd->get_hash();
std::vector<promise_t> pms;
HOTSTUFF_LOG_DEBUG("processing %s", std::string(*cmd).c_str());
- exec_command(cmd).then([this, addr](Finality fin) {
- cn.send_msg(MsgRespCmd(fin), addr);
- });
+ // record the data of the command
+ storage->add_cmd(cmd);
+ if (get_pace_maker().get_proposer() == get_id())
+ exec_command(cmd_hash).then([this, addr](Finality fin) {
+ cn.send_msg(MsgRespCmd(fin), addr);
+ });
+ else
+ {
+ auto it = unconfirmed.find(cmd_hash);
+ if (it == unconfirmed.end())
+ it = unconfirmed.insert(
+ std::make_pair(cmd_hash, promise_t([](promise_t &){}))).first;
+ it->second.then([this, addr](const Finality &fin) {
+ cn.send_msg(MsgRespCmd(std::move(fin)), addr);
+ });
+ }
}
void HotStuffApp::start() {
diff --git a/src/hotstuff_client.cpp b/src/hotstuff_client.cpp
index 390149f..b32bcb5 100644
--- a/src/hotstuff_client.cpp
+++ b/src/hotstuff_client.cpp
@@ -31,13 +31,15 @@ size_t max_async_num;
int max_iter_num;
uint32_t cid;
uint32_t cnt = 0;
+uint32_t nfaulty;
struct Request {
ReplicaID rid;
command_t cmd;
+ size_t confirmed;
salticidae::ElapsedTime et;
Request(ReplicaID rid, const command_t &cmd):
- rid(rid), cmd(cmd) { et.start(); }
+ rid(rid), cmd(cmd), confirmed(0) { et.start(); }
};
std::unordered_map<ReplicaID, MsgNetwork<opcode_t>::conn_t> conns;
@@ -46,18 +48,25 @@ std::vector<NetAddr> replicas;
std::vector<std::pair<struct timeval, double>> elapsed;
MsgNetwork<opcode_t> mn(eb, 10, 10, 4096);
+void connect_all() {
+ for (size_t i = 0; i < replicas.size(); i++)
+ conns.insert(std::make_pair(i, mn.connect(replicas[i])));
+}
+
void set_proposer(ReplicaID rid) {
proposer = rid;
- auto it = conns.find(rid);
- if (it == conns.end())
- conns.insert(std::make_pair(rid, mn.connect(replicas[rid])));
+// auto it = conns.find(rid);
+// if (it == conns.end())
+// conns.insert(std::make_pair(rid, mn.connect(replicas[rid])));
}
void try_send() {
while (waiting.size() < max_async_num && max_iter_num)
{
auto cmd = new CommandDummy(cid, cnt++);
- mn.send_msg(MsgReqCmd(*cmd), *conns.at(proposer));
+ //mn.send_msg(MsgReqCmd(*cmd), *conns.at(proposer));
+ for (auto &p: conns)
+ mn.send_msg(MsgReqCmd(*cmd), *(p.second));
#ifndef HOTSTUFF_ENABLE_BENCHMARK
HOTSTUFF_LOG_INFO("send new cmd %.10s",
get_hex(cmd->get_hash()).c_str());
@@ -77,23 +86,24 @@ void client_resp_cmd_handler(MsgRespCmd &&msg, MsgNetwork<opcode_t>::Conn &) {
auto &et = it->second.et;
if (it == waiting.end()) return;
et.stop();
- if (fin.rid != proposer)
- {
- HOTSTUFF_LOG_INFO("reconnect to the new proposer");
- set_proposer(fin.rid);
- }
- if (fin.rid != it->second.rid)
- {
- mn.send_msg(MsgReqCmd(*(waiting.find(cmd_hash)->second.cmd)),
- *conns.at(proposer));
-#ifndef HOTSTUFF_ENABLE_BENCHMARK
- HOTSTUFF_LOG_INFO("resend cmd %.10s",
- get_hex(cmd_hash).c_str());
-#endif
- et.start();
- it->second.rid = proposer;
- return;
- }
+// if (fin.rid != proposer)
+// {
+// HOTSTUFF_LOG_INFO("reconnect to the new proposer");
+// set_proposer(fin.rid);
+// }
+// if (fin.rid != it->second.rid)
+// {
+// mn.send_msg(MsgReqCmd(*(waiting.find(cmd_hash)->second.cmd)),
+// *conns.at(proposer));
+//#ifndef HOTSTUFF_ENABLE_BENCHMARK
+// HOTSTUFF_LOG_INFO("resend cmd %.10s",
+// get_hex(cmd_hash).c_str());
+//#endif
+// et.start();
+// it->second.rid = proposer;
+// return;
+// }
+ if (++it->second.confirmed <= nfaulty) return; // wait for f + 1 ack
#ifndef HOTSTUFF_ENABLE_BENCHMARK
HOTSTUFF_LOG_INFO("got %s, wall: %.3f, cpu: %.3f",
std::string(fin).c_str(),
@@ -159,6 +169,9 @@ int main(int argc, char **argv) {
replicas.push_back(NetAddr(NetAddr(_p.first).ip, htons(stoi(_p.second, &_))));
}
+ nfaulty = (replicas.size() - 1) / 3;
+ HOTSTUFF_LOG_INFO("nfaulty = %zu", nfaulty);
+ connect_all();
set_proposer(idx);
try_send();
eb.dispatch();