aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDeterminant <tederminant@gmail.com>2018-07-20 20:09:24 -0400
committerDeterminant <tederminant@gmail.com>2018-07-20 20:09:24 -0400
commit41efd33a3e165ac329f14b6e1cea935076a8b790 (patch)
tree129dc7a6e161edffa35917707df7273a97cf1dfd
parent928b2c1910bfe957a4cc31746aa395c9ed98006f (diff)
improve msg & msg network interface
-rw-r--r--include/hotstuff/client.h27
-rw-r--r--include/hotstuff/consensus.h4
-rw-r--r--include/hotstuff/hotstuff.h60
-rw-r--r--include/hotstuff/type.h1
-rwxr-xr-xrun_replicas.sh4
m---------salticidae0
-rw-r--r--src/client.cpp40
-rw-r--r--src/hotstuff.cpp115
-rw-r--r--src/hotstuff_app.cpp29
-rw-r--r--src/hotstuff_client.cpp26
10 files changed, 138 insertions, 168 deletions
diff --git a/include/hotstuff/client.h b/include/hotstuff/client.h
index 95a003d..92b4eec 100644
--- a/include/hotstuff/client.h
+++ b/include/hotstuff/client.h
@@ -8,22 +8,21 @@
namespace hotstuff {
-enum {
- REQ_CMD = 0x4,
- RESP_CMD = 0x5,
- CHK_CMD = 0x6
+struct MsgReqCmd {
+ static const opcode_t opcode = 0x4;
+ DataStream serialized;
+ command_t cmd;
+ MsgReqCmd(const Command &cmd);
+ MsgReqCmd(DataStream &&s): serialized(std::move(s)) {}
+ void postponed_parse(HotStuffCore *hsc);
};
-struct MsgClient: public salticidae::MsgBase<> {
- using MsgBase::MsgBase;
- void gen_reqcmd(const Command &cmd);
- void parse_reqcmd(command_t &cmd, HotStuffCore *hsc) const;
-
- void gen_respcmd(const Finality &fin);
- void parse_respcmd(Finality &fin) const;
-
- void gen_chkcmd(const uint256_t &cmd_hash);
- void parse_chkcmd(uint256_t &cmd_hash) const;
+struct MsgRespCmd {
+ static const opcode_t opcode = 0x5;
+ DataStream serialized;
+ Finality fin;
+ MsgRespCmd(const Finality &fin);
+ MsgRespCmd(DataStream &&s);
};
class CommandDummy: public Command {
diff --git a/include/hotstuff/consensus.h b/include/hotstuff/consensus.h
index 475f1f2..6de6bd3 100644
--- a/include/hotstuff/consensus.h
+++ b/include/hotstuff/consensus.h
@@ -145,7 +145,7 @@ struct Proposal: public Serializable {
* a pointer to the object of the class derived from HotStuffCore */
HotStuffCore *hsc;
- Proposal(HotStuffCore *hsc): blk(nullptr), hsc(hsc) {}
+ Proposal(): blk(nullptr), hsc(nullptr) {}
Proposal(ReplicaID proposer,
const uint256_t &bqc_hash,
block_t &blk,
@@ -192,7 +192,7 @@ struct Vote: public Serializable {
/** handle of the core object to allow polymorphism */
HotStuffCore *hsc;
- Vote(HotStuffCore *hsc): cert(nullptr), hsc(hsc) {}
+ Vote(): cert(nullptr), hsc(nullptr) {}
Vote(ReplicaID voter,
const uint256_t &bqc_hash,
const uint256_t &blk_hash,
diff --git a/include/hotstuff/hotstuff.h b/include/hotstuff/hotstuff.h
index 45992f1..b6bebb9 100644
--- a/include/hotstuff/hotstuff.h
+++ b/include/hotstuff/hotstuff.h
@@ -31,19 +31,43 @@ enum {
};
/** Network message format for HotStuff. */
-struct MsgHotStuff: public salticidae::MsgBase<> {
- using MsgBase::MsgBase;
- void gen_propose(const Proposal &);
- void parse_propose(Proposal &) const;
+struct MsgPropose {
+ static const opcode_t opcode = 0x0;
+ DataStream serialized;
+ Proposal proposal;
+ MsgPropose(const Proposal &);
+ /** Only move the data to serialized, do not parse immediately. */
+ MsgPropose(DataStream &&s): serialized(std::move(s)) {}
+ /** Parse the serialized data to blks now, with `hsc->storage`. */
+ void postponed_parse(HotStuffCore *hsc);
+};
+
+struct MsgVote {
+ static const opcode_t opcode = 0x1;
+ DataStream serialized;
+ Vote vote;
+ MsgVote(const Vote &);
+ MsgVote(DataStream &&s): serialized(std::move(s)) {}
+ void postponed_parse(HotStuffCore *hsc);
+};
- void gen_vote(const Vote &);
- void parse_vote(Vote &) const;
+struct MsgReqBlock {
+ static const opcode_t opcode = 0x2;
+ DataStream serialized;
+ std::vector<uint256_t> blk_hashes;
+ MsgReqBlock() = default;
+ MsgReqBlock(const std::vector<uint256_t> &blk_hashes);
+ MsgReqBlock(DataStream &&s);
+};
- void gen_qfetchblk(const std::vector<uint256_t> &blk_hashes);
- void parse_qfetchblk(std::vector<uint256_t> &blk_hashes) const;
- void gen_rfetchblk(const std::vector<block_t> &blks);
- void parse_rfetchblk(std::vector<block_t> &blks, HotStuffCore *hsc) const;
+struct MsgRespBlock {
+ static const opcode_t opcode = 0x3;
+ DataStream serialized;
+ std::vector<block_t> blks;
+ MsgRespBlock(const std::vector<block_t> &blks);
+ MsgRespBlock(DataStream &&s): serialized(std::move(s)) {}
+ void postponed_parse(HotStuffCore *hsc);
};
using promise::promise_t;
@@ -54,7 +78,7 @@ template<EntityType ent_type>
class FetchContext: public promise_t {
Event timeout;
HotStuffBase *hs;
- MsgHotStuff fetch_msg;
+ MsgReqBlock fetch_msg;
const uint256_t ent_hash;
std::unordered_set<NetAddr> replica_ids;
inline void timeout_cb(evutil_socket_t, short);
@@ -92,7 +116,7 @@ class BlockDeliveryContext: public promise_t {
class HotStuffBase: public HotStuffCore {
using BlockFetchContext = FetchContext<ENT_TYPE_BLK>;
using CmdFetchContext = FetchContext<ENT_TYPE_CMD>;
- using conn_t = MsgNetwork<MsgHotStuff>::conn_t;
+ using conn_t = PeerNetwork<opcode_t>::conn_t;
friend BlockFetchContext;
friend CmdFetchContext;
@@ -110,7 +134,7 @@ class HotStuffBase: public HotStuffCore {
/** whether libevent handle is owned by itself */
bool eb_loop;
/** network stack */
- PeerNetwork<MsgHotStuff> pn;
+ PeerNetwork<opcode_t> pn;
#ifdef HOTSTUFF_ENABLE_BLK_PROFILE
BlockProfiler blk_profiler;
#endif
@@ -142,13 +166,13 @@ class HotStuffBase: public HotStuffCore {
void on_deliver_blk(const block_t &blk);
/** deliver consensus message: <propose> */
- inline void propose_handler(const MsgHotStuff &, conn_t);
+ inline void propose_handler(MsgPropose &&, conn_t);
/** deliver consensus message: <vote> */
- inline void vote_handler(const MsgHotStuff &, conn_t);
+ inline void vote_handler(MsgVote &&, conn_t);
/** fetches full block data */
- inline void query_fetch_blk_handler(const MsgHotStuff &, conn_t);
+ inline void req_blk_handler(MsgReqBlock &&, conn_t);
/** receives a block */
- inline void resp_fetch_blk_handler(const MsgHotStuff &, conn_t);
+ inline void resp_blk_handler(MsgRespBlock &&, conn_t);
void do_broadcast_proposal(const Proposal &) override;
void do_vote(ReplicaID, const Vote &) override;
@@ -278,7 +302,7 @@ FetchContext<ent_type>::FetchContext(
const uint256_t &ent_hash, HotStuffBase *hs):
promise_t([](promise_t){}),
hs(hs), ent_hash(ent_hash) {
- fetch_msg.gen_qfetchblk(std::vector<uint256_t>{ent_hash});
+ fetch_msg = std::vector<uint256_t>{ent_hash};
timeout = Event(hs->eb, -1, 0,
std::bind(&FetchContext::timeout_cb, this, _1, _2));
diff --git a/include/hotstuff/type.h b/include/hotstuff/type.h
index 670ee6c..6014dab 100644
--- a/include/hotstuff/type.h
+++ b/include/hotstuff/type.h
@@ -53,6 +53,7 @@ class Cloneable {
};
using ReplicaID = uint16_t;
+using opcode_t = uint8_t;
}
diff --git a/run_replicas.sh b/run_replicas.sh
index 52f33e8..f053701 100755
--- a/run_replicas.sh
+++ b/run_replicas.sh
@@ -5,8 +5,8 @@ if [[ $# -gt 0 ]]; then
fi
for i in "${rep[@]}"; do
echo "starting replica $i"
- #valgrind --leak-check=full ./hotstuff-app --conf hotstuff-sec${i}.conf > log${i} 2>&1 &
- gdb -ex r -ex bt -ex q --args ./hotstuff-app --conf hotstuff-sec${i}.conf > log${i} 2>&1 &
+ valgrind --leak-check=full ./hotstuff-app --conf hotstuff-sec${i}.conf > log${i} 2>&1 &
+ #gdb -ex r -ex bt -ex q --args ./hotstuff-app --conf hotstuff-sec${i}.conf > log${i} 2>&1 &
#./hotstuff-app --conf hotstuff-sec${i}.conf > log${i} 2>&1 &
done
wait
diff --git a/salticidae b/salticidae
-Subproject 0bd0ddd69c5f6d5f59fcf73a943491ba026b2c4
+Subproject 0465243c710ede74a78885077140d8673efbc64
diff --git a/src/client.cpp b/src/client.cpp
index bc790bc..562fab5 100644
--- a/src/client.cpp
+++ b/src/client.cpp
@@ -4,40 +4,14 @@ namespace hotstuff {
uint64_t CommandDummy::cnt = 0;
-void MsgClient::gen_reqcmd(const Command &cmd) {
- DataStream s;
- set_opcode(REQ_CMD);
- s << cmd;
- set_payload(std::move(s));
+const opcode_t MsgReqCmd::opcode;
+MsgReqCmd::MsgReqCmd(const Command &cmd) { serialized << cmd; }
+void MsgReqCmd::postponed_parse(HotStuffCore *hsc) {
+ cmd = hsc->parse_cmd(serialized);
}
-void MsgClient::parse_reqcmd(command_t &cmd, HotStuffCore *hsc) const {
- DataStream s(get_payload());
- cmd = hsc->parse_cmd(s);
-}
-
-void MsgClient::gen_respcmd(const Finality &fin) {
- DataStream s;
- set_opcode(RESP_CMD);
- s << fin;
- set_payload(std::move(s));
-}
-
-void MsgClient::parse_respcmd(Finality &fin) const {
- DataStream s(get_payload());
- s >> fin;
-}
-
-void MsgClient::gen_chkcmd(const uint256_t &cmd_hash) {
- DataStream s;
- set_opcode(CHK_CMD);
- s << cmd_hash;
- set_payload(std::move(s));
-}
-
-void MsgClient::parse_chkcmd(uint256_t &cmd_hash) const {
- DataStream s(get_payload());
- s >> cmd_hash;
-}
+const opcode_t MsgRespCmd::opcode;
+MsgRespCmd::MsgRespCmd(const Finality &fin) { serialized << fin; }
+MsgRespCmd::MsgRespCmd(DataStream &&s) { s >> fin; }
}
diff --git a/src/hotstuff.cpp b/src/hotstuff.cpp
index b247d2e..43da8a8 100644
--- a/src/hotstuff.cpp
+++ b/src/hotstuff.cpp
@@ -9,58 +9,50 @@ using salticidae::static_pointer_cast;
namespace hotstuff {
-void MsgHotStuff::gen_propose(const Proposal &proposal) {
- DataStream s;
- set_opcode(PROPOSE);
- s << proposal;
- set_payload(std::move(s));
+const opcode_t MsgPropose::opcode;
+MsgPropose::MsgPropose(const Proposal &proposal) { serialized << proposal; }
+void MsgPropose::postponed_parse(HotStuffCore *hsc) {
+ proposal.hsc = hsc;
+ serialized >> proposal;
}
-void MsgHotStuff::parse_propose(Proposal &proposal) const {
- DataStream(get_payload()) >> proposal;
+const opcode_t MsgVote::opcode;
+MsgVote::MsgVote(const Vote &vote) { serialized << vote; }
+void MsgVote::postponed_parse(HotStuffCore *hsc) {
+ vote.hsc = hsc;
+ serialized >> vote;
}
-void MsgHotStuff::gen_vote(const Vote &vote) {
- DataStream s;
- set_opcode(VOTE);
- s << vote;
- set_payload(std::move(s));
-}
-
-void MsgHotStuff::parse_vote(Vote &vote) const {
- DataStream(get_payload()) >> vote;
-}
-
-void MsgHotStuff::gen_qfetchblk(const std::vector<uint256_t> &blk_hashes) {
- DataStream s;
- set_opcode(QUERY_FETCH_BLK);
- gen_hash_list(s, blk_hashes);
- set_payload(std::move(s));
+const opcode_t MsgReqBlock::opcode;
+MsgReqBlock::MsgReqBlock(const std::vector<uint256_t> &blk_hashes) {
+ serialized << (uint32_t)htole(blk_hashes.size());
+ for (const auto &h: blk_hashes)
+ serialized << h;
}
-void MsgHotStuff::parse_qfetchblk(std::vector<uint256_t> &blk_hashes) const {
- DataStream s(get_payload());
- parse_hash_list(s, blk_hashes);
+MsgReqBlock::MsgReqBlock(DataStream &&s) {
+ uint32_t size;
+ s >> size;
+ size = letoh(size);
+ blk_hashes.resize(size);
+ for (auto &h: blk_hashes) s >> h;
}
-void MsgHotStuff::gen_rfetchblk(const std::vector<block_t> &blks) {
- DataStream s;
- set_opcode(RESP_FETCH_BLK);
- s << htole((uint32_t)blks.size());
- for (auto blk: blks) s << *blk;
- set_payload(std::move(s));
+const opcode_t MsgRespBlock::opcode;
+MsgRespBlock::MsgRespBlock(const std::vector<block_t> &blks) {
+ serialized << htole((uint32_t)blks.size());
+ for (auto blk: blks) serialized << *blk;
}
-void MsgHotStuff::parse_rfetchblk(std::vector<block_t> &blks, HotStuffCore *hsc) const {
- DataStream s(get_payload());
+void MsgRespBlock::postponed_parse(HotStuffCore *hsc) {
uint32_t size;
- s >> size;
+ serialized >> size;
size = letoh(size);
blks.resize(size);
for (auto &blk: blks)
{
Block _blk;
- _blk.unserialize(s, hsc);
+ _blk.unserialize(serialized, hsc);
if (!_blk.verify(hsc->get_config()))
blk = hsc->storage->add_blk(std::move(_blk));
else
@@ -247,11 +239,10 @@ promise_t HotStuffBase::async_deliver_blk(const uint256_t &blk_hash,
return static_cast<promise_t &>(pm);
}
-void HotStuffBase::propose_handler(const MsgHotStuff &msg, conn_t conn_) {
- auto conn = static_pointer_cast<PeerNetwork<MsgHotStuff>::Conn>(conn_);
+void HotStuffBase::propose_handler(MsgPropose &&msg, conn_t conn) {
const NetAddr &peer = conn->get_peer();
- Proposal prop(this);
- msg.parse_propose(prop);
+ msg.postponed_parse(this);
+ auto &prop = msg.proposal;
block_t blk = prop.blk;
promise::all(std::vector<promise_t>{
async_deliver_blk(prop.bqc_hash, peer),
@@ -261,11 +252,10 @@ void HotStuffBase::propose_handler(const MsgHotStuff &msg, conn_t conn_) {
});
}
-void HotStuffBase::vote_handler(const MsgHotStuff &msg, conn_t conn_) {
- auto conn = static_pointer_cast<PeerNetwork<MsgHotStuff>::Conn>(conn_);
+void HotStuffBase::vote_handler(MsgVote &&msg, conn_t conn) {
const NetAddr &peer = conn->get_peer();
- Vote vote(this);
- msg.parse_vote(vote);
+ msg.postponed_parse(this);
+ auto &vote = msg.vote;
promise::all(std::vector<promise_t>{
async_deliver_blk(vote.bqc_hash, peer),
async_deliver_blk(vote.blk_hash, peer)
@@ -274,32 +264,26 @@ void HotStuffBase::vote_handler(const MsgHotStuff &msg, conn_t conn_) {
});
}
-void HotStuffBase::query_fetch_blk_handler(const MsgHotStuff &msg, conn_t conn_) {
- auto conn = static_pointer_cast<PeerNetwork<MsgHotStuff>::Conn>(conn_);
+void HotStuffBase::req_blk_handler(MsgReqBlock &&msg, conn_t conn) {
const NetAddr replica = conn->get_peer();
- std::vector<uint256_t> blk_hashes;
- msg.parse_qfetchblk(blk_hashes);
-
+ auto &blk_hashes = msg.blk_hashes;
std::vector<promise_t> pms;
for (const auto &h: blk_hashes)
pms.push_back(async_fetch_blk(h, nullptr));
promise::all(pms).then([replica, this](const promise::values_t values) {
- MsgHotStuff resp;
std::vector<block_t> blks;
for (auto &v: values)
{
auto blk = promise::any_cast<block_t>(v);
blks.push_back(blk);
}
- resp.gen_rfetchblk(blks);
- pn.send_msg(resp, replica);
+ pn.send_msg(MsgRespBlock(blks), replica);
});
}
-void HotStuffBase::resp_fetch_blk_handler(const MsgHotStuff &msg, conn_t) {
- std::vector<block_t> blks;
- msg.parse_rfetchblk(blks, this);
- for (const auto &blk: blks)
+void HotStuffBase::resp_blk_handler(MsgRespBlock &&msg, conn_t) {
+ msg.postponed_parse(this);
+ for (const auto &blk: msg.blks)
if (blk) on_fetch_blk(blk);
}
@@ -377,8 +361,8 @@ void HotStuffBase::print_stat() const {
LOG_INFO("--- replica msg. total ---");
LOG_INFO("sent: %lu", nsent);
LOG_INFO("recv: %lu", nrecv);
- LOG_INFO("====== end stats ======");
#endif
+ LOG_INFO("====== end stats ======");
}
HotStuffBase::HotStuffBase(uint32_t blk_size,
@@ -406,16 +390,15 @@ HotStuffBase::HotStuffBase(uint32_t blk_size,
part_delivery_time_max(0)
{
/* register the handlers for msg from replicas */
- pn.reg_handler(PROPOSE, std::bind(&HotStuffBase::propose_handler, this, _1, _2));
- pn.reg_handler(VOTE, std::bind(&HotStuffBase::vote_handler, this, _1, _2));
- pn.reg_handler(QUERY_FETCH_BLK, std::bind(&HotStuffBase::query_fetch_blk_handler, this, _1, _2));
- pn.reg_handler(RESP_FETCH_BLK, std::bind(&HotStuffBase::resp_fetch_blk_handler, this, _1, _2));
+ pn.reg_handler(salticidae::handler_bind(&HotStuffBase::propose_handler, this, _1, _2));
+ pn.reg_handler(salticidae::handler_bind(&HotStuffBase::vote_handler, this, _1, _2));
+ pn.reg_handler(salticidae::handler_bind(&HotStuffBase::req_blk_handler, this, _1, _2));
+ pn.reg_handler(salticidae::handler_bind(&HotStuffBase::resp_blk_handler, this, _1, _2));
pn.listen(listen_addr);
}
void HotStuffBase::do_broadcast_proposal(const Proposal &prop) {
- MsgHotStuff prop_msg;
- prop_msg.gen_propose(prop);
+ MsgPropose prop_msg(prop);
for (const auto &replica: pn.all_peers())
pn.send_msg(prop_msg, replica);
}
@@ -426,11 +409,7 @@ void HotStuffBase::do_vote(ReplicaID last_proposer, const Vote &vote) {
if (proposer == get_id())
on_receive_vote(vote);
else
- {
- MsgHotStuff vote_msg;
- vote_msg.gen_vote(vote);
- pn.send_msg(vote_msg, get_config().get_addr(proposer));
- }
+ pn.send_msg(MsgVote(vote), get_config().get_addr(proposer));
});
}
diff --git a/src/hotstuff_app.cpp b/src/hotstuff_app.cpp
index eae73d1..e91919e 100644
--- a/src/hotstuff_app.cpp
+++ b/src/hotstuff_app.cpp
@@ -37,10 +37,12 @@ using hotstuff::CommandDummy;
using hotstuff::Finality;
using hotstuff::command_t;
using hotstuff::uint256_t;
+using hotstuff::opcode_t;
using hotstuff::bytearray_t;
using hotstuff::DataStream;
using hotstuff::ReplicaID;
-using hotstuff::MsgClient;
+using hotstuff::MsgReqCmd;
+using hotstuff::MsgRespCmd;
using hotstuff::get_hash;
using hotstuff::promise_t;
@@ -55,7 +57,7 @@ class HotStuffApp: public HotStuff {
double stat_period;
EventContext eb;
/** Network messaging between a replica and its client. */
- ClientNetwork<MsgClient> cn;
+ ClientNetwork<opcode_t> cn;
/** Timer object to schedule a periodic printing of system statistics */
Event ev_stat_timer;
/** The listen address for client RPC */
@@ -63,9 +65,9 @@ class HotStuffApp: public HotStuff {
/** Maximum number of parents. */
int32_t parent_limit;
- using conn_client_t = MsgNetwork<MsgClient>::conn_t;
+ using conn_t = ClientNetwork<opcode_t>::conn_t;
- void client_request_cmd_handler(const MsgClient &, conn_client_t);
+ void client_request_cmd_handler(MsgReqCmd &&, conn_t);
void print_stat_cb(evutil_socket_t, short);
command_t parse_cmd(DataStream &s) override {
@@ -209,17 +211,15 @@ HotStuffApp::HotStuffApp(uint32_t blk_size,
clisten_addr(clisten_addr),
parent_limit(parent_limit) {
/* register the handlers for msg from clients */
- cn.reg_handler(hotstuff::REQ_CMD, std::bind(&HotStuffApp::client_request_cmd_handler, this, _1, _2));
+ cn.reg_handler(salticidae::handler_bind(&HotStuffApp::client_request_cmd_handler, this, _1, _2));
cn.listen(clisten_addr);
}
-void HotStuffApp::client_request_cmd_handler(const MsgClient &msg, conn_client_t conn_) {
- auto conn = static_pointer_cast<ClientNetwork<MsgClient>::Conn>(conn_);
+void HotStuffApp::client_request_cmd_handler(MsgReqCmd &&msg, conn_t conn) {
const NetAddr addr = conn->get_addr();
- command_t cmd;
+ msg.postponed_parse(this);
+ auto cmd = msg.cmd;
std::vector<promise_t> pms;
- msg.parse_reqcmd(cmd, this);
-
bool flag = true;
#ifndef HOTSTUFF_DISABLE_TX_VERIFY
flag &= cmd->verify();
@@ -228,17 +228,14 @@ void HotStuffApp::client_request_cmd_handler(const MsgClient &msg, conn_client_t
if (!flag)
{
LOG_WARN("invalid client cmd");
- MsgClient resp;
- resp.gen_respcmd(Finality(get_id(), -1, 0, 0, cmd_hash, uint256_t()));
- cn.send_msg(resp, addr);
+ cn.send_msg(MsgRespCmd(
+ Finality(get_id(), -1, 0, 0, cmd_hash, uint256_t())), addr);
}
else
{
LOG_DEBUG("processing %s", std::string(*cmd).c_str());
exec_command(cmd).then([this, addr](Finality fin) {
- MsgClient resp;
- resp.gen_respcmd(fin);
- cn.send_msg(resp, addr);
+ cn.send_msg(MsgRespCmd(fin), addr);
});
}
}
diff --git a/src/hotstuff_client.cpp b/src/hotstuff_client.cpp
index 2fd36bd..c1e7ea4 100644
--- a/src/hotstuff_client.cpp
+++ b/src/hotstuff_client.cpp
@@ -18,11 +18,13 @@ using hotstuff::ReplicaID;
using hotstuff::NetAddr;
using hotstuff::EventContext;
using hotstuff::uint256_t;
-using hotstuff::MsgClient;
+using hotstuff::MsgReqCmd;
+using hotstuff::MsgRespCmd;
using hotstuff::CommandDummy;
using hotstuff::command_t;
using hotstuff::Finality;
using hotstuff::HotStuffError;
+using hotstuff::opcode_t;
EventContext eb;
ReplicaID proposer;
@@ -37,10 +39,10 @@ struct Request {
rid(rid), cmd(cmd) { et.start(); }
};
-std::unordered_map<ReplicaID, MsgNetwork<MsgClient>::conn_t> conns;
+std::unordered_map<ReplicaID, MsgNetwork<opcode_t>::conn_t> conns;
std::unordered_map<const uint256_t, Request> waiting;
std::vector<NetAddr> replicas;
-MsgNetwork<MsgClient> mn(eb, 10, 10, 4096);
+MsgNetwork<opcode_t> mn(eb, 10, 10, 4096);
void set_proposer(ReplicaID rid) {
proposer = rid;
@@ -53,9 +55,7 @@ void try_send() {
while (waiting.size() < max_async_num && max_iter_num)
{
auto cmd = CommandDummy::make_cmd();
- MsgClient msg;
- msg.gen_reqcmd(*cmd);
- mn.send_msg(msg, conns.find(proposer)->second);
+ mn.send_msg(MsgReqCmd(*cmd), conns.find(proposer)->second);
HOTSTUFF_LOG_INFO("send new cmd %.10s",
get_hex(cmd->get_hash()).c_str());
waiting.insert(std::make_pair(
@@ -65,12 +65,9 @@ void try_send() {
}
}
-void on_receive(const MsgClient &msg, MsgNetwork<MsgClient>::conn_t) {
- Finality fin;
+void client_resp_cmd_handler(MsgRespCmd &&msg, MsgNetwork<opcode_t>::conn_t) {
+ auto &fin = msg.fin;
HOTSTUFF_LOG_DEBUG("got %s", std::string(msg).c_str());
- if (!msg.verify_checksum())
- HOTSTUFF_LOG_ERROR("incorrect checksum %08x", msg.get_checksum());
- msg.parse_respcmd(fin);
const uint256_t &cmd_hash = fin.cmd_hash;
auto it = waiting.find(cmd_hash);
if (fin.rid != proposer)
@@ -80,9 +77,8 @@ void on_receive(const MsgClient &msg, MsgNetwork<MsgClient>::conn_t) {
}
if (fin.rid != it->second.rid)
{
- MsgClient msg;
- msg.gen_reqcmd(*(waiting.find(cmd_hash)->second.cmd));
- mn.send_msg(msg, conns.find(proposer)->second);
+ mn.send_msg(MsgReqCmd(*(waiting.find(cmd_hash)->second.cmd)),
+ conns.find(proposer)->second);
HOTSTUFF_LOG_INFO("resend cmd %.10s",
get_hex(cmd_hash).c_str());
it->second.et.start();
@@ -107,7 +103,7 @@ int main(int argc, char **argv) {
auto opt_max_iter_num = Config::OptValInt::create(100);
auto opt_max_async_num = Config::OptValInt::create(10);
- mn.reg_handler(hotstuff::RESP_CMD, on_receive);
+ mn.reg_handler(client_resp_cmd_handler);
try {
config.add_opt("idx", opt_idx, Config::SET_VAL);