From 7387f7f6b615717dd863bcb62ee7f65ace29879b Mon Sep 17 00:00:00 2001 From: Determinant Date: Mon, 12 Nov 2018 15:52:38 -0500 Subject: update PeerNetwork to work with multiloops --- include/salticidae/conn.h | 75 ++++++++++++++++++----------------------------- 1 file changed, 28 insertions(+), 47 deletions(-) (limited to 'include/salticidae/conn.h') diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index 26d19fe..1364d4d 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -61,7 +61,7 @@ class ConnPool { /** The handle to a bi-directional connection. */ using conn_t = ArcObj; /** The type of callback invoked when connection status is changed. */ - using conn_callback_t = std::function; + using conn_callback_t = std::function; /** Abstraction for a bi-directional connection. */ class Conn { @@ -72,7 +72,7 @@ class ConnPool { PASSIVE, /**< the connection is established by accept() */ }; - private: + protected: size_t seg_buff_size; conn_t self_ref; int fd; @@ -109,14 +109,10 @@ class ConnPool { /** Get the handle to itself. */ conn_t self() { return self_ref; } operator std::string() const; - int get_fd() const { return fd; } const NetAddr &get_addr() const { return addr; } ConnMode get_mode() const { return mode; } ConnPool *get_pool() const { return cpool; } - SegBuffer &get_recv_buffer() { return recv_buffer; } MPSCWriteBuffer &get_send_buffer() { return send_buffer; } - /** Set the buffer size used for send/receive data. */ - void set_seg_buff_size(size_t size) { seg_buff_size = size; } /** Write data to the connection (non-blocking). The data will be sent * whenever I/O is available. */ @@ -124,11 +120,6 @@ class ConnPool { send_buffer.push(std::move(data)); } - ///** Move the send buffer from the other (old) connection. */ - //void move_send_buffer(conn_t other) { - // send_buffer = std::move(other->send_buffer); - //} - protected: /** Close the IO and clear all on-going or planned events. */ virtual void on_close() { @@ -143,13 +134,9 @@ class ConnPool { /** Called when new data is available. */ virtual void on_read() {} /** Called when the underlying connection is established. */ - virtual void on_setup() { - cpool->update_conn(self()); - } + virtual void on_setup() {} /** Called when the underlying connection breaks. */ - virtual void on_teardown() { - cpool->update_conn(self()); - } + virtual void on_teardown() {} }; private: @@ -165,14 +152,13 @@ class ConnPool { /* owned by the dispatcher */ std::unordered_map pool; int listen_fd; /**< for accepting new network connections */ - int dlisten_fd[2]; /**< for control command sent to the dispatcher */ Event ev_listen; Event ev_dlisten; std::mutex cp_mlock; - void update_conn(const conn_t &conn) { - auto ptr = new conn_t(conn); - write(mlisten_fd[1], &ptr, sizeof(ptr)); + void update_conn(const conn_t &conn, bool connected) { + auto dcmd = new UserConn(conn, connected); + write(mlisten_fd[1], &dcmd, sizeof(dcmd)); } struct Worker; @@ -269,31 +255,22 @@ class ConnPool { void accept_client(evutil_socket_t, short); conn_t add_conn(const conn_t &conn); conn_t _connect(const NetAddr &addr); - void _listen(NetAddr listen_addr); void _post_terminate(int fd); + protected: class DispatchCmd { public: virtual ~DispatchCmd() = default; virtual void exec(ConnPool *cpool) = 0; }; - // TODO: the following two are untested - class DspListen: public DispatchCmd { - const NetAddr addr; - public: - DspListen(const NetAddr &addr): addr(addr) {} - void exec(ConnPool *cpool) override { - cpool->_listen(addr); - } - }; - + private: class DspConnect: public DispatchCmd { const NetAddr addr; public: DspConnect(const NetAddr &addr): addr(addr) {} void exec(ConnPool *cpool) override { - cpool->update_conn(cpool->_connect(addr)); + cpool->update_conn(cpool->_connect(addr), true); } }; @@ -318,6 +295,18 @@ class ConnPool { } }; + class UserConn: public DispatchCmd { + conn_t conn; + bool connected; + public: + UserConn(const conn_t &conn, bool connected): + conn(conn), connected(connected) {} + void exec(ConnPool *cpool) override { + if (cpool->conn_cb) + cpool->conn_cb(*conn, connected); + } + }; + void post_terminate(int fd) { auto dcmd = new DspPostTerm(fd); write(dlisten_fd[1], &dcmd, sizeof(dcmd)); @@ -330,6 +319,7 @@ class ConnPool { protected: EventContext ec; EventContext dispatcher_ec; + int dlisten_fd[2]; /**< for control command sent to the dispatcher */ std::mutex dsp_ec_mlock; /** Should be implemented by derived class to return a new Conn object. */ virtual Conn *create_conn() = 0; @@ -352,11 +342,10 @@ class ConnPool { throw ConnPoolError(std::string("failed to create dispatcher pipe")); ev_mlisten = Event(ec, mlisten_fd[0], EV_READ | EV_PERSIST, [this](int fd, short) { - conn_t *conn_ptr; - read(fd, &conn_ptr, sizeof(conn_ptr)); - if (conn_cb) - conn_cb(**conn_ptr); - delete conn_ptr; + DispatchCmd *dcmd; + read(fd, &dcmd, sizeof(dcmd)); + dcmd->exec(this); + delete dcmd; }); ev_mlisten.add(); @@ -414,15 +403,7 @@ class ConnPool { /** Listen for passive connections (connection initiated from remote). * Does not need to be called if do not want to accept any passive * connections. */ - void listen(NetAddr listen_addr, bool blocking = true) { - if (blocking) - _listen(listen_addr); - else - { - auto dcmd = new DspListen(listen_addr); - write(dlisten_fd[1], &dcmd, sizeof(dcmd)); - } - } + void listen(NetAddr listen_addr); template void reg_conn_handler(Func cb) { conn_cb = cb; } -- cgit v1.2.3