aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/salticidae/conn.h91
-rw-r--r--include/salticidae/event.h67
-rw-r--r--include/salticidae/network.h63
-rw-r--r--src/conn.cpp37
-rw-r--r--test/bench_network.cpp12
-rw-r--r--test/test_network.cpp25
-rw-r--r--test/test_p2p.cpp25
-rw-r--r--test/test_queue.cpp5
8 files changed, 201 insertions, 124 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index 2ef6b50..73b3022 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -57,13 +57,13 @@ struct ConnPoolError: public SalticidaeError {
/** Abstraction for connection management. */
class ConnPool {
+ class Worker;
public:
class Conn;
/** The handle to a bi-directional connection. */
using conn_t = ArcObj<Conn>;
/** The type of callback invoked when connection status is changed. */
using conn_callback_t = std::function<void(Conn &, bool)>;
-
/** Abstraction for a bi-directional connection. */
class Conn {
friend ConnPool;
@@ -77,17 +77,16 @@ class ConnPool {
size_t seg_buff_size;
conn_t self_ref;
int fd;
+ Worker *worker;
ConnPool *cpool;
ConnMode mode;
NetAddr addr;
- // TODO: send_buffer should be a thread-safe mpsc queue
MPSCWriteBuffer send_buffer;
SegBuffer recv_buffer;
- Event ev_read;
- Event ev_write;
Event ev_connect;
+ Event ev_socket;
/** does not need to wait if true */
bool ready_send;
@@ -95,8 +94,6 @@ class ConnPool {
void send_data(int, int);
void conn_server(int, int);
- /** Stop the worker and I/O events. */
- void stop();
/** Terminate the connection. */
void terminate();
@@ -125,10 +122,11 @@ class ConnPool {
protected:
/** Close the IO and clear all on-going or planned events. */
- virtual void on_close() {
- ev_read.clear();
- ev_write.clear();
+ virtual void stop() {
+ if (fd == -1) return;
ev_connect.clear();
+ ev_socket.clear();
+ send_buffer.get_queue().unreg_handler();
::close(fd);
fd = -1;
self_ref = nullptr; /* remove the self-cycle */
@@ -142,6 +140,13 @@ class ConnPool {
virtual void on_teardown() {}
};
+ protected:
+ EventContext ec;
+ EventContext disp_ec;
+ ThreadCall* disp_tcall;
+ /** Should be implemented by derived class to return a new Conn object. */
+ virtual Conn *create_conn() = 0;
+
private:
const int max_listen_backlog;
const double conn_server_timeout;
@@ -164,11 +169,11 @@ class ConnPool {
class Worker {
EventContext ec;
- ThreadCall msgr;
+ ThreadCall tcall;
std::thread handle;
public:
- Worker(): msgr(ec) {}
+ Worker(): tcall(ec) {}
/* the following functions are called by the dispatcher */
void start() {
@@ -176,7 +181,7 @@ class ConnPool {
}
void feed(const conn_t &conn, int client_fd) {
- msgr.call([this, conn, client_fd](ThreadCall::Handle &) {
+ tcall.call([this, conn, client_fd](ThreadCall::Handle &) {
SALTICIDAE_LOG_INFO("worker %x got %s",
std::this_thread::get_id(),
std::string(*conn).c_str());
@@ -185,11 +190,15 @@ class ConnPool {
.reg_handler(this->ec, [conn, client_fd]
(MPSCWriteBuffer::queue_t &) {
if (conn->ready_send && conn->fd != -1)
+ {
+ conn->ev_socket.del();
+ conn->ev_socket.add(Event::READ | Event::WRITE);
conn->send_data(client_fd, Event::WRITE);
+ }
return false;
});
//auto conn_ptr = conn.get();
- conn->ev_read = Event(ec, client_fd, Event::READ | Event::WRITE, [conn=conn](int fd, int what) {
+ conn->ev_socket = Event(ec, client_fd, [conn=conn](int fd, int what) {
if (what & Event::READ)
conn->recv_data(fd, what);
else
@@ -199,19 +208,18 @@ class ConnPool {
// std::bind(&Conn::recv_data, conn_ptr, _1, _2));
//conn->ev_write = Event(ec, client_fd, Event::WRITE,
// std::bind(&Conn::send_data, conn_ptr, _1, _2));
- conn->ev_read.add();
+ conn->ev_socket.add(Event::READ | Event::WRITE);
//conn->ev_write.add();
});
}
void stop() {
- msgr.call([this](ThreadCall::Handle &) {
- ec.stop();
- });
+ tcall.call([this](ThreadCall::Handle &) { ec.stop(); });
}
std::thread &get_handle() { return handle; }
const EventContext &get_ec() { return ec; }
+ ThreadCall *get_tcall() { return &tcall; }
};
/* related to workers */
@@ -220,7 +228,7 @@ class ConnPool {
void accept_client(int, int);
conn_t add_conn(const conn_t &conn);
- void terminate(int fd);
+ void remove_conn(int fd);
protected:
conn_t _connect(const NetAddr &addr);
@@ -244,43 +252,30 @@ class ConnPool {
return workers[1];
}
- protected:
- EventContext ec;
- EventContext dispatcher_ec;
- BoxObj<ThreadCall> disp_tcall;
- /** Should be implemented by derived class to return a new Conn object. */
- virtual Conn *create_conn() = 0;
-
public:
ConnPool(const EventContext &ec,
int max_listen_backlog = 10,
double conn_server_timeout = 2,
size_t seg_buff_size = 4096,
size_t nworker = 2):
+ ec(ec),
max_listen_backlog(max_listen_backlog),
conn_server_timeout(conn_server_timeout),
seg_buff_size(seg_buff_size),
listen_fd(-1),
- nworker(std::max((size_t)1, nworker)),
- ec(ec) {
+ nworker(std::max((size_t)1, nworker)) {
workers = new Worker[nworker];
- dispatcher_ec = workers[0].get_ec();
-
user_tcall = new ThreadCall(ec);
- disp_tcall = new ThreadCall(dispatcher_ec);
+ disp_ec = workers[0].get_ec();
+ disp_tcall = workers[0].get_tcall();
}
~ConnPool() {
- /* stop all workers */
- for (size_t i = 0; i < nworker; i++)
- workers[i].stop();
- /* join all worker threads */
- for (size_t i = 0; i < nworker; i++)
- workers[i].get_handle().join();
+ stop();
for (auto it: pool)
{
conn_t conn = it.second;
- conn->on_close();
+ conn->stop();
}
if (listen_fd != -1) close(listen_fd);
}
@@ -294,6 +289,17 @@ class ConnPool {
workers[i].start();
}
+ void stop() {
+ SALTICIDAE_LOG_INFO("stopping all threads...");
+ /* stop all workers */
+ for (size_t i = 0; i < nworker; i++)
+ workers[i].stop();
+ /* join all worker threads */
+ for (size_t i = 0; i < nworker; i++)
+ workers[i].get_handle().join();
+ nworker = 0;
+ }
+
/** Actively connect to remote addr. */
conn_t connect(const NetAddr &addr, bool blocking = true) {
if (blocking)
@@ -303,6 +309,9 @@ class ConnPool {
auto ptr = new conn_t(_connect(addr));
std::atomic_thread_fence(std::memory_order_release);
h.set_result(ptr);
+ h.set_deleter([](void *data) {
+ delete static_cast<conn_t *>(data);
+ });
}, true));
auto conn = *ret;
delete ret;
@@ -328,6 +337,14 @@ class ConnPool {
template<typename Func>
void reg_conn_handler(Func cb) { conn_cb = cb; }
+
+ void terminate(const conn_t &conn, bool blocking = true) {
+ int fd = conn->fd;
+ conn->worker->get_tcall()->call([conn](ThreadCall::Handle &) {
+ conn->stop();
+ }, blocking);
+ remove_conn(fd);
+ }
};
}
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index da27902..616f598 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -42,7 +42,8 @@ struct _event_context_deleter {
void operator()(uv_loop_t *ptr) {
if (ptr != nullptr)
{
- uv_loop_close(ptr);
+ while (uv_loop_close(ptr) == UV_EBUSY)
+ uv_run(ptr, UV_RUN_NOWAIT);
delete ptr;
}
}
@@ -62,8 +63,7 @@ class EventContext: public _event_context_ot {
EventContext &operator=(EventContext &&) = default;
void dispatch() const {
// TODO: improve this loop
- for (;;)
- uv_run(get(), UV_RUN_ONCE);
+ uv_run(get(), UV_RUN_DEFAULT);
}
void stop() const { uv_stop(get()); }
};
@@ -79,12 +79,15 @@ class Event {
private:
EventContext eb;
int fd;
- int events;
uv_poll_t *ev_fd;
uv_timer_t *ev_timer;
callback_t callback;
static inline void fd_then(uv_poll_t *h, int status, int events) {
- assert(status == 0);
+ if (status != 0)
+ {
+ SALTICIDAE_LOG_WARN("%s", uv_strerror(status));
+ return;
+ }
auto event = static_cast<Event *>(h->data);
event->callback(event->fd, events);
}
@@ -95,10 +98,14 @@ class Event {
event->callback(event->fd, TIMEOUT);
}
+ static void _on_handle_close(uv_handle_t *h) {
+ delete h;
+ }
+
public:
Event(): eb(nullptr), ev_fd(nullptr), ev_timer(nullptr) {}
- Event(const EventContext &eb, int fd, short events, callback_t callback):
- eb(eb), fd(fd), events(events),
+ Event(const EventContext &eb, int fd, callback_t callback):
+ eb(eb), fd(fd),
ev_fd(nullptr),
ev_timer(new uv_timer_t()),
callback(callback) {
@@ -114,7 +121,7 @@ class Event {
Event(const Event &) = delete;
Event(Event &&other):
- eb(std::move(other.eb)), fd(other.fd), events(other.events),
+ eb(std::move(other.eb)), fd(other.fd),
ev_fd(other.ev_fd), ev_timer(other.ev_timer),
callback(std::move(other.callback)) {
other.del();
@@ -132,7 +139,6 @@ class Event {
other.del();
eb = std::move(other.eb);
fd = other.fd;
- events = other.events;
ev_fd = other.ev_fd;
ev_timer = other.ev_timer;
callback = std::move(other.callback);
@@ -153,26 +159,33 @@ class Event {
if (ev_fd != nullptr)
{
uv_poll_stop(ev_fd);
- delete ev_fd;
+ uv_close((uv_handle_t *)ev_fd, Event::_on_handle_close);
ev_fd = nullptr;
}
if (ev_timer != nullptr)
{
uv_timer_stop(ev_timer);
- delete ev_timer;
+ uv_close((uv_handle_t *)ev_timer, Event::_on_handle_close);
ev_timer = nullptr;
}
+ callback = nullptr;
+ }
+
+ void set_callback(callback_t _callback) {
+ callback = _callback;
}
- void add() {
+ void add(int events) {
if (ev_fd) uv_poll_start(ev_fd, events, Event::fd_then);
}
void del() {
if (ev_fd) uv_poll_stop(ev_fd);
+ if (ev_timer == nullptr)
+ assert(ev_timer);
uv_timer_stop(ev_timer);
}
- void add_with_timeout(double t_sec) {
- add();
+ void add_with_timeout(double t_sec, int events) {
+ add(events);
uv_timer_start(ev_timer, Event::timer_then, uint64_t(t_sec * 1000), 0);
}
@@ -209,6 +222,7 @@ class ThreadCall {
public:
class Handle {
std::function<void(Handle &)> callback;
+ std::function<void(void *)> deleter;
ThreadNotifier* notifier;
void *result;
friend ThreadCall;
@@ -219,6 +233,8 @@ class ThreadCall {
if (notifier) notifier->notify(result);
}
void set_result(void *data) { result = data; }
+ template<typename Func>
+ void set_deleter(Func _deleter) { deleter = _deleter; }
};
ThreadCall() = default;
@@ -227,16 +243,24 @@ class ThreadCall {
ThreadCall(EventContext ec): ec(ec) {
if (pipe2(ctl_fd, O_NONBLOCK))
throw SalticidaeError(std::string("ThreadCall: failed to create pipe"));
- ev_listen = Event(ec, ctl_fd[0], Event::READ, [this](int fd, int) {
+ ev_listen = Event(ec, ctl_fd[0], [this](int fd, int) {
Handle *h;
read(fd, &h, sizeof(h));
h->exec();
delete h;
});
- ev_listen.add();
+ ev_listen.add(Event::READ);
}
~ThreadCall() {
+ ev_listen.clear();
+ Handle *h;
+ while (read(ctl_fd[0], &h, sizeof(h)) == sizeof(h))
+ {
+ if (h->result && h->deleter)
+ h->deleter(h->result);
+ delete h;
+ }
close(ctl_fd[0]);
close(ctl_fd[1]);
}
@@ -277,11 +301,14 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
wait_sig(true),
fd(eventfd(0, EFD_NONBLOCK)) {}
- ~MPSCQueueEventDriven() { close(fd); }
+ ~MPSCQueueEventDriven() {
+ ev.clear();
+ close(fd);
+ }
template<typename Func>
void reg_handler(const EventContext &ec, Func &&func) {
- ev = Event(ec, fd, Event::READ,
+ ev = Event(ec, fd,
[this, func=std::forward<Func>(func)](int, short) {
//fprintf(stderr, "%x\n", std::this_thread::get_id());
uint64_t t;
@@ -296,9 +323,11 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
if (func(*this))
write(fd, &dummy, 8);
});
- ev.add();
+ ev.add(Event::READ);
}
+ void unreg_handler() { ev.clear(); }
+
template<typename U>
bool enqueue(U &&e) {
static const uint64_t dummy = 1;
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index e5165bf..a63976b 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -275,9 +275,9 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
const NetAddr &get_peer() { return peer_id; }
protected:
- void on_close() override {
+ void stop() override {
ev_timeout.clear();
- MsgNet::Conn::on_close();
+ MsgNet::Conn::stop();
}
void on_setup() override;
@@ -302,7 +302,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
Peer(NetAddr addr, conn_t conn, const EventContext &ec):
addr(addr), conn(conn),
ev_ping_timer(
- Event(ec, -1, 0, std::bind(&Peer::ping_timer, this, _1, _2))),
+ Event(ec, -1, std::bind(&Peer::ping_timer, this, _1, _2))),
connected(false) {}
~Peer() {}
Peer &operator=(const Peer &) = delete;
@@ -358,6 +358,8 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
void _pong_msg_cb(const conn_t &conn, uint16_t port);
bool check_new_conn(const conn_t &conn, uint16_t port);
void start_active_conn(const NetAddr &paddr);
+ static void tcall_reset_timeout(ConnPool::Worker *worker,
+ const conn_t &conn, double timeout);
protected:
ConnPool::Conn *create_conn() override { return new Conn(); }
@@ -385,6 +387,8 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
this->reg_handler(generic_bind(&PeerNetwork::msg_pong, this, _1, _2));
}
+ ~PeerNetwork() { this->stop(); }
+
void add_peer(const NetAddr &paddr);
const conn_t get_peer_conn(const NetAddr &paddr) const;
using MsgNet::send_msg;
@@ -445,15 +449,28 @@ void MsgNetwork<OpcodeType>::send_msg(const MsgType &_msg, Conn &conn) {
#endif
}
+template<typename O, O _, O __>
+void PeerNetwork<O, _, __>::tcall_reset_timeout(ConnPool::Worker *worker,
+ const conn_t &conn, double timeout) {
+ worker->get_tcall()->call([conn, t=timeout](ThreadCall::Handle &) {
+ assert(conn->ev_timeout);
+ conn->ev_timeout.del();
+ conn->ev_timeout.add_with_timeout(t, 0);
+ SALTICIDAE_LOG_INFO("reset timeout %.2f", t);
+ });
+}
+
/* begin: functions invoked by the dispatcher */
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::Conn::on_setup() {
MsgNet::Conn::on_setup();
auto pn = get_net();
+ auto conn = static_pointer_cast<Conn>(this->self());
+ auto worker = this->worker;
assert(!ev_timeout);
- ev_timeout = Event(pn->dispatcher_ec, -1, 0, [this](int, int) {
+ ev_timeout = Event(worker->get_ec(), -1, [conn](int, int) {
SALTICIDAE_LOG_INFO("peer ping-pong timeout");
- this->terminate();
+ conn->terminate();
});
if (this->get_mode() == Conn::ConnMode::ACTIVE)
{
@@ -461,9 +478,8 @@ void PeerNetwork<O, _, __>::Conn::on_setup() {
if (pn->id_mode == IP_BASED) peer_id.port = 0;
}
/* the initial ping-pong to set up the connection */
- auto &conn = static_cast<Conn &>(*this);
- reset_timeout(pn->conn_timeout);
- pn->send_msg(MsgPing(pn->listen_port), conn);
+ tcall_reset_timeout(worker, conn, pn->conn_timeout);
+ pn->send_msg(MsgPing(pn->listen_port), *conn);
}
template<typename O, O _, O __>
@@ -481,11 +497,11 @@ void PeerNetwork<O, _, __>::Conn::on_teardown() {
std::string(*this).c_str(),
std::string(peer_id).c_str());
// try to reconnect
- p->ev_retry_timer = Event(pn->dispatcher_ec, -1, 0,
+ p->ev_retry_timer = Event(pn->disp_ec, -1,
[pn, peer_id = this->peer_id](int, int) {
pn->start_active_conn(peer_id);
});
- p->ev_retry_timer.add_with_timeout(pn->gen_conn_timeout());
+ p->ev_retry_timer.add_with_timeout(pn->gen_conn_timeout(), 0);
}
template<typename O, O _, O __>
@@ -497,7 +513,7 @@ void PeerNetwork<O, _, __>::Peer::reset_conn(conn_t new_conn) {
//SALTICIDAE_LOG_DEBUG("moving send buffer");
//new_conn->move_send_buffer(conn);
SALTICIDAE_LOG_INFO("terminating old connection %s", std::string(*conn).c_str());
- conn->terminate();
+ conn->cpool->terminate(conn);
}
addr = new_conn->get_addr();
conn = new_conn;
@@ -506,19 +522,11 @@ void PeerNetwork<O, _, __>::Peer::reset_conn(conn_t new_conn) {
}
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::Conn::reset_timeout(double timeout) {
- assert(ev_timeout);
- ev_timeout.del();
- ev_timeout.add_with_timeout(timeout);
- SALTICIDAE_LOG_INFO("reset timeout %.2f", timeout);
-}
-
-template<typename O, O _, O __>
void PeerNetwork<O, _, __>::Peer::reset_ping_timer() {
assert(ev_ping_timer);
ev_ping_timer.del();
ev_ping_timer.add_with_timeout(
- gen_rand_timeout(conn->get_net()->ping_period));
+ gen_rand_timeout(conn->get_net()->ping_period), 0);
}
template<typename O, O _, O __>
@@ -526,7 +534,7 @@ void PeerNetwork<O, _, __>::Peer::send_ping() {
auto pn = conn->get_net();
ping_timer_ok = false;
pong_msg_ok = false;
- conn->reset_timeout(pn->conn_timeout);
+ tcall_reset_timeout(conn->worker, conn, pn->conn_timeout);
pn->send_msg(MsgPing(pn->listen_port), *conn);
}
@@ -554,7 +562,7 @@ bool PeerNetwork<O, _, __>::check_new_conn(const conn_t &conn, uint16_t port) {
{
if (conn != p->conn)
{
- conn->terminate();
+ conn->cpool->terminate(conn);
return true;
}
return false;
@@ -631,7 +639,7 @@ void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) {
auto it = id2peer.find(addr);
if (it != id2peer.end())
throw PeerNetworkError("peer already exists");
- id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->dispatcher_ec)));
+ id2peer.insert(std::make_pair(addr, new Peer(addr, nullptr, this->disp_ec)));
start_active_conn(addr);
}, true);
}
@@ -646,6 +654,9 @@ PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &paddr) const {
throw PeerNetworkError("peer does not exist");
auto ptr = new conn_t(it->second->conn);
h.set_result(ptr);
+ h.set_deleter([](void *data) {
+ delete static_cast<conn_t *>(data);
+ });
}));
auto conn = *ret;
delete ret;
@@ -657,6 +668,9 @@ bool PeerNetwork<O, _, __>::has_peer(const NetAddr &paddr) const {
auto ret = static_cast<bool *>(this->disp_tcall->call(
[this, paddr](ThreadCall::Handle &h) {
h.set_result(id2peer.count(paddr));
+ h.set_deleter([](void *data) {
+ delete static_cast<bool *>(data);
+ });
}));
auto has = *ret;
delete ret;
@@ -699,6 +713,9 @@ void ClientNetwork<OpcodeType>::send_msg(const MsgType &msg, const NetAddr &addr
throw PeerNetworkError("client does not exist");
auto ptr = new conn_t(it->second->conn);
h.set_result(ptr);
+ h.set_deleter([](void *data) {
+ delete static_cast<conn_t *>(data);
+ });
}));
send_msg(msg, **ret);
delete ret;
diff --git a/src/conn.cpp b/src/conn.cpp
index 6b2e3aa..da8086c 100644
--- a/src/conn.cpp
+++ b/src/conn.cpp
@@ -81,6 +81,8 @@ void ConnPool::Conn::send_data(int fd, int events) {
return;
}
}
+ ev_socket.del();
+ ev_socket.add(Event::READ);
/* consumed the buffer but endpoint still seems to be writable */
ready_send = true;
}
@@ -116,20 +118,14 @@ void ConnPool::Conn::recv_data(int fd, int events) {
on_read();
}
-void ConnPool::Conn::stop() {
- ev_read.clear();
- ev_write.clear();
-}
-
void ConnPool::Conn::terminate() {
stop();
cpool->disp_tcall->call(
[cpool=this->cpool, fd=this->fd](ThreadCall::Handle &) {
- cpool->terminate(fd);
+ cpool->remove_conn(fd);
});
}
-
void ConnPool::accept_client(int fd, int) {
int client_fd;
struct sockaddr client_addr;
@@ -154,9 +150,11 @@ void ConnPool::accept_client(int fd, int) {
conn->addr = addr;
add_conn(conn);
SALTICIDAE_LOG_INFO("accepted %s", std::string(*conn).c_str());
+ auto &worker = select_worker();
+ conn->worker = &worker;
conn->on_setup();
update_conn(conn, true);
- select_worker().feed(conn, client_fd);
+ worker.feed(conn, client_fd);
}
}
@@ -166,15 +164,16 @@ void ConnPool::Conn::conn_server(int fd, int events) {
{
ev_connect.clear();
SALTICIDAE_LOG_INFO("connected to remote %s", std::string(*this).c_str());
+ worker = &(cpool->select_worker());
on_setup();
cpool->update_conn(conn, true);
- cpool->select_worker().feed(conn, fd);
+ worker->feed(conn, fd);
}
else
{
if (events & Event::TIMEOUT)
SALTICIDAE_LOG_INFO("%s connect timeout", std::string(*this).c_str());
- terminate();
+ stop();
return;
}
}
@@ -204,9 +203,9 @@ void ConnPool::_listen(NetAddr listen_addr) {
throw ConnPoolError(std::string("binding error"));
if (::listen(listen_fd, max_listen_backlog) < 0)
throw ConnPoolError(std::string("listen error"));
- ev_listen = Event(dispatcher_ec, listen_fd, Event::READ,
+ ev_listen = Event(disp_ec, listen_fd,
std::bind(&ConnPool::accept_client, this, _1, _2));
- ev_listen.add();
+ ev_listen.add(Event::READ);
SALTICIDAE_LOG_INFO("listening to %u", ntohs(listen_addr.port));
}
@@ -238,28 +237,26 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) {
sizeof(struct sockaddr_in)) < 0 && errno != EINPROGRESS)
{
SALTICIDAE_LOG_INFO("cannot connect to %s", std::string(addr).c_str());
- conn->terminate();
+ conn->stop();
}
else
{
- conn->ev_connect = Event(dispatcher_ec, conn->fd, Event::WRITE,
- std::bind(&Conn::conn_server, conn.get(), _1, _2));
- conn->ev_connect.add_with_timeout(conn_server_timeout);
+ conn->ev_connect = Event(disp_ec, conn->fd, std::bind(&Conn::conn_server, conn.get(), _1, _2));
+ conn->ev_connect.add_with_timeout(conn_server_timeout, Event::WRITE);
add_conn(conn);
SALTICIDAE_LOG_INFO("created %s", std::string(*conn).c_str());
}
return conn;
}
-void ConnPool::terminate(int fd) {
+void ConnPool::remove_conn(int fd) {
auto it = pool.find(fd);
if (it != pool.end())
{
/* temporarily pin the conn before it dies */
auto conn = it->second;
- assert(conn->fd == fd);
+ //assert(conn->fd == fd);
pool.erase(it);
- conn->on_close();
/* inform the upper layer the connection will be destroyed */
conn->on_teardown();
update_conn(conn, false);
@@ -267,7 +264,7 @@ void ConnPool::terminate(int fd) {
}
ConnPool::conn_t ConnPool::add_conn(const conn_t &conn) {
- assert(pool.find(conn->fd) == pool.end());
+ //assert(pool.find(conn->fd) == pool.end());
return pool.insert(std::make_pair(conn->fd, conn)).first->second;
}
diff --git a/test/bench_network.cpp b/test/bench_network.cpp
index 8ff9ab2..40ba17a 100644
--- a/test/bench_network.cpp
+++ b/test/bench_network.cpp
@@ -82,17 +82,17 @@ struct MyNet: public MsgNetworkByteOp {
MsgNetwork<opcode_t>(ec, 10, 1.0, 4096),
name(name),
peer(peer),
- ev_period_stat(ec, -1, 0, [this, stat_timeout](int, short) {
+ ev_period_stat(ec, -1, [this, stat_timeout](int, short) {
printf("%.2f mps\n", nrecv / (double)stat_timeout);
nrecv = 0;
- ev_period_stat.add_with_timeout(stat_timeout);
+ ev_period_stat.add_with_timeout(stat_timeout, 0);
}),
nrecv(0) {
/* message handler could be a bound method */
reg_handler(salticidae::generic_bind(
&MyNet::on_receive_bytes, this, _1, _2));
if (stat_timeout > 0)
- ev_period_stat.add_with_timeout(0);
+ ev_period_stat.add_with_timeout(0, 0);
}
struct Conn: public MsgNetworkByteOp::Conn {
@@ -109,12 +109,12 @@ struct MyNet: public MsgNetworkByteOp {
printf("[%s] Connected, sending hello.\n",
net->name.c_str());
/* send the first message through this connection */
- net->ev_period_send = Event(net->ec, -1, 0,
+ net->ev_period_send = Event(net->ec, -1,
[net, conn = self()](int, short) {
net->send_msg(MsgBytes(256), *conn);
- net->ev_period_send.add_with_timeout(0);
+ net->ev_period_send.add_with_timeout(0, 0);
});
- net->ev_period_send.add_with_timeout(0);
+ net->ev_period_send.add_with_timeout(0, 0);
}
else
diff --git a/test/test_network.cpp b/test/test_network.cpp
index d93d0ff..d52b6c2 100644
--- a/test/test_network.cpp
+++ b/test/test_network.cpp
@@ -132,7 +132,14 @@ salticidae::EventContext ec;
NetAddr alice_addr("127.0.0.1:12345");
NetAddr bob_addr("127.0.0.1:12346");
+void signal_handler(int) {
+ throw salticidae::SalticidaeError("got termination signal");
+}
+
int main() {
+ signal(SIGTERM, signal_handler);
+ signal(SIGINT, signal_handler);
+
/* test two nodes */
MyNet alice(ec, "Alice", bob_addr);
MyNet bob(ec, "Bob", alice_addr);
@@ -141,16 +148,18 @@ int main() {
alice.reg_handler(on_receive_ack);
bob.reg_handler(on_receive_ack);
- alice.start();
- bob.start();
+ try {
+ alice.start();
+ bob.start();
- alice.listen(alice_addr);
- bob.listen(bob_addr);
+ alice.listen(alice_addr);
+ bob.listen(bob_addr);
- /* first attempt */
- alice.connect(bob_addr);
- bob.connect(alice_addr);
+ /* first attempt */
+ alice.connect(bob_addr);
+ bob.connect(alice_addr);
- ec.dispatch();
+ ec.dispatch();
+ } catch (salticidae::SalticidaeError &e) {}
return 0;
}
diff --git a/test/test_p2p.cpp b/test/test_p2p.cpp
index f52f48f..558be5c 100644
--- a/test/test_p2p.cpp
+++ b/test/test_p2p.cpp
@@ -130,7 +130,14 @@ salticidae::EventContext ec;
NetAddr alice_addr("127.0.0.1:12345");
NetAddr bob_addr("127.0.0.1:12346");
+void signal_handler(int) {
+ throw salticidae::SalticidaeError("got termination signal");
+}
+
int main() {
+ signal(SIGTERM, signal_handler);
+ signal(SIGINT, signal_handler);
+
/* test two nodes */
MyNet alice(ec, "Alice", bob_addr);
MyNet bob(ec, "Bob", alice_addr);
@@ -139,16 +146,18 @@ int main() {
alice.reg_handler(on_receive_ack);
bob.reg_handler(on_receive_ack);
- alice.start();
- bob.start();
+ try {
+ alice.start();
+ bob.start();
- alice.listen(alice_addr);
- bob.listen(bob_addr);
+ alice.listen(alice_addr);
+ bob.listen(bob_addr);
- /* first attempt */
- alice.add_peer(bob_addr);
- bob.add_peer(alice_addr);
+ /* first attempt */
+ alice.add_peer(bob_addr);
+ bob.add_peer(alice_addr);
- ec.dispatch();
+ ec.dispatch();
+ } catch (salticidae::SalticidaeError &e) {}
return 0;
}
diff --git a/test/test_queue.cpp b/test/test_queue.cpp
index a2444d3..5a7b548 100644
--- a/test/test_queue.cpp
+++ b/test/test_queue.cpp
@@ -23,11 +23,10 @@ void test_mpsc(int nproducers = 16, int nops = 100000, size_t burst_size = 128)
});
std::vector<std::thread> producers;
std::thread consumer([&collected, total, &ec]() {
- salticidae::Event timer(ec, -1, EV_TIMEOUT | EV_PERSIST,
- [&ec, &collected, total](int, short) {
+ salticidae::Event timer(ec, -1, [&ec, &collected, total](int, short) {
if (collected.load() == total) ec.stop();
});
- timer.add_with_timeout(1);
+ timer.add_with_timeout(1, EV_TIMEOUT | EV_PERSIST);
ec.dispatch();
});
for (int i = 0; i < nproducers; i++)