aboutsummaryrefslogtreecommitdiff
path: root/include/salticidae/conn.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/salticidae/conn.h')
-rw-r--r--include/salticidae/conn.h99
1 files changed, 45 insertions, 54 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index 9e2408f..ff75e34 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -103,6 +103,10 @@ class ConnPool {
static socket_io_func _send_data_tls_handshake;
static socket_io_func _recv_data_dummy;
+ /** Close the IO and clear all on-going or planned events. Remove the
+ * connection from a Worker. */
+ virtual void stop();
+
public:
Conn(): terminated(false), worker(nullptr), ready_send(false),
send_data_func(nullptr), recv_data_func(nullptr),
@@ -114,7 +118,7 @@ class ConnPool {
SALTICIDAE_LOG_INFO("destroyed %s", std::string(*this).c_str());
}
- bool is_terminated() {
+ bool is_terminated() const {
return terminated.load(std::memory_order_acquire);
}
@@ -134,35 +138,40 @@ class ConnPool {
bool write(bytearray_t &&data) {
return send_buffer.push(std::move(data), !cpool->queue_capacity);
}
-
- protected:
- /** Close the IO and clear all on-going or planned events. Remove the
- * connection from a Worker. */
- virtual void stop();
};
protected:
EventContext ec;
EventContext disp_ec;
ThreadCall* disp_tcall;
- /** Should be implemented by derived class to return a new Conn object. */
- virtual Conn *create_conn() = 0;
+ /* owned by user loop */
+ BoxObj<ThreadCall> user_tcall;
+
using worker_error_callback_t = std::function<void(const std::exception_ptr err)>;
worker_error_callback_t disp_error_cb;
worker_error_callback_t worker_error_cb;
+ conn_t _connect(const NetAddr &addr);
+ void _listen(NetAddr listen_addr);
+ void recoverable_error(const std::exception_ptr err) const {
+ user_tcall->async_call([this, err](ThreadCall::Handle &) {
+ if (error_cb) error_cb(err, false);
+ });
+ }
+
/** Terminate the connection (from the worker thread). */
void worker_terminate(const conn_t &conn);
/** Terminate the connection (from the dispatcher thread). */
void disp_terminate(const conn_t &conn);
- void conn_server(const conn_t &conn, int, int);
- /** Called when new data is available. */
- virtual void on_read(const conn_t &) {}
- /** Called when the underlying connection is established. */
- virtual void on_setup(const conn_t &) {}
- /** Called when the underlying connection breaks. */
- virtual void on_teardown(const conn_t &) {}
+ /** Should be implemented by derived class to return a new Conn object. */
+ virtual Conn *create_conn() = 0;
+ /** Called when new data is available. */
+ virtual void on_read(const conn_t &) {}
+ /** Called when the underlying connection is established. */
+ virtual void on_setup(const conn_t &) {}
+ /** Called when the underlying connection breaks. */
+ virtual void on_teardown(const conn_t &) {}
private:
const int max_listen_backlog;
@@ -172,11 +181,6 @@ class ConnPool {
const bool enable_tls;
tls_context_t tls_ctx;
- /* owned by user loop */
- protected:
- BoxObj<ThreadCall> user_tcall;
-
- private:
conn_callback_t conn_cb;
error_callback_t error_cb;
@@ -191,10 +195,8 @@ class ConnPool {
if (enable_tls && connected)
{
conn->worker->get_tcall()->async_call([this, conn, ret](ThreadCall::Handle &) {
- if (ret)
- conn->recv_data_func = Conn::_recv_data_tls;
- else
- worker_terminate(conn);
+ if (ret) conn->recv_data_func = Conn::_recv_data_tls;
+ else worker_terminate(conn);
});
}
});
@@ -225,17 +227,27 @@ class ConnPool {
handle = std::thread([this]() { ec.dispatch(); });
}
+ void enable_send_buffer(const conn_t &conn, int client_fd) {
+ conn->get_send_buffer()
+ .get_queue()
+ .reg_handler(this->ec, [conn, client_fd]
+ (MPSCWriteBuffer::queue_t &) {
+ if (conn->ready_send)
+ {
+ conn->ev_socket.del();
+ conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE);
+ conn->send_data_func(conn, client_fd, FdEvent::WRITE);
+ }
+ return false;
+ });
+ }
+
void feed(const conn_t &conn, int client_fd) {
/* the caller should finalize all the preparation */
tcall.async_call([this, conn, client_fd](ThreadCall::Handle &) {
try {
conn->ev_connect.clear();
- if (conn->mode == Conn::ConnMode::DEAD)
- {
- SALTICIDAE_LOG_INFO("worker %x discarding dead connection",
- std::this_thread::get_id());
- return;
- }
+ assert(conn->mode != Conn::ConnMode::DEAD);
auto cpool = conn->cpool;
if (cpool->enable_tls)
{
@@ -249,25 +261,14 @@ class ConnPool {
{
conn->send_data_func = Conn::_send_data;
conn->recv_data_func = Conn::_recv_data;
+ enable_send_buffer(conn, client_fd);
cpool->update_conn(conn, true);
}
assert(conn->fd != -1);
+ assert(conn->worker == this);
SALTICIDAE_LOG_INFO("worker %x got %s",
std::this_thread::get_id(),
std::string(*conn).c_str());
- assert(conn->worker == this);
- conn->get_send_buffer()
- .get_queue()
- .reg_handler(this->ec, [conn, client_fd]
- (MPSCWriteBuffer::queue_t &) {
- if (conn->ready_send)
- {
- conn->ev_socket.del();
- conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE);
- conn->send_data_func(conn, client_fd, FdEvent::WRITE);
- }
- return false;
- });
conn->ev_socket = FdEvent(ec, client_fd, [this, conn](int fd, int what) {
try {
if (what & FdEvent::READ)
@@ -306,21 +307,11 @@ class ConnPool {
int system_state;
void accept_client(int, int);
+ void conn_server(const conn_t &conn, int, int);
conn_t add_conn(const conn_t &conn);
void del_conn(const conn_t &conn);
void release_conn(const conn_t &conn);
- protected:
- conn_t _connect(const NetAddr &addr);
- void _listen(NetAddr listen_addr);
- void recoverable_error(const std::exception_ptr err) const {
- user_tcall->async_call([this, err](ThreadCall::Handle &) {
- if (error_cb) error_cb(err, false);
- });
- }
-
- private:
-
Worker &select_worker() {
size_t idx = 0;
size_t best = workers[idx].get_nconn();