aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/salticidae/event.h16
-rw-r--r--include/salticidae/network.h3
-rw-r--r--test/test_queue.cpp13
3 files changed, 12 insertions, 20 deletions
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index 0600109..df49c58 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -488,19 +488,7 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
return true;
}
- template<typename U>
- bool try_enqueue(U &&e) {
- static const uint64_t dummy = 1;
- if (!MPSCQueue<T>::try_enqueue(std::forward<U>(e)))
- return false;
- // memory barrier here, so any load/store in enqueue must be finialized
- if (wait_sig.exchange(false, std::memory_order_acq_rel))
- {
- SALTICIDAE_LOG_DEBUG("mpsc notify");
- write(fd, &dummy, 8);
- }
- return true;
- }
+ template<typename U> bool try_enqueue(U &&e) = delete;
};
template<typename T>
@@ -552,6 +540,8 @@ class MPMCQueueEventDriven: public MPMCQueue<T> {
}
return true;
}
+
+ template<typename U> bool try_enqueue(U &&e) = delete;
};
class ThreadCall {
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index 2cef81d..1b871f5 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -451,7 +451,8 @@ void MsgNetwork<OpcodeType>::Conn::on_read() {
}
#endif
auto conn = static_pointer_cast<Conn>(self());
- while (!mn->incoming_msgs.try_enqueue(std::make_pair(msg, conn)));
+ while (!mn->incoming_msgs.enqueue(std::make_pair(msg, conn), false))
+ std::this_thread::yield();
}
}
}
diff --git a/test/test_queue.cpp b/test/test_queue.cpp
index 5c56585..fac3bc7 100644
--- a/test/test_queue.cpp
+++ b/test/test_queue.cpp
@@ -43,7 +43,7 @@ void test_mpsc(int nproducers, int nops, size_t burst_size) {
for (int j = 0; j < nops; j++)
{
//usleep(rand() / double(RAND_MAX) * 100);
- while (!q.try_enqueue(x))
+ while (!q.enqueue(x, false))
std::this_thread::yield();
x += nproducers;
}
@@ -55,7 +55,7 @@ void test_mpsc(int nproducers, int nops, size_t burst_size) {
SALTICIDAE_LOG_INFO("consumers terminate");
}
-/*void test_mpmc(int nproducers, int nconsumers, int nops, size_t burst_size) {
+void test_mpmc(int nproducers, int nconsumers, int nops, size_t burst_size) {
size_t total = nproducers * nops;
using queue_t = salticidae::MPMCQueueEventDriven<int>;
queue_t q;
@@ -64,6 +64,7 @@ void test_mpsc(int nproducers, int nops, size_t burst_size) {
std::vector<salticidae::EventContext> ecs;
std::atomic<size_t> collected(0);
ecs.resize(nconsumers);
+ q.set_capacity(65536 * nproducers);
for (int i = 0; i < nconsumers; i++)
{
q.reg_handler(ecs[i], [&collected, burst_size](queue_t &q) {
@@ -98,7 +99,8 @@ void test_mpsc(int nproducers, int nops, size_t burst_size) {
for (int j = 0; j < nops; j++)
{
//usleep(rand() / double(RAND_MAX) * 100);
- q.try_enqueue(x);
+ while (!q.enqueue(x, false))
+ std::this_thread::yield();
x += nproducers;
}
}));
@@ -108,7 +110,6 @@ void test_mpsc(int nproducers, int nops, size_t burst_size) {
for (auto &t: consumers) t.join();
SALTICIDAE_LOG_INFO("consumers terminate");
}
-*/
int main(int argc, char **argv) {
Config config;
@@ -140,8 +141,8 @@ int main(int argc, char **argv) {
else
{
SALTICIDAE_LOG_INFO("testing an MPMC queue...");
- //test_mpmc(opt_nproducers->get(), opt_nconsumers->get(),
- // opt_nops->get(), opt_burst_size->get());
+ test_mpmc(opt_nproducers->get(), opt_nconsumers->get(),
+ opt_nops->get(), opt_burst_size->get());
}
return 0;
}