aboutsummaryrefslogtreecommitdiff
path: root/include/salticidae/conn.h
diff options
context:
space:
mode:
authorDeterminant <ted.sybil@gmail.com>2018-06-26 12:58:54 -0400
committerDeterminant <ted.sybil@gmail.com>2018-06-26 12:58:54 -0400
commit5c3b39340d365f5ff37a79424956591e87b44816 (patch)
tree45fc59c19ed95c44bacbbe59396d8bc1b25872dd /include/salticidae/conn.h
init
Diffstat (limited to 'include/salticidae/conn.h')
-rw-r--r--include/salticidae/conn.h235
1 files changed, 235 insertions, 0 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
new file mode 100644
index 0000000..40facc9
--- /dev/null
+++ b/include/salticidae/conn.h
@@ -0,0 +1,235 @@
+/**
+ * Copyright (c) 2018 Cornell University.
+ *
+ * Author: Ted Yin <tederminant@gmail.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of
+ * this software and associated documentation files (the "Software"), to deal in
+ * the Software without restriction, including without limitation the rights to
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
+ * of the Software, and to permit persons to whom the Software is furnished to do
+ * so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+#ifndef _SALTICIDAE_CONN_H
+#define _SALTICIDAE_CONN_H
+
+#include <cassert>
+#include <cstdint>
+#include <event2/event.h>
+#include <arpa/inet.h>
+#include <unistd.h>
+
+#include <string>
+#include <unordered_map>
+#include <list>
+#include <algorithm>
+#include <exception>
+
+#include "salticidae/type.h"
+#include "salticidae/ref.h"
+#include "salticidae/util.h"
+#include "salticidae/netaddr.h"
+#include "salticidae/msg.h"
+
+const int MAX_LISTEN_BACKLOG = 10;
+const size_t BUFF_SEG_SIZE = 4096;
+const size_t MAX_MSG_HANDLER = 64;
+const double TRY_CONN_DELAY = 2;
+const double CONN_SERVER_TIMEOUT = 2;
+
+namespace salticidae {
+
+inline double gen_rand_timeout(double base_timeout) {
+ return base_timeout + rand() / (double)RAND_MAX * 0.5 * base_timeout;
+}
+
+class RingBuffer {
+ struct buffer_entry_t {
+ bytearray_t data;
+ bytearray_t::iterator offset;
+ buffer_entry_t(bytearray_t &&_data): data(std::move(_data)) {
+ offset = data.begin();
+ }
+
+ buffer_entry_t(buffer_entry_t &&other) {
+ size_t _offset = other.offset - other.data.begin();
+ data = std::move(other.data);
+ offset = data.begin() + _offset;
+ }
+
+ buffer_entry_t(const buffer_entry_t &other): data(other.data) {
+ offset = data.begin() + (other.offset - other.data.begin());
+ }
+
+ size_t length() const { return data.end() - offset; }
+ };
+ std::list<buffer_entry_t> ring;
+ size_t _size;
+
+ public:
+ RingBuffer(): _size(0) {}
+ ~RingBuffer() { clear(); }
+ RingBuffer &operator=(const RingBuffer &other) = delete;
+ RingBuffer(const RingBuffer &other) = delete;
+ RingBuffer &operator=(RingBuffer &&other) {
+ ring = std::move(other.ring);
+ _size = other._size;
+ other._size = 0;
+ return *this;
+ }
+
+ void push(bytearray_t &&data) {
+ _size += data.size();
+ ring.push_back(buffer_entry_t(std::move(data)));
+ }
+
+ bytearray_t pop(size_t len) {
+ bytearray_t res;
+ auto i = ring.begin();
+ while (len && i != ring.end())
+ {
+ size_t copy_len = std::min(i->length(), len);
+ res.insert(res.end(), i->offset, i->offset + copy_len);
+ i->offset += copy_len;
+ len -= copy_len;
+ if (i->offset == i->data.end())
+ i++;
+ }
+ ring.erase(ring.begin(), i);
+ _size -= res.size();
+ return std::move(res);
+ }
+
+ size_t size() const { return _size; }
+
+ void clear() {
+ ring.clear();
+ _size = 0;
+ }
+};
+
+class ConnPoolError: public SalticidaeError {
+ using SalticidaeError::SalticidaeError;
+};
+
+/** The connection pool. */
+class ConnPool {
+ public:
+ class Conn;
+ using conn_t = RcObj<Conn>;
+ /** The abstraction for a bi-directional connection. */
+ class Conn {
+ public:
+ enum ConnMode {
+ ACTIVE, /**< the connection is established by connect() */
+ PASSIVE, /**< the connection is established by accept() */
+ };
+
+ private:
+ conn_t self_ref;
+ int fd;
+ ConnPool *cpool;
+ ConnMode mode;
+ NetAddr addr;
+
+ RingBuffer send_buffer;
+ RingBuffer recv_buffer;
+
+ Event ev_read;
+ Event ev_write;
+ Event ev_connect;
+ /** does not need to wait if true */
+ bool ready_send;
+
+ void recv_data(evutil_socket_t, short);
+ void send_data(evutil_socket_t, short);
+ void conn_server(evutil_socket_t, short);
+ void try_conn(evutil_socket_t, short);
+
+ public:
+ friend ConnPool;
+ Conn(): self_ref(this) {}
+
+ virtual ~Conn() {
+ SALTICIDAE_LOG_INFO("destroyed connection %s", std::string(*this).c_str());
+ }
+
+ conn_t self() { return self_ref; }
+ operator std::string() const;
+ int get_fd() const { return fd; }
+ const NetAddr &get_addr() const { return addr; }
+ ConnMode get_mode() const { return mode; }
+ RingBuffer &read() { return recv_buffer; }
+
+ void write(bytearray_t &&data) {
+ send_buffer.push(std::move(data));
+ if (ready_send)
+ send_data(fd, EV_WRITE);
+ }
+
+ void move_send_buffer(conn_t other) {
+ send_buffer = std::move(other->send_buffer);
+ }
+
+ void terminate();
+
+ protected:
+ /** close the connection and free all on-going or planned events. */
+ virtual void close() {
+ ev_read.clear();
+ ev_write.clear();
+ ev_connect.clear();
+ ::close(fd);
+ fd = -1;
+ }
+
+ virtual void on_read() = 0;
+ virtual void on_setup() = 0;
+ virtual void on_teardown() = 0;
+ };
+
+ private:
+ std::unordered_map<int, conn_t> pool;
+ int listen_fd;
+ Event ev_listen;
+
+ void accept_client(evutil_socket_t, short);
+ conn_t add_conn(conn_t conn);
+
+ protected:
+ struct event_base *eb;
+ virtual conn_t create_conn() = 0;
+
+ public:
+ friend Conn;
+ ConnPool(struct event_base *eb): eb(eb) {}
+
+ ~ConnPool() {
+ for (auto it: pool)
+ {
+ conn_t conn = it.second;
+ conn->close();
+ }
+ }
+
+ /** create an active mode connection to addr */
+ conn_t create_conn(const NetAddr &addr);
+ /** setup and start listening */
+ void init(NetAddr listen_addr);
+};
+
+}
+
+#endif