From 2c1e8ec448a1039ab9a46bce4c959e6ec3cefeb8 Mon Sep 17 00:00:00 2001 From: Determinant Date: Tue, 13 Nov 2018 18:20:08 -0500 Subject: working on p2p; switch to libuv (libevent sucks in multi-threading) --- src/conn.cpp | 49 ++++++++++++++++++++++++++++--------------------- 1 file changed, 28 insertions(+), 21 deletions(-) (limited to 'src/conn.cpp') diff --git a/src/conn.cpp b/src/conn.cpp index 7b7c699..f2922d8 100644 --- a/src/conn.cpp +++ b/src/conn.cpp @@ -46,8 +46,8 @@ ConnPool::Conn::operator std::string() const { /* the following two functions are executed by exactly one worker per Conn object */ -void ConnPool::Conn::send_data(evutil_socket_t fd, short events) { - if (!(events & EV_WRITE)) return; +void ConnPool::Conn::send_data(int fd, int events) { + if (!(events & Event::WRITE)) return; auto conn = self(); /* pin the connection */ ssize_t ret = seg_buff_size; for (;;) @@ -77,7 +77,7 @@ void ConnPool::Conn::send_data(evutil_socket_t fd, short events) { bytearray_t(buff_seg.begin() + ret, buff_seg.end())); /* wait for the next write callback */ ready_send = false; - ev_write.add(); + //ev_write.add(); return; } } @@ -85,8 +85,8 @@ void ConnPool::Conn::send_data(evutil_socket_t fd, short events) { ready_send = true; } -void ConnPool::Conn::recv_data(evutil_socket_t fd, short events) { - if (!(events & EV_READ)) return; +void ConnPool::Conn::recv_data(int fd, int events) { + if (!(events & Event::READ)) return; auto conn = self(); /* pin the connection */ ssize_t ret = seg_buff_size; while (ret == (ssize_t)seg_buff_size) @@ -105,13 +105,14 @@ void ConnPool::Conn::recv_data(evutil_socket_t fd, short events) { } if (ret == 0) { + //SALTICIDAE_LOG_INFO("recv(%d) terminates", fd, strerror(errno)); terminate(); return; } buff_seg.resize(ret); recv_buffer.push(std::move(buff_seg)); } - ev_read.add(); + //ev_read.add(); on_read(); } @@ -121,7 +122,7 @@ void ConnPool::Conn::terminate() { cpool->post_terminate(fd); } -void ConnPool::accept_client(evutil_socket_t fd, short) { +void ConnPool::accept_client(int fd, int) { int client_fd; struct sockaddr client_addr; socklen_t addr_size = sizeof(struct sockaddr_in); @@ -151,7 +152,7 @@ void ConnPool::accept_client(evutil_socket_t fd, short) { } } -void ConnPool::Conn::conn_server(evutil_socket_t fd, short events) { +void ConnPool::Conn::conn_server(int fd, int events) { auto conn = self(); /* pin the connection */ if (send(fd, "", 0, MSG_NOSIGNAL) == 0) { @@ -163,7 +164,7 @@ void ConnPool::Conn::conn_server(evutil_socket_t fd, short events) { } else { - if (events & EV_TIMEOUT) + if (events & Event::TIMEOUT) SALTICIDAE_LOG_INFO("%s connect timeout", std::string(*this).c_str()); terminate(); return; @@ -196,12 +197,8 @@ void ConnPool::listen(NetAddr listen_addr) { throw ConnPoolError(std::string("binding error")); if (::listen(listen_fd, max_listen_backlog) < 0) throw ConnPoolError(std::string("listen error")); - { - std::lock_guard _(dsp_ec_mlock); - ev_listen = Event(dispatcher_ec, listen_fd, EV_READ | EV_PERSIST, - std::bind(&ConnPool::accept_client, this, _1, _2)); - ev_listen.add(); - } + auto dcmd = new DspAcceptListen(listen_fd); + write(dlisten_fd[1], &dcmd, sizeof(dcmd)); SALTICIDAE_LOG_INFO("listening to %u", ntohs(listen_addr.port)); } @@ -237,12 +234,8 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) { } else { - { - std::lock_guard _(dsp_ec_mlock); - conn->ev_connect = Event(dispatcher_ec, fd, EV_WRITE, - std::bind(&Conn::conn_server, conn.get(), _1, _2)); - conn->ev_connect.add_with_timeout(conn_server_timeout); - } + auto dcmd = new DspConnectListen(conn); + write(dlisten_fd[1], &dcmd, sizeof(dcmd)); add_conn(conn); SALTICIDAE_LOG_INFO("created %s", std::string(*conn).c_str()); } @@ -271,4 +264,18 @@ ConnPool::conn_t ConnPool::add_conn(const conn_t &conn) { return pool.insert(std::make_pair(conn->fd, conn)).first->second; } +void ConnPool::_accept_listen(int listen_fd) { + std::lock_guard _(cp_mlock); + ev_listen = Event(dispatcher_ec, listen_fd, Event::READ, + std::bind(&ConnPool::accept_client, this, _1, _2)); + ev_listen.add(); +} + +void ConnPool::_connect_listen(const conn_t &conn) { + std::lock_guard _(cp_mlock); + conn->ev_connect = Event(dispatcher_ec, conn->fd, Event::WRITE, + std::bind(&Conn::conn_server, conn.get(), _1, _2)); + conn->ev_connect.add_with_timeout(conn_server_timeout); +} + } -- cgit v1.2.3-70-g09d2