aboutsummaryrefslogtreecommitdiff
path: root/include
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2018-11-15 00:24:38 -0500
committerDeterminant <[email protected]>2018-11-15 00:24:38 -0500
commit82bd1d81e918cc8a1a46b6c36a81cbba43750bb1 (patch)
tree046e0e51dd6a035a0bc8f4c2686f8e1a28caa43f /include
parent389a9a56e00225b0e682d64cb05e5291c23892b0 (diff)
use a simple worker selection policy; add todo-list
Diffstat (limited to 'include')
-rw-r--r--include/salticidae/conn.h64
-rw-r--r--include/salticidae/network.h6
2 files changed, 34 insertions, 36 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index 480809f..5f5fef4 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -94,8 +94,10 @@ class ConnPool {
void send_data(int, int);
void conn_server(int, int);
- /** Terminate the connection. */
- void terminate();
+ /** Terminate the connection (from the worker thread). */
+ void worker_terminate();
+ /** Terminate the connection (from the dispatcher thread). */
+ void disp_terminate(bool blocking = true);
public:
Conn(): ready_send(false) {}
@@ -121,16 +123,9 @@ class ConnPool {
}
protected:
- /** Close the IO and clear all on-going or planned events. */
- virtual void stop() {
- if (!self_ref) return;
- ev_connect.clear();
- ev_socket.clear();
- send_buffer.get_queue().unreg_handler();
- ::close(fd);
- self_ref = nullptr; /* remove the self-cycle */
- }
-
+ /** Close the IO and clear all on-going or planned events. Remove the
+ * connection from a Worker. */
+ virtual void stop();
/** Called when new data is available. */
virtual void on_read() {}
/** Called when the underlying connection is established. */
@@ -153,7 +148,7 @@ class ConnPool {
/* owned by user loop */
BoxObj<ThreadCall> user_tcall;
- conn_callback_t conn_cb;
+ conn_callback_t conn_cb;
/* owned by the dispatcher */
Event ev_listen;
@@ -170,9 +165,11 @@ class ConnPool {
EventContext ec;
ThreadCall tcall;
std::thread handle;
+ bool disp_flag;
+ std::atomic<size_t> nconn;
public:
- Worker(): tcall(ec) {}
+ Worker(): tcall(ec), disp_flag(false), nconn(0) {}
/* the following functions are called by the dispatcher */
void start() {
@@ -203,15 +200,13 @@ class ConnPool {
else
conn->send_data(fd, what);
});
-
- // std::bind(&Conn::recv_data, conn_ptr, _1, _2));
- //conn->ev_write = Event(ec, client_fd, Event::WRITE,
- // std::bind(&Conn::send_data, conn_ptr, _1, _2));
conn->ev_socket.add(Event::READ | Event::WRITE);
- //conn->ev_write.add();
+ nconn++;
});
}
+ void unfeed() { nconn--; }
+
void stop() {
tcall.call([this](ThreadCall::Handle &) { ec.stop(); });
}
@@ -219,6 +214,9 @@ class ConnPool {
std::thread &get_handle() { return handle; }
const EventContext &get_ec() { return ec; }
ThreadCall *get_tcall() { return &tcall; }
+ void set_dispatcher() { disp_flag = true; }
+ bool is_dispatcher() const { return disp_flag; }
+ size_t get_nconn() { return nconn; }
};
/* related to workers */
@@ -248,7 +246,18 @@ class ConnPool {
//};
Worker &select_worker() {
- return workers[1];
+ size_t idx = 0;
+ size_t best = workers[idx].get_nconn();
+ for (size_t i = 0; i < nworker; i++)
+ {
+ size_t t = workers[i].get_nconn();
+ if (t < best)
+ {
+ best = t;
+ idx = i;
+ }
+ }
+ return workers[idx];
}
public:
@@ -256,7 +265,7 @@ class ConnPool {
int max_listen_backlog = 10,
double conn_server_timeout = 2,
size_t seg_buff_size = 4096,
- size_t nworker = 2):
+ size_t nworker = 4):
ec(ec),
max_listen_backlog(max_listen_backlog),
conn_server_timeout(conn_server_timeout),
@@ -267,6 +276,7 @@ class ConnPool {
user_tcall = new ThreadCall(ec);
disp_ec = workers[0].get_ec();
disp_tcall = workers[0].get_tcall();
+ workers[0].set_dispatcher();
}
~ConnPool() {
@@ -336,18 +346,6 @@ class ConnPool {
template<typename Func>
void reg_conn_handler(Func cb) { conn_cb = cb; }
-
- void terminate(const conn_t &conn, bool blocking = true) {
- int fd = conn->fd;
- auto worker = conn->worker;
- if (worker)
- worker->get_tcall()->call([conn](ThreadCall::Handle &) {
- conn->stop();
- }, blocking);
- else
- conn->stop();
- remove_conn(fd);
- }
};
}
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index 60d8f20..4f934cd 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -470,7 +470,7 @@ void PeerNetwork<O, _, __>::Conn::on_setup() {
assert(!ev_timeout);
ev_timeout = Event(worker->get_ec(), -1, [conn](int, int) {
SALTICIDAE_LOG_INFO("peer ping-pong timeout");
- conn->terminate();
+ conn->worker_terminate();
});
/* the initial ping-pong to set up the connection */
tcall_reset_timeout(worker, conn, pn->conn_timeout);
@@ -508,7 +508,7 @@ void PeerNetwork<O, _, __>::Peer::reset_conn(conn_t new_conn) {
//SALTICIDAE_LOG_DEBUG("moving send buffer");
//new_conn->move_send_buffer(conn);
SALTICIDAE_LOG_INFO("terminating old connection %s", std::string(*conn).c_str());
- conn->cpool->terminate(conn);
+ conn->disp_terminate();
}
addr = new_conn->get_addr();
conn = new_conn;
@@ -557,7 +557,7 @@ bool PeerNetwork<O, _, __>::check_new_conn(const conn_t &conn, uint16_t port) {
{
if (conn != p->conn)
{
- conn->cpool->terminate(conn);
+ conn->disp_terminate();
return true;
}
return false;