diff options
-rw-r--r-- | hotstuff.conf | 1 | ||||
-rw-r--r-- | include/hotstuff/hotstuff.h | 9 | ||||
-rw-r--r-- | src/hotstuff.cpp | 92 | ||||
-rw-r--r-- | src/hotstuff_app.cpp | 26 |
4 files changed, 63 insertions, 65 deletions
diff --git a/hotstuff.conf b/hotstuff.conf index df181df..3651b70 100644 --- a/hotstuff.conf +++ b/hotstuff.conf @@ -1,4 +1,5 @@ nworker = 1 +block-size = 1 replica = 127.0.0.1:2234;22234, 028a1caf2c503a1e9b0b3ddf1d1df30253facdd50b93add05ebc7f708db00c11e4 replica = 127.0.0.1:2235;22235, 034ca53338e69321c1bc83e2fa76b1b00d68f64911074221abda88aac8af9d2b53 replica = 127.0.0.1:2236;22236, 0340f9d12dd1532968f7d8a99f95c3cd03992346487e15bd43265a3f273558ff2e diff --git a/include/hotstuff/hotstuff.h b/include/hotstuff/hotstuff.h index 43ed3a4..313511f 100644 --- a/include/hotstuff/hotstuff.h +++ b/include/hotstuff/hotstuff.h @@ -131,6 +131,7 @@ class HotStuffBase: public HotStuffCore { public: using Net = PeerNetwork<opcode_t>; + using commit_cb_t = std::function<void(const Finality &)>; protected: /** the binding address in replica network */ @@ -154,8 +155,10 @@ class HotStuffBase: public HotStuffCore { /* queues for async tasks */ std::unordered_map<const uint256_t, BlockFetchContext> blk_fetch_waiting; std::unordered_map<const uint256_t, BlockDeliveryContext> blk_delivery_waiting; - std::unordered_map<const uint256_t, promise_t> decision_waiting; - std::queue<uint256_t> cmd_pending; + std::unordered_map<const uint256_t, commit_cb_t> decision_waiting; + using cmd_queue_t = salticidae::MPSCQueueEventDriven<std::pair<uint256_t, commit_cb_t>>; + cmd_queue_t cmd_pending; + std::queue<uint256_t> cmd_pending_buffer; /* statistics */ uint64_t fetched; @@ -211,7 +214,7 @@ class HotStuffBase: public HotStuffCore { /* the API for HotStuffBase */ /* Submit the command to be decided. */ - promise_t exec_command(uint256_t cmd); + void exec_command(uint256_t cmd_hash, commit_cb_t callback); void start(std::vector<std::pair<NetAddr, pubkey_bt>> &&replicas, bool ec_loop = false); size_t size() const { return peers.size(); } diff --git a/src/hotstuff.cpp b/src/hotstuff.cpp index d0b4139..5e75cc3 100644 --- a/src/hotstuff.cpp +++ b/src/hotstuff.cpp @@ -75,56 +75,8 @@ void MsgRespBlock::postponed_parse(HotStuffCore *hsc) { } // TODO: improve this function -promise_t HotStuffBase::exec_command(uint256_t cmd_hash) { - ReplicaID proposer = pmaker->get_proposer(); - - if (proposer != get_id()) - return promise_t([proposer, cmd_hash](promise_t &pm) { - pm.resolve(Finality(proposer, -1, 0, 0, - cmd_hash, uint256_t())); - }); - - auto it = decision_waiting.find(cmd_hash); - promise_t pm{}; - if (it == decision_waiting.end()) - { - cmd_pending.push(cmd_hash); - it = decision_waiting.insert(std::make_pair(cmd_hash, pm)).first; - } - - if (cmd_pending.size() >= blk_size) - { - std::vector<uint256_t> cmds; - for (uint32_t i = 0; i < blk_size; i++) - { - cmds.push_back(cmd_pending.front()); - cmd_pending.pop(); - } - pmaker->beat().then([this, cmds = std::move(cmds)](ReplicaID proposer) { - if (proposer != get_id()) - { - for (auto &cmd_hash: cmds) - { - auto it = decision_waiting.find(cmd_hash); - if (it != decision_waiting.end()) - { - it->second.resolve(Finality(proposer, -1, 0, 0, - cmd_hash, uint256_t())); - decision_waiting.erase(it); - } - } - } - else - { - on_propose(cmds, pmaker->get_parents()); -//#ifdef HOTSTUFF_AUTOCLI -// for (size_t i = pmaker->get_pending_size(); i < 1; i++) -// do_demand_commands(blk_size); -//#endif - } - }); - } - return pm; +void HotStuffBase::exec_command(uint256_t cmd_hash, commit_cb_t callback) { + cmd_pending.enqueue(std::make_pair(cmd_hash, callback)); } void HotStuffBase::on_fetch_blk(const block_t &blk) { @@ -414,7 +366,7 @@ void HotStuffBase::do_decide(Finality &&fin) { auto it = decision_waiting.find(fin.cmd_hash); if (it != decision_waiting.end()) { - it->second.resolve(std::move(fin)); + it->second(std::move(fin)); decision_waiting.erase(it); } } @@ -443,6 +395,44 @@ void HotStuffBase::start( pmaker->init(this); if (ec_loop) ec.dispatch(); + + cmd_pending.reg_handler(ec, [this](cmd_queue_t &q) { + std::pair<uint256_t, commit_cb_t> e; + while (q.try_dequeue(e)) + { + ReplicaID proposer = pmaker->get_proposer(); + if (proposer != get_id()) continue; + + const auto &cmd_hash = e.first; + cmd_pending_buffer.push(cmd_hash); + + auto it = decision_waiting.find(cmd_hash); + if (it == decision_waiting.end()) + { + it = decision_waiting.insert(std::make_pair(cmd_hash, e.second)).first; + } + else + { + // TODO: duplicate commands + } + + if (cmd_pending_buffer.size() >= blk_size) + { + std::vector<uint256_t> cmds; + for (uint32_t i = 0; i < blk_size; i++) + { + cmds.push_back(cmd_pending_buffer.front()); + cmd_pending_buffer.pop(); + } + pmaker->beat().then([this, cmds = std::move(cmds)](ReplicaID proposer) { + if (proposer == get_id()) + on_propose(cmds, pmaker->get_parents()); + }); + return true; + } + } + return false; + }); } } diff --git a/src/hotstuff_app.cpp b/src/hotstuff_app.cpp index 49e06a3..31f63dd 100644 --- a/src/hotstuff_app.cpp +++ b/src/hotstuff_app.cpp @@ -69,6 +69,8 @@ class HotStuffApp: public HotStuff { double stat_period; double impeach_timeout; EventContext ec; + EventContext req_ec; + EventContext resp_ec; /** Network messaging between a replica and its client. */ ClientNetwork<opcode_t> cn; /** Timer object to schedule a periodic printing of system statistics */ @@ -84,10 +86,11 @@ class HotStuffApp: public HotStuff { using resp_queue_t = salticidae::MPSCQueueEventDriven<Finality>; /* for the dedicated thread sending responses to the clients */ - EventContext resp_ec; + std::thread req_thread; std::thread resp_thread; resp_queue_t resp_queue; salticidae::BoxObj<salticidae::ThreadCall> resp_tcall; + salticidae::BoxObj<salticidae::ThreadCall> req_tcall; void client_request_cmd_handler(MsgReqCmd &&, const conn_t &); @@ -292,10 +295,11 @@ HotStuffApp::HotStuffApp(uint32_t blk_size, stat_period(stat_period), impeach_timeout(impeach_timeout), ec(ec), - cn(ec, clinet_config), + cn(req_ec, clinet_config), clisten_addr(clisten_addr) { /* prepare the thread used for sending back confirmations */ resp_tcall = new salticidae::ThreadCall(resp_ec); + req_tcall = new salticidae::ThreadCall(req_ec); resp_queue.reg_handler(resp_ec, [this](resp_queue_t &q) { Finality fin; while (q.try_dequeue(fin)) @@ -320,16 +324,10 @@ void HotStuffApp::client_request_cmd_handler(MsgReqCmd &&msg, const conn_t &conn const NetAddr addr = conn->get_addr(); auto cmd = parse_cmd(msg.serialized); const auto &cmd_hash = cmd->get_hash(); - std::vector<promise_t> pms; HOTSTUFF_LOG_DEBUG("processing %s", std::string(*cmd).c_str()); - // record the data of the command - storage->add_cmd(cmd); - if (get_pace_maker().get_proposer() == get_id()) - { - exec_command(cmd_hash).then([this, addr](Finality fin) { - resp_queue.enqueue(fin); - }); - } + exec_command(cmd_hash, [this, addr](Finality fin) { + resp_queue.enqueue(fin); + }); /* the following function is executed on the dedicated thread for confirming commands */ resp_tcall->async_call([this, addr, cmd_hash](salticidae::ThreadCall::Handle &) { auto it = unconfirmed.find(cmd_hash); @@ -367,15 +365,21 @@ void HotStuffApp::start(const std::vector<std::pair<NetAddr, bytearray_t>> &reps else client_conns.erase(conn); }); + req_thread = std::thread([this]() { req_ec.dispatch(); }); resp_thread = std::thread([this]() { resp_ec.dispatch(); }); /* enter the event main loop */ ec.dispatch(); } void HotStuffApp::stop() { + papp->req_tcall->async_call([this](salticidae::ThreadCall::Handle &) { + req_ec.stop(); + }); papp->resp_tcall->async_call([this](salticidae::ThreadCall::Handle &) { resp_ec.stop(); }); + + req_thread.join(); resp_thread.join(); ec.stop(); } |