From 41efd33a3e165ac329f14b6e1cea935076a8b790 Mon Sep 17 00:00:00 2001 From: Determinant Date: Fri, 20 Jul 2018 20:09:24 -0400 Subject: improve msg & msg network interface --- src/hotstuff.cpp | 115 +++++++++++++++++++++++-------------------------------- 1 file changed, 47 insertions(+), 68 deletions(-) (limited to 'src/hotstuff.cpp') diff --git a/src/hotstuff.cpp b/src/hotstuff.cpp index b247d2e..43da8a8 100644 --- a/src/hotstuff.cpp +++ b/src/hotstuff.cpp @@ -9,58 +9,50 @@ using salticidae::static_pointer_cast; namespace hotstuff { -void MsgHotStuff::gen_propose(const Proposal &proposal) { - DataStream s; - set_opcode(PROPOSE); - s << proposal; - set_payload(std::move(s)); +const opcode_t MsgPropose::opcode; +MsgPropose::MsgPropose(const Proposal &proposal) { serialized << proposal; } +void MsgPropose::postponed_parse(HotStuffCore *hsc) { + proposal.hsc = hsc; + serialized >> proposal; } -void MsgHotStuff::parse_propose(Proposal &proposal) const { - DataStream(get_payload()) >> proposal; +const opcode_t MsgVote::opcode; +MsgVote::MsgVote(const Vote &vote) { serialized << vote; } +void MsgVote::postponed_parse(HotStuffCore *hsc) { + vote.hsc = hsc; + serialized >> vote; } -void MsgHotStuff::gen_vote(const Vote &vote) { - DataStream s; - set_opcode(VOTE); - s << vote; - set_payload(std::move(s)); -} - -void MsgHotStuff::parse_vote(Vote &vote) const { - DataStream(get_payload()) >> vote; -} - -void MsgHotStuff::gen_qfetchblk(const std::vector &blk_hashes) { - DataStream s; - set_opcode(QUERY_FETCH_BLK); - gen_hash_list(s, blk_hashes); - set_payload(std::move(s)); +const opcode_t MsgReqBlock::opcode; +MsgReqBlock::MsgReqBlock(const std::vector &blk_hashes) { + serialized << (uint32_t)htole(blk_hashes.size()); + for (const auto &h: blk_hashes) + serialized << h; } -void MsgHotStuff::parse_qfetchblk(std::vector &blk_hashes) const { - DataStream s(get_payload()); - parse_hash_list(s, blk_hashes); +MsgReqBlock::MsgReqBlock(DataStream &&s) { + uint32_t size; + s >> size; + size = letoh(size); + blk_hashes.resize(size); + for (auto &h: blk_hashes) s >> h; } -void MsgHotStuff::gen_rfetchblk(const std::vector &blks) { - DataStream s; - set_opcode(RESP_FETCH_BLK); - s << htole((uint32_t)blks.size()); - for (auto blk: blks) s << *blk; - set_payload(std::move(s)); +const opcode_t MsgRespBlock::opcode; +MsgRespBlock::MsgRespBlock(const std::vector &blks) { + serialized << htole((uint32_t)blks.size()); + for (auto blk: blks) serialized << *blk; } -void MsgHotStuff::parse_rfetchblk(std::vector &blks, HotStuffCore *hsc) const { - DataStream s(get_payload()); +void MsgRespBlock::postponed_parse(HotStuffCore *hsc) { uint32_t size; - s >> size; + serialized >> size; size = letoh(size); blks.resize(size); for (auto &blk: blks) { Block _blk; - _blk.unserialize(s, hsc); + _blk.unserialize(serialized, hsc); if (!_blk.verify(hsc->get_config())) blk = hsc->storage->add_blk(std::move(_blk)); else @@ -247,11 +239,10 @@ promise_t HotStuffBase::async_deliver_blk(const uint256_t &blk_hash, return static_cast(pm); } -void HotStuffBase::propose_handler(const MsgHotStuff &msg, conn_t conn_) { - auto conn = static_pointer_cast::Conn>(conn_); +void HotStuffBase::propose_handler(MsgPropose &&msg, conn_t conn) { const NetAddr &peer = conn->get_peer(); - Proposal prop(this); - msg.parse_propose(prop); + msg.postponed_parse(this); + auto &prop = msg.proposal; block_t blk = prop.blk; promise::all(std::vector{ async_deliver_blk(prop.bqc_hash, peer), @@ -261,11 +252,10 @@ void HotStuffBase::propose_handler(const MsgHotStuff &msg, conn_t conn_) { }); } -void HotStuffBase::vote_handler(const MsgHotStuff &msg, conn_t conn_) { - auto conn = static_pointer_cast::Conn>(conn_); +void HotStuffBase::vote_handler(MsgVote &&msg, conn_t conn) { const NetAddr &peer = conn->get_peer(); - Vote vote(this); - msg.parse_vote(vote); + msg.postponed_parse(this); + auto &vote = msg.vote; promise::all(std::vector{ async_deliver_blk(vote.bqc_hash, peer), async_deliver_blk(vote.blk_hash, peer) @@ -274,32 +264,26 @@ void HotStuffBase::vote_handler(const MsgHotStuff &msg, conn_t conn_) { }); } -void HotStuffBase::query_fetch_blk_handler(const MsgHotStuff &msg, conn_t conn_) { - auto conn = static_pointer_cast::Conn>(conn_); +void HotStuffBase::req_blk_handler(MsgReqBlock &&msg, conn_t conn) { const NetAddr replica = conn->get_peer(); - std::vector blk_hashes; - msg.parse_qfetchblk(blk_hashes); - + auto &blk_hashes = msg.blk_hashes; std::vector pms; for (const auto &h: blk_hashes) pms.push_back(async_fetch_blk(h, nullptr)); promise::all(pms).then([replica, this](const promise::values_t values) { - MsgHotStuff resp; std::vector blks; for (auto &v: values) { auto blk = promise::any_cast(v); blks.push_back(blk); } - resp.gen_rfetchblk(blks); - pn.send_msg(resp, replica); + pn.send_msg(MsgRespBlock(blks), replica); }); } -void HotStuffBase::resp_fetch_blk_handler(const MsgHotStuff &msg, conn_t) { - std::vector blks; - msg.parse_rfetchblk(blks, this); - for (const auto &blk: blks) +void HotStuffBase::resp_blk_handler(MsgRespBlock &&msg, conn_t) { + msg.postponed_parse(this); + for (const auto &blk: msg.blks) if (blk) on_fetch_blk(blk); } @@ -377,8 +361,8 @@ void HotStuffBase::print_stat() const { LOG_INFO("--- replica msg. total ---"); LOG_INFO("sent: %lu", nsent); LOG_INFO("recv: %lu", nrecv); - LOG_INFO("====== end stats ======"); #endif + LOG_INFO("====== end stats ======"); } HotStuffBase::HotStuffBase(uint32_t blk_size, @@ -406,16 +390,15 @@ HotStuffBase::HotStuffBase(uint32_t blk_size, part_delivery_time_max(0) { /* register the handlers for msg from replicas */ - pn.reg_handler(PROPOSE, std::bind(&HotStuffBase::propose_handler, this, _1, _2)); - pn.reg_handler(VOTE, std::bind(&HotStuffBase::vote_handler, this, _1, _2)); - pn.reg_handler(QUERY_FETCH_BLK, std::bind(&HotStuffBase::query_fetch_blk_handler, this, _1, _2)); - pn.reg_handler(RESP_FETCH_BLK, std::bind(&HotStuffBase::resp_fetch_blk_handler, this, _1, _2)); + pn.reg_handler(salticidae::handler_bind(&HotStuffBase::propose_handler, this, _1, _2)); + pn.reg_handler(salticidae::handler_bind(&HotStuffBase::vote_handler, this, _1, _2)); + pn.reg_handler(salticidae::handler_bind(&HotStuffBase::req_blk_handler, this, _1, _2)); + pn.reg_handler(salticidae::handler_bind(&HotStuffBase::resp_blk_handler, this, _1, _2)); pn.listen(listen_addr); } void HotStuffBase::do_broadcast_proposal(const Proposal &prop) { - MsgHotStuff prop_msg; - prop_msg.gen_propose(prop); + MsgPropose prop_msg(prop); for (const auto &replica: pn.all_peers()) pn.send_msg(prop_msg, replica); } @@ -426,11 +409,7 @@ void HotStuffBase::do_vote(ReplicaID last_proposer, const Vote &vote) { if (proposer == get_id()) on_receive_vote(vote); else - { - MsgHotStuff vote_msg; - vote_msg.gen_vote(vote); - pn.send_msg(vote_msg, get_config().get_addr(proposer)); - } + pn.send_msg(MsgVote(vote), get_config().get_addr(proposer)); }); } -- cgit v1.2.3