#include <stack>
#include "core.h"
using salticidae::DataStream;
using salticidae::static_pointer_cast;
using salticidae::get_hash;
#define LOG_INFO HOTSTUFF_LOG_INFO
#define LOG_DEBUG HOTSTUFF_LOG_DEBUG
#define LOG_WARN HOTSTUFF_LOG_WARN
namespace hotstuff {
void MsgHotStuff::gen_propose(const Proposal &proposal) {
DataStream s;
set_opcode(PROPOSE);
s << proposal;
set_payload(std::move(s));
}
void MsgHotStuff::parse_propose(Proposal &proposal) const {
DataStream(get_payload()) >> proposal;
}
void MsgHotStuff::gen_vote(const Vote &vote) {
DataStream s;
set_opcode(VOTE);
s << vote;
set_payload(std::move(s));
}
void MsgHotStuff::parse_vote(Vote &vote) const {
DataStream(get_payload()) >> vote;
}
void MsgHotStuff::gen_qfetchblk(const std::vector<uint256_t> &blk_hashes) {
DataStream s;
set_opcode(QUERY_FETCH_BLK);
gen_hash_list(s, blk_hashes);
set_payload(std::move(s));
}
void MsgHotStuff::parse_qfetchblk(std::vector<uint256_t> &blk_hashes) const {
DataStream s(get_payload());
parse_hash_list(s, blk_hashes);
}
void MsgHotStuff::gen_rfetchblk(const std::vector<block_t> &blks) {
DataStream s;
set_opcode(RESP_FETCH_BLK);
s << htole((uint32_t)blks.size());
for (auto blk: blks) s << *blk;
set_payload(std::move(s));
}
void MsgHotStuff::parse_rfetchblk(std::vector<block_t> &blks, HotStuffCore *hsc) const {
DataStream s;
uint32_t size;
s >> size;
size = letoh(size);
blks.resize(size);
for (auto &blk: blks)
{
Block _blk;
_blk.unserialize(s, hsc);
if (!_blk.verify(hsc->get_config()))
blk = hsc->storage->add_blk(std::move(_blk));
else
{
blk = nullptr;
LOG_WARN("block is invalid");
}
}
}
/* The core logic of HotStuff, is farily simple :) */
/*** begin HotStuff protocol logic ***/
HotStuffCore::HotStuffCore(ReplicaID id,
privkey_bt &&priv_key,
int32_t parent_limit):
b0(new Block(true, 1)),
bqc(b0),
bexec(b0),
vheight(0),
priv_key(std::move(priv_key)),
tails{bqc},
id(id),
parent_limit(parent_limit),
storage(new EntityStorage()) {
storage->add_blk(b0);
b0->qc_ref = b0;
}
void HotStuffCore::sanity_check_delivered(const block_t &blk) {
if (!blk->delivered)
throw std::runtime_error("block not delivered");
}
block_t HotStuffCore::sanity_check_delivered(const uint256_t &blk_hash) {
block_t blk = storage->find_blk(blk_hash);
if (blk == nullptr || !blk->delivered)
throw std::runtime_error("block not delivered");
return std::move(blk);
}
bool HotStuffCore::on_deliver_blk(const block_t &blk) {
if (blk->delivered)
{
LOG_WARN("attempt to deliver a block twice");
return false;
}
blk->parents.clear();
for (const auto &hash: blk->parent_hashes)
{
block_t p = sanity_check_delivered(hash);
blk->parents.push_back(p);
}
blk->height = blk->parents[0]->height + 1;
for (const auto &cmd: blk->cmds)
cmd->container = blk;
if (blk->qc)
{
block_t _blk = storage->find_blk(blk->qc->get_blk_hash());
if (_blk == nullptr)
throw std::runtime_error("block referred by qc not fetched");
blk->qc_ref = std::move(_blk);
} // otherwise blk->qc_ref remains null
for (auto pblk: blk->parents) tails.erase(pblk);
tails.insert(blk);
blk->delivered = true;
LOG_DEBUG("delivered %.10s", get_hex(blk->get_hash()).c_str());
return true;
}
void HotStuffCore::check_commit(const block_t &_blk) {
const block_t &blk = _blk->qc_ref;
if (blk->qc_ref == nullptr) return;
if (blk->decision) return;
block_t p = blk->parents[0];
if (p == blk->qc_ref)
{ /* commit */
std::vector<block_t> commit_queue;
block_t b;
for (b = p; b->height > bexec->height; b = b->parents[0])
{ /* todo: also commit the uncles/aunts */
commit_queue.push_back(b);
}
if (b != bexec)
throw std::runtime_error("safety breached :(");
for (auto it = commit_queue.rbegin(); it != commit_queue.rend(); it++)
{
const block_t &blk = *it;
blk->decision = 1;
#ifdef HOTSTUFF_ENABLE_LOG_PROTO
LOG_INFO("commit blk %.10s", get_hex10(blk->get_hash()).c_str());
#endif
for (auto cmd: blk->cmds)
do_decide(cmd);
}
bexec = p;
}
}
bool HotStuffCore::update(const uint256_t &bqc_hash) {
block_t _bqc = sanity_check_delivered(bqc_hash);
if (_bqc->qc_ref == nullptr) return false;
check_commit(_bqc);
if (_bqc->qc_ref->height > bqc->qc_ref->height)
bqc = _bqc;
return true;
}
void HotStuffCore::on_propose(const std::vector<command_t> &cmds) {
size_t nparents = parent_limit < 1 ? tails.size() : parent_limit;
assert(tails.size() > 0);
block_t p = *tails.rbegin();
std::vector<block_t> parents{p};
tails.erase(p);
nparents--;
/* add the rest of tails as "uncles/aunts" */
while (nparents--)
{
auto it = tails.begin();
parents.push_back(*it);
tails.erase(it);
}
quorum_cert_bt qc = nullptr;
block_t qc_ref = nullptr;
if (p != b0 && p->voted.size() >= config.nmajority)
{
qc = p->self_qc->clone();
qc->compute();
qc_ref = p;
}
/* create a new block */
block_t bnew = storage->add_blk(
Block(
parents,
cmds,
p->height + 1,
std::move(qc), qc_ref,
nullptr
));
const uint256_t bnew_hash = bnew->get_hash();
bnew->self_qc = create_quorum_cert(bnew_hash);
on_deliver_blk(bnew);
update(bnew_hash);
Proposal prop(id, bqc->get_hash(), bnew, nullptr);
#ifdef HOTSTUFF_ENABLE_LOG_PROTO
LOG_INFO("propose %s", std::string(*bnew).c_str());
#endif
/* self-vote */
on_receive_vote(
Vote(id, bqc->get_hash(), bnew_hash,
create_part_cert(*priv_key, bnew_hash), this));
on_propose_(bnew);
/* boradcast to other replicas */
do_broadcast_proposal(prop);
}
void HotStuffCore::on_receive_proposal(const Proposal &prop) {
if (!update(prop.bqc_hash)) return;
#ifdef HOTSTUFF_ENABLE_LOG_PROTO
LOG_INFO("got %s", std::string(prop).c_str());
#endif
block_t bnew = prop.blk;
sanity_check_delivered(bnew);
bool opinion = false;
if (bnew->height > vheight)
{
block_t pref = bqc->qc_ref;
block_t b;
for (b = bnew;
b->height > pref->height;
b = b->parents[0]);
opinion = b == pref;
vheight = bnew->height;
}
#ifdef HOTSTUFF_ENABLE_LOG_PROTO
LOG_INFO("now state: %s", std::string(*this).c_str());
#endif
do_vote(prop.proposer,
Vote(id,
bqc->get_hash(),
bnew->get_hash(),
(opinion ?
create_part_cert(*priv_key, bnew->get_hash()) :
nullptr),
nullptr));
}
void HotStuffCore::on_receive_vote(const Vote &vote) {
if (!update(vote.bqc_hash)) return;
#ifdef HOTSTUFF_ENABLE_LOG_PROTO
LOG_INFO("got %s", std::string(vote).c_str());
LOG_INFO("now state: %s", std::string(*this).c_str());
#endif
block_t blk = sanity_check_delivered(vote.blk_hash);
if (vote.cert == nullptr) return;
if (!vote.verify())
{
LOG_WARN("invalid vote");
return;
}
if (!blk->voted.insert(vote.voter).second)
{
LOG_WARN("duplicate votes");
return;
}
size_t qsize = blk->voted.size();
if (qsize <= config.nmajority)
{
blk->self_qc->add_part(vote.voter, *vote.cert);
if (qsize == config.nmajority)
on_qc_finish(blk);
}
}
/*** end HotStuff protocol logic ***/
void HotStuffCore::prune(uint32_t staleness) {
block_t start;
/* skip the blocks */
for (start = bexec; staleness; staleness--, start = start->parents[0])
if (!start->parents.size()) return;
std::stack<block_t> s;
start->qc_ref = nullptr;
s.push(start);
while (!s.empty())
{
auto &blk = s.top();
if (blk->parents.empty())
{
storage->try_release_blk(blk);
s.pop();
continue;
}
blk->qc_ref = nullptr;
s.push(blk->parents.back());
blk->parents.pop_back();
}
}
int8_t HotStuffCore::get_cmd_decision(const uint256_t &cmd_hash) {
auto cmd = storage->find_cmd(cmd_hash);
return cmd != nullptr ? cmd->get_decision() : 0;
}
void HotStuffCore::add_replica(ReplicaID rid, const NetAddr &addr,
pubkey_bt &&pub_key) {
config.add_replica(rid,
ReplicaInfo(rid, addr, std::move(pub_key)));
b0->voted.insert(rid);
}
promise_t HotStuffCore::async_qc_finish(const block_t &blk) {
if (blk->voted.size() >= config.nmajority)
return promise_t([](promise_t &pm) {
pm.resolve();
});
auto it = qc_waiting.find(blk);
if (it == qc_waiting.end())
it = qc_waiting.insert(std::make_pair(blk, promise_t())).first;
return it->second;
}
void HotStuffCore::on_qc_finish(const block_t &blk) {
auto it = qc_waiting.find(blk);
if (it != qc_waiting.end())
{
it->second.resolve();
qc_waiting.erase(it);
}
}
promise_t HotStuffCore::async_wait_propose() {
return propose_waiting;
}
void HotStuffCore::on_propose_(const block_t &blk) {
auto t = std::move(propose_waiting);
propose_waiting = promise_t();
t.resolve(blk);
}
HotStuffCore::operator std::string () const {
DataStream s;
s << "<hotstuff "
<< "bqc=" << get_hex10(bqc->get_hash()) << " "
<< "bexec=" << get_hex10(bqc->get_hash()) << " "
<< "vheight=" << std::to_string(vheight) << " "
<< "tails=" << std::to_string(tails.size()) << ">";
return std::string(std::move(s));
}
void HotStuffBase::add_replica(ReplicaID idx, const NetAddr &addr,
pubkey_bt &&pub_key) {
HotStuffCore::add_replica(idx, addr, std::move(pub_key));
if (addr != listen_addr)
pn.add_peer(addr);
}
void HotStuffBase::on_fetch_blk(const block_t &blk) {
#ifdef HOTSTUFF_ENABLE_TX_PROFILE
blk_profiler.get_tx(blk->get_hash());
#endif
LOG_DEBUG("fetched %.10s", get_hex(blk->get_hash()).c_str());
part_fetched++;
fetched++;
for (auto cmd: blk->get_cmds()) on_fetch_cmd(cmd);
const uint256_t &blk_hash = blk->get_hash();
auto it = blk_fetch_waiting.find(blk_hash);
if (it != blk_fetch_waiting.end())
{
it->second.resolve(blk);
blk_fetch_waiting.erase(it);
}
}
void HotStuffBase::on_fetch_cmd(const command_t &cmd) {
const uint256_t &cmd_hash = cmd->get_hash();
auto it = cmd_fetch_waiting.find(cmd_hash);
if (it != cmd_fetch_waiting.end())
{
it->second.resolve(cmd);
cmd_fetch_waiting.erase(it);
}
}
void HotStuffBase::on_deliver_blk(const block_t &blk) {
const uint256_t &blk_hash = blk->get_hash();
bool valid;
/* sanity check: all parents must be delivered */
for (const auto &p: blk->get_parent_hashes())
assert(storage->is_blk_delivered(p));
if ((valid = HotStuffCore::on_deliver_blk(blk)))
{
LOG_DEBUG("block %.10s delivered",
get_hex(blk_hash).c_str());
part_parent_size += blk->get_parent_hashes().size();
part_delivered++;
delivered++;
}
else
{
LOG_WARN("dropping invalid block");
}
auto it = blk_delivery_waiting.find(blk_hash);
if (it != blk_delivery_waiting.end())
{
auto &pm = it->second;
if (valid)
{
pm.elapsed.stop(false);
auto sec = pm.elapsed.elapsed_sec;
part_delivery_time += sec;
part_delivery_time_min = std::min(part_delivery_time_min, sec);
part_delivery_time_max = std::max(part_delivery_time_max, sec);
pm.resolve(blk);
}
else
{
pm.reject(blk);
// TODO: do we need to also free it from storage?
}
blk_delivery_waiting.erase(it);
}
}
promise_t HotStuffBase::async_fetch_blk(const uint256_t &blk_hash,
const NetAddr *replica_id,
bool fetch_now) {
if (storage->is_blk_fetched(blk_hash))
return promise_t([this, &blk_hash](promise_t pm){
pm.resolve(storage->find_blk(blk_hash));
});
auto it = blk_fetch_waiting.find(blk_hash);
if (it == blk_fetch_waiting.end())
{
#ifdef HOTSTUFF_ENABLE_TX_PROFILE
blk_profiler.rec_tx(blk_hash, false);
#endif
it = blk_fetch_waiting.insert(
std::make_pair(
blk_hash,
BlockFetchContext(blk_hash, this))).first;
}
if (replica_id != nullptr)
it->second.add_replica(*replica_id, fetch_now);
return static_cast<promise_t &>(it->second);
}
promise_t HotStuffBase::async_fetch_cmd(const uint256_t &cmd_hash,
const NetAddr *replica_id,
bool fetch_now) {
if (storage->is_cmd_fetched(cmd_hash))
return promise_t([this, &cmd_hash](promise_t pm){
pm.resolve(storage->find_cmd(cmd_hash));
});
auto it = cmd_fetch_waiting.find(cmd_hash);
if (it == cmd_fetch_waiting.end())
{
it = cmd_fetch_waiting.insert(
std::make_pair(cmd_hash, CmdFetchContext(cmd_hash, this))).first;
}
if (replica_id != nullptr)
it->second.add_replica(*replica_id, fetch_now);
return static_cast<promise_t &>(it->second);
}
promise_t HotStuffBase::async_deliver_blk(const uint256_t &blk_hash,
const NetAddr &replica_id) {
if (storage->is_blk_delivered(blk_hash))
return promise_t([this, &blk_hash](promise_t pm) {
pm.resolve(storage->find_blk(blk_hash));
});
auto it = blk_delivery_waiting.find(blk_hash);
if (it != blk_delivery_waiting.end())
return static_cast<promise_t &>(it->second);
BlockDeliveryContext pm{[](promise_t){}};
it = blk_delivery_waiting.insert(std::make_pair(blk_hash, pm)).first;
/* otherwise the on_deliver_batch will resolve */
async_fetch_blk(blk_hash, &replica_id).then([this, replica_id](block_t blk) {
/* qc_ref should be fetched */
std::vector<promise_t> pms;
const auto &qc = blk->get_qc();
if (qc)
pms.push_back(async_fetch_blk(qc->get_blk_hash(), &replica_id));
/* the parents should be delivered */
for (const auto &phash: blk->get_parent_hashes())
pms.push_back(async_deliver_blk(phash, replica_id));
promise::all(pms).then([this, blk]() {
on_deliver_blk(blk);
});
});
return static_cast<promise_t &>(pm);
}
void HotStuffBase::propose_handler(const MsgHotStuff &msg, conn_t conn_) {
auto conn = static_pointer_cast<PeerNetwork<MsgHotStuff>::Conn>(conn_);
const NetAddr &peer = conn->get_peer();
Proposal prop(this);
msg.parse_propose(prop);
block_t blk = prop.blk;
promise::all(std::vector<promise_t>{
async_deliver_blk(prop.bqc_hash, peer),
async_deliver_blk(blk->get_hash(), peer),
}).then([this, prop = std::move(prop)]() {
on_receive_proposal(prop);
});
}
void HotStuffBase::vote_handler(const MsgHotStuff &msg, conn_t conn_) {
auto conn = static_pointer_cast<PeerNetwork<MsgHotStuff>::Conn>(conn_);
const NetAddr &peer = conn->get_peer();
Vote vote(this);
msg.parse_vote(vote);
promise::all(std::vector<promise_t>{
async_deliver_blk(vote.bqc_hash, peer),
async_deliver_blk(vote.blk_hash, peer)
}).then([this, vote = std::move(vote)]() {
on_receive_vote(vote);
});
}
void HotStuffBase::query_fetch_blk_handler(const MsgHotStuff &msg, conn_t conn_) {
auto conn = static_pointer_cast<PeerNetwork<MsgHotStuff>::Conn>(conn_);
const NetAddr replica = conn->get_peer();
std::vector<uint256_t> blk_hashes;
msg.parse_qfetchblk(blk_hashes);
std::vector<promise_t> pms;
for (const auto &h: blk_hashes)
pms.push_back(async_fetch_blk(h, nullptr));
promise::all(pms).then([replica, this](const promise::values_t values) {
MsgHotStuff resp;
std::vector<block_t> blks;
for (auto &v: values)
{
auto blk = promise::any_cast<block_t>(v);
blks.push_back(blk);
}
resp.gen_rfetchblk(blks);
pn.send_msg(resp, replica);
});
}
void HotStuffBase::resp_fetch_blk_handler(const MsgHotStuff &msg, conn_t) {
std::vector<block_t> blks;
msg.parse_rfetchblk(blks, this);
for (const auto &blk: blks)
if (blk) on_fetch_blk(blk);
}
void HotStuffBase::print_stat() const {
LOG_INFO("===== begin stats =====");
LOG_INFO("-------- queues -------");
LOG_INFO("blk_fetch_waiting: %lu", blk_fetch_waiting.size());
LOG_INFO("blk_delivery_waiting: %lu", blk_delivery_waiting.size());
LOG_INFO("cmd_fetch_waiting: %lu", cmd_fetch_waiting.size());
LOG_INFO("decision_waiting: %lu", decision_waiting.size());
LOG_INFO("-------- misc ---------");
LOG_INFO("fetched: %lu", fetched);
LOG_INFO("delivered: %lu", delivered);
LOG_INFO("cmd_cache: %lu", storage->get_cmd_cache_size());
LOG_INFO("blk_cache: %lu", storage->get_blk_cache_size());
LOG_INFO("------ misc (10s) -----");
LOG_INFO("fetched: %lu", part_fetched);
LOG_INFO("delivered: %lu", part_delivered);
LOG_INFO("decided: %lu", part_decided);
LOG_INFO("gened: %lu", part_gened);
LOG_INFO("avg. parent_size: %.3f",
part_delivered ? part_parent_size / double(part_delivered) : 0);
LOG_INFO("delivery time: %.3f avg, %.3f min, %.3f max",
part_delivered ? part_delivery_time / double(part_delivered) : 0,
part_delivery_time_min == double_inf ? 0 : part_delivery_time_min,
part_delivery_time_max);
part_parent_size = 0;
part_fetched = 0;
part_delivered = 0;
part_decided = 0;
part_gened = 0;
part_delivery_time = 0;
part_delivery_time_min = double_inf;
part_delivery_time_max = 0;
LOG_INFO("-- sent opcode (10s) --");
auto &sent_op = pn.get_sent_by_opcode();
for (auto &op: sent_op)
{
auto &val = op.second;
LOG_INFO("%02x: %lu, %.2fBpm", op.first,
val.first, val.first ? val.second / double(val.first) : 0);
val.first = val.second = 0;
}
LOG_INFO("-- recv opcode (10s) --");
auto &recv_op = pn.get_recv_by_opcode();
for (auto &op: recv_op)
{
auto &val = op.second;
LOG_INFO("%02x: %lu, %.2fBpm", op.first,
val.first, val.first ? val.second / double(val.first) : 0);
val.first = val.second = 0;
}
LOG_INFO("--- replica msg. (10s) ---");
size_t _nsent = 0;
size_t _nrecv = 0;
for (const auto &replica: pn.all_peers())
{
auto conn = pn.get_peer_conn(replica);
size_t ns = conn->get_nsent();
size_t nr = conn->get_nrecv();
conn->clear_nsent();
conn->clear_nrecv();
LOG_INFO("%s: %u, %u, %u",
std::string(replica).c_str(), ns, nr, part_fetched_replica[replica]);
_nsent += ns;
_nrecv += nr;
part_fetched_replica[replica] = 0;
}
nsent += _nsent;
nrecv += _nrecv;
LOG_INFO("sent: %lu", _nsent);
LOG_INFO("recv: %lu", _nrecv);
LOG_INFO("--- replica msg. total ---");
LOG_INFO("sent: %lu", nsent);
LOG_INFO("recv: %lu", nrecv);
LOG_INFO("====== end stats ======");
}
promise_t HotStuffBase::async_decide(const uint256_t &cmd_hash) {
if (get_cmd_decision(cmd_hash))
return promise_t([this, cmd_hash](promise_t pm){
pm.resolve(storage->find_cmd(cmd_hash));
});
/* otherwise the do_decide will resolve the promise */
auto it = decision_waiting.find(cmd_hash);
if (it == decision_waiting.end())
{
promise_t pm{[](promise_t){}};
it = decision_waiting.insert(std::make_pair(cmd_hash, pm)).first;
}
return it->second;
}
HotStuffBase::HotStuffBase(uint32_t blk_size,
int32_t parent_limit,
ReplicaID rid,
privkey_bt &&priv_key,
NetAddr listen_addr,
EventContext eb,
pacemaker_bt pmaker):
HotStuffCore(rid, std::move(priv_key), parent_limit),
listen_addr(listen_addr),
blk_size(blk_size),
eb(eb),
pmaker(std::move(pmaker)),
pn(eb),
fetched(0), delivered(0),
nsent(0), nrecv(0),
part_parent_size(0),
part_fetched(0),
part_delivered(0),
part_decided(0),
part_gened(0),
part_delivery_time(0),
part_delivery_time_min(double_inf),
part_delivery_time_max(0)
{
if (pmaker == nullptr)
this->pmaker = new PaceMakerDummy(this);
/* register the handlers for msg from replicas */
pn.reg_handler(PROPOSE, std::bind(&HotStuffBase::propose_handler, this, _1, _2));
pn.reg_handler(VOTE, std::bind(&HotStuffBase::vote_handler, this, _1, _2));
pn.reg_handler(QUERY_FETCH_BLK, std::bind(&HotStuffBase::query_fetch_blk_handler, this, _1, _2));
pn.reg_handler(RESP_FETCH_BLK, std::bind(&HotStuffBase::resp_fetch_blk_handler, this, _1, _2));
pn.init(listen_addr);
}
void HotStuffBase::do_broadcast_proposal(const Proposal &prop) {
MsgHotStuff prop_msg;
prop_msg.gen_propose(prop);
for (const auto &replica: pn.all_peers())
pn.send_msg(prop_msg, replica);
}
void HotStuffBase::do_vote(ReplicaID last_proposer, const Vote &vote) {
MsgHotStuff vote_msg;
vote_msg.gen_vote(vote);
pmaker->next_proposer(last_proposer)
.then([this, vote_msg](ReplicaID proposer) {
pn.send_msg(vote_msg, get_config().get_addr(proposer));
});
}
void HotStuffBase::do_decide(const command_t &cmd) {
auto it = decision_waiting.find(cmd->get_hash());
if (it != decision_waiting.end())
{
it->second.resolve(cmd);
decision_waiting.erase(it);
}
}
HotStuffBase::~HotStuffBase() {}
void HotStuffBase::start(bool eb_loop) {
/* ((n - 1) + 1 - 1) / 3 */
uint32_t nfaulty = pn.all_peers().size() / 3;
if (nfaulty == 0)
LOG_WARN("too few replicas in the system to tolerate any failure");
on_init(nfaulty);
if (eb_loop)
eb.dispatch();
}
}