diff options
author | Determinant <[email protected]> | 2018-07-18 20:15:28 -0400 |
---|---|---|
committer | Determinant <[email protected]> | 2018-07-18 20:15:28 -0400 |
commit | 70ab6576db5e49f7b2a38ea955e75328a6376812 (patch) | |
tree | f809f33c8fca2ce09c99cbb8af158f3819f34490 /src/hotstuff_client.cpp | |
parent | 960e06035636693b902d8523f1e50cafa1d62233 (diff) |
improve network impl
Diffstat (limited to 'src/hotstuff_client.cpp')
-rw-r--r-- | src/hotstuff_client.cpp | 131 |
1 files changed, 27 insertions, 104 deletions
diff --git a/src/hotstuff_client.cpp b/src/hotstuff_client.cpp index 9478d36..1363f39 100644 --- a/src/hotstuff_client.cpp +++ b/src/hotstuff_client.cpp @@ -12,39 +12,22 @@ using salticidae::Config; using salticidae::ElapsedTime; using salticidae::trim_all; using salticidae::split; +using salticidae::MsgNetwork; using hotstuff::ReplicaID; using hotstuff::NetAddr; using hotstuff::EventContext; -using hotstuff::Event; using hotstuff::uint256_t; -using hotstuff::bytearray_t; using hotstuff::MsgClient; using hotstuff::CommandDummy; using hotstuff::command_t; using hotstuff::Finality; +using hotstuff::HotStuffError; EventContext eb; -size_t max_async_num = 10; -int max_iter_num = 100; ReplicaID proposer; - -int connect(const NetAddr &node) { - int fd; - if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) - assert(0); - struct sockaddr_in sockin; - memset(&sockin, 0, sizeof(struct sockaddr_in)); - sockin.sin_family = AF_INET; - sockin.sin_addr.s_addr = node.ip; - sockin.sin_port = node.port; - if (connect(fd, (struct sockaddr *)&sockin, sizeof(struct sockaddr_in)) == -1) - assert(0); - HOTSTUFF_LOG_INFO("connected to %s", std::string(node).c_str()); - return fd; -} - -void on_receive(int); +size_t max_async_num; +int max_iter_num; struct Request { ReplicaID rid; @@ -54,80 +37,18 @@ struct Request { rid(rid), cmd(cmd) { et.start(); } }; -struct Conn { - int fd; - Event on_receive_ev; - - Conn(const NetAddr &addr): - fd(connect(addr)), - on_receive_ev(eb, fd, EV_READ, [this](int fd, short) { - on_receive(fd); - on_receive_ev.add(); - }) { on_receive_ev.add(); } - - Conn(Conn &&other): - fd(other.fd), - on_receive_ev(eb, fd, EV_READ, [this](int fd, short) { - on_receive(fd); - on_receive_ev.add(); - }) { - other.fd = -1; - other.on_receive_ev.del(); - on_receive_ev.add(); - } - - ~Conn() { if (fd != -1) close(fd); } -}; - std::unordered_map<int, salticidae::RingBuffer> buffers; std::unordered_map<const uint256_t, Request> waiting; -std::unordered_map<ReplicaID, Conn> conns; -std::vector<NetAddr> replicas; +MsgNetwork<MsgClient> mn(eb, 10, 0, 2, 4096); +std::unordered_map<ReplicaID, MsgNetwork<MsgClient>::conn_t> conns; +std::vector<NetAddr> replicas; -void setup(ReplicaID rid) { +void set_proposer(ReplicaID rid) { proposer = rid; auto it = conns.find(rid); if (it == conns.end()) - conns.insert(std::make_pair(rid, Conn(replicas[rid]))); -} - -void write_msg(int fd, const MsgClient &msg) { - bytearray_t msg_data = msg.serialize(); - if (write(fd, msg_data.data(), msg_data.size()) != (ssize_t)msg_data.size()) - assert(0); -} - -void read_msg(int fd, MsgClient &msg) { - static const size_t BUFF_SEG_SIZE = 1024; - ssize_t ret; - auto &buffer = buffers[fd]; - bool read_body = false; - for (;;) - { - bytearray_t buff_seg; - if (!read_body && buffer.size() >= MsgClient::header_size) - { - buff_seg = buffer.pop(MsgClient::header_size); - msg = MsgClient(buff_seg.data()); - read_body = true; - } - if (read_body && buffer.size() >= msg.get_length()) - { - buff_seg = buffer.pop(msg.get_length()); - msg.set_payload(std::move(buff_seg)); - return; - } - - buff_seg.resize(BUFF_SEG_SIZE); - ret = read(fd, buff_seg.data(), BUFF_SEG_SIZE); - assert(ret != -1); - if (ret > 0) - { - buff_seg.resize(ret); - buffer.push(std::move(buff_seg)); - } - } + conns.insert(std::make_pair(rid, mn.create_conn(replicas[rid]))); } void try_send() { @@ -136,7 +57,7 @@ void try_send() { auto cmd = CommandDummy::make_cmd(); MsgClient msg; msg.gen_reqcmd(*cmd); - write_msg(conns.find(proposer)->second.fd, msg); + mn.send_msg(msg, conns.find(proposer)->second); HOTSTUFF_LOG_INFO("send new cmd %.10s", get_hex(cmd->get_hash()).c_str()); waiting.insert(std::make_pair( @@ -146,11 +67,9 @@ void try_send() { } } -void on_receive(int fd) { - MsgClient msg; +void on_receive(const MsgClient &msg, MsgNetwork<MsgClient>::conn_t) { uint256_t cmd_hash; Finality fin; - read_msg(fd, msg); HOTSTUFF_LOG_DEBUG("got %s", std::string(msg).c_str()); if (!msg.verify_checksum()) HOTSTUFF_LOG_ERROR("incorrect checksum %08x", msg.get_checksum()); @@ -159,13 +78,13 @@ void on_receive(int fd) { if (fin.rid != proposer) { HOTSTUFF_LOG_INFO("reconnect to the new proposer"); - setup(fin.rid); + set_proposer(fin.rid); } if (fin.rid != it->second.rid) { MsgClient msg; msg.gen_reqcmd(*(waiting.find(cmd_hash)->second.cmd)); - write_msg(conns.find(proposer)->second.fd, msg); + mn.send_msg(msg, conns.find(proposer)->second); HOTSTUFF_LOG_INFO("resend cmd %.10s", get_hex(cmd_hash).c_str()); it->second.et.start(); @@ -173,8 +92,8 @@ void on_receive(int fd) { return; } HOTSTUFF_LOG_INFO( - "fd %d got response for %.10s: <decision=%d, blk=%.10s>", - fd, get_hex(cmd_hash).c_str(), + "got response for %.10s: <decision=%d, blk=%.10s>", + get_hex(cmd_hash).c_str(), fin.decision, get_hex(fin.blk_hash).c_str()); if (it == waiting.end()) return; @@ -187,30 +106,34 @@ std::pair<std::string, std::string> split_ip_port_cport(const std::string &s) { return std::make_pair(ret[0], ret[1]); } - int main(int argc, char **argv) { Config config("hotstuff.conf"); auto opt_idx = Config::OptValInt::create(0); auto opt_replicas = Config::OptValStrVec::create(); - auto opt_max_iter_num = Config::OptValInt::create(); + auto opt_max_iter_num = Config::OptValInt::create(100); + auto opt_max_async_num = Config::OptValInt::create(10); + + mn.reg_handler(hotstuff::RESP_CMD, on_receive); try { config.add_opt("idx", opt_idx, Config::SET_VAL); config.add_opt("replica", opt_replicas, Config::APPEND); - config.add_opt("ntx", opt_max_iter_num, Config::SET_VAL); + config.add_opt("iter", opt_max_iter_num, Config::SET_VAL); + config.add_opt("max-async", opt_max_async_num, Config::SET_VAL); config.parse(argc, argv); auto idx = opt_idx->get(); max_iter_num = opt_max_iter_num->get(); + max_async_num = opt_max_async_num->get(); std::vector<std::pair<std::string, std::string>> raw; for (const auto &s: opt_replicas->get()) { auto res = trim_all(split(s, ",")); - assert(res.size() == 2); + if (res.size() != 2) + throw HotStuffError("format error"); raw.push_back(std::make_pair(res[0], res[1])); } - if (!(0 <= idx && (size_t)idx < raw.size() && - raw.size() > 0)) + if (!(0 <= idx && (size_t)idx < raw.size() && raw.size() > 0)) throw std::invalid_argument("out of range"); for (const auto &p: raw) { @@ -219,10 +142,10 @@ int main(int argc, char **argv) { replicas.push_back(NetAddr(NetAddr(_p.first).ip, htons(stoi(_p.second, &_)))); } - setup(idx); + set_proposer(idx); try_send(); eb.dispatch(); - } catch (hotstuff::HotStuffError &e) { + } catch (HotStuffError &e) { HOTSTUFF_LOG_ERROR("exception: %s", std::string(e).c_str()); } return 0; |