From e08bf4e6a40cf82822c50b1433a573d0d8800f80 Mon Sep 17 00:00:00 2001 From: Determinant Date: Tue, 17 Jul 2018 20:02:52 -0400 Subject: add PaceMakerDummyFixed --- src/client.cpp | 4 +- src/consensus.cpp | 2 +- src/hotstuff.cpp | 45 +++++++++++++++--- src/hotstuff_app.cpp | 39 ++++++++++----- src/hotstuff_client.cpp | 123 +++++++++++++++++++++++++++++++++--------------- 5 files changed, 155 insertions(+), 58 deletions(-) (limited to 'src') diff --git a/src/client.cpp b/src/client.cpp index 482c23d..486594a 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -11,9 +11,9 @@ void MsgClient::gen_reqcmd(const Command &cmd) { set_payload(std::move(s)); } -void MsgClient::parse_reqcmd(CommandDummy &cmd) const { +void MsgClient::parse_reqcmd(command_t &cmd, HotStuffCore *hsc) const { DataStream s(get_payload()); - s >> cmd; + cmd = hsc->parse_cmd(s); } void MsgClient::gen_respcmd(const uint256_t &cmd_hash, const Finality &fin) { diff --git a/src/consensus.cpp b/src/consensus.cpp index 7749558..e42fb49 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -186,7 +186,7 @@ void HotStuffCore::on_receive_proposal(const Proposal &prop) { (opinion ? create_part_cert(*priv_key, bnew->get_hash()) : nullptr), - nullptr)); + this)); } void HotStuffCore::on_receive_vote(const Vote &vote) { diff --git a/src/hotstuff.cpp b/src/hotstuff.cpp index 9582531..f4454d4 100644 --- a/src/hotstuff.cpp +++ b/src/hotstuff.cpp @@ -70,6 +70,26 @@ void MsgHotStuff::parse_rfetchblk(std::vector &blks, HotStuffCore *hsc) } } +ReplicaID HotStuffBase::add_command(command_t cmd) { + ReplicaID proposer = pmaker->get_proposer(); + if (proposer != get_id()) + return proposer; + cmd_pending.push(storage->add_cmd(cmd)); + if (cmd_pending.size() >= blk_size) + { + std::vector cmds; + for (uint32_t i = 0; i < blk_size; i++) + { + cmds.push_back(cmd_pending.front()); + cmd_pending.pop(); + } + pmaker->beat().then([this, cmds = std::move(cmds)]() { + on_propose(cmds); + }); + } + return proposer; +} + void HotStuffBase::add_replica(ReplicaID idx, const NetAddr &addr, pubkey_bt &&pub_key) { HotStuffCore::add_replica(idx, addr, std::move(pub_key)); @@ -387,8 +407,6 @@ HotStuffBase::HotStuffBase(uint32_t blk_size, part_delivery_time_min(double_inf), part_delivery_time_max(0) { - if (pmaker == nullptr) - this->pmaker = new PaceMakerDummy(this); /* register the handlers for msg from replicas */ pn.reg_handler(PROPOSE, std::bind(&HotStuffBase::propose_handler, this, _1, _2)); pn.reg_handler(VOTE, std::bind(&HotStuffBase::vote_handler, this, _1, _2)); @@ -405,11 +423,16 @@ void HotStuffBase::do_broadcast_proposal(const Proposal &prop) { } void HotStuffBase::do_vote(ReplicaID last_proposer, const Vote &vote) { - MsgHotStuff vote_msg; - vote_msg.gen_vote(vote); pmaker->next_proposer(last_proposer) - .then([this, vote_msg](ReplicaID proposer) { - pn.send_msg(vote_msg, get_config().get_addr(proposer)); + .then([this, vote](ReplicaID proposer) { + if (proposer == get_id()) + on_receive_vote(vote); + else + { + MsgHotStuff vote_msg; + vote_msg.gen_vote(vote); + pn.send_msg(vote_msg, get_config().get_addr(proposer)); + } }); } @@ -422,6 +445,15 @@ void HotStuffBase::do_decide(const command_t &cmd) { } } +void HotStuffBase::do_forward(const uint256_t &cmd_hash, ReplicaID rid) { + auto it = decision_waiting.find(cmd_hash); + if (it != decision_waiting.end()) + { + it->second.reject(rid); + decision_waiting.erase(it); + } +} + HotStuffBase::~HotStuffBase() {} void HotStuffBase::start(bool eb_loop) { @@ -429,6 +461,7 @@ void HotStuffBase::start(bool eb_loop) { uint32_t nfaulty = pn.all_peers().size() / 3; if (nfaulty == 0) LOG_WARN("too few replicas in the system to tolerate any failure"); + pmaker->init(this); on_init(nfaulty); if (eb_loop) eb.dispatch(); diff --git a/src/hotstuff_app.cpp b/src/hotstuff_app.cpp index e0c9c3c..5f21fec 100644 --- a/src/hotstuff_app.cpp +++ b/src/hotstuff_app.cpp @@ -70,6 +70,13 @@ class HotStuffApp: public HotStuff { /** checks if a cmd is decided */ inline void client_check_cmd_handler(const MsgClient &, conn_client_t); + Finality get_finality(const command_t &cmd) const { + hotstuff::block_t blk = cmd->get_container(); + return Finality(get_id(), + cmd->get_decision(), + blk ? blk->get_hash() : uint256_t()); + } + /** The callback function to print stat */ inline void print_stat_cb(evutil_socket_t, short); @@ -203,7 +210,7 @@ HotStuffApp::HotStuffApp(uint32_t blk_size, NetAddr clisten_addr, const EventContext &eb): HotStuff(blk_size, parent_limit, idx, raw_privkey, - plisten_addr, eb), + plisten_addr, eb, new hotstuff::PaceMakerDummyFixed(1)), stat_period(stat_period), eb(eb), cn(eb), @@ -217,9 +224,9 @@ HotStuffApp::HotStuffApp(uint32_t blk_size, void HotStuffApp::client_request_cmd_handler(const MsgClient &msg, conn_client_t conn_) { auto conn = static_pointer_cast::Conn>(conn_); const NetAddr addr = conn->get_addr(); - command_t cmd = new CommandDummy(); + command_t cmd; std::vector pms; - msg.parse_reqcmd(static_cast(*cmd)); + msg.parse_reqcmd(cmd, this); bool flag = true; #ifndef HOTSTUFF_DISABLE_TX_VERIFY @@ -229,20 +236,30 @@ void HotStuffApp::client_request_cmd_handler(const MsgClient &msg, conn_client_t { LOG_WARN("invalid client cmd"); MsgClient resp; - resp.gen_respcmd(cmd->get_hash(), Finality(-1, uint256_t())); + resp.gen_respcmd(cmd->get_hash(), Finality(get_id(), -1, uint256_t())); cn.send_msg(resp, addr); } else { const uint256_t cmd_hash = cmd->get_hash(); - add_command(cmd); - /** wait for the decision of tx */ - LOG_DEBUG("processing client cmd %.10s", get_hex(cmd_hash).c_str()); - async_decide(cmd_hash).then([this, addr](command_t cmd) { + ReplicaID rid = add_command(cmd); + if (rid == get_id()) + { + /** wait for the decision of tx */ + LOG_DEBUG("processing client cmd %.10s", get_hex(cmd_hash).c_str()); + async_decide(cmd_hash).then([this, addr](command_t cmd) { + MsgClient resp; + resp.gen_respcmd(cmd->get_hash(), get_finality(cmd)); + cn.send_msg(resp, addr); + }); + } + else + { + LOG_INFO("redirect"); MsgClient resp; - resp.gen_respcmd(cmd->get_hash(), cmd->get_finality()); + resp.gen_respcmd(cmd_hash, Finality(rid, 0, cmd_hash)); cn.send_msg(resp, addr); - }); + } } } @@ -254,7 +271,7 @@ void HotStuffApp::client_check_cmd_handler(const MsgClient &msg, conn_client_t c MsgClient resp; command_t cmd = storage->find_cmd(cmd_hash); Finality fin; - if (cmd) fin = cmd->get_finality(); + if (cmd) fin = get_finality(cmd); resp.gen_respcmd(cmd_hash, fin); cn.send_msg(resp, addr); } diff --git a/src/hotstuff_client.cpp b/src/hotstuff_client.cpp index 8ee90d7..9478d36 100644 --- a/src/hotstuff_client.cpp +++ b/src/hotstuff_client.cpp @@ -13,6 +13,7 @@ using salticidae::ElapsedTime; using salticidae::trim_all; using salticidae::split; +using hotstuff::ReplicaID; using hotstuff::NetAddr; using hotstuff::EventContext; using hotstuff::Event; @@ -20,18 +21,13 @@ using hotstuff::uint256_t; using hotstuff::bytearray_t; using hotstuff::MsgClient; using hotstuff::CommandDummy; +using hotstuff::command_t; using hotstuff::Finality; +EventContext eb; size_t max_async_num = 10; int max_iter_num = 100; - -struct Request { - ElapsedTime et; - Request() { et.start(); } -}; - -std::unordered_map buffers; -std::unordered_map waiting; +ReplicaID proposer; int connect(const NetAddr &node) { int fd; @@ -48,6 +44,54 @@ int connect(const NetAddr &node) { return fd; } +void on_receive(int); + +struct Request { + ReplicaID rid; + command_t cmd; + ElapsedTime et; + Request(ReplicaID rid, const command_t &cmd): + rid(rid), cmd(cmd) { et.start(); } +}; + +struct Conn { + int fd; + Event on_receive_ev; + + Conn(const NetAddr &addr): + fd(connect(addr)), + on_receive_ev(eb, fd, EV_READ, [this](int fd, short) { + on_receive(fd); + on_receive_ev.add(); + }) { on_receive_ev.add(); } + + Conn(Conn &&other): + fd(other.fd), + on_receive_ev(eb, fd, EV_READ, [this](int fd, short) { + on_receive(fd); + on_receive_ev.add(); + }) { + other.fd = -1; + other.on_receive_ev.del(); + on_receive_ev.add(); + } + + ~Conn() { if (fd != -1) close(fd); } +}; + +std::unordered_map buffers; +std::unordered_map waiting; +std::unordered_map conns; +std::vector replicas; + + +void setup(ReplicaID rid) { + proposer = rid; + auto it = conns.find(rid); + if (it == conns.end()) + conns.insert(std::make_pair(rid, Conn(replicas[rid]))); +} + void write_msg(int fd, const MsgClient &msg) { bytearray_t msg_data = msg.serialize(); if (write(fd, msg_data.data(), msg_data.size()) != (ssize_t)msg_data.size()) @@ -86,17 +130,17 @@ void read_msg(int fd, MsgClient &msg) { } } -void try_send(int fd) { +void try_send() { while (waiting.size() < max_async_num && max_iter_num) { auto cmd = CommandDummy::make_cmd(); MsgClient msg; msg.gen_reqcmd(*cmd); - write_msg(fd, msg); + write_msg(conns.find(proposer)->second.fd, msg); HOTSTUFF_LOG_INFO("send new cmd %.10s", get_hex(cmd->get_hash()).c_str()); waiting.insert(std::make_pair( - cmd->get_hash(), Request())); + cmd->get_hash(), Request(proposer, cmd))); if (max_iter_num > 0) max_iter_num--; } @@ -111,15 +155,31 @@ void on_receive(int fd) { if (!msg.verify_checksum()) HOTSTUFF_LOG_ERROR("incorrect checksum %08x", msg.get_checksum()); msg.parse_respcmd(cmd_hash, fin); + auto it = waiting.find(cmd_hash); + if (fin.rid != proposer) + { + HOTSTUFF_LOG_INFO("reconnect to the new proposer"); + setup(fin.rid); + } + if (fin.rid != it->second.rid) + { + MsgClient msg; + msg.gen_reqcmd(*(waiting.find(cmd_hash)->second.cmd)); + write_msg(conns.find(proposer)->second.fd, msg); + HOTSTUFF_LOG_INFO("resend cmd %.10s", + get_hex(cmd_hash).c_str()); + it->second.et.start(); + it->second.rid = proposer; + return; + } HOTSTUFF_LOG_INFO( "fd %d got response for %.10s: ", fd, get_hex(cmd_hash).c_str(), fin.decision, get_hex(fin.blk_hash).c_str()); - auto it = waiting.find(cmd_hash); if (it == waiting.end()) return; waiting.erase(it); - try_send(fd); + try_send(); } std::pair split_ip_port_cport(const std::string &s) { @@ -127,53 +187,40 @@ std::pair split_ip_port_cport(const std::string &s) { return std::make_pair(ret[0], ret[1]); } -Event *on_receive_ev; int main(int argc, char **argv) { Config config("hotstuff.conf"); - std::vector peers2; - EventContext eb; - auto opt_idx = Config::OptValInt::create(-1); - auto opt_server_addr = Config::OptValStr::create("127.0.0.1:2234"); + auto opt_idx = Config::OptValInt::create(0); auto opt_replicas = Config::OptValStrVec::create(); auto opt_max_iter_num = Config::OptValInt::create(); try { config.add_opt("idx", opt_idx, Config::SET_VAL); - config.add_opt("server", opt_server_addr, Config::SET_VAL); config.add_opt("replica", opt_replicas, Config::APPEND); config.add_opt("ntx", opt_max_iter_num, Config::SET_VAL); config.parse(argc, argv); auto idx = opt_idx->get(); max_iter_num = opt_max_iter_num->get(); - std::vector> replicas; + std::vector> raw; for (const auto &s: opt_replicas->get()) { auto res = trim_all(split(s, ",")); assert(res.size() == 2); - replicas.push_back(std::make_pair(res[0], res[1])); + raw.push_back(std::make_pair(res[0], res[1])); } - NetAddr server(opt_server_addr->get()); - if (-1 < idx && (size_t)idx < replicas.size() && - replicas.size() > 0) + if (!(0 <= idx && (size_t)idx < raw.size() && + raw.size() > 0)) + throw std::invalid_argument("out of range"); + for (const auto &p: raw) { - for (const auto &p: replicas) - { - auto _p = split_ip_port_cport(p.first); - size_t _; - peers2.push_back(NetAddr(NetAddr(_p.first).ip, htons(stoi(_p.second, &_)))); - } - server = peers2[idx]; + auto _p = split_ip_port_cport(p.first); + size_t _; + replicas.push_back(NetAddr(NetAddr(_p.first).ip, htons(stoi(_p.second, &_)))); } - int fd = connect(server); - on_receive_ev = new Event{eb, fd, EV_READ, [](int fd, short) { - on_receive(fd); - on_receive_ev->add(); - }}; - on_receive_ev->add(); - try_send(fd); + setup(idx); + try_send(); eb.dispatch(); } catch (hotstuff::HotStuffError &e) { HOTSTUFF_LOG_ERROR("exception: %s", std::string(e).c_str()); -- cgit v1.2.3