diff options
author | Determinant <[email protected]> | 2019-07-06 23:01:10 -0400 |
---|---|---|
committer | Determinant <[email protected]> | 2019-07-06 23:01:10 -0400 |
commit | a442616edf97dc65f6e768d249eb318fa317c747 (patch) | |
tree | 785fb67feaf7018e0bd22b28af01e988a895008f /src | |
parent | b5d5e8375373eaed6a5544860336a74bbd96446d (diff) |
move the demo app to examples/
Diffstat (limited to 'src')
-rw-r--r-- | src/hotstuff_app.cpp | 425 | ||||
-rw-r--r-- | src/hotstuff_client.cpp | 181 |
2 files changed, 0 insertions, 606 deletions
diff --git a/src/hotstuff_app.cpp b/src/hotstuff_app.cpp deleted file mode 100644 index a0fe80b..0000000 --- a/src/hotstuff_app.cpp +++ /dev/null @@ -1,425 +0,0 @@ -/** - * Copyright 2018 VMware - * Copyright 2018 Ted Yin - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <iostream> -#include <cstring> -#include <cassert> -#include <algorithm> -#include <random> -#include <unistd.h> -#include <signal.h> -#include <event2/event.h> - -#include "salticidae/stream.h" -#include "salticidae/util.h" -#include "salticidae/network.h" -#include "salticidae/msg.h" - -#include "hotstuff/promise.hpp" -#include "hotstuff/type.h" -#include "hotstuff/entity.h" -#include "hotstuff/util.h" -#include "hotstuff/client.h" -#include "hotstuff/hotstuff.h" -#include "hotstuff/liveness.h" - -using salticidae::MsgNetwork; -using salticidae::ClientNetwork; -using salticidae::ElapsedTime; -using salticidae::Config; -using salticidae::_1; -using salticidae::_2; -using salticidae::static_pointer_cast; -using salticidae::trim_all; -using salticidae::split; - -using hotstuff::TimerEvent; -using hotstuff::EventContext; -using hotstuff::NetAddr; -using hotstuff::HotStuffError; -using hotstuff::CommandDummy; -using hotstuff::Finality; -using hotstuff::command_t; -using hotstuff::uint256_t; -using hotstuff::opcode_t; -using hotstuff::bytearray_t; -using hotstuff::DataStream; -using hotstuff::ReplicaID; -using hotstuff::MsgReqCmd; -using hotstuff::MsgRespCmd; -using hotstuff::get_hash; -using hotstuff::promise_t; - -using HotStuff = hotstuff::HotStuffSecp256k1; - -class HotStuffApp: public HotStuff { - double stat_period; - double impeach_timeout; - EventContext ec; - EventContext req_ec; - EventContext resp_ec; - /** Network messaging between a replica and its client. */ - ClientNetwork<opcode_t> cn; - /** Timer object to schedule a periodic printing of system statistics */ - TimerEvent ev_stat_timer; - /** Timer object to monitor the progress for simple impeachment */ - TimerEvent impeach_timer; - /** The listen address for client RPC */ - NetAddr clisten_addr; - - std::unordered_map<const uint256_t, promise_t> unconfirmed; - - using conn_t = ClientNetwork<opcode_t>::conn_t; - using resp_queue_t = salticidae::MPSCQueueEventDriven<Finality>; - - /* for the dedicated thread sending responses to the clients */ - std::thread req_thread; - std::thread resp_thread; - resp_queue_t resp_queue; - salticidae::BoxObj<salticidae::ThreadCall> resp_tcall; - salticidae::BoxObj<salticidae::ThreadCall> req_tcall; - - void client_request_cmd_handler(MsgReqCmd &&, const conn_t &); - - static command_t parse_cmd(DataStream &s) { - auto cmd = new CommandDummy(); - s >> *cmd; - return cmd; - } - - void reset_imp_timer() { - impeach_timer.del(); - impeach_timer.add(impeach_timeout); - } - - void state_machine_execute(const Finality &fin) override { - reset_imp_timer(); -#ifndef HOTSTUFF_ENABLE_BENCHMARK - HOTSTUFF_LOG_INFO("replicated %s", std::string(fin).c_str()); -#endif - resp_queue.enqueue(fin); - } - -#ifdef HOTSTUFF_MSG_STAT - std::unordered_set<conn_t> client_conns; - void print_stat() const; -#endif - - public: - HotStuffApp(uint32_t blk_size, - double stat_period, - double impeach_timeout, - ReplicaID idx, - const bytearray_t &raw_privkey, - NetAddr plisten_addr, - NetAddr clisten_addr, - hotstuff::pacemaker_bt pmaker, - const EventContext &ec, - size_t nworker, - const Net::Config &repnet_config, - const ClientNetwork<opcode_t>::Config &clinet_config); - - void start(const std::vector<std::tuple<NetAddr, bytearray_t, bytearray_t>> &reps); - void stop(); -}; - -std::pair<std::string, std::string> split_ip_port_cport(const std::string &s) { - auto ret = trim_all(split(s, ";")); - if (ret.size() != 2) - throw std::invalid_argument("invalid cport format"); - return std::make_pair(ret[0], ret[1]); -} - -salticidae::BoxObj<HotStuffApp> papp = nullptr; - -int main(int argc, char **argv) { - Config config("hotstuff.conf"); - - ElapsedTime elapsed; - elapsed.start(); - - auto opt_blk_size = Config::OptValInt::create(1); - auto opt_parent_limit = Config::OptValInt::create(-1); - auto opt_stat_period = Config::OptValDouble::create(10); - auto opt_replicas = Config::OptValStrVec::create(); - auto opt_idx = Config::OptValInt::create(0); - auto opt_client_port = Config::OptValInt::create(-1); - auto opt_privkey = Config::OptValStr::create(); - auto opt_tls_privkey = Config::OptValStr::create(); - auto opt_tls_cert = Config::OptValStr::create(); - auto opt_help = Config::OptValFlag::create(false); - auto opt_pace_maker = Config::OptValStr::create("dummy"); - auto opt_fixed_proposer = Config::OptValInt::create(1); - auto opt_base_timeout = Config::OptValDouble::create(1); - auto opt_prop_delay = Config::OptValDouble::create(1); - auto opt_imp_timeout = Config::OptValDouble::create(11); - auto opt_nworker = Config::OptValInt::create(1); - auto opt_repnworker = Config::OptValInt::create(1); - auto opt_repburst = Config::OptValInt::create(100); - auto opt_clinworker = Config::OptValInt::create(8); - auto opt_cliburst = Config::OptValInt::create(1000); - auto opt_notls = Config::OptValFlag::create(false); - - config.add_opt("block-size", opt_blk_size, Config::SET_VAL); - config.add_opt("parent-limit", opt_parent_limit, Config::SET_VAL); - config.add_opt("stat-period", opt_stat_period, Config::SET_VAL); - config.add_opt("replica", opt_replicas, Config::APPEND, 'a', "add an replica to the list"); - config.add_opt("idx", opt_idx, Config::SET_VAL, 'i', "specify the index in the replica list"); - config.add_opt("cport", opt_client_port, Config::SET_VAL, 'c', "specify the port listening for clients"); - config.add_opt("privkey", opt_privkey, Config::SET_VAL); - config.add_opt("tls-privkey", opt_tls_privkey, Config::SET_VAL); - config.add_opt("tls-cert", opt_tls_cert, Config::SET_VAL); - config.add_opt("pace-maker", opt_pace_maker, Config::SET_VAL, 'p', "specify pace maker (dummy, rr)"); - config.add_opt("proposer", opt_fixed_proposer, Config::SET_VAL, 'l', "set the fixed proposer (for dummy)"); - config.add_opt("base-timeout", opt_base_timeout, Config::SET_VAL, 't', "set the initial timeout for the Round-Robin Pacemaker"); - config.add_opt("prop-delay", opt_prop_delay, Config::SET_VAL, 't', "set the delay that follows the timeout for the Round-Robin Pacemaker"); - config.add_opt("imp-timeout", opt_imp_timeout, Config::SET_VAL, 'u', "set impeachment timeout (for sticky)"); - config.add_opt("nworker", opt_nworker, Config::SET_VAL, 'n', "the number of threads for verification"); - config.add_opt("repnworker", opt_repnworker, Config::SET_VAL, 'm', "the number of threads for replica network"); - config.add_opt("repburst", opt_repburst, Config::SET_VAL, 'b', ""); - config.add_opt("clinworker", opt_clinworker, Config::SET_VAL, 'M', "the number of threads for client network"); - config.add_opt("cliburst", opt_cliburst, Config::SET_VAL, 'B', ""); - config.add_opt("notls", opt_notls, Config::SWITCH_ON, 's', "disable TLS"); - config.add_opt("help", opt_help, Config::SWITCH_ON, 'h', "show this help info"); - - EventContext ec; - config.parse(argc, argv); - if (opt_help->get()) - { - config.print_help(); - exit(0); - } - auto idx = opt_idx->get(); - auto client_port = opt_client_port->get(); - std::vector<std::tuple<std::string, std::string, std::string>> replicas; - for (const auto &s: opt_replicas->get()) - { - auto res = trim_all(split(s, ",")); - if (res.size() != 3) - throw HotStuffError("invalid replica info"); - replicas.push_back(std::make_tuple(res[0], res[1], res[2])); - } - - if (!(0 <= idx && (size_t)idx < replicas.size())) - throw HotStuffError("replica idx out of range"); - std::string binding_addr = std::get<0>(replicas[idx]); - if (client_port == -1) - { - auto p = split_ip_port_cport(binding_addr); - size_t idx; - try { - client_port = stoi(p.second, &idx); - } catch (std::invalid_argument &) { - throw HotStuffError("client port not specified"); - } - } - - NetAddr plisten_addr{split_ip_port_cport(binding_addr).first}; - - auto parent_limit = opt_parent_limit->get(); - hotstuff::pacemaker_bt pmaker; - if (opt_pace_maker->get() == "dummy") - pmaker = new hotstuff::PaceMakerDummyFixed(opt_fixed_proposer->get(), parent_limit); - else - pmaker = new hotstuff::PaceMakerRR(ec, parent_limit, opt_base_timeout->get(), opt_prop_delay->get()); - - HotStuffApp::Net::Config repnet_config; - ClientNetwork<opcode_t>::Config clinet_config; - if (!opt_tls_privkey->get().empty() && !opt_notls->get()) - { - auto tls_priv_key = new salticidae::PKey( - salticidae::PKey::create_privkey_from_der( - hotstuff::from_hex(opt_tls_privkey->get()))); - auto tls_cert = new salticidae::X509( - salticidae::X509::create_from_der( - hotstuff::from_hex(opt_tls_cert->get()))); - repnet_config - .enable_tls(true) - .tls_key(tls_priv_key) - .tls_cert(tls_cert); - } - repnet_config - .burst_size(opt_repburst->get()) - .nworker(opt_repnworker->get()); - clinet_config - .burst_size(opt_cliburst->get()) - .nworker(opt_clinworker->get()); - papp = new HotStuffApp(opt_blk_size->get(), - opt_stat_period->get(), - opt_imp_timeout->get(), - idx, - hotstuff::from_hex(opt_privkey->get()), - plisten_addr, - NetAddr("0.0.0.0", client_port), - std::move(pmaker), - ec, - opt_nworker->get(), - repnet_config, - clinet_config); - std::vector<std::tuple<NetAddr, bytearray_t, bytearray_t>> reps; - for (auto &r: replicas) - { - auto p = split_ip_port_cport(std::get<0>(r)); - reps.push_back(std::make_tuple( - NetAddr(p.first), - hotstuff::from_hex(std::get<1>(r)), - hotstuff::from_hex(std::get<2>(r)))); - } - auto shutdown = [&](int) { papp->stop(); }; - salticidae::SigEvent ev_sigint(ec, shutdown); - salticidae::SigEvent ev_sigterm(ec, shutdown); - ev_sigint.add(SIGINT); - ev_sigterm.add(SIGTERM); - - papp->start(reps); - elapsed.stop(true); - return 0; -} - -HotStuffApp::HotStuffApp(uint32_t blk_size, - double stat_period, - double impeach_timeout, - ReplicaID idx, - const bytearray_t &raw_privkey, - NetAddr plisten_addr, - NetAddr clisten_addr, - hotstuff::pacemaker_bt pmaker, - const EventContext &ec, - size_t nworker, - const Net::Config &repnet_config, - const ClientNetwork<opcode_t>::Config &clinet_config): - HotStuff(blk_size, idx, raw_privkey, - plisten_addr, std::move(pmaker), ec, nworker, repnet_config), - stat_period(stat_period), - impeach_timeout(impeach_timeout), - ec(ec), - cn(req_ec, clinet_config), - clisten_addr(clisten_addr) { - /* prepare the thread used for sending back confirmations */ - resp_tcall = new salticidae::ThreadCall(resp_ec); - req_tcall = new salticidae::ThreadCall(req_ec); - resp_queue.reg_handler(resp_ec, [this](resp_queue_t &q) { - Finality fin; - while (q.try_dequeue(fin)) - { - auto it = unconfirmed.find(fin.cmd_hash); - if (it != unconfirmed.end()) - { - it->second.resolve(fin); - unconfirmed.erase(it); - } - } - return false; - }); - - /* register the handlers for msg from clients */ - cn.reg_handler(salticidae::generic_bind(&HotStuffApp::client_request_cmd_handler, this, _1, _2)); - cn.start(); - cn.listen(clisten_addr); -} - -void HotStuffApp::client_request_cmd_handler(MsgReqCmd &&msg, const conn_t &conn) { - const NetAddr addr = conn->get_addr(); - auto cmd = parse_cmd(msg.serialized); - const auto &cmd_hash = cmd->get_hash(); - HOTSTUFF_LOG_DEBUG("processing %s", std::string(*cmd).c_str()); - exec_command(cmd_hash, [this, addr](Finality fin) { - resp_queue.enqueue(fin); - }); - /* the following function is executed on the dedicated thread for confirming commands */ - resp_tcall->async_call([this, addr, cmd_hash](salticidae::ThreadCall::Handle &) { - auto it = unconfirmed.find(cmd_hash); - if (it == unconfirmed.end()) - it = unconfirmed.insert( - std::make_pair(cmd_hash, promise_t([](promise_t &){}))).first; - it->second.then([this, addr](const Finality &fin) { - try { - cn.send_msg(MsgRespCmd(std::move(fin)), addr); - } catch (std::exception &err) { - HOTSTUFF_LOG_WARN("unable to send to the client: %s", err.what()); - } - }); - }); -} - -void HotStuffApp::start(const std::vector<std::tuple<NetAddr, bytearray_t, bytearray_t>> &reps) { - ev_stat_timer = TimerEvent(ec, [this](TimerEvent &) { - HotStuff::print_stat(); - HotStuffApp::print_stat(); - //HotStuffCore::prune(100); - ev_stat_timer.add(stat_period); - }); - ev_stat_timer.add(stat_period); - impeach_timer = TimerEvent(ec, [this](TimerEvent &) { - if (get_decision_waiting().size()) - get_pace_maker()->impeach(); - reset_imp_timer(); - }); - impeach_timer.add(impeach_timeout); - HOTSTUFF_LOG_INFO("** starting the system with parameters **"); - HOTSTUFF_LOG_INFO("blk_size = %lu", blk_size); - HOTSTUFF_LOG_INFO("conns = %lu", HotStuff::size()); - HOTSTUFF_LOG_INFO("** starting the event loop..."); - HotStuff::start(reps); - cn.reg_conn_handler([this](const salticidae::ConnPool::conn_t &_conn, bool connected) { - auto conn = salticidae::static_pointer_cast<conn_t::type>(_conn); - if (connected) - client_conns.insert(conn); - else - client_conns.erase(conn); - return true; - }); - req_thread = std::thread([this]() { req_ec.dispatch(); }); - resp_thread = std::thread([this]() { resp_ec.dispatch(); }); - /* enter the event main loop */ - ec.dispatch(); -} - -void HotStuffApp::stop() { - papp->req_tcall->async_call([this](salticidae::ThreadCall::Handle &) { - req_ec.stop(); - }); - papp->resp_tcall->async_call([this](salticidae::ThreadCall::Handle &) { - resp_ec.stop(); - }); - - req_thread.join(); - resp_thread.join(); - ec.stop(); -} - -void HotStuffApp::print_stat() const { -#ifdef HOTSTUFF_MSG_STAT - HOTSTUFF_LOG_INFO("--- client msg. (10s) ---"); - size_t _nsent = 0; - size_t _nrecv = 0; - for (const auto &conn: client_conns) - { - if (conn == nullptr) continue; - size_t ns = conn->get_nsent(); - size_t nr = conn->get_nrecv(); - size_t nsb = conn->get_nsentb(); - size_t nrb = conn->get_nrecvb(); - conn->clear_msgstat(); - HOTSTUFF_LOG_INFO("%s: %u(%u), %u(%u)", - std::string(conn->get_addr()).c_str(), ns, nsb, nr, nrb); - _nsent += ns; - _nrecv += nr; - } - HOTSTUFF_LOG_INFO("--- end client msg. ---"); -#endif -} diff --git a/src/hotstuff_client.cpp b/src/hotstuff_client.cpp deleted file mode 100644 index 831b7ff..0000000 --- a/src/hotstuff_client.cpp +++ /dev/null @@ -1,181 +0,0 @@ -/** - * Copyright 2018 VMware - * Copyright 2018 Ted Yin - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <cassert> -#include <random> -#include <signal.h> -#include <sys/time.h> - -#include "salticidae/type.h" -#include "salticidae/netaddr.h" -#include "salticidae/network.h" -#include "salticidae/util.h" - -#include "hotstuff/util.h" -#include "hotstuff/type.h" -#include "hotstuff/client.h" - -using salticidae::Config; - -using hotstuff::ReplicaID; -using hotstuff::NetAddr; -using hotstuff::EventContext; -using hotstuff::MsgReqCmd; -using hotstuff::MsgRespCmd; -using hotstuff::CommandDummy; -using hotstuff::HotStuffError; -using hotstuff::uint256_t; -using hotstuff::opcode_t; -using hotstuff::command_t; - -EventContext ec; -ReplicaID proposer; -size_t max_async_num; -int max_iter_num; -uint32_t cid; -uint32_t cnt = 0; -uint32_t nfaulty; - -struct Request { - command_t cmd; - size_t confirmed; - salticidae::ElapsedTime et; - Request(const command_t &cmd): cmd(cmd), confirmed(0) { et.start(); } -}; - -using Net = salticidae::MsgNetwork<opcode_t>; - -std::unordered_map<ReplicaID, Net::conn_t> conns; -std::unordered_map<const uint256_t, Request> waiting; -std::vector<NetAddr> replicas; -std::vector<std::pair<struct timeval, double>> elapsed; -Net mn(ec, Net::Config()); - -void connect_all() { - for (size_t i = 0; i < replicas.size(); i++) - conns.insert(std::make_pair(i, mn.connect_sync(replicas[i]))); -} - -bool try_send(bool check = true) { - if ((!check || waiting.size() < max_async_num) && max_iter_num) - { - auto cmd = new CommandDummy(cid, cnt++); - MsgReqCmd msg(*cmd); - for (auto &p: conns) mn.send_msg(msg, p.second); -#ifndef HOTSTUFF_ENABLE_BENCHMARK - HOTSTUFF_LOG_INFO("send new cmd %.10s", - get_hex(cmd->get_hash()).c_str()); -#endif - waiting.insert(std::make_pair( - cmd->get_hash(), Request(cmd))); - if (max_iter_num > 0) - max_iter_num--; - return true; - } - return false; -} - -void client_resp_cmd_handler(MsgRespCmd &&msg, const Net::conn_t &) { - auto &fin = msg.fin; - HOTSTUFF_LOG_DEBUG("got %s", std::string(msg.fin).c_str()); - const uint256_t &cmd_hash = fin.cmd_hash; - auto it = waiting.find(cmd_hash); - auto &et = it->second.et; - if (it == waiting.end()) return; - et.stop(); - if (++it->second.confirmed <= nfaulty) return; // wait for f + 1 ack -#ifndef HOTSTUFF_ENABLE_BENCHMARK - HOTSTUFF_LOG_INFO("got %s, wall: %.3f, cpu: %.3f", - std::string(fin).c_str(), - et.elapsed_sec, et.cpu_elapsed_sec); -#else - struct timeval tv; - gettimeofday(&tv, nullptr); - elapsed.push_back(std::make_pair(tv, et.elapsed_sec)); -#endif - waiting.erase(it); - while (try_send()); -} - -std::pair<std::string, std::string> split_ip_port_cport(const std::string &s) { - auto ret = salticidae::trim_all(salticidae::split(s, ";")); - return std::make_pair(ret[0], ret[1]); -} - -int main(int argc, char **argv) { - Config config("hotstuff.conf"); - - auto opt_idx = Config::OptValInt::create(0); - auto opt_replicas = Config::OptValStrVec::create(); - auto opt_max_iter_num = Config::OptValInt::create(100); - auto opt_max_async_num = Config::OptValInt::create(10); - auto opt_cid = Config::OptValInt::create(-1); - - auto shutdown = [&](int) { ec.stop(); }; - salticidae::SigEvent ev_sigint(ec, shutdown); - salticidae::SigEvent ev_sigterm(ec, shutdown); - ev_sigint.add(SIGINT); - ev_sigterm.add(SIGTERM); - - mn.reg_handler(client_resp_cmd_handler); - mn.start(); - - config.add_opt("idx", opt_idx, Config::SET_VAL); - config.add_opt("cid", opt_cid, Config::SET_VAL); - config.add_opt("replica", opt_replicas, Config::APPEND); - config.add_opt("iter", opt_max_iter_num, Config::SET_VAL); - config.add_opt("max-async", opt_max_async_num, Config::SET_VAL); - config.parse(argc, argv); - auto idx = opt_idx->get(); - max_iter_num = opt_max_iter_num->get(); - max_async_num = opt_max_async_num->get(); - std::vector<std::string> raw; - for (const auto &s: opt_replicas->get()) - { - auto res = salticidae::trim_all(salticidae::split(s, ",")); - if (res.size() < 1) - throw HotStuffError("format error"); - raw.push_back(res[0]); - } - - if (!(0 <= idx && (size_t)idx < raw.size() && raw.size() > 0)) - throw std::invalid_argument("out of range"); - cid = opt_cid->get() != -1 ? opt_cid->get() : idx; - for (const auto &p: raw) - { - auto _p = split_ip_port_cport(p); - size_t _; - replicas.push_back(NetAddr(NetAddr(_p.first).ip, htons(stoi(_p.second, &_)))); - } - - nfaulty = (replicas.size() - 1) / 3; - HOTSTUFF_LOG_INFO("nfaulty = %zu", nfaulty); - connect_all(); - while (try_send()); - ec.dispatch(); - -#ifdef HOTSTUFF_ENABLE_BENCHMARK - for (const auto &e: elapsed) - { - char fmt[64]; - struct tm *tmp = localtime(&e.first.tv_sec); - strftime(fmt, sizeof fmt, "%Y-%m-%d %H:%M:%S.%%06u [hotstuff info] %%.6f\n", tmp); - fprintf(stderr, fmt, e.first.tv_usec, e.second); - } -#endif - return 0; -} |