diff options
author | Determinant <[email protected]> | 2018-11-13 18:20:08 -0500 |
---|---|---|
committer | Determinant <[email protected]> | 2018-11-13 18:20:08 -0500 |
commit | 2c1e8ec448a1039ab9a46bce4c959e6ec3cefeb8 (patch) | |
tree | 3389de1c53b304eee74e12d9e2adfbc2ab39fef1 | |
parent | 7645971cd6e21ebcf5dd1800bb1f1393284ee0c9 (diff) |
working on p2p; switch to libuv (libevent sucks in multi-threading)
-rw-r--r-- | CMakeLists.txt | 4 | ||||
-rw-r--r-- | include/salticidae/conn.h | 58 | ||||
-rw-r--r-- | include/salticidae/event.h | 132 | ||||
-rw-r--r-- | include/salticidae/msg.h | 2 | ||||
-rw-r--r-- | include/salticidae/network.h | 8 | ||||
-rw-r--r-- | src/conn.cpp | 49 | ||||
-rw-r--r-- | src/util.cpp | 8 | ||||
-rw-r--r-- | test/test_network.cpp | 4 |
8 files changed, 174 insertions, 91 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 66cd2d4..4769b38 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -39,11 +39,11 @@ if(BUILD_SHARED) set_property(TARGET salticidae PROPERTY POSITION_INDEPENDENT_CODE 1) add_library(salticidae_shared SHARED $<TARGET_OBJECTS:salticidae>) set_target_properties(salticidae_shared PROPERTIES OUTPUT_NAME "salticidae") - target_link_libraries(salticidae_shared event crypto pthread) + target_link_libraries(salticidae_shared uv crypto pthread) endif() add_library(salticidae_static STATIC $<TARGET_OBJECTS:salticidae>) set_target_properties(salticidae_static PROPERTIES OUTPUT_NAME "salticidae") -target_link_libraries(salticidae_static event crypto pthread) +target_link_libraries(salticidae_static uv crypto pthread) option(BUILD_TEST "build test binaries." OFF) if(BUILD_TEST) diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index 1364d4d..a86a4d2 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -39,6 +39,7 @@ #include <mutex> #include <thread> #include <fcntl.h> +#include <event2/thread.h> #include "salticidae/type.h" #include "salticidae/ref.h" @@ -90,9 +91,9 @@ class ConnPool { /** does not need to wait if true */ bool ready_send; - void recv_data(evutil_socket_t, short); - void send_data(evutil_socket_t, short); - void conn_server(evutil_socket_t, short); + void recv_data(int, int); + void send_data(int, int); + void conn_server(int, int); /** Terminate the connection. */ void terminate(); @@ -180,7 +181,7 @@ class ConnPool { Worker() { if (pipe2(ctl_fd, O_NONBLOCK)) throw ConnPoolError(std::string("failed to create worker pipe")); - ev_ctl = Event(ec, ctl_fd[0], EV_READ | EV_PERSIST, [this](int fd, short) { + ev_ctl = Event(ec, ctl_fd[0], Event::READ, [this](int fd, int) { WorkerCmd *dcmd; read(fd, &dcmd, sizeof(dcmd)); dcmd->exec(this); @@ -229,17 +230,23 @@ class ConnPool { .get_queue() .reg_handler(ec, [conn=this->conn, client_fd=this->client_fd](MPSCWriteBuffer::queue_t &) { - if (conn->ready_send) - conn->send_data(client_fd, EV_WRITE); + if (conn->ready_send && conn->fd != -1) + conn->send_data(client_fd, Event::WRITE); return false; }); - auto conn_ptr = conn.get(); - conn->ev_read = Event(ec, client_fd, EV_READ, - std::bind(&Conn::recv_data, conn_ptr, _1, _2)); - conn->ev_write = Event(ec, client_fd, EV_WRITE, - std::bind(&Conn::send_data, conn_ptr, _1, _2)); + //auto conn_ptr = conn.get(); + conn->ev_read = Event(ec, client_fd, Event::READ | Event::WRITE, [conn=conn](int fd, int what) { + if (what & Event::READ) + conn->recv_data(fd, what); + else + conn->send_data(fd, what); + }); + + // std::bind(&Conn::recv_data, conn_ptr, _1, _2)); + //conn->ev_write = Event(ec, client_fd, Event::WRITE, + // std::bind(&Conn::send_data, conn_ptr, _1, _2)); conn->ev_read.add(); - conn->ev_write.add(); + //conn->ev_write.add(); } }; @@ -252,10 +259,12 @@ class ConnPool { size_t nworker; salticidae::BoxObj<Worker[]> workers; - void accept_client(evutil_socket_t, short); + void accept_client(int, int); conn_t add_conn(const conn_t &conn); conn_t _connect(const NetAddr &addr); void _post_terminate(int fd); + void _accept_listen(int listen_fd); + void _connect_listen(const conn_t &conn); protected: class DispatchCmd { @@ -295,6 +304,24 @@ class ConnPool { } }; + class DspAcceptListen: public DispatchCmd { + int listen_fd; + public: + DspAcceptListen(int listen_fd): listen_fd(listen_fd) {} + void exec(ConnPool *cpool) override { + cpool->_accept_listen(listen_fd); + } + }; + + class DspConnectListen: public DispatchCmd { + conn_t conn; + public: + DspConnectListen(const conn_t &conn): conn(conn) {} + void exec(ConnPool *cpool) override { + cpool->_connect_listen(conn); + } + }; + class UserConn: public DispatchCmd { conn_t conn; bool connected; @@ -320,7 +347,6 @@ class ConnPool { EventContext ec; EventContext dispatcher_ec; int dlisten_fd[2]; /**< for control command sent to the dispatcher */ - std::mutex dsp_ec_mlock; /** Should be implemented by derived class to return a new Conn object. */ virtual Conn *create_conn() = 0; @@ -341,7 +367,7 @@ class ConnPool { if (pipe2(dlisten_fd, O_NONBLOCK)) throw ConnPoolError(std::string("failed to create dispatcher pipe")); - ev_mlisten = Event(ec, mlisten_fd[0], EV_READ | EV_PERSIST, [this](int fd, short) { + ev_mlisten = Event(ec, mlisten_fd[0], Event::READ, [this](int fd, int) { DispatchCmd *dcmd; read(fd, &dcmd, sizeof(dcmd)); dcmd->exec(this); @@ -352,7 +378,7 @@ class ConnPool { workers = new Worker[nworker]; dispatcher_ec = workers[0].get_ec(); - ev_dlisten = Event(dispatcher_ec, dlisten_fd[0], EV_READ | EV_PERSIST, [this](int fd, short) { + ev_dlisten = Event(dispatcher_ec, dlisten_fd[0], Event::READ, [this](int fd, int) { DispatchCmd *dcmd; read(fd, &dcmd, sizeof(dcmd)); dcmd->exec(this); diff --git a/include/salticidae/event.h b/include/salticidae/event.h index ddb93fc..c21644b 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -26,9 +26,8 @@ #define _SALTICIDAE_EVENT_H #include <unistd.h> -#include <event2/event.h> +#include <uv.h> #include <sys/eventfd.h> -//#include <thread> #include "salticidae/queue.h" #include "salticidae/util.h" @@ -38,87 +37,144 @@ namespace salticidae { struct _event_context_deleter { constexpr _event_context_deleter() = default; - void operator()(struct event_base *ptr) { - if (ptr != nullptr) event_base_free(ptr); + void operator()(uv_loop_t *ptr) { + if (ptr != nullptr) + { + uv_loop_close(ptr); + delete ptr; + } } }; -using _event_context_ot = RcObj<struct event_base, _event_context_deleter>; +using _event_context_ot = ArcObj<uv_loop_t, _event_context_deleter>; class EventContext: public _event_context_ot { public: - EventContext(): _event_context_ot(event_base_new()) {} - EventContext(struct event_base *eb): _event_context_ot(eb) {} + EventContext(): _event_context_ot(new uv_loop_t()) { + uv_loop_init(get()); + } + EventContext(uv_loop_t *eb): _event_context_ot(eb) {} EventContext(const EventContext &) = default; EventContext(EventContext &&) = default; EventContext &operator=(const EventContext &) = default; EventContext &operator=(EventContext &&) = default; - void dispatch() const { event_base_dispatch(get()); } - void stop() const { event_base_loopbreak(get()); } + void dispatch() const { + // TODO: improve this loop + for (;;) + uv_run(get(), UV_RUN_ONCE); + } + void stop() const { uv_stop(get()); } }; class Event { public: - using callback_t = std::function<void(evutil_socket_t fd, short events)>; + using callback_t = std::function<void(int fd, short events)>; + static const int READ = UV_READABLE; + static const int WRITE = UV_WRITABLE; + static const int TIMEOUT = ~(UV_READABLE | UV_WRITABLE | + UV_DISCONNECT | UV_PRIORITIZED); private: EventContext eb; - evutil_socket_t fd; - short events; - struct event *ev; + int fd; + int events; + uv_poll_t *ev_fd; + uv_timer_t *ev_timer; callback_t callback; - static inline void _then(evutil_socket_t fd, short events, void *arg) { - (static_cast<Event *>(arg))->callback(fd, events); + static inline void fd_then(uv_poll_t *h, int status, int events) { + assert(status == 0); + auto event = static_cast<Event *>(h->data); + event->callback(event->fd, events); + } + + static inline void timer_then(uv_timer_t *h) { + auto event = static_cast<Event *>(h->data); + if (event->ev_fd) uv_poll_stop(event->ev_fd); + event->callback(event->fd, TIMEOUT); } public: - Event(): eb(nullptr), ev(nullptr) {} - Event(const EventContext &eb, - evutil_socket_t fd, - short events, - callback_t callback): + Event(): eb(nullptr), ev_fd(nullptr), ev_timer(nullptr) {} + Event(const EventContext &eb, int fd, short events, callback_t callback): eb(eb), fd(fd), events(events), - ev(event_new(eb.get(), fd, events, Event::_then, this)), - callback(callback) {} + ev_fd(nullptr), + ev_timer(new uv_timer_t()), + callback(callback) { + if (fd != -1) + { + ev_fd = new uv_poll_t(); + uv_poll_init(eb.get(), ev_fd, fd); + ev_fd->data = this; + } + uv_timer_init(eb.get(), ev_timer); + ev_timer->data = this; + } Event(const Event &) = delete; Event(Event &&other): eb(std::move(other.eb)), fd(other.fd), events(other.events), + ev_fd(other.ev_fd), ev_timer(other.ev_timer), callback(std::move(other.callback)) { - other.clear(); - ev = event_new(eb.get(), fd, events, Event::_then, this); + other.del(); + if (fd != -1) + { + other.ev_fd = nullptr; + ev_fd->data = this; + } + other.ev_timer = nullptr; + ev_timer->data = this; } Event &operator=(Event &&other) { clear(); - other.clear(); + other.del(); eb = std::move(other.eb); fd = other.fd; events = other.events; - ev = event_new(eb.get(), fd, events, Event::_then, this); + ev_fd = other.ev_fd; + ev_timer = other.ev_timer; callback = std::move(other.callback); + + if (fd != -1) + { + other.ev_fd = nullptr; + ev_fd->data = this; + } + other.ev_timer = nullptr; + ev_timer->data = this; return *this; } ~Event() { clear(); } void clear() { - if (ev != nullptr) + if (ev_fd != nullptr) + { + uv_poll_stop(ev_fd); + delete ev_fd; + ev_fd = nullptr; + } + if (ev_timer != nullptr) { - event_del(ev); - event_free(ev); - ev = nullptr; + uv_timer_stop(ev_timer); + delete ev_timer; + ev_timer = nullptr; } } - void add() { if (ev) event_add(ev, nullptr); } - void del() { if (ev) event_del(ev); } - void add_with_timeout(double timeout) { - if (ev) - event_add_with_timeout(ev, timeout); + void add() { + if (ev_fd) uv_poll_start(ev_fd, events, Event::fd_then); + } + void del() { + if (ev_fd) uv_poll_stop(ev_fd); + uv_timer_stop(ev_timer); + } + void add_with_timeout(double t_sec) { + add(); + uv_timer_start(ev_timer, Event::timer_then, uint64_t(t_sec * 1000), 0); } - operator bool() const { return ev != nullptr; } + operator bool() const { return ev_fd != nullptr || ev_timer != nullptr; } }; template<typename T> @@ -139,7 +195,7 @@ class MPSCQueueEventDriven: public MPSCQueue<T> { template<typename Func> void reg_handler(const EventContext &ec, Func &&func) { - ev = Event(ec, fd, EV_READ | EV_PERSIST, + ev = Event(ec, fd, Event::READ, [this, func=std::forward<Func>(func)](int, short) { //fprintf(stderr, "%x\n", std::this_thread::get_id()); uint64_t t; @@ -171,6 +227,8 @@ class MPSCQueueEventDriven: public MPSCQueue<T> { } }; + + // TODO: incorrect MPMCQueueEventDriven impl /* template<typename T> diff --git a/include/salticidae/msg.h b/include/salticidae/msg.h index 8c44bbb..257ce4f 100644 --- a/include/salticidae/msg.h +++ b/include/salticidae/msg.h @@ -192,7 +192,7 @@ class MsgBase { #ifndef SALTICIDAE_NOCHECKSUM uint32_t get_checksum() const { - static class SHA256 sha256; + static thread_local class SHA256 sha256; uint32_t res; bytearray_t tmp; #ifndef SALTICIDAE_NOCHECK diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 290eaa9..18406ea 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -310,7 +310,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { Peer &operator=(const Peer &) = delete; Peer(const Peer &) = delete; - void ping_timer(evutil_socket_t, short); + void ping_timer(int, int); void reset_ping_timer(); void send_ping(); void clear_all_events() { @@ -471,7 +471,7 @@ void PeerNetwork<O, _, __>::Conn::on_setup() { MsgNet::Conn::on_setup(); auto pn = get_net(); assert(!ev_timeout); - ev_timeout = Event(pn->ec, -1, 0, [this](evutil_socket_t, short) { + ev_timeout = Event(pn->ec, -1, 0, [this](int, int) { SALTICIDAE_LOG_INFO("peer ping-pong timeout"); this->terminate(); }); @@ -504,7 +504,7 @@ void PeerNetwork<O, _, __>::Conn::on_teardown() { std::string(peer_id).c_str()); // try to reconnect p->ev_retry_timer = Event(pn->dispatcher_ec, -1, 0, - [pn, peer_id = this->peer_id](evutil_socket_t, short) { + [pn, peer_id = this->peer_id](int, int) { mutex_lg_t _pn_lg(pn->pn_mlock); pn->start_active_conn(peer_id); }); @@ -554,7 +554,7 @@ void PeerNetwork<O, _, __>::Peer::send_ping() { } template<typename O, O _, O __> -void PeerNetwork<O, _, __>::Peer::ping_timer(evutil_socket_t, short) { +void PeerNetwork<O, _, __>::Peer::ping_timer(int, int) { mutex_lg_t _p_lg(mlock); ping_timer_ok = true; if (pong_msg_ok) diff --git a/src/conn.cpp b/src/conn.cpp index 7b7c699..f2922d8 100644 --- a/src/conn.cpp +++ b/src/conn.cpp @@ -46,8 +46,8 @@ ConnPool::Conn::operator std::string() const { /* the following two functions are executed by exactly one worker per Conn object */ -void ConnPool::Conn::send_data(evutil_socket_t fd, short events) { - if (!(events & EV_WRITE)) return; +void ConnPool::Conn::send_data(int fd, int events) { + if (!(events & Event::WRITE)) return; auto conn = self(); /* pin the connection */ ssize_t ret = seg_buff_size; for (;;) @@ -77,7 +77,7 @@ void ConnPool::Conn::send_data(evutil_socket_t fd, short events) { bytearray_t(buff_seg.begin() + ret, buff_seg.end())); /* wait for the next write callback */ ready_send = false; - ev_write.add(); + //ev_write.add(); return; } } @@ -85,8 +85,8 @@ void ConnPool::Conn::send_data(evutil_socket_t fd, short events) { ready_send = true; } -void ConnPool::Conn::recv_data(evutil_socket_t fd, short events) { - if (!(events & EV_READ)) return; +void ConnPool::Conn::recv_data(int fd, int events) { + if (!(events & Event::READ)) return; auto conn = self(); /* pin the connection */ ssize_t ret = seg_buff_size; while (ret == (ssize_t)seg_buff_size) @@ -105,13 +105,14 @@ void ConnPool::Conn::recv_data(evutil_socket_t fd, short events) { } if (ret == 0) { + //SALTICIDAE_LOG_INFO("recv(%d) terminates", fd, strerror(errno)); terminate(); return; } buff_seg.resize(ret); recv_buffer.push(std::move(buff_seg)); } - ev_read.add(); + //ev_read.add(); on_read(); } @@ -121,7 +122,7 @@ void ConnPool::Conn::terminate() { cpool->post_terminate(fd); } -void ConnPool::accept_client(evutil_socket_t fd, short) { +void ConnPool::accept_client(int fd, int) { int client_fd; struct sockaddr client_addr; socklen_t addr_size = sizeof(struct sockaddr_in); @@ -151,7 +152,7 @@ void ConnPool::accept_client(evutil_socket_t fd, short) { } } -void ConnPool::Conn::conn_server(evutil_socket_t fd, short events) { +void ConnPool::Conn::conn_server(int fd, int events) { auto conn = self(); /* pin the connection */ if (send(fd, "", 0, MSG_NOSIGNAL) == 0) { @@ -163,7 +164,7 @@ void ConnPool::Conn::conn_server(evutil_socket_t fd, short events) { } else { - if (events & EV_TIMEOUT) + if (events & Event::TIMEOUT) SALTICIDAE_LOG_INFO("%s connect timeout", std::string(*this).c_str()); terminate(); return; @@ -196,12 +197,8 @@ void ConnPool::listen(NetAddr listen_addr) { throw ConnPoolError(std::string("binding error")); if (::listen(listen_fd, max_listen_backlog) < 0) throw ConnPoolError(std::string("listen error")); - { - std::lock_guard<std::mutex> _(dsp_ec_mlock); - ev_listen = Event(dispatcher_ec, listen_fd, EV_READ | EV_PERSIST, - std::bind(&ConnPool::accept_client, this, _1, _2)); - ev_listen.add(); - } + auto dcmd = new DspAcceptListen(listen_fd); + write(dlisten_fd[1], &dcmd, sizeof(dcmd)); SALTICIDAE_LOG_INFO("listening to %u", ntohs(listen_addr.port)); } @@ -237,12 +234,8 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) { } else { - { - std::lock_guard<std::mutex> _(dsp_ec_mlock); - conn->ev_connect = Event(dispatcher_ec, fd, EV_WRITE, - std::bind(&Conn::conn_server, conn.get(), _1, _2)); - conn->ev_connect.add_with_timeout(conn_server_timeout); - } + auto dcmd = new DspConnectListen(conn); + write(dlisten_fd[1], &dcmd, sizeof(dcmd)); add_conn(conn); SALTICIDAE_LOG_INFO("created %s", std::string(*conn).c_str()); } @@ -271,4 +264,18 @@ ConnPool::conn_t ConnPool::add_conn(const conn_t &conn) { return pool.insert(std::make_pair(conn->fd, conn)).first->second; } +void ConnPool::_accept_listen(int listen_fd) { + std::lock_guard<std::mutex> _(cp_mlock); + ev_listen = Event(dispatcher_ec, listen_fd, Event::READ, + std::bind(&ConnPool::accept_client, this, _1, _2)); + ev_listen.add(); +} + +void ConnPool::_connect_listen(const conn_t &conn) { + std::lock_guard<std::mutex> _(cp_mlock); + conn->ev_connect = Event(dispatcher_ec, conn->fd, Event::WRITE, + std::bind(&Conn::conn_server, conn.get(), _1, _2)); + conn->ev_connect.add_with_timeout(conn_server_timeout); +} + } diff --git a/src/util.cpp b/src/util.cpp index 6c8866d..b975f13 100644 --- a/src/util.cpp +++ b/src/util.cpp @@ -29,7 +29,6 @@ #include <ctime> #include <sys/time.h> #include <cmath> -#include <event2/event.h> #include "salticidae/util.h" @@ -40,13 +39,6 @@ void sec2tv(double t, struct timeval &tv) { tv.tv_usec = trunc((t - tv.tv_sec) * 1e6); } -void event_add_with_timeout(struct event *ev, double timeout) { - struct timeval tv; - tv.tv_sec = trunc(timeout); - tv.tv_usec = trunc((timeout - tv.tv_sec) * 1e6); - event_add(ev, &tv); -} - double gen_rand_timeout(double base_timeout, double alpha) { return base_timeout + rand() / (double)RAND_MAX * alpha * base_timeout; } diff --git a/test/test_network.cpp b/test/test_network.cpp index 1821ada..061b520 100644 --- a/test/test_network.cpp +++ b/test/test_network.cpp @@ -129,8 +129,8 @@ void on_receive_ack(MsgAck &&msg, MyNet::Conn &conn) { } salticidae::EventContext ec; -NetAddr alice_addr("127.0.0.1:1234"); -NetAddr bob_addr("127.0.0.1:1235"); +NetAddr alice_addr("127.0.0.1:12345"); +NetAddr bob_addr("127.0.0.1:12346"); int main() { /* test two nodes */ |