aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/client.cpp7
-rw-r--r--src/consensus.cpp6
-rw-r--r--src/entity.cpp6
-rw-r--r--src/hotstuff.cpp41
-rw-r--r--src/hotstuff_app.cpp33
-rw-r--r--src/hotstuff_client.cpp57
6 files changed, 73 insertions, 77 deletions
diff --git a/src/client.cpp b/src/client.cpp
index 7827b7c..368f746 100644
--- a/src/client.cpp
+++ b/src/client.cpp
@@ -3,13 +3,6 @@
namespace hotstuff {
const opcode_t MsgReqCmd::opcode;
-MsgReqCmd::MsgReqCmd(const Command &cmd) { serialized << cmd; }
-void MsgReqCmd::postponed_parse(HotStuffCore *hsc) {
- cmd = hsc->parse_cmd(serialized);
-}
-
const opcode_t MsgRespCmd::opcode;
-MsgRespCmd::MsgRespCmd(const Finality &fin) { serialized << fin; }
-MsgRespCmd::MsgRespCmd(DataStream &&s) { s >> fin; }
}
diff --git a/src/consensus.cpp b/src/consensus.cpp
index c80de59..dbe79b0 100644
--- a/src/consensus.cpp
+++ b/src/consensus.cpp
@@ -50,8 +50,6 @@ bool HotStuffCore::on_deliver_blk(const block_t &blk) {
for (const auto &hash: blk->parent_hashes)
blk->parents.push_back(get_delivered_blk(hash));
blk->height = blk->parents[0]->height + 1;
- for (const auto &cmd: blk->cmds)
- cmd->container = blk;
if (blk->qc)
{
@@ -97,7 +95,7 @@ void HotStuffCore::check_commit(const block_t &_blk) {
size_t idx = 0;
for (auto cmd: blk->cmds)
do_decide(Finality(id, 1, idx, blk->height,
- cmd->get_hash(), blk->get_hash()));
+ cmd, blk->get_hash()));
}
bexec = p;
}
@@ -114,7 +112,7 @@ bool HotStuffCore::update(const uint256_t &bqc_hash) {
return true;
}
-void HotStuffCore::on_propose(const std::vector<command_t> &cmds,
+void HotStuffCore::on_propose(const std::vector<uint256_t> &cmds,
const std::vector<block_t> &parents,
bytearray_t &&extra) {
if (parents.empty())
diff --git a/src/entity.cpp b/src/entity.cpp
index 594fdbe..94b896a 100644
--- a/src/entity.cpp
+++ b/src/entity.cpp
@@ -9,7 +9,7 @@ void Block::serialize(DataStream &s) const {
s << hash;
s << htole((uint32_t)cmds.size());
for (auto cmd: cmds)
- s << *cmd;
+ s << cmd;
if (qc)
s << (uint8_t)1 << *qc;
else
@@ -29,7 +29,9 @@ void Block::unserialize(DataStream &s, HotStuffCore *hsc) {
n = letoh(n);
cmds.resize(n);
for (auto &cmd: cmds)
- cmd = hsc->parse_cmd(s);
+ s >> cmd;
+// for (auto &cmd: cmds)
+// cmd = hsc->parse_cmd(s);
s >> flag;
qc = flag ? hsc->parse_quorum_cert(s) : nullptr;
s >> n;
diff --git a/src/hotstuff.cpp b/src/hotstuff.cpp
index 98c2237..99ab80f 100644
--- a/src/hotstuff.cpp
+++ b/src/hotstuff.cpp
@@ -58,8 +58,7 @@ void MsgRespBlock::postponed_parse(HotStuffCore *hsc) {
}
// TODO: improve this function
-promise_t HotStuffBase::exec_command(command_t cmd) {
- const uint256_t &cmd_hash = cmd->get_hash();
+promise_t HotStuffBase::exec_command(uint256_t cmd_hash) {
ReplicaID proposer = pmaker->get_proposer();
if (proposer != get_id())
@@ -71,13 +70,13 @@ promise_t HotStuffBase::exec_command(command_t cmd) {
auto it = decision_waiting.find(cmd_hash);
if (it == decision_waiting.end())
{
- cmd_pending.push(storage->add_cmd(cmd));
+ cmd_pending.push(cmd_hash);
it = decision_waiting.insert(std::make_pair(cmd_hash, promise_t())).first;
}
if (cmd_pending.size() >= blk_size)
{
- std::vector<command_t> cmds;
+ std::vector<uint256_t> cmds;
for (uint32_t i = 0; i < blk_size; i++)
{
cmds.push_back(cmd_pending.front());
@@ -86,9 +85,8 @@ promise_t HotStuffBase::exec_command(command_t cmd) {
pmaker->beat().then([this, cmds = std::move(cmds)](ReplicaID proposer) {
if (proposer != get_id())
{
- for (auto &cmd: cmds)
+ for (auto &cmd_hash: cmds)
{
- const auto &cmd_hash = cmd->get_hash();
auto it = decision_waiting.find(cmd_hash);
if (it != decision_waiting.end())
{
@@ -119,7 +117,7 @@ void HotStuffBase::on_fetch_blk(const block_t &blk) {
LOG_DEBUG("fetched %.10s", get_hex(blk->get_hash()).c_str());
part_fetched++;
fetched++;
- for (auto cmd: blk->get_cmds()) on_fetch_cmd(cmd);
+ //for (auto cmd: blk->get_cmds()) on_fetch_cmd(cmd);
const uint256_t &blk_hash = blk->get_hash();
auto it = blk_fetch_waiting.find(blk_hash);
if (it != blk_fetch_waiting.end())
@@ -129,16 +127,6 @@ void HotStuffBase::on_fetch_blk(const block_t &blk) {
}
}
-void HotStuffBase::on_fetch_cmd(const command_t &cmd) {
- const uint256_t &cmd_hash = cmd->get_hash();
- auto it = cmd_fetch_waiting.find(cmd_hash);
- if (it != cmd_fetch_waiting.end())
- {
- it->second.resolve(cmd);
- cmd_fetch_waiting.erase(it);
- }
-}
-
void HotStuffBase::on_deliver_blk(const block_t &blk) {
const uint256_t &blk_hash = blk->get_hash();
bool valid;
@@ -204,24 +192,6 @@ promise_t HotStuffBase::async_fetch_blk(const uint256_t &blk_hash,
return static_cast<promise_t &>(it->second);
}
-promise_t HotStuffBase::async_fetch_cmd(const uint256_t &cmd_hash,
- const NetAddr *replica_id,
- bool fetch_now) {
- if (storage->is_cmd_fetched(cmd_hash))
- return promise_t([this, &cmd_hash](promise_t pm){
- pm.resolve(storage->find_cmd(cmd_hash));
- });
- auto it = cmd_fetch_waiting.find(cmd_hash);
- if (it == cmd_fetch_waiting.end())
- {
- it = cmd_fetch_waiting.insert(
- std::make_pair(cmd_hash, CmdFetchContext(cmd_hash, this))).first;
- }
- if (replica_id != nullptr)
- it->second.add_replica(*replica_id, fetch_now);
- return static_cast<promise_t &>(it->second);
-}
-
promise_t HotStuffBase::async_deliver_blk(const uint256_t &blk_hash,
const NetAddr &replica_id) {
if (storage->is_blk_delivered(blk_hash))
@@ -313,7 +283,6 @@ void HotStuffBase::print_stat() const {
LOG_INFO("-------- queues -------");
LOG_INFO("blk_fetch_waiting: %lu", blk_fetch_waiting.size());
LOG_INFO("blk_delivery_waiting: %lu", blk_delivery_waiting.size());
- LOG_INFO("cmd_fetch_waiting: %lu", cmd_fetch_waiting.size());
LOG_INFO("decision_waiting: %lu", decision_waiting.size());
LOG_INFO("-------- misc ---------");
LOG_INFO("fetched: %lu", fetched);
diff --git a/src/hotstuff_app.cpp b/src/hotstuff_app.cpp
index 87c80e3..014fe16 100644
--- a/src/hotstuff_app.cpp
+++ b/src/hotstuff_app.cpp
@@ -61,11 +61,13 @@ class HotStuffApp: public HotStuff {
/** The listen address for client RPC */
NetAddr clisten_addr;
+ std::unordered_map<const uint256_t, promise_t> unconfirmed;
+
using Conn = ClientNetwork<opcode_t>::Conn;
void client_request_cmd_handler(MsgReqCmd &&, Conn &);
- command_t parse_cmd(DataStream &s) override {
+ static command_t parse_cmd(DataStream &s) {
auto cmd = new CommandDummy();
s >> *cmd;
return cmd;
@@ -81,6 +83,12 @@ class HotStuffApp: public HotStuff {
#ifndef HOTSTUFF_ENABLE_BENCHMARK
HOTSTUFF_LOG_INFO("replicated %s", std::string(fin).c_str());
#endif
+ auto it = unconfirmed.find(fin.cmd_hash);
+ if (it != unconfirmed.end())
+ {
+ it->second.resolve(fin);
+ unconfirmed.erase(it);
+ }
}
public:
@@ -244,13 +252,26 @@ HotStuffApp::HotStuffApp(uint32_t blk_size,
void HotStuffApp::client_request_cmd_handler(MsgReqCmd &&msg, Conn &conn) {
const NetAddr addr = conn.get_addr();
- msg.postponed_parse(this);
- auto cmd = msg.cmd;
+ auto cmd = parse_cmd(msg.serialized);
+ const auto &cmd_hash = cmd->get_hash();
std::vector<promise_t> pms;
HOTSTUFF_LOG_DEBUG("processing %s", std::string(*cmd).c_str());
- exec_command(cmd).then([this, addr](Finality fin) {
- cn.send_msg(MsgRespCmd(fin), addr);
- });
+ // record the data of the command
+ storage->add_cmd(cmd);
+ if (get_pace_maker().get_proposer() == get_id())
+ exec_command(cmd_hash).then([this, addr](Finality fin) {
+ cn.send_msg(MsgRespCmd(fin), addr);
+ });
+ else
+ {
+ auto it = unconfirmed.find(cmd_hash);
+ if (it == unconfirmed.end())
+ it = unconfirmed.insert(
+ std::make_pair(cmd_hash, promise_t([](promise_t &){}))).first;
+ it->second.then([this, addr](const Finality &fin) {
+ cn.send_msg(MsgRespCmd(std::move(fin)), addr);
+ });
+ }
}
void HotStuffApp::start() {
diff --git a/src/hotstuff_client.cpp b/src/hotstuff_client.cpp
index 390149f..b32bcb5 100644
--- a/src/hotstuff_client.cpp
+++ b/src/hotstuff_client.cpp
@@ -31,13 +31,15 @@ size_t max_async_num;
int max_iter_num;
uint32_t cid;
uint32_t cnt = 0;
+uint32_t nfaulty;
struct Request {
ReplicaID rid;
command_t cmd;
+ size_t confirmed;
salticidae::ElapsedTime et;
Request(ReplicaID rid, const command_t &cmd):
- rid(rid), cmd(cmd) { et.start(); }
+ rid(rid), cmd(cmd), confirmed(0) { et.start(); }
};
std::unordered_map<ReplicaID, MsgNetwork<opcode_t>::conn_t> conns;
@@ -46,18 +48,25 @@ std::vector<NetAddr> replicas;
std::vector<std::pair<struct timeval, double>> elapsed;
MsgNetwork<opcode_t> mn(eb, 10, 10, 4096);
+void connect_all() {
+ for (size_t i = 0; i < replicas.size(); i++)
+ conns.insert(std::make_pair(i, mn.connect(replicas[i])));
+}
+
void set_proposer(ReplicaID rid) {
proposer = rid;
- auto it = conns.find(rid);
- if (it == conns.end())
- conns.insert(std::make_pair(rid, mn.connect(replicas[rid])));
+// auto it = conns.find(rid);
+// if (it == conns.end())
+// conns.insert(std::make_pair(rid, mn.connect(replicas[rid])));
}
void try_send() {
while (waiting.size() < max_async_num && max_iter_num)
{
auto cmd = new CommandDummy(cid, cnt++);
- mn.send_msg(MsgReqCmd(*cmd), *conns.at(proposer));
+ //mn.send_msg(MsgReqCmd(*cmd), *conns.at(proposer));
+ for (auto &p: conns)
+ mn.send_msg(MsgReqCmd(*cmd), *(p.second));
#ifndef HOTSTUFF_ENABLE_BENCHMARK
HOTSTUFF_LOG_INFO("send new cmd %.10s",
get_hex(cmd->get_hash()).c_str());
@@ -77,23 +86,24 @@ void client_resp_cmd_handler(MsgRespCmd &&msg, MsgNetwork<opcode_t>::Conn &) {
auto &et = it->second.et;
if (it == waiting.end()) return;
et.stop();
- if (fin.rid != proposer)
- {
- HOTSTUFF_LOG_INFO("reconnect to the new proposer");
- set_proposer(fin.rid);
- }
- if (fin.rid != it->second.rid)
- {
- mn.send_msg(MsgReqCmd(*(waiting.find(cmd_hash)->second.cmd)),
- *conns.at(proposer));
-#ifndef HOTSTUFF_ENABLE_BENCHMARK
- HOTSTUFF_LOG_INFO("resend cmd %.10s",
- get_hex(cmd_hash).c_str());
-#endif
- et.start();
- it->second.rid = proposer;
- return;
- }
+// if (fin.rid != proposer)
+// {
+// HOTSTUFF_LOG_INFO("reconnect to the new proposer");
+// set_proposer(fin.rid);
+// }
+// if (fin.rid != it->second.rid)
+// {
+// mn.send_msg(MsgReqCmd(*(waiting.find(cmd_hash)->second.cmd)),
+// *conns.at(proposer));
+//#ifndef HOTSTUFF_ENABLE_BENCHMARK
+// HOTSTUFF_LOG_INFO("resend cmd %.10s",
+// get_hex(cmd_hash).c_str());
+//#endif
+// et.start();
+// it->second.rid = proposer;
+// return;
+// }
+ if (++it->second.confirmed <= nfaulty) return; // wait for f + 1 ack
#ifndef HOTSTUFF_ENABLE_BENCHMARK
HOTSTUFF_LOG_INFO("got %s, wall: %.3f, cpu: %.3f",
std::string(fin).c_str(),
@@ -159,6 +169,9 @@ int main(int argc, char **argv) {
replicas.push_back(NetAddr(NetAddr(_p.first).ip, htons(stoi(_p.second, &_))));
}
+ nfaulty = (replicas.size() - 1) / 3;
+ HOTSTUFF_LOG_INFO("nfaulty = %zu", nfaulty);
+ connect_all();
set_proposer(idx);
try_send();
eb.dispatch();