diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/hotstuff.cpp | 92 | ||||
-rw-r--r-- | src/hotstuff_app.cpp | 26 |
2 files changed, 56 insertions, 62 deletions
diff --git a/src/hotstuff.cpp b/src/hotstuff.cpp index d0b4139..5e75cc3 100644 --- a/src/hotstuff.cpp +++ b/src/hotstuff.cpp @@ -75,56 +75,8 @@ void MsgRespBlock::postponed_parse(HotStuffCore *hsc) { } // TODO: improve this function -promise_t HotStuffBase::exec_command(uint256_t cmd_hash) { - ReplicaID proposer = pmaker->get_proposer(); - - if (proposer != get_id()) - return promise_t([proposer, cmd_hash](promise_t &pm) { - pm.resolve(Finality(proposer, -1, 0, 0, - cmd_hash, uint256_t())); - }); - - auto it = decision_waiting.find(cmd_hash); - promise_t pm{}; - if (it == decision_waiting.end()) - { - cmd_pending.push(cmd_hash); - it = decision_waiting.insert(std::make_pair(cmd_hash, pm)).first; - } - - if (cmd_pending.size() >= blk_size) - { - std::vector<uint256_t> cmds; - for (uint32_t i = 0; i < blk_size; i++) - { - cmds.push_back(cmd_pending.front()); - cmd_pending.pop(); - } - pmaker->beat().then([this, cmds = std::move(cmds)](ReplicaID proposer) { - if (proposer != get_id()) - { - for (auto &cmd_hash: cmds) - { - auto it = decision_waiting.find(cmd_hash); - if (it != decision_waiting.end()) - { - it->second.resolve(Finality(proposer, -1, 0, 0, - cmd_hash, uint256_t())); - decision_waiting.erase(it); - } - } - } - else - { - on_propose(cmds, pmaker->get_parents()); -//#ifdef HOTSTUFF_AUTOCLI -// for (size_t i = pmaker->get_pending_size(); i < 1; i++) -// do_demand_commands(blk_size); -//#endif - } - }); - } - return pm; +void HotStuffBase::exec_command(uint256_t cmd_hash, commit_cb_t callback) { + cmd_pending.enqueue(std::make_pair(cmd_hash, callback)); } void HotStuffBase::on_fetch_blk(const block_t &blk) { @@ -414,7 +366,7 @@ void HotStuffBase::do_decide(Finality &&fin) { auto it = decision_waiting.find(fin.cmd_hash); if (it != decision_waiting.end()) { - it->second.resolve(std::move(fin)); + it->second(std::move(fin)); decision_waiting.erase(it); } } @@ -443,6 +395,44 @@ void HotStuffBase::start( pmaker->init(this); if (ec_loop) ec.dispatch(); + + cmd_pending.reg_handler(ec, [this](cmd_queue_t &q) { + std::pair<uint256_t, commit_cb_t> e; + while (q.try_dequeue(e)) + { + ReplicaID proposer = pmaker->get_proposer(); + if (proposer != get_id()) continue; + + const auto &cmd_hash = e.first; + cmd_pending_buffer.push(cmd_hash); + + auto it = decision_waiting.find(cmd_hash); + if (it == decision_waiting.end()) + { + it = decision_waiting.insert(std::make_pair(cmd_hash, e.second)).first; + } + else + { + // TODO: duplicate commands + } + + if (cmd_pending_buffer.size() >= blk_size) + { + std::vector<uint256_t> cmds; + for (uint32_t i = 0; i < blk_size; i++) + { + cmds.push_back(cmd_pending_buffer.front()); + cmd_pending_buffer.pop(); + } + pmaker->beat().then([this, cmds = std::move(cmds)](ReplicaID proposer) { + if (proposer == get_id()) + on_propose(cmds, pmaker->get_parents()); + }); + return true; + } + } + return false; + }); } } diff --git a/src/hotstuff_app.cpp b/src/hotstuff_app.cpp index 49e06a3..31f63dd 100644 --- a/src/hotstuff_app.cpp +++ b/src/hotstuff_app.cpp @@ -69,6 +69,8 @@ class HotStuffApp: public HotStuff { double stat_period; double impeach_timeout; EventContext ec; + EventContext req_ec; + EventContext resp_ec; /** Network messaging between a replica and its client. */ ClientNetwork<opcode_t> cn; /** Timer object to schedule a periodic printing of system statistics */ @@ -84,10 +86,11 @@ class HotStuffApp: public HotStuff { using resp_queue_t = salticidae::MPSCQueueEventDriven<Finality>; /* for the dedicated thread sending responses to the clients */ - EventContext resp_ec; + std::thread req_thread; std::thread resp_thread; resp_queue_t resp_queue; salticidae::BoxObj<salticidae::ThreadCall> resp_tcall; + salticidae::BoxObj<salticidae::ThreadCall> req_tcall; void client_request_cmd_handler(MsgReqCmd &&, const conn_t &); @@ -292,10 +295,11 @@ HotStuffApp::HotStuffApp(uint32_t blk_size, stat_period(stat_period), impeach_timeout(impeach_timeout), ec(ec), - cn(ec, clinet_config), + cn(req_ec, clinet_config), clisten_addr(clisten_addr) { /* prepare the thread used for sending back confirmations */ resp_tcall = new salticidae::ThreadCall(resp_ec); + req_tcall = new salticidae::ThreadCall(req_ec); resp_queue.reg_handler(resp_ec, [this](resp_queue_t &q) { Finality fin; while (q.try_dequeue(fin)) @@ -320,16 +324,10 @@ void HotStuffApp::client_request_cmd_handler(MsgReqCmd &&msg, const conn_t &conn const NetAddr addr = conn->get_addr(); auto cmd = parse_cmd(msg.serialized); const auto &cmd_hash = cmd->get_hash(); - std::vector<promise_t> pms; HOTSTUFF_LOG_DEBUG("processing %s", std::string(*cmd).c_str()); - // record the data of the command - storage->add_cmd(cmd); - if (get_pace_maker().get_proposer() == get_id()) - { - exec_command(cmd_hash).then([this, addr](Finality fin) { - resp_queue.enqueue(fin); - }); - } + exec_command(cmd_hash, [this, addr](Finality fin) { + resp_queue.enqueue(fin); + }); /* the following function is executed on the dedicated thread for confirming commands */ resp_tcall->async_call([this, addr, cmd_hash](salticidae::ThreadCall::Handle &) { auto it = unconfirmed.find(cmd_hash); @@ -367,15 +365,21 @@ void HotStuffApp::start(const std::vector<std::pair<NetAddr, bytearray_t>> &reps else client_conns.erase(conn); }); + req_thread = std::thread([this]() { req_ec.dispatch(); }); resp_thread = std::thread([this]() { resp_ec.dispatch(); }); /* enter the event main loop */ ec.dispatch(); } void HotStuffApp::stop() { + papp->req_tcall->async_call([this](salticidae::ThreadCall::Handle &) { + req_ec.stop(); + }); papp->resp_tcall->async_call([this](salticidae::ThreadCall::Handle &) { resp_ec.stop(); }); + + req_thread.join(); resp_thread.join(); ec.stop(); } |