/**
* Copyright (c) 2018 Cornell University.
*
* Author: Ted Yin <tederminant@gmail.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is furnished to do
* so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#ifndef _SALTICIDAE_NETWORK_H
#define _SALTICIDAE_NETWORK_H
#include "salticidae/event.h"
#include "salticidae/netaddr.h"
#include "salticidae/msg.h"
#include "salticidae/conn.h"
#ifdef __cplusplus
namespace salticidae {
/** Network of nodes who can send async messages. */
template<typename OpcodeType>
class MsgNetwork: public ConnPool {
public:
/* Message type handled by this network: MsgBase instantiated with the
 * network's opcode type. */
using Msg = MsgBase<OpcodeType>;
/* Primary template -- functors and lambdas: forward to the traits of the
 * object's call operator (a pointer to member function). */
template<typename Functor>
struct callback_traits:
    public callback_traits<decltype(&Functor::operator())> {};

/* Base case -- a plain function type taking (message, connection handle).
 * Every other specialization below reduces to this one. */
template<typename Ret, typename MsgArg, typename ConnArg>
struct callback_traits<Ret(MsgArg, ConnArg)> {
    /* what the callback returns */
    using ret_type = Ret;
    /* second parameter with the reference stripped, then its nested `type` */
    using conn_type = typename std::remove_reference<ConnArg>::type::type;
    /* first parameter with the reference stripped */
    using msg_type = typename std::remove_reference<MsgArg>::type;
};

/* Pointer to a free function: drop the pointer and reuse the base case. */
template<typename Ret, typename... Params>
struct callback_traits<Ret(*)(Params...)>:
    public callback_traits<Ret(Params...)> {};

/* Pointer to a const member function (e.g. a non-mutable lambda's call
 * operator): drop the class and the const qualifier. */
template<typename Owner, typename Ret, typename... Params>
struct callback_traits<Ret(Owner::*)(Params...) const>:
    public callback_traits<Ret(Params...)> {};

/* Pointer to a non-const member function: drop the class. */
template<typename Owner, typename Ret, typename... Params>
struct callback_traits<Ret(Owner::*)(Params...)>:
    public callback_traits<Ret(Params...)> {};
/**
 * Connection type of MsgNetwork: a ConnPool::Conn extended with one Msg
 * object, a HEADER/PAYLOAD state flag and, when SALTICIDAE_MSG_STAT is
 * defined, four atomic counters.
 */
class Conn: public ConnPool::Conn {
    /* the enclosing network reads and writes the private state below */
    friend MsgNetwork;

    /* NOTE(review): presumably records which part of a message on_read()
     * expects next -- confirm against the on_read() definition. */
    enum MsgState { HEADER, PAYLOAD };

    Msg msg;
    MsgState msg_state;

    protected:
#ifdef SALTICIDAE_MSG_STAT
    /* Counters are mutable so they can be updated/cleared through const
     * access. NOTE(review): judging by the names, nsent/nrecv count messages
     * and nsentb/nrecvb count bytes -- confirm where they are incremented. */
    mutable std::atomic<size_t> nsent;
    mutable std::atomic<size_t> nrecv;
    mutable std::atomic<size_t> nsentb;
    mutable std::atomic<size_t> nrecvb;
#endif

    public:
    /* Starts in the HEADER state with every counter (if compiled in) at 0. */
    Conn():
        msg_state(HEADER)
#ifdef SALTICIDAE_MSG_STAT
        , nsent(0), nrecv(0), nsentb(0), nrecvb(0)
#endif
    {}

    /* The pool returned by get_pool(), downcast to the enclosing MsgNetwork. */
    MsgNetwork *get_net() {
        return static_cast<MsgNetwork *>(get_pool());
    }

#ifdef SALTICIDAE_MSG_STAT
    /* Snapshot accessors for the counters. */
    size_t get_nsent() const { return nsent.load(); }
    size_t get_nrecv() const { return nrecv.load(); }
    size_t get_nsentb() const { return nsentb.load(); }
    size_t get_nrecvb() const { return nrecvb.load(); }

    /* Zero all four counters; each store is relaxed, so the reset as a
     * whole is not one atomic step. */
    void clear_msgstat() const {
        constexpr auto relaxed = std::memory_order_relaxed;
        nsent.store(0, relaxed);
        nrecv.store(0, relaxed);
        nsentb.store(0, relaxed);
        nrecvb.store(0, relaxed);
    }
#endif

    protected:
    /* Overrides the base-class read hook; defined outside this class body. */
    void on_read() override;
};
/* Handle type through which connections of this network are passed around.
 * NOTE(review): ArcObj is presumably a reference-counted smart pointer --
 * confirm in its declaration. */
using conn_t = ArcObj<Conn>;
#ifdef SALTICIDAE_MSG_STAT
// TODO: a lock-free, thread-safe, fine-grained stat
#endif
private:
/* Dispatch table: maps a message opcode to the callback that receives the
 * message together with a connection handle. */
std::unordered_map<
typename Msg::opcode_t,
std::function<void(const Msg &msg, const conn_t &)>> handler_map;
/* Queue of (message, connection handle) pairs. */
using queue_t = MPSCQueueEventDriven<std::pair<Msg, conn_t>>;
/* NOTE(review): presumably filled as messages are read from connections and
 * drained by the handler dispatch -- confirm at the use sites. */
queue_t incoming_msgs;
protected:
/* Factory hook overriding the base-class create_conn(): returns a freshly
 * heap-allocated MsgNetwork::Conn as a base-class pointer.
 * NOTE(review): the caller is presumably responsible for the object's
 * lifetime -- confirm in ConnPool. */
ConnPool::Conn *create_conn() override {
    return new Conn();
}
public:
/**
 * Settings for a MsgNetwork: everything in ConnPool::Config plus a burst
 * size. Setters return the object itself so calls can be chained.
 */
class Config: public ConnPool::Config {
    /* the network reads _burst_size directly */
    friend MsgNetwork;

    /* NOTE(review): presumably caps how many queued messages are handled in
     * one batch -- confirm where MsgNetwork consumes it. */
    size_t _burst_size;

    public:
    /* Default settings: delegate with a default-constructed pool config. */
    Config(): Config(ConnPool::Config()) {}

    /* Adopt the given pool settings; the burst size starts at 1000. */
    Config(const ConnPool::Config &config):
        ConnPool::Config(config),
        _burst_size(1000) {}

    /* Set the burst size to x and return *this for chaining. */
    Config &burst_size(size_t x) {
        this->_burst_size = x;
        return *this;
    }
};
/* Calls stop() before the members of this class are destroyed. */
virtual ~MsgNetwork() {
    stop();
}
MsgNetwork(