aboutsummaryrefslogtreecommitdiff
path: root/include/salticidae/conn.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/salticidae/conn.h')
-rw-r--r--include/salticidae/conn.h64
1 files changed, 31 insertions, 33 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index 480809f..5f5fef4 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -94,8 +94,10 @@ class ConnPool {
void send_data(int, int);
void conn_server(int, int);
- /** Terminate the connection. */
- void terminate();
+ /** Terminate the connection (from the worker thread). */
+ void worker_terminate();
+ /** Terminate the connection (from the dispatcher thread). */
+ void disp_terminate(bool blocking = true);
public:
Conn(): ready_send(false) {}
@@ -121,16 +123,9 @@ class ConnPool {
}
protected:
- /** Close the IO and clear all on-going or planned events. */
- virtual void stop() {
- if (!self_ref) return;
- ev_connect.clear();
- ev_socket.clear();
- send_buffer.get_queue().unreg_handler();
- ::close(fd);
- self_ref = nullptr; /* remove the self-cycle */
- }
-
+ /** Close the IO and clear all on-going or planned events. Remove the
+ * connection from a Worker. */
+ virtual void stop();
/** Called when new data is available. */
virtual void on_read() {}
/** Called when the underlying connection is established. */
@@ -153,7 +148,7 @@ class ConnPool {
/* owned by user loop */
BoxObj<ThreadCall> user_tcall;
- conn_callback_t conn_cb;
+ conn_callback_t conn_cb;
/* owned by the dispatcher */
Event ev_listen;
@@ -170,9 +165,11 @@ class ConnPool {
EventContext ec;
ThreadCall tcall;
std::thread handle;
+ bool disp_flag;
+ std::atomic<size_t> nconn;
public:
- Worker(): tcall(ec) {}
+ Worker(): tcall(ec), disp_flag(false), nconn(0) {}
/* the following functions are called by the dispatcher */
void start() {
@@ -203,15 +200,13 @@ class ConnPool {
else
conn->send_data(fd, what);
});
-
- // std::bind(&Conn::recv_data, conn_ptr, _1, _2));
- //conn->ev_write = Event(ec, client_fd, Event::WRITE,
- // std::bind(&Conn::send_data, conn_ptr, _1, _2));
conn->ev_socket.add(Event::READ | Event::WRITE);
- //conn->ev_write.add();
+ nconn++;
});
}
+ void unfeed() { nconn--; }
+
void stop() {
tcall.call([this](ThreadCall::Handle &) { ec.stop(); });
}
@@ -219,6 +214,9 @@ class ConnPool {
std::thread &get_handle() { return handle; }
const EventContext &get_ec() { return ec; }
ThreadCall *get_tcall() { return &tcall; }
+ void set_dispatcher() { disp_flag = true; }
+ bool is_dispatcher() const { return disp_flag; }
+ size_t get_nconn() { return nconn; }
};
/* related to workers */
@@ -248,7 +246,18 @@ class ConnPool {
//};
Worker &select_worker() {
- return workers[1];
+ size_t idx = 0;
+ size_t best = workers[idx].get_nconn();
+ for (size_t i = 0; i < nworker; i++)
+ {
+ size_t t = workers[i].get_nconn();
+ if (t < best)
+ {
+ best = t;
+ idx = i;
+ }
+ }
+ return workers[idx];
}
public:
@@ -256,7 +265,7 @@ class ConnPool {
int max_listen_backlog = 10,
double conn_server_timeout = 2,
size_t seg_buff_size = 4096,
- size_t nworker = 2):
+ size_t nworker = 4):
ec(ec),
max_listen_backlog(max_listen_backlog),
conn_server_timeout(conn_server_timeout),
@@ -267,6 +276,7 @@ class ConnPool {
user_tcall = new ThreadCall(ec);
disp_ec = workers[0].get_ec();
disp_tcall = workers[0].get_tcall();
+ workers[0].set_dispatcher();
}
~ConnPool() {
@@ -336,18 +346,6 @@ class ConnPool {
template<typename Func>
void reg_conn_handler(Func cb) { conn_cb = cb; }
-
- void terminate(const conn_t &conn, bool blocking = true) {
- int fd = conn->fd;
- auto worker = conn->worker;
- if (worker)
- worker->get_tcall()->call([conn](ThreadCall::Handle &) {
- conn->stop();
- }, blocking);
- else
- conn->stop();
- remove_conn(fd);
- }
};
}