aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README.rst69
-rw-r--r--include/salticidae/conn.h20
-rw-r--r--include/salticidae/network.h27
-rw-r--r--test/test_network.cpp63
4 files changed, 93 insertions, 86 deletions
diff --git a/README.rst b/README.rst
index 64298de..1645671 100644
--- a/README.rst
+++ b/README.rst
@@ -84,11 +84,10 @@ Example (MsgNetwork layer)
using salticidae::letoh;
using std::placeholders::_1;
using std::placeholders::_2;
- using opcode_t = uint8_t;
/** Hello Message. */
struct MsgHello {
- static const opcode_t opcode = 0x0;
+ static const uint8_t opcode = 0x0;
DataStream serialized;
std::string name;
std::string text;
@@ -111,66 +110,60 @@ Example (MsgNetwork layer)
/** Acknowledgement Message. */
struct MsgAck {
- static const opcode_t opcode = 0x1;
+ static const uint8_t opcode = 0x1;
DataStream serialized;
MsgAck() {}
MsgAck(DataStream &&s) {}
};
-
- const opcode_t MsgHello::opcode;
- const opcode_t MsgAck::opcode;
-
- using MsgNetworkByteOp = MsgNetwork<opcode_t>;
+
+ const uint8_t MsgHello::opcode;
+ const uint8_t MsgAck::opcode;
+
+ using MsgNetworkByteOp = MsgNetwork<uint8_t>;
struct MyNet: public MsgNetworkByteOp {
const std::string name;
const NetAddr peer;
-
+
MyNet(const salticidae::EventContext &ec,
const std::string name,
const NetAddr &peer):
- MsgNetwork<opcode_t>(ec, 10, 1.0, 4096),
+ MsgNetwork<uint8_t>(ec, 10, 1.0, 4096),
name(name),
peer(peer) {
/* message handler could be a bound method */
reg_handler(salticidae::handler_bind(
&MyNet::on_receive_hello, this, _1, _2));
- }
-
- struct Conn: public MsgNetworkByteOp::Conn {
- MyNet *get_net() { return static_cast<MyNet *>(get_pool()); }
- salticidae::RcObj<Conn> self() {
- return salticidae::static_pointer_cast<Conn>(
- MsgNetworkByteOp::Conn::self());
- }
- void on_setup() override {
- auto net = get_net();
- if (get_mode() == ACTIVE)
+ reg_conn_handler([this](const ConnPool::conn_t &conn) {
+ if (conn->get_fd() != -1)
{
- printf("[%s] Connected, sending hello.\n",
- net->name.c_str());
- /* send the first message through this connection */
- net->send_msg(MsgHello(net->name, "Hello there!"), self());
+ if (conn->get_mode() == ConnPool::Conn::ACTIVE)
+ {
+ printf("[%s] Connected, sending hello.\n",
+ this->name.c_str());
+ /* send the first message through this connection */
+ send_msg(MsgHello(this->name, "Hello there!"),
+ salticidae::static_pointer_cast<Conn>(conn));
+ }
+ else
+ printf("[%s] Accepted, waiting for greetings.\n",
+ this->name.c_str());
}
else
- printf("[%s] Passively connected, waiting for greetings.\n",
- net->name.c_str());
- }
- void on_teardown() override {
- auto net = get_net();
- printf("[%s] Disconnected, retrying.\n", net->name.c_str());
- /* try to reconnect to the same address */
- net->connect(get_addr());
- }
- };
- using conn_t = salticidae::RcObj<Conn>;
+ {
+ printf("[%s] Disconnected, retrying.\n", this->name.c_str());
+ /* try to reconnect to the same address */
+ connect(conn->get_addr());
+ }
+ });
+ }
salticidae::ConnPool::Conn *create_conn() override {
return new Conn();
}
- void on_receive_hello(MsgHello &&msg, conn_t conn) {
+ void on_receive_hello(MsgHello &&msg, MyNet::conn_t conn) {
printf("[%s] %s says %s\n",
name.c_str(),
msg.name.c_str(), msg.text.c_str());
@@ -181,7 +174,7 @@ Example (MsgNetwork layer)
void on_receive_ack(MsgAck &&msg, MyNet::conn_t conn) {
- auto net = conn->get_net();
+ auto net = static_cast<MyNet *>(conn->get_net());
printf("[%s] the peer knows\n", net->name.c_str());
}
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index aa414e8..f8156ee 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -146,6 +146,9 @@ class ConnPool {
class Conn;
/** The handle to a bi-directional connection. */
using conn_t = RcObj<Conn>;
+ /** The type of callback invoked when connection status is changed. */
+ using conn_callback_t = std::function<void(const conn_t &)>;
+
/** Abstraction for a bi-directional connection. */
class Conn {
friend ConnPool;
@@ -224,17 +227,25 @@ class ConnPool {
}
/** Called when new data is available. */
- virtual void on_read() = 0;
+ virtual void on_read() {
+ if (cpool->read_cb) cpool->read_cb(self());
+ }
/** Called when the underlying connection is established. */
- virtual void on_setup() = 0;
+ virtual void on_setup() {
+ if (cpool->conn_cb) cpool->conn_cb(self());
+ }
/** Called when the underlying connection breaks. */
- virtual void on_teardown() = 0;
+ virtual void on_teardown() {
+ if (cpool->conn_cb) cpool->conn_cb(self());
+ }
};
private:
int max_listen_backlog;
double conn_server_timeout;
size_t seg_buff_size;
+ conn_callback_t read_cb;
+ conn_callback_t conn_cb;
std::unordered_map<int, conn_t> pool;
int listen_fd;
@@ -275,6 +286,9 @@ class ConnPool {
* Does not need to be called if do not want to accept any passive
* connections. */
void listen(NetAddr listen_addr);
+
+ template<typename Func>
+ void reg_conn_handler(Func cb) { conn_cb = cb; }
};
}
diff --git a/include/salticidae/network.h b/include/salticidae/network.h
index 2bc445c..8ede187 100644
--- a/include/salticidae/network.h
+++ b/include/salticidae/network.h
@@ -79,9 +79,6 @@ class MsgNetwork: public ConnPool {
Msg msg;
MsgState msg_state;
- MsgNetwork *get_net() {
- return static_cast<MsgNetwork *>(get_pool());
- }
protected:
#ifdef SALTICIDAE_MSG_STAT
@@ -96,6 +93,10 @@ class MsgNetwork: public ConnPool {
#endif
{}
+ MsgNetwork *get_net() {
+ return static_cast<MsgNetwork *>(get_pool());
+ }
+
#ifdef SALTICIDAE_MSG_STAT
size_t get_nsent() const { return nsent; }
size_t get_nrecv() const { return nrecv; }
@@ -105,8 +106,6 @@ class MsgNetwork: public ConnPool {
protected:
void on_read() override;
- void on_setup() override {}
- void on_teardown() override {}
};
using conn_t = RcObj<Conn>;
@@ -182,13 +181,14 @@ class ClientNetwork: public MsgNetwork<OpcodeType> {
public:
class Conn: public MsgNet::Conn {
friend ClientNetwork;
- ClientNetwork *get_net() {
- return static_cast<ClientNetwork *>(ConnPool::Conn::get_pool());
- }
public:
Conn() = default;
+ ClientNetwork *get_net() {
+ return static_cast<ClientNetwork *>(ConnPool::Conn::get_pool());
+ }
+
protected:
void on_setup() override;
void on_teardown() override;
@@ -234,12 +234,14 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
friend PeerNetwork;
NetAddr peer_id;
Event ev_timeout;
+
+ public:
+ Conn() = default;
+
PeerNetwork *get_net() {
return static_cast<PeerNetwork *>(ConnPool::Conn::get_pool());
}
- public:
- Conn() = default;
const NetAddr &get_peer() { return peer_id; }
protected:
@@ -366,6 +368,7 @@ class PeerNetwork: public MsgNetwork<OpcodeType> {
template<typename OpcodeType>
void MsgNetwork<OpcodeType>::Conn::on_read() {
+ ConnPool::Conn::on_read();
auto &recv_buffer = read();
auto mn = get_net();
while (get_fd() != -1)
@@ -431,6 +434,7 @@ void PeerNetwork<O, _, __>::Peer::reset_conn(conn_t new_conn) {
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::Conn::on_setup() {
+ MsgNet::Conn::on_setup();
auto pn = get_net();
assert(!ev_timeout);
ev_timeout = Event(pn->ec, -1, 0, [this](evutil_socket_t, short) {
@@ -445,6 +449,7 @@ void PeerNetwork<O, _, __>::Conn::on_setup() {
template<typename O, O _, O __>
void PeerNetwork<O, _, __>::Conn::on_teardown() {
+ MsgNet::Conn::on_teardown();
auto pn = get_net();
auto it = pn->id2peer.find(peer_id);
if (it == pn->id2peer.end()) return;
@@ -645,6 +650,7 @@ const std::vector<NetAddr> &PeerNetwork<O, _, __>::all_peers() const {
template<typename OpcodeType>
void ClientNetwork<OpcodeType>::Conn::on_setup() {
+ MsgNet::Conn::on_setup();
assert(this->get_mode() == Conn::PASSIVE);
const auto &addr = this->get_addr();
auto cn = get_net();
@@ -656,6 +662,7 @@ void ClientNetwork<OpcodeType>::Conn::on_setup() {
template<typename OpcodeType>
void ClientNetwork<OpcodeType>::Conn::on_teardown() {
+ MsgNet::Conn::on_teardown();
assert(this->get_mode() == Conn::PASSIVE);
get_net()->addr2conn.erase(this->get_addr());
}
diff --git a/test/test_network.cpp b/test/test_network.cpp
index fd84c3b..8c1d0db 100644
--- a/test/test_network.cpp
+++ b/test/test_network.cpp
@@ -38,11 +38,10 @@ using salticidae::htole;
using salticidae::letoh;
using std::placeholders::_1;
using std::placeholders::_2;
-using opcode_t = uint8_t;
/** Hello Message. */
struct MsgHello {
- static const opcode_t opcode = 0x0;
+ static const uint8_t opcode = 0x0;
DataStream serialized;
std::string name;
std::string text;
@@ -65,16 +64,16 @@ struct MsgHello {
/** Acknowledgement Message. */
struct MsgAck {
- static const opcode_t opcode = 0x1;
+ static const uint8_t opcode = 0x1;
DataStream serialized;
MsgAck() {}
MsgAck(DataStream &&s) {}
};
-const opcode_t MsgHello::opcode;
-const opcode_t MsgAck::opcode;
+const uint8_t MsgHello::opcode;
+const uint8_t MsgAck::opcode;
-using MsgNetworkByteOp = MsgNetwork<opcode_t>;
+using MsgNetworkByteOp = MsgNetwork<uint8_t>;
struct MyNet: public MsgNetworkByteOp {
const std::string name;
@@ -83,48 +82,42 @@ struct MyNet: public MsgNetworkByteOp {
MyNet(const salticidae::EventContext &ec,
const std::string name,
const NetAddr &peer):
- MsgNetwork<opcode_t>(ec, 10, 1.0, 4096),
+ MsgNetwork<uint8_t>(ec, 10, 1.0, 4096),
name(name),
peer(peer) {
/* message handler could be a bound method */
reg_handler(salticidae::handler_bind(
&MyNet::on_receive_hello, this, _1, _2));
- }
-
- struct Conn: public MsgNetworkByteOp::Conn {
- MyNet *get_net() { return static_cast<MyNet *>(get_pool()); }
- salticidae::RcObj<Conn> self() {
- return salticidae::static_pointer_cast<Conn>(
- MsgNetworkByteOp::Conn::self());
- }
- void on_setup() override {
- auto net = get_net();
- if (get_mode() == ACTIVE)
+ reg_conn_handler([this](const ConnPool::conn_t &conn) {
+ if (conn->get_fd() != -1)
{
- printf("[%s] Connected, sending hello.\n",
- net->name.c_str());
- /* send the first message through this connection */
- net->send_msg(MsgHello(net->name, "Hello there!"), self());
+ if (conn->get_mode() == ConnPool::Conn::ACTIVE)
+ {
+ printf("[%s] Connected, sending hello.\n",
+ this->name.c_str());
+ /* send the first message through this connection */
+ send_msg(MsgHello(this->name, "Hello there!"),
+ salticidae::static_pointer_cast<Conn>(conn));
+ }
+ else
+ printf("[%s] Accepted, waiting for greetings.\n",
+ this->name.c_str());
}
else
- printf("[%s] Passively connected, waiting for greetings.\n",
- net->name.c_str());
- }
- void on_teardown() override {
- auto net = get_net();
- printf("[%s] Disconnected, retrying.\n", net->name.c_str());
- /* try to reconnect to the same address */
- net->connect(get_addr());
- }
- };
- using conn_t = salticidae::RcObj<Conn>;
+ {
+ printf("[%s] Disconnected, retrying.\n", this->name.c_str());
+ /* try to reconnect to the same address */
+ connect(conn->get_addr());
+ }
+ });
+ }
salticidae::ConnPool::Conn *create_conn() override {
return new Conn();
}
- void on_receive_hello(MsgHello &&msg, conn_t conn) {
+ void on_receive_hello(MsgHello &&msg, MyNet::conn_t conn) {
printf("[%s] %s says %s\n",
name.c_str(),
msg.name.c_str(), msg.text.c_str());
@@ -135,7 +128,7 @@ struct MyNet: public MsgNetworkByteOp {
void on_receive_ack(MsgAck &&msg, MyNet::conn_t conn) {
- auto net = conn->get_net();
+ auto net = static_cast<MyNet *>(conn->get_net());
printf("[%s] the peer knows\n", net->name.c_str());
}