From dd09443b0b3c0b5d1a8c034644d1065dd25bf5a9 Mon Sep 17 00:00:00 2001 From: Determinant Date: Sun, 11 Nov 2018 23:02:59 -0500 Subject: start debugging multiloops design --- src/conn.cpp | 108 ++++++++++++++++++++++++++++++++++++----------------------- 1 file changed, 66 insertions(+), 42 deletions(-) (limited to 'src') diff --git a/src/conn.cpp b/src/conn.cpp index 0607ed3..5863f3c 100644 --- a/src/conn.cpp +++ b/src/conn.cpp @@ -44,14 +44,17 @@ ConnPool::Conn::operator std::string() const { return std::move(s); } +/* the following two functions are executed by exactly one worker per Conn object */ + void ConnPool::Conn::send_data(evutil_socket_t fd, short events) { if (!(events & EV_WRITE)) return; auto conn = self(); /* pin the connection */ ssize_t ret = seg_buff_size; - while (!send_buffer.empty()) + for (;;) { bytearray_t buff_seg = send_buffer.move_pop(); ssize_t size = buff_seg.size(); + if (!size) break; ret = send(fd, buff_seg.data(), size, MSG_NOSIGNAL); SALTICIDAE_LOG_DEBUG("socket sent %zd bytes", ret); size -= ret; @@ -112,6 +115,12 @@ void ConnPool::Conn::recv_data(evutil_socket_t fd, short events) { on_read(); } +void ConnPool::Conn::terminate() { + ev_read.clear(); + ev_write.clear(); + cpool->post_terminate(fd); +} + void ConnPool::accept_client(evutil_socket_t fd, short) { int client_fd; struct sockaddr client_addr; @@ -135,33 +144,36 @@ void ConnPool::accept_client(evutil_socket_t fd, short) { conn->mode = Conn::PASSIVE; conn->addr = addr; - Conn *conn_ptr = conn.get(); - conn->ev_read = Event(ec, client_fd, EV_READ, - std::bind(&Conn::recv_data, conn_ptr, _1, _2)); - conn->ev_write = Event(ec, client_fd, EV_WRITE, - std::bind(&Conn::send_data, conn_ptr, _1, _2)); - conn->ev_read.add(); - conn->ev_write.add(); + //Conn *conn_ptr = conn.get(); + // TODO: use worker thread ec + //conn->ev_read = Event(ec, client_fd, EV_READ, + // std::bind(&Conn::recv_data, conn_ptr, _1, _2)); + //conn->ev_write = Event(ec, client_fd, EV_WRITE, + // std::bind(&Conn::send_data, conn_ptr, _1, _2)); + //conn->ev_read.add(); + //conn->ev_write.add(); add_conn(conn); SALTICIDAE_LOG_INFO("created %s", std::string(*conn).c_str()); conn->on_setup(); + select_worker().feed(conn, client_fd); } - ev_listen.add(); } void ConnPool::Conn::conn_server(evutil_socket_t fd, short events) { auto conn = self(); /* pin the connection */ if (send(fd, "", 0, MSG_NOSIGNAL) == 0) { - ev_read = Event(cpool->ec, fd, EV_READ, - std::bind(&Conn::recv_data, this, _1, _2)); - ev_write = Event(cpool->ec, fd, EV_WRITE, - std::bind(&Conn::send_data, this, _1, _2)); - ev_read.add(); - ev_write.add(); + // TODO: use worker thread ec + //ev_read = Event(cpool->ec, fd, EV_READ, + // std::bind(&Conn::recv_data, this, _1, _2)); + //ev_write = Event(cpool->ec, fd, EV_WRITE, + // std::bind(&Conn::send_data, this, _1, _2)); + //ev_read.add(); + //ev_write.add(); ev_connect.clear(); SALTICIDAE_LOG_INFO("connected to peer %s", std::string(*this).c_str()); on_setup(); + cpool->select_worker().feed(self(), fd); } else { @@ -172,8 +184,14 @@ void ConnPool::Conn::conn_server(evutil_socket_t fd, short events) { } } -void ConnPool::listen(NetAddr listen_addr) { +void ConnPool::_listen(NetAddr listen_addr) { + std::lock_guard _(cp_mlock); int one = 1; + if (listen_fd != -1) + { /* reset the previous listen() */ + ev_listen.clear(); + close(listen_fd); + } if ((listen_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) throw ConnPoolError(std::string("cannot create socket for listening")); if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, (const char *)&one, sizeof(one)) < 0 || @@ -192,28 +210,16 @@ void ConnPool::listen(NetAddr listen_addr) { throw ConnPoolError(std::string("binding error")); if (::listen(listen_fd, max_listen_backlog) < 0) throw ConnPoolError(std::string("listen error")); - ev_listen = Event(ec, listen_fd, EV_READ, - std::bind(&ConnPool::accept_client, this, _1, _2)); - ev_listen.add(); - SALTICIDAE_LOG_INFO("listening to %u", ntohs(listen_addr.port)); -} - -void ConnPool::Conn::terminate() { - auto &pool = cpool->pool; - auto it = pool.find(fd); - if (it != pool.end()) { - /* temporarily pin the conn before it dies */ - auto conn = it->second; - assert(conn.get() == this); - pool.erase(it); - on_close(); - /* inform the upper layer the connection will be destroyed */ - on_teardown(); + std::lock_guard _(dsp_ec_mlock); + ev_listen = Event(dispatcher_ec, listen_fd, EV_READ | EV_PERSIST, + std::bind(&ConnPool::accept_client, this, _1, _2)); + ev_listen.add(); } + SALTICIDAE_LOG_INFO("listening to %u", ntohs(listen_addr.port)); } -ConnPool::conn_t ConnPool::connect(const NetAddr &addr) { +ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) { int fd; int one = 1; if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) @@ -245,23 +251,41 @@ ConnPool::conn_t ConnPool::connect(const NetAddr &addr) { } else { - conn->ev_connect = Event(ec, fd, EV_WRITE, - std::bind(&Conn::conn_server, conn.get(), _1, _2)); - conn->ev_connect.add_with_timeout(conn_server_timeout); - + { + std::lock_guard _(dsp_ec_mlock); + conn->ev_connect = Event(dispatcher_ec, fd, EV_WRITE, + std::bind(&Conn::conn_server, conn.get(), _1, _2)); + conn->ev_connect.add_with_timeout(conn_server_timeout); + } add_conn(conn); SALTICIDAE_LOG_INFO("created %s", std::string(*conn).c_str()); } return conn; } -ConnPool::conn_t ConnPool::add_conn(conn_t conn) { - auto it = pool.find(conn->fd); +void ConnPool::_post_terminate(int fd) { + std::lock_guard _(cp_mlock); + auto it = pool.find(fd); if (it != pool.end()) { - auto old_conn = it->second; - old_conn->terminate(); + /* temporarily pin the conn before it dies */ + auto conn = it->second; + assert(conn->fd == fd); + pool.erase(it); + conn->on_close(); + /* inform the upper layer the connection will be destroyed */ + conn->on_teardown(); } +} + +ConnPool::conn_t ConnPool::add_conn(const conn_t &conn) { + std::lock_guard _(cp_mlock); + assert(pool.find(conn->fd) == pool.end()); + //if (it != pool.end()) + //{ + // auto old_conn = it->second; + // old_conn->terminate(); + //} return pool.insert(std::make_pair(conn->fd, conn)).first->second; } -- cgit v1.2.3-70-g09d2