diff options
author | Determinant <[email protected]> | 2018-09-10 20:49:34 -0400 |
---|---|---|
committer | Determinant <[email protected]> | 2018-09-10 20:49:34 -0400 |
commit | c4d3e5fe66568ccd0732edf7cf80d37959d6abda (patch) | |
tree | d21fffc5209fad03bc5ac81f4fee3d3e887dac22 | |
parent | 95b193bb3b0bba62f168c7e2c3f45c54d4849e27 (diff) |
let client send cmd data, the replicas should only work on the hash
-rw-r--r-- | include/hotstuff/client.h | 27 | ||||
-rw-r--r-- | include/hotstuff/consensus.h | 4 | ||||
-rw-r--r-- | include/hotstuff/entity.h | 22 | ||||
-rw-r--r-- | include/hotstuff/hotstuff.h | 5 | ||||
-rw-r--r-- | include/hotstuff/liveness.h | 4 | ||||
-rw-r--r-- | src/client.cpp | 7 | ||||
-rw-r--r-- | src/consensus.cpp | 6 | ||||
-rw-r--r-- | src/entity.cpp | 6 | ||||
-rw-r--r-- | src/hotstuff.cpp | 41 | ||||
-rw-r--r-- | src/hotstuff_app.cpp | 33 | ||||
-rw-r--r-- | src/hotstuff_client.cpp | 57 |
11 files changed, 101 insertions, 111 deletions
diff --git a/include/hotstuff/client.h b/include/hotstuff/client.h index d61d9e7..ca18062 100644 --- a/include/hotstuff/client.h +++ b/include/hotstuff/client.h @@ -12,25 +12,32 @@ struct MsgReqCmd { static const opcode_t opcode = 0x4; DataStream serialized; command_t cmd; - MsgReqCmd(const Command &cmd); + MsgReqCmd(const Command &cmd) { serialized << cmd; } MsgReqCmd(DataStream &&s): serialized(std::move(s)) {} - void postponed_parse(HotStuffCore *hsc); }; struct MsgRespCmd { static const opcode_t opcode = 0x5; DataStream serialized; +#if HOTSTUFF_CMD_RESPSIZE > 0 + uint8_t payload[HOTSTUFF_CMD_RESPSIZE]; +#endif Finality fin; - MsgRespCmd(const Finality &fin); - MsgRespCmd(DataStream &&s); + MsgRespCmd(const Finality &fin) { + serialized << fin; +#if HOTSTUFF_CMD_RESPSIZE > 0 + serialized.put_data(payload, payload + HOTSTUFF_CMD_RESPSIZE); +#endif + } + MsgRespCmd(DataStream &&s) { s >> fin; } }; class CommandDummy: public Command { uint32_t cid; uint32_t n; uint256_t hash; -#if HOTSTUFF_CMD_DMSIZE > 0 - uint8_t payload[HOTSTUFF_CMD_DMSIZE]; +#if HOTSTUFF_CMD_REQSIZE > 0 + uint8_t payload[HOTSTUFF_CMD_REQSIZE]; #endif uint256_t compute_hash() { DataStream s; @@ -47,17 +54,13 @@ class CommandDummy: public Command { void serialize(DataStream &s) const override { s << cid << n; -#if HOTSTUFF_CMD_DMSIZE > 0 - s.put_data(payload, payload + HOTSTUFF_CMD_DMSIZE); +#if HOTSTUFF_CMD_REQSIZE > 0 + s.put_data(payload, payload + HOTSTUFF_CMD_REQSIZE); #endif } void unserialize(DataStream &s) override { s >> cid >> n; -#if HOTSTUFF_CMD_DMSIZE > 0 - auto base = s.get_data_inplace(HOTSTUFF_CMD_DMSIZE); - memmove(payload, base, HOTSTUFF_CMD_DMSIZE); -#endif hash = compute_hash(); } diff --git a/include/hotstuff/consensus.h b/include/hotstuff/consensus.h index 9e2558c..e8f9765 100644 --- a/include/hotstuff/consensus.h +++ b/include/hotstuff/consensus.h @@ -84,7 +84,7 @@ class HotStuffCore { /** Call to submit new commands to be decided (executed). "Parents" must * contain at least one block, and the first block is the actual parent, * while the others are uncles/aunts */ - void on_propose(const std::vector<command_t> &cmds, + void on_propose(const std::vector<uint256_t> &cmds, const std::vector<block_t> &parents, bytearray_t &&extra = bytearray_t()); @@ -118,7 +118,7 @@ class HotStuffCore { /** Create a quorum certificate from its serialized form. */ virtual quorum_cert_bt parse_quorum_cert(DataStream &s) = 0; /** Create a command object from its serialized form. */ - virtual command_t parse_cmd(DataStream &s) = 0; + //virtual command_t parse_cmd(DataStream &s) = 0; public: /** Add a replica to the current configuration. This should only be called diff --git a/include/hotstuff/entity.h b/include/hotstuff/entity.h index 6327dfe..f9e7bc8 100644 --- a/include/hotstuff/entity.h +++ b/include/hotstuff/entity.h @@ -79,7 +79,6 @@ using block_weak_t = salticidae::WeakObj<Block>; class Command: public Serializable { friend HotStuffCore; - block_weak_t container; public: virtual ~Command() = default; virtual const uint256_t &get_hash() const = 0; @@ -105,7 +104,7 @@ get_hashes(const std::vector<Hashable> &plist) { class Block { friend HotStuffCore; std::vector<uint256_t> parent_hashes; - std::vector<command_t> cmds; + std::vector<uint256_t> cmds; quorum_cert_bt qc; bytearray_t extra; @@ -135,7 +134,7 @@ class Block { delivered(delivered), decision(decision) {} Block(const std::vector<block_t> &parents, - const std::vector<command_t> &cmds, + const std::vector<uint256_t> &cmds, quorum_cert_bt &&qc, bytearray_t &&extra, uint32_t height, @@ -158,7 +157,7 @@ class Block { void unserialize(DataStream &s, HotStuffCore *hsc); - const std::vector<command_t> &get_cmds() const { + const std::vector<uint256_t> &get_cmds() const { return cmds; } @@ -174,19 +173,12 @@ class Block { bool verify(const ReplicaConfig &config) const { if (qc && !qc->verify(config)) return false; - for (auto cmd: cmds) - if (!cmd->verify()) return false; return true; } promise_t verify(const ReplicaConfig &config, VeriPool &vpool) const { return (qc ? qc->verify(config, vpool) : - promise_t([](promise_t &pm) { pm.resolve(true); })).then([this](bool result) { - if (!result) return false; - for (auto cmd: cmds) - if (!cmd->verify()) return false; - return true; - }); + promise_t([](promise_t &pm) { pm.resolve(true); })); } int8_t get_decision() const { return decision; } @@ -232,7 +224,7 @@ class EntityStorage { return blk_cache.count(blk_hash); } - block_t add_blk(Block &&_blk, const ReplicaConfig &config) { + block_t add_blk(Block &&_blk, const ReplicaConfig &/*config*/) { //if (!_blk.verify(config)) //{ // HOTSTUFF_LOG_WARN("invalid %s", std::string(_blk).c_str()); @@ -288,8 +280,8 @@ class EntityStorage { #ifdef HOTSTUFF_PROTO_LOG HOTSTUFF_LOG_INFO("releasing blk %.10s", get_hex(blk_hash).c_str()); #endif - for (const auto &cmd: blk->get_cmds()) - try_release_cmd(cmd); +// for (const auto &cmd: blk->get_cmds()) +// try_release_cmd(cmd); blk_cache.erase(blk_hash); return true; } diff --git a/include/hotstuff/hotstuff.h b/include/hotstuff/hotstuff.h index eeffaab..3d1c7b6 100644 --- a/include/hotstuff/hotstuff.h +++ b/include/hotstuff/hotstuff.h @@ -135,9 +135,8 @@ class HotStuffBase: public HotStuffCore { /* queues for async tasks */ std::unordered_map<const uint256_t, BlockFetchContext> blk_fetch_waiting; std::unordered_map<const uint256_t, BlockDeliveryContext> blk_delivery_waiting; - std::unordered_map<const uint256_t, CmdFetchContext> cmd_fetch_waiting; std::unordered_map<const uint256_t, promise_t> decision_waiting; - std::queue<command_t> cmd_pending; + std::queue<uint256_t> cmd_pending; /* statistics */ uint64_t fetched; @@ -192,7 +191,7 @@ class HotStuffBase: public HotStuffCore { /* the API for HotStuffBase */ /* Submit the command to be decided. */ - promise_t exec_command(command_t cmd); + promise_t exec_command(uint256_t cmd); void add_replica(ReplicaID idx, const NetAddr &addr, pubkey_bt &&pub_key); void start(bool eb_loop = false); diff --git a/include/hotstuff/liveness.h b/include/hotstuff/liveness.h index 8c9c9ab..c88a0a1 100644 --- a/include/hotstuff/liveness.h +++ b/include/hotstuff/liveness.h @@ -330,7 +330,7 @@ class PMStickyProposer: virtual public PaceMaker { /* FIXME: should extra data be the voter's id? */ s << hsc->get_id(); /* propose a block for leader election */ - hsc->on_propose(std::vector<command_t>{}, + hsc->on_propose(std::vector<uint256_t>{}, get_parents(), std::move(s)); } @@ -580,7 +580,7 @@ class PMRoundRobinProposer: virtual public PaceMaker { /* FIXME: should extra data be the voter's id? */ s << hsc->get_id(); /* propose a block for leader election */ - hsc->on_propose(std::vector<command_t>{}, + hsc->on_propose(std::vector<uint256_t>{}, get_parents(), std::move(s)); } diff --git a/src/client.cpp b/src/client.cpp index 7827b7c..368f746 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -3,13 +3,6 @@ namespace hotstuff { const opcode_t MsgReqCmd::opcode; -MsgReqCmd::MsgReqCmd(const Command &cmd) { serialized << cmd; } -void MsgReqCmd::postponed_parse(HotStuffCore *hsc) { - cmd = hsc->parse_cmd(serialized); -} - const opcode_t MsgRespCmd::opcode; -MsgRespCmd::MsgRespCmd(const Finality &fin) { serialized << fin; } -MsgRespCmd::MsgRespCmd(DataStream &&s) { s >> fin; } } diff --git a/src/consensus.cpp b/src/consensus.cpp index c80de59..dbe79b0 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -50,8 +50,6 @@ bool HotStuffCore::on_deliver_blk(const block_t &blk) { for (const auto &hash: blk->parent_hashes) blk->parents.push_back(get_delivered_blk(hash)); blk->height = blk->parents[0]->height + 1; - for (const auto &cmd: blk->cmds) - cmd->container = blk; if (blk->qc) { @@ -97,7 +95,7 @@ void HotStuffCore::check_commit(const block_t &_blk) { size_t idx = 0; for (auto cmd: blk->cmds) do_decide(Finality(id, 1, idx, blk->height, - cmd->get_hash(), blk->get_hash())); + cmd, blk->get_hash())); } bexec = p; } @@ -114,7 +112,7 @@ bool HotStuffCore::update(const uint256_t &bqc_hash) { return true; } -void HotStuffCore::on_propose(const std::vector<command_t> &cmds, +void HotStuffCore::on_propose(const std::vector<uint256_t> &cmds, const std::vector<block_t> &parents, bytearray_t &&extra) { if (parents.empty()) diff --git a/src/entity.cpp b/src/entity.cpp index 594fdbe..94b896a 100644 --- a/src/entity.cpp +++ b/src/entity.cpp @@ -9,7 +9,7 @@ void Block::serialize(DataStream &s) const { s << hash; s << htole((uint32_t)cmds.size()); for (auto cmd: cmds) - s << *cmd; + s << cmd; if (qc) s << (uint8_t)1 << *qc; else @@ -29,7 +29,9 @@ void Block::unserialize(DataStream &s, HotStuffCore *hsc) { n = letoh(n); cmds.resize(n); for (auto &cmd: cmds) - cmd = hsc->parse_cmd(s); + s >> cmd; +// for (auto &cmd: cmds) +// cmd = hsc->parse_cmd(s); s >> flag; qc = flag ? hsc->parse_quorum_cert(s) : nullptr; s >> n; diff --git a/src/hotstuff.cpp b/src/hotstuff.cpp index 98c2237..99ab80f 100644 --- a/src/hotstuff.cpp +++ b/src/hotstuff.cpp @@ -58,8 +58,7 @@ void MsgRespBlock::postponed_parse(HotStuffCore *hsc) { } // TODO: improve this function -promise_t HotStuffBase::exec_command(command_t cmd) { - const uint256_t &cmd_hash = cmd->get_hash(); +promise_t HotStuffBase::exec_command(uint256_t cmd_hash) { ReplicaID proposer = pmaker->get_proposer(); if (proposer != get_id()) @@ -71,13 +70,13 @@ promise_t HotStuffBase::exec_command(command_t cmd) { auto it = decision_waiting.find(cmd_hash); if (it == decision_waiting.end()) { - cmd_pending.push(storage->add_cmd(cmd)); + cmd_pending.push(cmd_hash); it = decision_waiting.insert(std::make_pair(cmd_hash, promise_t())).first; } if (cmd_pending.size() >= blk_size) { - std::vector<command_t> cmds; + std::vector<uint256_t> cmds; for (uint32_t i = 0; i < blk_size; i++) { cmds.push_back(cmd_pending.front()); @@ -86,9 +85,8 @@ promise_t HotStuffBase::exec_command(command_t cmd) { pmaker->beat().then([this, cmds = std::move(cmds)](ReplicaID proposer) { if (proposer != get_id()) { - for (auto &cmd: cmds) + for (auto &cmd_hash: cmds) { - const auto &cmd_hash = cmd->get_hash(); auto it = decision_waiting.find(cmd_hash); if (it != decision_waiting.end()) { @@ -119,7 +117,7 @@ void HotStuffBase::on_fetch_blk(const block_t &blk) { LOG_DEBUG("fetched %.10s", get_hex(blk->get_hash()).c_str()); part_fetched++; fetched++; - for (auto cmd: blk->get_cmds()) on_fetch_cmd(cmd); + //for (auto cmd: blk->get_cmds()) on_fetch_cmd(cmd); const uint256_t &blk_hash = blk->get_hash(); auto it = blk_fetch_waiting.find(blk_hash); if (it != blk_fetch_waiting.end()) @@ -129,16 +127,6 @@ void HotStuffBase::on_fetch_blk(const block_t &blk) { } } -void HotStuffBase::on_fetch_cmd(const command_t &cmd) { - const uint256_t &cmd_hash = cmd->get_hash(); - auto it = cmd_fetch_waiting.find(cmd_hash); - if (it != cmd_fetch_waiting.end()) - { - it->second.resolve(cmd); - cmd_fetch_waiting.erase(it); - } -} - void HotStuffBase::on_deliver_blk(const block_t &blk) { const uint256_t &blk_hash = blk->get_hash(); bool valid; @@ -204,24 +192,6 @@ promise_t HotStuffBase::async_fetch_blk(const uint256_t &blk_hash, return static_cast<promise_t &>(it->second); } -promise_t HotStuffBase::async_fetch_cmd(const uint256_t &cmd_hash, - const NetAddr *replica_id, - bool fetch_now) { - if (storage->is_cmd_fetched(cmd_hash)) - return promise_t([this, &cmd_hash](promise_t pm){ - pm.resolve(storage->find_cmd(cmd_hash)); - }); - auto it = cmd_fetch_waiting.find(cmd_hash); - if (it == cmd_fetch_waiting.end()) - { - it = cmd_fetch_waiting.insert( - std::make_pair(cmd_hash, CmdFetchContext(cmd_hash, this))).first; - } - if (replica_id != nullptr) - it->second.add_replica(*replica_id, fetch_now); - return static_cast<promise_t &>(it->second); -} - promise_t HotStuffBase::async_deliver_blk(const uint256_t &blk_hash, const NetAddr &replica_id) { if (storage->is_blk_delivered(blk_hash)) @@ -313,7 +283,6 @@ void HotStuffBase::print_stat() const { LOG_INFO("-------- queues -------"); LOG_INFO("blk_fetch_waiting: %lu", blk_fetch_waiting.size()); LOG_INFO("blk_delivery_waiting: %lu", blk_delivery_waiting.size()); - LOG_INFO("cmd_fetch_waiting: %lu", cmd_fetch_waiting.size()); LOG_INFO("decision_waiting: %lu", decision_waiting.size()); LOG_INFO("-------- misc ---------"); LOG_INFO("fetched: %lu", fetched); diff --git a/src/hotstuff_app.cpp b/src/hotstuff_app.cpp index 87c80e3..014fe16 100644 --- a/src/hotstuff_app.cpp +++ b/src/hotstuff_app.cpp @@ -61,11 +61,13 @@ class HotStuffApp: public HotStuff { /** The listen address for client RPC */ NetAddr clisten_addr; + std::unordered_map<const uint256_t, promise_t> unconfirmed; + using Conn = ClientNetwork<opcode_t>::Conn; void client_request_cmd_handler(MsgReqCmd &&, Conn &); - command_t parse_cmd(DataStream &s) override { + static command_t parse_cmd(DataStream &s) { auto cmd = new CommandDummy(); s >> *cmd; return cmd; @@ -81,6 +83,12 @@ class HotStuffApp: public HotStuff { #ifndef HOTSTUFF_ENABLE_BENCHMARK HOTSTUFF_LOG_INFO("replicated %s", std::string(fin).c_str()); #endif + auto it = unconfirmed.find(fin.cmd_hash); + if (it != unconfirmed.end()) + { + it->second.resolve(fin); + unconfirmed.erase(it); + } } public: @@ -244,13 +252,26 @@ HotStuffApp::HotStuffApp(uint32_t blk_size, void HotStuffApp::client_request_cmd_handler(MsgReqCmd &&msg, Conn &conn) { const NetAddr addr = conn.get_addr(); - msg.postponed_parse(this); - auto cmd = msg.cmd; + auto cmd = parse_cmd(msg.serialized); + const auto &cmd_hash = cmd->get_hash(); std::vector<promise_t> pms; HOTSTUFF_LOG_DEBUG("processing %s", std::string(*cmd).c_str()); - exec_command(cmd).then([this, addr](Finality fin) { - cn.send_msg(MsgRespCmd(fin), addr); - }); + // record the data of the command + storage->add_cmd(cmd); + if (get_pace_maker().get_proposer() == get_id()) + exec_command(cmd_hash).then([this, addr](Finality fin) { + cn.send_msg(MsgRespCmd(fin), addr); + }); + else + { + auto it = unconfirmed.find(cmd_hash); + if (it == unconfirmed.end()) + it = unconfirmed.insert( + std::make_pair(cmd_hash, promise_t([](promise_t &){}))).first; + it->second.then([this, addr](const Finality &fin) { + cn.send_msg(MsgRespCmd(std::move(fin)), addr); + }); + } } void HotStuffApp::start() { diff --git a/src/hotstuff_client.cpp b/src/hotstuff_client.cpp index 390149f..b32bcb5 100644 --- a/src/hotstuff_client.cpp +++ b/src/hotstuff_client.cpp @@ -31,13 +31,15 @@ size_t max_async_num; int max_iter_num; uint32_t cid; uint32_t cnt = 0; +uint32_t nfaulty; struct Request { ReplicaID rid; command_t cmd; + size_t confirmed; salticidae::ElapsedTime et; Request(ReplicaID rid, const command_t &cmd): - rid(rid), cmd(cmd) { et.start(); } + rid(rid), cmd(cmd), confirmed(0) { et.start(); } }; std::unordered_map<ReplicaID, MsgNetwork<opcode_t>::conn_t> conns; @@ -46,18 +48,25 @@ std::vector<NetAddr> replicas; std::vector<std::pair<struct timeval, double>> elapsed; MsgNetwork<opcode_t> mn(eb, 10, 10, 4096); +void connect_all() { + for (size_t i = 0; i < replicas.size(); i++) + conns.insert(std::make_pair(i, mn.connect(replicas[i]))); +} + void set_proposer(ReplicaID rid) { proposer = rid; - auto it = conns.find(rid); - if (it == conns.end()) - conns.insert(std::make_pair(rid, mn.connect(replicas[rid]))); +// auto it = conns.find(rid); +// if (it == conns.end()) +// conns.insert(std::make_pair(rid, mn.connect(replicas[rid]))); } void try_send() { while (waiting.size() < max_async_num && max_iter_num) { auto cmd = new CommandDummy(cid, cnt++); - mn.send_msg(MsgReqCmd(*cmd), *conns.at(proposer)); + //mn.send_msg(MsgReqCmd(*cmd), *conns.at(proposer)); + for (auto &p: conns) + mn.send_msg(MsgReqCmd(*cmd), *(p.second)); #ifndef HOTSTUFF_ENABLE_BENCHMARK HOTSTUFF_LOG_INFO("send new cmd %.10s", get_hex(cmd->get_hash()).c_str()); @@ -77,23 +86,24 @@ void client_resp_cmd_handler(MsgRespCmd &&msg, MsgNetwork<opcode_t>::Conn &) { auto &et = it->second.et; if (it == waiting.end()) return; et.stop(); - if (fin.rid != proposer) - { - HOTSTUFF_LOG_INFO("reconnect to the new proposer"); - set_proposer(fin.rid); - } - if (fin.rid != it->second.rid) - { - mn.send_msg(MsgReqCmd(*(waiting.find(cmd_hash)->second.cmd)), - *conns.at(proposer)); -#ifndef HOTSTUFF_ENABLE_BENCHMARK - HOTSTUFF_LOG_INFO("resend cmd %.10s", - get_hex(cmd_hash).c_str()); -#endif - et.start(); - it->second.rid = proposer; - return; - } +// if (fin.rid != proposer) +// { +// HOTSTUFF_LOG_INFO("reconnect to the new proposer"); +// set_proposer(fin.rid); +// } +// if (fin.rid != it->second.rid) +// { +// mn.send_msg(MsgReqCmd(*(waiting.find(cmd_hash)->second.cmd)), +// *conns.at(proposer)); +//#ifndef HOTSTUFF_ENABLE_BENCHMARK +// HOTSTUFF_LOG_INFO("resend cmd %.10s", +// get_hex(cmd_hash).c_str()); +//#endif +// et.start(); +// it->second.rid = proposer; +// return; +// } + if (++it->second.confirmed <= nfaulty) return; // wait for f + 1 ack #ifndef HOTSTUFF_ENABLE_BENCHMARK HOTSTUFF_LOG_INFO("got %s, wall: %.3f, cpu: %.3f", std::string(fin).c_str(), @@ -159,6 +169,9 @@ int main(int argc, char **argv) { replicas.push_back(NetAddr(NetAddr(_p.first).ip, htons(stoi(_p.second, &_)))); } + nfaulty = (replicas.size() - 1) / 3; + HOTSTUFF_LOG_INFO("nfaulty = %zu", nfaulty); + connect_all(); set_proposer(idx); try_send(); eb.dispatch(); |