aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/hotstuff/hotstuff.h49
-rw-r--r--include/hotstuff/liveness.h32
-rw-r--r--include/hotstuff/type.h3
-rw-r--r--include/hotstuff/util.h2
-rw-r--r--include/hotstuff/worker.h8
m---------salticidae0
-rw-r--r--src/hotstuff.cpp59
-rw-r--r--src/hotstuff_app.cpp158
-rw-r--r--src/hotstuff_client.cpp116
-rw-r--r--test/CMakeLists.txt3
-rw-r--r--test/test_concurrent_queue.cpp68
11 files changed, 204 insertions, 294 deletions
diff --git a/include/hotstuff/hotstuff.h b/include/hotstuff/hotstuff.h
index 3d1c7b6..bc8d960 100644
--- a/include/hotstuff/hotstuff.h
+++ b/include/hotstuff/hotstuff.h
@@ -14,7 +14,6 @@
namespace hotstuff {
-using salticidae::MsgNetwork;
using salticidae::PeerNetwork;
using salticidae::ElapsedTime;
using salticidae::_1;
@@ -69,12 +68,12 @@ class HotStuffBase;
template<EntityType ent_type>
class FetchContext: public promise_t {
- Event timeout;
+ TimerEvent timeout;
HotStuffBase *hs;
MsgReqBlock fetch_msg;
const uint256_t ent_hash;
std::unordered_set<NetAddr> replica_ids;
- inline void timeout_cb(evutil_socket_t, short);
+ inline void timeout_cb(TimerEvent &);
public:
FetchContext(const FetchContext &) = delete;
FetchContext &operator=(const FetchContext &) = delete;
@@ -109,7 +108,7 @@ class BlockDeliveryContext: public promise_t {
class HotStuffBase: public HotStuffCore {
using BlockFetchContext = FetchContext<ENT_TYPE_BLK>;
using CmdFetchContext = FetchContext<ENT_TYPE_CMD>;
- using Conn = PeerNetwork<opcode_t>::Conn;
+ using Net = PeerNetwork<opcode_t>;
friend BlockFetchContext;
friend CmdFetchContext;
@@ -120,14 +119,15 @@ class HotStuffBase: public HotStuffCore {
/** the block size */
size_t blk_size;
/** libevent handle */
- EventContext eb;
+ EventContext ec;
VeriPool vpool;
+ std::unordered_set<NetAddr> peers;
private:
/** whether libevent handle is owned by itself */
- bool eb_loop;
+ bool ec_loop;
/** network stack */
- PeerNetwork<opcode_t> pn;
+ Net pn;
#ifdef HOTSTUFF_BLK_PROFILE
BlockProfiler blk_profiler;
#endif
@@ -159,13 +159,13 @@ class HotStuffBase: public HotStuffCore {
void on_deliver_blk(const block_t &blk);
/** deliver consensus message: <propose> */
- inline void propose_handler(MsgPropose &&, Conn &);
+ inline void propose_handler(MsgPropose &&, const Net::conn_t &);
/** deliver consensus message: <vote> */
- inline void vote_handler(MsgVote &&, Conn &);
+ inline void vote_handler(MsgVote &&, const Net::conn_t &);
/** fetches full block data */
- inline void req_blk_handler(MsgReqBlock &&, Conn &);
+ inline void req_blk_handler(MsgReqBlock &&, const Net::conn_t &);
/** receives a block */
- inline void resp_blk_handler(MsgRespBlock &&, Conn &);
+ inline void resp_blk_handler(MsgRespBlock &&, const Net::conn_t &);
void do_broadcast_proposal(const Proposal &) override;
void do_vote(ReplicaID, const Vote &) override;
@@ -183,8 +183,9 @@ class HotStuffBase: public HotStuffCore {
privkey_bt &&priv_key,
NetAddr listen_addr,
pacemaker_bt pmaker,
- EventContext eb,
- size_t nworker);
+ EventContext ec,
+ size_t nworker,
+ const Net::Config &config = Net::Config());
~HotStuffBase();
@@ -193,9 +194,9 @@ class HotStuffBase: public HotStuffCore {
/* Submit the command to be decided. */
promise_t exec_command(uint256_t cmd);
void add_replica(ReplicaID idx, const NetAddr &addr, pubkey_bt &&pub_key);
- void start(bool eb_loop = false);
+ void start(bool ec_loop = false);
- size_t size() const { return pn.all_peers().size(); }
+ size_t size() const { return peers.size(); }
PaceMaker &get_pace_maker() { return *pmaker; }
void print_stat() const;
@@ -247,14 +248,14 @@ class HotStuff: public HotStuffBase {
const bytearray_t &raw_privkey,
NetAddr listen_addr,
pacemaker_bt pmaker,
- EventContext eb = EventContext(),
+ EventContext ec = EventContext(),
size_t nworker = 4):
HotStuffBase(blk_size,
rid,
new PrivKeyType(raw_privkey),
listen_addr,
std::move(pmaker),
- eb,
+ ec,
nworker) {}
void add_replica(ReplicaID idx, const NetAddr &addr, const bytearray_t &pubkey_raw) {
@@ -275,13 +276,13 @@ FetchContext<ent_type>::FetchContext(FetchContext && other):
ent_hash(other.ent_hash),
replica_ids(std::move(other.replica_ids)) {
other.timeout.del();
- timeout = Event(hs->eb, -1, 0,
- std::bind(&FetchContext::timeout_cb, this, _1, _2));
+ timeout = TimerEvent(hs->ec,
+ std::bind(&FetchContext::timeout_cb, this, _1));
reset_timeout();
}
template<>
-inline void FetchContext<ENT_TYPE_CMD>::timeout_cb(evutil_socket_t, short) {
+inline void FetchContext<ENT_TYPE_CMD>::timeout_cb(TimerEvent &) {
HOTSTUFF_LOG_WARN("cmd fetching %.10s timeout", get_hex(ent_hash).c_str());
for (const auto &replica_id: replica_ids)
send(replica_id);
@@ -289,7 +290,7 @@ inline void FetchContext<ENT_TYPE_CMD>::timeout_cb(evutil_socket_t, short) {
}
template<>
-inline void FetchContext<ENT_TYPE_BLK>::timeout_cb(evutil_socket_t, short) {
+inline void FetchContext<ENT_TYPE_BLK>::timeout_cb(TimerEvent &) {
HOTSTUFF_LOG_WARN("block fetching %.10s timeout", get_hex(ent_hash).c_str());
for (const auto &replica_id: replica_ids)
send(replica_id);
@@ -303,8 +304,8 @@ FetchContext<ent_type>::FetchContext(
hs(hs), ent_hash(ent_hash) {
fetch_msg = std::vector<uint256_t>{ent_hash};
- timeout = Event(hs->eb, -1, 0,
- std::bind(&FetchContext::timeout_cb, this, _1, _2));
+ timeout = TimerEvent(hs->ec,
+ std::bind(&FetchContext::timeout_cb, this, _1));
reset_timeout();
}
@@ -316,7 +317,7 @@ void FetchContext<ent_type>::send(const NetAddr &replica_id) {
template<EntityType ent_type>
void FetchContext<ent_type>::reset_timeout() {
- timeout.add_with_timeout(salticidae::gen_rand_timeout(ent_waiting_timeout));
+ timeout.add(salticidae::gen_rand_timeout(ent_waiting_timeout));
}
template<EntityType ent_type>
diff --git a/include/hotstuff/liveness.h b/include/hotstuff/liveness.h
index c88a0a1..36caca2 100644
--- a/include/hotstuff/liveness.h
+++ b/include/hotstuff/liveness.h
@@ -240,8 +240,8 @@ class PMStickyProposer: virtual public PaceMaker {
double candidate_timeout;
EventContext ec;
/** QC timer or randomized timeout */
- Event timer;
- Event ev_imp;
+ TimerEvent timer;
+ TimerEvent ev_imp;
block_t last_proposed;
/** the proposer it believes */
ReplicaID proposer;
@@ -304,7 +304,7 @@ class PMStickyProposer: virtual public PaceMaker {
.then([this, pm]() {
timer.del();
pm.resolve(proposer);
- timer.add_with_timeout(qc_timeout);
+ timer.add(qc_timeout);
HOTSTUFF_LOG_PROTO("QC timer reset");
});
locked = true;
@@ -350,7 +350,7 @@ class PMStickyProposer: virtual public PaceMaker {
});
double t = salticidae::gen_rand_timeout(candidate_timeout);
timer.del();
- timer.add_with_timeout(t);
+ timer.add(t);
HOTSTUFF_LOG_INFO("candidate next try in %.2fs", t);
propose_elect_block();
}
@@ -401,7 +401,7 @@ class PMStickyProposer: virtual public PaceMaker {
proposer = hsc->get_id();
last_proposed = nullptr;
hsc->set_neg_vote(true);
- timer = Event(ec, -1, 0, [this](int, short) {
+ timer = TimerEvent(ec, [this](TimerEvent &) {
/* proposer unable to get a QC in time */
to_candidate();
});
@@ -416,7 +416,7 @@ class PMStickyProposer: virtual public PaceMaker {
proposer = hsc->get_id();
last_proposed = nullptr;
hsc->set_neg_vote(false);
- timer = Event(ec, -1, 0, [this](int, short) {
+ timer = TimerEvent(ec, [this](TimerEvent &) {
candidate_qc_timeout();
});
candidate_timeout = qc_timeout;
@@ -427,10 +427,10 @@ class PMStickyProposer: virtual public PaceMaker {
protected:
void impeach() override {
if (role == CANDIDATE) return;
- ev_imp = Event(ec, -1, 0, [this](int, short) {
+ ev_imp = TimerEvent(ec, [this](TimerEvent &) {
to_candidate();
});
- ev_imp.add_with_timeout(0);
+ ev_imp.add(0);
HOTSTUFF_LOG_INFO("schedule to impeach the proposer");
}
@@ -490,8 +490,8 @@ class PMRoundRobinProposer: virtual public PaceMaker {
double candidate_timeout;
EventContext ec;
/** QC timer or randomized timeout */
- Event timer;
- Event ev_imp;
+ TimerEvent timer;
+ TimerEvent ev_imp;
block_t last_proposed;
/** the proposer it believes */
ReplicaID proposer;
@@ -554,7 +554,7 @@ class PMRoundRobinProposer: virtual public PaceMaker {
.then([this, pm]() {
timer.del();
pm.resolve(proposer);
- timer.add_with_timeout(qc_timeout);
+ timer.add(qc_timeout);
HOTSTUFF_LOG_PROTO("QC timer reset");
});
locked = true;
@@ -608,7 +608,7 @@ class PMRoundRobinProposer: virtual public PaceMaker {
void candidate_qc_timeout() {
timer.del();
- timer.add_with_timeout(candidate_timeout);
+ timer.add(candidate_timeout);
candidate_timeout *= 1.01;
proposer = (proposer + 1) % hsc->get_config().nreplicas;
if (proposer == hsc->get_id())
@@ -654,7 +654,7 @@ class PMRoundRobinProposer: virtual public PaceMaker {
role = PROPOSER;
last_proposed = nullptr;
hsc->set_neg_vote(true);
- timer = Event(ec, -1, 0, [this](int, short) {
+ timer = TimerEvent(ec, [this](TimerEvent &) {
/* proposer unable to get a QC in time */
to_candidate();
});
@@ -667,7 +667,7 @@ class PMRoundRobinProposer: virtual public PaceMaker {
role = CANDIDATE;
last_proposed = nullptr;
hsc->set_neg_vote(false);
- timer = Event(ec, -1, 0, [this](int, short) {
+ timer = TimerEvent(ec, [this](TimerEvent &) {
candidate_qc_timeout();
});
candidate_timeout = qc_timeout * 0.1;
@@ -678,10 +678,10 @@ class PMRoundRobinProposer: virtual public PaceMaker {
protected:
void impeach() override {
if (role == CANDIDATE) return;
- ev_imp = Event(ec, -1, 0, [this](int, short) {
+ ev_imp = TimerEvent(ec, [this](TimerEvent &) {
to_candidate();
});
- ev_imp.add_with_timeout(0);
+ ev_imp.add(0);
HOTSTUFF_LOG_INFO("schedule to impeach the proposer");
}
diff --git a/include/hotstuff/type.h b/include/hotstuff/type.h
index 0897956..784a952 100644
--- a/include/hotstuff/type.h
+++ b/include/hotstuff/type.h
@@ -24,7 +24,8 @@ using salticidae::bytearray_t;
using salticidae::get_hash;
using salticidae::NetAddr;
-using salticidae::Event;
+using salticidae::TimerEvent;
+using salticidae::FdEvent;
using salticidae::EventContext;
using promise::promise_t;
diff --git a/include/hotstuff/util.h b/include/hotstuff/util.h
index 25dda70..efec4be 100644
--- a/include/hotstuff/util.h
+++ b/include/hotstuff/util.h
@@ -13,7 +13,7 @@ class Logger: public salticidae::Logger {
void proto(const char *fmt, ...) {
va_list ap;
va_start(ap, fmt);
- write("proto", fmt, ap);
+ write("proto", is_tty() ? salticidae::TTY_COLOR_MAGENTA : nullptr, fmt, ap);
va_end(ap);
}
};
diff --git a/include/hotstuff/worker.h b/include/hotstuff/worker.h
index 229b1bf..e39ea66 100644
--- a/include/hotstuff/worker.h
+++ b/include/hotstuff/worker.h
@@ -21,7 +21,7 @@ using veritask_ut = BoxObj<VeriTask>;
class VeriPool {
using queue_t = moodycamel::BlockingConcurrentQueue<VeriTask *>;
int fin_fd[2];
- Event fin_ev;
+ FdEvent fin_ev;
queue_t in_queue;
queue_t out_queue;
std::thread notifier;
@@ -30,7 +30,7 @@ class VeriPool {
public:
VeriPool(EventContext ec, size_t nworker) {
pipe(fin_fd);
- fin_ev = Event(ec, fin_fd[0], EV_READ, [&](int fd, short) {
+ fin_ev = FdEvent(ec, fin_fd[0], [&](int fd, short) {
VeriTask *task;
bool result;
read(fd, &task, sizeof(VeriTask *));
@@ -38,9 +38,9 @@ class VeriPool {
auto it = pms.find(task);
it->second.second.resolve(result);
pms.erase(it);
- fin_ev.add();
+ fin_ev.add(FdEvent::READ);
});
- fin_ev.add();
+ fin_ev.add(FdEvent::READ);
// finish notifier thread
notifier = std::thread([this]() {
while (true)
diff --git a/salticidae b/salticidae
-Subproject 0eea9ddc7cfb2820295dd87aed3dc911a100ecd
+Subproject 2b1f6791ddfd8ef4fb21cb4b50a3d6bc8694586
diff --git a/src/hotstuff.cpp b/src/hotstuff.cpp
index 1912946..a8cc625 100644
--- a/src/hotstuff.cpp
+++ b/src/hotstuff.cpp
@@ -107,7 +107,10 @@ void HotStuffBase::add_replica(ReplicaID idx, const NetAddr &addr,
pubkey_bt &&pub_key) {
HotStuffCore::add_replica(idx, addr, std::move(pub_key));
if (addr != listen_addr)
+ {
+ peers.insert(addr);
pn.add_peer(addr);
+ }
}
void HotStuffBase::on_fetch_blk(const block_t &blk) {
@@ -221,8 +224,8 @@ promise_t HotStuffBase::async_deliver_blk(const uint256_t &blk_hash,
return static_cast<promise_t &>(pm);
}
-void HotStuffBase::propose_handler(MsgPropose &&msg, Conn &conn) {
- const NetAddr &peer = conn.get_peer();
+void HotStuffBase::propose_handler(MsgPropose &&msg, const Net::conn_t &conn) {
+ const NetAddr &peer = conn->get_peer();
msg.postponed_parse(this);
auto &prop = msg.proposal;
block_t blk = prop.blk;
@@ -235,8 +238,8 @@ void HotStuffBase::propose_handler(MsgPropose &&msg, Conn &conn) {
});
}
-void HotStuffBase::vote_handler(MsgVote &&msg, Conn &conn) {
- const NetAddr &peer = conn.get_peer();
+void HotStuffBase::vote_handler(MsgVote &&msg, const Net::conn_t &conn) {
+ const NetAddr &peer = conn->get_peer();
msg.postponed_parse(this);
//auto &vote = msg.vote;
RcObj<Vote> v(new Vote(std::move(msg.vote)));
@@ -255,8 +258,8 @@ void HotStuffBase::vote_handler(MsgVote &&msg, Conn &conn) {
});
}
-void HotStuffBase::req_blk_handler(MsgReqBlock &&msg, Conn &conn) {
- const NetAddr replica = conn.get_peer();
+void HotStuffBase::req_blk_handler(MsgReqBlock &&msg, const Net::conn_t &conn) {
+ const NetAddr replica = conn->get_peer();
auto &blk_hashes = msg.blk_hashes;
std::vector<promise_t> pms;
for (const auto &h: blk_hashes)
@@ -272,7 +275,7 @@ void HotStuffBase::req_blk_handler(MsgReqBlock &&msg, Conn &conn) {
});
}
-void HotStuffBase::resp_blk_handler(MsgRespBlock &&msg, Conn &) {
+void HotStuffBase::resp_blk_handler(MsgRespBlock &&msg, const Net::conn_t &) {
msg.postponed_parse(this);
for (const auto &blk: msg.blks)
if (blk) on_fetch_blk(blk);
@@ -310,28 +313,10 @@ void HotStuffBase::print_stat() const {
part_delivery_time_min = double_inf;
part_delivery_time_max = 0;
#ifdef HOTSTUFF_MSG_STAT
- LOG_INFO("-- sent opcode (10s) --");
- auto &sent_op = pn.get_sent_by_opcode();
- for (auto &op: sent_op)
- {
- auto &val = op.second;
- LOG_INFO("%02x: %lu, %.2fBpm", op.first,
- val.first, val.first ? val.second / double(val.first) : 0);
- val.first = val.second = 0;
- }
- LOG_INFO("-- recv opcode (10s) --");
- auto &recv_op = pn.get_recv_by_opcode();
- for (auto &op: recv_op)
- {
- auto &val = op.second;
- LOG_INFO("%02x: %lu, %.2fBpm", op.first,
- val.first, val.first ? val.second / double(val.first) : 0);
- val.first = val.second = 0;
- }
LOG_INFO("--- replica msg. (10s) ---");
size_t _nsent = 0;
size_t _nrecv = 0;
- for (const auto &replica: pn.all_peers())
+ for (const auto &replica: peers)
{
auto conn = pn.get_peer_conn(replica);
if (conn == nullptr) continue;
@@ -361,14 +346,15 @@ HotStuffBase::HotStuffBase(uint32_t blk_size,
privkey_bt &&priv_key,
NetAddr listen_addr,
pacemaker_bt pmaker,
- EventContext eb,
- size_t nworker):
+ EventContext ec,
+ size_t nworker,
+ const Net::Config &config):
HotStuffCore(rid, std::move(priv_key)),
listen_addr(listen_addr),
blk_size(blk_size),
- eb(eb),
- vpool(eb, nworker),
- pn(eb),
+ ec(ec),
+ vpool(ec, nworker),
+ pn(ec, config),
pmaker(std::move(pmaker)),
fetched(0), delivered(0),
@@ -387,12 +373,13 @@ HotStuffBase::HotStuffBase(uint32_t blk_size,
pn.reg_handler(salticidae::generic_bind(&HotStuffBase::vote_handler, this, _1, _2));
pn.reg_handler(salticidae::generic_bind(&HotStuffBase::req_blk_handler, this, _1, _2));
pn.reg_handler(salticidae::generic_bind(&HotStuffBase::resp_blk_handler, this, _1, _2));
+ pn.start();
pn.listen(listen_addr);
}
void HotStuffBase::do_broadcast_proposal(const Proposal &prop) {
MsgPropose prop_msg(prop);
- for (const auto &replica: pn.all_peers())
+ for (const auto &replica: peers)
pn.send_msg(prop_msg, replica);
}
@@ -422,15 +409,15 @@ void HotStuffBase::do_decide(Finality &&fin) {
HotStuffBase::~HotStuffBase() {}
-void HotStuffBase::start(bool eb_loop) {
+void HotStuffBase::start(bool ec_loop) {
/* ((n - 1) + 1 - 1) / 3 */
- uint32_t nfaulty = pn.all_peers().size() / 3;
+ uint32_t nfaulty = peers.size() / 3;
if (nfaulty == 0)
LOG_WARN("too few replicas in the system to tolerate any failure");
on_init(nfaulty);
pmaker->init(this);
- if (eb_loop)
- eb.dispatch();
+ if (ec_loop)
+ ec.dispatch();
}
}
diff --git a/src/hotstuff_app.cpp b/src/hotstuff_app.cpp
index 014fe16..ee57a3a 100644
--- a/src/hotstuff_app.cpp
+++ b/src/hotstuff_app.cpp
@@ -29,7 +29,7 @@ using salticidae::static_pointer_cast;
using salticidae::trim_all;
using salticidae::split;
-using hotstuff::Event;
+using hotstuff::TimerEvent;
using hotstuff::EventContext;
using hotstuff::NetAddr;
using hotstuff::HotStuffError;
@@ -55,17 +55,17 @@ class HotStuffApp: public HotStuff {
/** Network messaging between a replica and its client. */
ClientNetwork<opcode_t> cn;
/** Timer object to schedule a periodic printing of system statistics */
- Event ev_stat_timer;
+ TimerEvent ev_stat_timer;
/** Timer object to monitor the progress for simple impeachment */
- Event impeach_timer;
+ TimerEvent impeach_timer;
/** The listen address for client RPC */
NetAddr clisten_addr;
std::unordered_map<const uint256_t, promise_t> unconfirmed;
- using Conn = ClientNetwork<opcode_t>::Conn;
+ using conn_t = ClientNetwork<opcode_t>::conn_t;
- void client_request_cmd_handler(MsgReqCmd &&, Conn &);
+ void client_request_cmd_handler(MsgReqCmd &&, const conn_t &);
static command_t parse_cmd(DataStream &s) {
auto cmd = new CommandDummy();
@@ -75,7 +75,7 @@ class HotStuffApp: public HotStuff {
void reset_imp_timer() {
impeach_timer.del();
- impeach_timer.add_with_timeout(impeach_timeout);
+ impeach_timer.add(impeach_timeout);
}
void state_machine_execute(const Finality &fin) override {
@@ -113,10 +113,6 @@ std::pair<std::string, std::string> split_ip_port_cport(const std::string &s) {
return std::make_pair(ret[0], ret[1]);
}
-void signal_handler(int) {
- throw HotStuffError("got terminal signal");
-}
-
salticidae::BoxObj<HotStuffApp> papp = nullptr;
int main(int argc, char **argv) {
@@ -125,9 +121,6 @@ int main(int argc, char **argv) {
ElapsedTime elapsed;
elapsed.start();
- signal(SIGTERM, signal_handler);
- signal(SIGINT, signal_handler);
-
auto opt_blk_size = Config::OptValInt::create(1);
auto opt_parent_limit = Config::OptValInt::create(-1);
auto opt_stat_period = Config::OptValDouble::create(10);
@@ -157,74 +150,72 @@ int main(int argc, char **argv) {
config.add_opt("help", opt_help, Config::SWITCH_ON, 'h', "show this help info");
EventContext ec;
-#ifdef HOTSTUFF_NORMAL_LOG
- try {
-#endif
- config.parse(argc, argv);
- if (opt_help->get())
- {
- config.print_help();
- exit(0);
- }
- auto idx = opt_idx->get();
- auto client_port = opt_client_port->get();
- std::vector<std::pair<std::string, std::string>> replicas;
- for (const auto &s: opt_replicas->get())
- {
- auto res = trim_all(split(s, ","));
- if (res.size() != 2)
- throw HotStuffError("invalid replica info");
- replicas.push_back(std::make_pair(res[0], res[1]));
- }
+ config.parse(argc, argv);
+ if (opt_help->get())
+ {
+ config.print_help();
+ exit(0);
+ }
+ auto idx = opt_idx->get();
+ auto client_port = opt_client_port->get();
+ std::vector<std::pair<std::string, std::string>> replicas;
+ for (const auto &s: opt_replicas->get())
+ {
+ auto res = trim_all(split(s, ","));
+ if (res.size() != 2)
+ throw HotStuffError("invalid replica info");
+ replicas.push_back(std::make_pair(res[0], res[1]));
+ }
- if (!(0 <= idx && (size_t)idx < replicas.size()))
- throw HotStuffError("replica idx out of range");
- std::string binding_addr = replicas[idx].first;
- if (client_port == -1)
- {
- auto p = split_ip_port_cport(binding_addr);
- size_t idx;
- try {
- client_port = stoi(p.second, &idx);
- } catch (std::invalid_argument &) {
- throw HotStuffError("client port not specified");
- }
+ if (!(0 <= idx && (size_t)idx < replicas.size()))
+ throw HotStuffError("replica idx out of range");
+ std::string binding_addr = replicas[idx].first;
+ if (client_port == -1)
+ {
+ auto p = split_ip_port_cport(binding_addr);
+ size_t idx;
+ try {
+ client_port = stoi(p.second, &idx);
+ } catch (std::invalid_argument &) {
+ throw HotStuffError("client port not specified");
}
+ }
- NetAddr plisten_addr{split_ip_port_cport(binding_addr).first};
+ NetAddr plisten_addr{split_ip_port_cport(binding_addr).first};
- auto parent_limit = opt_parent_limit->get();
- hotstuff::pacemaker_bt pmaker;
- if (opt_pace_maker->get() == "sticky")
- pmaker = new hotstuff::PaceMakerSticky(parent_limit, opt_qc_timeout->get(), ec);
- else if (opt_pace_maker->get() == "rr")
- pmaker = new hotstuff::PaceMakerRR(parent_limit, opt_qc_timeout->get(), ec);
- else
- pmaker = new hotstuff::PaceMakerDummyFixed(opt_fixed_proposer->get(), parent_limit);
+ auto parent_limit = opt_parent_limit->get();
+ hotstuff::pacemaker_bt pmaker;
+ if (opt_pace_maker->get() == "sticky")
+ pmaker = new hotstuff::PaceMakerSticky(parent_limit, opt_qc_timeout->get(), ec);
+ else if (opt_pace_maker->get() == "rr")
+ pmaker = new hotstuff::PaceMakerRR(parent_limit, opt_qc_timeout->get(), ec);
+ else
+ pmaker = new hotstuff::PaceMakerDummyFixed(opt_fixed_proposer->get(), parent_limit);
- papp = new HotStuffApp(opt_blk_size->get(),
- opt_stat_period->get(),
- opt_imp_timeout->get(),
- idx,
- hotstuff::from_hex(opt_privkey->get()),
- plisten_addr,
- NetAddr("0.0.0.0", client_port),
- std::move(pmaker),
- ec,
- opt_nworker->get());
- for (size_t i = 0; i < replicas.size(); i++)
- {
- auto p = split_ip_port_cport(replicas[i].first);
- papp->add_replica(i, NetAddr(p.first),
- hotstuff::from_hex(replicas[i].second));
- }
- papp->start();
-#ifdef HOTSTUFF_NORMAL_LOG
- } catch (std::exception &e) {
- HOTSTUFF_LOG_INFO("exception: %s", e.what());
- elapsed.stop(true);
+ papp = new HotStuffApp(opt_blk_size->get(),
+ opt_stat_period->get(),
+ opt_imp_timeout->get(),
+ idx,
+ hotstuff::from_hex(opt_privkey->get()),
+ plisten_addr,
+ NetAddr("0.0.0.0", client_port),
+ std::move(pmaker),
+ ec,
+ opt_nworker->get());
+ for (size_t i = 0; i < replicas.size(); i++)
+ {
+ auto p = split_ip_port_cport(replicas[i].first);
+ papp->add_replica(i, NetAddr(p.first),
+ hotstuff::from_hex(replicas[i].second));
}
-#endif
+ auto shutdown = [&](int) { ec.stop(); };
+ salticidae::SigEvent ev_sigint(ec, shutdown);
+ salticidae::SigEvent ev_sigterm(ec, shutdown);
+ ev_sigint.add(SIGINT);
+ ev_sigterm.add(SIGTERM);
+
+ papp->start();
+ elapsed.stop(true);
return 0;
}
@@ -243,15 +234,16 @@ HotStuffApp::HotStuffApp(uint32_t blk_size,
stat_period(stat_period),
impeach_timeout(impeach_timeout),
ec(ec),
- cn(ec),
+ cn(ec, ClientNetwork<opcode_t>::Config()),
clisten_addr(clisten_addr) {
/* register the handlers for msg from clients */
cn.reg_handler(salticidae::generic_bind(&HotStuffApp::client_request_cmd_handler, this, _1, _2));
+ cn.start();
cn.listen(clisten_addr);
}
-void HotStuffApp::client_request_cmd_handler(MsgReqCmd &&msg, Conn &conn) {
- const NetAddr addr = conn.get_addr();
+void HotStuffApp::client_request_cmd_handler(MsgReqCmd &&msg, const conn_t &conn) {
+ const NetAddr addr = conn->get_addr();
auto cmd = parse_cmd(msg.serialized);
const auto &cmd_hash = cmd->get_hash();
std::vector<promise_t> pms;
@@ -275,17 +267,17 @@ void HotStuffApp::client_request_cmd_handler(MsgReqCmd &&msg, Conn &conn) {
}
void HotStuffApp::start() {
- ev_stat_timer = Event(ec, -1, 0, [this](int, short) {
+ ev_stat_timer = TimerEvent(ec, [this](TimerEvent &) {
HotStuff::print_stat();
//HotStuffCore::prune(100);
- ev_stat_timer.add_with_timeout(stat_period);
+ ev_stat_timer.add(stat_period);
});
- ev_stat_timer.add_with_timeout(stat_period);
- impeach_timer = Event(ec, -1, 0, [this](int, short) {
+ ev_stat_timer.add(stat_period);
+ impeach_timer = TimerEvent(ec, [this](TimerEvent &) {
get_pace_maker().impeach();
reset_imp_timer();
});
- impeach_timer.add_with_timeout(impeach_timeout);
+ impeach_timer.add(impeach_timeout);
HOTSTUFF_LOG_INFO("** starting the system with parameters **");
HOTSTUFF_LOG_INFO("blk_size = %lu", blk_size);
HOTSTUFF_LOG_INFO("conns = %lu", HotStuff::size());
diff --git a/src/hotstuff_client.cpp b/src/hotstuff_client.cpp
index d8a6087..e8d7b9e 100644
--- a/src/hotstuff_client.cpp
+++ b/src/hotstuff_client.cpp
@@ -1,6 +1,8 @@
#include <cassert>
#include <random>
#include <signal.h>
+#include <sys/time.h>
+
#include "salticidae/type.h"
#include "salticidae/netaddr.h"
#include "salticidae/network.h"
@@ -11,7 +13,6 @@
#include "hotstuff/client.h"
using salticidae::Config;
-using salticidae::MsgNetwork;
using hotstuff::ReplicaID;
using hotstuff::NetAddr;
@@ -25,7 +26,7 @@ using hotstuff::uint256_t;
using hotstuff::opcode_t;
using hotstuff::command_t;
-EventContext eb;
+EventContext ec;
ReplicaID proposer;
size_t max_async_num;
int max_iter_num;
@@ -42,11 +43,13 @@ struct Request {
rid(rid), cmd(cmd), confirmed(0) { et.start(); }
};
-std::unordered_map<ReplicaID, MsgNetwork<opcode_t>::conn_t> conns;
+using Net = salticidae::MsgNetwork<opcode_t>;
+
+std::unordered_map<ReplicaID, Net::conn_t> conns;
std::unordered_map<const uint256_t, Request> waiting;
std::vector<NetAddr> replicas;
std::vector<std::pair<struct timeval, double>> elapsed;
-MsgNetwork<opcode_t> mn(eb, 10, 10, 4096);
+Net mn(ec, Net::Config());
void connect_all() {
for (size_t i = 0; i < replicas.size(); i++)
@@ -66,7 +69,7 @@ void try_send() {
auto cmd = new CommandDummy(cid, cnt++);
//mn.send_msg(MsgReqCmd(*cmd), *conns.at(proposer));
MsgReqCmd msg(*cmd);
- for (auto &p: conns) mn.send_msg(msg, *(p.second));
+ for (auto &p: conns) mn.send_msg(msg, p.second);
#ifndef HOTSTUFF_ENABLE_BENCHMARK
HOTSTUFF_LOG_INFO("send new cmd %.10s",
get_hex(cmd->get_hash()).c_str());
@@ -78,7 +81,7 @@ void try_send() {
}
}
-void client_resp_cmd_handler(MsgRespCmd &&msg, MsgNetwork<opcode_t>::Conn &) {
+void client_resp_cmd_handler(MsgRespCmd &&msg, const Net::conn_t &) {
auto &fin = msg.fin;
HOTSTUFF_LOG_DEBUG("got %s", std::string(msg.fin).c_str());
const uint256_t &cmd_hash = fin.cmd_hash;
@@ -122,70 +125,67 @@ std::pair<std::string, std::string> split_ip_port_cport(const std::string &s) {
return std::make_pair(ret[0], ret[1]);
}
-void signal_handler(int) {
- throw HotStuffError("got terminal signal");
-}
-
int main(int argc, char **argv) {
Config config("hotstuff.conf");
- signal(SIGTERM, signal_handler);
- signal(SIGINT, signal_handler);
-
auto opt_idx = Config::OptValInt::create(0);
auto opt_replicas = Config::OptValStrVec::create();
auto opt_max_iter_num = Config::OptValInt::create(100);
auto opt_max_async_num = Config::OptValInt::create(10);
auto opt_cid = Config::OptValInt::create(-1);
+ auto shutdown = [&](int) { ec.stop(); };
+ salticidae::SigEvent ev_sigint(ec, shutdown);
+ salticidae::SigEvent ev_sigterm(ec, shutdown);
+ ev_sigint.add(SIGINT);
+ ev_sigterm.add(SIGTERM);
+
mn.reg_handler(client_resp_cmd_handler);
+ mn.start();
+
+ config.add_opt("idx", opt_idx, Config::SET_VAL);
+ config.add_opt("cid", opt_cid, Config::SET_VAL);
+ config.add_opt("replica", opt_replicas, Config::APPEND);
+ config.add_opt("iter", opt_max_iter_num, Config::SET_VAL);
+ config.add_opt("max-async", opt_max_async_num, Config::SET_VAL);
+ config.parse(argc, argv);
+ auto idx = opt_idx->get();
+ max_iter_num = opt_max_iter_num->get();
+ max_async_num = opt_max_async_num->get();
+ std::vector<std::pair<std::string, std::string>> raw;
+ for (const auto &s: opt_replicas->get())
+ {
+ auto res = salticidae::trim_all(salticidae::split(s, ","));
+ if (res.size() != 2)
+ throw HotStuffError("format error");
+ raw.push_back(std::make_pair(res[0], res[1]));
+ }
+
+ if (!(0 <= idx && (size_t)idx < raw.size() && raw.size() > 0))
+ throw std::invalid_argument("out of range");
+ cid = opt_cid->get() != -1 ? opt_cid->get() : idx;
+ for (const auto &p: raw)
+ {
+ auto _p = split_ip_port_cport(p.first);
+ size_t _;
+ replicas.push_back(NetAddr(NetAddr(_p.first).ip, htons(stoi(_p.second, &_))));
+ }
+
+ nfaulty = (replicas.size() - 1) / 3;
+ HOTSTUFF_LOG_INFO("nfaulty = %zu", nfaulty);
+ connect_all();
+ set_proposer(idx);
+ try_send();
+ ec.dispatch();
- try {
- config.add_opt("idx", opt_idx, Config::SET_VAL);
- config.add_opt("cid", opt_cid, Config::SET_VAL);
- config.add_opt("replica", opt_replicas, Config::APPEND);
- config.add_opt("iter", opt_max_iter_num, Config::SET_VAL);
- config.add_opt("max-async", opt_max_async_num, Config::SET_VAL);
- config.parse(argc, argv);
- auto idx = opt_idx->get();
- max_iter_num = opt_max_iter_num->get();
- max_async_num = opt_max_async_num->get();
- std::vector<std::pair<std::string, std::string>> raw;
- for (const auto &s: opt_replicas->get())
- {
- auto res = salticidae::trim_all(salticidae::split(s, ","));
- if (res.size() != 2)
- throw HotStuffError("format error");
- raw.push_back(std::make_pair(res[0], res[1]));
- }
-
- if (!(0 <= idx && (size_t)idx < raw.size() && raw.size() > 0))
- throw std::invalid_argument("out of range");
- cid = opt_cid->get() != -1 ? opt_cid->get() : idx;
- for (const auto &p: raw)
- {
- auto _p = split_ip_port_cport(p.first);
- size_t _;
- replicas.push_back(NetAddr(NetAddr(_p.first).ip, htons(stoi(_p.second, &_))));
- }
-
- nfaulty = (replicas.size() - 1) / 3;
- HOTSTUFF_LOG_INFO("nfaulty = %zu", nfaulty);
- connect_all();
- set_proposer(idx);
- try_send();
- eb.dispatch();
- } catch (HotStuffError &e) {
- HOTSTUFF_LOG_ERROR("exception: %s", std::string(e).c_str());
#ifdef HOTSTUFF_ENABLE_BENCHMARK
- for (const auto &e: elapsed)
- {
- char fmt[64];
- struct tm *tmp = localtime(&e.first.tv_sec);
- strftime(fmt, sizeof fmt, "%Y-%m-%d %H:%M:%S.%%06u [hotstuff info] %%.6f\n", tmp);
- fprintf(stderr, fmt, e.first.tv_usec, e.second);
- }
-#endif
+ for (const auto &e: elapsed)
+ {
+ char fmt[64];
+ struct tm *tmp = localtime(&e.first.tv_sec);
+ strftime(fmt, sizeof fmt, "%Y-%m-%d %H:%M:%S.%%06u [hotstuff info] %%.6f\n", tmp);
+ fprintf(stderr, fmt, e.first.tv_usec, e.second);
}
+#endif
return 0;
}
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index e4c7a1b..f8796d0 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -4,6 +4,3 @@ include_directories(../src/
add_executable(test_secp256k1 test_secp256k1.cpp)
target_link_libraries(test_secp256k1 hotstuff_static)
-
-add_executable(test_concurrent_queue test_concurrent_queue.cpp)
-target_link_libraries(test_concurrent_queue salticidae_static pthread)
diff --git a/test/test_concurrent_queue.cpp b/test/test_concurrent_queue.cpp
deleted file mode 100644
index 7412213..0000000
--- a/test/test_concurrent_queue.cpp
+++ /dev/null
@@ -1,68 +0,0 @@
-#include "salticidae/event.h"
-#include "concurrentqueue/blockingconcurrentqueue.h"
-#include <thread>
-#include <unistd.h>
-
-class VeriPool {
- int fin_fd[2];
- moodycamel::BlockingConcurrentQueue<int> in_queue;
- moodycamel::BlockingConcurrentQueue<int> out_queue;
- std::thread notifier;
- std::vector<std::thread> workers;
- public:
- VeriPool(size_t nworker) {
- pipe(fin_fd);
- // finish notifier thread
- notifier = std::thread([this]() {
- while (true)
- {
- int item;
- out_queue.wait_dequeue(item);
- write(fin_fd[1], &item, sizeof(item));
- }
- });
- for (size_t i = 0; i < nworker; i++)
- {
- workers.push_back(std::thread([this]() {
- while (true)
- {
- int item;
- in_queue.wait_dequeue(item);
- fprintf(stderr, "%lu working on %d\n", std::this_thread::get_id(), item);
- out_queue.enqueue(item * 1000);
- }
- }));
- }
- }
-
- ~VeriPool() {
- notifier.detach();
- for (auto &w: workers) w.detach();
- close(fin_fd[0]);
- close(fin_fd[1]);
- }
-
- void submit(int item) {
- in_queue.enqueue(item);
- }
-
- int get_fd() {
- return fin_fd[0];
- }
-};
-
-int main() {
- VeriPool p(2);
- salticidae::EventContext ec;
- salticidae::Event ev;
- ev = salticidae::Event(ec, p.get_fd(), EV_READ, [&ev](int fd, short) {
- int item;
- read(fd, &item, sizeof(item));
- printf("finished %d\n", item);
- ev.add();
- });
- for (int i = 0; i < 10000; i++)
- p.submit(i);
- ev.add();
- ec.dispatch();
-}