aboutsummaryrefslogtreecommitdiff
path: root/include
diff options
context:
space:
mode:
Diffstat (limited to 'include')
-rw-r--r--include/salticidae/conn.h99
-rw-r--r--include/salticidae/network.h97
-rw-r--r--include/salticidae/util.h4
3 files changed, 116 insertions, 84 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index 9e2408f..ff75e34 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -103,6 +103,10 @@ class ConnPool {
static socket_io_func _send_data_tls_handshake;
static socket_io_func _recv_data_dummy;
+ /** Close the IO and clear all on-going or planned events. Remove the
+ * connection from a Worker. */
+ virtual void stop();
+
public:
Conn(): terminated(false), worker(nullptr), ready_send(false),
send_data_func(nullptr), recv_data_func(nullptr),
@@ -114,7 +118,7 @@ class ConnPool {
SALTICIDAE_LOG_INFO("destroyed %s", std::string(*this).c_str());
}
- bool is_terminated() {
+ bool is_terminated() const {
return terminated.load(std::memory_order_acquire);
}
@@ -134,35 +138,40 @@ class ConnPool {
bool write(bytearray_t &&data) {
return send_buffer.push(std::move(data), !cpool->queue_capacity);
}
-
- protected:
- /** Close the IO and clear all on-going or planned events. Remove the
- * connection from a Worker. */
- virtual void stop();
};
protected:
EventContext ec;
EventContext disp_ec;
ThreadCall* disp_tcall;
- /** Should be implemented by derived class to return a new Conn object. */
- virtual Conn *create_conn() = 0;
+ /* owned by user loop */
+ BoxObj<ThreadCall> user_tcall;
+
using worker_error_callback_t = std::function<void(const std::exception_ptr err)>;
worker_error_callback_t disp_error_cb;
worker_error_callback_t worker_error_cb;
+ conn_t _connect(const NetAddr &addr);
+ void _listen(NetAddr listen_addr);
+ void recoverable_error(const std::exception_ptr err) const {
+ user_tcall->async_call([this, err](ThreadCall::Handle &) {
+ if (error_cb) error_cb(err, false);
+ });
+ }
+
/** Terminate the connection (from the worker thread). */
void worker_terminate(const conn_t &conn);
/** Terminate the connection (from the dispatcher thread). */
void disp_terminate(const conn_t &conn);
- void conn_server(const conn_t &conn, int, int);
- /** Called when new data is available. */
- virtual void on_read(const conn_t &) {}
- /** Called when the underlying connection is established. */
- virtual void on_setup(const conn_t &) {}
- /** Called when the underlying connection breaks. */
- virtual void on_teardown(const conn_t &) {}
+ /** Should be implemented by derived class to return a new Conn object. */
+ virtual Conn *create_conn() = 0;
+ /** Called when new data is available. */
+ virtual void on_read(const conn_t &) {}
+ /** Called when the underlying connection is established. */
+ virtual void on_setup(const conn_t &) {}
+ /** Called when the underlying connection breaks. */
+ virtual void on_teardown(const conn_t &) {}
private:
const int max_listen_backlog;
@@ -172,11 +181,6 @@ class ConnPool {
const bool enable_tls;
tls_context_t tls_ctx;
- /* owned by user loop */
- protected:
- BoxObj<ThreadCall> user_tcall;
-
- private:
conn_callback_t conn_cb;
error_callback_t error_cb;
@@ -191,10 +195,8 @@ class ConnPool {
if (enable_tls && connected)
{
conn->worker->get_tcall()->async_call([this, conn, ret](ThreadCall::Handle &) {
- if (ret)
- conn->recv_data_func = Conn::_recv_data_tls;
- else
- worker_terminate(conn);
+ if (ret) conn->recv_data_func = Conn::_recv_data_tls;
+ else worker_terminate(conn);
});
}
});
@@ -225,17 +227,27 @@ class ConnPool {
handle = std::thread([this]() { ec.dispatch(); });
}
+ void enable_send_buffer(const conn_t &conn, int client_fd) {
+ conn->get_send_buffer()
+ .get_queue()
+ .reg_handler(this->ec, [conn, client_fd]
+ (MPSCWriteBuffer::queue_t &) {
+ if (conn->ready_send)
+ {
+ conn->ev_socket.del();
+ conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE);
+ conn->send_data_func(conn, client_fd, FdEvent::WRITE);
+ }
+ return false;
+ });
+ }
+
void feed(const conn_t &conn, int client_fd) {
/* the caller should finalize all the preparation */
tcall.async_call([this, conn, client_fd](ThreadCall::Handle &) {
try {
conn->ev_connect.clear();
- if (conn->mode == Conn::ConnMode::DEAD)
- {
- SALTICIDAE_LOG_INFO("worker %x discarding dead connection",
- std::this_thread::get_id());
- return;
- }
+ assert(conn->mode != Conn::ConnMode::DEAD);
auto cpool = conn->cpool;
if (cpool->enable_tls)
{
@@ -249,25 +261,14 @@ class ConnPool {
{
conn->send_data_func = Conn::_send_data;
conn->recv_data_func = Conn::_recv_data;
+ enable_send_buffer(conn, client_fd);
cpool->update_conn(conn, true);
}
assert(conn->fd != -1);
+ assert(conn->worker == this);
SALTICIDAE_LOG_INFO("worker %x got %s",
std::this_thread::get_id(),
std::string(*conn).c_str());
- assert(conn->worker == this);
- conn->get_send_buffer()
- .get_queue()
- .reg_handler(this->ec, [conn, client_fd]
- (MPSCWriteBuffer::queue_t &) {
- if (conn->ready_send)
- {
- conn->ev_socket.del();
- conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE);
- conn->send_data_func(conn, client_fd, FdEvent::WRITE);
- }
- return false;
- });
conn->ev_socket = FdEvent(ec, client_fd, [this, conn](int fd, int what) {
try {
if (what & FdEvent::READ)
@@ -306,21 +307,11 @@ class ConnPool {
int system_state;
void accept_client(int, int);
+ void conn_server(const conn_t &conn, int, int);
conn_t add_conn(const conn_t &conn);
void del_conn(const conn_t &conn);
void release_conn(const conn_t &conn);
- protected:
- conn_t _connect(const NetAddr &addr);
- void _listen(NetAddr listen_addr);
- void recoverable_error(const std::exception_ptr err) const {
- user_tcall->async_call([this, err](ThreadCall::Handle &) {
- if (error_cb) error_cb(err, false);
- });
- }
-
- private:
-
Worker &select_worker() {
size_t idx = 0;
size_t best = workers[idx].get_nconn();
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index 20dc696..05f4b7b 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -226,23 +226,19 @@ class ClientNetwork: public MsgNetwork<OpcodeType> {
public:
class Conn: public MsgNet::Conn {
friend ClientNetwork;
-
public:
Conn() = default;
-
ClientNetwork *get_net() {
return static_cast<ClientNetwork *>(ConnPool::Conn::get_pool());
}
-
- protected:
- void on_setup() override;
- void on_teardown() override;
};
using conn_t = ArcObj<Conn>;
protected:
ConnPool::Conn *create_conn() override { return new Conn(); }
+ void on_setup(const ConnPool::conn_t &) override;
+ void on_teardown(const ConnPool::conn_t &) override;
public:
using Config = typename MsgNet::Config;
@@ -251,7 +247,11 @@ class ClientNetwork: public MsgNetwork<OpcodeType> {
using MsgNet::send_msg;
template<typename MsgType>
- void send_msg(MsgType &&msg, const NetAddr &addr);
+ inline void send_msg(const MsgType &msg, const NetAddr &addr);
+ inline void _send_msg(const Msg &msg, const NetAddr &addr);
+ template<typename MsgType>
+ inline void send_msg_deferred(MsgType &&msg, const NetAddr &addr);
+ inline void _send_msg_deferred(Msg &&msg, const NetAddr &addr);
};
/** Peer-to-peer network where any two nodes could hold a bi-diretional message
@@ -503,7 +503,7 @@ void MsgNetwork<OpcodeType>::on_read(const ConnPool::conn_t &_conn) {
template<typename OpcodeType>
template<typename MsgType>
inline void MsgNetwork<OpcodeType>::send_msg_deferred(MsgType &&msg, const conn_t &conn) {
- return _send_msg_deferred(MsgType(std::move(msg)), conn);
+ return _send_msg_deferred(std::move(msg), conn);
}
template<typename OpcodeType>
@@ -511,7 +511,7 @@ inline void MsgNetwork<OpcodeType>::_send_msg_deferred(Msg &&msg, const conn_t &
this->disp_tcall->async_call(
[this, msg=std::move(msg), conn](ThreadCall::Handle &) {
try {
- this->send_msg(msg, conn);
+ this->_send_msg(msg, conn);
} catch (...) { this->recoverable_error(std::current_exception()); }
});
}
@@ -841,7 +841,7 @@ bool PeerNetwork<O, _, __>::has_peer(const NetAddr &paddr) const {
template<typename O, O _, O __>
template<typename MsgType>
inline void PeerNetwork<O, _, __>::send_msg_deferred(MsgType &&msg, const NetAddr &paddr) {
- return _send_msg_deferred(MsgType(std::move(msg)), paddr);
+ return _send_msg_deferred(std::move(msg), paddr);
}
template<typename O, O _, O __>
@@ -849,7 +849,7 @@ inline void PeerNetwork<O, _, __>::_send_msg_deferred(Msg &&msg, const NetAddr &
this->disp_tcall->async_call(
[this, msg=std::move(msg), paddr](ThreadCall::Handle &) {
try {
- send_msg(msg, paddr);
+ _send_msg(msg, paddr);
} catch (...) { this->recoverable_error(std::current_exception()); }
});
}
@@ -864,7 +864,7 @@ template<typename O, O _, O __>
inline void PeerNetwork<O, _, __>::_send_msg(const Msg &msg, const NetAddr &paddr) {
auto p = get_peer(paddr);
if (!p) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
- this->send_msg(msg, p->conn);
+ MsgNet::_send_msg(msg, p->conn);
}
template<typename O, O _, O __>
@@ -883,7 +883,7 @@ inline void PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<N
auto p = get_peer(addr);
if (!p)
throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
- this->send_msg(msg, p->conn);
+ MsgNet::_send_msg(msg, p->conn);
}
} catch (const PeerNetworkError &) {
this->recoverable_error(std::current_exception());
@@ -894,35 +894,54 @@ inline void PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<N
/* end: functions invoked by the user loop */
template<typename OpcodeType>
-void ClientNetwork<OpcodeType>::Conn::on_setup() {
- MsgNet::Conn::on_setup();
- assert(this->get_mode() == Conn::PASSIVE);
- const auto &addr = this->get_addr();
- auto cn = get_net();
+void ClientNetwork<OpcodeType>::on_setup(const ConnPool::conn_t &_conn) {
+ MsgNet::on_setup(_conn);
+ auto conn = static_pointer_cast<Conn>(_conn);
+ assert(conn->get_mode() == Conn::PASSIVE);
+ const auto &addr = conn->get_addr();
+ auto cn = conn->get_net();
cn->addr2conn.erase(addr);
- cn->addr2conn.insert(
- std::make_pair(addr, static_pointer_cast<Conn>(this->self())));
+ cn->addr2conn.insert(std::make_pair(addr, conn));
}
template<typename OpcodeType>
-void ClientNetwork<OpcodeType>::Conn::on_teardown() {
- MsgNet::Conn::on_teardown();
- get_net()->addr2conn.erase(this->get_addr());
+void ClientNetwork<OpcodeType>::on_teardown(const ConnPool::conn_t &_conn) {
+ MsgNet::on_teardown(_conn);
+ auto conn = static_pointer_cast<Conn>(_conn);
+ conn->get_net()->addr2conn.erase(conn->get_addr());
}
template<typename OpcodeType>
template<typename MsgType>
-void ClientNetwork<OpcodeType>::send_msg(MsgType &&msg, const NetAddr &addr) {
+inline void ClientNetwork<OpcodeType>::send_msg_deferred(MsgType &&msg, const NetAddr &addr) {
+ return _send_msg_deferred(std::move(msg), addr);
+}
+
+template<typename OpcodeType>
+inline void ClientNetwork<OpcodeType>::_send_msg_deferred(Msg &&msg, const NetAddr &addr) {
this->disp_tcall->async_call(
- [this, addr, msg=std::forward<MsgType>(msg)](ThreadCall::Handle &) {
+ [this, msg=std::move(msg), addr](ThreadCall::Handle &) {
try {
- auto it = addr2conn.find(addr);
- if (it != addr2conn.end())
- send_msg(std::move(msg), it->second);
- } catch (...) { this->disp_error_cb(std::current_exception()); }
+ _send_msg(msg, addr);
+ } catch (...) { this->recoverable_error(std::current_exception()); }
});
}
+template<typename OpcodeType>
+template<typename MsgType>
+inline void ClientNetwork<OpcodeType>::send_msg(const MsgType &msg, const NetAddr &addr) {
+ return _send_msg(msg, addr);
+}
+
+template<typename OpcodeType>
+inline void ClientNetwork<OpcodeType>::_send_msg(const Msg &msg, const NetAddr &addr) {
+ auto it = addr2conn.find(addr);
+ if (it != addr2conn.end())
+ MsgNet::_send_msg(msg, it->second);
+ else
+ throw ClientNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
+}
+
template<typename O, O OPCODE_PING, O _>
const O PeerNetwork<O, OPCODE_PING, _>::MsgPing::opcode = OPCODE_PING;
@@ -939,6 +958,9 @@ using msgnetwork_conn_t = msgnetwork_t::conn_t;
using peernetwork_t = salticidae::PeerNetwork<_opcode_t>;
using peernetwork_config_t = peernetwork_t::Config;
using peernetwork_conn_t = peernetwork_t::conn_t;
+
+using clientnetwork_t = salticidae::ClientNetwork<_opcode_t>;
+using clientnetwork_conn_t = clientnetwork_t::conn_t;
#endif
#else
@@ -951,6 +973,9 @@ typedef struct msgnetwork_conn_t msgnetwork_conn_t;
typedef struct peernetwork_t peernetwork_t;
typedef struct peernetwork_config_t peernetwork_config_t;
typedef struct peernetwork_conn_t peernetwork_conn_t;
+
+typedef struct clientnetwork_t clientnetwork_t;
+typedef struct clientnetwork_conn_t clientnetwork_conn_t;
#endif
#endif
@@ -1042,11 +1067,23 @@ void peernetwork_send_msg(peernetwork_t *self, const msg_t * msg, const netaddr_
void peernetwork_send_msg_deferred_by_move(peernetwork_t *self, msg_t * _moved_msg, const netaddr_t *paddr);
void peernetwork_multicast_msg_by_move(peernetwork_t *self, msg_t *_moved_msg, const netaddr_array_t *paddrs);
void peernetwork_listen(peernetwork_t *self, const netaddr_t *listen_addr, SalticidaeCError *err);
-void peernetwork_stop(peernetwork_t *self);
typedef void (*msgnetwork_unknown_peer_callback_t)(const netaddr_t *, void *userdata);
void peernetwork_reg_unknown_peer_handler(peernetwork_t *self, msgnetwork_unknown_peer_callback_t cb, void *userdata);
+/* ClientNetwork */
+
+clientnetwork_t *clientnetwork_new(const eventcontext_t *ec, const msgnetwork_config_t *config, SalticidaeCError *err);
+void clientnetwork_free(const clientnetwork_t *self);
+msgnetwork_t *clientnetwork_as_msgnetwork(clientnetwork_t *self);
+clientnetwork_t *msgnetwork_as_clientnetwork_unsafe(msgnetwork_t *self);
+msgnetwork_conn_t *msgnetwork_conn_new_from_clientnetwork_conn(const clientnetwork_conn_t *conn);
+clientnetwork_conn_t *clientnetwork_conn_new_from_msgnetwork_conn_unsafe(const msgnetwork_conn_t *conn);
+clientnetwork_conn_t *clientnetwork_conn_copy(const clientnetwork_conn_t *self);
+void clientnetwork_conn_free(const clientnetwork_conn_t *self);
+void clientnetwork_send_msg(clientnetwork_t *self, const msg_t * msg, const netaddr_t *addr);
+void clientnetwork_send_msg_deferred_by_move(clientnetwork_t *self, msg_t * _moved_msg, const netaddr_t *addr);
+
#ifdef __cplusplus
}
#endif
diff --git a/include/salticidae/util.h b/include/salticidae/util.h
index 952b18b..27f4e8e 100644
--- a/include/salticidae/util.h
+++ b/include/salticidae/util.h
@@ -145,6 +145,10 @@ class PeerNetworkError: public ConnPoolError {
using ConnPoolError::ConnPoolError;
};
+class ClientNetworkError: public ConnPoolError {
+ using ConnPoolError::ConnPoolError;
+};
+
extern const char *TTY_COLOR_RED;
extern const char *TTY_COLOR_GREEN;
extern const char *TTY_COLOR_YELLOW;