aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/salticidae/conn.h75
-rw-r--r--include/salticidae/network.h278
-rw-r--r--include/salticidae/type.h3
-rw-r--r--src/conn.cpp36
-rw-r--r--test/bench_network.cpp9
-rw-r--r--test/test_network.cpp4
6 files changed, 211 insertions, 194 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index 26d19fe..1364d4d 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -61,7 +61,7 @@ class ConnPool {
/** The handle to a bi-directional connection. */
using conn_t = ArcObj<Conn>;
/** The type of callback invoked when connection status is changed. */
- using conn_callback_t = std::function<void(Conn &)>;
+ using conn_callback_t = std::function<void(Conn &, bool)>;
/** Abstraction for a bi-directional connection. */
class Conn {
@@ -72,7 +72,7 @@ class ConnPool {
PASSIVE, /**< the connection is established by accept() */
};
- private:
+ protected:
size_t seg_buff_size;
conn_t self_ref;
int fd;
@@ -109,14 +109,10 @@ class ConnPool {
/** Get the handle to itself. */
conn_t self() { return self_ref; }
operator std::string() const;
- int get_fd() const { return fd; }
const NetAddr &get_addr() const { return addr; }
ConnMode get_mode() const { return mode; }
ConnPool *get_pool() const { return cpool; }
- SegBuffer &get_recv_buffer() { return recv_buffer; }
MPSCWriteBuffer &get_send_buffer() { return send_buffer; }
- /** Set the buffer size used for send/receive data. */
- void set_seg_buff_size(size_t size) { seg_buff_size = size; }
/** Write data to the connection (non-blocking). The data will be sent
* whenever I/O is available. */
@@ -124,11 +120,6 @@ class ConnPool {
send_buffer.push(std::move(data));
}
- ///** Move the send buffer from the other (old) connection. */
- //void move_send_buffer(conn_t other) {
- // send_buffer = std::move(other->send_buffer);
- //}
-
protected:
/** Close the IO and clear all on-going or planned events. */
virtual void on_close() {
@@ -143,13 +134,9 @@ class ConnPool {
/** Called when new data is available. */
virtual void on_read() {}
/** Called when the underlying connection is established. */
- virtual void on_setup() {
- cpool->update_conn(self());
- }
+ virtual void on_setup() {}
/** Called when the underlying connection breaks. */
- virtual void on_teardown() {
- cpool->update_conn(self());
- }
+ virtual void on_teardown() {}
};
private:
@@ -165,14 +152,13 @@ class ConnPool {
/* owned by the dispatcher */
std::unordered_map<int, conn_t> pool;
int listen_fd; /**< for accepting new network connections */
- int dlisten_fd[2]; /**< for control command sent to the dispatcher */
Event ev_listen;
Event ev_dlisten;
std::mutex cp_mlock;
- void update_conn(const conn_t &conn) {
- auto ptr = new conn_t(conn);
- write(mlisten_fd[1], &ptr, sizeof(ptr));
+ void update_conn(const conn_t &conn, bool connected) {
+ auto dcmd = new UserConn(conn, connected);
+ write(mlisten_fd[1], &dcmd, sizeof(dcmd));
}
struct Worker;
@@ -269,31 +255,22 @@ class ConnPool {
void accept_client(evutil_socket_t, short);
conn_t add_conn(const conn_t &conn);
conn_t _connect(const NetAddr &addr);
- void _listen(NetAddr listen_addr);
void _post_terminate(int fd);
+ protected:
class DispatchCmd {
public:
virtual ~DispatchCmd() = default;
virtual void exec(ConnPool *cpool) = 0;
};
- // TODO: the following two are untested
- class DspListen: public DispatchCmd {
- const NetAddr addr;
- public:
- DspListen(const NetAddr &addr): addr(addr) {}
- void exec(ConnPool *cpool) override {
- cpool->_listen(addr);
- }
- };
-
+ private:
class DspConnect: public DispatchCmd {
const NetAddr addr;
public:
DspConnect(const NetAddr &addr): addr(addr) {}
void exec(ConnPool *cpool) override {
- cpool->update_conn(cpool->_connect(addr));
+ cpool->update_conn(cpool->_connect(addr), true);
}
};
@@ -318,6 +295,18 @@ class ConnPool {
}
};
+ class UserConn: public DispatchCmd {
+ conn_t conn;
+ bool connected;
+ public:
+ UserConn(const conn_t &conn, bool connected):
+ conn(conn), connected(connected) {}
+ void exec(ConnPool *cpool) override {
+ if (cpool->conn_cb)
+ cpool->conn_cb(*conn, connected);
+ }
+ };
+
void post_terminate(int fd) {
auto dcmd = new DspPostTerm(fd);
write(dlisten_fd[1], &dcmd, sizeof(dcmd));
@@ -330,6 +319,7 @@ class ConnPool {
protected:
EventContext ec;
EventContext dispatcher_ec;
+ int dlisten_fd[2]; /**< for control command sent to the dispatcher */
std::mutex dsp_ec_mlock;
/** Should be implemented by derived class to return a new Conn object. */
virtual Conn *create_conn() = 0;
@@ -352,11 +342,10 @@ class ConnPool {
throw ConnPoolError(std::string("failed to create dispatcher pipe"));
ev_mlisten = Event(ec, mlisten_fd[0], EV_READ | EV_PERSIST, [this](int fd, short) {
- conn_t *conn_ptr;
- read(fd, &conn_ptr, sizeof(conn_ptr));
- if (conn_cb)
- conn_cb(**conn_ptr);
- delete conn_ptr;
+ DispatchCmd *dcmd;
+ read(fd, &dcmd, sizeof(dcmd));
+ dcmd->exec(this);
+ delete dcmd;
});
ev_mlisten.add();
@@ -414,15 +403,7 @@ class ConnPool {
/** Listen for passive connections (connection initiated from remote).
* Does not need to be called if do not want to accept any passive
* connections. */
- void listen(NetAddr listen_addr, bool blocking = true) {
- if (blocking)
- _listen(listen_addr);
- else
- {
- auto dcmd = new DspListen(listen_addr);
- write(dlisten_fd[1], &dcmd, sizeof(dcmd));
- }
- }
+ void listen(NetAddr listen_addr);
template<typename Func>
void reg_conn_handler(Func cb) { conn_cb = cb; }
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index d82772f..8f8940b 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -77,8 +77,8 @@ class MsgNetwork: public ConnPool {
protected:
#ifdef SALTICIDAE_MSG_STAT
- mutable size_t nsent;
- mutable size_t nrecv;
+ mutable std::atomic<size_t> nsent;
+ mutable std::atomic<size_t> nrecv;
#endif
public:
@@ -107,7 +107,8 @@ class MsgNetwork: public ConnPool {
#ifdef SALTICIDAE_MSG_STAT
class msg_stat_by_opcode_t:
public std::unordered_map<typename Msg::opcode_t,
- std::pair<uint32_t, size_t>> {
+ std::pair<std::atomic<uint32_t>,
+ std::atomic<size_t>>> {
public:
void add(const Msg &msg) {
auto &p = this->operator[](msg.get_opcode());
@@ -259,6 +260,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
friend PeerNetwork;
NetAddr peer_id;
Event ev_timeout;
+ void reset_timeout(double timeout);
public:
Conn() = default;
@@ -292,6 +294,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
bool ping_timer_ok;
bool pong_msg_ok;
bool connected;
+ std::mutex mlock;
Peer() = delete;
Peer(NetAddr addr, conn_t conn, const EventContext &ec):
@@ -314,7 +317,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
};
std::unordered_map <NetAddr, BoxObj<Peer>> id2peer;
- std::vector<NetAddr> peer_list;
+ std::mutex pn_mlock;
IdentityMode id_mode;
double retry_conn_delay;
@@ -350,7 +353,6 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
void msg_ping(MsgPing &&msg, Conn &conn);
void msg_pong(MsgPong &&msg, Conn &conn);
- void reset_conn_timeout(Conn &conn);
bool check_new_conn(Conn &conn, uint16_t port);
void start_active_conn(const NetAddr &paddr);
@@ -380,12 +382,11 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
void add_peer(const NetAddr &paddr);
const conn_t get_peer_conn(const NetAddr &paddr) const;
template<typename MsgType>
- void send_msg(const MsgType &msg, const Peer *peer);
+ void _send_msg(const MsgType &msg, const Peer *peer);
template<typename MsgType>
void send_msg(const MsgType &msg, const NetAddr &paddr);
void listen(NetAddr listen_addr);
bool has_peer(const NetAddr &paddr) const;
- const std::vector<NetAddr> &all_peers() const;
conn_t connect(const NetAddr &addr) {
return static_pointer_cast<Conn>(ConnPool::connect(addr));
}
@@ -395,9 +396,9 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
template<typename OpcodeType>
void MsgNetwork<OpcodeType>::Conn::on_read() {
ConnPool::Conn::on_read();
- auto &recv_buffer = get_recv_buffer();
+ auto &recv_buffer = this->recv_buffer;
auto mn = get_net();
- while (get_fd() != -1)
+ while (fd != -1)
{
if (msg_state == Conn::HEADER)
{
@@ -426,23 +427,22 @@ void MsgNetwork<OpcodeType>::Conn::on_read() {
}
}
-template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::Peer::reset_conn(conn_t new_conn) {
- if (conn != new_conn)
- {
- if (conn)
- {
- //SALTICIDAE_LOG_DEBUG("moving send buffer");
- //new_conn->move_send_buffer(conn);
- SALTICIDAE_LOG_INFO("terminating old connection %s", std::string(*conn).c_str());
- conn->terminate();
- }
- addr = new_conn->get_addr();
- conn = new_conn;
- }
- clear_all_events();
+template<typename OpcodeType>
+template<typename MsgType>
+void MsgNetwork<OpcodeType>::send_msg(const MsgType &_msg, Conn &conn) {
+ Msg msg(_msg);
+ bytearray_t msg_data = msg.serialize();
+ SALTICIDAE_LOG_DEBUG("wrote message %s to %s",
+ std::string(msg).c_str(),
+ std::string(conn).c_str());
+ conn.write(std::move(msg_data));
+#ifdef SALTICIDAE_MSG_STAT
+ conn.nsent++;
+ sent_by_opcode.add(msg);
+#endif
}
+/* begin: functions invoked by the dispatcher */
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::Conn::on_setup() {
MsgNet::Conn::on_setup();
@@ -452,9 +452,14 @@ void PeerNetwork<O, _, __>::Conn::on_setup() {
SALTICIDAE_LOG_INFO("peer ping-pong timeout");
this->terminate();
});
+ if (this->get_mode() == Conn::ConnMode::ACTIVE)
+ {
+ peer_id = this->get_addr();
+ if (id_mode == IP_BASED) peer_id.port = 0;
+ }
/* the initial ping-pong to set up the connection */
auto &conn = static_cast<Conn &>(*this);
- pn->reset_conn_timeout(conn);
+ conn->reset_timeout(pn->conn_timeout);
pn->MsgNet::send_msg(MsgPing(pn->listen_port), conn);
}
@@ -462,9 +467,11 @@ template<typename O, O _, O __>
void PeerNetwork<O, _, __>::Conn::on_teardown() {
MsgNet::Conn::on_teardown();
auto pn = get_net();
+ mutex_lg_t _pn_lg(pn->pn_mlock);
auto it = pn->id2peer.find(peer_id);
if (it == pn->id2peer.end()) return;
auto p = it->second.get();
+ mutex_lg_t _p_lg(p->mlock);
if (this != p->conn.get()) return;
p->ev_ping_timer.del();
p->connected = false;
@@ -472,14 +479,69 @@ void PeerNetwork<O, _, __>::Conn::on_teardown() {
SALTICIDAE_LOG_INFO("connection lost %s for %s",
std::string(*this).c_str(),
std::string(peer_id).c_str());
- p->ev_retry_timer = Event(pn->ec, -1, 0,
+ // try to reconnect
+ p->ev_retry_timer = Event(pn->dispatcher_ec, -1, 0,
[pn, peer_id = this->peer_id](evutil_socket_t, short) {
+ mutex_lg_t _pn_lg(pn->pn_mlock);
pn->start_active_conn(peer_id);
});
p->ev_retry_timer.add_with_timeout(pn->gen_conn_timeout());
}
template<typename O, O _, O __>
+void PeerNetwork<O, _, __>::Peer::reset_conn(conn_t new_conn) {
+ if (conn != new_conn)
+ {
+ if (conn)
+ {
+ //SALTICIDAE_LOG_DEBUG("moving send buffer");
+ //new_conn->move_send_buffer(conn);
+ SALTICIDAE_LOG_INFO("terminating old connection %s", std::string(*conn).c_str());
+ conn->terminate();
+ }
+ addr = new_conn->get_addr();
+ conn = new_conn;
+ }
+ clear_all_events();
+}
+
+template<typename O, O _, O __>
+void PeerNetwork<O, _, __>::Conn::reset_timeout(double timeout) {
+ assert(ev_timeout);
+ ev_timeout.del();
+ ev_timeout.add_with_timeout(timeout);
+ SALTICIDAE_LOG_INFO("reset timeout %.2f", timeout);
+}
+
+template<typename O, O _, O __>
+void PeerNetwork<O, _, __>::Peer::reset_ping_timer() {
+ assert(ev_ping_timer);
+ ev_ping_timer.del();
+ ev_ping_timer.add_with_timeout(
+ gen_rand_timeout(conn->get_net()->ping_period));
+}
+
+template<typename O, O _, O __>
+void PeerNetwork<O, _, __>::Peer::send_ping() {
+ auto pn = conn->get_net();
+ ping_timer_ok = false;
+ pong_msg_ok = false;
+ conn->reset_timeout(pn->conn_timeout);
+ pn->_send_msg(MsgPing(pn->listen_port), this);
+}
+
+template<typename O, O _, O __>
+void PeerNetwork<O, _, __>::Peer::ping_timer(evutil_socket_t, short) {
+ mutex_lg_t _p_lg(mlock);
+ ping_timer_ok = true;
+ if (pong_msg_ok)
+ {
+ reset_ping_timer();
+ send_ping();
+ }
+}
+
+template<typename O, O _, O __>
bool PeerNetwork<O, _, __>::check_new_conn(Conn &conn, uint16_t port) {
if (conn.peer_id.is_null())
{ /* passive connections can eventually have ids after getting the port
@@ -489,6 +551,7 @@ bool PeerNetwork<O, _, __>::check_new_conn(Conn &conn, uint16_t port) {
conn.peer_id.port = port;
}
auto p = id2peer.find(conn.peer_id)->second.get();
+ mutex_lg_t _p_lg(p->mlock);
if (p->connected)
{
if (conn.self() != p->conn)
@@ -509,31 +572,77 @@ bool PeerNetwork<O, _, __>::check_new_conn(Conn &conn, uint16_t port) {
}
template<typename O, O _, O __>
+class PeerNetworkPing: public ConnPool::DispatchCmd {
+ using conn_t = typename PeerNetwork<O, _, __>::conn_t;
+ conn_t conn;
+ uint16_t port;
+ public:
+ PeerNetworkPing(const conn_t &conn, uint16_t port):
+ conn(conn), port(port) {}
+ void exec(ConnPool *cpool) override {
+ auto pn = static_cast<PeerNetwork<O, _, __> *>(cpool);
+ mutex_lg_t _pn_lg(pn->pn_mlock);
+ if (pn->check_new_conn(conn, port)) return;
+ auto p = pn->id2peer.find(conn.peer_id)->second.get();
+ mutex_lg_t _p_lg(p->mlock);
+ pn->_send_msg(MsgPong(this->listen_port), p);
+ }
+};
+
+template<typename O, O _, O __>
+class PeerNetworkPong: public ConnPool::DispatchCmd {
+ using conn_t = typename PeerNetwork<O, _, __>::conn_t;
+ conn_t conn;
+ uint16_t port;
+ public:
+ PeerNetworkPong(const conn_t &conn, uint16_t port):
+ conn(conn), port(port) {}
+ void exec(ConnPool *cpool) override {
+ auto pn = static_cast<PeerNetwork<O, _, __> *>(cpool);
+ mutex_lg_t _pn_lg(pn->pn_mlock);
+ auto it = pn->id2peer.find(conn->peer_id);
+ if (it == pn->id2peer.end())
+ {
+ SALTICIDAE_LOG_WARN("pong message discarded");
+ return;
+ }
+ if (pn->check_new_conn(conn, port)) return;
+ auto p = it->second.get();
+ mutex_lg_t _p_lg(p->mlock);
+ p->pong_msg_ok = true;
+ if (p->ping_timer_ok)
+ {
+ p->reset_ping_timer();
+ p->send_ping();
+ }
+ }
+};
+/* end: functions invoked by the dispatcher */
+
+/* this function could be both invoked by the dispatcher and the user loop */
+template<typename O, O _, O __>
+void PeerNetwork<O, _, __>::start_active_conn(const NetAddr &addr) {
+ auto p = id2peer.find(addr)->second.get();
+ mutex_lg_t _p_lg(p->mlock);
+ if (p->connected) return;
+ auto conn = static_pointer_cast<Conn>(connect(addr));
+ assert(p->conn == nullptr);
+ p->conn = conn;
+}
+
+/* begin: functions invoked by the user loop */
+template<typename O, O _, O __>
void PeerNetwork<O, _, __>::msg_ping(MsgPing &&msg, Conn &conn) {
uint16_t port = msg.port;
SALTICIDAE_LOG_INFO("ping from %s, port %u", std::string(conn).c_str(), ntohs(port));
- if (check_new_conn(conn, port)) return;
- auto p = id2peer.find(conn.peer_id)->second.get();
- send_msg(MsgPong(this->listen_port), p);
+ auto dcmd = new PeerNetworkPing<O, _, __>(conn.self(), port);
+ write(this->dlisten_fd[1], &dcmd, sizeof(dcmd));
}
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::msg_pong(MsgPong &&msg, Conn &conn) {
- auto it = id2peer.find(conn.peer_id);
- if (it == id2peer.end())
- {
- SALTICIDAE_LOG_WARN("pong message discarded");
- return;
- }
- auto p = it->second.get();
- uint16_t port = msg.port;
- if (check_new_conn(conn, port)) return;
- p->pong_msg_ok = true;
- if (p->ping_timer_ok)
- {
- p->reset_ping_timer();
- p->send_ping();
- }
+ auto dcmd = new PeerNetworkPong<O, _, __>(conn.self(), msg.port);
+ write(this->dlisten_fd[1], &dcmd, sizeof(dcmd));
}
template<typename O, O _, O __>
@@ -545,30 +654,19 @@ void PeerNetwork<O, _, __>::listen(NetAddr listen_addr) {
}
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::start_active_conn(const NetAddr &addr) {
- auto p = id2peer.find(addr)->second.get();
- if (p->connected) return;
- auto conn = static_pointer_cast<Conn>(connect(addr));
- assert(p->conn == nullptr);
- p->conn = conn;
- conn->peer_id = addr;
- if (id_mode == IP_BASED)
- conn->peer_id.port = 0;
-}
-
-template<typename O, O _, O __>
void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) {
+ mutex_lg_t _pn_lg(pn_mlock);
auto it = id2peer.find(addr);
if (it != id2peer.end())
throw PeerNetworkError("peer already exists");
- id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->ec)));
- peer_list.push_back(addr);
+ id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->dispatcher_ec)));
start_active_conn(addr);
}
template<typename O, O _, O __>
const typename PeerNetwork<O, _, __>::conn_t
PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &paddr) const {
+ mutex_lg_t _pn_lg(pn_mlock);
auto it = id2peer.find(paddr);
if (it == id2peer.end())
throw PeerNetworkError("peer does not exist");
@@ -577,27 +675,13 @@ PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &paddr) const {
template<typename O, O _, O __>
bool PeerNetwork<O, _, __>::has_peer(const NetAddr &paddr) const {
+ mutex_lg_t _pn_lg(pn_mlock);
return id2peer.count(paddr);
}
-template<typename OpcodeType>
-template<typename MsgType>
-void MsgNetwork<OpcodeType>::send_msg(const MsgType &_msg, Conn &conn) {
- Msg msg(_msg);
- bytearray_t msg_data = msg.serialize();
- SALTICIDAE_LOG_DEBUG("wrote message %s to %s",
- std::string(msg).c_str(),
- std::string(conn).c_str());
- conn.write(std::move(msg_data));
-#ifdef SALTICIDAE_MSG_STAT
- conn.nsent++;
- sent_by_opcode.add(msg);
-#endif
-}
-
template<typename O, O _, O __>
template<typename MsgType>
-void PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const Peer *peer) {
+void PeerNetwork<O, _, __>::_send_msg(const MsgType &msg, const Peer *peer) {
if (peer->connected)
MsgNet::send_msg(msg, *(peer->conn));
else
@@ -607,6 +691,7 @@ void PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const Peer *peer) {
template<typename O, O _, O __>
template<typename MsgType>
void PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const NetAddr &addr) {
+ mutex_lg_t _pn_lg(pn_mlock);
auto it = id2peer.find(addr);
if (it == id2peer.end())
{
@@ -614,48 +699,11 @@ void PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const NetAddr &addr) {
std::string(addr).c_str());
throw PeerNetworkError("peer does not exist");
}
- send_msg(msg, it->second.get());
-}
-
-template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::Peer::reset_ping_timer() {
- assert(ev_ping_timer);
- ev_ping_timer.del();
- ev_ping_timer.add_with_timeout(
- gen_rand_timeout(conn->get_net()->ping_period));
-}
-
-template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::reset_conn_timeout(Conn &conn) {
- assert(conn.ev_timeout);
- conn.ev_timeout.del();
- conn.ev_timeout.add_with_timeout(conn_timeout);
- SALTICIDAE_LOG_INFO("reset timeout %.2f", conn_timeout);
-}
-
-template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::Peer::send_ping() {
- auto pn = conn->get_net();
- ping_timer_ok = false;
- pong_msg_ok = false;
- pn->reset_conn_timeout(*conn);
- pn->send_msg(MsgPing(pn->listen_port), this);
-}
-
-template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::Peer::ping_timer(evutil_socket_t, short) {
- ping_timer_ok = true;
- if (pong_msg_ok)
- {
- reset_ping_timer();
- send_ping();
- }
-}
-
-template<typename O, O _, O __>
-const std::vector<NetAddr> &PeerNetwork<O, _, __>::all_peers() const {
- return peer_list;
+ auto p = it->second.get();
+ mutex_lg_t _p_lg(p->mlock);
+ _send_msg(msg, p);
}
+/* end: functions invoked by the user loop */
template<typename OpcodeType>
void ClientNetwork<OpcodeType>::Conn::on_setup() {
diff --git a/include/salticidae/type.h b/include/salticidae/type.h
index c454265..68deeb0 100644
--- a/include/salticidae/type.h
+++ b/include/salticidae/type.h
@@ -33,6 +33,7 @@
#include <cstdio>
#include <ios>
#include <functional>
+#include <mutex>
namespace salticidae {
@@ -40,6 +41,8 @@ const auto _1 = std::placeholders::_1;
const auto _2 = std::placeholders::_2;
using bytearray_t = std::vector<uint8_t>;
+using mutex_lg_t = std::lock_guard<std::mutex>;
+using mutex_ul_t = std::unique_lock<std::mutex>;
template<typename T> T htole(T) = delete;
template<> inline uint16_t htole<uint16_t>(uint16_t x) { return htole16(x); }
diff --git a/src/conn.cpp b/src/conn.cpp
index 5863f3c..edbe9f3 100644
--- a/src/conn.cpp
+++ b/src/conn.cpp
@@ -143,18 +143,10 @@ void ConnPool::accept_client(evutil_socket_t fd, short) {
conn->cpool = this;
conn->mode = Conn::PASSIVE;
conn->addr = addr;
-
- //Conn *conn_ptr = conn.get();
- // TODO: use worker thread ec
- //conn->ev_read = Event(ec, client_fd, EV_READ,
- // std::bind(&Conn::recv_data, conn_ptr, _1, _2));
- //conn->ev_write = Event(ec, client_fd, EV_WRITE,
- // std::bind(&Conn::send_data, conn_ptr, _1, _2));
- //conn->ev_read.add();
- //conn->ev_write.add();
add_conn(conn);
- SALTICIDAE_LOG_INFO("created %s", std::string(*conn).c_str());
+ SALTICIDAE_LOG_INFO("accepted %s", std::string(*conn).c_str());
conn->on_setup();
+ update_conn(conn, true);
select_worker().feed(conn, client_fd);
}
}
@@ -163,17 +155,11 @@ void ConnPool::Conn::conn_server(evutil_socket_t fd, short events) {
auto conn = self(); /* pin the connection */
if (send(fd, "", 0, MSG_NOSIGNAL) == 0)
{
- // TODO: use worker thread ec
- //ev_read = Event(cpool->ec, fd, EV_READ,
- // std::bind(&Conn::recv_data, this, _1, _2));
- //ev_write = Event(cpool->ec, fd, EV_WRITE,
- // std::bind(&Conn::send_data, this, _1, _2));
- //ev_read.add();
- //ev_write.add();
ev_connect.clear();
- SALTICIDAE_LOG_INFO("connected to peer %s", std::string(*this).c_str());
+ SALTICIDAE_LOG_INFO("connected to remote %s", std::string(*this).c_str());
on_setup();
- cpool->select_worker().feed(self(), fd);
+ cpool->update_conn(conn, true);
+ cpool->select_worker().feed(conn, fd);
}
else
{
@@ -184,7 +170,7 @@ void ConnPool::Conn::conn_server(evutil_socket_t fd, short events) {
}
}
-void ConnPool::_listen(NetAddr listen_addr) {
+void ConnPool::listen(NetAddr listen_addr) {
std::lock_guard<std::mutex> _(cp_mlock);
int one = 1;
if (listen_fd != -1)
@@ -246,8 +232,8 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) {
if (::connect(fd, (struct sockaddr *)&sockin,
sizeof(struct sockaddr_in)) < 0 && errno != EINPROGRESS)
{
- SALTICIDAE_LOG_INFO("cannot connect to %s", std::string(addr).c_str());
- conn->terminate();
+ SALTICIDAE_LOG_INFO("cannot connect to %s", std::string(addr).c_str());
+ conn->terminate();
}
else
{
@@ -275,17 +261,13 @@ void ConnPool::_post_terminate(int fd) {
conn->on_close();
/* inform the upper layer the connection will be destroyed */
conn->on_teardown();
+ update_conn(conn, false);
}
}
ConnPool::conn_t ConnPool::add_conn(const conn_t &conn) {
std::lock_guard<std::mutex> _(cp_mlock);
assert(pool.find(conn->fd) == pool.end());
- //if (it != pool.end())
- //{
- // auto old_conn = it->second;
- // old_conn->terminate();
- //}
return pool.insert(std::make_pair(conn->fd, conn)).first->second;
}
diff --git a/test/bench_network.cpp b/test/bench_network.cpp
index b1f1a0f..8ff9ab2 100644
--- a/test/bench_network.cpp
+++ b/test/bench_network.cpp
@@ -148,6 +148,10 @@ void signal_handler(int) {
}
int main() {
+ struct sigaction sa;
+ sa.sa_handler = signal_handler;
+ sigaction(SIGTERM, &sa, NULL);
+ sigaction(SIGINT, &sa, NULL);
/* test two nodes */
MyNet alice(ec, "Alice", bob_addr, 10);
alice.listen(alice_addr);
@@ -158,13 +162,12 @@ int main() {
try {
ec.dispatch();
} catch (std::exception &) {}
+ SALTICIDAE_LOG_INFO("exiting");
});
- signal(SIGTERM, signal_handler);
- signal(SIGINT, signal_handler);
try {
ec.dispatch();
} catch (std::exception &e) {
- pthread_kill(bob_thread.native_handle(), SIGINT);
+ pthread_kill(bob_thread.native_handle(), SIGTERM);
bob_thread.join();
SALTICIDAE_LOG_INFO("exception: %s", e.what());
}
diff --git a/test/test_network.cpp b/test/test_network.cpp
index c4fe2e2..1821ada 100644
--- a/test/test_network.cpp
+++ b/test/test_network.cpp
@@ -89,8 +89,8 @@ struct MyNet: public MsgNetworkByteOp {
reg_handler(salticidae::generic_bind(
&MyNet::on_receive_hello, this, _1, _2));
- reg_conn_handler([this](ConnPool::Conn &conn) {
- if (conn.get_fd() != -1)
+ reg_conn_handler([this](ConnPool::Conn &conn, bool connected) {
+ if (connected)
{
if (conn.get_mode() == ConnPool::Conn::ACTIVE)
{