From e27e529e589ef89fbe010ebf7c5635ec2873f64f Mon Sep 17 00:00:00 2001 From: Determinant Date: Wed, 12 Jun 2019 19:14:40 -0400 Subject: WIP: error handling --- test/test_msgnet_c.c | 42 +++++++++++++++++++++++------------------- test/test_p2p_stress.cpp | 5 +++++ 2 files changed, 28 insertions(+), 19 deletions(-) (limited to 'test') diff --git a/test/test_msgnet_c.c b/test/test_msgnet_c.c index b717137..656bd80 100644 --- a/test/test_msgnet_c.c +++ b/test/test_msgnet_c.c @@ -35,23 +35,25 @@ const uint8_t MSG_OPCODE_HELLO = 0x0; const uint8_t MSG_OPCODE_ACK = 0x1; typedef struct MsgHello { - const char *name; - const char *text; + char *name; + char *text; } MsgHello; + /** Defines how to serialize the msg. */ msg_t *msg_hello_serialize(const char *name, const char *text) { datastream_t *serialized = datastream_new(); size_t name_len = strlen(name); datastream_put_i32(serialized, (uint32_t)htole32(name_len)); - datastream_put_data(serialized, name, name + name_len); - datastream_put_data(serialized, text, text + strlen(text)); - msg_t *msg = msg_new(MSG_OPCODE_HELLO, bytearray_new_moved_from_datastream(serialized)); + datastream_put_data(serialized, name, name_len); + datastream_put_data(serialized, text, strlen(text)); + msg_t *msg = msg_new_moved_from_bytearray( + MSG_OPCODE_HELLO, bytearray_new_moved_from_datastream(serialized)); return msg; } /** Defines how to parse the msg. */ MsgHello msg_hello_unserialize(const msg_t *msg) { - datastream_t *s = msg_get_payload(msg); + datastream_t *s = msg_consume_payload(msg); MsgHello res; uint32_t len; len = datastream_get_u32(s); @@ -73,7 +75,7 @@ MsgHello msg_hello_unserialize(const msg_t *msg) { } msg_t *msg_ack_serialize() { - msg_t *msg = msg_new(MSG_OPCODE_ACK, bytearray_new()); + msg_t *msg = msg_new_moved_from_bytearray(MSG_OPCODE_ACK, bytearray_new()); return msg; } @@ -88,10 +90,11 @@ void on_receive_hello(const msg_t *_msg, const msgnetwork_conn_t *conn, void *us const char *name = ((MyNet *)userdata)->name; MsgHello msg = msg_hello_unserialize(_msg); printf("[%s] %s says %s\n", name, msg.name, msg.text); + free(msg.name); + free(msg.text); msg_t *ack = msg_ack_serialize(); /* send acknowledgement */ - msgnetwork_send_msg(net, ack, conn); - msg_free(ack); + msgnetwork_send_msg_by_move(net, ack, conn); } void on_receive_ack(const msg_t *msg, const msgnetwork_conn_t *conn, void *userdata) { @@ -110,8 +113,7 @@ void conn_handler(const msgnetwork_conn_t *conn, bool connected, void *userdata) printf("[%s] Connected, sending hello.", name); /* send the first message through this connection */ msg_t *hello = msg_hello_serialize(name, "Hello there!"); - msgnetwork_send_msg(n->net, hello, conn); - msg_free(hello); + msgnetwork_send_msg_by_move(n->net, hello, conn); } else printf("[%s] Accepted, waiting for greetings.\n", name); @@ -120,7 +122,9 @@ void conn_handler(const msgnetwork_conn_t *conn, bool connected, void *userdata) { printf("[%s] Disconnected, retrying.\n", name); /* try to reconnect to the same address */ - msgnetwork_connect(net, msgnetwork_conn_get_addr(conn)); + netaddr_t *addr = msgnetwork_conn_get_addr(conn); + msgnetwork_connect(net, addr); + netaddr_free(addr); } } @@ -136,7 +140,7 @@ MyNet gen_mynet(const eventcontext_t *ec, static eventcontext_t *ec; -void on_term_signal(int sig) { +void on_term_signal(int sig, void *userdata) { eventcontext_stop(ec); } @@ -166,25 +170,25 @@ int main() { msgnetwork_listen(bob.net, bob_addr); /* try to connect once */ - msgnetwork_connect(alice.net, bob_addr); - msgnetwork_connect(bob.net, alice_addr); + msgnetwork_conn_free(msgnetwork_connect(alice.net, bob_addr)); + msgnetwork_conn_free(msgnetwork_connect(bob.net, alice_addr)); netaddr_free(alice_addr); netaddr_free(bob_addr); /* the main loop can be shutdown by ctrl-c or kill */ - sigev_t *ev_sigint = sigev_new(ec, on_term_signal); - sigev_t *ev_sigterm = sigev_new(ec, on_term_signal); + sigev_t *ev_sigint = sigev_new(ec, on_term_signal, NULL); + sigev_t *ev_sigterm = sigev_new(ec, on_term_signal, NULL); sigev_add(ev_sigint, SIGINT); sigev_add(ev_sigterm, SIGTERM); /* enter the main loop */ eventcontext_dispatch(ec); - sigev_free(ev_sigint); - sigev_free(ev_sigterm); msgnetwork_free(alice.net); msgnetwork_free(bob.net); + sigev_free(ev_sigint); + sigev_free(ev_sigterm); eventcontext_free(ec); return 0; } diff --git a/test/test_p2p_stress.cpp b/test/test_p2p_stress.cpp index 70e3444..7321217 100644 --- a/test/test_p2p_stress.cpp +++ b/test/test_p2p_stress.cpp @@ -114,6 +114,11 @@ void install_proto(AppContext &app, const size_t &seg_buff_size) { } } }); + net.reg_error_handler([ec](const std::exception &err, bool fatal) { + SALTICIDAE_LOG_WARN("main thread captured %s error: %s", + fatal ? "fatal" : "recoverable", err.what()); + ec.stop(); + }); net.reg_handler([&](MsgRand &&msg, const MyNet::conn_t &conn) { uint256_t hash = salticidae::get_hash(msg.bytes); net.send_msg(MsgAck(hash), conn); -- cgit v1.2.3-70-g09d2