aboutsummaryrefslogtreecommitdiff
path: root/include
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2018-11-14 22:18:59 -0500
committerDeterminant <[email protected]>2018-11-14 22:18:59 -0500
commitecc163f98e434b557768560d00ee2f9755d6d950 (patch)
tree7cb9d04ba0dc4761968cd79de0c9be5aab3aa6e4 /include
parent0f341fe7f092f704e1c1952c72085eb1ebd2086a (diff)
major bug fix
Diffstat (limited to 'include')
-rw-r--r--include/salticidae/conn.h91
-rw-r--r--include/salticidae/event.h67
-rw-r--r--include/salticidae/network.h63
3 files changed, 142 insertions, 79 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index 2ef6b50..73b3022 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -57,13 +57,13 @@ struct ConnPoolError: public SalticidaeError {
/** Abstraction for connection management. */
class ConnPool {
+ class Worker;
public:
class Conn;
/** The handle to a bi-directional connection. */
using conn_t = ArcObj<Conn>;
/** The type of callback invoked when connection status is changed. */
using conn_callback_t = std::function<void(Conn &, bool)>;
-
/** Abstraction for a bi-directional connection. */
class Conn {
friend ConnPool;
@@ -77,17 +77,16 @@ class ConnPool {
size_t seg_buff_size;
conn_t self_ref;
int fd;
+ Worker *worker;
ConnPool *cpool;
ConnMode mode;
NetAddr addr;
- // TODO: send_buffer should be a thread-safe mpsc queue
MPSCWriteBuffer send_buffer;
SegBuffer recv_buffer;
- Event ev_read;
- Event ev_write;
Event ev_connect;
+ Event ev_socket;
/** does not need to wait if true */
bool ready_send;
@@ -95,8 +94,6 @@ class ConnPool {
void send_data(int, int);
void conn_server(int, int);
- /** Stop the worker and I/O events. */
- void stop();
/** Terminate the connection. */
void terminate();
@@ -125,10 +122,11 @@ class ConnPool {
protected:
/** Close the IO and clear all on-going or planned events. */
- virtual void on_close() {
- ev_read.clear();
- ev_write.clear();
+ virtual void stop() {
+ if (fd == -1) return;
ev_connect.clear();
+ ev_socket.clear();
+ send_buffer.get_queue().unreg_handler();
::close(fd);
fd = -1;
self_ref = nullptr; /* remove the self-cycle */
@@ -142,6 +140,13 @@ class ConnPool {
virtual void on_teardown() {}
};
+ protected:
+ EventContext ec;
+ EventContext disp_ec;
+ ThreadCall* disp_tcall;
+ /** Should be implemented by derived class to return a new Conn object. */
+ virtual Conn *create_conn() = 0;
+
private:
const int max_listen_backlog;
const double conn_server_timeout;
@@ -164,11 +169,11 @@ class ConnPool {
class Worker {
EventContext ec;
- ThreadCall msgr;
+ ThreadCall tcall;
std::thread handle;
public:
- Worker(): msgr(ec) {}
+ Worker(): tcall(ec) {}
/* the following functions are called by the dispatcher */
void start() {
@@ -176,7 +181,7 @@ class ConnPool {
}
void feed(const conn_t &conn, int client_fd) {
- msgr.call([this, conn, client_fd](ThreadCall::Handle &) {
+ tcall.call([this, conn, client_fd](ThreadCall::Handle &) {
SALTICIDAE_LOG_INFO("worker %x got %s",
std::this_thread::get_id(),
std::string(*conn).c_str());
@@ -185,11 +190,15 @@ class ConnPool {
.reg_handler(this->ec, [conn, client_fd]
(MPSCWriteBuffer::queue_t &) {
if (conn->ready_send && conn->fd != -1)
+ {
+ conn->ev_socket.del();
+ conn->ev_socket.add(Event::READ | Event::WRITE);
conn->send_data(client_fd, Event::WRITE);
+ }
return false;
});
//auto conn_ptr = conn.get();
- conn->ev_read = Event(ec, client_fd, Event::READ | Event::WRITE, [conn=conn](int fd, int what) {
+ conn->ev_socket = Event(ec, client_fd, [conn=conn](int fd, int what) {
if (what & Event::READ)
conn->recv_data(fd, what);
else
@@ -199,19 +208,18 @@ class ConnPool {
// std::bind(&Conn::recv_data, conn_ptr, _1, _2));
//conn->ev_write = Event(ec, client_fd, Event::WRITE,
// std::bind(&Conn::send_data, conn_ptr, _1, _2));
- conn->ev_read.add();
+ conn->ev_socket.add(Event::READ | Event::WRITE);
//conn->ev_write.add();
});
}
void stop() {
- msgr.call([this](ThreadCall::Handle &) {
- ec.stop();
- });
+ tcall.call([this](ThreadCall::Handle &) { ec.stop(); });
}
std::thread &get_handle() { return handle; }
const EventContext &get_ec() { return ec; }
+ ThreadCall *get_tcall() { return &tcall; }
};
/* related to workers */
@@ -220,7 +228,7 @@ class ConnPool {
void accept_client(int, int);
conn_t add_conn(const conn_t &conn);
- void terminate(int fd);
+ void remove_conn(int fd);
protected:
conn_t _connect(const NetAddr &addr);
@@ -244,43 +252,30 @@ class ConnPool {
return workers[1];
}
- protected:
- EventContext ec;
- EventContext dispatcher_ec;
- BoxObj<ThreadCall> disp_tcall;
- /** Should be implemented by derived class to return a new Conn object. */
- virtual Conn *create_conn() = 0;
-
public:
ConnPool(const EventContext &ec,
int max_listen_backlog = 10,
double conn_server_timeout = 2,
size_t seg_buff_size = 4096,
size_t nworker = 2):
+ ec(ec),
max_listen_backlog(max_listen_backlog),
conn_server_timeout(conn_server_timeout),
seg_buff_size(seg_buff_size),
listen_fd(-1),
- nworker(std::max((size_t)1, nworker)),
- ec(ec) {
+ nworker(std::max((size_t)1, nworker)) {
workers = new Worker[nworker];
- dispatcher_ec = workers[0].get_ec();
-
user_tcall = new ThreadCall(ec);
- disp_tcall = new ThreadCall(dispatcher_ec);
+ disp_ec = workers[0].get_ec();
+ disp_tcall = workers[0].get_tcall();
}
~ConnPool() {
- /* stop all workers */
- for (size_t i = 0; i < nworker; i++)
- workers[i].stop();
- /* join all worker threads */
- for (size_t i = 0; i < nworker; i++)
- workers[i].get_handle().join();
+ stop();
for (auto it: pool)
{
conn_t conn = it.second;
- conn->on_close();
+ conn->stop();
}
if (listen_fd != -1) close(listen_fd);
}
@@ -294,6 +289,17 @@ class ConnPool {
workers[i].start();
}
+ void stop() {
+ SALTICIDAE_LOG_INFO("stopping all threads...");
+ /* stop all workers */
+ for (size_t i = 0; i < nworker; i++)
+ workers[i].stop();
+ /* join all worker threads */
+ for (size_t i = 0; i < nworker; i++)
+ workers[i].get_handle().join();
+ nworker = 0;
+ }
+
/** Actively connect to remote addr. */
conn_t connect(const NetAddr &addr, bool blocking = true) {
if (blocking)
@@ -303,6 +309,9 @@ class ConnPool {
auto ptr = new conn_t(_connect(addr));
std::atomic_thread_fence(std::memory_order_release);
h.set_result(ptr);
+ h.set_deleter([](void *data) {
+ delete static_cast<conn_t *>(data);
+ });
}, true));
auto conn = *ret;
delete ret;
@@ -328,6 +337,14 @@ class ConnPool {
template<typename Func>
void reg_conn_handler(Func cb) { conn_cb = cb; }
+
+ void terminate(const conn_t &conn, bool blocking = true) {
+ int fd = conn->fd;
+ conn->worker->get_tcall()->call([conn](ThreadCall::Handle &) {
+ conn->stop();
+ }, blocking);
+ remove_conn(fd);
+ }
};
}
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index da27902..616f598 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -42,7 +42,8 @@ struct _event_context_deleter {
void operator()(uv_loop_t *ptr) {
if (ptr != nullptr)
{
- uv_loop_close(ptr);
+ while (uv_loop_close(ptr) == UV_EBUSY)
+ uv_run(ptr, UV_RUN_NOWAIT);
delete ptr;
}
}
@@ -62,8 +63,7 @@ class EventContext: public _event_context_ot {
EventContext &operator=(EventContext &&) = default;
void dispatch() const {
// TODO: improve this loop
- for (;;)
- uv_run(get(), UV_RUN_ONCE);
+ uv_run(get(), UV_RUN_DEFAULT);
}
void stop() const { uv_stop(get()); }
};
@@ -79,12 +79,15 @@ class Event {
private:
EventContext eb;
int fd;
- int events;
uv_poll_t *ev_fd;
uv_timer_t *ev_timer;
callback_t callback;
static inline void fd_then(uv_poll_t *h, int status, int events) {
- assert(status == 0);
+ if (status != 0)
+ {
+ SALTICIDAE_LOG_WARN("%s", uv_strerror(status));
+ return;
+ }
auto event = static_cast<Event *>(h->data);
event->callback(event->fd, events);
}
@@ -95,10 +98,14 @@ class Event {
event->callback(event->fd, TIMEOUT);
}
+ static void _on_handle_close(uv_handle_t *h) {
+ delete h;
+ }
+
public:
Event(): eb(nullptr), ev_fd(nullptr), ev_timer(nullptr) {}
- Event(const EventContext &eb, int fd, short events, callback_t callback):
- eb(eb), fd(fd), events(events),
+ Event(const EventContext &eb, int fd, callback_t callback):
+ eb(eb), fd(fd),
ev_fd(nullptr),
ev_timer(new uv_timer_t()),
callback(callback) {
@@ -114,7 +121,7 @@ class Event {
Event(const Event &) = delete;
Event(Event &&other):
- eb(std::move(other.eb)), fd(other.fd), events(other.events),
+ eb(std::move(other.eb)), fd(other.fd),
ev_fd(other.ev_fd), ev_timer(other.ev_timer),
callback(std::move(other.callback)) {
other.del();
@@ -132,7 +139,6 @@ class Event {
other.del();
eb = std::move(other.eb);
fd = other.fd;
- events = other.events;
ev_fd = other.ev_fd;
ev_timer = other.ev_timer;
callback = std::move(other.callback);
@@ -153,26 +159,33 @@ class Event {
if (ev_fd != nullptr)
{
uv_poll_stop(ev_fd);
- delete ev_fd;
+ uv_close((uv_handle_t *)ev_fd, Event::_on_handle_close);
ev_fd = nullptr;
}
if (ev_timer != nullptr)
{
uv_timer_stop(ev_timer);
- delete ev_timer;
+ uv_close((uv_handle_t *)ev_timer, Event::_on_handle_close);
ev_timer = nullptr;
}
+ callback = nullptr;
+ }
+
+ void set_callback(callback_t _callback) {
+ callback = _callback;
}
- void add() {
+ void add(int events) {
if (ev_fd) uv_poll_start(ev_fd, events, Event::fd_then);
}
void del() {
if (ev_fd) uv_poll_stop(ev_fd);
+ if (ev_timer == nullptr)
+ assert(ev_timer);
uv_timer_stop(ev_timer);
}
- void add_with_timeout(double t_sec) {
- add();
+ void add_with_timeout(double t_sec, int events) {
+ add(events);
uv_timer_start(ev_timer, Event::timer_then, uint64_t(t_sec * 1000), 0);
}
@@ -209,6 +222,7 @@ class ThreadCall {
public:
class Handle {
std::function<void(Handle &)> callback;
+ std::function<void(void *)> deleter;
ThreadNotifier* notifier;
void *result;
friend ThreadCall;
@@ -219,6 +233,8 @@ class ThreadCall {
if (notifier) notifier->notify(result);
}
void set_result(void *data) { result = data; }
+ template<typename Func>
+ void set_deleter(Func _deleter) { deleter = _deleter; }
};
ThreadCall() = default;
@@ -227,16 +243,24 @@ class ThreadCall {
ThreadCall(EventContext ec): ec(ec) {
if (pipe2(ctl_fd, O_NONBLOCK))
throw SalticidaeError(std::string("ThreadCall: failed to create pipe"));
- ev_listen = Event(ec, ctl_fd[0], Event::READ, [this](int fd, int) {
+ ev_listen = Event(ec, ctl_fd[0], [this](int fd, int) {
Handle *h;
read(fd, &h, sizeof(h));
h->exec();
delete h;
});
- ev_listen.add();
+ ev_listen.add(Event::READ);
}
~ThreadCall() {
+ ev_listen.clear();
+ Handle *h;
+ while (read(ctl_fd[0], &h, sizeof(h)) == sizeof(h))
+ {
+ if (h->result && h->deleter)
+ h->deleter(h->result);
+ delete h;
+ }
close(ctl_fd[0]);
close(ctl_fd[1]);
}
@@ -277,11 +301,14 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
wait_sig(true),
fd(eventfd(0, EFD_NONBLOCK)) {}
- ~MPSCQueueEventDriven() { close(fd); }
+ ~MPSCQueueEventDriven() {
+ ev.clear();
+ close(fd);
+ }
template<typename Func>
void reg_handler(const EventContext &ec, Func &&func) {
- ev = Event(ec, fd, Event::READ,
+ ev = Event(ec, fd,
[this, func=std::forward<Func>(func)](int, short) {
//fprintf(stderr, "%x\n", std::this_thread::get_id());
uint64_t t;
@@ -296,9 +323,11 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
if (func(*this))
write(fd, &dummy, 8);
});
- ev.add();
+ ev.add(Event::READ);
}
+ void unreg_handler() { ev.clear(); }
+
template<typename U>
bool enqueue(U &&e) {
static const uint64_t dummy = 1;
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index e5165bf..a63976b 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -275,9 +275,9 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
const NetAddr &get_peer() { return peer_id; }
protected:
- void on_close() override {
+ void stop() override {
ev_timeout.clear();
- MsgNet::Conn::on_close();
+ MsgNet::Conn::stop();
}
void on_setup() override;
@@ -302,7 +302,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
Peer(NetAddr addr, conn_t conn, const EventContext &ec):
addr(addr), conn(conn),
ev_ping_timer(
- Event(ec, -1, 0, std::bind(&Peer::ping_timer, this, _1, _2))),
+ Event(ec, -1, std::bind(&Peer::ping_timer, this, _1, _2))),
connected(false) {}
~Peer() {}
Peer &operator=(const Peer &) = delete;
@@ -358,6 +358,8 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
void _pong_msg_cb(const conn_t &conn, uint16_t port);
bool check_new_conn(const conn_t &conn, uint16_t port);
void start_active_conn(const NetAddr &paddr);
+ static void tcall_reset_timeout(ConnPool::Worker *worker,
+ const conn_t &conn, double timeout);
protected:
ConnPool::Conn *create_conn() override { return new Conn(); }
@@ -385,6 +387,8 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
this->reg_handler(generic_bind(&PeerNetwork::msg_pong, this, _1, _2));
}
+ ~PeerNetwork() { this->stop(); }
+
void add_peer(const NetAddr &paddr);
const conn_t get_peer_conn(const NetAddr &paddr) const;
using MsgNet::send_msg;
@@ -445,15 +449,28 @@ void MsgNetwork<OpcodeType>::send_msg(const MsgType &_msg, Conn &conn) {
#endif
}
+template<typename O, O _, O __>
+void PeerNetwork<O, _, __>::tcall_reset_timeout(ConnPool::Worker *worker,
+ const conn_t &conn, double timeout) {
+ worker->get_tcall()->call([conn, t=timeout](ThreadCall::Handle &) {
+ assert(conn->ev_timeout);
+ conn->ev_timeout.del();
+ conn->ev_timeout.add_with_timeout(t, 0);
+ SALTICIDAE_LOG_INFO("reset timeout %.2f", t);
+ });
+}
+
/* begin: functions invoked by the dispatcher */
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::Conn::on_setup() {
MsgNet::Conn::on_setup();
auto pn = get_net();
+ auto conn = static_pointer_cast<Conn>(this->self());
+ auto worker = this->worker;
assert(!ev_timeout);
- ev_timeout = Event(pn->dispatcher_ec, -1, 0, [this](int, int) {
+ ev_timeout = Event(worker->get_ec(), -1, [conn](int, int) {
SALTICIDAE_LOG_INFO("peer ping-pong timeout");
- this->terminate();
+ conn->terminate();
});
if (this->get_mode() == Conn::ConnMode::ACTIVE)
{
@@ -461,9 +478,8 @@ void PeerNetwork<O, _, __>::Conn::on_setup() {
if (pn->id_mode == IP_BASED) peer_id.port = 0;
}
/* the initial ping-pong to set up the connection */
- auto &conn = static_cast<Conn &>(*this);
- reset_timeout(pn->conn_timeout);
- pn->send_msg(MsgPing(pn->listen_port), conn);
+ tcall_reset_timeout(worker, conn, pn->conn_timeout);
+ pn->send_msg(MsgPing(pn->listen_port), *conn);
}
template<typename O, O _, O __>
@@ -481,11 +497,11 @@ void PeerNetwork<O, _, __>::Conn::on_teardown() {
std::string(*this).c_str(),
std::string(peer_id).c_str());
// try to reconnect
- p->ev_retry_timer = Event(pn->dispatcher_ec, -1, 0,
+ p->ev_retry_timer = Event(pn->disp_ec, -1,
[pn, peer_id = this->peer_id](int, int) {
pn->start_active_conn(peer_id);
});
- p->ev_retry_timer.add_with_timeout(pn->gen_conn_timeout());
+ p->ev_retry_timer.add_with_timeout(pn->gen_conn_timeout(), 0);
}
template<typename O, O _, O __>
@@ -497,7 +513,7 @@ void PeerNetwork<O, _, __>::Peer::reset_conn(conn_t new_conn) {
//SALTICIDAE_LOG_DEBUG("moving send buffer");
//new_conn->move_send_buffer(conn);
SALTICIDAE_LOG_INFO("terminating old connection %s", std::string(*conn).c_str());
- conn->terminate();
+ conn->cpool->terminate(conn);
}
addr = new_conn->get_addr();
conn = new_conn;
@@ -506,19 +522,11 @@ void PeerNetwork<O, _, __>::Peer::reset_conn(conn_t new_conn) {
}
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::Conn::reset_timeout(double timeout) {
- assert(ev_timeout);
- ev_timeout.del();
- ev_timeout.add_with_timeout(timeout);
- SALTICIDAE_LOG_INFO("reset timeout %.2f", timeout);
-}
-
-template<typename O, O _, O __>
void PeerNetwork<O, _, __>::Peer::reset_ping_timer() {
assert(ev_ping_timer);
ev_ping_timer.del();
ev_ping_timer.add_with_timeout(
- gen_rand_timeout(conn->get_net()->ping_period));
+ gen_rand_timeout(conn->get_net()->ping_period), 0);
}
template<typename O, O _, O __>
@@ -526,7 +534,7 @@ void PeerNetwork<O, _, __>::Peer::send_ping() {
auto pn = conn->get_net();
ping_timer_ok = false;
pong_msg_ok = false;
- conn->reset_timeout(pn->conn_timeout);
+ tcall_reset_timeout(conn->worker, conn, pn->conn_timeout);
pn->send_msg(MsgPing(pn->listen_port), *conn);
}
@@ -554,7 +562,7 @@ bool PeerNetwork<O, _, __>::check_new_conn(const conn_t &conn, uint16_t port) {
{
if (conn != p->conn)
{
- conn->terminate();
+ conn->cpool->terminate(conn);
return true;
}
return false;
@@ -631,7 +639,7 @@ void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) {
auto it = id2peer.find(addr);
if (it != id2peer.end())
throw PeerNetworkError("peer already exists");
- id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->dispatcher_ec)));
+ id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->disp_ec)));
start_active_conn(addr);
}, true);
}
@@ -646,6 +654,9 @@ PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &paddr) const {
throw PeerNetworkError("peer does not exist");
auto ptr = new conn_t(it->second->conn);
h.set_result(ptr);
+ h.set_deleter([](void *data) {
+ delete static_cast<conn_t *>(data);
+ });
}));
auto conn = *ret;
delete ret;
@@ -657,6 +668,9 @@ bool PeerNetwork<O, _, __>::has_peer(const NetAddr &paddr) const {
auto ret = static_cast<bool *>(this->disp_tcall->call(
[this, paddr](ThreadCall::Handle &h) {
h.set_result(id2peer.count(paddr));
+ h.set_deleter([](void *data) {
+ delete static_cast<bool *>(data);
+ });
}));
auto has = *ret;
delete ret;
@@ -699,6 +713,9 @@ void ClientNetwork<OpcodeType>::send_msg(const MsgType &msg, const NetAddr &addr
throw PeerNetworkError("client does not exist");
auto ptr = new conn_t(it->second->conn);
h.set_result(ptr);
+ h.set_deleter([](void *data) {
+ delete static_cast<conn_t *>(data);
+ });
}));
send_msg(msg, **ret);
delete ret;