diff options
-rw-r--r-- | TODO.rst | 3 | ||||
-rw-r--r-- | include/salticidae/conn.h | 2 | ||||
-rw-r--r-- | include/salticidae/event.h | 8 | ||||
-rw-r--r-- | include/salticidae/queue.h | 42 | ||||
-rw-r--r-- | test/bench_network.cpp | 54 |
5 files changed, 51 insertions, 58 deletions
@@ -1 +1,2 @@ -- stress test for PeerNetwork +- stress test for PeerNetwork (done) +- better impl (and user control) of non-blocking bounded outgoing queue diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index cc1c5a6..b4df259 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -88,6 +88,7 @@ class ConnPool { TimedFdEvent ev_connect; FdEvent ev_socket; + TimerEvent ev_send_wait; /** does not need to wait if true */ bool ready_send; @@ -365,6 +366,7 @@ class ConnPool { conn_t conn = it.second; conn->stop(); conn->self_ref = nullptr; + ::close(conn->fd); } if (listen_fd != -1) { diff --git a/include/salticidae/event.h b/include/salticidae/event.h index b97ab9a..641bb8e 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -426,7 +426,7 @@ class ThreadNotifier { cv.wait(ul, [this]{ return ready; }); return std::move(data); } - void notify(T &&_data) { + void notify(T &&_data) { mutex_lg_t _(mlock); ready = true; data = std::move(_data); @@ -481,7 +481,6 @@ class ThreadCall { } template<typename T> void set_result(T &&data) { - std::atomic_thread_fence(std::memory_order_release); using _T = std::remove_reference_t<T>; result = Result(new _T(std::forward<T>(data)), [](void *ptr) {delete static_cast<_T *>(ptr);}); @@ -497,6 +496,7 @@ class ThreadCall { ev_listen = FdEvent(ec, ctl_fd[0], [this](int fd, int) { Handle *h; read(fd, &h, sizeof(h)); + std::atomic_thread_fence(std::memory_order_acquire); h->exec(); delete h; }); @@ -567,7 +567,7 @@ class MPSCQueueEventDriven: public MPSCQueue<T> { // get to write(fd). Then store(true) must happen after all exchange(false), // since all enqueue operations are finalized, the dequeue should be able // to see those enqueued values in func() - wait_sig.store(true, std::memory_order_release); + wait_sig.exchange(true, std::memory_order_acq_rel); if (func(*this)) write(fd, &dummy, 8); }); @@ -631,7 +631,7 @@ class MPMCQueueEventDriven: public MPMCQueue<T> { uint64_t t; if (read(fd, &t, 8) != 8) return; // only one consumer should be here a a time - wait_sig.store(true, std::memory_order_release); + wait_sig.exchange(true, std::memory_order_acq_rel); if (func(*this)) write(fd, &dummy, 8); }); diff --git a/include/salticidae/queue.h b/include/salticidae/queue.h index 598303c..a72a28f 100644 --- a/include/salticidae/queue.h +++ b/include/salticidae/queue.h @@ -13,9 +13,8 @@ class FreeList { public: struct Node { std::atomic<Node *> next; - std::atomic<bool> addback; std::atomic<size_t> refcnt; - Node(): next(nullptr), addback(false), refcnt(1) {} + Node(): next(nullptr), refcnt(1) {} }; private: @@ -28,16 +27,14 @@ class FreeList { void release_ref(Node *u) { if (u->refcnt.fetch_sub(1, std::memory_order_relaxed) != 1) return; - u->addback.store(false, std::memory_order_relaxed); - //if (!u->addback.exchange(false, std::memory_order_relaxed)) return; for (;;) { - auto t = top.load(std::memory_order_acquire); + auto t = top.load(std::memory_order_relaxed); // repair the next pointer before CAS, otherwise u->next == nullptr // could lead to skipping elements - u->next.store(t, std::memory_order_consume); + u->next.store(t, std::memory_order_relaxed); // the replacement is ok even if ABA happens - if (top.compare_exchange_weak(t, u, std::memory_order_consume)) + if (top.compare_exchange_weak(t, u, std::memory_order_release)) { u->refcnt.store(1, std::memory_order_relaxed); break; @@ -46,9 +43,6 @@ class FreeList { } bool push(Node *u) { - assert(u->addback.load() == false); - // attempt to push it - u->addback.store(true, std::memory_order_release); release_ref(u); return true; } @@ -60,18 +54,18 @@ class FreeList { auto u = top.load(std::memory_order_acquire); /* the list is now empty */ if (u == nullptr) return false; - auto t = u->refcnt.load(std::memory_order_acquire); + auto t = u->refcnt.load(std::memory_order_relaxed); /* let's wait for another round if u is a ghost (already popped) */ if (!t) continue; /* otherwise t > 0, so with CAS, the invariant that zero refcnt can * never be increased is guaranteed */ - if (u->refcnt.compare_exchange_weak(t, t + 1, std::memory_order_consume)) + if (u->refcnt.compare_exchange_weak(t, t + 1, std::memory_order_relaxed)) { /* here, nobody is able to change v->next (because v->next is * only changed when pushed) even when ABA happens */ auto v = u; - auto nv = u->next.load(std::memory_order_acquire); - if (top.compare_exchange_weak(v, nv, std::memory_order_consume)) + auto nv = u->next.load(std::memory_order_relaxed); + if (top.compare_exchange_weak(v, nv, std::memory_order_relaxed)) { /* manage to pop the head */ r = u; @@ -104,7 +98,6 @@ class MPMCQueue { template<typename U> void _enqueue(Block *nblk, U &&e) { new (&(nblk->elem)) T(std::forward<U>(e)); - std::atomic_thread_fence(std::memory_order_release); nblk->next.store(nullptr, std::memory_order_relaxed); auto prev = tail.exchange(nblk, std::memory_order_acq_rel); prev->next.store(nblk, std::memory_order_relaxed); @@ -151,7 +144,7 @@ class MPMCQueue { auto h = head.load(std::memory_order_relaxed); auto t = h->refcnt.load(std::memory_order_relaxed); if (!t) continue; - if (h->refcnt.compare_exchange_weak(t, t + 1, std::memory_order_consume)) + if (h->refcnt.compare_exchange_weak(t, t + 1, std::memory_order_relaxed)) { auto nh = h->next.load(std::memory_order_relaxed); if (nh == nullptr) @@ -159,10 +152,9 @@ class MPMCQueue { blks.release_ref(h); return false; } - std::atomic_thread_fence(std::memory_order_acquire); e = std::move(nh->elem); auto hh = h; - if (head.compare_exchange_weak(hh, nh, std::memory_order_consume)) + if (head.compare_exchange_weak(hh, nh, std::memory_order_relaxed)) { blks.release_ref(h); blks.push(h); @@ -177,13 +169,13 @@ class MPMCQueue { template<typename T> struct MPSCQueue: public MPMCQueue<T> { using MPMCQueue<T>::MPMCQueue; + /* the same thread is calling the following functions */ + bool try_dequeue(T &e) { auto h = this->head.load(std::memory_order_relaxed); auto nh = h->next.load(std::memory_order_relaxed); - std::atomic_thread_fence(std::memory_order_acquire); if (nh == nullptr) return false; e = std::move(nh->elem); - std::atomic_thread_fence(std::memory_order_release); this->head.store(nh, std::memory_order_relaxed); this->blks.push(h); return true; @@ -192,13 +184,13 @@ struct MPSCQueue: public MPMCQueue<T> { template<typename U> bool rewind(U &&e) { FreeList::Node * _nblk; - if (!this->blks.pop(_nblk)) return false; + if (!this->blks.pop(_nblk)) _nblk = new typename MPMCQueue<T>::Block(); + auto nblk = static_cast<typename MPMCQueue<T>::Block *>(_nblk); - auto h = this->head.load(std::memory_order_acquire); - nblk->next.store(h, std::memory_order_release); + auto h = this->head.load(std::memory_order_relaxed); new (&(h->elem)) T(std::forward<U>(e)); - std::atomic_thread_fence(std::memory_order_release); - this->head.store(nblk, std::memory_order_release); + nblk->next.store(h, std::memory_order_relaxed); + this->head.store(nblk, std::memory_order_relaxed); return true; } }; diff --git a/test/bench_network.cpp b/test/bench_network.cpp index bb2193b..89289fe 100644 --- a/test/bench_network.cpp +++ b/test/bench_network.cpp @@ -83,7 +83,8 @@ struct MyNet: public MsgNetworkByteOp { name(name), peer(peer), ev_period_stat(ec, [this, stat_timeout](TimerEvent &) { - SALTICIDAE_LOG_INFO("%.2f mps\n", nrecv / (double)stat_timeout); + SALTICIDAE_LOG_INFO("%.2f mps", nrecv / (double)stat_timeout); + fflush(stderr); nrecv = 0; ev_period_stat.add(stat_timeout); }), @@ -115,7 +116,6 @@ struct MyNet: public MsgNetworkByteOp { net->ev_period_send.add(0); }); net->ev_period_send.add(0); - } else printf("[%s] Passively connected, waiting for greetings.\n", @@ -143,35 +143,33 @@ salticidae::EventContext ec; NetAddr alice_addr("127.0.0.1:1234"); NetAddr bob_addr("127.0.0.1:1235"); -void signal_handler(int) { - throw salticidae::SalticidaeError("got terminal signal"); -} - int main() { - struct sigaction sa; - sa.sa_handler = signal_handler; - sigaction(SIGTERM, &sa, NULL); - sigaction(SIGINT, &sa, NULL); - /* test two nodes */ - MyNet alice(ec, "Alice", bob_addr, 10); - alice.start(); - alice.listen(alice_addr); - std::thread bob_thread([]() { - salticidae::EventContext ec; - MyNet bob(ec, "Bob", alice_addr); + salticidae::BoxObj<MyNet> alice = new MyNet(ec, "Alice", bob_addr, 10); + alice->start(); + alice->listen(alice_addr); + salticidae::EventContext tec; + salticidae::BoxObj<salticidae::ThreadCall> tcall = new salticidae::ThreadCall(tec); + std::thread bob_thread([&tec]() { + MyNet bob(tec, "Bob", alice_addr); bob.start(); bob.connect(alice_addr); - //try { - ec.dispatch(); - //} catch (std::exception &) {} - //SALTICIDAE_LOG_INFO("exiting"); + try { + tec.dispatch(); + } catch (std::exception &) {} + SALTICIDAE_LOG_INFO("thread exiting"); }); - //try { - ec.dispatch(); - //} catch (std::exception &e) { - // pthread_kill(bob_thread.native_handle(), SIGTERM); - // bob_thread.join(); - // SALTICIDAE_LOG_INFO("exception: %s", e.what()); - //} + auto shutdown = [&](int) { + tcall->async_call([&](salticidae::ThreadCall::Handle &) { + tec.stop(); + }); + alice = nullptr; + //ec.stop(); + //bob_thread.join(); + }; + salticidae::SigEvent ev_sigint(ec, shutdown); + salticidae::SigEvent ev_sigterm(ec, shutdown); + ev_sigint.add(SIGINT); + ev_sigterm.add(SIGTERM); + ec.dispatch(); return 0; } |