From 2587e2c1b944882bc0b1e5b088f605076baa3435 Mon Sep 17 00:00:00 2001 From: Determinant Date: Mon, 17 Feb 2020 18:33:44 -0500 Subject: WIP: use PeerId to identify peers and allow one-way handshake --- src/conn.cpp | 44 ++++++++++++++++++++------------------------ src/network.cpp | 8 ++------ 2 files changed, 22 insertions(+), 30 deletions(-) (limited to 'src') diff --git a/src/conn.cpp b/src/conn.cpp index ba584c1..a5d60a7 100644 --- a/src/conn.cpp +++ b/src/conn.cpp @@ -66,7 +66,7 @@ void ConnPool::Conn::_send_data(const conn_t &conn, int fd, int events) { conn->cpool->worker_terminate(conn); return; } - ssize_t ret = conn->seg_buff_size; + ssize_t ret = conn->recv_chunk_size; for (;;) { bytearray_t buff_seg = conn->send_buffer.move_pop(); @@ -94,13 +94,13 @@ void ConnPool::Conn::_send_data(const conn_t &conn, int fd, int events) { bytearray_t(buff_seg.begin() + ret, buff_seg.end())); /* wait for the next write callback */ conn->ready_send = false; - //ev_write.add(); return; } } + /* the send_buffer is empty though the kernel buffer is still available, so + * temporarily mask the WRITE event and mark the `ready_send` flag */ conn->ev_socket.del(); conn->ev_socket.add(conn->ready_recv ? 0 : FdEvent::READ); - /* consumed the buffer but endpoint still seems to be writable */ conn->ready_send = true; } @@ -110,21 +110,21 @@ void ConnPool::Conn::_recv_data(const conn_t &conn, int fd, int events) { conn->cpool->worker_terminate(conn); return; } - const size_t seg_buff_size = conn->seg_buff_size; - ssize_t ret = seg_buff_size; - while (ret == (ssize_t)seg_buff_size) + const size_t recv_chunk_size = conn->recv_chunk_size; + ssize_t ret = recv_chunk_size; + while (ret == (ssize_t)recv_chunk_size) { if (conn->recv_buffer.len() >= conn->max_recv_buff_size) { - /* receive buffer is full, disable READ event */ + /* recv_buffer is full, temporarily mask the READ event */ conn->ev_socket.del(); conn->ev_socket.add(conn->ready_send ? 0 : FdEvent::WRITE); conn->ready_recv = true; return; } bytearray_t buff_seg; - buff_seg.resize(seg_buff_size); - ret = recv(fd, buff_seg.data(), seg_buff_size, 0); + buff_seg.resize(recv_chunk_size); + ret = recv(fd, buff_seg.data(), recv_chunk_size, 0); SALTICIDAE_LOG_DEBUG("socket read %zd bytes", ret); if (ret < 0) { @@ -136,14 +136,14 @@ void ConnPool::Conn::_recv_data(const conn_t &conn, int fd, int events) { } if (ret == 0) { - //SALTICIDAE_LOG_INFO("recv(%d) terminates", fd, strerror(errno)); + /* the remote closes the connection */ conn->cpool->worker_terminate(conn); return; } buff_seg.resize(ret); conn->recv_buffer.push(std::move(buff_seg)); } - //ev_read.add(); + /* wait for the next read callback */ conn->ready_recv = false; conn->cpool->on_read(conn); } @@ -155,7 +155,7 @@ void ConnPool::Conn::_send_data_tls(const conn_t &conn, int fd, int events) { conn->cpool->worker_terminate(conn); return; } - ssize_t ret = conn->seg_buff_size; + ssize_t ret = conn->recv_chunk_size; auto &tls = conn->tls; for (;;) { @@ -189,7 +189,6 @@ void ConnPool::Conn::_send_data_tls(const conn_t &conn, int fd, int events) { } conn->ev_socket.del(); conn->ev_socket.add(conn->ready_recv ? 0 : FdEvent::READ); - /* consumed the buffer but endpoint still seems to be writable */ conn->ready_send = true; } @@ -199,28 +198,26 @@ void ConnPool::Conn::_recv_data_tls(const conn_t &conn, int fd, int events) { conn->cpool->worker_terminate(conn); return; } - const size_t seg_buff_size = conn->seg_buff_size; - ssize_t ret = seg_buff_size; + const size_t recv_chunk_size = conn->recv_chunk_size; + ssize_t ret = recv_chunk_size; auto &tls = conn->tls; - while (ret == (ssize_t)seg_buff_size) + while (ret == (ssize_t)recv_chunk_size) { if (conn->recv_buffer.len() >= conn->max_recv_buff_size) { - /* receive buffer is full, disable READ event */ conn->ev_socket.del(); conn->ev_socket.add(conn->ready_send ? 0 : FdEvent::WRITE); conn->ready_recv = true; return; } bytearray_t buff_seg; - buff_seg.resize(seg_buff_size); - ret = tls->recv(buff_seg.data(), seg_buff_size); + buff_seg.resize(recv_chunk_size); + ret = tls->recv(buff_seg.data(), recv_chunk_size); SALTICIDAE_LOG_DEBUG("ssl read %zd bytes", ret); if (ret < 0) { if (tls->get_error(ret) == SSL_ERROR_WANT_READ) break; SALTICIDAE_LOG_INFO("recv(%d) failure: %s", fd, strerror(errno)); - /* connection err or half-opened connection */ conn->cpool->worker_terminate(conn); return; } @@ -247,6 +244,7 @@ void ConnPool::Conn::_recv_data_tls_handshake(const conn_t &conn, int, int) { { /* finishing TLS handshake */ conn->send_data_func = _send_data_tls; + /* do not start receiving data immediately */ conn->recv_data_func = _recv_data_dummy; conn->ev_socket.del(); conn->ev_socket.add(FdEvent::WRITE); @@ -320,14 +318,13 @@ void ConnPool::accept_client(int fd, int) { NetAddr addr((struct sockaddr_in *)&client_addr); conn_t conn = create_conn(); conn->send_buffer.set_capacity(max_send_buff_size); - conn->seg_buff_size = seg_buff_size; + conn->recv_chunk_size = recv_chunk_size; conn->max_recv_buff_size = max_recv_buff_size; conn->fd = client_fd; conn->cpool = this; conn->mode = Conn::PASSIVE; conn->addr = addr; add_conn(conn); - //SALTICIDAE_LOG_INFO("new %x", (uintptr_t)conn.get()); SALTICIDAE_LOG_INFO("accepted %s", std::string(*conn).c_str()); auto &worker = select_worker(); conn->worker = &worker; @@ -401,14 +398,13 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) { throw ConnPoolError(SALTI_ERROR_CONNECT, errno); conn_t conn = create_conn(); conn->send_buffer.set_capacity(max_send_buff_size); - conn->seg_buff_size = seg_buff_size; + conn->recv_chunk_size = recv_chunk_size; conn->max_recv_buff_size = max_recv_buff_size; conn->fd = fd; conn->cpool = this; conn->mode = Conn::ACTIVE; conn->addr = addr; add_conn(conn); - //SALTICIDAE_LOG_INFO("new %x", (uintptr_t)conn.get()); struct sockaddr_in sockin; memset(&sockin, 0, sizeof(struct sockaddr_in)); diff --git a/src/network.cpp b/src/network.cpp index e3eb56c..921ea03 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -43,8 +43,8 @@ void msgnetwork_config_conn_server_timeout(msgnetwork_config_t *self, double tim self->conn_server_timeout(timeout); } -void msgnetwork_config_seg_buff_size(msgnetwork_config_t *self, size_t size) { - self->seg_buff_size(size); +void msgnetwork_config_recv_chunk_size(msgnetwork_config_t *self, size_t size) { + self->recv_chunk_size(size); } void msgnetwork_config_nworker(msgnetwork_config_t *self, size_t nworker) { @@ -183,10 +183,6 @@ peernetwork_config_t *peernetwork_config_new() { void peernetwork_config_free(const peernetwork_config_t *self) { delete self; } -void peernetwork_config_retry_conn_delay(peernetwork_config_t *self, double t) { - self->retry_conn_delay(t); -} - void peernetwork_config_ping_period(peernetwork_config_t *self, double t) { self->ping_period(t); } -- cgit v1.2.3