aboutsummaryrefslogtreecommitdiff
path: root/include
diff options
context:
space:
mode:
Diffstat (limited to 'include')
-rw-r--r--include/salticidae/conn.h6
-rw-r--r--include/salticidae/network.h34
2 files changed, 23 insertions, 17 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index e0bf009..44a1bf9 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -85,7 +85,9 @@ class ConnPool {
MPSCWriteBuffer send_buffer;
SegBuffer recv_buffer;
+ /* initialized and destroyed by the dispatcher */
TimedFdEvent ev_connect;
+ /* initialized and destroyed by the worker */
FdEvent ev_socket;
/** does not need to wait if true */
bool ready_send;
@@ -125,7 +127,7 @@ class ConnPool {
Conn(Conn &&other) = delete;
virtual ~Conn() {
- SALTICIDAE_LOG_INFO("destroyed %s", std::string(*this).c_str());
+ SALTICIDAE_LOG_DEBUG("destroyed %s", std::string(*this).c_str());
}
bool is_terminated() const {
@@ -311,7 +313,7 @@ class ConnPool {
}
assert(conn->fd != -1);
assert(conn->worker == this);
- SALTICIDAE_LOG_INFO("worker %x got %s",
+ SALTICIDAE_LOG_DEBUG("worker %x got %s",
std::this_thread::get_id(),
std::string(*conn).c_str());
nconn++;
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index c581aa4..1cfe150 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -79,6 +79,7 @@ class MsgNetwork: public ConnPool {
Msg msg;
MsgState msg_state;
bool msg_sleep;
+ /* initialized and destroyed by the worker */
TimerEvent ev_enqueue_poll;
protected:
@@ -88,6 +89,10 @@ class MsgNetwork: public ConnPool {
mutable std::atomic<size_t> nsentb;
mutable std::atomic<size_t> nrecvb;
#endif
+ void stop() override {
+ ev_enqueue_poll.clear();
+ ConnPool::Conn::stop();
+ }
public:
Conn(): msg_state(HEADER), msg_sleep(false)
@@ -135,24 +140,22 @@ class MsgNetwork: public ConnPool {
void on_setup(const ConnPool::conn_t &_conn) override {
auto conn = static_pointer_cast<Conn>(_conn);
- conn->ev_enqueue_poll = TimerEvent(conn->worker->get_ec(),
- [this, conn](TimerEvent &) {
- if (!incoming_msgs.enqueue(std::make_pair(conn->msg, conn), false))
- {
- conn->msg_sleep = true;
- conn->ev_enqueue_poll.add(0);
- return;
- }
- conn->msg_sleep = false;
- on_read(conn);
+ auto worker = conn->worker;
+ worker->get_tcall()->async_call([this, conn, worker](ThreadCall::Handle &) {
+ conn->ev_enqueue_poll = TimerEvent(worker->get_ec(),
+ [this, conn](TimerEvent &) {
+ if (!incoming_msgs.enqueue(std::make_pair(conn->msg, conn), false))
+ {
+ conn->msg_sleep = true;
+ conn->ev_enqueue_poll.add(0);
+ return;
+ }
+ conn->msg_sleep = false;
+ on_read(conn);
+ });
});
}
- void on_teardown(const ConnPool::conn_t &_conn) override {
- auto conn = static_pointer_cast<Conn>(_conn);
- conn->ev_enqueue_poll.clear();
- }
-
public:
class Config: public ConnPool::Config {
@@ -355,6 +358,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
class Conn: public MsgNet::Conn {
friend PeerNetwork;
Peer *peer;
+ /* initialized and destroyed by the worker */
TimerEvent ev_timeout;
void reset_timeout(double timeout);