aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/salticidae/conn.h73
-rw-r--r--include/salticidae/network.h150
-rw-r--r--include/salticidae/util.h1
-rw-r--r--src/conn.cpp4
-rw-r--r--src/network.cpp40
-rw-r--r--src/util.cpp1
-rw-r--r--test/bench_network.cpp2
-rw-r--r--test/bench_network_tls.cpp2
-rw-r--r--test/test_msgnet.cpp6
-rw-r--r--test/test_msgnet_c.c6
-rw-r--r--test/test_msgnet_tls.cpp6
-rw-r--r--test/test_p2p.cpp6
-rw-r--r--test/test_p2p_stress.cpp6
-rw-r--r--test/test_p2p_tls.cpp6
14 files changed, 163 insertions, 146 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index 7f74a87..0058274 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -60,7 +60,7 @@ class ConnPool {
/** The type of callback invoked when connection status is changed. */
using conn_callback_t = std::function<bool(const conn_t &, bool)>;
/** The type of callback invoked when an error occured (during async execution). */
- using error_callback_t = std::function<void(const std::exception_ptr, bool)>;
+ using error_callback_t = std::function<void(const std::exception_ptr, bool, int32_t)>;
/** Abstraction for a bi-directional connection. */
class Conn {
friend ConnPool;
@@ -149,12 +149,14 @@ class ConnPool {
using worker_error_callback_t = std::function<void(const std::exception_ptr err)>;
worker_error_callback_t disp_error_cb;
worker_error_callback_t worker_error_cb;
+ std::atomic<uint16_t> async_id;
+ int32_t gen_async_id() { return async_id.fetch_add(1, std::memory_order_relaxed); }
conn_t _connect(const NetAddr &addr);
void _listen(NetAddr listen_addr);
- void recoverable_error(const std::exception_ptr err) const {
- user_tcall->async_call([this, err](ThreadCall::Handle &) {
- if (error_cb) error_cb(err, false);
+ void recoverable_error(const std::exception_ptr err, int32_t id) const {
+ user_tcall->async_call([this, err, id](ThreadCall::Handle &) {
+ if (error_cb) error_cb(err, false, id);
});
}
@@ -279,7 +281,7 @@ class ConnPool {
else
conn->send_data_func(conn, fd, what);
} catch (...) {
- conn->cpool->recoverable_error(std::current_exception());
+ conn->cpool->recoverable_error(std::current_exception(), -1);
conn->cpool->worker_terminate(conn);
}
});
@@ -426,6 +428,7 @@ class ConnPool {
ConnPool(const EventContext &ec, const Config &config):
ec(ec),
enable_tls(config._enable_tls),
+ async_id(0),
max_listen_backlog(config._max_listen_backlog),
conn_server_timeout(config._conn_server_timeout),
seg_buff_size(config._seg_buff_size),
@@ -459,8 +462,8 @@ class ConnPool {
disp_error_cb = [this](const std::exception_ptr err) {
user_tcall->async_call([this, err](ThreadCall::Handle &) {
stop_workers();
- std::rethrow_exception(err);
- //if (error_cb) error_cb(err, true);
+ //std::rethrow_exception(err);
+ if (error_cb) error_cb(err, true, -1);
});
disp_ec.stop();
workers[0].stop_tcall();
@@ -528,36 +531,36 @@ class ConnPool {
}
/** Actively connect to remote addr. */
- conn_t connect(const NetAddr &addr, bool blocking = true) {
- if (blocking)
- {
- auto ret = *(static_cast<std::pair<conn_t, std::exception_ptr> *>(
- disp_tcall->call([this, addr](ThreadCall::Handle &h) {
- conn_t conn;
- std::exception_ptr err = nullptr;
- try {
- conn = _connect(addr);
- } catch (...) {
- err = std::current_exception();
- }
- h.set_result(std::make_pair(std::move(conn), err));
- }).get()));
- if (ret.second) std::rethrow_exception(ret.second);
- return std::move(ret.first);
- }
- else
- {
- disp_tcall->async_call([this, addr](ThreadCall::Handle &) {
- try {
- _connect(addr);
- } catch (...) {
- disp_error_cb(std::current_exception());
- }
- });
- return nullptr;
- }
+ conn_t connect_sync(const NetAddr &addr) {
+ auto ret = *(static_cast<std::pair<conn_t, std::exception_ptr> *>(
+ disp_tcall->call([this, addr](ThreadCall::Handle &h) {
+ conn_t conn;
+ std::exception_ptr err = nullptr;
+ try {
+ conn = _connect(addr);
+ } catch (...) {
+ err = std::current_exception();
+ }
+ h.set_result(std::make_pair(std::move(conn), err));
+ }).get()));
+ if (ret.second) std::rethrow_exception(ret.second);
+ return std::move(ret.first);
}
+ /** Actively connect to remote addr (async). */
+ int32_t connect(const NetAddr &addr) {
+ auto id = gen_async_id();
+ disp_tcall->async_call([this, addr, id](ThreadCall::Handle &) {
+ try {
+ _connect(addr);
+ } catch (...) {
+ this->recoverable_error(std::current_exception(), id);
+ }
+ });
+ return id;
+ }
+
+
/** Listen for passive connections (connection initiated from remote).
* Does not need to be called if do not want to accept any passive
* connections. */
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index c59bbb3..4c5fea6 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -110,8 +110,6 @@ class MsgNetwork: public ConnPool {
nrecvb.store(0, std::memory_order_relaxed);
}
#endif
-
- protected:
};
using conn_t = ArcObj<Conn>;
@@ -127,7 +125,6 @@ class MsgNetwork: public ConnPool {
queue_t incoming_msgs;
protected:
-
ConnPool::Conn *create_conn() override { return new Conn(); }
void on_read(const ConnPool::conn_t &) override;
@@ -201,16 +198,16 @@ class MsgNetwork: public ConnPool {
}
template<typename MsgType>
- inline void send_msg(const MsgType &msg, const conn_t &conn);
- inline void _send_msg(const Msg &msg, const conn_t &conn);
+ inline bool send_msg(const MsgType &msg, const conn_t &conn);
+ inline bool _send_msg(const Msg &msg, const conn_t &conn);
template<typename MsgType>
- inline void send_msg_deferred(MsgType &&msg, const conn_t &conn);
- inline void _send_msg_deferred(Msg &&msg, const conn_t &conn);
+ inline int32_t send_msg_deferred(MsgType &&msg, const conn_t &conn);
+ inline int32_t _send_msg_deferred(Msg &&msg, const conn_t &conn);
void stop() { stop_workers(); }
using ConnPool::listen;
- conn_t connect(const NetAddr &addr, bool blocking = true) {
- return static_pointer_cast<Conn>(ConnPool::connect(addr, blocking));
+ conn_t connect_sync(const NetAddr &addr) {
+ return static_pointer_cast<Conn>(ConnPool::connect_sync(addr));
}
};
@@ -248,11 +245,11 @@ class ClientNetwork: public MsgNetwork<OpcodeType> {
using MsgNet::send_msg;
template<typename MsgType>
- inline void send_msg(const MsgType &msg, const NetAddr &addr);
- inline void _send_msg(const Msg &msg, const NetAddr &addr);
+ inline bool send_msg(const MsgType &msg, const NetAddr &addr);
+ inline bool _send_msg(const Msg &msg, const NetAddr &addr);
template<typename MsgType>
- inline void send_msg_deferred(MsgType &&msg, const NetAddr &addr);
- inline void _send_msg_deferred(Msg &&msg, const NetAddr &addr);
+ inline int32_t send_msg_deferred(MsgType &&msg, const NetAddr &addr);
+ inline int32_t _send_msg_deferred(Msg &&msg, const NetAddr &addr);
};
/** Peer-to-peer network where any two nodes could hold a bi-diretional message
@@ -501,21 +498,21 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
virtual ~PeerNetwork() { this->stop(); }
- void add_peer(const NetAddr &addr);
- void del_peer(const NetAddr &addr);
+ int32_t add_peer(const NetAddr &addr);
+ int32_t del_peer(const NetAddr &addr);
bool has_peer(const NetAddr &addr) const;
size_t get_npending() const;
conn_t get_peer_conn(const NetAddr &addr) const;
using MsgNet::send_msg;
template<typename MsgType>
- inline void send_msg(const MsgType &msg, const NetAddr &addr);
- inline void _send_msg(const Msg &msg, const NetAddr &addr);
+ inline bool send_msg(const MsgType &msg, const NetAddr &addr);
+ inline bool _send_msg(const Msg &msg, const NetAddr &addr);
template<typename MsgType>
- inline void send_msg_deferred(MsgType &&msg, const NetAddr &addr);
- inline void _send_msg_deferred(Msg &&msg, const NetAddr &addr);
+ inline int32_t send_msg_deferred(MsgType &&msg, const NetAddr &addr);
+ inline int32_t _send_msg_deferred(Msg &&msg, const NetAddr &addr);
template<typename MsgType>
- void multicast_msg(MsgType &&msg, const std::vector<NetAddr> &addrs);
- inline void _multicast_msg(Msg &&msg, const std::vector<NetAddr> &addrs);
+ inline int32_t multicast_msg(MsgType &&msg, const std::vector<NetAddr> &addrs);
+ inline int32_t _multicast_msg(Msg &&msg, const std::vector<NetAddr> &addrs);
void listen(NetAddr listen_addr);
conn_t connect(const NetAddr &addr) = delete;
@@ -564,28 +561,31 @@ void MsgNetwork<OpcodeType>::on_read(const ConnPool::conn_t &_conn) {
template<typename OpcodeType>
template<typename MsgType>
-inline void MsgNetwork<OpcodeType>::send_msg_deferred(MsgType &&msg, const conn_t &conn) {
+inline int32_t MsgNetwork<OpcodeType>::send_msg_deferred(MsgType &&msg, const conn_t &conn) {
return _send_msg_deferred(std::move(msg), conn);
}
template<typename OpcodeType>
-inline void MsgNetwork<OpcodeType>::_send_msg_deferred(Msg &&msg, const conn_t &conn) {
+inline int32_t MsgNetwork<OpcodeType>::_send_msg_deferred(Msg &&msg, const conn_t &conn) {
+ auto id = this->gen_async_id();
this->disp_tcall->async_call(
- [this, msg=std::move(msg), conn](ThreadCall::Handle &) {
+ [this, msg=std::move(msg), conn, id](ThreadCall::Handle &) {
try {
- this->_send_msg(msg, conn);
- } catch (...) { this->recoverable_error(std::current_exception()); }
+ if (!_send_msg(msg, conn))
+ throw SalticidaeError(SALTI_ERROR_CONN_NOT_READY);
+ } catch (...) { this->recoverable_error(std::current_exception(), id); }
});
+ return id;
}
template<typename OpcodeType>
template<typename MsgType>
-inline void MsgNetwork<OpcodeType>::send_msg(const MsgType &msg, const conn_t &conn) {
+inline bool MsgNetwork<OpcodeType>::send_msg(const MsgType &msg, const conn_t &conn) {
return _send_msg(msg, conn);
}
template<typename OpcodeType>
-inline void MsgNetwork<OpcodeType>::_send_msg(const Msg &msg, const conn_t &conn) {
+inline bool MsgNetwork<OpcodeType>::_send_msg(const Msg &msg, const conn_t &conn) {
bytearray_t msg_data = msg.serialize();
SALTICIDAE_LOG_DEBUG("wrote message %s to %s",
std::string(msg).c_str(),
@@ -594,7 +594,7 @@ inline void MsgNetwork<OpcodeType>::_send_msg(const Msg &msg, const conn_t &conn
conn->nsent++;
conn->nsentb += msg.get_length();
#endif
- conn->write(std::move(msg_data));
+ return conn->write(std::move(msg_data));
}
template<typename O, O _, O __>
@@ -993,8 +993,9 @@ void PeerNetwork<O, _, __>::listen(NetAddr _listen_addr) {
}
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) {
- this->disp_tcall->async_call([this, addr](ThreadCall::Handle &) {
+int32_t PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) {
+ auto id = this->gen_async_id();
+ this->disp_tcall->async_call([this, addr, id](ThreadCall::Handle &) {
try {
pinfo_ulock_t _g(known_peers_lock);
if (known_peers.count(addr))
@@ -1007,14 +1008,16 @@ void PeerNetwork<O, _, __>::add_peer(const NetAddr &addr) {
conn = it->second;
known_peers.insert(std::make_pair(addr, new Peer(conn)));
} catch (const PeerNetworkError &) {
- this->recoverable_error(std::current_exception());
+ this->recoverable_error(std::current_exception(), id);
} catch (...) { this->disp_error_cb(std::current_exception()); }
});
+ return id;
}
template<typename O, O _, O __>
-void PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) {
- this->disp_tcall->async_call([this, addr](ThreadCall::Handle &) {
+int32_t PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) {
+ auto id = this->gen_async_id();
+ this->disp_tcall->async_call([this, addr, id](ThreadCall::Handle &) {
try {
pinfo_ulock_t _g(known_peers_lock);
pinfo_ulock_t __g(pid2peer_lock);
@@ -1038,9 +1041,10 @@ void PeerNetwork<O, _, __>::del_peer(const NetAddr &addr) {
pid2peer.erase(it3);
}
} catch (const PeerNetworkError &) {
- this->recoverable_error(std::current_exception());
+ this->recoverable_error(std::current_exception(), id);
} catch (...) { this->disp_error_cb(std::current_exception()); }
});
+ return id;
}
template<typename O, O _, O __>
@@ -1064,8 +1068,6 @@ PeerNetwork<O, _, __>::get_peer_conn(const NetAddr &addr) const {
auto it2 = pid2peer.find(it->second->peer_id);
assert(it2 != pid2peer.end());
conn = it2->second->conn;
- } catch (const PeerNetworkError &) {
- this->recoverable_error(std::current_exception());
} catch (...) {
err = std::current_exception();
}
@@ -1094,50 +1096,54 @@ size_t PeerNetwork<O, _, __>::get_npending() const {
template<typename O, O _, O __>
template<typename MsgType>
-inline void PeerNetwork<O, _, __>::send_msg_deferred(MsgType &&msg, const NetAddr &addr) {
+inline int32_t PeerNetwork<O, _, __>::send_msg_deferred(MsgType &&msg, const NetAddr &addr) {
return _send_msg_deferred(std::move(msg), addr);
}
template<typename O, O _, O __>
-inline void PeerNetwork<O, _, __>::_send_msg_deferred(Msg &&msg, const NetAddr &addr) {
+inline int32_t PeerNetwork<O, _, __>::_send_msg_deferred(Msg &&msg, const NetAddr &addr) {
+ auto id = this->gen_async_id();
this->disp_tcall->async_call(
- [this, msg=std::move(msg), addr](ThreadCall::Handle &) {
+ [this, msg=std::move(msg), addr, id](ThreadCall::Handle &) {
try {
- _send_msg(msg, addr);
- } catch (...) { this->recoverable_error(std::current_exception()); }
+ if (!_send_msg(msg, addr))
+ throw PeerNetworkError(SALTI_ERROR_CONN_NOT_READY);
+ } catch (...) { this->recoverable_error(std::current_exception(), id); }
});
+ return id;
}
template<typename O, O _, O __>
template<typename MsgType>
-inline void PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const NetAddr &addr) {
+inline bool PeerNetwork<O, _, __>::send_msg(const MsgType &msg, const NetAddr &addr) {
return _send_msg(msg, addr);
}
template<typename O, O _, O __>
-inline void PeerNetwork<O, _, __>::_send_msg(const Msg &msg, const NetAddr &addr) {
+inline bool PeerNetwork<O, _, __>::_send_msg(const Msg &msg, const NetAddr &addr) {
pinfo_slock_t _g(known_peers_lock);
- pinfo_slock_t __g(pid2peer_lock);
- MsgNet::_send_msg(msg, _get_peer_conn(addr));
+ return MsgNet::_send_msg(msg, _get_peer_conn(addr));
}
template<typename O, O _, O __>
template<typename MsgType>
-inline void PeerNetwork<O, _, __>::multicast_msg(MsgType &&msg, const std::vector<NetAddr> &addrs) {
+inline int32_t PeerNetwork<O, _, __>::multicast_msg(MsgType &&msg, const std::vector<NetAddr> &addrs) {
return _multicast_msg(MsgType(std::move(msg)), addrs);
}
template<typename O, O _, O __>
-inline void PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<NetAddr> &addrs) {
+inline int32_t PeerNetwork<O, _, __>::_multicast_msg(Msg &&msg, const std::vector<NetAddr> &addrs) {
+ auto id = this->gen_async_id();
this->disp_tcall->async_call(
- [this, msg=std::move(msg), addrs](ThreadCall::Handle &) {
+ [this, msg=std::move(msg), addrs, id](ThreadCall::Handle &) {
try {
+ bool succ = true;
for (auto &addr: addrs)
- MsgNet::_send_msg(msg, _get_peer_conn(addr));
- } catch (const PeerNetworkError &) {
- this->recoverable_error(std::current_exception());
- } catch (...) { this->recoverable_error(std::current_exception()); }
+ succ &= MsgNet::_send_msg(msg, _get_peer_conn(addr));
+ if (!succ) throw PeerNetworkError(SALTI_ERROR_CONN_NOT_READY);
+ } catch (...) { this->recoverable_error(std::current_exception(), id); }
});
+ return id;
}
/* end: functions invoked by the user loop */
@@ -1162,33 +1168,34 @@ void ClientNetwork<OpcodeType>::on_teardown(const ConnPool::conn_t &_conn) {
template<typename OpcodeType>
template<typename MsgType>
-inline void ClientNetwork<OpcodeType>::send_msg_deferred(MsgType &&msg, const NetAddr &addr) {
+inline int32_t ClientNetwork<OpcodeType>::send_msg_deferred(MsgType &&msg, const NetAddr &addr) {
return _send_msg_deferred(std::move(msg), addr);
}
template<typename OpcodeType>
-inline void ClientNetwork<OpcodeType>::_send_msg_deferred(Msg &&msg, const NetAddr &addr) {
+inline int32_t ClientNetwork<OpcodeType>::_send_msg_deferred(Msg &&msg, const NetAddr &addr) {
+ auto id = this->gen_async_id();
this->disp_tcall->async_call(
- [this, msg=std::move(msg), addr](ThreadCall::Handle &) {
+ [this, msg=std::move(msg), addr, id](ThreadCall::Handle &) {
try {
_send_msg(msg, addr);
- } catch (...) { this->recoverable_error(std::current_exception()); }
+ } catch (...) { this->recoverable_error(std::current_exception(), id); }
});
+ return id;
}
template<typename OpcodeType>
template<typename MsgType>
-inline void ClientNetwork<OpcodeType>::send_msg(const MsgType &msg, const NetAddr &addr) {
+inline bool ClientNetwork<OpcodeType>::send_msg(const MsgType &msg, const NetAddr &addr) {
return _send_msg(msg, addr);
}
template<typename OpcodeType>
-inline void ClientNetwork<OpcodeType>::_send_msg(const Msg &msg, const NetAddr &addr) {
+inline bool ClientNetwork<OpcodeType>::_send_msg(const Msg &msg, const NetAddr &addr) {
auto it = addr2conn.find(addr);
- if (it != addr2conn.end())
- MsgNet::_send_msg(msg, it->second);
- else
+ if (it == addr2conn.end())
throw ClientNetworkError(SALTI_ERROR_PEER_NOT_EXIST);
+ return MsgNet::_send_msg(msg, it->second);
}
template<typename O, O OPCODE_PING, O _>
@@ -1264,9 +1271,10 @@ void msgnetwork_config_tls_cert_by_move(msgnetwork_config_t *self, x509_t *cert)
msgnetwork_t *msgnetwork_new(const eventcontext_t *ec, const msgnetwork_config_t *config, SalticidaeCError *err);
void msgnetwork_free(const msgnetwork_t *self);
-void msgnetwork_send_msg(msgnetwork_t *self, const msg_t *msg, const msgnetwork_conn_t *conn);
-void msgnetwork_send_msg_deferred_by_move(msgnetwork_t *self, msg_t *_moved_msg, const msgnetwork_conn_t *conn);
-msgnetwork_conn_t *msgnetwork_connect(msgnetwork_t *self, const netaddr_t *addr, bool blocking, SalticidaeCError *err);
+bool msgnetwork_send_msg(msgnetwork_t *self, const msg_t *msg, const msgnetwork_conn_t *conn);
+int32_t msgnetwork_send_msg_deferred_by_move(msgnetwork_t *self, msg_t *_moved_msg, const msgnetwork_conn_t *conn);
+msgnetwork_conn_t *msgnetwork_connect_sync(msgnetwork_t *self, const netaddr_t *addr, SalticidaeCError *err);
+int32_t msgnetwork_connect(msgnetwork_t *self, const netaddr_t *addr);
msgnetwork_conn_t *msgnetwork_conn_copy(const msgnetwork_conn_t *self);
void msgnetwork_conn_free(const msgnetwork_conn_t *self);
void msgnetwork_listen(msgnetwork_t *self, const netaddr_t *listen_addr, SalticidaeCError *err);
@@ -1281,7 +1289,7 @@ typedef bool (*msgnetwork_conn_callback_t)(const msgnetwork_conn_t *, bool conne
void msgnetwork_reg_conn_handler(msgnetwork_t *self, msgnetwork_conn_callback_t cb, void *userdata);
-typedef void (*msgnetwork_error_callback_t)(const SalticidaeCError *, bool fatal, void *userdata);
+typedef void (*msgnetwork_error_callback_t)(const SalticidaeCError *, bool fatal, int32_t async_id, void *userdata);
void msgnetwork_reg_error_handler(msgnetwork_t *self, msgnetwork_error_callback_t cb, void *userdata);
msgnetwork_t *msgnetwork_conn_get_net(const msgnetwork_conn_t *conn);
@@ -1302,8 +1310,8 @@ msgnetwork_config_t *peernetwork_config_as_msgnetwork_config(peernetwork_config_
peernetwork_t *peernetwork_new(const eventcontext_t *ec, const peernetwork_config_t *config, SalticidaeCError *err);
void peernetwork_free(const peernetwork_t *self);
-void peernetwork_add_peer(peernetwork_t *self, const netaddr_t *addr);
-void peernetwork_del_peer(peernetwork_t *self, const netaddr_t *addr);
+int32_t peernetwork_add_peer(peernetwork_t *self, const netaddr_t *addr);
+int32_t peernetwork_del_peer(peernetwork_t *self, const netaddr_t *addr);
bool peernetwork_has_peer(const peernetwork_t *self, const netaddr_t *addr);
const peernetwork_conn_t *peernetwork_get_peer_conn(const peernetwork_t *self, const netaddr_t *addr, SalticidaeCError *cerror);
msgnetwork_t *peernetwork_as_msgnetwork(peernetwork_t *self);
@@ -1314,8 +1322,8 @@ peernetwork_conn_t *peernetwork_conn_copy(const peernetwork_conn_t *self);
netaddr_t *peernetwork_conn_get_peer_addr(const peernetwork_conn_t *self);
void peernetwork_conn_free(const peernetwork_conn_t *self);
bool peernetwork_send_msg(peernetwork_t *self, const msg_t * msg, const netaddr_t *addr);
-void peernetwork_send_msg_deferred_by_move(peernetwork_t *self, msg_t * _moved_msg, const netaddr_t *addr);
-void peernetwork_multicast_msg_by_move(peernetwork_t *self, msg_t *_moved_msg, const netaddr_array_t *addrs);
+int32_t peernetwork_send_msg_deferred_by_move(peernetwork_t *self, msg_t * _moved_msg, const netaddr_t *addr);
+int32_t peernetwork_multicast_msg_by_move(peernetwork_t *self, msg_t *_moved_msg, const netaddr_array_t *addrs);
void peernetwork_listen(peernetwork_t *self, const netaddr_t *listen_addr, SalticidaeCError *err);
typedef void (*peernetwork_peer_callback_t)(const peernetwork_conn_t *, bool connected, void *userdata);
@@ -1335,7 +1343,7 @@ clientnetwork_conn_t *clientnetwork_conn_new_from_msgnetwork_conn_unsafe(const m
clientnetwork_conn_t *clientnetwork_conn_copy(const clientnetwork_conn_t *self);
void clientnetwork_conn_free(const clientnetwork_conn_t *self);
bool clientnetwork_send_msg(clientnetwork_t *self, const msg_t * msg, const netaddr_t *addr);
-void clientnetwork_send_msg_deferred_by_move(clientnetwork_t *self, msg_t * _moved_msg, const netaddr_t *addr);
+int32_t clientnetwork_send_msg_deferred_by_move(clientnetwork_t *self, msg_t * _moved_msg, const netaddr_t *addr);
#ifdef __cplusplus
}
diff --git a/include/salticidae/util.h b/include/salticidae/util.h
index 063058c..017d301 100644
--- a/include/salticidae/util.h
+++ b/include/salticidae/util.h
@@ -101,6 +101,7 @@ enum SalticidaeErrorCode {
SALTI_ERROR_TLS_NO_PEER_CERT,
SALTI_ERROR_FD,
SALTI_ERROR_RAND_SOURCE,
+ SALTI_ERROR_CONN_NOT_READY,
SALTI_ERROR_UNKNOWN
};
diff --git a/src/conn.cpp b/src/conn.cpp
index abea7ac..13486ee 100644
--- a/src/conn.cpp
+++ b/src/conn.cpp
@@ -303,7 +303,7 @@ void ConnPool::accept_client(int fd, int) {
conn->worker = &worker;
worker.feed(conn, client_fd);
}
- } catch (...) { recoverable_error(std::current_exception()); }
+ } catch (...) { recoverable_error(std::current_exception(), -1); }
}
void ConnPool::conn_server(const conn_t &conn, int fd, int events) {
@@ -324,7 +324,7 @@ void ConnPool::conn_server(const conn_t &conn, int fd, int events) {
}
} catch (...) {
disp_terminate(conn);
- recoverable_error(std::current_exception());
+ recoverable_error(std::current_exception(), -1);
}
}
diff --git a/src/network.cpp b/src/network.cpp
index 05e7b50..2e98439 100644
--- a/src/network.cpp
+++ b/src/network.cpp
@@ -67,22 +67,26 @@ void msgnetwork_config_tls_cert_by_move(msgnetwork_config_t *self, x509_t *cert)
self->tls_cert(new x509_t(std::move(*cert)));
}
-void msgnetwork_send_msg(msgnetwork_t *self, const msg_t *msg, const msgnetwork_conn_t *conn) {
- self->_send_msg(*msg, *conn);
+bool msgnetwork_send_msg(msgnetwork_t *self, const msg_t *msg, const msgnetwork_conn_t *conn) {
+ return self->_send_msg(*msg, *conn);
}
-void msgnetwork_send_msg_deferred_by_move(msgnetwork_t *self,
+int32_t msgnetwork_send_msg_deferred_by_move(msgnetwork_t *self,
msg_t *_moved_msg, const msgnetwork_conn_t *conn) {
- self->_send_msg_deferred(std::move(*_moved_msg), *conn);
+ return self->_send_msg_deferred(std::move(*_moved_msg), *conn);
}
-msgnetwork_conn_t *msgnetwork_connect(msgnetwork_t *self, const netaddr_t *addr, bool blocking, SalticidaeCError *cerror) {
+msgnetwork_conn_t *msgnetwork_connect_sync(msgnetwork_t *self, const netaddr_t *addr, SalticidaeCError *cerror) {
SALTICIDAE_CERROR_TRY(cerror)
- return new msgnetwork_conn_t(self->connect(*addr, blocking));
+ return new msgnetwork_conn_t(self->connect_sync(*addr));
SALTICIDAE_CERROR_CATCH(cerror)
return nullptr;
}
+int32_t msgnetwork_connect(msgnetwork_t *self, const netaddr_t *addr) {
+ return self->connect(*addr);
+}
+
msgnetwork_conn_t *msgnetwork_conn_copy(const msgnetwork_conn_t *self) {
return new msgnetwork_conn_t(*self);
}
@@ -126,7 +130,7 @@ void msgnetwork_reg_conn_handler(msgnetwork_t *self,
void msgnetwork_reg_error_handler(msgnetwork_t *self,
msgnetwork_error_callback_t cb,
void *userdata) {
- self->reg_error_handler([=](const std::exception_ptr _err, bool fatal) {
+ self->reg_error_handler([=](const std::exception_ptr _err, bool fatal, int32_t async_id) {
SalticidaeCError cerror;
try {
std::rethrow_exception(_err);
@@ -135,7 +139,7 @@ void msgnetwork_reg_error_handler(msgnetwork_t *self,
} catch (...) {
cerror = salticidae_cerror_unknown();
}
- cb(&cerror, fatal, userdata);
+ cb(&cerror, fatal, async_id, userdata);
});
}
@@ -194,12 +198,12 @@ peernetwork_t *peernetwork_new(const eventcontext_t *ec, const peernetwork_confi
void peernetwork_free(const peernetwork_t *self) { delete self; }
-void peernetwork_add_peer(peernetwork_t *self, const netaddr_t *addr) {
- self->add_peer(*addr);
+int32_t peernetwork_add_peer(peernetwork_t *self, const netaddr_t *addr) {
+ return self->add_peer(*addr);
}
-void peernetwork_del_peer(peernetwork_t *self, const netaddr_t *addr) {
- self->del_peer(*addr);
+int32_t peernetwork_del_peer(peernetwork_t *self, const netaddr_t *addr) {
+ return self->del_peer(*addr);
}
bool peernetwork_has_peer(const peernetwork_t *self, const netaddr_t *addr) {
@@ -248,14 +252,14 @@ bool peernetwork_send_msg(peernetwork_t *self, const msg_t * msg, const netaddr_
}
}
-void peernetwork_send_msg_deferred_by_move(peernetwork_t *self,
+int32_t peernetwork_send_msg_deferred_by_move(peernetwork_t *self,
msg_t *_moved_msg, const netaddr_t *addr) {
- self->_send_msg_deferred(std::move(*_moved_msg), *addr);
+ return self->_send_msg_deferred(std::move(*_moved_msg), *addr);
}
-void peernetwork_multicast_msg_by_move(peernetwork_t *self,
+int32_t peernetwork_multicast_msg_by_move(peernetwork_t *self,
msg_t *_moved_msg, const netaddr_array_t *addrs) {
- self->_multicast_msg(std::move(*_moved_msg), *addrs);
+ return self->_multicast_msg(std::move(*_moved_msg), *addrs);
}
void peernetwork_listen(peernetwork_t *self, const netaddr_t *listen_addr, SalticidaeCError *cerror) {
@@ -318,8 +322,8 @@ bool clientnetwork_send_msg(clientnetwork_t *self, const msg_t * msg, const neta
}
}
-void clientnetwork_send_msg_deferred_by_move(clientnetwork_t *self, msg_t *_moved_msg, const netaddr_t *addr) {
- self->_send_msg_deferred(std::move(*_moved_msg), *addr);
+int32_t clientnetwork_send_msg_deferred_by_move(clientnetwork_t *self, msg_t *_moved_msg, const netaddr_t *addr) {
+ return self->_send_msg_deferred(std::move(*_moved_msg), *addr);
}
}
diff --git a/src/util.cpp b/src/util.cpp
index ff41c9b..0fb990e 100644
--- a/src/util.cpp
+++ b/src/util.cpp
@@ -58,6 +58,7 @@ const char *SALTICIDAE_ERROR_STRINGS[] = {
"tls fail to get peer cert",
"fd error",
"rand source is not available, try again",
+ "connection is not ready",
"unknown error"
};
diff --git a/test/bench_network.cpp b/test/bench_network.cpp
index 5575a66..1021ec4 100644
--- a/test/bench_network.cpp
+++ b/test/bench_network.cpp
@@ -117,7 +117,7 @@ struct MyNet: public MsgNetworkByteOp {
{
printf("[%s] disconnected, retrying.\n", this->name.c_str());
/* try to reconnect to the same address */
- connect(conn->get_addr(), false);
+ connect(conn->get_addr());
}
return true;
});
diff --git a/test/bench_network_tls.cpp b/test/bench_network_tls.cpp
index b5280b4..7c682ba 100644
--- a/test/bench_network_tls.cpp
+++ b/test/bench_network_tls.cpp
@@ -119,7 +119,7 @@ struct MyNet: public MsgNetworkByteOp {
{
printf("[%s] disconnected, retrying.\n", this->name.c_str());
/* try to reconnect to the same address */
- connect(conn->get_addr(), false);
+ connect(conn->get_addr());
}
return true;
});
diff --git a/test/test_msgnet.cpp b/test/test_msgnet.cpp
index 50e35dc..9b52f85 100644
--- a/test/test_msgnet.cpp
+++ b/test/test_msgnet.cpp
@@ -108,7 +108,7 @@ struct MyNet: public MsgNetworkByteOp {
{
printf("[%s] disconnected, retrying.\n", this->name.c_str());
/* try to reconnect to the same address */
- connect(conn->get_addr(), false);
+ connect(conn->get_addr());
}
return true;
});
@@ -148,8 +148,8 @@ int main() {
bob.listen(bob_addr);
/* try to connect once */
- alice.connect(bob_addr, false);
- bob.connect(alice_addr, false);
+ alice.connect(bob_addr);
+ bob.connect(alice_addr);
/* the main loop can be shutdown by ctrl-c or kill */
auto shutdown = [&](int) {ec.stop();};
diff --git a/test/test_msgnet_c.c b/test/test_msgnet_c.c
index 1417d2b..2ee9443 100644
--- a/test/test_msgnet_c.c
+++ b/test/test_msgnet_c.c
@@ -139,7 +139,7 @@ bool conn_handler(const msgnetwork_conn_t *conn, bool connected, void *userdata)
printf("[%s] Disconnected, retrying.\n", name);
/* try to reconnect to the same address */
const netaddr_t *addr = msgnetwork_conn_get_addr(conn);
- msgnetwork_connect(net, addr, false, &err); check_err(&err);
+ msgnetwork_connect(net, addr);
}
return true;
}
@@ -194,8 +194,8 @@ int main() {
msgnetwork_listen(bob.net, bob_addr, &err); check_err(&err);
/* try to connect once */
- msgnetwork_conn_free(msgnetwork_connect(alice.net, bob_addr, false, &err)); check_err(&err);
- msgnetwork_conn_free(msgnetwork_connect(bob.net, alice_addr, false, &err)); check_err(&err);
+ msgnetwork_connect(alice.net, bob_addr);
+ msgnetwork_connect(bob.net, alice_addr);
netaddr_free(alice_addr);
netaddr_free(bob_addr);
diff --git a/test/test_msgnet_tls.cpp b/test/test_msgnet_tls.cpp
index a779ba5..07fe6ac 100644
--- a/test/test_msgnet_tls.cpp
+++ b/test/test_msgnet_tls.cpp
@@ -121,7 +121,7 @@ struct MyNet: public MsgNetworkByteOp {
{
printf("[%s] disconnected, retrying.\n", this->name.c_str());
/* try to reconnect to the same address */
- connect(conn->get_addr(), false);
+ connect(conn->get_addr());
}
return res;
});
@@ -161,8 +161,8 @@ int main() {
bob.listen(bob_addr);
/* try to connect once */
- alice.connect(bob_addr, false);
- bob.connect(alice_addr, false);
+ alice.connect(bob_addr);
+ bob.connect(alice_addr);
/* the main loop can be shutdown by ctrl-c or kill */
auto shutdown = [&](int) {ec.stop();};
diff --git a/test/test_p2p.cpp b/test/test_p2p.cpp
index 3a30ed2..d097562 100644
--- a/test/test_p2p.cpp
+++ b/test/test_p2p.cpp
@@ -80,12 +80,12 @@ struct Net {
net->reg_handler([this](const MsgText &msg, const PeerNetwork::conn_t &) {
fprintf(stdout, "net %lu: peer %lu says %s\n", this->id, msg.id, msg.text.c_str());
});
- net->reg_error_handler([this](const std::exception_ptr _err, bool fatal) {
+ net->reg_error_handler([this](const std::exception_ptr _err, bool fatal, int32_t async_id) {
try {
std::rethrow_exception(_err);
} catch (const std::exception &err) {
- fprintf(stdout, "net %lu: captured %s error during an async call: %s\n",
- this->id, fatal ? "fatal" : "recoverable", err.what());
+ fprintf(stdout, "net %lu: captured %s error during an async call %d: %s\n",
+ this->id, fatal ? "fatal" : "recoverable", async_id, err.what());
}
});
net->reg_peer_handler([this](const PeerNetwork::conn_t &conn, bool connected) {
diff --git a/test/test_p2p_stress.cpp b/test/test_p2p_stress.cpp
index fc9a430..9fe1b83 100644
--- a/test/test_p2p_stress.cpp
+++ b/test/test_p2p_stress.cpp
@@ -128,12 +128,12 @@ void install_proto(AppContext &app, const size_t &seg_buff_size) {
send_rand(tc.state, conn, tc);
}
});
- net.reg_error_handler([ec](const std::exception_ptr _err, bool fatal) {
+ net.reg_error_handler([ec](const std::exception_ptr _err, bool fatal, int32_t async_id) {
try {
std::rethrow_exception(_err);
} catch (const std::exception & err) {
- SALTICIDAE_LOG_WARN("captured %s error: %s",
- fatal ? "fatal" : "recoverable", err.what());
+ SALTICIDAE_LOG_WARN("captured %s error during async call %d: %s",
+ fatal ? "fatal" : "recoverable", async_id, err.what());
}
});
net.reg_handler([&](MsgRand &&msg, const MyNet::conn_t &conn) {
diff --git a/test/test_p2p_tls.cpp b/test/test_p2p_tls.cpp
index f814643..698bbac 100644
--- a/test/test_p2p_tls.cpp
+++ b/test/test_p2p_tls.cpp
@@ -105,12 +105,12 @@ struct Net {
}
return true;
});
- net->reg_error_handler([this](const std::exception_ptr _err, bool fatal) {
+ net->reg_error_handler([this](const std::exception_ptr _err, bool fatal, int32_t async_id) {
try {
std::rethrow_exception(_err);
} catch (const std::exception &err) {
- fprintf(stdout, "net %lu: captured %s error during an async call: %s\n",
- this->id, fatal ? "fatal" : "recoverable", err.what());
+ fprintf(stdout, "net %lu: captured %s error during an async call %d: %s\n",
+ this->id, fatal ? "fatal" : "recoverable", async_id, err.what());
}
});
net->reg_peer_handler([this](const PeerNetwork::conn_t &conn, bool connected) {