aboutsummaryrefslogtreecommitdiff
path: root/include
diff options
context:
space:
mode:
Diffstat (limited to 'include')
-rw-r--r--include/salticidae/conn.h58
-rw-r--r--include/salticidae/event.h132
-rw-r--r--include/salticidae/msg.h2
-rw-r--r--include/salticidae/network.h8
4 files changed, 142 insertions, 58 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index 1364d4d..a86a4d2 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -39,6 +39,7 @@
#include <mutex>
#include <thread>
#include <fcntl.h>
+#include <event2/thread.h>
#include "salticidae/type.h"
#include "salticidae/ref.h"
@@ -90,9 +91,9 @@ class ConnPool {
/** does not need to wait if true */
bool ready_send;
- void recv_data(evutil_socket_t, short);
- void send_data(evutil_socket_t, short);
- void conn_server(evutil_socket_t, short);
+ void recv_data(int, int);
+ void send_data(int, int);
+ void conn_server(int, int);
/** Terminate the connection. */
void terminate();
@@ -180,7 +181,7 @@ class ConnPool {
Worker() {
if (pipe2(ctl_fd, O_NONBLOCK))
throw ConnPoolError(std::string("failed to create worker pipe"));
- ev_ctl = Event(ec, ctl_fd[0], EV_READ | EV_PERSIST, [this](int fd, short) {
+ ev_ctl = Event(ec, ctl_fd[0], Event::READ, [this](int fd, int) {
WorkerCmd *dcmd;
read(fd, &dcmd, sizeof(dcmd));
dcmd->exec(this);
@@ -229,17 +230,23 @@ class ConnPool {
.get_queue()
.reg_handler(ec, [conn=this->conn,
client_fd=this->client_fd](MPSCWriteBuffer::queue_t &) {
- if (conn->ready_send)
- conn->send_data(client_fd, EV_WRITE);
+ if (conn->ready_send && conn->fd != -1)
+ conn->send_data(client_fd, Event::WRITE);
return false;
});
- auto conn_ptr = conn.get();
- conn->ev_read = Event(ec, client_fd, EV_READ,
- std::bind(&Conn::recv_data, conn_ptr, _1, _2));
- conn->ev_write = Event(ec, client_fd, EV_WRITE,
- std::bind(&Conn::send_data, conn_ptr, _1, _2));
+ //auto conn_ptr = conn.get();
+ conn->ev_read = Event(ec, client_fd, Event::READ | Event::WRITE, [conn=conn](int fd, int what) {
+ if (what & Event::READ)
+ conn->recv_data(fd, what);
+ else
+ conn->send_data(fd, what);
+ });
+
+ // std::bind(&Conn::recv_data, conn_ptr, _1, _2));
+ //conn->ev_write = Event(ec, client_fd, Event::WRITE,
+ // std::bind(&Conn::send_data, conn_ptr, _1, _2));
conn->ev_read.add();
- conn->ev_write.add();
+ //conn->ev_write.add();
}
};
@@ -252,10 +259,12 @@ class ConnPool {
size_t nworker;
salticidae::BoxObj<Worker[]> workers;
- void accept_client(evutil_socket_t, short);
+ void accept_client(int, int);
conn_t add_conn(const conn_t &conn);
conn_t _connect(const NetAddr &addr);
void _post_terminate(int fd);
+ void _accept_listen(int listen_fd);
+ void _connect_listen(const conn_t &conn);
protected:
class DispatchCmd {
@@ -295,6 +304,24 @@ class ConnPool {
}
};
+ class DspAcceptListen: public DispatchCmd {
+ int listen_fd;
+ public:
+ DspAcceptListen(int listen_fd): listen_fd(listen_fd) {}
+ void exec(ConnPool *cpool) override {
+ cpool->_accept_listen(listen_fd);
+ }
+ };
+
+ class DspConnectListen: public DispatchCmd {
+ conn_t conn;
+ public:
+ DspConnectListen(const conn_t &conn): conn(conn) {}
+ void exec(ConnPool *cpool) override {
+ cpool->_connect_listen(conn);
+ }
+ };
+
class UserConn: public DispatchCmd {
conn_t conn;
bool connected;
@@ -320,7 +347,6 @@ class ConnPool {
EventContext ec;
EventContext dispatcher_ec;
int dlisten_fd[2]; /**< for control command sent to the dispatcher */
- std::mutex dsp_ec_mlock;
/** Should be implemented by derived class to return a new Conn object. */
virtual Conn *create_conn() = 0;
@@ -341,7 +367,7 @@ class ConnPool {
if (pipe2(dlisten_fd, O_NONBLOCK))
throw ConnPoolError(std::string("failed to create dispatcher pipe"));
- ev_mlisten = Event(ec, mlisten_fd[0], EV_READ | EV_PERSIST, [this](int fd, short) {
+ ev_mlisten = Event(ec, mlisten_fd[0], Event::READ, [this](int fd, int) {
DispatchCmd *dcmd;
read(fd, &dcmd, sizeof(dcmd));
dcmd->exec(this);
@@ -352,7 +378,7 @@ class ConnPool {
workers = new Worker[nworker];
dispatcher_ec = workers[0].get_ec();
- ev_dlisten = Event(dispatcher_ec, dlisten_fd[0], EV_READ | EV_PERSIST, [this](int fd, short) {
+ ev_dlisten = Event(dispatcher_ec, dlisten_fd[0], Event::READ, [this](int fd, int) {
DispatchCmd *dcmd;
read(fd, &dcmd, sizeof(dcmd));
dcmd->exec(this);
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index ddb93fc..c21644b 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -26,9 +26,8 @@
#define _SALTICIDAE_EVENT_H
#include <unistd.h>
-#include <event2/event.h>
+#include <uv.h>
#include <sys/eventfd.h>
-//#include <thread>
#include "salticidae/queue.h"
#include "salticidae/util.h"
@@ -38,87 +37,144 @@ namespace salticidae {
struct _event_context_deleter {
constexpr _event_context_deleter() = default;
- void operator()(struct event_base *ptr) {
- if (ptr != nullptr) event_base_free(ptr);
+ void operator()(uv_loop_t *ptr) {
+ if (ptr != nullptr)
+ {
+ uv_loop_close(ptr);
+ delete ptr;
+ }
}
};
-using _event_context_ot = RcObj<struct event_base, _event_context_deleter>;
+using _event_context_ot = ArcObj<uv_loop_t, _event_context_deleter>;
class EventContext: public _event_context_ot {
public:
- EventContext(): _event_context_ot(event_base_new()) {}
- EventContext(struct event_base *eb): _event_context_ot(eb) {}
+ EventContext(): _event_context_ot(new uv_loop_t()) {
+ uv_loop_init(get());
+ }
+ EventContext(uv_loop_t *eb): _event_context_ot(eb) {}
EventContext(const EventContext &) = default;
EventContext(EventContext &&) = default;
EventContext &operator=(const EventContext &) = default;
EventContext &operator=(EventContext &&) = default;
- void dispatch() const { event_base_dispatch(get()); }
- void stop() const { event_base_loopbreak(get()); }
+ void dispatch() const {
+ // TODO: improve this loop
+ for (;;)
+ uv_run(get(), UV_RUN_ONCE);
+ }
+ void stop() const { uv_stop(get()); }
};
class Event {
public:
- using callback_t = std::function<void(evutil_socket_t fd, short events)>;
+ using callback_t = std::function<void(int fd, short events)>;
+ static const int READ = UV_READABLE;
+ static const int WRITE = UV_WRITABLE;
+ static const int TIMEOUT = ~(UV_READABLE | UV_WRITABLE |
+ UV_DISCONNECT | UV_PRIORITIZED);
private:
EventContext eb;
- evutil_socket_t fd;
- short events;
- struct event *ev;
+ int fd;
+ int events;
+ uv_poll_t *ev_fd;
+ uv_timer_t *ev_timer;
callback_t callback;
- static inline void _then(evutil_socket_t fd, short events, void *arg) {
- (static_cast<Event *>(arg))->callback(fd, events);
+ static inline void fd_then(uv_poll_t *h, int status, int events) {
+ assert(status == 0);
+ auto event = static_cast<Event *>(h->data);
+ event->callback(event->fd, events);
+ }
+
+ static inline void timer_then(uv_timer_t *h) {
+ auto event = static_cast<Event *>(h->data);
+ if (event->ev_fd) uv_poll_stop(event->ev_fd);
+ event->callback(event->fd, TIMEOUT);
}
public:
- Event(): eb(nullptr), ev(nullptr) {}
- Event(const EventContext &eb,
- evutil_socket_t fd,
- short events,
- callback_t callback):
+ Event(): eb(nullptr), ev_fd(nullptr), ev_timer(nullptr) {}
+ Event(const EventContext &eb, int fd, short events, callback_t callback):
eb(eb), fd(fd), events(events),
- ev(event_new(eb.get(), fd, events, Event::_then, this)),
- callback(callback) {}
+ ev_fd(nullptr),
+ ev_timer(new uv_timer_t()),
+ callback(callback) {
+ if (fd != -1)
+ {
+ ev_fd = new uv_poll_t();
+ uv_poll_init(eb.get(), ev_fd, fd);
+ ev_fd->data = this;
+ }
+ uv_timer_init(eb.get(), ev_timer);
+ ev_timer->data = this;
+ }
Event(const Event &) = delete;
Event(Event &&other):
eb(std::move(other.eb)), fd(other.fd), events(other.events),
+ ev_fd(other.ev_fd), ev_timer(other.ev_timer),
callback(std::move(other.callback)) {
- other.clear();
- ev = event_new(eb.get(), fd, events, Event::_then, this);
+ other.del();
+ if (fd != -1)
+ {
+ other.ev_fd = nullptr;
+ ev_fd->data = this;
+ }
+ other.ev_timer = nullptr;
+ ev_timer->data = this;
}
Event &operator=(Event &&other) {
clear();
- other.clear();
+ other.del();
eb = std::move(other.eb);
fd = other.fd;
events = other.events;
- ev = event_new(eb.get(), fd, events, Event::_then, this);
+ ev_fd = other.ev_fd;
+ ev_timer = other.ev_timer;
callback = std::move(other.callback);
+
+ if (fd != -1)
+ {
+ other.ev_fd = nullptr;
+ ev_fd->data = this;
+ }
+ other.ev_timer = nullptr;
+ ev_timer->data = this;
return *this;
}
~Event() { clear(); }
void clear() {
- if (ev != nullptr)
+ if (ev_fd != nullptr)
+ {
+ uv_poll_stop(ev_fd);
+ delete ev_fd;
+ ev_fd = nullptr;
+ }
+ if (ev_timer != nullptr)
{
- event_del(ev);
- event_free(ev);
- ev = nullptr;
+ uv_timer_stop(ev_timer);
+ delete ev_timer;
+ ev_timer = nullptr;
}
}
- void add() { if (ev) event_add(ev, nullptr); }
- void del() { if (ev) event_del(ev); }
- void add_with_timeout(double timeout) {
- if (ev)
- event_add_with_timeout(ev, timeout);
+ void add() {
+ if (ev_fd) uv_poll_start(ev_fd, events, Event::fd_then);
+ }
+ void del() {
+ if (ev_fd) uv_poll_stop(ev_fd);
+ uv_timer_stop(ev_timer);
+ }
+ void add_with_timeout(double t_sec) {
+ add();
+ uv_timer_start(ev_timer, Event::timer_then, uint64_t(t_sec * 1000), 0);
}
- operator bool() const { return ev != nullptr; }
+ operator bool() const { return ev_fd != nullptr || ev_timer != nullptr; }
};
template<typename T>
@@ -139,7 +195,7 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
template<typename Func>
void reg_handler(const EventContext &ec, Func &&func) {
- ev = Event(ec, fd, EV_READ | EV_PERSIST,
+ ev = Event(ec, fd, Event::READ,
[this, func=std::forward<Func>(func)](int, short) {
//fprintf(stderr, "%x\n", std::this_thread::get_id());
uint64_t t;
@@ -171,6 +227,8 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
}
};
+
+
// TODO: incorrect MPMCQueueEventDriven impl
/*
template<typename T>
diff --git a/include/salticidae/msg.h b/include/salticidae/msg.h
index 8c44bbb..257ce4f 100644
--- a/include/salticidae/msg.h
+++ b/include/salticidae/msg.h
@@ -192,7 +192,7 @@ class MsgBase {
#ifndef SALTICIDAE_NOCHECKSUM
uint32_t get_checksum() const {
- static class SHA256 sha256;
+ static thread_local class SHA256 sha256;
uint32_t res;
bytearray_t tmp;
#ifndef SALTICIDAE_NOCHECK
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index 290eaa9..18406ea 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -310,7 +310,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
Peer &operator=(const Peer &) = delete;
Peer(const Peer &) = delete;
- void ping_timer(evutil_socket_t, short);
+ void ping_timer(int, int);
void reset_ping_timer();
void send_ping();
void clear_all_events() {
@@ -471,7 +471,7 @@ void PeerNetwork<O, _, __>::Conn::on_setup() {
MsgNet::Conn::on_setup();
auto pn = get_net();
assert(!ev_timeout);
- ev_timeout = Event(pn->ec, -1, 0, [this](evutil_socket_t, short) {
+ ev_timeout = Event(pn->ec, -1, 0, [this](int, int) {
SALTICIDAE_LOG_INFO("peer ping-pong timeout");
this->terminate();
});
@@ -504,7 +504,7 @@ void PeerNetwork<O, _, __>::Conn::on_teardown() {
std::string(peer_id).c_str());
// try to reconnect
p->ev_retry_timer = Event(pn->dispatcher_ec, -1, 0,
- [pn, peer_id = this->peer_id](evutil_socket_t, short) {
+ [pn, peer_id = this->peer_id](int, int) {
mutex_lg_t _pn_lg(pn->pn_mlock);
pn->start_active_conn(peer_id);
});
@@ -554,7 +554,7 @@ void PeerNetwork<O, _, __>::Peer::send_ping() {
}
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::Peer::ping_timer(evutil_socket_t, short) {
+void PeerNetwork<O, _, __>::Peer::ping_timer(int, int) {
mutex_lg_t _p_lg(mlock);
ping_timer_ok = true;
if (pong_msg_ok)