aboutsummaryrefslogtreecommitdiff
path: root/src/conn.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/conn.cpp')
-rw-r--r--src/conn.cpp144
1 files changed, 122 insertions, 22 deletions
diff --git a/src/conn.cpp b/src/conn.cpp
index 7f485fd..3ec4284 100644
--- a/src/conn.cpp
+++ b/src/conn.cpp
@@ -51,19 +51,18 @@ ConnPool::Conn::operator std::string() const {
return std::move(s);
}
-/* the following two functions are executed by exactly one worker per Conn object */
+/* the following functions are executed by exactly one worker per Conn object */
-void ConnPool::Conn::send_data(int fd, int events) {
+void ConnPool::Conn::_send_data(const ConnPool::conn_t &conn, int fd, int events) {
if (events & FdEvent::ERROR)
{
- worker_terminate();
+ conn->worker_terminate();
return;
}
- auto conn = self(); /* pin the connection */
- ssize_t ret = seg_buff_size;
+ ssize_t ret = conn->seg_buff_size;
for (;;)
{
- bytearray_t buff_seg = send_buffer.move_pop();
+ bytearray_t buff_seg = conn->send_buffer.move_pop();
ssize_t size = buff_seg.size();
if (!size) break;
ret = send(fd, buff_seg.data(), size, MSG_NOSIGNAL);
@@ -74,37 +73,37 @@ void ConnPool::Conn::send_data(int fd, int events) {
if (ret < 1) /* nothing is sent */
{
/* rewind the whole buff_seg */
- send_buffer.rewind(std::move(buff_seg));
+ conn->send_buffer.rewind(std::move(buff_seg));
if (ret < 0 && errno != EWOULDBLOCK)
{
SALTICIDAE_LOG_INFO("send(%d) failure: %s", fd, strerror(errno));
- worker_terminate();
+ conn->worker_terminate();
return;
}
}
else
/* rewind the leftover */
- send_buffer.rewind(
+ conn->send_buffer.rewind(
bytearray_t(buff_seg.begin() + ret, buff_seg.end()));
/* wait for the next write callback */
- ready_send = false;
+ conn->ready_send = false;
//ev_write.add();
return;
}
}
- ev_socket.del();
- ev_socket.add(FdEvent::READ);
+ conn->ev_socket.del();
+ conn->ev_socket.add(FdEvent::READ);
/* consumed the buffer but endpoint still seems to be writable */
- ready_send = true;
+ conn->ready_send = true;
}
-void ConnPool::Conn::recv_data(int fd, int events) {
+void ConnPool::Conn::_recv_data(const ConnPool::conn_t &conn, int fd, int events) {
if (events & FdEvent::ERROR)
{
- worker_terminate();
+ conn->worker_terminate();
return;
}
- auto conn = self(); /* pin the connection */
+ const size_t seg_buff_size = conn->seg_buff_size;
ssize_t ret = seg_buff_size;
while (ret == (ssize_t)seg_buff_size)
{
@@ -117,21 +116,122 @@ void ConnPool::Conn::recv_data(int fd, int events) {
if (errno == EWOULDBLOCK) break;
SALTICIDAE_LOG_INFO("recv(%d) failure: %s", fd, strerror(errno));
/* connection err or half-opened connection */
- worker_terminate();
+ conn->worker_terminate();
return;
}
if (ret == 0)
{
//SALTICIDAE_LOG_INFO("recv(%d) terminates", fd, strerror(errno));
- worker_terminate();
+ conn->worker_terminate();
return;
}
buff_seg.resize(ret);
- recv_buffer.push(std::move(buff_seg));
+ conn->recv_buffer.push(std::move(buff_seg));
}
//ev_read.add();
- on_read();
+ conn->on_read();
+}
+
+
+void ConnPool::Conn::_send_data_tls(const ConnPool::conn_t &conn, int fd, int events) {
+ if (events & FdEvent::ERROR)
+ {
+ conn->worker_terminate();
+ return;
+ }
+ ssize_t ret = conn->seg_buff_size;
+ auto &tls = conn->tls;
+ for (;;)
+ {
+ bytearray_t buff_seg = conn->send_buffer.move_pop();
+ ssize_t size = buff_seg.size();
+ if (!size) break;
+ ret = tls->send(buff_seg.data(), size);
+ SALTICIDAE_LOG_DEBUG("ssl sent %zd bytes", ret);
+ size -= ret;
+ if (size > 0)
+ {
+ if (ret < 1) /* nothing is sent */
+ {
+ /* rewind the whole buff_seg */
+ conn->send_buffer.rewind(std::move(buff_seg));
+ if (ret < 0 && tls->get_error(ret) != SSL_ERROR_WANT_WRITE)
+ {
+ SALTICIDAE_LOG_INFO("send(%d) failure: %s", fd, strerror(errno));
+ conn->worker_terminate();
+ return;
+ }
+ }
+ else
+ /* rewind the leftover */
+ conn->send_buffer.rewind(
+ bytearray_t(buff_seg.begin() + ret, buff_seg.end()));
+ /* wait for the next write callback */
+ conn->ready_send = false;
+ return;
+ }
+ }
+ conn->ev_socket.del();
+ conn->ev_socket.add(FdEvent::READ);
+ /* consumed the buffer but endpoint still seems to be writable */
+ conn->ready_send = true;
+}
+
+void ConnPool::Conn::_recv_data_tls(const ConnPool::conn_t &conn, int fd, int events) {
+ if (events & FdEvent::ERROR)
+ {
+ conn->worker_terminate();
+ return;
+ }
+ const size_t seg_buff_size = conn->seg_buff_size;
+ ssize_t ret = seg_buff_size;
+ auto &tls = conn->tls;
+ while (ret == (ssize_t)seg_buff_size)
+ {
+ bytearray_t buff_seg;
+ buff_seg.resize(seg_buff_size);
+ ret = tls->recv(buff_seg.data(), seg_buff_size);
+ SALTICIDAE_LOG_DEBUG("ssl read %zd bytes", ret);
+ if (ret < 0)
+ {
+ if (tls->get_error(ret) == SSL_ERROR_WANT_READ) break;
+ SALTICIDAE_LOG_INFO("recv(%d) failure: %s", fd, strerror(errno));
+ /* connection err or half-opened connection */
+ conn->worker_terminate();
+ return;
+ }
+ if (ret == 0)
+ {
+ conn->worker_terminate();
+ return;
+ }
+ buff_seg.resize(ret);
+ conn->recv_buffer.push(std::move(buff_seg));
+ }
+ conn->on_read();
+}
+
+void ConnPool::Conn::_send_data_tls_handshake(const ConnPool::conn_t &conn, int fd, int events) {
+ int ret;
+ if (conn->tls->do_handshake(ret))
+ {
+ conn->send_data_func = _send_data_tls;
+ conn->recv_data_func = _recv_data_tls;
+ conn->cpool->update_conn(conn, true);
+ }
+ else
+ {
+ conn->ev_socket.del();
+ conn->ev_socket.add(ret == 0 ? FdEvent::READ : FdEvent::WRITE);
+ SALTICIDAE_LOG_INFO("tls handshake %d", ret);
+ }
+}
+
+void ConnPool::Conn::_recv_data_tls_handshake(const ConnPool::conn_t &conn, int fd, int events) {
+ conn->ready_send = true;
+ _send_data_tls_handshake(conn, fd, events);
}
+/****/
void ConnPool::Conn::stop() {
if (mode != ConnMode::DEAD)
@@ -188,6 +288,7 @@ void ConnPool::accept_client(int fd, int) {
conn->send_buffer.set_capacity(queue_capacity);
conn->seg_buff_size = seg_buff_size;
conn->fd = client_fd;
+ conn->worker = nullptr;
conn->cpool = this;
conn->mode = Conn::PASSIVE;
conn->addr = addr;
@@ -196,7 +297,6 @@ void ConnPool::accept_client(int fd, int) {
auto &worker = select_worker();
conn->worker = &worker;
conn->on_setup();
- update_conn(conn, true);
worker.feed(conn, client_fd);
}
} catch (ConnPoolError &e) {
@@ -214,7 +314,6 @@ void ConnPool::Conn::conn_server(int fd, int events) {
SALTICIDAE_LOG_INFO("connected to remote %s", std::string(*this).c_str());
worker = &(cpool->select_worker());
on_setup();
- cpool->update_conn(conn, true);
worker->feed(conn, fd);
}
else
@@ -282,6 +381,7 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) {
conn->send_buffer.set_capacity(queue_capacity);
conn->seg_buff_size = seg_buff_size;
conn->fd = fd;
+ conn->worker = nullptr;
conn->cpool = this;
conn->mode = Conn::ACTIVE;
conn->addr = addr;