aboutsummaryrefslogtreecommitdiff
path: root/src/conn.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/conn.cpp')
-rw-r--r--src/conn.cpp108
1 files changed, 66 insertions, 42 deletions
diff --git a/src/conn.cpp b/src/conn.cpp
index 0607ed3..5863f3c 100644
--- a/src/conn.cpp
+++ b/src/conn.cpp
@@ -44,14 +44,17 @@ ConnPool::Conn::operator std::string() const {
return std::move(s);
}
+/* the following two functions are executed by exactly one worker per Conn object */
+
void ConnPool::Conn::send_data(evutil_socket_t fd, short events) {
if (!(events & EV_WRITE)) return;
auto conn = self(); /* pin the connection */
ssize_t ret = seg_buff_size;
- while (!send_buffer.empty())
+ for (;;)
{
bytearray_t buff_seg = send_buffer.move_pop();
ssize_t size = buff_seg.size();
+ if (!size) break;
ret = send(fd, buff_seg.data(), size, MSG_NOSIGNAL);
SALTICIDAE_LOG_DEBUG("socket sent %zd bytes", ret);
size -= ret;
@@ -112,6 +115,12 @@ void ConnPool::Conn::recv_data(evutil_socket_t fd, short events) {
on_read();
}
+void ConnPool::Conn::terminate() {
+ ev_read.clear();
+ ev_write.clear();
+ cpool->post_terminate(fd);
+}
+
void ConnPool::accept_client(evutil_socket_t fd, short) {
int client_fd;
struct sockaddr client_addr;
@@ -135,33 +144,36 @@ void ConnPool::accept_client(evutil_socket_t fd, short) {
conn->mode = Conn::PASSIVE;
conn->addr = addr;
- Conn *conn_ptr = conn.get();
- conn->ev_read = Event(ec, client_fd, EV_READ,
- std::bind(&Conn::recv_data, conn_ptr, _1, _2));
- conn->ev_write = Event(ec, client_fd, EV_WRITE,
- std::bind(&Conn::send_data, conn_ptr, _1, _2));
- conn->ev_read.add();
- conn->ev_write.add();
+ //Conn *conn_ptr = conn.get();
+ // TODO: use worker thread ec
+ //conn->ev_read = Event(ec, client_fd, EV_READ,
+ // std::bind(&Conn::recv_data, conn_ptr, _1, _2));
+ //conn->ev_write = Event(ec, client_fd, EV_WRITE,
+ // std::bind(&Conn::send_data, conn_ptr, _1, _2));
+ //conn->ev_read.add();
+ //conn->ev_write.add();
add_conn(conn);
SALTICIDAE_LOG_INFO("created %s", std::string(*conn).c_str());
conn->on_setup();
+ select_worker().feed(conn, client_fd);
}
- ev_listen.add();
}
void ConnPool::Conn::conn_server(evutil_socket_t fd, short events) {
auto conn = self(); /* pin the connection */
if (send(fd, "", 0, MSG_NOSIGNAL) == 0)
{
- ev_read = Event(cpool->ec, fd, EV_READ,
- std::bind(&Conn::recv_data, this, _1, _2));
- ev_write = Event(cpool->ec, fd, EV_WRITE,
- std::bind(&Conn::send_data, this, _1, _2));
- ev_read.add();
- ev_write.add();
+ // TODO: use worker thread ec
+ //ev_read = Event(cpool->ec, fd, EV_READ,
+ // std::bind(&Conn::recv_data, this, _1, _2));
+ //ev_write = Event(cpool->ec, fd, EV_WRITE,
+ // std::bind(&Conn::send_data, this, _1, _2));
+ //ev_read.add();
+ //ev_write.add();
ev_connect.clear();
SALTICIDAE_LOG_INFO("connected to peer %s", std::string(*this).c_str());
on_setup();
+ cpool->select_worker().feed(self(), fd);
}
else
{
@@ -172,8 +184,14 @@ void ConnPool::Conn::conn_server(evutil_socket_t fd, short events) {
}
}
-void ConnPool::listen(NetAddr listen_addr) {
+void ConnPool::_listen(NetAddr listen_addr) {
+ std::lock_guard<std::mutex> _(cp_mlock);
int one = 1;
+ if (listen_fd != -1)
+ { /* reset the previous listen() */
+ ev_listen.clear();
+ close(listen_fd);
+ }
if ((listen_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
throw ConnPoolError(std::string("cannot create socket for listening"));
if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, (const char *)&one, sizeof(one)) < 0 ||
@@ -192,28 +210,16 @@ void ConnPool::listen(NetAddr listen_addr) {
throw ConnPoolError(std::string("binding error"));
if (::listen(listen_fd, max_listen_backlog) < 0)
throw ConnPoolError(std::string("listen error"));
- ev_listen = Event(ec, listen_fd, EV_READ,
- std::bind(&ConnPool::accept_client, this, _1, _2));
- ev_listen.add();
- SALTICIDAE_LOG_INFO("listening to %u", ntohs(listen_addr.port));
-}
-
-void ConnPool::Conn::terminate() {
- auto &pool = cpool->pool;
- auto it = pool.find(fd);
- if (it != pool.end())
{
- /* temporarily pin the conn before it dies */
- auto conn = it->second;
- assert(conn.get() == this);
- pool.erase(it);
- on_close();
- /* inform the upper layer the connection will be destroyed */
- on_teardown();
+ std::lock_guard<std::mutex> _(dsp_ec_mlock);
+ ev_listen = Event(dispatcher_ec, listen_fd, EV_READ | EV_PERSIST,
+ std::bind(&ConnPool::accept_client, this, _1, _2));
+ ev_listen.add();
}
+ SALTICIDAE_LOG_INFO("listening to %u", ntohs(listen_addr.port));
}
-ConnPool::conn_t ConnPool::connect(const NetAddr &addr) {
+ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) {
int fd;
int one = 1;
if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
@@ -245,23 +251,41 @@ ConnPool::conn_t ConnPool::connect(const NetAddr &addr) {
}
else
{
- conn->ev_connect = Event(ec, fd, EV_WRITE,
- std::bind(&Conn::conn_server, conn.get(), _1, _2));
- conn->ev_connect.add_with_timeout(conn_server_timeout);
-
+ {
+ std::lock_guard<std::mutex> _(dsp_ec_mlock);
+ conn->ev_connect = Event(dispatcher_ec, fd, EV_WRITE,
+ std::bind(&Conn::conn_server, conn.get(), _1, _2));
+ conn->ev_connect.add_with_timeout(conn_server_timeout);
+ }
add_conn(conn);
SALTICIDAE_LOG_INFO("created %s", std::string(*conn).c_str());
}
return conn;
}
-ConnPool::conn_t ConnPool::add_conn(conn_t conn) {
- auto it = pool.find(conn->fd);
+void ConnPool::_post_terminate(int fd) {
+ std::lock_guard<std::mutex> _(cp_mlock);
+ auto it = pool.find(fd);
if (it != pool.end())
{
- auto old_conn = it->second;
- old_conn->terminate();
+ /* temporarily pin the conn before it dies */
+ auto conn = it->second;
+ assert(conn->fd == fd);
+ pool.erase(it);
+ conn->on_close();
+ /* inform the upper layer the connection will be destroyed */
+ conn->on_teardown();
}
+}
+
+ConnPool::conn_t ConnPool::add_conn(const conn_t &conn) {
+ std::lock_guard<std::mutex> _(cp_mlock);
+ assert(pool.find(conn->fd) == pool.end());
+ //if (it != pool.end())
+ //{
+ // auto old_conn = it->second;
+ // old_conn->terminate();
+ //}
return pool.insert(std::make_pair(conn->fd, conn)).first->second;
}