From b27216a50bd6566b6fd9203d3acf191005ae5763 Mon Sep 17 00:00:00 2001 From: Determinant Date: Fri, 16 Nov 2018 17:05:28 -0500 Subject: clean up code --- include/salticidae/conn.h | 29 ++++++--------- include/salticidae/event.h | 89 ++++++++++++++++++++++++++++---------------- include/salticidae/network.h | 47 ++++++++--------------- src/conn.cpp | 4 +- 4 files changed, 86 insertions(+), 83 deletions(-) diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index 6462ddc..3144a68 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -98,7 +98,7 @@ class ConnPool { /** Terminate the connection (from the worker thread). */ void worker_terminate(); /** Terminate the connection (from the dispatcher thread). */ - void disp_terminate(bool blocking = true); + void disp_terminate(); public: Conn(): ready_send(false) {} @@ -157,7 +157,7 @@ class ConnPool { int listen_fd; /**< for accepting new network connections */ void update_conn(const conn_t &conn, bool connected) { - user_tcall->call([this, conn, connected](ThreadCall::Handle &) { + user_tcall->async_call([this, conn, connected](ThreadCall::Handle &) { if (conn_cb) conn_cb(*conn, connected); }); } @@ -178,7 +178,7 @@ class ConnPool { } void feed(const conn_t &conn, int client_fd) { - tcall.call([this, conn, client_fd](ThreadCall::Handle &) { + tcall.async_call([this, conn, client_fd](ThreadCall::Handle &) { if (conn->mode == Conn::ConnMode::DEAD) { SALTICIDAE_LOG_INFO("worker %x discarding dead connection", @@ -214,7 +214,7 @@ class ConnPool { void unfeed() { nconn--; } void stop() { - tcall.call([this](ThreadCall::Handle &) { ec.stop(); }); + tcall.async_call([this](ThreadCall::Handle &) { ec.stop(); }); } std::thread &get_handle() { return handle; } @@ -352,24 +352,19 @@ class ConnPool { conn_t connect(const NetAddr &addr, bool blocking = true) { if (blocking) { - auto ret = static_cast(disp_tcall->call( + auto ret = *(static_cast(disp_tcall->call( [this, addr](ThreadCall::Handle &h) { - auto ptr = new conn_t(_connect(addr)); + auto conn = _connect(addr); std::atomic_thread_fence(std::memory_order_release); - h.set_result(ptr); - h.set_deleter([](void *data) { - delete static_cast(data); - }); - }, true)); - auto conn = *ret; - delete ret; - return std::move(conn); + h.set_result(std::move(conn)); + }).get())); + return std::move(ret); } else { - disp_tcall->call([this, addr](ThreadCall::Handle &) { + disp_tcall->async_call([this, addr](ThreadCall::Handle &) { _connect(addr); - }, false); + }); return nullptr; } } @@ -380,7 +375,7 @@ class ConnPool { void listen(NetAddr listen_addr) { disp_tcall->call([this, listen_addr](ThreadCall::Handle &) { _listen(listen_addr); - }, true); + }); } template diff --git a/include/salticidae/event.h b/include/salticidae/event.h index 2cda44f..0498fa5 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -192,22 +192,23 @@ class Event { operator bool() const { return ev_fd != nullptr || ev_timer != nullptr; } }; +template class ThreadNotifier { std::condition_variable cv; std::mutex mlock; mutex_ul_t ul; bool ready; - void *data; + T data; public: ThreadNotifier(): ul(mlock), ready(false) {} - void *wait() { + T wait() { cv.wait(ul, [this]{ return ready; }); - return data; + return std::move(data); } - void notify(void *_data) { + void notify(T &&_data) { mutex_lg_t _(mlock); ready = true; - data = _data; + data = std::move(_data); cv.notify_all(); } }; @@ -218,21 +219,50 @@ class ThreadCall { Event ev_listen; public: + struct Result { + void *data; + std::function deleter; + Result(): data(nullptr) {} + Result(void *data, std::function &&deleter): + data(data), deleter(std::move(deleter)) {} + ~Result() { if (data != nullptr) deleter(data); } + Result(const Result &) = delete; + Result(Result &&other): + data(other.data), deleter(std::move(other.deleter)) { + other.data = nullptr; + } + void swap(Result &other) { + std::swap(data, other.data); + std::swap(deleter, other.deleter); + } + Result &operator=(const Result &other) = delete; + Result &operator=(Result &&other) { + if (this != &other) + { + Result tmp(std::move(other)); + tmp.swap(*this); + } + return *this; + } + void *get() { return data; } + }; class Handle { std::function callback; - std::function deleter; - ThreadNotifier* notifier; - void *result; + ThreadNotifier * notifier; + Result result; friend ThreadCall; public: - Handle(): notifier(nullptr), result(nullptr) {} + Handle(): notifier(nullptr) {} void exec() { callback(*this); - if (notifier) notifier->notify(result); + if (notifier) + notifier->notify(std::move(result)); + } + template + void set_result(T data) { + result = Result(new T(std::forward(data)), + [](void *ptr) {delete static_cast(ptr);}); } - void set_result(void *data) { result = data; } - template - void set_deleter(Func _deleter) { deleter = _deleter; } }; ThreadCall() = default; @@ -254,33 +284,28 @@ class ThreadCall { ev_listen.clear(); Handle *h; while (read(ctl_fd[0], &h, sizeof(h)) == sizeof(h)) - { - if (h->result && h->deleter) - h->deleter(h->result); delete h; - } close(ctl_fd[0]); close(ctl_fd[1]); } template - void *call(Func callback, bool blocking = false) { + void async_call(Func callback) { auto h = new Handle(); h->callback = callback; - if (blocking) - { - ThreadNotifier notifier; - h->notifier = ¬ifier; - std::atomic_thread_fence(std::memory_order_release); - write(ctl_fd[1], &h, sizeof(h)); - return notifier.wait(); - } - else - { - std::atomic_thread_fence(std::memory_order_release); - write(ctl_fd[1], &h, sizeof(h)); - return nullptr; - } + std::atomic_thread_fence(std::memory_order_release); + write(ctl_fd[1], &h, sizeof(h)); + } + + template + Result call(Func callback) { + auto h = new Handle(); + h->callback = callback; + ThreadNotifier notifier; + h->notifier = ¬ifier; + std::atomic_thread_fence(std::memory_order_release); + write(ctl_fd[1], &h, sizeof(h)); + return notifier.wait(); } }; diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 2d16938..5e966fe 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -482,7 +482,7 @@ void MsgNetwork::send_msg(const MsgType &_msg, Conn &conn) { template void PeerNetwork::tcall_reset_timeout(ConnPool::Worker *worker, const conn_t &conn, double timeout) { - worker->get_tcall()->call([conn, t=timeout](ThreadCall::Handle &) { + worker->get_tcall()->async_call([conn, t=timeout](ThreadCall::Handle &) { if (!conn->ev_timeout) return; conn->ev_timeout.del(); conn->ev_timeout.add_with_timeout(t, 0); @@ -621,7 +621,7 @@ void PeerNetwork::msg_ping(MsgPing &&msg, Conn &_conn) { auto conn = static_pointer_cast(_conn.self()); if (!conn) return; uint16_t port = msg.port; - this->disp_tcall->call([this, conn, port](ThreadCall::Handle &msg) { + this->disp_tcall->async_call([this, conn, port](ThreadCall::Handle &msg) { SALTICIDAE_LOG_INFO("ping from %s, port %u", std::string(*conn).c_str(), ntohs(port)); if (check_new_conn(conn, port)) return; @@ -635,7 +635,7 @@ void PeerNetwork::msg_pong(MsgPong &&msg, Conn &_conn) { auto conn = static_pointer_cast(_conn.self()); if (!conn) return; uint16_t port = msg.port; - this->disp_tcall->call([this, conn, port](ThreadCall::Handle &msg) { + this->disp_tcall->async_call([this, conn, port](ThreadCall::Handle &msg) { auto it = id2peer.find(conn->peer_id); if (it == id2peer.end()) { @@ -658,7 +658,7 @@ void PeerNetwork::listen(NetAddr listen_addr) { this->disp_tcall->call([this, listen_addr](ThreadCall::Handle &msg) { MsgNet::_listen(listen_addr); listen_port = listen_addr.port; - }, true); + }); } template @@ -669,40 +669,28 @@ void PeerNetwork::add_peer(const NetAddr &addr) { throw PeerNetworkError("peer already exists"); id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->disp_ec))); start_active_conn(addr); - }, true); + }); } template const typename PeerNetwork::conn_t PeerNetwork::get_peer_conn(const NetAddr &paddr) const { - auto ret = static_cast(this->disp_tcall->call( + auto ret = *(static_cast(this->disp_tcall->call( [this, paddr](ThreadCall::Handle &h) { auto it = id2peer.find(paddr); if (it == id2peer.end()) throw PeerNetworkError("peer does not exist"); - auto ptr = new conn_t(it->second->conn); - h.set_result(ptr); - h.set_deleter([](void *data) { - delete static_cast(data); - }); - })); - auto conn = *ret; - delete ret; - return std::move(conn); + h.set_result(it->second->conn); + }).get())); + return std::move(*ret); } template bool PeerNetwork::has_peer(const NetAddr &paddr) const { - auto ret = static_cast(this->disp_tcall->call( + return *(static_cast(this->disp_tcall->call( [this, paddr](ThreadCall::Handle &h) { h.set_result(id2peer.count(paddr)); - h.set_deleter([](void *data) { - delete static_cast(data); - }); - })); - auto has = *ret; - delete ret; - return has; + }).get())); } template @@ -734,19 +722,14 @@ void ClientNetwork::Conn::on_teardown() { template template void ClientNetwork::send_msg(const MsgType &msg, const NetAddr &addr) { - auto ret = static_cast(this->disp_tcall->call( + auto ret = *(static_cast(this->disp_tcall->call( [this, addr](ThreadCall::Handle &h) { auto it = addr2conn.find(addr); if (it == addr2conn.end()) throw PeerNetworkError("client does not exist"); - auto ptr = new conn_t(it->second->conn); - h.set_result(ptr); - h.set_deleter([](void *data) { - delete static_cast(data); - }); - })); - send_msg(msg, **ret); - delete ret; + h.set_result(it->second->conn); + }).get())); + send_msg(msg, *ret); } template diff --git a/src/conn.cpp b/src/conn.cpp index ca13619..5967339 100644 --- a/src/conn.cpp +++ b/src/conn.cpp @@ -138,11 +138,11 @@ void ConnPool::Conn::worker_terminate() { }); } -void ConnPool::Conn::disp_terminate(bool blocking) { +void ConnPool::Conn::disp_terminate() { if (worker && !worker->is_dispatcher()) worker->get_tcall()->call([conn=self()](ThreadCall::Handle &) { conn->stop(); - }, blocking); + }); else stop(); cpool->remove_conn(fd); } -- cgit v1.2.3-70-g09d2