aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--TODO.rst3
-rw-r--r--include/salticidae/conn.h2
-rw-r--r--include/salticidae/event.h8
-rw-r--r--include/salticidae/queue.h42
-rw-r--r--test/bench_network.cpp54
5 files changed, 51 insertions, 58 deletions
diff --git a/TODO.rst b/TODO.rst
index de4f828..e31eb5c 100644
--- a/TODO.rst
+++ b/TODO.rst
@@ -1 +1,2 @@
-- stress test for PeerNetwork
+- stress test for PeerNetwork (done)
+- better impl (and user control) of non-blocking bounded outgoing queue
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index cc1c5a6..b4df259 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -88,6 +88,7 @@ class ConnPool {
TimedFdEvent ev_connect;
FdEvent ev_socket;
+ TimerEvent ev_send_wait;
/** does not need to wait if true */
bool ready_send;
@@ -365,6 +366,7 @@ class ConnPool {
conn_t conn = it.second;
conn->stop();
conn->self_ref = nullptr;
+ ::close(conn->fd);
}
if (listen_fd != -1)
{
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index b97ab9a..641bb8e 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -426,7 +426,7 @@ class ThreadNotifier {
cv.wait(ul, [this]{ return ready; });
return std::move(data);
}
- void notify(T &&_data) {
+ void notify(T &&_data) {
mutex_lg_t _(mlock);
ready = true;
data = std::move(_data);
@@ -481,7 +481,6 @@ class ThreadCall {
}
template<typename T>
void set_result(T &&data) {
- std::atomic_thread_fence(std::memory_order_release);
using _T = std::remove_reference_t<T>;
result = Result(new _T(std::forward<T>(data)),
[](void *ptr) {delete static_cast<_T *>(ptr);});
@@ -497,6 +496,7 @@ class ThreadCall {
ev_listen = FdEvent(ec, ctl_fd[0], [this](int fd, int) {
Handle *h;
read(fd, &h, sizeof(h));
+ std::atomic_thread_fence(std::memory_order_acquire);
h->exec();
delete h;
});
@@ -567,7 +567,7 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
// get to write(fd). Then store(true) must happen after all exchange(false),
// since all enqueue operations are finalized, the dequeue should be able
// to see those enqueued values in func()
- wait_sig.store(true, std::memory_order_release);
+ wait_sig.exchange(true, std::memory_order_acq_rel);
if (func(*this))
write(fd, &dummy, 8);
});
@@ -631,7 +631,7 @@ class MPMCQueueEventDriven: public MPMCQueue<T> {
uint64_t t;
if (read(fd, &t, 8) != 8) return;
// only one consumer should be here a a time
- wait_sig.store(true, std::memory_order_release);
+ wait_sig.exchange(true, std::memory_order_acq_rel);
if (func(*this))
write(fd, &dummy, 8);
});
diff --git a/include/salticidae/queue.h b/include/salticidae/queue.h
index 598303c..a72a28f 100644
--- a/include/salticidae/queue.h
+++ b/include/salticidae/queue.h
@@ -13,9 +13,8 @@ class FreeList {
public:
struct Node {
std::atomic<Node *> next;
- std::atomic<bool> addback;
std::atomic<size_t> refcnt;
- Node(): next(nullptr), addback(false), refcnt(1) {}
+ Node(): next(nullptr), refcnt(1) {}
};
private:
@@ -28,16 +27,14 @@ class FreeList {
void release_ref(Node *u) {
if (u->refcnt.fetch_sub(1, std::memory_order_relaxed) != 1) return;
- u->addback.store(false, std::memory_order_relaxed);
- //if (!u->addback.exchange(false, std::memory_order_relaxed)) return;
for (;;)
{
- auto t = top.load(std::memory_order_acquire);
+ auto t = top.load(std::memory_order_relaxed);
// repair the next pointer before CAS, otherwise u->next == nullptr
// could lead to skipping elements
- u->next.store(t, std::memory_order_consume);
+ u->next.store(t, std::memory_order_relaxed);
// the replacement is ok even if ABA happens
- if (top.compare_exchange_weak(t, u, std::memory_order_consume))
+ if (top.compare_exchange_weak(t, u, std::memory_order_release))
{
u->refcnt.store(1, std::memory_order_relaxed);
break;
@@ -46,9 +43,6 @@ class FreeList {
}
bool push(Node *u) {
- assert(u->addback.load() == false);
- // attempt to push it
- u->addback.store(true, std::memory_order_release);
release_ref(u);
return true;
}
@@ -60,18 +54,18 @@ class FreeList {
auto u = top.load(std::memory_order_acquire);
/* the list is now empty */
if (u == nullptr) return false;
- auto t = u->refcnt.load(std::memory_order_acquire);
+ auto t = u->refcnt.load(std::memory_order_relaxed);
/* let's wait for another round if u is a ghost (already popped) */
if (!t) continue;
/* otherwise t > 0, so with CAS, the invariant that zero refcnt can
* never be increased is guaranteed */
- if (u->refcnt.compare_exchange_weak(t, t + 1, std::memory_order_consume))
+ if (u->refcnt.compare_exchange_weak(t, t + 1, std::memory_order_relaxed))
{
/* here, nobody is able to change v->next (because v->next is
* only changed when pushed) even when ABA happens */
auto v = u;
- auto nv = u->next.load(std::memory_order_acquire);
- if (top.compare_exchange_weak(v, nv, std::memory_order_consume))
+ auto nv = u->next.load(std::memory_order_relaxed);
+ if (top.compare_exchange_weak(v, nv, std::memory_order_relaxed))
{
/* manage to pop the head */
r = u;
@@ -104,7 +98,6 @@ class MPMCQueue {
template<typename U>
void _enqueue(Block *nblk, U &&e) {
new (&(nblk->elem)) T(std::forward<U>(e));
- std::atomic_thread_fence(std::memory_order_release);
nblk->next.store(nullptr, std::memory_order_relaxed);
auto prev = tail.exchange(nblk, std::memory_order_acq_rel);
prev->next.store(nblk, std::memory_order_relaxed);
@@ -151,7 +144,7 @@ class MPMCQueue {
auto h = head.load(std::memory_order_relaxed);
auto t = h->refcnt.load(std::memory_order_relaxed);
if (!t) continue;
- if (h->refcnt.compare_exchange_weak(t, t + 1, std::memory_order_consume))
+ if (h->refcnt.compare_exchange_weak(t, t + 1, std::memory_order_relaxed))
{
auto nh = h->next.load(std::memory_order_relaxed);
if (nh == nullptr)
@@ -159,10 +152,9 @@ class MPMCQueue {
blks.release_ref(h);
return false;
}
- std::atomic_thread_fence(std::memory_order_acquire);
e = std::move(nh->elem);
auto hh = h;
- if (head.compare_exchange_weak(hh, nh, std::memory_order_consume))
+ if (head.compare_exchange_weak(hh, nh, std::memory_order_relaxed))
{
blks.release_ref(h);
blks.push(h);
@@ -177,13 +169,13 @@ class MPMCQueue {
template<typename T>
struct MPSCQueue: public MPMCQueue<T> {
using MPMCQueue<T>::MPMCQueue;
+ /* the same thread is calling the following functions */
+
bool try_dequeue(T &e) {
auto h = this->head.load(std::memory_order_relaxed);
auto nh = h->next.load(std::memory_order_relaxed);
- std::atomic_thread_fence(std::memory_order_acquire);
if (nh == nullptr) return false;
e = std::move(nh->elem);
- std::atomic_thread_fence(std::memory_order_release);
this->head.store(nh, std::memory_order_relaxed);
this->blks.push(h);
return true;
@@ -192,13 +184,13 @@ struct MPSCQueue: public MPMCQueue<T> {
template<typename U>
bool rewind(U &&e) {
FreeList::Node * _nblk;
- if (!this->blks.pop(_nblk)) return false;
+ if (!this->blks.pop(_nblk)) _nblk = new typename MPMCQueue<T>::Block();
+
auto nblk = static_cast<typename MPMCQueue<T>::Block *>(_nblk);
- auto h = this->head.load(std::memory_order_acquire);
- nblk->next.store(h, std::memory_order_release);
+ auto h = this->head.load(std::memory_order_relaxed);
new (&(h->elem)) T(std::forward<U>(e));
- std::atomic_thread_fence(std::memory_order_release);
- this->head.store(nblk, std::memory_order_release);
+ nblk->next.store(h, std::memory_order_relaxed);
+ this->head.store(nblk, std::memory_order_relaxed);
return true;
}
};
diff --git a/test/bench_network.cpp b/test/bench_network.cpp
index bb2193b..89289fe 100644
--- a/test/bench_network.cpp
+++ b/test/bench_network.cpp
@@ -83,7 +83,8 @@ struct MyNet: public MsgNetworkByteOp {
name(name),
peer(peer),
ev_period_stat(ec, [this, stat_timeout](TimerEvent &) {
- SALTICIDAE_LOG_INFO("%.2f mps\n", nrecv / (double)stat_timeout);
+ SALTICIDAE_LOG_INFO("%.2f mps", nrecv / (double)stat_timeout);
+ fflush(stderr);
nrecv = 0;
ev_period_stat.add(stat_timeout);
}),
@@ -115,7 +116,6 @@ struct MyNet: public MsgNetworkByteOp {
net->ev_period_send.add(0);
});
net->ev_period_send.add(0);
-
}
else
printf("[%s] Passively connected, waiting for greetings.\n",
@@ -143,35 +143,33 @@ salticidae::EventContext ec;
NetAddr alice_addr("127.0.0.1:1234");
NetAddr bob_addr("127.0.0.1:1235");
-void signal_handler(int) {
- throw salticidae::SalticidaeError("got terminal signal");
-}
-
int main() {
- struct sigaction sa;
- sa.sa_handler = signal_handler;
- sigaction(SIGTERM, &sa, NULL);
- sigaction(SIGINT, &sa, NULL);
- /* test two nodes */
- MyNet alice(ec, "Alice", bob_addr, 10);
- alice.start();
- alice.listen(alice_addr);
- std::thread bob_thread([]() {
- salticidae::EventContext ec;
- MyNet bob(ec, "Bob", alice_addr);
+ salticidae::BoxObj<MyNet> alice = new MyNet(ec, "Alice", bob_addr, 10);
+ alice->start();
+ alice->listen(alice_addr);
+ salticidae::EventContext tec;
+ salticidae::BoxObj<salticidae::ThreadCall> tcall = new salticidae::ThreadCall(tec);
+ std::thread bob_thread([&tec]() {
+ MyNet bob(tec, "Bob", alice_addr);
bob.start();
bob.connect(alice_addr);
- //try {
- ec.dispatch();
- //} catch (std::exception &) {}
- //SALTICIDAE_LOG_INFO("exiting");
+ try {
+ tec.dispatch();
+ } catch (std::exception &) {}
+ SALTICIDAE_LOG_INFO("thread exiting");
});
- //try {
- ec.dispatch();
- //} catch (std::exception &e) {
- // pthread_kill(bob_thread.native_handle(), SIGTERM);
- // bob_thread.join();
- // SALTICIDAE_LOG_INFO("exception: %s", e.what());
- //}
+ auto shutdown = [&](int) {
+ tcall->async_call([&](salticidae::ThreadCall::Handle &) {
+ tec.stop();
+ });
+ alice = nullptr;
+ //ec.stop();
+ //bob_thread.join();
+ };
+ salticidae::SigEvent ev_sigint(ec, shutdown);
+ salticidae::SigEvent ev_sigterm(ec, shutdown);
+ ev_sigint.add(SIGINT);
+ ev_sigterm.add(SIGTERM);
+ ec.dispatch();
return 0;
}