aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README.rst277
-rw-r--r--include/salticidae/conn.h37
-rw-r--r--include/salticidae/event.h135
-rw-r--r--include/salticidae/network.h2
-rw-r--r--test/.gitignore4
-rw-r--r--test/CMakeLists.txt4
-rw-r--r--test/test_msgnet.cpp (renamed from test/test_network.cpp)47
-rw-r--r--test/test_p2p.cpp31
-rw-r--r--test/test_p2p_stress.cpp24
9 files changed, 309 insertions, 252 deletions
diff --git a/README.rst b/README.rst
index 2a40304..23158e2 100644
--- a/README.rst
+++ b/README.rst
@@ -1,4 +1,4 @@
-Salticidae: a Minimal C++ asynchronous network library.
+Salticidae: Minimal C++ asynchronous network library.
=======================================================
.. image:: https://img.shields.io/badge/License-MIT-yellow.svg
@@ -67,135 +67,146 @@ Example (MsgNetwork layer)
--------------------------
.. code-block:: cpp
- #include <cstdio>
- #include <string>
- #include <functional>
-
- #include "salticidae/msg.h"
- #include "salticidae/event.h"
- #include "salticidae/network.h"
- #include "salticidae/stream.h"
-
- using salticidae::NetAddr;
- using salticidae::DataStream;
- using salticidae::MsgNetwork;
- using salticidae::htole;
- using salticidae::letoh;
- using std::placeholders::_1;
- using std::placeholders::_2;
-
- /** Hello Message. */
- struct MsgHello {
- static const uint8_t opcode = 0x0;
- DataStream serialized;
- std::string name;
- std::string text;
- /** Defines how to serialize the msg. */
- MsgHello(const std::string &name,
- const std::string &text) {
- serialized << htole((uint32_t)name.length());
- serialized << name << text;
- }
- /** Defines how to parse the msg. */
- MsgHello(DataStream &&s) {
- uint32_t len;
- s >> len;
- len = letoh(len);
- name = std::string((const char *)s.get_data_inplace(len), len);
- len = s.size();
- text = std::string((const char *)s.get_data_inplace(len), len);
- }
- };
-
- /** Acknowledgement Message. */
- struct MsgAck {
- static const uint8_t opcode = 0x1;
- DataStream serialized;
- MsgAck() {}
- MsgAck(DataStream &&s) {}
- };
-
- const uint8_t MsgHello::opcode;
- const uint8_t MsgAck::opcode;
-
- using MsgNetworkByteOp = MsgNetwork<uint8_t>;
-
- struct MyNet: public MsgNetworkByteOp {
- const std::string name;
- const NetAddr peer;
-
- MyNet(const salticidae::EventContext &ec,
- const std::string name,
- const NetAddr &peer):
- MsgNetwork<uint8_t>(ec, 10, 1.0, 4096),
- name(name),
- peer(peer) {
- /* message handler could be a bound method */
- reg_handler(salticidae::generic_bind(
- &MyNet::on_receive_hello, this, _1, _2));
-
- reg_conn_handler([this](ConnPool::Conn &conn, bool connected) {
- if (connected)
- {
- if (conn.get_mode() == ConnPool::Conn::ACTIVE)
- {
- printf("[%s] Connected, sending hello.\n",
- this->name.c_str());
- /* send the first message through this connection */
- send_msg(MsgHello(this->name, "Hello there!"),
- static_cast<Conn &>(conn));
- }
- else
- printf("[%s] Accepted, waiting for greetings.\n",
- this->name.c_str());
- }
- else
- {
- printf("[%s] Disconnected, retrying.\n", this->name.c_str());
- /* try to reconnect to the same address */
- connect(conn.get_addr());
- }
- });
- }
-
- void on_receive_hello(MsgHello &&msg, MyNet::Conn &conn) {
- printf("[%s] %s says %s\n",
- name.c_str(),
- msg.name.c_str(), msg.text.c_str());
- /* send acknowledgement */
- send_msg(MsgAck(), conn);
- }
- };
-
-
- void on_receive_ack(MsgAck &&msg, MyNet::Conn &conn) {
- auto net = static_cast<MyNet *>(conn.get_net());
- printf("[%s] the peer knows\n", net->name.c_str());
- }
-
- salticidae::EventContext ec;
- NetAddr alice_addr("127.0.0.1:12345");
- NetAddr bob_addr("127.0.0.1:12346");
-
- int main() {
- /* test two nodes */
- MyNet alice(ec, "Alice", bob_addr);
- MyNet bob(ec, "Bob", alice_addr);
-
- /* message handler could be a normal function */
- alice.reg_handler(on_receive_ack);
- bob.reg_handler(on_receive_ack);
-
- alice.start();
- bob.start();
-
- alice.listen(alice_addr);
- bob.listen(bob_addr);
-
- /* first attempt */
- alice.connect(bob_addr);
- bob.connect(alice_addr);
-
- ec.dispatch();
- return 0;
- }
+
+ #include <cstdio>
+ #include <string>
+ #include <functional>
+
+ #include "salticidae/msg.h"
+ #include "salticidae/event.h"
+ #include "salticidae/network.h"
+ #include "salticidae/stream.h"
+
+ using salticidae::NetAddr;
+ using salticidae::DataStream;
+ using salticidae::MsgNetwork;
+ using salticidae::htole;
+ using salticidae::letoh;
+ using std::placeholders::_1;
+ using std::placeholders::_2;
+
+ /** Hello Message. */
+ struct MsgHello {
+ static const uint8_t opcode = 0x0;
+ DataStream serialized;
+ std::string name;
+ std::string text;
+ /** Defines how to serialize the msg. */
+ MsgHello(const std::string &name,
+ const std::string &text) {
+ serialized << htole((uint32_t)name.length());
+ serialized << name << text;
+ }
+ /** Defines how to parse the msg. */
+ MsgHello(DataStream &&s) {
+ uint32_t len;
+ s >> len;
+ len = letoh(len);
+ name = std::string((const char *)s.get_data_inplace(len), len);
+ len = s.size();
+ text = std::string((const char *)s.get_data_inplace(len), len);
+ }
+ };
+
+ /** Acknowledgement Message. */
+ struct MsgAck {
+ static const uint8_t opcode = 0x1;
+ DataStream serialized;
+ MsgAck() {}
+ MsgAck(DataStream &&s) {}
+ };
+
+ const uint8_t MsgHello::opcode;
+ const uint8_t MsgAck::opcode;
+
+ using MsgNetworkByteOp = MsgNetwork<uint8_t>;
+
+ struct MyNet: public MsgNetworkByteOp {
+ const std::string name;
+ const NetAddr peer;
+
+ MyNet(const salticidae::EventContext &ec,
+ const std::string name,
+ const NetAddr &peer):
+ MsgNetwork<uint8_t>(ec, MsgNetwork::Config()),
+ name(name),
+ peer(peer) {
+ /* message handler could be a bound method */
+ reg_handler(
+ salticidae::generic_bind(&MyNet::on_receive_hello, this, _1, _2));
+
+ reg_conn_handler([this](const ConnPool::conn_t &conn, bool connected) {
+ if (connected)
+ {
+ if (conn->get_mode() == ConnPool::Conn::ACTIVE)
+ {
+ printf("[%s] Connected, sending hello.\n",
+ this->name.c_str());
+ /* send the first message through this connection */
+ send_msg(MsgHello(this->name, "Hello there!"),
+ salticidae::static_pointer_cast<Conn>(conn));
+ }
+ else
+ printf("[%s] Accepted, waiting for greetings.\n",
+ this->name.c_str());
+ }
+ else
+ {
+ printf("[%s] Disconnected, retrying.\n", this->name.c_str());
+ /* try to reconnect to the same address */
+ connect(conn->get_addr());
+ }
+ });
+ }
+
+ void on_receive_hello(MsgHello &&msg, const MyNet::conn_t &conn) {
+ printf("[%s] %s says %s\n",
+ name.c_str(),
+ msg.name.c_str(), msg.text.c_str());
+ /* send acknowledgement */
+ send_msg(MsgAck(), conn);
+ }
+ };
+
+
+ void on_receive_ack(MsgAck &&msg, const MyNet::conn_t &conn) {
+ auto net = static_cast<MyNet *>(conn->get_net());
+ printf("[%s] the peer knows\n", net->name.c_str());
+ }
+
+ int main() {
+ salticidae::EventContext ec;
+ NetAddr alice_addr("127.0.0.1:12345");
+ NetAddr bob_addr("127.0.0.1:12346");
+
+ /* test two nodes in the same main loop */
+ MyNet alice(ec, "Alice", bob_addr);
+ MyNet bob(ec, "Bob", alice_addr);
+
+ /* message handler could be a normal function */
+ alice.reg_handler(on_receive_ack);
+ bob.reg_handler(on_receive_ack);
+
+ /* start all threads */
+ alice.start();
+ bob.start();
+
+ /* accept incoming connections */
+ alice.listen(alice_addr);
+ bob.listen(bob_addr);
+
+ /* try to connect once */
+ alice.connect(bob_addr);
+ bob.connect(alice_addr);
+
+ /* the main loop can be shutdown by ctrl-c or kill */
+ auto shutdown = [&](int) {ec.stop();};
+ salticidae::SigEvent ev_sigint(ec, shutdown);
+ salticidae::SigEvent ev_sigterm(ec, shutdown);
+ ev_sigint.add(SIGINT);
+ ev_sigterm.add(SIGTERM);
+
+ /* enter the main loop */
+ ec.dispatch();
+ return 0;
+ }
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index c357875..265a02a 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -238,6 +238,7 @@ class ConnPool {
/* related to workers */
size_t nworker;
salticidae::BoxObj<Worker[]> workers;
+ bool worker_running;
void accept_client(int, int);
conn_t add_conn(const conn_t &conn);
@@ -319,7 +320,8 @@ class ConnPool {
conn_server_timeout(config._conn_server_timeout),
seg_buff_size(config._seg_buff_size),
listen_fd(-1),
- nworker(config._nworker) {
+ nworker(config._nworker),
+ worker_running(false) {
workers = new Worker[nworker];
user_tcall = new ThreadCall(ec);
disp_ec = workers[0].get_ec();
@@ -327,27 +329,22 @@ class ConnPool {
workers[0].set_dispatcher();
}
- ~ConnPool() {
- stop();
- for (auto it: pool)
- {
- conn_t conn = it.second;
- conn->stop();
- conn->self_ref = nullptr;
- }
- if (listen_fd != -1) close(listen_fd);
- }
+ ~ConnPool() { stop(); }
ConnPool(const ConnPool &) = delete;
ConnPool(ConnPool &&) = delete;
void start() {
+ if (worker_running) return;
SALTICIDAE_LOG_INFO("starting all threads...");
for (size_t i = 0; i < nworker; i++)
workers[i].start();
+ worker_running = true;
}
- void stop() {
+ void stop_workers() {
+ if (!worker_running) return;
+ worker_running = false;
SALTICIDAE_LOG_INFO("stopping all threads...");
/* stop all workers */
for (size_t i = 0; i < nworker; i++)
@@ -355,7 +352,21 @@ class ConnPool {
/* join all worker threads */
for (size_t i = 0; i < nworker; i++)
workers[i].get_handle().join();
- nworker = 0;
+ }
+
+ void stop() {
+ stop_workers();
+ for (auto it: pool)
+ {
+ conn_t conn = it.second;
+ conn->stop();
+ conn->self_ref = nullptr;
+ }
+ if (listen_fd != -1)
+ {
+ close(listen_fd);
+ listen_fd = -1;
+ }
}
/** Actively connect to remote addr. */
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index 021b5dc..d862ce8 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -189,6 +189,80 @@ class Event {
operator bool() const { return ev_fd != nullptr || ev_timer != nullptr; }
};
+class SigEvent {
+ public:
+ using callback_t = std::function<void(int signum)>;
+ private:
+ EventContext eb;
+ uv_signal_t *ev_sig;
+ callback_t callback;
+ static inline void sig_then(uv_signal_t *h, int signum) {
+ auto event = static_cast<SigEvent *>(h->data);
+ event->callback(signum);
+ }
+
+ static void _on_handle_close(uv_handle_t *h) {
+ delete h;
+ }
+
+ public:
+ SigEvent(): eb(nullptr), ev_sig(nullptr) {}
+ SigEvent(const EventContext &eb, callback_t callback):
+ eb(eb),
+ ev_sig(new uv_signal_t()),
+ callback(callback) {
+ uv_signal_init(eb.get(), ev_sig);
+ ev_sig->data = this;
+ }
+
+ SigEvent(const SigEvent &) = delete;
+ SigEvent(SigEvent &&other):
+ eb(std::move(other.eb)),
+ ev_sig(other.ev_sig),
+ callback(std::move(other.callback)) {
+ other.del();
+ other.ev_sig = nullptr;
+ ev_sig->data = this;
+ }
+
+ SigEvent &operator=(SigEvent &&other) {
+ clear();
+ other.del();
+ eb = std::move(other.eb);
+ ev_sig = other.ev_sig;
+ callback = std::move(other.callback);
+
+ other.ev_sig = nullptr;
+ ev_sig->data = this;
+ return *this;
+ }
+
+ ~SigEvent() { clear(); }
+
+ void clear() {
+ if (ev_sig != nullptr)
+ {
+ uv_signal_stop(ev_sig);
+ uv_close((uv_handle_t *)ev_sig, SigEvent::_on_handle_close);
+ ev_sig = nullptr;
+ }
+ callback = nullptr;
+ }
+
+ void set_callback(callback_t _callback) {
+ callback = _callback;
+ }
+
+ void add(int signum) {
+ uv_signal_start(ev_sig, SigEvent::sig_then, signum);
+ }
+ void del() {
+ uv_signal_stop(ev_sig);
+ }
+
+ operator bool() const { return ev_sig != nullptr; }
+};
+
template<typename T>
class ThreadNotifier {
std::condition_variable cv;
@@ -304,6 +378,8 @@ class ThreadCall {
write(ctl_fd[1], &h, sizeof(h));
return notifier.wait();
}
+
+ const EventContext &get_ec() const { return ec; }
};
@@ -362,65 +438,6 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
}
};
-
-
-// TODO: incorrect MPMCQueueEventDriven impl
-/*
-template<typename T>
-class MPMCQueueEventDriven: public MPMCQueue<T> {
- private:
- const uint64_t dummy = 1;
- std::atomic<bool> wait_sig;
- std::vector<std::pair<BoxObj<Event>, int>> evs;
-
- public:
- MPMCQueueEventDriven(size_t capacity = 65536):
- MPMCQueue<T>(capacity),
- wait_sig(true) {}
-
- template<typename Func>
- void listen(const EventContext &ec, Func &&func, size_t burst_size=128) {
- int fd = eventfd(0, EFD_NONBLOCK);
- evs.emplace(evs.end(), std::make_pair(new Event(ec, fd, EV_READ | EV_PERSIST,
- [this, func=std::forward<Func>(func), burst_size](int fd, int) {
- uint64_t t;
- read(fd, &t, 8);
- //fprintf(stderr, "%x\n", std::this_thread::get_id());
- T elem;
- size_t cnt = burst_size;
- while (MPMCQueue<T>::try_dequeue(elem))
- {
- func(std::move(elem));
- if (!--cnt)
- {
- write(fd, &dummy, 8);
- return;
- }
- }
- wait_sig.store(true, std::memory_order_relaxed);
- }), fd));
- evs.rbegin()->first->add();
- }
-
- ~MPMCQueueEventDriven() {
- for (const auto &p: evs)
- close(p.second);
- }
-
- template<typename U>
- bool enqueue(U &&e) {
- bool ret = MPMCQueue<T>::enqueue(std::forward<U>(e));
- if (wait_sig.exchange(false, std::memory_order_relaxed))
- {
- SALTICIDAE_LOG_DEBUG("mpmc notify");
- for (const auto &p: evs)
- write(p.second, &dummy, 8);
- }
- return ret;
- }
-};
-*/
-
}
#endif
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index 43638cf..a71c5dd 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -423,7 +423,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
this->reg_handler(generic_bind(&PeerNetwork::msg_pong, this, _1, _2));
}
- ~PeerNetwork() { this->stop(); }
+ ~PeerNetwork() { this->stop_workers(); }
void add_peer(const NetAddr &paddr);
const conn_t get_peer_conn(const NetAddr &paddr) const;
diff --git a/test/.gitignore b/test/.gitignore
index 592f084..57a2e9b 100644
--- a/test/.gitignore
+++ b/test/.gitignore
@@ -1,6 +1,8 @@
test_msg
test_bits
-test_network
+test_msgnet
+test_p2p
+test_p2p_stress
test_queue
bench_network
Makefile
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 8f31ddb..14201e9 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -26,8 +26,8 @@ target_link_libraries(test_msg salticidae_static)
add_executable(test_bits test_bits.cpp)
target_link_libraries(test_bits salticidae_static)
-add_executable(test_network test_network.cpp)
-target_link_libraries(test_network salticidae_static)
+add_executable(test_msgnet test_msgnet.cpp)
+target_link_libraries(test_msgnet salticidae_static)
add_executable(test_p2p test_p2p.cpp)
target_link_libraries(test_p2p salticidae_static)
diff --git a/test/test_network.cpp b/test/test_msgnet.cpp
index 6a12117..088e0ff 100644
--- a/test/test_network.cpp
+++ b/test/test_msgnet.cpp
@@ -86,8 +86,8 @@ struct MyNet: public MsgNetworkByteOp {
name(name),
peer(peer) {
/* message handler could be a bound method */
- reg_handler(salticidae::generic_bind(
- &MyNet::on_receive_hello, this, _1, _2));
+ reg_handler(
+ salticidae::generic_bind(&MyNet::on_receive_hello, this, _1, _2));
reg_conn_handler([this](const ConnPool::conn_t &conn, bool connected) {
if (connected)
@@ -128,19 +128,12 @@ void on_receive_ack(MsgAck &&msg, const MyNet::conn_t &conn) {
printf("[%s] the peer knows\n", net->name.c_str());
}
-salticidae::EventContext ec;
-NetAddr alice_addr("127.0.0.1:12345");
-NetAddr bob_addr("127.0.0.1:12346");
-
-void signal_handler(int) {
- throw salticidae::SalticidaeError("got termination signal");
-}
-
int main() {
- signal(SIGTERM, signal_handler);
- signal(SIGINT, signal_handler);
+ salticidae::EventContext ec;
+ NetAddr alice_addr("127.0.0.1:12345");
+ NetAddr bob_addr("127.0.0.1:12346");
- /* test two nodes */
+ /* test two nodes in the same main loop */
MyNet alice(ec, "Alice", bob_addr);
MyNet bob(ec, "Bob", alice_addr);
@@ -148,18 +141,26 @@ int main() {
alice.reg_handler(on_receive_ack);
bob.reg_handler(on_receive_ack);
- try {
- alice.start();
- bob.start();
+ /* start all threads */
+ alice.start();
+ bob.start();
+
+ /* accept incoming connections */
+ alice.listen(alice_addr);
+ bob.listen(bob_addr);
- alice.listen(alice_addr);
- bob.listen(bob_addr);
+ /* try to connect once */
+ alice.connect(bob_addr);
+ bob.connect(alice_addr);
- /* first attempt */
- alice.connect(bob_addr);
- bob.connect(alice_addr);
+ /* the main loop can be shutdown by ctrl-c or kill */
+ auto shutdown = [&](int) {ec.stop();};
+ salticidae::SigEvent ev_sigint(ec, shutdown);
+ salticidae::SigEvent ev_sigterm(ec, shutdown);
+ ev_sigint.add(SIGINT);
+ ev_sigterm.add(SIGTERM);
- ec.dispatch();
- } catch (salticidae::SalticidaeError &e) {}
+ /* enter the main loop */
+ ec.dispatch();
return 0;
}
diff --git a/test/test_p2p.cpp b/test/test_p2p.cpp
index 1c37aa6..e884930 100644
--- a/test/test_p2p.cpp
+++ b/test/test_p2p.cpp
@@ -82,25 +82,20 @@ std::vector<NetAddr> addrs = {
NetAddr("127.0.0.1:12348")
};
-void signal_handler(int) {
- throw salticidae::SalticidaeError("got termination signal");
-}
+salticidae::EventContext ec;
+MyNet net(ec, MyNet::Config().conn_timeout(5).ping_period(2));
int main(int argc, char **argv) {
- signal(SIGTERM, signal_handler);
- signal(SIGINT, signal_handler);
-
- salticidae::EventContext ec;
- /* test two nodes */
- MyNet net(ec, MyNet::Config().conn_timeout(5).ping_period(2));
-
- try {
- int i;
- net.start();
- net.listen(addrs[i = atoi(argv[1])]);
- for (int j = 0; j < addrs.size(); j++)
- if (i != j) net.add_peer(addrs[j]);
- ec.dispatch();
- } catch (salticidae::SalticidaeError &e) {}
+ int i;
+ net.start();
+ net.listen(addrs[i = atoi(argv[1])]);
+ for (int j = 0; j < addrs.size(); j++)
+ if (i != j) net.add_peer(addrs[j]);
+ auto shutdown = [&](int) {ec.stop();};
+ salticidae::SigEvent ev_sigint(ec, shutdown);
+ salticidae::SigEvent ev_sigterm(ec, shutdown);
+ ev_sigint.add(SIGINT);
+ ev_sigterm.add(SIGTERM);
+ ec.dispatch();
return 0;
}
diff --git a/test/test_p2p_stress.cpp b/test/test_p2p_stress.cpp
index b58bcc7..ac6168a 100644
--- a/test/test_p2p_stress.cpp
+++ b/test/test_p2p_stress.cpp
@@ -44,6 +44,7 @@ using salticidae::bytearray_t;
using salticidae::uint256_t;
using salticidae::static_pointer_cast;
using salticidae::Config;
+using salticidae::ThreadCall;
using std::placeholders::_1;
using std::placeholders::_2;
@@ -159,15 +160,19 @@ int main(int argc, char **argv) {
for (int i = 0; i < opt_npeers->get(); i++)
addrs.push_back(NetAddr("127.0.0.1:" + std::to_string(12345 + i)));
std::vector<std::thread> peers;
+ std::vector<salticidae::BoxObj<ThreadCall>> tcalls;
+ tcalls.resize(addrs.size());
+ size_t i = 0;
for (auto &addr: addrs)
{
- peers.push_back(std::thread([&, addr]() {
+ peers.push_back(std::thread([&, addr, i]() {
EventContext ec;
std::unordered_map<NetAddr, TestContext> tc;
MyNet net(ec, MyNet::Config(
salticidae::ConnPool::Config()
.nworker(opt_nworker->get()).seg_buff_size(seg_buff_size))
.conn_timeout(5).ping_period(2));
+ tcalls[i] = new ThreadCall(ec);
if (!opt_no_msg->get())
install_proto(ec, net, tc, seg_buff_size);
try {
@@ -178,7 +183,22 @@ int main(int argc, char **argv) {
ec.dispatch();
} catch (salticidae::SalticidaeError &e) {}
}));
+ i++;
}
- for (auto &t: peers) t.join();
+
+ EventContext ec;
+ auto shutdown = [&](int) {
+ for (auto &tc: tcalls)
+ tc->async_call([ec=tc->get_ec()](ThreadCall::Handle &) {
+ ec.stop();
+ });
+ for (auto &t: peers) t.join();
+ ec.stop();
+ };
+ salticidae::SigEvent ev_sigint(ec, shutdown);
+ salticidae::SigEvent ev_sigterm(ec, shutdown);
+ ev_sigint.add(SIGINT);
+ ev_sigterm.add(SIGTERM);
+ ec.dispatch();
return 0;
}