aboutsummaryrefslogtreecommitdiff
path: root/include/salticidae/conn.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/salticidae/conn.h')
-rw-r--r--include/salticidae/conn.h58
1 files changed, 42 insertions, 16 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index 1364d4d..a86a4d2 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -39,6 +39,7 @@
#include <mutex>
#include <thread>
#include <fcntl.h>
+#include <event2/thread.h>
#include "salticidae/type.h"
#include "salticidae/ref.h"
@@ -90,9 +91,9 @@ class ConnPool {
/** does not need to wait if true */
bool ready_send;
- void recv_data(evutil_socket_t, short);
- void send_data(evutil_socket_t, short);
- void conn_server(evutil_socket_t, short);
+ void recv_data(int, int);
+ void send_data(int, int);
+ void conn_server(int, int);
/** Terminate the connection. */
void terminate();
@@ -180,7 +181,7 @@ class ConnPool {
Worker() {
if (pipe2(ctl_fd, O_NONBLOCK))
throw ConnPoolError(std::string("failed to create worker pipe"));
- ev_ctl = Event(ec, ctl_fd[0], EV_READ | EV_PERSIST, [this](int fd, short) {
+ ev_ctl = Event(ec, ctl_fd[0], Event::READ, [this](int fd, int) {
WorkerCmd *dcmd;
read(fd, &dcmd, sizeof(dcmd));
dcmd->exec(this);
@@ -229,17 +230,23 @@ class ConnPool {
.get_queue()
.reg_handler(ec, [conn=this->conn,
client_fd=this->client_fd](MPSCWriteBuffer::queue_t &) {
- if (conn->ready_send)
- conn->send_data(client_fd, EV_WRITE);
+ if (conn->ready_send && conn->fd != -1)
+ conn->send_data(client_fd, Event::WRITE);
return false;
});
- auto conn_ptr = conn.get();
- conn->ev_read = Event(ec, client_fd, EV_READ,
- std::bind(&Conn::recv_data, conn_ptr, _1, _2));
- conn->ev_write = Event(ec, client_fd, EV_WRITE,
- std::bind(&Conn::send_data, conn_ptr, _1, _2));
+ //auto conn_ptr = conn.get();
+ conn->ev_read = Event(ec, client_fd, Event::READ | Event::WRITE, [conn=conn](int fd, int what) {
+ if (what & Event::READ)
+ conn->recv_data(fd, what);
+ else
+ conn->send_data(fd, what);
+ });
+
+ // std::bind(&Conn::recv_data, conn_ptr, _1, _2));
+ //conn->ev_write = Event(ec, client_fd, Event::WRITE,
+ // std::bind(&Conn::send_data, conn_ptr, _1, _2));
conn->ev_read.add();
- conn->ev_write.add();
+ //conn->ev_write.add();
}
};
@@ -252,10 +259,12 @@ class ConnPool {
size_t nworker;
salticidae::BoxObj<Worker[]> workers;
- void accept_client(evutil_socket_t, short);
+ void accept_client(int, int);
conn_t add_conn(const conn_t &conn);
conn_t _connect(const NetAddr &addr);
void _post_terminate(int fd);
+ void _accept_listen(int listen_fd);
+ void _connect_listen(const conn_t &conn);
protected:
class DispatchCmd {
@@ -295,6 +304,24 @@ class ConnPool {
}
};
+ class DspAcceptListen: public DispatchCmd {
+ int listen_fd;
+ public:
+ DspAcceptListen(int listen_fd): listen_fd(listen_fd) {}
+ void exec(ConnPool *cpool) override {
+ cpool->_accept_listen(listen_fd);
+ }
+ };
+
+ class DspConnectListen: public DispatchCmd {
+ conn_t conn;
+ public:
+ DspConnectListen(const conn_t &conn): conn(conn) {}
+ void exec(ConnPool *cpool) override {
+ cpool->_connect_listen(conn);
+ }
+ };
+
class UserConn: public DispatchCmd {
conn_t conn;
bool connected;
@@ -320,7 +347,6 @@ class ConnPool {
EventContext ec;
EventContext dispatcher_ec;
int dlisten_fd[2]; /**< for control command sent to the dispatcher */
- std::mutex dsp_ec_mlock;
/** Should be implemented by derived class to return a new Conn object. */
virtual Conn *create_conn() = 0;
@@ -341,7 +367,7 @@ class ConnPool {
if (pipe2(dlisten_fd, O_NONBLOCK))
throw ConnPoolError(std::string("failed to create dispatcher pipe"));
- ev_mlisten = Event(ec, mlisten_fd[0], EV_READ | EV_PERSIST, [this](int fd, short) {
+ ev_mlisten = Event(ec, mlisten_fd[0], Event::READ, [this](int fd, int) {
DispatchCmd *dcmd;
read(fd, &dcmd, sizeof(dcmd));
dcmd->exec(this);
@@ -352,7 +378,7 @@ class ConnPool {
workers = new Worker[nworker];
dispatcher_ec = workers[0].get_ec();
- ev_dlisten = Event(dispatcher_ec, dlisten_fd[0], EV_READ | EV_PERSIST, [this](int fd, short) {
+ ev_dlisten = Event(dispatcher_ec, dlisten_fd[0], Event::READ, [this](int fd, int) {
DispatchCmd *dcmd;
read(fd, &dcmd, sizeof(dcmd));
dcmd->exec(this);