aboutsummaryrefslogtreecommitdiff
path: root/include/salticidae/conn.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/salticidae/conn.h')
-rw-r--r--include/salticidae/conn.h91
1 files changed, 54 insertions, 37 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index 2ef6b50..73b3022 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -57,13 +57,13 @@ struct ConnPoolError: public SalticidaeError {
/** Abstraction for connection management. */
class ConnPool {
+ class Worker;
public:
class Conn;
/** The handle to a bi-directional connection. */
using conn_t = ArcObj<Conn>;
/** The type of callback invoked when connection status is changed. */
using conn_callback_t = std::function<void(Conn &, bool)>;
-
/** Abstraction for a bi-directional connection. */
class Conn {
friend ConnPool;
@@ -77,17 +77,16 @@ class ConnPool {
size_t seg_buff_size;
conn_t self_ref;
int fd;
+ Worker *worker;
ConnPool *cpool;
ConnMode mode;
NetAddr addr;
- // TODO: send_buffer should be a thread-safe mpsc queue
MPSCWriteBuffer send_buffer;
SegBuffer recv_buffer;
- Event ev_read;
- Event ev_write;
Event ev_connect;
+ Event ev_socket;
/** does not need to wait if true */
bool ready_send;
@@ -95,8 +94,6 @@ class ConnPool {
void send_data(int, int);
void conn_server(int, int);
- /** Stop the worker and I/O events. */
- void stop();
/** Terminate the connection. */
void terminate();
@@ -125,10 +122,11 @@ class ConnPool {
protected:
/** Close the IO and clear all on-going or planned events. */
- virtual void on_close() {
- ev_read.clear();
- ev_write.clear();
+ virtual void stop() {
+ if (fd == -1) return;
ev_connect.clear();
+ ev_socket.clear();
+ send_buffer.get_queue().unreg_handler();
::close(fd);
fd = -1;
self_ref = nullptr; /* remove the self-cycle */
@@ -142,6 +140,13 @@ class ConnPool {
virtual void on_teardown() {}
};
+ protected:
+ EventContext ec;
+ EventContext disp_ec;
+ ThreadCall* disp_tcall;
+ /** Should be implemented by derived class to return a new Conn object. */
+ virtual Conn *create_conn() = 0;
+
private:
const int max_listen_backlog;
const double conn_server_timeout;
@@ -164,11 +169,11 @@ class ConnPool {
class Worker {
EventContext ec;
- ThreadCall msgr;
+ ThreadCall tcall;
std::thread handle;
public:
- Worker(): msgr(ec) {}
+ Worker(): tcall(ec) {}
/* the following functions are called by the dispatcher */
void start() {
@@ -176,7 +181,7 @@ class ConnPool {
}
void feed(const conn_t &conn, int client_fd) {
- msgr.call([this, conn, client_fd](ThreadCall::Handle &) {
+ tcall.call([this, conn, client_fd](ThreadCall::Handle &) {
SALTICIDAE_LOG_INFO("worker %x got %s",
std::this_thread::get_id(),
std::string(*conn).c_str());
@@ -185,11 +190,15 @@ class ConnPool {
.reg_handler(this->ec, [conn, client_fd]
(MPSCWriteBuffer::queue_t &) {
if (conn->ready_send && conn->fd != -1)
+ {
+ conn->ev_socket.del();
+ conn->ev_socket.add(Event::READ | Event::WRITE);
conn->send_data(client_fd, Event::WRITE);
+ }
return false;
});
//auto conn_ptr = conn.get();
- conn->ev_read = Event(ec, client_fd, Event::READ | Event::WRITE, [conn=conn](int fd, int what) {
+ conn->ev_socket = Event(ec, client_fd, [conn=conn](int fd, int what) {
if (what & Event::READ)
conn->recv_data(fd, what);
else
@@ -199,19 +208,18 @@ class ConnPool {
// std::bind(&Conn::recv_data, conn_ptr, _1, _2));
//conn->ev_write = Event(ec, client_fd, Event::WRITE,
// std::bind(&Conn::send_data, conn_ptr, _1, _2));
- conn->ev_read.add();
+ conn->ev_socket.add(Event::READ | Event::WRITE);
//conn->ev_write.add();
});
}
void stop() {
- msgr.call([this](ThreadCall::Handle &) {
- ec.stop();
- });
+ tcall.call([this](ThreadCall::Handle &) { ec.stop(); });
}
std::thread &get_handle() { return handle; }
const EventContext &get_ec() { return ec; }
+ ThreadCall *get_tcall() { return &tcall; }
};
/* related to workers */
@@ -220,7 +228,7 @@ class ConnPool {
void accept_client(int, int);
conn_t add_conn(const conn_t &conn);
- void terminate(int fd);
+ void remove_conn(int fd);
protected:
conn_t _connect(const NetAddr &addr);
@@ -244,43 +252,30 @@ class ConnPool {
return workers[1];
}
- protected:
- EventContext ec;
- EventContext dispatcher_ec;
- BoxObj<ThreadCall> disp_tcall;
- /** Should be implemented by derived class to return a new Conn object. */
- virtual Conn *create_conn() = 0;
-
public:
ConnPool(const EventContext &ec,
int max_listen_backlog = 10,
double conn_server_timeout = 2,
size_t seg_buff_size = 4096,
size_t nworker = 2):
+ ec(ec),
max_listen_backlog(max_listen_backlog),
conn_server_timeout(conn_server_timeout),
seg_buff_size(seg_buff_size),
listen_fd(-1),
- nworker(std::max((size_t)1, nworker)),
- ec(ec) {
+ nworker(std::max((size_t)1, nworker)) {
workers = new Worker[nworker];
- dispatcher_ec = workers[0].get_ec();
-
user_tcall = new ThreadCall(ec);
- disp_tcall = new ThreadCall(dispatcher_ec);
+ disp_ec = workers[0].get_ec();
+ disp_tcall = workers[0].get_tcall();
}
~ConnPool() {
- /* stop all workers */
- for (size_t i = 0; i < nworker; i++)
- workers[i].stop();
- /* join all worker threads */
- for (size_t i = 0; i < nworker; i++)
- workers[i].get_handle().join();
+ stop();
for (auto it: pool)
{
conn_t conn = it.second;
- conn->on_close();
+ conn->stop();
}
if (listen_fd != -1) close(listen_fd);
}
@@ -294,6 +289,17 @@ class ConnPool {
workers[i].start();
}
+ void stop() {
+ SALTICIDAE_LOG_INFO("stopping all threads...");
+ /* stop all workers */
+ for (size_t i = 0; i < nworker; i++)
+ workers[i].stop();
+ /* join all worker threads */
+ for (size_t i = 0; i < nworker; i++)
+ workers[i].get_handle().join();
+ nworker = 0;
+ }
+
/** Actively connect to remote addr. */
conn_t connect(const NetAddr &addr, bool blocking = true) {
if (blocking)
@@ -303,6 +309,9 @@ class ConnPool {
auto ptr = new conn_t(_connect(addr));
std::atomic_thread_fence(std::memory_order_release);
h.set_result(ptr);
+ h.set_deleter([](void *data) {
+ delete static_cast<conn_t *>(data);
+ });
}, true));
auto conn = *ret;
delete ret;
@@ -328,6 +337,14 @@ class ConnPool {
template<typename Func>
void reg_conn_handler(Func cb) { conn_cb = cb; }
+
+ void terminate(const conn_t &conn, bool blocking = true) {
+ int fd = conn->fd;
+ conn->worker->get_tcall()->call([conn](ThreadCall::Handle &) {
+ conn->stop();
+ }, blocking);
+ remove_conn(fd);
+ }
};
}