diff options
Diffstat (limited to 'src/hotstuff_client.cpp')
-rw-r--r-- | src/hotstuff_client.cpp | 123 |
1 files changed, 85 insertions, 38 deletions
diff --git a/src/hotstuff_client.cpp b/src/hotstuff_client.cpp index 8ee90d7..9478d36 100644 --- a/src/hotstuff_client.cpp +++ b/src/hotstuff_client.cpp @@ -13,6 +13,7 @@ using salticidae::ElapsedTime; using salticidae::trim_all; using salticidae::split; +using hotstuff::ReplicaID; using hotstuff::NetAddr; using hotstuff::EventContext; using hotstuff::Event; @@ -20,18 +21,13 @@ using hotstuff::uint256_t; using hotstuff::bytearray_t; using hotstuff::MsgClient; using hotstuff::CommandDummy; +using hotstuff::command_t; using hotstuff::Finality; +EventContext eb; size_t max_async_num = 10; int max_iter_num = 100; - -struct Request { - ElapsedTime et; - Request() { et.start(); } -}; - -std::unordered_map<int, salticidae::RingBuffer> buffers; -std::unordered_map<const uint256_t, Request> waiting; +ReplicaID proposer; int connect(const NetAddr &node) { int fd; @@ -48,6 +44,54 @@ int connect(const NetAddr &node) { return fd; } +void on_receive(int); + +struct Request { + ReplicaID rid; + command_t cmd; + ElapsedTime et; + Request(ReplicaID rid, const command_t &cmd): + rid(rid), cmd(cmd) { et.start(); } +}; + +struct Conn { + int fd; + Event on_receive_ev; + + Conn(const NetAddr &addr): + fd(connect(addr)), + on_receive_ev(eb, fd, EV_READ, [this](int fd, short) { + on_receive(fd); + on_receive_ev.add(); + }) { on_receive_ev.add(); } + + Conn(Conn &&other): + fd(other.fd), + on_receive_ev(eb, fd, EV_READ, [this](int fd, short) { + on_receive(fd); + on_receive_ev.add(); + }) { + other.fd = -1; + other.on_receive_ev.del(); + on_receive_ev.add(); + } + + ~Conn() { if (fd != -1) close(fd); } +}; + +std::unordered_map<int, salticidae::RingBuffer> buffers; +std::unordered_map<const uint256_t, Request> waiting; +std::unordered_map<ReplicaID, Conn> conns; +std::vector<NetAddr> replicas; + + +void setup(ReplicaID rid) { + proposer = rid; + auto it = conns.find(rid); + if (it == conns.end()) + conns.insert(std::make_pair(rid, Conn(replicas[rid]))); +} + void write_msg(int fd, const MsgClient &msg) { bytearray_t msg_data = msg.serialize(); if (write(fd, msg_data.data(), msg_data.size()) != (ssize_t)msg_data.size()) @@ -86,17 +130,17 @@ void read_msg(int fd, MsgClient &msg) { } } -void try_send(int fd) { +void try_send() { while (waiting.size() < max_async_num && max_iter_num) { auto cmd = CommandDummy::make_cmd(); MsgClient msg; msg.gen_reqcmd(*cmd); - write_msg(fd, msg); + write_msg(conns.find(proposer)->second.fd, msg); HOTSTUFF_LOG_INFO("send new cmd %.10s", get_hex(cmd->get_hash()).c_str()); waiting.insert(std::make_pair( - cmd->get_hash(), Request())); + cmd->get_hash(), Request(proposer, cmd))); if (max_iter_num > 0) max_iter_num--; } @@ -111,15 +155,31 @@ void on_receive(int fd) { if (!msg.verify_checksum()) HOTSTUFF_LOG_ERROR("incorrect checksum %08x", msg.get_checksum()); msg.parse_respcmd(cmd_hash, fin); + auto it = waiting.find(cmd_hash); + if (fin.rid != proposer) + { + HOTSTUFF_LOG_INFO("reconnect to the new proposer"); + setup(fin.rid); + } + if (fin.rid != it->second.rid) + { + MsgClient msg; + msg.gen_reqcmd(*(waiting.find(cmd_hash)->second.cmd)); + write_msg(conns.find(proposer)->second.fd, msg); + HOTSTUFF_LOG_INFO("resend cmd %.10s", + get_hex(cmd_hash).c_str()); + it->second.et.start(); + it->second.rid = proposer; + return; + } HOTSTUFF_LOG_INFO( "fd %d got response for %.10s: <decision=%d, blk=%.10s>", fd, get_hex(cmd_hash).c_str(), fin.decision, get_hex(fin.blk_hash).c_str()); - auto it = waiting.find(cmd_hash); if (it == waiting.end()) return; waiting.erase(it); - try_send(fd); + try_send(); } std::pair<std::string, std::string> split_ip_port_cport(const std::string &s) { @@ -127,53 +187,40 @@ std::pair<std::string, std::string> split_ip_port_cport(const std::string &s) { return std::make_pair(ret[0], ret[1]); } -Event *on_receive_ev; int main(int argc, char **argv) { Config config("hotstuff.conf"); - std::vector<NetAddr> peers2; - EventContext eb; - auto opt_idx = Config::OptValInt::create(-1); - auto opt_server_addr = Config::OptValStr::create("127.0.0.1:2234"); + auto opt_idx = Config::OptValInt::create(0); auto opt_replicas = Config::OptValStrVec::create(); auto opt_max_iter_num = Config::OptValInt::create(); try { config.add_opt("idx", opt_idx, Config::SET_VAL); - config.add_opt("server", opt_server_addr, Config::SET_VAL); config.add_opt("replica", opt_replicas, Config::APPEND); config.add_opt("ntx", opt_max_iter_num, Config::SET_VAL); config.parse(argc, argv); auto idx = opt_idx->get(); max_iter_num = opt_max_iter_num->get(); - std::vector<std::pair<std::string, std::string>> replicas; + std::vector<std::pair<std::string, std::string>> raw; for (const auto &s: opt_replicas->get()) { auto res = trim_all(split(s, ",")); assert(res.size() == 2); - replicas.push_back(std::make_pair(res[0], res[1])); + raw.push_back(std::make_pair(res[0], res[1])); } - NetAddr server(opt_server_addr->get()); - if (-1 < idx && (size_t)idx < replicas.size() && - replicas.size() > 0) + if (!(0 <= idx && (size_t)idx < raw.size() && + raw.size() > 0)) + throw std::invalid_argument("out of range"); + for (const auto &p: raw) { - for (const auto &p: replicas) - { - auto _p = split_ip_port_cport(p.first); - size_t _; - peers2.push_back(NetAddr(NetAddr(_p.first).ip, htons(stoi(_p.second, &_)))); - } - server = peers2[idx]; + auto _p = split_ip_port_cport(p.first); + size_t _; + replicas.push_back(NetAddr(NetAddr(_p.first).ip, htons(stoi(_p.second, &_)))); } - int fd = connect(server); - on_receive_ev = new Event{eb, fd, EV_READ, [](int fd, short) { - on_receive(fd); - on_receive_ev->add(); - }}; - on_receive_ev->add(); - try_send(fd); + setup(idx); + try_send(); eb.dispatch(); } catch (hotstuff::HotStuffError &e) { HOTSTUFF_LOG_ERROR("exception: %s", std::string(e).c_str()); |