aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDeterminant <tederminant@gmail.com>2018-07-18 20:15:28 -0400
committerDeterminant <tederminant@gmail.com>2018-07-18 20:15:28 -0400
commit70ab6576db5e49f7b2a38ea955e75328a6376812 (patch)
treef809f33c8fca2ce09c99cbb8af158f3819f34490 /src
parent960e06035636693b902d8523f1e50cafa1d62233 (diff)
improve network impl
Diffstat (limited to 'src')
-rw-r--r--src/consensus.cpp67
-rw-r--r--src/hotstuff.cpp2
-rw-r--r--src/hotstuff_app.cpp2
-rw-r--r--src/hotstuff_client.cpp131
4 files changed, 63 insertions, 139 deletions
diff --git a/src/consensus.cpp b/src/consensus.cpp
index 688c450..bf4868d 100644
--- a/src/consensus.cpp
+++ b/src/consensus.cpp
@@ -31,7 +31,7 @@ void HotStuffCore::sanity_check_delivered(const block_t &blk) {
throw std::runtime_error("block not delivered");
}
-block_t HotStuffCore::sanity_check_delivered(const uint256_t &blk_hash) {
+block_t HotStuffCore::get_delivered_blk(const uint256_t &blk_hash) {
block_t blk = storage->find_blk(blk_hash);
if (blk == nullptr || !blk->delivered)
throw std::runtime_error("block not delivered");
@@ -46,10 +46,7 @@ bool HotStuffCore::on_deliver_blk(const block_t &blk) {
}
blk->parents.clear();
for (const auto &hash: blk->parent_hashes)
- {
- block_t p = sanity_check_delivered(hash);
- blk->parents.push_back(p);
- }
+ blk->parents.push_back(get_delivered_blk(hash));
blk->height = blk->parents[0]->height + 1;
for (const auto &cmd: blk->cmds)
cmd->container = blk;
@@ -66,41 +63,41 @@ bool HotStuffCore::on_deliver_blk(const block_t &blk) {
tails.insert(blk);
blk->delivered = true;
- LOG_DEBUG("delivered %.10s", get_hex(blk->get_hash()).c_str());
+ LOG_DEBUG("deliver %s", std::string(*blk).c_str());
return true;
}
void HotStuffCore::check_commit(const block_t &_blk) {
const block_t &blk = _blk->qc_ref;
if (blk->qc_ref == nullptr) return;
+ /* decided blk could possible be incomplete due to pruning */
if (blk->decision) return;
block_t p = blk->parents[0];
- if (p == blk->qc_ref)
- { /* commit */
- std::vector<block_t> commit_queue;
- block_t b;
- for (b = p; b->height > bexec->height; b = b->parents[0])
- { /* todo: also commit the uncles/aunts */
- commit_queue.push_back(b);
- }
- if (b != bexec)
- throw std::runtime_error("safety breached :(");
- for (auto it = commit_queue.rbegin(); it != commit_queue.rend(); it++)
- {
- const block_t &blk = *it;
- blk->decision = 1;
+ /* commit requires direct parent */
+ if (p != blk->qc_ref) return;
+ /* otherwise commit */
+ std::vector<block_t> commit_queue;
+ block_t b;
+ for (b = p; b->height > bexec->height; b = b->parents[0])
+ { /* TODO: also commit the uncles/aunts */
+ commit_queue.push_back(b);
+ }
+ if (b != bexec)
+ throw std::runtime_error("safety breached :(");
+ for (auto it = commit_queue.rbegin(); it != commit_queue.rend(); it++)
+ {
+ const block_t &blk = *it;
+ blk->decision = 1;
#ifdef HOTSTUFF_ENABLE_LOG_PROTO
- LOG_INFO("commit blk %.10s", get_hex10(blk->get_hash()).c_str());
+ LOG_INFO("commit %s", std::string(*blk).c_str());
#endif
- for (auto cmd: blk->cmds)
- do_decide(cmd);
- }
- bexec = p;
+ for (auto cmd: blk->cmds) do_decide(cmd);
}
+ bexec = p;
}
bool HotStuffCore::update(const uint256_t &bqc_hash) {
- block_t _bqc = sanity_check_delivered(bqc_hash);
+ block_t _bqc = get_delivered_blk(bqc_hash);
if (_bqc->qc_ref == nullptr) return false;
check_commit(_bqc);
if (_bqc->qc_ref->height > bqc->qc_ref->height)
@@ -116,13 +113,14 @@ void HotStuffCore::on_propose(const std::vector<command_t> &cmds,
block_t p = parents[0];
quorum_cert_bt qc = nullptr;
block_t qc_ref = nullptr;
+ /* a block can optionally carray a QC */
if (p != b0 && p->voted.size() >= config.nmajority)
{
qc = p->self_qc->clone();
qc->compute();
qc_ref = p;
}
- /* create a new block */
+ /* create the new block */
block_t bnew = storage->add_blk(
Block(
parents,
@@ -163,8 +161,11 @@ void HotStuffCore::on_receive_proposal(const Proposal &prop) {
for (b = bnew;
b->height > pref->height;
b = b->parents[0]);
- opinion = b == pref;
- vheight = bnew->height;
+ if (b == pref) /* on the same branch */
+ {
+ opinion = true;
+ vheight = bnew->height;
+ }
}
#ifdef HOTSTUFF_ENABLE_LOG_PROTO
LOG_INFO("now state: %s", std::string(*this).c_str());
@@ -185,17 +186,17 @@ void HotStuffCore::on_receive_vote(const Vote &vote) {
LOG_INFO("got %s", std::string(vote).c_str());
LOG_INFO("now state: %s", std::string(*this).c_str());
#endif
-
- block_t blk = sanity_check_delivered(vote.blk_hash);
+ block_t blk = get_delivered_blk(vote.blk_hash);
if (vote.cert == nullptr) return;
+ /* otherwise the vote is positive */
if (!vote.verify())
{
- LOG_WARN("invalid vote");
+ LOG_WARN("invalid vote from %d", vote.voter);
return;
}
if (!blk->voted.insert(vote.voter).second)
{
- LOG_WARN("duplicate votes");
+ LOG_WARN("duplicate vote from %d", vote.voter);
return;
}
size_t qsize = blk->voted.size();
diff --git a/src/hotstuff.cpp b/src/hotstuff.cpp
index d0b42c3..ed15cc1 100644
--- a/src/hotstuff.cpp
+++ b/src/hotstuff.cpp
@@ -411,7 +411,7 @@ HotStuffBase::HotStuffBase(uint32_t blk_size,
pn.reg_handler(VOTE, std::bind(&HotStuffBase::vote_handler, this, _1, _2));
pn.reg_handler(QUERY_FETCH_BLK, std::bind(&HotStuffBase::query_fetch_blk_handler, this, _1, _2));
pn.reg_handler(RESP_FETCH_BLK, std::bind(&HotStuffBase::resp_fetch_blk_handler, this, _1, _2));
- pn.init(listen_addr);
+ pn.listen(listen_addr);
}
void HotStuffBase::do_broadcast_proposal(const Proposal &prop) {
diff --git a/src/hotstuff_app.cpp b/src/hotstuff_app.cpp
index e1eec1b..28f02fc 100644
--- a/src/hotstuff_app.cpp
+++ b/src/hotstuff_app.cpp
@@ -221,7 +221,7 @@ HotStuffApp::HotStuffApp(uint32_t blk_size,
/* register the handlers for msg from clients */
cn.reg_handler(hotstuff::REQ_CMD, std::bind(&HotStuffApp::client_request_cmd_handler, this, _1, _2));
cn.reg_handler(hotstuff::CHK_CMD, std::bind(&HotStuffApp::client_check_cmd_handler, this, _1, _2));
- cn.init(clisten_addr);
+ cn.listen(clisten_addr);
}
void HotStuffApp::client_request_cmd_handler(const MsgClient &msg, conn_client_t conn_) {
diff --git a/src/hotstuff_client.cpp b/src/hotstuff_client.cpp
index 9478d36..1363f39 100644
--- a/src/hotstuff_client.cpp
+++ b/src/hotstuff_client.cpp
@@ -12,39 +12,22 @@ using salticidae::Config;
using salticidae::ElapsedTime;
using salticidae::trim_all;
using salticidae::split;
+using salticidae::MsgNetwork;
using hotstuff::ReplicaID;
using hotstuff::NetAddr;
using hotstuff::EventContext;
-using hotstuff::Event;
using hotstuff::uint256_t;
-using hotstuff::bytearray_t;
using hotstuff::MsgClient;
using hotstuff::CommandDummy;
using hotstuff::command_t;
using hotstuff::Finality;
+using hotstuff::HotStuffError;
EventContext eb;
-size_t max_async_num = 10;
-int max_iter_num = 100;
ReplicaID proposer;
-
-int connect(const NetAddr &node) {
- int fd;
- if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1)
- assert(0);
- struct sockaddr_in sockin;
- memset(&sockin, 0, sizeof(struct sockaddr_in));
- sockin.sin_family = AF_INET;
- sockin.sin_addr.s_addr = node.ip;
- sockin.sin_port = node.port;
- if (connect(fd, (struct sockaddr *)&sockin, sizeof(struct sockaddr_in)) == -1)
- assert(0);
- HOTSTUFF_LOG_INFO("connected to %s", std::string(node).c_str());
- return fd;
-}
-
-void on_receive(int);
+size_t max_async_num;
+int max_iter_num;
struct Request {
ReplicaID rid;
@@ -54,80 +37,18 @@ struct Request {
rid(rid), cmd(cmd) { et.start(); }
};
-struct Conn {
- int fd;
- Event on_receive_ev;
-
- Conn(const NetAddr &addr):
- fd(connect(addr)),
- on_receive_ev(eb, fd, EV_READ, [this](int fd, short) {
- on_receive(fd);
- on_receive_ev.add();
- }) { on_receive_ev.add(); }
-
- Conn(Conn &&other):
- fd(other.fd),
- on_receive_ev(eb, fd, EV_READ, [this](int fd, short) {
- on_receive(fd);
- on_receive_ev.add();
- }) {
- other.fd = -1;
- other.on_receive_ev.del();
- on_receive_ev.add();
- }
-
- ~Conn() { if (fd != -1) close(fd); }
-};
-
std::unordered_map<int, salticidae::RingBuffer> buffers;
std::unordered_map<const uint256_t, Request> waiting;
-std::unordered_map<ReplicaID, Conn> conns;
-std::vector<NetAddr> replicas;
+MsgNetwork<MsgClient> mn(eb, 10, 0, 2, 4096);
+std::unordered_map<ReplicaID, MsgNetwork<MsgClient>::conn_t> conns;
+std::vector<NetAddr> replicas;
-void setup(ReplicaID rid) {
+void set_proposer(ReplicaID rid) {
proposer = rid;
auto it = conns.find(rid);
if (it == conns.end())
- conns.insert(std::make_pair(rid, Conn(replicas[rid])));
-}
-
-void write_msg(int fd, const MsgClient &msg) {
- bytearray_t msg_data = msg.serialize();
- if (write(fd, msg_data.data(), msg_data.size()) != (ssize_t)msg_data.size())
- assert(0);
-}
-
-void read_msg(int fd, MsgClient &msg) {
- static const size_t BUFF_SEG_SIZE = 1024;
- ssize_t ret;
- auto &buffer = buffers[fd];
- bool read_body = false;
- for (;;)
- {
- bytearray_t buff_seg;
- if (!read_body && buffer.size() >= MsgClient::header_size)
- {
- buff_seg = buffer.pop(MsgClient::header_size);
- msg = MsgClient(buff_seg.data());
- read_body = true;
- }
- if (read_body && buffer.size() >= msg.get_length())
- {
- buff_seg = buffer.pop(msg.get_length());
- msg.set_payload(std::move(buff_seg));
- return;
- }
-
- buff_seg.resize(BUFF_SEG_SIZE);
- ret = read(fd, buff_seg.data(), BUFF_SEG_SIZE);
- assert(ret != -1);
- if (ret > 0)
- {
- buff_seg.resize(ret);
- buffer.push(std::move(buff_seg));
- }
- }
+ conns.insert(std::make_pair(rid, mn.create_conn(replicas[rid])));
}
void try_send() {
@@ -136,7 +57,7 @@ void try_send() {
auto cmd = CommandDummy::make_cmd();
MsgClient msg;
msg.gen_reqcmd(*cmd);
- write_msg(conns.find(proposer)->second.fd, msg);
+ mn.send_msg(msg, conns.find(proposer)->second);
HOTSTUFF_LOG_INFO("send new cmd %.10s",
get_hex(cmd->get_hash()).c_str());
waiting.insert(std::make_pair(
@@ -146,11 +67,9 @@ void try_send() {
}
}
-void on_receive(int fd) {
- MsgClient msg;
+void on_receive(const MsgClient &msg, MsgNetwork<MsgClient>::conn_t) {
uint256_t cmd_hash;
Finality fin;
- read_msg(fd, msg);
HOTSTUFF_LOG_DEBUG("got %s", std::string(msg).c_str());
if (!msg.verify_checksum())
HOTSTUFF_LOG_ERROR("incorrect checksum %08x", msg.get_checksum());
@@ -159,13 +78,13 @@ void on_receive(int fd) {
if (fin.rid != proposer)
{
HOTSTUFF_LOG_INFO("reconnect to the new proposer");
- setup(fin.rid);
+ set_proposer(fin.rid);
}
if (fin.rid != it->second.rid)
{
MsgClient msg;
msg.gen_reqcmd(*(waiting.find(cmd_hash)->second.cmd));
- write_msg(conns.find(proposer)->second.fd, msg);
+ mn.send_msg(msg, conns.find(proposer)->second);
HOTSTUFF_LOG_INFO("resend cmd %.10s",
get_hex(cmd_hash).c_str());
it->second.et.start();
@@ -173,8 +92,8 @@ void on_receive(int fd) {
return;
}
HOTSTUFF_LOG_INFO(
- "fd %d got response for %.10s: <decision=%d, blk=%.10s>",
- fd, get_hex(cmd_hash).c_str(),
+ "got response for %.10s: <decision=%d, blk=%.10s>",
+ get_hex(cmd_hash).c_str(),
fin.decision,
get_hex(fin.blk_hash).c_str());
if (it == waiting.end()) return;
@@ -187,30 +106,34 @@ std::pair<std::string, std::string> split_ip_port_cport(const std::string &s) {
return std::make_pair(ret[0], ret[1]);
}
-
int main(int argc, char **argv) {
Config config("hotstuff.conf");
auto opt_idx = Config::OptValInt::create(0);
auto opt_replicas = Config::OptValStrVec::create();
- auto opt_max_iter_num = Config::OptValInt::create();
+ auto opt_max_iter_num = Config::OptValInt::create(100);
+ auto opt_max_async_num = Config::OptValInt::create(10);
+
+ mn.reg_handler(hotstuff::RESP_CMD, on_receive);
try {
config.add_opt("idx", opt_idx, Config::SET_VAL);
config.add_opt("replica", opt_replicas, Config::APPEND);
- config.add_opt("ntx", opt_max_iter_num, Config::SET_VAL);
+ config.add_opt("iter", opt_max_iter_num, Config::SET_VAL);
+ config.add_opt("max-async", opt_max_async_num, Config::SET_VAL);
config.parse(argc, argv);
auto idx = opt_idx->get();
max_iter_num = opt_max_iter_num->get();
+ max_async_num = opt_max_async_num->get();
std::vector<std::pair<std::string, std::string>> raw;
for (const auto &s: opt_replicas->get())
{
auto res = trim_all(split(s, ","));
- assert(res.size() == 2);
+ if (res.size() != 2)
+ throw HotStuffError("format error");
raw.push_back(std::make_pair(res[0], res[1]));
}
- if (!(0 <= idx && (size_t)idx < raw.size() &&
- raw.size() > 0))
+ if (!(0 <= idx && (size_t)idx < raw.size() && raw.size() > 0))
throw std::invalid_argument("out of range");
for (const auto &p: raw)
{
@@ -219,10 +142,10 @@ int main(int argc, char **argv) {
replicas.push_back(NetAddr(NetAddr(_p.first).ip, htons(stoi(_p.second, &_))));
}
- setup(idx);
+ set_proposer(idx);
try_send();
eb.dispatch();
- } catch (hotstuff::HotStuffError &e) {
+ } catch (HotStuffError &e) {
HOTSTUFF_LOG_ERROR("exception: %s", std::string(e).c_str());
}
return 0;