aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2018-11-13 18:20:08 -0500
committerDeterminant <[email protected]>2018-11-13 18:20:08 -0500
commit2c1e8ec448a1039ab9a46bce4c959e6ec3cefeb8 (patch)
tree3389de1c53b304eee74e12d9e2adfbc2ab39fef1 /src
parent7645971cd6e21ebcf5dd1800bb1f1393284ee0c9 (diff)
working on p2p; switch to libuv (libevent sucks in multi-threading)
Diffstat (limited to 'src')
-rw-r--r--src/conn.cpp49
-rw-r--r--src/util.cpp8
2 files changed, 28 insertions, 29 deletions
diff --git a/src/conn.cpp b/src/conn.cpp
index 7b7c699..f2922d8 100644
--- a/src/conn.cpp
+++ b/src/conn.cpp
@@ -46,8 +46,8 @@ ConnPool::Conn::operator std::string() const {
/* the following two functions are executed by exactly one worker per Conn object */
-void ConnPool::Conn::send_data(evutil_socket_t fd, short events) {
- if (!(events & EV_WRITE)) return;
+void ConnPool::Conn::send_data(int fd, int events) {
+ if (!(events & Event::WRITE)) return;
auto conn = self(); /* pin the connection */
ssize_t ret = seg_buff_size;
for (;;)
@@ -77,7 +77,7 @@ void ConnPool::Conn::send_data(evutil_socket_t fd, short events) {
bytearray_t(buff_seg.begin() + ret, buff_seg.end()));
/* wait for the next write callback */
ready_send = false;
- ev_write.add();
+ //ev_write.add();
return;
}
}
@@ -85,8 +85,8 @@ void ConnPool::Conn::send_data(evutil_socket_t fd, short events) {
ready_send = true;
}
-void ConnPool::Conn::recv_data(evutil_socket_t fd, short events) {
- if (!(events & EV_READ)) return;
+void ConnPool::Conn::recv_data(int fd, int events) {
+ if (!(events & Event::READ)) return;
auto conn = self(); /* pin the connection */
ssize_t ret = seg_buff_size;
while (ret == (ssize_t)seg_buff_size)
@@ -105,13 +105,14 @@ void ConnPool::Conn::recv_data(evutil_socket_t fd, short events) {
}
if (ret == 0)
{
+ //SALTICIDAE_LOG_INFO("recv(%d) terminates", fd, strerror(errno));
terminate();
return;
}
buff_seg.resize(ret);
recv_buffer.push(std::move(buff_seg));
}
- ev_read.add();
+ //ev_read.add();
on_read();
}
@@ -121,7 +122,7 @@ void ConnPool::Conn::terminate() {
cpool->post_terminate(fd);
}
-void ConnPool::accept_client(evutil_socket_t fd, short) {
+void ConnPool::accept_client(int fd, int) {
int client_fd;
struct sockaddr client_addr;
socklen_t addr_size = sizeof(struct sockaddr_in);
@@ -151,7 +152,7 @@ void ConnPool::accept_client(evutil_socket_t fd, short) {
}
}
-void ConnPool::Conn::conn_server(evutil_socket_t fd, short events) {
+void ConnPool::Conn::conn_server(int fd, int events) {
auto conn = self(); /* pin the connection */
if (send(fd, "", 0, MSG_NOSIGNAL) == 0)
{
@@ -163,7 +164,7 @@ void ConnPool::Conn::conn_server(evutil_socket_t fd, short events) {
}
else
{
- if (events & EV_TIMEOUT)
+ if (events & Event::TIMEOUT)
SALTICIDAE_LOG_INFO("%s connect timeout", std::string(*this).c_str());
terminate();
return;
@@ -196,12 +197,8 @@ void ConnPool::listen(NetAddr listen_addr) {
throw ConnPoolError(std::string("binding error"));
if (::listen(listen_fd, max_listen_backlog) < 0)
throw ConnPoolError(std::string("listen error"));
- {
- std::lock_guard<std::mutex> _(dsp_ec_mlock);
- ev_listen = Event(dispatcher_ec, listen_fd, EV_READ | EV_PERSIST,
- std::bind(&ConnPool::accept_client, this, _1, _2));
- ev_listen.add();
- }
+ auto dcmd = new DspAcceptListen(listen_fd);
+ write(dlisten_fd[1], &dcmd, sizeof(dcmd));
SALTICIDAE_LOG_INFO("listening to %u", ntohs(listen_addr.port));
}
@@ -237,12 +234,8 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) {
}
else
{
- {
- std::lock_guard<std::mutex> _(dsp_ec_mlock);
- conn->ev_connect = Event(dispatcher_ec, fd, EV_WRITE,
- std::bind(&Conn::conn_server, conn.get(), _1, _2));
- conn->ev_connect.add_with_timeout(conn_server_timeout);
- }
+ auto dcmd = new DspConnectListen(conn);
+ write(dlisten_fd[1], &dcmd, sizeof(dcmd));
add_conn(conn);
SALTICIDAE_LOG_INFO("created %s", std::string(*conn).c_str());
}
@@ -271,4 +264,18 @@ ConnPool::conn_t ConnPool::add_conn(const conn_t &conn) {
return pool.insert(std::make_pair(conn->fd, conn)).first->second;
}
+void ConnPool::_accept_listen(int listen_fd) {
+ std::lock_guard<std::mutex> _(cp_mlock);
+ ev_listen = Event(dispatcher_ec, listen_fd, Event::READ,
+ std::bind(&ConnPool::accept_client, this, _1, _2));
+ ev_listen.add();
+}
+
+void ConnPool::_connect_listen(const conn_t &conn) {
+ std::lock_guard<std::mutex> _(cp_mlock);
+ conn->ev_connect = Event(dispatcher_ec, conn->fd, Event::WRITE,
+ std::bind(&Conn::conn_server, conn.get(), _1, _2));
+ conn->ev_connect.add_with_timeout(conn_server_timeout);
+}
+
}
diff --git a/src/util.cpp b/src/util.cpp
index 6c8866d..b975f13 100644
--- a/src/util.cpp
+++ b/src/util.cpp
@@ -29,7 +29,6 @@
#include <ctime>
#include <sys/time.h>
#include <cmath>
-#include <event2/event.h>
#include "salticidae/util.h"
@@ -40,13 +39,6 @@ void sec2tv(double t, struct timeval &tv) {
tv.tv_usec = trunc((t - tv.tv_sec) * 1e6);
}
-void event_add_with_timeout(struct event *ev, double timeout) {
- struct timeval tv;
- tv.tv_sec = trunc(timeout);
- tv.tv_usec = trunc((timeout - tv.tv_sec) * 1e6);
- event_add(ev, &tv);
-}
-
double gen_rand_timeout(double base_timeout, double alpha) {
return base_timeout + rand() / (double)RAND_MAX * alpha * base_timeout;
}