aboutsummaryrefslogtreecommitdiff
path: root/include
diff options
context:
space:
mode:
Diffstat (limited to 'include')
-rw-r--r--include/salticidae/conn.h16
-rw-r--r--include/salticidae/event.h307
-rw-r--r--include/salticidae/network.h25
3 files changed, 232 insertions, 116 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index a60cd8f..e7d6295 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -86,8 +86,8 @@ class ConnPool {
MPSCWriteBuffer send_buffer;
SegBuffer recv_buffer;
- Event ev_connect;
- Event ev_socket;
+ TimedFdEvent ev_connect;
+ FdEvent ev_socket;
/** does not need to wait if true */
bool ready_send;
@@ -161,7 +161,7 @@ class ConnPool {
conn_callback_t conn_cb;
/* owned by the dispatcher */
- Event ev_listen;
+ FdEvent ev_listen;
std::unordered_map<int, conn_t> pool;
int listen_fd; /**< for accepting new network connections */
@@ -207,18 +207,18 @@ class ConnPool {
if (conn->ready_send)
{
conn->ev_socket.del();
- conn->ev_socket.add(Event::READ | Event::WRITE);
- conn->send_data(client_fd, Event::WRITE);
+ conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE);
+ conn->send_data(client_fd, FdEvent::WRITE);
}
return false;
});
- conn->ev_socket = Event(ec, client_fd, [conn=conn](int fd, int what) {
- if (what & Event::READ)
+ conn->ev_socket = FdEvent(ec, client_fd, [conn=conn](int fd, int what) {
+ if (what & FdEvent::READ)
conn->recv_data(fd, what);
else
conn->send_data(fd, what);
});
- conn->ev_socket.add(Event::READ | Event::WRITE);
+ conn->ev_socket.add(FdEvent::READ | FdEvent::WRITE);
nconn++;
});
}
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index ca48e69..2365a71 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -68,101 +68,153 @@ class EventContext: public _event_context_ot {
void stop() const { uv_stop(get()); }
};
-class Event {
+static void _on_uv_handle_close(uv_handle_t *h) { delete h; }
+
+class FdEvent {
public:
using callback_t = std::function<void(int fd, int events)>;
static const int READ = UV_READABLE;
static const int WRITE = UV_WRITABLE;
static const int ERROR = 1 << 30;
- static const int TIMEOUT = 1 << 29;
- private:
- EventContext eb;
+ protected:
+ EventContext ec;
int fd;
uv_poll_t *ev_fd;
- uv_timer_t *ev_timer;
callback_t callback;
+
static inline void fd_then(uv_poll_t *h, int status, int events) {
if (status != 0)
events |= ERROR;
- auto event = static_cast<Event *>(h->data);
+ auto event = static_cast<FdEvent *>(h->data);
event->callback(event->fd, events);
}
- static inline void timer_then(uv_timer_t *h) {
- auto event = static_cast<Event *>(h->data);
- if (event->ev_fd) uv_poll_stop(event->ev_fd);
- event->callback(event->fd, TIMEOUT);
+ public:
+ FdEvent(): ec(nullptr), ev_fd(nullptr) {}
+ FdEvent(const EventContext &ec, int fd, callback_t callback):
+ ec(ec), fd(fd), ev_fd(new uv_poll_t()),
+ callback(callback) {
+ uv_poll_init(ec.get(), ev_fd, fd);
+ ev_fd->data = this;
}
- static void _on_handle_close(uv_handle_t *h) {
- delete h;
+ FdEvent(const FdEvent &) = delete;
+ FdEvent(FdEvent &&other):
+ ec(std::move(other.ec)), fd(other.fd), ev_fd(other.ev_fd),
+ callback(std::move(other.callback)) {
+ other.ev_fd = nullptr;
+ if (ev_fd != nullptr)
+ ev_fd->data = this;
+ }
+
+ void swap(FdEvent &other) {
+ std::swap(ec, other.ec);
+ std::swap(fd, other.fd);
+ std::swap(ev_fd, other.ev_fd);
+ std::swap(callback, other.callback);
+ if (ev_fd != nullptr)
+ ev_fd->data = this;
+ if (other.ev_fd != nullptr)
+ other.ev_fd->data = &other;
}
- public:
- Event(): eb(nullptr), ev_fd(nullptr), ev_timer(nullptr) {}
- Event(const EventContext &eb, int fd, callback_t callback):
- eb(eb), fd(fd),
- ev_fd(nullptr),
- ev_timer(new uv_timer_t()),
- callback(callback) {
- if (fd != -1)
+ FdEvent &operator=(FdEvent &&other) {
+ if (this != &other)
{
- ev_fd = new uv_poll_t();
- uv_poll_init(eb.get(), ev_fd, fd);
- ev_fd->data = this;
+ FdEvent tmp(std::move(other));
+ tmp.swap(*this);
}
- uv_timer_init(eb.get(), ev_timer);
- ev_timer->data = this;
+ return *this;
}
- Event(const Event &) = delete;
- Event(Event &&other):
- eb(std::move(other.eb)), fd(other.fd),
- ev_fd(other.ev_fd), ev_timer(other.ev_timer),
- callback(std::move(other.callback)) {
- other.del();
+ ~FdEvent() { clear(); }
+
+ void clear() {
if (ev_fd != nullptr)
{
- other.ev_fd = nullptr;
- ev_fd->data = this;
+ uv_poll_stop(ev_fd);
+ uv_close((uv_handle_t *)ev_fd, _on_uv_handle_close);
+ ev_fd = nullptr;
}
- other.ev_timer = nullptr;
+ callback = nullptr;
+ }
+
+ void set_callback(callback_t _callback) {
+ callback = _callback;
+ }
+
+ void add(int events) {
+ assert(ev_fd != nullptr);
+ uv_poll_start(ev_fd, events, FdEvent::fd_then);
+ }
+
+ void del() {
+ if (ev_fd != nullptr) uv_poll_stop(ev_fd);
+ }
+
+ operator bool() const { return ev_fd != nullptr; }
+};
+
+
+class TimerEvent {
+ public:
+ using callback_t = std::function<void(TimerEvent &)>;
+
+ protected:
+ EventContext ec;
+ uv_timer_t *ev_timer;
+ callback_t callback;
+
+ static inline void timer_then(uv_timer_t *h) {
+ auto event = static_cast<TimerEvent *>(h->data);
+ event->callback(*event);
+ }
+
+ public:
+ TimerEvent(): ec(nullptr), ev_timer(nullptr) {}
+ TimerEvent(const EventContext &ec, callback_t callback):
+ ec(ec), ev_timer(new uv_timer_t()),
+ callback(callback) {
+ uv_timer_init(ec.get(), ev_timer);
ev_timer->data = this;
}
- Event &operator=(Event &&other) {
- clear();
- other.del();
- eb = std::move(other.eb);
- fd = other.fd;
- ev_fd = other.ev_fd;
- ev_timer = other.ev_timer;
- callback = std::move(other.callback);
+ TimerEvent(const TimerEvent &) = delete;
+ TimerEvent(TimerEvent &&other):
+ ec(std::move(other.ec)), ev_timer(other.ev_timer),
+ callback(std::move(other.callback)) {
+ other.ev_timer = nullptr;
+ if (ev_timer != nullptr)
+ ev_timer->data = this;
+ }
- if (ev_fd != nullptr)
+ void swap(TimerEvent &other) {
+ std::swap(ec, other.ec);
+ std::swap(ev_timer, other.ev_timer);
+ std::swap(callback, other.callback);
+ if (ev_timer != nullptr)
+ ev_timer->data = this;
+ if (other.ev_timer != nullptr)
+ other.ev_timer->data = &other;
+ }
+
+ TimerEvent &operator=(TimerEvent &&other) {
+ if (this != &other)
{
- other.ev_fd = nullptr;
- ev_fd->data = this;
+ TimerEvent tmp(std::move(other));
+ tmp.swap(*this);
}
- other.ev_timer = nullptr;
- ev_timer->data = this;
return *this;
}
- ~Event() { clear(); }
+ ~TimerEvent() { clear(); }
void clear() {
- if (ev_fd != nullptr)
- {
- uv_poll_stop(ev_fd);
- uv_close((uv_handle_t *)ev_fd, Event::_on_handle_close);
- ev_fd = nullptr;
- }
if (ev_timer != nullptr)
{
uv_timer_stop(ev_timer);
- uv_close((uv_handle_t *)ev_timer, Event::_on_handle_close);
+ uv_close((uv_handle_t *)ev_timer, _on_uv_handle_close);
ev_timer = nullptr;
}
callback = nullptr;
@@ -172,28 +224,90 @@ class Event {
callback = _callback;
}
- void add(int events) {
- if (ev_fd) uv_poll_start(ev_fd, events, Event::fd_then);
+ void add(double t_sec) {
+ assert(ev_timer != nullptr);
+ uv_timer_start(ev_timer, TimerEvent::timer_then, uint64_t(t_sec * 1000), 0);
}
+
void del() {
- if (ev_fd) uv_poll_stop(ev_fd);
- if (ev_timer == nullptr)
- assert(ev_timer);
- uv_timer_stop(ev_timer);
+ if (ev_timer != nullptr) uv_timer_stop(ev_timer);
+ }
+
+ operator bool() const { return ev_timer != nullptr; }
+};
+
+class TimedFdEvent: public FdEvent, public TimerEvent {
+ public:
+ static const int TIMEOUT = 1 << 29;
+
+ private:
+ static inline void timer_then(uv_timer_t *h) {
+ auto event = static_cast<TimedFdEvent *>(h->data);
+ event->FdEvent::del();
+ event->FdEvent::callback(event->fd, TIMEOUT);
}
- void add_with_timeout(double t_sec, int events) {
- add(events);
- uv_timer_start(ev_timer, Event::timer_then, uint64_t(t_sec * 1000), 0);
+
+ public:
+ TimedFdEvent() = default;
+ TimedFdEvent(const EventContext &ec, int fd, FdEvent::callback_t callback):
+ FdEvent(ec, fd, callback),
+ TimerEvent(ec, TimerEvent::callback_t()) {
+ ev_timer->data = this;
}
- operator bool() const { return ev_fd != nullptr || ev_timer != nullptr; }
+ TimedFdEvent(TimedFdEvent &&other):
+ FdEvent(static_cast<FdEvent &&>(other)),
+ TimerEvent(static_cast<TimerEvent &&>(other)) {
+ if (ev_timer != nullptr)
+ ev_timer->data = this;
+ }
+
+ void swap(TimedFdEvent &other) {
+ std::swap(static_cast<FdEvent &>(*this), static_cast<FdEvent &>(other));
+ std::swap(static_cast<TimerEvent &>(*this), static_cast<TimerEvent &>(other));
+ if (ev_timer != nullptr)
+ ev_timer->data = this;
+ }
+
+ TimedFdEvent &operator=(TimedFdEvent &&other) {
+ if (this != &other)
+ {
+ TimedFdEvent tmp(std::move(other));
+ tmp.swap(*this);
+ }
+ return *this;
+ }
+
+ void clear() {
+ TimerEvent::clear();
+ FdEvent::clear();
+ }
+
+ using FdEvent::set_callback;
+
+ void add(int events) = delete;
+ void add(double t_sec) = delete;
+
+ void add(int events, double t_sec) {
+ assert(ev_fd != nullptr && ev_timer != nullptr);
+ uv_timer_start(ev_timer, TimedFdEvent::timer_then,
+ uint64_t(t_sec * 1000), 0);
+ FdEvent::add(events);
+ }
+
+ void del() {
+ TimerEvent::del();
+ FdEvent::del();
+ }
+
+ operator bool() const { return ev_fd != nullptr; }
};
class SigEvent {
public:
using callback_t = std::function<void(int signum)>;
private:
- EventContext eb;
+ EventContext ec;
uv_signal_t *ev_sig;
callback_t callback;
static inline void sig_then(uv_signal_t *h, int signum) {
@@ -201,39 +315,40 @@ class SigEvent {
event->callback(signum);
}
- static void _on_handle_close(uv_handle_t *h) {
- delete h;
- }
-
public:
- SigEvent(): eb(nullptr), ev_sig(nullptr) {}
- SigEvent(const EventContext &eb, callback_t callback):
- eb(eb),
- ev_sig(new uv_signal_t()),
+ SigEvent(): ec(nullptr), ev_sig(nullptr) {}
+ SigEvent(const EventContext &ec, callback_t callback):
+ ec(ec), ev_sig(new uv_signal_t()),
callback(callback) {
- uv_signal_init(eb.get(), ev_sig);
+ uv_signal_init(ec.get(), ev_sig);
ev_sig->data = this;
}
SigEvent(const SigEvent &) = delete;
SigEvent(SigEvent &&other):
- eb(std::move(other.eb)),
- ev_sig(other.ev_sig),
+ ec(std::move(other.ec)), ev_sig(other.ev_sig),
callback(std::move(other.callback)) {
- other.del();
other.ev_sig = nullptr;
- ev_sig->data = this;
+ if (ev_sig != nullptr)
+ ev_sig->data = this;
}
- SigEvent &operator=(SigEvent &&other) {
- clear();
- other.del();
- eb = std::move(other.eb);
- ev_sig = other.ev_sig;
- callback = std::move(other.callback);
+ void swap(SigEvent &other) {
+ std::swap(ec, other.ec);
+ std::swap(ev_sig, other.ev_sig);
+ std::swap(callback, other.callback);
+ if (ev_sig != nullptr)
+ ev_sig->data = this;
+ if (other.ev_sig != nullptr)
+ other.ev_sig->data = &other;
+ }
- other.ev_sig = nullptr;
- ev_sig->data = this;
+ SigEvent &operator=(SigEvent &&other) {
+ if (this != &other)
+ {
+ SigEvent tmp(std::move(other));
+ tmp.swap(*this);
+ }
return *this;
}
@@ -243,7 +358,7 @@ class SigEvent {
if (ev_sig != nullptr)
{
uv_signal_stop(ev_sig);
- uv_close((uv_handle_t *)ev_sig, SigEvent::_on_handle_close);
+ uv_close((uv_handle_t *)ev_sig, _on_uv_handle_close);
ev_sig = nullptr;
}
callback = nullptr;
@@ -254,15 +369,17 @@ class SigEvent {
}
void add(int signum) {
+ assert(ev_sig != nullptr);
uv_signal_start(ev_sig, SigEvent::sig_then, signum);
}
void add_once(int signum) {
+ assert(ev_sig != nullptr);
uv_signal_start_oneshot(ev_sig, SigEvent::sig_then, signum);
}
void del() {
- uv_signal_stop(ev_sig);
+ if (ev_sig != nullptr) uv_signal_stop(ev_sig);
}
operator bool() const { return ev_sig != nullptr; }
@@ -292,7 +409,7 @@ class ThreadNotifier {
class ThreadCall {
int ctl_fd[2];
EventContext ec;
- Event ev_listen;
+ FdEvent ev_listen;
public:
struct Result {
@@ -347,13 +464,13 @@ class ThreadCall {
ThreadCall(EventContext ec): ec(ec) {
if (pipe2(ctl_fd, O_NONBLOCK))
throw SalticidaeError(std::string("ThreadCall: failed to create pipe"));
- ev_listen = Event(ec, ctl_fd[0], [this](int fd, int) {
+ ev_listen = FdEvent(ec, ctl_fd[0], [this](int fd, int) {
Handle *h;
read(fd, &h, sizeof(h));
h->exec();
delete h;
});
- ev_listen.add(Event::READ);
+ ev_listen.add(FdEvent::READ);
}
~ThreadCall() {
@@ -394,7 +511,7 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
const uint64_t dummy = 1;
std::atomic<bool> wait_sig;
int fd;
- Event ev;
+ FdEvent ev;
public:
MPSCQueueEventDriven(size_t capacity = 65536):
@@ -409,7 +526,7 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
template<typename Func>
void reg_handler(const EventContext &ec, Func &&func) {
- ev = Event(ec, fd,
+ ev = FdEvent(ec, fd,
[this, func=std::forward<Func>(func)](int, int) {
//fprintf(stderr, "%x\n", std::this_thread::get_id());
uint64_t t;
@@ -424,7 +541,7 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
if (func(*this))
write(fd, &dummy, 8);
});
- ev.add(Event::READ);
+ ev.add(FdEvent::READ);
}
void unreg_handler() { ev.clear(); }
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index f2b99b3..51effe0 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -267,7 +267,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
class Conn: public MsgNet::Conn {
friend PeerNetwork;
NetAddr peer_id;
- Event ev_timeout;
+ TimerEvent ev_timeout;
void reset_timeout(double timeout);
public:
@@ -297,8 +297,8 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
NetAddr addr;
/** the underlying connection, may be invalid when connected = false */
conn_t conn;
- Event ev_ping_timer;
- Event ev_retry_timer;
+ TimerEvent ev_ping_timer;
+ TimerEvent ev_retry_timer;
bool ping_timer_ok;
bool pong_msg_ok;
bool connected;
@@ -307,13 +307,13 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
Peer(NetAddr addr, conn_t conn, const EventContext &ec):
addr(addr), conn(conn),
ev_ping_timer(
- Event(ec, -1, std::bind(&Peer::ping_timer, this, _1, _2))),
+ TimerEvent(ec, std::bind(&Peer::ping_timer, this, _1))),
connected(false) {}
~Peer() {}
Peer &operator=(const Peer &) = delete;
Peer(const Peer &) = delete;
- void ping_timer(int, int);
+ void ping_timer(TimerEvent &);
void reset_ping_timer();
void send_ping();
void clear_all_events() {
@@ -491,7 +491,7 @@ void PeerNetwork<O, _, __>::tcall_reset_timeout(ConnPool::Worker *worker,
worker->get_tcall()->async_call([conn, t=timeout](ThreadCall::Handle &) {
if (!conn->ev_timeout) return;
conn->ev_timeout.del();
- conn->ev_timeout.add_with_timeout(t, 0);
+ conn->ev_timeout.add(t);
SALTICIDAE_LOG_DEBUG("reset connection timeout %.2f", t);
});
}
@@ -504,7 +504,7 @@ void PeerNetwork<O, _, __>::Conn::on_setup() {
auto conn = static_pointer_cast<Conn>(this->self());
auto worker = this->worker;
assert(!ev_timeout);
- ev_timeout = Event(worker->get_ec(), -1, [conn](int, int) {
+ ev_timeout = TimerEvent(worker->get_ec(), [conn](TimerEvent &) {
SALTICIDAE_LOG_INFO("peer ping-pong timeout");
conn->worker_terminate();
});
@@ -526,11 +526,11 @@ void PeerNetwork<O, _, __>::Conn::on_teardown() {
p->conn = nullptr;
SALTICIDAE_LOG_INFO("connection lost: %s", std::string(*this).c_str());
// try to reconnect
- p->ev_retry_timer = Event(pn->disp_ec, -1,
- [pn, peer_id = this->peer_id](int, int) {
+ p->ev_retry_timer = TimerEvent(pn->disp_ec,
+ [pn, peer_id = this->peer_id](TimerEvent &) {
pn->start_active_conn(peer_id);
});
- p->ev_retry_timer.add_with_timeout(pn->gen_conn_timeout(), 0);
+ p->ev_retry_timer.add(pn->gen_conn_timeout());
}
template<typename O, O _, O __>
@@ -554,8 +554,7 @@ template<typename O, O _, O __>
void PeerNetwork<O, _, __>::Peer::reset_ping_timer() {
assert(ev_ping_timer);
ev_ping_timer.del();
- ev_ping_timer.add_with_timeout(
- gen_rand_timeout(conn->get_net()->ping_period), 0);
+ ev_ping_timer.add(gen_rand_timeout(conn->get_net()->ping_period));
}
template<typename O, O _, O __>
@@ -568,7 +567,7 @@ void PeerNetwork<O, _, __>::Peer::send_ping() {
}
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::Peer::ping_timer(int, int) {
+void PeerNetwork<O, _, __>::Peer::ping_timer(TimerEvent &) {
ping_timer_ok = true;
if (pong_msg_ok)
{