diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/hotstuff.cpp | 59 | ||||
-rw-r--r-- | src/hotstuff_app.cpp | 158 | ||||
-rw-r--r-- | src/hotstuff_client.cpp | 116 |
3 files changed, 156 insertions, 177 deletions
diff --git a/src/hotstuff.cpp b/src/hotstuff.cpp index 1912946..a8cc625 100644 --- a/src/hotstuff.cpp +++ b/src/hotstuff.cpp @@ -107,7 +107,10 @@ void HotStuffBase::add_replica(ReplicaID idx, const NetAddr &addr, pubkey_bt &&pub_key) { HotStuffCore::add_replica(idx, addr, std::move(pub_key)); if (addr != listen_addr) + { + peers.insert(addr); pn.add_peer(addr); + } } void HotStuffBase::on_fetch_blk(const block_t &blk) { @@ -221,8 +224,8 @@ promise_t HotStuffBase::async_deliver_blk(const uint256_t &blk_hash, return static_cast<promise_t &>(pm); } -void HotStuffBase::propose_handler(MsgPropose &&msg, Conn &conn) { - const NetAddr &peer = conn.get_peer(); +void HotStuffBase::propose_handler(MsgPropose &&msg, const Net::conn_t &conn) { + const NetAddr &peer = conn->get_peer(); msg.postponed_parse(this); auto &prop = msg.proposal; block_t blk = prop.blk; @@ -235,8 +238,8 @@ void HotStuffBase::propose_handler(MsgPropose &&msg, Conn &conn) { }); } -void HotStuffBase::vote_handler(MsgVote &&msg, Conn &conn) { - const NetAddr &peer = conn.get_peer(); +void HotStuffBase::vote_handler(MsgVote &&msg, const Net::conn_t &conn) { + const NetAddr &peer = conn->get_peer(); msg.postponed_parse(this); //auto &vote = msg.vote; RcObj<Vote> v(new Vote(std::move(msg.vote))); @@ -255,8 +258,8 @@ void HotStuffBase::vote_handler(MsgVote &&msg, Conn &conn) { }); } -void HotStuffBase::req_blk_handler(MsgReqBlock &&msg, Conn &conn) { - const NetAddr replica = conn.get_peer(); +void HotStuffBase::req_blk_handler(MsgReqBlock &&msg, const Net::conn_t &conn) { + const NetAddr replica = conn->get_peer(); auto &blk_hashes = msg.blk_hashes; std::vector<promise_t> pms; for (const auto &h: blk_hashes) @@ -272,7 +275,7 @@ void HotStuffBase::req_blk_handler(MsgReqBlock &&msg, Conn &conn) { }); } -void HotStuffBase::resp_blk_handler(MsgRespBlock &&msg, Conn &) { +void HotStuffBase::resp_blk_handler(MsgRespBlock &&msg, const Net::conn_t &) { msg.postponed_parse(this); for (const auto &blk: msg.blks) if (blk) on_fetch_blk(blk); @@ -310,28 +313,10 @@ void HotStuffBase::print_stat() const { part_delivery_time_min = double_inf; part_delivery_time_max = 0; #ifdef HOTSTUFF_MSG_STAT - LOG_INFO("-- sent opcode (10s) --"); - auto &sent_op = pn.get_sent_by_opcode(); - for (auto &op: sent_op) - { - auto &val = op.second; - LOG_INFO("%02x: %lu, %.2fBpm", op.first, - val.first, val.first ? val.second / double(val.first) : 0); - val.first = val.second = 0; - } - LOG_INFO("-- recv opcode (10s) --"); - auto &recv_op = pn.get_recv_by_opcode(); - for (auto &op: recv_op) - { - auto &val = op.second; - LOG_INFO("%02x: %lu, %.2fBpm", op.first, - val.first, val.first ? val.second / double(val.first) : 0); - val.first = val.second = 0; - } LOG_INFO("--- replica msg. (10s) ---"); size_t _nsent = 0; size_t _nrecv = 0; - for (const auto &replica: pn.all_peers()) + for (const auto &replica: peers) { auto conn = pn.get_peer_conn(replica); if (conn == nullptr) continue; @@ -361,14 +346,15 @@ HotStuffBase::HotStuffBase(uint32_t blk_size, privkey_bt &&priv_key, NetAddr listen_addr, pacemaker_bt pmaker, - EventContext eb, - size_t nworker): + EventContext ec, + size_t nworker, + const Net::Config &config): HotStuffCore(rid, std::move(priv_key)), listen_addr(listen_addr), blk_size(blk_size), - eb(eb), - vpool(eb, nworker), - pn(eb), + ec(ec), + vpool(ec, nworker), + pn(ec, config), pmaker(std::move(pmaker)), fetched(0), delivered(0), @@ -387,12 +373,13 @@ HotStuffBase::HotStuffBase(uint32_t blk_size, pn.reg_handler(salticidae::generic_bind(&HotStuffBase::vote_handler, this, _1, _2)); pn.reg_handler(salticidae::generic_bind(&HotStuffBase::req_blk_handler, this, _1, _2)); pn.reg_handler(salticidae::generic_bind(&HotStuffBase::resp_blk_handler, this, _1, _2)); + pn.start(); pn.listen(listen_addr); } void HotStuffBase::do_broadcast_proposal(const Proposal &prop) { MsgPropose prop_msg(prop); - for (const auto &replica: pn.all_peers()) + for (const auto &replica: peers) pn.send_msg(prop_msg, replica); } @@ -422,15 +409,15 @@ void HotStuffBase::do_decide(Finality &&fin) { HotStuffBase::~HotStuffBase() {} -void HotStuffBase::start(bool eb_loop) { +void HotStuffBase::start(bool ec_loop) { /* ((n - 1) + 1 - 1) / 3 */ - uint32_t nfaulty = pn.all_peers().size() / 3; + uint32_t nfaulty = peers.size() / 3; if (nfaulty == 0) LOG_WARN("too few replicas in the system to tolerate any failure"); on_init(nfaulty); pmaker->init(this); - if (eb_loop) - eb.dispatch(); + if (ec_loop) + ec.dispatch(); } } diff --git a/src/hotstuff_app.cpp b/src/hotstuff_app.cpp index 014fe16..ee57a3a 100644 --- a/src/hotstuff_app.cpp +++ b/src/hotstuff_app.cpp @@ -29,7 +29,7 @@ using salticidae::static_pointer_cast; using salticidae::trim_all; using salticidae::split; -using hotstuff::Event; +using hotstuff::TimerEvent; using hotstuff::EventContext; using hotstuff::NetAddr; using hotstuff::HotStuffError; @@ -55,17 +55,17 @@ class HotStuffApp: public HotStuff { /** Network messaging between a replica and its client. */ ClientNetwork<opcode_t> cn; /** Timer object to schedule a periodic printing of system statistics */ - Event ev_stat_timer; + TimerEvent ev_stat_timer; /** Timer object to monitor the progress for simple impeachment */ - Event impeach_timer; + TimerEvent impeach_timer; /** The listen address for client RPC */ NetAddr clisten_addr; std::unordered_map<const uint256_t, promise_t> unconfirmed; - using Conn = ClientNetwork<opcode_t>::Conn; + using conn_t = ClientNetwork<opcode_t>::conn_t; - void client_request_cmd_handler(MsgReqCmd &&, Conn &); + void client_request_cmd_handler(MsgReqCmd &&, const conn_t &); static command_t parse_cmd(DataStream &s) { auto cmd = new CommandDummy(); @@ -75,7 +75,7 @@ class HotStuffApp: public HotStuff { void reset_imp_timer() { impeach_timer.del(); - impeach_timer.add_with_timeout(impeach_timeout); + impeach_timer.add(impeach_timeout); } void state_machine_execute(const Finality &fin) override { @@ -113,10 +113,6 @@ std::pair<std::string, std::string> split_ip_port_cport(const std::string &s) { return std::make_pair(ret[0], ret[1]); } -void signal_handler(int) { - throw HotStuffError("got terminal signal"); -} - salticidae::BoxObj<HotStuffApp> papp = nullptr; int main(int argc, char **argv) { @@ -125,9 +121,6 @@ int main(int argc, char **argv) { ElapsedTime elapsed; elapsed.start(); - signal(SIGTERM, signal_handler); - signal(SIGINT, signal_handler); - auto opt_blk_size = Config::OptValInt::create(1); auto opt_parent_limit = Config::OptValInt::create(-1); auto opt_stat_period = Config::OptValDouble::create(10); @@ -157,74 +150,72 @@ int main(int argc, char **argv) { config.add_opt("help", opt_help, Config::SWITCH_ON, 'h', "show this help info"); EventContext ec; -#ifdef HOTSTUFF_NORMAL_LOG - try { -#endif - config.parse(argc, argv); - if (opt_help->get()) - { - config.print_help(); - exit(0); - } - auto idx = opt_idx->get(); - auto client_port = opt_client_port->get(); - std::vector<std::pair<std::string, std::string>> replicas; - for (const auto &s: opt_replicas->get()) - { - auto res = trim_all(split(s, ",")); - if (res.size() != 2) - throw HotStuffError("invalid replica info"); - replicas.push_back(std::make_pair(res[0], res[1])); - } + config.parse(argc, argv); + if (opt_help->get()) + { + config.print_help(); + exit(0); + } + auto idx = opt_idx->get(); + auto client_port = opt_client_port->get(); + std::vector<std::pair<std::string, std::string>> replicas; + for (const auto &s: opt_replicas->get()) + { + auto res = trim_all(split(s, ",")); + if (res.size() != 2) + throw HotStuffError("invalid replica info"); + replicas.push_back(std::make_pair(res[0], res[1])); + } - if (!(0 <= idx && (size_t)idx < replicas.size())) - throw HotStuffError("replica idx out of range"); - std::string binding_addr = replicas[idx].first; - if (client_port == -1) - { - auto p = split_ip_port_cport(binding_addr); - size_t idx; - try { - client_port = stoi(p.second, &idx); - } catch (std::invalid_argument &) { - throw HotStuffError("client port not specified"); - } + if (!(0 <= idx && (size_t)idx < replicas.size())) + throw HotStuffError("replica idx out of range"); + std::string binding_addr = replicas[idx].first; + if (client_port == -1) + { + auto p = split_ip_port_cport(binding_addr); + size_t idx; + try { + client_port = stoi(p.second, &idx); + } catch (std::invalid_argument &) { + throw HotStuffError("client port not specified"); } + } - NetAddr plisten_addr{split_ip_port_cport(binding_addr).first}; + NetAddr plisten_addr{split_ip_port_cport(binding_addr).first}; - auto parent_limit = opt_parent_limit->get(); - hotstuff::pacemaker_bt pmaker; - if (opt_pace_maker->get() == "sticky") - pmaker = new hotstuff::PaceMakerSticky(parent_limit, opt_qc_timeout->get(), ec); - else if (opt_pace_maker->get() == "rr") - pmaker = new hotstuff::PaceMakerRR(parent_limit, opt_qc_timeout->get(), ec); - else - pmaker = new hotstuff::PaceMakerDummyFixed(opt_fixed_proposer->get(), parent_limit); + auto parent_limit = opt_parent_limit->get(); + hotstuff::pacemaker_bt pmaker; + if (opt_pace_maker->get() == "sticky") + pmaker = new hotstuff::PaceMakerSticky(parent_limit, opt_qc_timeout->get(), ec); + else if (opt_pace_maker->get() == "rr") + pmaker = new hotstuff::PaceMakerRR(parent_limit, opt_qc_timeout->get(), ec); + else + pmaker = new hotstuff::PaceMakerDummyFixed(opt_fixed_proposer->get(), parent_limit); - papp = new HotStuffApp(opt_blk_size->get(), - opt_stat_period->get(), - opt_imp_timeout->get(), - idx, - hotstuff::from_hex(opt_privkey->get()), - plisten_addr, - NetAddr("0.0.0.0", client_port), - std::move(pmaker), - ec, - opt_nworker->get()); - for (size_t i = 0; i < replicas.size(); i++) - { - auto p = split_ip_port_cport(replicas[i].first); - papp->add_replica(i, NetAddr(p.first), - hotstuff::from_hex(replicas[i].second)); - } - papp->start(); -#ifdef HOTSTUFF_NORMAL_LOG - } catch (std::exception &e) { - HOTSTUFF_LOG_INFO("exception: %s", e.what()); - elapsed.stop(true); + papp = new HotStuffApp(opt_blk_size->get(), + opt_stat_period->get(), + opt_imp_timeout->get(), + idx, + hotstuff::from_hex(opt_privkey->get()), + plisten_addr, + NetAddr("0.0.0.0", client_port), + std::move(pmaker), + ec, + opt_nworker->get()); + for (size_t i = 0; i < replicas.size(); i++) + { + auto p = split_ip_port_cport(replicas[i].first); + papp->add_replica(i, NetAddr(p.first), + hotstuff::from_hex(replicas[i].second)); } -#endif + auto shutdown = [&](int) { ec.stop(); }; + salticidae::SigEvent ev_sigint(ec, shutdown); + salticidae::SigEvent ev_sigterm(ec, shutdown); + ev_sigint.add(SIGINT); + ev_sigterm.add(SIGTERM); + + papp->start(); + elapsed.stop(true); return 0; } @@ -243,15 +234,16 @@ HotStuffApp::HotStuffApp(uint32_t blk_size, stat_period(stat_period), impeach_timeout(impeach_timeout), ec(ec), - cn(ec), + cn(ec, ClientNetwork<opcode_t>::Config()), clisten_addr(clisten_addr) { /* register the handlers for msg from clients */ cn.reg_handler(salticidae::generic_bind(&HotStuffApp::client_request_cmd_handler, this, _1, _2)); + cn.start(); cn.listen(clisten_addr); } -void HotStuffApp::client_request_cmd_handler(MsgReqCmd &&msg, Conn &conn) { - const NetAddr addr = conn.get_addr(); +void HotStuffApp::client_request_cmd_handler(MsgReqCmd &&msg, const conn_t &conn) { + const NetAddr addr = conn->get_addr(); auto cmd = parse_cmd(msg.serialized); const auto &cmd_hash = cmd->get_hash(); std::vector<promise_t> pms; @@ -275,17 +267,17 @@ void HotStuffApp::client_request_cmd_handler(MsgReqCmd &&msg, Conn &conn) { } void HotStuffApp::start() { - ev_stat_timer = Event(ec, -1, 0, [this](int, short) { + ev_stat_timer = TimerEvent(ec, [this](TimerEvent &) { HotStuff::print_stat(); //HotStuffCore::prune(100); - ev_stat_timer.add_with_timeout(stat_period); + ev_stat_timer.add(stat_period); }); - ev_stat_timer.add_with_timeout(stat_period); - impeach_timer = Event(ec, -1, 0, [this](int, short) { + ev_stat_timer.add(stat_period); + impeach_timer = TimerEvent(ec, [this](TimerEvent &) { get_pace_maker().impeach(); reset_imp_timer(); }); - impeach_timer.add_with_timeout(impeach_timeout); + impeach_timer.add(impeach_timeout); HOTSTUFF_LOG_INFO("** starting the system with parameters **"); HOTSTUFF_LOG_INFO("blk_size = %lu", blk_size); HOTSTUFF_LOG_INFO("conns = %lu", HotStuff::size()); diff --git a/src/hotstuff_client.cpp b/src/hotstuff_client.cpp index d8a6087..e8d7b9e 100644 --- a/src/hotstuff_client.cpp +++ b/src/hotstuff_client.cpp @@ -1,6 +1,8 @@ #include <cassert> #include <random> #include <signal.h> +#include <sys/time.h> + #include "salticidae/type.h" #include "salticidae/netaddr.h" #include "salticidae/network.h" @@ -11,7 +13,6 @@ #include "hotstuff/client.h" using salticidae::Config; -using salticidae::MsgNetwork; using hotstuff::ReplicaID; using hotstuff::NetAddr; @@ -25,7 +26,7 @@ using hotstuff::uint256_t; using hotstuff::opcode_t; using hotstuff::command_t; -EventContext eb; +EventContext ec; ReplicaID proposer; size_t max_async_num; int max_iter_num; @@ -42,11 +43,13 @@ struct Request { rid(rid), cmd(cmd), confirmed(0) { et.start(); } }; -std::unordered_map<ReplicaID, MsgNetwork<opcode_t>::conn_t> conns; +using Net = salticidae::MsgNetwork<opcode_t>; + +std::unordered_map<ReplicaID, Net::conn_t> conns; std::unordered_map<const uint256_t, Request> waiting; std::vector<NetAddr> replicas; std::vector<std::pair<struct timeval, double>> elapsed; -MsgNetwork<opcode_t> mn(eb, 10, 10, 4096); +Net mn(ec, Net::Config()); void connect_all() { for (size_t i = 0; i < replicas.size(); i++) @@ -66,7 +69,7 @@ void try_send() { auto cmd = new CommandDummy(cid, cnt++); //mn.send_msg(MsgReqCmd(*cmd), *conns.at(proposer)); MsgReqCmd msg(*cmd); - for (auto &p: conns) mn.send_msg(msg, *(p.second)); + for (auto &p: conns) mn.send_msg(msg, p.second); #ifndef HOTSTUFF_ENABLE_BENCHMARK HOTSTUFF_LOG_INFO("send new cmd %.10s", get_hex(cmd->get_hash()).c_str()); @@ -78,7 +81,7 @@ void try_send() { } } -void client_resp_cmd_handler(MsgRespCmd &&msg, MsgNetwork<opcode_t>::Conn &) { +void client_resp_cmd_handler(MsgRespCmd &&msg, const Net::conn_t &) { auto &fin = msg.fin; HOTSTUFF_LOG_DEBUG("got %s", std::string(msg.fin).c_str()); const uint256_t &cmd_hash = fin.cmd_hash; @@ -122,70 +125,67 @@ std::pair<std::string, std::string> split_ip_port_cport(const std::string &s) { return std::make_pair(ret[0], ret[1]); } -void signal_handler(int) { - throw HotStuffError("got terminal signal"); -} - int main(int argc, char **argv) { Config config("hotstuff.conf"); - signal(SIGTERM, signal_handler); - signal(SIGINT, signal_handler); - auto opt_idx = Config::OptValInt::create(0); auto opt_replicas = Config::OptValStrVec::create(); auto opt_max_iter_num = Config::OptValInt::create(100); auto opt_max_async_num = Config::OptValInt::create(10); auto opt_cid = Config::OptValInt::create(-1); + auto shutdown = [&](int) { ec.stop(); }; + salticidae::SigEvent ev_sigint(ec, shutdown); + salticidae::SigEvent ev_sigterm(ec, shutdown); + ev_sigint.add(SIGINT); + ev_sigterm.add(SIGTERM); + mn.reg_handler(client_resp_cmd_handler); + mn.start(); + + config.add_opt("idx", opt_idx, Config::SET_VAL); + config.add_opt("cid", opt_cid, Config::SET_VAL); + config.add_opt("replica", opt_replicas, Config::APPEND); + config.add_opt("iter", opt_max_iter_num, Config::SET_VAL); + config.add_opt("max-async", opt_max_async_num, Config::SET_VAL); + config.parse(argc, argv); + auto idx = opt_idx->get(); + max_iter_num = opt_max_iter_num->get(); + max_async_num = opt_max_async_num->get(); + std::vector<std::pair<std::string, std::string>> raw; + for (const auto &s: opt_replicas->get()) + { + auto res = salticidae::trim_all(salticidae::split(s, ",")); + if (res.size() != 2) + throw HotStuffError("format error"); + raw.push_back(std::make_pair(res[0], res[1])); + } + + if (!(0 <= idx && (size_t)idx < raw.size() && raw.size() > 0)) + throw std::invalid_argument("out of range"); + cid = opt_cid->get() != -1 ? opt_cid->get() : idx; + for (const auto &p: raw) + { + auto _p = split_ip_port_cport(p.first); + size_t _; + replicas.push_back(NetAddr(NetAddr(_p.first).ip, htons(stoi(_p.second, &_)))); + } + + nfaulty = (replicas.size() - 1) / 3; + HOTSTUFF_LOG_INFO("nfaulty = %zu", nfaulty); + connect_all(); + set_proposer(idx); + try_send(); + ec.dispatch(); - try { - config.add_opt("idx", opt_idx, Config::SET_VAL); - config.add_opt("cid", opt_cid, Config::SET_VAL); - config.add_opt("replica", opt_replicas, Config::APPEND); - config.add_opt("iter", opt_max_iter_num, Config::SET_VAL); - config.add_opt("max-async", opt_max_async_num, Config::SET_VAL); - config.parse(argc, argv); - auto idx = opt_idx->get(); - max_iter_num = opt_max_iter_num->get(); - max_async_num = opt_max_async_num->get(); - std::vector<std::pair<std::string, std::string>> raw; - for (const auto &s: opt_replicas->get()) - { - auto res = salticidae::trim_all(salticidae::split(s, ",")); - if (res.size() != 2) - throw HotStuffError("format error"); - raw.push_back(std::make_pair(res[0], res[1])); - } - - if (!(0 <= idx && (size_t)idx < raw.size() && raw.size() > 0)) - throw std::invalid_argument("out of range"); - cid = opt_cid->get() != -1 ? opt_cid->get() : idx; - for (const auto &p: raw) - { - auto _p = split_ip_port_cport(p.first); - size_t _; - replicas.push_back(NetAddr(NetAddr(_p.first).ip, htons(stoi(_p.second, &_)))); - } - - nfaulty = (replicas.size() - 1) / 3; - HOTSTUFF_LOG_INFO("nfaulty = %zu", nfaulty); - connect_all(); - set_proposer(idx); - try_send(); - eb.dispatch(); - } catch (HotStuffError &e) { - HOTSTUFF_LOG_ERROR("exception: %s", std::string(e).c_str()); #ifdef HOTSTUFF_ENABLE_BENCHMARK - for (const auto &e: elapsed) - { - char fmt[64]; - struct tm *tmp = localtime(&e.first.tv_sec); - strftime(fmt, sizeof fmt, "%Y-%m-%d %H:%M:%S.%%06u [hotstuff info] %%.6f\n", tmp); - fprintf(stderr, fmt, e.first.tv_usec, e.second); - } -#endif + for (const auto &e: elapsed) + { + char fmt[64]; + struct tm *tmp = localtime(&e.first.tv_sec); + strftime(fmt, sizeof fmt, "%Y-%m-%d %H:%M:%S.%%06u [hotstuff info] %%.6f\n", tmp); + fprintf(stderr, fmt, e.first.tv_usec, e.second); } +#endif return 0; } |