diff options
-rw-r--r-- | include/salticidae/conn.h | 41 | ||||
-rw-r--r-- | include/salticidae/msg.h | 2 | ||||
-rw-r--r-- | include/salticidae/netaddr.h | 6 | ||||
-rw-r--r-- | include/salticidae/network.h | 72 | ||||
-rw-r--r-- | src/conn.cpp | 24 | ||||
-rw-r--r-- | src/util.cpp | 5 |
6 files changed, 94 insertions, 56 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index 0cb721e..1a47904 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -139,13 +139,15 @@ class ConnPoolError: public SalticidaeError { using SalticidaeError::SalticidaeError; }; -/** The connection pool. */ +/** Abstraction for connection management. */ class ConnPool { public: class Conn; + /** The handle to a bi-directional connection. */ using conn_t = RcObj<Conn>; - /** The abstraction for a bi-directional connection. */ + /** Abstraction for a bi-directional connection. */ class Conn { + friend ConnPool; public: enum ConnMode { ACTIVE, /**< the connection is established by connect() */ @@ -172,38 +174,45 @@ class ConnPool { void recv_data(evutil_socket_t, short); void send_data(evutil_socket_t, short); void conn_server(evutil_socket_t, short); - void try_conn(evutil_socket_t, short); + void try_conn(); public: - friend ConnPool; Conn(): self_ref(this) {} + Conn(const Conn &) = delete; + Conn(Conn &&other) = delete; virtual ~Conn() { - SALTICIDAE_LOG_INFO("destroyed connection %s", std::string(*this).c_str()); + SALTICIDAE_LOG_INFO("destroyed %s", std::string(*this).c_str()); } + /** Get the handle to itself. */ conn_t self() { return self_ref; } operator std::string() const; int get_fd() const { return fd; } const NetAddr &get_addr() const { return addr; } ConnMode get_mode() const { return mode; } RingBuffer &read() { return recv_buffer; } + /** Set the buffer size used for send/receive data. */ void set_seg_buff_size(size_t size) { seg_buff_size = size; } + /** Write data to the connection (non-blocking). The data will be sent + * whenever I/O is available. */ void write(bytearray_t &&data) { send_buffer.push(std::move(data)); if (ready_send) send_data(fd, EV_WRITE); } + /** Move the send buffer from the other (old) connection. */ void move_send_buffer(conn_t other) { send_buffer = std::move(other->send_buffer); } + /** Terminate the connection. */ void terminate(); protected: - /** close the connection and free all on-going or planned events. */ + /** Close the IO and clear all on-going or planned events. */ virtual void close() { ev_read.clear(); ev_write.clear(); @@ -212,16 +221,19 @@ class ConnPool { fd = -1; } + /** Called when new data is available. */ virtual void on_read() = 0; + /** Called when the underlying connection is established. */ virtual void on_setup() = 0; + /** Called when the underlying connection breaks. */ virtual void on_teardown() = 0; }; private: int max_listen_backlog; - double try_conn_delay; double conn_server_timeout; size_t seg_buff_size; + std::unordered_map<int, conn_t> pool; int listen_fd; Event ev_listen; @@ -231,20 +243,15 @@ class ConnPool { protected: EventContext eb; + /** Should be implemented by derived class to return a new Conn object. */ virtual conn_t create_conn() = 0; - virtual double gen_conn_timeout() { - return gen_rand_timeout(try_conn_delay); - } public: - friend Conn; ConnPool(const EventContext &eb, int max_listen_backlog = 10, - double try_conn_delay = 2, double conn_server_timeout = 2, size_t seg_buff_size = 4096): max_listen_backlog(max_listen_backlog), - try_conn_delay(try_conn_delay), conn_server_timeout(conn_server_timeout), seg_buff_size(seg_buff_size), eb(eb) {} @@ -260,9 +267,11 @@ class ConnPool { ConnPool(const ConnPool &) = delete; ConnPool(ConnPool &&) = delete; - /** create an active mode connection to addr */ - conn_t create_conn(const NetAddr &addr); - /** setup and start listening */ + /** Actively connect to remote addr. */ + conn_t connect(const NetAddr &addr); + /** Listen for passive connections (connection initiated from remote). + * Does not need to be called if do not want to accept any passive + * connections. */ void listen(NetAddr listen_addr); }; diff --git a/include/salticidae/msg.h b/include/salticidae/msg.h index 798062a..33f0d2b 100644 --- a/include/salticidae/msg.h +++ b/include/salticidae/msg.h @@ -160,7 +160,7 @@ class MsgBase { << "length=" << std::to_string(length) << " " << "checksum=" << get_hex(checksum) << " " << "payload=" << get_hex(payload) << ">"; - return std::string(std::move(s)); + return std::move(s); } uint32_t get_checksum() const { diff --git a/include/salticidae/netaddr.h b/include/salticidae/netaddr.h index eabb5da..5639c1b 100644 --- a/include/salticidae/netaddr.h +++ b/include/salticidae/netaddr.h @@ -92,8 +92,10 @@ struct NetAddr { operator std::string() const { struct in_addr in; in.s_addr = ip; - return "<NetAddr " + std::string(inet_ntoa(in)) + - ":" + std::to_string(ntohs(port)) + ">"; + DataStream s; + s << "<NetAddr " << std::string(inet_ntoa(in)) + << ":" << std::to_string(ntohs(port)) << ">"; + return std::move(s); } bool is_null() const { return ip == 0 && port == 0; } diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 93b5766..7b05bdb 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -46,16 +46,26 @@ class MsgNetwork: public ConnPool { MsgNetwork *mn; protected: +#ifdef SALTICIDAE_MSG_STAT mutable size_t nsent; mutable size_t nrecv; +#endif public: friend MsgNetwork; - Conn(MsgNetwork *mn): msg_state(HEADER), mn(mn), nsent(0), nrecv(0) {} + Conn(MsgNetwork *mn): + msg_state(HEADER), mn(mn) +#ifdef SALTICIDAE_MSG_STAT + , nsent(0), nrecv(0) +#endif + {} + +#ifdef SALTICIDAE_MSG_STAT size_t get_nsent() const { return nsent; } size_t get_nrecv() const { return nrecv; } void clear_nsent() const { nsent = 0; } void clear_nrecv() const { nrecv = 0; } +#endif protected: void on_read() override; @@ -65,6 +75,7 @@ class MsgNetwork: public ConnPool { using conn_t = RcObj<Conn>; using msg_callback_t = std::function<void(const MsgType &msg, conn_t conn)>; +#ifdef SALTICIDAE_MSG_STAT class msg_stat_by_opcode_t: public std::unordered_map<typename MsgType::opcode_t, std::pair<uint32_t, size_t>> { @@ -75,39 +86,42 @@ class MsgNetwork: public ConnPool { p.second += msg.get_length(); } }; +#endif private: std::unordered_map<typename MsgType::opcode_t, msg_callback_t> handler_map; protected: +#ifdef SALTICIDAE_MSG_STAT mutable msg_stat_by_opcode_t sent_by_opcode; mutable msg_stat_by_opcode_t recv_by_opcode; +#endif ConnPool::conn_t create_conn() override { return (new Conn(this))->self(); } public: MsgNetwork(const EventContext &eb, int max_listen_backlog, - double try_conn_delay, double conn_server_timeout, size_t seg_buff_size): ConnPool(eb, max_listen_backlog, - try_conn_delay, conn_server_timeout, seg_buff_size) {} void reg_handler(typename MsgType::opcode_t opcode, msg_callback_t handler); void send_msg(const MsgType &msg, conn_t conn); using ConnPool::listen; +#ifdef SALTICIDAE_MSG_STAT msg_stat_by_opcode_t &get_sent_by_opcode() const { return sent_by_opcode; } msg_stat_by_opcode_t &get_recv_by_opcode() const { return recv_by_opcode; } - conn_t create_conn(const NetAddr &addr) { - return static_pointer_cast<Conn>(ConnPool::create_conn(addr)); +#endif + conn_t connect(const NetAddr &addr) { + return static_pointer_cast<Conn>(ConnPool::connect(addr)); } }; @@ -139,16 +153,14 @@ class ClientNetwork: public MsgNetwork<MsgType> { public: ClientNetwork(const EventContext &eb, int max_listen_backlog = 10, - double try_conn_delay = 0, double conn_server_timeout = 0, size_t seg_buff_size = 4096): MsgNet(eb, max_listen_backlog, - try_conn_delay, conn_server_timeout, seg_buff_size) {} void send_msg(const MsgType &msg, const NetAddr &addr); - conn_t create_conn(const NetAddr &addr) = delete; + conn_t connect(const NetAddr &addr) = delete; }; class PeerNetworkError: public SalticidaeError { @@ -198,6 +210,7 @@ class PeerNetwork: public MsgNetwork<MsgType> { conn_t conn; PeerNetwork *pn; Event ev_ping_timer; + Event ev_retry_timer; bool ping_timer_ok; bool pong_msg_ok; bool connected; @@ -222,10 +235,11 @@ class PeerNetwork: public MsgNetwork<MsgType> { void reset_conn(conn_t conn); }; - std::unordered_map <NetAddr, Peer *> id2peer; + std::unordered_map <NetAddr, BoxObj<Peer>> id2peer; std::vector<NetAddr> peer_list; IdentityMode id_mode; + double retry_conn_delay; double ping_period; double conn_timeout; uint16_t listen_port; @@ -238,21 +252,24 @@ class PeerNetwork: public MsgNetwork<MsgType> { protected: ConnPool::conn_t create_conn() override { return (new Conn(this))->self(); } + virtual double gen_conn_timeout() { + return gen_rand_timeout(retry_conn_delay); + } public: PeerNetwork(const EventContext &eb, int max_listen_backlog = 10, - double try_conn_delay = 2, + double retry_conn_delay = 2, double conn_server_timeout = 2, size_t seg_buff_size = 4096, double ping_period = 30, double conn_timeout = 180, IdentityMode id_mode = IP_PORT_BASED): MsgNet(eb, max_listen_backlog, - try_conn_delay, conn_server_timeout, seg_buff_size), id_mode(id_mode), + retry_conn_delay(retry_conn_delay), ping_period(ping_period), conn_timeout(conn_timeout) {} @@ -263,8 +280,8 @@ class PeerNetwork: public MsgNetwork<MsgType> { void listen(NetAddr listen_addr); bool has_peer(const NetAddr &paddr) const; const std::vector<NetAddr> &all_peers() const; - conn_t create_conn(const NetAddr &addr) { - return static_pointer_cast<Conn>(ConnPool::create_conn(addr)); + conn_t connect(const NetAddr &addr) { + return static_pointer_cast<Conn>(ConnPool::connect(addr)); } }; @@ -305,8 +322,10 @@ void MsgNetwork<MsgType>::Conn::on_read() { std::string(msg).c_str(), std::string(*this).c_str()); it->second(msg, conn); +#ifdef SALTICIDAE_MSG_STAT nrecv++; mn->recv_by_opcode.add(msg); +#endif } } } @@ -348,7 +367,7 @@ template<typename MsgType> void PeerNetwork<MsgType>::Conn::on_teardown() { auto it = pn->id2peer.find(peer_id); if (it == pn->id2peer.end()) return; - Peer *p = it->second; + auto p = it->second.get(); if (this != p->conn.get()) return; p->ev_ping_timer.del(); p->connected = false; @@ -356,7 +375,12 @@ void PeerNetwork<MsgType>::Conn::on_teardown() { SALTICIDAE_LOG_INFO("connection lost %s for %s", std::string(*this).c_str(), std::string(peer_id).c_str()); - pn->start_active_conn(peer_id); + p->ev_retry_timer = Event(pn->eb, -1, 0, + [pn = this->pn, + peer_id = this->peer_id](evutil_socket_t, short) { + pn->start_active_conn(peer_id); + }); + p->ev_retry_timer.add_with_timeout(pn->gen_conn_timeout()); } template<typename MsgType> @@ -368,7 +392,7 @@ bool PeerNetwork<MsgType>::check_new_conn(conn_t conn, uint16_t port) { conn->peer_id.ip = conn->get_addr().ip; conn->peer_id.port = port; } - Peer *p = id2peer.find(conn->peer_id)->second; + auto p = id2peer.find(conn->peer_id)->second.get(); if (p->connected) { if (conn != p->conn) @@ -383,7 +407,7 @@ bool PeerNetwork<MsgType>::check_new_conn(conn_t conn, uint16_t port) { p->reset_ping_timer(); p->send_ping(); if (p->connected) - SALTICIDAE_LOG_INFO("PeerNetwork: established connection with id %s via %s", + SALTICIDAE_LOG_INFO("PeerNetwork: established connection with %s via %s", std::string(conn->peer_id).c_str(), std::string(*conn).c_str()); return false; } @@ -395,7 +419,7 @@ void PeerNetwork<MsgType>::msg_ping(const MsgType &msg, ConnPool::conn_t conn_) msg.parse_ping(port); SALTICIDAE_LOG_INFO("ping from %s, port %u", std::string(*conn).c_str(), ntohs(port)); if (check_new_conn(conn, port)) return; - Peer *p = id2peer.find(conn->peer_id)->second; + auto p = id2peer.find(conn->peer_id)->second.get(); MsgType pong; pong.gen_pong(this->listen_port); send_msg(pong, p); @@ -410,7 +434,7 @@ void PeerNetwork<MsgType>::msg_pong(const MsgType &msg, ConnPool::conn_t conn_) SALTICIDAE_LOG_WARN("pong message discarded"); return; } - Peer *p = it->second; + auto p = it->second.get(); uint16_t port; msg.parse_pong(port); if (check_new_conn(conn, port)) return; @@ -434,9 +458,9 @@ void PeerNetwork<MsgType>::listen(NetAddr listen_addr) { template<typename MsgType> void PeerNetwork<MsgType>::start_active_conn(const NetAddr &addr) { - Peer *p = id2peer.find(addr)->second; + auto p = id2peer.find(addr)->second.get(); if (p->connected) return; - auto conn = static_pointer_cast<Conn>(create_conn(addr)); + auto conn = static_pointer_cast<Conn>(connect(addr)); assert(p->conn == nullptr); p->conn = conn; conn->peer_id = addr; @@ -481,8 +505,10 @@ void MsgNetwork<MsgType>::send_msg(const MsgType &msg, conn_t conn) { std::string(msg).c_str(), std::string(*conn).c_str()); conn->write(std::move(msg_data)); +#ifdef SALTICIDAE_MSG_STAT conn->nsent++; sent_by_opcode.add(msg); +#endif } template<typename MsgType> @@ -500,8 +526,10 @@ void PeerNetwork<MsgType>::send_msg(const MsgType &msg, const Peer *peer) { { SALTICIDAE_LOG_DEBUG("dropped"); } +#ifdef SALTICIDAE_MSG_STAT peer->conn->nsent++; this->sent_by_opcode.add(msg); +#endif } template<typename MsgType> @@ -512,7 +540,7 @@ void PeerNetwork<MsgType>::send_msg(const MsgType &msg, const NetAddr &addr) { SALTICIDAE_LOG_ERROR("sending to non-existing peer: %s", std::string(addr).c_str()); throw PeerNetworkError("peer does not exist"); } - send_msg(msg, it->second); + send_msg(msg, it->second.get()); } template<typename MsgType> diff --git a/src/conn.cpp b/src/conn.cpp index 052f2ad..e600ec9 100644 --- a/src/conn.cpp +++ b/src/conn.cpp @@ -36,10 +36,12 @@ namespace salticidae { ConnPool::Conn::operator std::string() const { - return "<Conn fd=" + std::to_string(fd) + " " + - "addr=" + std::string(addr).c_str() + " " + - "mode=" + ((mode == Conn::ACTIVE) ? "active" : "passive") + - ">"; + DataStream s; + s << "<Conn " + << "fd=" << std::to_string(fd) << " " + << "addr=" << std::string(addr) << " " + << "mode=" << ((mode == Conn::ACTIVE) ? "active" : "passive") << ">"; + return std::move(s); } void ConnPool::Conn::send_data(evutil_socket_t fd, short events) { @@ -150,7 +152,7 @@ void ConnPool::accept_client(evutil_socket_t fd, short) { conn->ev_write.add(); conn->ready_send = false; add_conn(conn); - SALTICIDAE_LOG_INFO("created connection %s", std::string(*conn).c_str()); + SALTICIDAE_LOG_INFO("created %s", std::string(*conn).c_str()); conn->on_setup(); } ev_listen.add(); @@ -222,7 +224,7 @@ void ConnPool::Conn::terminate() { } } -void ConnPool::Conn::try_conn(evutil_socket_t, short) { +void ConnPool::Conn::try_conn() { auto conn = self(); /* pin the connection */ struct sockaddr_in sockin; memset(&sockin, 0, sizeof(struct sockaddr_in)); @@ -230,7 +232,7 @@ void ConnPool::Conn::try_conn(evutil_socket_t, short) { sockin.sin_addr.s_addr = addr.ip; sockin.sin_port = addr.port; - if (connect(fd, (struct sockaddr *)&sockin, + if (::connect(fd, (struct sockaddr *)&sockin, sizeof(struct sockaddr_in)) < 0 && errno != EINPROGRESS) { SALTICIDAE_LOG_INFO("cannot connect to %s", std::string(addr).c_str()); @@ -242,7 +244,7 @@ void ConnPool::Conn::try_conn(evutil_socket_t, short) { ev_connect.add_with_timeout(cpool->conn_server_timeout); } -ConnPool::conn_t ConnPool::create_conn(const NetAddr &addr) { +ConnPool::conn_t ConnPool::connect(const NetAddr &addr) { int fd; int one = 1; if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) @@ -253,16 +255,14 @@ ConnPool::conn_t ConnPool::create_conn(const NetAddr &addr) { if (fcntl(fd, F_SETFL, O_NONBLOCK) == -1) throw ConnPoolError(std::string("unable to set nonblocking socket")); conn_t conn = create_conn(); - Conn *conn_ptr = conn.get(); conn->seg_buff_size = seg_buff_size; conn->fd = fd; conn->cpool = this; conn->mode = Conn::ACTIVE; conn->addr = addr; - conn->ev_connect = Event(eb, -1, 0, std::bind(&Conn::try_conn, conn_ptr, _1, _2)); - conn->ev_connect.add_with_timeout(gen_conn_timeout()); + conn->try_conn(); add_conn(conn); - SALTICIDAE_LOG_INFO("created connection %s", std::string(*conn).c_str()); + SALTICIDAE_LOG_INFO("created %s", std::string(*conn).c_str()); return conn; } diff --git a/src/util.cpp b/src/util.cpp index 7125598..7ef01a9 100644 --- a/src/util.cpp +++ b/src/util.cpp @@ -220,8 +220,7 @@ size_t Config::parse(int argc, char **argv) { SALTICIDAE_LOG_INFO("loaded configuration from %s", conf_fname.c_str()); size_t nopts = opts.size(); - struct option *longopts = (struct option *)malloc( - sizeof(struct option) * (nopts + 1)); + auto longopts = BoxObj<struct option[]>(new struct option[nopts + 1]); int ind; std::string shortopts; for (size_t i = 0; i < nopts; i++) @@ -238,7 +237,7 @@ size_t Config::parse(int argc, char **argv) { longopts[nopts] = {0, 0, 0, 0}; for (;;) { - int id = getopt_long(argc, argv, shortopts.c_str(), longopts, &ind); + int id = getopt_long(argc, argv, shortopts.c_str(), longopts.get(), &ind); if (id == -1) break; if (id == '?') |