From 0ec28b0d659836a5fb32702cdee8c3433727f48f Mon Sep 17 00:00:00 2001 From: Determinant Date: Thu, 9 May 2019 11:24:04 -0400 Subject: ... --- src/hotstuff_app.cpp | 58 +++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 41 insertions(+), 17 deletions(-) (limited to 'src') diff --git a/src/hotstuff_app.cpp b/src/hotstuff_app.cpp index bc596f4..49e06a3 100644 --- a/src/hotstuff_app.cpp +++ b/src/hotstuff_app.cpp @@ -78,11 +78,16 @@ class HotStuffApp: public HotStuff { /** The listen address for client RPC */ NetAddr clisten_addr; -//#if HOTSTUFF_CMD_RESPSIZE > 0 std::unordered_map unconfirmed; -//#endif using conn_t = ClientNetwork::conn_t; + using resp_queue_t = salticidae::MPSCQueueEventDriven; + + /* for the dedicated thread sending responses to the clients */ + EventContext resp_ec; + std::thread resp_thread; + resp_queue_t resp_queue; + salticidae::BoxObj resp_tcall; void client_request_cmd_handler(MsgReqCmd &&, const conn_t &); @@ -102,14 +107,7 @@ class HotStuffApp: public HotStuff { #ifndef HOTSTUFF_ENABLE_BENCHMARK HOTSTUFF_LOG_INFO("replicated %s", std::string(fin).c_str()); #endif -//#if HOTSTUFF_CMD_RESPSIZE > 0 - auto it = unconfirmed.find(fin.cmd_hash); - if (it != unconfirmed.end()) - { - it->second.resolve(fin); - unconfirmed.erase(it); - } -//#endif + resp_queue.enqueue(fin); } //#ifdef HOTSTUFF_AUTOCLI @@ -142,6 +140,7 @@ class HotStuffApp: public HotStuff { const ClientNetwork::Config &clinet_config); void start(const std::vector> &reps); + void stop(); }; std::pair split_ip_port_cport(const std::string &s) { @@ -265,7 +264,7 @@ int main(int argc, char **argv) { reps.push_back(std::make_pair( NetAddr(p.first), hotstuff::from_hex(r.second))); } - auto shutdown = [&](int) { ec.stop(); }; + auto shutdown = [&](int) { papp->stop(); }; salticidae::SigEvent ev_sigint(ec, shutdown); salticidae::SigEvent ev_sigterm(ec, shutdown); ev_sigint.add(SIGINT); @@ -295,6 +294,22 @@ HotStuffApp::HotStuffApp(uint32_t blk_size, ec(ec), cn(ec, clinet_config), clisten_addr(clisten_addr) { + /* prepare the thread used for sending back confirmations */ + resp_tcall = new salticidae::ThreadCall(resp_ec); + resp_queue.reg_handler(resp_ec, [this](resp_queue_t &q) { + Finality fin; + while (q.try_dequeue(fin)) + { + auto it = unconfirmed.find(fin.cmd_hash); + if (it != unconfirmed.end()) + { + it->second.resolve(fin); + unconfirmed.erase(it); + } + } + return false; + }); + /* register the handlers for msg from clients */ cn.reg_handler(salticidae::generic_bind(&HotStuffApp::client_request_cmd_handler, this, _1, _2)); cn.start(); @@ -310,12 +325,13 @@ void HotStuffApp::client_request_cmd_handler(MsgReqCmd &&msg, const conn_t &conn // record the data of the command storage->add_cmd(cmd); if (get_pace_maker().get_proposer() == get_id()) + { exec_command(cmd_hash).then([this, addr](Finality fin) { - cn.send_msg(MsgRespCmd(fin), addr); + resp_queue.enqueue(fin); }); -//#if HOTSTUFF_CMD_RESPSIZE > 0 - else - { + } + /* the following function is executed on the dedicated thread for confirming commands */ + resp_tcall->async_call([this, addr, cmd_hash](salticidae::ThreadCall::Handle &) { auto it = unconfirmed.find(cmd_hash); if (it == unconfirmed.end()) it = unconfirmed.insert( @@ -323,8 +339,7 @@ void HotStuffApp::client_request_cmd_handler(MsgReqCmd &&msg, const conn_t &conn it->second.then([this, addr](const Finality &fin) { cn.send_msg(MsgRespCmd(std::move(fin)), addr); }); - } -//#endif + }); } void HotStuffApp::start(const std::vector> &reps) { @@ -352,10 +367,19 @@ void HotStuffApp::start(const std::vector> &reps else client_conns.erase(conn); }); + resp_thread = std::thread([this]() { resp_ec.dispatch(); }); /* enter the event main loop */ ec.dispatch(); } +void HotStuffApp::stop() { + papp->resp_tcall->async_call([this](salticidae::ThreadCall::Handle &) { + resp_ec.stop(); + }); + resp_thread.join(); + ec.stop(); +} + void HotStuffApp::print_stat() const { #ifdef HOTSTUFF_MSG_STAT HOTSTUFF_LOG_INFO("--- client msg. (10s) ---"); -- cgit v1.2.3