From 3505e9d33eab6d341185773c1da315b2dc833a21 Mon Sep 17 00:00:00 2001 From: Determinant Date: Sun, 13 Oct 2019 14:05:29 -0400 Subject: WIP: bounded recv buffer --- include/salticidae/conn.h | 37 ++++++-- include/salticidae/network.h | 55 +++++++++-- src/conn.cpp | 22 ++++- test/.gitignore | 1 + test/CMakeLists.txt | 3 + test/bench_network.cpp | 8 ++ test/bench_network_tls.cpp | 8 ++ test/test_bounded_recv_buffer.cpp | 192 ++++++++++++++++++++++++++++++++++++++ test/test_p2p.cpp | 9 ++ test/test_p2p_stress.cpp | 8 ++ test/test_p2p_tls.cpp | 9 ++ test/test_queue.cpp | 11 +++ 12 files changed, 341 insertions(+), 22 deletions(-) create mode 100644 test/test_bounded_recv_buffer.cpp diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index 56d2d6a..a594da6 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -69,10 +69,11 @@ class ConnPool { ACTIVE, /**< the connection is established by connect() */ PASSIVE, /**< the connection is established by accept() */ }; - + protected: std::atomic terminated; size_t seg_buff_size; + size_t max_recv_buff_size; int fd; Worker *worker; ConnPool *cpool; @@ -86,13 +87,14 @@ class ConnPool { FdEvent ev_socket; /** does not need to wait if true */ bool ready_send; + bool ready_recv; typedef void (socket_io_func)(const conn_t &, int, int); socket_io_func *send_data_func; socket_io_func *recv_data_func; BoxObj tls; BoxObj peer_cert; - + static socket_io_func _recv_data; static socket_io_func _send_data; @@ -107,12 +109,13 @@ class ConnPool { virtual void stop(); public: - Conn(): terminated(false), worker(nullptr), ready_send(false), + Conn(): terminated(false), worker(nullptr), + ready_send(false), ready_recv(false), send_data_func(nullptr), recv_data_func(nullptr), tls(nullptr), peer_cert(nullptr) {} Conn(const Conn &) = delete; Conn(Conn &&other) = delete; - + virtual ~Conn() { SALTICIDAE_LOG_INFO("destroyed %s", std::string(*this).c_str()); } @@ -139,6 +142,7 @@ class ConnPool { }; protected: + int system_state; EventContext ec; EventContext disp_ec; ThreadCall* disp_tcall; @@ -178,6 +182,7 @@ class ConnPool { const int max_listen_backlog; const double conn_server_timeout; const size_t seg_buff_size; + const size_t max_recv_buff_size; const size_t queue_capacity; tls_context_t tls_ctx; @@ -230,7 +235,12 @@ class ConnPool { /* the following functions are called by the dispatcher */ void start() { - handle = std::thread([this]() { ec.dispatch(); }); + handle = std::thread([this]() { + sigset_t mask; + sigfillset(&mask); + pthread_sigmask(SIG_BLOCK, &mask, NULL); + ec.dispatch(); + }); } void enable_send_buffer(const conn_t &conn, int client_fd) { @@ -240,7 +250,8 @@ class ConnPool { if (conn->ready_send) { conn->ev_socket.del(); - conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE); + conn->ev_socket.add((conn->ready_recv ? 0 : FdEvent::READ) | + FdEvent::WRITE); conn->send_data_func(conn, client_fd, FdEvent::WRITE); } return false; @@ -318,7 +329,6 @@ class ConnPool { /* related to workers */ size_t nworker; salticidae::BoxObj workers; - int system_state; void accept_client(int, int); void conn_server(const conn_t &conn, int, int); @@ -348,6 +358,7 @@ class ConnPool { int _max_listen_backlog; double _conn_server_timeout; size_t _seg_buff_size; + size_t _max_recv_buff_size; size_t _nworker; size_t _queue_capacity; bool _enable_tls; @@ -363,6 +374,7 @@ class ConnPool { _max_listen_backlog(10), _conn_server_timeout(2), _seg_buff_size(4096), + _max_recv_buff_size(4096), _nworker(1), _queue_capacity(0), _enable_tls(false), @@ -388,6 +400,11 @@ class ConnPool { return *this; } + Config &max_recv_buff_size(size_t x) { + _max_recv_buff_size = x; + return *this; + } + Config &nworker(size_t x) { _nworker = std::max((size_t)1, x); return *this; @@ -435,17 +452,17 @@ class ConnPool { }; ConnPool(const EventContext &ec, const Config &config): - ec(ec), + system_state(0), ec(ec), enable_tls(config._enable_tls), async_id(0), max_listen_backlog(config._max_listen_backlog), conn_server_timeout(config._conn_server_timeout), seg_buff_size(config._seg_buff_size), + max_recv_buff_size(config._max_recv_buff_size), queue_capacity(config._queue_capacity), tls_ctx(nullptr), listen_fd(-1), - nworker(config._nworker), - system_state(0) { + nworker(config._nworker) { if (enable_tls) { tls_ctx = new TLSContext(); diff --git a/include/salticidae/network.h b/include/salticidae/network.h index fd69b58..6f7a034 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -45,7 +45,7 @@ class MsgNetwork: public ConnPool { template struct callback_traits: public callback_traits {}; - + /* match plain functions */ template struct callback_traits { @@ -53,17 +53,17 @@ class MsgNetwork: public ConnPool { using conn_type = typename std::remove_reference::type::type; using msg_type = typename std::remove_reference::type; }; - + /* match function pointers */ template struct callback_traits: public callback_traits {}; - + /* match const member functions */ template struct callback_traits: public callback_traits {}; - + /* match member functions */ template struct callback_traits: @@ -78,6 +78,8 @@ class MsgNetwork: public ConnPool { Msg msg; MsgState msg_state; + bool msg_sleep; + TimerEvent ev_enqueue_poll; protected: #ifdef SALTICIDAE_MSG_STAT @@ -88,7 +90,7 @@ class MsgNetwork: public ConnPool { #endif public: - Conn(): msg_state(HEADER) + Conn(): msg_state(HEADER), msg_sleep(false) #ifdef SALTICIDAE_MSG_STAT , nsent(0), nrecv(0), nsentb(0), nrecvb(0) #endif @@ -128,6 +130,26 @@ class MsgNetwork: public ConnPool { ConnPool::Conn *create_conn() override { return new Conn(); } void on_read(const ConnPool::conn_t &) override; + void on_setup(const ConnPool::conn_t &_conn) override { + auto conn = static_pointer_cast(_conn); + conn->ev_enqueue_poll = TimerEvent(conn->worker->get_ec(), + [this, conn](TimerEvent &) { + if (!incoming_msgs.enqueue(std::make_pair(conn->msg, conn), false)) + { + conn->msg_sleep = true; + conn->ev_enqueue_poll.add(0); + return; + } + conn->msg_sleep = false; + on_read(conn); + }); + } + + void on_teardown(const ConnPool::conn_t &_conn) override { + auto conn = static_pointer_cast(_conn); + conn->ev_enqueue_poll.clear(); + } + public: class Config: public ConnPool::Config { @@ -153,7 +175,7 @@ class MsgNetwork: public ConnPool { incoming_msgs.reg_handler(ec, [this, burst_size=config._burst_size](queue_t &q) { std::pair item; size_t cnt = 0; - while (q.try_dequeue(item)) + while (q.try_dequeue(item) && this->system_state == 1) { auto &msg = item.first; auto &conn = item.second; @@ -525,8 +547,9 @@ class PeerNetwork: public MsgNetwork { /* this callback is run by a worker */ template void MsgNetwork::on_read(const ConnPool::conn_t &_conn) { - ConnPool::on_read(_conn); auto conn = static_pointer_cast(_conn); + if (conn->msg_sleep) return; + ConnPool::on_read(_conn); auto &recv_buffer = conn->recv_buffer; auto &msg = conn->msg; auto &msg_state = conn->msg_state; @@ -550,13 +573,25 @@ void MsgNetwork::on_read(const ConnPool::conn_t &_conn) { if (!msg.verify_checksum()) { SALTICIDAE_LOG_WARN("checksums do not match, dropping the message"); - return; + break; } #endif - while (!incoming_msgs.enqueue(std::make_pair(msg, conn), true)) - std::this_thread::yield(); + if (!incoming_msgs.enqueue(std::make_pair(msg, conn), false)) + { + conn->msg_sleep = true; + conn->ev_enqueue_poll.add(0); + return; + } } } + if (conn->ready_recv && recv_buffer.size() < conn->max_recv_buff_size) + { + /* resume reading from socket */ + conn->ev_socket.del(); + conn->ev_socket.add(FdEvent::READ | + (conn->ready_send ? 0: FdEvent::WRITE)); + conn->send_data_func(conn, conn->fd, FdEvent::READ); + } } template diff --git a/src/conn.cpp b/src/conn.cpp index 98ad3fc..a627a15 100644 --- a/src/conn.cpp +++ b/src/conn.cpp @@ -91,7 +91,7 @@ void ConnPool::Conn::_send_data(const conn_t &conn, int fd, int events) { } } conn->ev_socket.del(); - conn->ev_socket.add(FdEvent::READ); + conn->ev_socket.add(conn->ready_recv ? 0 : FdEvent::READ); /* consumed the buffer but endpoint still seems to be writable */ conn->ready_send = true; } @@ -106,6 +106,14 @@ void ConnPool::Conn::_recv_data(const conn_t &conn, int fd, int events) { ssize_t ret = seg_buff_size; while (ret == (ssize_t)seg_buff_size) { + if (conn->recv_buffer.size() >= conn->max_recv_buff_size) + { + /* receive buffer is full, disable READ event */ + conn->ev_socket.del(); + conn->ev_socket.add(conn->ready_send ? 0 : FdEvent::WRITE); + conn->ready_recv = true; + return; + } bytearray_t buff_seg; buff_seg.resize(seg_buff_size); ret = recv(fd, buff_seg.data(), seg_buff_size, 0); @@ -171,7 +179,7 @@ void ConnPool::Conn::_send_data_tls(const conn_t &conn, int fd, int events) { } } conn->ev_socket.del(); - conn->ev_socket.add(FdEvent::READ); + conn->ev_socket.add(conn->ready_recv ? : FdEvent::READ); /* consumed the buffer but endpoint still seems to be writable */ conn->ready_send = true; } @@ -187,6 +195,14 @@ void ConnPool::Conn::_recv_data_tls(const conn_t &conn, int fd, int events) { auto &tls = conn->tls; while (ret == (ssize_t)seg_buff_size) { + if (conn->recv_buffer.size() >= conn->max_recv_buff_size) + { + /* receive buffer is full, disable READ event */ + conn->ev_socket.del(); + conn->ev_socket.add(conn->ready_send ? 0 : FdEvent::WRITE); + conn->ready_recv = true; + return; + } bytearray_t buff_seg; buff_seg.resize(seg_buff_size); ret = tls->recv(buff_seg.data(), seg_buff_size); @@ -295,6 +311,7 @@ void ConnPool::accept_client(int fd, int) { conn_t conn = create_conn(); conn->send_buffer.set_capacity(queue_capacity); conn->seg_buff_size = seg_buff_size; + conn->max_recv_buff_size = max_recv_buff_size; conn->fd = client_fd; conn->cpool = this; conn->mode = Conn::PASSIVE; @@ -375,6 +392,7 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) { conn_t conn = create_conn(); conn->send_buffer.set_capacity(queue_capacity); conn->seg_buff_size = seg_buff_size; + conn->max_recv_buff_size = max_recv_buff_size; conn->fd = fd; conn->cpool = this; conn->mode = Conn::ACTIVE; diff --git a/test/.gitignore b/test/.gitignore index 50e25ba..a83336f 100644 --- a/test/.gitignore +++ b/test/.gitignore @@ -11,3 +11,4 @@ Makefile test_msgnet_c test_msgnet_tls bench_network_tls +test_bounded_recv_buffer diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 15cd414..8f95e14 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -55,3 +55,6 @@ target_link_libraries(bench_network_tls salticidae_static pthread) add_executable(test_msgnet_c test_msgnet_c.c) target_link_libraries(test_msgnet_c salticidae_static pthread) + +add_executable(test_bounded_recv_buffer test_bounded_recv_buffer.cpp) +target_link_libraries(test_bounded_recv_buffer salticidae_static pthread) diff --git a/test/bench_network.cpp b/test/bench_network.cpp index 1021ec4..1d52c39 100644 --- a/test/bench_network.cpp +++ b/test/bench_network.cpp @@ -130,6 +130,13 @@ salticidae::EventContext ec; NetAddr alice_addr("127.0.0.1:1234"); NetAddr bob_addr("127.0.0.1:1235"); +void masksigs() { + sigset_t mask; + sigemptyset(&mask); + sigfillset(&mask); + pthread_sigmask(SIG_BLOCK, &mask, NULL); +} + int main() { salticidae::BoxObj alice = new MyNet(ec, "Alice", 10); alice->start(); @@ -137,6 +144,7 @@ int main() { salticidae::EventContext tec; MyNet bob(tec, "Bob"); std::thread bob_thread([&]() { + masksigs(); bob.start(); bob.connect(alice_addr); tec.dispatch(); diff --git a/test/bench_network_tls.cpp b/test/bench_network_tls.cpp index 7c682ba..47b9347 100644 --- a/test/bench_network_tls.cpp +++ b/test/bench_network_tls.cpp @@ -132,6 +132,13 @@ salticidae::EventContext ec; NetAddr alice_addr("127.0.0.1:1234"); NetAddr bob_addr("127.0.0.1:1235"); +void masksigs() { + sigset_t mask; + sigemptyset(&mask); + sigfillset(&mask); + pthread_sigmask(SIG_BLOCK, &mask, NULL); +} + int main() { salticidae::BoxObj alice = new MyNet(ec, "Alice", 10); alice->start(); @@ -139,6 +146,7 @@ int main() { salticidae::EventContext tec; MyNet bob(tec, "Bob"); std::thread bob_thread([&]() { + masksigs(); bob.start(); bob.connect(alice_addr); tec.dispatch(); diff --git a/test/test_bounded_recv_buffer.cpp b/test/test_bounded_recv_buffer.cpp new file mode 100644 index 0000000..a2ec4e1 --- /dev/null +++ b/test/test_bounded_recv_buffer.cpp @@ -0,0 +1,192 @@ +/** + * Copyright (c) 2018 Cornell University. + * + * Author: Ted Yin + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is furnished to do + * so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#include +#include +#include +#include +#include + +/* disable SHA256 checksum */ +#define SALTICIDAE_NOCHECKSUM + +#include "salticidae/msg.h" +#include "salticidae/event.h" +#include "salticidae/network.h" +#include "salticidae/stream.h" + +using salticidae::NetAddr; +using salticidae::DataStream; +using salticidae::MsgNetwork; +using salticidae::htole; +using salticidae::letoh; +using salticidae::bytearray_t; +using salticidae::TimerEvent; +using salticidae::ThreadCall; +using std::placeholders::_1; +using std::placeholders::_2; +using opcode_t = uint8_t; + +struct MsgBytes { + static const opcode_t opcode = 0xa; + DataStream serialized; + bytearray_t bytes; + MsgBytes(size_t size) { + bytes.resize(size); + serialized << htole((uint32_t)size) << bytes; + } + MsgBytes(DataStream &&s) { + uint32_t len; + s >> len; + len = letoh(len); + auto base = s.get_data_inplace(len); + bytes = bytearray_t(base, base + len); + } +}; + +const opcode_t MsgBytes::opcode; + +using MsgNetworkByteOp = MsgNetwork; + +struct MyNet: public MsgNetworkByteOp { + const std::string name; + TimerEvent ev_period_stat; + ThreadCall tcall; + size_t nrecv; + std::function trigger; + std::atomic stopped; + + MyNet(const salticidae::EventContext &ec, + const std::string name, + double stat_timeout = -1): + MsgNetworkByteOp(ec, MsgNetworkByteOp::Config( + ConnPool::Config() + .max_recv_buff_size(10) + .queue_capacity(10) + ).burst_size(10)), + name(name), + ev_period_stat(ec, [this, stat_timeout](TimerEvent &) { + SALTICIDAE_LOG_INFO("%.2f mps", nrecv / (double)stat_timeout); + fflush(stderr); + nrecv = 0; + ev_period_stat.add(stat_timeout); + }), + tcall(ec), + nrecv(0), stopped(false) { + /* message handler could be a bound method */ + reg_handler(salticidae::generic_bind(&MyNet::on_receive_bytes, this, _1, _2)); + if (stat_timeout > 0) + ev_period_stat.add(0); + reg_conn_handler([this, ec](const ConnPool::conn_t &conn, bool connected) { + if (connected) + { + if (conn->get_mode() == MyNet::Conn::ACTIVE) + { + printf("[%s] connected, sending bytes.\n", this->name.c_str()); + /* send the first message through this connection */ + trigger = [this, conn](ThreadCall::Handle &) { + while (!send_msg(MsgBytes(256), salticidae::static_pointer_cast(conn))) + { + if (stopped) + { + stop(); + return; + } + fprintf(stderr, "cannot send message, retrying\n"); + sleep(1); + } + if (!conn->is_terminated()) + tcall.async_call(trigger); + }; + tcall.async_call(trigger); + } + else + printf("[%s] passively connected, waiting for bytes.\n", this->name.c_str()); + } + else + { + printf("[%s] disconnected, retrying.\n", this->name.c_str()); + /* try to reconnect to the same address */ + connect(conn->get_addr()); + } + return true; + }); + } + + void on_receive_bytes(MsgBytes &&msg, const conn_t &conn) { + if (stopped) + { + conn->get_pool()->stop(); + return; + } + sleep(1); + nrecv++; + } +}; + +salticidae::EventContext aec, bec, ec; +NetAddr alice_addr("127.0.0.1:1234"); +NetAddr bob_addr("127.0.0.1:1235"); + +void masksigs() { + sigset_t mask; + sigemptyset(&mask); + sigfillset(&mask); + pthread_sigmask(SIG_BLOCK, &mask, NULL); +} + +int main() { + MyNet alice(aec, "Alice", 10), bob(bec, "Bob"); + std::thread alice_thread([&]() { + masksigs(); + alice.start(); + alice.listen(alice_addr); + aec.dispatch(); + }); + std::thread bob_thread([&]() { + masksigs(); + bob.start(); + bob.connect(alice_addr); + bec.dispatch(); + }); + auto shutdown = [&](int) { + bob.stopped = true; + bob.tcall.async_call([&](salticidae::ThreadCall::Handle &) { + bec.stop(); + }); + bob_thread.join(); + alice.stopped = true; + alice.tcall.async_call([&](salticidae::ThreadCall::Handle &) { + aec.stop(); + }); + alice_thread.join(); + ec.stop(); + }; + salticidae::SigEvent ev_sigint(ec, shutdown); + salticidae::SigEvent ev_sigterm(ec, shutdown); + ev_sigint.add(SIGINT); + ev_sigterm.add(SIGTERM); + ec.dispatch(); + return 0; +} diff --git a/test/test_p2p.cpp b/test/test_p2p.cpp index d097562..aff712a 100644 --- a/test/test_p2p.cpp +++ b/test/test_p2p.cpp @@ -28,6 +28,7 @@ #include #include #include +#include #include "salticidae/msg.h" #include "salticidae/event.h" @@ -67,6 +68,13 @@ struct MsgText { const uint8_t MsgText::opcode; +void masksigs() { + sigset_t mask; + sigemptyset(&mask); + sigfillset(&mask); + pthread_sigmask(SIG_BLOCK, &mask, NULL); +} + struct Net { uint64_t id; EventContext ec; @@ -98,6 +106,7 @@ struct Net { this->id, std::string(claimed_addr).c_str()); }); th = std::thread([=](){ + masksigs(); try { net->start(); net->listen(NetAddr(listen_addr)); diff --git a/test/test_p2p_stress.cpp b/test/test_p2p_stress.cpp index 9fe1b83..49f1b2e 100644 --- a/test/test_p2p_stress.cpp +++ b/test/test_p2p_stress.cpp @@ -179,6 +179,13 @@ void install_proto(AppContext &app, const size_t &seg_buff_size) { }); } +void masksigs() { + sigset_t mask; + sigemptyset(&mask); + sigfillset(&mask); + pthread_sigmask(SIG_BLOCK, &mask, NULL); +} + int main(int argc, char **argv) { Config config; auto opt_no_msg = Config::OptValFlag::create(false); @@ -225,6 +232,7 @@ int main(int argc, char **argv) { for (auto &a: apps) threads.push_back(std::thread([&]() { + masksigs(); a.net->listen(a.addr); for (auto &paddr: addrs) if (paddr != a.addr) a.net->add_peer(paddr); diff --git a/test/test_p2p_tls.cpp b/test/test_p2p_tls.cpp index 698bbac..93cefac 100644 --- a/test/test_p2p_tls.cpp +++ b/test/test_p2p_tls.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include "salticidae/msg.h" #include "salticidae/event.h" @@ -69,6 +70,13 @@ struct MsgText { const uint8_t MsgText::opcode; +void masksigs() { + sigset_t mask; + sigemptyset(&mask); + sigfillset(&mask); + pthread_sigmask(SIG_BLOCK, &mask, NULL); +} + std::unordered_set valid_certs; struct Net { @@ -124,6 +132,7 @@ struct Net { this->id, std::string(claimed_addr).c_str()); }); th = std::thread([=](){ + masksigs(); try { net->start(); net->listen(NetAddr(listen_addr)); diff --git a/test/test_queue.cpp b/test/test_queue.cpp index 5c32dac..9082135 100644 --- a/test/test_queue.cpp +++ b/test/test_queue.cpp @@ -8,6 +8,13 @@ using salticidae::TimerEvent; using salticidae::Config; +void masksigs() { + sigset_t mask; + sigemptyset(&mask); + sigfillset(&mask); + pthread_sigmask(SIG_BLOCK, &mask, NULL); +} + void test_mpsc(int nproducers, int nops, size_t burst_size, bool test_rewind) { size_t total = nproducers * nops; salticidae::EventContext ec; @@ -33,6 +40,7 @@ void test_mpsc(int nproducers, int nops, size_t burst_size, bool test_rewind) { }); std::vector producers; std::thread consumer([&collected, total, &ec]() { + masksigs(); TimerEvent timer(ec, [&ec, &collected, total](TimerEvent &timer) { if (collected.load() == total) ec.stop(); timer.add(1); @@ -44,6 +52,7 @@ void test_mpsc(int nproducers, int nops, size_t burst_size, bool test_rewind) { for (int i = 0; i < nproducers; i++) { producers.emplace(producers.end(), std::thread([&q, nops, i, nproducers]() { + masksigs(); int x = i; for (int j = 0; j < nops; j++) { @@ -89,6 +98,7 @@ void test_mpmc(int nproducers, int nconsumers, int nops, size_t burst_size) { { consumers.emplace(consumers.end(), std::thread( [&collected, total, &ec = ecs[i]]() { + masksigs(); TimerEvent timer(ec, [&ec, &collected, total](TimerEvent &timer) { if (collected.load() == total) ec.stop(); timer.add(1); @@ -100,6 +110,7 @@ void test_mpmc(int nproducers, int nconsumers, int nops, size_t burst_size) { for (int i = 0; i < nproducers; i++) { producers.emplace(producers.end(), std::thread([&q, nops, i, nproducers]() { + masksigs(); int x = i; for (int j = 0; j < nops; j++) { -- cgit v1.2.3