aboutsummaryrefslogtreecommitdiff
path: root/include
diff options
context:
space:
mode:
authorDeterminant <ted.sybil@gmail.com>2018-06-26 12:58:54 -0400
committerDeterminant <ted.sybil@gmail.com>2018-06-26 12:58:54 -0400
commit5c3b39340d365f5ff37a79424956591e87b44816 (patch)
tree45fc59c19ed95c44bacbbe59396d8bc1b25872dd /include
init
Diffstat (limited to 'include')
-rw-r--r--include/salticidae/conn.h235
-rw-r--r--include/salticidae/crypto.h78
-rw-r--r--include/salticidae/msg.h253
-rw-r--r--include/salticidae/netaddr.h115
-rw-r--r--include/salticidae/network.h552
-rw-r--r--include/salticidae/ref.h202
-rw-r--r--include/salticidae/stream.h274
-rw-r--r--include/salticidae/type.h67
-rw-r--r--include/salticidae/util.h292
9 files changed, 2068 insertions, 0 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
new file mode 100644
index 0000000..40facc9
--- /dev/null
+++ b/include/salticidae/conn.h
@@ -0,0 +1,235 @@
+/**
+ * Copyright (c) 2018 Cornell University.
+ *
+ * Author: Ted Yin <tederminant@gmail.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of
+ * this software and associated documentation files (the "Software"), to deal in
+ * the Software without restriction, including without limitation the rights to
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
+ * of the Software, and to permit persons to whom the Software is furnished to do
+ * so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+#ifndef _SALTICIDAE_CONN_H
+#define _SALTICIDAE_CONN_H
+
+#include <cassert>
+#include <cstdint>
+#include <event2/event.h>
+#include <arpa/inet.h>
+#include <unistd.h>
+
+#include <string>
+#include <unordered_map>
+#include <list>
+#include <algorithm>
+#include <exception>
+
+#include "salticidae/type.h"
+#include "salticidae/ref.h"
+#include "salticidae/util.h"
+#include "salticidae/netaddr.h"
+#include "salticidae/msg.h"
+
+const int MAX_LISTEN_BACKLOG = 10;
+const size_t BUFF_SEG_SIZE = 4096;
+const size_t MAX_MSG_HANDLER = 64;
+const double TRY_CONN_DELAY = 2;
+const double CONN_SERVER_TIMEOUT = 2;
+
+namespace salticidae {
+
+inline double gen_rand_timeout(double base_timeout) {
+ return base_timeout + rand() / (double)RAND_MAX * 0.5 * base_timeout;
+}
+
+class RingBuffer {
+ struct buffer_entry_t {
+ bytearray_t data;
+ bytearray_t::iterator offset;
+ buffer_entry_t(bytearray_t &&_data): data(std::move(_data)) {
+ offset = data.begin();
+ }
+
+ buffer_entry_t(buffer_entry_t &&other) {
+ size_t _offset = other.offset - other.data.begin();
+ data = std::move(other.data);
+ offset = data.begin() + _offset;
+ }
+
+ buffer_entry_t(const buffer_entry_t &other): data(other.data) {
+ offset = data.begin() + (other.offset - other.data.begin());
+ }
+
+ size_t length() const { return data.end() - offset; }
+ };
+ std::list<buffer_entry_t> ring;
+ size_t _size;
+
+ public:
+ RingBuffer(): _size(0) {}
+ ~RingBuffer() { clear(); }
+ RingBuffer &operator=(const RingBuffer &other) = delete;
+ RingBuffer(const RingBuffer &other) = delete;
+ RingBuffer &operator=(RingBuffer &&other) {
+ ring = std::move(other.ring);
+ _size = other._size;
+ other._size = 0;
+ return *this;
+ }
+
+ void push(bytearray_t &&data) {
+ _size += data.size();
+ ring.push_back(buffer_entry_t(std::move(data)));
+ }
+
+ bytearray_t pop(size_t len) {
+ bytearray_t res;
+ auto i = ring.begin();
+ while (len && i != ring.end())
+ {
+ size_t copy_len = std::min(i->length(), len);
+ res.insert(res.end(), i->offset, i->offset + copy_len);
+ i->offset += copy_len;
+ len -= copy_len;
+ if (i->offset == i->data.end())
+ i++;
+ }
+ ring.erase(ring.begin(), i);
+ _size -= res.size();
+ return std::move(res);
+ }
+
+ size_t size() const { return _size; }
+
+ void clear() {
+ ring.clear();
+ _size = 0;
+ }
+};
+
+class ConnPoolError: public SalticidaeError {
+ using SalticidaeError::SalticidaeError;
+};
+
+/** The connection pool. */
+class ConnPool {
+ public:
+ class Conn;
+ using conn_t = RcObj<Conn>;
+ /** The abstraction for a bi-directional connection. */
+ class Conn {
+ public:
+ enum ConnMode {
+ ACTIVE, /**< the connection is established by connect() */
+ PASSIVE, /**< the connection is established by accept() */
+ };
+
+ private:
+ conn_t self_ref;
+ int fd;
+ ConnPool *cpool;
+ ConnMode mode;
+ NetAddr addr;
+
+ RingBuffer send_buffer;
+ RingBuffer recv_buffer;
+
+ Event ev_read;
+ Event ev_write;
+ Event ev_connect;
+ /** does not need to wait if true */
+ bool ready_send;
+
+ void recv_data(evutil_socket_t, short);
+ void send_data(evutil_socket_t, short);
+ void conn_server(evutil_socket_t, short);
+ void try_conn(evutil_socket_t, short);
+
+ public:
+ friend ConnPool;
+ Conn(): self_ref(this) {}
+
+ virtual ~Conn() {
+ SALTICIDAE_LOG_INFO("destroyed connection %s", std::string(*this).c_str());
+ }
+
+ conn_t self() { return self_ref; }
+ operator std::string() const;
+ int get_fd() const { return fd; }
+ const NetAddr &get_addr() const { return addr; }
+ ConnMode get_mode() const { return mode; }
+ RingBuffer &read() { return recv_buffer; }
+
+ void write(bytearray_t &&data) {
+ send_buffer.push(std::move(data));
+ if (ready_send)
+ send_data(fd, EV_WRITE);
+ }
+
+ void move_send_buffer(conn_t other) {
+ send_buffer = std::move(other->send_buffer);
+ }
+
+ void terminate();
+
+ protected:
+ /** close the connection and free all on-going or planned events. */
+ virtual void close() {
+ ev_read.clear();
+ ev_write.clear();
+ ev_connect.clear();
+ ::close(fd);
+ fd = -1;
+ }
+
+ virtual void on_read() = 0;
+ virtual void on_setup() = 0;
+ virtual void on_teardown() = 0;
+ };
+
+ private:
+ std::unordered_map<int, conn_t> pool;
+ int listen_fd;
+ Event ev_listen;
+
+ void accept_client(evutil_socket_t, short);
+ conn_t add_conn(conn_t conn);
+
+ protected:
+ struct event_base *eb;
+ virtual conn_t create_conn() = 0;
+
+ public:
+ friend Conn;
+ ConnPool(struct event_base *eb): eb(eb) {}
+
+ ~ConnPool() {
+ for (auto it: pool)
+ {
+ conn_t conn = it.second;
+ conn->close();
+ }
+ }
+
+ /** create an active mode connection to addr */
+ conn_t create_conn(const NetAddr &addr);
+ /** setup and start listening */
+ void init(NetAddr listen_addr);
+};
+
+}
+
+#endif
diff --git a/include/salticidae/crypto.h b/include/salticidae/crypto.h
new file mode 100644
index 0000000..c329c63
--- /dev/null
+++ b/include/salticidae/crypto.h
@@ -0,0 +1,78 @@
+/**
+ * Copyright (c) 2018 Cornell University.
+ *
+ * Author: Ted Yin <tederminant@gmail.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of
+ * this software and associated documentation files (the "Software"), to deal in
+ * the Software without restriction, including without limitation the rights to
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
+ * of the Software, and to permit persons to whom the Software is furnished to do
+ * so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+#ifndef _SALTICIDAE_CRYPTO_H
+#define _SALTICIDAE_CRYPTO_H
+
+#include "salticidae/type.h"
+#include <openssl/sha.h>
+
+namespace salticidae {
+
+class SHA256 {
+ SHA256_CTX *ctx;
+
+ public:
+ SHA256(): ctx(new SHA256_CTX()) { reset(); }
+ ~SHA256() { delete ctx; }
+
+ void reset() {
+ if (!SHA256_Init(ctx))
+ throw std::runtime_error("openssl SHA256 init error");
+ }
+
+ template<typename T>
+ void update(const T &data) {
+ update(reinterpret_cast<const uint8_t *>(&*data.begin()), data.size());
+ }
+
+ void update(const bytearray_t::const_iterator &it, size_t length) {
+ update(&*it, length);
+ }
+
+ void update(const uint8_t *ptr, size_t length) {
+ if (!SHA256_Update(ctx, ptr, length))
+ throw std::runtime_error("openssl SHA256 update error");
+ }
+
+ void _digest(bytearray_t &md) {
+ if (!SHA256_Final(&*md.begin(), ctx))
+ throw std::runtime_error("openssl SHA256 error");
+ }
+
+ void digest(bytearray_t &md) {
+ md.resize(32);
+ _digest(md);
+ }
+
+ bytearray_t digest() {
+ bytearray_t md(32);
+ _digest(md);
+ return std::move(md);
+ }
+};
+
+}
+
+#endif
diff --git a/include/salticidae/msg.h b/include/salticidae/msg.h
new file mode 100644
index 0000000..62fc33b
--- /dev/null
+++ b/include/salticidae/msg.h
@@ -0,0 +1,253 @@
+/**
+ * Copyright (c) 2018 Cornell University.
+ *
+ * Author: Ted Yin <tederminant@gmail.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of
+ * this software and associated documentation files (the "Software"), to deal in
+ * the Software without restriction, including without limitation the rights to
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
+ * of the Software, and to permit persons to whom the Software is furnished to do
+ * so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+#ifndef _SALTICIDAE_MSG_H
+#define _SALTICIDAE_MSG_H
+
+#include <cstdint>
+#include <cstring>
+#include <string>
+#include <vector>
+
+#include "salticidae/type.h"
+#include "salticidae/stream.h"
+#include "salticidae/netaddr.h"
+
+namespace salticidae {
+
+template<typename OpcodeType = uint8_t,
+ const OpcodeType PING = 0xf0,
+ const OpcodeType PONG = 0xf1>
+class MsgBase {
+ public:
+ using opcode_t = OpcodeType;
+ static const opcode_t OPCODE_PING = PING;
+ static const opcode_t OPCODE_PONG = PONG;
+ static const size_t header_size;
+
+ private:
+ /* header */
+ /* all integers are encoded in little endian in the protocol */
+ uint32_t magic;
+ opcode_t opcode;
+ uint32_t length;
+ uint32_t checksum;
+
+ mutable bytearray_t payload;
+ mutable bool no_payload;
+
+ public:
+ MsgBase(): magic(0x0), no_payload(true) {}
+
+ MsgBase(const MsgBase &other):
+ magic(other.magic),
+ opcode(other.opcode),
+ length(other.length),
+ checksum(other.checksum),
+ payload(other.payload),
+ no_payload(other.no_payload) {}
+
+ MsgBase(MsgBase &&other):
+ magic(other.magic),
+ opcode(std::move(other.opcode)),
+ length(other.length),
+ checksum(other.checksum),
+ payload(std::move(other.payload)),
+ no_payload(other.no_payload) {}
+
+ MsgBase(const uint8_t *raw_header) {
+ uint32_t _magic;
+ opcode_t _opcode;
+ uint32_t _length;
+ uint32_t _checksum;
+ DataStream s(raw_header, raw_header + MsgBase::header_size);
+
+ s >> _magic
+ >> _opcode
+ >> _length
+ >> _checksum;
+ magic = letoh(_magic);
+ opcode = _opcode;
+ length = letoh(_length);
+ checksum = letoh(_checksum);
+ }
+
+ MsgBase &operator=(const MsgBase &other) {
+ magic = other.magic;
+ opcode = other.opcode;
+ length = other.length;
+ checksum = other.checksum;
+ payload = other.payload;
+ no_payload = other.no_payload;
+ return *this;
+ }
+
+ MsgBase &operator=(MsgBase &&other) {
+ magic = other.magic;
+ opcode = std::move(other.opcode);
+ length = other.length;
+ checksum = other.checksum;
+ payload = std::move(other.payload);
+ no_payload = other.no_payload;
+ return *this;
+ }
+
+ ~MsgBase() {}
+
+ size_t get_length() const { return length; }
+
+ const opcode_t &get_opcode() const { return opcode; }
+
+ void set_opcode(const opcode_t &_opcode) {
+ opcode = _opcode;
+ }
+
+ bytearray_t &&get_payload() const {
+#ifndef SALTICIDAE_NOCHECK
+ if (no_payload)
+ throw std::runtime_error("payload not available");
+ no_payload = true;
+#endif
+ return std::move(payload);
+ }
+
+ void set_payload(DataStream &&s) {
+ set_payload(bytearray_t(std::move(s)));
+ }
+
+ void set_payload(bytearray_t &&_payload) {
+ payload = std::move(_payload);
+ length = payload.size();
+ checksum = get_checksum();
+#ifndef SALTICIDAE_NOCHECK
+ no_payload = false;
+#endif
+ }
+
+ operator std::string() const {
+ DataStream s;
+ s << "<"
+ << "magic=" << get_hex(magic) << " "
+ << "opcode=" << get_hex(opcode) << " "
+ << "length=" << get_hex(length) << " "
+ << "checksum=" << get_hex(checksum) << " "
+ << "payload=" << get_hex(payload) << ">";
+
+ //std::string opcode_hex = get_hex(opcode);
+ //char *buff = new char[128 + opcode_hex.size()];
+ //size_t ret = sprintf(buff,
+ // "<magic=%08x opcode=%s length=%08x checksum=%08x payload=",
+ // magic, opcode_hex.c_str(), length, checksum);
+ //buff[ret] = 0;
+ //std::string res = std::string(buff) + bin2hexstr(payload.data(), length) + ">";
+ //delete [] buff;
+ //return std::move(res);
+ return std::string(s);
+ }
+
+ uint32_t get_checksum() const {
+ static class SHA256 sha256;
+ uint32_t res;
+ bytearray_t tmp;
+ sha256.reset();
+ sha256.update(payload);
+ sha256.digest(tmp);
+ sha256.reset();
+ sha256.update(tmp);
+ sha256.digest(tmp);
+ memmove(&res, &*tmp.begin(), 4);
+ return res;
+ }
+
+ bool verify_checksum() const {
+ return checksum == get_checksum();
+ }
+
+ bytearray_t serialize() const {
+ DataStream s;
+ s << htole(magic)
+ << opcode
+ << htole(length)
+ << htole(checksum)
+ << payload;
+ return std::move(s);
+ }
+
+ void gen_ping(uint16_t port) {
+ DataStream s;
+ set_opcode(OPCODE_PING);
+ s << htole(port);
+ set_payload(std::move(s));
+ }
+
+ void parse_ping(uint16_t &port) const {
+ DataStream s(get_payload());
+ s >> port;
+ port = letoh(port);
+ }
+
+ void gen_pong(uint16_t port) {
+ DataStream s;
+ set_opcode(OPCODE_PONG);
+ s << htole(port);
+ set_payload(std::move(s));
+ }
+
+ void parse_pong(uint16_t &port) const {
+ DataStream s(get_payload());
+ s >> port;
+ port = letoh(port);
+ }
+
+ void gen_hash_list(DataStream &s,
+ const std::vector<uint256_t> &hashes) {
+ uint32_t size = htole((uint32_t)hashes.size());
+ s << size;
+ for (const auto &h: hashes) s << h;
+ }
+
+ void parse_hash_list(DataStream &s,
+ std::vector<uint256_t> &hashes) const {
+ uint32_t size;
+ hashes.clear();
+
+ s >> size;
+ size = letoh(size);
+
+ hashes.resize(size);
+ for (auto &hash: hashes) s >> hash;
+ }
+
+};
+
+template<typename OpcodeType,
+ OpcodeType _,
+ OpcodeType __>
+const size_t MsgBase<OpcodeType, _, __>::header_size =
+ sizeof(MsgBase<OpcodeType, _, __>) -
+ sizeof(MsgBase<OpcodeType, _, __>::payload) -
+ sizeof(MsgBase<OpcodeType, _, __>::no_payload);
+}
+
+#endif
diff --git a/include/salticidae/netaddr.h b/include/salticidae/netaddr.h
new file mode 100644
index 0000000..c166c3a
--- /dev/null
+++ b/include/salticidae/netaddr.h
@@ -0,0 +1,115 @@
+/**
+ * Copyright (c) 2018 Cornell University.
+ *
+ * Author: Ted Yin <tederminant@gmail.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of
+ * this software and associated documentation files (the "Software"), to deal in
+ * the Software without restriction, including without limitation the rights to
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
+ * of the Software, and to permit persons to whom the Software is furnished to do
+ * so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+#ifndef _SALTICIDAE_NETADDR_H
+#define _SALTICIDAE_NETADDR_H
+
+#include <string>
+#include <cstring>
+#include <cstdint>
+#include <arpa/inet.h>
+
+#include "salticidae/util.h"
+
+namespace salticidae {
+
+/* TODO: IPv6 support */
+
+struct NetAddr {
+ uint32_t ip;
+ uint16_t port;
+ /* construct from human-readable format */
+ NetAddr(): ip(0), port(0) {}
+
+ NetAddr(uint32_t ip, uint16_t port): ip(ip), port(port) {}
+
+ NetAddr(const std::string &_addr, uint16_t _port) {
+ set_by_ip_port(_addr, _port);
+ }
+
+ void set_by_ip_port(const std::string &_addr, uint16_t _port) {
+ struct hostent *h;
+ if ((h = gethostbyname(_addr.c_str())) == nullptr)
+ throw SalticidaeError("gethostbyname failed");
+ memmove(&ip, h->h_addr_list[0], sizeof(in_addr_t));
+ port = htons(_port);
+ }
+
+ NetAddr(const std::string &ip_port_addr) {
+ size_t pos = ip_port_addr.find(":");
+ if (pos == std::string::npos)
+ throw SalticidaeError("invalid port format");
+ std::string ip_str = ip_port_addr.substr(0, pos);
+ std::string port_str = ip_port_addr.substr(pos + 1);
+ long port;
+ try {
+ port = std::stol(port_str.c_str());
+ } catch (std::logic_error) {
+ throw SalticidaeError("invalid port format");
+ }
+ if (port < 0)
+ throw SalticidaeError("negative port number");
+ if (port > 0xffff)
+ throw SalticidaeError("port number greater than 0xffff");
+ set_by_ip_port(ip_str, (uint16_t)port);
+ }
+ /* construct from unix socket format */
+ NetAddr(const struct sockaddr_in *addr_sock) {
+ ip = addr_sock->sin_addr.s_addr;
+ port = addr_sock->sin_port;
+ }
+
+ bool operator==(const NetAddr &other) const {
+ return ip == other.ip && port == other.port;
+ }
+
+ operator std::string() const {
+ struct in_addr in;
+ in.s_addr = ip;
+ return "<NetAddr " + std::string(inet_ntoa(in)) +
+ ":" + std::to_string(ntohs(port)) + ">";
+ }
+
+ bool is_null() const { return ip == 0 && port == 0; }
+};
+
+}
+
+namespace std {
+ template <>
+ struct hash<salticidae::NetAddr> {
+ size_t operator()(const salticidae::NetAddr &k) const {
+ return k.ip ^ k.port;
+ }
+ };
+
+ template <>
+ struct hash<const salticidae::NetAddr> {
+ size_t operator()(const salticidae::NetAddr &k) const {
+ return k.ip ^ k.port;
+ }
+ };
+}
+
+#endif
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
new file mode 100644
index 0000000..3b82927
--- /dev/null
+++ b/include/salticidae/network.h
@@ -0,0 +1,552 @@
+/**
+ * Copyright (c) 2018 Cornell University.
+ *
+ * Author: Ted Yin <tederminant@gmail.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of
+ * this software and associated documentation files (the "Software"), to deal in
+ * the Software without restriction, including without limitation the rights to
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
+ * of the Software, and to permit persons to whom the Software is furnished to do
+ * so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+#ifndef _SALTICIDAE_NETWORK_H
+#define _SALTICIDAE_NETWORK_H
+
+#include "salticidae/netaddr.h"
+#include "salticidae/msg.h"
+#include "salticidae/conn.h"
+
+namespace salticidae {
+
+/** Network of nodes who can send async messages. */
+template<typename MsgType>
+class MsgNetwork: public ConnPool {
+ public:
+ class Conn: public ConnPool::Conn {
+ enum MsgState {
+ HEADER,
+ PAYLOAD
+ };
+ MsgType msg;
+ MsgState msg_state;
+ MsgNetwork *mn;
+
+ protected:
+ mutable size_t nsent;
+ mutable size_t nrecv;
+
+ public:
+ friend MsgNetwork;
+ Conn(MsgNetwork *mn): msg_state(HEADER), mn(mn), nsent(0), nrecv(0) {}
+ size_t get_nsent() const { return nsent; }
+ size_t get_nrecv() const { return nrecv; }
+ void clear_nsent() const { nsent = 0; }
+ void clear_nrecv() const { nrecv = 0; }
+
+ protected:
+ void on_read() override;
+ void on_setup() override {}
+ void on_teardown() override {}
+ };
+
+ using conn_t = RcObj<Conn>;
+ using msg_callback_t = std::function<void(const MsgType &msg, conn_t conn)>;
+ class msg_stat_by_opcode_t:
+ public std::unordered_map<typename MsgType::opcode_t,
+ std::pair<uint32_t, size_t>> {
+ public:
+ void add(const MsgType &msg) {
+ auto &p = this->operator[](msg.get_opcode());
+ p.first++;
+ p.second += msg.get_length();
+ }
+ };
+
+ private:
+ std::unordered_map<typename MsgType::opcode_t,
+ msg_callback_t> handler_map;
+
+ protected:
+ mutable msg_stat_by_opcode_t sent_by_opcode;
+ mutable msg_stat_by_opcode_t recv_by_opcode;
+ uint16_t listen_port;
+
+ ConnPool::conn_t create_conn() override { return (new Conn(this))->self(); }
+
+ public:
+ MsgNetwork(struct event_base *eb): ConnPool(eb) {}
+ void reg_handler(typename MsgType::opcode_t opcode, msg_callback_t handler);
+ void send_msg(const MsgType &msg, conn_t conn);
+ void init(NetAddr listen_addr);
+ msg_stat_by_opcode_t &get_sent_by_opcode() const {
+ return sent_by_opcode;
+ }
+ msg_stat_by_opcode_t &get_recv_by_opcode() const {
+ return recv_by_opcode;
+ }
+};
+
+/** Simple network that handles client-server requests. */
+template<typename MsgType>
+class ClientNetwork: public MsgNetwork<MsgType> {
+ using MsgNet = MsgNetwork<MsgType>;
+ std::unordered_map<NetAddr, typename MsgNet::conn_t> addr2conn;
+
+ public:
+ class Conn: public MsgNet::Conn {
+ ClientNetwork *cn;
+
+ public:
+ Conn(ClientNetwork *cn):
+ MsgNet::Conn(static_cast<MsgNet *>(cn)),
+ cn(cn) {}
+
+ protected:
+ void on_setup() override;
+ void on_teardown() override;
+ };
+
+ protected:
+ ConnPool::conn_t create_conn() override { return (new Conn(this))->self(); }
+
+ public:
+ ClientNetwork(struct event_base *eb): MsgNet(eb) {}
+ void send_msg(const MsgType &msg, const NetAddr &addr);
+};
+
+class PeerNetworkError: public SalticidaeError {
+ using SalticidaeError::SalticidaeError;
+};
+
+/** Peer-to-peer network where any two nodes could hold a bi-diretional message
+ * channel, established by either side. */
+template<typename MsgType>
+class PeerNetwork: public MsgNetwork<MsgType> {
+ using MsgNet= MsgNetwork<MsgType>;
+ public:
+ enum IdentityMode {
+ IP_BASED,
+ IP_PORT_BASED
+ };
+
+ class Conn: public MsgNet::Conn {
+ NetAddr peer_id;
+ Event ev_timeout;
+ PeerNetwork *pn;
+
+ public:
+ friend PeerNetwork;
+ const NetAddr &get_peer() { return peer_id; }
+ Conn(PeerNetwork *pn):
+ MsgNet::Conn(static_cast<MsgNet *>(pn)),
+ pn(pn) {}
+
+ protected:
+ void close() override {
+ ev_timeout.clear();
+ MsgNet::Conn::close();
+ }
+
+ void on_setup() override;
+ void on_teardown() override;
+ };
+
+ using conn_t = RcObj<Conn>;
+
+ private:
+ struct Peer {
+ /** connection addr, may be different due to passive mode */
+ NetAddr addr;
+ /** the underlying connection, may be invalid when connected = false */
+ conn_t conn;
+ PeerNetwork *pn;
+ Event ev_ping_timer;
+ bool ping_timer_ok;
+ bool pong_msg_ok;
+ bool connected;
+
+ Peer() = delete;
+ Peer(NetAddr addr, conn_t conn, PeerNetwork *pn, struct event_base *eb):
+ addr(addr), conn(conn), pn(pn),
+ ev_ping_timer(
+ Event(eb, -1, 0, std::bind(&Peer::ping_timer, this, _1, _2))),
+ connected(false) {}
+ ~Peer() {}
+ Peer &operator=(const Peer &) = delete;
+ Peer(const Peer &) = delete;
+
+ void ping_timer(evutil_socket_t, short);
+ void reset_ping_timer();
+ void send_ping();
+ void clear_all_events() {
+ if (ev_ping_timer)
+ ev_ping_timer.del();
+ }
+ void reset_conn(conn_t conn);
+ };
+
+ std::unordered_map <NetAddr, Peer *> id2peer;
+ std::vector<NetAddr> peer_list;
+
+ IdentityMode id_mode;
+ double ping_period;
+ double conn_timeout;
+
+ void msg_ping(const MsgType &msg, ConnPool::conn_t conn);
+ void msg_pong(const MsgType &msg, ConnPool::conn_t conn);
+ void reset_conn_timeout(conn_t conn);
+ bool check_new_conn(conn_t conn, uint16_t port);
+ void start_active_conn(const NetAddr &paddr);
+
+ protected:
+ ConnPool::conn_t create_conn() override { return (new Conn(this))->self(); }
+
+ public:
+ PeerNetwork(struct event_base *eb,
+ double ping_period = 30,
+ double conn_timeout = 180,
+ IdentityMode id_mode = IP_PORT_BASED):
+ MsgNet(eb),
+ id_mode(id_mode),
+ ping_period(ping_period),
+ conn_timeout(conn_timeout) {}
+
+ void add_peer(const NetAddr &paddr);
+ const conn_t get_peer_conn(const NetAddr &paddr) const;
+ void send_msg(const MsgType &msg, const Peer *peer);
+ void send_msg(const MsgType &msg, const NetAddr &paddr);
+ void init(NetAddr listen_addr);
+ bool has_peer(const NetAddr &paddr) const;
+ const std::vector<NetAddr> &all_peers() const;
+ using ConnPool::create_conn;
+};
+
+template<typename MsgType>
+void MsgNetwork<MsgType>::Conn::on_read() {
+ auto &recv_buffer = read();
+ auto conn = static_pointer_cast<Conn>(self());
+ while (get_fd() != -1)
+ {
+ if (msg_state == Conn::HEADER)
+ {
+ if (recv_buffer.size() < MsgType::header_size) break;
+ /* new header available */
+ bytearray_t data = recv_buffer.pop(MsgType::header_size);
+ msg = MsgType(data.data());
+ msg_state = Conn::PAYLOAD;
+ }
+ if (msg_state == Conn::PAYLOAD)
+ {
+ size_t len = msg.get_length();
+ if (recv_buffer.size() < len) break;
+ /* new payload available */
+ bytearray_t data = recv_buffer.pop(len);
+ msg.set_payload(std::move(data));
+ msg_state = Conn::HEADER;
+ if (!msg.verify_checksum())
+ {
+ SALTICIDAE_LOG_WARN("checksums do not match, dropping the message");
+ return;
+ }
+ auto it = mn->handler_map.find(msg.get_opcode());
+ if (it == mn->handler_map.end())
+ SALTICIDAE_LOG_WARN("unknown command: %s", get_hex(msg.get_opcode()));
+ else /* call the handler */
+ {
+ SALTICIDAE_LOG_DEBUG("got message %s from %s",
+ std::string(msg).c_str(),
+ std::string(*this).c_str());
+ it->second(msg, conn);
+ nrecv++;
+ mn->recv_by_opcode.add(msg);
+ }
+ }
+ }
+}<