aboutsummaryrefslogtreecommitdiff
path: root/include/salticidae/conn.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/salticidae/conn.h')
-rw-r--r--include/salticidae/conn.h267
1 files changed, 81 insertions, 186 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index a86a4d2..2ef6b50 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -95,6 +95,8 @@ class ConnPool {
void send_data(int, int);
void conn_server(int, int);
+ /** Stop the worker and I/O events. */
+ void stop();
/** Terminate the connection. */
void terminate();
@@ -146,54 +148,27 @@ class ConnPool {
const size_t seg_buff_size;
/* owned by user loop */
- int mlisten_fd[2]; /**< for connection events sent to the user loop */
- Event ev_mlisten;
+ BoxObj<ThreadCall> user_tcall;
conn_callback_t conn_cb;
/* owned by the dispatcher */
+ Event ev_listen;
std::unordered_map<int, conn_t> pool;
int listen_fd; /**< for accepting new network connections */
- Event ev_listen;
- Event ev_dlisten;
- std::mutex cp_mlock;
void update_conn(const conn_t &conn, bool connected) {
- auto dcmd = new UserConn(conn, connected);
- write(mlisten_fd[1], &dcmd, sizeof(dcmd));
+ user_tcall->call([this, conn, connected](ThreadCall::Handle &) {
+ if (conn_cb) conn_cb(*conn, connected);
+ });
}
- struct Worker;
- class WorkerFeed;
-
- class WorkerCmd {
- public:
- virtual ~WorkerCmd() = default;
- virtual void exec(Worker *worker) = 0;
- };
-
class Worker {
EventContext ec;
- Event ev_ctl;
- int ctl_fd[2]; /**< for control messages from dispatcher */
+ ThreadCall msgr;
std::thread handle;
public:
- Worker() {
- if (pipe2(ctl_fd, O_NONBLOCK))
- throw ConnPoolError(std::string("failed to create worker pipe"));
- ev_ctl = Event(ec, ctl_fd[0], Event::READ, [this](int fd, int) {
- WorkerCmd *dcmd;
- read(fd, &dcmd, sizeof(dcmd));
- dcmd->exec(this);
- delete dcmd;
- });
- ev_ctl.add();
- }
-
- ~Worker() {
- close(ctl_fd[0]);
- close(ctl_fd[1]);
- }
+ Worker(): msgr(ec) {}
/* the following functions are called by the dispatcher */
void start() {
@@ -201,143 +176,69 @@ class ConnPool {
}
void feed(const conn_t &conn, int client_fd) {
- auto dcmd = new WorkerFeed(conn, client_fd);
- write(ctl_fd[1], &dcmd, sizeof(dcmd));
+ msgr.call([this, conn, client_fd](ThreadCall::Handle &) {
+ SALTICIDAE_LOG_INFO("worker %x got %s",
+ std::this_thread::get_id(),
+ std::string(*conn).c_str());
+ conn->get_send_buffer()
+ .get_queue()
+ .reg_handler(this->ec, [conn, client_fd]
+ (MPSCWriteBuffer::queue_t &) {
+ if (conn->ready_send && conn->fd != -1)
+ conn->send_data(client_fd, Event::WRITE);
+ return false;
+ });
+ //auto conn_ptr = conn.get();
+ conn->ev_read = Event(ec, client_fd, Event::READ | Event::WRITE, [conn=conn](int fd, int what) {
+ if (what & Event::READ)
+ conn->recv_data(fd, what);
+ else
+ conn->send_data(fd, what);
+ });
+
+ // std::bind(&Conn::recv_data, conn_ptr, _1, _2));
+ //conn->ev_write = Event(ec, client_fd, Event::WRITE,
+ // std::bind(&Conn::send_data, conn_ptr, _1, _2));
+ conn->ev_read.add();
+ //conn->ev_write.add();
+ });
}
void stop() {
- auto dcmd = new WorkerStop();
- write(ctl_fd[1], &dcmd, sizeof(dcmd));
+ msgr.call([this](ThreadCall::Handle &) {
+ ec.stop();
+ });
}
std::thread &get_handle() { return handle; }
const EventContext &get_ec() { return ec; }
};
- class WorkerFeed: public WorkerCmd {
- conn_t conn;
- int client_fd;
-
- public:
- WorkerFeed(const conn_t &conn, int client_fd):
- conn(conn), client_fd(client_fd) {}
- void exec(Worker *worker) override {
- SALTICIDAE_LOG_INFO("worker %x got %s",
- std::this_thread::get_id(),
- std::string(*conn).c_str());
- auto &ec = worker->get_ec();
- conn->get_send_buffer()
- .get_queue()
- .reg_handler(ec, [conn=this->conn,
- client_fd=this->client_fd](MPSCWriteBuffer::queue_t &) {
- if (conn->ready_send && conn->fd != -1)
- conn->send_data(client_fd, Event::WRITE);
- return false;
- });
- //auto conn_ptr = conn.get();
- conn->ev_read = Event(ec, client_fd, Event::READ | Event::WRITE, [conn=conn](int fd, int what) {
- if (what & Event::READ)
- conn->recv_data(fd, what);
- else
- conn->send_data(fd, what);
- });
-
- // std::bind(&Conn::recv_data, conn_ptr, _1, _2));
- //conn->ev_write = Event(ec, client_fd, Event::WRITE,
- // std::bind(&Conn::send_data, conn_ptr, _1, _2));
- conn->ev_read.add();
- //conn->ev_write.add();
- }
- };
-
- class WorkerStop: public WorkerCmd {
- public:
- void exec(Worker *worker) override { worker->get_ec().stop(); }
- };
-
/* related to workers */
size_t nworker;
salticidae::BoxObj<Worker[]> workers;
void accept_client(int, int);
conn_t add_conn(const conn_t &conn);
- conn_t _connect(const NetAddr &addr);
- void _post_terminate(int fd);
- void _accept_listen(int listen_fd);
- void _connect_listen(const conn_t &conn);
+ void terminate(int fd);
protected:
- class DispatchCmd {
- public:
- virtual ~DispatchCmd() = default;
- virtual void exec(ConnPool *cpool) = 0;
- };
+ conn_t _connect(const NetAddr &addr);
+ void _listen(NetAddr listen_addr);
private:
- class DspConnect: public DispatchCmd {
- const NetAddr addr;
- public:
- DspConnect(const NetAddr &addr): addr(addr) {}
- void exec(ConnPool *cpool) override {
- cpool->update_conn(cpool->_connect(addr), true);
- }
- };
-
- class DspPostTerm: public DispatchCmd {
- int fd;
- public:
- DspPostTerm(int fd): fd(fd) {}
- void exec(ConnPool *cpool) override {
- cpool->_post_terminate(fd);
- }
- };
-
- class DspMulticast: public DispatchCmd {
- std::vector<conn_t> receivers;
- bytearray_t data;
- public:
- DspMulticast(std::vector<conn_t> &&receivers, bytearray_t &&data):
- receivers(std::move(receivers)),
- data(std::move(data)) {}
- void exec(ConnPool *) override {
- for (auto &r: receivers) r->write(bytearray_t(data));
- }
- };
-
- class DspAcceptListen: public DispatchCmd {
- int listen_fd;
- public:
- DspAcceptListen(int listen_fd): listen_fd(listen_fd) {}
- void exec(ConnPool *cpool) override {
- cpool->_accept_listen(listen_fd);
- }
- };
- class DspConnectListen: public DispatchCmd {
- conn_t conn;
- public:
- DspConnectListen(const conn_t &conn): conn(conn) {}
- void exec(ConnPool *cpool) override {
- cpool->_connect_listen(conn);
- }
- };
-
- class UserConn: public DispatchCmd {
- conn_t conn;
- bool connected;
- public:
- UserConn(const conn_t &conn, bool connected):
- conn(conn), connected(connected) {}
- void exec(ConnPool *cpool) override {
- if (cpool->conn_cb)
- cpool->conn_cb(*conn, connected);
- }
- };
-
- void post_terminate(int fd) {
- auto dcmd = new DspPostTerm(fd);
- write(dlisten_fd[1], &dcmd, sizeof(dcmd));
- }
+ //class DspMulticast: public DispatchCmd {
+ // std::vector<conn_t> receivers;
+ // bytearray_t data;
+ // public:
+ // DspMulticast(std::vector<conn_t> &&receivers, bytearray_t &&data):
+ // receivers(std::move(receivers)),
+ // data(std::move(data)) {}
+ // void exec(ConnPool *) override {
+ // for (auto &r: receivers) r->write(bytearray_t(data));
+ // }
+ //};
Worker &select_worker() {
return workers[1];
@@ -346,7 +247,7 @@ class ConnPool {
protected:
EventContext ec;
EventContext dispatcher_ec;
- int dlisten_fd[2]; /**< for control command sent to the dispatcher */
+ BoxObj<ThreadCall> disp_tcall;
/** Should be implemented by derived class to return a new Conn object. */
virtual Conn *create_conn() = 0;
@@ -360,35 +261,13 @@ class ConnPool {
conn_server_timeout(conn_server_timeout),
seg_buff_size(seg_buff_size),
listen_fd(-1),
- nworker(std::min((size_t)1, nworker)),
+ nworker(std::max((size_t)1, nworker)),
ec(ec) {
- if (pipe2(mlisten_fd, O_NONBLOCK))
- throw ConnPoolError(std::string("failed to create main pipe"));
- if (pipe2(dlisten_fd, O_NONBLOCK))
- throw ConnPoolError(std::string("failed to create dispatcher pipe"));
-
- ev_mlisten = Event(ec, mlisten_fd[0], Event::READ, [this](int fd, int) {
- DispatchCmd *dcmd;
- read(fd, &dcmd, sizeof(dcmd));
- dcmd->exec(this);
- delete dcmd;
- });
- ev_mlisten.add();
-
workers = new Worker[nworker];
dispatcher_ec = workers[0].get_ec();
- ev_dlisten = Event(dispatcher_ec, dlisten_fd[0], Event::READ, [this](int fd, int) {
- DispatchCmd *dcmd;
- read(fd, &dcmd, sizeof(dcmd));
- dcmd->exec(this);
- delete dcmd;
- });
- ev_dlisten.add();
-
- SALTICIDAE_LOG_INFO("starting all threads...");
- for (size_t i = 0; i < nworker; i++)
- workers[i].start();
+ user_tcall = new ThreadCall(ec);
+ disp_tcall = new ThreadCall(dispatcher_ec);
}
~ConnPool() {
@@ -404,24 +283,36 @@ class ConnPool {
conn->on_close();
}
if (listen_fd != -1) close(listen_fd);
- for (int i = 0; i < 2; i++)
- {
- close(mlisten_fd[i]);
- close(dlisten_fd[i]);
- }
}
ConnPool(const ConnPool &) = delete;
ConnPool(ConnPool &&) = delete;
+ void start() {
+ SALTICIDAE_LOG_INFO("starting all threads...");
+ for (size_t i = 0; i < nworker; i++)
+ workers[i].start();
+ }
+
/** Actively connect to remote addr. */
conn_t connect(const NetAddr &addr, bool blocking = true) {
if (blocking)
- return _connect(addr);
+ {
+ auto ret = static_cast<conn_t *>(disp_tcall->call(
+ [this, addr](ThreadCall::Handle &h) {
+ auto ptr = new conn_t(_connect(addr));
+ std::atomic_thread_fence(std::memory_order_release);
+ h.set_result(ptr);
+ }, true));
+ auto conn = *ret;
+ delete ret;
+ return std::move(conn);
+ }
else
{
- auto dcmd = new DspConnect(addr);
- write(dlisten_fd[1], &dcmd, sizeof(dcmd));
+ disp_tcall->call([this, addr](ThreadCall::Handle &) {
+ _connect(addr);
+ }, false);
return nullptr;
}
}
@@ -429,7 +320,11 @@ class ConnPool {
/** Listen for passive connections (connection initiated from remote).
* Does not need to be called if do not want to accept any passive
* connections. */
- void listen(NetAddr listen_addr);
+ void listen(NetAddr listen_addr) {
+ disp_tcall->call([this, listen_addr](ThreadCall::Handle &) {
+ _listen(listen_addr);
+ }, true);
+ }
template<typename Func>
void reg_conn_handler(Func cb) { conn_cb = cb; }