diff options
-rw-r--r-- | include/salticidae/event.h | 16 | ||||
-rw-r--r-- | include/salticidae/network.h | 3 | ||||
-rw-r--r-- | test/test_queue.cpp | 13 |
3 files changed, 12 insertions, 20 deletions
diff --git a/include/salticidae/event.h b/include/salticidae/event.h index 0600109..df49c58 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -488,19 +488,7 @@ class MPSCQueueEventDriven: public MPSCQueue<T> { return true; } - template<typename U> - bool try_enqueue(U &&e) { - static const uint64_t dummy = 1; - if (!MPSCQueue<T>::try_enqueue(std::forward<U>(e))) - return false; - // memory barrier here, so any load/store in enqueue must be finialized - if (wait_sig.exchange(false, std::memory_order_acq_rel)) - { - SALTICIDAE_LOG_DEBUG("mpsc notify"); - write(fd, &dummy, 8); - } - return true; - } + template<typename U> bool try_enqueue(U &&e) = delete; }; template<typename T> @@ -552,6 +540,8 @@ class MPMCQueueEventDriven: public MPMCQueue<T> { } return true; } + + template<typename U> bool try_enqueue(U &&e) = delete; }; class ThreadCall { diff --git a/include/salticidae/network.h b/include/salticidae/network.h index 2cef81d..1b871f5 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -451,7 +451,8 @@ void MsgNetwork<OpcodeType>::Conn::on_read() { } #endif auto conn = static_pointer_cast<Conn>(self()); - while (!mn->incoming_msgs.try_enqueue(std::make_pair(msg, conn))); + while (!mn->incoming_msgs.enqueue(std::make_pair(msg, conn), false)) + std::this_thread::yield(); } } } diff --git a/test/test_queue.cpp b/test/test_queue.cpp index 5c56585..fac3bc7 100644 --- a/test/test_queue.cpp +++ b/test/test_queue.cpp @@ -43,7 +43,7 @@ void test_mpsc(int nproducers, int nops, size_t burst_size) { for (int j = 0; j < nops; j++) { //usleep(rand() / double(RAND_MAX) * 100); - while (!q.try_enqueue(x)) + while (!q.enqueue(x, false)) std::this_thread::yield(); x += nproducers; } @@ -55,7 +55,7 @@ void test_mpsc(int nproducers, int nops, size_t burst_size) { SALTICIDAE_LOG_INFO("consumers terminate"); } -/*void test_mpmc(int nproducers, int nconsumers, int nops, size_t burst_size) { +void test_mpmc(int nproducers, int nconsumers, int nops, size_t burst_size) { size_t total = nproducers * nops; using queue_t = salticidae::MPMCQueueEventDriven<int>; queue_t q; @@ -64,6 +64,7 @@ void test_mpsc(int nproducers, int nops, size_t burst_size) { std::vector<salticidae::EventContext> ecs; std::atomic<size_t> collected(0); ecs.resize(nconsumers); + q.set_capacity(65536 * nproducers); for (int i = 0; i < nconsumers; i++) { q.reg_handler(ecs[i], [&collected, burst_size](queue_t &q) { @@ -98,7 +99,8 @@ void test_mpsc(int nproducers, int nops, size_t burst_size) { for (int j = 0; j < nops; j++) { //usleep(rand() / double(RAND_MAX) * 100); - q.try_enqueue(x); + while (!q.enqueue(x, false)) + std::this_thread::yield(); x += nproducers; } })); @@ -108,7 +110,6 @@ void test_mpsc(int nproducers, int nops, size_t burst_size) { for (auto &t: consumers) t.join(); SALTICIDAE_LOG_INFO("consumers terminate"); } -*/ int main(int argc, char **argv) { Config config; @@ -140,8 +141,8 @@ int main(int argc, char **argv) { else { SALTICIDAE_LOG_INFO("testing an MPMC queue..."); - //test_mpmc(opt_nproducers->get(), opt_nconsumers->get(), - // opt_nops->get(), opt_burst_size->get()); + test_mpmc(opt_nproducers->get(), opt_nconsumers->get(), + opt_nops->get(), opt_burst_size->get()); } return 0; } |