aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2018-11-20 20:43:57 -0500
committerDeterminant <[email protected]>2018-11-20 20:43:57 -0500
commitd2fe5eb74bdf40afc5cacd052f40b56aa3e57eaf (patch)
tree30e042013d87d10cf9e0db95fd9398878b305915
parent60c4af2602e18933f2b795500f44c6613c852f45 (diff)
refactor libuv wrapper classes
-rw-r--r--include/salticidae/conn.h16
-rw-r--r--include/salticidae/event.h307
-rw-r--r--include/salticidae/network.h25
-rw-r--r--src/conn.cpp19
-rw-r--r--src/util.cpp7
-rw-r--r--test/bench_network.cpp20
-rw-r--r--test/test_p2p_stress.cpp8
-rw-r--r--test/test_queue.cpp6
8 files changed, 265 insertions, 143 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index a60cd8f..e7d6295 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -86,8 +86,8 @@ class ConnPool {
MPSCWriteBuffer send_buffer;
SegBuffer recv_buffer;
- Event ev_connect;
- Event ev_socket;
+ TimedFdEvent ev_connect;
+ FdEvent ev_socket;
/** does not need to wait if true */
bool ready_send;
@@ -161,7 +161,7 @@ class ConnPool {
conn_callback_t conn_cb;
/* owned by the dispatcher */
- Event ev_listen;
+ FdEvent ev_listen;
std::unordered_map<int, conn_t> pool;
int listen_fd; /**< for accepting new network connections */
@@ -207,18 +207,18 @@ class ConnPool {
if (conn->ready_send)
{
conn->ev_socket.del();
- conn->ev_socket.add(Event::READ | Event::WRITE);
- conn->send_data(client_fd, Event::WRITE);
+ conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE);
+ conn->send_data(client_fd, FdEvent::WRITE);
}
return false;
});
- conn->ev_socket = Event(ec, client_fd, [conn=conn](int fd, int what) {
- if (what & Event::READ)
+ conn->ev_socket = FdEvent(ec, client_fd, [conn=conn](int fd, int what) {
+ if (what & FdEvent::READ)
conn->recv_data(fd, what);
else
conn->send_data(fd, what);
});
- conn->ev_socket.add(Event::READ | Event::WRITE);
+ conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE);
nconn++;
});
}
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index ca48e69..2365a71 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -68,101 +68,153 @@ class EventContext: public _event_context_ot {
void stop() const { uv_stop(get()); }
};
-class Event {
+static void _on_uv_handle_close(uv_handle_t *h) { delete h; }
+
+class FdEvent {
public:
using callback_t = std::function<void(int fd, int events)>;
static const int READ = UV_READABLE;
static const int WRITE = UV_WRITABLE;
static const int ERROR = 1 << 30;
- static const int TIMEOUT = 1 << 29;
- private:
- EventContext eb;
+ protected:
+ EventContext ec;
int fd;
uv_poll_t *ev_fd;
- uv_timer_t *ev_timer;
callback_t callback;
+
static inline void fd_then(uv_poll_t *h, int status, int events) {
if (status != 0)
events |= ERROR;
- auto event = static_cast<Event *>(h->data);
+ auto event = static_cast<FdEvent *>(h->data);
event->callback(event->fd, events);
}
- static inline void timer_then(uv_timer_t *h) {
- auto event = static_cast<Event *>(h->data);
- if (event->ev_fd) uv_poll_stop(event->ev_fd);
- event->callback(event->fd, TIMEOUT);
+ public:
+ FdEvent(): ec(nullptr), ev_fd(nullptr) {}
+ FdEvent(const EventContext &ec, int fd, callback_t callback):
+ ec(ec), fd(fd), ev_fd(new uv_poll_t()),
+ callback(callback) {
+ uv_poll_init(ec.get(), ev_fd, fd);
+ ev_fd->data = this;
}
- static void _on_handle_close(uv_handle_t *h) {
- delete h;
+ FdEvent(const FdEvent &) = delete;
+ FdEvent(FdEvent &&other):
+ ec(std::move(other.ec)), fd(other.fd), ev_fd(other.ev_fd),
+ callback(std::move(other.callback)) {
+ other.ev_fd = nullptr;
+ if (ev_fd != nullptr)
+ ev_fd->data = this;
+ }
+
+ void swap(FdEvent &other) {
+ std::swap(ec, other.ec);
+ std::swap(fd, other.fd);
+ std::swap(ev_fd, other.ev_fd);
+ std::swap(callback, other.callback);
+ if (ev_fd != nullptr)
+ ev_fd->data = this;
+ if (other.ev_fd != nullptr)
+ other.ev_fd->data = &other;
}
- public:
- Event(): eb(nullptr), ev_fd(nullptr), ev_timer(nullptr) {}
- Event(const EventContext &eb, int fd, callback_t callback):
- eb(eb), fd(fd),
- ev_fd(nullptr),
- ev_timer(new uv_timer_t()),
- callback(callback) {
- if (fd != -1)
+ FdEvent &operator=(FdEvent &&other) {
+ if (this != &other)
{
- ev_fd = new uv_poll_t();
- uv_poll_init(eb.get(), ev_fd, fd);
- ev_fd->data = this;
+ FdEvent tmp(std::move(other));
+ tmp.swap(*this);
}
- uv_timer_init(eb.get(), ev_timer);
- ev_timer->data = this;
+ return *this;
}
- Event(const Event &) = delete;
- Event(Event &&other):
- eb(std::move(other.eb)), fd(other.fd),
- ev_fd(other.ev_fd), ev_timer(other.ev_timer),
- callback(std::move(other.callback)) {
- other.del();
+ ~FdEvent() { clear(); }
+
+ void clear() {
if (ev_fd != nullptr)
{
- other.ev_fd = nullptr;
- ev_fd->data = this;
+ uv_poll_stop(ev_fd);
+ uv_close((uv_handle_t *)ev_fd, _on_uv_handle_close);
+ ev_fd = nullptr;
}
- other.ev_timer = nullptr;
+ callback = nullptr;
+ }
+
+ void set_callback(callback_t _callback) {
+ callback = _callback;
+ }
+
+ void add(int events) {
+ assert(ev_fd != nullptr);
+ uv_poll_start(ev_fd, events, FdEvent::fd_then);
+ }
+
+ void del() {
+ if (ev_fd != nullptr) uv_poll_stop(ev_fd);
+ }
+
+ operator bool() const { return ev_fd != nullptr; }
+};
+
+
+class TimerEvent {
+ public:
+ using callback_t = std::function<void(TimerEvent &)>;
+
+ protected:
+ EventContext ec;
+ uv_timer_t *ev_timer;
+ callback_t callback;
+
+ static inline void timer_then(uv_timer_t *h) {
+ auto event = static_cast<TimerEvent *>(h->data);
+ event->callback(*event);
+ }
+
+ public:
+ TimerEvent(): ec(nullptr), ev_timer(nullptr) {}
+ TimerEvent(const EventContext &ec, callback_t callback):
+ ec(ec), ev_timer(new uv_timer_t()),
+ callback(callback) {
+ uv_timer_init(ec.get(), ev_timer);
ev_timer->data = this;
}
- Event &operator=(Event &&other) {
- clear();
- other.del();
- eb = std::move(other.eb);
- fd = other.fd;
- ev_fd = other.ev_fd;
- ev_timer = other.ev_timer;
- callback = std::move(other.callback);
+ TimerEvent(const TimerEvent &) = delete;
+ TimerEvent(TimerEvent &&other):
+ ec(std::move(other.ec)), ev_timer(other.ev_timer),
+ callback(std::move(other.callback)) {
+ other.ev_timer = nullptr;
+ if (ev_timer != nullptr)
+ ev_timer->data = this;
+ }
- if (ev_fd != nullptr)
+ void swap(TimerEvent &other) {
+ std::swap(ec, other.ec);
+ std::swap(ev_timer, other.ev_timer);
+ std::swap(callback, other.callback);
+ if (ev_timer != nullptr)
+ ev_timer->data = this;
+ if (other.ev_timer != nullptr)
+ other.ev_timer->data = &other;
+ }
+
+ TimerEvent &operator=(TimerEvent &&other) {
+ if (this != &other)
{
- other.ev_fd = nullptr;
- ev_fd->data = this;
+ TimerEvent tmp(std::move(other));
+ tmp.swap(*this);
}
- other.ev_timer = nullptr;
- ev_timer->data = this;
return *this;
}
- ~Event() { clear(); }
+ ~TimerEvent() { clear(); }
void clear() {
- if (ev_fd != nullptr)
- {
- uv_poll_stop(ev_fd);
- uv_close((uv_handle_t *)ev_fd, Event::_on_handle_close);
- ev_fd = nullptr;
- }
if (ev_timer != nullptr)
{
uv_timer_stop(ev_timer);
- uv_close((uv_handle_t *)ev_timer, Event::_on_handle_close);
+ uv_close((uv_handle_t *)ev_timer, _on_uv_handle_close);
ev_timer = nullptr;
}
callback = nullptr;
@@ -172,28 +224,90 @@ class Event {
callback = _callback;
}
- void add(int events) {
- if (ev_fd) uv_poll_start(ev_fd, events, Event::fd_then);
+ void add(double t_sec) {
+ assert(ev_timer != nullptr);
+ uv_timer_start(ev_timer, TimerEvent::timer_then, uint64_t(t_sec * 1000), 0);
}
+
void del() {
- if (ev_fd) uv_poll_stop(ev_fd);
- if (ev_timer == nullptr)
- assert(ev_timer);
- uv_timer_stop(ev_timer);
+ if (ev_timer != nullptr) uv_timer_stop(ev_timer);
+ }
+
+ operator bool() const { return ev_timer != nullptr; }
+};
+
+class TimedFdEvent: public FdEvent, public TimerEvent {
+ public:
+ static const int TIMEOUT = 1 << 29;
+
+ private:
+ static inline void timer_then(uv_timer_t *h) {
+ auto event = static_cast<TimedFdEvent *>(h->data);
+ event->FdEvent::del();
+ event->FdEvent::callback(event->fd, TIMEOUT);
}
- void add_with_timeout(double t_sec, int events) {
- add(events);
- uv_timer_start(ev_timer, Event::timer_then, uint64_t(t_sec * 1000), 0);
+
+ public:
+ TimedFdEvent() = default;
+ TimedFdEvent(const EventContext &ec, int fd, FdEvent::callback_t callback):
+ FdEvent(ec, fd, callback),
+ TimerEvent(ec, TimerEvent::callback_t()) {
+ ev_timer->data = this;
}
- operator bool() const { return ev_fd != nullptr || ev_timer != nullptr; }
+ TimedFdEvent(TimedFdEvent &&other):
+ FdEvent(static_cast<FdEvent &&>(other)),
+ TimerEvent(static_cast<TimerEvent &&>(other)) {
+ if (ev_timer != nullptr)
+ ev_timer->data = this;
+ }
+
+ void swap(TimedFdEvent &other) {
+ std::swap(static_cast<FdEvent &>(*this), static_cast<FdEvent &>(other));
+ std::swap(static_cast<TimerEvent &>(*this), static_cast<TimerEvent &>(other));
+ if (ev_timer != nullptr)
+ ev_timer->data = this;
+ }
+
+ TimedFdEvent &operator=(TimedFdEvent &&other) {
+ if (this != &other)
+ {
+ TimedFdEvent tmp(std::move(other));
+ tmp.swap(*this);
+ }
+ return *this;
+ }
+
+ void clear() {
+ TimerEvent::clear();
+ FdEvent::clear();
+ }
+
+ using FdEvent::set_callback;
+
+ void add(int events) = delete;
+ void add(double t_sec) = delete;
+
+ void add(int events, double t_sec) {
+ assert(ev_fd != nullptr && ev_timer != nullptr);
+ uv_timer_start(ev_timer, TimedFdEvent::timer_then,
+ uint64_t(t_sec * 1000), 0);
+ FdEvent::add(events);
+ }
+
+ void del() {
+ TimerEvent::del();
+ FdEvent::del();
+ }
+
+ operator bool() const { return ev_fd != nullptr; }
};
class SigEvent {
public:
using callback_t = std::function<void(int signum)>;
private:
- EventContext eb;
+ EventContext ec;
uv_signal_t *ev_sig;
callback_t callback;
static inline void sig_then(uv_signal_t *h, int signum) {
@@ -201,39 +315,40 @@ class SigEvent {
event->callback(signum);
}
- static void _on_handle_close(uv_handle_t *h) {
- delete h;
- }
-
public:
- SigEvent(): eb(nullptr), ev_sig(nullptr) {}
- SigEvent(const EventContext &eb, callback_t callback):
- eb(eb),
- ev_sig(new uv_signal_t()),
+ SigEvent(): ec(nullptr), ev_sig(nullptr) {}
+ SigEvent(const EventContext &ec, callback_t callback):
+ ec(ec), ev_sig(new uv_signal_t()),
callback(callback) {
- uv_signal_init(eb.get(), ev_sig);
+ uv_signal_init(ec.get(), ev_sig);
ev_sig->data = this;
}
SigEvent(const SigEvent &) = delete;
SigEvent(SigEvent &&other):
- eb(std::move(other.eb)),
- ev_sig(other.ev_sig),
+ ec(std::move(other.ec)), ev_sig(other.ev_sig),
callback(std::move(other.callback)) {
- other.del();
other.ev_sig = nullptr;
- ev_sig->data = this;
+ if (ev_sig != nullptr)
+ ev_sig->data = this;
}
- SigEvent &operator=(SigEvent &&other) {
- clear();
- other.del();
- eb = std::move(other.eb);
- ev_sig = other.ev_sig;
- callback = std::move(other.callback);
+ void swap(SigEvent &other) {
+ std::swap(ec, other.ec);
+ std::swap(ev_sig, other.ev_sig);
+ std::swap(callback, other.callback);
+ if (ev_sig != nullptr)
+ ev_sig->data = this;
+ if (other.ev_sig != nullptr)
+ other.ev_sig->data = &other;
+ }
- other.ev_sig = nullptr;
- ev_sig->data = this;
+ SigEvent &operator=(SigEvent &&other) {
+ if (this != &other)
+ {
+ SigEvent tmp(std::move(other));
+ tmp.swap(*this);
+ }
return *this;
}
@@ -243,7 +358,7 @@ class SigEvent {
if (ev_sig != nullptr)
{
uv_signal_stop(ev_sig);
- uv_close((uv_handle_t *)ev_sig, SigEvent::_on_handle_close);
+ uv_close((uv_handle_t *)ev_sig, _on_uv_handle_close);
ev_sig = nullptr;
}
callback = nullptr;
@@ -254,15 +369,17 @@ class SigEvent {
}
void add(int signum) {
+ assert(ev_sig != nullptr);
uv_signal_start(ev_sig, SigEvent::sig_then, signum);
}
void add_once(int signum) {
+ assert(ev_sig != nullptr);
uv_signal_start_oneshot(ev_sig, SigEvent::sig_then, signum);
}
void del() {
- uv_signal_stop(ev_sig);
+ if (ev_sig != nullptr) uv_signal_stop(ev_sig);
}
operator bool() const { return ev_sig != nullptr; }
@@ -292,7 +409,7 @@ class ThreadNotifier {
class ThreadCall {
int ctl_fd[2];
EventContext ec;
- Event ev_listen;
+ FdEvent ev_listen;
public:
struct Result {
@@ -347,13 +464,13 @@ class ThreadCall {
ThreadCall(EventContext ec): ec(ec) {
if (pipe2(ctl_fd, O_NONBLOCK))
throw SalticidaeError(std::string("ThreadCall: failed to create pipe"));
- ev_listen = Event(ec, ctl_fd[0], [this](int fd, int) {
+ ev_listen = FdEvent(ec, ctl_fd[0], [this](int fd, int) {
Handle *h;
read(fd, &h, sizeof(h));
h->exec();
delete h;
});
- ev_listen.add(Event::READ);
+ ev_listen.add(FdEvent::READ);
}
~ThreadCall() {
@@ -394,7 +511,7 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
const uint64_t dummy = 1;
std::atomic<bool> wait_sig;
int fd;
- Event ev;
+ FdEvent ev;
public:
MPSCQueueEventDriven(size_t capacity = 65536):
@@ -409,7 +526,7 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
template<typename Func>
void reg_handler(const EventContext &ec, Func &&func) {
- ev = Event(ec, fd,
+ ev = FdEvent(ec, fd,
[this, func=std::forward<Func>(func)](int, int) {
//fprintf(stderr, "%x\n", std::this_thread::get_id());
uint64_t t;
@@ -424,7 +541,7 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
if (func(*this))
write(fd, &dummy, 8);
});
- ev.add(Event::READ);
+ ev.add(FdEvent::READ);
}
void unreg_handler() { ev.clear(); }
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index f2b99b3..51effe0 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -267,7 +267,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
class Conn: public MsgNet::Conn {
friend PeerNetwork;
NetAddr peer_id;
- Event ev_timeout;
+ TimerEvent ev_timeout;
void reset_timeout(double timeout);
public:
@@ -297,8 +297,8 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
NetAddr addr;
/** the underlying connection, may be invalid when connected = false */
conn_t conn;
- Event ev_ping_timer;
- Event ev_retry_timer;
+ TimerEvent ev_ping_timer;
+ TimerEvent ev_retry_timer;
bool ping_timer_ok;
bool pong_msg_ok;
bool connected;
@@ -307,13 +307,13 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
Peer(NetAddr addr, conn_t conn, const EventContext &ec):
addr(addr), conn(conn),
ev_ping_timer(
- Event(ec, -1, std::bind(&Peer::ping_timer, this, _1, _2))),
+ TimerEvent(ec, std::bind(&Peer::ping_timer, this, _1))),
connected(false) {}
~Peer() {}
Peer &operator=(const Peer &) = delete;
Peer(const Peer &) = delete;
- void ping_timer(int, int);
+ void ping_timer(TimerEvent &);
void reset_ping_timer();
void send_ping();
void clear_all_events() {
@@ -491,7 +491,7 @@ void PeerNetwork<O, _, __>::tcall_reset_timeout(ConnPool::Worker *worker,
worker->get_tcall()->async_call([conn, t=timeout](ThreadCall::Handle &) {
if (!conn->ev_timeout) return;
conn->ev_timeout.del();
- conn->ev_timeout.add_with_timeout(t, 0);
+ conn->ev_timeout.add(t);
SALTICIDAE_LOG_DEBUG("reset connection timeout %.2f", t);
});
}
@@ -504,7 +504,7 @@ void PeerNetwork<O, _, __>::Conn::on_setup() {
auto conn = static_pointer_cast<Conn>(this->self());
auto worker = this->worker;
assert(!ev_timeout);
- ev_timeout = Event(worker->get_ec(), -1, [conn](int, int) {
+ ev_timeout = TimerEvent(worker->get_ec(), [conn](TimerEvent &) {
SALTICIDAE_LOG_INFO("peer ping-pong timeout");
conn->worker_terminate();
});
@@ -526,11 +526,11 @@ void PeerNetwork<O, _, __>::Conn::on_teardown() {
p->conn = nullptr;
SALTICIDAE_LOG_INFO("connection lost: %s", std::string(*this).c_str());
// try to reconnect
- p->ev_retry_timer = Event(pn->disp_ec, -1,
- [pn, peer_id = this->peer_id](int, int) {
+ p->ev_retry_timer = TimerEvent(pn->disp_ec,
+ [pn, peer_id = this->peer_id](TimerEvent &) {
pn->start_active_conn(peer_id);
});
- p->ev_retry_timer.add_with_timeout(pn->gen_conn_timeout(), 0);
+ p->ev_retry_timer.add(pn->gen_conn_timeout());
}
template<typename O, O _, O __>
@@ -554,8 +554,7 @@ template<typename O, O _, O __>
void PeerNetwork<O, _, __>::Peer::reset_ping_timer() {
assert(ev_ping_timer);
ev_ping_timer.del();
- ev_ping_timer.add_with_timeout(
- gen_rand_timeout(conn->get_net()->ping_period), 0);
+ ev_ping_timer.add(gen_rand_timeout(conn->get_net()->ping_period));
}
template<typename O, O _, O __>
@@ -568,7 +567,7 @@ void PeerNetwork<O, _, __>::Peer::send_ping() {
}
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::Peer::ping_timer(int, int) {
+void PeerNetwork<O, _, __>::Peer::ping_timer(TimerEvent &) {
ping_timer_ok = true;
if (pong_msg_ok)
{
diff --git a/src/conn.cpp b/src/conn.cpp
index 51a5346..4ab3040 100644
--- a/src/conn.cpp
+++ b/src/conn.cpp
@@ -54,7 +54,7 @@ ConnPool::Conn::operator std::string() const {
/* the following two functions are executed by exactly one worker per Conn object */
void ConnPool::Conn::send_data(int fd, int events) {
- if (events & Event::ERROR)
+ if (events & FdEvent::ERROR)
{
worker_terminate();
return;
@@ -93,13 +93,13 @@ void ConnPool::Conn::send_data(int fd, int events) {
}
}
ev_socket.del();
- ev_socket.add(Event::READ);
+ ev_socket.add(FdEvent::READ);
/* consumed the buffer but endpoint still seems to be writable */
ready_send = true;
}
void ConnPool::Conn::recv_data(int fd, int events) {
- if (events & Event::ERROR)
+ if (events & FdEvent::ERROR)
{
worker_terminate();
return;
@@ -172,7 +172,8 @@ void ConnPool::accept_client(int fd, int) {
struct sockaddr client_addr;
socklen_t addr_size = sizeof(struct sockaddr_in);
if ((client_fd = accept(fd, &client_addr, &addr_size)) < 0)
- SALTICIDAE_LOG_ERROR("error while accepting the connection");
+ SALTICIDAE_LOG_ERROR("error while accepting the connection: %s",
+ strerror(errno));
else
{
int one = 1;
@@ -213,7 +214,7 @@ void ConnPool::Conn::conn_server(int fd, int events) {
}
else
{
- if (events & Event::TIMEOUT)
+ if (events & TimedFdEvent::TIMEOUT)
SALTICIDAE_LOG_INFO("%s connect timeout", std::string(*this).c_str());
conn->disp_terminate();
return;
@@ -245,9 +246,9 @@ void ConnPool::_listen(NetAddr listen_addr) {
throw ConnPoolError(std::string("binding error"));
if (::listen(listen_fd, max_listen_backlog) < 0)
throw ConnPoolError(std::string("listen error"));
- ev_listen = Event(disp_ec, listen_fd,
+ ev_listen = FdEvent(disp_ec, listen_fd,
std::bind(&ConnPool::accept_client, this, _1, _2));
- ev_listen.add(Event::READ);
+ ev_listen.add(FdEvent::READ);
SALTICIDAE_LOG_INFO("listening to %u", ntohs(listen_addr.port));
}
@@ -283,8 +284,8 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) {
}
else
{
- conn->ev_connect = Event(disp_ec, conn->fd, std::bind(&Conn::conn_server, conn.get(), _1, _2));
- conn->ev_connect.add_with_timeout(conn_server_timeout, Event::WRITE);
+ conn->ev_connect = TimedFdEvent(disp_ec, conn->fd, std::bind(&Conn::conn_server, conn.get(), _1, _2));
+ conn->ev_connect.add(FdEvent::WRITE, conn_server_timeout);
add_conn(conn);
SALTICIDAE_LOG_INFO("created %s", std::string(*conn).c_str());
}
diff --git a/src/util.cpp b/src/util.cpp
index c51e191..6a8c652 100644
--- a/src/util.cpp
+++ b/src/util.cpp
@@ -51,11 +51,13 @@ double gen_rand_timeout(double base_timeout, double alpha) {
return base_timeout + rand() / (double)RAND_MAX * alpha * base_timeout;
}
-std::string vstringprintf(const char *fmt, va_list ap) {
+std::string vstringprintf(const char *fmt, va_list _ap) {
int guessed_size = 1024;
std::string buff;
+ va_list ap;
+ va_copy(ap, _ap);
buff.resize(guessed_size);
- int nwrote = vsnprintf(&buff[0], guessed_size, fmt, ap);
+ int nwrote = vsnprintf(&buff[0], guessed_size, fmt, _ap);
if (nwrote < 0) buff = "";
else
{
@@ -66,6 +68,7 @@ std::string vstringprintf(const char *fmt, va_list ap) {
buff = "";
}
}
+ va_end(ap);
return std::move(buff);
}
diff --git a/test/bench_network.cpp b/test/bench_network.cpp
index b0856b8..be90e2a 100644
--- a/test/bench_network.cpp
+++ b/test/bench_network.cpp
@@ -42,7 +42,7 @@ using salticidae::MsgNetwork;
using salticidae::htole;
using salticidae::letoh;
using salticidae::bytearray_t;
-using salticidae::Event;
+using salticidae::TimerEvent;
using std::placeholders::_1;
using std::placeholders::_2;
using opcode_t = uint8_t;
@@ -71,8 +71,8 @@ using MsgNetworkByteOp = MsgNetwork<opcode_t>;
struct MyNet: public MsgNetworkByteOp {
const std::string name;
const NetAddr peer;
- Event ev_period_send;
- Event ev_period_stat;
+ TimerEvent ev_period_send;
+ TimerEvent ev_period_stat;
size_t nrecv;
MyNet(const salticidae::EventContext &ec,
@@ -82,17 +82,17 @@ struct MyNet: public MsgNetworkByteOp {
MsgNetworkByteOp(ec, MsgNetworkByteOp::Config()),
name(name),
peer(peer),
- ev_period_stat(ec, -1, [this, stat_timeout](int, short) {
+ ev_period_stat(ec, [this, stat_timeout](TimerEvent &) {
SALTICIDAE_LOG_INFO("%.2f mps\n", nrecv / (double)stat_timeout);
nrecv = 0;
- ev_period_stat.add_with_timeout(stat_timeout, 0);
+ ev_period_stat.add(stat_timeout);
}),
nrecv(0) {
/* message handler could be a bound method */
reg_handler(salticidae::generic_bind(
&MyNet::on_receive_bytes, this, _1, _2));
if (stat_timeout > 0)
- ev_period_stat.add_with_timeout(0, 0);
+ ev_period_stat.add(0);
}
struct Conn: public MsgNetworkByteOp::Conn {
@@ -109,12 +109,12 @@ struct MyNet: public MsgNetworkByteOp {
printf("[%s] Connected, sending hello.\n",
net->name.c_str());
/* send the first message through this connection */
- net->ev_period_send = Event(net->ec, -1,
- [net, conn = self()](int, short) {
+ net->ev_period_send = TimerEvent(net->ec,
+ [net, conn = self()](TimerEvent &) {
net->send_msg(MsgBytes(256), conn);
- net->ev_period_send.add_with_timeout(0, 0);
+ net->ev_period_send.add(0);
});
- net->ev_period_send.add_with_timeout(0, 0);
+ net->ev_period_send.add(0);
}
else
diff --git a/test/test_p2p_stress.cpp b/test/test_p2p_stress.cpp
index b272742..70e3444 100644
--- a/test/test_p2p_stress.cpp
+++ b/test/test_p2p_stress.cpp
@@ -36,7 +36,7 @@ using salticidae::NetAddr;
using salticidae::DataStream;
using salticidae::MsgNetwork;
using salticidae::ConnPool;
-using salticidae::Event;
+using salticidae::TimerEvent;
using salticidae::EventContext;
using salticidae::htole;
using salticidae::letoh;
@@ -78,7 +78,7 @@ using MyNet = salticidae::PeerNetwork<uint8_t>;
std::vector<NetAddr> addrs;
struct TestContext {
- Event timer;
+ TimerEvent timer;
int state;
uint256_t hash;
size_t ncompleted;
@@ -130,7 +130,7 @@ void install_proto(AppContext &app, const size_t &seg_buff_size) {
{
send_rand(tc.state, conn);
tc.state = -1;
- tc.timer = Event(ec, -1, [&, conn](int, int) {
+ tc.timer = TimerEvent(ec, [&, conn](TimerEvent &) {
tc.ncompleted++;
net.terminate(conn);
std::string s;
@@ -139,7 +139,7 @@ void install_proto(AppContext &app, const size_t &seg_buff_size) {
SALTICIDAE_LOG_INFO("%d completed:%s", ntohs(app.addr.port), s.c_str());
});
double t = salticidae::gen_rand_timeout(10);
- tc.timer.add_with_timeout(t, 0);
+ tc.timer.add(t);
SALTICIDAE_LOG_INFO("rand-bomboard phase, ending in %.2f secs", t);
}
else if (tc.state == -1)
diff --git a/test/test_queue.cpp b/test/test_queue.cpp
index 41c6f88..7b06951 100644
--- a/test/test_queue.cpp
+++ b/test/test_queue.cpp
@@ -4,6 +4,8 @@
#include "salticidae/event.h"
+using salticidae::TimerEvent;
+
void test_mpsc(int nproducers = 16, int nops = 100000, size_t burst_size = 128) {
size_t total = nproducers * nops;
salticidae::EventContext ec;
@@ -23,10 +25,10 @@ void test_mpsc(int nproducers = 16, int nops = 100000, size_t burst_size = 128)
});
std::vector<std::thread> producers;
std::thread consumer([&collected, total, &ec]() {
- salticidae::Event timer(ec, -1, [&ec, &collected, total](int, short) {
+ TimerEvent timer(ec, [&ec, &collected, total](TimerEvent &) {
if (collected.load() == total) ec.stop();
});
- timer.add_with_timeout(1, 0);
+ timer.add(1);
ec.dispatch();
});
for (int i = 0; i < nproducers; i++)