diff options
author | Determinant <[email protected]> | 2018-11-21 13:04:14 -0500 |
---|---|---|
committer | Determinant <[email protected]> | 2018-11-21 13:04:14 -0500 |
commit | 603ceda21e85d8568ec6197db9bc9293dda3b483 (patch) | |
tree | d926d57bfcfd38ca2764759e8c495073169b387f | |
parent | 89165b6ab0c365eb23cb17bc56a788c7dddc1867 (diff) |
upgrade salticidae
-rw-r--r-- | include/hotstuff/hotstuff.h | 49 | ||||
-rw-r--r-- | include/hotstuff/liveness.h | 32 | ||||
-rw-r--r-- | include/hotstuff/type.h | 3 | ||||
-rw-r--r-- | include/hotstuff/util.h | 2 | ||||
-rw-r--r-- | include/hotstuff/worker.h | 8 | ||||
m--------- | salticidae | 0 | ||||
-rw-r--r-- | src/hotstuff.cpp | 59 | ||||
-rw-r--r-- | src/hotstuff_app.cpp | 158 | ||||
-rw-r--r-- | src/hotstuff_client.cpp | 116 | ||||
-rw-r--r-- | test/CMakeLists.txt | 3 | ||||
-rw-r--r-- | test/test_concurrent_queue.cpp | 68 |
11 files changed, 204 insertions, 294 deletions
diff --git a/include/hotstuff/hotstuff.h b/include/hotstuff/hotstuff.h index 3d1c7b6..bc8d960 100644 --- a/include/hotstuff/hotstuff.h +++ b/include/hotstuff/hotstuff.h @@ -14,7 +14,6 @@ namespace hotstuff { -using salticidae::MsgNetwork; using salticidae::PeerNetwork; using salticidae::ElapsedTime; using salticidae::_1; @@ -69,12 +68,12 @@ class HotStuffBase; template<EntityType ent_type> class FetchContext: public promise_t { - Event timeout; + TimerEvent timeout; HotStuffBase *hs; MsgReqBlock fetch_msg; const uint256_t ent_hash; std::unordered_set<NetAddr> replica_ids; - inline void timeout_cb(evutil_socket_t, short); + inline void timeout_cb(TimerEvent &); public: FetchContext(const FetchContext &) = delete; FetchContext &operator=(const FetchContext &) = delete; @@ -109,7 +108,7 @@ class BlockDeliveryContext: public promise_t { class HotStuffBase: public HotStuffCore { using BlockFetchContext = FetchContext<ENT_TYPE_BLK>; using CmdFetchContext = FetchContext<ENT_TYPE_CMD>; - using Conn = PeerNetwork<opcode_t>::Conn; + using Net = PeerNetwork<opcode_t>; friend BlockFetchContext; friend CmdFetchContext; @@ -120,14 +119,15 @@ class HotStuffBase: public HotStuffCore { /** the block size */ size_t blk_size; /** libevent handle */ - EventContext eb; + EventContext ec; VeriPool vpool; + std::unordered_set<NetAddr> peers; private: /** whether libevent handle is owned by itself */ - bool eb_loop; + bool ec_loop; /** network stack */ - PeerNetwork<opcode_t> pn; + Net pn; #ifdef HOTSTUFF_BLK_PROFILE BlockProfiler blk_profiler; #endif @@ -159,13 +159,13 @@ class HotStuffBase: public HotStuffCore { void on_deliver_blk(const block_t &blk); /** deliver consensus message: <propose> */ - inline void propose_handler(MsgPropose &&, Conn &); + inline void propose_handler(MsgPropose &&, const Net::conn_t &); /** deliver consensus message: <vote> */ - inline void vote_handler(MsgVote &&, Conn &); + inline void vote_handler(MsgVote &&, const Net::conn_t &); /** fetches full block data */ - inline void req_blk_handler(MsgReqBlock &&, Conn &); + inline void req_blk_handler(MsgReqBlock &&, const Net::conn_t &); /** receives a block */ - inline void resp_blk_handler(MsgRespBlock &&, Conn &); + inline void resp_blk_handler(MsgRespBlock &&, const Net::conn_t &); void do_broadcast_proposal(const Proposal &) override; void do_vote(ReplicaID, const Vote &) override; @@ -183,8 +183,9 @@ class HotStuffBase: public HotStuffCore { privkey_bt &&priv_key, NetAddr listen_addr, pacemaker_bt pmaker, - EventContext eb, - size_t nworker); + EventContext ec, + size_t nworker, + const Net::Config &config = Net::Config()); ~HotStuffBase(); @@ -193,9 +194,9 @@ class HotStuffBase: public HotStuffCore { /* Submit the command to be decided. */ promise_t exec_command(uint256_t cmd); void add_replica(ReplicaID idx, const NetAddr &addr, pubkey_bt &&pub_key); - void start(bool eb_loop = false); + void start(bool ec_loop = false); - size_t size() const { return pn.all_peers().size(); } + size_t size() const { return peers.size(); } PaceMaker &get_pace_maker() { return *pmaker; } void print_stat() const; @@ -247,14 +248,14 @@ class HotStuff: public HotStuffBase { const bytearray_t &raw_privkey, NetAddr listen_addr, pacemaker_bt pmaker, - EventContext eb = EventContext(), + EventContext ec = EventContext(), size_t nworker = 4): HotStuffBase(blk_size, rid, new PrivKeyType(raw_privkey), listen_addr, std::move(pmaker), - eb, + ec, nworker) {} void add_replica(ReplicaID idx, const NetAddr &addr, const bytearray_t &pubkey_raw) { @@ -275,13 +276,13 @@ FetchContext<ent_type>::FetchContext(FetchContext && other): ent_hash(other.ent_hash), replica_ids(std::move(other.replica_ids)) { other.timeout.del(); - timeout = Event(hs->eb, -1, 0, - std::bind(&FetchContext::timeout_cb, this, _1, _2)); + timeout = TimerEvent(hs->ec, + std::bind(&FetchContext::timeout_cb, this, _1)); reset_timeout(); } template<> -inline void FetchContext<ENT_TYPE_CMD>::timeout_cb(evutil_socket_t, short) { +inline void FetchContext<ENT_TYPE_CMD>::timeout_cb(TimerEvent &) { HOTSTUFF_LOG_WARN("cmd fetching %.10s timeout", get_hex(ent_hash).c_str()); for (const auto &replica_id: replica_ids) send(replica_id); @@ -289,7 +290,7 @@ inline void FetchContext<ENT_TYPE_CMD>::timeout_cb(evutil_socket_t, short) { } template<> -inline void FetchContext<ENT_TYPE_BLK>::timeout_cb(evutil_socket_t, short) { +inline void FetchContext<ENT_TYPE_BLK>::timeout_cb(TimerEvent &) { HOTSTUFF_LOG_WARN("block fetching %.10s timeout", get_hex(ent_hash).c_str()); for (const auto &replica_id: replica_ids) send(replica_id); @@ -303,8 +304,8 @@ FetchContext<ent_type>::FetchContext( hs(hs), ent_hash(ent_hash) { fetch_msg = std::vector<uint256_t>{ent_hash}; - timeout = Event(hs->eb, -1, 0, - std::bind(&FetchContext::timeout_cb, this, _1, _2)); + timeout = TimerEvent(hs->ec, + std::bind(&FetchContext::timeout_cb, this, _1)); reset_timeout(); } @@ -316,7 +317,7 @@ void FetchContext<ent_type>::send(const NetAddr &replica_id) { template<EntityType ent_type> void FetchContext<ent_type>::reset_timeout() { - timeout.add_with_timeout(salticidae::gen_rand_timeout(ent_waiting_timeout)); + timeout.add(salticidae::gen_rand_timeout(ent_waiting_timeout)); } template<EntityType ent_type> diff --git a/include/hotstuff/liveness.h b/include/hotstuff/liveness.h index c88a0a1..36caca2 100644 --- a/include/hotstuff/liveness.h +++ b/include/hotstuff/liveness.h @@ -240,8 +240,8 @@ class PMStickyProposer: virtual public PaceMaker { double candidate_timeout; EventContext ec; /** QC timer or randomized timeout */ - Event timer; - Event ev_imp; + TimerEvent timer; + TimerEvent ev_imp; block_t last_proposed; /** the proposer it believes */ ReplicaID proposer; @@ -304,7 +304,7 @@ class PMStickyProposer: virtual public PaceMaker { .then([this, pm]() { timer.del(); pm.resolve(proposer); - timer.add_with_timeout(qc_timeout); + timer.add(qc_timeout); HOTSTUFF_LOG_PROTO("QC timer reset"); }); locked = true; @@ -350,7 +350,7 @@ class PMStickyProposer: virtual public PaceMaker { }); double t = salticidae::gen_rand_timeout(candidate_timeout); timer.del(); - timer.add_with_timeout(t); + timer.add(t); HOTSTUFF_LOG_INFO("candidate next try in %.2fs", t); propose_elect_block(); } @@ -401,7 +401,7 @@ class PMStickyProposer: virtual public PaceMaker { proposer = hsc->get_id(); last_proposed = nullptr; hsc->set_neg_vote(true); - timer = Event(ec, -1, 0, [this](int, short) { + timer = TimerEvent(ec, [this](TimerEvent &) { /* proposer unable to get a QC in time */ to_candidate(); }); @@ -416,7 +416,7 @@ class PMStickyProposer: virtual public PaceMaker { proposer = hsc->get_id(); last_proposed = nullptr; hsc->set_neg_vote(false); - timer = Event(ec, -1, 0, [this](int, short) { + timer = TimerEvent(ec, [this](TimerEvent &) { candidate_qc_timeout(); }); candidate_timeout = qc_timeout; @@ -427,10 +427,10 @@ class PMStickyProposer: virtual public PaceMaker { protected: void impeach() override { if (role == CANDIDATE) return; - ev_imp = Event(ec, -1, 0, [this](int, short) { + ev_imp = TimerEvent(ec, [this](TimerEvent &) { to_candidate(); }); - ev_imp.add_with_timeout(0); + ev_imp.add(0); HOTSTUFF_LOG_INFO("schedule to impeach the proposer"); } @@ -490,8 +490,8 @@ class PMRoundRobinProposer: virtual public PaceMaker { double candidate_timeout; EventContext ec; /** QC timer or randomized timeout */ - Event timer; - Event ev_imp; + TimerEvent timer; + TimerEvent ev_imp; block_t last_proposed; /** the proposer it believes */ ReplicaID proposer; @@ -554,7 +554,7 @@ class PMRoundRobinProposer: virtual public PaceMaker { .then([this, pm]() { timer.del(); pm.resolve(proposer); - timer.add_with_timeout(qc_timeout); + timer.add(qc_timeout); HOTSTUFF_LOG_PROTO("QC timer reset"); }); locked = true; @@ -608,7 +608,7 @@ class PMRoundRobinProposer: virtual public PaceMaker { void candidate_qc_timeout() { timer.del(); - timer.add_with_timeout(candidate_timeout); + timer.add(candidate_timeout); candidate_timeout *= 1.01; proposer = (proposer + 1) % hsc->get_config().nreplicas; if (proposer == hsc->get_id()) @@ -654,7 +654,7 @@ class PMRoundRobinProposer: virtual public PaceMaker { role = PROPOSER; last_proposed = nullptr; hsc->set_neg_vote(true); - timer = Event(ec, -1, 0, [this](int, short) { + timer = TimerEvent(ec, [this](TimerEvent &) { /* proposer unable to get a QC in time */ to_candidate(); }); @@ -667,7 +667,7 @@ class PMRoundRobinProposer: virtual public PaceMaker { role = CANDIDATE; last_proposed = nullptr; hsc->set_neg_vote(false); - timer = Event(ec, -1, 0, [this](int, short) { + timer = TimerEvent(ec, [this](TimerEvent &) { candidate_qc_timeout(); }); candidate_timeout = qc_timeout * 0.1; @@ -678,10 +678,10 @@ class PMRoundRobinProposer: virtual public PaceMaker { protected: void impeach() override { if (role == CANDIDATE) return; - ev_imp = Event(ec, -1, 0, [this](int, short) { + ev_imp = TimerEvent(ec, [this](TimerEvent &) { to_candidate(); }); - ev_imp.add_with_timeout(0); + ev_imp.add(0); HOTSTUFF_LOG_INFO("schedule to impeach the proposer"); } diff --git a/include/hotstuff/type.h b/include/hotstuff/type.h index 0897956..784a952 100644 --- a/include/hotstuff/type.h +++ b/include/hotstuff/type.h @@ -24,7 +24,8 @@ using salticidae::bytearray_t; using salticidae::get_hash; using salticidae::NetAddr; -using salticidae::Event; +using salticidae::TimerEvent; +using salticidae::FdEvent; using salticidae::EventContext; using promise::promise_t; diff --git a/include/hotstuff/util.h b/include/hotstuff/util.h index 25dda70..efec4be 100644 --- a/include/hotstuff/util.h +++ b/include/hotstuff/util.h @@ -13,7 +13,7 @@ class Logger: public salticidae::Logger { void proto(const char *fmt, ...) { va_list ap; va_start(ap, fmt); - write("proto", fmt, ap); + write("proto", is_tty() ? salticidae::TTY_COLOR_MAGENTA : nullptr, fmt, ap); va_end(ap); } }; diff --git a/include/hotstuff/worker.h b/include/hotstuff/worker.h index 229b1bf..e39ea66 100644 --- a/include/hotstuff/worker.h +++ b/include/hotstuff/worker.h @@ -21,7 +21,7 @@ using veritask_ut = BoxObj<VeriTask>; class VeriPool { using queue_t = moodycamel::BlockingConcurrentQueue<VeriTask *>; int fin_fd[2]; - Event fin_ev; + FdEvent fin_ev; queue_t in_queue; queue_t out_queue; std::thread notifier; @@ -30,7 +30,7 @@ class VeriPool { public: VeriPool(EventContext ec, size_t nworker) { pipe(fin_fd); - fin_ev = Event(ec, fin_fd[0], EV_READ, [&](int fd, short) { + fin_ev = FdEvent(ec, fin_fd[0], [&](int fd, short) { VeriTask *task; bool result; read(fd, &task, sizeof(VeriTask *)); @@ -38,9 +38,9 @@ class VeriPool { auto it = pms.find(task); it->second.second.resolve(result); pms.erase(it); - fin_ev.add(); + fin_ev.add(FdEvent::READ); }); - fin_ev.add(); + fin_ev.add(FdEvent::READ); // finish notifier thread notifier = std::thread([this]() { while (true) diff --git a/salticidae b/salticidae -Subproject 0eea9ddc7cfb2820295dd87aed3dc911a100ecd +Subproject 2b1f6791ddfd8ef4fb21cb4b50a3d6bc8694586 diff --git a/src/hotstuff.cpp b/src/hotstuff.cpp index 1912946..a8cc625 100644 --- a/src/hotstuff.cpp +++ b/src/hotstuff.cpp @@ -107,7 +107,10 @@ void HotStuffBase::add_replica(ReplicaID idx, const NetAddr &addr, pubkey_bt &&pub_key) { HotStuffCore::add_replica(idx, addr, std::move(pub_key)); if (addr != listen_addr) + { + peers.insert(addr); pn.add_peer(addr); + } } void HotStuffBase::on_fetch_blk(const block_t &blk) { @@ -221,8 +224,8 @@ promise_t HotStuffBase::async_deliver_blk(const uint256_t &blk_hash, return static_cast<promise_t &>(pm); } -void HotStuffBase::propose_handler(MsgPropose &&msg, Conn &conn) { - const NetAddr &peer = conn.get_peer(); +void HotStuffBase::propose_handler(MsgPropose &&msg, const Net::conn_t &conn) { + const NetAddr &peer = conn->get_peer(); msg.postponed_parse(this); auto &prop = msg.proposal; block_t blk = prop.blk; @@ -235,8 +238,8 @@ void HotStuffBase::propose_handler(MsgPropose &&msg, Conn &conn) { }); } -void HotStuffBase::vote_handler(MsgVote &&msg, Conn &conn) { - const NetAddr &peer = conn.get_peer(); +void HotStuffBase::vote_handler(MsgVote &&msg, const Net::conn_t &conn) { + const NetAddr &peer = conn->get_peer(); msg.postponed_parse(this); //auto &vote = msg.vote; RcObj<Vote> v(new Vote(std::move(msg.vote))); @@ -255,8 +258,8 @@ void HotStuffBase::vote_handler(MsgVote &&msg, Conn &conn) { }); } -void HotStuffBase::req_blk_handler(MsgReqBlock &&msg, Conn &conn) { - const NetAddr replica = conn.get_peer(); +void HotStuffBase::req_blk_handler(MsgReqBlock &&msg, const Net::conn_t &conn) { + const NetAddr replica = conn->get_peer(); auto &blk_hashes = msg.blk_hashes; std::vector<promise_t> pms; for (const auto &h: blk_hashes) @@ -272,7 +275,7 @@ void HotStuffBase::req_blk_handler(MsgReqBlock &&msg, Conn &conn) { }); } -void HotStuffBase::resp_blk_handler(MsgRespBlock &&msg, Conn &) { +void HotStuffBase::resp_blk_handler(MsgRespBlock &&msg, const Net::conn_t &) { msg.postponed_parse(this); for (const auto &blk: msg.blks) if (blk) on_fetch_blk(blk); @@ -310,28 +313,10 @@ void HotStuffBase::print_stat() const { part_delivery_time_min = double_inf; part_delivery_time_max = 0; #ifdef HOTSTUFF_MSG_STAT - LOG_INFO("-- sent opcode (10s) --"); - auto &sent_op = pn.get_sent_by_opcode(); - for (auto &op: sent_op) - { - auto &val = op.second; - LOG_INFO("%02x: %lu, %.2fBpm", op.first, - val.first, val.first ? val.second / double(val.first) : 0); - val.first = val.second = 0; - } - LOG_INFO("-- recv opcode (10s) --"); - auto &recv_op = pn.get_recv_by_opcode(); - for (auto &op: recv_op) - { - auto &val = op.second; - LOG_INFO("%02x: %lu, %.2fBpm", op.first, - val.first, val.first ? val.second / double(val.first) : 0); - val.first = val.second = 0; - } LOG_INFO("--- replica msg. (10s) ---"); size_t _nsent = 0; size_t _nrecv = 0; - for (const auto &replica: pn.all_peers()) + for (const auto &replica: peers) { auto conn = pn.get_peer_conn(replica); if (conn == nullptr) continue; @@ -361,14 +346,15 @@ HotStuffBase::HotStuffBase(uint32_t blk_size, privkey_bt &&priv_key, NetAddr listen_addr, pacemaker_bt pmaker, - EventContext eb, - size_t nworker): + EventContext ec, + size_t nworker, + const Net::Config &config): HotStuffCore(rid, std::move(priv_key)), listen_addr(listen_addr), blk_size(blk_size), - eb(eb), - vpool(eb, nworker), - pn(eb), + ec(ec), + vpool(ec, nworker), + pn(ec, config), pmaker(std::move(pmaker)), fetched(0), delivered(0), @@ -387,12 +373,13 @@ HotStuffBase::HotStuffBase(uint32_t blk_size, pn.reg_handler(salticidae::generic_bind(&HotStuffBase::vote_handler, this, _1, _2)); pn.reg_handler(salticidae::generic_bind(&HotStuffBase::req_blk_handler, this, _1, _2)); pn.reg_handler(salticidae::generic_bind(&HotStuffBase::resp_blk_handler, this, _1, _2)); + pn.start(); pn.listen(listen_addr); } void HotStuffBase::do_broadcast_proposal(const Proposal &prop) { MsgPropose prop_msg(prop); - for (const auto &replica: pn.all_peers()) + for (const auto &replica: peers) pn.send_msg(prop_msg, replica); } @@ -422,15 +409,15 @@ void HotStuffBase::do_decide(Finality &&fin) { HotStuffBase::~HotStuffBase() {} -void HotStuffBase::start(bool eb_loop) { +void HotStuffBase::start(bool ec_loop) { /* ((n - 1) + 1 - 1) / 3 */ - uint32_t nfaulty = pn.all_peers().size() / 3; + uint32_t nfaulty = peers.size() / 3; if (nfaulty == 0) LOG_WARN("too few replicas in the system to tolerate any failure"); on_init(nfaulty); pmaker->init(this); - if (eb_loop) - eb.dispatch(); + if (ec_loop) + ec.dispatch(); } } diff --git a/src/hotstuff_app.cpp b/src/hotstuff_app.cpp index 014fe16..ee57a3a 100644 --- a/src/hotstuff_app.cpp +++ b/src/hotstuff_app.cpp @@ -29,7 +29,7 @@ using salticidae::static_pointer_cast; using salticidae::trim_all; using salticidae::split; -using hotstuff::Event; +using hotstuff::TimerEvent; using hotstuff::EventContext; using hotstuff::NetAddr; using hotstuff::HotStuffError; @@ -55,17 +55,17 @@ class HotStuffApp: public HotStuff { /** Network messaging between a replica and its client. */ ClientNetwork<opcode_t> cn; /** Timer object to schedule a periodic printing of system statistics */ - Event ev_stat_timer; + TimerEvent ev_stat_timer; /** Timer object to monitor the progress for simple impeachment */ - Event impeach_timer; + TimerEvent impeach_timer; /** The listen address for client RPC */ NetAddr clisten_addr; std::unordered_map<const uint256_t, promise_t> unconfirmed; - using Conn = ClientNetwork<opcode_t>::Conn; + using conn_t = ClientNetwork<opcode_t>::conn_t; - void client_request_cmd_handler(MsgReqCmd &&, Conn &); + void client_request_cmd_handler(MsgReqCmd &&, const conn_t &); static command_t parse_cmd(DataStream &s) { auto cmd = new CommandDummy(); @@ -75,7 +75,7 @@ class HotStuffApp: public HotStuff { void reset_imp_timer() { impeach_timer.del(); - impeach_timer.add_with_timeout(impeach_timeout); + impeach_timer.add(impeach_timeout); } void state_machine_execute(const Finality &fin) override { @@ -113,10 +113,6 @@ std::pair<std::string, std::string> split_ip_port_cport(const std::string &s) { return std::make_pair(ret[0], ret[1]); } -void signal_handler(int) { - throw HotStuffError("got terminal signal"); -} - salticidae::BoxObj<HotStuffApp> papp = nullptr; int main(int argc, char **argv) { @@ -125,9 +121,6 @@ int main(int argc, char **argv) { ElapsedTime elapsed; elapsed.start(); - signal(SIGTERM, signal_handler); - signal(SIGINT, signal_handler); - auto opt_blk_size = Config::OptValInt::create(1); auto opt_parent_limit = Config::OptValInt::create(-1); auto opt_stat_period = Config::OptValDouble::create(10); @@ -157,74 +150,72 @@ int main(int argc, char **argv) { config.add_opt("help", opt_help, Config::SWITCH_ON, 'h', "show this help info"); EventContext ec; -#ifdef HOTSTUFF_NORMAL_LOG - try { -#endif - config.parse(argc, argv); - if (opt_help->get()) - { - config.print_help(); - exit(0); - } - auto idx = opt_idx->get(); - auto client_port = opt_client_port->get(); - std::vector<std::pair<std::string, std::string>> replicas; - for (const auto &s: opt_replicas->get()) - { - auto res = trim_all(split(s, ",")); - if (res.size() != 2) - throw HotStuffError("invalid replica info"); - replicas.push_back(std::make_pair(res[0], res[1])); - } + config.parse(argc, argv); + if (opt_help->get()) + { + config.print_help(); + exit(0); + } + auto idx = opt_idx->get(); + auto client_port = opt_client_port->get(); + std::vector<std::pair<std::string, std::string>> replicas; + for (const auto &s: opt_replicas->get()) + { + auto res = trim_all(split(s, ",")); + if (res.size() != 2) + throw HotStuffError("invalid replica info"); + replicas.push_back(std::make_pair(res[0], res[1])); + } - if (!(0 <= idx && (size_t)idx < replicas.size())) - throw HotStuffError("replica idx out of range"); - std::string binding_addr = replicas[idx].first; - if (client_port == -1) - { - auto p = split_ip_port_cport(binding_addr); - size_t idx; - try { - client_port = stoi(p.second, &idx); - } catch (std::invalid_argument &) { - throw HotStuffError("client port not specified"); - } + if (!(0 <= idx && (size_t)idx < replicas.size())) + throw HotStuffError("replica idx out of range"); + std::string binding_addr = replicas[idx].first; + if (client_port == -1) + { + auto p = split_ip_port_cport(binding_addr); + size_t idx; + try { + client_port = stoi(p.second, &idx); + } catch (std::invalid_argument &) { + throw HotStuffError("client port not specified"); } + } - NetAddr plisten_addr{split_ip_port_cport(binding_addr).first}; + NetAddr plisten_addr{split_ip_port_cport(binding_addr).first}; - auto parent_limit = opt_parent_limit->get(); - hotstuff::pacemaker_bt pmaker; - if (opt_pace_maker->get() == "sticky") - pmaker = new hotstuff::PaceMakerSticky(parent_limit, opt_qc_timeout->get(), ec); - else if (opt_pace_maker->get() == "rr") - pmaker = new hotstuff::PaceMakerRR(parent_limit, opt_qc_timeout->get(), ec); - else - pmaker = new hotstuff::PaceMakerDummyFixed(opt_fixed_proposer->get(), parent_limit); + auto parent_limit = opt_parent_limit->get(); + hotstuff::pacemaker_bt pmaker; + if (opt_pace_maker->get() == "sticky") + pmaker = new hotstuff::PaceMakerSticky(parent_limit, opt_qc_timeout->get(), ec); + else if (opt_pace_maker->get() == "rr") + pmaker = new hotstuff::PaceMakerRR(parent_limit, opt_qc_timeout->get(), ec); + else + pmaker = new hotstuff::PaceMakerDummyFixed(opt_fixed_proposer->get(), parent_limit); - papp = new HotStuffApp(opt_blk_size->get(), - opt_stat_period->get(), - opt_imp_timeout->get(), - idx, - hotstuff::from_hex(opt_privkey->get()), - plisten_addr, - NetAddr("0.0.0.0", client_port), - std::move(pmaker), - ec, - opt_nworker->get()); - for (size_t i = 0; i < replicas.size(); i++) - { - auto p = split_ip_port_cport(replicas[i].first); - papp->add_replica(i, NetAddr(p.first), - hotstuff::from_hex(replicas[i].second)); - } - papp->start(); -#ifdef HOTSTUFF_NORMAL_LOG - } catch (std::exception &e) { - HOTSTUFF_LOG_INFO("exception: %s", e.what()); - elapsed.stop(true); + papp = new HotStuffApp(opt_blk_size->get(), + opt_stat_period->get(), + opt_imp_timeout->get(), + idx, + hotstuff::from_hex(opt_privkey->get()), + plisten_addr, + NetAddr("0.0.0.0", client_port), + std::move(pmaker), + ec, + opt_nworker->get()); + for (size_t i = 0; i < replicas.size(); i++) + { + auto p = split_ip_port_cport(replicas[i].first); + papp->add_replica(i, NetAddr(p.first), + hotstuff::from_hex(replicas[i].second)); } -#endif + auto shutdown = [&](int) { ec.stop(); }; + salticidae::SigEvent ev_sigint(ec, shutdown); + salticidae::SigEvent ev_sigterm(ec, shutdown); + ev_sigint.add(SIGINT); + ev_sigterm.add(SIGTERM); + + papp->start(); + elapsed.stop(true); return 0; } @@ -243,15 +234,16 @@ HotStuffApp::HotStuffApp(uint32_t blk_size, stat_period(stat_period), impeach_timeout(impeach_timeout), ec(ec), - cn(ec), + cn(ec, ClientNetwork<opcode_t>::Config()), clisten_addr(clisten_addr) { /* register the handlers for msg from clients */ cn.reg_handler(salticidae::generic_bind(&HotStuffApp::client_request_cmd_handler, this, _1, _2)); + cn.start(); cn.listen(clisten_addr); } -void HotStuffApp::client_request_cmd_handler(MsgReqCmd &&msg, Conn &conn) { - const NetAddr addr = conn.get_addr(); +void HotStuffApp::client_request_cmd_handler(MsgReqCmd &&msg, const conn_t &conn) { + const NetAddr addr = conn->get_addr(); auto cmd = parse_cmd(msg.serialized); const auto &cmd_hash = cmd->get_hash(); std::vector<promise_t> pms; @@ -275,17 +267,17 @@ void HotStuffApp::client_request_cmd_handler(MsgReqCmd &&msg, Conn &conn) { } void HotStuffApp::start() { - ev_stat_timer = Event(ec, -1, 0, [this](int, short) { + ev_stat_timer = TimerEvent(ec, [this](TimerEvent &) { HotStuff::print_stat(); //HotStuffCore::prune(100); - ev_stat_timer.add_with_timeout(stat_period); + ev_stat_timer.add(stat_period); }); - ev_stat_timer.add_with_timeout(stat_period); - impeach_timer = Event(ec, -1, 0, [this](int, short) { + ev_stat_timer.add(stat_period); + impeach_timer = TimerEvent(ec, [this](TimerEvent &) { get_pace_maker().impeach(); reset_imp_timer(); }); - impeach_timer.add_with_timeout(impeach_timeout); + impeach_timer.add(impeach_timeout); HOTSTUFF_LOG_INFO("** starting the system with parameters **"); HOTSTUFF_LOG_INFO("blk_size = %lu", blk_size); HOTSTUFF_LOG_INFO("conns = %lu", HotStuff::size()); diff --git a/src/hotstuff_client.cpp b/src/hotstuff_client.cpp index d8a6087..e8d7b9e 100644 --- a/src/hotstuff_client.cpp +++ b/src/hotstuff_client.cpp @@ -1,6 +1,8 @@ #include <cassert> #include <random> #include <signal.h> +#include <sys/time.h> + #include "salticidae/type.h" #include "salticidae/netaddr.h" #include "salticidae/network.h" @@ -11,7 +13,6 @@ #include "hotstuff/client.h" using salticidae::Config; -using salticidae::MsgNetwork; using hotstuff::ReplicaID; using hotstuff::NetAddr; @@ -25,7 +26,7 @@ using hotstuff::uint256_t; using hotstuff::opcode_t; using hotstuff::command_t; -EventContext eb; +EventContext ec; ReplicaID proposer; size_t max_async_num; int max_iter_num; @@ -42,11 +43,13 @@ struct Request { rid(rid), cmd(cmd), confirmed(0) { et.start(); } }; -std::unordered_map<ReplicaID, MsgNetwork<opcode_t>::conn_t> conns; +using Net = salticidae::MsgNetwork<opcode_t>; + +std::unordered_map<ReplicaID, Net::conn_t> conns; std::unordered_map<const uint256_t, Request> waiting; std::vector<NetAddr> replicas; std::vector<std::pair<struct timeval, double>> elapsed; -MsgNetwork<opcode_t> mn(eb, 10, 10, 4096); +Net mn(ec, Net::Config()); void connect_all() { for (size_t i = 0; i < replicas.size(); i++) @@ -66,7 +69,7 @@ void try_send() { auto cmd = new CommandDummy(cid, cnt++); //mn.send_msg(MsgReqCmd(*cmd), *conns.at(proposer)); MsgReqCmd msg(*cmd); - for (auto &p: conns) mn.send_msg(msg, *(p.second)); + for (auto &p: conns) mn.send_msg(msg, p.second); #ifndef HOTSTUFF_ENABLE_BENCHMARK HOTSTUFF_LOG_INFO("send new cmd %.10s", get_hex(cmd->get_hash()).c_str()); @@ -78,7 +81,7 @@ void try_send() { } } -void client_resp_cmd_handler(MsgRespCmd &&msg, MsgNetwork<opcode_t>::Conn &) { +void client_resp_cmd_handler(MsgRespCmd &&msg, const Net::conn_t &) { auto &fin = msg.fin; HOTSTUFF_LOG_DEBUG("got %s", std::string(msg.fin).c_str()); const uint256_t &cmd_hash = fin.cmd_hash; @@ -122,70 +125,67 @@ std::pair<std::string, std::string> split_ip_port_cport(const std::string &s) { return std::make_pair(ret[0], ret[1]); } -void signal_handler(int) { - throw HotStuffError("got terminal signal"); -} - int main(int argc, char **argv) { Config config("hotstuff.conf"); - signal(SIGTERM, signal_handler); - signal(SIGINT, signal_handler); - auto opt_idx = Config::OptValInt::create(0); auto opt_replicas = Config::OptValStrVec::create(); auto opt_max_iter_num = Config::OptValInt::create(100); auto opt_max_async_num = Config::OptValInt::create(10); auto opt_cid = Config::OptValInt::create(-1); + auto shutdown = [&](int) { ec.stop(); }; + salticidae::SigEvent ev_sigint(ec, shutdown); + salticidae::SigEvent ev_sigterm(ec, shutdown); + ev_sigint.add(SIGINT); + ev_sigterm.add(SIGTERM); + mn.reg_handler(client_resp_cmd_handler); + mn.start(); + + config.add_opt("idx", opt_idx, Config::SET_VAL); + config.add_opt("cid", opt_cid, Config::SET_VAL); + config.add_opt("replica", opt_replicas, Config::APPEND); + config.add_opt("iter", opt_max_iter_num, Config::SET_VAL); + config.add_opt("max-async", opt_max_async_num, Config::SET_VAL); + config.parse(argc, argv); + auto idx = opt_idx->get(); + max_iter_num = opt_max_iter_num->get(); + max_async_num = opt_max_async_num->get(); + std::vector<std::pair<std::string, std::string>> raw; + for (const auto &s: opt_replicas->get()) + { + auto res = salticidae::trim_all(salticidae::split(s, ",")); + if (res.size() != 2) + throw HotStuffError("format error"); + raw.push_back(std::make_pair(res[0], res[1])); + } + + if (!(0 <= idx && (size_t)idx < raw.size() && raw.size() > 0)) + throw std::invalid_argument("out of range"); + cid = opt_cid->get() != -1 ? opt_cid->get() : idx; + for (const auto &p: raw) + { + auto _p = split_ip_port_cport(p.first); + size_t _; + replicas.push_back(NetAddr(NetAddr(_p.first).ip, htons(stoi(_p.second, &_)))); + } + + nfaulty = (replicas.size() - 1) / 3; + HOTSTUFF_LOG_INFO("nfaulty = %zu", nfaulty); + connect_all(); + set_proposer(idx); + try_send(); + ec.dispatch(); - try { - config.add_opt("idx", opt_idx, Config::SET_VAL); - config.add_opt("cid", opt_cid, Config::SET_VAL); - config.add_opt("replica", opt_replicas, Config::APPEND); - config.add_opt("iter", opt_max_iter_num, Config::SET_VAL); - config.add_opt("max-async", opt_max_async_num, Config::SET_VAL); - config.parse(argc, argv); - auto idx = opt_idx->get(); - max_iter_num = opt_max_iter_num->get(); - max_async_num = opt_max_async_num->get(); - std::vector<std::pair<std::string, std::string>> raw; - for (const auto &s: opt_replicas->get()) - { - auto res = salticidae::trim_all(salticidae::split(s, ",")); - if (res.size() != 2) - throw HotStuffError("format error"); - raw.push_back(std::make_pair(res[0], res[1])); - } - - if (!(0 <= idx && (size_t)idx < raw.size() && raw.size() > 0)) - throw std::invalid_argument("out of range"); - cid = opt_cid->get() != -1 ? opt_cid->get() : idx; - for (const auto &p: raw) - { - auto _p = split_ip_port_cport(p.first); - size_t _; - replicas.push_back(NetAddr(NetAddr(_p.first).ip, htons(stoi(_p.second, &_)))); - } - - nfaulty = (replicas.size() - 1) / 3; - HOTSTUFF_LOG_INFO("nfaulty = %zu", nfaulty); - connect_all(); - set_proposer(idx); - try_send(); - eb.dispatch(); - } catch (HotStuffError &e) { - HOTSTUFF_LOG_ERROR("exception: %s", std::string(e).c_str()); #ifdef HOTSTUFF_ENABLE_BENCHMARK - for (const auto &e: elapsed) - { - char fmt[64]; - struct tm *tmp = localtime(&e.first.tv_sec); - strftime(fmt, sizeof fmt, "%Y-%m-%d %H:%M:%S.%%06u [hotstuff info] %%.6f\n", tmp); - fprintf(stderr, fmt, e.first.tv_usec, e.second); - } -#endif + for (const auto &e: elapsed) + { + char fmt[64]; + struct tm *tmp = localtime(&e.first.tv_sec); + strftime(fmt, sizeof fmt, "%Y-%m-%d %H:%M:%S.%%06u [hotstuff info] %%.6f\n", tmp); + fprintf(stderr, fmt, e.first.tv_usec, e.second); } +#endif return 0; } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index e4c7a1b..f8796d0 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -4,6 +4,3 @@ include_directories(../src/ add_executable(test_secp256k1 test_secp256k1.cpp) target_link_libraries(test_secp256k1 hotstuff_static) - -add_executable(test_concurrent_queue test_concurrent_queue.cpp) -target_link_libraries(test_concurrent_queue salticidae_static pthread) diff --git a/test/test_concurrent_queue.cpp b/test/test_concurrent_queue.cpp deleted file mode 100644 index 7412213..0000000 --- a/test/test_concurrent_queue.cpp +++ /dev/null @@ -1,68 +0,0 @@ -#include "salticidae/event.h" -#include "concurrentqueue/blockingconcurrentqueue.h" -#include <thread> -#include <unistd.h> - -class VeriPool { - int fin_fd[2]; - moodycamel::BlockingConcurrentQueue<int> in_queue; - moodycamel::BlockingConcurrentQueue<int> out_queue; - std::thread notifier; - std::vector<std::thread> workers; - public: - VeriPool(size_t nworker) { - pipe(fin_fd); - // finish notifier thread - notifier = std::thread([this]() { - while (true) - { - int item; - out_queue.wait_dequeue(item); - write(fin_fd[1], &item, sizeof(item)); - } - }); - for (size_t i = 0; i < nworker; i++) - { - workers.push_back(std::thread([this]() { - while (true) - { - int item; - in_queue.wait_dequeue(item); - fprintf(stderr, "%lu working on %d\n", std::this_thread::get_id(), item); - out_queue.enqueue(item * 1000); - } - })); - } - } - - ~VeriPool() { - notifier.detach(); - for (auto &w: workers) w.detach(); - close(fin_fd[0]); - close(fin_fd[1]); - } - - void submit(int item) { - in_queue.enqueue(item); - } - - int get_fd() { - return fin_fd[0]; - } -}; - -int main() { - VeriPool p(2); - salticidae::EventContext ec; - salticidae::Event ev; - ev = salticidae::Event(ec, p.get_fd(), EV_READ, [&ev](int fd, short) { - int item; - read(fd, &item, sizeof(item)); - printf("finished %d\n", item); - ev.add(); - }); - for (int i = 0; i < 10000; i++) - p.submit(i); - ev.add(); - ec.dispatch(); -} |