aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/hotstuff_app.cpp58
1 files changed, 41 insertions, 17 deletions
diff --git a/src/hotstuff_app.cpp b/src/hotstuff_app.cpp
index bc596f4..49e06a3 100644
--- a/src/hotstuff_app.cpp
+++ b/src/hotstuff_app.cpp
@@ -78,11 +78,16 @@ class HotStuffApp: public HotStuff {
/** The listen address for client RPC */
NetAddr clisten_addr;
-//#if HOTSTUFF_CMD_RESPSIZE > 0
std::unordered_map<const uint256_t, promise_t> unconfirmed;
-//#endif
using conn_t = ClientNetwork<opcode_t>::conn_t;
+ using resp_queue_t = salticidae::MPSCQueueEventDriven<Finality>;
+
+ /* for the dedicated thread sending responses to the clients */
+ EventContext resp_ec;
+ std::thread resp_thread;
+ resp_queue_t resp_queue;
+ salticidae::BoxObj<salticidae::ThreadCall> resp_tcall;
void client_request_cmd_handler(MsgReqCmd &&, const conn_t &);
@@ -102,14 +107,7 @@ class HotStuffApp: public HotStuff {
#ifndef HOTSTUFF_ENABLE_BENCHMARK
HOTSTUFF_LOG_INFO("replicated %s", std::string(fin).c_str());
#endif
-//#if HOTSTUFF_CMD_RESPSIZE > 0
- auto it = unconfirmed.find(fin.cmd_hash);
- if (it != unconfirmed.end())
- {
- it->second.resolve(fin);
- unconfirmed.erase(it);
- }
-//#endif
+ resp_queue.enqueue(fin);
}
//#ifdef HOTSTUFF_AUTOCLI
@@ -142,6 +140,7 @@ class HotStuffApp: public HotStuff {
const ClientNetwork<opcode_t>::Config &clinet_config);
void start(const std::vector<std::pair<NetAddr, bytearray_t>> &reps);
+ void stop();
};
std::pair<std::string, std::string> split_ip_port_cport(const std::string &s) {
@@ -265,7 +264,7 @@ int main(int argc, char **argv) {
reps.push_back(std::make_pair(
NetAddr(p.first), hotstuff::from_hex(r.second)));
}
- auto shutdown = [&](int) { ec.stop(); };
+ auto shutdown = [&](int) { papp->stop(); };
salticidae::SigEvent ev_sigint(ec, shutdown);
salticidae::SigEvent ev_sigterm(ec, shutdown);
ev_sigint.add(SIGINT);
@@ -295,6 +294,22 @@ HotStuffApp::HotStuffApp(uint32_t blk_size,
ec(ec),
cn(ec, clinet_config),
clisten_addr(clisten_addr) {
+ /* prepare the thread used for sending back confirmations */
+ resp_tcall = new salticidae::ThreadCall(resp_ec);
+ resp_queue.reg_handler(resp_ec, [this](resp_queue_t &q) {
+ Finality fin;
+ while (q.try_dequeue(fin))
+ {
+ auto it = unconfirmed.find(fin.cmd_hash);
+ if (it != unconfirmed.end())
+ {
+ it->second.resolve(fin);
+ unconfirmed.erase(it);
+ }
+ }
+ return false;
+ });
+
/* register the handlers for msg from clients */
cn.reg_handler(salticidae::generic_bind(&HotStuffApp::client_request_cmd_handler, this, _1, _2));
cn.start();
@@ -310,12 +325,13 @@ void HotStuffApp::client_request_cmd_handler(MsgReqCmd &&msg, const conn_t &conn
// record the data of the command
storage->add_cmd(cmd);
if (get_pace_maker().get_proposer() == get_id())
+ {
exec_command(cmd_hash).then([this, addr](Finality fin) {
- cn.send_msg(MsgRespCmd(fin), addr);
+ resp_queue.enqueue(fin);
});
-//#if HOTSTUFF_CMD_RESPSIZE > 0
- else
- {
+ }
+ /* the following function is executed on the dedicated thread for confirming commands */
+ resp_tcall->async_call([this, addr, cmd_hash](salticidae::ThreadCall::Handle &) {
auto it = unconfirmed.find(cmd_hash);
if (it == unconfirmed.end())
it = unconfirmed.insert(
@@ -323,8 +339,7 @@ void HotStuffApp::client_request_cmd_handler(MsgReqCmd &&msg, const conn_t &conn
it->second.then([this, addr](const Finality &fin) {
cn.send_msg(MsgRespCmd(std::move(fin)), addr);
});
- }
-//#endif
+ });
}
void HotStuffApp::start(const std::vector<std::pair<NetAddr, bytearray_t>> &reps) {
@@ -352,10 +367,19 @@ void HotStuffApp::start(const std::vector<std::pair<NetAddr, bytearray_t>> &reps
else
client_conns.erase(conn);
});
+ resp_thread = std::thread([this]() { resp_ec.dispatch(); });
/* enter the event main loop */
ec.dispatch();
}
+void HotStuffApp::stop() {
+ papp->resp_tcall->async_call([this](salticidae::ThreadCall::Handle &) {
+ resp_ec.stop();
+ });
+ resp_thread.join();
+ ec.stop();
+}
+
void HotStuffApp::print_stat() const {
#ifdef HOTSTUFF_MSG_STAT
HOTSTUFF_LOG_INFO("--- client msg. (10s) ---");