From 4dc7feb8b15e0a76cc95e5a80fce363cca637856 Mon Sep 17 00:00:00 2001 From: Determinant Date: Mon, 17 Jun 2019 01:50:13 -0400 Subject: ... --- include/salticidae/conn.h | 20 ++++++++++---------- include/salticidae/netaddr.h | 1 + include/salticidae/network.h | 10 +++++++--- src/netaddr.cpp | 4 ++++ src/network.cpp | 12 ++++++++++-- test/test_msgnet_c.c | 15 ++++++++++----- 6 files changed, 42 insertions(+), 20 deletions(-) diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index 665e1d5..48902d4 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -260,7 +260,7 @@ class ConnPool { /* related to workers */ size_t nworker; salticidae::BoxObj workers; - bool worker_running; + int system_state; void accept_client(int, int); conn_t add_conn(const conn_t &conn); @@ -344,7 +344,7 @@ class ConnPool { queue_capacity(config._queue_capacity), listen_fd(-1), nworker(config._nworker), - worker_running(false) { + system_state(0) { workers = new Worker[nworker]; user_tcall = new ThreadCall(ec); disp_ec = workers[0].get_ec(); @@ -381,16 +381,16 @@ class ConnPool { ConnPool(ConnPool &&) = delete; void start() { - if (worker_running) return; + if (system_state) return; SALTICIDAE_LOG_INFO("starting all threads..."); for (size_t i = 0; i < nworker; i++) workers[i].start(); - worker_running = true; + system_state = 1; } void stop_workers() { - if (!worker_running) return; - worker_running = false; + if (system_state != 1) return; + system_state = 2; SALTICIDAE_LOG_INFO("stopping all threads..."); /* stop the dispatcher */ workers[0].stop(); @@ -401,10 +401,6 @@ class ConnPool { /* join all worker threads */ for (size_t i = 1; i < nworker; i++) workers[i].get_handle().join(); - } - - void stop() { - stop_workers(); for (auto it: pool) { conn_t conn = it.second; @@ -412,6 +408,10 @@ class ConnPool { conn->self_ref = nullptr; ::close(conn->fd); } + } + + void stop() { + stop_workers(); if (listen_fd != -1) { close(listen_fd); diff --git a/include/salticidae/netaddr.h b/include/salticidae/netaddr.h index bff4cb7..e4e8c6c 100644 --- a/include/salticidae/netaddr.h +++ b/include/salticidae/netaddr.h @@ -141,6 +141,7 @@ void netaddr_free(const netaddr_t *self); netaddr_t *netaddr_new_from_ip_port(uint32_t ip, uint16_t port); netaddr_t *netaddr_new_from_sip_port(const char *ip, uint16_t port); netaddr_t *netaddr_new_from_sipport(const char *ip_port_addr); +netaddr_t *netaddr_copy(const netaddr_t *self); bool netaddr_is_eq(const netaddr_t *a, const netaddr_t *b); bool netaddr_is_null(const netaddr_t *self); uint32_t netaddr_get_ip(const netaddr_t *self); diff --git a/include/salticidae/network.h b/include/salticidae/network.h index b269acc..6fef98a 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -144,7 +144,7 @@ class MsgNetwork: public ConnPool { } }; - virtual ~MsgNetwork() { stop_workers(); } + virtual ~MsgNetwork() { stop(); } MsgNetwork(const EventContext &ec, const Config &config): ConnPool(ec, config) { @@ -203,6 +203,7 @@ class MsgNetwork: public ConnPool { inline void _send_msg(Msg &&msg, const conn_t &conn); inline void _send_msg_dispatcher(const Msg &msg, const conn_t &conn); + void stop() { stop_workers(); } using ConnPool::listen; conn_t connect(const NetAddr &addr) { return static_pointer_cast(ConnPool::connect(addr)); @@ -437,7 +438,7 @@ class PeerNetwork: public MsgNetwork { this->reg_handler(generic_bind(&PeerNetwork::msg_pong, this, _1, _2)); } - ~PeerNetwork() { this->stop_workers(); } + virtual ~PeerNetwork() { this->stop(); } void add_peer(const NetAddr &paddr); void del_peer(const NetAddr &paddr); @@ -969,6 +970,7 @@ msgnetwork_conn_t *msgnetwork_conn_copy(const msgnetwork_conn_t *self); void msgnetwork_conn_free(const msgnetwork_conn_t *self); void msgnetwork_listen(msgnetwork_t *self, const netaddr_t *listen_addr, SalticidaeCError *err); void msgnetwork_start(msgnetwork_t *self); +void msgnetwork_stop(msgnetwork_t *self); void msgnetwork_terminate(msgnetwork_t *self, const msgnetwork_conn_t *conn); typedef void (*msgnetwork_msg_callback_t)(const msg_t *, const msgnetwork_conn_t *, void *userdata); @@ -983,7 +985,7 @@ void msgnetwork_reg_error_handler(msgnetwork_t *self, msgnetwork_error_callback_ msgnetwork_t *msgnetwork_conn_get_net(const msgnetwork_conn_t *conn); msgnetwork_conn_mode_t msgnetwork_conn_get_mode(const msgnetwork_conn_t *conn); -netaddr_t *msgnetwork_conn_get_addr(const msgnetwork_conn_t *conn); +const netaddr_t *msgnetwork_conn_get_addr(const msgnetwork_conn_t *conn); // PeerNetwork @@ -1002,12 +1004,14 @@ void peernetwork_del_peer(peernetwork_t *self, const netaddr_t *paddr); bool peernetwork_has_peer(const peernetwork_t *self, const netaddr_t *paddr); const peernetwork_conn_t *peernetwork_get_peer_conn(const peernetwork_t *self, const netaddr_t *paddr, SalticidaeCError *cerror); msgnetwork_t *peernetwork_as_msgnetwork(peernetwork_t *self); +peernetwork_t *msgnetwork_as_peernetwork_unsafe(msgnetwork_t *self); msgnetwork_conn_t *msgnetwork_conn_new_from_peernetwork_conn(const peernetwork_conn_t *conn); peernetwork_conn_t *peernetwork_conn_copy(const peernetwork_conn_t *self); void peernetwork_conn_free(const peernetwork_conn_t *self); void peernetwork_send_msg_by_move(peernetwork_t *self, msg_t * _moved_msg, const netaddr_t *paddr); void peernetwork_multicast_msg_by_move(peernetwork_t *self, msg_t *_moved_msg, const netaddr_array_t *paddrs); void peernetwork_listen(peernetwork_t *self, const netaddr_t *listen_addr, SalticidaeCError *err); +void peernetwork_stop(peernetwork_t *self); #ifdef __cplusplus } diff --git a/src/netaddr.cpp b/src/netaddr.cpp index 1387b84..cb8a3d2 100644 --- a/src/netaddr.cpp +++ b/src/netaddr.cpp @@ -20,6 +20,10 @@ netaddr_t *netaddr_new_from_sipport(const char *ip_port_addr) { return new netaddr_t(ip_port_addr); } +netaddr_t *netaddr_copy(const netaddr_t *self) { + return new netaddr_t(*self); +} + bool netaddr_is_eq(const netaddr_t *a, const netaddr_t *b) { return *a == *b; } diff --git a/src/network.cpp b/src/network.cpp index a1666ec..0a83e39 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -118,6 +118,8 @@ void msgnetwork_reg_error_handler(msgnetwork_t *self, void msgnetwork_start(msgnetwork_t *self) { self->start(); } +void msgnetwork_stop(msgnetwork_t *self) { self->stop(); } + void msgnetwork_terminate(msgnetwork_t *self, const msgnetwork_conn_t *conn) { self->terminate(*conn); } @@ -130,8 +132,8 @@ msgnetwork_conn_mode_t msgnetwork_conn_get_mode(const msgnetwork_conn_t *conn) { return (msgnetwork_conn_mode_t)(*conn)->get_mode(); } -netaddr_t *msgnetwork_conn_get_addr(const msgnetwork_conn_t *conn) { - return new netaddr_t((*conn)->get_addr()); +const netaddr_t *msgnetwork_conn_get_addr(const msgnetwork_conn_t *conn) { + return &(*conn)->get_addr(); } // PeerNetwork @@ -160,6 +162,10 @@ void peernetwork_config_id_mode(peernetwork_config_t *self, peernetwork_id_mode_ msgnetwork_config_t *peernetwork_config_as_msgnetwork_config(peernetwork_config_t *self) { return self; } +peernetwork_t *msgnetwork_as_peernetwork_unsafe(msgnetwork_t *self) { + return static_cast(self); +} + peernetwork_t *peernetwork_new(const eventcontext_t *ec, const peernetwork_config_t *config) { return new peernetwork_t(*ec, *config); } @@ -225,6 +231,8 @@ void peernetwork_listen(peernetwork_t *self, const netaddr_t *listen_addr, Salti } } +void peernetwork_stop(peernetwork_t *self) { self->stop(); } + } #endif diff --git a/test/test_msgnet_c.c b/test/test_msgnet_c.c index c2fb492..e6ebd14 100644 --- a/test/test_msgnet_c.c +++ b/test/test_msgnet_c.c @@ -54,8 +54,10 @@ msg_t *msg_hello_serialize(const char *name, const char *text) { datastream_put_i32(serialized, (uint32_t)htole32(name_len)); datastream_put_data(serialized, name, name_len); datastream_put_data(serialized, text, strlen(text)); - msg_t *msg = msg_new_moved_from_bytearray( - MSG_OPCODE_HELLO, bytearray_new_moved_from_datastream(serialized)); + bytearray_t *arr = bytearray_new_moved_from_datastream(serialized); + msg_t *msg = msg_new_moved_from_bytearray(MSG_OPCODE_HELLO, arr); + datastream_free(serialized); + bytearray_free(arr); return msg; } @@ -84,7 +86,9 @@ MsgHello msg_hello_unserialize(const msg_t *msg) { } msg_t *msg_ack_serialize() { - msg_t *msg = msg_new_moved_from_bytearray(MSG_OPCODE_ACK, bytearray_new()); + bytearray_t *arr = bytearray_new(); + msg_t *msg = msg_new_moved_from_bytearray(MSG_OPCODE_ACK, arr); + bytearray_free(arr); return msg; } @@ -105,6 +109,7 @@ void on_receive_hello(const msg_t *_msg, const msgnetwork_conn_t *conn, void *us msg_t *ack = msg_ack_serialize(); /* send acknowledgement */ msgnetwork_send_msg_by_move(net, ack, conn); + msg_free(ack); } void on_receive_ack(const msg_t *msg, const msgnetwork_conn_t *conn, void *userdata) { @@ -124,6 +129,7 @@ void conn_handler(const msgnetwork_conn_t *conn, bool connected, void *userdata) /* send the first message through this connection */ msg_t *hello = msg_hello_serialize(name, "Hello there!"); msgnetwork_send_msg_by_move(n->net, hello, conn); + msg_free(hello); } else printf("[%s] Accepted, waiting for greetings.\n", name); @@ -132,10 +138,9 @@ void conn_handler(const msgnetwork_conn_t *conn, bool connected, void *userdata) { printf("[%s] Disconnected, retrying.\n", name); /* try to reconnect to the same address */ - netaddr_t *addr = msgnetwork_conn_get_addr(conn); + const netaddr_t *addr = msgnetwork_conn_get_addr(conn); msgnetwork_connect(net, addr, &err); check_err(&err); - netaddr_free(addr); } } -- cgit v1.2.3-70-g09d2