diff options
-rw-r--r-- | include/salticidae/conn.h | 46 | ||||
-rw-r--r-- | include/salticidae/event.h | 33 | ||||
-rw-r--r-- | include/salticidae/network.h | 60 | ||||
-rw-r--r-- | src/conn.cpp | 6 |
4 files changed, 72 insertions, 73 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index 6b9b486..56d2d6a 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -210,6 +210,7 @@ class ConnPool { class Worker { EventContext ec; ThreadCall tcall; + BoxObj<ThreadCall> exit_tcall; /** only used by the dispatcher thread */ std::thread handle; bool disp_flag; std::atomic<size_t> nconn; @@ -297,10 +298,18 @@ class ConnPool { tcall.async_call([this](ThreadCall::Handle &) { ec.stop(); }); } + void disp_stop() { + assert(disp_flag && exit_tcall); + exit_tcall->async_call([this](ThreadCall::Handle &) { ec.stop(); }); + } + std::thread &get_handle() { return handle; } const EventContext &get_ec() { return ec; } ThreadCall *get_tcall() { return &tcall; } - void set_dispatcher() { disp_flag = true; } + void set_dispatcher() { + disp_flag = true; + exit_tcall = new ThreadCall(ec); + } bool is_dispatcher() const { return disp_flag; } size_t get_nconn() { return nconn; } void stop_tcall() { tcall.stop(); } @@ -461,10 +470,9 @@ class ConnPool { workers[0].set_dispatcher(); disp_error_cb = [this](const std::exception_ptr err) { workers[0].stop_tcall(); - disp_ec.stop(); user_tcall->async_call([this, err](ThreadCall::Handle &) { - stop_workers(); - //std::rethrow_exception(err); + for (size_t i = 1; i < nworker; i++) + workers[i].stop(); if (error_cb) error_cb(err, true, -1); }); }; @@ -504,7 +512,7 @@ class ConnPool { system_state = 2; SALTICIDAE_LOG_INFO("stopping all threads..."); /* stop the dispatcher */ - workers[0].stop(); + workers[0].disp_stop(); workers[0].get_handle().join(); /* stop all workers */ for (size_t i = 1; i < nworker; i++) @@ -532,19 +540,13 @@ class ConnPool { /** Actively connect to remote addr. */ conn_t connect_sync(const NetAddr &addr) { - auto ret = *(static_cast<std::pair<conn_t, std::exception_ptr> *>( + auto ret = *(static_cast<conn_t *>( disp_tcall->call([this, addr](ThreadCall::Handle &h) { conn_t conn; - std::exception_ptr err = nullptr; - try { - conn = _connect(addr); - } catch (...) { - err = std::current_exception(); - } - h.set_result(std::make_pair(std::move(conn), err)); + conn = _connect(addr); + h.set_result(conn); }).get())); - if (ret.second) std::rethrow_exception(ret.second); - return std::move(ret.first); + return std::move(ret); } /** Actively connect to remote addr (async). */ @@ -565,17 +567,9 @@ class ConnPool { * Does not need to be called if do not want to accept any passive * connections. */ void listen(NetAddr listen_addr) { - auto ret = *(static_cast<std::exception_ptr *>( - disp_tcall->call([this, listen_addr](ThreadCall::Handle &h) { - std::exception_ptr err = nullptr; - try { - _listen(listen_addr); - } catch (...) { - err = std::current_exception(); - } - h.set_result(err); - }).get())); - if (ret) std::rethrow_exception(ret); + disp_tcall->call([this, listen_addr](ThreadCall::Handle &) { + _listen(listen_addr); + }).get(); } template<typename Func> diff --git a/include/salticidae/event.h b/include/salticidae/event.h index 960588c..420c073 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -656,6 +656,7 @@ class ThreadCall { public: struct Result { void *data; + std::exception_ptr error; std::function<void(void *)> deleter; Result(): data(nullptr) {} Result(void *data, std::function<void(void *)> &&deleter): @@ -663,11 +664,14 @@ class ThreadCall { ~Result() { if (data != nullptr) deleter(data); } Result(const Result &) = delete; Result(Result &&other): - data(other.data), deleter(std::move(other.deleter)) { + data(other.data), + error(std::move(other.error)), + deleter(std::move(other.deleter)) { other.data = nullptr; } void swap(Result &other) { std::swap(data, other.data); + std::swap(error, other.error); std::swap(deleter, other.deleter); } Result &operator=(const Result &other) = delete; @@ -679,7 +683,10 @@ class ThreadCall { } return *this; } - void *get() { return data; } + void *get() { + if (error) std::rethrow_exception(error); + return data; + } }; class Handle { std::function<void(Handle &)> callback; @@ -689,15 +696,18 @@ class ThreadCall { public: Handle(): notifier(nullptr) {} Handle(const Handle &) = delete; - void exec() { - callback(*this); + void return_sync() { if (notifier) notifier->notify(std::move(result)); } + void exec() { + callback(*this); + return_sync(); + } template<typename T> - void set_result(T &&data) { + Result &set_result(T &&data) { using _T = std::remove_reference_t<T>; - result = Result(new _T(std::forward<T>(data)), + return result = Result(new _T(std::forward<T>(data)), [](void *ptr) {delete static_cast<_T *>(ptr);}); } }; @@ -711,7 +721,13 @@ class ThreadCall { Handle *h; while (q.try_dequeue(h)) { - if (!stopped) h->exec(); + try { + if (!stopped) h->exec(); + else throw SalticidaeError(SALTI_ERROR_NOT_AVAIL); + } catch (...) { + h->set_result(0).error = std::current_exception(); + h->return_sync(); + } delete h; if (++cnt == burst_size) return true; } @@ -726,7 +742,6 @@ class ThreadCall { template<typename Func> bool async_call(Func &&callback) { - if (stopped) return false; auto h = new Handle(); h->callback = std::forward<Func>(callback); q.enqueue(h); @@ -735,7 +750,6 @@ class ThreadCall { template<typename Func> Result call(Func &&callback) { - if (stopped) throw SalticidaeError(SALTI_ERROR_NOT_AVAIL); auto h = new Handle(); h->callback = std::forward<Func>(callback); ThreadNotifier<Result> notifier; @@ -746,6 +760,7 @@ class ThreadCall { const EventContext &get_ec() const { return ec; } void stop() { stopped = true; } + bool is_stopped() { return stopped; } }; } diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 4fc575c..f664493 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -978,22 +978,14 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) { template<typename O, O _, O __> void PeerNetwork<O, _, __>::listen(NetAddr _listen_addr) { - auto ret = *(static_cast<std::exception_ptr *>( - this->disp_tcall->call([this, _listen_addr](ThreadCall::Handle &h) { - std::exception_ptr err = nullptr; - try { - MsgNet::_listen(_listen_addr); - listen_addr = _listen_addr; - uint8_t rand_bytes[32]; - if (!RAND_bytes(rand_bytes, 32)) - throw PeerNetworkError(SALTI_ERROR_RAND_SOURCE); - my_nonce.load(rand_bytes); - } catch (...) { - err = std::current_exception(); - } - h.set_result(std::move(err)); - }).get())); - if (ret) std::rethrow_exception(ret); + this->disp_tcall->call([this, _listen_addr](ThreadCall::Handle &) { + MsgNet::_listen(_listen_addr); + listen_addr = _listen_addr; + uint8_t rand_bytes[32]; + if (!RAND_bytes(rand_bytes, 32)) + throw PeerNetworkError(SALTI_ERROR_RAND_SOURCE); + my_nonce.load(rand_bytes); + }).get(); } template<typename O, O _, O __> @@ -1054,31 +1046,25 @@ int32_t PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) { template<typename O, O _, O __> typename PeerNetwork<O, _, __>::conn_t PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &addr) const { - auto ret = *(static_cast<std::pair<conn_t, std::exception_ptr> *>( + auto ret = *(static_cast<conn_t *>( this->disp_tcall->call([this, addr](ThreadCall::Handle &h) { conn_t conn; - std::exception_ptr err = nullptr; - try { - pinfo_slock_t _g(known_peers_lock); - pinfo_slock_t __g(pid2peer_lock); - auto it = known_peers.find(addr); - if (it == known_peers.end()) - throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); - if (it->second->peer_id.is_null()) - conn = nullptr; - else - { - auto it2 = pid2peer.find(it->second->peer_id); - assert(it2 != pid2peer.end()); - conn = it2->second->conn; - } - } catch (...) { - err = std::current_exception(); + pinfo_slock_t _g(known_peers_lock); + pinfo_slock_t __g(pid2peer_lock); + auto it = known_peers.find(addr); + if (it == known_peers.end()) + throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); + if (it->second->peer_id.is_null()) + conn = nullptr; + else + { + auto it2 = pid2peer.find(it->second->peer_id); + assert(it2 != pid2peer.end()); + conn = it2->second->conn; } - h.set_result(std::make_pair(std::move(conn), err)); + h.set_result(std::move(conn)); }).get())); - if (ret.second) std::rethrow_exception(ret.second); - return std::move(ret.first); + return std::move(ret); } template<typename O, O _, O __> diff --git a/src/conn.cpp b/src/conn.cpp index 13486ee..5e663b6 100644 --- a/src/conn.cpp +++ b/src/conn.cpp @@ -273,15 +273,19 @@ void ConnPool::disp_terminate(const conn_t &conn) { }); } -void ConnPool::accept_client(int fd, int) { +void ConnPool::accept_client(int fd, int events) { int client_fd; struct sockaddr client_addr; try { socklen_t addr_size = sizeof(struct sockaddr_in); if ((client_fd = accept(fd, &client_addr, &addr_size)) < 0) + { + ev_listen.del(); throw ConnPoolError(SALTI_ERROR_ACCEPT, errno); + } else { + SALTICIDAE_LOG_INFO("%d\n", events); int one = 1; if (setsockopt(client_fd, SOL_TCP, TCP_NODELAY, (const char *)&one, sizeof(one)) < 0) throw ConnPoolError(SALTI_ERROR_ACCEPT, errno); |