aboutsummaryrefslogtreecommitdiff
path: root/include/salticidae/conn.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/salticidae/conn.h')
-rw-r--r--include/salticidae/conn.h75
1 files changed, 28 insertions, 47 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index 26d19fe..1364d4d 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -61,7 +61,7 @@ class ConnPool {
/** The handle to a bi-directional connection. */
using conn_t = ArcObj<Conn>;
/** The type of callback invoked when connection status is changed. */
- using conn_callback_t = std::function<void(Conn &)>;
+ using conn_callback_t = std::function<void(Conn &, bool)>;
/** Abstraction for a bi-directional connection. */
class Conn {
@@ -72,7 +72,7 @@ class ConnPool {
PASSIVE, /**< the connection is established by accept() */
};
- private:
+ protected:
size_t seg_buff_size;
conn_t self_ref;
int fd;
@@ -109,14 +109,10 @@ class ConnPool {
/** Get the handle to itself. */
conn_t self() { return self_ref; }
operator std::string() const;
- int get_fd() const { return fd; }
const NetAddr &get_addr() const { return addr; }
ConnMode get_mode() const { return mode; }
ConnPool *get_pool() const { return cpool; }
- SegBuffer &get_recv_buffer() { return recv_buffer; }
MPSCWriteBuffer &get_send_buffer() { return send_buffer; }
- /** Set the buffer size used for send/receive data. */
- void set_seg_buff_size(size_t size) { seg_buff_size = size; }
/** Write data to the connection (non-blocking). The data will be sent
* whenever I/O is available. */
@@ -124,11 +120,6 @@ class ConnPool {
send_buffer.push(std::move(data));
}
- ///** Move the send buffer from the other (old) connection. */
- //void move_send_buffer(conn_t other) {
- // send_buffer = std::move(other->send_buffer);
- //}
-
protected:
/** Close the IO and clear all on-going or planned events. */
virtual void on_close() {
@@ -143,13 +134,9 @@ class ConnPool {
/** Called when new data is available. */
virtual void on_read() {}
/** Called when the underlying connection is established. */
- virtual void on_setup() {
- cpool->update_conn(self());
- }
+ virtual void on_setup() {}
/** Called when the underlying connection breaks. */
- virtual void on_teardown() {
- cpool->update_conn(self());
- }
+ virtual void on_teardown() {}
};
private:
@@ -165,14 +152,13 @@ class ConnPool {
/* owned by the dispatcher */
std::unordered_map<int, conn_t> pool;
int listen_fd; /**< for accepting new network connections */
- int dlisten_fd[2]; /**< for control command sent to the dispatcher */
Event ev_listen;
Event ev_dlisten;
std::mutex cp_mlock;
- void update_conn(const conn_t &conn) {
- auto ptr = new conn_t(conn);
- write(mlisten_fd[1], &ptr, sizeof(ptr));
+ void update_conn(const conn_t &conn, bool connected) {
+ auto dcmd = new UserConn(conn, connected);
+ write(mlisten_fd[1], &dcmd, sizeof(dcmd));
}
struct Worker;
@@ -269,31 +255,22 @@ class ConnPool {
void accept_client(evutil_socket_t, short);
conn_t add_conn(const conn_t &conn);
conn_t _connect(const NetAddr &addr);
- void _listen(NetAddr listen_addr);
void _post_terminate(int fd);
+ protected:
class DispatchCmd {
public:
virtual ~DispatchCmd() = default;
virtual void exec(ConnPool *cpool) = 0;
};
- // TODO: the following two are untested
- class DspListen: public DispatchCmd {
- const NetAddr addr;
- public:
- DspListen(const NetAddr &addr): addr(addr) {}
- void exec(ConnPool *cpool) override {
- cpool->_listen(addr);
- }
- };
-
+ private:
class DspConnect: public DispatchCmd {
const NetAddr addr;
public:
DspConnect(const NetAddr &addr): addr(addr) {}
void exec(ConnPool *cpool) override {
- cpool->update_conn(cpool->_connect(addr));
+ cpool->update_conn(cpool->_connect(addr), true);
}
};
@@ -318,6 +295,18 @@ class ConnPool {
}
};
+ class UserConn: public DispatchCmd {
+ conn_t conn;
+ bool connected;
+ public:
+ UserConn(const conn_t &conn, bool connected):
+ conn(conn), connected(connected) {}
+ void exec(ConnPool *cpool) override {
+ if (cpool->conn_cb)
+ cpool->conn_cb(*conn, connected);
+ }
+ };
+
void post_terminate(int fd) {
auto dcmd = new DspPostTerm(fd);
write(dlisten_fd[1], &dcmd, sizeof(dcmd));
@@ -330,6 +319,7 @@ class ConnPool {
protected:
EventContext ec;
EventContext dispatcher_ec;
+ int dlisten_fd[2]; /**< for control command sent to the dispatcher */
std::mutex dsp_ec_mlock;
/** Should be implemented by derived class to return a new Conn object. */
virtual Conn *create_conn() = 0;
@@ -352,11 +342,10 @@ class ConnPool {
throw ConnPoolError(std::string("failed to create dispatcher pipe"));
ev_mlisten = Event(ec, mlisten_fd[0], EV_READ | EV_PERSIST, [this](int fd, short) {
- conn_t *conn_ptr;
- read(fd, &conn_ptr, sizeof(conn_ptr));
- if (conn_cb)
- conn_cb(**conn_ptr);
- delete conn_ptr;
+ DispatchCmd *dcmd;
+ read(fd, &dcmd, sizeof(dcmd));
+ dcmd->exec(this);
+ delete dcmd;
});
ev_mlisten.add();
@@ -414,15 +403,7 @@ class ConnPool {
/** Listen for passive connections (connection initiated from remote).
* Does not need to be called if do not want to accept any passive
* connections. */
- void listen(NetAddr listen_addr, bool blocking = true) {
- if (blocking)
- _listen(listen_addr);
- else
- {
- auto dcmd = new DspListen(listen_addr);
- write(dlisten_fd[1], &dcmd, sizeof(dcmd));
- }
- }
+ void listen(NetAddr listen_addr);
template<typename Func>
void reg_conn_handler(Func cb) { conn_cb = cb; }