diff options
Diffstat (limited to 'include')
-rw-r--r-- | include/salticidae/conn.h | 29 | ||||
-rw-r--r-- | include/salticidae/event.h | 89 | ||||
-rw-r--r-- | include/salticidae/network.h | 47 |
3 files changed, 84 insertions, 81 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index 6462ddc..3144a68 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -98,7 +98,7 @@ class ConnPool { /** Terminate the connection (from the worker thread). */ void worker_terminate(); /** Terminate the connection (from the dispatcher thread). */ - void disp_terminate(bool blocking = true); + void disp_terminate(); public: Conn(): ready_send(false) {} @@ -157,7 +157,7 @@ class ConnPool { int listen_fd; /**< for accepting new network connections */ void update_conn(const conn_t &conn, bool connected) { - user_tcall->call([this, conn, connected](ThreadCall::Handle &) { + user_tcall->async_call([this, conn, connected](ThreadCall::Handle &) { if (conn_cb) conn_cb(*conn, connected); }); } @@ -178,7 +178,7 @@ class ConnPool { } void feed(const conn_t &conn, int client_fd) { - tcall.call([this, conn, client_fd](ThreadCall::Handle &) { + tcall.async_call([this, conn, client_fd](ThreadCall::Handle &) { if (conn->mode == Conn::ConnMode::DEAD) { SALTICIDAE_LOG_INFO("worker %x discarding dead connection", @@ -214,7 +214,7 @@ class ConnPool { void unfeed() { nconn--; } void stop() { - tcall.call([this](ThreadCall::Handle &) { ec.stop(); }); + tcall.async_call([this](ThreadCall::Handle &) { ec.stop(); }); } std::thread &get_handle() { return handle; } @@ -352,24 +352,19 @@ class ConnPool { conn_t connect(const NetAddr &addr, bool blocking = true) { if (blocking) { - auto ret = static_cast<conn_t *>(disp_tcall->call( + auto ret = *(static_cast<conn_t *>(disp_tcall->call( [this, addr](ThreadCall::Handle &h) { - auto ptr = new conn_t(_connect(addr)); + auto conn = _connect(addr); std::atomic_thread_fence(std::memory_order_release); - h.set_result(ptr); - h.set_deleter([](void *data) { - delete static_cast<conn_t *>(data); - }); - }, true)); - auto conn = *ret; - delete ret; - return std::move(conn); + h.set_result(std::move(conn)); + }).get())); + return std::move(ret); } else { - disp_tcall->call([this, addr](ThreadCall::Handle &) { + disp_tcall->async_call([this, addr](ThreadCall::Handle &) { _connect(addr); - }, false); + }); return nullptr; } } @@ -380,7 +375,7 @@ class ConnPool { void listen(NetAddr listen_addr) { disp_tcall->call([this, listen_addr](ThreadCall::Handle &) { _listen(listen_addr); - }, true); + }); } template<typename Func> diff --git a/include/salticidae/event.h b/include/salticidae/event.h index 2cda44f..0498fa5 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -192,22 +192,23 @@ class Event { operator bool() const { return ev_fd != nullptr || ev_timer != nullptr; } }; +template<typename T> class ThreadNotifier { std::condition_variable cv; std::mutex mlock; mutex_ul_t ul; bool ready; - void *data; + T data; public: ThreadNotifier(): ul(mlock), ready(false) {} - void *wait() { + T wait() { cv.wait(ul, [this]{ return ready; }); - return data; + return std::move(data); } - void notify(void *_data) { + void notify(T &&_data) { mutex_lg_t _(mlock); ready = true; - data = _data; + data = std::move(_data); cv.notify_all(); } }; @@ -218,21 +219,50 @@ class ThreadCall { Event ev_listen; public: + struct Result { + void *data; + std::function<void(void *)> deleter; + Result(): data(nullptr) {} + Result(void *data, std::function<void(void *)> &&deleter): + data(data), deleter(std::move(deleter)) {} + ~Result() { if (data != nullptr) deleter(data); } + Result(const Result &) = delete; + Result(Result &&other): + data(other.data), deleter(std::move(other.deleter)) { + other.data = nullptr; + } + void swap(Result &other) { + std::swap(data, other.data); + std::swap(deleter, other.deleter); + } + Result &operator=(const Result &other) = delete; + Result &operator=(Result &&other) { + if (this != &other) + { + Result tmp(std::move(other)); + tmp.swap(*this); + } + return *this; + } + void *get() { return data; } + }; class Handle { std::function<void(Handle &)> callback; - std::function<void(void *)> deleter; - ThreadNotifier* notifier; - void *result; + ThreadNotifier<Result> * notifier; + Result result; friend ThreadCall; public: - Handle(): notifier(nullptr), result(nullptr) {} + Handle(): notifier(nullptr) {} void exec() { callback(*this); - if (notifier) notifier->notify(result); + if (notifier) + notifier->notify(std::move(result)); + } + template<typename T> + void set_result(T data) { + result = Result(new T(std::forward<T>(data)), + [](void *ptr) {delete static_cast<T *>(ptr);}); } - void set_result(void *data) { result = data; } - template<typename Func> - void set_deleter(Func _deleter) { deleter = _deleter; } }; ThreadCall() = default; @@ -254,33 +284,28 @@ class ThreadCall { ev_listen.clear(); Handle *h; while (read(ctl_fd[0], &h, sizeof(h)) == sizeof(h)) - { - if (h->result && h->deleter) - h->deleter(h->result); delete h; - } close(ctl_fd[0]); close(ctl_fd[1]); } template<typename Func> - void *call(Func callback, bool blocking = false) { + void async_call(Func callback) { auto h = new Handle(); h->callback = callback; - if (blocking) - { - ThreadNotifier notifier; - h->notifier = ¬ifier; - std::atomic_thread_fence(std::memory_order_release); - write(ctl_fd[1], &h, sizeof(h)); - return notifier.wait(); - } - else - { - std::atomic_thread_fence(std::memory_order_release); - write(ctl_fd[1], &h, sizeof(h)); - return nullptr; - } + std::atomic_thread_fence(std::memory_order_release); + write(ctl_fd[1], &h, sizeof(h)); + } + + template<typename Func> + Result call(Func callback) { + auto h = new Handle(); + h->callback = callback; + ThreadNotifier<Result> notifier; + h->notifier = ¬ifier; + std::atomic_thread_fence(std::memory_order_release); + write(ctl_fd[1], &h, sizeof(h)); + return notifier.wait(); } }; diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 2d16938..5e966fe 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -482,7 +482,7 @@ void MsgNetwork<OpcodeType>::send_msg(const MsgType &_msg, Conn &conn) { template<typename O, O _, O __> void PeerNetwork<O, _, __>::tcall_reset_timeout(ConnPool::Worker *worker, const conn_t &conn, double timeout) { - worker->get_tcall()->call([conn, t=timeout](ThreadCall::Handle &) { + worker->get_tcall()->async_call([conn, t=timeout](ThreadCall::Handle &) { if (!conn->ev_timeout) return; conn->ev_timeout.del(); conn->ev_timeout.add_with_timeout(t, 0); @@ -621,7 +621,7 @@ void PeerNetwork<O, _, __>::msg_ping(MsgPing &&msg, Conn &_conn) { auto conn = static_pointer_cast<Conn>(_conn.self()); if (!conn) return; uint16_t port = msg.port; - this->disp_tcall->call([this, conn, port](ThreadCall::Handle &msg) { + this->disp_tcall->async_call([this, conn, port](ThreadCall::Handle &msg) { SALTICIDAE_LOG_INFO("ping from %s, port %u", std::string(*conn).c_str(), ntohs(port)); if (check_new_conn(conn, port)) return; @@ -635,7 +635,7 @@ void PeerNetwork<O, _, __>::msg_pong(MsgPong &&msg, Conn &_conn) { auto conn = static_pointer_cast<Conn>(_conn.self()); if (!conn) return; uint16_t port = msg.port; - this->disp_tcall->call([this, conn, port](ThreadCall::Handle &msg) { + this->disp_tcall->async_call([this, conn, port](ThreadCall::Handle &msg) { auto it = id2peer.find(conn->peer_id); if (it == id2peer.end()) { @@ -658,7 +658,7 @@ void PeerNetwork<O, _, __>::listen(NetAddr listen_addr) { this->disp_tcall->call([this, listen_addr](ThreadCall::Handle &msg) { MsgNet::_listen(listen_addr); listen_port = listen_addr.port; - }, true); + }); } template<typename O, O _, O __> @@ -669,40 +669,28 @@ void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) { throw PeerNetworkError("peer already exists"); id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->disp_ec))); start_active_conn(addr); - }, true); + }); } template<typename O, O _, O __> const typename PeerNetwork<O, _, __>::conn_t PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &paddr) const { - auto ret = static_cast<conn_t *>(this->disp_tcall->call( + auto ret = *(static_cast<conn_t *>(this->disp_tcall->call( [this, paddr](ThreadCall::Handle &h) { auto it = id2peer.find(paddr); if (it == id2peer.end()) throw PeerNetworkError("peer does not exist"); - auto ptr = new conn_t(it->second->conn); - h.set_result(ptr); - h.set_deleter([](void *data) { - delete static_cast<conn_t *>(data); - }); - })); - auto conn = *ret; - delete ret; - return std::move(conn); + h.set_result(it->second->conn); + }).get())); + return std::move(*ret); } template<typename O, O _, O __> bool PeerNetwork<O, _, __>::has_peer(const NetAddr &paddr) const { - auto ret = static_cast<bool *>(this->disp_tcall->call( + return *(static_cast<bool *>(this->disp_tcall->call( [this, paddr](ThreadCall::Handle &h) { h.set_result(id2peer.count(paddr)); - h.set_deleter([](void *data) { - delete static_cast<bool *>(data); - }); - })); - auto has = *ret; - delete ret; - return has; + }).get())); } template<typename O, O _, O __> @@ -734,19 +722,14 @@ void ClientNetwork<OpcodeType>::Conn::on_teardown() { template<typename OpcodeType> template<typename MsgType> void ClientNetwork<OpcodeType>::send_msg(const MsgType &msg, const NetAddr &addr) { - auto ret = static_cast<conn_t *>(this->disp_tcall->call( + auto ret = *(static_cast<conn_t *>(this->disp_tcall->call( [this, addr](ThreadCall::Handle &h) { auto it = addr2conn.find(addr); if (it == addr2conn.end()) throw PeerNetworkError("client does not exist"); - auto ptr = new conn_t(it->second->conn); - h.set_result(ptr); - h.set_deleter([](void *data) { - delete static_cast<conn_t *>(data); - }); - })); - send_msg(msg, **ret); - delete ret; + h.set_result(it->second->conn); + }).get())); + send_msg(msg, *ret); } template<typename O, O OPCODE_PING, O _> |