diff options
Diffstat (limited to 'src/conn.cpp')
-rw-r--r-- | src/conn.cpp | 122 |
1 files changed, 60 insertions, 62 deletions
diff --git a/src/conn.cpp b/src/conn.cpp index 931e915..1f5e324 100644 --- a/src/conn.cpp +++ b/src/conn.cpp @@ -56,7 +56,7 @@ ConnPool::Conn::operator std::string() const { void ConnPool::Conn::_send_data(const ConnPool::conn_t &conn, int fd, int events) { if (events & FdEvent::ERROR) { - conn->worker_terminate(); + conn->cpool->worker_terminate(conn); return; } ssize_t ret = conn->seg_buff_size; @@ -77,7 +77,7 @@ void ConnPool::Conn::_send_data(const ConnPool::conn_t &conn, int fd, int events if (ret < 0 && errno != EWOULDBLOCK) { SALTICIDAE_LOG_INFO("send(%d) failure: %s", fd, strerror(errno)); - conn->worker_terminate(); + conn->cpool->worker_terminate(conn); return; } } @@ -100,7 +100,7 @@ void ConnPool::Conn::_send_data(const ConnPool::conn_t &conn, int fd, int events void ConnPool::Conn::_recv_data(const ConnPool::conn_t &conn, int fd, int events) { if (events & FdEvent::ERROR) { - conn->worker_terminate(); + conn->cpool->worker_terminate(conn); return; } const size_t seg_buff_size = conn->seg_buff_size; @@ -116,27 +116,27 @@ void ConnPool::Conn::_recv_data(const ConnPool::conn_t &conn, int fd, int events if (errno == EWOULDBLOCK) break; SALTICIDAE_LOG_INFO("recv(%d) failure: %s", fd, strerror(errno)); /* connection err or half-opened connection */ - conn->worker_terminate(); + conn->cpool->worker_terminate(conn); return; } if (ret == 0) { //SALTICIDAE_LOG_INFO("recv(%d) terminates", fd, strerror(errno)); - conn->worker_terminate(); + conn->cpool->worker_terminate(conn); return; } buff_seg.resize(ret); conn->recv_buffer.push(std::move(buff_seg)); } //ev_read.add(); - conn->on_read(); + conn->cpool->on_read(conn); } void ConnPool::Conn::_send_data_tls(const ConnPool::conn_t &conn, int fd, int events) { if (events & FdEvent::ERROR) { - conn->worker_terminate(); + conn->cpool->worker_terminate(conn); return; } ssize_t ret = conn->seg_buff_size; @@ -158,7 +158,7 @@ void ConnPool::Conn::_send_data_tls(const ConnPool::conn_t &conn, int fd, int ev if (ret < 0 && tls->get_error(ret) != SSL_ERROR_WANT_WRITE) { SALTICIDAE_LOG_INFO("send(%d) failure: %s", fd, strerror(errno)); - conn->worker_terminate(); + conn->cpool->worker_terminate(conn); return; } } @@ -180,7 +180,7 @@ void ConnPool::Conn::_send_data_tls(const ConnPool::conn_t &conn, int fd, int ev void ConnPool::Conn::_recv_data_tls(const ConnPool::conn_t &conn, int fd, int events) { if (events & FdEvent::ERROR) { - conn->worker_terminate(); + conn->cpool->worker_terminate(conn); return; } const size_t seg_buff_size = conn->seg_buff_size; @@ -197,18 +197,18 @@ void ConnPool::Conn::_recv_data_tls(const ConnPool::conn_t &conn, int fd, int ev if (tls->get_error(ret) == SSL_ERROR_WANT_READ) break; SALTICIDAE_LOG_INFO("recv(%d) failure: %s", fd, strerror(errno)); /* connection err or half-opened connection */ - conn->worker_terminate(); + conn->cpool->worker_terminate(conn); return; } if (ret == 0) { - conn->worker_terminate(); + conn->cpool->worker_terminate(conn); return; } buff_seg.resize(ret); conn->recv_buffer.push(std::move(buff_seg)); } - conn->on_read(); + conn->cpool->on_read(conn); } void ConnPool::Conn::_send_data_tls_handshake(const ConnPool::conn_t &conn, int fd, int events) { @@ -234,44 +234,43 @@ void ConnPool::Conn::_recv_data_tls_handshake(const ConnPool::conn_t &conn, int, } } - void ConnPool::Conn::_recv_data_dummy(const ConnPool::conn_t &, int, int) { } -/****/ void ConnPool::Conn::stop() { if (mode != ConnMode::DEAD) { if (worker) worker->unfeed(); if (tls) tls->shutdown(); - ev_connect.clear(); - ev_socket.clear(); + ev_connect.del(); + ev_socket.del(); send_buffer.get_queue().unreg_handler(); mode = ConnMode::DEAD; } } -void ConnPool::Conn::worker_terminate() { - auto conn = self(); - if (!conn) return; - stop(); - if (!worker->is_dispatcher()) - cpool->disp_tcall->async_call( - [cpool=this->cpool, conn](ThreadCall::Handle &) { - cpool->del_conn(conn); +void ConnPool::worker_terminate(const conn_t &conn) { + conn->worker->get_tcall()->async_call([this, conn](ThreadCall::Handle &) { + if (!conn->set_terminated()) return; + conn->stop(); + disp_tcall->async_call([this, conn](ThreadCall::Handle &) { + del_conn(conn); }); - else cpool->del_conn(conn); + }); } -void ConnPool::Conn::disp_terminate() { - auto conn = self(); - if (!conn) return; - if (worker && !worker->is_dispatcher()) - worker->get_tcall()->call([conn](ThreadCall::Handle &) { +/****/ + +void ConnPool::disp_terminate(const conn_t &conn) { + auto worker = conn->worker; + if (worker) + worker_terminate(conn); + else + disp_tcall->async_call([this, conn](ThreadCall::Handle &) { + if (!conn->set_terminated()) return; conn->stop(); + del_conn(conn); }); - else stop(); - cpool->del_conn(conn); } void ConnPool::accept_client(int fd, int) { @@ -291,7 +290,6 @@ void ConnPool::accept_client(int fd, int) { NetAddr addr((struct sockaddr_in *)&client_addr); conn_t conn = create_conn(); - conn->self_ref = conn; conn->send_buffer.set_capacity(queue_capacity); conn->seg_buff_size = seg_buff_size; conn->fd = client_fd; @@ -299,31 +297,31 @@ void ConnPool::accept_client(int fd, int) { conn->mode = Conn::PASSIVE; conn->addr = addr; add_conn(conn); + //SALTICIDAE_LOG_INFO("new %x", (uintptr_t)conn.get()); SALTICIDAE_LOG_INFO("accepted %s", std::string(*conn).c_str()); auto &worker = select_worker(); conn->worker = &worker; - conn->on_setup(); + on_setup(conn); worker.feed(conn, client_fd); } } catch (...) { recoverable_error(std::current_exception()); } } -void ConnPool::Conn::conn_server(int fd, int events) { - auto conn = self(); /* pin the connection */ - if (!conn) return; +void ConnPool::conn_server(const conn_t &conn, int fd, int events) { if (send(fd, "", 0, MSG_NOSIGNAL) == 0) { - ev_connect.clear(); - SALTICIDAE_LOG_INFO("connected to remote %s", std::string(*this).c_str()); - worker = &(cpool->select_worker()); - on_setup(); - worker->feed(conn, fd); + conn->ev_connect.del(); + SALTICIDAE_LOG_INFO("connected to remote %s", std::string(*conn).c_str()); + auto &worker = select_worker(); + conn->worker = &worker; + on_setup(conn); + worker.feed(conn, fd); } else { if (events & TimedFdEvent::TIMEOUT) - SALTICIDAE_LOG_INFO("%s connect timeout", std::string(*this).c_str()); - conn->disp_terminate(); + SALTICIDAE_LOG_INFO("%s connect timeout", std::string(*conn).c_str()); + disp_terminate(conn); return; } } @@ -370,14 +368,14 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) { if (fcntl(fd, F_SETFL, O_NONBLOCK) == -1) throw ConnPoolError(SALTI_ERROR_CONNECT, errno); conn_t conn = create_conn(); - conn->self_ref = conn; conn->send_buffer.set_capacity(queue_capacity); conn->seg_buff_size = seg_buff_size; conn->fd = fd; - conn->worker = nullptr; conn->cpool = this; conn->mode = Conn::ACTIVE; conn->addr = addr; + add_conn(conn); + //SALTICIDAE_LOG_INFO("new %x", (uintptr_t)conn.get()); struct sockaddr_in sockin; memset(&sockin, 0, sizeof(struct sockaddr_in)); @@ -389,13 +387,14 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) { sizeof(struct sockaddr_in)) < 0 && errno != EINPROGRESS) { SALTICIDAE_LOG_INFO("cannot connect to %s", std::string(addr).c_str()); - conn->disp_terminate(); + disp_terminate(conn); } else { - conn->ev_connect = TimedFdEvent(disp_ec, conn->fd, std::bind(&Conn::conn_server, conn.get(), _1, _2)); + conn->ev_connect = TimedFdEvent(disp_ec, conn->fd, [this, conn](int fd, int events) { + conn_server(conn, fd, events); + }); conn->ev_connect.add(FdEvent::WRITE, conn_server_timeout); - add_conn(conn); SALTICIDAE_LOG_INFO("created %s", std::string(*conn).c_str()); } return conn; @@ -403,19 +402,18 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) { void ConnPool::del_conn(const conn_t &conn) { auto it = pool.find(conn->fd); - if (it != pool.end()) - { - /* temporarily pin the conn before it dies */ - auto conn = it->second; - //assert(conn->fd == fd); - pool.erase(it); - /* inform the upper layer the connection will be destroyed */ - conn->on_teardown(); - update_conn(conn, false); - conn->release_self(); /* remove the self-cycle */ - ::close(conn->fd); - conn->fd = -1; - } + assert(it != pool.end()); + pool.erase(it); + update_conn(conn, false); + release_conn(conn); +} + +void ConnPool::release_conn(const conn_t &conn) { + /* inform the upper layer the connection will be destroyed */ + on_teardown(conn); + conn->ev_connect.clear(); + conn->ev_socket.clear(); + ::close(conn->fd); } ConnPool::conn_t ConnPool::add_conn(const conn_t &conn) { |