aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/salticidae/conn.h2
-rw-r--r--include/salticidae/event.h3
-rw-r--r--include/salticidae/queue.h6
-rw-r--r--test/bench_network.cpp22
4 files changed, 18 insertions, 15 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index e7d6295..cc1c5a6 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -188,7 +188,6 @@ class ConnPool {
void feed(const conn_t &conn, int client_fd) {
/* the caller should finalize all the preparation */
- std::atomic_thread_fence(std::memory_order_release);
tcall.async_call([this, conn, client_fd](ThreadCall::Handle &) {
if (conn->mode == Conn::ConnMode::DEAD)
{
@@ -381,7 +380,6 @@ class ConnPool {
auto ret = *(static_cast<conn_t *>(disp_tcall->call(
[this, addr](ThreadCall::Handle &h) {
auto conn = _connect(addr);
- std::atomic_thread_fence(std::memory_order_release);
h.set_result(std::move(conn));
}).get()));
return std::move(ret);
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index fafa884..b97ab9a 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -481,6 +481,7 @@ class ThreadCall {
}
template<typename T>
void set_result(T &&data) {
+ std::atomic_thread_fence(std::memory_order_release);
using _T = std::remove_reference_t<T>;
result = Result(new _T(std::forward<T>(data)),
[](void *ptr) {delete static_cast<_T *>(ptr);});
@@ -591,7 +592,7 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
template<typename U>
bool try_enqueue(U &&e) {
static const uint64_t dummy = 1;
- if (!MPMCQueue<T>::try_enqueue(std::forward<U>(e)))
+ if (!MPSCQueue<T>::try_enqueue(std::forward<U>(e)))
return false;
// memory barrier here, so any load/store in enqueue must be finialized
if (wait_sig.exchange(false, std::memory_order_acq_rel))
diff --git a/include/salticidae/queue.h b/include/salticidae/queue.h
index 0b493ea..1732238 100644
--- a/include/salticidae/queue.h
+++ b/include/salticidae/queue.h
@@ -104,7 +104,8 @@ class MPMCQueue {
template<typename U>
void _enqueue(Block *nblk, U &&e) {
new (&(nblk->elem)) T(std::forward<U>(e));
- nblk->next.store(nullptr, std::memory_order_release);
+ std::atomic_thread_fence(std::memory_order_release);
+ nblk->next.store(nullptr, std::memory_order_relaxed);
auto prev = tail.exchange(nblk, std::memory_order_acq_rel);
prev->next.store(nblk, std::memory_order_relaxed);
}
@@ -158,6 +159,7 @@ class MPMCQueue {
blks.release_ref(h);
return false;
}
+ std::atomic_thread_fence(std::memory_order_acquire);
e = std::move(nh->elem);
auto hh = h;
if (head.compare_exchange_weak(hh, nh, std::memory_order_consume))
@@ -180,6 +182,7 @@ struct MPSCQueue: public MPMCQueue<T> {
auto nh = h->next.load(std::memory_order_relaxed);
if (nh == nullptr)
return false;
+ std::atomic_thread_fence(std::memory_order_acquire);
e = std::move(nh->elem);
this->head.store(nh, std::memory_order_release);
this->blks.push(h);
@@ -194,6 +197,7 @@ struct MPSCQueue: public MPMCQueue<T> {
auto h = this->head.load(std::memory_order_acquire);
nblk->next.store(h, std::memory_order_release);
new (&(h->elem)) T(std::forward<U>(e));
+ std::atomic_thread_fence(std::memory_order_release);
this->head.store(nblk, std::memory_order_release);
return true;
}
diff --git a/test/bench_network.cpp b/test/bench_network.cpp
index be90e2a..bb2193b 100644
--- a/test/bench_network.cpp
+++ b/test/bench_network.cpp
@@ -48,7 +48,7 @@ using std::placeholders::_2;
using opcode_t = uint8_t;
struct MsgBytes {
- static const opcode_t opcode = 0x0;
+ static const opcode_t opcode = 0xa;
DataStream serialized;
bytearray_t bytes;
MsgBytes(size_t size) {
@@ -79,7 +79,7 @@ struct MyNet: public MsgNetworkByteOp {
const std::string name,
const NetAddr &peer,
double stat_timeout = -1):
- MsgNetworkByteOp(ec, MsgNetworkByteOp::Config()),
+ MsgNetworkByteOp(ec, MsgNetworkByteOp::Config().burst_size(1000)),
name(name),
peer(peer),
ev_period_stat(ec, [this, stat_timeout](TimerEvent &) {
@@ -161,17 +161,17 @@ int main() {
MyNet bob(ec, "Bob", alice_addr);
bob.start();
bob.connect(alice_addr);
- try {
+ //try {
ec.dispatch();
- } catch (std::exception &) {}
- SALTICIDAE_LOG_INFO("exiting");
+ //} catch (std::exception &) {}
+ //SALTICIDAE_LOG_INFO("exiting");
});
- try {
+ //try {
ec.dispatch();
- } catch (std::exception &e) {
- pthread_kill(bob_thread.native_handle(), SIGTERM);
- bob_thread.join();
- SALTICIDAE_LOG_INFO("exception: %s", e.what());
- }
+ //} catch (std::exception &e) {
+ // pthread_kill(bob_thread.native_handle(), SIGTERM);
+ // bob_thread.join();
+ // SALTICIDAE_LOG_INFO("exception: %s", e.what());
+ //}
return 0;
}