aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt4
-rw-r--r--include/salticidae/buffer.h8
-rw-r--r--include/salticidae/conn.h15
-rw-r--r--include/salticidae/event.h218
-rw-r--r--include/salticidae/network.h25
-rw-r--r--include/salticidae/queue.h16
-rw-r--r--src/conn.cpp2
-rw-r--r--test/bench_network.cpp2
8 files changed, 163 insertions, 127 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index b441591..903a66f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -62,5 +62,5 @@ option(SALTICIDAE_NOCHECKSUM " disable checksum in messages" OFF)
configure_file(src/config.h.in include/salticidae/config.h @ONLY)
-set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -W -Wall -Wextra -pedantic -Wsuggest-override")
-set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -W -Wall -Wextra -pedantic -Wsuggest-override")
+set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -faligned-new -W -Wall -Wextra -pedantic -Wsuggest-override")
+set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -faligned-new -W -Wall -Wextra -pedantic -Wsuggest-override")
diff --git a/include/salticidae/buffer.h b/include/salticidae/buffer.h
index 2324d8b..c7b11dd 100644
--- a/include/salticidae/buffer.h
+++ b/include/salticidae/buffer.h
@@ -126,14 +126,14 @@ struct MPSCWriteBuffer {
MPSCWriteBuffer(const SegBuffer &other) = delete;
MPSCWriteBuffer(SegBuffer &&other) = delete;
+ void set_capacity(size_t capacity) { buffer.set_capacity(capacity); }
+
void rewind(bytearray_t &&data) {
buffer.rewind(buffer_entry_t(std::move(data)));
}
- void push(bytearray_t &&data) {
- buffer_entry_t d(std::move(data));
- // TODO: better bounded buffer impl
- while (!buffer.try_enqueue(d)) {}
+ bool push(bytearray_t &&data, bool unbounded) {
+ return buffer.enqueue(buffer_entry_t(std::move(data)), unbounded);
}
bytearray_t move_pop() {
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index b4df259..201e574 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -129,8 +129,8 @@ class ConnPool {
/** Write data to the connection (non-blocking). The data will be sent
* whenever I/O is available. */
- void write(bytearray_t &&data) {
- send_buffer.push(std::move(data));
+ bool write(bytearray_t &&data) {
+ return send_buffer.push(std::move(data), !cpool->queue_capacity);
}
protected:
@@ -156,6 +156,7 @@ class ConnPool {
const int max_listen_backlog;
const double conn_server_timeout;
const size_t seg_buff_size;
+ const size_t queue_capacity;
/* owned by user loop */
BoxObj<ThreadCall> user_tcall;
@@ -287,13 +288,15 @@ class ConnPool {
double _conn_server_timeout;
size_t _seg_buff_size;
size_t _nworker;
+ size_t _queue_capacity;
public:
Config():
_max_listen_backlog(10),
_conn_server_timeout(2),
_seg_buff_size(4096),
- _nworker(1) {}
+ _nworker(1),
+ _queue_capacity(0) {}
Config &max_listen_backlog(int x) {
_max_listen_backlog = x;
@@ -314,6 +317,11 @@ class ConnPool {
_nworker = std::max((size_t)1, x);
return *this;
}
+
+ Config &queue_capacity(size_t x) {
+ _queue_capacity = x;
+ return *this;
+ }
};
ConnPool(const EventContext &ec, const Config &config):
@@ -321,6 +329,7 @@ class ConnPool {
max_listen_backlog(config._max_listen_backlog),
conn_server_timeout(config._conn_server_timeout),
seg_buff_size(config._seg_buff_size),
+ queue_capacity(config._queue_capacity),
listen_fd(-1),
nworker(config._nworker),
worker_running(false) {
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index 641bb8e..0600109 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -434,107 +434,6 @@ class ThreadNotifier {
}
};
-class ThreadCall {
- int ctl_fd[2];
- EventContext ec;
- FdEvent ev_listen;
-
- public:
- struct Result {
- void *data;
- std::function<void(void *)> deleter;
- Result(): data(nullptr) {}
- Result(void *data, std::function<void(void *)> &&deleter):
- data(data), deleter(std::move(deleter)) {}
- ~Result() { if (data != nullptr) deleter(data); }
- Result(const Result &) = delete;
- Result(Result &&other):
- data(other.data), deleter(std::move(other.deleter)) {
- other.data = nullptr;
- }
- void swap(Result &other) {
- std::swap(data, other.data);
- std::swap(deleter, other.deleter);
- }
- Result &operator=(const Result &other) = delete;
- Result &operator=(Result &&other) {
- if (this != &other)
- {
- Result tmp(std::move(other));
- tmp.swap(*this);
- }
- return *this;
- }
- void *get() { return data; }
- };
- class Handle {
- std::function<void(Handle &)> callback;
- ThreadNotifier<Result> * notifier;
- Result result;
- friend ThreadCall;
- public:
- Handle(): notifier(nullptr) {}
- void exec() {
- callback(*this);
- if (notifier)
- notifier->notify(std::move(result));
- }
- template<typename T>
- void set_result(T &&data) {
- using _T = std::remove_reference_t<T>;
- result = Result(new _T(std::forward<T>(data)),
- [](void *ptr) {delete static_cast<_T *>(ptr);});
- }
- };
-
- ThreadCall() = default;
- ThreadCall(const ThreadCall &) = delete;
- ThreadCall(ThreadCall &&) = delete;
- ThreadCall(EventContext ec): ec(ec) {
- if (pipe2(ctl_fd, O_NONBLOCK))
- throw SalticidaeError(std::string("ThreadCall: failed to create pipe"));
- ev_listen = FdEvent(ec, ctl_fd[0], [this](int fd, int) {
- Handle *h;
- read(fd, &h, sizeof(h));
- std::atomic_thread_fence(std::memory_order_acquire);
- h->exec();
- delete h;
- });
- ev_listen.add(FdEvent::READ);
- }
-
- ~ThreadCall() {
- ev_listen.clear();
- Handle *h;
- while (read(ctl_fd[0], &h, sizeof(h)) == sizeof(h))
- delete h;
- close(ctl_fd[0]);
- close(ctl_fd[1]);
- }
-
- template<typename Func>
- void async_call(Func callback) {
- auto h = new Handle();
- h->callback = callback;
- std::atomic_thread_fence(std::memory_order_release);
- write(ctl_fd[1], &h, sizeof(h));
- }
-
- template<typename Func>
- Result call(Func callback) {
- auto h = new Handle();
- h->callback = callback;
- ThreadNotifier<Result> notifier;
- h->notifier = &notifier;
- std::atomic_thread_fence(std::memory_order_release);
- write(ctl_fd[1], &h, sizeof(h));
- return notifier.wait();
- }
-
- const EventContext &get_ec() const { return ec; }
-};
-
-
template<typename T>
class MPSCQueueEventDriven: public MPSCQueue<T> {
private:
@@ -544,8 +443,7 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
FdEvent ev;
public:
- MPSCQueueEventDriven(size_t capacity = 65536):
- MPSCQueue<T>(capacity),
+ MPSCQueueEventDriven():
wait_sig(true),
fd(eventfd(0, EFD_NONBLOCK)) {}
@@ -577,9 +475,10 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
void unreg_handler() { ev.clear(); }
template<typename U>
- bool enqueue(U &&e) {
+ bool enqueue(U &&e, bool unbounded = true) {
static const uint64_t dummy = 1;
- MPSCQueue<T>::enqueue(std::forward<U>(e));
+ if (!MPSCQueue<T>::enqueue(std::forward<U>(e), unbounded))
+ return false;
// memory barrier here, so any load/store in enqueue must be finialized
if (wait_sig.exchange(false, std::memory_order_acq_rel))
{
@@ -613,8 +512,7 @@ class MPMCQueueEventDriven: public MPMCQueue<T> {
std::vector<FdEvent> evs;
public:
- MPMCQueueEventDriven(size_t capacity = 65536):
- MPMCQueue<T>(capacity),
+ MPMCQueueEventDriven():
wait_sig(true),
fd(eventfd(0, EFD_NONBLOCK)) {}
@@ -642,9 +540,10 @@ class MPMCQueueEventDriven: public MPMCQueue<T> {
void unreg_handlers() { evs.clear(); }
template<typename U>
- bool enqueue(U &&e) {
+ bool enqueue(U &&e, bool unbounded = true) {
static const uint64_t dummy = 1;
- MPMCQueue<T>::enqueue(std::forward<U>(e));
+ if (!MPMCQueue<T>::enqueue(std::forward<U>(e), unbounded))
+ return false;
// memory barrier here, so any load/store in enqueue must be finialized
if (wait_sig.exchange(false, std::memory_order_acq_rel))
{
@@ -655,6 +554,107 @@ class MPMCQueueEventDriven: public MPMCQueue<T> {
}
};
+class ThreadCall {
+ public: class Handle;
+ private:
+ int ctl_fd[2];
+ EventContext ec;
+ const size_t burst_size;
+ using queue_t = MPSCQueueEventDriven<Handle *>;
+ queue_t q;
+
+ public:
+ struct Result {
+ void *data;
+ std::function<void(void *)> deleter;
+ Result(): data(nullptr) {}
+ Result(void *data, std::function<void(void *)> &&deleter):
+ data(data), deleter(std::move(deleter)) {}
+ ~Result() { if (data != nullptr) deleter(data); }
+ Result(const Result &) = delete;
+ Result(Result &&other):
+ data(other.data), deleter(std::move(other.deleter)) {
+ other.data = nullptr;
+ }
+ void swap(Result &other) {
+ std::swap(data, other.data);
+ std::swap(deleter, other.deleter);
+ }
+ Result &operator=(const Result &other) = delete;
+ Result &operator=(Result &&other) {
+ if (this != &other)
+ {
+ Result tmp(std::move(other));
+ tmp.swap(*this);
+ }
+ return *this;
+ }
+ void *get() { return data; }
+ };
+ class Handle {
+ std::function<void(Handle &)> callback;
+ ThreadNotifier<Result> * notifier;
+ Result result;
+ friend ThreadCall;
+ public:
+ Handle(): notifier(nullptr) {}
+ void exec() {
+ callback(*this);
+ if (notifier)
+ notifier->notify(std::move(result));
+ }
+ template<typename T>
+ void set_result(T &&data) {
+ using _T = std::remove_reference_t<T>;
+ result = Result(new _T(std::forward<T>(data)),
+ [](void *ptr) {delete static_cast<_T *>(ptr);});
+ }
+ };
+
+ ThreadCall(size_t burst_size): burst_size(burst_size) {}
+ ThreadCall(const ThreadCall &) = delete;
+ ThreadCall(ThreadCall &&) = delete;
+ ThreadCall(EventContext ec, size_t burst_size = 128): ec(ec), burst_size(burst_size) {
+ q.reg_handler(ec, [this, burst_size=burst_size](queue_t &q) {
+ size_t cnt = 0;
+ Handle *h;
+ while (q.try_dequeue(h))
+ {
+ h->exec();
+ delete h;
+ if (++cnt == burst_size) return true;
+ }
+ return false;
+ });
+ }
+
+ ~ThreadCall() {
+ Handle *h;
+ while (q.try_dequeue(h)) delete h;
+ close(ctl_fd[0]);
+ close(ctl_fd[1]);
+ }
+
+ template<typename Func>
+ void async_call(Func callback) {
+ auto h = new Handle();
+ h->callback = callback;
+ q.enqueue(h);
+ }
+
+ template<typename Func>
+ Result call(Func callback) {
+ auto h = new Handle();
+ h->callback = callback;
+ ThreadNotifier<Result> notifier;
+ h->notifier = &notifier;
+ q.enqueue(h);
+ return notifier.wait();
+ }
+
+ const EventContext &get_ec() const { return ec; }
+};
+
}
#endif
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index 7814d56..2cef81d 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -140,6 +140,7 @@ class MsgNetwork: public ConnPool {
MsgNetwork(const EventContext &ec, const Config &config):
ConnPool(ec, config) {
+ incoming_msgs.set_capacity(65536);
incoming_msgs.reg_handler(ec, [this, burst_size=config._burst_size](queue_t &q) {
std::pair<Msg, conn_t> item;
size_t cnt = 0;
@@ -179,7 +180,7 @@ class MsgNetwork: public ConnPool {
}
template<typename MsgType>
- void send_msg(MsgType &&msg, const conn_t &conn);
+ bool send_msg(MsgType &&msg, const conn_t &conn);
using ConnPool::listen;
#ifdef SALTICIDAE_MSG_STAT
#endif
@@ -413,6 +414,8 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
using MsgNet::send_msg;
template<typename MsgType>
void send_msg(MsgType &&msg, const NetAddr &paddr);
+ template<typename MsgType>
+ void multicast_msg(MsgType &&msg, const std::vector<NetAddr> &paddrs);
void listen(NetAddr listen_addr);
bool has_peer(const NetAddr &paddr) const;
conn_t connect(const NetAddr &addr) = delete;
@@ -455,16 +458,16 @@ void MsgNetwork<OpcodeType>::Conn::on_read() {
template<typename OpcodeType>
template<typename MsgType>
-void MsgNetwork<OpcodeType>::send_msg(MsgType &&_msg, const conn_t &conn) {
+bool MsgNetwork<OpcodeType>::send_msg(MsgType &&_msg, const conn_t &conn) {
Msg msg(std::forward<MsgType>(_msg));
bytearray_t msg_data = msg.serialize();
SALTICIDAE_LOG_DEBUG("wrote message %s to %s",
std::string(msg).c_str(),
std::string(*conn).c_str());
- conn->write(std::move(msg_data));
#ifdef SALTICIDAE_MSG_STAT
conn->nsent++;
#endif
+ return conn->write(std::move(msg_data));
}
template<typename O, O _, O __>
@@ -697,6 +700,22 @@ void PeerNetwork<O, _, __>::send_msg(MsgType &&msg, const NetAddr &paddr) {
send_msg(std::move(msg), it->second->conn);
});
}
+
+template<typename O, O _, O __>
+template<typename MsgType>
+void PeerNetwork<O, _, __>::multicast_msg(MsgType &&msg, const std::vector<NetAddr> &paddrs) {
+ this->disp_tcall->async_call(
+ [this, msg=std::forward<MsgType>(msg), paddrs](ThreadCall::Handle &) {
+ for (auto &paddr: paddrs)
+ {
+ auto it = id2peer.find(paddr);
+ if (it == id2peer.end())
+ throw PeerNetworkError("peer does not exist");
+ send_msg(std::move(msg), it->second->conn);
+ }
+ });
+}
+
/* end: functions invoked by the user loop */
template<typename OpcodeType>
diff --git a/include/salticidae/queue.h b/include/salticidae/queue.h
index a72a28f..0b4dae0 100644
--- a/include/salticidae/queue.h
+++ b/include/salticidae/queue.h
@@ -107,10 +107,8 @@ class MPMCQueue {
MPMCQueue(const MPMCQueue &) = delete;
MPMCQueue(MPMCQueue &&) = delete;
- MPMCQueue(size_t capacity = 65536): head(new Block()), tail(head.load()) {
+ MPMCQueue(): head(new Block()), tail(head.load()) {
head.load()->next = nullptr;
- while (capacity--)
- blks.push(new Block());
}
~MPMCQueue() {
@@ -122,10 +120,18 @@ class MPMCQueue {
}
}
+ void set_capacity(size_t capacity = 0) {
+ while (capacity--) blks.push(new Block());
+ }
+
template<typename U>
- bool enqueue(U &&e) {
+ bool enqueue(U &&e, bool unbounded = true) {
FreeList::Node * _nblk;
- if (!blks.pop(_nblk)) _nblk = new Block();
+ if (!blks.pop(_nblk))
+ {
+ if (unbounded) _nblk = new Block();
+ else return false;
+ }
_enqueue(static_cast<Block *>(_nblk), std::forward<U>(e));
return true;
}
diff --git a/src/conn.cpp b/src/conn.cpp
index 4ab3040..5fc59f3 100644
--- a/src/conn.cpp
+++ b/src/conn.cpp
@@ -185,6 +185,7 @@ void ConnPool::accept_client(int fd, int) {
NetAddr addr((struct sockaddr_in *)&client_addr);
conn_t conn = create_conn();
conn->self_ref = conn;
+ conn->send_buffer.set_capacity(queue_capacity);
conn->seg_buff_size = seg_buff_size;
conn->fd = client_fd;
conn->cpool = this;
@@ -264,6 +265,7 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) {
throw ConnPoolError(std::string("unable to set nonblocking socket"));
conn_t conn = create_conn();
conn->self_ref = conn;
+ conn->send_buffer.set_capacity(queue_capacity);
conn->seg_buff_size = seg_buff_size;
conn->fd = fd;
conn->cpool = this;
diff --git a/test/bench_network.cpp b/test/bench_network.cpp
index 89289fe..58f4a64 100644
--- a/test/bench_network.cpp
+++ b/test/bench_network.cpp
@@ -79,7 +79,7 @@ struct MyNet: public MsgNetworkByteOp {
const std::string name,
const NetAddr &peer,
double stat_timeout = -1):
- MsgNetworkByteOp(ec, MsgNetworkByteOp::Config().burst_size(1000)),
+ MsgNetworkByteOp(ec, MsgNetworkByteOp::Config().burst_size(1000).queue_capacity(65536)),
name(name),
peer(peer),
ev_period_stat(ec, [this, stat_timeout](TimerEvent &) {