diff options
author | Determinant <[email protected]> | 2018-11-13 18:20:08 -0500 |
---|---|---|
committer | Determinant <[email protected]> | 2018-11-13 18:20:08 -0500 |
commit | 2c1e8ec448a1039ab9a46bce4c959e6ec3cefeb8 (patch) | |
tree | 3389de1c53b304eee74e12d9e2adfbc2ab39fef1 /src | |
parent | 7645971cd6e21ebcf5dd1800bb1f1393284ee0c9 (diff) |
working on p2p; switch to libuv (libevent sucks in multi-threading)
Diffstat (limited to 'src')
-rw-r--r-- | src/conn.cpp | 49 | ||||
-rw-r--r-- | src/util.cpp | 8 |
2 files changed, 28 insertions, 29 deletions
diff --git a/src/conn.cpp b/src/conn.cpp index 7b7c699..f2922d8 100644 --- a/src/conn.cpp +++ b/src/conn.cpp @@ -46,8 +46,8 @@ ConnPool::Conn::operator std::string() const { /* the following two functions are executed by exactly one worker per Conn object */ -void ConnPool::Conn::send_data(evutil_socket_t fd, short events) { - if (!(events & EV_WRITE)) return; +void ConnPool::Conn::send_data(int fd, int events) { + if (!(events & Event::WRITE)) return; auto conn = self(); /* pin the connection */ ssize_t ret = seg_buff_size; for (;;) @@ -77,7 +77,7 @@ void ConnPool::Conn::send_data(evutil_socket_t fd, short events) { bytearray_t(buff_seg.begin() + ret, buff_seg.end())); /* wait for the next write callback */ ready_send = false; - ev_write.add(); + //ev_write.add(); return; } } @@ -85,8 +85,8 @@ void ConnPool::Conn::send_data(evutil_socket_t fd, short events) { ready_send = true; } -void ConnPool::Conn::recv_data(evutil_socket_t fd, short events) { - if (!(events & EV_READ)) return; +void ConnPool::Conn::recv_data(int fd, int events) { + if (!(events & Event::READ)) return; auto conn = self(); /* pin the connection */ ssize_t ret = seg_buff_size; while (ret == (ssize_t)seg_buff_size) @@ -105,13 +105,14 @@ void ConnPool::Conn::recv_data(evutil_socket_t fd, short events) { } if (ret == 0) { + //SALTICIDAE_LOG_INFO("recv(%d) terminates", fd, strerror(errno)); terminate(); return; } buff_seg.resize(ret); recv_buffer.push(std::move(buff_seg)); } - ev_read.add(); + //ev_read.add(); on_read(); } @@ -121,7 +122,7 @@ void ConnPool::Conn::terminate() { cpool->post_terminate(fd); } -void ConnPool::accept_client(evutil_socket_t fd, short) { +void ConnPool::accept_client(int fd, int) { int client_fd; struct sockaddr client_addr; socklen_t addr_size = sizeof(struct sockaddr_in); @@ -151,7 +152,7 @@ void ConnPool::accept_client(evutil_socket_t fd, short) { } } -void ConnPool::Conn::conn_server(evutil_socket_t fd, short events) { +void ConnPool::Conn::conn_server(int fd, int events) { auto conn = self(); /* pin the connection */ if (send(fd, "", 0, MSG_NOSIGNAL) == 0) { @@ -163,7 +164,7 @@ void ConnPool::Conn::conn_server(evutil_socket_t fd, short events) { } else { - if (events & EV_TIMEOUT) + if (events & Event::TIMEOUT) SALTICIDAE_LOG_INFO("%s connect timeout", std::string(*this).c_str()); terminate(); return; @@ -196,12 +197,8 @@ void ConnPool::listen(NetAddr listen_addr) { throw ConnPoolError(std::string("binding error")); if (::listen(listen_fd, max_listen_backlog) < 0) throw ConnPoolError(std::string("listen error")); - { - std::lock_guard<std::mutex> _(dsp_ec_mlock); - ev_listen = Event(dispatcher_ec, listen_fd, EV_READ | EV_PERSIST, - std::bind(&ConnPool::accept_client, this, _1, _2)); - ev_listen.add(); - } + auto dcmd = new DspAcceptListen(listen_fd); + write(dlisten_fd[1], &dcmd, sizeof(dcmd)); SALTICIDAE_LOG_INFO("listening to %u", ntohs(listen_addr.port)); } @@ -237,12 +234,8 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) { } else { - { - std::lock_guard<std::mutex> _(dsp_ec_mlock); - conn->ev_connect = Event(dispatcher_ec, fd, EV_WRITE, - std::bind(&Conn::conn_server, conn.get(), _1, _2)); - conn->ev_connect.add_with_timeout(conn_server_timeout); - } + auto dcmd = new DspConnectListen(conn); + write(dlisten_fd[1], &dcmd, sizeof(dcmd)); add_conn(conn); SALTICIDAE_LOG_INFO("created %s", std::string(*conn).c_str()); } @@ -271,4 +264,18 @@ ConnPool::conn_t ConnPool::add_conn(const conn_t &conn) { return pool.insert(std::make_pair(conn->fd, conn)).first->second; } +void ConnPool::_accept_listen(int listen_fd) { + std::lock_guard<std::mutex> _(cp_mlock); + ev_listen = Event(dispatcher_ec, listen_fd, Event::READ, + std::bind(&ConnPool::accept_client, this, _1, _2)); + ev_listen.add(); +} + +void ConnPool::_connect_listen(const conn_t &conn) { + std::lock_guard<std::mutex> _(cp_mlock); + conn->ev_connect = Event(dispatcher_ec, conn->fd, Event::WRITE, + std::bind(&Conn::conn_server, conn.get(), _1, _2)); + conn->ev_connect.add_with_timeout(conn_server_timeout); +} + } diff --git a/src/util.cpp b/src/util.cpp index 6c8866d..b975f13 100644 --- a/src/util.cpp +++ b/src/util.cpp @@ -29,7 +29,6 @@ #include <ctime> #include <sys/time.h> #include <cmath> -#include <event2/event.h> #include "salticidae/util.h" @@ -40,13 +39,6 @@ void sec2tv(double t, struct timeval &tv) { tv.tv_usec = trunc((t - tv.tv_sec) * 1e6); } -void event_add_with_timeout(struct event *ev, double timeout) { - struct timeval tv; - tv.tv_sec = trunc(timeout); - tv.tv_usec = trunc((timeout - tv.tv_sec) * 1e6); - event_add(ev, &tv); -} - double gen_rand_timeout(double base_timeout, double alpha) { return base_timeout + rand() / (double)RAND_MAX * alpha * base_timeout; } |