aboutsummaryrefslogtreecommitdiff
path: root/include/salticidae/conn.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/salticidae/conn.h')
-rw-r--r--include/salticidae/conn.h63
1 files changed, 31 insertions, 32 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index ceec176..9e2408f 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -59,6 +59,7 @@ class ConnPool {
using conn_t = ArcObj<Conn>;
/** The type of callback invoked when connection status is changed. */
using conn_callback_t = std::function<bool(const conn_t &, bool)>;
+ /** The type of callback invoked when an error occured (during async execution). */
using error_callback_t = std::function<void(const std::exception_ptr, bool)>;
/** Abstraction for a bi-directional connection. */
class Conn {
@@ -71,9 +72,8 @@ class ConnPool {
};
protected:
+ std::atomic<bool> terminated;
size_t seg_buff_size;
- conn_t self_ref;
- std::mutex ref_mlock;
int fd;
Worker *worker;
ConnPool *cpool;
@@ -85,7 +85,6 @@ class ConnPool {
TimedFdEvent ev_connect;
FdEvent ev_socket;
- TimerEvent ev_send_wait;
/** does not need to wait if true */
bool ready_send;
@@ -104,15 +103,8 @@ class ConnPool {
static socket_io_func _send_data_tls_handshake;
static socket_io_func _recv_data_dummy;
- void conn_server(int, int);
-
- /** Terminate the connection (from the worker thread). */
- void worker_terminate();
- /** Terminate the connection (from the dispatcher thread). */
- void disp_terminate();
-
public:
- Conn(): worker(nullptr), ready_send(false),
+ Conn(): terminated(false), worker(nullptr), ready_send(false),
send_data_func(nullptr), recv_data_func(nullptr),
tls(nullptr), peer_cert(nullptr) {}
Conn(const Conn &) = delete;
@@ -122,15 +114,12 @@ class ConnPool {
SALTICIDAE_LOG_INFO("destroyed %s", std::string(*this).c_str());
}
- /** Get the handle to itself. */
- conn_t self() {
- mutex_lg_t _(ref_mlock);
- return self_ref;
+ bool is_terminated() {
+ return terminated.load(std::memory_order_acquire);
}
- void release_self() {
- mutex_lg_t _(ref_mlock);
- self_ref = nullptr;
+ bool set_terminated() {
+ return !terminated.exchange(true, std::memory_order_acq_rel);
}
operator std::string() const;
@@ -150,12 +139,6 @@ class ConnPool {
/** Close the IO and clear all on-going or planned events. Remove the
* connection from a Worker. */
virtual void stop();
- /** Called when new data is available. */
- virtual void on_read() {}
- /** Called when the underlying connection is established. */
- virtual void on_setup() {}
- /** Called when the underlying connection breaks. */
- virtual void on_teardown() {}
};
protected:
@@ -168,6 +151,18 @@ class ConnPool {
worker_error_callback_t disp_error_cb;
worker_error_callback_t worker_error_cb;
+ /** Terminate the connection (from the worker thread). */
+ void worker_terminate(const conn_t &conn);
+ /** Terminate the connection (from the dispatcher thread). */
+ void disp_terminate(const conn_t &conn);
+
+ void conn_server(const conn_t &conn, int, int);
+ /** Called when new data is available. */
+ virtual void on_read(const conn_t &) {}
+ /** Called when the underlying connection is established. */
+ virtual void on_setup(const conn_t &) {}
+ /** Called when the underlying connection breaks. */
+ virtual void on_teardown(const conn_t &) {}
private:
const int max_listen_backlog;
@@ -195,11 +190,11 @@ class ConnPool {
bool ret = !conn_cb || conn_cb(conn, connected);
if (enable_tls && connected)
{
- conn->worker->get_tcall()->async_call([conn, ret](ThreadCall::Handle &) {
+ conn->worker->get_tcall()->async_call([this, conn, ret](ThreadCall::Handle &) {
if (ret)
conn->recv_data_func = Conn::_recv_data_tls;
else
- conn->worker_terminate();
+ worker_terminate(conn);
});
}
});
@@ -214,6 +209,7 @@ class ConnPool {
ConnPool::worker_error_callback_t on_fatal_error;
public:
+
Worker(): tcall(ec), disp_flag(false), nconn(0) {}
void set_error_callback(ConnPool::worker_error_callback_t _on_error) {
@@ -233,6 +229,7 @@ class ConnPool {
/* the caller should finalize all the preparation */
tcall.async_call([this, conn, client_fd](ThreadCall::Handle &) {
try {
+ conn->ev_connect.clear();
if (conn->mode == Conn::ConnMode::DEAD)
{
SALTICIDAE_LOG_INFO("worker %x discarding dead connection",
@@ -258,6 +255,7 @@ class ConnPool {
SALTICIDAE_LOG_INFO("worker %x got %s",
std::this_thread::get_id(),
std::string(*conn).c_str());
+ assert(conn->worker == this);
conn->get_send_buffer()
.get_queue()
.reg_handler(this->ec, [conn, client_fd]
@@ -270,7 +268,7 @@ class ConnPool {
}
return false;
});
- conn->ev_socket = FdEvent(ec, client_fd, [this, conn=conn](int fd, int what) {
+ conn->ev_socket = FdEvent(ec, client_fd, [this, conn](int fd, int what) {
try {
if (what & FdEvent::READ)
conn->recv_data_func(conn, fd, what);
@@ -278,7 +276,7 @@ class ConnPool {
conn->send_data_func(conn, fd, what);
} catch (...) {
conn->cpool->recoverable_error(std::current_exception());
- conn->worker_terminate();
+ conn->cpool->worker_terminate(conn);
}
});
conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE);
@@ -310,6 +308,7 @@ class ConnPool {
void accept_client(int, int);
conn_t add_conn(const conn_t &conn);
void del_conn(const conn_t &conn);
+ void release_conn(const conn_t &conn);
protected:
conn_t _connect(const NetAddr &addr);
@@ -494,6 +493,7 @@ class ConnPool {
ConnPool(ConnPool &&) = delete;
void start() {
+ std::atomic_thread_fence(std::memory_order_acq_rel);
if (system_state) return;
SALTICIDAE_LOG_INFO("starting all threads...");
for (size_t i = 0; i < nworker; i++)
@@ -516,10 +516,9 @@ class ConnPool {
workers[i].get_handle().join();
for (auto it: pool)
{
- conn_t conn = it.second;
+ auto &conn = it.second;
conn->stop();
- conn->self_ref = nullptr;
- ::close(conn->fd);
+ release_conn(conn);
}
}
@@ -589,7 +588,7 @@ class ConnPool {
void terminate(const conn_t &conn) {
disp_tcall->async_call([this, conn](ThreadCall::Handle &) {
try {
- conn->disp_terminate();
+ disp_terminate(conn);
} catch (...) {
disp_error_cb(std::current_exception());
}