diff options
Diffstat (limited to 'include')
-rw-r--r-- | include/salticidae/conn.h | 267 | ||||
-rw-r--r-- | include/salticidae/event.h | 86 | ||||
-rw-r--r-- | include/salticidae/network.h | 217 | ||||
-rw-r--r-- | include/salticidae/queue.h | 2 |
4 files changed, 258 insertions, 314 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index a86a4d2..2ef6b50 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -95,6 +95,8 @@ class ConnPool { void send_data(int, int); void conn_server(int, int); + /** Stop the worker and I/O events. */ + void stop(); /** Terminate the connection. */ void terminate(); @@ -146,54 +148,27 @@ class ConnPool { const size_t seg_buff_size; /* owned by user loop */ - int mlisten_fd[2]; /**< for connection events sent to the user loop */ - Event ev_mlisten; + BoxObj<ThreadCall> user_tcall; conn_callback_t conn_cb; /* owned by the dispatcher */ + Event ev_listen; std::unordered_map<int, conn_t> pool; int listen_fd; /**< for accepting new network connections */ - Event ev_listen; - Event ev_dlisten; - std::mutex cp_mlock; void update_conn(const conn_t &conn, bool connected) { - auto dcmd = new UserConn(conn, connected); - write(mlisten_fd[1], &dcmd, sizeof(dcmd)); + user_tcall->call([this, conn, connected](ThreadCall::Handle &) { + if (conn_cb) conn_cb(*conn, connected); + }); } - struct Worker; - class WorkerFeed; - - class WorkerCmd { - public: - virtual ~WorkerCmd() = default; - virtual void exec(Worker *worker) = 0; - }; - class Worker { EventContext ec; - Event ev_ctl; - int ctl_fd[2]; /**< for control messages from dispatcher */ + ThreadCall msgr; std::thread handle; public: - Worker() { - if (pipe2(ctl_fd, O_NONBLOCK)) - throw ConnPoolError(std::string("failed to create worker pipe")); - ev_ctl = Event(ec, ctl_fd[0], Event::READ, [this](int fd, int) { - WorkerCmd *dcmd; - read(fd, &dcmd, sizeof(dcmd)); - dcmd->exec(this); - delete dcmd; - }); - ev_ctl.add(); - } - - ~Worker() { - close(ctl_fd[0]); - close(ctl_fd[1]); - } + Worker(): msgr(ec) {} /* the following functions are called by the dispatcher */ void start() { @@ -201,143 +176,69 @@ class ConnPool { } void feed(const conn_t &conn, int client_fd) { - auto dcmd = new WorkerFeed(conn, client_fd); - write(ctl_fd[1], &dcmd, sizeof(dcmd)); + msgr.call([this, conn, client_fd](ThreadCall::Handle &) { + SALTICIDAE_LOG_INFO("worker %x got %s", + std::this_thread::get_id(), + std::string(*conn).c_str()); + conn->get_send_buffer() + .get_queue() + .reg_handler(this->ec, [conn, client_fd] + (MPSCWriteBuffer::queue_t &) { + if (conn->ready_send && conn->fd != -1) + conn->send_data(client_fd, Event::WRITE); + return false; + }); + //auto conn_ptr = conn.get(); + conn->ev_read = Event(ec, client_fd, Event::READ | Event::WRITE, [conn=conn](int fd, int what) { + if (what & Event::READ) + conn->recv_data(fd, what); + else + conn->send_data(fd, what); + }); + + // std::bind(&Conn::recv_data, conn_ptr, _1, _2)); + //conn->ev_write = Event(ec, client_fd, Event::WRITE, + // std::bind(&Conn::send_data, conn_ptr, _1, _2)); + conn->ev_read.add(); + //conn->ev_write.add(); + }); } void stop() { - auto dcmd = new WorkerStop(); - write(ctl_fd[1], &dcmd, sizeof(dcmd)); + msgr.call([this](ThreadCall::Handle &) { + ec.stop(); + }); } std::thread &get_handle() { return handle; } const EventContext &get_ec() { return ec; } }; - class WorkerFeed: public WorkerCmd { - conn_t conn; - int client_fd; - - public: - WorkerFeed(const conn_t &conn, int client_fd): - conn(conn), client_fd(client_fd) {} - void exec(Worker *worker) override { - SALTICIDAE_LOG_INFO("worker %x got %s", - std::this_thread::get_id(), - std::string(*conn).c_str()); - auto &ec = worker->get_ec(); - conn->get_send_buffer() - .get_queue() - .reg_handler(ec, [conn=this->conn, - client_fd=this->client_fd](MPSCWriteBuffer::queue_t &) { - if (conn->ready_send && conn->fd != -1) - conn->send_data(client_fd, Event::WRITE); - return false; - }); - //auto conn_ptr = conn.get(); - conn->ev_read = Event(ec, client_fd, Event::READ | Event::WRITE, [conn=conn](int fd, int what) { - if (what & Event::READ) - conn->recv_data(fd, what); - else - conn->send_data(fd, what); - }); - - // std::bind(&Conn::recv_data, conn_ptr, _1, _2)); - //conn->ev_write = Event(ec, client_fd, Event::WRITE, - // std::bind(&Conn::send_data, conn_ptr, _1, _2)); - conn->ev_read.add(); - //conn->ev_write.add(); - } - }; - - class WorkerStop: public WorkerCmd { - public: - void exec(Worker *worker) override { worker->get_ec().stop(); } - }; - /* related to workers */ size_t nworker; salticidae::BoxObj<Worker[]> workers; void accept_client(int, int); conn_t add_conn(const conn_t &conn); - conn_t _connect(const NetAddr &addr); - void _post_terminate(int fd); - void _accept_listen(int listen_fd); - void _connect_listen(const conn_t &conn); + void terminate(int fd); protected: - class DispatchCmd { - public: - virtual ~DispatchCmd() = default; - virtual void exec(ConnPool *cpool) = 0; - }; + conn_t _connect(const NetAddr &addr); + void _listen(NetAddr listen_addr); private: - class DspConnect: public DispatchCmd { - const NetAddr addr; - public: - DspConnect(const NetAddr &addr): addr(addr) {} - void exec(ConnPool *cpool) override { - cpool->update_conn(cpool->_connect(addr), true); - } - }; - - class DspPostTerm: public DispatchCmd { - int fd; - public: - DspPostTerm(int fd): fd(fd) {} - void exec(ConnPool *cpool) override { - cpool->_post_terminate(fd); - } - }; - - class DspMulticast: public DispatchCmd { - std::vector<conn_t> receivers; - bytearray_t data; - public: - DspMulticast(std::vector<conn_t> &&receivers, bytearray_t &&data): - receivers(std::move(receivers)), - data(std::move(data)) {} - void exec(ConnPool *) override { - for (auto &r: receivers) r->write(bytearray_t(data)); - } - }; - - class DspAcceptListen: public DispatchCmd { - int listen_fd; - public: - DspAcceptListen(int listen_fd): listen_fd(listen_fd) {} - void exec(ConnPool *cpool) override { - cpool->_accept_listen(listen_fd); - } - }; - class DspConnectListen: public DispatchCmd { - conn_t conn; - public: - DspConnectListen(const conn_t &conn): conn(conn) {} - void exec(ConnPool *cpool) override { - cpool->_connect_listen(conn); - } - }; - - class UserConn: public DispatchCmd { - conn_t conn; - bool connected; - public: - UserConn(const conn_t &conn, bool connected): - conn(conn), connected(connected) {} - void exec(ConnPool *cpool) override { - if (cpool->conn_cb) - cpool->conn_cb(*conn, connected); - } - }; - - void post_terminate(int fd) { - auto dcmd = new DspPostTerm(fd); - write(dlisten_fd[1], &dcmd, sizeof(dcmd)); - } + //class DspMulticast: public DispatchCmd { + // std::vector<conn_t> receivers; + // bytearray_t data; + // public: + // DspMulticast(std::vector<conn_t> &&receivers, bytearray_t &&data): + // receivers(std::move(receivers)), + // data(std::move(data)) {} + // void exec(ConnPool *) override { + // for (auto &r: receivers) r->write(bytearray_t(data)); + // } + //}; Worker &select_worker() { return workers[1]; @@ -346,7 +247,7 @@ class ConnPool { protected: EventContext ec; EventContext dispatcher_ec; - int dlisten_fd[2]; /**< for control command sent to the dispatcher */ + BoxObj<ThreadCall> disp_tcall; /** Should be implemented by derived class to return a new Conn object. */ virtual Conn *create_conn() = 0; @@ -360,35 +261,13 @@ class ConnPool { conn_server_timeout(conn_server_timeout), seg_buff_size(seg_buff_size), listen_fd(-1), - nworker(std::min((size_t)1, nworker)), + nworker(std::max((size_t)1, nworker)), ec(ec) { - if (pipe2(mlisten_fd, O_NONBLOCK)) - throw ConnPoolError(std::string("failed to create main pipe")); - if (pipe2(dlisten_fd, O_NONBLOCK)) - throw ConnPoolError(std::string("failed to create dispatcher pipe")); - - ev_mlisten = Event(ec, mlisten_fd[0], Event::READ, [this](int fd, int) { - DispatchCmd *dcmd; - read(fd, &dcmd, sizeof(dcmd)); - dcmd->exec(this); - delete dcmd; - }); - ev_mlisten.add(); - workers = new Worker[nworker]; dispatcher_ec = workers[0].get_ec(); - ev_dlisten = Event(dispatcher_ec, dlisten_fd[0], Event::READ, [this](int fd, int) { - DispatchCmd *dcmd; - read(fd, &dcmd, sizeof(dcmd)); - dcmd->exec(this); - delete dcmd; - }); - ev_dlisten.add(); - - SALTICIDAE_LOG_INFO("starting all threads..."); - for (size_t i = 0; i < nworker; i++) - workers[i].start(); + user_tcall = new ThreadCall(ec); + disp_tcall = new ThreadCall(dispatcher_ec); } ~ConnPool() { @@ -404,24 +283,36 @@ class ConnPool { conn->on_close(); } if (listen_fd != -1) close(listen_fd); - for (int i = 0; i < 2; i++) - { - close(mlisten_fd[i]); - close(dlisten_fd[i]); - } } ConnPool(const ConnPool &) = delete; ConnPool(ConnPool &&) = delete; + void start() { + SALTICIDAE_LOG_INFO("starting all threads..."); + for (size_t i = 0; i < nworker; i++) + workers[i].start(); + } + /** Actively connect to remote addr. */ conn_t connect(const NetAddr &addr, bool blocking = true) { if (blocking) - return _connect(addr); + { + auto ret = static_cast<conn_t *>(disp_tcall->call( + [this, addr](ThreadCall::Handle &h) { + auto ptr = new conn_t(_connect(addr)); + std::atomic_thread_fence(std::memory_order_release); + h.set_result(ptr); + }, true)); + auto conn = *ret; + delete ret; + return std::move(conn); + } else { - auto dcmd = new DspConnect(addr); - write(dlisten_fd[1], &dcmd, sizeof(dcmd)); + disp_tcall->call([this, addr](ThreadCall::Handle &) { + _connect(addr); + }, false); return nullptr; } } @@ -429,7 +320,11 @@ class ConnPool { /** Listen for passive connections (connection initiated from remote). * Does not need to be called if do not want to accept any passive * connections. */ - void listen(NetAddr listen_addr); + void listen(NetAddr listen_addr) { + disp_tcall->call([this, listen_addr](ThreadCall::Handle &) { + _listen(listen_addr); + }, true); + } template<typename Func> void reg_conn_handler(Func cb) { conn_cb = cb; } diff --git a/include/salticidae/event.h b/include/salticidae/event.h index c21644b..da27902 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -25,10 +25,12 @@ #ifndef _SALTICIDAE_EVENT_H #define _SALTICIDAE_EVENT_H +#include <condition_variable> #include <unistd.h> #include <uv.h> #include <sys/eventfd.h> +#include "salticidae/type.h" #include "salticidae/queue.h" #include "salticidae/util.h" #include "salticidae/ref.h" @@ -177,6 +179,90 @@ class Event { operator bool() const { return ev_fd != nullptr || ev_timer != nullptr; } }; +class ThreadNotifier { + std::condition_variable cv; + std::mutex mlock; + mutex_ul_t ul; + bool ready; + void *data; + public: + ThreadNotifier(): ul(mlock), ready(false) {} + void *wait() { + cv.wait(ul, [this]{ return ready; }); + return data; + } + void notify(void *_data) { + { + mutex_lg_t _(mlock); + ready = true; + data = _data; + } + cv.notify_all(); + } +}; + +class ThreadCall { + int ctl_fd[2]; + EventContext ec; + Event ev_listen; + + public: + class Handle { + std::function<void(Handle &)> callback; + ThreadNotifier* notifier; + void *result; + friend ThreadCall; + public: + Handle(): notifier(nullptr), result(nullptr) {} + void exec() { + callback(*this); + if (notifier) notifier->notify(result); + } + void set_result(void *data) { result = data; } + }; + + ThreadCall() = default; + ThreadCall(const ThreadCall &) = delete; + ThreadCall(ThreadCall &&) = delete; + ThreadCall(EventContext ec): ec(ec) { + if (pipe2(ctl_fd, O_NONBLOCK)) + throw SalticidaeError(std::string("ThreadCall: failed to create pipe")); + ev_listen = Event(ec, ctl_fd[0], Event::READ, [this](int fd, int) { + Handle *h; + read(fd, &h, sizeof(h)); + h->exec(); + delete h; + }); + ev_listen.add(); + } + + ~ThreadCall() { + close(ctl_fd[0]); + close(ctl_fd[1]); + } + + template<typename Func> + void *call(Func callback, bool blocking = false) { + auto h = new Handle(); + h->callback = callback; + if (blocking) + { + ThreadNotifier notifier; + h->notifier = ¬ifier; + std::atomic_thread_fence(std::memory_order_release); + write(ctl_fd[1], &h, sizeof(h)); + return notifier.wait(); + } + else + { + std::atomic_thread_fence(std::memory_order_release); + write(ctl_fd[1], &h, sizeof(h)); + return nullptr; + } + } +}; + + template<typename T> class MPSCQueueEventDriven: public MPSCQueue<T> { private: diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 18406ea..e5165bf 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -206,7 +206,6 @@ class ClientNetwork: public MsgNetwork<OpcodeType> { private: std::unordered_map<NetAddr, typename MsgNet::conn_t> addr2conn; - std::mutex cn_mlock; public: class Conn: public MsgNet::Conn { @@ -298,7 +297,6 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { bool ping_timer_ok; bool pong_msg_ok; bool connected; - std::mutex mlock; Peer() = delete; Peer(NetAddr addr, conn_t conn, const EventContext &ec): @@ -321,7 +319,6 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { }; std::unordered_map <NetAddr, BoxObj<Peer>> id2peer; - std::mutex pn_mlock; const IdentityMode id_mode; double retry_conn_delay; @@ -355,30 +352,11 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { } }; - struct PingCmd: public ConnPool::DispatchCmd { - conn_t conn; - uint16_t port; - PingCmd(const conn_t &conn, uint16_t port): - conn(conn), port(port) {} - void exec(ConnPool *cpool) override { - auto pn = static_cast<PeerNetwork *>(cpool); - pn->_ping_msg_cb(conn, port); - } - }; - - struct PongCmd: public PingCmd { - using PingCmd::PingCmd; - void exec(ConnPool *cpool) override { - auto pn = static_cast<PeerNetwork *>(cpool); - pn->_pong_msg_cb(this->conn, this->port); - } - }; - void msg_ping(MsgPing &&msg, Conn &conn); void msg_pong(MsgPong &&msg, Conn &conn); void _ping_msg_cb(const conn_t &conn, uint16_t port); void _pong_msg_cb(const conn_t &conn, uint16_t port); - bool check_new_conn(Conn &conn, uint16_t port); + bool check_new_conn(const conn_t &conn, uint16_t port); void start_active_conn(const NetAddr &paddr); protected: @@ -402,12 +380,14 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { id_mode(id_mode), retry_conn_delay(retry_conn_delay), ping_period(ping_period), - conn_timeout(conn_timeout) {} + conn_timeout(conn_timeout) { + this->reg_handler(generic_bind(&PeerNetwork::msg_ping, this, _1, _2)); + this->reg_handler(generic_bind(&PeerNetwork::msg_pong, this, _1, _2)); + } void add_peer(const NetAddr &paddr); const conn_t get_peer_conn(const NetAddr &paddr) const; - template<typename MsgType> - void _send_msg(const MsgType &msg, const Peer *peer); + using MsgNet::send_msg; template<typename MsgType> void send_msg(const MsgType &msg, const NetAddr &paddr); void listen(NetAddr listen_addr); @@ -471,7 +451,7 @@ void PeerNetwork<O, _, __>::Conn::on_setup() { MsgNet::Conn::on_setup(); auto pn = get_net(); assert(!ev_timeout); - ev_timeout = Event(pn->ec, -1, 0, [this](int, int) { + ev_timeout = Event(pn->dispatcher_ec, -1, 0, [this](int, int) { SALTICIDAE_LOG_INFO("peer ping-pong timeout"); this->terminate(); }); @@ -483,18 +463,16 @@ void PeerNetwork<O, _, __>::Conn::on_setup() { /* the initial ping-pong to set up the connection */ auto &conn = static_cast<Conn &>(*this); reset_timeout(pn->conn_timeout); - pn->MsgNet::send_msg(MsgPing(pn->listen_port), conn); + pn->send_msg(MsgPing(pn->listen_port), conn); } template<typename O, O _, O __> void PeerNetwork<O, _, __>::Conn::on_teardown() { MsgNet::Conn::on_teardown(); auto pn = get_net(); - mutex_lg_t _pn_lg(pn->pn_mlock); auto it = pn->id2peer.find(peer_id); if (it == pn->id2peer.end()) return; auto p = it->second.get(); - mutex_lg_t _p_lg(p->mlock); if (this != p->conn.get()) return; p->ev_ping_timer.del(); p->connected = false; @@ -505,7 +483,6 @@ void PeerNetwork<O, _, __>::Conn::on_teardown() { // try to reconnect p->ev_retry_timer = Event(pn->dispatcher_ec, -1, 0, [pn, peer_id = this->peer_id](int, int) { - mutex_lg_t _pn_lg(pn->pn_mlock); pn->start_active_conn(peer_id); }); p->ev_retry_timer.add_with_timeout(pn->gen_conn_timeout()); @@ -550,12 +527,11 @@ void PeerNetwork<O, _, __>::Peer::send_ping() { ping_timer_ok = false; pong_msg_ok = false; conn->reset_timeout(pn->conn_timeout); - pn->_send_msg(MsgPing(pn->listen_port), this); + pn->send_msg(MsgPing(pn->listen_port), *conn); } template<typename O, O _, O __> void PeerNetwork<O, _, __>::Peer::ping_timer(int, int) { - mutex_lg_t _p_lg(mlock); ping_timer_ok = true; if (pong_msg_ok) { @@ -565,149 +541,132 @@ void PeerNetwork<O, _, __>::Peer::ping_timer(int, int) { } template<typename O, O _, O __> -bool PeerNetwork<O, _, __>::check_new_conn(Conn &conn, uint16_t port) { - if (conn.peer_id.is_null()) +bool PeerNetwork<O, _, __>::check_new_conn(const conn_t &conn, uint16_t port) { + if (conn->peer_id.is_null()) { /* passive connections can eventually have ids after getting the port number in IP_BASED_PORT mode */ assert(id_mode == IP_PORT_BASED); - conn.peer_id.ip = conn.get_addr().ip; - conn.peer_id.port = port; + conn->peer_id.ip = conn->get_addr().ip; + conn->peer_id.port = port; } - auto p = id2peer.find(conn.peer_id)->second.get(); - mutex_lg_t _p_lg(p->mlock); + auto p = id2peer.find(conn->peer_id)->second.get(); if (p->connected) { - if (conn.self() != p->conn) + if (conn != p->conn) { - conn.terminate(); + conn->terminate(); return true; } return false; } - p->reset_conn(static_pointer_cast<Conn>(conn.self())); + p->reset_conn(conn); p->connected = true; p->reset_ping_timer(); p->send_ping(); if (p->connected) SALTICIDAE_LOG_INFO("PeerNetwork: established connection with %s via %s", - std::string(conn.peer_id).c_str(), std::string(conn).c_str()); + std::string(conn->peer_id).c_str(), std::string(*conn).c_str()); return false; } template<typename O, O _, O __> -void PeerNetwork<O, _, __>::_ping_msg_cb(const conn_t &conn, uint16_t port) { - mutex_lg_t _pn_lg(pn_mlock); - if (check_new_conn(*conn, port)) return; - auto p = id2peer.find(conn->peer_id)->second.get(); - mutex_lg_t _p_lg(p->mlock); - _send_msg(MsgPong(this->listen_port), p); -} - -template<typename O, O _, O __> -void PeerNetwork<O, _, __>::_pong_msg_cb(const conn_t &conn, uint16_t port) { - mutex_lg_t _pn_lg(pn_mlock); - auto it = id2peer.find(conn->peer_id); - if (it == id2peer.end()) - { - SALTICIDAE_LOG_WARN("pong message discarded"); - return; - } - if (check_new_conn(*conn, port)) return; - auto p = it->second.get(); - mutex_lg_t _p_lg(p->mlock); - p->pong_msg_ok = true; - if (p->ping_timer_ok) - { - p->reset_ping_timer(); - p->send_ping(); - } -} - -/* end: functions invoked by the dispatcher */ - -/* this function could be both invoked by the dispatcher and the user loop */ -template<typename O, O _, O __> void PeerNetwork<O, _, __>::start_active_conn(const NetAddr &addr) { auto p = id2peer.find(addr)->second.get(); - mutex_lg_t _p_lg(p->mlock); if (p->connected) return; - auto conn = static_pointer_cast<Conn>(MsgNet::connect(addr)); + auto conn = static_pointer_cast<Conn>(MsgNet::_connect(addr)); assert(p->conn == nullptr); p->conn = conn; } +/* end: functions invoked by the dispatcher */ /* begin: functions invoked by the user loop */ template<typename O, O _, O __> -void PeerNetwork<O, _, __>::msg_ping(MsgPing &&msg, Conn &conn) { +void PeerNetwork<O, _, __>::msg_ping(MsgPing &&msg, Conn &_conn) { + auto conn = static_pointer_cast<Conn>(_conn.self()); + if (!conn) return; uint16_t port = msg.port; - SALTICIDAE_LOG_INFO("ping from %s, port %u", std::string(conn).c_str(), ntohs(port)); - auto dcmd = new PingCmd(static_pointer_cast<Conn>(conn.self()), port); - write(this->dlisten_fd[1], &dcmd, sizeof(dcmd)); + this->disp_tcall->call([this, conn, port](ThreadCall::Handle &msg) { + SALTICIDAE_LOG_INFO("ping from %s, port %u", + std::string(*conn).c_str(), ntohs(port)); + if (check_new_conn(conn, port)) return; + auto p = id2peer.find(conn->peer_id)->second.get(); + send_msg(MsgPong(this->listen_port), *conn); + }); } template<typename O, O _, O __> -void PeerNetwork<O, _, __>::msg_pong(MsgPong &&msg, Conn &conn) { - auto dcmd = new PongCmd(static_pointer_cast<Conn>(conn.self()), msg.port); - write(this->dlisten_fd[1], &dcmd, sizeof(dcmd)); +void PeerNetwork<O, _, __>::msg_pong(MsgPong &&msg, Conn &_conn) { + auto conn = static_pointer_cast<Conn>(_conn.self()); + if (!conn) return; + uint16_t port = msg.port; + this->disp_tcall->call([this, conn, port](ThreadCall::Handle &msg) { + auto it = id2peer.find(conn->peer_id); + if (it == id2peer.end()) + { + SALTICIDAE_LOG_WARN("pong message discarded"); + return; + } + if (check_new_conn(conn, port)) return; + auto p = it->second.get(); + p->pong_msg_ok = true; + if (p->ping_timer_ok) + { + p->reset_ping_timer(); + p->send_ping(); + } + }); } template<typename O, O _, O __> void PeerNetwork<O, _, __>::listen(NetAddr listen_addr) { - MsgNet::listen(listen_addr); - listen_port = listen_addr.port; - this->reg_handler(generic_bind(&PeerNetwork::msg_ping, this, _1, _2)); - this->reg_handler(generic_bind(&PeerNetwork::msg_pong, this, _1, _2)); + this->disp_tcall->call([this, listen_addr](ThreadCall::Handle &msg) { + MsgNet::_listen(listen_addr); + listen_port = listen_addr.port; + }, true); } template<typename O, O _, O __> void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) { - mutex_lg_t _pn_lg(pn_mlock); - auto it = id2peer.find(addr); - if (it != id2peer.end()) - throw PeerNetworkError("peer already exists"); - id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->dispatcher_ec))); - start_active_conn(addr); + this->disp_tcall->call([this, addr](ThreadCall::Handle &) { + auto it = id2peer.find(addr); + if (it != id2peer.end()) + throw PeerNetworkError("peer already exists"); + id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->dispatcher_ec))); + start_active_conn(addr); + }, true); } template<typename O, O _, O __> const typename PeerNetwork<O, _, __>::conn_t PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &paddr) const { - mutex_lg_t _pn_lg(pn_mlock); - auto it = id2peer.find(paddr); - if (it == id2peer.end()) - throw PeerNetworkError("peer does not exist"); - return it->second->conn; + auto ret = static_cast<conn_t *>(this->disp_tcall->call( + [this, paddr](ThreadCall::Handle &h) { + auto it = id2peer.find(paddr); + if (it == id2peer.end()) + throw PeerNetworkError("peer does not exist"); + auto ptr = new conn_t(it->second->conn); + h.set_result(ptr); + })); + auto conn = *ret; + delete ret; + return std::move(conn); } template<typename O, O _, O __> bool PeerNetwork<O, _, __>::has_peer(const NetAddr &paddr) const { - mutex_lg_t _pn_lg(pn_mlock); - return id2peer.count(paddr); -} - -template<typename O, O _, O __> -template<typename MsgType> -void PeerNetwork<O, _, __>::_send_msg(const MsgType &msg, const Peer *peer) { - if (peer->connected) - MsgNet::send_msg(msg, *(peer->conn)); - else - SALTICIDAE_LOG_DEBUG("dropped"); + auto ret = static_cast<bool *>(this->disp_tcall->call( + [this, paddr](ThreadCall::Handle &h) { + h.set_result(id2peer.count(paddr)); + })); + auto has = *ret; + delete ret; + return has; } template<typename O, O _, O __> template<typename MsgType> void PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const NetAddr &addr) { - mutex_lg_t _pn_lg(pn_mlock); - auto it = id2peer.find(addr); - if (it == id2peer.end()) - { - SALTICIDAE_LOG_ERROR("sending to non-existing peer: %s", - std::string(addr).c_str()); - throw PeerNetworkError("peer does not exist"); - } - auto p = it->second.get(); - mutex_lg_t _p_lg(p->mlock); - _send_msg(msg, p); + send_msg(msg, *get_peer_conn(addr)); } /* end: functions invoked by the user loop */ @@ -716,7 +675,6 @@ void ClientNetwork<OpcodeType>::Conn::on_setup() { MsgNet::Conn::on_setup(); assert(this->get_mode() == Conn::PASSIVE); const auto &addr = this->get_addr(); - mutex_lg_t _cn_lg(cn_mlock); auto cn = get_net(); cn->addr2conn.erase(addr); cn->addr2conn.insert( @@ -728,17 +686,22 @@ template<typename OpcodeType> void ClientNetwork<OpcodeType>::Conn::on_teardown() { MsgNet::Conn::on_teardown(); assert(this->get_mode() == Conn::PASSIVE); - mutex_lg_t _cn_lg(cn_mlock); get_net()->addr2conn.erase(this->get_addr()); } template<typename OpcodeType> template<typename MsgType> void ClientNetwork<OpcodeType>::send_msg(const MsgType &msg, const NetAddr &addr) { - mutex_lg_t _cn_lg(cn_mlock); - auto it = addr2conn.find(addr); - if (it == addr2conn.end()) return; - MsgNet::send_msg(msg, *(it->second)); + auto ret = static_cast<conn_t *>(this->disp_tcall->call( + [this, addr](ThreadCall::Handle &h) { + auto it = addr2conn.find(addr); + if (it == addr2conn.end()) + throw PeerNetworkError("client does not exist"); + auto ptr = new conn_t(it->second->conn); + h.set_result(ptr); + })); + send_msg(msg, **ret); + delete ret; } template<typename O, O OPCODE_PING, O _> diff --git a/include/salticidae/queue.h b/include/salticidae/queue.h index 9045e2b..0b493ea 100644 --- a/include/salticidae/queue.h +++ b/include/salticidae/queue.h @@ -105,7 +105,7 @@ class MPMCQueue { void _enqueue(Block *nblk, U &&e) { new (&(nblk->elem)) T(std::forward<U>(e)); nblk->next.store(nullptr, std::memory_order_release); - auto prev = tail.exchange(nblk, std::memory_order_relaxed); + auto prev = tail.exchange(nblk, std::memory_order_acq_rel); prev->next.store(nblk, std::memory_order_relaxed); } |