diff options
author | Determinant <[email protected]> | 2018-07-18 20:15:28 -0400 |
---|---|---|
committer | Determinant <[email protected]> | 2018-07-18 20:15:28 -0400 |
commit | 70ab6576db5e49f7b2a38ea955e75328a6376812 (patch) | |
tree | f809f33c8fca2ce09c99cbb8af158f3819f34490 | |
parent | 960e06035636693b902d8523f1e50cafa1d62233 (diff) |
improve network impl
-rw-r--r-- | include/hotstuff/consensus.h | 2 | ||||
-rw-r--r-- | include/hotstuff/liveness.h | 16 | ||||
-rwxr-xr-x | run_client.sh | 2 | ||||
m--------- | salticidae | 0 | ||||
-rw-r--r-- | src/consensus.cpp | 67 | ||||
-rw-r--r-- | src/hotstuff.cpp | 2 | ||||
-rw-r--r-- | src/hotstuff_app.cpp | 2 | ||||
-rw-r--r-- | src/hotstuff_client.cpp | 131 |
8 files changed, 80 insertions, 142 deletions
diff --git a/include/hotstuff/consensus.h b/include/hotstuff/consensus.h index 73d47ef..1f1e6ea 100644 --- a/include/hotstuff/consensus.h +++ b/include/hotstuff/consensus.h @@ -31,7 +31,7 @@ class HotStuffCore { std::unordered_map<block_t, promise_t> qc_waiting; promise_t propose_waiting; - block_t sanity_check_delivered(const uint256_t &blk_hash); + block_t get_delivered_blk(const uint256_t &blk_hash); void sanity_check_delivered(const block_t &blk); void check_commit(const block_t &_bqc); bool update(const uint256_t &bqc_hash); diff --git a/include/hotstuff/liveness.h b/include/hotstuff/liveness.h index a51c032..a625f0d 100644 --- a/include/hotstuff/liveness.h +++ b/include/hotstuff/liveness.h @@ -11,12 +11,18 @@ class PaceMaker { HotStuffCore *hsc; public: virtual ~PaceMaker() = default; + /** Initialize the PaceMaker. A derived class should also call the + * default implementation to set `hsc`. */ virtual void init(HotStuffCore *_hsc) { hsc = _hsc; } /** Get a promise resolved when the pace maker thinks it is a *good* time * to issue new commands. When promise is resolved, the replica should * propose the command. */ virtual promise_t beat() = 0; + /** Get the current proposer. */ virtual ReplicaID get_proposer() = 0; + /** Select the parent blocks for a new block. + * @return Parent blocks. The block at index 0 is the direct parent, while + * the others are uncles/aunts. The returned vector should be non-empty. */ virtual std::vector<block_t> get_parents() = 0; /** Get a promise resolved when the pace maker thinks it is a *good* time * to vote for a block. The promise is resolved with the next proposer's ID @@ -26,6 +32,10 @@ class PaceMaker { using pacemaker_bt = BoxObj<PaceMaker>; +/** Parent selection implementation for PaceMaker: select all parents. + * PaceMakers derived from this class will select the highest block as the + * direct parent, while including other tail blocks (up to parent_limit) as + * uncles/aunts. */ class PMAllParents: public virtual PaceMaker { const int32_t parent_limit; /**< maximum number of parents */ public: @@ -50,6 +60,9 @@ class PMAllParents: public virtual PaceMaker { } }; +/** Beat implementation for PaceMaker: simply wait for the QC of last proposed + * block. PaceMakers derived from this class will beat only when the last + * block proposed by itself gets its QC. */ class PMWaitQC: public virtual PaceMaker { std::queue<promise_t> pending_beats; block_t last_proposed; @@ -102,12 +115,13 @@ class PMWaitQC: public virtual PaceMaker { } }; -/** A pace maker that waits for the qc of the last proposed block. */ +/** Naive PaceMaker where everyone can be a proposer at any moment. */ struct PaceMakerDummy: public PMAllParents, public PMWaitQC { PaceMakerDummy(int32_t parent_limit): PMAllParents(parent_limit), PMWaitQC() {} }; +/** PaceMakerDummy with a fixed proposer. */ class PaceMakerDummyFixed: public PaceMakerDummy { ReplicaID proposer; diff --git a/run_client.sh b/run_client.sh index 0efc198..93a9148 100755 --- a/run_client.sh +++ b/run_client.sh @@ -1,2 +1,2 @@ #!/bin/bash -./hotstuff-client --idx 0 --ntx -1 +./hotstuff-client --idx 0 --iter -1 --max-async 3 diff --git a/salticidae b/salticidae -Subproject 12bf781e762705f2bbabe5102148ac699e20ef1 +Subproject a75778995a4e0742f244670e9cc02a56611ccfe diff --git a/src/consensus.cpp b/src/consensus.cpp index 688c450..bf4868d 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -31,7 +31,7 @@ void HotStuffCore::sanity_check_delivered(const block_t &blk) { throw std::runtime_error("block not delivered"); } -block_t HotStuffCore::sanity_check_delivered(const uint256_t &blk_hash) { +block_t HotStuffCore::get_delivered_blk(const uint256_t &blk_hash) { block_t blk = storage->find_blk(blk_hash); if (blk == nullptr || !blk->delivered) throw std::runtime_error("block not delivered"); @@ -46,10 +46,7 @@ bool HotStuffCore::on_deliver_blk(const block_t &blk) { } blk->parents.clear(); for (const auto &hash: blk->parent_hashes) - { - block_t p = sanity_check_delivered(hash); - blk->parents.push_back(p); - } + blk->parents.push_back(get_delivered_blk(hash)); blk->height = blk->parents[0]->height + 1; for (const auto &cmd: blk->cmds) cmd->container = blk; @@ -66,41 +63,41 @@ bool HotStuffCore::on_deliver_blk(const block_t &blk) { tails.insert(blk); blk->delivered = true; - LOG_DEBUG("delivered %.10s", get_hex(blk->get_hash()).c_str()); + LOG_DEBUG("deliver %s", std::string(*blk).c_str()); return true; } void HotStuffCore::check_commit(const block_t &_blk) { const block_t &blk = _blk->qc_ref; if (blk->qc_ref == nullptr) return; + /* decided blk could possible be incomplete due to pruning */ if (blk->decision) return; block_t p = blk->parents[0]; - if (p == blk->qc_ref) - { /* commit */ - std::vector<block_t> commit_queue; - block_t b; - for (b = p; b->height > bexec->height; b = b->parents[0]) - { /* todo: also commit the uncles/aunts */ - commit_queue.push_back(b); - } - if (b != bexec) - throw std::runtime_error("safety breached :("); - for (auto it = commit_queue.rbegin(); it != commit_queue.rend(); it++) - { - const block_t &blk = *it; - blk->decision = 1; + /* commit requires direct parent */ + if (p != blk->qc_ref) return; + /* otherwise commit */ + std::vector<block_t> commit_queue; + block_t b; + for (b = p; b->height > bexec->height; b = b->parents[0]) + { /* TODO: also commit the uncles/aunts */ + commit_queue.push_back(b); + } + if (b != bexec) + throw std::runtime_error("safety breached :("); + for (auto it = commit_queue.rbegin(); it != commit_queue.rend(); it++) + { + const block_t &blk = *it; + blk->decision = 1; #ifdef HOTSTUFF_ENABLE_LOG_PROTO - LOG_INFO("commit blk %.10s", get_hex10(blk->get_hash()).c_str()); + LOG_INFO("commit %s", std::string(*blk).c_str()); #endif - for (auto cmd: blk->cmds) - do_decide(cmd); - } - bexec = p; + for (auto cmd: blk->cmds) do_decide(cmd); } + bexec = p; } bool HotStuffCore::update(const uint256_t &bqc_hash) { - block_t _bqc = sanity_check_delivered(bqc_hash); + block_t _bqc = get_delivered_blk(bqc_hash); if (_bqc->qc_ref == nullptr) return false; check_commit(_bqc); if (_bqc->qc_ref->height > bqc->qc_ref->height) @@ -116,13 +113,14 @@ void HotStuffCore::on_propose(const std::vector<command_t> &cmds, block_t p = parents[0]; quorum_cert_bt qc = nullptr; block_t qc_ref = nullptr; + /* a block can optionally carray a QC */ if (p != b0 && p->voted.size() >= config.nmajority) { qc = p->self_qc->clone(); qc->compute(); qc_ref = p; } - /* create a new block */ + /* create the new block */ block_t bnew = storage->add_blk( Block( parents, @@ -163,8 +161,11 @@ void HotStuffCore::on_receive_proposal(const Proposal &prop) { for (b = bnew; b->height > pref->height; b = b->parents[0]); - opinion = b == pref; - vheight = bnew->height; + if (b == pref) /* on the same branch */ + { + opinion = true; + vheight = bnew->height; + } } #ifdef HOTSTUFF_ENABLE_LOG_PROTO LOG_INFO("now state: %s", std::string(*this).c_str()); @@ -185,17 +186,17 @@ void HotStuffCore::on_receive_vote(const Vote &vote) { LOG_INFO("got %s", std::string(vote).c_str()); LOG_INFO("now state: %s", std::string(*this).c_str()); #endif - - block_t blk = sanity_check_delivered(vote.blk_hash); + block_t blk = get_delivered_blk(vote.blk_hash); if (vote.cert == nullptr) return; + /* otherwise the vote is positive */ if (!vote.verify()) { - LOG_WARN("invalid vote"); + LOG_WARN("invalid vote from %d", vote.voter); return; } if (!blk->voted.insert(vote.voter).second) { - LOG_WARN("duplicate votes"); + LOG_WARN("duplicate vote from %d", vote.voter); return; } size_t qsize = blk->voted.size(); diff --git a/src/hotstuff.cpp b/src/hotstuff.cpp index d0b42c3..ed15cc1 100644 --- a/src/hotstuff.cpp +++ b/src/hotstuff.cpp @@ -411,7 +411,7 @@ HotStuffBase::HotStuffBase(uint32_t blk_size, pn.reg_handler(VOTE, std::bind(&HotStuffBase::vote_handler, this, _1, _2)); pn.reg_handler(QUERY_FETCH_BLK, std::bind(&HotStuffBase::query_fetch_blk_handler, this, _1, _2)); pn.reg_handler(RESP_FETCH_BLK, std::bind(&HotStuffBase::resp_fetch_blk_handler, this, _1, _2)); - pn.init(listen_addr); + pn.listen(listen_addr); } void HotStuffBase::do_broadcast_proposal(const Proposal &prop) { diff --git a/src/hotstuff_app.cpp b/src/hotstuff_app.cpp index e1eec1b..28f02fc 100644 --- a/src/hotstuff_app.cpp +++ b/src/hotstuff_app.cpp @@ -221,7 +221,7 @@ HotStuffApp::HotStuffApp(uint32_t blk_size, /* register the handlers for msg from clients */ cn.reg_handler(hotstuff::REQ_CMD, std::bind(&HotStuffApp::client_request_cmd_handler, this, _1, _2)); cn.reg_handler(hotstuff::CHK_CMD, std::bind(&HotStuffApp::client_check_cmd_handler, this, _1, _2)); - cn.init(clisten_addr); + cn.listen(clisten_addr); } void HotStuffApp::client_request_cmd_handler(const MsgClient &msg, conn_client_t conn_) { diff --git a/src/hotstuff_client.cpp b/src/hotstuff_client.cpp index 9478d36..1363f39 100644 --- a/src/hotstuff_client.cpp +++ b/src/hotstuff_client.cpp @@ -12,39 +12,22 @@ using salticidae::Config; using salticidae::ElapsedTime; using salticidae::trim_all; using salticidae::split; +using salticidae::MsgNetwork; using hotstuff::ReplicaID; using hotstuff::NetAddr; using hotstuff::EventContext; -using hotstuff::Event; using hotstuff::uint256_t; -using hotstuff::bytearray_t; using hotstuff::MsgClient; using hotstuff::CommandDummy; using hotstuff::command_t; using hotstuff::Finality; +using hotstuff::HotStuffError; EventContext eb; -size_t max_async_num = 10; -int max_iter_num = 100; ReplicaID proposer; - -int connect(const NetAddr &node) { - int fd; - if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) - assert(0); - struct sockaddr_in sockin; - memset(&sockin, 0, sizeof(struct sockaddr_in)); - sockin.sin_family = AF_INET; - sockin.sin_addr.s_addr = node.ip; - sockin.sin_port = node.port; - if (connect(fd, (struct sockaddr *)&sockin, sizeof(struct sockaddr_in)) == -1) - assert(0); - HOTSTUFF_LOG_INFO("connected to %s", std::string(node).c_str()); - return fd; -} - -void on_receive(int); +size_t max_async_num; +int max_iter_num; struct Request { ReplicaID rid; @@ -54,80 +37,18 @@ struct Request { rid(rid), cmd(cmd) { et.start(); } }; -struct Conn { - int fd; - Event on_receive_ev; - - Conn(const NetAddr &addr): - fd(connect(addr)), - on_receive_ev(eb, fd, EV_READ, [this](int fd, short) { - on_receive(fd); - on_receive_ev.add(); - }) { on_receive_ev.add(); } - - Conn(Conn &&other): - fd(other.fd), - on_receive_ev(eb, fd, EV_READ, [this](int fd, short) { - on_receive(fd); - on_receive_ev.add(); - }) { - other.fd = -1; - other.on_receive_ev.del(); - on_receive_ev.add(); - } - - ~Conn() { if (fd != -1) close(fd); } -}; - std::unordered_map<int, salticidae::RingBuffer> buffers; std::unordered_map<const uint256_t, Request> waiting; -std::unordered_map<ReplicaID, Conn> conns; -std::vector<NetAddr> replicas; +MsgNetwork<MsgClient> mn(eb, 10, 0, 2, 4096); +std::unordered_map<ReplicaID, MsgNetwork<MsgClient>::conn_t> conns; +std::vector<NetAddr> replicas; -void setup(ReplicaID rid) { +void set_proposer(ReplicaID rid) { proposer = rid; auto it = conns.find(rid); if (it == conns.end()) - conns.insert(std::make_pair(rid, Conn(replicas[rid]))); -} - -void write_msg(int fd, const MsgClient &msg) { - bytearray_t msg_data = msg.serialize(); - if (write(fd, msg_data.data(), msg_data.size()) != (ssize_t)msg_data.size()) - assert(0); -} - -void read_msg(int fd, MsgClient &msg) { - static const size_t BUFF_SEG_SIZE = 1024; - ssize_t ret; - auto &buffer = buffers[fd]; - bool read_body = false; - for (;;) - { - bytearray_t buff_seg; - if (!read_body && buffer.size() >= MsgClient::header_size) - { - buff_seg = buffer.pop(MsgClient::header_size); - msg = MsgClient(buff_seg.data()); - read_body = true; - } - if (read_body && buffer.size() >= msg.get_length()) - { - buff_seg = buffer.pop(msg.get_length()); - msg.set_payload(std::move(buff_seg)); - return; - } - - buff_seg.resize(BUFF_SEG_SIZE); - ret = read(fd, buff_seg.data(), BUFF_SEG_SIZE); - assert(ret != -1); - if (ret > 0) - { - buff_seg.resize(ret); - buffer.push(std::move(buff_seg)); - } - } + conns.insert(std::make_pair(rid, mn.create_conn(replicas[rid]))); } void try_send() { @@ -136,7 +57,7 @@ void try_send() { auto cmd = CommandDummy::make_cmd(); MsgClient msg; msg.gen_reqcmd(*cmd); - write_msg(conns.find(proposer)->second.fd, msg); + mn.send_msg(msg, conns.find(proposer)->second); HOTSTUFF_LOG_INFO("send new cmd %.10s", get_hex(cmd->get_hash()).c_str()); waiting.insert(std::make_pair( @@ -146,11 +67,9 @@ void try_send() { } } -void on_receive(int fd) { - MsgClient msg; +void on_receive(const MsgClient &msg, MsgNetwork<MsgClient>::conn_t) { uint256_t cmd_hash; Finality fin; - read_msg(fd, msg); HOTSTUFF_LOG_DEBUG("got %s", std::string(msg).c_str()); if (!msg.verify_checksum()) HOTSTUFF_LOG_ERROR("incorrect checksum %08x", msg.get_checksum()); @@ -159,13 +78,13 @@ void on_receive(int fd) { if (fin.rid != proposer) { HOTSTUFF_LOG_INFO("reconnect to the new proposer"); - setup(fin.rid); + set_proposer(fin.rid); } if (fin.rid != it->second.rid) { MsgClient msg; msg.gen_reqcmd(*(waiting.find(cmd_hash)->second.cmd)); - write_msg(conns.find(proposer)->second.fd, msg); + mn.send_msg(msg, conns.find(proposer)->second); HOTSTUFF_LOG_INFO("resend cmd %.10s", get_hex(cmd_hash).c_str()); it->second.et.start(); @@ -173,8 +92,8 @@ void on_receive(int fd) { return; } HOTSTUFF_LOG_INFO( - "fd %d got response for %.10s: <decision=%d, blk=%.10s>", - fd, get_hex(cmd_hash).c_str(), + "got response for %.10s: <decision=%d, blk=%.10s>", + get_hex(cmd_hash).c_str(), fin.decision, get_hex(fin.blk_hash).c_str()); if (it == waiting.end()) return; @@ -187,30 +106,34 @@ std::pair<std::string, std::string> split_ip_port_cport(const std::string &s) { return std::make_pair(ret[0], ret[1]); } - int main(int argc, char **argv) { Config config("hotstuff.conf"); auto opt_idx = Config::OptValInt::create(0); auto opt_replicas = Config::OptValStrVec::create(); - auto opt_max_iter_num = Config::OptValInt::create(); + auto opt_max_iter_num = Config::OptValInt::create(100); + auto opt_max_async_num = Config::OptValInt::create(10); + + mn.reg_handler(hotstuff::RESP_CMD, on_receive); try { config.add_opt("idx", opt_idx, Config::SET_VAL); config.add_opt("replica", opt_replicas, Config::APPEND); - config.add_opt("ntx", opt_max_iter_num, Config::SET_VAL); + config.add_opt("iter", opt_max_iter_num, Config::SET_VAL); + config.add_opt("max-async", opt_max_async_num, Config::SET_VAL); config.parse(argc, argv); auto idx = opt_idx->get(); max_iter_num = opt_max_iter_num->get(); + max_async_num = opt_max_async_num->get(); std::vector<std::pair<std::string, std::string>> raw; for (const auto &s: opt_replicas->get()) { auto res = trim_all(split(s, ",")); - assert(res.size() == 2); + if (res.size() != 2) + throw HotStuffError("format error"); raw.push_back(std::make_pair(res[0], res[1])); } - if (!(0 <= idx && (size_t)idx < raw.size() && - raw.size() > 0)) + if (!(0 <= idx && (size_t)idx < raw.size() && raw.size() > 0)) throw std::invalid_argument("out of range"); for (const auto &p: raw) { @@ -219,10 +142,10 @@ int main(int argc, char **argv) { replicas.push_back(NetAddr(NetAddr(_p.first).ip, htons(stoi(_p.second, &_)))); } - setup(idx); + set_proposer(idx); try_send(); eb.dispatch(); - } catch (hotstuff::HotStuffError &e) { + } catch (HotStuffError &e) { HOTSTUFF_LOG_ERROR("exception: %s", std::string(e).c_str()); } return 0; |