aboutsummaryrefslogtreecommitdiff
path: root/src/hotstuff_client.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/hotstuff_client.cpp')
-rw-r--r--src/hotstuff_client.cpp123
1 files changed, 85 insertions, 38 deletions
diff --git a/src/hotstuff_client.cpp b/src/hotstuff_client.cpp
index 8ee90d7..9478d36 100644
--- a/src/hotstuff_client.cpp
+++ b/src/hotstuff_client.cpp
@@ -13,6 +13,7 @@ using salticidae::ElapsedTime;
using salticidae::trim_all;
using salticidae::split;
+using hotstuff::ReplicaID;
using hotstuff::NetAddr;
using hotstuff::EventContext;
using hotstuff::Event;
@@ -20,18 +21,13 @@ using hotstuff::uint256_t;
using hotstuff::bytearray_t;
using hotstuff::MsgClient;
using hotstuff::CommandDummy;
+using hotstuff::command_t;
using hotstuff::Finality;
+EventContext eb;
size_t max_async_num = 10;
int max_iter_num = 100;
-
-struct Request {
- ElapsedTime et;
- Request() { et.start(); }
-};
-
-std::unordered_map<int, salticidae::RingBuffer> buffers;
-std::unordered_map<const uint256_t, Request> waiting;
+ReplicaID proposer;
int connect(const NetAddr &node) {
int fd;
@@ -48,6 +44,54 @@ int connect(const NetAddr &node) {
return fd;
}
+void on_receive(int);
+
+struct Request {
+ ReplicaID rid;
+ command_t cmd;
+ ElapsedTime et;
+ Request(ReplicaID rid, const command_t &cmd):
+ rid(rid), cmd(cmd) { et.start(); }
+};
+
+struct Conn {
+ int fd;
+ Event on_receive_ev;
+
+ Conn(const NetAddr &addr):
+ fd(connect(addr)),
+ on_receive_ev(eb, fd, EV_READ, [this](int fd, short) {
+ on_receive(fd);
+ on_receive_ev.add();
+ }) { on_receive_ev.add(); }
+
+ Conn(Conn &&other):
+ fd(other.fd),
+ on_receive_ev(eb, fd, EV_READ, [this](int fd, short) {
+ on_receive(fd);
+ on_receive_ev.add();
+ }) {
+ other.fd = -1;
+ other.on_receive_ev.del();
+ on_receive_ev.add();
+ }
+
+ ~Conn() { if (fd != -1) close(fd); }
+};
+
+std::unordered_map<int, salticidae::RingBuffer> buffers;
+std::unordered_map<const uint256_t, Request> waiting;
+std::unordered_map<ReplicaID, Conn> conns;
+std::vector<NetAddr> replicas;
+
+
+void setup(ReplicaID rid) {
+ proposer = rid;
+ auto it = conns.find(rid);
+ if (it == conns.end())
+ conns.insert(std::make_pair(rid, Conn(replicas[rid])));
+}
+
void write_msg(int fd, const MsgClient &msg) {
bytearray_t msg_data = msg.serialize();
if (write(fd, msg_data.data(), msg_data.size()) != (ssize_t)msg_data.size())
@@ -86,17 +130,17 @@ void read_msg(int fd, MsgClient &msg) {
}
}
-void try_send(int fd) {
+void try_send() {
while (waiting.size() < max_async_num && max_iter_num)
{
auto cmd = CommandDummy::make_cmd();
MsgClient msg;
msg.gen_reqcmd(*cmd);
- write_msg(fd, msg);
+ write_msg(conns.find(proposer)->second.fd, msg);
HOTSTUFF_LOG_INFO("send new cmd %.10s",
get_hex(cmd->get_hash()).c_str());
waiting.insert(std::make_pair(
- cmd->get_hash(), Request()));
+ cmd->get_hash(), Request(proposer, cmd)));
if (max_iter_num > 0)
max_iter_num--;
}
@@ -111,15 +155,31 @@ void on_receive(int fd) {
if (!msg.verify_checksum())
HOTSTUFF_LOG_ERROR("incorrect checksum %08x", msg.get_checksum());
msg.parse_respcmd(cmd_hash, fin);
+ auto it = waiting.find(cmd_hash);
+ if (fin.rid != proposer)
+ {
+ HOTSTUFF_LOG_INFO("reconnect to the new proposer");
+ setup(fin.rid);
+ }
+ if (fin.rid != it->second.rid)
+ {
+ MsgClient msg;
+ msg.gen_reqcmd(*(waiting.find(cmd_hash)->second.cmd));
+ write_msg(conns.find(proposer)->second.fd, msg);
+ HOTSTUFF_LOG_INFO("resend cmd %.10s",
+ get_hex(cmd_hash).c_str());
+ it->second.et.start();
+ it->second.rid = proposer;
+ return;
+ }
HOTSTUFF_LOG_INFO(
"fd %d got response for %.10s: <decision=%d, blk=%.10s>",
fd, get_hex(cmd_hash).c_str(),
fin.decision,
get_hex(fin.blk_hash).c_str());
- auto it = waiting.find(cmd_hash);
if (it == waiting.end()) return;
waiting.erase(it);
- try_send(fd);
+ try_send();
}
std::pair<std::string, std::string> split_ip_port_cport(const std::string &s) {
@@ -127,53 +187,40 @@ std::pair<std::string, std::string> split_ip_port_cport(const std::string &s) {
return std::make_pair(ret[0], ret[1]);
}
-Event *on_receive_ev;
int main(int argc, char **argv) {
Config config("hotstuff.conf");
- std::vector<NetAddr> peers2;
- EventContext eb;
- auto opt_idx = Config::OptValInt::create(-1);
- auto opt_server_addr = Config::OptValStr::create("127.0.0.1:2234");
+ auto opt_idx = Config::OptValInt::create(0);
auto opt_replicas = Config::OptValStrVec::create();
auto opt_max_iter_num = Config::OptValInt::create();
try {
config.add_opt("idx", opt_idx, Config::SET_VAL);
- config.add_opt("server", opt_server_addr, Config::SET_VAL);
config.add_opt("replica", opt_replicas, Config::APPEND);
config.add_opt("ntx", opt_max_iter_num, Config::SET_VAL);
config.parse(argc, argv);
auto idx = opt_idx->get();
max_iter_num = opt_max_iter_num->get();
- std::vector<std::pair<std::string, std::string>> replicas;
+ std::vector<std::pair<std::string, std::string>> raw;
for (const auto &s: opt_replicas->get())
{
auto res = trim_all(split(s, ","));
assert(res.size() == 2);
- replicas.push_back(std::make_pair(res[0], res[1]));
+ raw.push_back(std::make_pair(res[0], res[1]));
}
- NetAddr server(opt_server_addr->get());
- if (-1 < idx && (size_t)idx < replicas.size() &&
- replicas.size() > 0)
+ if (!(0 <= idx && (size_t)idx < raw.size() &&
+ raw.size() > 0))
+ throw std::invalid_argument("out of range");
+ for (const auto &p: raw)
{
- for (const auto &p: replicas)
- {
- auto _p = split_ip_port_cport(p.first);
- size_t _;
- peers2.push_back(NetAddr(NetAddr(_p.first).ip, htons(stoi(_p.second, &_))));
- }
- server = peers2[idx];
+ auto _p = split_ip_port_cport(p.first);
+ size_t _;
+ replicas.push_back(NetAddr(NetAddr(_p.first).ip, htons(stoi(_p.second, &_))));
}
- int fd = connect(server);
- on_receive_ev = new Event{eb, fd, EV_READ, [](int fd, short) {
- on_receive(fd);
- on_receive_ev->add();
- }};
- on_receive_ev->add();
- try_send(fd);
+ setup(idx);
+ try_send();
eb.dispatch();
} catch (hotstuff::HotStuffError &e) {
HOTSTUFF_LOG_ERROR("exception: %s", std::string(e).c_str());