/**
* Copyright (c) 2018 Cornell University.
*
* Author: Ted Yin <[email protected]>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is furnished to do
* so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#ifndef _SALTICIDAE_CONN_H
#define _SALTICIDAE_CONN_H
#include <cassert>
#include <cstdint>
#include <event2/event.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <string>
#include <unordered_map>
#include <list>
#include <algorithm>
#include <exception>
#include <mutex>
#include <thread>
#include <fcntl.h>
#include <event2/thread.h>
#include "salticidae/type.h"
#include "salticidae/ref.h"
#include "salticidae/event.h"
#include "salticidae/util.h"
#include "salticidae/netaddr.h"
#include "salticidae/msg.h"
#include "salticidae/buffer.h"
namespace salticidae {
struct ConnPoolError: public SalticidaeError {
using SalticidaeError::SalticidaeError;
};
/** Abstraction for connection management. */
class ConnPool {
public:
class Conn;
/** The handle to a bi-directional connection. */
using conn_t = ArcObj<Conn>;
/** The type of callback invoked when connection status is changed. */
using conn_callback_t = std::function<void(Conn &, bool)>;
/** Abstraction for a bi-directional connection. */
class Conn {
friend ConnPool;
public:
enum ConnMode {
ACTIVE, /**< the connection is established by connect() */
PASSIVE, /**< the connection is established by accept() */
};
protected:
size_t seg_buff_size;
conn_t self_ref;
int fd;
ConnPool *cpool;
ConnMode mode;
NetAddr addr;
// TODO: send_buffer should be a thread-safe mpsc queue
MPSCWriteBuffer send_buffer;
SegBuffer recv_buffer;
Event ev_read;
Event ev_write;
Event ev_connect;
/** does not need to wait if true */
bool ready_send;
void recv_data(int, int);
void send_data(int, int);
void conn_server(int, int);
/** Stop the worker and I/O events. */
void stop();
/** Terminate the connection. */
void terminate();
public:
Conn(): ready_send(false) {}
Conn(const Conn &) = delete;
Conn(Conn &&other) = delete;
virtual ~Conn() {
SALTICIDAE_LOG_INFO("destroyed %s", std::string(*this).c_str());
}
/** Get the handle to itself. */
conn_t self() { return self_ref; }
operator std::string() const;
const NetAddr &get_addr() const { return addr; }
ConnMode get_mode() const { return mode; }
ConnPool *get_pool() const { return cpool; }
MPSCWriteBuffer &get_send_buffer() { return send_buffer; }
/** Write data to the connection (non-blocking). The data will be sent
* whenever I/O is available. */
void write(bytearray_t &&data) {
send_buffer.push(std::move(data));
}
protected:
/** Close the IO and clear all on-going or planned events. */
virtual void on_close() {
ev_read.clear();
ev_write.clear();
ev_connect.clear();
::close(fd);
fd = -1;
self_ref = nullptr; /* remove the self-cycle */
}
/** Called when new data is available. */
virtual void on_read() {}
/** Called when the underlying connection is established. */
virtual void on_setup() {}
/** Called when the underlying connection breaks. */
virtual void on_teardown() {}
};
private:
const int max_listen_backlog;
const double conn_server_timeout;
const size_t seg_buff_size;
/* owned by user loop */
BoxObj<ThreadCall> user_tcall;
conn_callback_t conn_cb;
/* owned by the dispatcher */
Event ev_listen;
std::unordered_map<int, conn_t> pool;
int listen_fd; /**< for accepting new network connections */
void update_conn(const conn_t &conn, bool connected) {
user_tcall->call([this, conn, connected](ThreadCall::Handle &) {
if (conn_cb) conn_cb(*conn, connected);
});
}
class Worker {
EventContext ec;
ThreadCall msgr;
std::thread handle;
public:
Worker(): msgr(ec) {}
/* the following functions are called by the dispatcher */
void start() {
handle = std::thread([this]() { ec.dispatch(); });
}
void feed(const conn_t &conn, int client_fd) {
msgr.call([this, conn, client_fd](ThreadCall::Handle &) {
SALTICIDAE_LOG_INFO("worker %x got %s",
std::this_thread::get_id(),
std::string(*conn).c_str());
conn->get_send_buffer()
.get_queue()
.reg_handler(this->ec, [conn, client_fd]
(MPSCWriteBuffer::queue_t &) {
if (conn->ready_send && conn->fd != -1)
conn->send_data(client_fd, Event::WRITE);
return false;
});
//auto conn_ptr = conn.get();
conn->ev_read = Event(ec, client_fd, Event::READ | Event::WRITE, [conn=conn](int fd, int what) {
if (what & Event::READ)
conn->recv_data(fd, what);
else
conn->send_data(fd, what);
});
// std::bind(&Conn::recv_data, conn_ptr, _1, _2));
//conn->ev_write = Event(ec, client_fd, Event::WRITE,
// std::bind(&Conn::send_data, conn_ptr, _1, _2));
conn->ev_read.add();
//conn->ev_write.add();
});
}
void stop() {
msgr.call([this](ThreadCall::Handle &) {
ec.stop();
});
}
std::thread &get_handle() { return handle; }
const EventContext &get_ec() { return ec; }
};
/* related to workers */
size_t nworker;
salticidae::BoxObj<Worker[]> workers;
void accept_client(int, int);
conn_t add_conn(const conn_t &conn);
void terminate(int fd);
protected:
conn_t _connect(const NetAddr &addr);
void _listen(NetAddr listen_addr);
private:
//class DspMulticast: public DispatchCmd {
// std::vector<conn_t> receivers;
// bytearray_t data;
// public:
// DspMulticast(std::vector<conn_t> &&receivers, bytearray_t &&data):
// receivers(std::move(receivers)),
// data(std::move(data)) {}
// void exec(ConnPool *) override {
// for (auto &r: receivers) r->write(bytearray_t(data));
// }
//};
Worker &select_worker() {
return workers[1];
}
protected:
EventContext ec;
EventContext dispatcher_ec;
BoxObj<ThreadCall> disp_tcall;
/** Should be implemented by derived class to return a new Conn object. */
virtual Conn *create_conn() = 0;
public:
ConnPool(const EventContext &ec,
int max_listen_backlog = 10,
double conn_server_timeout = 2,
size_t seg_buff_size = 4096,
size_t nworker = 2):
max_listen_backlog(max_listen_backlog),
conn_server_timeout(conn_server_timeout),
seg_buff_size(seg_buff_size),
listen_fd(-1),
nworker(std::max((size_t)1, nworker)),
ec(ec) {
workers = new Worker[nworker];
dispatcher_ec = workers[0].get_ec();
user_tcall = new ThreadCall(ec);
disp_tcall = new ThreadCall(dispatcher_ec);
}
~ConnPool() {
/* stop all workers */
for (size_t i = 0; i < nworker; i++)
workers[i].stop();
/* join all worker threads */
for (size_t i = 0; i < nworker; i++)
workers[i].get_handle().join();
for (auto it: pool)
{
conn_t conn = it.second;
conn->on_close();
}
if (listen_fd != -1) close(listen_fd);
}
ConnPool(const ConnPool &) = delete;
ConnPool(ConnPool &&) = delete;
void start() {
SALTICIDAE_LOG_INFO("starting all threads...");
for (size_t i = 0; i < nworker; i++)
workers[i].start();
}
/** Actively connect to remote addr. */
conn_t connect(const NetAddr &addr, bool blocking = true) {
if (blocking)
{
auto ret = static_cast<conn_t *>(disp_tcall->call(
[this, addr](ThreadCall::Handle &h) {
auto ptr = new conn_t(_connect(addr));
std::atomic_thread_fence(std::memory_order_release);
h.set_result(ptr);
}, true));
auto conn = *ret;
delete ret;
return std::move(conn);
}
else
{
disp_tcall->call([this, addr](ThreadCall::Handle &) {
_connect(addr);
}, false);
return nullptr;
}
}
/** Listen for passive connections (connection initiated from remote).
* Does not need to be called if do not want to accept any passive
* connections. */
void listen(NetAddr listen_addr) {
disp_tcall->call([this, listen_addr](ThreadCall::Handle &) {
_listen(listen_addr);
}, true);
}
template<typename Func>
void reg_conn_handler(Func cb) { conn_cb = cb; }
};
}
#endif