diff options
Diffstat (limited to 'include/hotstuff')
-rw-r--r-- | include/hotstuff/hotstuff.h | 49 | ||||
-rw-r--r-- | include/hotstuff/liveness.h | 32 | ||||
-rw-r--r-- | include/hotstuff/type.h | 3 | ||||
-rw-r--r-- | include/hotstuff/util.h | 2 | ||||
-rw-r--r-- | include/hotstuff/worker.h | 8 |
5 files changed, 48 insertions, 46 deletions
diff --git a/include/hotstuff/hotstuff.h b/include/hotstuff/hotstuff.h index 3d1c7b6..bc8d960 100644 --- a/include/hotstuff/hotstuff.h +++ b/include/hotstuff/hotstuff.h @@ -14,7 +14,6 @@ namespace hotstuff { -using salticidae::MsgNetwork; using salticidae::PeerNetwork; using salticidae::ElapsedTime; using salticidae::_1; @@ -69,12 +68,12 @@ class HotStuffBase; template<EntityType ent_type> class FetchContext: public promise_t { - Event timeout; + TimerEvent timeout; HotStuffBase *hs; MsgReqBlock fetch_msg; const uint256_t ent_hash; std::unordered_set<NetAddr> replica_ids; - inline void timeout_cb(evutil_socket_t, short); + inline void timeout_cb(TimerEvent &); public: FetchContext(const FetchContext &) = delete; FetchContext &operator=(const FetchContext &) = delete; @@ -109,7 +108,7 @@ class BlockDeliveryContext: public promise_t { class HotStuffBase: public HotStuffCore { using BlockFetchContext = FetchContext<ENT_TYPE_BLK>; using CmdFetchContext = FetchContext<ENT_TYPE_CMD>; - using Conn = PeerNetwork<opcode_t>::Conn; + using Net = PeerNetwork<opcode_t>; friend BlockFetchContext; friend CmdFetchContext; @@ -120,14 +119,15 @@ class HotStuffBase: public HotStuffCore { /** the block size */ size_t blk_size; /** libevent handle */ - EventContext eb; + EventContext ec; VeriPool vpool; + std::unordered_set<NetAddr> peers; private: /** whether libevent handle is owned by itself */ - bool eb_loop; + bool ec_loop; /** network stack */ - PeerNetwork<opcode_t> pn; + Net pn; #ifdef HOTSTUFF_BLK_PROFILE BlockProfiler blk_profiler; #endif @@ -159,13 +159,13 @@ class HotStuffBase: public HotStuffCore { void on_deliver_blk(const block_t &blk); /** deliver consensus message: <propose> */ - inline void propose_handler(MsgPropose &&, Conn &); + inline void propose_handler(MsgPropose &&, const Net::conn_t &); /** deliver consensus message: <vote> */ - inline void vote_handler(MsgVote &&, Conn &); + inline void vote_handler(MsgVote &&, const Net::conn_t &); /** fetches full block data */ - inline void req_blk_handler(MsgReqBlock &&, Conn &); + inline void req_blk_handler(MsgReqBlock &&, const Net::conn_t &); /** receives a block */ - inline void resp_blk_handler(MsgRespBlock &&, Conn &); + inline void resp_blk_handler(MsgRespBlock &&, const Net::conn_t &); void do_broadcast_proposal(const Proposal &) override; void do_vote(ReplicaID, const Vote &) override; @@ -183,8 +183,9 @@ class HotStuffBase: public HotStuffCore { privkey_bt &&priv_key, NetAddr listen_addr, pacemaker_bt pmaker, - EventContext eb, - size_t nworker); + EventContext ec, + size_t nworker, + const Net::Config &config = Net::Config()); ~HotStuffBase(); @@ -193,9 +194,9 @@ class HotStuffBase: public HotStuffCore { /* Submit the command to be decided. */ promise_t exec_command(uint256_t cmd); void add_replica(ReplicaID idx, const NetAddr &addr, pubkey_bt &&pub_key); - void start(bool eb_loop = false); + void start(bool ec_loop = false); - size_t size() const { return pn.all_peers().size(); } + size_t size() const { return peers.size(); } PaceMaker &get_pace_maker() { return *pmaker; } void print_stat() const; @@ -247,14 +248,14 @@ class HotStuff: public HotStuffBase { const bytearray_t &raw_privkey, NetAddr listen_addr, pacemaker_bt pmaker, - EventContext eb = EventContext(), + EventContext ec = EventContext(), size_t nworker = 4): HotStuffBase(blk_size, rid, new PrivKeyType(raw_privkey), listen_addr, std::move(pmaker), - eb, + ec, nworker) {} void add_replica(ReplicaID idx, const NetAddr &addr, const bytearray_t &pubkey_raw) { @@ -275,13 +276,13 @@ FetchContext<ent_type>::FetchContext(FetchContext && other): ent_hash(other.ent_hash), replica_ids(std::move(other.replica_ids)) { other.timeout.del(); - timeout = Event(hs->eb, -1, 0, - std::bind(&FetchContext::timeout_cb, this, _1, _2)); + timeout = TimerEvent(hs->ec, + std::bind(&FetchContext::timeout_cb, this, _1)); reset_timeout(); } template<> -inline void FetchContext<ENT_TYPE_CMD>::timeout_cb(evutil_socket_t, short) { +inline void FetchContext<ENT_TYPE_CMD>::timeout_cb(TimerEvent &) { HOTSTUFF_LOG_WARN("cmd fetching %.10s timeout", get_hex(ent_hash).c_str()); for (const auto &replica_id: replica_ids) send(replica_id); @@ -289,7 +290,7 @@ inline void FetchContext<ENT_TYPE_CMD>::timeout_cb(evutil_socket_t, short) { } template<> -inline void FetchContext<ENT_TYPE_BLK>::timeout_cb(evutil_socket_t, short) { +inline void FetchContext<ENT_TYPE_BLK>::timeout_cb(TimerEvent &) { HOTSTUFF_LOG_WARN("block fetching %.10s timeout", get_hex(ent_hash).c_str()); for (const auto &replica_id: replica_ids) send(replica_id); @@ -303,8 +304,8 @@ FetchContext<ent_type>::FetchContext( hs(hs), ent_hash(ent_hash) { fetch_msg = std::vector<uint256_t>{ent_hash}; - timeout = Event(hs->eb, -1, 0, - std::bind(&FetchContext::timeout_cb, this, _1, _2)); + timeout = TimerEvent(hs->ec, + std::bind(&FetchContext::timeout_cb, this, _1)); reset_timeout(); } @@ -316,7 +317,7 @@ void FetchContext<ent_type>::send(const NetAddr &replica_id) { template<EntityType ent_type> void FetchContext<ent_type>::reset_timeout() { - timeout.add_with_timeout(salticidae::gen_rand_timeout(ent_waiting_timeout)); + timeout.add(salticidae::gen_rand_timeout(ent_waiting_timeout)); } template<EntityType ent_type> diff --git a/include/hotstuff/liveness.h b/include/hotstuff/liveness.h index c88a0a1..36caca2 100644 --- a/include/hotstuff/liveness.h +++ b/include/hotstuff/liveness.h @@ -240,8 +240,8 @@ class PMStickyProposer: virtual public PaceMaker { double candidate_timeout; EventContext ec; /** QC timer or randomized timeout */ - Event timer; - Event ev_imp; + TimerEvent timer; + TimerEvent ev_imp; block_t last_proposed; /** the proposer it believes */ ReplicaID proposer; @@ -304,7 +304,7 @@ class PMStickyProposer: virtual public PaceMaker { .then([this, pm]() { timer.del(); pm.resolve(proposer); - timer.add_with_timeout(qc_timeout); + timer.add(qc_timeout); HOTSTUFF_LOG_PROTO("QC timer reset"); }); locked = true; @@ -350,7 +350,7 @@ class PMStickyProposer: virtual public PaceMaker { }); double t = salticidae::gen_rand_timeout(candidate_timeout); timer.del(); - timer.add_with_timeout(t); + timer.add(t); HOTSTUFF_LOG_INFO("candidate next try in %.2fs", t); propose_elect_block(); } @@ -401,7 +401,7 @@ class PMStickyProposer: virtual public PaceMaker { proposer = hsc->get_id(); last_proposed = nullptr; hsc->set_neg_vote(true); - timer = Event(ec, -1, 0, [this](int, short) { + timer = TimerEvent(ec, [this](TimerEvent &) { /* proposer unable to get a QC in time */ to_candidate(); }); @@ -416,7 +416,7 @@ class PMStickyProposer: virtual public PaceMaker { proposer = hsc->get_id(); last_proposed = nullptr; hsc->set_neg_vote(false); - timer = Event(ec, -1, 0, [this](int, short) { + timer = TimerEvent(ec, [this](TimerEvent &) { candidate_qc_timeout(); }); candidate_timeout = qc_timeout; @@ -427,10 +427,10 @@ class PMStickyProposer: virtual public PaceMaker { protected: void impeach() override { if (role == CANDIDATE) return; - ev_imp = Event(ec, -1, 0, [this](int, short) { + ev_imp = TimerEvent(ec, [this](TimerEvent &) { to_candidate(); }); - ev_imp.add_with_timeout(0); + ev_imp.add(0); HOTSTUFF_LOG_INFO("schedule to impeach the proposer"); } @@ -490,8 +490,8 @@ class PMRoundRobinProposer: virtual public PaceMaker { double candidate_timeout; EventContext ec; /** QC timer or randomized timeout */ - Event timer; - Event ev_imp; + TimerEvent timer; + TimerEvent ev_imp; block_t last_proposed; /** the proposer it believes */ ReplicaID proposer; @@ -554,7 +554,7 @@ class PMRoundRobinProposer: virtual public PaceMaker { .then([this, pm]() { timer.del(); pm.resolve(proposer); - timer.add_with_timeout(qc_timeout); + timer.add(qc_timeout); HOTSTUFF_LOG_PROTO("QC timer reset"); }); locked = true; @@ -608,7 +608,7 @@ class PMRoundRobinProposer: virtual public PaceMaker { void candidate_qc_timeout() { timer.del(); - timer.add_with_timeout(candidate_timeout); + timer.add(candidate_timeout); candidate_timeout *= 1.01; proposer = (proposer + 1) % hsc->get_config().nreplicas; if (proposer == hsc->get_id()) @@ -654,7 +654,7 @@ class PMRoundRobinProposer: virtual public PaceMaker { role = PROPOSER; last_proposed = nullptr; hsc->set_neg_vote(true); - timer = Event(ec, -1, 0, [this](int, short) { + timer = TimerEvent(ec, [this](TimerEvent &) { /* proposer unable to get a QC in time */ to_candidate(); }); @@ -667,7 +667,7 @@ class PMRoundRobinProposer: virtual public PaceMaker { role = CANDIDATE; last_proposed = nullptr; hsc->set_neg_vote(false); - timer = Event(ec, -1, 0, [this](int, short) { + timer = TimerEvent(ec, [this](TimerEvent &) { candidate_qc_timeout(); }); candidate_timeout = qc_timeout * 0.1; @@ -678,10 +678,10 @@ class PMRoundRobinProposer: virtual public PaceMaker { protected: void impeach() override { if (role == CANDIDATE) return; - ev_imp = Event(ec, -1, 0, [this](int, short) { + ev_imp = TimerEvent(ec, [this](TimerEvent &) { to_candidate(); }); - ev_imp.add_with_timeout(0); + ev_imp.add(0); HOTSTUFF_LOG_INFO("schedule to impeach the proposer"); } diff --git a/include/hotstuff/type.h b/include/hotstuff/type.h index 0897956..784a952 100644 --- a/include/hotstuff/type.h +++ b/include/hotstuff/type.h @@ -24,7 +24,8 @@ using salticidae::bytearray_t; using salticidae::get_hash; using salticidae::NetAddr; -using salticidae::Event; +using salticidae::TimerEvent; +using salticidae::FdEvent; using salticidae::EventContext; using promise::promise_t; diff --git a/include/hotstuff/util.h b/include/hotstuff/util.h index 25dda70..efec4be 100644 --- a/include/hotstuff/util.h +++ b/include/hotstuff/util.h @@ -13,7 +13,7 @@ class Logger: public salticidae::Logger { void proto(const char *fmt, ...) { va_list ap; va_start(ap, fmt); - write("proto", fmt, ap); + write("proto", is_tty() ? salticidae::TTY_COLOR_MAGENTA : nullptr, fmt, ap); va_end(ap); } }; diff --git a/include/hotstuff/worker.h b/include/hotstuff/worker.h index 229b1bf..e39ea66 100644 --- a/include/hotstuff/worker.h +++ b/include/hotstuff/worker.h @@ -21,7 +21,7 @@ using veritask_ut = BoxObj<VeriTask>; class VeriPool { using queue_t = moodycamel::BlockingConcurrentQueue<VeriTask *>; int fin_fd[2]; - Event fin_ev; + FdEvent fin_ev; queue_t in_queue; queue_t out_queue; std::thread notifier; @@ -30,7 +30,7 @@ class VeriPool { public: VeriPool(EventContext ec, size_t nworker) { pipe(fin_fd); - fin_ev = Event(ec, fin_fd[0], EV_READ, [&](int fd, short) { + fin_ev = FdEvent(ec, fin_fd[0], [&](int fd, short) { VeriTask *task; bool result; read(fd, &task, sizeof(VeriTask *)); @@ -38,9 +38,9 @@ class VeriPool { auto it = pms.find(task); it->second.second.resolve(result); pms.erase(it); - fin_ev.add(); + fin_ev.add(FdEvent::READ); }); - fin_ev.add(); + fin_ev.add(FdEvent::READ); // finish notifier thread notifier = std::thread([this]() { while (true) |