diff options
Diffstat (limited to 'include')
-rw-r--r-- | include/salticidae/conn.h | 58 | ||||
-rw-r--r-- | include/salticidae/event.h | 132 | ||||
-rw-r--r-- | include/salticidae/msg.h | 2 | ||||
-rw-r--r-- | include/salticidae/network.h | 8 |
4 files changed, 142 insertions, 58 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index 1364d4d..a86a4d2 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -39,6 +39,7 @@ #include <mutex> #include <thread> #include <fcntl.h> +#include <event2/thread.h> #include "salticidae/type.h" #include "salticidae/ref.h" @@ -90,9 +91,9 @@ class ConnPool { /** does not need to wait if true */ bool ready_send; - void recv_data(evutil_socket_t, short); - void send_data(evutil_socket_t, short); - void conn_server(evutil_socket_t, short); + void recv_data(int, int); + void send_data(int, int); + void conn_server(int, int); /** Terminate the connection. */ void terminate(); @@ -180,7 +181,7 @@ class ConnPool { Worker() { if (pipe2(ctl_fd, O_NONBLOCK)) throw ConnPoolError(std::string("failed to create worker pipe")); - ev_ctl = Event(ec, ctl_fd[0], EV_READ | EV_PERSIST, [this](int fd, short) { + ev_ctl = Event(ec, ctl_fd[0], Event::READ, [this](int fd, int) { WorkerCmd *dcmd; read(fd, &dcmd, sizeof(dcmd)); dcmd->exec(this); @@ -229,17 +230,23 @@ class ConnPool { .get_queue() .reg_handler(ec, [conn=this->conn, client_fd=this->client_fd](MPSCWriteBuffer::queue_t &) { - if (conn->ready_send) - conn->send_data(client_fd, EV_WRITE); + if (conn->ready_send && conn->fd != -1) + conn->send_data(client_fd, Event::WRITE); return false; }); - auto conn_ptr = conn.get(); - conn->ev_read = Event(ec, client_fd, EV_READ, - std::bind(&Conn::recv_data, conn_ptr, _1, _2)); - conn->ev_write = Event(ec, client_fd, EV_WRITE, - std::bind(&Conn::send_data, conn_ptr, _1, _2)); + //auto conn_ptr = conn.get(); + conn->ev_read = Event(ec, client_fd, Event::READ | Event::WRITE, [conn=conn](int fd, int what) { + if (what & Event::READ) + conn->recv_data(fd, what); + else + conn->send_data(fd, what); + }); + + // std::bind(&Conn::recv_data, conn_ptr, _1, _2)); + //conn->ev_write = Event(ec, client_fd, Event::WRITE, + // std::bind(&Conn::send_data, conn_ptr, _1, _2)); conn->ev_read.add(); - conn->ev_write.add(); + //conn->ev_write.add(); } }; @@ -252,10 +259,12 @@ class ConnPool { size_t nworker; salticidae::BoxObj<Worker[]> workers; - void accept_client(evutil_socket_t, short); + void accept_client(int, int); conn_t add_conn(const conn_t &conn); conn_t _connect(const NetAddr &addr); void _post_terminate(int fd); + void _accept_listen(int listen_fd); + void _connect_listen(const conn_t &conn); protected: class DispatchCmd { @@ -295,6 +304,24 @@ class ConnPool { } }; + class DspAcceptListen: public DispatchCmd { + int listen_fd; + public: + DspAcceptListen(int listen_fd): listen_fd(listen_fd) {} + void exec(ConnPool *cpool) override { + cpool->_accept_listen(listen_fd); + } + }; + + class DspConnectListen: public DispatchCmd { + conn_t conn; + public: + DspConnectListen(const conn_t &conn): conn(conn) {} + void exec(ConnPool *cpool) override { + cpool->_connect_listen(conn); + } + }; + class UserConn: public DispatchCmd { conn_t conn; bool connected; @@ -320,7 +347,6 @@ class ConnPool { EventContext ec; EventContext dispatcher_ec; int dlisten_fd[2]; /**< for control command sent to the dispatcher */ - std::mutex dsp_ec_mlock; /** Should be implemented by derived class to return a new Conn object. */ virtual Conn *create_conn() = 0; @@ -341,7 +367,7 @@ class ConnPool { if (pipe2(dlisten_fd, O_NONBLOCK)) throw ConnPoolError(std::string("failed to create dispatcher pipe")); - ev_mlisten = Event(ec, mlisten_fd[0], EV_READ | EV_PERSIST, [this](int fd, short) { + ev_mlisten = Event(ec, mlisten_fd[0], Event::READ, [this](int fd, int) { DispatchCmd *dcmd; read(fd, &dcmd, sizeof(dcmd)); dcmd->exec(this); @@ -352,7 +378,7 @@ class ConnPool { workers = new Worker[nworker]; dispatcher_ec = workers[0].get_ec(); - ev_dlisten = Event(dispatcher_ec, dlisten_fd[0], EV_READ | EV_PERSIST, [this](int fd, short) { + ev_dlisten = Event(dispatcher_ec, dlisten_fd[0], Event::READ, [this](int fd, int) { DispatchCmd *dcmd; read(fd, &dcmd, sizeof(dcmd)); dcmd->exec(this); diff --git a/include/salticidae/event.h b/include/salticidae/event.h index ddb93fc..c21644b 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -26,9 +26,8 @@ #define _SALTICIDAE_EVENT_H #include <unistd.h> -#include <event2/event.h> +#include <uv.h> #include <sys/eventfd.h> -//#include <thread> #include "salticidae/queue.h" #include "salticidae/util.h" @@ -38,87 +37,144 @@ namespace salticidae { struct _event_context_deleter { constexpr _event_context_deleter() = default; - void operator()(struct event_base *ptr) { - if (ptr != nullptr) event_base_free(ptr); + void operator()(uv_loop_t *ptr) { + if (ptr != nullptr) + { + uv_loop_close(ptr); + delete ptr; + } } }; -using _event_context_ot = RcObj<struct event_base, _event_context_deleter>; +using _event_context_ot = ArcObj<uv_loop_t, _event_context_deleter>; class EventContext: public _event_context_ot { public: - EventContext(): _event_context_ot(event_base_new()) {} - EventContext(struct event_base *eb): _event_context_ot(eb) {} + EventContext(): _event_context_ot(new uv_loop_t()) { + uv_loop_init(get()); + } + EventContext(uv_loop_t *eb): _event_context_ot(eb) {} EventContext(const EventContext &) = default; EventContext(EventContext &&) = default; EventContext &operator=(const EventContext &) = default; EventContext &operator=(EventContext &&) = default; - void dispatch() const { event_base_dispatch(get()); } - void stop() const { event_base_loopbreak(get()); } + void dispatch() const { + // TODO: improve this loop + for (;;) + uv_run(get(), UV_RUN_ONCE); + } + void stop() const { uv_stop(get()); } }; class Event { public: - using callback_t = std::function<void(evutil_socket_t fd, short events)>; + using callback_t = std::function<void(int fd, short events)>; + static const int READ = UV_READABLE; + static const int WRITE = UV_WRITABLE; + static const int TIMEOUT = ~(UV_READABLE | UV_WRITABLE | + UV_DISCONNECT | UV_PRIORITIZED); private: EventContext eb; - evutil_socket_t fd; - short events; - struct event *ev; + int fd; + int events; + uv_poll_t *ev_fd; + uv_timer_t *ev_timer; callback_t callback; - static inline void _then(evutil_socket_t fd, short events, void *arg) { - (static_cast<Event *>(arg))->callback(fd, events); + static inline void fd_then(uv_poll_t *h, int status, int events) { + assert(status == 0); + auto event = static_cast<Event *>(h->data); + event->callback(event->fd, events); + } + + static inline void timer_then(uv_timer_t *h) { + auto event = static_cast<Event *>(h->data); + if (event->ev_fd) uv_poll_stop(event->ev_fd); + event->callback(event->fd, TIMEOUT); } public: - Event(): eb(nullptr), ev(nullptr) {} - Event(const EventContext &eb, - evutil_socket_t fd, - short events, - callback_t callback): + Event(): eb(nullptr), ev_fd(nullptr), ev_timer(nullptr) {} + Event(const EventContext &eb, int fd, short events, callback_t callback): eb(eb), fd(fd), events(events), - ev(event_new(eb.get(), fd, events, Event::_then, this)), - callback(callback) {} + ev_fd(nullptr), + ev_timer(new uv_timer_t()), + callback(callback) { + if (fd != -1) + { + ev_fd = new uv_poll_t(); + uv_poll_init(eb.get(), ev_fd, fd); + ev_fd->data = this; + } + uv_timer_init(eb.get(), ev_timer); + ev_timer->data = this; + } Event(const Event &) = delete; Event(Event &&other): eb(std::move(other.eb)), fd(other.fd), events(other.events), + ev_fd(other.ev_fd), ev_timer(other.ev_timer), callback(std::move(other.callback)) { - other.clear(); - ev = event_new(eb.get(), fd, events, Event::_then, this); + other.del(); + if (fd != -1) + { + other.ev_fd = nullptr; + ev_fd->data = this; + } + other.ev_timer = nullptr; + ev_timer->data = this; } Event &operator=(Event &&other) { clear(); - other.clear(); + other.del(); eb = std::move(other.eb); fd = other.fd; events = other.events; - ev = event_new(eb.get(), fd, events, Event::_then, this); + ev_fd = other.ev_fd; + ev_timer = other.ev_timer; callback = std::move(other.callback); + + if (fd != -1) + { + other.ev_fd = nullptr; + ev_fd->data = this; + } + other.ev_timer = nullptr; + ev_timer->data = this; return *this; } ~Event() { clear(); } void clear() { - if (ev != nullptr) + if (ev_fd != nullptr) + { + uv_poll_stop(ev_fd); + delete ev_fd; + ev_fd = nullptr; + } + if (ev_timer != nullptr) { - event_del(ev); - event_free(ev); - ev = nullptr; + uv_timer_stop(ev_timer); + delete ev_timer; + ev_timer = nullptr; } } - void add() { if (ev) event_add(ev, nullptr); } - void del() { if (ev) event_del(ev); } - void add_with_timeout(double timeout) { - if (ev) - event_add_with_timeout(ev, timeout); + void add() { + if (ev_fd) uv_poll_start(ev_fd, events, Event::fd_then); + } + void del() { + if (ev_fd) uv_poll_stop(ev_fd); + uv_timer_stop(ev_timer); + } + void add_with_timeout(double t_sec) { + add(); + uv_timer_start(ev_timer, Event::timer_then, uint64_t(t_sec * 1000), 0); } - operator bool() const { return ev != nullptr; } + operator bool() const { return ev_fd != nullptr || ev_timer != nullptr; } }; template<typename T> @@ -139,7 +195,7 @@ class MPSCQueueEventDriven: public MPSCQueue<T> { template<typename Func> void reg_handler(const EventContext &ec, Func &&func) { - ev = Event(ec, fd, EV_READ | EV_PERSIST, + ev = Event(ec, fd, Event::READ, [this, func=std::forward<Func>(func)](int, short) { //fprintf(stderr, "%x\n", std::this_thread::get_id()); uint64_t t; @@ -171,6 +227,8 @@ class MPSCQueueEventDriven: public MPSCQueue<T> { } }; + + // TODO: incorrect MPMCQueueEventDriven impl /* template<typename T> diff --git a/include/salticidae/msg.h b/include/salticidae/msg.h index 8c44bbb..257ce4f 100644 --- a/include/salticidae/msg.h +++ b/include/salticidae/msg.h @@ -192,7 +192,7 @@ class MsgBase { #ifndef SALTICIDAE_NOCHECKSUM uint32_t get_checksum() const { - static class SHA256 sha256; + static thread_local class SHA256 sha256; uint32_t res; bytearray_t tmp; #ifndef SALTICIDAE_NOCHECK diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 290eaa9..18406ea 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -310,7 +310,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { Peer &operator=(const Peer &) = delete; Peer(const Peer &) = delete; - void ping_timer(evutil_socket_t, short); + void ping_timer(int, int); void reset_ping_timer(); void send_ping(); void clear_all_events() { @@ -471,7 +471,7 @@ void PeerNetwork<O, _, __>::Conn::on_setup() { MsgNet::Conn::on_setup(); auto pn = get_net(); assert(!ev_timeout); - ev_timeout = Event(pn->ec, -1, 0, [this](evutil_socket_t, short) { + ev_timeout = Event(pn->ec, -1, 0, [this](int, int) { SALTICIDAE_LOG_INFO("peer ping-pong timeout"); this->terminate(); }); @@ -504,7 +504,7 @@ void PeerNetwork<O, _, __>::Conn::on_teardown() { std::string(peer_id).c_str()); // try to reconnect p->ev_retry_timer = Event(pn->dispatcher_ec, -1, 0, - [pn, peer_id = this->peer_id](evutil_socket_t, short) { + [pn, peer_id = this->peer_id](int, int) { mutex_lg_t _pn_lg(pn->pn_mlock); pn->start_active_conn(peer_id); }); @@ -554,7 +554,7 @@ void PeerNetwork<O, _, __>::Peer::send_ping() { } template<typename O, O _, O __> -void PeerNetwork<O, _, __>::Peer::ping_timer(evutil_socket_t, short) { +void PeerNetwork<O, _, __>::Peer::ping_timer(int, int) { mutex_lg_t _p_lg(mlock); ping_timer_ok = true; if (pong_msg_ok) |