From 0f341fe7f092f704e1c1952c72085eb1ebd2086a Mon Sep 17 00:00:00 2001 From: Determinant Date: Wed, 14 Nov 2018 15:19:32 -0500 Subject: use ThreadCall pattern --- include/salticidae/conn.h | 267 +++++++++++++------------------------------ include/salticidae/event.h | 86 ++++++++++++++ include/salticidae/network.h | 217 +++++++++++++++-------------------- include/salticidae/queue.h | 2 +- src/conn.cpp | 43 +++---- test/test_network.cpp | 3 + test/test_p2p.cpp | 3 + 7 files changed, 282 insertions(+), 339 deletions(-) diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index a86a4d2..2ef6b50 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -95,6 +95,8 @@ class ConnPool { void send_data(int, int); void conn_server(int, int); + /** Stop the worker and I/O events. */ + void stop(); /** Terminate the connection. */ void terminate(); @@ -146,54 +148,27 @@ class ConnPool { const size_t seg_buff_size; /* owned by user loop */ - int mlisten_fd[2]; /**< for connection events sent to the user loop */ - Event ev_mlisten; + BoxObj user_tcall; conn_callback_t conn_cb; /* owned by the dispatcher */ + Event ev_listen; std::unordered_map pool; int listen_fd; /**< for accepting new network connections */ - Event ev_listen; - Event ev_dlisten; - std::mutex cp_mlock; void update_conn(const conn_t &conn, bool connected) { - auto dcmd = new UserConn(conn, connected); - write(mlisten_fd[1], &dcmd, sizeof(dcmd)); + user_tcall->call([this, conn, connected](ThreadCall::Handle &) { + if (conn_cb) conn_cb(*conn, connected); + }); } - struct Worker; - class WorkerFeed; - - class WorkerCmd { - public: - virtual ~WorkerCmd() = default; - virtual void exec(Worker *worker) = 0; - }; - class Worker { EventContext ec; - Event ev_ctl; - int ctl_fd[2]; /**< for control messages from dispatcher */ + ThreadCall msgr; std::thread handle; public: - Worker() { - if (pipe2(ctl_fd, O_NONBLOCK)) - throw ConnPoolError(std::string("failed to create worker pipe")); - ev_ctl = Event(ec, ctl_fd[0], Event::READ, [this](int fd, int) { - WorkerCmd *dcmd; - read(fd, &dcmd, sizeof(dcmd)); - dcmd->exec(this); - delete dcmd; - }); - ev_ctl.add(); - } - - ~Worker() { - close(ctl_fd[0]); - close(ctl_fd[1]); - } + Worker(): msgr(ec) {} /* the following functions are called by the dispatcher */ void start() { @@ -201,143 +176,69 @@ class ConnPool { } void feed(const conn_t &conn, int client_fd) { - auto dcmd = new WorkerFeed(conn, client_fd); - write(ctl_fd[1], &dcmd, sizeof(dcmd)); + msgr.call([this, conn, client_fd](ThreadCall::Handle &) { + SALTICIDAE_LOG_INFO("worker %x got %s", + std::this_thread::get_id(), + std::string(*conn).c_str()); + conn->get_send_buffer() + .get_queue() + .reg_handler(this->ec, [conn, client_fd] + (MPSCWriteBuffer::queue_t &) { + if (conn->ready_send && conn->fd != -1) + conn->send_data(client_fd, Event::WRITE); + return false; + }); + //auto conn_ptr = conn.get(); + conn->ev_read = Event(ec, client_fd, Event::READ | Event::WRITE, [conn=conn](int fd, int what) { + if (what & Event::READ) + conn->recv_data(fd, what); + else + conn->send_data(fd, what); + }); + + // std::bind(&Conn::recv_data, conn_ptr, _1, _2)); + //conn->ev_write = Event(ec, client_fd, Event::WRITE, + // std::bind(&Conn::send_data, conn_ptr, _1, _2)); + conn->ev_read.add(); + //conn->ev_write.add(); + }); } void stop() { - auto dcmd = new WorkerStop(); - write(ctl_fd[1], &dcmd, sizeof(dcmd)); + msgr.call([this](ThreadCall::Handle &) { + ec.stop(); + }); } std::thread &get_handle() { return handle; } const EventContext &get_ec() { return ec; } }; - class WorkerFeed: public WorkerCmd { - conn_t conn; - int client_fd; - - public: - WorkerFeed(const conn_t &conn, int client_fd): - conn(conn), client_fd(client_fd) {} - void exec(Worker *worker) override { - SALTICIDAE_LOG_INFO("worker %x got %s", - std::this_thread::get_id(), - std::string(*conn).c_str()); - auto &ec = worker->get_ec(); - conn->get_send_buffer() - .get_queue() - .reg_handler(ec, [conn=this->conn, - client_fd=this->client_fd](MPSCWriteBuffer::queue_t &) { - if (conn->ready_send && conn->fd != -1) - conn->send_data(client_fd, Event::WRITE); - return false; - }); - //auto conn_ptr = conn.get(); - conn->ev_read = Event(ec, client_fd, Event::READ | Event::WRITE, [conn=conn](int fd, int what) { - if (what & Event::READ) - conn->recv_data(fd, what); - else - conn->send_data(fd, what); - }); - - // std::bind(&Conn::recv_data, conn_ptr, _1, _2)); - //conn->ev_write = Event(ec, client_fd, Event::WRITE, - // std::bind(&Conn::send_data, conn_ptr, _1, _2)); - conn->ev_read.add(); - //conn->ev_write.add(); - } - }; - - class WorkerStop: public WorkerCmd { - public: - void exec(Worker *worker) override { worker->get_ec().stop(); } - }; - /* related to workers */ size_t nworker; salticidae::BoxObj workers; void accept_client(int, int); conn_t add_conn(const conn_t &conn); - conn_t _connect(const NetAddr &addr); - void _post_terminate(int fd); - void _accept_listen(int listen_fd); - void _connect_listen(const conn_t &conn); + void terminate(int fd); protected: - class DispatchCmd { - public: - virtual ~DispatchCmd() = default; - virtual void exec(ConnPool *cpool) = 0; - }; + conn_t _connect(const NetAddr &addr); + void _listen(NetAddr listen_addr); private: - class DspConnect: public DispatchCmd { - const NetAddr addr; - public: - DspConnect(const NetAddr &addr): addr(addr) {} - void exec(ConnPool *cpool) override { - cpool->update_conn(cpool->_connect(addr), true); - } - }; - - class DspPostTerm: public DispatchCmd { - int fd; - public: - DspPostTerm(int fd): fd(fd) {} - void exec(ConnPool *cpool) override { - cpool->_post_terminate(fd); - } - }; - - class DspMulticast: public DispatchCmd { - std::vector receivers; - bytearray_t data; - public: - DspMulticast(std::vector &&receivers, bytearray_t &&data): - receivers(std::move(receivers)), - data(std::move(data)) {} - void exec(ConnPool *) override { - for (auto &r: receivers) r->write(bytearray_t(data)); - } - }; - - class DspAcceptListen: public DispatchCmd { - int listen_fd; - public: - DspAcceptListen(int listen_fd): listen_fd(listen_fd) {} - void exec(ConnPool *cpool) override { - cpool->_accept_listen(listen_fd); - } - }; - class DspConnectListen: public DispatchCmd { - conn_t conn; - public: - DspConnectListen(const conn_t &conn): conn(conn) {} - void exec(ConnPool *cpool) override { - cpool->_connect_listen(conn); - } - }; - - class UserConn: public DispatchCmd { - conn_t conn; - bool connected; - public: - UserConn(const conn_t &conn, bool connected): - conn(conn), connected(connected) {} - void exec(ConnPool *cpool) override { - if (cpool->conn_cb) - cpool->conn_cb(*conn, connected); - } - }; - - void post_terminate(int fd) { - auto dcmd = new DspPostTerm(fd); - write(dlisten_fd[1], &dcmd, sizeof(dcmd)); - } + //class DspMulticast: public DispatchCmd { + // std::vector receivers; + // bytearray_t data; + // public: + // DspMulticast(std::vector &&receivers, bytearray_t &&data): + // receivers(std::move(receivers)), + // data(std::move(data)) {} + // void exec(ConnPool *) override { + // for (auto &r: receivers) r->write(bytearray_t(data)); + // } + //}; Worker &select_worker() { return workers[1]; @@ -346,7 +247,7 @@ class ConnPool { protected: EventContext ec; EventContext dispatcher_ec; - int dlisten_fd[2]; /**< for control command sent to the dispatcher */ + BoxObj disp_tcall; /** Should be implemented by derived class to return a new Conn object. */ virtual Conn *create_conn() = 0; @@ -360,35 +261,13 @@ class ConnPool { conn_server_timeout(conn_server_timeout), seg_buff_size(seg_buff_size), listen_fd(-1), - nworker(std::min((size_t)1, nworker)), + nworker(std::max((size_t)1, nworker)), ec(ec) { - if (pipe2(mlisten_fd, O_NONBLOCK)) - throw ConnPoolError(std::string("failed to create main pipe")); - if (pipe2(dlisten_fd, O_NONBLOCK)) - throw ConnPoolError(std::string("failed to create dispatcher pipe")); - - ev_mlisten = Event(ec, mlisten_fd[0], Event::READ, [this](int fd, int) { - DispatchCmd *dcmd; - read(fd, &dcmd, sizeof(dcmd)); - dcmd->exec(this); - delete dcmd; - }); - ev_mlisten.add(); - workers = new Worker[nworker]; dispatcher_ec = workers[0].get_ec(); - ev_dlisten = Event(dispatcher_ec, dlisten_fd[0], Event::READ, [this](int fd, int) { - DispatchCmd *dcmd; - read(fd, &dcmd, sizeof(dcmd)); - dcmd->exec(this); - delete dcmd; - }); - ev_dlisten.add(); - - SALTICIDAE_LOG_INFO("starting all threads..."); - for (size_t i = 0; i < nworker; i++) - workers[i].start(); + user_tcall = new ThreadCall(ec); + disp_tcall = new ThreadCall(dispatcher_ec); } ~ConnPool() { @@ -404,24 +283,36 @@ class ConnPool { conn->on_close(); } if (listen_fd != -1) close(listen_fd); - for (int i = 0; i < 2; i++) - { - close(mlisten_fd[i]); - close(dlisten_fd[i]); - } } ConnPool(const ConnPool &) = delete; ConnPool(ConnPool &&) = delete; + void start() { + SALTICIDAE_LOG_INFO("starting all threads..."); + for (size_t i = 0; i < nworker; i++) + workers[i].start(); + } + /** Actively connect to remote addr. */ conn_t connect(const NetAddr &addr, bool blocking = true) { if (blocking) - return _connect(addr); + { + auto ret = static_cast(disp_tcall->call( + [this, addr](ThreadCall::Handle &h) { + auto ptr = new conn_t(_connect(addr)); + std::atomic_thread_fence(std::memory_order_release); + h.set_result(ptr); + }, true)); + auto conn = *ret; + delete ret; + return std::move(conn); + } else { - auto dcmd = new DspConnect(addr); - write(dlisten_fd[1], &dcmd, sizeof(dcmd)); + disp_tcall->call([this, addr](ThreadCall::Handle &) { + _connect(addr); + }, false); return nullptr; } } @@ -429,7 +320,11 @@ class ConnPool { /** Listen for passive connections (connection initiated from remote). * Does not need to be called if do not want to accept any passive * connections. */ - void listen(NetAddr listen_addr); + void listen(NetAddr listen_addr) { + disp_tcall->call([this, listen_addr](ThreadCall::Handle &) { + _listen(listen_addr); + }, true); + } template void reg_conn_handler(Func cb) { conn_cb = cb; } diff --git a/include/salticidae/event.h b/include/salticidae/event.h index c21644b..da27902 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -25,10 +25,12 @@ #ifndef _SALTICIDAE_EVENT_H #define _SALTICIDAE_EVENT_H +#include #include #include #include +#include "salticidae/type.h" #include "salticidae/queue.h" #include "salticidae/util.h" #include "salticidae/ref.h" @@ -177,6 +179,90 @@ class Event { operator bool() const { return ev_fd != nullptr || ev_timer != nullptr; } }; +class ThreadNotifier { + std::condition_variable cv; + std::mutex mlock; + mutex_ul_t ul; + bool ready; + void *data; + public: + ThreadNotifier(): ul(mlock), ready(false) {} + void *wait() { + cv.wait(ul, [this]{ return ready; }); + return data; + } + void notify(void *_data) { + { + mutex_lg_t _(mlock); + ready = true; + data = _data; + } + cv.notify_all(); + } +}; + +class ThreadCall { + int ctl_fd[2]; + EventContext ec; + Event ev_listen; + + public: + class Handle { + std::function callback; + ThreadNotifier* notifier; + void *result; + friend ThreadCall; + public: + Handle(): notifier(nullptr), result(nullptr) {} + void exec() { + callback(*this); + if (notifier) notifier->notify(result); + } + void set_result(void *data) { result = data; } + }; + + ThreadCall() = default; + ThreadCall(const ThreadCall &) = delete; + ThreadCall(ThreadCall &&) = delete; + ThreadCall(EventContext ec): ec(ec) { + if (pipe2(ctl_fd, O_NONBLOCK)) + throw SalticidaeError(std::string("ThreadCall: failed to create pipe")); + ev_listen = Event(ec, ctl_fd[0], Event::READ, [this](int fd, int) { + Handle *h; + read(fd, &h, sizeof(h)); + h->exec(); + delete h; + }); + ev_listen.add(); + } + + ~ThreadCall() { + close(ctl_fd[0]); + close(ctl_fd[1]); + } + + template + void *call(Func callback, bool blocking = false) { + auto h = new Handle(); + h->callback = callback; + if (blocking) + { + ThreadNotifier notifier; + h->notifier = ¬ifier; + std::atomic_thread_fence(std::memory_order_release); + write(ctl_fd[1], &h, sizeof(h)); + return notifier.wait(); + } + else + { + std::atomic_thread_fence(std::memory_order_release); + write(ctl_fd[1], &h, sizeof(h)); + return nullptr; + } + } +}; + + template class MPSCQueueEventDriven: public MPSCQueue { private: diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 18406ea..e5165bf 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -206,7 +206,6 @@ class ClientNetwork: public MsgNetwork { private: std::unordered_map addr2conn; - std::mutex cn_mlock; public: class Conn: public MsgNet::Conn { @@ -298,7 +297,6 @@ class PeerNetwork: public MsgNetwork { bool ping_timer_ok; bool pong_msg_ok; bool connected; - std::mutex mlock; Peer() = delete; Peer(NetAddr addr, conn_t conn, const EventContext &ec): @@ -321,7 +319,6 @@ class PeerNetwork: public MsgNetwork { }; std::unordered_map > id2peer; - std::mutex pn_mlock; const IdentityMode id_mode; double retry_conn_delay; @@ -355,30 +352,11 @@ class PeerNetwork: public MsgNetwork { } }; - struct PingCmd: public ConnPool::DispatchCmd { - conn_t conn; - uint16_t port; - PingCmd(const conn_t &conn, uint16_t port): - conn(conn), port(port) {} - void exec(ConnPool *cpool) override { - auto pn = static_cast(cpool); - pn->_ping_msg_cb(conn, port); - } - }; - - struct PongCmd: public PingCmd { - using PingCmd::PingCmd; - void exec(ConnPool *cpool) override { - auto pn = static_cast(cpool); - pn->_pong_msg_cb(this->conn, this->port); - } - }; - void msg_ping(MsgPing &&msg, Conn &conn); void msg_pong(MsgPong &&msg, Conn &conn); void _ping_msg_cb(const conn_t &conn, uint16_t port); void _pong_msg_cb(const conn_t &conn, uint16_t port); - bool check_new_conn(Conn &conn, uint16_t port); + bool check_new_conn(const conn_t &conn, uint16_t port); void start_active_conn(const NetAddr &paddr); protected: @@ -402,12 +380,14 @@ class PeerNetwork: public MsgNetwork { id_mode(id_mode), retry_conn_delay(retry_conn_delay), ping_period(ping_period), - conn_timeout(conn_timeout) {} + conn_timeout(conn_timeout) { + this->reg_handler(generic_bind(&PeerNetwork::msg_ping, this, _1, _2)); + this->reg_handler(generic_bind(&PeerNetwork::msg_pong, this, _1, _2)); + } void add_peer(const NetAddr &paddr); const conn_t get_peer_conn(const NetAddr &paddr) const; - template - void _send_msg(const MsgType &msg, const Peer *peer); + using MsgNet::send_msg; template void send_msg(const MsgType &msg, const NetAddr &paddr); void listen(NetAddr listen_addr); @@ -471,7 +451,7 @@ void PeerNetwork::Conn::on_setup() { MsgNet::Conn::on_setup(); auto pn = get_net(); assert(!ev_timeout); - ev_timeout = Event(pn->ec, -1, 0, [this](int, int) { + ev_timeout = Event(pn->dispatcher_ec, -1, 0, [this](int, int) { SALTICIDAE_LOG_INFO("peer ping-pong timeout"); this->terminate(); }); @@ -483,18 +463,16 @@ void PeerNetwork::Conn::on_setup() { /* the initial ping-pong to set up the connection */ auto &conn = static_cast(*this); reset_timeout(pn->conn_timeout); - pn->MsgNet::send_msg(MsgPing(pn->listen_port), conn); + pn->send_msg(MsgPing(pn->listen_port), conn); } template void PeerNetwork::Conn::on_teardown() { MsgNet::Conn::on_teardown(); auto pn = get_net(); - mutex_lg_t _pn_lg(pn->pn_mlock); auto it = pn->id2peer.find(peer_id); if (it == pn->id2peer.end()) return; auto p = it->second.get(); - mutex_lg_t _p_lg(p->mlock); if (this != p->conn.get()) return; p->ev_ping_timer.del(); p->connected = false; @@ -505,7 +483,6 @@ void PeerNetwork::Conn::on_teardown() { // try to reconnect p->ev_retry_timer = Event(pn->dispatcher_ec, -1, 0, [pn, peer_id = this->peer_id](int, int) { - mutex_lg_t _pn_lg(pn->pn_mlock); pn->start_active_conn(peer_id); }); p->ev_retry_timer.add_with_timeout(pn->gen_conn_timeout()); @@ -550,12 +527,11 @@ void PeerNetwork::Peer::send_ping() { ping_timer_ok = false; pong_msg_ok = false; conn->reset_timeout(pn->conn_timeout); - pn->_send_msg(MsgPing(pn->listen_port), this); + pn->send_msg(MsgPing(pn->listen_port), *conn); } template void PeerNetwork::Peer::ping_timer(int, int) { - mutex_lg_t _p_lg(mlock); ping_timer_ok = true; if (pong_msg_ok) { @@ -565,149 +541,132 @@ void PeerNetwork::Peer::ping_timer(int, int) { } template -bool PeerNetwork::check_new_conn(Conn &conn, uint16_t port) { - if (conn.peer_id.is_null()) +bool PeerNetwork::check_new_conn(const conn_t &conn, uint16_t port) { + if (conn->peer_id.is_null()) { /* passive connections can eventually have ids after getting the port number in IP_BASED_PORT mode */ assert(id_mode == IP_PORT_BASED); - conn.peer_id.ip = conn.get_addr().ip; - conn.peer_id.port = port; + conn->peer_id.ip = conn->get_addr().ip; + conn->peer_id.port = port; } - auto p = id2peer.find(conn.peer_id)->second.get(); - mutex_lg_t _p_lg(p->mlock); + auto p = id2peer.find(conn->peer_id)->second.get(); if (p->connected) { - if (conn.self() != p->conn) + if (conn != p->conn) { - conn.terminate(); + conn->terminate(); return true; } return false; } - p->reset_conn(static_pointer_cast(conn.self())); + p->reset_conn(conn); p->connected = true; p->reset_ping_timer(); p->send_ping(); if (p->connected) SALTICIDAE_LOG_INFO("PeerNetwork: established connection with %s via %s", - std::string(conn.peer_id).c_str(), std::string(conn).c_str()); + std::string(conn->peer_id).c_str(), std::string(*conn).c_str()); return false; } -template -void PeerNetwork::_ping_msg_cb(const conn_t &conn, uint16_t port) { - mutex_lg_t _pn_lg(pn_mlock); - if (check_new_conn(*conn, port)) return; - auto p = id2peer.find(conn->peer_id)->second.get(); - mutex_lg_t _p_lg(p->mlock); - _send_msg(MsgPong(this->listen_port), p); -} - -template -void PeerNetwork::_pong_msg_cb(const conn_t &conn, uint16_t port) { - mutex_lg_t _pn_lg(pn_mlock); - auto it = id2peer.find(conn->peer_id); - if (it == id2peer.end()) - { - SALTICIDAE_LOG_WARN("pong message discarded"); - return; - } - if (check_new_conn(*conn, port)) return; - auto p = it->second.get(); - mutex_lg_t _p_lg(p->mlock); - p->pong_msg_ok = true; - if (p->ping_timer_ok) - { - p->reset_ping_timer(); - p->send_ping(); - } -} - -/* end: functions invoked by the dispatcher */ - -/* this function could be both invoked by the dispatcher and the user loop */ template void PeerNetwork::start_active_conn(const NetAddr &addr) { auto p = id2peer.find(addr)->second.get(); - mutex_lg_t _p_lg(p->mlock); if (p->connected) return; - auto conn = static_pointer_cast(MsgNet::connect(addr)); + auto conn = static_pointer_cast(MsgNet::_connect(addr)); assert(p->conn == nullptr); p->conn = conn; } +/* end: functions invoked by the dispatcher */ /* begin: functions invoked by the user loop */ template -void PeerNetwork::msg_ping(MsgPing &&msg, Conn &conn) { +void PeerNetwork::msg_ping(MsgPing &&msg, Conn &_conn) { + auto conn = static_pointer_cast(_conn.self()); + if (!conn) return; uint16_t port = msg.port; - SALTICIDAE_LOG_INFO("ping from %s, port %u", std::string(conn).c_str(), ntohs(port)); - auto dcmd = new PingCmd(static_pointer_cast(conn.self()), port); - write(this->dlisten_fd[1], &dcmd, sizeof(dcmd)); + this->disp_tcall->call([this, conn, port](ThreadCall::Handle &msg) { + SALTICIDAE_LOG_INFO("ping from %s, port %u", + std::string(*conn).c_str(), ntohs(port)); + if (check_new_conn(conn, port)) return; + auto p = id2peer.find(conn->peer_id)->second.get(); + send_msg(MsgPong(this->listen_port), *conn); + }); } template -void PeerNetwork::msg_pong(MsgPong &&msg, Conn &conn) { - auto dcmd = new PongCmd(static_pointer_cast(conn.self()), msg.port); - write(this->dlisten_fd[1], &dcmd, sizeof(dcmd)); +void PeerNetwork::msg_pong(MsgPong &&msg, Conn &_conn) { + auto conn = static_pointer_cast(_conn.self()); + if (!conn) return; + uint16_t port = msg.port; + this->disp_tcall->call([this, conn, port](ThreadCall::Handle &msg) { + auto it = id2peer.find(conn->peer_id); + if (it == id2peer.end()) + { + SALTICIDAE_LOG_WARN("pong message discarded"); + return; + } + if (check_new_conn(conn, port)) return; + auto p = it->second.get(); + p->pong_msg_ok = true; + if (p->ping_timer_ok) + { + p->reset_ping_timer(); + p->send_ping(); + } + }); } template void PeerNetwork::listen(NetAddr listen_addr) { - MsgNet::listen(listen_addr); - listen_port = listen_addr.port; - this->reg_handler(generic_bind(&PeerNetwork::msg_ping, this, _1, _2)); - this->reg_handler(generic_bind(&PeerNetwork::msg_pong, this, _1, _2)); + this->disp_tcall->call([this, listen_addr](ThreadCall::Handle &msg) { + MsgNet::_listen(listen_addr); + listen_port = listen_addr.port; + }, true); } template void PeerNetwork::add_peer(const NetAddr &addr) { - mutex_lg_t _pn_lg(pn_mlock); - auto it = id2peer.find(addr); - if (it != id2peer.end()) - throw PeerNetworkError("peer already exists"); - id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->dispatcher_ec))); - start_active_conn(addr); + this->disp_tcall->call([this, addr](ThreadCall::Handle &) { + auto it = id2peer.find(addr); + if (it != id2peer.end()) + throw PeerNetworkError("peer already exists"); + id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->dispatcher_ec))); + start_active_conn(addr); + }, true); } template const typename PeerNetwork::conn_t PeerNetwork::get_peer_conn(const NetAddr &paddr) const { - mutex_lg_t _pn_lg(pn_mlock); - auto it = id2peer.find(paddr); - if (it == id2peer.end()) - throw PeerNetworkError("peer does not exist"); - return it->second->conn; + auto ret = static_cast(this->disp_tcall->call( + [this, paddr](ThreadCall::Handle &h) { + auto it = id2peer.find(paddr); + if (it == id2peer.end()) + throw PeerNetworkError("peer does not exist"); + auto ptr = new conn_t(it->second->conn); + h.set_result(ptr); + })); + auto conn = *ret; + delete ret; + return std::move(conn); } template bool PeerNetwork::has_peer(const NetAddr &paddr) const { - mutex_lg_t _pn_lg(pn_mlock); - return id2peer.count(paddr); -} - -template -template -void PeerNetwork::_send_msg(const MsgType &msg, const Peer *peer) { - if (peer->connected) - MsgNet::send_msg(msg, *(peer->conn)); - else - SALTICIDAE_LOG_DEBUG("dropped"); + auto ret = static_cast(this->disp_tcall->call( + [this, paddr](ThreadCall::Handle &h) { + h.set_result(id2peer.count(paddr)); + })); + auto has = *ret; + delete ret; + return has; } template template void PeerNetwork::send_msg(const MsgType &msg, const NetAddr &addr) { - mutex_lg_t _pn_lg(pn_mlock); - auto it = id2peer.find(addr); - if (it == id2peer.end()) - { - SALTICIDAE_LOG_ERROR("sending to non-existing peer: %s", - std::string(addr).c_str()); - throw PeerNetworkError("peer does not exist"); - } - auto p = it->second.get(); - mutex_lg_t _p_lg(p->mlock); - _send_msg(msg, p); + send_msg(msg, *get_peer_conn(addr)); } /* end: functions invoked by the user loop */ @@ -716,7 +675,6 @@ void ClientNetwork::Conn::on_setup() { MsgNet::Conn::on_setup(); assert(this->get_mode() == Conn::PASSIVE); const auto &addr = this->get_addr(); - mutex_lg_t _cn_lg(cn_mlock); auto cn = get_net(); cn->addr2conn.erase(addr); cn->addr2conn.insert( @@ -728,17 +686,22 @@ template void ClientNetwork::Conn::on_teardown() { MsgNet::Conn::on_teardown(); assert(this->get_mode() == Conn::PASSIVE); - mutex_lg_t _cn_lg(cn_mlock); get_net()->addr2conn.erase(this->get_addr()); } template template void ClientNetwork::send_msg(const MsgType &msg, const NetAddr &addr) { - mutex_lg_t _cn_lg(cn_mlock); - auto it = addr2conn.find(addr); - if (it == addr2conn.end()) return; - MsgNet::send_msg(msg, *(it->second)); + auto ret = static_cast(this->disp_tcall->call( + [this, addr](ThreadCall::Handle &h) { + auto it = addr2conn.find(addr); + if (it == addr2conn.end()) + throw PeerNetworkError("client does not exist"); + auto ptr = new conn_t(it->second->conn); + h.set_result(ptr); + })); + send_msg(msg, **ret); + delete ret; } template diff --git a/include/salticidae/queue.h b/include/salticidae/queue.h index 9045e2b..0b493ea 100644 --- a/include/salticidae/queue.h +++ b/include/salticidae/queue.h @@ -105,7 +105,7 @@ class MPMCQueue { void _enqueue(Block *nblk, U &&e) { new (&(nblk->elem)) T(std::forward(e)); nblk->next.store(nullptr, std::memory_order_release); - auto prev = tail.exchange(nblk, std::memory_order_relaxed); + auto prev = tail.exchange(nblk, std::memory_order_acq_rel); prev->next.store(nblk, std::memory_order_relaxed); } diff --git a/src/conn.cpp b/src/conn.cpp index f2922d8..6b2e3aa 100644 --- a/src/conn.cpp +++ b/src/conn.cpp @@ -116,12 +116,20 @@ void ConnPool::Conn::recv_data(int fd, int events) { on_read(); } -void ConnPool::Conn::terminate() { +void ConnPool::Conn::stop() { ev_read.clear(); ev_write.clear(); - cpool->post_terminate(fd); } +void ConnPool::Conn::terminate() { + stop(); + cpool->disp_tcall->call( + [cpool=this->cpool, fd=this->fd](ThreadCall::Handle &) { + cpool->terminate(fd); + }); +} + + void ConnPool::accept_client(int fd, int) { int client_fd; struct sockaddr client_addr; @@ -171,8 +179,7 @@ void ConnPool::Conn::conn_server(int fd, int events) { } } -void ConnPool::listen(NetAddr listen_addr) { - std::lock_guard _(cp_mlock); +void ConnPool::_listen(NetAddr listen_addr) { int one = 1; if (listen_fd != -1) { /* reset the previous listen() */ @@ -197,8 +204,9 @@ void ConnPool::listen(NetAddr listen_addr) { throw ConnPoolError(std::string("binding error")); if (::listen(listen_fd, max_listen_backlog) < 0) throw ConnPoolError(std::string("listen error")); - auto dcmd = new DspAcceptListen(listen_fd); - write(dlisten_fd[1], &dcmd, sizeof(dcmd)); + ev_listen = Event(dispatcher_ec, listen_fd, Event::READ, + std::bind(&ConnPool::accept_client, this, _1, _2)); + ev_listen.add(); SALTICIDAE_LOG_INFO("listening to %u", ntohs(listen_addr.port)); } @@ -234,16 +242,16 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) { } else { - auto dcmd = new DspConnectListen(conn); - write(dlisten_fd[1], &dcmd, sizeof(dcmd)); + conn->ev_connect = Event(dispatcher_ec, conn->fd, Event::WRITE, + std::bind(&Conn::conn_server, conn.get(), _1, _2)); + conn->ev_connect.add_with_timeout(conn_server_timeout); add_conn(conn); SALTICIDAE_LOG_INFO("created %s", std::string(*conn).c_str()); } return conn; } -void ConnPool::_post_terminate(int fd) { - std::lock_guard _(cp_mlock); +void ConnPool::terminate(int fd) { auto it = pool.find(fd); if (it != pool.end()) { @@ -259,23 +267,8 @@ void ConnPool::_post_terminate(int fd) { } ConnPool::conn_t ConnPool::add_conn(const conn_t &conn) { - std::lock_guard _(cp_mlock); assert(pool.find(conn->fd) == pool.end()); return pool.insert(std::make_pair(conn->fd, conn)).first->second; } -void ConnPool::_accept_listen(int listen_fd) { - std::lock_guard _(cp_mlock); - ev_listen = Event(dispatcher_ec, listen_fd, Event::READ, - std::bind(&ConnPool::accept_client, this, _1, _2)); - ev_listen.add(); -} - -void ConnPool::_connect_listen(const conn_t &conn) { - std::lock_guard _(cp_mlock); - conn->ev_connect = Event(dispatcher_ec, conn->fd, Event::WRITE, - std::bind(&Conn::conn_server, conn.get(), _1, _2)); - conn->ev_connect.add_with_timeout(conn_server_timeout); -} - } diff --git a/test/test_network.cpp b/test/test_network.cpp index 061b520..d93d0ff 100644 --- a/test/test_network.cpp +++ b/test/test_network.cpp @@ -141,6 +141,9 @@ int main() { alice.reg_handler(on_receive_ack); bob.reg_handler(on_receive_ack); + alice.start(); + bob.start(); + alice.listen(alice_addr); bob.listen(bob_addr); diff --git a/test/test_p2p.cpp b/test/test_p2p.cpp index 4146fd9..f52f48f 100644 --- a/test/test_p2p.cpp +++ b/test/test_p2p.cpp @@ -139,6 +139,9 @@ int main() { alice.reg_handler(on_receive_ack); bob.reg_handler(on_receive_ack); + alice.start(); + bob.start(); + alice.listen(alice_addr); bob.listen(bob_addr); -- cgit v1.2.3-70-g09d2