diff options
-rw-r--r-- | include/salticidae/conn.h | 21 | ||||
-rw-r--r-- | include/salticidae/crypto.h | 4 | ||||
-rw-r--r-- | include/salticidae/network.h | 236 | ||||
-rw-r--r-- | src/conn.cpp | 44 | ||||
-rw-r--r-- | src/network.cpp | 8 | ||||
-rw-r--r-- | test/test_p2p_stress.cpp | 16 |
6 files changed, 204 insertions, 125 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index b43d3c2..d08ef91 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -74,7 +74,7 @@ class ConnPool { protected: std::atomic<bool> terminated; - size_t seg_buff_size; + size_t recv_chunk_size; size_t max_recv_buff_size; int fd; Worker *worker; @@ -112,6 +112,13 @@ class ConnPool { public: Conn(): terminated(false), worker(nullptr), + // recv_chunk_size initialized later + // max_recv_buff_size initialized later + // fd initialized later + // worker initialized later + // cpool initialized later + // mode initialized later + // addr initialized later ready_send(false), ready_recv(false), send_data_func(nullptr), recv_data_func(nullptr), tls(nullptr), peer_cert(nullptr) {} @@ -183,7 +190,7 @@ class ConnPool { private: const int max_listen_backlog; const double conn_server_timeout; - const size_t seg_buff_size; + const size_t recv_chunk_size; const size_t max_recv_buff_size; const size_t max_send_buff_size; tls_context_t tls_ctx; @@ -361,7 +368,7 @@ class ConnPool { friend class ConnPool; int _max_listen_backlog; double _conn_server_timeout; - size_t _seg_buff_size; + size_t _recv_chunk_size; size_t _max_recv_buff_size; size_t _max_send_buff_size; size_t _nworker; @@ -377,7 +384,7 @@ class ConnPool { Config(): _max_listen_backlog(10), _conn_server_timeout(2), - _seg_buff_size(4096), + _recv_chunk_size(4096), _max_recv_buff_size(4096), _max_send_buff_size(0), _nworker(1), @@ -399,8 +406,8 @@ class ConnPool { return *this; } - Config &seg_buff_size(size_t x) { - _seg_buff_size = x; + Config &recv_chunk_size(size_t x) { + _recv_chunk_size = x; return *this; } @@ -461,7 +468,7 @@ class ConnPool { async_id(0), max_listen_backlog(config._max_listen_backlog), conn_server_timeout(config._conn_server_timeout), - seg_buff_size(config._seg_buff_size), + recv_chunk_size(config._recv_chunk_size), max_recv_buff_size(config._max_recv_buff_size), max_send_buff_size(config._max_send_buff_size), tls_ctx(nullptr), diff --git a/include/salticidae/crypto.h b/include/salticidae/crypto.h index 65acc2d..d0553fb 100644 --- a/include/salticidae/crypto.h +++ b/include/salticidae/crypto.h @@ -376,7 +376,9 @@ class TLS { TLS(const TLS &) = delete; TLS(TLS &&other): ssl(other.ssl) { other.ssl = nullptr; } - bool do_handshake(int &want_io_type) { /* 0 for read, 1 for write */ + bool do_handshake(int &want_io_type) { + /* want_io_type: 0 for read, 1 for write */ + /* return true if handshake is completed */ auto ret = SSL_do_handshake(ssl); if (ret == 1) return true; auto err = SSL_get_error(ssl, ret); diff --git a/include/salticidae/network.h b/include/salticidae/network.h index dd05ed3..152ca60 100644 --- a/include/salticidae/network.h +++ b/include/salticidae/network.h @@ -345,12 +345,26 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { using peer_callback_t = std::function<void(const conn_t &peer_conn, bool connected)>; using unknown_peer_callback_t = std::function<void(const NetAddr &claimed_addr, const X509 *cert)>; + struct PeerId: public uint256_t { + PeerId(const NetAddr &addr) { + DataStream tmp; + tmp << addr; + *this = tmp.get_hash(); + } + + PeerId(const X509 &cert) { + DataStream tmp; + tmp << cert.get_der(); + *this = tmp.get_hash(); + } + }; + private: class PeerConn { friend PeerNetwork; /** connection addr, may be different due to passive mode */ - uint256_t peer_id; - NetAddr peer_addr; + uint256_t session_id; + PeerId peer_id; /** the underlying connection, may be invalid when connected = false */ conn_t conn; conn_t inbound_conn; @@ -365,10 +379,10 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { double ping_period; PeerConn() = delete; - PeerConn(const uint256_t &peer_id, + PeerConn(const uint256_t &session_id, const PeerId &peer_id, conn_t conn, conn_t inbound_conn, conn_t outbound_conn, const PeerNetwork *pn): - peer_id(peer_id), + session_id(session_id), conn(conn), inbound_conn(inbound_conn), outbound_conn(outbound_conn), @@ -396,33 +410,41 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { }; struct Peer { - uint256_t peer_id; + uint256_t session_id; + NetAddr addr; + double retry_delay; + ssize_t ntry; TimerEvent ev_retry_timer; conn_t peer_conn; - Peer(const conn_t &conn): - peer_id(), ev_retry_timer(), peer_conn(conn) {} + Peer(): + session_id(), + retry_delay(0), ntry(0), + ev_retry_timer() {} }; + /* connections whose PeerId is unknown */ std::unordered_map<NetAddr, conn_t> pending_peers; - std::unordered_map<NetAddr, BoxObj<Peer>> known_peers; - std::unordered_map<uint256_t, BoxObj<PeerConn>> pid2peer; + /* registered peers */ + std::unordered_map<PeerId, BoxObj<Peer>> known_peers; + /* peer connection deduplication map */ + std::unordered_map<uint256_t, BoxObj<PeerConn>> sid2peer; using pinfo_slock_t = std::shared_lock<std::shared_timed_mutex>; using pinfo_ulock_t = std::shared_lock<std::shared_timed_mutex>; mutable std::shared_timed_mutex known_peers_lock; - mutable std::shared_timed_mutex pid2peer_lock; + mutable std::shared_timed_mutex sid2peer_lock; peer_callback_t peer_cb; unknown_peer_callback_t unknown_peer_cb; const IdentityMode id_mode; - double retry_conn_delay; double ping_period; double conn_timeout; NetAddr listen_addr; bool allow_unknown_peer; + bool force_two_way_handshake; uint256_t my_nonce; struct MsgPing { @@ -464,12 +486,10 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { protected: ConnPool::Conn *create_conn() override { return new Conn(); } - virtual double gen_conn_timeout() { - return gen_rand_timeout(retry_conn_delay); - } void on_setup(const ConnPool::conn_t &) override; void on_teardown(const ConnPool::conn_t &) override; - uint256_t gen_peer_id(const conn_t &conn, const NetAddr &claimed_addr, const uint256_t &nonce) { + + uint256_t get_session_id(const conn_t &conn, const NetAddr &claimed_addr, const uint256_t &nonce) { DataStream tmp; if (!this->enable_tls || id_mode == ADDR_BASED) tmp << nonce << claimed_addr; @@ -478,14 +498,22 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { return tmp.get_hash(); } + PeerId get_peer_id(const conn_t &conn, const NetAddr &addr) { + DataStream tmp; + if (!this->enable_tls || id_mode == ADDR_BASED) + return PeerId(addr); + else + return PeerId(*conn->get_peer_cert()); + } + public: class Config: public MsgNet::Config { friend PeerNetwork; - double _retry_conn_delay; double _ping_period; double _conn_timeout; bool _allow_unknown_peer; + bool _force_two_way_handshake; IdentityMode _id_mode; public: @@ -493,18 +521,13 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { Config(const typename MsgNet::Config &config): MsgNet::Config(config), - _retry_conn_delay(2), _ping_period(30), _conn_timeout(180), _allow_unknown_peer(false), + _force_two_way_handshake(false), _id_mode(CERT_BASED) {} - Config &retry_conn_delay(double x) { - _retry_conn_delay = x; - return *this; - } - Config &ping_period(double x) { _ping_period = x; return *this; @@ -524,26 +547,37 @@ class PeerNetwork: public MsgNetwork<OpcodeType> { _allow_unknown_peer = x; return *this; } + + Config &force_two_way_handshake(bool x) { + _force_two_way_handshake = x; + return *this; + } }; PeerNetwork(const EventContext &ec, const Config &config): MsgNet(ec, config), id_mode(config._id_mode), - retry_conn_delay(config._retry_conn_delay), ping_period(config._ping_period), conn_timeout(config._conn_timeout), - allow_unknown_peer(config._allow_unknown_peer) { + allow_unknown_peer(config._allow_unknown_peer), + force_two_way_handshake(config._force_two_way_handshake) { this->reg_handler(generic_bind(&PeerNetwork::ping_handler, this, _1, _2)); this->reg_handler(generic_bind(&PeerNetwork::pong_handler, this, _1, _2)); } virtual ~PeerNetwork() { this->stop(); } - int32_t add_peer(const NetAddr &addr); - int32_t del_peer(const NetAddr &addr); - bool has_peer(const NetAddr &addr) const; + /* register a peer as known */ + int32_t add_peer(const PeerId &pid); + /* set the peer's public IP */ + int32_t set_peer_addr(const PeerId &pid, const NetAddr &addr); + /* try to connect to the peer: once (ntry = 1), indefinitely (ntry = -1), give up retry (ntry = 0) */ + int32_t try_conn_peer(const PeerId &pid, ssize_t ntry = -1, double retry_delay = 2); + /* unregister the peer */ + int32_t del_peer(const PeerId &pid); + bool has_peer(const PeerId &pid) const; size_t get_npending() const; - conn_t get_peer_conn(const NetAddr &addr) const; + conn_t get_peer_conn(const PeerId &addr) const; using MsgNet::send_msg; template<typename MsgType> inline bool send_msg(const MsgType &msg, const NetAddr &addr); @@ -703,16 +737,16 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) { pending_peers.erase(addr); SALTICIDAE_LOG_INFO("connection lost: %s", std::string(*conn).c_str()); auto p = conn->peer; - if (p) addr = p->peer_addr; + auto pid = p ? p->peer_id : PeerId(); pinfo_ulock_t _g(known_peers_lock); - auto it = known_peers.find(addr); + auto it = known_peers.find(pid); if (it == known_peers.end()) return; auto &pp = it->second; if (conn == pp->peer_conn) { - pinfo_ulock_t __g(pid2peer_lock); - auto it2 = pid2peer.find(pp->peer_id); - if (it2 != pid2peer.end()) + pinfo_ulock_t __g(sid2peer_lock); + auto it2 = sid2peer.find(pp->session_id); + if (it2 != sid2peer.end()) { auto &p = it2->second; if (p->connected && p->conn != conn) @@ -728,13 +762,13 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) { p->connected = false; p->outbound_handshake = false; p->inbound_handshake = false; - pp->peer_id = uint256_t(); + pp->session_id = uint256_t(); PeerConn *peer = nullptr; { - pinfo_ulock_t __g(pid2peer_lock); - auto it2 = pid2peer.find(p->peer_id); + pinfo_ulock_t __g(sid2peer_lock); + auto it2 = sid2peer.find(p->session_id); peer = it2->second.unwrap(); - pid2peer.erase(it2); + sid2peer.erase(it2); } peer->conn = nullptr; conn->_dead_peer = peer; @@ -744,7 +778,7 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) { } else { - if (!it->second->peer_id.is_null()) return; + if (!it->second->session_id.is_null()) return; } auto &ev_retry_timer = it->second->ev_retry_timer; ev_retry_timer = TimerEvent(this->disp_ec, [this, addr](TimerEvent &) { @@ -752,7 +786,10 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) { start_active_conn(addr); } catch (...) { this->disp_error_cb(std::current_exception()); } }); - ev_retry_timer.add(gen_conn_timeout()); + /* auto retry the connection */ + if (pp->ntry > 0) pp->ntry--; + if (pp->ntry) + ev_retry_timer.add(gen_rand_timeout(pp->retry_delay)); } template<typename O, O _, O __> @@ -798,8 +835,10 @@ void PeerNetwork<O, _, __>::move_peer_buffer(conn_t &old_conn, const conn_t &new template<typename O, O _, O __> void PeerNetwork<O, _, __>::check_handshake(PeerConn *p) { - if (!(p->inbound_handshake && p->outbound_handshake) || - p->connected) + if ((force_two_way_handshake && + !(p->inbound_handshake && p->outbound_handshake)) || + !(p->inbound_handshake || p->outbound_handshake) || + p->connected) return; p->clear_all_events(); if (p->inbound_conn && p->inbound_conn != p->conn) @@ -813,7 +852,7 @@ void PeerNetwork<O, _, __>::check_handshake(PeerConn *p) { { pinfo_ulock_t _g(known_peers_lock); auto &pp = known_peers[p->peer_addr]; - pp->peer_id = p->peer_id; + pp->session_id = p->session_id; pp->ev_retry_timer.del(); auto &old_conn = pp->peer_conn; auto &conn = p->conn; @@ -885,11 +924,11 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) { if (conn->is_terminated()) return; if (!msg.claimed_addr.is_null()) { - auto peer_id = gen_peer_id(conn, msg.claimed_addr, msg.nonce); + auto session_id = get_session_id(conn, msg.claimed_addr, msg.nonce); if (conn->get_mode() == Conn::ConnMode::PASSIVE) { pinfo_slock_t _g(known_peers_lock); - pinfo_ulock_t __g(pid2peer_lock); + pinfo_ulock_t __g(sid2peer_lock); if (!known_peers.count(msg.claimed_addr)) { this->user_tcall->async_call([this, addr=msg.claimed_addr, conn](ThreadCall::Handle &) { @@ -902,8 +941,8 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) { std::string(listen_addr).c_str(), std::string(*conn).c_str()); send_msg(MsgPong(listen_addr, my_nonce), conn); - auto it = pid2peer.find(peer_id); - if (it != pid2peer.end()) + auto it = sid2peer.find(session_id); + if (it != sid2peer.end()) { auto p = it->second.get(); if (p->connected) @@ -928,8 +967,9 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) { } else { - it = pid2peer.insert(std::make_pair(peer_id, - new PeerConn(peer_id, conn, conn, nullptr, this))).first; + it = sid2peer.insert(std::make_pair(session_id, + new PeerConn(session_id, get_peer_id(conn, msg.claimed_addr), + conn, conn, nullptr, this))).first; } auto p = it->second.get(); p->inbound_handshake = true; @@ -955,16 +995,16 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) { if (conn->is_terminated()) return; if (!msg.claimed_addr.is_null()) { - auto peer_id = gen_peer_id(conn, msg.claimed_addr, msg.nonce); + auto session_id = get_session_id(conn, msg.claimed_addr, msg.nonce); if (conn->get_mode() == Conn::ConnMode::ACTIVE) { pinfo_ulock_t _g(known_peers_lock); - pinfo_ulock_t __g(pid2peer_lock); + pinfo_ulock_t __g(sid2peer_lock); SALTICIDAE_LOG_INFO("%s outbound handshake to %s", std::string(listen_addr).c_str(), std::string(*conn).c_str()); - auto it = pid2peer.find(peer_id); - if (it != pid2peer.end()) + auto it = sid2peer.find(session_id); + if (it != sid2peer.end()) { auto p = it->second.get(); if (p->connected) @@ -996,8 +1036,9 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) { } else { - it = pid2peer.insert(std::make_pair(peer_id, - new PeerConn(peer_id, conn, nullptr, conn, this))).first; + it = sid2peer.insert(std::make_pair(session_id, + new PeerConn(session_id, get_peer_id(conn, msg.claimed_addr), + conn, nullptr, conn, this))).first; } auto p = it->second.get(); p->outbound_handshake = true; @@ -1050,20 +1091,14 @@ void PeerNetwork<O, _, __>::listen(NetAddr _listen_addr) { } template<typename O, O _, O __> -int32_t PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) { +int32_t PeerNetwork<O, _, __>::add_peer(const PeerId &pid) { auto id = this->gen_async_id(); - this->disp_tcall->async_call([this, addr, id](ThreadCall::Handle &) { + this->disp_tcall->async_call([this, pid, id](ThreadCall::Handle &) { try { pinfo_ulock_t _g(known_peers_lock); - if (known_peers.count(addr)) + if (known_peers.count(pid)) throw PeerNetworkError(SALTI_ERROR_PEER_ALREADY_EXISTS); - auto it = pending_peers.find(addr); - conn_t conn; - if (it == pending_peers.end()) - conn = start_active_conn(addr); - else - conn = it->second; - known_peers.insert(std::make_pair(addr, new Peer(conn))); + known_peers.insert(std::make_pair(pid, new Peer())); } catch (const PeerNetworkError &) { this->recoverable_error(std::current_exception(), id); } catch (...) { this->disp_error_cb(std::current_exception()); } @@ -1072,16 +1107,60 @@ int32_t PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) { } template<typename O, O _, O __> -int32_t PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) { +int32_t PeerNetwork<O, _, __>::try_conn_peer(const PeerId &pid, ssize_t ntry, double retry_delay) { + auto id = this->gen_async_id(); + this->disp_tcall->async_call([this, pid, ntry, retry_delay, id](ThreadCall::Handle &) { + try { + pinfo_ulock_t _g(known_peers_lock); + auto it = known_peers.find(pid); + if (it == known_peers.end()) + throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); + auto &p = it->second; + if (p.addr.is_null()) + throw PeerNetworkError(SALTI_ERROR_PEER_NOT_READY); + // FIXME: ??? + if (p.peer_conn) + p.peer_conn->disp_terminate(); + p.retry_delay = retry_delay; + p.peer_conn = ntry ? start_active_conn(p.addr) : conn_t(); + } catch (const PeerNetworkError &) { + this->recoverable_error(std::current_exception(), id); + } catch (...) { this->disp_error_cb(std::current_exception()); } + }); + return id; +} + +template<typename O, O _, O __> +int32_t PeerNetwork<O, _, __>::set_peer_addr(const PeerId &pid, const NetAddr &addr) { + auto id = this->gen_async_id(); + this->disp_tcall->async_call([this, pid, addr, id](ThreadCall::Handle &) { + try { + pinfo_ulock_t _g(known_peers_lock); + auto it = known_peers.find(pid); + if (it == known_peers.end()) + throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); + auto it2 = pending_peers.find(addr); + if (it2 != pending_peers.end()) + it->second.peer_conn = it2->second; + } catch (const PeerNetworkError &) { + this->recoverable_error(std::current_exception(), id); + } catch (...) { this->disp_error_cb(std::current_exception()); } + }); + return id; +} + + +template<typename O, O _, O __> +int32_t PeerNetwork<O, _, __>::del_peer(const PeerId &pid) { auto id = this->gen_async_id(); - this->disp_tcall->async_call([this, addr, id](ThreadCall::Handle &) { + this->disp_tcall->async_call([this, pid, id](ThreadCall::Handle &) { try { pinfo_ulock_t _g(known_peers_lock); - pinfo_ulock_t __g(pid2peer_lock); - auto it = known_peers.find(addr); + pinfo_ulock_t __g(sid2peer_lock); + auto it = known_peers.find(pid); if (it == known_peers.end()) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); - auto peer_id = it->second->peer_id; + auto addr = it->second->addr; known_peers.erase(it); auto it2 = pending_peers.find(addr); if (it2 != pending_peers.end()) @@ -1090,12 +1169,12 @@ int32_t PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) { this->disp_terminate(it2->second); pending_peers.erase(it2); } - auto it3 = pid2peer.find(peer_id); - if (it3 != pid2peer.end()) + auto it3 = sid2peer.find(pid); + if (it3 != sid2peer.end()) { auto &p = it3->second; this->disp_terminate(p->conn); - pid2peer.erase(it3); + sid2peer.erase(it3); } } catch (const PeerNetworkError &) { this->recoverable_error(std::current_exception(), id); @@ -1106,21 +1185,21 @@ int32_t PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) { template<typename O, O _, O __> typename PeerNetwork<O, _, __>::conn_t -PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &addr) const { +PeerNetwork<O, _, __>::get_peer_conn(const PeerId &pid) const { auto ret = *(static_cast<conn_t *>( this->disp_tcall->call([this, addr](ThreadCall::Handle &h) { conn_t conn; pinfo_slock_t _g(known_peers_lock); - pinfo_slock_t __g(pid2peer_lock); + pinfo_slock_t __g(sid2peer_lock); auto it = known_peers.find(addr); if (it == known_peers.end()) throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST); - if (it->second->peer_id.is_null()) + if (it->second->session_id.is_null()) conn = nullptr; else { - auto it2 = pid2peer.find(it->second->peer_id); - assert(it2 != pid2peer.end()); + auto it2 = sid2peer.find(it->second->session_id); + assert(it2 != sid2peer.end()); conn = it2->second->conn; } h.set_result(std::move(conn)); @@ -1313,7 +1392,7 @@ void msgnetwork_config_max_msg_queue_size(msgnetwork_config_t *self, size_t size void msgnetwork_config_burst_size(msgnetwork_config_t *self, size_t burst_size); void msgnetwork_config_max_listen_backlog(msgnetwork_config_t *self, int backlog); void msgnetwork_config_conn_server_timeout(msgnetwork_config_t *self, double timeout); -void msgnetwork_config_seg_buff_size(msgnetwork_config_t *self, size_t size); +void msgnetwork_config_recv_chunk_size(msgnetwork_config_t *self, size_t size); void msgnetwork_config_nworker(msgnetwork_config_t *self, size_t nworker); void msgnetwork_config_max_recv_buff_size(msgnetwork_config_t *self, size_t size); void msgnetwork_config_max_send_buff_size(msgnetwork_config_t *self, size_t size); @@ -1356,7 +1435,6 @@ bool msgnetwork_conn_is_terminated(const msgnetwork_conn_t *conn); peernetwork_config_t *peernetwork_config_new(); void peernetwork_config_free(const peernetwork_config_t *self); -void peernetwork_config_retry_conn_delay(peernetwork_config_t *self, double t); void peernetwork_config_ping_period(peernetwork_config_t *self, double t); void peernetwork_config_conn_timeout(peernetwork_config_t *self, double t); void peernetwork_config_id_mode(peernetwork_config_t *self, peernetwork_id_mode_t mode); diff --git a/src/conn.cpp b/src/conn.cpp index ba584c1..a5d60a7 100644 --- a/src/conn.cpp +++ b/src/conn.cpp @@ -66,7 +66,7 @@ void ConnPool::Conn::_send_data(const conn_t &conn, int fd, int events) { conn->cpool->worker_terminate(conn); return; } - ssize_t ret = conn->seg_buff_size; + ssize_t ret = conn->recv_chunk_size; for (;;) { bytearray_t buff_seg = conn->send_buffer.move_pop(); @@ -94,13 +94,13 @@ void ConnPool::Conn::_send_data(const conn_t &conn, int fd, int events) { bytearray_t(buff_seg.begin() + ret, buff_seg.end())); /* wait for the next write callback */ conn->ready_send = false; - //ev_write.add(); return; } } + /* the send_buffer is empty though the kernel buffer is still available, so + * temporarily mask the WRITE event and mark the `ready_send` flag */ conn->ev_socket.del(); conn->ev_socket.add(conn->ready_recv ? 0 : FdEvent::READ); - /* consumed the buffer but endpoint still seems to be writable */ conn->ready_send = true; } @@ -110,21 +110,21 @@ void ConnPool::Conn::_recv_data(const conn_t &conn, int fd, int events) { conn->cpool->worker_terminate(conn); return; } - const size_t seg_buff_size = conn->seg_buff_size; - ssize_t ret = seg_buff_size; - while (ret == (ssize_t)seg_buff_size) + const size_t recv_chunk_size = conn->recv_chunk_size; + ssize_t ret = recv_chunk_size; + while (ret == (ssize_t)recv_chunk_size) { if (conn->recv_buffer.len() >= conn->max_recv_buff_size) { - /* receive buffer is full, disable READ event */ + /* recv_buffer is full, temporarily mask the READ event */ conn->ev_socket.del(); conn->ev_socket.add(conn->ready_send ? 0 : FdEvent::WRITE); conn->ready_recv = true; return; } bytearray_t buff_seg; - buff_seg.resize(seg_buff_size); - ret = recv(fd, buff_seg.data(), seg_buff_size, 0); + buff_seg.resize(recv_chunk_size); + ret = recv(fd, buff_seg.data(), recv_chunk_size, 0); SALTICIDAE_LOG_DEBUG("socket read %zd bytes", ret); if (ret < 0) { @@ -136,14 +136,14 @@ void ConnPool::Conn::_recv_data(const conn_t &conn, int fd, int events) { } if (ret == 0) { - //SALTICIDAE_LOG_INFO("recv(%d) terminates", fd, strerror(errno)); + /* the remote closes the connection */ conn->cpool->worker_terminate(conn); return; } buff_seg.resize(ret); conn->recv_buffer.push(std::move(buff_seg)); } - //ev_read.add(); + /* wait for the next read callback */ conn->ready_recv = false; conn->cpool->on_read(conn); } @@ -155,7 +155,7 @@ void ConnPool::Conn::_send_data_tls(const conn_t &conn, int fd, int events) { conn->cpool->worker_terminate(conn); return; } - ssize_t ret = conn->seg_buff_size; + ssize_t ret = conn->recv_chunk_size; auto &tls = conn->tls; for (;;) { @@ -189,7 +189,6 @@ void ConnPool::Conn::_send_data_tls(const conn_t &conn, int fd, int events) { } conn->ev_socket.del(); conn->ev_socket.add(conn->ready_recv ? 0 : FdEvent::READ); - /* consumed the buffer but endpoint still seems to be writable */ conn->ready_send = true; } @@ -199,28 +198,26 @@ void ConnPool::Conn::_recv_data_tls(const conn_t &conn, int fd, int events) { conn->cpool->worker_terminate(conn); return; } - const size_t seg_buff_size = conn->seg_buff_size; - ssize_t ret = seg_buff_size; + const size_t recv_chunk_size = conn->recv_chunk_size; + ssize_t ret = recv_chunk_size; auto &tls = conn->tls; - while (ret == (ssize_t)seg_buff_size) + while (ret == (ssize_t)recv_chunk_size) { if (conn->recv_buffer.len() >= conn->max_recv_buff_size) { - /* receive buffer is full, disable READ event */ conn->ev_socket.del(); conn->ev_socket.add(conn->ready_send ? 0 : FdEvent::WRITE); conn->ready_recv = true; return; } bytearray_t buff_seg; - buff_seg.resize(seg_buff_size); - ret = tls->recv(buff_seg.data(), seg_buff_size); + buff_seg.resize(recv_chunk_size); + ret = tls->recv(buff_seg.data(), recv_chunk_size); SALTICIDAE_LOG_DEBUG("ssl read %zd bytes", ret); if (ret < 0) { if (tls->get_error(ret) == SSL_ERROR_WANT_READ) break; SALTICIDAE_LOG_INFO("recv(%d) failure: %s", fd, strerror(errno)); - /* connection err or half-opened connection */ conn->cpool->worker_terminate(conn); return; } @@ -247,6 +244,7 @@ void ConnPool::Conn::_recv_data_tls_handshake(const conn_t &conn, int, int) { { /* finishing TLS handshake */ conn->send_data_func = _send_data_tls; + /* do not start receiving data immediately */ conn->recv_data_func = _recv_data_dummy; conn->ev_socket.del(); conn->ev_socket.add(FdEvent::WRITE); @@ -320,14 +318,13 @@ void ConnPool::accept_client(int fd, int) { NetAddr addr((struct sockaddr_in *)&client_addr); conn_t conn = create_conn(); conn->send_buffer.set_capacity(max_send_buff_size); - conn->seg_buff_size = seg_buff_size; + conn->recv_chunk_size = recv_chunk_size; conn->max_recv_buff_size = max_recv_buff_size; conn->fd = client_fd; conn->cpool = this; conn->mode = Conn::PASSIVE; conn->addr = addr; add_conn(conn); - //SALTICIDAE_LOG_INFO("new %x", (uintptr_t)conn.get()); SALTICIDAE_LOG_INFO("accepted %s", std::string(*conn).c_str()); auto &worker = select_worker(); conn->worker = &worker; @@ -401,14 +398,13 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) { throw ConnPoolError(SALTI_ERROR_CONNECT, errno); conn_t conn = create_conn(); conn->send_buffer.set_capacity(max_send_buff_size); - conn->seg_buff_size = seg_buff_size; + conn->recv_chunk_size = recv_chunk_size; conn->max_recv_buff_size = max_recv_buff_size; conn->fd = fd; conn->cpool = this; conn->mode = Conn::ACTIVE; conn->addr = addr; add_conn(conn); - //SALTICIDAE_LOG_INFO("new %x", (uintptr_t)conn.get()); struct sockaddr_in sockin; memset(&sockin, 0, sizeof(struct sockaddr_in)); diff --git a/src/network.cpp b/src/network.cpp index e3eb56c..921ea03 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -43,8 +43,8 @@ void msgnetwork_config_conn_server_timeout(msgnetwork_config_t *self, double tim self->conn_server_timeout(timeout); } -void msgnetwork_config_seg_buff_size(msgnetwork_config_t *self, size_t size) { - self->seg_buff_size(size); +void msgnetwork_config_recv_chunk_size(msgnetwork_config_t *self, size_t size) { + self->recv_chunk_size(size); } void msgnetwork_config_nworker(msgnetwork_config_t *self, size_t nworker) { @@ -183,10 +183,6 @@ peernetwork_config_t *peernetwork_config_new() { void peernetwork_config_free(const peernetwork_config_t *self) { delete self; } -void peernetwork_config_retry_conn_delay(peernetwork_config_t *self, double t) { - self->retry_conn_delay(t); -} - void peernetwork_config_ping_period(peernetwork_config_t *self, double t) { self->ping_period(t); } diff --git a/test/test_p2p_stress.cpp b/test/test_p2p_stress.cpp index 4e16e30..dca9cf4 100644 --- a/test/test_p2p_stress.cpp +++ b/test/test_p2p_stress.cpp @@ -107,7 +107,7 @@ struct AppContext { std::unordered_map<NetAddr, TestContext> tc; }; -void install_proto(AppContext &app, const size_t &seg_buff_size) { +void install_proto(AppContext &app, const size_t &recv_chunk_size) { auto &ec = app.ec; auto &net = *app.net; auto send_rand = [&](int size, const MyNet::conn_t &conn, TestContext &tc) { @@ -157,7 +157,7 @@ void install_proto(AppContext &app, const size_t &seg_buff_size) { exit(1); } - if (tc.state == seg_buff_size * 2) + if (tc.state == recv_chunk_size * 2) { send_rand(tc.state, conn, tc); tc.state = -1; @@ -175,7 +175,7 @@ void install_proto(AppContext &app, const size_t &seg_buff_size) { SALTICIDAE_LOG_INFO("rand-bomboard phase, ending in %.2f secs", t); } else if (tc.state == -1) - send_rand(rand() % (seg_buff_size * 10), conn, tc); + send_rand(rand() % (recv_chunk_size * 10), conn, tc); else send_rand(++tc.state, conn, tc); }); @@ -192,14 +192,14 @@ int main(int argc, char **argv) { Config config; auto opt_no_msg = Config::OptValFlag::create(false); auto opt_npeers = Config::OptValInt::create(5); - auto opt_seg_buff_size = Config::OptValInt::create(4096); + auto opt_recv_chunk_size = Config::OptValInt::create(4096); auto opt_nworker = Config::OptValInt::create(2); auto opt_conn_timeout = Config::OptValDouble::create(5); auto opt_ping_peroid = Config::OptValDouble::create(2); auto opt_help = Config::OptValFlag::create(false); config.add_opt("no-msg", opt_no_msg, Config::SWITCH_ON); config.add_opt("npeers", opt_npeers, Config::SET_VAL); - config.add_opt("seg-buff-size", opt_seg_buff_size, Config::SET_VAL); + config.add_opt("seg-buff-size", opt_recv_chunk_size, Config::SET_VAL); config.add_opt("nworker", opt_nworker, Config::SET_VAL); config.add_opt("conn-timeout", opt_conn_timeout, Config::SET_VAL); config.add_opt("ping-period", opt_ping_peroid, Config::SET_VAL); @@ -210,7 +210,7 @@ int main(int argc, char **argv) { config.print_help(); exit(0); } - size_t seg_buff_size = opt_seg_buff_size->get(); + size_t recv_chunk_size = opt_recv_chunk_size->get(); for (int i = 0; i < opt_npeers->get(); i++) addrs.push_back(NetAddr("127.0.0.1:" + std::to_string(12345 + i))); std::vector<AppContext> apps; @@ -223,13 +223,13 @@ int main(int argc, char **argv) { a.net = new MyNet(a.ec, MyNet::Config( salticidae::ConnPool::Config() .nworker(opt_nworker->get()) - .seg_buff_size(seg_buff_size)) + .recv_chunk_size(recv_chunk_size)) .conn_timeout(opt_conn_timeout->get()) .ping_period(opt_ping_peroid->get()) .max_msg_size(65536)); a.tcall = new ThreadCall(a.ec); if (!opt_no_msg->get()) - install_proto(a, seg_buff_size); + install_proto(a, recv_chunk_size); a.net->start(); } |