/** * Copyright (c) 2018 Cornell University. * * Author: Ted Yin * * Permission is hereby granted, free of charge, to any person obtaining a copy of * this software and associated documentation files (the "Software"), to deal in * the Software without restriction, including without limitation the rights to * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies * of the Software, and to permit persons to whom the Software is furnished to do * so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in all * copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. */ #ifndef _SALTICIDAE_CONN_H #define _SALTICIDAE_CONN_H #include #include #include #include #include #include #include #include #include #include #include "salticidae/type.h" #include "salticidae/ref.h" #include "salticidae/event.h" #include "salticidae/util.h" #include "salticidae/netaddr.h" #include "salticidae/msg.h" namespace salticidae { class SegBuffer { struct buffer_entry_t { bytearray_t data; bytearray_t::iterator offset; buffer_entry_t(bytearray_t &&_data): data(std::move(_data)) { offset = data.begin(); } buffer_entry_t(buffer_entry_t &&other) { size_t _offset = other.offset - other.data.begin(); data = std::move(other.data); offset = data.begin() + _offset; } buffer_entry_t(const buffer_entry_t &other): data(other.data) { offset = data.begin() + (other.offset - other.data.begin()); } size_t length() const { return data.end() - offset; } }; std::list buffer; size_t _size; public: SegBuffer(): _size(0) {} ~SegBuffer() { clear(); } void swap(SegBuffer &other) { std::swap(buffer, other.buffer); std::swap(_size, other._size); } SegBuffer(const SegBuffer &other): buffer(other.buffer), _size(other._size) {} SegBuffer(SegBuffer &&other): buffer(std::move(other.buffer)), _size(other._size) { other._size = 0; } SegBuffer &operator=(SegBuffer &&other) { if (this != &other) { SegBuffer tmp(std::move(other)); tmp.swap(*this); } return *this; } SegBuffer &operator=(const SegBuffer &other) { if (this != &other) { SegBuffer tmp(other); tmp.swap(*this); } return *this; } void push(bytearray_t &&data) { _size += data.size(); buffer.push_back(buffer_entry_t(std::move(data))); } bytearray_t pop(size_t len) { bytearray_t res; auto i = buffer.begin(); while (len && i != buffer.end()) { size_t copy_len = std::min(i->length(), len); res.insert(res.end(), i->offset, i->offset + copy_len); i->offset += copy_len; len -= copy_len; if (i->offset == i->data.end()) i++; } buffer.erase(buffer.begin(), i); _size -= res.size(); return std::move(res); } size_t size() const { return _size; } bool empty() const { return buffer.empty(); } void clear() { buffer.clear(); _size = 0; } }; class ConnPoolError: public SalticidaeError { using SalticidaeError::SalticidaeError; }; /** Abstraction for connection management. */ class ConnPool { public: class Conn; /** The handle to a bi-directional connection. */ using conn_t = RcObj; /** The type of callback invoked when connection status is changed. */ using conn_callback_t = std::function; /** Abstraction for a bi-directional connection. */ class Conn { friend ConnPool; public: enum ConnMode { ACTIVE, /**< the connection is established by connect() */ PASSIVE, /**< the connection is established by accept() */ }; private: size_t seg_buff_size; conn_t self_ref; int fd; ConnPool *cpool; ConnMode mode; NetAddr addr; SegBuffer send_buffer; SegBuffer recv_buffer; Event ev_read; Event ev_write; Event ev_connect; /** does not need to wait if true */ bool ready_send; void recv_data(evutil_socket_t, short); void send_data(evutil_socket_t, short); void conn_server(evutil_socket_t, short); public: Conn(): ready_send(false) {} Conn(const Conn &) = delete; Conn(Conn &&other) = delete; virtual ~Conn() { SALTICIDAE_LOG_INFO("destroyed %s", std::string(*this).c_str()); } /** Get the handle to itself. */ conn_t self() { return self_ref; } operator std::string() const; int get_fd() const { return fd; } const NetAddr &get_addr() const { return addr; } ConnMode get_mode() const { return mode; } ConnPool *get_pool() const { return cpool; } SegBuffer &read() { return recv_buffer; } /** Set the buffer size used for send/receive data. */ void set_seg_buff_size(size_t size) { seg_buff_size = size; } /** Write data to the connection (non-blocking). The data will be sent * whenever I/O is available. */ void write(bytearray_t &&data) { send_buffer.push(std::move(data)); if (ready_send) send_data(fd, EV_WRITE); } /** Move the send buffer from the other (old) connection. */ void move_send_buffer(conn_t other) { send_buffer = std::move(other->send_buffer); } /** Terminate the connection. */ void terminate(); protected: /** Close the IO and clear all on-going or planned events. */ virtual void on_close() { ev_read.clear(); ev_write.clear(); ev_connect.clear(); ::close(fd); fd = -1; self_ref = nullptr; /* remove the self-cycle */ } /** Called when new data is available. */ virtual void on_read() { if (cpool->read_cb) cpool->read_cb(this); } /** Called when the underlying connection is established. */ virtual void on_setup() { if (cpool->conn_cb) cpool->conn_cb(this); } /** Called when the underlying connection breaks. */ virtual void on_teardown() { if (cpool->conn_cb) cpool->conn_cb(this); } }; private: int max_listen_backlog; double conn_server_timeout; size_t seg_buff_size; conn_callback_t read_cb; conn_callback_t conn_cb; std::unordered_map pool; int listen_fd; Event ev_listen; void accept_client(evutil_socket_t, short); conn_t add_conn(conn_t conn); protected: EventContext ec; /** Should be implemented by derived class to return a new Conn object. */ virtual Conn *create_conn() = 0; public: ConnPool(const EventContext &ec, int max_listen_backlog = 10, double conn_server_timeout = 2, size_t seg_buff_size = 4096): max_listen_backlog(max_listen_backlog), conn_server_timeout(conn_server_timeout), seg_buff_size(seg_buff_size), ec(ec) {} ~ConnPool() { for (auto it: pool) { conn_t conn = it.second; conn->on_close(); } } ConnPool(const ConnPool &) = delete; ConnPool(ConnPool &&) = delete; /** Actively connect to remote addr. */ conn_t connect(const NetAddr &addr); /** Listen for passive connections (connection initiated from remote). * Does not need to be called if do not want to accept any passive * connections. */ void listen(NetAddr listen_addr); template void reg_conn_handler(Func cb) { conn_cb = cb; } }; } #endif