aboutsummaryrefslogtreecommitdiff
path: root/include
diff options
context:
space:
mode:
Diffstat (limited to 'include')
-rw-r--r--include/hotstuff/hotstuff.h49
-rw-r--r--include/hotstuff/liveness.h32
-rw-r--r--include/hotstuff/type.h3
-rw-r--r--include/hotstuff/util.h2
-rw-r--r--include/hotstuff/worker.h8
5 files changed, 48 insertions, 46 deletions
diff --git a/include/hotstuff/hotstuff.h b/include/hotstuff/hotstuff.h
index 3d1c7b6..bc8d960 100644
--- a/include/hotstuff/hotstuff.h
+++ b/include/hotstuff/hotstuff.h
@@ -14,7 +14,6 @@
namespace hotstuff {
-using salticidae::MsgNetwork;
using salticidae::PeerNetwork;
using salticidae::ElapsedTime;
using salticidae::_1;
@@ -69,12 +68,12 @@ class HotStuffBase;
template<EntityType ent_type>
class FetchContext: public promise_t {
- Event timeout;
+ TimerEvent timeout;
HotStuffBase *hs;
MsgReqBlock fetch_msg;
const uint256_t ent_hash;
std::unordered_set<NetAddr> replica_ids;
- inline void timeout_cb(evutil_socket_t, short);
+ inline void timeout_cb(TimerEvent &);
public:
FetchContext(const FetchContext &) = delete;
FetchContext &operator=(const FetchContext &) = delete;
@@ -109,7 +108,7 @@ class BlockDeliveryContext: public promise_t {
class HotStuffBase: public HotStuffCore {
using BlockFetchContext = FetchContext<ENT_TYPE_BLK>;
using CmdFetchContext = FetchContext<ENT_TYPE_CMD>;
- using Conn = PeerNetwork<opcode_t>::Conn;
+ using Net = PeerNetwork<opcode_t>;
friend BlockFetchContext;
friend CmdFetchContext;
@@ -120,14 +119,15 @@ class HotStuffBase: public HotStuffCore {
/** the block size */
size_t blk_size;
/** libevent handle */
- EventContext eb;
+ EventContext ec;
VeriPool vpool;
+ std::unordered_set<NetAddr> peers;
private:
/** whether libevent handle is owned by itself */
- bool eb_loop;
+ bool ec_loop;
/** network stack */
- PeerNetwork<opcode_t> pn;
+ Net pn;
#ifdef HOTSTUFF_BLK_PROFILE
BlockProfiler blk_profiler;
#endif
@@ -159,13 +159,13 @@ class HotStuffBase: public HotStuffCore {
void on_deliver_blk(const block_t &blk);
/** deliver consensus message: <propose> */
- inline void propose_handler(MsgPropose &&, Conn &);
+ inline void propose_handler(MsgPropose &&, const Net::conn_t &);
/** deliver consensus message: <vote> */
- inline void vote_handler(MsgVote &&, Conn &);
+ inline void vote_handler(MsgVote &&, const Net::conn_t &);
/** fetches full block data */
- inline void req_blk_handler(MsgReqBlock &&, Conn &);
+ inline void req_blk_handler(MsgReqBlock &&, const Net::conn_t &);
/** receives a block */
- inline void resp_blk_handler(MsgRespBlock &&, Conn &);
+ inline void resp_blk_handler(MsgRespBlock &&, const Net::conn_t &);
void do_broadcast_proposal(const Proposal &) override;
void do_vote(ReplicaID, const Vote &) override;
@@ -183,8 +183,9 @@ class HotStuffBase: public HotStuffCore {
privkey_bt &&priv_key,
NetAddr listen_addr,
pacemaker_bt pmaker,
- EventContext eb,
- size_t nworker);
+ EventContext ec,
+ size_t nworker,
+ const Net::Config &config = Net::Config());
~HotStuffBase();
@@ -193,9 +194,9 @@ class HotStuffBase: public HotStuffCore {
/* Submit the command to be decided. */
promise_t exec_command(uint256_t cmd);
void add_replica(ReplicaID idx, const NetAddr &addr, pubkey_bt &&pub_key);
- void start(bool eb_loop = false);
+ void start(bool ec_loop = false);
- size_t size() const { return pn.all_peers().size(); }
+ size_t size() const { return peers.size(); }
PaceMaker &get_pace_maker() { return *pmaker; }
void print_stat() const;
@@ -247,14 +248,14 @@ class HotStuff: public HotStuffBase {
const bytearray_t &raw_privkey,
NetAddr listen_addr,
pacemaker_bt pmaker,
- EventContext eb = EventContext(),
+ EventContext ec = EventContext(),
size_t nworker = 4):
HotStuffBase(blk_size,
rid,
new PrivKeyType(raw_privkey),
listen_addr,
std::move(pmaker),
- eb,
+ ec,
nworker) {}
void add_replica(ReplicaID idx, const NetAddr &addr, const bytearray_t &pubkey_raw) {
@@ -275,13 +276,13 @@ FetchContext<ent_type>::FetchContext(FetchContext && other):
ent_hash(other.ent_hash),
replica_ids(std::move(other.replica_ids)) {
other.timeout.del();
- timeout = Event(hs->eb, -1, 0,
- std::bind(&FetchContext::timeout_cb, this, _1, _2));
+ timeout = TimerEvent(hs->ec,
+ std::bind(&FetchContext::timeout_cb, this, _1));
reset_timeout();
}
template<>
-inline void FetchContext<ENT_TYPE_CMD>::timeout_cb(evutil_socket_t, short) {
+inline void FetchContext<ENT_TYPE_CMD>::timeout_cb(TimerEvent &) {
HOTSTUFF_LOG_WARN("cmd fetching %.10s timeout", get_hex(ent_hash).c_str());
for (const auto &replica_id: replica_ids)
send(replica_id);
@@ -289,7 +290,7 @@ inline void FetchContext<ENT_TYPE_CMD>::timeout_cb(evutil_socket_t, short) {
}
template<>
-inline void FetchContext<ENT_TYPE_BLK>::timeout_cb(evutil_socket_t, short) {
+inline void FetchContext<ENT_TYPE_BLK>::timeout_cb(TimerEvent &) {
HOTSTUFF_LOG_WARN("block fetching %.10s timeout", get_hex(ent_hash).c_str());
for (const auto &replica_id: replica_ids)
send(replica_id);
@@ -303,8 +304,8 @@ FetchContext<ent_type>::FetchContext(
hs(hs), ent_hash(ent_hash) {
fetch_msg = std::vector<uint256_t>{ent_hash};
- timeout = Event(hs->eb, -1, 0,
- std::bind(&FetchContext::timeout_cb, this, _1, _2));
+ timeout = TimerEvent(hs->ec,
+ std::bind(&FetchContext::timeout_cb, this, _1));
reset_timeout();
}
@@ -316,7 +317,7 @@ void FetchContext<ent_type>::send(const NetAddr &replica_id) {
template<EntityType ent_type>
void FetchContext<ent_type>::reset_timeout() {
- timeout.add_with_timeout(salticidae::gen_rand_timeout(ent_waiting_timeout));
+ timeout.add(salticidae::gen_rand_timeout(ent_waiting_timeout));
}
template<EntityType ent_type>
diff --git a/include/hotstuff/liveness.h b/include/hotstuff/liveness.h
index c88a0a1..36caca2 100644
--- a/include/hotstuff/liveness.h
+++ b/include/hotstuff/liveness.h
@@ -240,8 +240,8 @@ class PMStickyProposer: virtual public PaceMaker {
double candidate_timeout;
EventContext ec;
/** QC timer or randomized timeout */
- Event timer;
- Event ev_imp;
+ TimerEvent timer;
+ TimerEvent ev_imp;
block_t last_proposed;
/** the proposer it believes */
ReplicaID proposer;
@@ -304,7 +304,7 @@ class PMStickyProposer: virtual public PaceMaker {
.then([this, pm]() {
timer.del();
pm.resolve(proposer);
- timer.add_with_timeout(qc_timeout);
+ timer.add(qc_timeout);
HOTSTUFF_LOG_PROTO("QC timer reset");
});
locked = true;
@@ -350,7 +350,7 @@ class PMStickyProposer: virtual public PaceMaker {
});
double t = salticidae::gen_rand_timeout(candidate_timeout);
timer.del();
- timer.add_with_timeout(t);
+ timer.add(t);
HOTSTUFF_LOG_INFO("candidate next try in %.2fs", t);
propose_elect_block();
}
@@ -401,7 +401,7 @@ class PMStickyProposer: virtual public PaceMaker {
proposer = hsc->get_id();
last_proposed = nullptr;
hsc->set_neg_vote(true);
- timer = Event(ec, -1, 0, [this](int, short) {
+ timer = TimerEvent(ec, [this](TimerEvent &) {
/* proposer unable to get a QC in time */
to_candidate();
});
@@ -416,7 +416,7 @@ class PMStickyProposer: virtual public PaceMaker {
proposer = hsc->get_id();
last_proposed = nullptr;
hsc->set_neg_vote(false);
- timer = Event(ec, -1, 0, [this](int, short) {
+ timer = TimerEvent(ec, [this](TimerEvent &) {
candidate_qc_timeout();
});
candidate_timeout = qc_timeout;
@@ -427,10 +427,10 @@ class PMStickyProposer: virtual public PaceMaker {
protected:
void impeach() override {
if (role == CANDIDATE) return;
- ev_imp = Event(ec, -1, 0, [this](int, short) {
+ ev_imp = TimerEvent(ec, [this](TimerEvent &) {
to_candidate();
});
- ev_imp.add_with_timeout(0);
+ ev_imp.add(0);
HOTSTUFF_LOG_INFO("schedule to impeach the proposer");
}
@@ -490,8 +490,8 @@ class PMRoundRobinProposer: virtual public PaceMaker {
double candidate_timeout;
EventContext ec;
/** QC timer or randomized timeout */
- Event timer;
- Event ev_imp;
+ TimerEvent timer;
+ TimerEvent ev_imp;
block_t last_proposed;
/** the proposer it believes */
ReplicaID proposer;
@@ -554,7 +554,7 @@ class PMRoundRobinProposer: virtual public PaceMaker {
.then([this, pm]() {
timer.del();
pm.resolve(proposer);
- timer.add_with_timeout(qc_timeout);
+ timer.add(qc_timeout);
HOTSTUFF_LOG_PROTO("QC timer reset");
});
locked = true;
@@ -608,7 +608,7 @@ class PMRoundRobinProposer: virtual public PaceMaker {
void candidate_qc_timeout() {
timer.del();
- timer.add_with_timeout(candidate_timeout);
+ timer.add(candidate_timeout);
candidate_timeout *= 1.01;
proposer = (proposer + 1) % hsc->get_config().nreplicas;
if (proposer == hsc->get_id())
@@ -654,7 +654,7 @@ class PMRoundRobinProposer: virtual public PaceMaker {
role = PROPOSER;
last_proposed = nullptr;
hsc->set_neg_vote(true);
- timer = Event(ec, -1, 0, [this](int, short) {
+ timer = TimerEvent(ec, [this](TimerEvent &) {
/* proposer unable to get a QC in time */
to_candidate();
});
@@ -667,7 +667,7 @@ class PMRoundRobinProposer: virtual public PaceMaker {
role = CANDIDATE;
last_proposed = nullptr;
hsc->set_neg_vote(false);
- timer = Event(ec, -1, 0, [this](int, short) {
+ timer = TimerEvent(ec, [this](TimerEvent &) {
candidate_qc_timeout();
});
candidate_timeout = qc_timeout * 0.1;
@@ -678,10 +678,10 @@ class PMRoundRobinProposer: virtual public PaceMaker {
protected:
void impeach() override {
if (role == CANDIDATE) return;
- ev_imp = Event(ec, -1, 0, [this](int, short) {
+ ev_imp = TimerEvent(ec, [this](TimerEvent &) {
to_candidate();
});
- ev_imp.add_with_timeout(0);
+ ev_imp.add(0);
HOTSTUFF_LOG_INFO("schedule to impeach the proposer");
}
diff --git a/include/hotstuff/type.h b/include/hotstuff/type.h
index 0897956..784a952 100644
--- a/include/hotstuff/type.h
+++ b/include/hotstuff/type.h
@@ -24,7 +24,8 @@ using salticidae::bytearray_t;
using salticidae::get_hash;
using salticidae::NetAddr;
-using salticidae::Event;
+using salticidae::TimerEvent;
+using salticidae::FdEvent;
using salticidae::EventContext;
using promise::promise_t;
diff --git a/include/hotstuff/util.h b/include/hotstuff/util.h
index 25dda70..efec4be 100644
--- a/include/hotstuff/util.h
+++ b/include/hotstuff/util.h
@@ -13,7 +13,7 @@ class Logger: public salticidae::Logger {
void proto(const char *fmt, ...) {
va_list ap;
va_start(ap, fmt);
- write("proto", fmt, ap);
+ write("proto", is_tty() ? salticidae::TTY_COLOR_MAGENTA : nullptr, fmt, ap);
va_end(ap);
}
};
diff --git a/include/hotstuff/worker.h b/include/hotstuff/worker.h
index 229b1bf..e39ea66 100644
--- a/include/hotstuff/worker.h
+++ b/include/hotstuff/worker.h
@@ -21,7 +21,7 @@ using veritask_ut = BoxObj<VeriTask>;
class VeriPool {
using queue_t = moodycamel::BlockingConcurrentQueue<VeriTask *>;
int fin_fd[2];
- Event fin_ev;
+ FdEvent fin_ev;
queue_t in_queue;
queue_t out_queue;
std::thread notifier;
@@ -30,7 +30,7 @@ class VeriPool {
public:
VeriPool(EventContext ec, size_t nworker) {
pipe(fin_fd);
- fin_ev = Event(ec, fin_fd[0], EV_READ, [&](int fd, short) {
+ fin_ev = FdEvent(ec, fin_fd[0], [&](int fd, short) {
VeriTask *task;
bool result;
read(fd, &task, sizeof(VeriTask *));
@@ -38,9 +38,9 @@ class VeriPool {
auto it = pms.find(task);
it->second.second.resolve(result);
pms.erase(it);
- fin_ev.add();
+ fin_ev.add(FdEvent::READ);
});
- fin_ev.add();
+ fin_ev.add(FdEvent::READ);
// finish notifier thread
notifier = std::thread([this]() {
while (true)