diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/client.cpp | 7 | ||||
-rw-r--r-- | src/consensus.cpp | 6 | ||||
-rw-r--r-- | src/entity.cpp | 6 | ||||
-rw-r--r-- | src/hotstuff.cpp | 41 | ||||
-rw-r--r-- | src/hotstuff_app.cpp | 33 | ||||
-rw-r--r-- | src/hotstuff_client.cpp | 57 |
6 files changed, 73 insertions, 77 deletions
diff --git a/src/client.cpp b/src/client.cpp index 7827b7c..368f746 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -3,13 +3,6 @@ namespace hotstuff { const opcode_t MsgReqCmd::opcode; -MsgReqCmd::MsgReqCmd(const Command &cmd) { serialized << cmd; } -void MsgReqCmd::postponed_parse(HotStuffCore *hsc) { - cmd = hsc->parse_cmd(serialized); -} - const opcode_t MsgRespCmd::opcode; -MsgRespCmd::MsgRespCmd(const Finality &fin) { serialized << fin; } -MsgRespCmd::MsgRespCmd(DataStream &&s) { s >> fin; } } diff --git a/src/consensus.cpp b/src/consensus.cpp index c80de59..dbe79b0 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -50,8 +50,6 @@ bool HotStuffCore::on_deliver_blk(const block_t &blk) { for (const auto &hash: blk->parent_hashes) blk->parents.push_back(get_delivered_blk(hash)); blk->height = blk->parents[0]->height + 1; - for (const auto &cmd: blk->cmds) - cmd->container = blk; if (blk->qc) { @@ -97,7 +95,7 @@ void HotStuffCore::check_commit(const block_t &_blk) { size_t idx = 0; for (auto cmd: blk->cmds) do_decide(Finality(id, 1, idx, blk->height, - cmd->get_hash(), blk->get_hash())); + cmd, blk->get_hash())); } bexec = p; } @@ -114,7 +112,7 @@ bool HotStuffCore::update(const uint256_t &bqc_hash) { return true; } -void HotStuffCore::on_propose(const std::vector<command_t> &cmds, +void HotStuffCore::on_propose(const std::vector<uint256_t> &cmds, const std::vector<block_t> &parents, bytearray_t &&extra) { if (parents.empty()) diff --git a/src/entity.cpp b/src/entity.cpp index 594fdbe..94b896a 100644 --- a/src/entity.cpp +++ b/src/entity.cpp @@ -9,7 +9,7 @@ void Block::serialize(DataStream &s) const { s << hash; s << htole((uint32_t)cmds.size()); for (auto cmd: cmds) - s << *cmd; + s << cmd; if (qc) s << (uint8_t)1 << *qc; else @@ -29,7 +29,9 @@ void Block::unserialize(DataStream &s, HotStuffCore *hsc) { n = letoh(n); cmds.resize(n); for (auto &cmd: cmds) - cmd = hsc->parse_cmd(s); + s >> cmd; +// for (auto &cmd: cmds) +// cmd = hsc->parse_cmd(s); s >> flag; qc = flag ? hsc->parse_quorum_cert(s) : nullptr; s >> n; diff --git a/src/hotstuff.cpp b/src/hotstuff.cpp index 98c2237..99ab80f 100644 --- a/src/hotstuff.cpp +++ b/src/hotstuff.cpp @@ -58,8 +58,7 @@ void MsgRespBlock::postponed_parse(HotStuffCore *hsc) { } // TODO: improve this function -promise_t HotStuffBase::exec_command(command_t cmd) { - const uint256_t &cmd_hash = cmd->get_hash(); +promise_t HotStuffBase::exec_command(uint256_t cmd_hash) { ReplicaID proposer = pmaker->get_proposer(); if (proposer != get_id()) @@ -71,13 +70,13 @@ promise_t HotStuffBase::exec_command(command_t cmd) { auto it = decision_waiting.find(cmd_hash); if (it == decision_waiting.end()) { - cmd_pending.push(storage->add_cmd(cmd)); + cmd_pending.push(cmd_hash); it = decision_waiting.insert(std::make_pair(cmd_hash, promise_t())).first; } if (cmd_pending.size() >= blk_size) { - std::vector<command_t> cmds; + std::vector<uint256_t> cmds; for (uint32_t i = 0; i < blk_size; i++) { cmds.push_back(cmd_pending.front()); @@ -86,9 +85,8 @@ promise_t HotStuffBase::exec_command(command_t cmd) { pmaker->beat().then([this, cmds = std::move(cmds)](ReplicaID proposer) { if (proposer != get_id()) { - for (auto &cmd: cmds) + for (auto &cmd_hash: cmds) { - const auto &cmd_hash = cmd->get_hash(); auto it = decision_waiting.find(cmd_hash); if (it != decision_waiting.end()) { @@ -119,7 +117,7 @@ void HotStuffBase::on_fetch_blk(const block_t &blk) { LOG_DEBUG("fetched %.10s", get_hex(blk->get_hash()).c_str()); part_fetched++; fetched++; - for (auto cmd: blk->get_cmds()) on_fetch_cmd(cmd); + //for (auto cmd: blk->get_cmds()) on_fetch_cmd(cmd); const uint256_t &blk_hash = blk->get_hash(); auto it = blk_fetch_waiting.find(blk_hash); if (it != blk_fetch_waiting.end()) @@ -129,16 +127,6 @@ void HotStuffBase::on_fetch_blk(const block_t &blk) { } } -void HotStuffBase::on_fetch_cmd(const command_t &cmd) { - const uint256_t &cmd_hash = cmd->get_hash(); - auto it = cmd_fetch_waiting.find(cmd_hash); - if (it != cmd_fetch_waiting.end()) - { - it->second.resolve(cmd); - cmd_fetch_waiting.erase(it); - } -} - void HotStuffBase::on_deliver_blk(const block_t &blk) { const uint256_t &blk_hash = blk->get_hash(); bool valid; @@ -204,24 +192,6 @@ promise_t HotStuffBase::async_fetch_blk(const uint256_t &blk_hash, return static_cast<promise_t &>(it->second); } -promise_t HotStuffBase::async_fetch_cmd(const uint256_t &cmd_hash, - const NetAddr *replica_id, - bool fetch_now) { - if (storage->is_cmd_fetched(cmd_hash)) - return promise_t([this, &cmd_hash](promise_t pm){ - pm.resolve(storage->find_cmd(cmd_hash)); - }); - auto it = cmd_fetch_waiting.find(cmd_hash); - if (it == cmd_fetch_waiting.end()) - { - it = cmd_fetch_waiting.insert( - std::make_pair(cmd_hash, CmdFetchContext(cmd_hash, this))).first; - } - if (replica_id != nullptr) - it->second.add_replica(*replica_id, fetch_now); - return static_cast<promise_t &>(it->second); -} - promise_t HotStuffBase::async_deliver_blk(const uint256_t &blk_hash, const NetAddr &replica_id) { if (storage->is_blk_delivered(blk_hash)) @@ -313,7 +283,6 @@ void HotStuffBase::print_stat() const { LOG_INFO("-------- queues -------"); LOG_INFO("blk_fetch_waiting: %lu", blk_fetch_waiting.size()); LOG_INFO("blk_delivery_waiting: %lu", blk_delivery_waiting.size()); - LOG_INFO("cmd_fetch_waiting: %lu", cmd_fetch_waiting.size()); LOG_INFO("decision_waiting: %lu", decision_waiting.size()); LOG_INFO("-------- misc ---------"); LOG_INFO("fetched: %lu", fetched); diff --git a/src/hotstuff_app.cpp b/src/hotstuff_app.cpp index 87c80e3..014fe16 100644 --- a/src/hotstuff_app.cpp +++ b/src/hotstuff_app.cpp @@ -61,11 +61,13 @@ class HotStuffApp: public HotStuff { /** The listen address for client RPC */ NetAddr clisten_addr; + std::unordered_map<const uint256_t, promise_t> unconfirmed; + using Conn = ClientNetwork<opcode_t>::Conn; void client_request_cmd_handler(MsgReqCmd &&, Conn &); - command_t parse_cmd(DataStream &s) override { + static command_t parse_cmd(DataStream &s) { auto cmd = new CommandDummy(); s >> *cmd; return cmd; @@ -81,6 +83,12 @@ class HotStuffApp: public HotStuff { #ifndef HOTSTUFF_ENABLE_BENCHMARK HOTSTUFF_LOG_INFO("replicated %s", std::string(fin).c_str()); #endif + auto it = unconfirmed.find(fin.cmd_hash); + if (it != unconfirmed.end()) + { + it->second.resolve(fin); + unconfirmed.erase(it); + } } public: @@ -244,13 +252,26 @@ HotStuffApp::HotStuffApp(uint32_t blk_size, void HotStuffApp::client_request_cmd_handler(MsgReqCmd &&msg, Conn &conn) { const NetAddr addr = conn.get_addr(); - msg.postponed_parse(this); - auto cmd = msg.cmd; + auto cmd = parse_cmd(msg.serialized); + const auto &cmd_hash = cmd->get_hash(); std::vector<promise_t> pms; HOTSTUFF_LOG_DEBUG("processing %s", std::string(*cmd).c_str()); - exec_command(cmd).then([this, addr](Finality fin) { - cn.send_msg(MsgRespCmd(fin), addr); - }); + // record the data of the command + storage->add_cmd(cmd); + if (get_pace_maker().get_proposer() == get_id()) + exec_command(cmd_hash).then([this, addr](Finality fin) { + cn.send_msg(MsgRespCmd(fin), addr); + }); + else + { + auto it = unconfirmed.find(cmd_hash); + if (it == unconfirmed.end()) + it = unconfirmed.insert( + std::make_pair(cmd_hash, promise_t([](promise_t &){}))).first; + it->second.then([this, addr](const Finality &fin) { + cn.send_msg(MsgRespCmd(std::move(fin)), addr); + }); + } } void HotStuffApp::start() { diff --git a/src/hotstuff_client.cpp b/src/hotstuff_client.cpp index 390149f..b32bcb5 100644 --- a/src/hotstuff_client.cpp +++ b/src/hotstuff_client.cpp @@ -31,13 +31,15 @@ size_t max_async_num; int max_iter_num; uint32_t cid; uint32_t cnt = 0; +uint32_t nfaulty; struct Request { ReplicaID rid; command_t cmd; + size_t confirmed; salticidae::ElapsedTime et; Request(ReplicaID rid, const command_t &cmd): - rid(rid), cmd(cmd) { et.start(); } + rid(rid), cmd(cmd), confirmed(0) { et.start(); } }; std::unordered_map<ReplicaID, MsgNetwork<opcode_t>::conn_t> conns; @@ -46,18 +48,25 @@ std::vector<NetAddr> replicas; std::vector<std::pair<struct timeval, double>> elapsed; MsgNetwork<opcode_t> mn(eb, 10, 10, 4096); +void connect_all() { + for (size_t i = 0; i < replicas.size(); i++) + conns.insert(std::make_pair(i, mn.connect(replicas[i]))); +} + void set_proposer(ReplicaID rid) { proposer = rid; - auto it = conns.find(rid); - if (it == conns.end()) - conns.insert(std::make_pair(rid, mn.connect(replicas[rid]))); +// auto it = conns.find(rid); +// if (it == conns.end()) +// conns.insert(std::make_pair(rid, mn.connect(replicas[rid]))); } void try_send() { while (waiting.size() < max_async_num && max_iter_num) { auto cmd = new CommandDummy(cid, cnt++); - mn.send_msg(MsgReqCmd(*cmd), *conns.at(proposer)); + //mn.send_msg(MsgReqCmd(*cmd), *conns.at(proposer)); + for (auto &p: conns) + mn.send_msg(MsgReqCmd(*cmd), *(p.second)); #ifndef HOTSTUFF_ENABLE_BENCHMARK HOTSTUFF_LOG_INFO("send new cmd %.10s", get_hex(cmd->get_hash()).c_str()); @@ -77,23 +86,24 @@ void client_resp_cmd_handler(MsgRespCmd &&msg, MsgNetwork<opcode_t>::Conn &) { auto &et = it->second.et; if (it == waiting.end()) return; et.stop(); - if (fin.rid != proposer) - { - HOTSTUFF_LOG_INFO("reconnect to the new proposer"); - set_proposer(fin.rid); - } - if (fin.rid != it->second.rid) - { - mn.send_msg(MsgReqCmd(*(waiting.find(cmd_hash)->second.cmd)), - *conns.at(proposer)); -#ifndef HOTSTUFF_ENABLE_BENCHMARK - HOTSTUFF_LOG_INFO("resend cmd %.10s", - get_hex(cmd_hash).c_str()); -#endif - et.start(); - it->second.rid = proposer; - return; - } +// if (fin.rid != proposer) +// { +// HOTSTUFF_LOG_INFO("reconnect to the new proposer"); +// set_proposer(fin.rid); +// } +// if (fin.rid != it->second.rid) +// { +// mn.send_msg(MsgReqCmd(*(waiting.find(cmd_hash)->second.cmd)), +// *conns.at(proposer)); +//#ifndef HOTSTUFF_ENABLE_BENCHMARK +// HOTSTUFF_LOG_INFO("resend cmd %.10s", +// get_hex(cmd_hash).c_str()); +//#endif +// et.start(); +// it->second.rid = proposer; +// return; +// } + if (++it->second.confirmed <= nfaulty) return; // wait for f + 1 ack #ifndef HOTSTUFF_ENABLE_BENCHMARK HOTSTUFF_LOG_INFO("got %s, wall: %.3f, cpu: %.3f", std::string(fin).c_str(), @@ -159,6 +169,9 @@ int main(int argc, char **argv) { replicas.push_back(NetAddr(NetAddr(_p.first).ip, htons(stoi(_p.second, &_)))); } + nfaulty = (replicas.size() - 1) / 3; + HOTSTUFF_LOG_INFO("nfaulty = %zu", nfaulty); + connect_all(); set_proposer(idx); try_send(); eb.dispatch(); |