aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2019-07-04 00:28:30 -0400
committerDeterminant <[email protected]>2019-07-04 00:28:30 -0400
commit69a9bed21f18728483320e88530045180796e2ac (patch)
tree67aa3c133ba514b55934de638cea3bca38f71d0a
parent2ef9c99438e5c87651332f47c98da8d397b030e3 (diff)
improve dispatcher shutdown
-rw-r--r--include/salticidae/conn.h46
-rw-r--r--include/salticidae/event.h33
-rw-r--r--include/salticidae/network.h60
-rw-r--r--src/conn.cpp6
4 files changed, 72 insertions, 73 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index 6b9b486..56d2d6a 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -210,6 +210,7 @@ class ConnPool {
class Worker {
EventContext ec;
ThreadCall tcall;
+ BoxObj<ThreadCall> exit_tcall; /** only used by the dispatcher thread */
std::thread handle;
bool disp_flag;
std::atomic<size_t> nconn;
@@ -297,10 +298,18 @@ class ConnPool {
tcall.async_call([this](ThreadCall::Handle &) { ec.stop(); });
}
+ void disp_stop() {
+ assert(disp_flag && exit_tcall);
+ exit_tcall->async_call([this](ThreadCall::Handle &) { ec.stop(); });
+ }
+
std::thread &get_handle() { return handle; }
const EventContext &get_ec() { return ec; }
ThreadCall *get_tcall() { return &tcall; }
- void set_dispatcher() { disp_flag = true; }
+ void set_dispatcher() {
+ disp_flag = true;
+ exit_tcall = new ThreadCall(ec);
+ }
bool is_dispatcher() const { return disp_flag; }
size_t get_nconn() { return nconn; }
void stop_tcall() { tcall.stop(); }
@@ -461,10 +470,9 @@ class ConnPool {
workers[0].set_dispatcher();
disp_error_cb = [this](const std::exception_ptr err) {
workers[0].stop_tcall();
- disp_ec.stop();
user_tcall->async_call([this, err](ThreadCall::Handle &) {
- stop_workers();
- //std::rethrow_exception(err);
+ for (size_t i = 1; i < nworker; i++)
+ workers[i].stop();
if (error_cb) error_cb(err, true, -1);
});
};
@@ -504,7 +512,7 @@ class ConnPool {
system_state = 2;
SALTICIDAE_LOG_INFO("stopping all threads...");
/* stop the dispatcher */
- workers[0].stop();
+ workers[0].disp_stop();
workers[0].get_handle().join();
/* stop all workers */
for (size_t i = 1; i < nworker; i++)
@@ -532,19 +540,13 @@ class ConnPool {
/** Actively connect to remote addr. */
conn_t connect_sync(const NetAddr &addr) {
- auto ret = *(static_cast<std::pair<conn_t, std::exception_ptr> *>(
+ auto ret = *(static_cast<conn_t *>(
disp_tcall->call([this, addr](ThreadCall::Handle &h) {
conn_t conn;
- std::exception_ptr err = nullptr;
- try {
- conn = _connect(addr);
- } catch (...) {
- err = std::current_exception();
- }
- h.set_result(std::make_pair(std::move(conn), err));
+ conn = _connect(addr);
+ h.set_result(conn);
}).get()));
- if (ret.second) std::rethrow_exception(ret.second);
- return std::move(ret.first);
+ return std::move(ret);
}
/** Actively connect to remote addr (async). */
@@ -565,17 +567,9 @@ class ConnPool {
* Does not need to be called if do not want to accept any passive
* connections. */
void listen(NetAddr listen_addr) {
- auto ret = *(static_cast<std::exception_ptr *>(
- disp_tcall->call([this, listen_addr](ThreadCall::Handle &h) {
- std::exception_ptr err = nullptr;
- try {
- _listen(listen_addr);
- } catch (...) {
- err = std::current_exception();
- }
- h.set_result(err);
- }).get()));
- if (ret) std::rethrow_exception(ret);
+ disp_tcall->call([this, listen_addr](ThreadCall::Handle &) {
+ _listen(listen_addr);
+ }).get();
}
template<typename Func>
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index 960588c..420c073 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -656,6 +656,7 @@ class ThreadCall {
public:
struct Result {
void *data;
+ std::exception_ptr error;
std::function<void(void *)> deleter;
Result(): data(nullptr) {}
Result(void *data, std::function<void(void *)> &&deleter):
@@ -663,11 +664,14 @@ class ThreadCall {
~Result() { if (data != nullptr) deleter(data); }
Result(const Result &) = delete;
Result(Result &&other):
- data(other.data), deleter(std::move(other.deleter)) {
+ data(other.data),
+ error(std::move(other.error)),
+ deleter(std::move(other.deleter)) {
other.data = nullptr;
}
void swap(Result &other) {
std::swap(data, other.data);
+ std::swap(error, other.error);
std::swap(deleter, other.deleter);
}
Result &operator=(const Result &other) = delete;
@@ -679,7 +683,10 @@ class ThreadCall {
}
return *this;
}
- void *get() { return data; }
+ void *get() {
+ if (error) std::rethrow_exception(error);
+ return data;
+ }
};
class Handle {
std::function<void(Handle &)> callback;
@@ -689,15 +696,18 @@ class ThreadCall {
public:
Handle(): notifier(nullptr) {}
Handle(const Handle &) = delete;
- void exec() {
- callback(*this);
+ void return_sync() {
if (notifier)
notifier->notify(std::move(result));
}
+ void exec() {
+ callback(*this);
+ return_sync();
+ }
template<typename T>
- void set_result(T &&data) {
+ Result &set_result(T &&data) {
using _T = std::remove_reference_t<T>;
- result = Result(new _T(std::forward<T>(data)),
+ return result = Result(new _T(std::forward<T>(data)),
[](void *ptr) {delete static_cast<_T *>(ptr);});
}
};
@@ -711,7 +721,13 @@ class ThreadCall {
Handle *h;
while (q.try_dequeue(h))
{
- if (!stopped) h->exec();
+ try {
+ if (!stopped) h->exec();
+ else throw SalticidaeError(SALTI_ERROR_NOT_AVAIL);
+ } catch (...) {
+ h->set_result(0).error = std::current_exception();
+ h->return_sync();
+ }
delete h;
if (++cnt == burst_size) return true;
}
@@ -726,7 +742,6 @@ class ThreadCall {
template<typename Func>
bool async_call(Func &&callback) {
- if (stopped) return false;
auto h = new Handle();
h->callback = std::forward<Func>(callback);
q.enqueue(h);
@@ -735,7 +750,6 @@ class ThreadCall {
template<typename Func>
Result call(Func &&callback) {
- if (stopped) throw SalticidaeError(SALTI_ERROR_NOT_AVAIL);
auto h = new Handle();
h->callback = std::forward<Func>(callback);
ThreadNotifier<Result> notifier;
@@ -746,6 +760,7 @@ class ThreadCall {
const EventContext &get_ec() const { return ec; }
void stop() { stopped = true; }
+ bool is_stopped() { return stopped; }
};
}
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index 4fc575c..f664493 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -978,22 +978,14 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) {
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::listen(NetAddr _listen_addr) {
- auto ret = *(static_cast<std::exception_ptr *>(
- this->disp_tcall->call([this, _listen_addr](ThreadCall::Handle &h) {
- std::exception_ptr err = nullptr;
- try {
- MsgNet::_listen(_listen_addr);
- listen_addr = _listen_addr;
- uint8_t rand_bytes[32];
- if (!RAND_bytes(rand_bytes, 32))
- throw PeerNetworkError(SALTI_ERROR_RAND_SOURCE);
- my_nonce.load(rand_bytes);
- } catch (...) {
- err = std::current_exception();
- }
- h.set_result(std::move(err));
- }).get()));
- if (ret) std::rethrow_exception(ret);
+ this->disp_tcall->call([this, _listen_addr](ThreadCall::Handle &) {
+ MsgNet::_listen(_listen_addr);
+ listen_addr = _listen_addr;
+ uint8_t rand_bytes[32];
+ if (!RAND_bytes(rand_bytes, 32))
+ throw PeerNetworkError(SALTI_ERROR_RAND_SOURCE);
+ my_nonce.load(rand_bytes);
+ }).get();
}
template<typename O, O _, O __>
@@ -1054,31 +1046,25 @@ int32_t PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) {
template<typename O, O _, O __>
typename PeerNetwork<O, _, __>::conn_t
PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &addr) const {
- auto ret = *(static_cast<std::pair<conn_t, std::exception_ptr> *>(
+ auto ret = *(static_cast<conn_t *>(
this->disp_tcall->call([this, addr](ThreadCall::Handle &h) {
conn_t conn;
- std::exception_ptr err = nullptr;
- try {
- pinfo_slock_t _g(known_peers_lock);
- pinfo_slock_t __g(pid2peer_lock);
- auto it = known_peers.find(addr);
- if (it == known_peers.end())
- throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
- if (it->second->peer_id.is_null())
- conn = nullptr;
- else
- {
- auto it2 = pid2peer.find(it->second->peer_id);
- assert(it2 != pid2peer.end());
- conn = it2->second->conn;
- }
- } catch (...) {
- err = std::current_exception();
+ pinfo_slock_t _g(known_peers_lock);
+ pinfo_slock_t __g(pid2peer_lock);
+ auto it = known_peers.find(addr);
+ if (it == known_peers.end())
+ throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
+ if (it->second->peer_id.is_null())
+ conn = nullptr;
+ else
+ {
+ auto it2 = pid2peer.find(it->second->peer_id);
+ assert(it2 != pid2peer.end());
+ conn = it2->second->conn;
}
- h.set_result(std::make_pair(std::move(conn), err));
+ h.set_result(std::move(conn));
}).get()));
- if (ret.second) std::rethrow_exception(ret.second);
- return std::move(ret.first);
+ return std::move(ret);
}
template<typename O, O _, O __>
diff --git a/src/conn.cpp b/src/conn.cpp
index 13486ee..5e663b6 100644
--- a/src/conn.cpp
+++ b/src/conn.cpp
@@ -273,15 +273,19 @@ void ConnPool::disp_terminate(const conn_t &conn) {
});
}
-void ConnPool::accept_client(int fd, int) {
+void ConnPool::accept_client(int fd, int events) {
int client_fd;
struct sockaddr client_addr;
try {
socklen_t addr_size = sizeof(struct sockaddr_in);
if ((client_fd = accept(fd, &client_addr, &addr_size)) < 0)
+ {
+ ev_listen.del();
throw ConnPoolError(SALTI_ERROR_ACCEPT, errno);
+ }
else
{
+ SALTICIDAE_LOG_INFO("%d\n", events);
int one = 1;
if (setsockopt(client_fd, SOL_TCP, TCP_NODELAY, (const char *)&one, sizeof(one)) < 0)
throw ConnPoolError(SALTI_ERROR_ACCEPT, errno);