/**
* Copyright (c) 2018 Cornell University.
*
* Author: Ted Yin <tederminant@gmail.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is furnished to do
* so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#ifndef _SALTICIDAE_NETWORK_H
#define _SALTICIDAE_NETWORK_H
#include "salticidae/netaddr.h"
#include "salticidae/msg.h"
#include "salticidae/conn.h"
namespace salticidae {
/** Network of nodes who can send async messages. */
template<typename MsgType>
class MsgNetwork: public ConnPool {
    public:
    /** A ConnPool connection specialised for `MsgType` traffic. */
    class Conn: public ConnPool::Conn {
        /* NOTE(review): presumably records which part of a message on_read()
         * expects next; on_read() is defined outside this class body. */
        enum MsgState {
            HEADER,
            PAYLOAD
        };
        MsgType msg;
        MsgState msg_state;
        /** The network passed to the constructor. */
        MsgNetwork *mn;

        protected:
        /* Both counters are `mutable` so that the const clear_*() accessors
         * below can reset them. What exactly is counted is decided by code
         * outside this class body. */
        mutable size_t nsent;
        mutable size_t nrecv;

        public:
        friend MsgNetwork;
        /** Start in the HEADER state with both counters at zero. */
        Conn(MsgNetwork *mn): msg_state(HEADER), mn(mn), nsent(0), nrecv(0) {}
        /** @return the current value of the sent counter. */
        size_t get_nsent() const { return nsent; }
        /** @return the current value of the received counter. */
        size_t get_nrecv() const { return nrecv; }
        /** Reset the sent counter to zero. */
        void clear_nsent() const { nsent = 0; }
        /** Reset the received counter to zero. */
        void clear_nrecv() const { nrecv = 0; }

        protected:
        void on_read() override;
        /** Intentionally empty at this level. */
        void on_setup() override {}
        /** Intentionally empty at this level. */
        void on_teardown() override {}
    };

    using conn_t = RcObj<Conn>;
    using msg_callback_t = std::function<void(const MsgType &msg, conn_t conn)>;

    /** Per-opcode statistics: opcode -> (message count, accumulated length). */
    class msg_stat_by_opcode_t:
        public std::unordered_map<typename MsgType::opcode_t,
                                  std::pair<uint32_t, size_t>> {
        public:
        /**
         * Account for one message: bump the count stored under its opcode and
         * add `msg.get_length()` to the accumulated length. A missing entry is
         * value-initialised by operator[] before being updated.
         */
        void add(const MsgType &msg) {
            auto &entry = (*this)[msg.get_opcode()];
            ++entry.first;
            entry.second += msg.get_length();
        }
    };

    private:
    /** Callbacks keyed by opcode. */
    std::unordered_map<typename MsgType::opcode_t,
                       msg_callback_t> handler_map;

    protected:
    /* `mutable` so that the const getters below can hand out writable
     * references to the statistics tables. */
    mutable msg_stat_by_opcode_t sent_by_opcode;
    mutable msg_stat_by_opcode_t recv_by_opcode;
    /** Zero until assigned elsewhere.
     *  NOTE(review): presumably set by init() — confirm. */
    uint16_t listen_port;
    /** Allocate a MsgNetwork::Conn for this network and return the handle from its self(). */
    ConnPool::conn_t create_conn() override { return (new Conn(this))->self(); }

    public:
    /**
     * @param eb forwarded unchanged to the ConnPool constructor.
     *
     * `listen_port` is zero-initialised here so that it never holds an
     * indeterminate value before it is assigned.
     */
    MsgNetwork(struct event_base *eb): ConnPool(eb), listen_port(0) {}
    /** Associate `handler` with `opcode` (definition is outside this class body). */
    void reg_handler(typename MsgType::opcode_t opcode, msg_callback_t handler);
    /** Send `msg` over `conn` (definition is outside this class body). */
    void send_msg(const MsgType &msg, conn_t conn);
    void init(NetAddr listen_addr);
    /** @return a writable reference to the sent-side statistics table. */
    msg_stat_by_opcode_t &get_sent_by_opcode() const {
        return sent_by_opcode;
    }
    /** @return a writable reference to the receive-side statistics table. */
    msg_stat_by_opcode_t &get_recv_by_opcode() const {
        return recv_by_opcode;
    }
};
/** Simple network that handles client-server requests. */
template<typename MsgType>
class ClientNetwork: public MsgNetwork<MsgType> {
    using MsgNet = MsgNetwork<MsgType>;
    /** Connections keyed by NetAddr. */
    std::unordered_map<NetAddr, typename MsgNet::conn_t> addr2conn;

    public:
    /** Connection type that additionally remembers its ClientNetwork. */
    class Conn: public MsgNet::Conn {
        ClientNetwork *cn;

        public:
        /** Hand the network to the base connection (as a MsgNet pointer) and keep it in `cn`. */
        Conn(ClientNetwork *cn):
            MsgNet::Conn(static_cast<MsgNet *>(cn)),
            cn(cn) {}

        protected:
        void on_setup() override;
        void on_teardown() override;
    };

    protected:
    /** Allocate a ClientNetwork::Conn for this network and return the handle from its self(). */
    ConnPool::conn_t create_conn() override {
        Conn *fresh = new Conn(this);
        return fresh->self();
    }

    public:
    /** @param eb forwarded unchanged to the MsgNet constructor. */
    ClientNetwork(struct event_base *eb): MsgNet(eb) {}
    /** Send `msg` to `addr` (definition is outside this class body). */
    void send_msg(const MsgType &msg, const NetAddr &addr);
};
/** Error type for PeerNetwork; it adds nothing beyond the constructors it
 *  inherits from SalticidaeError. */
class PeerNetworkError: public SalticidaeError {
    public:
    using SalticidaeError::SalticidaeError;
};
/** Peer-to-peer network where any two nodes can hold a bidirectional message
* channel, established by either side. */
template<typename MsgType>
class PeerNetwork: public MsgNetwork<MsgType> {
using MsgNet= MsgNetwork<MsgType>;
public:
/** Selects how a peer's identity is determined.
 *  NOTE(review): the descriptions below are inferred from the enumerator
 *  names only — confirm against the code that reads `id_mode`. */
enum IdentityMode {
/** presumably peers are told apart by IP address alone */
IP_BASED,
/** presumably peers are told apart by IP address and port together */
IP_PORT_BASED
};
/** Connection type used by PeerNetwork: a MsgNet connection that also carries
 *  a peer id and a timeout event. */
class Conn: public MsgNet::Conn {
    /** Default-constructed here; assigned by code outside this class body. */
    NetAddr peer_id;
    /** Cleared in close() before the base class closes the connection. */
    Event ev_timeout;
    /** The network passed to the constructor. */
    PeerNetwork *pn;

    public:
    friend PeerNetwork;
    /**
     * @return a reference to the stored peer id.
     *
     * Declared `const` because it only reads `peer_id`, which also makes it
     * callable through a const Conn.
     */
    const NetAddr &get_peer() const { return peer_id; }
    /** Hand the network to the base connection (as a MsgNet pointer) and keep it in `pn`. */
    Conn(PeerNetwork *pn):
        MsgNet::Conn(static_cast<MsgNet *>(pn)),
        pn(pn) {}

    protected:
    /** Clear the timeout event first, then run the base-class close(). */
    void close() override {
        ev_timeout.clear();
        MsgNet::Conn::close();
    }
    void on_setup() override;
    void on_teardown() override;
};
using conn_t = RcObj<Conn>;
private:
/** Per-peer bookkeeping: address, current connection, ping timer and flags. */
struct Peer {
    /** connection addr, may be different due to passive mode */
    NetAddr addr;
    /** the underlying connection, may be invalid when connected = false */
    conn_t conn;
    /** The network passed to the constructor. */
    PeerNetwork *pn;
    /** Event whose callback is this object's ping_timer(). */
    Event ev_ping_timer;
    /* NOTE(review): the two flags below presumably record whether the ping
     * timer has fired and whether a pong has arrived — confirm against the
     * out-of-class definitions that read and write them. */
    bool ping_timer_ok;
    bool pong_msg_ok;
    bool connected;

    Peer() = delete;
    /**
     * @param addr stored in `addr`
     * @param conn stored in `conn`
     * @param pn   stored in `pn`
     * @param eb   event base handed to the Event constructor
     *
     * All three flags start out `false`; `ping_timer_ok` and `pong_msg_ok`
     * used to be left uninitialised, so a read before the first assignment
     * would have seen an indeterminate value.
     */
    Peer(NetAddr addr, conn_t conn, PeerNetwork *pn, struct event_base *eb):
        addr(addr), conn(conn), pn(pn),
        /* The callback binds `this`, so the object must stay at the same
         * address; copying is deleted below. */
        ev_ping_timer(
            Event(eb, -1, 0, std::bind(&Peer::ping_timer, this, _1, _2))),
        ping_timer_ok(false),
        pong_msg_ok(false),
        connected(false) {}
    ~Peer() {}
    Peer &operator=(const Peer &) = delete;
    Peer(const Peer &) = delete;

    /** Callback bound into `ev_ping_timer` by the constructor. */
    void ping_timer(evutil_socket_t, short);
    void reset_ping_timer();
    void send_ping();
    /** Delete the ping timer event if `ev_ping_timer` tests true. */
    void clear_all_events() {
        if (ev_ping_timer)
            ev_ping_timer.del();
    }
    void reset_conn(conn_t conn);
};
/** Peers keyed by NetAddr; values are raw `Peer *`.
 *  NOTE(review): who deletes these Peer objects is not visible here — confirm. */
std::unordered_map <NetAddr, Peer *> id2peer;
/** Sequence of peer addresses.
 *  NOTE(review): presumably filled by add_peer() — confirm. */
std::vector<NetAddr> peer_list;
/** Identity mode given to the constructor (default IP_PORT_BASED). */
IdentityMode id_mode;
/** Ping period given to the constructor (default 30).
 *  NOTE(review): unit presumably seconds — confirm. */
double ping_period;
/** Connection timeout given to the constructor (default 180).
 *  NOTE(review): unit presumably seconds — confirm. */
double conn_timeout;
/* Signatures match msg_callback_t's shape except that they take
 * ConnPool::conn_t. NOTE(review): presumably the handlers for ping and pong
 * messages — confirm where they are registered. */
void msg_ping(const MsgType &msg, ConnPool::conn_t conn);
void msg_pong(const MsgType &msg, ConnPool::conn_t conn);
/* Private helpers; their definitions are outside this class body. */
void reset_conn_timeout(conn_t conn);
bool check_new_conn(conn_t conn, uint16_t port);
void start_active_conn(const NetAddr &paddr);
protected:
/** Allocate a PeerNetwork::Conn for this network and return the handle from its self(). */
ConnPool::conn_t create_conn() override {
    Conn *fresh = new Conn(this);
    return fresh->self();
}
public:
/**
 * @param eb           forwarded unchanged to the MsgNet constructor
 * @param ping_period  stored in `ping_period` (default 30)
 * @param conn_timeout stored in `conn_timeout` (default 180)
 * @param id_mode      stored in `id_mode` (default IP_PORT_BASED)
 *
 * NOTE(review): the time unit of the two doubles is not stated here —
 * presumably seconds; confirm against the code that arms the timers.
 */
PeerNetwork(struct event_base *eb,
            double ping_period = 30,
            double conn_timeout = 180,
            IdentityMode id_mode = IP_PORT_BASED):
    MsgNet(eb),
    id_mode{id_mode},
    ping_period{ping_period},
    conn_timeout{conn_timeout}
{}
/** Add the peer with address `paddr` (definition is outside this class body). */
void add_peer(const NetAddr &paddr);