From a4015bde81371a8b998361ccfb56d5fd714e3ec6 Mon Sep 17 00:00:00 2001 From: Determinant Date: Tue, 26 Mar 2019 16:05:01 -0400 Subject: update atomic fences --- include/salticidae/conn.h | 2 -- include/salticidae/event.h | 3 ++- include/salticidae/queue.h | 6 +++++- test/bench_network.cpp | 22 +++++++++++----------- 4 files changed, 18 insertions(+), 15 deletions(-) diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index e7d6295..cc1c5a6 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -188,7 +188,6 @@ class ConnPool { void feed(const conn_t &conn, int client_fd) { /* the caller should finalize all the preparation */ - std::atomic_thread_fence(std::memory_order_release); tcall.async_call([this, conn, client_fd](ThreadCall::Handle &) { if (conn->mode == Conn::ConnMode::DEAD) { @@ -381,7 +380,6 @@ class ConnPool { auto ret = *(static_cast(disp_tcall->call( [this, addr](ThreadCall::Handle &h) { auto conn = _connect(addr); - std::atomic_thread_fence(std::memory_order_release); h.set_result(std::move(conn)); }).get())); return std::move(ret); diff --git a/include/salticidae/event.h b/include/salticidae/event.h index fafa884..b97ab9a 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -481,6 +481,7 @@ class ThreadCall { } template void set_result(T &&data) { + std::atomic_thread_fence(std::memory_order_release); using _T = std::remove_reference_t; result = Result(new _T(std::forward(data)), [](void *ptr) {delete static_cast<_T *>(ptr);}); @@ -591,7 +592,7 @@ class MPSCQueueEventDriven: public MPSCQueue { template bool try_enqueue(U &&e) { static const uint64_t dummy = 1; - if (!MPMCQueue::try_enqueue(std::forward(e))) + if (!MPSCQueue::try_enqueue(std::forward(e))) return false; // memory barrier here, so any load/store in enqueue must be finialized if (wait_sig.exchange(false, std::memory_order_acq_rel)) diff --git a/include/salticidae/queue.h b/include/salticidae/queue.h index 0b493ea..1732238 100644 --- a/include/salticidae/queue.h +++ b/include/salticidae/queue.h @@ -104,7 +104,8 @@ class MPMCQueue { template void _enqueue(Block *nblk, U &&e) { new (&(nblk->elem)) T(std::forward(e)); - nblk->next.store(nullptr, std::memory_order_release); + std::atomic_thread_fence(std::memory_order_release); + nblk->next.store(nullptr, std::memory_order_relaxed); auto prev = tail.exchange(nblk, std::memory_order_acq_rel); prev->next.store(nblk, std::memory_order_relaxed); } @@ -158,6 +159,7 @@ class MPMCQueue { blks.release_ref(h); return false; } + std::atomic_thread_fence(std::memory_order_acquire); e = std::move(nh->elem); auto hh = h; if (head.compare_exchange_weak(hh, nh, std::memory_order_consume)) @@ -180,6 +182,7 @@ struct MPSCQueue: public MPMCQueue { auto nh = h->next.load(std::memory_order_relaxed); if (nh == nullptr) return false; + std::atomic_thread_fence(std::memory_order_acquire); e = std::move(nh->elem); this->head.store(nh, std::memory_order_release); this->blks.push(h); @@ -194,6 +197,7 @@ struct MPSCQueue: public MPMCQueue { auto h = this->head.load(std::memory_order_acquire); nblk->next.store(h, std::memory_order_release); new (&(h->elem)) T(std::forward(e)); + std::atomic_thread_fence(std::memory_order_release); this->head.store(nblk, std::memory_order_release); return true; } diff --git a/test/bench_network.cpp b/test/bench_network.cpp index be90e2a..bb2193b 100644 --- a/test/bench_network.cpp +++ b/test/bench_network.cpp @@ -48,7 +48,7 @@ using std::placeholders::_2; using opcode_t = uint8_t; struct MsgBytes { - static const opcode_t opcode = 0x0; + static const opcode_t opcode = 0xa; DataStream serialized; bytearray_t bytes; MsgBytes(size_t size) { @@ -79,7 +79,7 @@ struct MyNet: public MsgNetworkByteOp { const std::string name, const NetAddr &peer, double stat_timeout = -1): - MsgNetworkByteOp(ec, MsgNetworkByteOp::Config()), + MsgNetworkByteOp(ec, MsgNetworkByteOp::Config().burst_size(1000)), name(name), peer(peer), ev_period_stat(ec, [this, stat_timeout](TimerEvent &) { @@ -161,17 +161,17 @@ int main() { MyNet bob(ec, "Bob", alice_addr); bob.start(); bob.connect(alice_addr); - try { + //try { ec.dispatch(); - } catch (std::exception &) {} - SALTICIDAE_LOG_INFO("exiting"); + //} catch (std::exception &) {} + //SALTICIDAE_LOG_INFO("exiting"); }); - try { + //try { ec.dispatch(); - } catch (std::exception &e) { - pthread_kill(bob_thread.native_handle(), SIGTERM); - bob_thread.join(); - SALTICIDAE_LOG_INFO("exception: %s", e.what()); - } + //} catch (std::exception &e) { + // pthread_kill(bob_thread.native_handle(), SIGTERM); + // bob_thread.join(); + // SALTICIDAE_LOG_INFO("exception: %s", e.what()); + //} return 0; } -- cgit v1.2.3-70-g09d2