aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/salticidae/conn.h21
-rw-r--r--include/salticidae/crypto.h4
-rw-r--r--include/salticidae/network.h236
-rw-r--r--src/conn.cpp44
-rw-r--r--src/network.cpp8
-rw-r--r--test/test_p2p_stress.cpp16
6 files changed, 204 insertions, 125 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index b43d3c2..d08ef91 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -74,7 +74,7 @@ class ConnPool {
protected:
std::atomic<bool> terminated;
- size_t seg_buff_size;
+ size_t recv_chunk_size;
size_t max_recv_buff_size;
int fd;
Worker *worker;
@@ -112,6 +112,13 @@ class ConnPool {
public:
Conn(): terminated(false), worker(nullptr),
+ // recv_chunk_size initialized later
+ // max_recv_buff_size initialized later
+ // fd initialized later
+ // worker initialized later
+ // cpool initialized later
+ // mode initialized later
+ // addr initialized later
ready_send(false), ready_recv(false),
send_data_func(nullptr), recv_data_func(nullptr),
tls(nullptr), peer_cert(nullptr) {}
@@ -183,7 +190,7 @@ class ConnPool {
private:
const int max_listen_backlog;
const double conn_server_timeout;
- const size_t seg_buff_size;
+ const size_t recv_chunk_size;
const size_t max_recv_buff_size;
const size_t max_send_buff_size;
tls_context_t tls_ctx;
@@ -361,7 +368,7 @@ class ConnPool {
friend class ConnPool;
int _max_listen_backlog;
double _conn_server_timeout;
- size_t _seg_buff_size;
+ size_t _recv_chunk_size;
size_t _max_recv_buff_size;
size_t _max_send_buff_size;
size_t _nworker;
@@ -377,7 +384,7 @@ class ConnPool {
Config():
_max_listen_backlog(10),
_conn_server_timeout(2),
- _seg_buff_size(4096),
+ _recv_chunk_size(4096),
_max_recv_buff_size(4096),
_max_send_buff_size(0),
_nworker(1),
@@ -399,8 +406,8 @@ class ConnPool {
return *this;
}
- Config &seg_buff_size(size_t x) {
- _seg_buff_size = x;
+ Config &recv_chunk_size(size_t x) {
+ _recv_chunk_size = x;
return *this;
}
@@ -461,7 +468,7 @@ class ConnPool {
async_id(0),
max_listen_backlog(config._max_listen_backlog),
conn_server_timeout(config._conn_server_timeout),
- seg_buff_size(config._seg_buff_size),
+ recv_chunk_size(config._recv_chunk_size),
max_recv_buff_size(config._max_recv_buff_size),
max_send_buff_size(config._max_send_buff_size),
tls_ctx(nullptr),
diff --git a/include/salticidae/crypto.h b/include/salticidae/crypto.h
index 65acc2d..d0553fb 100644
--- a/include/salticidae/crypto.h
+++ b/include/salticidae/crypto.h
@@ -376,7 +376,9 @@ class TLS {
TLS(const TLS &) = delete;
TLS(TLS &&other): ssl(other.ssl) { other.ssl = nullptr; }
- bool do_handshake(int &want_io_type) { /* 0 for read, 1 for write */
+ bool do_handshake(int &want_io_type) {
+ /* want_io_type: 0 for read, 1 for write */
+ /* return true if handshake is completed */
auto ret = SSL_do_handshake(ssl);
if (ret == 1) return true;
auto err = SSL_get_error(ssl, ret);
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index dd05ed3..152ca60 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -345,12 +345,26 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
using peer_callback_t = std::function<void(const conn_t &peer_conn, bool connected)>;
using unknown_peer_callback_t = std::function<void(const NetAddr &claimed_addr, const X509 *cert)>;
+ struct PeerId: public uint256_t {
+ PeerId(const NetAddr &addr) {
+ DataStream tmp;
+ tmp << addr;
+ *this = tmp.get_hash();
+ }
+
+ PeerId(const X509 &cert) {
+ DataStream tmp;
+ tmp << cert.get_der();
+ *this = tmp.get_hash();
+ }
+ };
+
private:
class PeerConn {
friend PeerNetwork;
/** connection addr, may be different due to passive mode */
- uint256_t peer_id;
- NetAddr peer_addr;
+ uint256_t session_id;
+ PeerId peer_id;
/** the underlying connection, may be invalid when connected = false */
conn_t conn;
conn_t inbound_conn;
@@ -365,10 +379,10 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
double ping_period;
PeerConn() = delete;
- PeerConn(const uint256_t &peer_id,
+ PeerConn(const uint256_t &session_id, const PeerId &peer_id,
conn_t conn, conn_t inbound_conn, conn_t outbound_conn,
const PeerNetwork *pn):
- peer_id(peer_id),
+ session_id(session_id),
conn(conn),
inbound_conn(inbound_conn),
outbound_conn(outbound_conn),
@@ -396,33 +410,41 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
};
struct Peer {
- uint256_t peer_id;
+ uint256_t session_id;
+ NetAddr addr;
+ double retry_delay;
+ ssize_t ntry;
TimerEvent ev_retry_timer;
conn_t peer_conn;
- Peer(const conn_t &conn):
- peer_id(), ev_retry_timer(), peer_conn(conn) {}
+ Peer():
+ session_id(),
+ retry_delay(0), ntry(0),
+ ev_retry_timer() {}
};
+ /* connections whose PeerId is unknown */
std::unordered_map<NetAddr, conn_t> pending_peers;
- std::unordered_map<NetAddr, BoxObj<Peer>> known_peers;
- std::unordered_map<uint256_t, BoxObj<PeerConn>> pid2peer;
+ /* registered peers */
+ std::unordered_map<PeerId, BoxObj<Peer>> known_peers;
+ /* peer connection deduplication map */
+ std::unordered_map<uint256_t, BoxObj<PeerConn>> sid2peer;
using pinfo_slock_t = std::shared_lock<std::shared_timed_mutex>;
using pinfo_ulock_t = std::shared_lock<std::shared_timed_mutex>;
mutable std::shared_timed_mutex known_peers_lock;
- mutable std::shared_timed_mutex pid2peer_lock;
+ mutable std::shared_timed_mutex sid2peer_lock;
peer_callback_t peer_cb;
unknown_peer_callback_t unknown_peer_cb;
const IdentityMode id_mode;
- double retry_conn_delay;
double ping_period;
double conn_timeout;
NetAddr listen_addr;
bool allow_unknown_peer;
+ bool force_two_way_handshake;
uint256_t my_nonce;
struct MsgPing {
@@ -464,12 +486,10 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
protected:
ConnPool::Conn *create_conn() override { return new Conn(); }
- virtual double gen_conn_timeout() {
- return gen_rand_timeout(retry_conn_delay);
- }
void on_setup(const ConnPool::conn_t &) override;
void on_teardown(const ConnPool::conn_t &) override;
- uint256_t gen_peer_id(const conn_t &conn, const NetAddr &claimed_addr, const uint256_t &nonce) {
+
+ uint256_t get_session_id(const conn_t &conn, const NetAddr &claimed_addr, const uint256_t &nonce) {
DataStream tmp;
if (!this->enable_tls || id_mode == ADDR_BASED)
tmp << nonce << claimed_addr;
@@ -478,14 +498,22 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
return tmp.get_hash();
}
+ PeerId get_peer_id(const conn_t &conn, const NetAddr &addr) {
+ DataStream tmp;
+ if (!this->enable_tls || id_mode == ADDR_BASED)
+ return PeerId(addr);
+ else
+ return PeerId(*conn->get_peer_cert());
+ }
+
public:
class Config: public MsgNet::Config {
friend PeerNetwork;
- double _retry_conn_delay;
double _ping_period;
double _conn_timeout;
bool _allow_unknown_peer;
+ bool _force_two_way_handshake;
IdentityMode _id_mode;
public:
@@ -493,18 +521,13 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
Config(const typename MsgNet::Config &config):
MsgNet::Config(config),
- _retry_conn_delay(2),
_ping_period(30),
_conn_timeout(180),
_allow_unknown_peer(false),
+ _force_two_way_handshake(false),
_id_mode(CERT_BASED) {}
- Config &retry_conn_delay(double x) {
- _retry_conn_delay = x;
- return *this;
- }
-
Config &ping_period(double x) {
_ping_period = x;
return *this;
@@ -524,26 +547,37 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
_allow_unknown_peer = x;
return *this;
}
+
+ Config &force_two_way_handshake(bool x) {
+ _force_two_way_handshake = x;
+ return *this;
+ }
};
PeerNetwork(const EventContext &ec, const Config &config):
MsgNet(ec, config),
id_mode(config._id_mode),
- retry_conn_delay(config._retry_conn_delay),
ping_period(config._ping_period),
conn_timeout(config._conn_timeout),
- allow_unknown_peer(config._allow_unknown_peer) {
+ allow_unknown_peer(config._allow_unknown_peer),
+ force_two_way_handshake(config._force_two_way_handshake) {
this->reg_handler(generic_bind(&PeerNetwork::ping_handler, this, _1, _2));
this->reg_handler(generic_bind(&PeerNetwork::pong_handler, this, _1, _2));
}
virtual ~PeerNetwork() { this->stop(); }
- int32_t add_peer(const NetAddr &addr);
- int32_t del_peer(const NetAddr &addr);
- bool has_peer(const NetAddr &addr) const;
+ /* register a peer as known */
+ int32_t add_peer(const PeerId &pid);
+ /* set the peer's public IP */
+ int32_t set_peer_addr(const PeerId &pid, const NetAddr &addr);
+ /* try to connect to the peer: once (ntry = 1), indefinitely (ntry = -1), give up retry (ntry = 0) */
+ int32_t try_conn_peer(const PeerId &pid, ssize_t ntry = -1, double retry_delay = 2);
+ /* unregister the peer */
+ int32_t del_peer(const PeerId &pid);
+ bool has_peer(const PeerId &pid) const;
size_t get_npending() const;
- conn_t get_peer_conn(const NetAddr &addr) const;
+ conn_t get_peer_conn(const PeerId &addr) const;
using MsgNet::send_msg;
template<typename MsgType>
inline bool send_msg(const MsgType &msg, const NetAddr &addr);
@@ -703,16 +737,16 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) {
pending_peers.erase(addr);
SALTICIDAE_LOG_INFO("connection lost: %s", std::string(*conn).c_str());
auto p = conn->peer;
- if (p) addr = p->peer_addr;
+ auto pid = p ? p->peer_id : PeerId();
pinfo_ulock_t _g(known_peers_lock);
- auto it = known_peers.find(addr);
+ auto it = known_peers.find(pid);
if (it == known_peers.end()) return;
auto &pp = it->second;
if (conn == pp->peer_conn)
{
- pinfo_ulock_t __g(pid2peer_lock);
- auto it2 = pid2peer.find(pp->peer_id);
- if (it2 != pid2peer.end())
+ pinfo_ulock_t __g(sid2peer_lock);
+ auto it2 = sid2peer.find(pp->session_id);
+ if (it2 != sid2peer.end())
{
auto &p = it2->second;
if (p->connected && p->conn != conn)
@@ -728,13 +762,13 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) {
p->connected = false;
p->outbound_handshake = false;
p->inbound_handshake = false;
- pp->peer_id = uint256_t();
+ pp->session_id = uint256_t();
PeerConn *peer = nullptr;
{
- pinfo_ulock_t __g(pid2peer_lock);
- auto it2 = pid2peer.find(p->peer_id);
+ pinfo_ulock_t __g(sid2peer_lock);
+ auto it2 = sid2peer.find(p->session_id);
peer = it2->second.unwrap();
- pid2peer.erase(it2);
+ sid2peer.erase(it2);
}
peer->conn = nullptr;
conn->_dead_peer = peer;
@@ -744,7 +778,7 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) {
}
else
{
- if (!it->second->peer_id.is_null()) return;
+ if (!it->second->session_id.is_null()) return;
}
auto &ev_retry_timer = it->second->ev_retry_timer;
ev_retry_timer = TimerEvent(this->disp_ec, [this, addr](TimerEvent &) {
@@ -752,7 +786,10 @@ void PeerNetwork<O, _, __>::on_teardown(const ConnPool::conn_t &_conn) {
start_active_conn(addr);
} catch (...) { this->disp_error_cb(std::current_exception()); }
});
- ev_retry_timer.add(gen_conn_timeout());
+ /* auto retry the connection */
+ if (pp->ntry > 0) pp->ntry--;
+ if (pp->ntry)
+ ev_retry_timer.add(gen_rand_timeout(pp->retry_delay));
}
template<typename O, O _, O __>
@@ -798,8 +835,10 @@ void PeerNetwork<O, _, __>::move_peer_buffer(conn_t &old_conn, const conn_t &new
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::check_handshake(PeerConn *p) {
- if (!(p->inbound_handshake && p->outbound_handshake) ||
- p->connected)
+ if ((force_two_way_handshake &&
+ !(p->inbound_handshake && p->outbound_handshake)) ||
+ !(p->inbound_handshake || p->outbound_handshake) ||
+ p->connected)
return;
p->clear_all_events();
if (p->inbound_conn && p->inbound_conn != p->conn)
@@ -813,7 +852,7 @@ void PeerNetwork<O, _, __>::check_handshake(PeerConn *p) {
{
pinfo_ulock_t _g(known_peers_lock);
auto &pp = known_peers[p->peer_addr];
- pp->peer_id = p->peer_id;
+ pp->session_id = p->session_id;
pp->ev_retry_timer.del();
auto &old_conn = pp->peer_conn;
auto &conn = p->conn;
@@ -885,11 +924,11 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) {
if (conn->is_terminated()) return;
if (!msg.claimed_addr.is_null())
{
- auto peer_id = gen_peer_id(conn, msg.claimed_addr, msg.nonce);
+ auto session_id = get_session_id(conn, msg.claimed_addr, msg.nonce);
if (conn->get_mode() == Conn::ConnMode::PASSIVE)
{
pinfo_slock_t _g(known_peers_lock);
- pinfo_ulock_t __g(pid2peer_lock);
+ pinfo_ulock_t __g(sid2peer_lock);
if (!known_peers.count(msg.claimed_addr))
{
this->user_tcall->async_call([this, addr=msg.claimed_addr, conn](ThreadCall::Handle &) {
@@ -902,8 +941,8 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) {
std::string(listen_addr).c_str(),
std::string(*conn).c_str());
send_msg(MsgPong(listen_addr, my_nonce), conn);
- auto it = pid2peer.find(peer_id);
- if (it != pid2peer.end())
+ auto it = sid2peer.find(session_id);
+ if (it != sid2peer.end())
{
auto p = it->second.get();
if (p->connected)
@@ -928,8 +967,9 @@ void PeerNetwork<O, _, __>::ping_handler(MsgPing &&msg, const conn_t &conn) {
}
else
{
- it = pid2peer.insert(std::make_pair(peer_id,
- new PeerConn(peer_id, conn, conn, nullptr, this))).first;
+ it = sid2peer.insert(std::make_pair(session_id,
+ new PeerConn(session_id, get_peer_id(conn, msg.claimed_addr),
+ conn, conn, nullptr, this))).first;
}
auto p = it->second.get();
p->inbound_handshake = true;
@@ -955,16 +995,16 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) {
if (conn->is_terminated()) return;
if (!msg.claimed_addr.is_null())
{
- auto peer_id = gen_peer_id(conn, msg.claimed_addr, msg.nonce);
+ auto session_id = get_session_id(conn, msg.claimed_addr, msg.nonce);
if (conn->get_mode() == Conn::ConnMode::ACTIVE)
{
pinfo_ulock_t _g(known_peers_lock);
- pinfo_ulock_t __g(pid2peer_lock);
+ pinfo_ulock_t __g(sid2peer_lock);
SALTICIDAE_LOG_INFO("%s outbound handshake to %s",
std::string(listen_addr).c_str(),
std::string(*conn).c_str());
- auto it = pid2peer.find(peer_id);
- if (it != pid2peer.end())
+ auto it = sid2peer.find(session_id);
+ if (it != sid2peer.end())
{
auto p = it->second.get();
if (p->connected)
@@ -996,8 +1036,9 @@ void PeerNetwork<O, _, __>::pong_handler(MsgPong &&msg, const conn_t &conn) {
}
else
{
- it = pid2peer.insert(std::make_pair(peer_id,
- new PeerConn(peer_id, conn, nullptr, conn, this))).first;
+ it = sid2peer.insert(std::make_pair(session_id,
+ new PeerConn(session_id, get_peer_id(conn, msg.claimed_addr),
+ conn, nullptr, conn, this))).first;
}
auto p = it->second.get();
p->outbound_handshake = true;
@@ -1050,20 +1091,14 @@ void PeerNetwork<O, _, __>::listen(NetAddr _listen_addr) {
}
template<typename O, O _, O __>
-int32_t PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) {
+int32_t PeerNetwork<O, _, __>::add_peer(const PeerId &pid) {
auto id = this->gen_async_id();
- this->disp_tcall->async_call([this, addr, id](ThreadCall::Handle &) {
+ this->disp_tcall->async_call([this, pid, id](ThreadCall::Handle &) {
try {
pinfo_ulock_t _g(known_peers_lock);
- if (known_peers.count(addr))
+ if (known_peers.count(pid))
throw PeerNetworkError(SALTI_ERROR_PEER_ALREADY_EXISTS);
- auto it = pending_peers.find(addr);
- conn_t conn;
- if (it == pending_peers.end())
- conn = start_active_conn(addr);
- else
- conn = it->second;
- known_peers.insert(std::make_pair(addr, new Peer(conn)));
+ known_peers.insert(std::make_pair(pid, new Peer()));
} catch (const PeerNetworkError &) {
this->recoverable_error(std::current_exception(), id);
} catch (...) { this->disp_error_cb(std::current_exception()); }
@@ -1072,16 +1107,60 @@ int32_t PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) {
}
template<typename O, O _, O __>
-int32_t PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) {
+int32_t PeerNetwork<O, _, __>::try_conn_peer(const PeerId &pid, ssize_t ntry, double retry_delay) {
+ auto id = this->gen_async_id();
+ this->disp_tcall->async_call([this, pid, ntry, retry_delay, id](ThreadCall::Handle &) {
+ try {
+ pinfo_ulock_t _g(known_peers_lock);
+ auto it = known_peers.find(pid);
+ if (it == known_peers.end())
+ throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
+ auto &p = it->second;
+ if (p.addr.is_null())
+ throw PeerNetworkError(SALTI_ERROR_PEER_NOT_READY);
+ // FIXME: ???
+ if (p.peer_conn)
+ p.peer_conn->disp_terminate();
+ p.retry_delay = retry_delay;
+ p.peer_conn = ntry ? start_active_conn(p.addr) : conn_t();
+ } catch (const PeerNetworkError &) {
+ this->recoverable_error(std::current_exception(), id);
+ } catch (...) { this->disp_error_cb(std::current_exception()); }
+ });
+ return id;
+}
+
+template<typename O, O _, O __>
+int32_t PeerNetwork<O, _, __>::set_peer_addr(const PeerId &pid, const NetAddr &addr) {
+ auto id = this->gen_async_id();
+ this->disp_tcall->async_call([this, pid, addr, id](ThreadCall::Handle &) {
+ try {
+ pinfo_ulock_t _g(known_peers_lock);
+ auto it = known_peers.find(pid);
+ if (it == known_peers.end())
+ throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
+ auto it2 = pending_peers.find(addr);
+ if (it2 != pending_peers.end())
+ it->second.peer_conn = it2->second;
+ } catch (const PeerNetworkError &) {
+ this->recoverable_error(std::current_exception(), id);
+ } catch (...) { this->disp_error_cb(std::current_exception()); }
+ });
+ return id;
+}
+
+
+template<typename O, O _, O __>
+int32_t PeerNetwork<O, _, __>::del_peer(const PeerId &pid) {
auto id = this->gen_async_id();
- this->disp_tcall->async_call([this, addr, id](ThreadCall::Handle &) {
+ this->disp_tcall->async_call([this, pid, id](ThreadCall::Handle &) {
try {
pinfo_ulock_t _g(known_peers_lock);
- pinfo_ulock_t __g(pid2peer_lock);
- auto it = known_peers.find(addr);
+ pinfo_ulock_t __g(sid2peer_lock);
+ auto it = known_peers.find(pid);
if (it == known_peers.end())
throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
- auto peer_id = it->second->peer_id;
+ auto addr = it->second->addr;
known_peers.erase(it);
auto it2 = pending_peers.find(addr);
if (it2 != pending_peers.end())
@@ -1090,12 +1169,12 @@ int32_t PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) {
this->disp_terminate(it2->second);
pending_peers.erase(it2);
}
- auto it3 = pid2peer.find(peer_id);
- if (it3 != pid2peer.end())
+ auto it3 = sid2peer.find(pid);
+ if (it3 != sid2peer.end())
{
auto &p = it3->second;
this->disp_terminate(p->conn);
- pid2peer.erase(it3);
+ sid2peer.erase(it3);
}
} catch (const PeerNetworkError &) {
this->recoverable_error(std::current_exception(), id);
@@ -1106,21 +1185,21 @@ int32_t PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) {
template<typename O, O _, O __>
typename PeerNetwork<O, _, __>::conn_t
-PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &addr) const {
+PeerNetwork<O, _, __>::get_peer_conn(const PeerId &pid) const {
auto ret = *(static_cast<conn_t *>(
this->disp_tcall->call([this, addr](ThreadCall::Handle &h) {
conn_t conn;
pinfo_slock_t _g(known_peers_lock);
- pinfo_slock_t __g(pid2peer_lock);
+ pinfo_slock_t __g(sid2peer_lock);
auto it = known_peers.find(addr);
if (it == known_peers.end())
throw PeerNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
- if (it->second->peer_id.is_null())
+ if (it->second->session_id.is_null())
conn = nullptr;
else
{
- auto it2 = pid2peer.find(it->second->peer_id);
- assert(it2 != pid2peer.end());
+ auto it2 = sid2peer.find(it->second->session_id);
+ assert(it2 != sid2peer.end());
conn = it2->second->conn;
}
h.set_result(std::move(conn));
@@ -1313,7 +1392,7 @@ void msgnetwork_config_max_msg_queue_size(msgnetwork_config_t *self, size_t size
void msgnetwork_config_burst_size(msgnetwork_config_t *self, size_t burst_size);
void msgnetwork_config_max_listen_backlog(msgnetwork_config_t *self, int backlog);
void msgnetwork_config_conn_server_timeout(msgnetwork_config_t *self, double timeout);
-void msgnetwork_config_seg_buff_size(msgnetwork_config_t *self, size_t size);
+void msgnetwork_config_recv_chunk_size(msgnetwork_config_t *self, size_t size);
void msgnetwork_config_nworker(msgnetwork_config_t *self, size_t nworker);
void msgnetwork_config_max_recv_buff_size(msgnetwork_config_t *self, size_t size);
void msgnetwork_config_max_send_buff_size(msgnetwork_config_t *self, size_t size);
@@ -1356,7 +1435,6 @@ bool msgnetwork_conn_is_terminated(const msgnetwork_conn_t *conn);
peernetwork_config_t *peernetwork_config_new();
void peernetwork_config_free(const peernetwork_config_t *self);
-void peernetwork_config_retry_conn_delay(peernetwork_config_t *self, double t);
void peernetwork_config_ping_period(peernetwork_config_t *self, double t);
void peernetwork_config_conn_timeout(peernetwork_config_t *self, double t);
void peernetwork_config_id_mode(peernetwork_config_t *self, peernetwork_id_mode_t mode);
diff --git a/src/conn.cpp b/src/conn.cpp
index ba584c1..a5d60a7 100644
--- a/src/conn.cpp
+++ b/src/conn.cpp
@@ -66,7 +66,7 @@ void ConnPool::Conn::_send_data(const conn_t &conn, int fd, int events) {
conn->cpool->worker_terminate(conn);
return;
}
- ssize_t ret = conn->seg_buff_size;
+ ssize_t ret = conn->recv_chunk_size;
for (;;)
{
bytearray_t buff_seg = conn->send_buffer.move_pop();
@@ -94,13 +94,13 @@ void ConnPool::Conn::_send_data(const conn_t &conn, int fd, int events) {
bytearray_t(buff_seg.begin() + ret, buff_seg.end()));
/* wait for the next write callback */
conn->ready_send = false;
- //ev_write.add();
return;
}
}
+ /* the send_buffer is empty though the kernel buffer is still available, so
+ * temporarily mask the WRITE event and mark the `ready_send` flag */
conn->ev_socket.del();
conn->ev_socket.add(conn->ready_recv ? 0 : FdEvent::READ);
- /* consumed the buffer but endpoint still seems to be writable */
conn->ready_send = true;
}
@@ -110,21 +110,21 @@ void ConnPool::Conn::_recv_data(const conn_t &conn, int fd, int events) {
conn->cpool->worker_terminate(conn);
return;
}
- const size_t seg_buff_size = conn->seg_buff_size;
- ssize_t ret = seg_buff_size;
- while (ret == (ssize_t)seg_buff_size)
+ const size_t recv_chunk_size = conn->recv_chunk_size;
+ ssize_t ret = recv_chunk_size;
+ while (ret == (ssize_t)recv_chunk_size)
{
if (conn->recv_buffer.len() >= conn->max_recv_buff_size)
{
- /* receive buffer is full, disable READ event */
+ /* recv_buffer is full, temporarily mask the READ event */
conn->ev_socket.del();
conn->ev_socket.add(conn->ready_send ? 0 : FdEvent::WRITE);
conn->ready_recv = true;
return;
}
bytearray_t buff_seg;
- buff_seg.resize(seg_buff_size);
- ret = recv(fd, buff_seg.data(), seg_buff_size, 0);
+ buff_seg.resize(recv_chunk_size);
+ ret = recv(fd, buff_seg.data(), recv_chunk_size, 0);
SALTICIDAE_LOG_DEBUG("socket read %zd bytes", ret);
if (ret < 0)
{
@@ -136,14 +136,14 @@ void ConnPool::Conn::_recv_data(const conn_t &conn, int fd, int events) {
}
if (ret == 0)
{
- //SALTICIDAE_LOG_INFO("recv(%d) terminates", fd, strerror(errno));
+ /* the remote closes the connection */
conn->cpool->worker_terminate(conn);
return;
}
buff_seg.resize(ret);
conn->recv_buffer.push(std::move(buff_seg));
}
- //ev_read.add();
+ /* wait for the next read callback */
conn->ready_recv = false;
conn->cpool->on_read(conn);
}
@@ -155,7 +155,7 @@ void ConnPool::Conn::_send_data_tls(const conn_t &conn, int fd, int events) {
conn->cpool->worker_terminate(conn);
return;
}
- ssize_t ret = conn->seg_buff_size;
+ ssize_t ret = conn->recv_chunk_size;
auto &tls = conn->tls;
for (;;)
{
@@ -189,7 +189,6 @@ void ConnPool::Conn::_send_data_tls(const conn_t &conn, int fd, int events) {
}
conn->ev_socket.del();
conn->ev_socket.add(conn->ready_recv ? 0 : FdEvent::READ);
- /* consumed the buffer but endpoint still seems to be writable */
conn->ready_send = true;
}
@@ -199,28 +198,26 @@ void ConnPool::Conn::_recv_data_tls(const conn_t &conn, int fd, int events) {
conn->cpool->worker_terminate(conn);
return;
}
- const size_t seg_buff_size = conn->seg_buff_size;
- ssize_t ret = seg_buff_size;
+ const size_t recv_chunk_size = conn->recv_chunk_size;
+ ssize_t ret = recv_chunk_size;
auto &tls = conn->tls;
- while (ret == (ssize_t)seg_buff_size)
+ while (ret == (ssize_t)recv_chunk_size)
{
if (conn->recv_buffer.len() >= conn->max_recv_buff_size)
{
- /* receive buffer is full, disable READ event */
conn->ev_socket.del();
conn->ev_socket.add(conn->ready_send ? 0 : FdEvent::WRITE);
conn->ready_recv = true;
return;
}
bytearray_t buff_seg;
- buff_seg.resize(seg_buff_size);
- ret = tls->recv(buff_seg.data(), seg_buff_size);
+ buff_seg.resize(recv_chunk_size);
+ ret = tls->recv(buff_seg.data(), recv_chunk_size);
SALTICIDAE_LOG_DEBUG("ssl read %zd bytes", ret);
if (ret < 0)
{
if (tls->get_error(ret) == SSL_ERROR_WANT_READ) break;
SALTICIDAE_LOG_INFO("recv(%d) failure: %s", fd, strerror(errno));
- /* connection err or half-opened connection */
conn->cpool->worker_terminate(conn);
return;
}
@@ -247,6 +244,7 @@ void ConnPool::Conn::_recv_data_tls_handshake(const conn_t &conn, int, int) {
{
/* finishing TLS handshake */
conn->send_data_func = _send_data_tls;
+ /* do not start receiving data immediately */
conn->recv_data_func = _recv_data_dummy;
conn->ev_socket.del();
conn->ev_socket.add(FdEvent::WRITE);
@@ -320,14 +318,13 @@ void ConnPool::accept_client(int fd, int) {
NetAddr addr((struct sockaddr_in *)&client_addr);
conn_t conn = create_conn();
conn->send_buffer.set_capacity(max_send_buff_size);
- conn->seg_buff_size = seg_buff_size;
+ conn->recv_chunk_size = recv_chunk_size;
conn->max_recv_buff_size = max_recv_buff_size;
conn->fd = client_fd;
conn->cpool = this;
conn->mode = Conn::PASSIVE;
conn->addr = addr;
add_conn(conn);
- //SALTICIDAE_LOG_INFO("new %x", (uintptr_t)conn.get());
SALTICIDAE_LOG_INFO("accepted %s", std::string(*conn).c_str());
auto &worker = select_worker();
conn->worker = &worker;
@@ -401,14 +398,13 @@ ConnPool::conn_t ConnPool::_connect(const NetAddr &addr) {
throw ConnPoolError(SALTI_ERROR_CONNECT, errno);
conn_t conn = create_conn();
conn->send_buffer.set_capacity(max_send_buff_size);
- conn->seg_buff_size = seg_buff_size;
+ conn->recv_chunk_size = recv_chunk_size;
conn->max_recv_buff_size = max_recv_buff_size;
conn->fd = fd;
conn->cpool = this;
conn->mode = Conn::ACTIVE;
conn->addr = addr;
add_conn(conn);
- //SALTICIDAE_LOG_INFO("new %x", (uintptr_t)conn.get());
struct sockaddr_in sockin;
memset(&sockin, 0, sizeof(struct sockaddr_in));
diff --git a/src/network.cpp b/src/network.cpp
index e3eb56c..921ea03 100644
--- a/src/network.cpp
+++ b/src/network.cpp
@@ -43,8 +43,8 @@ void msgnetwork_config_conn_server_timeout(msgnetwork_config_t *self, double tim
self->conn_server_timeout(timeout);
}
-void msgnetwork_config_seg_buff_size(msgnetwork_config_t *self, size_t size) {
- self->seg_buff_size(size);
+void msgnetwork_config_recv_chunk_size(msgnetwork_config_t *self, size_t size) {
+ self->recv_chunk_size(size);
}
void msgnetwork_config_nworker(msgnetwork_config_t *self, size_t nworker) {
@@ -183,10 +183,6 @@ peernetwork_config_t *peernetwork_config_new() {
void peernetwork_config_free(const peernetwork_config_t *self) { delete self; }
-void peernetwork_config_retry_conn_delay(peernetwork_config_t *self, double t) {
- self->retry_conn_delay(t);
-}
-
void peernetwork_config_ping_period(peernetwork_config_t *self, double t) {
self->ping_period(t);
}
diff --git a/test/test_p2p_stress.cpp b/test/test_p2p_stress.cpp
index 4e16e30..dca9cf4 100644
--- a/test/test_p2p_stress.cpp
+++ b/test/test_p2p_stress.cpp
@@ -107,7 +107,7 @@ struct AppContext {
std::unordered_map<NetAddr, TestContext> tc;
};
-void install_proto(AppContext &app, const size_t &seg_buff_size) {
+void install_proto(AppContext &app, const size_t &recv_chunk_size) {
auto &ec = app.ec;
auto &net = *app.net;
auto send_rand = [&](int size, const MyNet::conn_t &conn, TestContext &tc) {
@@ -157,7 +157,7 @@ void install_proto(AppContext &app, const size_t &seg_buff_size) {
exit(1);
}
- if (tc.state == seg_buff_size * 2)
+ if (tc.state == recv_chunk_size * 2)
{
send_rand(tc.state, conn, tc);
tc.state = -1;
@@ -175,7 +175,7 @@ void install_proto(AppContext &app, const size_t &seg_buff_size) {
SALTICIDAE_LOG_INFO("rand-bomboard phase, ending in %.2f secs", t);
}
else if (tc.state == -1)
- send_rand(rand() % (seg_buff_size * 10), conn, tc);
+ send_rand(rand() % (recv_chunk_size * 10), conn, tc);
else
send_rand(++tc.state, conn, tc);
});
@@ -192,14 +192,14 @@ int main(int argc, char **argv) {
Config config;
auto opt_no_msg = Config::OptValFlag::create(false);
auto opt_npeers = Config::OptValInt::create(5);
- auto opt_seg_buff_size = Config::OptValInt::create(4096);
+ auto opt_recv_chunk_size = Config::OptValInt::create(4096);
auto opt_nworker = Config::OptValInt::create(2);
auto opt_conn_timeout = Config::OptValDouble::create(5);
auto opt_ping_peroid = Config::OptValDouble::create(2);
auto opt_help = Config::OptValFlag::create(false);
config.add_opt("no-msg", opt_no_msg, Config::SWITCH_ON);
config.add_opt("npeers", opt_npeers, Config::SET_VAL);
- config.add_opt("seg-buff-size", opt_seg_buff_size, Config::SET_VAL);
+ config.add_opt("seg-buff-size", opt_recv_chunk_size, Config::SET_VAL);
config.add_opt("nworker", opt_nworker, Config::SET_VAL);
config.add_opt("conn-timeout", opt_conn_timeout, Config::SET_VAL);
config.add_opt("ping-period", opt_ping_peroid, Config::SET_VAL);
@@ -210,7 +210,7 @@ int main(int argc, char **argv) {
config.print_help();
exit(0);
}
- size_t seg_buff_size = opt_seg_buff_size->get();
+ size_t recv_chunk_size = opt_recv_chunk_size->get();
for (int i = 0; i < opt_npeers->get(); i++)
addrs.push_back(NetAddr("127.0.0.1:" + std::to_string(12345 + i)));
std::vector<AppContext> apps;
@@ -223,13 +223,13 @@ int main(int argc, char **argv) {
a.net = new MyNet(a.ec, MyNet::Config(
salticidae::ConnPool::Config()
.nworker(opt_nworker->get())
- .seg_buff_size(seg_buff_size))
+ .recv_chunk_size(recv_chunk_size))
.conn_timeout(opt_conn_timeout->get())
.ping_period(opt_ping_peroid->get())
.max_msg_size(65536));
a.tcall = new ThreadCall(a.ec);
if (!opt_no_msg->get())
- install_proto(a, seg_buff_size);
+ install_proto(a, recv_chunk_size);
a.net->start();
}