aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/salticidae/conn.h29
-rw-r--r--include/salticidae/event.h89
-rw-r--r--include/salticidae/network.h47
-rw-r--r--src/conn.cpp4
4 files changed, 86 insertions, 83 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index 6462ddc..3144a68 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -98,7 +98,7 @@ class ConnPool {
/** Terminate the connection (from the worker thread). */
void worker_terminate();
/** Terminate the connection (from the dispatcher thread). */
- void disp_terminate(bool blocking = true);
+ void disp_terminate();
public:
Conn(): ready_send(false) {}
@@ -157,7 +157,7 @@ class ConnPool {
int listen_fd; /**< for accepting new network connections */
void update_conn(const conn_t &conn, bool connected) {
- user_tcall->call([this, conn, connected](ThreadCall::Handle &) {
+ user_tcall->async_call([this, conn, connected](ThreadCall::Handle &) {
if (conn_cb) conn_cb(*conn, connected);
});
}
@@ -178,7 +178,7 @@ class ConnPool {
}
void feed(const conn_t &conn, int client_fd) {
- tcall.call([this, conn, client_fd](ThreadCall::Handle &) {
+ tcall.async_call([this, conn, client_fd](ThreadCall::Handle &) {
if (conn->mode == Conn::ConnMode::DEAD)
{
SALTICIDAE_LOG_INFO("worker %x discarding dead connection",
@@ -214,7 +214,7 @@ class ConnPool {
void unfeed() { nconn--; }
void stop() {
- tcall.call([this](ThreadCall::Handle &) { ec.stop(); });
+ tcall.async_call([this](ThreadCall::Handle &) { ec.stop(); });
}
std::thread &get_handle() { return handle; }
@@ -352,24 +352,19 @@ class ConnPool {
conn_t connect(const NetAddr &addr, bool blocking = true) {
if (blocking)
{
- auto ret = static_cast<conn_t *>(disp_tcall->call(
+ auto ret = *(static_cast<conn_t *>(disp_tcall->call(
[this, addr](ThreadCall::Handle &h) {
- auto ptr = new conn_t(_connect(addr));
+ auto conn = _connect(addr);
std::atomic_thread_fence(std::memory_order_release);
- h.set_result(ptr);
- h.set_deleter([](void *data) {
- delete static_cast<conn_t *>(data);
- });
- }, true));
- auto conn = *ret;
- delete ret;
- return std::move(conn);
+ h.set_result(std::move(conn));
+ }).get()));
+ return std::move(ret);
}
else
{
- disp_tcall->call([this, addr](ThreadCall::Handle &) {
+ disp_tcall->async_call([this, addr](ThreadCall::Handle &) {
_connect(addr);
- }, false);
+ });
return nullptr;
}
}
@@ -380,7 +375,7 @@ class ConnPool {
void listen(NetAddr listen_addr) {
disp_tcall->call([this, listen_addr](ThreadCall::Handle &) {
_listen(listen_addr);
- }, true);
+ });
}
template<typename Func>
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index 2cda44f..0498fa5 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -192,22 +192,23 @@ class Event {
operator bool() const { return ev_fd != nullptr || ev_timer != nullptr; }
};
+template<typename T>
class ThreadNotifier {
std::condition_variable cv;
std::mutex mlock;
mutex_ul_t ul;
bool ready;
- void *data;
+ T data;
public:
ThreadNotifier(): ul(mlock), ready(false) {}
- void *wait() {
+ T wait() {
cv.wait(ul, [this]{ return ready; });
- return data;
+ return std::move(data);
}
- void notify(void *_data) {
+ void notify(T &&_data) {
mutex_lg_t _(mlock);
ready = true;
- data = _data;
+ data = std::move(_data);
cv.notify_all();
}
};
@@ -218,21 +219,50 @@ class ThreadCall {
Event ev_listen;
public:
+ struct Result {
+ void *data;
+ std::function<void(void *)> deleter;
+ Result(): data(nullptr) {}
+ Result(void *data, std::function<void(void *)> &&deleter):
+ data(data), deleter(std::move(deleter)) {}
+ ~Result() { if (data != nullptr) deleter(data); }
+ Result(const Result &) = delete;
+ Result(Result &&other):
+ data(other.data), deleter(std::move(other.deleter)) {
+ other.data = nullptr;
+ }
+ void swap(Result &other) {
+ std::swap(data, other.data);
+ std::swap(deleter, other.deleter);
+ }
+ Result &operator=(const Result &other) = delete;
+ Result &operator=(Result &&other) {
+ if (this != &other)
+ {
+ Result tmp(std::move(other));
+ tmp.swap(*this);
+ }
+ return *this;
+ }
+ void *get() { return data; }
+ };
class Handle {
std::function<void(Handle &)> callback;
- std::function<void(void *)> deleter;
- ThreadNotifier* notifier;
- void *result;
+ ThreadNotifier<Result> * notifier;
+ Result result;
friend ThreadCall;
public:
- Handle(): notifier(nullptr), result(nullptr) {}
+ Handle(): notifier(nullptr) {}
void exec() {
callback(*this);
- if (notifier) notifier->notify(result);
+ if (notifier)
+ notifier->notify(std::move(result));
+ }
+ template<typename T>
+ void set_result(T data) {
+ result = Result(new T(std::forward<T>(data)),
+ [](void *ptr) {delete static_cast<T *>(ptr);});
}
- void set_result(void *data) { result = data; }
- template<typename Func>
- void set_deleter(Func _deleter) { deleter = _deleter; }
};
ThreadCall() = default;
@@ -254,33 +284,28 @@ class ThreadCall {
ev_listen.clear();
Handle *h;
while (read(ctl_fd[0], &h, sizeof(h)) == sizeof(h))
- {
- if (h->result && h->deleter)
- h->deleter(h->result);
delete h;
- }
close(ctl_fd[0]);
close(ctl_fd[1]);
}
template<typename Func>
- void *call(Func callback, bool blocking = false) {
+ void async_call(Func callback) {
auto h = new Handle();
h->callback = callback;
- if (blocking)
- {
- ThreadNotifier notifier;
- h->notifier = &notifier;
- std::atomic_thread_fence(std::memory_order_release);
- write(ctl_fd[1], &h, sizeof(h));
- return notifier.wait();
- }
- else
- {
- std::atomic_thread_fence(std::memory_order_release);
- write(ctl_fd[1], &h, sizeof(h));
- return nullptr;
- }
+ std::atomic_thread_fence(std::memory_order_release);
+ write(ctl_fd[1], &h, sizeof(h));
+ }
+
+ template<typename Func>
+ Result call(Func callback) {
+ auto h = new Handle();
+ h->callback = callback;
+ ThreadNotifier<Result> notifier;
+ h->notifier = &notifier;
+ std::atomic_thread_fence(std::memory_order_release);
+ write(ctl_fd[1], &h, sizeof(h));
+ return notifier.wait();
}
};
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index 2d16938..5e966fe 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -482,7 +482,7 @@ void MsgNetwork<OpcodeType>::send_msg(const MsgType &_msg, Conn &conn) {
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::tcall_reset_timeout(ConnPool::Worker *worker,
const conn_t &conn, double timeout) {
- worker->get_tcall()->call([conn, t=timeout](ThreadCall::Handle &) {
+ worker->get_tcall()->async_call([conn, t=timeout](ThreadCall::Handle &) {
if (!conn->ev_timeout) return;
conn->ev_timeout.del();
conn->ev_timeout.add_with_timeout(t, 0);
@@ -621,7 +621,7 @@ void PeerNetwork<O, _, __>::msg_ping(MsgPing &&msg, Conn &_conn) {
auto conn = static_pointer_cast<Conn>(_conn.self());
if (!conn) return;
uint16_t port = msg.port;
- this->disp_tcall->call([this, conn, port](ThreadCall::Handle &msg) {
+ this->disp_tcall->async_call([this, conn, port](ThreadCall::Handle &msg) {
SALTICIDAE_LOG_INFO("ping from %s, port %u",
std::string(*conn).c_str(), ntohs(port));
if (check_new_conn(conn, port)) return;
@@ -635,7 +635,7 @@ void PeerNetwork<O, _, __>::msg_pong(MsgPong &&msg, Conn &_conn) {
auto conn = static_pointer_cast<Conn>(_conn.self());
if (!conn) return;
uint16_t port = msg.port;
- this->disp_tcall->call([this, conn, port](ThreadCall::Handle &msg) {
+ this->disp_tcall->async_call([this, conn, port](ThreadCall::Handle &msg) {
auto it = id2peer.find(conn->peer_id);
if (it == id2peer.end())
{
@@ -658,7 +658,7 @@ void PeerNetwork<O, _, __>::listen(NetAddr listen_addr) {
this->disp_tcall->call([this, listen_addr](ThreadCall::Handle &msg) {
MsgNet::_listen(listen_addr);
listen_port = listen_addr.port;
- }, true);
+ });
}
template<typename O, O _, O __>
@@ -669,40 +669,28 @@ void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) {
throw PeerNetworkError("peer already exists");
id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->disp_ec)));
start_active_conn(addr);
- }, true);
+ });
}
template<typename O, O _, O __>
const typename PeerNetwork<O, _, __>::conn_t
PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &paddr) const {
- auto ret = static_cast<conn_t *>(this->disp_tcall->call(
+ auto ret = *(static_cast<conn_t *>(this->disp_tcall->call(
[this, paddr](ThreadCall::Handle &h) {
auto it = id2peer.find(paddr);
if (it == id2peer.end())
throw PeerNetworkError("peer does not exist");
- auto ptr = new conn_t(it->second->conn);
- h.set_result(ptr);
- h.set_deleter([](void *data) {
- delete static_cast<conn_t *>(data);
- });
- }));
- auto conn = *ret;
- delete ret;
- return std::move(conn);
+ h.set_result(it->second->conn);
+ }).get()));
+ return std::move(*ret);
}
template<typename O, O _, O __>
bool PeerNetwork<O, _, __>::has_peer(const NetAddr &paddr) const {
- auto ret = static_cast<bool *>(this->disp_tcall->call(
+ return *(static_cast<bool *>(this->disp_tcall->call(
[this, paddr](ThreadCall::Handle &h) {
h.set_result(id2peer.count(paddr));
- h.set_deleter([](void *data) {
- delete static_cast<bool *>(data);
- });
- }));
- auto has = *ret;
- delete ret;
- return has;
+ }).get()));
}
template<typename O, O _, O __>
@@ -734,19 +722,14 @@ void ClientNetwork<OpcodeType>::Conn::on_teardown() {
template<typename OpcodeType>
template<typename MsgType>
void ClientNetwork<OpcodeType>::send_msg(const MsgType &msg, const NetAddr &addr) {
- auto ret = static_cast<conn_t *>(this->disp_tcall->call(
+ auto ret = *(static_cast<conn_t *>(this->disp_tcall->call(
[this, addr](ThreadCall::Handle &h) {
auto it = addr2conn.find(addr);
if (it == addr2conn.end())
throw PeerNetworkError("client does not exist");
- auto ptr = new conn_t(it->second->conn);
- h.set_result(ptr);
- h.set_deleter([](void *data) {
- delete static_cast<conn_t *>(data);
- });
- }));
- send_msg(msg, **ret);
- delete ret;
+ h.set_result(it->second->conn);
+ }).get()));
+ send_msg(msg, *ret);
}
template<typename O, O OPCODE_PING, O _>
diff --git a/src/conn.cpp b/src/conn.cpp
index ca13619..5967339 100644
--- a/src/conn.cpp
+++ b/src/conn.cpp
@@ -138,11 +138,11 @@ void ConnPool::Conn::worker_terminate() {
});
}
-void ConnPool::Conn::disp_terminate(bool blocking) {
+void ConnPool::Conn::disp_terminate() {
if (worker && !worker->is_dispatcher())
worker->get_tcall()->call([conn=self()](ThreadCall::Handle &) {
conn->stop();
- }, blocking);
+ });
else stop();
cpool->remove_conn(fd);
}