aboutsummaryrefslogtreecommitdiff
path: root/src/conn.cpp
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2020-02-17 18:33:44 -0500
committerDeterminant <[email protected]>2020-02-17 18:33:44 -0500
commit2587e2c1b944882bc0b1e5b088f605076baa3435 (patch)
tree0bd477537c53d35a73bcce1c0e078246ef4a7d5c /src/conn.cpp
parent4fed5578db7714d8317d1bfe9262143d25034d00 (diff)
WIP: use PeerId to identify peers and allow one-way handshake
Diffstat (limited to 'src/conn.cpp')
-rw-r--r--src/conn.cpp44
1 files changed, 20 insertions, 24 deletions
diff --git a/src/conn.cpp b/src/conn.cpp
index ba584c1..a5d60a7 100644
--- a/src/conn.cpp
+++ b/src/conn.cpp
@@ -66,7 +66,7 @@ void ConnPool::Conn::_send_data(const conn_t &conn, int fd, int events) {
conn->cpool->worker_terminate(conn);
return;
}
- ssize_t ret = conn->seg_buff_size;
+ ssize_t ret = conn->recv_chunk_size;
for (;;)
{
bytearray_t buff_seg = conn->send_buffer.move_pop();
@@ -94,13 +94,13 @@ void ConnPool::Conn::_send_data(const conn_t &conn, int fd, int events) {
bytearray_t(buff_seg.begin() + ret, buff_seg.end()));
/* wait for the next write callback */
conn->ready_send = false;
- //ev_write.add();
return;
}
}
+ /* the send_buffer is empty though the kernel buffer is still available, so
+ * temporarily mask the WRITE event and mark the `ready_send` flag */
conn->ev_socket.del();
conn->ev_socket.add(conn->ready_recv ? 0 : FdEvent::READ);
- /* consumed the buffer but endpoint still seems to be writable */
conn->ready_send = true;
}
@@ -110,21 +110,21 @@ void ConnPool::Conn::_recv_data(const conn_t &conn, int fd, int events) {
conn->cpool->worker_terminate(conn);
return;
}
- const size_t seg_buff_size = conn->seg_buff_size;
- ssize_t ret = seg_buff_size;
- while (ret == (ssize_t)seg_buff_size)
+ const size_t recv_chunk_size = conn->recv_chunk_size;
+ ssize_t ret = recv_chunk_size;
+ while (ret == (ssize_t)recv_chunk_size)
{
if (conn->recv_buffer.len() >= conn->max_recv_buff_size)
{
- /* receive buffer is full, disable READ event */
+ /* recv_buffer is full, temporarily mask the READ event */
conn->ev_socket.del();
conn->ev_socket.add(conn->ready_send ? 0 : FdEvent::WRITE);
conn->ready_recv = true;
return;
}
bytearray_t buff_seg;
- buff_seg.resize(seg_buff_size);
- ret = recv(fd, buff_seg.data(), seg_buff_size, 0);
+ buff_seg.resize(recv_chunk_size);
+ ret = recv(fd, buff_seg.data(), recv_chunk_size, 0);
SALTICIDAE_LOG_DEBUG("socket read %zd bytes", ret);
if (ret < 0)
{
@@ -136,14 +136,14 @@ void ConnPool::Conn::_recv_data(const conn_t &conn, int fd, int events) {
}
if (ret == 0)
{
- //SALTICIDAE_LOG_INFO("recv(%d) terminates", fd, strerror(errno));
+ /* the remote closes the connection */
conn->cpool->worker_terminate(conn);
return;
}
buff_seg.resize(ret);
conn->recv_buffer.push(std::move(buff_seg));
}
- //ev_read.add();
+ /* wait for the next read callback */
conn->ready_recv = false;
conn->cpool->on_read(conn);
}
@@ -155,7 +155,7 @@ void ConnPool::Conn::_send_data_tls(const conn_t &conn, int fd, int events) {
conn->cpool->worker_terminate(conn);
return;
}
- ssize_t ret = conn->seg_buff_size;
+ ssize_t ret = conn->recv_chunk_size;
auto &tls = conn->tls;
for (;;)
{
@@ -189,7 +189,6 @@ void ConnPool::Conn::_send_data_tls(const conn_t &conn, int fd, int events) {
}
conn->ev_socket.del();
conn->ev_socket.add(conn->ready_recv ? 0 : FdEvent::READ);
- /* consumed the buffer but endpoint still seems to be writable */
conn->ready_send = true;
}
@@ -199,28 +198,26 @@ void ConnPool::Conn::_recv_data_tls(const conn_t &conn, int fd, int events) {
conn->cpool->worker_terminate(conn);
return;
}
- const size_t seg_buff_size = conn->seg_buff_size;
- ssize_t ret = seg_buff_size;
+ const size_t recv_chunk_size = conn->recv_chunk_size;
+ ssize_t ret = recv_chunk_size;
auto &tls = conn->tls;
- while (ret == (ssize_t)seg_buff_size)
+ while (ret == (ssize_t)recv_chunk_size)
{
if (conn->recv_buffer.len() >= conn->max_recv_buff_size)
{
- /* receive buffer is full, disable READ event */
conn->ev_socket.del();
conn->ev_socket.add(conn->ready_send ? 0 : FdEvent::WRITE);
conn->ready_recv = true;
return;
}
bytearray_t buff_seg;
- buff_seg.resize(seg_buff_size);
- ret = tls->recv(buff_seg.data(), seg_buff_size);
+ buff_seg.resize(recv_chunk_size);
+ ret = tls->recv(buff_seg.data(), recv_chunk_size);
SALTICIDAE_LOG_DEBUG("ssl read %zd bytes", ret);
if (ret < 0)
{
if (tls->get_error(ret) == SSL_ERROR_WANT_READ) break;
SALTICIDAE_LOG_INFO("recv(%d) failure: %s", fd, strerror(errno));
- /* connection err or half-opened connection */
conn->cpool->worker_terminate(conn);
return;
}
@@ -247,6 +244,7 @@ void ConnPool::Conn::_recv_data_tls_handshake(const conn_t &conn, int, int) {
{
/* finishing TLS handshake */
conn->send_data_func = _send_data_tls;
+ /* do not start receiving data immediately */
conn->recv_data_func = _recv_data_dummy;
conn->ev_socket.del();
conn->ev_socket.add(FdEvent::WRITE);
@@ -320,14 +318,13 @@ void ConnPool::accept_client(int fd, int) {
NetAddr addr((struct sockaddr_in *)&client_addr);
conn_t conn = create_conn();
conn->send_buffer.set_capacity(max_send_buff_size);
- conn->seg_buff_size = seg_buff_size;
+ conn->recv_chunk_size = recv_chunk_size;
conn->max_recv_buff_size = max_recv_buff_size;
conn->fd = client_fd;
conn->cpool = this;
conn->mode = Conn::PASSIVE;
conn->addr = addr;
add_conn(conn);
- //SALTICIDAE_LOG_INFO("new %x", (uintptr_t)conn.get());
SALTICIDAE_LOG_INFO("accepted %s", std::string(*conn).c_str());
auto &worker = select_worker();
conn->worker = &worker;
@@ -401,14 +398,13 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) {
throw ConnPoolError(SALTI_ERROR_CONNECT, errno);
conn_t conn = create_conn();
conn->send_buffer.set_capacity(max_send_buff_size);
- conn->seg_buff_size = seg_buff_size;
+ conn->recv_chunk_size = recv_chunk_size;
conn->max_recv_buff_size = max_recv_buff_size;
conn->fd = fd;
conn->cpool = this;
conn->mode = Conn::ACTIVE;
conn->addr = addr;
add_conn(conn);
- //SALTICIDAE_LOG_INFO("new %x", (uintptr_t)conn.get());
struct sockaddr_in sockin;
memset(&sockin, 0, sizeof(struct sockaddr_in));