aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/conn.cpp122
1 files changed, 60 insertions, 62 deletions
diff --git a/src/conn.cpp b/src/conn.cpp
index 931e915..1f5e324 100644
--- a/src/conn.cpp
+++ b/src/conn.cpp
@@ -56,7 +56,7 @@ ConnPool::Conn::operator std::string() const {
void ConnPool::Conn::_send_data(const ConnPool::conn_t &conn, int fd, int events) {
if (events & FdEvent::ERROR)
{
- conn->worker_terminate();
+ conn->cpool->worker_terminate(conn);
return;
}
ssize_t ret = conn->seg_buff_size;
@@ -77,7 +77,7 @@ void ConnPool::Conn::_send_data(const ConnPool::conn_t &conn, int fd, int events
if (ret < 0 && errno != EWOULDBLOCK)
{
SALTICIDAE_LOG_INFO("send(%d) failure: %s", fd, strerror(errno));
- conn->worker_terminate();
+ conn->cpool->worker_terminate(conn);
return;
}
}
@@ -100,7 +100,7 @@ void ConnPool::Conn::_send_data(const ConnPool::conn_t &conn, int fd, int events
void ConnPool::Conn::_recv_data(const ConnPool::conn_t &conn, int fd, int events) {
if (events & FdEvent::ERROR)
{
- conn->worker_terminate();
+ conn->cpool->worker_terminate(conn);
return;
}
const size_t seg_buff_size = conn->seg_buff_size;
@@ -116,27 +116,27 @@ void ConnPool::Conn::_recv_data(const ConnPool::conn_t &conn, int fd, int events
if (errno == EWOULDBLOCK) break;
SALTICIDAE_LOG_INFO("recv(%d) failure: %s", fd, strerror(errno));
/* connection err or half-opened connection */
- conn->worker_terminate();
+ conn->cpool->worker_terminate(conn);
return;
}
if (ret == 0)
{
//SALTICIDAE_LOG_INFO("recv(%d) terminates", fd, strerror(errno));
- conn->worker_terminate();
+ conn->cpool->worker_terminate(conn);
return;
}
buff_seg.resize(ret);
conn->recv_buffer.push(std::move(buff_seg));
}
//ev_read.add();
- conn->on_read();
+ conn->cpool->on_read(conn);
}
void ConnPool::Conn::_send_data_tls(const ConnPool::conn_t &conn, int fd, int events) {
if (events & FdEvent::ERROR)
{
- conn->worker_terminate();
+ conn->cpool->worker_terminate(conn);
return;
}
ssize_t ret = conn->seg_buff_size;
@@ -158,7 +158,7 @@ void ConnPool::Conn::_send_data_tls(const ConnPool::conn_t &conn, int fd, int ev
if (ret < 0 && tls->get_error(ret) != SSL_ERROR_WANT_WRITE)
{
SALTICIDAE_LOG_INFO("send(%d) failure: %s", fd, strerror(errno));
- conn->worker_terminate();
+ conn->cpool->worker_terminate(conn);
return;
}
}
@@ -180,7 +180,7 @@ void ConnPool::Conn::_send_data_tls(const ConnPool::conn_t &conn, int fd, int ev
void ConnPool::Conn::_recv_data_tls(const ConnPool::conn_t &conn, int fd, int events) {
if (events & FdEvent::ERROR)
{
- conn->worker_terminate();
+ conn->cpool->worker_terminate(conn);
return;
}
const size_t seg_buff_size = conn->seg_buff_size;
@@ -197,18 +197,18 @@ void ConnPool::Conn::_recv_data_tls(const ConnPool::conn_t &conn, int fd, int ev
if (tls->get_error(ret) == SSL_ERROR_WANT_READ) break;
SALTICIDAE_LOG_INFO("recv(%d) failure: %s", fd, strerror(errno));
/* connection err or half-opened connection */
- conn->worker_terminate();
+ conn->cpool->worker_terminate(conn);
return;
}
if (ret == 0)
{
- conn->worker_terminate();
+ conn->cpool->worker_terminate(conn);
return;
}
buff_seg.resize(ret);
conn->recv_buffer.push(std::move(buff_seg));
}
- conn->on_read();
+ conn->cpool->on_read(conn);
}
void ConnPool::Conn::_send_data_tls_handshake(const ConnPool::conn_t &conn, int fd, int events) {
@@ -234,44 +234,43 @@ void ConnPool::Conn::_recv_data_tls_handshake(const ConnPool::conn_t &conn, int,
}
}
-
void ConnPool::Conn::_recv_data_dummy(const ConnPool::conn_t &, int, int) {
}
-/****/
void ConnPool::Conn::stop() {
if (mode != ConnMode::DEAD)
{
if (worker) worker->unfeed();
if (tls) tls->shutdown();
- ev_connect.clear();
- ev_socket.clear();
+ ev_connect.del();
+ ev_socket.del();
send_buffer.get_queue().unreg_handler();
mode = ConnMode::DEAD;
}
}
-void ConnPool::Conn::worker_terminate() {
- auto conn = self();
- if (!conn) return;
- stop();
- if (!worker->is_dispatcher())
- cpool->disp_tcall->async_call(
- [cpool=this->cpool, conn](ThreadCall::Handle &) {
- cpool->del_conn(conn);
+void ConnPool::worker_terminate(const conn_t &conn) {
+ conn->worker->get_tcall()->async_call([this, conn](ThreadCall::Handle &) {
+ if (!conn->set_terminated()) return;
+ conn->stop();
+ disp_tcall->async_call([this, conn](ThreadCall::Handle &) {
+ del_conn(conn);
});
- else cpool->del_conn(conn);
+ });
}
-void ConnPool::Conn::disp_terminate() {
- auto conn = self();
- if (!conn) return;
- if (worker && !worker->is_dispatcher())
- worker->get_tcall()->call([conn](ThreadCall::Handle &) {
+/****/
+
+void ConnPool::disp_terminate(const conn_t &conn) {
+ auto worker = conn->worker;
+ if (worker)
+ worker_terminate(conn);
+ else
+ disp_tcall->async_call([this, conn](ThreadCall::Handle &) {
+ if (!conn->set_terminated()) return;
conn->stop();
+ del_conn(conn);
});
- else stop();
- cpool->del_conn(conn);
}
void ConnPool::accept_client(int fd, int) {
@@ -291,7 +290,6 @@ void ConnPool::accept_client(int fd, int) {
NetAddr addr((struct sockaddr_in *)&client_addr);
conn_t conn = create_conn();
- conn->self_ref = conn;
conn->send_buffer.set_capacity(queue_capacity);
conn->seg_buff_size = seg_buff_size;
conn->fd = client_fd;
@@ -299,31 +297,31 @@ void ConnPool::accept_client(int fd, int) {
conn->mode = Conn::PASSIVE;
conn->addr = addr;
add_conn(conn);
+ //SALTICIDAE_LOG_INFO("new %x", (uintptr_t)conn.get());
SALTICIDAE_LOG_INFO("accepted %s", std::string(*conn).c_str());
auto &worker = select_worker();
conn->worker = &worker;
- conn->on_setup();
+ on_setup(conn);
worker.feed(conn, client_fd);
}
} catch (...) { recoverable_error(std::current_exception()); }
}
-void ConnPool::Conn::conn_server(int fd, int events) {
- auto conn = self(); /* pin the connection */
- if (!conn) return;
+void ConnPool::conn_server(const conn_t &conn, int fd, int events) {
if (send(fd, "", 0, MSG_NOSIGNAL) == 0)
{
- ev_connect.clear();
- SALTICIDAE_LOG_INFO("connected to remote %s", std::string(*this).c_str());
- worker = &(cpool->select_worker());
- on_setup();
- worker->feed(conn, fd);
+ conn->ev_connect.del();
+ SALTICIDAE_LOG_INFO("connected to remote %s", std::string(*conn).c_str());
+ auto &worker = select_worker();
+ conn->worker = &worker;
+ on_setup(conn);
+ worker.feed(conn, fd);
}
else
{
if (events & TimedFdEvent::TIMEOUT)
- SALTICIDAE_LOG_INFO("%s connect timeout", std::string(*this).c_str());
- conn->disp_terminate();
+ SALTICIDAE_LOG_INFO("%s connect timeout", std::string(*conn).c_str());
+ disp_terminate(conn);
return;
}
}
@@ -370,14 +368,14 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) {
if (fcntl(fd, F_SETFL, O_NONBLOCK) == -1)
throw ConnPoolError(SALTI_ERROR_CONNECT, errno);
conn_t conn = create_conn();
- conn->self_ref = conn;
conn->send_buffer.set_capacity(queue_capacity);
conn->seg_buff_size = seg_buff_size;
conn->fd = fd;
- conn->worker = nullptr;
conn->cpool = this;
conn->mode = Conn::ACTIVE;
conn->addr = addr;
+ add_conn(conn);
+ //SALTICIDAE_LOG_INFO("new %x", (uintptr_t)conn.get());
struct sockaddr_in sockin;
memset(&sockin, 0, sizeof(struct sockaddr_in));
@@ -389,13 +387,14 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) {
sizeof(struct sockaddr_in)) < 0 && errno != EINPROGRESS)
{
SALTICIDAE_LOG_INFO("cannot connect to %s", std::string(addr).c_str());
- conn->disp_terminate();
+ disp_terminate(conn);
}
else
{
- conn->ev_connect = TimedFdEvent(disp_ec, conn->fd, std::bind(&Conn::conn_server, conn.get(), _1, _2));
+ conn->ev_connect = TimedFdEvent(disp_ec, conn->fd, [this, conn](int fd, int events) {
+ conn_server(conn, fd, events);
+ });
conn->ev_connect.add(FdEvent::WRITE, conn_server_timeout);
- add_conn(conn);
SALTICIDAE_LOG_INFO("created %s", std::string(*conn).c_str());
}
return conn;
@@ -403,19 +402,18 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) {
void ConnPool::del_conn(const conn_t &conn) {
auto it = pool.find(conn->fd);
- if (it != pool.end())
- {
- /* temporarily pin the conn before it dies */
- auto conn = it->second;
- //assert(conn->fd == fd);
- pool.erase(it);
- /* inform the upper layer the connection will be destroyed */
- conn->on_teardown();
- update_conn(conn, false);
- conn->release_self(); /* remove the self-cycle */
- ::close(conn->fd);
- conn->fd = -1;
- }
+ assert(it != pool.end());
+ pool.erase(it);
+ update_conn(conn, false);
+ release_conn(conn);
+}
+
+void ConnPool::release_conn(const conn_t &conn) {
+ /* inform the upper layer the connection will be destroyed */
+ on_teardown(conn);
+ conn->ev_connect.clear();
+ conn->ev_socket.clear();
+ ::close(conn->fd);
}
ConnPool::conn_t ConnPool::add_conn(const conn_t &conn) {