aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/salticidae/conn.h20
-rw-r--r--include/salticidae/netaddr.h1
-rw-r--r--include/salticidae/network.h10
-rw-r--r--src/netaddr.cpp4
-rw-r--r--src/network.cpp12
-rw-r--r--test/test_msgnet_c.c15
6 files changed, 42 insertions, 20 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index 665e1d5..48902d4 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -260,7 +260,7 @@ class ConnPool {
/* related to workers */
size_t nworker;
salticidae::BoxObj<Worker[]> workers;
- bool worker_running;
+ int system_state;
void accept_client(int, int);
conn_t add_conn(const conn_t &conn);
@@ -344,7 +344,7 @@ class ConnPool {
queue_capacity(config._queue_capacity),
listen_fd(-1),
nworker(config._nworker),
- worker_running(false) {
+ system_state(0) {
workers = new Worker[nworker];
user_tcall = new ThreadCall(ec);
disp_ec = workers[0].get_ec();
@@ -381,16 +381,16 @@ class ConnPool {
ConnPool(ConnPool &&) = delete;
void start() {
- if (worker_running) return;
+ if (system_state) return;
SALTICIDAE_LOG_INFO("starting all threads...");
for (size_t i = 0; i < nworker; i++)
workers[i].start();
- worker_running = true;
+ system_state = 1;
}
void stop_workers() {
- if (!worker_running) return;
- worker_running = false;
+ if (system_state != 1) return;
+ system_state = 2;
SALTICIDAE_LOG_INFO("stopping all threads...");
/* stop the dispatcher */
workers[0].stop();
@@ -401,10 +401,6 @@ class ConnPool {
/* join all worker threads */
for (size_t i = 1; i < nworker; i++)
workers[i].get_handle().join();
- }
-
- void stop() {
- stop_workers();
for (auto it: pool)
{
conn_t conn = it.second;
@@ -412,6 +408,10 @@ class ConnPool {
conn->self_ref = nullptr;
::close(conn->fd);
}
+ }
+
+ void stop() {
+ stop_workers();
if (listen_fd != -1)
{
close(listen_fd);
diff --git a/include/salticidae/netaddr.h b/include/salticidae/netaddr.h
index bff4cb7..e4e8c6c 100644
--- a/include/salticidae/netaddr.h
+++ b/include/salticidae/netaddr.h
@@ -141,6 +141,7 @@ void netaddr_free(const netaddr_t *self);
netaddr_t *netaddr_new_from_ip_port(uint32_t ip, uint16_t port);
netaddr_t *netaddr_new_from_sip_port(const char *ip, uint16_t port);
netaddr_t *netaddr_new_from_sipport(const char *ip_port_addr);
+netaddr_t *netaddr_copy(const netaddr_t *self);
bool netaddr_is_eq(const netaddr_t *a, const netaddr_t *b);
bool netaddr_is_null(const netaddr_t *self);
uint32_t netaddr_get_ip(const netaddr_t *self);
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index b269acc..6fef98a 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -144,7 +144,7 @@ class MsgNetwork: public ConnPool {
}
};
- virtual ~MsgNetwork() { stop_workers(); }
+ virtual ~MsgNetwork() { stop(); }
MsgNetwork(const EventContext &ec, const Config &config):
ConnPool(ec, config) {
@@ -203,6 +203,7 @@ class MsgNetwork: public ConnPool {
inline void _send_msg(Msg &&msg, const conn_t &conn);
inline void _send_msg_dispatcher(const Msg &msg, const conn_t &conn);
+ void stop() { stop_workers(); }
using ConnPool::listen;
conn_t connect(const NetAddr &addr) {
return static_pointer_cast<Conn>(ConnPool::connect(addr));
@@ -437,7 +438,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
this->reg_handler(generic_bind(&PeerNetwork::msg_pong, this, _1, _2));
}
- ~PeerNetwork() { this->stop_workers(); }
+ virtual ~PeerNetwork() { this->stop(); }
void add_peer(const NetAddr &paddr);
void del_peer(const NetAddr &paddr);
@@ -969,6 +970,7 @@ msgnetwork_conn_t *msgnetwork_conn_copy(const msgnetwork_conn_t *self);
void msgnetwork_conn_free(const msgnetwork_conn_t *self);
void msgnetwork_listen(msgnetwork_t *self, const netaddr_t *listen_addr, SalticidaeCError *err);
void msgnetwork_start(msgnetwork_t *self);
+void msgnetwork_stop(msgnetwork_t *self);
void msgnetwork_terminate(msgnetwork_t *self, const msgnetwork_conn_t *conn);
typedef void (*msgnetwork_msg_callback_t)(const msg_t *, const msgnetwork_conn_t *, void *userdata);
@@ -983,7 +985,7 @@ void msgnetwork_reg_error_handler(msgnetwork_t *self, msgnetwork_error_callback_
msgnetwork_t *msgnetwork_conn_get_net(const msgnetwork_conn_t *conn);
msgnetwork_conn_mode_t msgnetwork_conn_get_mode(const msgnetwork_conn_t *conn);
-netaddr_t *msgnetwork_conn_get_addr(const msgnetwork_conn_t *conn);
+const netaddr_t *msgnetwork_conn_get_addr(const msgnetwork_conn_t *conn);
// PeerNetwork
@@ -1002,12 +1004,14 @@ void peernetwork_del_peer(peernetwork_t *self, const netaddr_t *paddr);
bool peernetwork_has_peer(const peernetwork_t *self, const netaddr_t *paddr);
const peernetwork_conn_t *peernetwork_get_peer_conn(const peernetwork_t *self, const netaddr_t *paddr, SalticidaeCError *cerror);
msgnetwork_t *peernetwork_as_msgnetwork(peernetwork_t *self);
+peernetwork_t *msgnetwork_as_peernetwork_unsafe(msgnetwork_t *self);
msgnetwork_conn_t *msgnetwork_conn_new_from_peernetwork_conn(const peernetwork_conn_t *conn);
peernetwork_conn_t *peernetwork_conn_copy(const peernetwork_conn_t *self);
void peernetwork_conn_free(const peernetwork_conn_t *self);
void peernetwork_send_msg_by_move(peernetwork_t *self, msg_t * _moved_msg, const netaddr_t *paddr);
void peernetwork_multicast_msg_by_move(peernetwork_t *self, msg_t *_moved_msg, const netaddr_array_t *paddrs);
void peernetwork_listen(peernetwork_t *self, const netaddr_t *listen_addr, SalticidaeCError *err);
+void peernetwork_stop(peernetwork_t *self);
#ifdef __cplusplus
}
diff --git a/src/netaddr.cpp b/src/netaddr.cpp
index 1387b84..cb8a3d2 100644
--- a/src/netaddr.cpp
+++ b/src/netaddr.cpp
@@ -20,6 +20,10 @@ netaddr_t *netaddr_new_from_sipport(const char *ip_port_addr) {
return new netaddr_t(ip_port_addr);
}
+netaddr_t *netaddr_copy(const netaddr_t *self) {
+ return new netaddr_t(*self);
+}
+
bool netaddr_is_eq(const netaddr_t *a, const netaddr_t *b) {
return *a == *b;
}
diff --git a/src/network.cpp b/src/network.cpp
index a1666ec..0a83e39 100644
--- a/src/network.cpp
+++ b/src/network.cpp
@@ -118,6 +118,8 @@ void msgnetwork_reg_error_handler(msgnetwork_t *self,
void msgnetwork_start(msgnetwork_t *self) { self->start(); }
+void msgnetwork_stop(msgnetwork_t *self) { self->stop(); }
+
void msgnetwork_terminate(msgnetwork_t *self, const msgnetwork_conn_t *conn) {
self->terminate(*conn);
}
@@ -130,8 +132,8 @@ msgnetwork_conn_mode_t msgnetwork_conn_get_mode(const msgnetwork_conn_t *conn) {
return (msgnetwork_conn_mode_t)(*conn)->get_mode();
}
-netaddr_t *msgnetwork_conn_get_addr(const msgnetwork_conn_t *conn) {
- return new netaddr_t((*conn)->get_addr());
+const netaddr_t *msgnetwork_conn_get_addr(const msgnetwork_conn_t *conn) {
+ return &(*conn)->get_addr();
}
// PeerNetwork
@@ -160,6 +162,10 @@ void peernetwork_config_id_mode(peernetwork_config_t *self, peernetwork_id_mode_
msgnetwork_config_t *peernetwork_config_as_msgnetwork_config(peernetwork_config_t *self) { return self; }
+peernetwork_t *msgnetwork_as_peernetwork_unsafe(msgnetwork_t *self) {
+ return static_cast<peernetwork_t *>(self);
+}
+
peernetwork_t *peernetwork_new(const eventcontext_t *ec, const peernetwork_config_t *config) {
return new peernetwork_t(*ec, *config);
}
@@ -225,6 +231,8 @@ void peernetwork_listen(peernetwork_t *self, const netaddr_t *listen_addr, Salti
}
}
+void peernetwork_stop(peernetwork_t *self) { self->stop(); }
+
}
#endif
diff --git a/test/test_msgnet_c.c b/test/test_msgnet_c.c
index c2fb492..e6ebd14 100644
--- a/test/test_msgnet_c.c
+++ b/test/test_msgnet_c.c
@@ -54,8 +54,10 @@ msg_t *msg_hello_serialize(const char *name, const char *text) {
datastream_put_i32(serialized, (uint32_t)htole32(name_len));
datastream_put_data(serialized, name, name_len);
datastream_put_data(serialized, text, strlen(text));
- msg_t *msg = msg_new_moved_from_bytearray(
- MSG_OPCODE_HELLO, bytearray_new_moved_from_datastream(serialized));
+ bytearray_t *arr = bytearray_new_moved_from_datastream(serialized);
+ msg_t *msg = msg_new_moved_from_bytearray(MSG_OPCODE_HELLO, arr);
+ datastream_free(serialized);
+ bytearray_free(arr);
return msg;
}
@@ -84,7 +86,9 @@ MsgHello msg_hello_unserialize(const msg_t *msg) {
}
msg_t *msg_ack_serialize() {
- msg_t *msg = msg_new_moved_from_bytearray(MSG_OPCODE_ACK, bytearray_new());
+ bytearray_t *arr = bytearray_new();
+ msg_t *msg = msg_new_moved_from_bytearray(MSG_OPCODE_ACK, arr);
+ bytearray_free(arr);
return msg;
}
@@ -105,6 +109,7 @@ void on_receive_hello(const msg_t *_msg, const msgnetwork_conn_t *conn, void *us
msg_t *ack = msg_ack_serialize();
/* send acknowledgement */
msgnetwork_send_msg_by_move(net, ack, conn);
+ msg_free(ack);
}
void on_receive_ack(const msg_t *msg, const msgnetwork_conn_t *conn, void *userdata) {
@@ -124,6 +129,7 @@ void conn_handler(const msgnetwork_conn_t *conn, bool connected, void *userdata)
/* send the first message through this connection */
msg_t *hello = msg_hello_serialize(name, "Hello there!");
msgnetwork_send_msg_by_move(n->net, hello, conn);
+ msg_free(hello);
}
else
printf("[%s] Accepted, waiting for greetings.\n", name);
@@ -132,10 +138,9 @@ void conn_handler(const msgnetwork_conn_t *conn, bool connected, void *userdata)
{
printf("[%s] Disconnected, retrying.\n", name);
/* try to reconnect to the same address */
- netaddr_t *addr = msgnetwork_conn_get_addr(conn);
+ const netaddr_t *addr = msgnetwork_conn_get_addr(conn);
msgnetwork_connect(net, addr, &err);
check_err(&err);
- netaddr_free(addr);
}
}