aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDeterminant <ted.sybil@gmail.com>2018-11-15 00:24:38 -0500
committerDeterminant <ted.sybil@gmail.com>2018-11-15 00:24:38 -0500
commit82bd1d81e918cc8a1a46b6c36a81cbba43750bb1 (patch)
tree046e0e51dd6a035a0bc8f4c2686f8e1a28caa43f
parent389a9a56e00225b0e682d64cb05e5291c23892b0 (diff)
use a simple worker selection policy; add todo-list
-rw-r--r--TODO.rst2
-rw-r--r--include/salticidae/conn.h64
-rw-r--r--include/salticidae/network.h6
-rw-r--r--src/conn.cpp31
-rw-r--r--test/test_p2p.cpp7
5 files changed, 65 insertions, 45 deletions
diff --git a/TODO.rst b/TODO.rst
new file mode 100644
index 0000000..b6fb085
--- /dev/null
+++ b/TODO.rst
@@ -0,0 +1,2 @@
+- use config factory class to configure ConnPool, MsgNetwork, PeerNetwork, etc.
+- stress test for PeerNetwork
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index 480809f..5f5fef4 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -94,8 +94,10 @@ class ConnPool {
void send_data(int, int);
void conn_server(int, int);
- /** Terminate the connection. */
- void terminate();
+ /** Terminate the connection (from the worker thread). */
+ void worker_terminate();
+ /** Terminate the connection (from the dispatcher thread). */
+ void disp_terminate(bool blocking = true);
public:
Conn(): ready_send(false) {}
@@ -121,16 +123,9 @@ class ConnPool {
}
protected:
- /** Close the IO and clear all on-going or planned events. */
- virtual void stop() {
- if (!self_ref) return;
- ev_connect.clear();
- ev_socket.clear();
- send_buffer.get_queue().unreg_handler();
- ::close(fd);
- self_ref = nullptr; /* remove the self-cycle */
- }
-
+ /** Close the IO and clear all on-going or planned events. Remove the
+ * connection from a Worker. */
+ virtual void stop();
/** Called when new data is available. */
virtual void on_read() {}
/** Called when the underlying connection is established. */
@@ -153,7 +148,7 @@ class ConnPool {
/* owned by user loop */
BoxObj<ThreadCall> user_tcall;
- conn_callback_t conn_cb;
+ conn_callback_t conn_cb;
/* owned by the dispatcher */
Event ev_listen;
@@ -170,9 +165,11 @@ class ConnPool {
EventContext ec;
ThreadCall tcall;
std::thread handle;
+ bool disp_flag;
+ std::atomic<size_t> nconn;
public:
- Worker(): tcall(ec) {}
+ Worker(): tcall(ec), disp_flag(false), nconn(0) {}
/* the following functions are called by the dispatcher */
void start() {
@@ -203,15 +200,13 @@ class ConnPool {
else
conn->send_data(fd, what);
});
-
- // std::bind(&Conn::recv_data, conn_ptr, _1, _2));
- //conn->ev_write = Event(ec, client_fd, Event::WRITE,
- // std::bind(&Conn::send_data, conn_ptr, _1, _2));
conn->ev_socket.add(Event::READ | Event::WRITE);
- //conn->ev_write.add();
+ nconn++;
});
}
+ void unfeed() { nconn--; }
+
void stop() {
tcall.call([this](ThreadCall::Handle &) { ec.stop(); });
}
@@ -219,6 +214,9 @@ class ConnPool {
std::thread &get_handle() { return handle; }
const EventContext &get_ec() { return ec; }
ThreadCall *get_tcall() { return &tcall; }
+ void set_dispatcher() { disp_flag = true; }
+ bool is_dispatcher() const { return disp_flag; }
+ size_t get_nconn() { return nconn; }
};
/* related to workers */
@@ -248,7 +246,18 @@ class ConnPool {
//};
Worker &select_worker() {
- return workers[1];
+ size_t idx = 0;
+ size_t best = workers[idx].get_nconn();
+ for (size_t i = 0; i < nworker; i++)
+ {
+ size_t t = workers[i].get_nconn();
+ if (t < best)
+ {
+ best = t;
+ idx = i;
+ }
+ }
+ return workers[idx];
}
public:
@@ -256,7 +265,7 @@ class ConnPool {
int max_listen_backlog = 10,
double conn_server_timeout = 2,
size_t seg_buff_size = 4096,
- size_t nworker = 2):
+ size_t nworker = 4):
ec(ec),
max_listen_backlog(max_listen_backlog),
conn_server_timeout(conn_server_timeout),
@@ -267,6 +276,7 @@ class ConnPool {
user_tcall = new ThreadCall(ec);
disp_ec = workers[0].get_ec();
disp_tcall = workers[0].get_tcall();
+ workers[0].set_dispatcher();
}
~ConnPool() {
@@ -336,18 +346,6 @@ class ConnPool {
template<typename Func>
void reg_conn_handler(Func cb) { conn_cb = cb; }
-
- void terminate(const conn_t &conn, bool blocking = true) {
- int fd = conn->fd;
- auto worker = conn->worker;
- if (worker)
- worker->get_tcall()->call([conn](ThreadCall::Handle &) {
- conn->stop();
- }, blocking);
- else
- conn->stop();
- remove_conn(fd);
- }
};
}
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index 60d8f20..4f934cd 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -470,7 +470,7 @@ void PeerNetwork<O, _, __>::Conn::on_setup() {
assert(!ev_timeout);
ev_timeout = Event(worker->get_ec(), -1, [conn](int, int) {
SALTICIDAE_LOG_INFO("peer ping-pong timeout");
- conn->terminate();
+ conn->worker_terminate();
});
/* the initial ping-pong to set up the connection */
tcall_reset_timeout(worker, conn, pn->conn_timeout);
@@ -508,7 +508,7 @@ void PeerNetwork<O, _, __>::Peer::reset_conn(conn_t new_conn) {
//SALTICIDAE_LOG_DEBUG("moving send buffer");
//new_conn->move_send_buffer(conn);
SALTICIDAE_LOG_INFO("terminating old connection %s", std::string(*conn).c_str());
- conn->cpool->terminate(conn);
+ conn->disp_terminate();
}
addr = new_conn->get_addr();
conn = new_conn;
@@ -557,7 +557,7 @@ bool PeerNetwork<O, _, __>::check_new_conn(const conn_t &conn, uint16_t port) {
{
if (conn != p->conn)
{
- conn->cpool->terminate(conn);
+ conn->disp_terminate();
return true;
}
return false;
diff --git a/src/conn.cpp b/src/conn.cpp
index f170d0a..841002f 100644
--- a/src/conn.cpp
+++ b/src/conn.cpp
@@ -67,7 +67,7 @@ void ConnPool::Conn::send_data(int fd, int events) {
if (ret < 0 && errno != EWOULDBLOCK)
{
SALTICIDAE_LOG_INFO("send(%d) failure: %s", fd, strerror(errno));
- terminate();
+ worker_terminate();
return;
}
}
@@ -102,13 +102,13 @@ void ConnPool::Conn::recv_data(int fd, int events) {
if (errno == EWOULDBLOCK) break;
SALTICIDAE_LOG_INFO("recv(%d) failure: %s", fd, strerror(errno));
/* connection err or half-opened connection */
- terminate();
+ worker_terminate();
return;
}
if (ret == 0)
{
//SALTICIDAE_LOG_INFO("recv(%d) terminates", fd, strerror(errno));
- terminate();
+ worker_terminate();
return;
}
buff_seg.resize(ret);
@@ -118,7 +118,17 @@ void ConnPool::Conn::recv_data(int fd, int events) {
on_read();
}
-void ConnPool::Conn::terminate() {
+void ConnPool::Conn::stop() {
+ if (!self_ref) return;
+ if (worker) worker->unfeed();
+ ev_connect.clear();
+ ev_socket.clear();
+ send_buffer.get_queue().unreg_handler();
+ ::close(fd);
+ self_ref = nullptr; /* remove the self-cycle */
+}
+
+void ConnPool::Conn::worker_terminate() {
stop();
cpool->disp_tcall->call(
[cpool=this->cpool, fd=this->fd](ThreadCall::Handle &) {
@@ -126,6 +136,15 @@ void ConnPool::Conn::terminate() {
});
}
+void ConnPool::Conn::disp_terminate(bool blocking) {
+ if (worker && !worker->is_dispatcher())
+ worker->get_tcall()->call([conn=self()](ThreadCall::Handle &) {
+ conn->stop();
+ }, blocking);
+ else stop();
+ cpool->remove_conn(fd);
+}
+
void ConnPool::accept_client(int fd, int) {
int client_fd;
struct sockaddr client_addr;
@@ -173,7 +192,7 @@ void ConnPool::Conn::conn_server(int fd, int events) {
{
if (events & Event::TIMEOUT)
SALTICIDAE_LOG_INFO("%s connect timeout", std::string(*this).c_str());
- cpool->terminate(conn);
+ conn->disp_terminate();
return;
}
}
@@ -237,7 +256,7 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) {
sizeof(struct sockaddr_in)) < 0 && errno != EINPROGRESS)
{
SALTICIDAE_LOG_INFO("cannot connect to %s", std::string(addr).c_str());
- terminate(conn);
+ conn->disp_terminate();
}
else
{
diff --git a/test/test_p2p.cpp b/test/test_p2p.cpp
index f902afa..99c28a0 100644
--- a/test/test_p2p.cpp
+++ b/test/test_p2p.cpp
@@ -75,10 +75,11 @@ const uint8_t MsgAck::opcode;
using MyNet = salticidae::PeerNetwork<uint8_t>;
-NetAddr addrs[] = {
+std::vector<NetAddr> addrs = {
NetAddr("127.0.0.1:12345"),
NetAddr("127.0.0.1:12346"),
- NetAddr("127.0.0.1:12347")
+ NetAddr("127.0.0.1:12347"),
+ NetAddr("127.0.0.1:12348")
};
void signal_handler(int) {
@@ -97,7 +98,7 @@ int main(int argc, char **argv) {
int i;
net.start();
net.listen(addrs[i = atoi(argv[1])]);
- for (int j = 0; j < 3; j++)
+ for (int j = 0; j < addrs.size(); j++)
if (i != j) net.add_peer(addrs[j]);
ec.dispatch();
} catch (salticidae::SalticidaeError &e) {}