aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2019-05-22 02:51:43 -0400
committerDeterminant <[email protected]>2019-05-22 02:51:43 -0400
commit811619ba36d0151d451647fe55e7a16d44b56357 (patch)
tree7d5677de826242f5744c0191c0f08448e48c6a4e /src
parent380f6411fe4889839d470a343ebd50bdf2205531 (diff)
optimize exec_command
Diffstat (limited to 'src')
-rw-r--r--src/hotstuff.cpp92
-rw-r--r--src/hotstuff_app.cpp26
2 files changed, 56 insertions, 62 deletions
diff --git a/src/hotstuff.cpp b/src/hotstuff.cpp
index d0b4139..5e75cc3 100644
--- a/src/hotstuff.cpp
+++ b/src/hotstuff.cpp
@@ -75,56 +75,8 @@ void MsgRespBlock::postponed_parse(HotStuffCore *hsc) {
}
// TODO: improve this function
-promise_t HotStuffBase::exec_command(uint256_t cmd_hash) {
- ReplicaID proposer = pmaker->get_proposer();
-
- if (proposer != get_id())
- return promise_t([proposer, cmd_hash](promise_t &pm) {
- pm.resolve(Finality(proposer, -1, 0, 0,
- cmd_hash, uint256_t()));
- });
-
- auto it = decision_waiting.find(cmd_hash);
- promise_t pm{};
- if (it == decision_waiting.end())
- {
- cmd_pending.push(cmd_hash);
- it = decision_waiting.insert(std::make_pair(cmd_hash, pm)).first;
- }
-
- if (cmd_pending.size() >= blk_size)
- {
- std::vector<uint256_t> cmds;
- for (uint32_t i = 0; i < blk_size; i++)
- {
- cmds.push_back(cmd_pending.front());
- cmd_pending.pop();
- }
- pmaker->beat().then([this, cmds = std::move(cmds)](ReplicaID proposer) {
- if (proposer != get_id())
- {
- for (auto &cmd_hash: cmds)
- {
- auto it = decision_waiting.find(cmd_hash);
- if (it != decision_waiting.end())
- {
- it->second.resolve(Finality(proposer, -1, 0, 0,
- cmd_hash, uint256_t()));
- decision_waiting.erase(it);
- }
- }
- }
- else
- {
- on_propose(cmds, pmaker->get_parents());
-//#ifdef HOTSTUFF_AUTOCLI
-// for (size_t i = pmaker->get_pending_size(); i < 1; i++)
-// do_demand_commands(blk_size);
-//#endif
- }
- });
- }
- return pm;
+void HotStuffBase::exec_command(uint256_t cmd_hash, commit_cb_t callback) {
+ cmd_pending.enqueue(std::make_pair(cmd_hash, callback));
}
void HotStuffBase::on_fetch_blk(const block_t &blk) {
@@ -414,7 +366,7 @@ void HotStuffBase::do_decide(Finality &&fin) {
auto it = decision_waiting.find(fin.cmd_hash);
if (it != decision_waiting.end())
{
- it->second.resolve(std::move(fin));
+ it->second(std::move(fin));
decision_waiting.erase(it);
}
}
@@ -443,6 +395,44 @@ void HotStuffBase::start(
pmaker->init(this);
if (ec_loop)
ec.dispatch();
+
+ cmd_pending.reg_handler(ec, [this](cmd_queue_t &q) {
+ std::pair<uint256_t, commit_cb_t> e;
+ while (q.try_dequeue(e))
+ {
+ ReplicaID proposer = pmaker->get_proposer();
+ if (proposer != get_id()) continue;
+
+ const auto &cmd_hash = e.first;
+ cmd_pending_buffer.push(cmd_hash);
+
+ auto it = decision_waiting.find(cmd_hash);
+ if (it == decision_waiting.end())
+ {
+ it = decision_waiting.insert(std::make_pair(cmd_hash, e.second)).first;
+ }
+ else
+ {
+ // TODO: duplicate commands
+ }
+
+ if (cmd_pending_buffer.size() >= blk_size)
+ {
+ std::vector<uint256_t> cmds;
+ for (uint32_t i = 0; i < blk_size; i++)
+ {
+ cmds.push_back(cmd_pending_buffer.front());
+ cmd_pending_buffer.pop();
+ }
+ pmaker->beat().then([this, cmds = std::move(cmds)](ReplicaID proposer) {
+ if (proposer == get_id())
+ on_propose(cmds, pmaker->get_parents());
+ });
+ return true;
+ }
+ }
+ return false;
+ });
}
}
diff --git a/src/hotstuff_app.cpp b/src/hotstuff_app.cpp
index 49e06a3..31f63dd 100644
--- a/src/hotstuff_app.cpp
+++ b/src/hotstuff_app.cpp
@@ -69,6 +69,8 @@ class HotStuffApp: public HotStuff {
double stat_period;
double impeach_timeout;
EventContext ec;
+ EventContext req_ec;
+ EventContext resp_ec;
/** Network messaging between a replica and its client. */
ClientNetwork<opcode_t> cn;
/** Timer object to schedule a periodic printing of system statistics */
@@ -84,10 +86,11 @@ class HotStuffApp: public HotStuff {
using resp_queue_t = salticidae::MPSCQueueEventDriven<Finality>;
/* for the dedicated thread sending responses to the clients */
- EventContext resp_ec;
+ std::thread req_thread;
std::thread resp_thread;
resp_queue_t resp_queue;
salticidae::BoxObj<salticidae::ThreadCall> resp_tcall;
+ salticidae::BoxObj<salticidae::ThreadCall> req_tcall;
void client_request_cmd_handler(MsgReqCmd &&, const conn_t &);
@@ -292,10 +295,11 @@ HotStuffApp::HotStuffApp(uint32_t blk_size,
stat_period(stat_period),
impeach_timeout(impeach_timeout),
ec(ec),
- cn(ec, clinet_config),
+ cn(req_ec, clinet_config),
clisten_addr(clisten_addr) {
/* prepare the thread used for sending back confirmations */
resp_tcall = new salticidae::ThreadCall(resp_ec);
+ req_tcall = new salticidae::ThreadCall(req_ec);
resp_queue.reg_handler(resp_ec, [this](resp_queue_t &q) {
Finality fin;
while (q.try_dequeue(fin))
@@ -320,16 +324,10 @@ void HotStuffApp::client_request_cmd_handler(MsgReqCmd &&msg, const conn_t &conn
const NetAddr addr = conn->get_addr();
auto cmd = parse_cmd(msg.serialized);
const auto &cmd_hash = cmd->get_hash();
- std::vector<promise_t> pms;
HOTSTUFF_LOG_DEBUG("processing %s", std::string(*cmd).c_str());
- // record the data of the command
- storage->add_cmd(cmd);
- if (get_pace_maker().get_proposer() == get_id())
- {
- exec_command(cmd_hash).then([this, addr](Finality fin) {
- resp_queue.enqueue(fin);
- });
- }
+ exec_command(cmd_hash, [this, addr](Finality fin) {
+ resp_queue.enqueue(fin);
+ });
/* the following function is executed on the dedicated thread for confirming commands */
resp_tcall->async_call([this, addr, cmd_hash](salticidae::ThreadCall::Handle &) {
auto it = unconfirmed.find(cmd_hash);
@@ -367,15 +365,21 @@ void HotStuffApp::start(const std::vector<std::pair<NetAddr, bytearray_t>> &reps
else
client_conns.erase(conn);
});
+ req_thread = std::thread([this]() { req_ec.dispatch(); });
resp_thread = std::thread([this]() { resp_ec.dispatch(); });
/* enter the event main loop */
ec.dispatch();
}
void HotStuffApp::stop() {
+ papp->req_tcall->async_call([this](salticidae::ThreadCall::Handle &) {
+ req_ec.stop();
+ });
papp->resp_tcall->async_call([this](salticidae::ThreadCall::Handle &) {
resp_ec.stop();
});
+
+ req_thread.join();
resp_thread.join();
ec.stop();
}