aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt4
-rw-r--r--include/salticidae/conn.h58
-rw-r--r--include/salticidae/event.h132
-rw-r--r--include/salticidae/msg.h2
-rw-r--r--include/salticidae/network.h8
-rw-r--r--src/conn.cpp49
-rw-r--r--src/util.cpp8
-rw-r--r--test/test_network.cpp4
8 files changed, 174 insertions, 91 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 66cd2d4..4769b38 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -39,11 +39,11 @@ if(BUILD_SHARED)
set_property(TARGET salticidae PROPERTY POSITION_INDEPENDENT_CODE 1)
add_library(salticidae_shared SHARED $<TARGET_OBJECTS:salticidae>)
set_target_properties(salticidae_shared PROPERTIES OUTPUT_NAME "salticidae")
- target_link_libraries(salticidae_shared event crypto pthread)
+ target_link_libraries(salticidae_shared uv crypto pthread)
endif()
add_library(salticidae_static STATIC $<TARGET_OBJECTS:salticidae>)
set_target_properties(salticidae_static PROPERTIES OUTPUT_NAME "salticidae")
-target_link_libraries(salticidae_static event crypto pthread)
+target_link_libraries(salticidae_static uv crypto pthread)
option(BUILD_TEST "build test binaries." OFF)
if(BUILD_TEST)
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index 1364d4d..a86a4d2 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -39,6 +39,7 @@
#include <mutex>
#include <thread>
#include <fcntl.h>
+#include <event2/thread.h>
#include "salticidae/type.h"
#include "salticidae/ref.h"
@@ -90,9 +91,9 @@ class ConnPool {
/** does not need to wait if true */
bool ready_send;
- void recv_data(evutil_socket_t, short);
- void send_data(evutil_socket_t, short);
- void conn_server(evutil_socket_t, short);
+ void recv_data(int, int);
+ void send_data(int, int);
+ void conn_server(int, int);
/** Terminate the connection. */
void terminate();
@@ -180,7 +181,7 @@ class ConnPool {
Worker() {
if (pipe2(ctl_fd, O_NONBLOCK))
throw ConnPoolError(std::string("failed to create worker pipe"));
- ev_ctl = Event(ec, ctl_fd[0], EV_READ | EV_PERSIST, [this](int fd, short) {
+ ev_ctl = Event(ec, ctl_fd[0], Event::READ, [this](int fd, int) {
WorkerCmd *dcmd;
read(fd, &dcmd, sizeof(dcmd));
dcmd->exec(this);
@@ -229,17 +230,23 @@ class ConnPool {
.get_queue()
.reg_handler(ec, [conn=this->conn,
client_fd=this->client_fd](MPSCWriteBuffer::queue_t &) {
- if (conn->ready_send)
- conn->send_data(client_fd, EV_WRITE);
+ if (conn->ready_send && conn->fd != -1)
+ conn->send_data(client_fd, Event::WRITE);
return false;
});
- auto conn_ptr = conn.get();
- conn->ev_read = Event(ec, client_fd, EV_READ,
- std::bind(&Conn::recv_data, conn_ptr, _1, _2));
- conn->ev_write = Event(ec, client_fd, EV_WRITE,
- std::bind(&Conn::send_data, conn_ptr, _1, _2));
+ //auto conn_ptr = conn.get();
+ conn->ev_read = Event(ec, client_fd, Event::READ | Event::WRITE, [conn=conn](int fd, int what) {
+ if (what & Event::READ)
+ conn->recv_data(fd, what);
+ else
+ conn->send_data(fd, what);
+ });
+
+ // std::bind(&Conn::recv_data, conn_ptr, _1, _2));
+ //conn->ev_write = Event(ec, client_fd, Event::WRITE,
+ // std::bind(&Conn::send_data, conn_ptr, _1, _2));
conn->ev_read.add();
- conn->ev_write.add();
+ //conn->ev_write.add();
}
};
@@ -252,10 +259,12 @@ class ConnPool {
size_t nworker;
salticidae::BoxObj<Worker[]> workers;
- void accept_client(evutil_socket_t, short);
+ void accept_client(int, int);
conn_t add_conn(const conn_t &conn);
conn_t _connect(const NetAddr &addr);
void _post_terminate(int fd);
+ void _accept_listen(int listen_fd);
+ void _connect_listen(const conn_t &conn);
protected:
class DispatchCmd {
@@ -295,6 +304,24 @@ class ConnPool {
}
};
+ class DspAcceptListen: public DispatchCmd {
+ int listen_fd;
+ public:
+ DspAcceptListen(int listen_fd): listen_fd(listen_fd) {}
+ void exec(ConnPool *cpool) override {
+ cpool->_accept_listen(listen_fd);
+ }
+ };
+
+ class DspConnectListen: public DispatchCmd {
+ conn_t conn;
+ public:
+ DspConnectListen(const conn_t &conn): conn(conn) {}
+ void exec(ConnPool *cpool) override {
+ cpool->_connect_listen(conn);
+ }
+ };
+
class UserConn: public DispatchCmd {
conn_t conn;
bool connected;
@@ -320,7 +347,6 @@ class ConnPool {
EventContext ec;
EventContext dispatcher_ec;
int dlisten_fd[2]; /**< for control command sent to the dispatcher */
- std::mutex dsp_ec_mlock;
/** Should be implemented by derived class to return a new Conn object. */
virtual Conn *create_conn() = 0;
@@ -341,7 +367,7 @@ class ConnPool {
if (pipe2(dlisten_fd, O_NONBLOCK))
throw ConnPoolError(std::string("failed to create dispatcher pipe"));
- ev_mlisten = Event(ec, mlisten_fd[0], EV_READ | EV_PERSIST, [this](int fd, short) {
+ ev_mlisten = Event(ec, mlisten_fd[0], Event::READ, [this](int fd, int) {
DispatchCmd *dcmd;
read(fd, &dcmd, sizeof(dcmd));
dcmd->exec(this);
@@ -352,7 +378,7 @@ class ConnPool {
workers = new Worker[nworker];
dispatcher_ec = workers[0].get_ec();
- ev_dlisten = Event(dispatcher_ec, dlisten_fd[0], EV_READ | EV_PERSIST, [this](int fd, short) {
+ ev_dlisten = Event(dispatcher_ec, dlisten_fd[0], Event::READ, [this](int fd, int) {
DispatchCmd *dcmd;
read(fd, &dcmd, sizeof(dcmd));
dcmd->exec(this);
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index ddb93fc..c21644b 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -26,9 +26,8 @@
#define _SALTICIDAE_EVENT_H
#include <unistd.h>
-#include <event2/event.h>
+#include <uv.h>
#include <sys/eventfd.h>
-//#include <thread>
#include "salticidae/queue.h"
#include "salticidae/util.h"
@@ -38,87 +37,144 @@ namespace salticidae {
struct _event_context_deleter {
constexpr _event_context_deleter() = default;
- void operator()(struct event_base *ptr) {
- if (ptr != nullptr) event_base_free(ptr);
+ void operator()(uv_loop_t *ptr) {
+ if (ptr != nullptr)
+ {
+ uv_loop_close(ptr);
+ delete ptr;
+ }
}
};
-using _event_context_ot = RcObj<struct event_base, _event_context_deleter>;
+using _event_context_ot = ArcObj<uv_loop_t, _event_context_deleter>;
class EventContext: public _event_context_ot {
public:
- EventContext(): _event_context_ot(event_base_new()) {}
- EventContext(struct event_base *eb): _event_context_ot(eb) {}
+ EventContext(): _event_context_ot(new uv_loop_t()) {
+ uv_loop_init(get());
+ }
+ EventContext(uv_loop_t *eb): _event_context_ot(eb) {}
EventContext(const EventContext &) = default;
EventContext(EventContext &&) = default;
EventContext &operator=(const EventContext &) = default;
EventContext &operator=(EventContext &&) = default;
- void dispatch() const { event_base_dispatch(get()); }
- void stop() const { event_base_loopbreak(get()); }
+ void dispatch() const {
+ // TODO: improve this loop
+ for (;;)
+ uv_run(get(), UV_RUN_ONCE);
+ }
+ void stop() const { uv_stop(get()); }
};
class Event {
public:
- using callback_t = std::function<void(evutil_socket_t fd, short events)>;
+ using callback_t = std::function<void(int fd, short events)>;
+ static const int READ = UV_READABLE;
+ static const int WRITE = UV_WRITABLE;
+ static const int TIMEOUT = ~(UV_READABLE | UV_WRITABLE |
+ UV_DISCONNECT | UV_PRIORITIZED);
private:
EventContext eb;
- evutil_socket_t fd;
- short events;
- struct event *ev;
+ int fd;
+ int events;
+ uv_poll_t *ev_fd;
+ uv_timer_t *ev_timer;
callback_t callback;
- static inline void _then(evutil_socket_t fd, short events, void *arg) {
- (static_cast<Event *>(arg))->callback(fd, events);
+ static inline void fd_then(uv_poll_t *h, int status, int events) {
+ assert(status == 0);
+ auto event = static_cast<Event *>(h->data);
+ event->callback(event->fd, events);
+ }
+
+ static inline void timer_then(uv_timer_t *h) {
+ auto event = static_cast<Event *>(h->data);
+ if (event->ev_fd) uv_poll_stop(event->ev_fd);
+ event->callback(event->fd, TIMEOUT);
}
public:
- Event(): eb(nullptr), ev(nullptr) {}
- Event(const EventContext &eb,
- evutil_socket_t fd,
- short events,
- callback_t callback):
+ Event(): eb(nullptr), ev_fd(nullptr), ev_timer(nullptr) {}
+ Event(const EventContext &eb, int fd, short events, callback_t callback):
eb(eb), fd(fd), events(events),
- ev(event_new(eb.get(), fd, events, Event::_then, this)),
- callback(callback) {}
+ ev_fd(nullptr),
+ ev_timer(new uv_timer_t()),
+ callback(callback) {
+ if (fd != -1)
+ {
+ ev_fd = new uv_poll_t();
+ uv_poll_init(eb.get(), ev_fd, fd);
+ ev_fd->data = this;
+ }
+ uv_timer_init(eb.get(), ev_timer);
+ ev_timer->data = this;
+ }
Event(const Event &) = delete;
Event(Event &&other):
eb(std::move(other.eb)), fd(other.fd), events(other.events),
+ ev_fd(other.ev_fd), ev_timer(other.ev_timer),
callback(std::move(other.callback)) {
- other.clear();
- ev = event_new(eb.get(), fd, events, Event::_then, this);
+ other.del();
+ if (fd != -1)
+ {
+ other.ev_fd = nullptr;
+ ev_fd->data = this;
+ }
+ other.ev_timer = nullptr;
+ ev_timer->data = this;
}
Event &operator=(Event &&other) {
clear();
- other.clear();
+ other.del();
eb = std::move(other.eb);
fd = other.fd;
events = other.events;
- ev = event_new(eb.get(), fd, events, Event::_then, this);
+ ev_fd = other.ev_fd;
+ ev_timer = other.ev_timer;
callback = std::move(other.callback);
+
+ if (fd != -1)
+ {
+ other.ev_fd = nullptr;
+ ev_fd->data = this;
+ }
+ other.ev_timer = nullptr;
+ ev_timer->data = this;
return *this;
}
~Event() { clear(); }
void clear() {
- if (ev != nullptr)
+ if (ev_fd != nullptr)
+ {
+ uv_poll_stop(ev_fd);
+ delete ev_fd;
+ ev_fd = nullptr;
+ }
+ if (ev_timer != nullptr)
{
- event_del(ev);
- event_free(ev);
- ev = nullptr;
+ uv_timer_stop(ev_timer);
+ delete ev_timer;
+ ev_timer = nullptr;
}
}
- void add() { if (ev) event_add(ev, nullptr); }
- void del() { if (ev) event_del(ev); }
- void add_with_timeout(double timeout) {
- if (ev)
- event_add_with_timeout(ev, timeout);
+ void add() {
+ if (ev_fd) uv_poll_start(ev_fd, events, Event::fd_then);
+ }
+ void del() {
+ if (ev_fd) uv_poll_stop(ev_fd);
+ uv_timer_stop(ev_timer);
+ }
+ void add_with_timeout(double t_sec) {
+ add();
+ uv_timer_start(ev_timer, Event::timer_then, uint64_t(t_sec * 1000), 0);
}
- operator bool() const { return ev != nullptr; }
+ operator bool() const { return ev_fd != nullptr || ev_timer != nullptr; }
};
template<typename T>
@@ -139,7 +195,7 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
template<typename Func>
void reg_handler(const EventContext &ec, Func &&func) {
- ev = Event(ec, fd, EV_READ | EV_PERSIST,
+ ev = Event(ec, fd, Event::READ,
[this, func=std::forward<Func>(func)](int, short) {
//fprintf(stderr, "%x\n", std::this_thread::get_id());
uint64_t t;
@@ -171,6 +227,8 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
}
};
+
+
// TODO: incorrect MPMCQueueEventDriven impl
/*
template<typename T>
diff --git a/include/salticidae/msg.h b/include/salticidae/msg.h
index 8c44bbb..257ce4f 100644
--- a/include/salticidae/msg.h
+++ b/include/salticidae/msg.h
@@ -192,7 +192,7 @@ class MsgBase {
#ifndef SALTICIDAE_NOCHECKSUM
uint32_t get_checksum() const {
- static class SHA256 sha256;
+ static thread_local class SHA256 sha256;
uint32_t res;
bytearray_t tmp;
#ifndef SALTICIDAE_NOCHECK
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index 290eaa9..18406ea 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -310,7 +310,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
Peer &operator=(const Peer &) = delete;
Peer(const Peer &) = delete;
- void ping_timer(evutil_socket_t, short);
+ void ping_timer(int, int);
void reset_ping_timer();
void send_ping();
void clear_all_events() {
@@ -471,7 +471,7 @@ void PeerNetwork<O, _, __>::Conn::on_setup() {
MsgNet::Conn::on_setup();
auto pn = get_net();
assert(!ev_timeout);
- ev_timeout = Event(pn->ec, -1, 0, [this](evutil_socket_t, short) {
+ ev_timeout = Event(pn->ec, -1, 0, [this](int, int) {
SALTICIDAE_LOG_INFO("peer ping-pong timeout");
this->terminate();
});
@@ -504,7 +504,7 @@ void PeerNetwork<O, _, __>::Conn::on_teardown() {
std::string(peer_id).c_str());
// try to reconnect
p->ev_retry_timer = Event(pn->dispatcher_ec, -1, 0,
- [pn, peer_id = this->peer_id](evutil_socket_t, short) {
+ [pn, peer_id = this->peer_id](int, int) {
mutex_lg_t _pn_lg(pn->pn_mlock);
pn->start_active_conn(peer_id);
});
@@ -554,7 +554,7 @@ void PeerNetwork<O, _, __>::Peer::send_ping() {
}
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::Peer::ping_timer(evutil_socket_t, short) {
+void PeerNetwork<O, _, __>::Peer::ping_timer(int, int) {
mutex_lg_t _p_lg(mlock);
ping_timer_ok = true;
if (pong_msg_ok)
diff --git a/src/conn.cpp b/src/conn.cpp
index 7b7c699..f2922d8 100644
--- a/src/conn.cpp
+++ b/src/conn.cpp
@@ -46,8 +46,8 @@ ConnPool::Conn::operator std::string() const {
/* the following two functions are executed by exactly one worker per Conn object */
-void ConnPool::Conn::send_data(evutil_socket_t fd, short events) {
- if (!(events & EV_WRITE)) return;
+void ConnPool::Conn::send_data(int fd, int events) {
+ if (!(events & Event::WRITE)) return;
auto conn = self(); /* pin the connection */
ssize_t ret = seg_buff_size;
for (;;)
@@ -77,7 +77,7 @@ void ConnPool::Conn::send_data(evutil_socket_t fd, short events) {
bytearray_t(buff_seg.begin() + ret, buff_seg.end()));
/* wait for the next write callback */
ready_send = false;
- ev_write.add();
+ //ev_write.add();
return;
}
}
@@ -85,8 +85,8 @@ void ConnPool::Conn::send_data(evutil_socket_t fd, short events) {
ready_send = true;
}
-void ConnPool::Conn::recv_data(evutil_socket_t fd, short events) {
- if (!(events & EV_READ)) return;
+void ConnPool::Conn::recv_data(int fd, int events) {
+ if (!(events & Event::READ)) return;
auto conn = self(); /* pin the connection */
ssize_t ret = seg_buff_size;
while (ret == (ssize_t)seg_buff_size)
@@ -105,13 +105,14 @@ void ConnPool::Conn::recv_data(evutil_socket_t fd, short events) {
}
if (ret == 0)
{
+ //SALTICIDAE_LOG_INFO("recv(%d) terminates", fd, strerror(errno));
terminate();
return;
}
buff_seg.resize(ret);
recv_buffer.push(std::move(buff_seg));
}
- ev_read.add();
+ //ev_read.add();
on_read();
}
@@ -121,7 +122,7 @@ void ConnPool::Conn::terminate() {
cpool->post_terminate(fd);
}
-void ConnPool::accept_client(evutil_socket_t fd, short) {
+void ConnPool::accept_client(int fd, int) {
int client_fd;
struct sockaddr client_addr;
socklen_t addr_size = sizeof(struct sockaddr_in);
@@ -151,7 +152,7 @@ void ConnPool::accept_client(evutil_socket_t fd, short) {
}
}
-void ConnPool::Conn::conn_server(evutil_socket_t fd, short events) {
+void ConnPool::Conn::conn_server(int fd, int events) {
auto conn = self(); /* pin the connection */
if (send(fd, "", 0, MSG_NOSIGNAL) == 0)
{
@@ -163,7 +164,7 @@ void ConnPool::Conn::conn_server(evutil_socket_t fd, short events) {
}
else
{
- if (events & EV_TIMEOUT)
+ if (events & Event::TIMEOUT)
SALTICIDAE_LOG_INFO("%s connect timeout", std::string(*this).c_str());
terminate();
return;
@@ -196,12 +197,8 @@ void ConnPool::listen(NetAddr listen_addr) {
throw ConnPoolError(std::string("binding error"));
if (::listen(listen_fd, max_listen_backlog) < 0)
throw ConnPoolError(std::string("listen error"));
- {
- std::lock_guard<std::mutex> _(dsp_ec_mlock);
- ev_listen = Event(dispatcher_ec, listen_fd, EV_READ | EV_PERSIST,
- std::bind(&ConnPool::accept_client, this, _1, _2));
- ev_listen.add();
- }
+ auto dcmd = new DspAcceptListen(listen_fd);
+ write(dlisten_fd[1], &dcmd, sizeof(dcmd));
SALTICIDAE_LOG_INFO("listening to %u", ntohs(listen_addr.port));
}
@@ -237,12 +234,8 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) {
}
else
{
- {
- std::lock_guard<std::mutex> _(dsp_ec_mlock);
- conn->ev_connect = Event(dispatcher_ec, fd, EV_WRITE,
- std::bind(&Conn::conn_server, conn.get(), _1, _2));
- conn->ev_connect.add_with_timeout(conn_server_timeout);
- }
+ auto dcmd = new DspConnectListen(conn);
+ write(dlisten_fd[1], &dcmd, sizeof(dcmd));
add_conn(conn);
SALTICIDAE_LOG_INFO("created %s", std::string(*conn).c_str());
}
@@ -271,4 +264,18 @@ ConnPool::conn_t ConnPool::add_conn(const conn_t &conn) {
return pool.insert(std::make_pair(conn->fd, conn)).first->second;
}
+void ConnPool::_accept_listen(int listen_fd) {
+ std::lock_guard<std::mutex> _(cp_mlock);
+ ev_listen = Event(dispatcher_ec, listen_fd, Event::READ,
+ std::bind(&ConnPool::accept_client, this, _1, _2));
+ ev_listen.add();
+}
+
+void ConnPool::_connect_listen(const conn_t &conn) {
+ std::lock_guard<std::mutex> _(cp_mlock);
+ conn->ev_connect = Event(dispatcher_ec, conn->fd, Event::WRITE,
+ std::bind(&Conn::conn_server, conn.get(), _1, _2));
+ conn->ev_connect.add_with_timeout(conn_server_timeout);
+}
+
}
diff --git a/src/util.cpp b/src/util.cpp
index 6c8866d..b975f13 100644
--- a/src/util.cpp
+++ b/src/util.cpp
@@ -29,7 +29,6 @@
#include <ctime>
#include <sys/time.h>
#include <cmath>
-#include <event2/event.h>
#include "salticidae/util.h"
@@ -40,13 +39,6 @@ void sec2tv(double t, struct timeval &tv) {
tv.tv_usec = trunc((t - tv.tv_sec) * 1e6);
}
-void event_add_with_timeout(struct event *ev, double timeout) {
- struct timeval tv;
- tv.tv_sec = trunc(timeout);
- tv.tv_usec = trunc((timeout - tv.tv_sec) * 1e6);
- event_add(ev, &tv);
-}
-
double gen_rand_timeout(double base_timeout, double alpha) {
return base_timeout + rand() / (double)RAND_MAX * alpha * base_timeout;
}
diff --git a/test/test_network.cpp b/test/test_network.cpp
index 1821ada..061b520 100644
--- a/test/test_network.cpp
+++ b/test/test_network.cpp
@@ -129,8 +129,8 @@ void on_receive_ack(MsgAck &&msg, MyNet::Conn &conn) {
}
salticidae::EventContext ec;
-NetAddr alice_addr("127.0.0.1:1234");
-NetAddr bob_addr("127.0.0.1:1235");
+NetAddr alice_addr("127.0.0.1:12345");
+NetAddr bob_addr("127.0.0.1:12346");
int main() {
/* test two nodes */