diff options
-rw-r--r-- | CMakeLists.txt | 6 | ||||
-rw-r--r-- | include/salticidae/event.h | 6 | ||||
-rw-r--r-- | include/salticidae/msg.h | 19 | ||||
-rw-r--r-- | include/salticidae/netaddr.h | 18 | ||||
-rw-r--r-- | include/salticidae/network.h | 57 | ||||
-rw-r--r-- | include/salticidae/stream.h | 37 | ||||
-rw-r--r-- | include/salticidae/type.h | 6 | ||||
-rw-r--r-- | src/config.h.in | 1 | ||||
-rw-r--r-- | src/msg.cpp | 0 | ||||
-rw-r--r-- | src/netaddr.cpp | 30 | ||||
-rw-r--r-- | src/network.cpp | 52 | ||||
-rw-r--r-- | src/stream.cpp | 69 | ||||
-rw-r--r-- | test/test_msgnet_c.c | 171 |
13 files changed, 467 insertions, 5 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 903a66f..29ea67d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -32,7 +32,10 @@ include_directories(include) add_library(salticidae OBJECT src/util.cpp - src/conn.cpp) + src/conn.cpp + src/network.cpp + src/stream.cpp + src/netaddr.cpp) option(BUILD_SHARED "build shared library." OFF) if(BUILD_SHARED) @@ -59,6 +62,7 @@ option(SALTICIDAE_NORMAL_LOG "enable regular log" ON) option(SALTICIDAE_MSG_STAT "enable message statistics" ON) option(SALTICIDAE_NOCHECK "disable the sanity check" OFF) option(SALTICIDAE_NOCHECKSUM " disable checksum in messages" OFF) +option(SALTICIDAE_CBINDINGS "enable C bindings" OFF) configure_file(src/config.h.in include/salticidae/config.h @ONLY) diff --git a/include/salticidae/event.h b/include/salticidae/event.h index 946dda1..19bd250 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -35,6 +35,7 @@ #include "salticidae/util.h" #include "salticidae/ref.h" +#ifdef __cplusplus namespace salticidae { struct _event_context_deleter { @@ -644,4 +645,9 @@ class ThreadCall { } +using eventcontext_t = salticidae::EventContext; + +#else +typedef struct eventcontext_t; +#endif #endif diff --git a/include/salticidae/msg.h b/include/salticidae/msg.h index 03eb6dd..128e287 100644 --- a/include/salticidae/msg.h +++ b/include/salticidae/msg.h @@ -34,6 +34,7 @@ #include "salticidae/stream.h" #include "salticidae/netaddr.h" +#ifdef __cplusplus namespace salticidae { template<typename OpcodeType> @@ -69,6 +70,14 @@ class MsgBase { set_checksum(); } +#ifdef SALTICIDAE_CBINDINGS + MsgBase(const OpcodeType &opcode, bytearray_t &&payload): magic(0x0) { + set_opcode(opcode); + set_payload(std::move(payload)); + set_checksum(); + } +#endif + MsgBase(const MsgBase &other): magic(other.magic), opcode(other.opcode), @@ -258,4 +267,14 @@ const size_t MsgBase<OpcodeType>::header_size = 0; } +using msg_t = salticidae::MsgBase<_opcode_t>; +#else +typedef struct msg_t msg_t; +#endif + +extern "C" { + +void msg_new(); +} + #endif diff --git a/include/salticidae/netaddr.h b/include/salticidae/netaddr.h index 224d1d4..917c498 100644 --- a/include/salticidae/netaddr.h +++ b/include/salticidae/netaddr.h @@ -34,10 +34,10 @@ #include "salticidae/util.h" #include "salticidae/stream.h" +#ifdef __cplusplus namespace salticidae { /* TODO: IPv6 support */ - struct NetAddr { uint32_t ip; uint16_t port; @@ -124,4 +124,20 @@ namespace std { }; } +using netaddr_t = salticidae::NetAddr; +#else +typedef struct netaddr_t netaddr_t; +#endif + +extern "C" { + +netaddr_t *netaddr_new(); +netaddr_t *netaddr_new_from_ip_port(uint32_t ip, uint16_t port); +netaddr_t *netaddr_new_from_sip_port(const char *ip, uint16_t port); +netaddr_t *netaddr_new_from_sipport(const char *ip_port_addr); +bool netaddr_is_eq(const netaddr_t *a, const netaddr_t *b); +bool netaddr_is_null(const netaddr_t *self); + +} + #endif diff --git a/include/salticidae/network.h b/include/salticidae/network.h index f3dd313..12fe720 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -30,8 +30,8 @@ #include "salticidae/msg.h" #include "salticidae/conn.h" +#ifdef __cplusplus namespace salticidae { - /** Network of nodes who can send async messages. */ template<typename OpcodeType> class MsgNetwork: public ConnPool { @@ -156,6 +156,9 @@ class MsgNetwork: public ConnPool { { auto &msg = item.first; auto &conn = item.second; +#ifdef SALTICIDAE_CBINDINGS_INJECT_CALLBACK + salticidae_injected_msg_callback(&msg, conn.get()); +#else auto it = handler_map.find(msg.get_opcode()); if (it == handler_map.end()) SALTICIDAE_LOG_WARN("unknown opcode: %s", @@ -171,6 +174,7 @@ class MsgNetwork: public ConnPool { #endif it->second(msg, conn); } +#endif if (++cnt == burst_size) return true; } return false; @@ -182,14 +186,23 @@ class MsgNetwork: public ConnPool { typename callback_traits<Func>::msg_type, DataStream &&>::value>::type reg_handler(Func handler) { using callback_t = callback_traits<Func>; - handler_map[callback_t::msg_type::opcode] = [handler](const Msg &msg, const conn_t &conn) { + set_handler(callback_t::msg_type::opcode, + [handler](const Msg &msg, const conn_t &conn) { handler(typename callback_t::msg_type(msg.get_payload()), static_pointer_cast<typename callback_t::conn_type>(conn)); - }; + }); + } + + template<typename Func> + inline void set_handler(OpcodeType opcode, Func handler) { + handler_map[opcode] = handler; } template<typename MsgType> bool send_msg(MsgType &&msg, const conn_t &conn); +#ifdef SALTICIDAE_CBINDINGS + inline bool send_msg(const Msg &msg, const conn_t &conn); +#endif using ConnPool::listen; conn_t connect(const NetAddr &addr) { return static_pointer_cast<Conn>(ConnPool::connect(addr)); @@ -468,6 +481,11 @@ template<typename OpcodeType> template<typename MsgType> bool MsgNetwork<OpcodeType>::send_msg(MsgType &&_msg, const conn_t &conn) { Msg msg(std::forward<MsgType>(_msg)); + return send_msg(msg, conn); +} + +template<typename OpcodeType> +inline bool MsgNetwork<OpcodeType>::send_msg(const Msg &msg, const conn_t &conn) { bytearray_t msg_data = msg.serialize(); SALTICIDAE_LOG_DEBUG("wrote message %s to %s", std::string(msg).c_str(), @@ -764,4 +782,37 @@ const O PeerNetwork<O, _, OPCODE_PONG>::MsgPong::opcode = OPCODE_PONG; } +using msgnetwork_t = salticidae::MsgNetwork<_opcode_t>; +using msgnetwork_config_t = msgnetwork_t::Config; +using msgnetwork_conn_t = msgnetwork_t::conn_t; + +#else +typedef struct msg_t; +typedef struct msgnetwork_t; +typedef struct msgnetwork_config_t; +typedef struct msgnetwork_conn_t; +#endif + +extern "C" { + +void salticidae_injected_msg_callback(const msg_t *msg, msgnetwork_conn_t *conn); + +msg_t _test_create_msg(); +msgnetwork_t *msgnetwork_new(const eventcontext_t *ec, const msgnetwork_config_t *config); + +bool msgnetwork_send_msg(msgnetwork_t *self, const msg_t *msg, const msgnetwork_conn_t *conn); + +msgnetwork_conn_t *msgnetwork_connect(msgnetwork_t *self, const netaddr_t *addr); + +void msgnetwork_listen(msgnetwork_t *self, const netaddr_t *listen_addr); + +typedef void (*msgnetwork_msg_callback_t)(const msg_t *msg, const msgnetwork_conn_t *conn); + +#ifdef SALTICIDAE_CBINDINGS_STR_OP +void msgnetwork_reg_handler(msgnetwork_t *self, const char *opcode, msgnetwork_msg_callback_t cb); +#else +void msgnetwork_reg_handler(msgnetwork_t *self, uint8_t opcode, msgnetwork_msg_callback_t cb); +#endif +} + #endif diff --git a/include/salticidae/stream.h b/include/salticidae/stream.h index 5f54195..9eda2bd 100644 --- a/include/salticidae/stream.h +++ b/include/salticidae/stream.h @@ -29,6 +29,7 @@ #include "salticidae/ref.h" #include "salticidae/crypto.h" +#ifdef __cplusplus namespace salticidae { template<size_t N, typename T> class Blob; @@ -466,4 +467,40 @@ namespace std { }; } +using uint256_t = salticidae::uint256_t; +using datastream_t = salticidae::DataStream; + +#else +typedef struct datastream_t; +typedef struct uint256_t; +#endif + +extern "C" { + +uint256_t *uint256_new(); +uint256_t *uint256_new_from_bytes(const uint8_t *arr); +bool uint256_is_null(const uint256_t *self); +bool uint256_is_eq(const uint256_t *a, const uint256_t *b); +void uint256_serialize(const uint256_t *self, datastream_t *s); +void uint256_unserialize(uint256_t *self, datastream_t *s); + +datastream_t *datastream_new(); +datastream_t *datastream_new_from_bytes(const uint8_t *begin, const uint8_t *end); +void datastream_assign_by_copy(datastream_t *dst, const datastream_t *src); +void datastream_assign_by_move(datastream_t *dst, datastream_t *src); +uint8_t *datastream_data(datastream_t *self); +void datastream_clear(datastream_t *self); +size_t datastream_size(const datastream_t *self); +void datastream_put_u8(datastream_t *self, uint8_t val); +void datastream_put_u16(datastream_t *self, uint16_t val); +void datastream_put_u32(datastream_t *self, uint32_t val); +void datastream_put_i8(datastream_t *self, int8_t val); +void datastream_put_i16(datastream_t *self, int16_t val); +void datastream_put_i32(datastream_t *self, int32_t val); +void datastream_put_data(datastream_t *self, + uint8_t *begin, uint8_t *end); +const uint8_t *datastream_get_data_inplace(datastream_t *self, size_t len); +uint256_t *datastream_get_hash(const datastream_t *self); + +} #endif diff --git a/include/salticidae/type.h b/include/salticidae/type.h index 68deeb0..3ec202a 100644 --- a/include/salticidae/type.h +++ b/include/salticidae/type.h @@ -89,4 +89,10 @@ inline auto generic_bind(ReturnType(ClassType::* f)(Args...), FArgs&&... fargs) } +#ifdef SALTICIDAE_CBINDINGS_STR_OP +using _opcode_t = std::string; +#else +using _opcode_t = uint8_t; +#endif + #endif diff --git a/src/config.h.in b/src/config.h.in index fb39d63..c4ab698 100644 --- a/src/config.h.in +++ b/src/config.h.in @@ -6,5 +6,6 @@ #cmakedefine SALTICIDAE_MSG_STAT #cmakedefine SALTICIDAE_NOCHECK #cmakedefine SALTICIDAE_NOCHECKSUM +#cmakedefine SALTICIDAE_CBINDINGS #endif diff --git a/src/msg.cpp b/src/msg.cpp new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/msg.cpp diff --git a/src/netaddr.cpp b/src/netaddr.cpp new file mode 100644 index 0000000..a3a54bf --- /dev/null +++ b/src/netaddr.cpp @@ -0,0 +1,30 @@ +#include "salticidae/netaddr.h" + +using namespace salticidae; + +#ifdef __cplusplus + +extern "C" { + +netaddr_t *netaddr_new() { return new NetAddr(); } +netaddr_t *netaddr_new_from_ip_port(uint32_t ip, uint16_t port) { + return new NetAddr(ip, port); +} + +netaddr_t *netaddr_new_from_sip_port(const char *ip, uint16_t port) { + return new NetAddr(ip, port); +} + +netaddr_t *netaddr_new_from_sipport(const char *ip_port_addr) { + return new NetAddr(ip_port_addr); +} + +bool netaddr_is_eq(const netaddr_t *a, const netaddr_t *b) { + return *a == *b; +} + +bool netaddr_is_null(const netaddr_t *self) { return self->is_null(); } + +} + +#endif diff --git a/src/network.cpp b/src/network.cpp new file mode 100644 index 0000000..cefe723 --- /dev/null +++ b/src/network.cpp @@ -0,0 +1,52 @@ +#include "salticidae/network.h" +#ifdef SALTICIDAE_CBINDINGS + +using namespace salticidae; + +extern "C" { + +msg_t _test_create_msg() { + return msg_t(0x0, bytearray_t()); +} + + +msgnetwork_t *msgnetwork_new(const EventContext *ec, const msgnetwork_config_t *config) { + return new msgnetwork_t(*ec, *config); +} + +bool msgnetwork_send_msg(msgnetwork_t *self, + const msg_t *msg, const msgnetwork_conn_t *conn) { + return self->send_msg(*msg, *conn); +} + +msgnetwork_conn_t *msgnetwork_connect(msgnetwork_t *self, const netaddr_t *addr) { + return new msgnetwork_t::conn_t(self->connect(*addr)); +} + +void msgnetwork_listen(msgnetwork_t *self, const netaddr_t *listen_addr) { + self->listen(*listen_addr); +} + +#ifdef SALTICIDAE_CBINDINGS_STR_OP +void msgnetwork_reg_handler(msgnetwork_t *self, + const char *opcode, + msgnetwork_msg_callback_t cb) { + self->set_handler(std::string(opcode), + [cb](const msgnetwork_t::Msg &msg, const msgnetwork_t::conn_t &conn) { + cb(&msg, &conn); + }); +} +#else +void msgnetwork_reg_handler(msgnetwork_t *self, + uint8_t opcode, + msgnetwork_msg_callback_t cb) { + self->set_handler(opcode, + [cb](const msgnetwork_t::Msg &msg, const msgnetwork_t::conn_t &conn) { + cb(&msg, &conn); + }); +} +#endif + +} + +#endif diff --git a/src/stream.cpp b/src/stream.cpp new file mode 100644 index 0000000..8846414 --- /dev/null +++ b/src/stream.cpp @@ -0,0 +1,69 @@ +#include "salticidae/stream.h" + +using namespace salticidae; + +#ifdef __cplusplus + +extern "C" { + +uint256_t *uint256_new() { return new uint256_t(); } +uint256_t *uint256_new_from_bytes(const uint8_t *arr) { + return new uint256_t(arr); +} + +bool uint256_is_null(const uint256_t *self) { return self->is_null(); } +bool uint256_is_eq(const uint256_t *a, const uint256_t *b) { + return *a == *b; +} + +void uint256_serialize(const uint256_t *self, datastream_t *s) { + self->serialize(*s); +} + +void uint256_unserialize(uint256_t *self, datastream_t *s) { + self->unserialize(*s); +} + +datastream_t *datastream_new() { return new DataStream(); } +datastream_t *datastream_new_from_bytes(const uint8_t *begin, const uint8_t *end) { + return new DataStream(begin, end); +} + +void datastream_assign_by_copy(datastream_t *dst, const datastream_t *src) { + *dst = *src; +} + +void datastream_assign_by_move(datastream_t *dst, datastream_t *src) { + *dst = std::move(*src); +} + +uint8_t *datastream_data(datastream_t *self) { return self->data(); } + +void datastream_clear(datastream_t *self) { self->clear(); } + +size_t datastream_size(const datastream_t *self) { return self->size(); } + +void datastream_put_u8(datastream_t *self, uint8_t val) { *self << val; } +void datastream_put_u16(datastream_t *self, uint16_t val) { *self << val; } +void datastream_put_u32(datastream_t *self, uint32_t val) { *self << val; } + +void datastream_put_i8(datastream_t *self, int8_t val) { *self << val; } +void datastream_put_i16(datastream_t *self, int16_t val) { *self << val; } +void datastream_put_i32(datastream_t *self, int32_t val) { *self << val; } + +void datastream_put_data(datastream_t *self, + uint8_t *begin, uint8_t *end) { + self->put_data(begin, end); +} + +const uint8_t *datastream_get_data_inplace(datastream_t *self, size_t len) { + return self->get_data_inplace(len); +} + +uint256_t *datastream_get_hash(const datastream_t *self) { + return new uint256_t(self->get_hash()); +} + +} + +#endif diff --git a/test/test_msgnet_c.c b/test/test_msgnet_c.c new file mode 100644 index 0000000..1098e19 --- /dev/null +++ b/test/test_msgnet_c.c @@ -0,0 +1,171 @@ +/** + * Copyright (c) 2018 Cornell University. + * + * Author: Ted Yin <[email protected]> + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is furnished to do + * so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#include <stdio.h> +#include <string.h> + +#include "salticidae/event.h" +#include "salticidae/network.h" +#include "salticidae/stream.h" + +/** Hello Message. */ +const uint8_t msg_hello_opcode = 0x0; +typedef struct MsgHello { + const char *name; + const char *text; +} MsgHello; +/** Defines how to serialize the msg. */ +datastream_t msg_hello_serialize(const char *name, const char *text) { + datastream_t *serialized = msgnet_ + serialized << htole((uint32_t)name.length()); + serialized << name << text; +} + +/** Defines how to parse the msg. */ +MsgHello MsgHello(DataStream &&s) { + MsgHello res; + uint32_t len; + s >> len; + len = letoh(len); + res.name = std::string((const char *)s.get_data_inplace(len), len); + len = s.size(); + res.text = std::string((const char *)s.get_data_inplace(len), len); + return res; +} + +/** Acknowledgement Message. */ +struct MsgAck { + static const uint8_t opcode = 0x1; + DataStream serialized; + MsgAck() {} + MsgAck(DataStream &&s) {} +}; + +const uint8_t MsgHello::opcode; +const uint8_t MsgAck::opcode; + +using MsgNetworkByteOp = MsgNetwork<uint8_t>; + +typedef struct MyNet { + msgnetwork_t *net; + const std::string name; + const NetAddr peer; +} MyNet; + +void msg_hello_handler(const msg_t *msg, const msgnetwork_conn_t *conn) { + +} + +void msg_ack_handler(const msg_t *msg, const msgnetwork_conn_t *conn) { +} + +void alice_conn_handler(const msgnetwork_conn_t *conn, bool connected) { + if (connected) + { + if (conn->get_mode() == ConnPool::Conn::ACTIVE) + { + puts("[alice] Connected, sending hello."); + /* send the first message through this connection */ + msgnetwork_send_msg(alice, + msg_hello_serialize("alice", "Hello there!"), conn); + } + else + printf("[alice] Accepted, waiting for greetings.\n", + this->name.c_str()); + } + else + { + printf("[alice] Disconnected, retrying.\n", this->name.c_str()); + /* try to reconnect to the same address */ + connect(conn->get_addr()); + } +} + +MyNet mynet_new(const salticidae::EventContext &ec, + const char *name, + const netaddr_t *peer) { + MyNet res; + const msgnetwork_config_t *netconfig = msgnetwork_config_new(); + res.net = msgnetwork_new(ec, netconfig); + res.name = name; + res.peer = peer; + + /* message handler could be a bound method */ + reg_handler(salticidae::generic_bind(&MyNet::on_receive_hello, this, _1, _2)); + + reg_conn_handler([this](const ConnPool::conn_t &conn, bool connected) { + }); + } + + void on_receive_hello(MsgHello &&msg, const MyNet::conn_t &conn) { + printf("[%s] %s says %s\n", + name.c_str(), + msg.name.c_str(), msg.text.c_str()); + /* send acknowledgement */ + send_msg(MsgAck(), conn); + } +}; + + +void on_receive_ack(MsgAck &&msg, const MyNet::conn_t &conn) { + auto net = static_cast<MyNet *>(conn->get_net()); + printf("[%s] the peer knows\n", net->name.c_str()); +} + +int main() { + salticidae::EventContext ec; + NetAddr alice_addr("127.0.0.1:12345"); + NetAddr bob_addr("127.0.0.1:12346"); + + /* test two nodes in the same main loop */ + MyNet alice(ec, "Alice", bob_addr); + MyNet bob(ec, "Bob", alice_addr); + + /* message handler could be a normal function */ + alice.reg_handler(on_receive_ack); + bob.reg_handler(on_receive_ack); + + /* start all threads */ + alice.start(); + bob.start(); + + /* accept incoming connections */ + alice.listen(alice_addr); + bob.listen(bob_addr); + + /* try to connect once */ + alice.connect(bob_addr); + bob.connect(alice_addr); + + /* the main loop can be shutdown by ctrl-c or kill */ + auto shutdown = [&](int) {ec.stop();}; + salticidae::SigEvent ev_sigint(ec, shutdown); + salticidae::SigEvent ev_sigterm(ec, shutdown); + ev_sigint.add(SIGINT); + ev_sigterm.add(SIGTERM); + + /* enter the main loop */ + ec.dispatch(); + return 0; +} |