aboutsummaryrefslogtreecommitdiff
path: root/src/conn.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/conn.cpp')
-rw-r--r--src/conn.cpp49
1 files changed, 28 insertions, 21 deletions
diff --git a/src/conn.cpp b/src/conn.cpp
index 7b7c699..f2922d8 100644
--- a/src/conn.cpp
+++ b/src/conn.cpp
@@ -46,8 +46,8 @@ ConnPool::Conn::operator std::string() const {
/* the following two functions are executed by exactly one worker per Conn object */
-void ConnPool::Conn::send_data(evutil_socket_t fd, short events) {
- if (!(events & EV_WRITE)) return;
+void ConnPool::Conn::send_data(int fd, int events) {
+ if (!(events & Event::WRITE)) return;
auto conn = self(); /* pin the connection */
ssize_t ret = seg_buff_size;
for (;;)
@@ -77,7 +77,7 @@ void ConnPool::Conn::send_data(evutil_socket_t fd, short events) {
bytearray_t(buff_seg.begin() + ret, buff_seg.end()));
/* wait for the next write callback */
ready_send = false;
- ev_write.add();
+ //ev_write.add();
return;
}
}
@@ -85,8 +85,8 @@ void ConnPool::Conn::send_data(evutil_socket_t fd, short events) {
ready_send = true;
}
-void ConnPool::Conn::recv_data(evutil_socket_t fd, short events) {
- if (!(events & EV_READ)) return;
+void ConnPool::Conn::recv_data(int fd, int events) {
+ if (!(events & Event::READ)) return;
auto conn = self(); /* pin the connection */
ssize_t ret = seg_buff_size;
while (ret == (ssize_t)seg_buff_size)
@@ -105,13 +105,14 @@ void ConnPool::Conn::recv_data(evutil_socket_t fd, short events) {
}
if (ret == 0)
{
+ //SALTICIDAE_LOG_INFO("recv(%d) terminates", fd, strerror(errno));
terminate();
return;
}
buff_seg.resize(ret);
recv_buffer.push(std::move(buff_seg));
}
- ev_read.add();
+ //ev_read.add();
on_read();
}
@@ -121,7 +122,7 @@ void ConnPool::Conn::terminate() {
cpool->post_terminate(fd);
}
-void ConnPool::accept_client(evutil_socket_t fd, short) {
+void ConnPool::accept_client(int fd, int) {
int client_fd;
struct sockaddr client_addr;
socklen_t addr_size = sizeof(struct sockaddr_in);
@@ -151,7 +152,7 @@ void ConnPool::accept_client(evutil_socket_t fd, short) {
}
}
-void ConnPool::Conn::conn_server(evutil_socket_t fd, short events) {
+void ConnPool::Conn::conn_server(int fd, int events) {
auto conn = self(); /* pin the connection */
if (send(fd, "", 0, MSG_NOSIGNAL) == 0)
{
@@ -163,7 +164,7 @@ void ConnPool::Conn::conn_server(evutil_socket_t fd, short events) {
}
else
{
- if (events & EV_TIMEOUT)
+ if (events & Event::TIMEOUT)
SALTICIDAE_LOG_INFO("%s connect timeout", std::string(*this).c_str());
terminate();
return;
@@ -196,12 +197,8 @@ void ConnPool::listen(NetAddr listen_addr) {
throw ConnPoolError(std::string("binding error"));
if (::listen(listen_fd, max_listen_backlog) < 0)
throw ConnPoolError(std::string("listen error"));
- {
- std::lock_guard<std::mutex> _(dsp_ec_mlock);
- ev_listen = Event(dispatcher_ec, listen_fd, EV_READ | EV_PERSIST,
- std::bind(&ConnPool::accept_client, this, _1, _2));
- ev_listen.add();
- }
+ auto dcmd = new DspAcceptListen(listen_fd);
+ write(dlisten_fd[1], &dcmd, sizeof(dcmd));
SALTICIDAE_LOG_INFO("listening to %u", ntohs(listen_addr.port));
}
@@ -237,12 +234,8 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) {
}
else
{
- {
- std::lock_guard<std::mutex> _(dsp_ec_mlock);
- conn->ev_connect = Event(dispatcher_ec, fd, EV_WRITE,
- std::bind(&Conn::conn_server, conn.get(), _1, _2));
- conn->ev_connect.add_with_timeout(conn_server_timeout);
- }
+ auto dcmd = new DspConnectListen(conn);
+ write(dlisten_fd[1], &dcmd, sizeof(dcmd));
add_conn(conn);
SALTICIDAE_LOG_INFO("created %s", std::string(*conn).c_str());
}
@@ -271,4 +264,18 @@ ConnPool::conn_t ConnPool::add_conn(const conn_t &conn) {
return pool.insert(std::make_pair(conn->fd, conn)).first->second;
}
+void ConnPool::_accept_listen(int listen_fd) {
+ std::lock_guard<std::mutex> _(cp_mlock);
+ ev_listen = Event(dispatcher_ec, listen_fd, Event::READ,
+ std::bind(&ConnPool::accept_client, this, _1, _2));
+ ev_listen.add();
+}
+
+void ConnPool::_connect_listen(const conn_t &conn) {
+ std::lock_guard<std::mutex> _(cp_mlock);
+ conn->ev_connect = Event(dispatcher_ec, conn->fd, Event::WRITE,
+ std::bind(&Conn::conn_server, conn.get(), _1, _2));
+ conn->ev_connect.add_with_timeout(conn_server_timeout);
+}
+
}