aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2018-11-14 15:19:32 -0500
committerDeterminant <[email protected]>2018-11-14 15:19:32 -0500
commit0f341fe7f092f704e1c1952c72085eb1ebd2086a (patch)
treef730db073fa232f911e6df7e6099852a090330bb
parent2c1e8ec448a1039ab9a46bce4c959e6ec3cefeb8 (diff)
use ThreadCall pattern
-rw-r--r--include/salticidae/conn.h267
-rw-r--r--include/salticidae/event.h86
-rw-r--r--include/salticidae/network.h217
-rw-r--r--include/salticidae/queue.h2
-rw-r--r--src/conn.cpp43
-rw-r--r--test/test_network.cpp3
-rw-r--r--test/test_p2p.cpp3
7 files changed, 282 insertions, 339 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index a86a4d2..2ef6b50 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -95,6 +95,8 @@ class ConnPool {
void send_data(int, int);
void conn_server(int, int);
+ /** Stop the worker and I/O events. */
+ void stop();
/** Terminate the connection. */
void terminate();
@@ -146,54 +148,27 @@ class ConnPool {
const size_t seg_buff_size;
/* owned by user loop */
- int mlisten_fd[2]; /**< for connection events sent to the user loop */
- Event ev_mlisten;
+ BoxObj<ThreadCall> user_tcall;
conn_callback_t conn_cb;
/* owned by the dispatcher */
+ Event ev_listen;
std::unordered_map<int, conn_t> pool;
int listen_fd; /**< for accepting new network connections */
- Event ev_listen;
- Event ev_dlisten;
- std::mutex cp_mlock;
void update_conn(const conn_t &conn, bool connected) {
- auto dcmd = new UserConn(conn, connected);
- write(mlisten_fd[1], &dcmd, sizeof(dcmd));
+ user_tcall->call([this, conn, connected](ThreadCall::Handle &) {
+ if (conn_cb) conn_cb(*conn, connected);
+ });
}
- struct Worker;
- class WorkerFeed;
-
- class WorkerCmd {
- public:
- virtual ~WorkerCmd() = default;
- virtual void exec(Worker *worker) = 0;
- };
-
class Worker {
EventContext ec;
- Event ev_ctl;
- int ctl_fd[2]; /**< for control messages from dispatcher */
+ ThreadCall msgr;
std::thread handle;
public:
- Worker() {
- if (pipe2(ctl_fd, O_NONBLOCK))
- throw ConnPoolError(std::string("failed to create worker pipe"));
- ev_ctl = Event(ec, ctl_fd[0], Event::READ, [this](int fd, int) {
- WorkerCmd *dcmd;
- read(fd, &dcmd, sizeof(dcmd));
- dcmd->exec(this);
- delete dcmd;
- });
- ev_ctl.add();
- }
-
- ~Worker() {
- close(ctl_fd[0]);
- close(ctl_fd[1]);
- }
+ Worker(): msgr(ec) {}
/* the following functions are called by the dispatcher */
void start() {
@@ -201,143 +176,69 @@ class ConnPool {
}
void feed(const conn_t &conn, int client_fd) {
- auto dcmd = new WorkerFeed(conn, client_fd);
- write(ctl_fd[1], &dcmd, sizeof(dcmd));
+ msgr.call([this, conn, client_fd](ThreadCall::Handle &) {
+ SALTICIDAE_LOG_INFO("worker %x got %s",
+ std::this_thread::get_id(),
+ std::string(*conn).c_str());
+ conn->get_send_buffer()
+ .get_queue()
+ .reg_handler(this->ec, [conn, client_fd]
+ (MPSCWriteBuffer::queue_t &) {
+ if (conn->ready_send && conn->fd != -1)
+ conn->send_data(client_fd, Event::WRITE);
+ return false;
+ });
+ //auto conn_ptr = conn.get();
+ conn->ev_read = Event(ec, client_fd, Event::READ | Event::WRITE, [conn=conn](int fd, int what) {
+ if (what & Event::READ)
+ conn->recv_data(fd, what);
+ else
+ conn->send_data(fd, what);
+ });
+
+ // std::bind(&Conn::recv_data, conn_ptr, _1, _2));
+ //conn->ev_write = Event(ec, client_fd, Event::WRITE,
+ // std::bind(&Conn::send_data, conn_ptr, _1, _2));
+ conn->ev_read.add();
+ //conn->ev_write.add();
+ });
}
void stop() {
- auto dcmd = new WorkerStop();
- write(ctl_fd[1], &dcmd, sizeof(dcmd));
+ msgr.call([this](ThreadCall::Handle &) {
+ ec.stop();
+ });
}
std::thread &get_handle() { return handle; }
const EventContext &get_ec() { return ec; }
};
- class WorkerFeed: public WorkerCmd {
- conn_t conn;
- int client_fd;
-
- public:
- WorkerFeed(const conn_t &conn, int client_fd):
- conn(conn), client_fd(client_fd) {}
- void exec(Worker *worker) override {
- SALTICIDAE_LOG_INFO("worker %x got %s",
- std::this_thread::get_id(),
- std::string(*conn).c_str());
- auto &ec = worker->get_ec();
- conn->get_send_buffer()
- .get_queue()
- .reg_handler(ec, [conn=this->conn,
- client_fd=this->client_fd](MPSCWriteBuffer::queue_t &) {
- if (conn->ready_send && conn->fd != -1)
- conn->send_data(client_fd, Event::WRITE);
- return false;
- });
- //auto conn_ptr = conn.get();
- conn->ev_read = Event(ec, client_fd, Event::READ | Event::WRITE, [conn=conn](int fd, int what) {
- if (what & Event::READ)
- conn->recv_data(fd, what);
- else
- conn->send_data(fd, what);
- });
-
- // std::bind(&Conn::recv_data, conn_ptr, _1, _2));
- //conn->ev_write = Event(ec, client_fd, Event::WRITE,
- // std::bind(&Conn::send_data, conn_ptr, _1, _2));
- conn->ev_read.add();
- //conn->ev_write.add();
- }
- };
-
- class WorkerStop: public WorkerCmd {
- public:
- void exec(Worker *worker) override { worker->get_ec().stop(); }
- };
-
/* related to workers */
size_t nworker;
salticidae::BoxObj<Worker[]> workers;
void accept_client(int, int);
conn_t add_conn(const conn_t &conn);
- conn_t _connect(const NetAddr &addr);
- void _post_terminate(int fd);
- void _accept_listen(int listen_fd);
- void _connect_listen(const conn_t &conn);
+ void terminate(int fd);
protected:
- class DispatchCmd {
- public:
- virtual ~DispatchCmd() = default;
- virtual void exec(ConnPool *cpool) = 0;
- };
+ conn_t _connect(const NetAddr &addr);
+ void _listen(NetAddr listen_addr);
private:
- class DspConnect: public DispatchCmd {
- const NetAddr addr;
- public:
- DspConnect(const NetAddr &addr): addr(addr) {}
- void exec(ConnPool *cpool) override {
- cpool->update_conn(cpool->_connect(addr), true);
- }
- };
-
- class DspPostTerm: public DispatchCmd {
- int fd;
- public:
- DspPostTerm(int fd): fd(fd) {}
- void exec(ConnPool *cpool) override {
- cpool->_post_terminate(fd);
- }
- };
-
- class DspMulticast: public DispatchCmd {
- std::vector<conn_t> receivers;
- bytearray_t data;
- public:
- DspMulticast(std::vector<conn_t> &&receivers, bytearray_t &&data):
- receivers(std::move(receivers)),
- data(std::move(data)) {}
- void exec(ConnPool *) override {
- for (auto &r: receivers) r->write(bytearray_t(data));
- }
- };
-
- class DspAcceptListen: public DispatchCmd {
- int listen_fd;
- public:
- DspAcceptListen(int listen_fd): listen_fd(listen_fd) {}
- void exec(ConnPool *cpool) override {
- cpool->_accept_listen(listen_fd);
- }
- };
- class DspConnectListen: public DispatchCmd {
- conn_t conn;
- public:
- DspConnectListen(const conn_t &conn): conn(conn) {}
- void exec(ConnPool *cpool) override {
- cpool->_connect_listen(conn);
- }
- };
-
- class UserConn: public DispatchCmd {
- conn_t conn;
- bool connected;
- public:
- UserConn(const conn_t &conn, bool connected):
- conn(conn), connected(connected) {}
- void exec(ConnPool *cpool) override {
- if (cpool->conn_cb)
- cpool->conn_cb(*conn, connected);
- }
- };
-
- void post_terminate(int fd) {
- auto dcmd = new DspPostTerm(fd);
- write(dlisten_fd[1], &dcmd, sizeof(dcmd));
- }
+ //class DspMulticast: public DispatchCmd {
+ // std::vector<conn_t> receivers;
+ // bytearray_t data;
+ // public:
+ // DspMulticast(std::vector<conn_t> &&receivers, bytearray_t &&data):
+ // receivers(std::move(receivers)),
+ // data(std::move(data)) {}
+ // void exec(ConnPool *) override {
+ // for (auto &r: receivers) r->write(bytearray_t(data));
+ // }
+ //};
Worker &select_worker() {
return workers[1];
@@ -346,7 +247,7 @@ class ConnPool {
protected:
EventContext ec;
EventContext dispatcher_ec;
- int dlisten_fd[2]; /**< for control command sent to the dispatcher */
+ BoxObj<ThreadCall> disp_tcall;
/** Should be implemented by derived class to return a new Conn object. */
virtual Conn *create_conn() = 0;
@@ -360,35 +261,13 @@ class ConnPool {
conn_server_timeout(conn_server_timeout),
seg_buff_size(seg_buff_size),
listen_fd(-1),
- nworker(std::min((size_t)1, nworker)),
+ nworker(std::max((size_t)1, nworker)),
ec(ec) {
- if (pipe2(mlisten_fd, O_NONBLOCK))
- throw ConnPoolError(std::string("failed to create main pipe"));
- if (pipe2(dlisten_fd, O_NONBLOCK))
- throw ConnPoolError(std::string("failed to create dispatcher pipe"));
-
- ev_mlisten = Event(ec, mlisten_fd[0], Event::READ, [this](int fd, int) {
- DispatchCmd *dcmd;
- read(fd, &dcmd, sizeof(dcmd));
- dcmd->exec(this);
- delete dcmd;
- });
- ev_mlisten.add();
-
workers = new Worker[nworker];
dispatcher_ec = workers[0].get_ec();
- ev_dlisten = Event(dispatcher_ec, dlisten_fd[0], Event::READ, [this](int fd, int) {
- DispatchCmd *dcmd;
- read(fd, &dcmd, sizeof(dcmd));
- dcmd->exec(this);
- delete dcmd;
- });
- ev_dlisten.add();
-
- SALTICIDAE_LOG_INFO("starting all threads...");
- for (size_t i = 0; i < nworker; i++)
- workers[i].start();
+ user_tcall = new ThreadCall(ec);
+ disp_tcall = new ThreadCall(dispatcher_ec);
}
~ConnPool() {
@@ -404,24 +283,36 @@ class ConnPool {
conn->on_close();
}
if (listen_fd != -1) close(listen_fd);
- for (int i = 0; i < 2; i++)
- {
- close(mlisten_fd[i]);
- close(dlisten_fd[i]);
- }
}
ConnPool(const ConnPool &) = delete;
ConnPool(ConnPool &&) = delete;
+ void start() {
+ SALTICIDAE_LOG_INFO("starting all threads...");
+ for (size_t i = 0; i < nworker; i++)
+ workers[i].start();
+ }
+
/** Actively connect to remote addr. */
conn_t connect(const NetAddr &addr, bool blocking = true) {
if (blocking)
- return _connect(addr);
+ {
+ auto ret = static_cast<conn_t *>(disp_tcall->call(
+ [this, addr](ThreadCall::Handle &h) {
+ auto ptr = new conn_t(_connect(addr));
+ std::atomic_thread_fence(std::memory_order_release);
+ h.set_result(ptr);
+ }, true));
+ auto conn = *ret;
+ delete ret;
+ return std::move(conn);
+ }
else
{
- auto dcmd = new DspConnect(addr);
- write(dlisten_fd[1], &dcmd, sizeof(dcmd));
+ disp_tcall->call([this, addr](ThreadCall::Handle &) {
+ _connect(addr);
+ }, false);
return nullptr;
}
}
@@ -429,7 +320,11 @@ class ConnPool {
/** Listen for passive connections (connection initiated from remote).
* Does not need to be called if do not want to accept any passive
* connections. */
- void listen(NetAddr listen_addr);
+ void listen(NetAddr listen_addr) {
+ disp_tcall->call([this, listen_addr](ThreadCall::Handle &) {
+ _listen(listen_addr);
+ }, true);
+ }
template<typename Func>
void reg_conn_handler(Func cb) { conn_cb = cb; }
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index c21644b..da27902 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -25,10 +25,12 @@
#ifndef _SALTICIDAE_EVENT_H
#define _SALTICIDAE_EVENT_H
+#include <condition_variable>
#include <unistd.h>
#include <uv.h>
#include <sys/eventfd.h>
+#include "salticidae/type.h"
#include "salticidae/queue.h"
#include "salticidae/util.h"
#include "salticidae/ref.h"
@@ -177,6 +179,90 @@ class Event {
operator bool() const { return ev_fd != nullptr || ev_timer != nullptr; }
};
+class ThreadNotifier {
+ std::condition_variable cv;
+ std::mutex mlock;
+ mutex_ul_t ul;
+ bool ready;
+ void *data;
+ public:
+ ThreadNotifier(): ul(mlock), ready(false) {}
+ void *wait() {
+ cv.wait(ul, [this]{ return ready; });
+ return data;
+ }
+ void notify(void *_data) {
+ {
+ mutex_lg_t _(mlock);
+ ready = true;
+ data = _data;
+ }
+ cv.notify_all();
+ }
+};
+
+class ThreadCall {
+ int ctl_fd[2];
+ EventContext ec;
+ Event ev_listen;
+
+ public:
+ class Handle {
+ std::function<void(Handle &)> callback;
+ ThreadNotifier* notifier;
+ void *result;
+ friend ThreadCall;
+ public:
+ Handle(): notifier(nullptr), result(nullptr) {}
+ void exec() {
+ callback(*this);
+ if (notifier) notifier->notify(result);
+ }
+ void set_result(void *data) { result = data; }
+ };
+
+ ThreadCall() = default;
+ ThreadCall(const ThreadCall &) = delete;
+ ThreadCall(ThreadCall &&) = delete;
+ ThreadCall(EventContext ec): ec(ec) {
+ if (pipe2(ctl_fd, O_NONBLOCK))
+ throw SalticidaeError(std::string("ThreadCall: failed to create pipe"));
+ ev_listen = Event(ec, ctl_fd[0], Event::READ, [this](int fd, int) {
+ Handle *h;
+ read(fd, &h, sizeof(h));
+ h->exec();
+ delete h;
+ });
+ ev_listen.add();
+ }
+
+ ~ThreadCall() {
+ close(ctl_fd[0]);
+ close(ctl_fd[1]);
+ }
+
+ template<typename Func>
+ void *call(Func callback, bool blocking = false) {
+ auto h = new Handle();
+ h->callback = callback;
+ if (blocking)
+ {
+ ThreadNotifier notifier;
+ h->notifier = &notifier;
+ std::atomic_thread_fence(std::memory_order_release);
+ write(ctl_fd[1], &h, sizeof(h));
+ return notifier.wait();
+ }
+ else
+ {
+ std::atomic_thread_fence(std::memory_order_release);
+ write(ctl_fd[1], &h, sizeof(h));
+ return nullptr;
+ }
+ }
+};
+
+
template<typename T>
class MPSCQueueEventDriven: public MPSCQueue<T> {
private:
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index 18406ea..e5165bf 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -206,7 +206,6 @@ class ClientNetwork: public MsgNetwork<OpcodeType> {
private:
std::unordered_map<NetAddr, typename MsgNet::conn_t> addr2conn;
- std::mutex cn_mlock;
public:
class Conn: public MsgNet::Conn {
@@ -298,7 +297,6 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
bool ping_timer_ok;
bool pong_msg_ok;
bool connected;
- std::mutex mlock;
Peer() = delete;
Peer(NetAddr addr, conn_t conn, const EventContext &ec):
@@ -321,7 +319,6 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
};
std::unordered_map <NetAddr, BoxObj<Peer>> id2peer;
- std::mutex pn_mlock;
const IdentityMode id_mode;
double retry_conn_delay;
@@ -355,30 +352,11 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
}
};
- struct PingCmd: public ConnPool::DispatchCmd {
- conn_t conn;
- uint16_t port;
- PingCmd(const conn_t &conn, uint16_t port):
- conn(conn), port(port) {}
- void exec(ConnPool *cpool) override {
- auto pn = static_cast<PeerNetwork *>(cpool);
- pn->_ping_msg_cb(conn, port);
- }
- };
-
- struct PongCmd: public PingCmd {
- using PingCmd::PingCmd;
- void exec(ConnPool *cpool) override {
- auto pn = static_cast<PeerNetwork *>(cpool);
- pn->_pong_msg_cb(this->conn, this->port);
- }
- };
-
void msg_ping(MsgPing &&msg, Conn &conn);
void msg_pong(MsgPong &&msg, Conn &conn);
void _ping_msg_cb(const conn_t &conn, uint16_t port);
void _pong_msg_cb(const conn_t &conn, uint16_t port);
- bool check_new_conn(Conn &conn, uint16_t port);
+ bool check_new_conn(const conn_t &conn, uint16_t port);
void start_active_conn(const NetAddr &paddr);
protected:
@@ -402,12 +380,14 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
id_mode(id_mode),
retry_conn_delay(retry_conn_delay),
ping_period(ping_period),
- conn_timeout(conn_timeout) {}
+ conn_timeout(conn_timeout) {
+ this->reg_handler(generic_bind(&PeerNetwork::msg_ping, this, _1, _2));
+ this->reg_handler(generic_bind(&PeerNetwork::msg_pong, this, _1, _2));
+ }
void add_peer(const NetAddr &paddr);
const conn_t get_peer_conn(const NetAddr &paddr) const;
- template<typename MsgType>
- void _send_msg(const MsgType &msg, const Peer *peer);
+ using MsgNet::send_msg;
template<typename MsgType>
void send_msg(const MsgType &msg, const NetAddr &paddr);
void listen(NetAddr listen_addr);
@@ -471,7 +451,7 @@ void PeerNetwork<O, _, __>::Conn::on_setup() {
MsgNet::Conn::on_setup();
auto pn = get_net();
assert(!ev_timeout);
- ev_timeout = Event(pn->ec, -1, 0, [this](int, int) {
+ ev_timeout = Event(pn->dispatcher_ec, -1, 0, [this](int, int) {
SALTICIDAE_LOG_INFO("peer ping-pong timeout");
this->terminate();
});
@@ -483,18 +463,16 @@ void PeerNetwork<O, _, __>::Conn::on_setup() {
/* the initial ping-pong to set up the connection */
auto &conn = static_cast<Conn &>(*this);
reset_timeout(pn->conn_timeout);
- pn->MsgNet::send_msg(MsgPing(pn->listen_port), conn);
+ pn->send_msg(MsgPing(pn->listen_port), conn);
}
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::Conn::on_teardown() {
MsgNet::Conn::on_teardown();
auto pn = get_net();
- mutex_lg_t _pn_lg(pn->pn_mlock);
auto it = pn->id2peer.find(peer_id);
if (it == pn->id2peer.end()) return;
auto p = it->second.get();
- mutex_lg_t _p_lg(p->mlock);
if (this != p->conn.get()) return;
p->ev_ping_timer.del();
p->connected = false;
@@ -505,7 +483,6 @@ void PeerNetwork<O, _, __>::Conn::on_teardown() {
// try to reconnect
p->ev_retry_timer = Event(pn->dispatcher_ec, -1, 0,
[pn, peer_id = this->peer_id](int, int) {
- mutex_lg_t _pn_lg(pn->pn_mlock);
pn->start_active_conn(peer_id);
});
p->ev_retry_timer.add_with_timeout(pn->gen_conn_timeout());
@@ -550,12 +527,11 @@ void PeerNetwork<O, _, __>::Peer::send_ping() {
ping_timer_ok = false;
pong_msg_ok = false;
conn->reset_timeout(pn->conn_timeout);
- pn->_send_msg(MsgPing(pn->listen_port), this);
+ pn->send_msg(MsgPing(pn->listen_port), *conn);
}
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::Peer::ping_timer(int, int) {
- mutex_lg_t _p_lg(mlock);
ping_timer_ok = true;
if (pong_msg_ok)
{
@@ -565,149 +541,132 @@ void PeerNetwork<O, _, __>::Peer::ping_timer(int, int) {
}
template<typename O, O _, O __>
-bool PeerNetwork<O, _, __>::check_new_conn(Conn &conn, uint16_t port) {
- if (conn.peer_id.is_null())
+bool PeerNetwork<O, _, __>::check_new_conn(const conn_t &conn, uint16_t port) {
+ if (conn->peer_id.is_null())
{ /* passive connections can eventually have ids after getting the port
number in IP_BASED_PORT mode */
assert(id_mode == IP_PORT_BASED);
- conn.peer_id.ip = conn.get_addr().ip;
- conn.peer_id.port = port;
+ conn->peer_id.ip = conn->get_addr().ip;
+ conn->peer_id.port = port;
}
- auto p = id2peer.find(conn.peer_id)->second.get();
- mutex_lg_t _p_lg(p->mlock);
+ auto p = id2peer.find(conn->peer_id)->second.get();
if (p->connected)
{
- if (conn.self() != p->conn)
+ if (conn != p->conn)
{
- conn.terminate();
+ conn->terminate();
return true;
}
return false;
}
- p->reset_conn(static_pointer_cast<Conn>(conn.self()));
+ p->reset_conn(conn);
p->connected = true;
p->reset_ping_timer();
p->send_ping();
if (p->connected)
SALTICIDAE_LOG_INFO("PeerNetwork: established connection with %s via %s",
- std::string(conn.peer_id).c_str(), std::string(conn).c_str());
+ std::string(conn->peer_id).c_str(), std::string(*conn).c_str());
return false;
}
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::_ping_msg_cb(const conn_t &conn, uint16_t port) {
- mutex_lg_t _pn_lg(pn_mlock);
- if (check_new_conn(*conn, port)) return;
- auto p = id2peer.find(conn->peer_id)->second.get();
- mutex_lg_t _p_lg(p->mlock);
- _send_msg(MsgPong(this->listen_port), p);
-}
-
-template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::_pong_msg_cb(const conn_t &conn, uint16_t port) {
- mutex_lg_t _pn_lg(pn_mlock);
- auto it = id2peer.find(conn->peer_id);
- if (it == id2peer.end())
- {
- SALTICIDAE_LOG_WARN("pong message discarded");
- return;
- }
- if (check_new_conn(*conn, port)) return;
- auto p = it->second.get();
- mutex_lg_t _p_lg(p->mlock);
- p->pong_msg_ok = true;
- if (p->ping_timer_ok)
- {
- p->reset_ping_timer();
- p->send_ping();
- }
-}
-
-/* end: functions invoked by the dispatcher */
-
-/* this function could be both invoked by the dispatcher and the user loop */
-template<typename O, O _, O __>
void PeerNetwork<O, _, __>::start_active_conn(const NetAddr &addr) {
auto p = id2peer.find(addr)->second.get();
- mutex_lg_t _p_lg(p->mlock);
if (p->connected) return;
- auto conn = static_pointer_cast<Conn>(MsgNet::connect(addr));
+ auto conn = static_pointer_cast<Conn>(MsgNet::_connect(addr));
assert(p->conn == nullptr);
p->conn = conn;
}
+/* end: functions invoked by the dispatcher */
/* begin: functions invoked by the user loop */
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::msg_ping(MsgPing &&msg, Conn &conn) {
+void PeerNetwork<O, _, __>::msg_ping(MsgPing &&msg, Conn &_conn) {
+ auto conn = static_pointer_cast<Conn>(_conn.self());
+ if (!conn) return;
uint16_t port = msg.port;
- SALTICIDAE_LOG_INFO("ping from %s, port %u", std::string(conn).c_str(), ntohs(port));
- auto dcmd = new PingCmd(static_pointer_cast<Conn>(conn.self()), port);
- write(this->dlisten_fd[1], &dcmd, sizeof(dcmd));
+ this->disp_tcall->call([this, conn, port](ThreadCall::Handle &msg) {
+ SALTICIDAE_LOG_INFO("ping from %s, port %u",
+ std::string(*conn).c_str(), ntohs(port));
+ if (check_new_conn(conn, port)) return;
+ auto p = id2peer.find(conn->peer_id)->second.get();
+ send_msg(MsgPong(this->listen_port), *conn);
+ });
}
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::msg_pong(MsgPong &&msg, Conn &conn) {
- auto dcmd = new PongCmd(static_pointer_cast<Conn>(conn.self()), msg.port);
- write(this->dlisten_fd[1], &dcmd, sizeof(dcmd));
+void PeerNetwork<O, _, __>::msg_pong(MsgPong &&msg, Conn &_conn) {
+ auto conn = static_pointer_cast<Conn>(_conn.self());
+ if (!conn) return;
+ uint16_t port = msg.port;
+ this->disp_tcall->call([this, conn, port](ThreadCall::Handle &msg) {
+ auto it = id2peer.find(conn->peer_id);
+ if (it == id2peer.end())
+ {
+ SALTICIDAE_LOG_WARN("pong message discarded");
+ return;
+ }
+ if (check_new_conn(conn, port)) return;
+ auto p = it->second.get();
+ p->pong_msg_ok = true;
+ if (p->ping_timer_ok)
+ {
+ p->reset_ping_timer();
+ p->send_ping();
+ }
+ });
}
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::listen(NetAddr listen_addr) {
- MsgNet::listen(listen_addr);
- listen_port = listen_addr.port;
- this->reg_handler(generic_bind(&PeerNetwork::msg_ping, this, _1, _2));
- this->reg_handler(generic_bind(&PeerNetwork::msg_pong, this, _1, _2));
+ this->disp_tcall->call([this, listen_addr](ThreadCall::Handle &msg) {
+ MsgNet::_listen(listen_addr);
+ listen_port = listen_addr.port;
+ }, true);
}
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) {
- mutex_lg_t _pn_lg(pn_mlock);
- auto it = id2peer.find(addr);
- if (it != id2peer.end())
- throw PeerNetworkError("peer already exists");
- id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->dispatcher_ec)));
- start_active_conn(addr);
+ this->disp_tcall->call([this, addr](ThreadCall::Handle &) {
+ auto it = id2peer.find(addr);
+ if (it != id2peer.end())
+ throw PeerNetworkError("peer already exists");
+ id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->dispatcher_ec)));
+ start_active_conn(addr);
+ }, true);
}
template<typename O, O _, O __>
const typename PeerNetwork<O, _, __>::conn_t
PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &paddr) const {
- mutex_lg_t _pn_lg(pn_mlock);
- auto it = id2peer.find(paddr);
- if (it == id2peer.end())
- throw PeerNetworkError("peer does not exist");
- return it->second->conn;
+ auto ret = static_cast<conn_t *>(this->disp_tcall->call(
+ [this, paddr](ThreadCall::Handle &h) {
+ auto it = id2peer.find(paddr);
+ if (it == id2peer.end())
+ throw PeerNetworkError("peer does not exist");
+ auto ptr = new conn_t(it->second->conn);
+ h.set_result(ptr);
+ }));
+ auto conn = *ret;
+ delete ret;
+ return std::move(conn);
}
template<typename O, O _, O __>
bool PeerNetwork<O, _, __>::has_peer(const NetAddr &paddr) const {
- mutex_lg_t _pn_lg(pn_mlock);
- return id2peer.count(paddr);
-}
-
-template<typename O, O _, O __>
-template<typename MsgType>
-void PeerNetwork<O, _, __>::_send_msg(const MsgType &msg, const Peer *peer) {
- if (peer->connected)
- MsgNet::send_msg(msg, *(peer->conn));
- else
- SALTICIDAE_LOG_DEBUG("dropped");
+ auto ret = static_cast<bool *>(this->disp_tcall->call(
+ [this, paddr](ThreadCall::Handle &h) {
+ h.set_result(id2peer.count(paddr));
+ }));
+ auto has = *ret;
+ delete ret;
+ return has;
}
template<typename O, O _, O __>
template<typename MsgType>
void PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const NetAddr &addr) {
- mutex_lg_t _pn_lg(pn_mlock);
- auto it = id2peer.find(addr);
- if (it == id2peer.end())
- {
- SALTICIDAE_LOG_ERROR("sending to non-existing peer: %s",
- std::string(addr).c_str());
- throw PeerNetworkError("peer does not exist");
- }
- auto p = it->second.get();
- mutex_lg_t _p_lg(p->mlock);
- _send_msg(msg, p);
+ send_msg(msg, *get_peer_conn(addr));
}
/* end: functions invoked by the user loop */
@@ -716,7 +675,6 @@ void ClientNetwork<OpcodeType>::Conn::on_setup() {
MsgNet::Conn::on_setup();
assert(this->get_mode() == Conn::PASSIVE);
const auto &addr = this->get_addr();
- mutex_lg_t _cn_lg(cn_mlock);
auto cn = get_net();
cn->addr2conn.erase(addr);
cn->addr2conn.insert(
@@ -728,17 +686,22 @@ template<typename OpcodeType>
void ClientNetwork<OpcodeType>::Conn::on_teardown() {
MsgNet::Conn::on_teardown();
assert(this->get_mode() == Conn::PASSIVE);
- mutex_lg_t _cn_lg(cn_mlock);
get_net()->addr2conn.erase(this->get_addr());
}
template<typename OpcodeType>
template<typename MsgType>
void ClientNetwork<OpcodeType>::send_msg(const MsgType &msg, const NetAddr &addr) {
- mutex_lg_t _cn_lg(cn_mlock);
- auto it = addr2conn.find(addr);
- if (it == addr2conn.end()) return;
- MsgNet::send_msg(msg, *(it->second));
+ auto ret = static_cast<conn_t *>(this->disp_tcall->call(
+ [this, addr](ThreadCall::Handle &h) {
+ auto it = addr2conn.find(addr);
+ if (it == addr2conn.end())
+ throw PeerNetworkError("client does not exist");
+ auto ptr = new conn_t(it->second->conn);
+ h.set_result(ptr);
+ }));
+ send_msg(msg, **ret);
+ delete ret;
}
template<typename O, O OPCODE_PING, O _>
diff --git a/include/salticidae/queue.h b/include/salticidae/queue.h
index 9045e2b..0b493ea 100644
--- a/include/salticidae/queue.h
+++ b/include/salticidae/queue.h
@@ -105,7 +105,7 @@ class MPMCQueue {
void _enqueue(Block *nblk, U &&e) {
new (&(nblk->elem)) T(std::forward<U>(e));
nblk->next.store(nullptr, std::memory_order_release);
- auto prev = tail.exchange(nblk, std::memory_order_relaxed);
+ auto prev = tail.exchange(nblk, std::memory_order_acq_rel);
prev->next.store(nblk, std::memory_order_relaxed);
}
diff --git a/src/conn.cpp b/src/conn.cpp
index f2922d8..6b2e3aa 100644
--- a/src/conn.cpp
+++ b/src/conn.cpp
@@ -116,12 +116,20 @@ void ConnPool::Conn::recv_data(int fd, int events) {
on_read();
}
-void ConnPool::Conn::terminate() {
+void ConnPool::Conn::stop() {
ev_read.clear();
ev_write.clear();
- cpool->post_terminate(fd);
}
+void ConnPool::Conn::terminate() {
+ stop();
+ cpool->disp_tcall->call(
+ [cpool=this->cpool, fd=this->fd](ThreadCall::Handle &) {
+ cpool->terminate(fd);
+ });
+}
+
+
void ConnPool::accept_client(int fd, int) {
int client_fd;
struct sockaddr client_addr;
@@ -171,8 +179,7 @@ void ConnPool::Conn::conn_server(int fd, int events) {
}
}
-void ConnPool::listen(NetAddr listen_addr) {
- std::lock_guard<std::mutex> _(cp_mlock);
+void ConnPool::_listen(NetAddr listen_addr) {
int one = 1;
if (listen_fd != -1)
{ /* reset the previous listen() */
@@ -197,8 +204,9 @@ void ConnPool::listen(NetAddr listen_addr) {
throw ConnPoolError(std::string("binding error"));
if (::listen(listen_fd, max_listen_backlog) < 0)
throw ConnPoolError(std::string("listen error"));
- auto dcmd = new DspAcceptListen(listen_fd);
- write(dlisten_fd[1], &dcmd, sizeof(dcmd));
+ ev_listen = Event(dispatcher_ec, listen_fd, Event::READ,
+ std::bind(&ConnPool::accept_client, this, _1, _2));
+ ev_listen.add();
SALTICIDAE_LOG_INFO("listening to %u", ntohs(listen_addr.port));
}
@@ -234,16 +242,16 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) {
}
else
{
- auto dcmd = new DspConnectListen(conn);
- write(dlisten_fd[1], &dcmd, sizeof(dcmd));
+ conn->ev_connect = Event(dispatcher_ec, conn->fd, Event::WRITE,
+ std::bind(&Conn::conn_server, conn.get(), _1, _2));
+ conn->ev_connect.add_with_timeout(conn_server_timeout);
add_conn(conn);
SALTICIDAE_LOG_INFO("created %s", std::string(*conn).c_str());
}
return conn;
}
-void ConnPool::_post_terminate(int fd) {
- std::lock_guard<std::mutex> _(cp_mlock);
+void ConnPool::terminate(int fd) {
auto it = pool.find(fd);
if (it != pool.end())
{
@@ -259,23 +267,8 @@ void ConnPool::_post_terminate(int fd) {
}
ConnPool::conn_t ConnPool::add_conn(const conn_t &conn) {
- std::lock_guard<std::mutex> _(cp_mlock);
assert(pool.find(conn->fd) == pool.end());
return pool.insert(std::make_pair(conn->fd, conn)).first->second;
}
-void ConnPool::_accept_listen(int listen_fd) {
- std::lock_guard<std::mutex> _(cp_mlock);
- ev_listen = Event(dispatcher_ec, listen_fd, Event::READ,
- std::bind(&ConnPool::accept_client, this, _1, _2));
- ev_listen.add();
-}
-
-void ConnPool::_connect_listen(const conn_t &conn) {
- std::lock_guard<std::mutex> _(cp_mlock);
- conn->ev_connect = Event(dispatcher_ec, conn->fd, Event::WRITE,
- std::bind(&Conn::conn_server, conn.get(), _1, _2));
- conn->ev_connect.add_with_timeout(conn_server_timeout);
-}
-
}
diff --git a/test/test_network.cpp b/test/test_network.cpp
index 061b520..d93d0ff 100644
--- a/test/test_network.cpp
+++ b/test/test_network.cpp
@@ -141,6 +141,9 @@ int main() {
alice.reg_handler(on_receive_ack);
bob.reg_handler(on_receive_ack);
+ alice.start();
+ bob.start();
+
alice.listen(alice_addr);
bob.listen(bob_addr);
diff --git a/test/test_p2p.cpp b/test/test_p2p.cpp
index 4146fd9..f52f48f 100644
--- a/test/test_p2p.cpp
+++ b/test/test_p2p.cpp
@@ -139,6 +139,9 @@ int main() {
alice.reg_handler(on_receive_ack);
bob.reg_handler(on_receive_ack);
+ alice.start();
+ bob.start();
+
alice.listen(alice_addr);
bob.listen(bob_addr);