aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/client.cpp43
-rw-r--r--src/client.h66
-rw-r--r--src/core.cpp723
-rw-r--r--src/core.h631
-rw-r--r--src/crypto.cpp25
-rw-r--r--src/crypto.h386
-rw-r--r--src/entity.cpp35
-rw-r--r--src/entity.h309
-rw-r--r--src/hotstuff.cpp287
-rw-r--r--src/hotstuff_client.cpp181
-rw-r--r--src/hotstuff_keygen.cpp33
-rw-r--r--src/promise.hpp745
-rw-r--r--src/type.h46
-rw-r--r--src/util.cpp7
-rw-r--r--src/util.h119
15 files changed, 3636 insertions, 0 deletions
diff --git a/src/client.cpp b/src/client.cpp
new file mode 100644
index 0000000..f787003
--- /dev/null
+++ b/src/client.cpp
@@ -0,0 +1,43 @@
+#include "client.h"
+
+namespace hotstuff {
+
+uint64_t CommandDummy::cnt = 0;
+
+void MsgClient::gen_reqcmd(const Command &cmd) {
+ DataStream s;
+ set_opcode(REQ_CMD);
+ s << cmd;
+ set_payload(std::move(s));
+}
+
+void MsgClient::parse_reqcmd(CommandDummy &cmd) const {
+ DataStream s(get_payload());
+ s >> cmd;
+}
+
+void MsgClient::gen_respcmd(const uint256_t &cmd_hash, const Finality &fin) {
+ DataStream s;
+ set_opcode(RESP_CMD);
+ s << cmd_hash << fin;
+ set_payload(std::move(s));
+}
+
+void MsgClient::parse_respcmd(uint256_t &cmd_hash, Finality &fin) const {
+ DataStream s(get_payload());
+ s >> cmd_hash >> fin;
+}
+
+void MsgClient::gen_chkcmd(const uint256_t &cmd_hash) {
+ DataStream s;
+ set_opcode(CHK_CMD);
+ s << cmd_hash;
+ set_payload(std::move(s));
+}
+
+void MsgClient::parse_chkcmd(uint256_t &cmd_hash) const {
+ DataStream s(get_payload());
+ s >> cmd_hash;
+}
+
+}
diff --git a/src/client.h b/src/client.h
new file mode 100644
index 0000000..dd1cfee
--- /dev/null
+++ b/src/client.h
@@ -0,0 +1,66 @@
+#ifndef _HOTSTUFF_CLIENT_H
+#define _HOTSTUFF_CLIENT_H
+
+#include "type.h"
+#include "salticidae/msg.h"
+#include "entity.h"
+
+namespace hotstuff {
+
+enum {
+ REQ_CMD = 0x4,
+ RESP_CMD = 0x5,
+ CHK_CMD = 0x6
+};
+
+class CommandDummy: public Command {
+ static uint64_t cnt;
+ uint64_t n;
+ uint256_t hash;
+
+ public:
+
+ CommandDummy() {}
+
+ ~CommandDummy() override {}
+
+ CommandDummy(uint64_t n):
+ n(n), hash(salticidae::get_hash(*this)) {}
+
+ static command_t make_cmd() {
+ return new CommandDummy(cnt++);
+ }
+
+ void serialize(DataStream &s) const override {
+ s << n;
+ }
+
+ void unserialize(DataStream &s) override {
+ s >> n;
+ hash = salticidae::get_hash(*this);
+ }
+
+ const uint256_t &get_hash() const override {
+ return hash;
+ }
+
+ bool verify() const override {
+ return true;
+ }
+};
+
+struct MsgClient: public salticidae::MsgBase<> {
+ using MsgBase::MsgBase;
+ void gen_reqcmd(const Command &cmd);
+ void parse_reqcmd(CommandDummy &cmd) const;
+
+ void gen_respcmd(const uint256_t &cmd_hash, const Finality &fin);
+ void parse_respcmd(uint256_t &cmd_hash, Finality &fin) const;
+
+ void gen_chkcmd(const uint256_t &cmd_hash);
+ void parse_chkcmd(uint256_t &cmd_hash) const;
+};
+
+}
+
+#endif
diff --git a/src/core.cpp b/src/core.cpp
new file mode 100644
index 0000000..d6a4cc7
--- /dev/null
+++ b/src/core.cpp
@@ -0,0 +1,723 @@
+#include <stack>
+#include "core.h"
+
+using salticidae::DataStream;
+using salticidae::static_pointer_cast;
+using salticidae::get_hash;
+
+#define LOG_INFO HOTSTUFF_LOG_INFO
+#define LOG_DEBUG HOTSTUFF_LOG_DEBUG
+#define LOG_WARN HOTSTUFF_LOG_WARN
+
+namespace hotstuff {
+
+void MsgHotStuff::gen_propose(const Proposal &proposal) {
+ DataStream s;
+ set_opcode(PROPOSE);
+ s << proposal;
+ set_payload(std::move(s));
+}
+
+void MsgHotStuff::parse_propose(Proposal &proposal) const {
+ DataStream(get_payload()) >> proposal;
+}
+
+void MsgHotStuff::gen_vote(const Vote &vote) {
+ DataStream s;
+ set_opcode(VOTE);
+ s << vote;
+ set_payload(std::move(s));
+}
+
+void MsgHotStuff::parse_vote(Vote &vote) const {
+ DataStream(get_payload()) >> vote;
+}
+
+void MsgHotStuff::gen_qfetchblk(const std::vector<uint256_t> &blk_hashes) {
+ DataStream s;
+ set_opcode(QUERY_FETCH_BLK);
+ gen_hash_list(s, blk_hashes);
+ set_payload(std::move(s));
+}
+
+void MsgHotStuff::parse_qfetchblk(std::vector<uint256_t> &blk_hashes) const {
+ DataStream s(get_payload());
+ parse_hash_list(s, blk_hashes);
+}
+
+void MsgHotStuff::gen_rfetchblk(const std::vector<block_t> &blks) {
+ DataStream s;
+ set_opcode(RESP_FETCH_BLK);
+ s << htole((uint32_t)blks.size());
+ for (auto blk: blks) s << *blk;
+ set_payload(std::move(s));
+}
+
+void MsgHotStuff::parse_rfetchblk(std::vector<block_t> &blks, HotStuffCore *hsc) const {
+ DataStream s;
+ uint32_t size;
+ s >> size;
+ size = letoh(size);
+ blks.resize(size);
+ for (auto &blk: blks)
+ {
+ Block _blk;
+ _blk.unserialize(s, hsc);
+ if (!_blk.verify(hsc->get_config()))
+ blk = hsc->storage->add_blk(std::move(_blk));
+ else
+ {
+ blk = nullptr;
+ LOG_WARN("block is invalid");
+ }
+ }
+}
+
+/* The core logic of HotStuff, is farily simple :) */
+/*** begin HotStuff protocol logic ***/
+HotStuffCore::HotStuffCore(ReplicaID id,
+ privkey_bt &&priv_key,
+ int32_t parent_limit):
+ b0(new Block(true, 1)),
+ bqc(b0),
+ bexec(b0),
+ vheight(0),
+ priv_key(std::move(priv_key)),
+ tails{bqc},
+ id(id),
+ parent_limit(parent_limit),
+ storage(new EntityStorage()) {
+ storage->add_blk(b0);
+ b0->qc_ref = b0;
+}
+
+void HotStuffCore::sanity_check_delivered(const block_t &blk) {
+ if (!blk->delivered)
+ throw std::runtime_error("block not delivered");
+}
+
+block_t HotStuffCore::sanity_check_delivered(const uint256_t &blk_hash) {
+ block_t blk = storage->find_blk(blk_hash);
+ if (blk == nullptr || !blk->delivered)
+ throw std::runtime_error("block not delivered");
+ return std::move(blk);
+}
+
+bool HotStuffCore::on_deliver_blk(const block_t &blk) {
+ if (blk->delivered)
+ {
+ LOG_WARN("attempt to deliver a block twice");
+ return false;
+ }
+ blk->parents.clear();
+ for (const auto &hash: blk->parent_hashes)
+ {
+ block_t p = sanity_check_delivered(hash);
+ blk->parents.push_back(p);
+ }
+ blk->height = blk->parents[0]->height + 1;
+ for (const auto &cmd: blk->cmds)
+ cmd->container = blk;
+
+ if (blk->qc)
+ {
+ block_t _blk = storage->find_blk(blk->qc->get_blk_hash());
+ if (_blk == nullptr)
+ throw std::runtime_error("block referred by qc not fetched");
+ blk->qc_ref = std::move(_blk);
+ } // otherwise blk->qc_ref remains null
+
+ for (auto pblk: blk->parents) tails.erase(pblk);
+ tails.insert(blk);
+
+ blk->delivered = true;
+ LOG_DEBUG("delivered %.10s", get_hex(blk->get_hash()).c_str());
+ return true;
+}
+
+void HotStuffCore::check_commit(const block_t &_blk) {
+ const block_t &blk = _blk->qc_ref;
+ if (blk->qc_ref == nullptr) return;
+ if (blk->decision) return;
+ block_t p = blk->parents[0];
+ if (p == blk->qc_ref)
+ { /* commit */
+ std::vector<block_t> commit_queue;
+ block_t b;
+ for (b = p; b->height > bexec->height; b = b->parents[0])
+ { /* todo: also commit the uncles/aunts */
+ commit_queue.push_back(b);
+ }
+ if (b != bexec)
+ throw std::runtime_error("safety breached :(");
+ for (auto it = commit_queue.rbegin(); it != commit_queue.rend(); it++)
+ {
+ const block_t &blk = *it;
+ blk->decision = 1;
+#ifdef HOTSTUFF_ENABLE_LOG_PROTO
+ LOG_INFO("commit blk %.10s", get_hex10(blk->get_hash()).c_str());
+#endif
+ for (auto cmd: blk->cmds)
+ do_decide(cmd);
+ }
+ bexec = p;
+ }
+}
+
+bool HotStuffCore::update(const uint256_t &bqc_hash) {
+ block_t _bqc = sanity_check_delivered(bqc_hash);
+ if (_bqc->qc_ref == nullptr) return false;
+ check_commit(_bqc);
+ if (_bqc->qc_ref->height > bqc->qc_ref->height)
+ bqc = _bqc;
+ return true;
+}
+
+void HotStuffCore::on_propose(const std::vector<command_t> &cmds) {
+ size_t nparents = parent_limit < 1 ? tails.size() : parent_limit;
+ assert(tails.size() > 0);
+ block_t p = *tails.rbegin();
+ std::vector<block_t> parents{p};
+ tails.erase(p);
+ nparents--;
+ /* add the rest of tails as "uncles/aunts" */
+ while (nparents--)
+ {
+ auto it = tails.begin();
+ parents.push_back(*it);
+ tails.erase(it);
+ }
+ quorum_cert_bt qc = nullptr;
+ block_t qc_ref = nullptr;
+ if (p != b0 && p->voted.size() >= config.nmajority)
+ {
+ qc = p->self_qc->clone();
+ qc->compute();
+ qc_ref = p;
+ }
+ /* create a new block */
+ block_t bnew = storage->add_blk(
+ Block(
+ parents,
+ cmds,
+ p->height + 1,
+ std::move(qc), qc_ref,
+ nullptr
+ ));
+ const uint256_t bnew_hash = bnew->get_hash();
+ bnew->self_qc = create_quorum_cert(bnew_hash);
+ on_deliver_blk(bnew);
+ update(bnew_hash);
+ Proposal prop(id, bqc->get_hash(), bnew, nullptr);
+#ifdef HOTSTUFF_ENABLE_LOG_PROTO
+ LOG_INFO("propose %s", std::string(*bnew).c_str());
+#endif
+ /* self-vote */
+ on_receive_vote(
+ Vote(id, bqc->get_hash(), bnew_hash,
+ create_part_cert(*priv_key, bnew_hash), this));
+ on_propose_(bnew);
+ /* boradcast to other replicas */
+ do_broadcast_proposal(prop);
+}
+
+void HotStuffCore::on_receive_proposal(const Proposal &prop) {
+ if (!update(prop.bqc_hash)) return;
+#ifdef HOTSTUFF_ENABLE_LOG_PROTO
+ LOG_INFO("got %s", std::string(prop).c_str());
+#endif
+ block_t bnew = prop.blk;
+ sanity_check_delivered(bnew);
+ bool opinion = false;
+ if (bnew->height > vheight)
+ {
+ block_t pref = bqc->qc_ref;
+ block_t b;
+ for (b = bnew;
+ b->height > pref->height;
+ b = b->parents[0]);
+ opinion = b == pref;
+ vheight = bnew->height;
+ }
+#ifdef HOTSTUFF_ENABLE_LOG_PROTO
+ LOG_INFO("now state: %s", std::string(*this).c_str());
+#endif
+ do_vote(prop.proposer,
+ Vote(id,
+ bqc->get_hash(),
+ bnew->get_hash(),
+ (opinion ?
+ create_part_cert(*priv_key, bnew->get_hash()) :
+ nullptr),
+ nullptr));
+}
+
+void HotStuffCore::on_receive_vote(const Vote &vote) {
+ if (!update(vote.bqc_hash)) return;
+#ifdef HOTSTUFF_ENABLE_LOG_PROTO
+ LOG_INFO("got %s", std::string(vote).c_str());
+ LOG_INFO("now state: %s", std::string(*this).c_str());
+#endif
+
+ block_t blk = sanity_check_delivered(vote.blk_hash);
+ if (vote.cert == nullptr) return;
+ if (!vote.verify())
+ {
+ LOG_WARN("invalid vote");
+ return;
+ }
+ if (!blk->voted.insert(vote.voter).second)
+ {
+ LOG_WARN("duplicate votes");
+ return;
+ }
+ size_t qsize = blk->voted.size();
+ if (qsize <= config.nmajority)
+ {
+ blk->self_qc->add_part(vote.voter, *vote.cert);
+ if (qsize == config.nmajority)
+ on_qc_finish(blk);
+ }
+}
+/*** end HotStuff protocol logic ***/
+
+void HotStuffCore::prune(uint32_t staleness) {
+ block_t start;
+ /* skip the blocks */
+ for (start = bexec; staleness; staleness--, start = start->parents[0])
+ if (!start->parents.size()) return;
+ std::stack<block_t> s;
+ start->qc_ref = nullptr;
+ s.push(start);
+ while (!s.empty())
+ {
+ auto &blk = s.top();
+ if (blk->parents.empty())
+ {
+ storage->try_release_blk(blk);
+ s.pop();
+ continue;
+ }
+ blk->qc_ref = nullptr;
+ s.push(blk->parents.back());
+ blk->parents.pop_back();
+ }
+}
+
+int8_t HotStuffCore::get_cmd_decision(const uint256_t &cmd_hash) {
+ auto cmd = storage->find_cmd(cmd_hash);
+ return cmd != nullptr ? cmd->get_decision() : 0;
+}
+
+void HotStuffCore::add_replica(ReplicaID rid, const NetAddr &addr,
+ pubkey_bt &&pub_key) {
+ config.add_replica(rid,
+ ReplicaInfo(rid, addr, std::move(pub_key)));
+ b0->voted.insert(rid);
+}
+
+promise_t HotStuffCore::async_qc_finish(const block_t &blk) {
+ if (blk->voted.size() >= config.nmajority)
+ return promise_t([](promise_t &pm) {
+ pm.resolve();
+ });
+ auto it = qc_waiting.find(blk);
+ if (it == qc_waiting.end())
+ it = qc_waiting.insert(std::make_pair(blk, promise_t())).first;
+ return it->second;
+}
+
+void HotStuffCore::on_qc_finish(const block_t &blk) {
+ auto it = qc_waiting.find(blk);
+ if (it != qc_waiting.end())
+ {
+ it->second.resolve();
+ qc_waiting.erase(it);
+ }
+}
+
+promise_t HotStuffCore::async_wait_propose() {
+ return propose_waiting;
+}
+
+void HotStuffCore::on_propose_(const block_t &blk) {
+ auto t = std::move(propose_waiting);
+ propose_waiting = promise_t();
+ t.resolve(blk);
+}
+
+HotStuffCore::operator std::string () const {
+ DataStream s;
+ s << "<hotstuff "
+ << "bqc=" << get_hex10(bqc->get_hash()) << " "
+ << "bexec=" << get_hex10(bqc->get_hash()) << " "
+ << "vheight=" << std::to_string(vheight) << " "
+ << "tails=" << std::to_string(tails.size()) << ">";
+ return std::string(std::move(s));
+}
+
+void HotStuffBase::add_replica(ReplicaID idx, const NetAddr &addr,
+ pubkey_bt &&pub_key) {
+ HotStuffCore::add_replica(idx, addr, std::move(pub_key));
+ if (addr != listen_addr)
+ pn.add_peer(addr);
+}
+
+void HotStuffBase::on_fetch_blk(const block_t &blk) {
+#ifdef HOTSTUFF_ENABLE_TX_PROFILE
+ blk_profiler.get_tx(blk->get_hash());
+#endif
+ LOG_DEBUG("fetched %.10s", get_hex(blk->get_hash()).c_str());
+ part_fetched++;
+ fetched++;
+ for (auto cmd: blk->get_cmds()) on_fetch_cmd(cmd);
+ const uint256_t &blk_hash = blk->get_hash();
+ auto it = blk_fetch_waiting.find(blk_hash);
+ if (it != blk_fetch_waiting.end())
+ {
+ it->second.resolve(blk);
+ blk_fetch_waiting.erase(it);
+ }
+}
+
+void HotStuffBase::on_fetch_cmd(const command_t &cmd) {
+ const uint256_t &cmd_hash = cmd->get_hash();
+ auto it = cmd_fetch_waiting.find(cmd_hash);
+ if (it != cmd_fetch_waiting.end())
+ {
+ it->second.resolve(cmd);
+ cmd_fetch_waiting.erase(it);
+ }
+}
+
+void HotStuffBase::on_deliver_blk(const block_t &blk) {
+ const uint256_t &blk_hash = blk->get_hash();
+ bool valid;
+ /* sanity check: all parents must be delivered */
+ for (const auto &p: blk->get_parent_hashes())
+ assert(storage->is_blk_delivered(p));
+ if ((valid = HotStuffCore::on_deliver_blk(blk)))
+ {
+ LOG_DEBUG("block %.10s delivered",
+ get_hex(blk_hash).c_str());
+ part_parent_size += blk->get_parent_hashes().size();
+ part_delivered++;
+ delivered++;
+ }
+ else
+ {
+ LOG_WARN("dropping invalid block");
+ }
+
+ auto it = blk_delivery_waiting.find(blk_hash);
+ if (it != blk_delivery_waiting.end())
+ {
+ auto &pm = it->second;
+ if (valid)
+ {
+ pm.elapsed.stop(false);
+ auto sec = pm.elapsed.elapsed_sec;
+ part_delivery_time += sec;
+ part_delivery_time_min = std::min(part_delivery_time_min, sec);
+ part_delivery_time_max = std::max(part_delivery_time_max, sec);
+
+ pm.resolve(blk);
+ }
+ else
+ {
+ pm.reject(blk);
+ // TODO: do we need to also free it from storage?
+ }
+ blk_delivery_waiting.erase(it);
+ }
+}
+
+promise_t HotStuffBase::async_fetch_blk(const uint256_t &blk_hash,
+ const NetAddr *replica_id,
+ bool fetch_now) {
+ if (storage->is_blk_fetched(blk_hash))
+ return promise_t([this, &blk_hash](promise_t pm){
+ pm.resolve(storage->find_blk(blk_hash));
+ });
+ auto it = blk_fetch_waiting.find(blk_hash);
+ if (it == blk_fetch_waiting.end())
+ {
+#ifdef HOTSTUFF_ENABLE_TX_PROFILE
+ blk_profiler.rec_tx(blk_hash, false);
+#endif
+ it = blk_fetch_waiting.insert(
+ std::make_pair(
+ blk_hash,
+ BlockFetchContext(blk_hash, this))).first;
+ }
+ if (replica_id != nullptr)
+ it->second.add_replica(*replica_id, fetch_now);
+ return static_cast<promise_t &>(it->second);
+}
+
+promise_t HotStuffBase::async_fetch_cmd(const uint256_t &cmd_hash,
+ const NetAddr *replica_id,
+ bool fetch_now) {
+ if (storage->is_cmd_fetched(cmd_hash))
+ return promise_t([this, &cmd_hash](promise_t pm){
+ pm.resolve(storage->find_cmd(cmd_hash));
+ });
+ auto it = cmd_fetch_waiting.find(cmd_hash);
+ if (it == cmd_fetch_waiting.end())
+ {
+ it = cmd_fetch_waiting.insert(
+ std::make_pair(cmd_hash, CmdFetchContext(cmd_hash, this))).first;
+ }
+ if (replica_id != nullptr)
+ it->second.add_replica(*replica_id, fetch_now);
+ return static_cast<promise_t &>(it->second);
+}
+
+promise_t HotStuffBase::async_deliver_blk(const uint256_t &blk_hash,
+ const NetAddr &replica_id) {
+ if (storage->is_blk_delivered(blk_hash))
+ return promise_t([this, &blk_hash](promise_t pm) {
+ pm.resolve(storage->find_blk(blk_hash));
+ });
+ auto it = blk_delivery_waiting.find(blk_hash);
+ if (it != blk_delivery_waiting.end())
+ return static_cast<promise_t &>(it->second);
+ BlockDeliveryContext pm{[](promise_t){}};
+ it = blk_delivery_waiting.insert(std::make_pair(blk_hash, pm)).first;
+ /* otherwise the on_deliver_batch will resolve */
+ async_fetch_blk(blk_hash, &replica_id).then([this, replica_id](block_t blk) {
+ /* qc_ref should be fetched */
+ std::vector<promise_t> pms;
+ const auto &qc = blk->get_qc();
+ if (qc)
+ pms.push_back(async_fetch_blk(qc->get_blk_hash(), &replica_id));
+ /* the parents should be delivered */
+ for (const auto &phash: blk->get_parent_hashes())
+ pms.push_back(async_deliver_blk(phash, replica_id));
+ promise::all(pms).then([this, blk]() {
+ on_deliver_blk(blk);
+ });
+ });
+ return static_cast<promise_t &>(pm);
+}
+
+void HotStuffBase::propose_handler(const MsgHotStuff &msg, conn_t conn_) {
+ auto conn = static_pointer_cast<PeerNetwork<MsgHotStuff>::Conn>(conn_);
+ const NetAddr &peer = conn->get_peer();
+ Proposal prop(this);
+ msg.parse_propose(prop);
+ block_t blk = prop.blk;
+ promise::all(std::vector<promise_t>{
+ async_deliver_blk(prop.bqc_hash, peer),
+ async_deliver_blk(blk->get_hash(), peer),
+ }).then([this, prop = std::move(prop)]() {
+ on_receive_proposal(prop);
+ });
+}
+
+void HotStuffBase::vote_handler(const MsgHotStuff &msg, conn_t conn_) {
+ auto conn = static_pointer_cast<PeerNetwork<MsgHotStuff>::Conn>(conn_);
+ const NetAddr &peer = conn->get_peer();
+ Vote vote(this);
+ msg.parse_vote(vote);
+ promise::all(std::vector<promise_t>{
+ async_deliver_blk(vote.bqc_hash, peer),
+ async_deliver_blk(vote.blk_hash, peer)
+ }).then([this, vote = std::move(vote)]() {
+ on_receive_vote(vote);
+ });
+}
+
+void HotStuffBase::query_fetch_blk_handler(const MsgHotStuff &msg, conn_t conn_) {
+ auto conn = static_pointer_cast<PeerNetwork<MsgHotStuff>::Conn>(conn_);
+ const NetAddr replica = conn->get_peer();
+ std::vector<uint256_t> blk_hashes;
+ msg.parse_qfetchblk(blk_hashes);
+
+ std::vector<promise_t> pms;
+ for (const auto &h: blk_hashes)
+ pms.push_back(async_fetch_blk(h, nullptr));
+ promise::all(pms).then([replica, this](const promise::values_t values) {
+ MsgHotStuff resp;
+ std::vector<block_t> blks;
+ for (auto &v: values)
+ {
+ auto blk = promise::any_cast<block_t>(v);
+ blks.push_back(blk);
+ }
+ resp.gen_rfetchblk(blks);
+ pn.send_msg(resp, replica);
+ });
+}
+
+void HotStuffBase::resp_fetch_blk_handler(const MsgHotStuff &msg, conn_t) {
+ std::vector<block_t> blks;
+ msg.parse_rfetchblk(blks, this);
+ for (const auto &blk: blks)
+ if (blk) on_fetch_blk(blk);
+}
+
+void HotStuffBase::print_stat() const {
+ LOG_INFO("===== begin stats =====");
+ LOG_INFO("-------- queues -------");
+ LOG_INFO("blk_fetch_waiting: %lu", blk_fetch_waiting.size());
+ LOG_INFO("blk_delivery_waiting: %lu", blk_delivery_waiting.size());
+ LOG_INFO("cmd_fetch_waiting: %lu", cmd_fetch_waiting.size());
+ LOG_INFO("decision_waiting: %lu", decision_waiting.size());
+ LOG_INFO("-------- misc ---------");
+ LOG_INFO("fetched: %lu", fetched);
+ LOG_INFO("delivered: %lu", delivered);
+ LOG_INFO("cmd_cache: %lu", storage->get_cmd_cache_size());
+ LOG_INFO("blk_cache: %lu", storage->get_blk_cache_size());
+ LOG_INFO("------ misc (10s) -----");
+ LOG_INFO("fetched: %lu", part_fetched);
+ LOG_INFO("delivered: %lu", part_delivered);
+ LOG_INFO("decided: %lu", part_decided);
+ LOG_INFO("gened: %lu", part_gened);
+ LOG_INFO("avg. parent_size: %.3f",
+ part_delivered ? part_parent_size / double(part_delivered) : 0);
+ LOG_INFO("delivery time: %.3f avg, %.3f min, %.3f max",
+ part_delivered ? part_delivery_time / double(part_delivered) : 0,
+ part_delivery_time_min == double_inf ? 0 : part_delivery_time_min,
+ part_delivery_time_max);
+
+ part_parent_size = 0;
+ part_fetched = 0;
+ part_delivered = 0;
+ part_decided = 0;
+ part_gened = 0;
+ part_delivery_time = 0;
+ part_delivery_time_min = double_inf;
+ part_delivery_time_max = 0;
+ LOG_INFO("-- sent opcode (10s) --");
+ auto &sent_op = pn.get_sent_by_opcode();
+ for (auto &op: sent_op)
+ {
+ auto &val = op.second;
+ LOG_INFO("%02x: %lu, %.2fBpm", op.first,
+ val.first, val.first ? val.second / double(val.first) : 0);
+ val.first = val.second = 0;
+ }
+ LOG_INFO("-- recv opcode (10s) --");
+ auto &recv_op = pn.get_recv_by_opcode();
+ for (auto &op: recv_op)
+ {
+ auto &val = op.second;
+ LOG_INFO("%02x: %lu, %.2fBpm", op.first,
+ val.first, val.first ? val.second / double(val.first) : 0);
+ val.first = val.second = 0;
+ }
+ LOG_INFO("--- replica msg. (10s) ---");
+ size_t _nsent = 0;
+ size_t _nrecv = 0;
+ for (const auto &replica: pn.all_peers())
+ {
+ auto conn = pn.get_peer_conn(replica);
+ size_t ns = conn->get_nsent();
+ size_t nr = conn->get_nrecv();
+ conn->clear_nsent();
+ conn->clear_nrecv();
+ LOG_INFO("%s: %u, %u, %u",
+ std::string(replica).c_str(), ns, nr, part_fetched_replica[replica]);
+ _nsent += ns;
+ _nrecv += nr;
+ part_fetched_replica[replica] = 0;
+ }
+ nsent += _nsent;
+ nrecv += _nrecv;
+ LOG_INFO("sent: %lu", _nsent);
+ LOG_INFO("recv: %lu", _nrecv);
+ LOG_INFO("--- replica msg. total ---");
+ LOG_INFO("sent: %lu", nsent);
+ LOG_INFO("recv: %lu", nrecv);
+ LOG_INFO("====== end stats ======");
+}
+
+promise_t HotStuffBase::async_decide(const uint256_t &cmd_hash) {
+ if (get_cmd_decision(cmd_hash))
+ return promise_t([this, cmd_hash](promise_t pm){
+ pm.resolve(storage->find_cmd(cmd_hash));
+ });
+ /* otherwise the do_decide will resolve the promise */
+ auto it = decision_waiting.find(cmd_hash);
+ if (it == decision_waiting.end())
+ {
+ promise_t pm{[](promise_t){}};
+ it = decision_waiting.insert(std::make_pair(cmd_hash, pm)).first;
+ }
+ return it->second;
+}
+
+HotStuffBase::HotStuffBase(uint32_t blk_size,
+ int32_t parent_limit,
+ ReplicaID rid,
+ privkey_bt &&priv_key,
+ NetAddr listen_addr,
+ EventContext eb,
+ pacemaker_bt pmaker):
+ HotStuffCore(rid, std::move(priv_key), parent_limit),
+ listen_addr(listen_addr),
+ blk_size(blk_size),
+ eb(eb),
+ pmaker(std::move(pmaker)),
+ pn(eb),
+
+ fetched(0), delivered(0),
+ nsent(0), nrecv(0),
+ part_parent_size(0),
+ part_fetched(0),
+ part_delivered(0),
+ part_decided(0),
+ part_gened(0),
+ part_delivery_time(0),
+ part_delivery_time_min(double_inf),
+ part_delivery_time_max(0)
+{
+ if (pmaker == nullptr)
+ this->pmaker = new PaceMakerDummy(this);
+ /* register the handlers for msg from replicas */
+ pn.reg_handler(PROPOSE, std::bind(&HotStuffBase::propose_handler, this, _1, _2));
+ pn.reg_handler(VOTE, std::bind(&HotStuffBase::vote_handler, this, _1, _2));
+ pn.reg_handler(QUERY_FETCH_BLK, std::bind(&HotStuffBase::query_fetch_blk_handler, this, _1, _2));
+ pn.reg_handler(RESP_FETCH_BLK, std::bind(&HotStuffBase::resp_fetch_blk_handler, this, _1, _2));
+ pn.init(listen_addr);
+}
+
+void HotStuffBase::do_broadcast_proposal(const Proposal &prop) {
+ MsgHotStuff prop_msg;
+ prop_msg.gen_propose(prop);
+ for (const auto &replica: pn.all_peers())
+ pn.send_msg(prop_msg, replica);
+}
+
+void HotStuffBase::do_vote(ReplicaID last_proposer, const Vote &vote) {
+ MsgHotStuff vote_msg;
+ vote_msg.gen_vote(vote);
+ pmaker->next_proposer(last_proposer)
+ .then([this, vote_msg](ReplicaID proposer) {
+ pn.send_msg(vote_msg, get_config().get_addr(proposer));
+ });
+}
+
+void HotStuffBase::do_decide(const command_t &cmd) {
+ auto it = decision_waiting.find(cmd->get_hash());
+ if (it != decision_waiting.end())
+ {
+ it->second.resolve(cmd);
+ decision_waiting.erase(it);
+ }
+}
+
+HotStuffBase::~HotStuffBase() {}
+
+void HotStuffBase::start(bool eb_loop) {
+ /* ((n - 1) + 1 - 1) / 3 */
+ uint32_t nfaulty = pn.all_peers().size() / 3;
+ if (nfaulty == 0)
+ LOG_WARN("too few replicas in the system to tolerate any failure");
+ on_init(nfaulty);
+ if (eb_loop)
+ eb.dispatch();
+}
+
+}
diff --git a/src/core.h b/src/core.h
new file mode 100644
index 0000000..c7e1fe6
--- /dev/null
+++ b/src/core.h
@@ -0,0 +1,631 @@
+#ifndef _HOTSTUFF_CORE_H
+#define _HOTSTUFF_CORE_H
+
+#include <queue>
+#include <set>
+#include <unordered_map>
+#include <unordered_set>
+
+#include "salticidae/stream.h"
+#include "salticidae/util.h"
+#include "salticidae/network.h"
+#include "salticidae/msg.h"
+
+#include "promise.hpp"
+#include "util.h"
+#include "entity.h"
+#include "crypto.h"
+
+using salticidae::EventContext;
+using salticidae::Event;
+using salticidae::NetAddr;
+using salticidae::MsgNetwork;
+using salticidae::PeerNetwork;
+using salticidae::ElapsedTime;
+using salticidae::_1;
+using salticidae::_2;
+
+namespace hotstuff {
+
+const double ent_waiting_timeout = 10;
+const double double_inf = 1e10;
+
+enum {
+ PROPOSE = 0x0,
+ VOTE = 0x1,
+ QUERY_FETCH_BLK = 0x2,
+ RESP_FETCH_BLK = 0x3,
+};
+
+using promise::promise_t;
+
+struct Proposal;
+struct Vote;
+
+/** Abstraction for HotStuff protocol state machine (without network implementation). */
+class HotStuffCore {
+ block_t b0; /** the genesis block */
+ /* === state variables === */
+ /** block containing the QC for the highest block having one */
+ block_t bqc;
+ block_t bexec; /**< last executed block */
+ uint32_t vheight; /**< height of the block last voted for */
+ /* === auxilliary variables === */
+ privkey_bt priv_key; /**< private key for signing votes */
+ std::set<block_t, BlockHeightCmp> tails; /**< set of tail blocks */
+ ReplicaConfig config; /**< replica configuration */
+ /* === async event queues === */
+ std::unordered_map<block_t, promise_t> qc_waiting;
+ promise_t propose_waiting;
+
+ block_t sanity_check_delivered(const uint256_t &blk_hash);
+ void sanity_check_delivered(const block_t &blk);
+ void check_commit(const block_t &_bqc);
+ bool update(const uint256_t &bqc_hash);
+ void on_qc_finish(const block_t &blk);
+ void on_propose_(const block_t &blk);
+
+ protected:
+ ReplicaID id; /**< identity of the replica itself */
+ const int32_t parent_limit; /**< maximum number of parents */
+
+ public:
+ BoxObj<EntityStorage> storage;
+
+ HotStuffCore(ReplicaID id,
+ privkey_bt &&priv_key,
+ int32_t parent_limit);
+ virtual ~HotStuffCore() = default;
+
+ /* Inputs of the state machine triggered by external events, should called
+ * by the class user, with proper invariants. */
+
+ /** Call to initialize the protocol, should be called once before all other
+ * functions. */
+ void on_init(uint32_t nfaulty) { config.nmajority = 2 * nfaulty + 1; }
+
+ /** Call to deliver a block.
+ * A block is only delivered if itself is fetched, the block for the
+ * contained qc is fetched and all parents are delivered. The user should
+ * always ensure this invariant. The invalid blocks will be dropped by this
+ * function.
+ * @return true if valid */
+ bool on_deliver_blk(const block_t &blk);
+
+ /** Call upon the delivery of a proposal message.
+ * The block mentioned in the message should be already delivered. */
+ void on_receive_proposal(const Proposal &prop);
+
+ /** Call upon the delivery of a vote message.
+ * The block mentioned in the message should be already delivered. */
+ void on_receive_vote(const Vote &vote);
+
+ /** Call to submit new commands to be decided (executed). */
+ void on_propose(const std::vector<command_t> &cmds);
+
+ /* Functions required to construct concrete instances for abstract classes.
+ * */
+
+ /* Outputs of the state machine triggering external events. The virtual
+ * functions should be implemented by the user to specify the behavior upon
+ * the events. */
+ protected:
+ /** Called by HotStuffCore upon the decision being made for cmd. */
+ virtual void do_decide(const command_t &cmd) = 0;
+ /** Called by HotStuffCore upon broadcasting a new proposal.
+ * The user should send the proposal message to all replicas except for
+ * itself. */
+ virtual void do_broadcast_proposal(const Proposal &prop) = 0;
+ /** Called upon sending out a new vote to the next proposer. The user
+ * should send the vote message to a *good* proposer to have good liveness,
+ * while safety is always guaranteed by HotStuffCore. */
+ virtual void do_vote(ReplicaID last_proposer, const Vote &vote) = 0;
+
+ /* The user plugs in the detailed instances for those
+ * polymorphic data types. */
+ public:
+ /** Create a partial certificate that proves the vote for a block. */
+ virtual part_cert_bt create_part_cert(const PrivKey &priv_key, const uint256_t &blk_hash) = 0;
+ /** Create a partial certificate from its seralized form. */
+ virtual part_cert_bt parse_part_cert(DataStream &s) = 0;
+ /** Create a quorum certificate that proves 2f+1 votes for a block. */
+ virtual quorum_cert_bt create_quorum_cert(const uint256_t &blk_hash) = 0;
+ /** Create a quorum certificate from its serialized form. */
+ virtual quorum_cert_bt parse_quorum_cert(DataStream &s) = 0;
+ /** Create a command object from its serialized form. */
+ virtual command_t parse_cmd(DataStream &s) = 0;
+
+ public:
+ /** Add a replica to the current configuration. This should only be called
+ * before running HotStuffCore protocol. */
+ void add_replica(ReplicaID rid, const NetAddr &addr, pubkey_bt &&pub_key);
+ /** Try to prune blocks lower than last committed height - staleness. */
+ void prune(uint32_t staleness);
+
+ /* PaceMaker can use these functions to monitor the core protocol state
+ * transition */
+ /** Get a promise resolved when the block gets a QC. */
+ promise_t async_qc_finish(const block_t &blk);
+ /** Get a promise resolved when a new block is proposed. */
+ promise_t async_wait_propose();
+
+ /* Other useful functions */
+ block_t get_genesis() { return b0; }
+ const ReplicaConfig &get_config() { return config; }
+ int8_t get_cmd_decision(const uint256_t &cmd_hash);
+ ReplicaID get_id() { return id; }
+ operator std::string () const;
+};
+
+/** Abstraction for proposal messages. */
+struct Proposal: public Serializable {
+ ReplicaID proposer;
+ /** hash for the block containing the highest QC */
+ uint256_t bqc_hash;
+ /** block being proposed */
+ block_t blk;
+
+ /** handle of the core object to allow polymorphism. The user should use
+ * a pointer to the object of the class derived from HotStuffCore */
+ HotStuffCore *hsc;
+
+ Proposal(HotStuffCore *hsc): blk(nullptr), hsc(hsc) {}
+ Proposal(ReplicaID proposer,
+ const uint256_t &bqc_hash,
+ block_t &blk,
+ HotStuffCore *hsc):
+ proposer(proposer),
+ bqc_hash(bqc_hash),
+ blk(blk), hsc(hsc) {}
+
+ void serialize(DataStream &s) const override {
+ s << proposer
+ << bqc_hash
+ << *blk;
+ }
+
+ void unserialize(DataStream &s) override {
+ assert(hsc != nullptr);
+ s >> proposer
+ >> bqc_hash;
+ Block _blk;
+ _blk.unserialize(s, hsc);
+ blk = hsc->storage->add_blk(std::move(_blk));
+ }
+
+ operator std::string () const {
+ DataStream s;
+ s << "<proposal "
+ << "rid=" << std::to_string(proposer) << " "
+ << "bqc=" << get_hex10(bqc_hash) << " "
+ << "blk=" << get_hex10(blk->get_hash()) << ">";
+ return std::string(std::move(s));
+ }
+};
+
+/** Abstraction for vote messages. */
+struct Vote: public Serializable {
+ ReplicaID voter;
+ /** hash for the block containing the highest QC */
+ uint256_t bqc_hash;
+ /** block being voted */
+ uint256_t blk_hash;
+ /** proof of validity for the vote (nullptr for a negative vote) */
+ part_cert_bt cert;
+
+ /** handle of the core object to allow polymorphism */
+ HotStuffCore *hsc;
+
+ Vote(HotStuffCore *hsc): cert(nullptr), hsc(hsc) {}
+ Vote(ReplicaID voter,
+ const uint256_t &bqc_hash,
+ const uint256_t &blk_hash,
+ part_cert_bt &&cert,
+ HotStuffCore *hsc):
+ voter(voter),
+ bqc_hash(bqc_hash),
+ blk_hash(blk_hash),
+ cert(std::move(cert)), hsc(hsc) {}
+
+ Vote(const Vote &other):
+ voter(other.voter),
+ bqc_hash(other.bqc_hash),
+ blk_hash(other.blk_hash),
+ cert(other.cert->clone()),
+ hsc(other.hsc) {}
+
+ Vote(Vote &&other) = default;
+
+ void serialize(DataStream &s) const override {
+ s << voter
+ << bqc_hash
+ << blk_hash;
+ if (cert == nullptr)
+ s << (uint8_t)0;
+ else
+ s << (uint8_t)1 << *cert;
+ }
+
+ void unserialize(DataStream &s) override {
+ assert(hsc != nullptr);
+ uint8_t has_cert;
+ s >> voter
+ >> bqc_hash
+ >> blk_hash
+ >> has_cert;
+ cert = has_cert ? hsc->parse_part_cert(s) : nullptr;
+ }
+
+ bool verify() const {
+ assert(hsc != nullptr);
+ return cert->verify(hsc->get_config().get_pubkey(voter)) &&
+ cert->get_blk_hash() == blk_hash;
+ }
+
+ operator std::string () const {
+ DataStream s;
+ s << "<vote "
+ << "rid=" << std::to_string(voter) << " "
+ << "bqc=" << get_hex10(bqc_hash) << " "
+ << "blk=" << get_hex10(blk_hash) << " "
+ << "cert=" << (cert ? "yes" : "no") << ">";
+ return std::string(std::move(s));
+ }
+};
+
+/** Abstraction for liveness gadget (oracle). */
+class PaceMaker {
+ public:
+ virtual ~PaceMaker() = default;
+ /** Get a promise resolved when the pace maker thinks it is a *good* time
+ * to issue new commands. When promise is resolved with the ID of itself,
+ * the replica should propose the command, otherwise it will forward the
+ * command to the proposer indicated by the ID. */
+ virtual promise_t beat() = 0;
+ /** Get a promise resolved when the pace maker thinks it is a *good* time
+ * to vote for a block. The promise is resolved with the next proposer's ID
+ * */
+ virtual promise_t next_proposer(ReplicaID last_proposer) = 0;
+};
+
+using pacemaker_bt = BoxObj<PaceMaker>;
+
+/** A pace maker that waits for the qc of the last proposed block. */
+class PaceMakerDummy: public PaceMaker {
+ HotStuffCore *hsc;
+ std::queue<promise_t> pending_beats;
+ block_t last_proposed;
+ bool locked;
+
+ void schedule_next() {
+ if (!pending_beats.empty() && !locked)
+ {
+ auto pm = pending_beats.front();
+ pending_beats.pop();
+ hsc->async_qc_finish(last_proposed).then(
+ [id = hsc->get_id(), pm]() {
+ pm.resolve(id);
+ });
+ locked = true;
+ }
+ }
+
+ void update_last_proposed() {
+ hsc->async_wait_propose().then([this](block_t blk) {
+ update_last_proposed();
+ last_proposed = blk;
+ locked = false;
+ schedule_next();
+ });
+ }
+
+ public:
+ PaceMakerDummy(HotStuffCore *hsc):
+ hsc(hsc),
+ last_proposed(hsc->get_genesis()),
+ locked(false) {
+ update_last_proposed();
+ }
+
+ promise_t beat() override {
+ promise_t pm;
+ pending_beats.push(pm);
+ schedule_next();
+ return pm;
+ }
+
+ promise_t next_proposer(ReplicaID last_proposer) override {
+ return promise_t([last_proposer](promise_t &pm) {
+ pm.resolve(last_proposer);
+ });
+ }
+};
+
+/** Network message format for HotStuff. */
+struct MsgHotStuff: public salticidae::MsgBase<> {
+ using MsgBase::MsgBase;
+ void gen_propose(const Proposal &);
+ void parse_propose(Proposal &) const;
+
+ void gen_vote(const Vote &);
+ void parse_vote(Vote &) const;
+
+ void gen_qfetchblk(const std::vector<uint256_t> &blk_hashes);
+ void parse_qfetchblk(std::vector<uint256_t> &blk_hashes) const;
+
+ void gen_rfetchblk(const std::vector<block_t> &blks);
+ void parse_rfetchblk(std::vector<block_t> &blks, HotStuffCore *hsc) const;
+};
+
+using promise::promise_t;
+
+class HotStuffBase;
+
+template<EntityType ent_type>
+class FetchContext: public promise_t {
+ Event timeout;
+ HotStuffBase *hs;
+ MsgHotStuff fetch_msg;
+ const uint256_t ent_hash;
+ std::unordered_set<NetAddr> replica_ids;
+ inline void timeout_cb(evutil_socket_t, short);
+ public:
+ FetchContext(const FetchContext &) = delete;
+ FetchContext &operator=(const FetchContext &) = delete;
+ FetchContext(FetchContext &&other);
+
+ FetchContext(const uint256_t &ent_hash, HotStuffBase *hs);
+ ~FetchContext() {}
+
+ inline void send(const NetAddr &replica_id);
+ inline void reset_timeout();
+ inline void add_replica(const NetAddr &replica_id, bool fetch_now = true);
+};
+
+class BlockDeliveryContext: public promise_t {
+ public:
+ ElapsedTime elapsed;
+ BlockDeliveryContext &operator=(const BlockDeliveryContext &) = delete;
+ BlockDeliveryContext(const BlockDeliveryContext &other):
+ promise_t(static_cast<const promise_t &>(other)),
+ elapsed(other.elapsed) {}
+ BlockDeliveryContext(BlockDeliveryContext &&other):
+ promise_t(static_cast<const promise_t &>(other)),
+ elapsed(std::move(other.elapsed)) {}
+ template<typename Func>
+ BlockDeliveryContext(Func callback): promise_t(callback) {
+ elapsed.start();
+ }
+};
+
+
+/** HotStuff protocol (with network implementation). */
+class HotStuffBase: public HotStuffCore {
+ using BlockFetchContext = FetchContext<ENT_TYPE_BLK>;
+ using CmdFetchContext = FetchContext<ENT_TYPE_CMD>;
+ using conn_t = MsgNetwork<MsgHotStuff>::conn_t;
+
+ friend BlockFetchContext;
+ friend CmdFetchContext;
+
+ protected:
+ /** the binding address in replica network */
+ NetAddr listen_addr;
+ /** the block size */
+ size_t blk_size;
+ /** libevent handle */
+ EventContext eb;
+ pacemaker_bt pmaker;
+
+ private:
+ /** whether libevent handle is owned by itself */
+ bool eb_loop;
+ /** network stack */
+ PeerNetwork<MsgHotStuff> pn;
+#ifdef HOTSTUFF_ENABLE_BLK_PROFILE
+ BlockProfiler blk_profiler;
+#endif
+ /* queues for async tasks */
+ std::unordered_map<const uint256_t, BlockFetchContext> blk_fetch_waiting;
+ std::unordered_map<const uint256_t, BlockDeliveryContext> blk_delivery_waiting;
+ std::unordered_map<const uint256_t, CmdFetchContext> cmd_fetch_waiting;
+ std::unordered_map<const uint256_t, promise_t> decision_waiting;
+ std::queue<command_t> cmd_pending;
+
+ /* statistics */
+ uint64_t fetched;
+ uint64_t delivered;
+ mutable uint64_t nsent;
+ mutable uint64_t nrecv;
+
+ mutable uint32_t part_parent_size;
+ mutable uint32_t part_fetched;
+ mutable uint32_t part_delivered;
+ mutable uint32_t part_decided;
+ mutable uint32_t part_gened;
+ mutable double part_delivery_time;
+ mutable double part_delivery_time_min;
+ mutable double part_delivery_time_max;
+ mutable std::unordered_map<const NetAddr, uint32_t> part_fetched_replica;
+
+ void on_fetch_cmd(const command_t &cmd);
+ void on_fetch_blk(const block_t &blk);
+ void on_deliver_blk(const block_t &blk);
+
+ /** deliver consensus message: <propose> */
+ inline void propose_handler(const MsgHotStuff &, conn_t);
+ /** deliver consensus message: <vote> */
+ inline void vote_handler(const MsgHotStuff &, conn_t);
+ /** fetches full block data */
+ inline void query_fetch_blk_handler(const MsgHotStuff &, conn_t);
+ /** receives a block */
+ inline void resp_fetch_blk_handler(const MsgHotStuff &, conn_t);
+
+ void do_broadcast_proposal(const Proposal &) override;
+ void do_vote(ReplicaID, const Vote &) override;
+ void do_decide(const command_t &) override;
+
+ public:
+ HotStuffBase(uint32_t blk_size,
+ int32_t parent_limit,
+ ReplicaID rid,
+ privkey_bt &&priv_key,
+ NetAddr listen_addr,
+ EventContext eb = EventContext(),
+ pacemaker_bt pmaker = nullptr);
+
+ ~HotStuffBase();
+
+ /* the API for HotStuffBase */
+
+ /* Submit the command to be decided. */
+ void add_command(command_t cmd) {
+ cmd_pending.push(storage->add_cmd(cmd));
+ if (cmd_pending.size() >= blk_size)
+ {
+ std::vector<command_t> cmds;
+ for (uint32_t i = 0; i < blk_size; i++)
+ {
+ cmds.push_back(cmd_pending.front());
+ cmd_pending.pop();
+ }
+ pmaker->beat().then([this, cmds = std::move(cmds)]() {
+ on_propose(cmds);
+ });
+ }
+ }
+
+ void add_replica(ReplicaID idx, const NetAddr &addr, pubkey_bt &&pub_key);
+ void start(bool eb_loop = false);
+
+ size_t size() const { return pn.all_peers().size(); }
+ void print_stat() const;
+
+ /* Helper functions */
+ /** Returns a promise resolved (with command_t cmd) when Command is fetched. */
+ promise_t async_fetch_cmd(const uint256_t &cmd_hash, const NetAddr *replica_id, bool fetch_now = true);
+ /** Returns a promise resolved (with block_t blk) when Block is fetched. */
+ promise_t async_fetch_blk(const uint256_t &blk_hash, const NetAddr *replica_id, bool fetch_now = true);
+ /** Returns a promise resolved (with block_t blk) when Block is delivered (i.e. prefix is fetched). */
+ promise_t async_deliver_blk(const uint256_t &blk_hash, const NetAddr &replica_id);
+ /** Returns a promise resolved (with command_t cmd) when Command is decided. */
+ promise_t async_decide(const uint256_t &cmd_hash);
+};
+
+/** HotStuff protocol (templated by cryptographic implementation). */
+template<typename PrivKeyType = PrivKeyDummy,
+ typename PubKeyType = PubKeyDummy,
+ typename PartCertType = PartCertDummy,
+ typename QuorumCertType = QuorumCertDummy>
+class HotStuff: public HotStuffBase {
+ using HotStuffBase::HotStuffBase;
+ protected:
+
+ part_cert_bt create_part_cert(const PrivKey &priv_key, const uint256_t &blk_hash) override {
+ return new PartCertType(
+ static_cast<const PrivKeyType &>(priv_key),
+ blk_hash);
+ }
+
+ part_cert_bt parse_part_cert(DataStream &s) override {
+ PartCert *pc = new PartCertType();
+ s >> *pc;
+ return pc;
+ }
+
+ quorum_cert_bt create_quorum_cert(const uint256_t &blk_hash) override {
+ return new QuorumCertType(get_config(), blk_hash);
+ }
+
+ quorum_cert_bt parse_quorum_cert(DataStream &s) override {
+ QuorumCert *qc = new QuorumCertType();
+ s >> *qc;
+ return qc;
+ }
+
+ public:
+ HotStuff(uint32_t blk_size,
+ int32_t parent_limit,
+ ReplicaID rid,
+ const bytearray_t &raw_privkey,
+ NetAddr listen_addr,
+ EventContext eb = nullptr):
+ HotStuffBase(blk_size,
+ parent_limit,
+ rid,
+ new PrivKeyType(raw_privkey),
+ listen_addr,
+ eb) {}
+
+ void add_replica(ReplicaID idx, const NetAddr &addr, const bytearray_t &pubkey_raw) {
+ DataStream s(pubkey_raw);
+ HotStuffBase::add_replica(idx, addr, new PubKeyType(pubkey_raw));
+ }
+};
+
+using HotStuffNoSig = HotStuff<>;
+using HotStuffSecp256k1 = HotStuff<PrivKeySecp256k1, PubKeySecp256k1,
+ PartCertSecp256k1, QuorumCertSecp256k1>;
+
+template<EntityType ent_type>
+FetchContext<ent_type>::FetchContext(FetchContext && other):
+ promise_t(static_cast<const promise_t &>(other)),
+ hs(other.hs),
+ fetch_msg(std::move(other.fetch_msg)),
+ ent_hash(other.ent_hash),
+ replica_ids(std::move(other.replica_ids)) {
+ other.timeout.del();
+ timeout = Event(hs->eb, -1, 0,
+ std::bind(&FetchContext::timeout_cb, this, _1, _2));
+ reset_timeout();
+}
+
+template<>
+inline void FetchContext<ENT_TYPE_CMD>::timeout_cb(evutil_socket_t, short) {
+ HOTSTUFF_LOG_WARN("cmd fetching %.10s timeout", get_hex(ent_hash).c_str());
+ for (const auto &replica_id: replica_ids)
+ send(replica_id);
+ reset_timeout();
+}
+
+template<>
+inline void FetchContext<ENT_TYPE_BLK>::timeout_cb(evutil_socket_t, short) {
+ HOTSTUFF_LOG_WARN("block fetching %.10s timeout", get_hex(ent_hash).c_str());
+ for (const auto &replica_id: replica_ids)
+ send(replica_id);
+ reset_timeout();
+}
+
+template<EntityType ent_type>
+FetchContext<ent_type>::FetchContext(
+ const uint256_t &ent_hash, HotStuffBase *hs):
+ promise_t([](promise_t){}),
+ hs(hs), ent_hash(ent_hash) {
+ fetch_msg.gen_qfetchblk(std::vector<uint256_t>{ent_hash});
+
+ timeout = Event(hs->eb, -1, 0,
+ std::bind(&FetchContext::timeout_cb, this, _1, _2));
+ reset_timeout();
+}
+
+template<EntityType ent_type>
+void FetchContext<ent_type>::send(const NetAddr &replica_id) {
+ hs->part_fetched_replica[replica_id]++;
+ hs->pn.send_msg(fetch_msg, replica_id);
+}
+
+template<EntityType ent_type>
+void FetchContext<ent_type>::reset_timeout() {
+ timeout.add_with_timeout(salticidae::gen_rand_timeout(ent_waiting_timeout));
+}
+
+template<EntityType ent_type>
+void FetchContext<ent_type>::add_replica(const NetAddr &replica_id, bool fetch_now) {
+ if (replica_ids.empty() && fetch_now)
+ send(replica_id);
+ replica_ids.insert(replica_id);
+}
+
+}
+
+#endif
diff --git a/src/crypto.cpp b/src/crypto.cpp
new file mode 100644
index 0000000..335a521
--- /dev/null
+++ b/src/crypto.cpp
@@ -0,0 +1,25 @@
+#include "entity.h"
+#include "crypto.h"
+
+namespace hotstuff {
+
+secp256k1_context_t secp256k1_default_sign_ctx = new Secp256k1Context(true);
+secp256k1_context_t secp256k1_default_verify_ctx = new Secp256k1Context(false);
+
+QuorumCertSecp256k1::QuorumCertSecp256k1(
+ const ReplicaConfig &config, const uint256_t &blk_hash):
+ QuorumCert(), blk_hash(blk_hash), rids(config.nmajority) {
+ rids.clear();
+}
+
+bool QuorumCertSecp256k1::verify(const ReplicaConfig &config) {
+ bytearray_t _blk_hash(blk_hash);
+ if (rids.size() < config.nmajority) return false;
+ for (size_t i = 0; i < rids.size(); i++)
+ if (!sigs[i].verify(_blk_hash,
+ static_cast<const PubKeySecp256k1 &>(config.get_pubkey(rids.get(i)))))
+ return false;
+ return true;
+}
+
+}
diff --git a/src/crypto.h b/src/crypto.h
new file mode 100644
index 0000000..2fbf745
--- /dev/null
+++ b/src/crypto.h
@@ -0,0 +1,386 @@
+#ifndef _HOTSTUFF_CRYPTO_H
+#define _HOTSTUFF_CRYPTO_H
+
+#include "salticidae/crypto.h"
+#include "salticidae/ref.h"
+#include "secp256k1.h"
+#include <openssl/rand.h>
+#include "type.h"
+
+using salticidae::RcObj;
+using salticidae::BoxObj;
+
+namespace hotstuff {
+
+using salticidae::SHA256;
+
+class PubKey: public Serializable, Cloneable {
+ public:
+ virtual ~PubKey() = default;
+ virtual PubKey *clone() override = 0;
+};
+
+using pubkey_bt = BoxObj<PubKey>;
+
+class PrivKey: public Serializable {
+ public:
+ virtual ~PrivKey() = default;
+ virtual pubkey_bt get_pubkey() const = 0;
+ virtual void from_rand() = 0;
+};
+
+using privkey_bt = BoxObj<PrivKey>;
+
+class PartCert: public Serializable, public Cloneable {
+ public:
+ virtual ~PartCert() = default;
+ virtual bool verify(const PubKey &pubkey) = 0;
+ virtual const uint256_t &get_blk_hash() const = 0;
+ virtual PartCert *clone() override = 0;
+};
+
+class ReplicaConfig;
+
+class QuorumCert: public Serializable, public Cloneable {
+ public:
+ virtual ~QuorumCert() = default;
+ virtual void add_part(ReplicaID replica, const PartCert &pc) = 0;
+ virtual void compute() = 0;
+ virtual bool verify(const ReplicaConfig &config) = 0;
+ virtual const uint256_t &get_blk_hash() const = 0;
+ virtual QuorumCert *clone() override = 0;
+};
+
+using part_cert_bt = BoxObj<PartCert>;
+using quorum_cert_bt = BoxObj<QuorumCert>;
+
+class PubKeyDummy: public PubKey {
+ PubKeyDummy *clone() override { return new PubKeyDummy(*this); }
+ void serialize(DataStream &) const override {}
+ void unserialize(DataStream &) override {}
+};
+
+class PrivKeyDummy: public PrivKey {
+ pubkey_bt get_pubkey() const override { return new PubKeyDummy(); }
+ void serialize(DataStream &) const override {}
+ void unserialize(DataStream &) override {}
+ void from_rand() override {}
+};
+
+class PartCertDummy: public PartCert {
+ uint256_t blk_hash;
+ public:
+ PartCertDummy() {}
+ PartCertDummy(const uint256_t &blk_hash):
+ blk_hash(blk_hash) {}
+
+ void serialize(DataStream &s) const override {
+ s << (uint32_t)0 << blk_hash;
+ }
+
+ void unserialize(DataStream &s) override {
+ uint32_t tmp;
+ s >> tmp >> blk_hash;
+ }
+
+ PartCert *clone() override {
+ return new PartCertDummy(blk_hash);
+ }
+
+ bool verify(const PubKey &) override { return true; }
+
+ const uint256_t &get_blk_hash() const override { return blk_hash; }
+};
+
+class QuorumCertDummy: public QuorumCert {
+ uint256_t blk_hash;
+ public:
+ QuorumCertDummy() {}
+ QuorumCertDummy(const ReplicaConfig &, const uint256_t &blk_hash):
+ blk_hash(blk_hash) {}
+
+ void serialize(DataStream &s) const override {
+ s << (uint32_t)1 << blk_hash;
+ }
+
+ void unserialize(DataStream &s) override {
+ uint32_t tmp;
+ s >> tmp >> blk_hash;
+ }
+
+ QuorumCert *clone() override {
+ return new QuorumCertDummy(*this);
+ }
+
+ void add_part(ReplicaID, const PartCert &) override {}
+ void compute() override {}
+ bool verify(const ReplicaConfig &) override { return true; }
+
+ const uint256_t &get_blk_hash() const override { return blk_hash; }
+};
+
+
+class Secp256k1Context {
+ secp256k1_context *ctx;
+ friend class PubKeySecp256k1;
+ friend class SigSecp256k1;
+ public:
+ Secp256k1Context(bool sign = false):
+ ctx(secp256k1_context_create(
+ sign ? SECP256K1_CONTEXT_SIGN :
+ SECP256K1_CONTEXT_VERIFY)) {}
+
+ Secp256k1Context(const Secp256k1Context &) = delete;
+
+ Secp256k1Context(Secp256k1Context &&other): ctx(other.ctx) {
+ other.ctx = nullptr;
+ }
+
+ ~Secp256k1Context() {
+ if (ctx) secp256k1_context_destroy(ctx);
+ }
+};
+
+using secp256k1_context_t = RcObj<Secp256k1Context>;
+
+extern secp256k1_context_t secp256k1_default_sign_ctx;
+extern secp256k1_context_t secp256k1_default_verify_ctx;
+
+class PrivKeySecp256k1;
+
+class PubKeySecp256k1: public PubKey {
+ static const auto _olen = 33;
+ friend class SigSecp256k1;
+ secp256k1_pubkey data;
+ secp256k1_context_t ctx;
+
+ public:
+ PubKeySecp256k1(const secp256k1_context_t &ctx =
+ secp256k1_default_sign_ctx):
+ PubKey(), ctx(ctx) {}
+
+ PubKeySecp256k1(const bytearray_t &raw_bytes,
+ const secp256k1_context_t &ctx =
+ secp256k1_default_sign_ctx):
+ PubKeySecp256k1(ctx) { from_bytes(raw_bytes); }
+
+ inline PubKeySecp256k1(const PrivKeySecp256k1 &priv_key,
+ const secp256k1_context_t &ctx =
+ secp256k1_default_sign_ctx);
+
+ void serialize(DataStream &s) const override {
+ static uint8_t output[_olen];
+ size_t olen = _olen;
+ (void)secp256k1_ec_pubkey_serialize(
+ ctx->ctx, (unsigned char *)output,
+ &olen, &data, SECP256K1_EC_COMPRESSED);
+ s.put_data(output, output + _olen);
+ }
+
+ void unserialize(DataStream &s) override {
+ static const auto _exc = std::invalid_argument("ill-formed public key");
+ try {
+ if (!secp256k1_ec_pubkey_parse(
+ ctx->ctx, &data, s.get_data_inplace(_olen), _olen))
+ throw _exc;
+ } catch (std::ios_base::failure &) {
+ throw _exc;
+ }
+ }
+
+ PubKeySecp256k1 *clone() override {
+ return new PubKeySecp256k1(*this);
+ }
+};
+
+class PrivKeySecp256k1: public PrivKey {
+ static const auto nbytes = 32;
+ friend class PubKeySecp256k1;
+ friend class SigSecp256k1;
+ uint8_t data[nbytes];
+ secp256k1_context_t ctx;
+
+ public:
+ PrivKeySecp256k1(const secp256k1_context_t &ctx =
+ secp256k1_default_sign_ctx):
+ PrivKey(), ctx(ctx) {}
+
+ PrivKeySecp256k1(const bytearray_t &raw_bytes,
+ const secp256k1_context_t &ctx =
+ secp256k1_default_sign_ctx):
+ PrivKeySecp256k1(ctx) { from_bytes(raw_bytes); }
+
+ void serialize(DataStream &s) const override {
+ s.put_data(data, data + nbytes);
+ }
+
+ void unserialize(DataStream &s) override {
+ static const auto _exc = std::invalid_argument("ill-formed private key");
+ try {
+ memmove(data, s.get_data_inplace(nbytes), nbytes);
+ } catch (std::ios_base::failure &) {
+ throw _exc;
+ }
+ }
+
+ void from_rand() override {
+ if (!RAND_bytes(data, nbytes))
+ throw std::runtime_error("cannot get rand bytes from openssl");
+ }
+
+ inline pubkey_bt get_pubkey() const override;
+};
+
+pubkey_bt PrivKeySecp256k1::get_pubkey() const {
+ return new PubKeySecp256k1(*this, ctx);
+}
+
+PubKeySecp256k1::PubKeySecp256k1(
+ const PrivKeySecp256k1 &priv_key,
+ const secp256k1_context_t &ctx): PubKey(), ctx(ctx) {
+ if (!secp256k1_ec_pubkey_create(ctx->ctx, &data, priv_key.data))
+ throw std::invalid_argument("invalid secp256k1 private key");
+}
+
+class SigSecp256k1: public Serializable {
+ secp256k1_ecdsa_signature data;
+ secp256k1_context_t ctx;
+
+ void check_msg_length(const bytearray_t &msg) {
+ if (msg.size() != 32)
+ throw std::invalid_argument("the message should be 32-bytes");
+ }
+
+ public:
+ SigSecp256k1(const secp256k1_context_t &ctx =
+ secp256k1_default_sign_ctx):
+ Serializable(), ctx(ctx) {}
+ SigSecp256k1(const uint256_t &digest,
+ const PrivKeySecp256k1 &priv_key,
+ secp256k1_context_t &ctx =
+ secp256k1_default_sign_ctx):
+ Serializable(), ctx(ctx) {
+ sign(digest, priv_key);
+ }
+
+ void serialize(DataStream &s) const override {
+ static uint8_t output[64];
+ (void)secp256k1_ecdsa_signature_serialize_compact(
+ ctx->ctx, (unsigned char *)output,
+ &data);
+ s.put_data(output, output + 64);
+ }
+
+ void unserialize(DataStream &s) override {
+ static const auto _exc = std::invalid_argument("ill-formed signature");
+ try {
+ if (!secp256k1_ecdsa_signature_parse_compact(
+ ctx->ctx, &data, s.get_data_inplace(64)))
+ throw _exc;
+ } catch (std::ios_base::failure &) {
+ throw _exc;
+ }
+ }
+
+ void sign(const bytearray_t &msg, const PrivKeySecp256k1 &priv_key) {
+ check_msg_length(msg);
+ if (!secp256k1_ecdsa_sign(
+ ctx->ctx, &data,
+ (unsigned char *)&*msg.begin(),
+ (unsigned char *)priv_key.data,
+ NULL, // default nonce function
+ NULL))
+ throw std::invalid_argument("failed to create secp256k1 signature");
+ }
+
+ bool verify(const bytearray_t &msg, const PubKeySecp256k1 &pub_key,
+ const secp256k1_context_t &_ctx) {
+ check_msg_length(msg);
+ return secp256k1_ecdsa_verify(
+ _ctx->ctx, &data,
+ (unsigned char *)&*msg.begin(),
+ &pub_key.data) == 1;
+ }
+
+ bool verify(const bytearray_t &msg, const PubKeySecp256k1 &pub_key) {
+ return verify(msg, pub_key, ctx);
+ }
+};
+
+class PartCertSecp256k1: public SigSecp256k1, public PartCert {
+ uint256_t blk_hash;
+
+ public:
+ PartCertSecp256k1() = default;
+ PartCertSecp256k1(const PrivKeySecp256k1 &priv_key, const uint256_t &blk_hash):
+ SigSecp256k1(blk_hash, priv_key),
+ PartCert(),
+ blk_hash(blk_hash) {}
+
+ bool verify(const PubKey &pub_key) override {
+ return SigSecp256k1::verify(blk_hash,
+ static_cast<const PubKeySecp256k1 &>(pub_key),
+ secp256k1_default_verify_ctx);
+ }
+
+ const uint256_t &get_blk_hash() const override { return blk_hash; }
+
+ PartCertSecp256k1 *clone() override {
+ return new PartCertSecp256k1(*this);
+ }
+
+ void serialize(DataStream &s) const override {
+ s << blk_hash;
+ this->SigSecp256k1::serialize(s);
+ }
+
+ void unserialize(DataStream &s) override {
+ s >> blk_hash;
+ this->SigSecp256k1::unserialize(s);
+ }
+};
+
+class QuorumCertSecp256k1: public QuorumCert {
+ uint256_t blk_hash;
+ salticidae::Bits rids;
+ std::vector<SigSecp256k1> sigs;
+
+ public:
+ QuorumCertSecp256k1() = default;
+ QuorumCertSecp256k1(const ReplicaConfig &config, const uint256_t &blk_hash);
+
+ void add_part(ReplicaID rid, const PartCert &pc) override {
+ if (pc.get_blk_hash() != blk_hash)
+ throw std::invalid_argument("PartCert does match the block hash");
+ if (!rids.get(rid))
+ {
+ rids.set(rid);
+ sigs.push_back(static_cast<const PartCertSecp256k1 &>(pc));
+ }
+ }
+
+ void compute() override {}
+
+ bool verify(const ReplicaConfig &config) override;
+
+ const uint256_t &get_blk_hash() const override { return blk_hash; }
+
+ QuorumCertSecp256k1 *clone() override {
+ return new QuorumCertSecp256k1(*this);
+ }
+
+ void serialize(DataStream &s) const override {
+ s << blk_hash << rids;
+ for (const auto &sig: sigs) s << sig;
+ }
+
+ void unserialize(DataStream &s) override {
+ s >> blk_hash >> rids;
+ sigs.resize(rids.size());
+ for (auto &sig: sigs) s >> sig;
+ }
+};
+
+}
+
+#endif
diff --git a/src/entity.cpp b/src/entity.cpp
new file mode 100644
index 0000000..1294484
--- /dev/null
+++ b/src/entity.cpp
@@ -0,0 +1,35 @@
+#include "entity.h"
+#include "core.h"
+
+namespace hotstuff {
+
+void Block::serialize(DataStream &s) const {
+ s << (uint32_t)parent_hashes.size();
+ for (const auto &hash: parent_hashes)
+ s << hash;
+ s << (uint32_t)cmds.size();
+ for (auto cmd: cmds)
+ s << *cmd;
+ if (qc == nullptr)
+ s << (uint8_t)0;
+ else
+ s << (uint8_t)1 << *qc;
+}
+
+void Block::unserialize(DataStream &s, HotStuffCore *hsc) {
+ uint32_t n;
+ uint8_t has_qc;
+ s >> n;
+ parent_hashes.resize(n);
+ for (auto &hash: parent_hashes)
+ s >> hash;
+ s >> n;
+ cmds.resize(n);
+ for (auto &cmd: cmds)
+ cmd = hsc->parse_cmd(s);
+ s >> has_qc;
+ qc = has_qc ? hsc->parse_quorum_cert(s) : nullptr;
+ this->hash = salticidae::get_hash(*this);
+}
+
+}
diff --git a/src/entity.h b/src/entity.h
new file mode 100644
index 0000000..b3a0df4
--- /dev/null
+++ b/src/entity.h
@@ -0,0 +1,309 @@
+#ifndef _HOTSTUFF_ENT_H
+#define _HOTSTUFF_ENT_H
+
+#include <vector>
+#include <unordered_map>
+#include <unordered_set>
+#include <string>
+#include <cstddef>
+#include <ios>
+
+#include "salticidae/netaddr.h"
+#include "salticidae/ref.h"
+#include "type.h"
+#include "util.h"
+#include "crypto.h"
+
+namespace hotstuff {
+
+enum EntityType {
+ ENT_TYPE_CMD = 0x0,
+ ENT_TYPE_BLK = 0x1
+};
+
+struct ReplicaInfo {
+ ReplicaID id;
+ salticidae::NetAddr addr;
+ pubkey_bt pubkey;
+
+ ReplicaInfo(ReplicaID id,
+ const salticidae::NetAddr &addr,
+ pubkey_bt &&pubkey):
+ id(id), addr(addr), pubkey(std::move(pubkey)) {}
+
+ ReplicaInfo(const ReplicaInfo &other):
+ id(other.id), addr(other.addr),
+ pubkey(other.pubkey->clone()) {}
+
+ ReplicaInfo(ReplicaInfo &&other):
+ id(other.id), addr(other.addr),
+ pubkey(std::move(other.pubkey)) {}
+};
+
+class ReplicaConfig {
+ std::unordered_map<ReplicaID, ReplicaInfo> replica_map;
+
+ public:
+ size_t nmajority;
+
+ void add_replica(ReplicaID rid, const ReplicaInfo &info) {
+ replica_map.insert(std::make_pair(rid, info));
+ }
+
+ const ReplicaInfo &get_info(ReplicaID rid) const {
+ auto it = replica_map.find(rid);
+ if (it == replica_map.end())
+ throw HotStuffError("rid %s not found",
+ get_hex(rid).c_str());
+ return it->second;
+ }
+
+ const PubKey &get_pubkey(ReplicaID rid) const {
+ return *(get_info(rid).pubkey);
+ }
+
+ const salticidae::NetAddr &get_addr(ReplicaID rid) const {
+ return get_info(rid).addr;
+ }
+};
+
+class Block;
+class HotStuffCore;
+
+using block_t = salticidae::RcObj<Block>;
+using block_weak_t = salticidae::WeakObj<Block>;
+
+struct Finality: public Serializable {
+ int8_t decision;
+ uint256_t blk_hash;
+
+ public:
+ Finality(): decision(0) {}
+ Finality(int8_t decision, uint256_t blk_hash):
+ decision(decision), blk_hash(blk_hash) {}
+
+ void serialize(DataStream &s) const override {
+ s << decision;
+ if (decision == 1) s << blk_hash;
+ }
+
+ void unserialize(DataStream &s) override {
+ s >> decision;
+ if (decision == 1) s >> blk_hash;
+ }
+};
+
+class Command: public Serializable {
+ friend HotStuffCore;
+ block_weak_t container;
+ public:
+ virtual ~Command() = default;
+ virtual const uint256_t &get_hash() const = 0;
+ virtual bool verify() const = 0;
+ inline int8_t get_decision() const;
+ inline Finality get_finality() const;
+ block_t get_container() const {
+ return container;
+ }
+};
+
+using command_t = RcObj<Command>;
+
+template<typename Hashable>
+inline static std::vector<uint256_t>
+get_hashes(const std::vector<Hashable> &plist) {
+ std::vector<uint256_t> hashes;
+ for (const auto &p: plist)
+ hashes.push_back(p->get_hash());
+ return std::move(hashes);
+}
+
+class Block {
+ friend HotStuffCore;
+ std::vector<uint256_t> parent_hashes;
+ std::vector<command_t> cmds;
+ quorum_cert_bt qc;
+ uint256_t hash;
+
+ std::vector<block_t> parents;
+ block_t qc_ref;
+ quorum_cert_bt self_qc;
+ uint32_t height;
+ bool delivered;
+ int8_t decision;
+
+ std::unordered_set<ReplicaID> voted;
+
+ public:
+ Block():
+ qc(nullptr),
+ qc_ref(nullptr),
+ self_qc(nullptr), height(0),
+ delivered(false), decision(0) {}
+
+ Block(bool delivered, int8_t decision):
+ qc(nullptr),
+ hash(salticidae::get_hash(*this)),
+ qc_ref(nullptr),
+ self_qc(nullptr), height(0),
+ delivered(delivered), decision(decision) {}
+
+ Block(const std::vector<block_t> &parents,
+ const std::vector<command_t> &cmds,
+ uint32_t height,
+ quorum_cert_bt &&qc,
+ const block_t &qc_ref,
+ quorum_cert_bt &&self_qc,
+ int8_t decision = 0):
+ parent_hashes(get_hashes(parents)),
+ cmds(cmds),
+ qc(std::move(qc)),
+ hash(salticidae::get_hash(*this)),
+ parents(parents),
+ qc_ref(qc_ref),
+ self_qc(std::move(self_qc)),
+ height(height),
+ delivered(0),
+ decision(decision) {}
+
+ void serialize(DataStream &s) const;
+
+ void unserialize(DataStream &s, HotStuffCore *hsc);
+
+ const std::vector<command_t> &get_cmds() const {
+ return cmds;
+ }
+
+ const std::vector<block_t> &get_parents() const {
+ return parents;
+ }
+
+ const std::vector<uint256_t> &get_parent_hashes() const {
+ return parent_hashes;
+ }
+
+ const uint256_t &get_hash() const { return hash; }
+
+ bool verify(const ReplicaConfig &config) const {
+ if (qc && !qc->verify(config)) return false;
+ for (auto cmd: cmds)
+ if (!cmd->verify()) return false;
+ return true;
+ }
+
+ int8_t get_decision() const { return decision; }
+
+ bool is_delivered() const { return delivered; }
+
+ uint32_t get_height() const { return height; }
+
+ const quorum_cert_bt &get_qc() const { return qc; }
+
+ operator std::string () const {
+ DataStream s;
+ s << "<block "
+ << "id=" << get_hex10(hash) << " "
+ << "height=" << std::to_string(height) << " "
+ << "parent=" << get_hex10(parent_hashes[0]) << ">";
+ return std::string(std::move(s));
+ }
+};
+
+struct BlockHeightCmp {
+ bool operator()(const block_t &a, const block_t &b) const {
+ return a->get_height() < b->get_height();
+ }
+};
+
+int8_t Command::get_decision() const {
+ block_t cptr = container;
+ return cptr ? cptr->get_decision() : 0;
+}
+
+Finality Command::get_finality() const {
+ block_t blk = get_container();
+ return Finality(get_decision(),
+ blk ? blk->get_hash() : uint256_t());
+}
+
+class EntityStorage {
+ std::unordered_map<const uint256_t, block_t> blk_cache;
+ std::unordered_map<const uint256_t, command_t> cmd_cache;
+ public:
+ bool is_blk_delivered(const uint256_t &blk_hash) {
+ auto it = blk_cache.find(blk_hash);
+ if (it == blk_cache.end()) return false;
+ return it->second->is_delivered();
+ }
+
+ bool is_blk_fetched(const uint256_t &blk_hash) {
+ return blk_cache.count(blk_hash);
+ }
+
+ const block_t &add_blk(Block &&_blk) {
+ block_t blk = new Block(std::move(_blk));
+ return blk_cache.insert(std::make_pair(blk->get_hash(), blk)).first->second;
+ }
+
+ const block_t &add_blk(const block_t &blk) {
+ return blk_cache.insert(std::make_pair(blk->get_hash(), blk)).first->second;
+ }
+
+ block_t find_blk(const uint256_t &blk_hash) {
+ auto it = blk_cache.find(blk_hash);
+ return it == blk_cache.end() ? nullptr : it->second;
+ }
+
+ bool is_cmd_fetched(const uint256_t &cmd_hash) {
+ return cmd_cache.count(cmd_hash);
+ }
+
+ const command_t &add_cmd(const command_t &cmd) {
+ return cmd_cache.insert(std::make_pair(cmd->get_hash(), cmd)).first->second;
+ }
+
+ command_t find_cmd(const uint256_t &cmd_hash) {
+ auto it = cmd_cache.find(cmd_hash);
+ return it == cmd_cache.end() ? nullptr: it->second;
+ }
+
+ size_t get_cmd_cache_size() {
+ return cmd_cache.size();
+ }
+ size_t get_blk_cache_size() {
+ return blk_cache.size();
+ }
+
+ bool try_release_cmd(const command_t &cmd) {
+ if (cmd.get_cnt() == 2) /* only referred by cmd and the storage */
+ {
+ const auto &cmd_hash = cmd->get_hash();
+ cmd_cache.erase(cmd_hash);
+ return true;
+ }
+ return false;
+ }
+
+ bool try_release_blk(const block_t &blk) {
+ if (blk.get_cnt() == 2) /* only referred by blk and the storage */
+ {
+ const auto &blk_hash = blk->get_hash();
+#ifdef HOTSTUFF_ENABLE_LOG_PROTO
+ HOTSTUFF_LOG_INFO("releasing blk %.10s", get_hex(blk_hash).c_str());
+#endif
+ for (const auto &cmd: blk->get_cmds())
+ try_release_cmd(cmd);
+ blk_cache.erase(blk_hash);
+ return true;
+ }
+#ifdef HOTSTUFF_ENABLE_LOG_PROTO
+ else
+ HOTSTUFF_LOG_INFO("cannot release (%lu)", blk.get_cnt());
+#endif
+ return false;
+ }
+};
+
+}
+
+#endif
diff --git a/src/hotstuff.cpp b/src/hotstuff.cpp
new file mode 100644
index 0000000..b10f103
--- /dev/null
+++ b/src/hotstuff.cpp
@@ -0,0 +1,287 @@
+#include <iostream>
+#include <cstring>
+#include <cassert>
+#include <algorithm>
+#include <random>
+#include <unistd.h>
+#include <signal.h>
+#include <event2/event.h>
+
+#include "salticidae/stream.h"
+#include "salticidae/util.h"
+#include "salticidae/network.h"
+#include "salticidae/msg.h"
+#include "promise.hpp"
+#include "type.h"
+#include "core.h"
+#include "entity.h"
+#include "util.h"
+#include "client.h"
+
+using promise::promise_t;
+using salticidae::NetAddr;
+using salticidae::MsgNetwork;
+using salticidae::ClientNetwork;
+using salticidae::Event;
+using salticidae::ElapsedTime;
+using salticidae::Config;
+using salticidae::_1;
+using salticidae::_2;
+using salticidae::static_pointer_cast;
+using salticidae::get_hash;
+using salticidae::trim_all;
+using salticidae::split;
+
+using hotstuff::HotStuffError;
+using hotstuff::CommandDummy;
+using hotstuff::Finality;
+using hotstuff::command_t;
+using hotstuff::uint256_t;
+using hotstuff::bytearray_t;
+using hotstuff::DataStream;
+using hotstuff::ReplicaID;
+using hotstuff::MsgClient;
+
+using HotStuff = hotstuff::HotStuffSecp256k1;
+
+#define LOG_INFO HOTSTUFF_LOG_INFO
+#define LOG_DEBUG HOTSTUFF_LOG_DEBUG
+#define LOG_WARN HOTSTUFF_LOG_WARN
+#define LOG_ERROR HOTSTUFF_LOG_ERROR
+
+class HotStuffApp;
+
+class HotStuffApp: public HotStuff {
+ double stat_period;
+ /** libevent handle */
+ EventContext eb;
+ /** network messaging between a replica and its client */
+ ClientNetwork<MsgClient> cn;
+ /** timer object to schedule a periodic printing of system statistics */
+ Event ev_stat_timer;
+ /** the binding address for client RPC */
+ NetAddr clisten_addr;
+
+ using conn_client_t = MsgNetwork<MsgClient>::conn_t;
+
+ /** Client */
+ /** submits a new command */
+ inline void client_request_cmd_handler(const MsgClient &, conn_client_t);
+ /** checks if a cmd is decided */
+ inline void client_check_cmd_handler(const MsgClient &, conn_client_t);
+
+ /** The callback function to print stat */
+ inline void print_stat_cb(evutil_socket_t, short);
+
+ command_t parse_cmd(DataStream &s) override {
+ auto cmd = new CommandDummy();
+ s >> *cmd;
+ return cmd;
+ }
+
+ public:
+ HotStuffApp(uint32_t blk_size,
+ int32_t parent_limit,
+ double stat_period,
+ ReplicaID idx,
+ const bytearray_t &raw_privkey,
+ NetAddr plisten_addr,
+ NetAddr clisten_addr,
+ const EventContext &eb);
+
+ void start();
+};
+
+
+std::pair<std::string, std::string> split_ip_port_cport(const std::string &s) {
+ auto ret = trim_all(split(s, ";"));
+ if (ret.size() != 2)
+ throw std::invalid_argument("invalid cport format");
+ return std::make_pair(ret[0], ret[1]);
+}
+
+void signal_handler(int) {
+ throw HotStuffError("got terminal signal");
+}
+
+BoxObj<HotStuffApp> papp = nullptr;
+
+int main(int argc, char **argv) {
+ Config config("hotstuff.conf");
+
+ ElapsedTime elapsed;
+ elapsed.start();
+
+ signal(SIGTERM, signal_handler);
+ signal(SIGINT, signal_handler);
+
+ auto opt_blk_size = Config::OptValInt::create(1);
+ auto opt_parent_limit = Config::OptValInt::create(-1);
+ auto opt_stat_period = Config::OptValDouble::create(10);
+ auto opt_replicas = Config::OptValStrVec::create();
+ auto opt_idx = Config::OptValInt::create(0);
+ auto opt_client_port = Config::OptValInt::create(-1);
+ auto opt_privkey = Config::OptValStr::create();
+ auto opt_help = Config::OptValFlag::create(false);
+
+ config.add_opt("block-size", opt_blk_size, Config::SET_VAL);
+ config.add_opt("parent-limit", opt_parent_limit, Config::SET_VAL);
+ config.add_opt("stat-period", opt_stat_period, Config::SET_VAL);
+ config.add_opt("replica", opt_replicas, Config::APPEND);
+ config.add_opt("idx", opt_idx, Config::SET_VAL);
+ config.add_opt("cport", opt_client_port, Config::SET_VAL);
+ config.add_opt("privkey", opt_privkey, Config::SET_VAL);
+ config.add_opt("help", opt_help, Config::SWITCH_ON, 'h', "show this help info");
+
+ EventContext eb;
+#ifndef HOTSTUFF_ENABLE_LOG_DEBUG
+ try {
+#endif
+ config.parse(argc, argv);
+ if (opt_help->get())
+ {
+ config.print_help();
+ exit(0);
+ }
+ auto idx = opt_idx->get();
+ auto client_port = opt_client_port->get();
+ std::vector<std::pair<std::string, std::string>> replicas;
+ for (const auto &s: opt_replicas->get())
+ {
+ auto res = trim_all(split(s, ","));
+ if (res.size() != 2)
+ throw HotStuffError("invalid replica info");
+ replicas.push_back(std::make_pair(res[0], res[1]));
+ }
+
+ if (!(0 <= idx && (size_t)idx < replicas.size()))
+ throw HotStuffError("replica idx out of range");
+ std::string binding_addr = replicas[idx].first;
+ if (client_port == -1)
+ {
+ auto p = split_ip_port_cport(binding_addr);
+ size_t idx;
+ try {
+ client_port = stoi(p.second, &idx);
+ } catch (std::invalid_argument &) {
+ throw HotStuffError("client port not specified");
+ }
+ }
+
+ NetAddr plisten_addr{split_ip_port_cport(binding_addr).first};
+
+ papp = new HotStuffApp(opt_blk_size->get(),
+ opt_parent_limit->get(),
+ opt_stat_period->get(),
+ idx,
+ hotstuff::from_hex(opt_privkey->get()),
+ plisten_addr,
+ NetAddr("0.0.0.0", client_port),
+ eb);
+ for (size_t i = 0; i < replicas.size(); i++)
+ {
+ auto p = split_ip_port_cport(replicas[i].first);
+ papp->add_replica(i, NetAddr(p.first),
+ hotstuff::from_hex(replicas[i].second));
+ }
+ papp->start();
+#ifndef HOTSTUFF_ENABLE_LOG_DEBUG
+ } catch (std::exception &e) {
+ HOTSTUFF_LOG_INFO("exception: %s", e.what());
+ elapsed.stop(true);
+ }
+#endif
+ return 0;
+}
+
+HotStuffApp::HotStuffApp(uint32_t blk_size,
+ int32_t parent_limit,
+ double stat_period,
+ ReplicaID idx,
+ const bytearray_t &raw_privkey,
+ NetAddr plisten_addr,
+ NetAddr clisten_addr,
+ const EventContext &eb):
+ HotStuff(blk_size, parent_limit, idx, raw_privkey,
+ plisten_addr, eb),
+ stat_period(stat_period),
+ eb(eb),
+ cn(eb),
+ clisten_addr(clisten_addr) {
+ /* register the handlers for msg from clients */
+ cn.reg_handler(hotstuff::REQ_CMD, std::bind(&HotStuffApp::client_request_cmd_handler, this, _1, _2));
+ cn.reg_handler(hotstuff::CHK_CMD, std::bind(&HotStuffApp::client_check_cmd_handler, this, _1, _2));
+ cn.init(clisten_addr);
+}
+
+void HotStuffApp::client_request_cmd_handler(const MsgClient &msg, conn_client_t conn_) {
+ auto conn = static_pointer_cast<ClientNetwork<MsgClient>::Conn>(conn_);
+ const NetAddr addr = conn->get_addr();
+ command_t cmd = new CommandDummy();
+ std::vector<promise_t> pms;
+ msg.parse_reqcmd(static_cast<CommandDummy &>(*cmd));
+
+ bool flag = true;
+#ifndef HOTSTUFF_DISABLE_TX_VERIFY
+ flag &= cmd->verify();
+#endif
+ if (!flag)
+ {
+ LOG_WARN("invalid client cmd");
+ MsgClient resp;
+ resp.gen_respcmd(cmd->get_hash(), Finality(-1, uint256_t()));
+ cn.send_msg(resp, addr);
+ }
+ else
+ {
+ const uint256_t cmd_hash = cmd->get_hash();
+ add_command(cmd);
+ /** wait for the decision of tx */
+ LOG_DEBUG("processing client cmd %.10s", get_hex(cmd_hash).c_str());
+ async_decide(cmd_hash).then([this, addr](command_t cmd) {
+ MsgClient resp;
+ resp.gen_respcmd(cmd->get_hash(), cmd->get_finality());
+ cn.send_msg(resp, addr);
+ });
+ }
+}
+
+void HotStuffApp::client_check_cmd_handler(const MsgClient &msg, conn_client_t conn_) {
+ auto conn = static_pointer_cast<ClientNetwork<MsgClient>::Conn>(conn_);
+ const NetAddr addr = conn->get_addr();
+ uint256_t cmd_hash;
+ msg.parse_chkcmd(cmd_hash);
+ MsgClient resp;
+ command_t cmd = storage->find_cmd(cmd_hash);
+ Finality fin;
+ if (cmd) fin = cmd->get_finality();
+ resp.gen_respcmd(cmd_hash, fin);
+ cn.send_msg(resp, addr);
+}
+
+
+void HotStuffApp::start() {
+ ev_stat_timer = Event(eb, -1, 0,
+ std::bind(&HotStuffApp::print_stat_cb, this, _1, _2));
+ ev_stat_timer.add_with_timeout(stat_period);
+ LOG_INFO("** starting the system with parameters **");
+ LOG_INFO("blk_size = %lu", blk_size);
+ LOG_INFO("parent_limit = %d", parent_limit);
+ LOG_INFO("conns = %lu", HotStuff::size());
+ LOG_INFO("** starting the event loop...");
+#ifdef HOTSTUFF_DISABLE_TX_VERIFY
+ LOG_INFO("!! verification disabled !!");
+#else
+ LOG_INFO("** verification enabled **");
+#endif
+ HotStuff::start();
+ /* enter the event main loop */
+ eb.dispatch();
+}
+
+
+void HotStuffApp::print_stat_cb(evutil_socket_t, short) {
+ HotStuff::print_stat();
+ HotStuffCore::prune(100);
+ ev_stat_timer.add_with_timeout(stat_period);
+}
diff --git a/src/hotstuff_client.cpp b/src/hotstuff_client.cpp
new file mode 100644
index 0000000..5c04e5f
--- /dev/null
+++ b/src/hotstuff_client.cpp
@@ -0,0 +1,181 @@
+#include <cassert>
+#include "salticidae/type.h"
+#include "salticidae/netaddr.h"
+#include "salticidae/network.h"
+#include "salticidae/util.h"
+#include "util.h"
+#include "type.h"
+#include "client.h"
+
+using salticidae::NetAddr;
+using salticidae::Config;
+using salticidae::ElapsedTime;
+using salticidae::EventContext;
+using salticidae::Event;
+using salticidae::bytearray_t;
+using salticidae::trim_all;
+using salticidae::split;
+
+using hotstuff::uint256_t;
+using hotstuff::MsgClient;
+using hotstuff::CommandDummy;
+using hotstuff::Finality;
+
+size_t max_async_num = 10;
+int max_iter_num = 100;
+
+struct Request {
+ ElapsedTime et;
+ Request() { et.start(); }
+};
+
+std::unordered_map<int, salticidae::RingBuffer> buffers;
+std::unordered_map<const uint256_t, Request> waiting;
+
+int connect(const NetAddr &node) {
+ int fd;
+ if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1)
+ assert(0);
+ struct sockaddr_in sockin;
+ memset(&sockin, 0, sizeof(struct sockaddr_in));
+ sockin.sin_family = AF_INET;
+ sockin.sin_addr.s_addr = node.ip;
+ sockin.sin_port = node.port;
+ if (connect(fd, (struct sockaddr *)&sockin, sizeof(struct sockaddr_in)) == -1)
+ assert(0);
+ HOTSTUFF_LOG_INFO("connected to %s", std::string(node).c_str());
+ return fd;
+}
+
+void write_msg(int fd, const MsgClient &msg) {
+ bytearray_t msg_data = msg.serialize();
+ if (write(fd, msg_data.data(), msg_data.size()) != (ssize_t)msg_data.size())
+ assert(0);
+}
+
+void read_msg(int fd, MsgClient &msg) {
+ static const size_t BUFF_SEG_SIZE = 1024;
+ ssize_t ret;
+ auto &buffer = buffers[fd];
+ bool read_body = false;
+ for (;;)
+ {
+ bytearray_t buff_seg;
+ if (!read_body && buffer.size() >= MsgClient::header_size)
+ {
+ buff_seg = buffer.pop(MsgClient::header_size);
+ msg = MsgClient(buff_seg.data());
+ read_body = true;
+ }
+ if (read_body && buffer.size() >= msg.get_length())
+ {
+ buff_seg = buffer.pop(msg.get_length());
+ msg.set_payload(std::move(buff_seg));
+ return;
+ }
+
+ buff_seg.resize(BUFF_SEG_SIZE);
+ ret = read(fd, buff_seg.data(), BUFF_SEG_SIZE);
+ assert(ret != -1);
+ if (ret > 0)
+ {
+ buff_seg.resize(ret);
+ buffer.push(std::move(buff_seg));
+ }
+ }
+}
+
+void try_send(int fd) {
+ while (waiting.size() < max_async_num && max_iter_num)
+ {
+ auto cmd = CommandDummy::make_cmd();
+ MsgClient msg;
+ msg.gen_reqcmd(*cmd);
+ write_msg(fd, msg);
+ HOTSTUFF_LOG_INFO("send new cmd %.10s",
+ get_hex(cmd->get_hash()).c_str());
+ waiting.insert(std::make_pair(
+ cmd->get_hash(), Request()));
+ if (max_iter_num > 0)
+ max_iter_num--;
+ }
+}
+
+void on_receive(int fd) {
+ MsgClient msg;
+ uint256_t cmd_hash;
+ Finality fin;
+ read_msg(fd, msg);
+ HOTSTUFF_LOG_DEBUG("got %s", std::string(msg).c_str());
+ if (!msg.verify_checksum())
+ HOTSTUFF_LOG_ERROR("incorrect checksum %08x", msg.get_checksum());
+ msg.parse_respcmd(cmd_hash, fin);
+ HOTSTUFF_LOG_INFO(
+ "fd %d got response for %.10s: <decision=%d, blk=%.10s>",
+ fd, get_hex(cmd_hash).c_str(),
+ fin.decision,
+ get_hex(fin.blk_hash).c_str());
+ auto it = waiting.find(cmd_hash);
+ if (it == waiting.end()) return;
+ waiting.erase(it);
+ try_send(fd);
+}
+
+std::pair<std::string, std::string> split_ip_port_cport(const std::string &s) {
+ auto ret = trim_all(split(s, ";"));
+ return std::make_pair(ret[0], ret[1]);
+}
+
+Event *on_receive_ev;
+
+int main(int argc, char **argv) {
+ Config config("hotstuff.conf");
+ std::vector<NetAddr> peers2;
+ EventContext eb;
+ auto opt_idx = Config::OptValInt::create(-1);
+ auto opt_server_addr = Config::OptValStr::create("127.0.0.1:2234");
+ auto opt_replicas = Config::OptValStrVec::create();
+ auto opt_max_iter_num = Config::OptValInt::create();
+
+ try {
+ config.add_opt("idx", opt_idx, Config::SET_VAL);
+ config.add_opt("server", opt_server_addr, Config::SET_VAL);
+ config.add_opt("replica", opt_replicas, Config::APPEND);
+ config.add_opt("ntx", opt_max_iter_num, Config::SET_VAL);
+ config.parse(argc, argv);
+ auto idx = opt_idx->get();
+ max_iter_num = opt_max_iter_num->get();
+ std::vector<std::pair<std::string, std::string>> replicas;
+ for (const auto &s: opt_replicas->get())
+ {
+ auto res = trim_all(split(s, ","));
+ assert(res.size() == 2);
+ replicas.push_back(std::make_pair(res[0], res[1]));
+ }
+
+ NetAddr server(opt_server_addr->get());
+ if (-1 < idx && (size_t)idx < replicas.size() &&
+ replicas.size() > 0)
+ {
+ for (const auto &p: replicas)
+ {
+ auto _p = split_ip_port_cport(p.first);
+ size_t _;
+ peers2.push_back(NetAddr(NetAddr(_p.first).ip, htons(stoi(_p.second, &_))));
+ }
+ server = peers2[idx];
+ }
+
+ int fd = connect(server);
+ on_receive_ev = new Event{eb, fd, EV_READ, [](int fd, short) {
+ on_receive(fd);
+ on_receive_ev->add();
+ }};
+ on_receive_ev->add();
+ try_send(fd);
+ eb.dispatch();
+ } catch (hotstuff::HotStuffError &e) {
+ HOTSTUFF_LOG_ERROR("exception: %s", std::string(e).c_str());
+ }
+ return 0;
+}
diff --git a/src/hotstuff_keygen.cpp b/src/hotstuff_keygen.cpp
new file mode 100644
index 0000000..7a7e615
--- /dev/null
+++ b/src/hotstuff_keygen.cpp
@@ -0,0 +1,33 @@
+#include <error.h>
+#include "salticidae/util.h"
+#include "crypto.h"
+
+using salticidae::Config;
+using hotstuff::privkey_bt;
+using hotstuff::pubkey_bt;
+
+int main(int argc, char **argv) {
+ Config config("hotstuff.conf");
+ privkey_bt priv_key;
+ auto opt_n = Config::OptValInt::create(1);
+ auto opt_algo = Config::OptValStr::create("secp256k1");
+ config.add_opt("num", opt_n, Config::SET_VAL);
+ config.add_opt("algo", opt_algo, Config::SET_VAL);
+ config.parse(argc, argv);
+ auto &algo = opt_algo->get();
+ if (algo == "secp256k1")
+ priv_key = new hotstuff::PrivKeySecp256k1();
+ else
+ error(1, 0, "algo not supported");
+ int n = opt_n->get();
+ if (n < 1)
+ error(1, 0, "n must be >0");
+ while (n--)
+ {
+ priv_key->from_rand();
+ pubkey_bt pub_key = priv_key->get_pubkey();
+ printf("pub:%s sec:%s\n", get_hex(*pub_key).c_str(),
+ get_hex(*priv_key).c_str());
+ }
+ return 0;
+}
diff --git a/src/promise.hpp b/src/promise.hpp
new file mode 100644
index 0000000..593d5c1
--- /dev/null
+++ b/src/promise.hpp
@@ -0,0 +1,745 @@
+#ifndef _CPPROMISE_HPP
+#define _CPPROMISE_HPP
+
+/**
+ * MIT License
+ * Copyright (c) 2018 Ted Yin <[email protected]>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+#include <stack>
+#include <vector>
+#include <memory>
+#include <functional>
+#include <type_traits>
+
+#if __cplusplus >= 201703L
+#ifdef __has_include
+# if __has_include(<any>)
+# include <any>
+# ifdef __cpp_lib_any
+# define _CPPROMISE_STD_ANY
+# endif
+# endif
+#endif
+#endif
+
+#ifndef _CPPROMISE_STD_ANY
+#include <boost/any.hpp>
+#endif
+
+/**
+ * Implement type-safe Promise primitives similar to the ones specified by
+ * Javascript Promise/A+.
+ */
+namespace promise {
+#ifdef _CPPROMISE_STD_ANY
+ using pm_any_t = std::any;
+ template<typename T>
+ constexpr auto any_cast = static_cast<T(*)(const std::any&)>(std::any_cast<T>);
+ using bad_any_cast = std::bad_any_cast;
+#else
+# warning "using boost::any"
+# pragma message "using boost::any"
+ using pm_any_t = boost::any;
+ template<typename T>
+ constexpr auto any_cast = static_cast<T(*)(const boost::any&)>(boost::any_cast<T>);
+ using bad_any_cast = boost::bad_any_cast;
+#endif
+ using callback_t = std::function<void()>;
+ using values_t = std::vector<pm_any_t>;
+
+ /* match lambdas */
+ template<typename T>
+ struct function_traits:
+ public function_traits<decltype(&T::operator())> {};
+
+ template<typename ReturnType>
+ struct function_traits<ReturnType()> {
+ using ret_type = ReturnType;
+ using arg_type = void;
+ using empty_arg = void;
+ };
+
+ /* match plain functions */
+ template<typename ReturnType, typename ArgType>
+ struct function_traits<ReturnType(ArgType)> {
+ using ret_type = ReturnType;
+ using arg_type = ArgType;
+ using non_empty_arg = void;
+ };
+
+ /* match function pointers */
+ template<typename ReturnType, typename... ArgType>
+ struct function_traits<ReturnType(*)(ArgType...)>:
+ public function_traits<ReturnType(ArgType...)> {};
+
+ /* match const member functions */
+ template<typename ClassType, typename ReturnType, typename... ArgType>
+ struct function_traits<ReturnType(ClassType::*)(ArgType...) const>:
+ public function_traits<ReturnType(ArgType...)> {};
+
+ /* match member functions */
+ template<typename ClassType, typename ReturnType, typename... ArgType>
+ struct function_traits<ReturnType(ClassType::*)(ArgType...)>:
+ public function_traits<ReturnType(ArgType...)> {};
+
+ template<typename Func, typename ReturnType>
+ using enable_if_return = typename std::enable_if<
+ std::is_same<typename function_traits<Func>::ret_type,
+ ReturnType>::value>;
+
+ template<typename Func, typename ReturnType>
+ using disable_if_return = typename std::enable_if<
+ !std::is_same<typename function_traits<Func>::ret_type,
+ ReturnType>::value>;
+
+ template<typename Func, typename ArgType>
+ using enable_if_arg = typename std::enable_if<
+ std::is_same<typename function_traits<Func>::arg_type,
+ ArgType>::value>;
+
+ template<typename Func, typename ArgType>
+ using disable_if_arg = typename std::enable_if<
+ !std::is_same<typename function_traits<Func>::arg_type,
+ ArgType>::value>;
+
+ class Promise;
+ //class promise_t: public std::shared_ptr<Promise> {
+ class promise_t {
+ Promise *pm;
+ size_t *ref_cnt;
+ public:
+ friend Promise;
+ template<typename PList> friend promise_t all(const PList &promise_list);
+ template<typename PList> friend promise_t race(const PList &promise_list);
+
+ inline promise_t();
+ inline ~promise_t();
+ template<typename Func> inline promise_t(Func callback);
+
+ void swap(promise_t &other) {
+ std::swap(pm, other.pm);
+ std::swap(ref_cnt, other.ref_cnt);
+ }
+
+ promise_t &operator=(const promise_t &other) {
+ if (this != &other)
+ {
+ promise_t tmp(other);
+ tmp.swap(*this);
+ }
+ return *this;
+ }
+
+ promise_t &operator=(promise_t &&other) {
+ if (this != &other)
+ {
+ promise_t tmp(std::move(other));
+ tmp.swap(*this);
+ }
+ return *this;
+ }
+
+ promise_t(const promise_t &other):
+ pm(other.pm),
+ ref_cnt(other.ref_cnt) {
+ ++*ref_cnt;
+ }
+
+ promise_t(promise_t &&other):
+ pm(other.pm),
+ ref_cnt(other.ref_cnt) {
+ other.pm = nullptr;
+ }
+
+ Promise *operator->() const {
+ return pm;
+ }
+
+ template<typename T> inline void resolve(T result) const;
+ template<typename T> inline void reject(T reason) const;
+ inline void resolve() const;
+ inline void reject() const;
+
+ template<typename FuncFulfilled>
+ inline promise_t then(FuncFulfilled on_fulfilled) const;
+
+ template<typename FuncFulfilled, typename FuncRejected>
+ inline promise_t then(FuncFulfilled on_fulfilled,
+ FuncRejected on_rejected) const;
+
+ template<typename FuncRejected>
+ inline promise_t fail(FuncRejected on_rejected) const;
+ };
+
+#define PROMISE_ERR_INVALID_STATE do {throw std::runtime_error("invalid promise state");} while (0)
+#define PROMISE_ERR_MISMATCH_TYPE do {throw std::runtime_error("mismatching promise value types");} while (0)
+
+ class Promise {
+ template<typename PList> friend promise_t all(const PList &promise_list);
+ template<typename PList> friend promise_t race(const PList &promise_list);
+ std::vector<callback_t> fulfilled_callbacks;
+ std::vector<callback_t> rejected_callbacks;
+#ifdef CPPROMISE_USE_STACK_FREE
+ std::vector<Promise *> fulfilled_pms;
+ std::vector<Promise *> rejected_pms;
+#endif
+ enum class State {
+ Pending,
+#ifdef CPPROMISE_USE_STACK_FREE
+ PreFulfilled,
+ PreRejected,
+#endif
+ Fulfilled,
+ Rejected,
+ } state;
+ pm_any_t result;
+ pm_any_t reason;
+
+ void add_on_fulfilled(callback_t &&cb) {
+ fulfilled_callbacks.push_back(std::move(cb));
+ }
+
+ void add_on_rejected(callback_t &&cb) {
+ rejected_callbacks.push_back(std::move(cb));
+ }
+
+ template<typename Func,
+ typename function_traits<Func>::non_empty_arg * = nullptr>
+ static constexpr auto cps_transform(
+ Func f, const pm_any_t &result, const promise_t &npm) {
+ return [&result, npm, f = std::forward<Func>(f)]() mutable {
+#ifndef CPPROMISE_USE_STACK_FREE
+ f(result)->then(
+ [npm] (pm_any_t result) {npm->resolve(result);},
+ [npm] (pm_any_t reason) {npm->reject(reason);});
+#else
+ promise_t rpm{f(result)};
+ rpm->then(
+ [rpm, npm] (pm_any_t result) {
+ npm->_resolve(result);
+ },
+ [rpm, npm] (pm_any_t reason) {
+ npm->_reject(reason);
+ });
+ rpm->_dep_resolve(npm);
+ rpm->_dep_reject(npm);
+#endif
+ };
+ }
+
+ template<typename Func,
+ typename function_traits<Func>::empty_arg * = nullptr>
+ static constexpr auto cps_transform(
+ Func f, const pm_any_t &, const promise_t &npm) {
+ return [npm, f = std::forward<Func>(f)]() mutable {
+#ifndef CPPROMISE_USE_STACK_FREE
+ f()->then(
+ [npm] (pm_any_t result) {npm->resolve(result);},
+ [npm] (pm_any_t reason) {npm->reject(reason);});
+#else
+ promise_t rpm{f()};
+ rpm->then(
+ [rpm, npm] (pm_any_t result) {
+ npm->_resolve(result);
+ },
+ [rpm, npm] (pm_any_t reason) {
+ npm->_reject(reason);
+ });
+ rpm->_dep_resolve(npm);
+ rpm->_dep_reject(npm);
+#endif
+ };
+ }
+
+ template<typename Func,
+ typename enable_if_return<Func, promise_t>::type * = nullptr>
+ constexpr auto gen_on_fulfilled(Func on_fulfilled, const promise_t &npm) {
+ return cps_transform(std::forward<Func>(on_fulfilled), this->result, npm);
+ }
+
+ template<typename Func,
+ typename enable_if_return<Func, promise_t>::type * = nullptr>
+ constexpr auto gen_on_rejected(Func on_rejected, const promise_t &npm) {
+ return cps_transform(std::forward<Func>(on_rejected), this->reason, npm);
+ }
+
+
+ template<typename Func,
+ typename enable_if_return<Func, void>::type * = nullptr,
+ typename function_traits<Func>::non_empty_arg * = nullptr>
+ constexpr auto gen_on_fulfilled(Func on_fulfilled, const promise_t &npm) {
+ return [this, npm,
+ on_fulfilled = std::forward<Func>(on_fulfilled)]() mutable {
+ on_fulfilled(result);
+ npm->_resolve();
+ };
+ }
+
+ template<typename Func,
+ typename enable_if_return<Func, void>::type * = nullptr,
+ typename function_traits<Func>::empty_arg * = nullptr>
+ constexpr auto gen_on_fulfilled(Func on_fulfilled, const promise_t &npm) {
+ return [on_fulfilled = std::forward<Func>(on_fulfilled), npm]() mutable {
+ on_fulfilled();
+ npm->_resolve();
+ };
+ }
+
+ template<typename Func,
+ typename enable_if_return<Func, void>::type * = nullptr,
+ typename function_traits<Func>::non_empty_arg * = nullptr>
+ constexpr auto gen_on_rejected(Func on_rejected, const promise_t &npm) {
+ return [this, npm,
+ on_rejected = std::forward<Func>(on_rejected)]() mutable {
+ on_rejected(reason);
+ npm->_reject();
+ };
+ }
+
+ template<typename Func,
+ typename enable_if_return<Func, void>::type * = nullptr,
+ typename function_traits<Func>::empty_arg * = nullptr>
+ constexpr auto gen_on_rejected(Func on_rejected, const promise_t &npm) {
+ return [npm,
+ on_rejected = std::forward<Func>(on_rejected)]() mutable {
+ on_rejected();
+ npm->_reject();
+ };
+ }
+
+ template<typename Func,
+ typename enable_if_return<Func, pm_any_t>::type * = nullptr,
+ typename function_traits<Func>::non_empty_arg * = nullptr>
+ constexpr auto gen_on_fulfilled(Func on_fulfilled, const promise_t &npm) {
+ return [this, npm,
+ on_fulfilled = std::forward<Func>(on_fulfilled)]() mutable {
+ npm->_resolve(on_fulfilled(result));
+ };
+ }
+
+ template<typename Func,
+ typename enable_if_return<Func, pm_any_t>::type * = nullptr,
+ typename function_traits<Func>::empty_arg * = nullptr>
+ constexpr auto gen_on_fulfilled(Func on_fulfilled, const promise_t &npm) {
+ return [npm, on_fulfilled = std::forward<Func>(on_fulfilled)]() mutable {
+ npm->_resolve(on_fulfilled());
+ };
+ }
+
+ template<typename Func,
+ typename enable_if_return<Func, pm_any_t>::type * = nullptr,
+ typename function_traits<Func>::non_empty_arg * = nullptr>
+ constexpr auto gen_on_rejected(Func on_rejected, const promise_t &npm) {
+ return [this, npm, on_rejected = std::forward<Func>(on_rejected)]() mutable {
+ npm->_reject(on_rejected(reason));
+ };
+ }
+
+ template<typename Func,
+ typename enable_if_return<Func, pm_any_t>::type * = nullptr,
+ typename function_traits<Func>::empty_arg * = nullptr>
+ constexpr auto gen_on_rejected(Func on_rejected, const promise_t &npm) {
+ return [npm, on_rejected = std::forward<Func>(on_rejected)]() mutable {
+ npm->_reject(on_rejected());
+ };
+ }
+
+#ifdef CPPROMISE_USE_STACK_FREE
+ void _trigger() {
+ std::stack<std::tuple<
+ std::vector<Promise *>::const_iterator,
+ std::vector<Promise *> *,
+ Promise *>> s;
+ auto push_frame = [&s](Promise *pm) {
+ if (pm->state == State::PreFulfilled)
+ {
+ pm->state = State::Fulfilled;
+ for (auto &cb: pm->fulfilled_callbacks) cb();
+ s.push(std::make_tuple(pm->fulfilled_pms.begin(),
+ &pm->fulfilled_pms,
+ pm));
+ }
+ else if (pm->state == State::PreRejected)
+ {
+ pm->state = State::Rejected;
+ for (auto &cb: pm->rejected_callbacks) cb();
+ s.push(std::make_tuple(pm->rejected_pms.begin(),
+ &pm->rejected_pms,
+ pm));
+ }
+ };
+ push_frame(this);
+ while (!s.empty())
+ {
+ auto &u = s.top();
+ auto &it = std::get<0>(u);
+ auto vec = std::get<1>(u);
+ auto pm = std::get<2>(u);
+ if (it == vec->end())
+ {
+ s.pop();
+ vec->clear();
+ pm->fulfilled_callbacks.clear();
+ pm->rejected_callbacks.clear();
+ continue;
+ }
+ push_frame(*it++);
+ }
+ }
+
+ void trigger_fulfill() {
+ state = State::PreFulfilled;
+ _trigger();
+ }
+
+ void trigger_reject() {
+ state = State::PreRejected;
+ _trigger();
+ }
+
+ void _resolve() {
+ if (state == State::Pending) state = State::PreFulfilled;
+ }
+
+ void _reject() {
+ if (state == State::Pending) state = State::PreRejected;
+ }
+
+ void _dep_resolve(const promise_t &npm) {
+ if (state == State::Pending)
+ fulfilled_pms.push_back(npm.pm);
+ else
+ npm->_trigger();
+ }
+
+ void _dep_reject(const promise_t &npm) {
+ if (state == State::Pending)
+ rejected_pms.push_back(npm.pm);
+ else
+ npm->_trigger();
+ }
+
+ void _resolve(pm_any_t _result) {
+ if (state == State::Pending)
+ {
+ result = _result;
+ state = State::PreFulfilled;
+ }
+ }
+
+ void _reject(pm_any_t _reason) {
+ if (state == State::Pending)
+ {
+ reason = _reason;
+ state = State::PreRejected;
+ }
+ }
+#else
+ void _resolve() { resolve(); }
+ void _reject() { reject(); }
+ void _resolve(pm_any_t result) { resolve(result); }
+ void _reject(pm_any_t reason) { reject(reason); }
+
+ void trigger_fulfill() {
+ state = State::Fulfilled;
+ for (const auto &cb: fulfilled_callbacks) cb();
+ fulfilled_callbacks.clear();
+ }
+
+ void trigger_reject() {
+ state = State::Rejected;
+ for (const auto &cb: rejected_callbacks) cb();
+ rejected_callbacks.clear();
+ }
+#endif
+ public:
+
+ Promise(): state(State::Pending) {}
+ ~Promise() {}
+
+ template<typename FuncFulfilled, typename FuncRejected>
+ promise_t then(FuncFulfilled on_fulfilled,
+ FuncRejected on_rejected) {
+ switch (state)
+ {
+ case State::Pending:
+ return promise_t([this,
+ on_fulfilled = std::forward<FuncFulfilled>(on_fulfilled),
+ on_rejected = std::forward<FuncRejected>(on_rejected)
+ ](promise_t &npm) {
+ add_on_fulfilled(gen_on_fulfilled(std::move(on_fulfilled), npm));
+ add_on_rejected(gen_on_rejected(std::move(on_rejected), npm));
+#ifdef CPPROMISE_USE_STACK_FREE
+ _dep_resolve(npm);
+ _dep_reject(npm);
+#endif
+ });
+ case State::Fulfilled:
+ return promise_t([this,
+ on_fulfilled = std::forward<FuncFulfilled>(on_fulfilled)
+ ](promise_t &npm) {
+ gen_on_fulfilled(std::move(on_fulfilled), npm)();
+ });
+ case State::Rejected:
+ return promise_t([this,
+ on_rejected = std::forward<FuncRejected>(on_rejected)
+ ](promise_t &npm) {
+ gen_on_rejected(std::move(on_rejected), npm)();
+ });
+ default: PROMISE_ERR_INVALID_STATE;
+ }
+ }
+
+ template<typename FuncFulfilled>
+ promise_t then(FuncFulfilled &&on_fulfilled) {
+ switch (state)
+ {
+ case State::Pending:
+ return promise_t([this,
+ on_fulfilled = std::forward<FuncFulfilled>(on_fulfilled)
+ ](promise_t &npm) {
+ add_on_fulfilled(gen_on_fulfilled(std::move(on_fulfilled), npm));
+ add_on_rejected([this, npm]() {npm->_reject(reason);});
+#ifdef CPPROMISE_USE_STACK_FREE
+ _dep_resolve(npm);
+ _dep_reject(npm);
+#endif
+ });
+ case State::Fulfilled:
+ return promise_t([this,
+ on_fulfilled = std::forward<FuncFulfilled>(on_fulfilled)
+ ](promise_t &npm) {
+ gen_on_fulfilled(std::move(on_fulfilled), npm)();
+ });
+ case State::Rejected:
+ return promise_t([this](promise_t &npm) {npm->_reject(reason);});
+ default: PROMISE_ERR_INVALID_STATE;
+ }
+ }
+
+ template<typename FuncRejected>
+ promise_t fail(FuncRejected &&on_rejected) {
+ switch (state)
+ {
+ case State::Pending:
+ return promise_t([this,
+ on_rejected = std::forward<FuncRejected>(on_rejected)
+ ](promise_t &npm) {
+ callback_t ret;
+ add_on_rejected(gen_on_rejected(std::move(on_rejected), npm));
+ add_on_fulfilled([this, npm]() {npm->_resolve(result);});
+#ifdef CPPROMISE_USE_STACK_FREE
+ _dep_resolve(npm);
+ _dep_reject(npm);
+#endif
+ });
+ case State::Fulfilled:
+ return promise_t([this](promise_t &npm) {npm->_resolve(result);});
+ case State::Rejected:
+ return promise_t([this,
+ on_rejected = std::forward<FuncRejected>(on_rejected)
+ ](promise_t &npm) {
+ gen_on_rejected(std::move(on_rejected), npm)();
+ });
+ default: PROMISE_ERR_INVALID_STATE;
+ }
+ }
+
+ void resolve() {
+ if (state == State::Pending) trigger_fulfill();
+ }
+
+ void reject() {
+ if (state == State::Pending) trigger_reject();
+ }
+
+ void resolve(pm_any_t _result) {
+ if (state == State::Pending)
+ {
+ result = _result;
+ trigger_fulfill();
+ }
+ }
+
+ void reject(pm_any_t _reason) {
+ if (state == State::Pending)
+ {
+ reason = _reason;
+ trigger_reject();
+ }
+ }
+ };
+
+ template<typename PList> promise_t all(const PList &promise_list) {
+ return promise_t([&promise_list] (promise_t &npm) {
+ auto size = std::make_shared<size_t>(promise_list.size());
+ auto results = std::make_shared<values_t>();
+ if (!*size) PROMISE_ERR_MISMATCH_TYPE;
+ results->resize(*size);
+ size_t idx = 0;
+ for (const auto &pm: promise_list) {
+ pm->then(
+ [results, size, idx, npm](pm_any_t result) {
+ (*results)[idx] = result;
+ if (!--(*size))
+ npm->_resolve(*results);
+ },
+ [npm](pm_any_t reason) {npm->_reject(reason);});
+#ifdef CPPROMISE_USE_STACK_FREE
+ pm->_dep_resolve(npm);
+ pm->_dep_reject(npm);
+#endif
+ idx++;
+ }
+ });
+ }
+
+ template<typename PList> promise_t race(const PList &promise_list) {
+ return promise_t([&promise_list] (promise_t &npm) {
+ for (const auto &pm: promise_list) {
+ pm->then([npm](pm_any_t result) {npm->_resolve(result);},
+ [npm](pm_any_t reason) {npm->_reject(reason);});
+#ifdef CPPROMISE_USE_STACK_FREE
+ pm->_dep_resolve(npm);
+ pm->_dep_reject(npm);
+#endif
+ }
+ });
+ }
+
+ template<typename Func>
+ inline promise_t::promise_t(Func callback):
+ pm(new Promise()),
+ ref_cnt(new size_t(1)) {
+ callback(*this);
+ }
+
+ inline promise_t::promise_t():
+ pm(new Promise()),
+ ref_cnt(new size_t(1)) {}
+
+ inline promise_t::~promise_t() {
+ if (pm)
+ {
+ if (--*ref_cnt) return;
+ delete pm;
+ delete ref_cnt;
+ }
+ }
+
+ template<typename T>
+ inline void promise_t::resolve(T result) const { (*this)->resolve(result); }
+
+ template<typename T>
+ inline void promise_t::reject(T reason) const { (*this)->reject(reason); }
+
+ inline void promise_t::resolve() const { (*this)->resolve(); }
+ inline void promise_t::reject() const { (*this)->reject(); }
+
+ template<typename T>
+ struct callback_types {
+ using arg_type = typename function_traits<T>::arg_type;
+ using ret_type = typename std::conditional<
+ std::is_same<typename function_traits<T>::ret_type, promise_t>::value,
+ promise_t, pm_any_t>::type;
+ };
+
+ template<typename Func,
+ typename disable_if_arg<Func, pm_any_t>::type * = nullptr,
+ typename enable_if_return<Func, void>::type * = nullptr,
+ typename function_traits<Func>::non_empty_arg * = nullptr>
+ constexpr auto gen_any_callback(Func f) {
+ using func_t = callback_types<Func>;
+ return [f = std::forward<Func>(f)](pm_any_t v) mutable {
+ try {
+ f(any_cast<typename func_t::arg_type>(v));
+ } catch (bad_any_cast &e) { PROMISE_ERR_MISMATCH_TYPE; }
+ };
+ }
+
+ template<typename Func,
+ typename enable_if_arg<Func, pm_any_t>::type * = nullptr,
+ typename enable_if_return<Func, void>::type * = nullptr,
+ typename function_traits<Func>::non_empty_arg * = nullptr>
+ constexpr auto gen_any_callback(Func f) {
+ return [f = std::forward<Func>(f)](pm_any_t v) mutable {f(v);};
+ }
+
+ template<typename Func,
+ typename enable_if_return<Func, void>::type * = nullptr,
+ typename function_traits<Func>::empty_arg * = nullptr>
+ constexpr auto gen_any_callback(Func f) { return std::forward<Func>(f); }
+
+ template<typename Func,
+ typename enable_if_arg<Func, pm_any_t>::type * = nullptr,
+ typename disable_if_return<Func, void>::type * = nullptr,
+ typename function_traits<Func>::non_empty_arg * = nullptr>
+ constexpr auto gen_any_callback(Func f) {
+ using func_t = callback_types<Func>;
+ return [f = std::forward<Func>(f)](pm_any_t v) mutable {
+ return typename func_t::ret_type(f(v));
+ };
+ }
+
+ template<typename Func,
+ typename disable_if_arg<Func, pm_any_t>::type * = nullptr,
+ typename disable_if_return<Func, void>::type * = nullptr,
+ typename function_traits<Func>::non_empty_arg * = nullptr>
+ constexpr auto gen_any_callback(Func f) {
+ using func_t = callback_types<Func>;
+ return [f = std::forward<Func>(f)](pm_any_t v) mutable {
+ try {
+ return typename func_t::ret_type(
+ f(any_cast<typename func_t::arg_type>(v)));
+ } catch (bad_any_cast &e) { PROMISE_ERR_MISMATCH_TYPE; }
+ };
+ }
+
+ template<typename Func,
+ typename disable_if_return<Func, void>::type * = nullptr,
+ typename function_traits<Func>::empty_arg * = nullptr>
+ constexpr auto gen_any_callback(Func f) {
+ using func_t = callback_types<Func>;
+ return [f = std::forward<Func>(f)]() mutable {
+ return typename func_t::ret_type(f());
+ };
+ }
+
+ template<typename FuncFulfilled>
+ inline promise_t promise_t::then(FuncFulfilled on_fulfilled) const {
+ return (*this)->then(gen_any_callback(std::forward<FuncFulfilled>(on_fulfilled)));
+ }
+
+ template<typename FuncFulfilled, typename FuncRejected>
+ inline promise_t promise_t::then(FuncFulfilled on_fulfilled,
+ FuncRejected on_rejected) const {
+ return (*this)->then(gen_any_callback(std::forward<FuncFulfilled>(on_fulfilled)),
+ gen_any_callback(std::forward<FuncRejected>(on_rejected)));
+ }
+
+ template<typename FuncRejected>
+ inline promise_t promise_t::fail(FuncRejected on_rejected) const {
+ return (*this)->fail(gen_any_callback(std::forward<FuncRejected>(on_rejected)));
+ }
+}
+
+#endif
diff --git a/src/type.h b/src/type.h
new file mode 100644
index 0000000..4665979
--- /dev/null
+++ b/src/type.h
@@ -0,0 +1,46 @@
+#ifndef _HOTSTUFF_TYPE_H
+#define _HOTSTUFF_TYPE_H
+
+#include "salticidae/stream.h"
+#include "salticidae/type.h"
+#include "salticidae/util.h"
+
+namespace hotstuff {
+
+using salticidae::uint256_t;
+using salticidae::DataStream;
+using salticidae::htole;
+using salticidae::letoh;
+using salticidae::get_hex;
+using salticidae::from_hex;
+using salticidae::bytearray_t;
+
+inline std::string get_hex10(const uint256_t &x) {
+ return get_hex(x).substr(0, 10);
+}
+
+class HotStuffError: public salticidae::SalticidaeError {
+ public:
+ template<typename... Args>
+ HotStuffError(Args... args): salticidae::SalticidaeError(args...) {}
+};
+
+class HotStuffInvalidEntity: public HotStuffError {
+ public:
+ template<typename... Args>
+ HotStuffInvalidEntity(Args... args): HotStuffError(args...) {}
+};
+
+using salticidae::Serializable;
+
+class Cloneable {
+ public:
+ virtual ~Cloneable() = default;
+ virtual Cloneable *clone() = 0;
+};
+
+using ReplicaID = uint16_t;
+
+}
+
+#endif
diff --git a/src/util.cpp b/src/util.cpp
new file mode 100644
index 0000000..dc509f1
--- /dev/null
+++ b/src/util.cpp
@@ -0,0 +1,7 @@
+#include "util.h"
+
+namespace hotstuff {
+
+Logger logger("hotstuff");
+
+}
diff --git a/src/util.h b/src/util.h
new file mode 100644
index 0000000..42c0135
--- /dev/null
+++ b/src/util.h
@@ -0,0 +1,119 @@
+#ifndef _HOTSTUFF_UTIL_H
+#define _HOTSTUFF_UTIL_H
+
+#include "salticidae/util.h"
+
+namespace hotstuff {
+
+class Logger: public salticidae::Logger {
+ public:
+ using salticidae::Logger::Logger;
+};
+
+extern Logger logger;
+
+#ifdef HOTSTUFF_DEBUG_LOG
+#define HOTSTUFF_NORMAL_LOG
+#define HOTSTUFF_ENABLE_LOG_DEBUG
+#endif
+
+#ifdef HOTSTUFF_NORMAL_LOG
+#define HOTSTUFF_ENABLE_LOG_INFO
+#define HOTSTUFF_ENABLE_LOG_WARN
+#endif
+
+#ifdef HOTSTUFF_ENABLE_LOG_INFO
+#define HOTSTUFF_LOG_INFO(...) hotstuff::logger.info(__VA_ARGS__)
+#else
+#define HOTSTUFF_LOG_INFO(...) ((void)0)
+#endif
+
+#ifdef HOTSTUFF_ENABLE_LOG_DEBUG
+#define HOTSTUFF_LOG_DEBUG(...) hotstuff::logger.debug(__VA_ARGS__)
+#else
+#define HOTSTUFF_LOG_DEBUG(...) ((void)0)
+#endif
+
+#ifdef HOTSTUFF_ENABLE_LOG_WARN
+#define HOTSTUFF_LOG_WARN(...) hotstuff::logger.warning(__VA_ARGS__)
+#else
+#define HOTSTUFF_LOG_WARN(...) ((void)0)
+#endif
+
+#define HOTSTUFF_LOG_ERROR(...) hotstuff::logger.error(__VA_ARGS__)
+
+#ifdef HOTSTUFF_ENABLE_BLK_PROFILE
+class BlockProfiler {
+ enum BlockState {
+ BLK_SEEN,
+ BLK_FETCH,
+ BLK_CC
+ };
+
+ struct BlockProfile {
+ bool is_local; /* is the block proposed by the replica itself? */
+ BlockState state;
+ double hash_seen_time; /* the first time to see block hash */
+ double full_fetch_time; /* the first time to get full block content */
+ double cc_time; /* the time when it receives cc */
+ double commit_time; /* the time when it commits */
+ };
+
+ std::unordered_map<const uint256, BlockProfile> blocks;
+ ElapsedTime timer;
+
+ public:
+ BlockProfiler() { timer.start(); }
+
+ auto rec_blk(const uint256 &blk_hash, bool is_local) {
+ auto it = blocks.find(blk_hash);
+ assert(it == blocks.end());
+ timer.stop(false);
+ return blocks.insert(std::make_pair(blk_hash,
+ BlockProfile{is_local, BLK_SEEN, timer.elapsed_sec, 0, 0, 0})).first;
+ }
+
+ void get_blk(const uint256 &blk_hash) {
+ auto it = blocks.find(blk_hash);
+ if (it == blocks.end())
+ it = rec_blk(blk_hash, false);
+ BlockProfile &blkp = it->second;
+ assert(blkp.state == BLK_SEEN);
+ timer.stop(false);
+ blkp.full_fetch_time = timer.elapsed_sec;
+ blkp.state = BLK_FETCH;
+ }
+
+ void have_cc(const uint256 &blk_hash) {
+ auto it = blocks.find(blk_hash);
+ assert(it != blocks.end());
+ BlockProfile &blkp = it->second;
+ assert(blkp.state == BLK_FETCH);
+ timer.stop(false);
+ blkp.polling_start_time = timer.elapsed_sec;
+ blkp.state = BLK_CC;
+ }
+
+ const char *decide_blk(const uint256 &blk_hash) {
+ static char buff[1024];
+ auto it = blocks.find(blk_hash);
+ assert(it != blocks.end());
+ BlockProfile &blkp = it->second;
+ assert(blkp.state == BLK_CC);
+ timer.stop(false);
+ blkp.commit_time = timer.elapsed_sec;
+ snprintf(buff, sizeof buff, "(%d,%.4f,%.4f,%.4f,%.4f,%.4f)",
+ blkp.is_local,
+ blkp.hash_seen_time, blkp.full_fetch_time,
+ blkp.polling_start_time, blkp.polling_end_time,
+ blkp.commit_time);
+ blocks.erase(it);
+ return buff;
+ }
+};
+
+#endif
+
+}
+
+#endif