aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/salticidae/conn.h41
-rw-r--r--include/salticidae/msg.h2
-rw-r--r--include/salticidae/netaddr.h6
-rw-r--r--include/salticidae/network.h72
-rw-r--r--src/conn.cpp24
-rw-r--r--src/util.cpp5
6 files changed, 94 insertions, 56 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index 0cb721e..1a47904 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -139,13 +139,15 @@ class ConnPoolError: public SalticidaeError {
using SalticidaeError::SalticidaeError;
};
-/** The connection pool. */
+/** Abstraction for connection management. */
class ConnPool {
public:
class Conn;
+ /** The handle to a bi-directional connection. */
using conn_t = RcObj<Conn>;
- /** The abstraction for a bi-directional connection. */
+ /** Abstraction for a bi-directional connection. */
class Conn {
+ friend ConnPool;
public:
enum ConnMode {
ACTIVE, /**< the connection is established by connect() */
@@ -172,38 +174,45 @@ class ConnPool {
void recv_data(evutil_socket_t, short);
void send_data(evutil_socket_t, short);
void conn_server(evutil_socket_t, short);
- void try_conn(evutil_socket_t, short);
+ void try_conn();
public:
- friend ConnPool;
Conn(): self_ref(this) {}
+ Conn(const Conn &) = delete;
+ Conn(Conn &&other) = delete;
virtual ~Conn() {
- SALTICIDAE_LOG_INFO("destroyed connection %s", std::string(*this).c_str());
+ SALTICIDAE_LOG_INFO("destroyed %s", std::string(*this).c_str());
}
+ /** Get the handle to itself. */
conn_t self() { return self_ref; }
operator std::string() const;
int get_fd() const { return fd; }
const NetAddr &get_addr() const { return addr; }
ConnMode get_mode() const { return mode; }
RingBuffer &read() { return recv_buffer; }
+ /** Set the buffer size used for send/receive data. */
void set_seg_buff_size(size_t size) { seg_buff_size = size; }
+ /** Write data to the connection (non-blocking). The data will be sent
+ * whenever I/O is available. */
void write(bytearray_t &&data) {
send_buffer.push(std::move(data));
if (ready_send)
send_data(fd, EV_WRITE);
}
+ /** Move the send buffer from the other (old) connection. */
void move_send_buffer(conn_t other) {
send_buffer = std::move(other->send_buffer);
}
+ /** Terminate the connection. */
void terminate();
protected:
- /** close the connection and free all on-going or planned events. */
+ /** Close the IO and clear all on-going or planned events. */
virtual void close() {
ev_read.clear();
ev_write.clear();
@@ -212,16 +221,19 @@ class ConnPool {
fd = -1;
}
+ /** Called when new data is available. */
virtual void on_read() = 0;
+ /** Called when the underlying connection is established. */
virtual void on_setup() = 0;
+ /** Called when the underlying connection breaks. */
virtual void on_teardown() = 0;
};
private:
int max_listen_backlog;
- double try_conn_delay;
double conn_server_timeout;
size_t seg_buff_size;
+
std::unordered_map<int, conn_t> pool;
int listen_fd;
Event ev_listen;
@@ -231,20 +243,15 @@ class ConnPool {
protected:
EventContext eb;
+ /** Should be implemented by derived class to return a new Conn object. */
virtual conn_t create_conn() = 0;
- virtual double gen_conn_timeout() {
- return gen_rand_timeout(try_conn_delay);
- }
public:
- friend Conn;
ConnPool(const EventContext &eb,
int max_listen_backlog = 10,
- double try_conn_delay = 2,
double conn_server_timeout = 2,
size_t seg_buff_size = 4096):
max_listen_backlog(max_listen_backlog),
- try_conn_delay(try_conn_delay),
conn_server_timeout(conn_server_timeout),
seg_buff_size(seg_buff_size),
eb(eb) {}
@@ -260,9 +267,11 @@ class ConnPool {
ConnPool(const ConnPool &) = delete;
ConnPool(ConnPool &&) = delete;
- /** create an active mode connection to addr */
- conn_t create_conn(const NetAddr &addr);
- /** setup and start listening */
+ /** Actively connect to remote addr. */
+ conn_t connect(const NetAddr &addr);
+ /** Listen for passive connections (connection initiated from remote).
+ * Does not need to be called if do not want to accept any passive
+ * connections. */
void listen(NetAddr listen_addr);
};
diff --git a/include/salticidae/msg.h b/include/salticidae/msg.h
index 798062a..33f0d2b 100644
--- a/include/salticidae/msg.h
+++ b/include/salticidae/msg.h
@@ -160,7 +160,7 @@ class MsgBase {
<< "length=" << std::to_string(length) << " "
<< "checksum=" << get_hex(checksum) << " "
<< "payload=" << get_hex(payload) << ">";
- return std::string(std::move(s));
+ return std::move(s);
}
uint32_t get_checksum() const {
diff --git a/include/salticidae/netaddr.h b/include/salticidae/netaddr.h
index eabb5da..5639c1b 100644
--- a/include/salticidae/netaddr.h
+++ b/include/salticidae/netaddr.h
@@ -92,8 +92,10 @@ struct NetAddr {
operator std::string() const {
struct in_addr in;
in.s_addr = ip;
- return "<NetAddr " + std::string(inet_ntoa(in)) +
- ":" + std::to_string(ntohs(port)) + ">";
+ DataStream s;
+ s << "<NetAddr " << std::string(inet_ntoa(in))
+ << ":" << std::to_string(ntohs(port)) << ">";
+ return std::move(s);
}
bool is_null() const { return ip == 0 && port == 0; }
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index 93b5766..7b05bdb 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -46,16 +46,26 @@ class MsgNetwork: public ConnPool {
MsgNetwork *mn;
protected:
+#ifdef SALTICIDAE_MSG_STAT
mutable size_t nsent;
mutable size_t nrecv;
+#endif
public:
friend MsgNetwork;
- Conn(MsgNetwork *mn): msg_state(HEADER), mn(mn), nsent(0), nrecv(0) {}
+ Conn(MsgNetwork *mn):
+ msg_state(HEADER), mn(mn)
+#ifdef SALTICIDAE_MSG_STAT
+ , nsent(0), nrecv(0)
+#endif
+ {}
+
+#ifdef SALTICIDAE_MSG_STAT
size_t get_nsent() const { return nsent; }
size_t get_nrecv() const { return nrecv; }
void clear_nsent() const { nsent = 0; }
void clear_nrecv() const { nrecv = 0; }
+#endif
protected:
void on_read() override;
@@ -65,6 +75,7 @@ class MsgNetwork: public ConnPool {
using conn_t = RcObj<Conn>;
using msg_callback_t = std::function<void(const MsgType &msg, conn_t conn)>;
+#ifdef SALTICIDAE_MSG_STAT
class msg_stat_by_opcode_t:
public std::unordered_map<typename MsgType::opcode_t,
std::pair<uint32_t, size_t>> {
@@ -75,39 +86,42 @@ class MsgNetwork: public ConnPool {
p.second += msg.get_length();
}
};
+#endif
private:
std::unordered_map<typename MsgType::opcode_t,
msg_callback_t> handler_map;
protected:
+#ifdef SALTICIDAE_MSG_STAT
mutable msg_stat_by_opcode_t sent_by_opcode;
mutable msg_stat_by_opcode_t recv_by_opcode;
+#endif
ConnPool::conn_t create_conn() override { return (new Conn(this))->self(); }
public:
MsgNetwork(const EventContext &eb,
int max_listen_backlog,
- double try_conn_delay,
double conn_server_timeout,
size_t seg_buff_size):
ConnPool(eb, max_listen_backlog,
- try_conn_delay,
conn_server_timeout,
seg_buff_size) {}
void reg_handler(typename MsgType::opcode_t opcode, msg_callback_t handler);
void send_msg(const MsgType &msg, conn_t conn);
using ConnPool::listen;
+#ifdef SALTICIDAE_MSG_STAT
msg_stat_by_opcode_t &get_sent_by_opcode() const {
return sent_by_opcode;
}
msg_stat_by_opcode_t &get_recv_by_opcode() const {
return recv_by_opcode;
}
- conn_t create_conn(const NetAddr &addr) {
- return static_pointer_cast<Conn>(ConnPool::create_conn(addr));
+#endif
+ conn_t connect(const NetAddr &addr) {
+ return static_pointer_cast<Conn>(ConnPool::connect(addr));
}
};
@@ -139,16 +153,14 @@ class ClientNetwork: public MsgNetwork<MsgType> {
public:
ClientNetwork(const EventContext &eb,
int max_listen_backlog = 10,
- double try_conn_delay = 0,
double conn_server_timeout = 0,
size_t seg_buff_size = 4096):
MsgNet(eb, max_listen_backlog,
- try_conn_delay,
conn_server_timeout,
seg_buff_size) {}
void send_msg(const MsgType &msg, const NetAddr &addr);
- conn_t create_conn(const NetAddr &addr) = delete;
+ conn_t connect(const NetAddr &addr) = delete;
};
class PeerNetworkError: public SalticidaeError {
@@ -198,6 +210,7 @@ class PeerNetwork: public MsgNetwork<MsgType> {
conn_t conn;
PeerNetwork *pn;
Event ev_ping_timer;
+ Event ev_retry_timer;
bool ping_timer_ok;
bool pong_msg_ok;
bool connected;
@@ -222,10 +235,11 @@ class PeerNetwork: public MsgNetwork<MsgType> {
void reset_conn(conn_t conn);
};
- std::unordered_map <NetAddr, Peer *> id2peer;
+ std::unordered_map <NetAddr, BoxObj<Peer>> id2peer;
std::vector<NetAddr> peer_list;
IdentityMode id_mode;
+ double retry_conn_delay;
double ping_period;
double conn_timeout;
uint16_t listen_port;
@@ -238,21 +252,24 @@ class PeerNetwork: public MsgNetwork<MsgType> {
protected:
ConnPool::conn_t create_conn() override { return (new Conn(this))->self(); }
+ virtual double gen_conn_timeout() {
+ return gen_rand_timeout(retry_conn_delay);
+ }
public:
PeerNetwork(const EventContext &eb,
int max_listen_backlog = 10,
- double try_conn_delay = 2,
+ double retry_conn_delay = 2,
double conn_server_timeout = 2,
size_t seg_buff_size = 4096,
double ping_period = 30,
double conn_timeout = 180,
IdentityMode id_mode = IP_PORT_BASED):
MsgNet(eb, max_listen_backlog,
- try_conn_delay,
conn_server_timeout,
seg_buff_size),
id_mode(id_mode),
+ retry_conn_delay(retry_conn_delay),
ping_period(ping_period),
conn_timeout(conn_timeout) {}
@@ -263,8 +280,8 @@ class PeerNetwork: public MsgNetwork<MsgType> {
void listen(NetAddr listen_addr);
bool has_peer(const NetAddr &paddr) const;
const std::vector<NetAddr> &all_peers() const;
- conn_t create_conn(const NetAddr &addr) {
- return static_pointer_cast<Conn>(ConnPool::create_conn(addr));
+ conn_t connect(const NetAddr &addr) {
+ return static_pointer_cast<Conn>(ConnPool::connect(addr));
}
};
@@ -305,8 +322,10 @@ void MsgNetwork<MsgType>::Conn::on_read() {
std::string(msg).c_str(),
std::string(*this).c_str());
it->second(msg, conn);
+#ifdef SALTICIDAE_MSG_STAT
nrecv++;
mn->recv_by_opcode.add(msg);
+#endif
}
}
}
@@ -348,7 +367,7 @@ template<typename MsgType>
void PeerNetwork<MsgType>::Conn::on_teardown() {
auto it = pn->id2peer.find(peer_id);
if (it == pn->id2peer.end()) return;
- Peer *p = it->second;
+ auto p = it->second.get();
if (this != p->conn.get()) return;
p->ev_ping_timer.del();
p->connected = false;
@@ -356,7 +375,12 @@ void PeerNetwork<MsgType>::Conn::on_teardown() {
SALTICIDAE_LOG_INFO("connection lost %s for %s",
std::string(*this).c_str(),
std::string(peer_id).c_str());
- pn->start_active_conn(peer_id);
+ p->ev_retry_timer = Event(pn->eb, -1, 0,
+ [pn = this->pn,
+ peer_id = this->peer_id](evutil_socket_t, short) {
+ pn->start_active_conn(peer_id);
+ });
+ p->ev_retry_timer.add_with_timeout(pn->gen_conn_timeout());
}
template<typename MsgType>
@@ -368,7 +392,7 @@ bool PeerNetwork<MsgType>::check_new_conn(conn_t conn, uint16_t port) {
conn->peer_id.ip = conn->get_addr().ip;
conn->peer_id.port = port;
}
- Peer *p = id2peer.find(conn->peer_id)->second;
+ auto p = id2peer.find(conn->peer_id)->second.get();
if (p->connected)
{
if (conn != p->conn)
@@ -383,7 +407,7 @@ bool PeerNetwork<MsgType>::check_new_conn(conn_t conn, uint16_t port) {
p->reset_ping_timer();
p->send_ping();
if (p->connected)
- SALTICIDAE_LOG_INFO("PeerNetwork: established connection with id %s via %s",
+ SALTICIDAE_LOG_INFO("PeerNetwork: established connection with %s via %s",
std::string(conn->peer_id).c_str(), std::string(*conn).c_str());
return false;
}
@@ -395,7 +419,7 @@ void PeerNetwork<MsgType>::msg_ping(const MsgType &msg, ConnPool::conn_t conn_)
msg.parse_ping(port);
SALTICIDAE_LOG_INFO("ping from %s, port %u", std::string(*conn).c_str(), ntohs(port));
if (check_new_conn(conn, port)) return;
- Peer *p = id2peer.find(conn->peer_id)->second;
+ auto p = id2peer.find(conn->peer_id)->second.get();
MsgType pong;
pong.gen_pong(this->listen_port);
send_msg(pong, p);
@@ -410,7 +434,7 @@ void PeerNetwork<MsgType>::msg_pong(const MsgType &msg, ConnPool::conn_t conn_)
SALTICIDAE_LOG_WARN("pong message discarded");
return;
}
- Peer *p = it->second;
+ auto p = it->second.get();
uint16_t port;
msg.parse_pong(port);
if (check_new_conn(conn, port)) return;
@@ -434,9 +458,9 @@ void PeerNetwork<MsgType>::listen(NetAddr listen_addr) {
template<typename MsgType>
void PeerNetwork<MsgType>::start_active_conn(const NetAddr &addr) {
- Peer *p = id2peer.find(addr)->second;
+ auto p = id2peer.find(addr)->second.get();
if (p->connected) return;
- auto conn = static_pointer_cast<Conn>(create_conn(addr));
+ auto conn = static_pointer_cast<Conn>(connect(addr));
assert(p->conn == nullptr);
p->conn = conn;
conn->peer_id = addr;
@@ -481,8 +505,10 @@ void MsgNetwork<MsgType>::send_msg(const MsgType &msg, conn_t conn) {
std::string(msg).c_str(),
std::string(*conn).c_str());
conn->write(std::move(msg_data));
+#ifdef SALTICIDAE_MSG_STAT
conn->nsent++;
sent_by_opcode.add(msg);
+#endif
}
template<typename MsgType>
@@ -500,8 +526,10 @@ void PeerNetwork<MsgType>::send_msg(const MsgType &msg, const Peer *peer) {
{
SALTICIDAE_LOG_DEBUG("dropped");
}
+#ifdef SALTICIDAE_MSG_STAT
peer->conn->nsent++;
this->sent_by_opcode.add(msg);
+#endif
}
template<typename MsgType>
@@ -512,7 +540,7 @@ void PeerNetwork<MsgType>::send_msg(const MsgType &msg, const NetAddr &addr) {
SALTICIDAE_LOG_ERROR("sending to non-existing peer: %s", std::string(addr).c_str());
throw PeerNetworkError("peer does not exist");
}
- send_msg(msg, it->second);
+ send_msg(msg, it->second.get());
}
template<typename MsgType>
diff --git a/src/conn.cpp b/src/conn.cpp
index 052f2ad..e600ec9 100644
--- a/src/conn.cpp
+++ b/src/conn.cpp
@@ -36,10 +36,12 @@
namespace salticidae {
ConnPool::Conn::operator std::string() const {
- return "<Conn fd=" + std::to_string(fd) + " " +
- "addr=" + std::string(addr).c_str() + " " +
- "mode=" + ((mode == Conn::ACTIVE) ? "active" : "passive") +
- ">";
+ DataStream s;
+ s << "<Conn "
+ << "fd=" << std::to_string(fd) << " "
+ << "addr=" << std::string(addr) << " "
+ << "mode=" << ((mode == Conn::ACTIVE) ? "active" : "passive") << ">";
+ return std::move(s);
}
void ConnPool::Conn::send_data(evutil_socket_t fd, short events) {
@@ -150,7 +152,7 @@ void ConnPool::accept_client(evutil_socket_t fd, short) {
conn->ev_write.add();
conn->ready_send = false;
add_conn(conn);
- SALTICIDAE_LOG_INFO("created connection %s", std::string(*conn).c_str());
+ SALTICIDAE_LOG_INFO("created %s", std::string(*conn).c_str());
conn->on_setup();
}
ev_listen.add();
@@ -222,7 +224,7 @@ void ConnPool::Conn::terminate() {
}
}
-void ConnPool::Conn::try_conn(evutil_socket_t, short) {
+void ConnPool::Conn::try_conn() {
auto conn = self(); /* pin the connection */
struct sockaddr_in sockin;
memset(&sockin, 0, sizeof(struct sockaddr_in));
@@ -230,7 +232,7 @@ void ConnPool::Conn::try_conn(evutil_socket_t, short) {
sockin.sin_addr.s_addr = addr.ip;
sockin.sin_port = addr.port;
- if (connect(fd, (struct sockaddr *)&sockin,
+ if (::connect(fd, (struct sockaddr *)&sockin,
sizeof(struct sockaddr_in)) < 0 && errno != EINPROGRESS)
{
SALTICIDAE_LOG_INFO("cannot connect to %s", std::string(addr).c_str());
@@ -242,7 +244,7 @@ void ConnPool::Conn::try_conn(evutil_socket_t, short) {
ev_connect.add_with_timeout(cpool->conn_server_timeout);
}
-ConnPool::conn_t ConnPool::create_conn(const NetAddr &addr) {
+ConnPool::conn_t ConnPool::connect(const NetAddr &addr) {
int fd;
int one = 1;
if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
@@ -253,16 +255,14 @@ ConnPool::conn_t ConnPool::create_conn(const NetAddr &addr) {
if (fcntl(fd, F_SETFL, O_NONBLOCK) == -1)
throw ConnPoolError(std::string("unable to set nonblocking socket"));
conn_t conn = create_conn();
- Conn *conn_ptr = conn.get();
conn->seg_buff_size = seg_buff_size;
conn->fd = fd;
conn->cpool = this;
conn->mode = Conn::ACTIVE;
conn->addr = addr;
- conn->ev_connect = Event(eb, -1, 0, std::bind(&Conn::try_conn, conn_ptr, _1, _2));
- conn->ev_connect.add_with_timeout(gen_conn_timeout());
+ conn->try_conn();
add_conn(conn);
- SALTICIDAE_LOG_INFO("created connection %s", std::string(*conn).c_str());
+ SALTICIDAE_LOG_INFO("created %s", std::string(*conn).c_str());
return conn;
}
diff --git a/src/util.cpp b/src/util.cpp
index 7125598..7ef01a9 100644
--- a/src/util.cpp
+++ b/src/util.cpp
@@ -220,8 +220,7 @@ size_t Config::parse(int argc, char **argv) {
SALTICIDAE_LOG_INFO("loaded configuration from %s", conf_fname.c_str());
size_t nopts = opts.size();
- struct option *longopts = (struct option *)malloc(
- sizeof(struct option) * (nopts + 1));
+ auto longopts = BoxObj<struct option[]>(new struct option[nopts + 1]);
int ind;
std::string shortopts;
for (size_t i = 0; i < nopts; i++)
@@ -238,7 +237,7 @@ size_t Config::parse(int argc, char **argv) {
longopts[nopts] = {0, 0, 0, 0};
for (;;)
{
- int id = getopt_long(argc, argv, shortopts.c_str(), longopts, &ind);
+ int id = getopt_long(argc, argv, shortopts.c_str(), longopts.get(), &ind);
if (id == -1)
break;
if (id == '?')