diff options
author | Determinant <[email protected]> | 2018-07-18 20:15:28 -0400 |
---|---|---|
committer | Determinant <[email protected]> | 2018-07-18 20:15:28 -0400 |
commit | 70ab6576db5e49f7b2a38ea955e75328a6376812 (patch) | |
tree | f809f33c8fca2ce09c99cbb8af158f3819f34490 /src | |
parent | 960e06035636693b902d8523f1e50cafa1d62233 (diff) |
improve network impl
Diffstat (limited to 'src')
-rw-r--r-- | src/consensus.cpp | 67 | ||||
-rw-r--r-- | src/hotstuff.cpp | 2 | ||||
-rw-r--r-- | src/hotstuff_app.cpp | 2 | ||||
-rw-r--r-- | src/hotstuff_client.cpp | 131 |
4 files changed, 63 insertions, 139 deletions
diff --git a/src/consensus.cpp b/src/consensus.cpp index 688c450..bf4868d 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -31,7 +31,7 @@ void HotStuffCore::sanity_check_delivered(const block_t &blk) { throw std::runtime_error("block not delivered"); } -block_t HotStuffCore::sanity_check_delivered(const uint256_t &blk_hash) { +block_t HotStuffCore::get_delivered_blk(const uint256_t &blk_hash) { block_t blk = storage->find_blk(blk_hash); if (blk == nullptr || !blk->delivered) throw std::runtime_error("block not delivered"); @@ -46,10 +46,7 @@ bool HotStuffCore::on_deliver_blk(const block_t &blk) { } blk->parents.clear(); for (const auto &hash: blk->parent_hashes) - { - block_t p = sanity_check_delivered(hash); - blk->parents.push_back(p); - } + blk->parents.push_back(get_delivered_blk(hash)); blk->height = blk->parents[0]->height + 1; for (const auto &cmd: blk->cmds) cmd->container = blk; @@ -66,41 +63,41 @@ bool HotStuffCore::on_deliver_blk(const block_t &blk) { tails.insert(blk); blk->delivered = true; - LOG_DEBUG("delivered %.10s", get_hex(blk->get_hash()).c_str()); + LOG_DEBUG("deliver %s", std::string(*blk).c_str()); return true; } void HotStuffCore::check_commit(const block_t &_blk) { const block_t &blk = _blk->qc_ref; if (blk->qc_ref == nullptr) return; + /* decided blk could possible be incomplete due to pruning */ if (blk->decision) return; block_t p = blk->parents[0]; - if (p == blk->qc_ref) - { /* commit */ - std::vector<block_t> commit_queue; - block_t b; - for (b = p; b->height > bexec->height; b = b->parents[0]) - { /* todo: also commit the uncles/aunts */ - commit_queue.push_back(b); - } - if (b != bexec) - throw std::runtime_error("safety breached :("); - for (auto it = commit_queue.rbegin(); it != commit_queue.rend(); it++) - { - const block_t &blk = *it; - blk->decision = 1; + /* commit requires direct parent */ + if (p != blk->qc_ref) return; + /* otherwise commit */ + std::vector<block_t> commit_queue; + block_t b; + for (b = p; b->height > bexec->height; b = b->parents[0]) + { /* TODO: also commit the uncles/aunts */ + commit_queue.push_back(b); + } + if (b != bexec) + throw std::runtime_error("safety breached :("); + for (auto it = commit_queue.rbegin(); it != commit_queue.rend(); it++) + { + const block_t &blk = *it; + blk->decision = 1; #ifdef HOTSTUFF_ENABLE_LOG_PROTO - LOG_INFO("commit blk %.10s", get_hex10(blk->get_hash()).c_str()); + LOG_INFO("commit %s", std::string(*blk).c_str()); #endif - for (auto cmd: blk->cmds) - do_decide(cmd); - } - bexec = p; + for (auto cmd: blk->cmds) do_decide(cmd); } + bexec = p; } bool HotStuffCore::update(const uint256_t &bqc_hash) { - block_t _bqc = sanity_check_delivered(bqc_hash); + block_t _bqc = get_delivered_blk(bqc_hash); if (_bqc->qc_ref == nullptr) return false; check_commit(_bqc); if (_bqc->qc_ref->height > bqc->qc_ref->height) @@ -116,13 +113,14 @@ void HotStuffCore::on_propose(const std::vector<command_t> &cmds, block_t p = parents[0]; quorum_cert_bt qc = nullptr; block_t qc_ref = nullptr; + /* a block can optionally carray a QC */ if (p != b0 && p->voted.size() >= config.nmajority) { qc = p->self_qc->clone(); qc->compute(); qc_ref = p; } - /* create a new block */ + /* create the new block */ block_t bnew = storage->add_blk( Block( parents, @@ -163,8 +161,11 @@ void HotStuffCore::on_receive_proposal(const Proposal &prop) { for (b = bnew; b->height > pref->height; b = b->parents[0]); - opinion = b == pref; - vheight = bnew->height; + if (b == pref) /* on the same branch */ + { + opinion = true; + vheight = bnew->height; + } } #ifdef HOTSTUFF_ENABLE_LOG_PROTO LOG_INFO("now state: %s", std::string(*this).c_str()); @@ -185,17 +186,17 @@ void HotStuffCore::on_receive_vote(const Vote &vote) { LOG_INFO("got %s", std::string(vote).c_str()); LOG_INFO("now state: %s", std::string(*this).c_str()); #endif - - block_t blk = sanity_check_delivered(vote.blk_hash); + block_t blk = get_delivered_blk(vote.blk_hash); if (vote.cert == nullptr) return; + /* otherwise the vote is positive */ if (!vote.verify()) { - LOG_WARN("invalid vote"); + LOG_WARN("invalid vote from %d", vote.voter); return; } if (!blk->voted.insert(vote.voter).second) { - LOG_WARN("duplicate votes"); + LOG_WARN("duplicate vote from %d", vote.voter); return; } size_t qsize = blk->voted.size(); diff --git a/src/hotstuff.cpp b/src/hotstuff.cpp index d0b42c3..ed15cc1 100644 --- a/src/hotstuff.cpp +++ b/src/hotstuff.cpp @@ -411,7 +411,7 @@ HotStuffBase::HotStuffBase(uint32_t blk_size, pn.reg_handler(VOTE, std::bind(&HotStuffBase::vote_handler, this, _1, _2)); pn.reg_handler(QUERY_FETCH_BLK, std::bind(&HotStuffBase::query_fetch_blk_handler, this, _1, _2)); pn.reg_handler(RESP_FETCH_BLK, std::bind(&HotStuffBase::resp_fetch_blk_handler, this, _1, _2)); - pn.init(listen_addr); + pn.listen(listen_addr); } void HotStuffBase::do_broadcast_proposal(const Proposal &prop) { diff --git a/src/hotstuff_app.cpp b/src/hotstuff_app.cpp index e1eec1b..28f02fc 100644 --- a/src/hotstuff_app.cpp +++ b/src/hotstuff_app.cpp @@ -221,7 +221,7 @@ HotStuffApp::HotStuffApp(uint32_t blk_size, /* register the handlers for msg from clients */ cn.reg_handler(hotstuff::REQ_CMD, std::bind(&HotStuffApp::client_request_cmd_handler, this, _1, _2)); cn.reg_handler(hotstuff::CHK_CMD, std::bind(&HotStuffApp::client_check_cmd_handler, this, _1, _2)); - cn.init(clisten_addr); + cn.listen(clisten_addr); } void HotStuffApp::client_request_cmd_handler(const MsgClient &msg, conn_client_t conn_) { diff --git a/src/hotstuff_client.cpp b/src/hotstuff_client.cpp index 9478d36..1363f39 100644 --- a/src/hotstuff_client.cpp +++ b/src/hotstuff_client.cpp @@ -12,39 +12,22 @@ using salticidae::Config; using salticidae::ElapsedTime; using salticidae::trim_all; using salticidae::split; +using salticidae::MsgNetwork; using hotstuff::ReplicaID; using hotstuff::NetAddr; using hotstuff::EventContext; -using hotstuff::Event; using hotstuff::uint256_t; -using hotstuff::bytearray_t; using hotstuff::MsgClient; using hotstuff::CommandDummy; using hotstuff::command_t; using hotstuff::Finality; +using hotstuff::HotStuffError; EventContext eb; -size_t max_async_num = 10; -int max_iter_num = 100; ReplicaID proposer; - -int connect(const NetAddr &node) { - int fd; - if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) - assert(0); - struct sockaddr_in sockin; - memset(&sockin, 0, sizeof(struct sockaddr_in)); - sockin.sin_family = AF_INET; - sockin.sin_addr.s_addr = node.ip; - sockin.sin_port = node.port; - if (connect(fd, (struct sockaddr *)&sockin, sizeof(struct sockaddr_in)) == -1) - assert(0); - HOTSTUFF_LOG_INFO("connected to %s", std::string(node).c_str()); - return fd; -} - -void on_receive(int); +size_t max_async_num; +int max_iter_num; struct Request { ReplicaID rid; @@ -54,80 +37,18 @@ struct Request { rid(rid), cmd(cmd) { et.start(); } }; -struct Conn { - int fd; - Event on_receive_ev; - - Conn(const NetAddr &addr): - fd(connect(addr)), - on_receive_ev(eb, fd, EV_READ, [this](int fd, short) { - on_receive(fd); - on_receive_ev.add(); - }) { on_receive_ev.add(); } - - Conn(Conn &&other): - fd(other.fd), - on_receive_ev(eb, fd, EV_READ, [this](int fd, short) { - on_receive(fd); - on_receive_ev.add(); - }) { - other.fd = -1; - other.on_receive_ev.del(); - on_receive_ev.add(); - } - - ~Conn() { if (fd != -1) close(fd); } -}; - std::unordered_map<int, salticidae::RingBuffer> buffers; std::unordered_map<const uint256_t, Request> waiting; -std::unordered_map<ReplicaID, Conn> conns; -std::vector<NetAddr> replicas; +MsgNetwork<MsgClient> mn(eb, 10, 0, 2, 4096); +std::unordered_map<ReplicaID, MsgNetwork<MsgClient>::conn_t> conns; +std::vector<NetAddr> replicas; -void setup(ReplicaID rid) { +void set_proposer(ReplicaID rid) { proposer = rid; auto it = conns.find(rid); if (it == conns.end()) - conns.insert(std::make_pair(rid, Conn(replicas[rid]))); -} - -void write_msg(int fd, const MsgClient &msg) { - bytearray_t msg_data = msg.serialize(); - if (write(fd, msg_data.data(), msg_data.size()) != (ssize_t)msg_data.size()) - assert(0); -} - -void read_msg(int fd, MsgClient &msg) { - static const size_t BUFF_SEG_SIZE = 1024; - ssize_t ret; - auto &buffer = buffers[fd]; - bool read_body = false; - for (;;) - { - bytearray_t buff_seg; - if (!read_body && buffer.size() >= MsgClient::header_size) - { - buff_seg = buffer.pop(MsgClient::header_size); - msg = MsgClient(buff_seg.data()); - read_body = true; - } - if (read_body && buffer.size() >= msg.get_length()) - { - buff_seg = buffer.pop(msg.get_length()); - msg.set_payload(std::move(buff_seg)); - return; - } - - buff_seg.resize(BUFF_SEG_SIZE); - ret = read(fd, buff_seg.data(), BUFF_SEG_SIZE); - assert(ret != -1); - if (ret > 0) - { - buff_seg.resize(ret); - buffer.push(std::move(buff_seg)); - } - } + conns.insert(std::make_pair(rid, mn.create_conn(replicas[rid]))); } void try_send() { @@ -136,7 +57,7 @@ void try_send() { auto cmd = CommandDummy::make_cmd(); MsgClient msg; msg.gen_reqcmd(*cmd); - write_msg(conns.find(proposer)->second.fd, msg); + mn.send_msg(msg, conns.find(proposer)->second); HOTSTUFF_LOG_INFO("send new cmd %.10s", get_hex(cmd->get_hash()).c_str()); waiting.insert(std::make_pair( @@ -146,11 +67,9 @@ void try_send() { } } -void on_receive(int fd) { - MsgClient msg; +void on_receive(const MsgClient &msg, MsgNetwork<MsgClient>::conn_t) { uint256_t cmd_hash; Finality fin; - read_msg(fd, msg); HOTSTUFF_LOG_DEBUG("got %s", std::string(msg).c_str()); if (!msg.verify_checksum()) HOTSTUFF_LOG_ERROR("incorrect checksum %08x", msg.get_checksum()); @@ -159,13 +78,13 @@ void on_receive(int fd) { if (fin.rid != proposer) { HOTSTUFF_LOG_INFO("reconnect to the new proposer"); - setup(fin.rid); + set_proposer(fin.rid); } if (fin.rid != it->second.rid) { MsgClient msg; msg.gen_reqcmd(*(waiting.find(cmd_hash)->second.cmd)); - write_msg(conns.find(proposer)->second.fd, msg); + mn.send_msg(msg, conns.find(proposer)->second); HOTSTUFF_LOG_INFO("resend cmd %.10s", get_hex(cmd_hash).c_str()); it->second.et.start(); @@ -173,8 +92,8 @@ void on_receive(int fd) { return; } HOTSTUFF_LOG_INFO( - "fd %d got response for %.10s: <decision=%d, blk=%.10s>", - fd, get_hex(cmd_hash).c_str(), + "got response for %.10s: <decision=%d, blk=%.10s>", + get_hex(cmd_hash).c_str(), fin.decision, get_hex(fin.blk_hash).c_str()); if (it == waiting.end()) return; @@ -187,30 +106,34 @@ std::pair<std::string, std::string> split_ip_port_cport(const std::string &s) { return std::make_pair(ret[0], ret[1]); } - int main(int argc, char **argv) { Config config("hotstuff.conf"); auto opt_idx = Config::OptValInt::create(0); auto opt_replicas = Config::OptValStrVec::create(); - auto opt_max_iter_num = Config::OptValInt::create(); + auto opt_max_iter_num = Config::OptValInt::create(100); + auto opt_max_async_num = Config::OptValInt::create(10); + + mn.reg_handler(hotstuff::RESP_CMD, on_receive); try { config.add_opt("idx", opt_idx, Config::SET_VAL); config.add_opt("replica", opt_replicas, Config::APPEND); - config.add_opt("ntx", opt_max_iter_num, Config::SET_VAL); + config.add_opt("iter", opt_max_iter_num, Config::SET_VAL); + config.add_opt("max-async", opt_max_async_num, Config::SET_VAL); config.parse(argc, argv); auto idx = opt_idx->get(); max_iter_num = opt_max_iter_num->get(); + max_async_num = opt_max_async_num->get(); std::vector<std::pair<std::string, std::string>> raw; for (const auto &s: opt_replicas->get()) { auto res = trim_all(split(s, ",")); - assert(res.size() == 2); + if (res.size() != 2) + throw HotStuffError("format error"); raw.push_back(std::make_pair(res[0], res[1])); } - if (!(0 <= idx && (size_t)idx < raw.size() && - raw.size() > 0)) + if (!(0 <= idx && (size_t)idx < raw.size() && raw.size() > 0)) throw std::invalid_argument("out of range"); for (const auto &p: raw) { @@ -219,10 +142,10 @@ int main(int argc, char **argv) { replicas.push_back(NetAddr(NetAddr(_p.first).ip, htons(stoi(_p.second, &_)))); } - setup(idx); + set_proposer(idx); try_send(); eb.dispatch(); - } catch (hotstuff::HotStuffError &e) { + } catch (HotStuffError &e) { HOTSTUFF_LOG_ERROR("exception: %s", std::string(e).c_str()); } return 0; |