aboutsummaryrefslogtreecommitdiff
path: root/src/hotstuff.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/hotstuff.cpp')
-rw-r--r--src/hotstuff.cpp647
1 files changed, 398 insertions, 249 deletions
diff --git a/src/hotstuff.cpp b/src/hotstuff.cpp
index 74d2f04..9582531 100644
--- a/src/hotstuff.cpp
+++ b/src/hotstuff.cpp
@@ -1,288 +1,437 @@
-#include <iostream>
-#include <cstring>
-#include <cassert>
-#include <algorithm>
-#include <random>
-#include <unistd.h>
-#include <signal.h>
-#include <event2/event.h>
-
-#include "salticidae/stream.h"
-#include "salticidae/util.h"
-#include "salticidae/network.h"
-#include "salticidae/msg.h"
-
-#include "hotstuff/promise.hpp"
-#include "hotstuff/type.h"
-#include "hotstuff/core.h"
-#include "hotstuff/entity.h"
-#include "hotstuff/util.h"
-#include "hotstuff/client.h"
-
-using promise::promise_t;
-using salticidae::NetAddr;
-using salticidae::MsgNetwork;
-using salticidae::ClientNetwork;
-using salticidae::Event;
-using salticidae::ElapsedTime;
-using salticidae::Config;
-using salticidae::_1;
-using salticidae::_2;
+#include "hotstuff/hotstuff.h"
+
using salticidae::static_pointer_cast;
-using salticidae::get_hash;
-using salticidae::trim_all;
-using salticidae::split;
-
-using hotstuff::HotStuffError;
-using hotstuff::CommandDummy;
-using hotstuff::Finality;
-using hotstuff::command_t;
-using hotstuff::uint256_t;
-using hotstuff::bytearray_t;
-using hotstuff::DataStream;
-using hotstuff::ReplicaID;
-using hotstuff::MsgClient;
-
-using HotStuff = hotstuff::HotStuffSecp256k1;
#define LOG_INFO HOTSTUFF_LOG_INFO
#define LOG_DEBUG HOTSTUFF_LOG_DEBUG
#define LOG_WARN HOTSTUFF_LOG_WARN
-#define LOG_ERROR HOTSTUFF_LOG_ERROR
-
-class HotStuffApp;
-
-class HotStuffApp: public HotStuff {
- double stat_period;
- /** libevent handle */
- EventContext eb;
- /** network messaging between a replica and its client */
- ClientNetwork<MsgClient> cn;
- /** timer object to schedule a periodic printing of system statistics */
- Event ev_stat_timer;
- /** the binding address for client RPC */
- NetAddr clisten_addr;
-
- using conn_client_t = MsgNetwork<MsgClient>::conn_t;
-
- /** Client */
- /** submits a new command */
- inline void client_request_cmd_handler(const MsgClient &, conn_client_t);
- /** checks if a cmd is decided */
- inline void client_check_cmd_handler(const MsgClient &, conn_client_t);
-
- /** The callback function to print stat */
- inline void print_stat_cb(evutil_socket_t, short);
-
- command_t parse_cmd(DataStream &s) override {
- auto cmd = new CommandDummy();
- s >> *cmd;
- return cmd;
- }
- public:
- HotStuffApp(uint32_t blk_size,
- int32_t parent_limit,
- double stat_period,
- ReplicaID idx,
- const bytearray_t &raw_privkey,
- NetAddr plisten_addr,
- NetAddr clisten_addr,
- const EventContext &eb);
-
- void start();
-};
-
-
-std::pair<std::string, std::string> split_ip_port_cport(const std::string &s) {
- auto ret = trim_all(split(s, ";"));
- if (ret.size() != 2)
- throw std::invalid_argument("invalid cport format");
- return std::make_pair(ret[0], ret[1]);
+namespace hotstuff {
+
+void MsgHotStuff::gen_propose(const Proposal &proposal) {
+ DataStream s;
+ set_opcode(PROPOSE);
+ s << proposal;
+ set_payload(std::move(s));
}
-void signal_handler(int) {
- throw HotStuffError("got terminal signal");
+void MsgHotStuff::parse_propose(Proposal &proposal) const {
+ DataStream(get_payload()) >> proposal;
}
-BoxObj<HotStuffApp> papp = nullptr;
-
-int main(int argc, char **argv) {
- Config config("hotstuff.conf");
-
- ElapsedTime elapsed;
- elapsed.start();
-
- signal(SIGTERM, signal_handler);
- signal(SIGINT, signal_handler);
-
- auto opt_blk_size = Config::OptValInt::create(1);
- auto opt_parent_limit = Config::OptValInt::create(-1);
- auto opt_stat_period = Config::OptValDouble::create(10);
- auto opt_replicas = Config::OptValStrVec::create();
- auto opt_idx = Config::OptValInt::create(0);
- auto opt_client_port = Config::OptValInt::create(-1);
- auto opt_privkey = Config::OptValStr::create();
- auto opt_help = Config::OptValFlag::create(false);
-
- config.add_opt("block-size", opt_blk_size, Config::SET_VAL);
- config.add_opt("parent-limit", opt_parent_limit, Config::SET_VAL);
- config.add_opt("stat-period", opt_stat_period, Config::SET_VAL);
- config.add_opt("replica", opt_replicas, Config::APPEND);
- config.add_opt("idx", opt_idx, Config::SET_VAL);
- config.add_opt("cport", opt_client_port, Config::SET_VAL);
- config.add_opt("privkey", opt_privkey, Config::SET_VAL);
- config.add_opt("help", opt_help, Config::SWITCH_ON, 'h', "show this help info");
-
- EventContext eb;
-#ifndef HOTSTUFF_ENABLE_LOG_DEBUG
- try {
-#endif
- config.parse(argc, argv);
- if (opt_help->get())
- {
- config.print_help();
- exit(0);
- }
- auto idx = opt_idx->get();
- auto client_port = opt_client_port->get();
- std::vector<std::pair<std::string, std::string>> replicas;
- for (const auto &s: opt_replicas->get())
+void MsgHotStuff::gen_vote(const Vote &vote) {
+ DataStream s;
+ set_opcode(VOTE);
+ s << vote;
+ set_payload(std::move(s));
+}
+
+void MsgHotStuff::parse_vote(Vote &vote) const {
+ DataStream(get_payload()) >> vote;
+}
+
+void MsgHotStuff::gen_qfetchblk(const std::vector<uint256_t> &blk_hashes) {
+ DataStream s;
+ set_opcode(QUERY_FETCH_BLK);
+ gen_hash_list(s, blk_hashes);
+ set_payload(std::move(s));
+}
+
+void MsgHotStuff::parse_qfetchblk(std::vector<uint256_t> &blk_hashes) const {
+ DataStream s(get_payload());
+ parse_hash_list(s, blk_hashes);
+}
+
+void MsgHotStuff::gen_rfetchblk(const std::vector<block_t> &blks) {
+ DataStream s;
+ set_opcode(RESP_FETCH_BLK);
+ s << htole((uint32_t)blks.size());
+ for (auto blk: blks) s << *blk;
+ set_payload(std::move(s));
+}
+
+void MsgHotStuff::parse_rfetchblk(std::vector<block_t> &blks, HotStuffCore *hsc) const {
+ DataStream s;
+ uint32_t size;
+ s >> size;
+ size = letoh(size);
+ blks.resize(size);
+ for (auto &blk: blks)
+ {
+ Block _blk;
+ _blk.unserialize(s, hsc);
+ if (!_blk.verify(hsc->get_config()))
+ blk = hsc->storage->add_blk(std::move(_blk));
+ else
{
- auto res = trim_all(split(s, ","));
- if (res.size() != 2)
- throw HotStuffError("invalid replica info");
- replicas.push_back(std::make_pair(res[0], res[1]));
+ blk = nullptr;
+ LOG_WARN("block is invalid");
}
+ }
+}
- if (!(0 <= idx && (size_t)idx < replicas.size()))
- throw HotStuffError("replica idx out of range");
- std::string binding_addr = replicas[idx].first;
- if (client_port == -1)
+void HotStuffBase::add_replica(ReplicaID idx, const NetAddr &addr,
+ pubkey_bt &&pub_key) {
+ HotStuffCore::add_replica(idx, addr, std::move(pub_key));
+ if (addr != listen_addr)
+ pn.add_peer(addr);
+}
+
+void HotStuffBase::on_fetch_blk(const block_t &blk) {
+#ifdef HOTSTUFF_ENABLE_TX_PROFILE
+ blk_profiler.get_tx(blk->get_hash());
+#endif
+ LOG_DEBUG("fetched %.10s", get_hex(blk->get_hash()).c_str());
+ part_fetched++;
+ fetched++;
+ for (auto cmd: blk->get_cmds()) on_fetch_cmd(cmd);
+ const uint256_t &blk_hash = blk->get_hash();
+ auto it = blk_fetch_waiting.find(blk_hash);
+ if (it != blk_fetch_waiting.end())
+ {
+ it->second.resolve(blk);
+ blk_fetch_waiting.erase(it);
+ }
+}
+
+void HotStuffBase::on_fetch_cmd(const command_t &cmd) {
+ const uint256_t &cmd_hash = cmd->get_hash();
+ auto it = cmd_fetch_waiting.find(cmd_hash);
+ if (it != cmd_fetch_waiting.end())
+ {
+ it->second.resolve(cmd);
+ cmd_fetch_waiting.erase(it);
+ }
+}
+
+void HotStuffBase::on_deliver_blk(const block_t &blk) {
+ const uint256_t &blk_hash = blk->get_hash();
+ bool valid;
+ /* sanity check: all parents must be delivered */
+ for (const auto &p: blk->get_parent_hashes())
+ assert(storage->is_blk_delivered(p));
+ if ((valid = HotStuffCore::on_deliver_blk(blk)))
+ {
+ LOG_DEBUG("block %.10s delivered",
+ get_hex(blk_hash).c_str());
+ part_parent_size += blk->get_parent_hashes().size();
+ part_delivered++;
+ delivered++;
+ }
+ else
+ {
+ LOG_WARN("dropping invalid block");
+ }
+
+ auto it = blk_delivery_waiting.find(blk_hash);
+ if (it != blk_delivery_waiting.end())
+ {
+ auto &pm = it->second;
+ if (valid)
{
- auto p = split_ip_port_cport(binding_addr);
- size_t idx;
- try {
- client_port = stoi(p.second, &idx);
- } catch (std::invalid_argument &) {
- throw HotStuffError("client port not specified");
- }
- }
+ pm.elapsed.stop(false);
+ auto sec = pm.elapsed.elapsed_sec;
+ part_delivery_time += sec;
+ part_delivery_time_min = std::min(part_delivery_time_min, sec);
+ part_delivery_time_max = std::max(part_delivery_time_max, sec);
- NetAddr plisten_addr{split_ip_port_cport(binding_addr).first};
-
- papp = new HotStuffApp(opt_blk_size->get(),
- opt_parent_limit->get(),
- opt_stat_period->get(),
- idx,
- hotstuff::from_hex(opt_privkey->get()),
- plisten_addr,
- NetAddr("0.0.0.0", client_port),
- eb);
- for (size_t i = 0; i < replicas.size(); i++)
+ pm.resolve(blk);
+ }
+ else
{
- auto p = split_ip_port_cport(replicas[i].first);
- papp->add_replica(i, NetAddr(p.first),
- hotstuff::from_hex(replicas[i].second));
+ pm.reject(blk);
+ // TODO: do we need to also free it from storage?
}
- papp->start();
-#ifndef HOTSTUFF_ENABLE_LOG_DEBUG
- } catch (std::exception &e) {
- HOTSTUFF_LOG_INFO("exception: %s", e.what());
- elapsed.stop(true);
+ blk_delivery_waiting.erase(it);
}
+}
+
+promise_t HotStuffBase::async_fetch_blk(const uint256_t &blk_hash,
+ const NetAddr *replica_id,
+ bool fetch_now) {
+ if (storage->is_blk_fetched(blk_hash))
+ return promise_t([this, &blk_hash](promise_t pm){
+ pm.resolve(storage->find_blk(blk_hash));
+ });
+ auto it = blk_fetch_waiting.find(blk_hash);
+ if (it == blk_fetch_waiting.end())
+ {
+#ifdef HOTSTUFF_ENABLE_TX_PROFILE
+ blk_profiler.rec_tx(blk_hash, false);
#endif
- return 0;
+ it = blk_fetch_waiting.insert(
+ std::make_pair(
+ blk_hash,
+ BlockFetchContext(blk_hash, this))).first;
+ }
+ if (replica_id != nullptr)
+ it->second.add_replica(*replica_id, fetch_now);
+ return static_cast<promise_t &>(it->second);
}
-HotStuffApp::HotStuffApp(uint32_t blk_size,
- int32_t parent_limit,
- double stat_period,
- ReplicaID idx,
- const bytearray_t &raw_privkey,
- NetAddr plisten_addr,
- NetAddr clisten_addr,
- const EventContext &eb):
- HotStuff(blk_size, parent_limit, idx, raw_privkey,
- plisten_addr, eb),
- stat_period(stat_period),
- eb(eb),
- cn(eb),
- clisten_addr(clisten_addr) {
- /* register the handlers for msg from clients */
- cn.reg_handler(hotstuff::REQ_CMD, std::bind(&HotStuffApp::client_request_cmd_handler, this, _1, _2));
- cn.reg_handler(hotstuff::CHK_CMD, std::bind(&HotStuffApp::client_check_cmd_handler, this, _1, _2));
- cn.init(clisten_addr);
+promise_t HotStuffBase::async_fetch_cmd(const uint256_t &cmd_hash,
+ const NetAddr *replica_id,
+ bool fetch_now) {
+ if (storage->is_cmd_fetched(cmd_hash))
+ return promise_t([this, &cmd_hash](promise_t pm){
+ pm.resolve(storage->find_cmd(cmd_hash));
+ });
+ auto it = cmd_fetch_waiting.find(cmd_hash);
+ if (it == cmd_fetch_waiting.end())
+ {
+ it = cmd_fetch_waiting.insert(
+ std::make_pair(cmd_hash, CmdFetchContext(cmd_hash, this))).first;
+ }
+ if (replica_id != nullptr)
+ it->second.add_replica(*replica_id, fetch_now);
+ return static_cast<promise_t &>(it->second);
}
-void HotStuffApp::client_request_cmd_handler(const MsgClient &msg, conn_client_t conn_) {
- auto conn = static_pointer_cast<ClientNetwork<MsgClient>::Conn>(conn_);
- const NetAddr addr = conn->get_addr();
- command_t cmd = new CommandDummy();
+promise_t HotStuffBase::async_deliver_blk(const uint256_t &blk_hash,
+ const NetAddr &replica_id) {
+ if (storage->is_blk_delivered(blk_hash))
+ return promise_t([this, &blk_hash](promise_t pm) {
+ pm.resolve(storage->find_blk(blk_hash));
+ });
+ auto it = blk_delivery_waiting.find(blk_hash);
+ if (it != blk_delivery_waiting.end())
+ return static_cast<promise_t &>(it->second);
+ BlockDeliveryContext pm{[](promise_t){}};
+ it = blk_delivery_waiting.insert(std::make_pair(blk_hash, pm)).first;
+ /* otherwise the on_deliver_batch will resolve */
+ async_fetch_blk(blk_hash, &replica_id).then([this, replica_id](block_t blk) {
+ /* qc_ref should be fetched */
+ std::vector<promise_t> pms;
+ const auto &qc = blk->get_qc();
+ if (qc)
+ pms.push_back(async_fetch_blk(qc->get_blk_hash(), &replica_id));
+ /* the parents should be delivered */
+ for (const auto &phash: blk->get_parent_hashes())
+ pms.push_back(async_deliver_blk(phash, replica_id));
+ promise::all(pms).then([this, blk]() {
+ on_deliver_blk(blk);
+ });
+ });
+ return static_cast<promise_t &>(pm);
+}
+
+void HotStuffBase::propose_handler(const MsgHotStuff &msg, conn_t conn_) {
+ auto conn = static_pointer_cast<PeerNetwork<MsgHotStuff>::Conn>(conn_);
+ const NetAddr &peer = conn->get_peer();
+ Proposal prop(this);
+ msg.parse_propose(prop);
+ block_t blk = prop.blk;
+ promise::all(std::vector<promise_t>{
+ async_deliver_blk(prop.bqc_hash, peer),
+ async_deliver_blk(blk->get_hash(), peer),
+ }).then([this, prop = std::move(prop)]() {
+ on_receive_proposal(prop);
+ });
+}
+
+void HotStuffBase::vote_handler(const MsgHotStuff &msg, conn_t conn_) {
+ auto conn = static_pointer_cast<PeerNetwork<MsgHotStuff>::Conn>(conn_);
+ const NetAddr &peer = conn->get_peer();
+ Vote vote(this);
+ msg.parse_vote(vote);
+ promise::all(std::vector<promise_t>{
+ async_deliver_blk(vote.bqc_hash, peer),
+ async_deliver_blk(vote.blk_hash, peer)
+ }).then([this, vote = std::move(vote)]() {
+ on_receive_vote(vote);
+ });
+}
+
+void HotStuffBase::query_fetch_blk_handler(const MsgHotStuff &msg, conn_t conn_) {
+ auto conn = static_pointer_cast<PeerNetwork<MsgHotStuff>::Conn>(conn_);
+ const NetAddr replica = conn->get_peer();
+ std::vector<uint256_t> blk_hashes;
+ msg.parse_qfetchblk(blk_hashes);
+
std::vector<promise_t> pms;
- msg.parse_reqcmd(static_cast<CommandDummy &>(*cmd));
+ for (const auto &h: blk_hashes)
+ pms.push_back(async_fetch_blk(h, nullptr));
+ promise::all(pms).then([replica, this](const promise::values_t values) {
+ MsgHotStuff resp;
+ std::vector<block_t> blks;
+ for (auto &v: values)
+ {
+ auto blk = promise::any_cast<block_t>(v);
+ blks.push_back(blk);
+ }
+ resp.gen_rfetchblk(blks);
+ pn.send_msg(resp, replica);
+ });
+}
- bool flag = true;
-#ifndef HOTSTUFF_DISABLE_TX_VERIFY
- flag &= cmd->verify();
-#endif
- if (!flag)
+void HotStuffBase::resp_fetch_blk_handler(const MsgHotStuff &msg, conn_t) {
+ std::vector<block_t> blks;
+ msg.parse_rfetchblk(blks, this);
+ for (const auto &blk: blks)
+ if (blk) on_fetch_blk(blk);
+}
+
+void HotStuffBase::print_stat() const {
+ LOG_INFO("===== begin stats =====");
+ LOG_INFO("-------- queues -------");
+ LOG_INFO("blk_fetch_waiting: %lu", blk_fetch_waiting.size());
+ LOG_INFO("blk_delivery_waiting: %lu", blk_delivery_waiting.size());
+ LOG_INFO("cmd_fetch_waiting: %lu", cmd_fetch_waiting.size());
+ LOG_INFO("decision_waiting: %lu", decision_waiting.size());
+ LOG_INFO("-------- misc ---------");
+ LOG_INFO("fetched: %lu", fetched);
+ LOG_INFO("delivered: %lu", delivered);
+ LOG_INFO("cmd_cache: %lu", storage->get_cmd_cache_size());
+ LOG_INFO("blk_cache: %lu", storage->get_blk_cache_size());
+ LOG_INFO("------ misc (10s) -----");
+ LOG_INFO("fetched: %lu", part_fetched);
+ LOG_INFO("delivered: %lu", part_delivered);
+ LOG_INFO("decided: %lu", part_decided);
+ LOG_INFO("gened: %lu", part_gened);
+ LOG_INFO("avg. parent_size: %.3f",
+ part_delivered ? part_parent_size / double(part_delivered) : 0);
+ LOG_INFO("delivery time: %.3f avg, %.3f min, %.3f max",
+ part_delivered ? part_delivery_time / double(part_delivered) : 0,
+ part_delivery_time_min == double_inf ? 0 : part_delivery_time_min,
+ part_delivery_time_max);
+
+ part_parent_size = 0;
+ part_fetched = 0;
+ part_delivered = 0;
+ part_decided = 0;
+ part_gened = 0;
+ part_delivery_time = 0;
+ part_delivery_time_min = double_inf;
+ part_delivery_time_max = 0;
+ LOG_INFO("-- sent opcode (10s) --");
+ auto &sent_op = pn.get_sent_by_opcode();
+ for (auto &op: sent_op)
{
- LOG_WARN("invalid client cmd");
- MsgClient resp;
- resp.gen_respcmd(cmd->get_hash(), Finality(-1, uint256_t()));
- cn.send_msg(resp, addr);
+ auto &val = op.second;
+ LOG_INFO("%02x: %lu, %.2fBpm", op.first,
+ val.first, val.first ? val.second / double(val.first) : 0);
+ val.first = val.second = 0;
}
- else
+ LOG_INFO("-- recv opcode (10s) --");
+ auto &recv_op = pn.get_recv_by_opcode();
+ for (auto &op: recv_op)
{
- const uint256_t cmd_hash = cmd->get_hash();
- add_command(cmd);
- /** wait for the decision of tx */
- LOG_DEBUG("processing client cmd %.10s", get_hex(cmd_hash).c_str());
- async_decide(cmd_hash).then([this, addr](command_t cmd) {
- MsgClient resp;
- resp.gen_respcmd(cmd->get_hash(), cmd->get_finality());
- cn.send_msg(resp, addr);
+ auto &val = op.second;
+ LOG_INFO("%02x: %lu, %.2fBpm", op.first,
+ val.first, val.first ? val.second / double(val.first) : 0);
+ val.first = val.second = 0;
+ }
+ LOG_INFO("--- replica msg. (10s) ---");
+ size_t _nsent = 0;
+ size_t _nrecv = 0;
+ for (const auto &replica: pn.all_peers())
+ {
+ auto conn = pn.get_peer_conn(replica);
+ size_t ns = conn->get_nsent();
+ size_t nr = conn->get_nrecv();
+ conn->clear_nsent();
+ conn->clear_nrecv();
+ LOG_INFO("%s: %u, %u, %u",
+ std::string(replica).c_str(), ns, nr, part_fetched_replica[replica]);
+ _nsent += ns;
+ _nrecv += nr;
+ part_fetched_replica[replica] = 0;
+ }
+ nsent += _nsent;
+ nrecv += _nrecv;
+ LOG_INFO("sent: %lu", _nsent);
+ LOG_INFO("recv: %lu", _nrecv);
+ LOG_INFO("--- replica msg. total ---");
+ LOG_INFO("sent: %lu", nsent);
+ LOG_INFO("recv: %lu", nrecv);
+ LOG_INFO("====== end stats ======");
+}
+
+promise_t HotStuffBase::async_decide(const uint256_t &cmd_hash) {
+ if (get_cmd_decision(cmd_hash))
+ return promise_t([this, cmd_hash](promise_t pm){
+ pm.resolve(storage->find_cmd(cmd_hash));
});
+ /* otherwise the do_decide will resolve the promise */
+ auto it = decision_waiting.find(cmd_hash);
+ if (it == decision_waiting.end())
+ {
+ promise_t pm{[](promise_t){}};
+ it = decision_waiting.insert(std::make_pair(cmd_hash, pm)).first;
}
+ return it->second;
}
-void HotStuffApp::client_check_cmd_handler(const MsgClient &msg, conn_client_t conn_) {
- auto conn = static_pointer_cast<ClientNetwork<MsgClient>::Conn>(conn_);
- const NetAddr addr = conn->get_addr();
- uint256_t cmd_hash;
- msg.parse_chkcmd(cmd_hash);
- MsgClient resp;
- command_t cmd = storage->find_cmd(cmd_hash);
- Finality fin;
- if (cmd) fin = cmd->get_finality();
- resp.gen_respcmd(cmd_hash, fin);
- cn.send_msg(resp, addr);
+HotStuffBase::HotStuffBase(uint32_t blk_size,
+ int32_t parent_limit,
+ ReplicaID rid,
+ privkey_bt &&priv_key,
+ NetAddr listen_addr,
+ EventContext eb,
+ pacemaker_bt pmaker):
+ HotStuffCore(rid, std::move(priv_key), parent_limit),
+ listen_addr(listen_addr),
+ blk_size(blk_size),
+ eb(eb),
+ pmaker(std::move(pmaker)),
+ pn(eb),
+
+ fetched(0), delivered(0),
+ nsent(0), nrecv(0),
+ part_parent_size(0),
+ part_fetched(0),
+ part_delivered(0),
+ part_decided(0),
+ part_gened(0),
+ part_delivery_time(0),
+ part_delivery_time_min(double_inf),
+ part_delivery_time_max(0)
+{
+ if (pmaker == nullptr)
+ this->pmaker = new PaceMakerDummy(this);
+ /* register the handlers for msg from replicas */
+ pn.reg_handler(PROPOSE, std::bind(&HotStuffBase::propose_handler, this, _1, _2));
+ pn.reg_handler(VOTE, std::bind(&HotStuffBase::vote_handler, this, _1, _2));
+ pn.reg_handler(QUERY_FETCH_BLK, std::bind(&HotStuffBase::query_fetch_blk_handler, this, _1, _2));
+ pn.reg_handler(RESP_FETCH_BLK, std::bind(&HotStuffBase::resp_fetch_blk_handler, this, _1, _2));
+ pn.init(listen_addr);
}
+void HotStuffBase::do_broadcast_proposal(const Proposal &prop) {
+ MsgHotStuff prop_msg;
+ prop_msg.gen_propose(prop);
+ for (const auto &replica: pn.all_peers())
+ pn.send_msg(prop_msg, replica);
+}
-void HotStuffApp::start() {
- ev_stat_timer = Event(eb, -1, 0,
- std::bind(&HotStuffApp::print_stat_cb, this, _1, _2));
- ev_stat_timer.add_with_timeout(stat_period);
- LOG_INFO("** starting the system with parameters **");
- LOG_INFO("blk_size = %lu", blk_size);
- LOG_INFO("parent_limit = %d", parent_limit);
- LOG_INFO("conns = %lu", HotStuff::size());
- LOG_INFO("** starting the event loop...");
-#ifdef HOTSTUFF_DISABLE_TX_VERIFY
- LOG_INFO("!! verification disabled !!");
-#else
- LOG_INFO("** verification enabled **");
-#endif
- HotStuff::start();
- /* enter the event main loop */
- eb.dispatch();
+void HotStuffBase::do_vote(ReplicaID last_proposer, const Vote &vote) {
+ MsgHotStuff vote_msg;
+ vote_msg.gen_vote(vote);
+ pmaker->next_proposer(last_proposer)
+ .then([this, vote_msg](ReplicaID proposer) {
+ pn.send_msg(vote_msg, get_config().get_addr(proposer));
+ });
}
+void HotStuffBase::do_decide(const command_t &cmd) {
+ auto it = decision_waiting.find(cmd->get_hash());
+ if (it != decision_waiting.end())
+ {
+ it->second.resolve(cmd);
+ decision_waiting.erase(it);
+ }
+}
+
+HotStuffBase::~HotStuffBase() {}
+
+void HotStuffBase::start(bool eb_loop) {
+ /* ((n - 1) + 1 - 1) / 3 */
+ uint32_t nfaulty = pn.all_peers().size() / 3;
+ if (nfaulty == 0)
+ LOG_WARN("too few replicas in the system to tolerate any failure");
+ on_init(nfaulty);
+ if (eb_loop)
+ eb.dispatch();
+}
-void HotStuffApp::print_stat_cb(evutil_socket_t, short) {
- HotStuff::print_stat();
- HotStuffCore::prune(100);
- ev_stat_timer.add_with_timeout(stat_period);
}