aboutsummaryrefslogblamecommitdiff
path: root/include/salticidae/conn.h
blob: aa414e8296fe6377005bb1b8d9c7d0381c123190 (plain) (tree)








































                                                                                  
                             



                               

                      
                 


















                                                                        
 
                                     


                 

                             
 

                                        


                                      

                                                   
 

                                                             
                        

     
                                             

                           
                                            

                            

                     
 
                                                  

                           
                                 




                            

                                   
                                                          



                                 

                                        







                                                                   
                                        




                                         
                                                 

                  
                       







                                             
                                             


                
                                                     
                               
                                                       
                
                        






                                                                      
                             





                        

                              









                                                 

               
                                    

                                    

                         
                                                                            

         
                                        




                                                        
                                                    
                                                 
                                                              
                                                                     
 

                                                                              





                                              
                                                                    



                                                        
                                        


                         
                                                                     
                                 


                               

                        
                                                           

         
                                                 
                                   
                                                                    
                                    
                                                            



                                       
                           

                               
 







                                               
                    
                                                                              
                                    

           
                                    
                                        


                                               

                                                 
                 




                                    
                             


         


                                        




                                                                          
                                     




      
/**
 * Copyright (c) 2018 Cornell University.
 *
 * Author: Ted Yin <[email protected]>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of
 * this software and associated documentation files (the "Software"), to deal in
 * the Software without restriction, including without limitation the rights to
 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
 * of the Software, and to permit persons to whom the Software is furnished to do
 * so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#ifndef _SALTICIDAE_CONN_H
#define _SALTICIDAE_CONN_H

#include <cassert>
#include <cstdint>
#include <event2/event.h>
#include <arpa/inet.h>
#include <unistd.h>

#include <string>
#include <unordered_map>
#include <list>
#include <algorithm>
#include <exception>

#include "salticidae/type.h"
#include "salticidae/ref.h"
#include "salticidae/event.h"
#include "salticidae/util.h"
#include "salticidae/netaddr.h"
#include "salticidae/msg.h"

namespace salticidae {

class SegBuffer {
    struct buffer_entry_t {
        bytearray_t data;
        bytearray_t::iterator offset;
        buffer_entry_t(bytearray_t &&_data): data(std::move(_data)) {
            offset = data.begin();
        }

        buffer_entry_t(buffer_entry_t &&other) {
            size_t _offset = other.offset - other.data.begin();
            data = std::move(other.data);
            offset = data.begin() + _offset;
        }

        buffer_entry_t(const buffer_entry_t &other): data(other.data) {
            offset = data.begin() + (other.offset - other.data.begin());
        }

        size_t length() const { return data.end() - offset; }
    };

    std::list<buffer_entry_t> buffer;
    size_t _size;

    public:
    SegBuffer(): _size(0) {}
    ~SegBuffer() { clear(); }

    void swap(SegBuffer &other) {
        std::swap(buffer, other.buffer);
        std::swap(_size, other._size);
    }

    SegBuffer(const SegBuffer &other):
        buffer(other.buffer), _size(other._size) {}

    SegBuffer(SegBuffer &&other):
        buffer(std::move(other.buffer)), _size(other._size) {
        other._size = 0;
    }

    SegBuffer &operator=(SegBuffer &&other) {
        if (this != &other)
        {
            SegBuffer tmp(std::move(other));
            tmp.swap(*this);
        }
        return *this;
    }
 
    SegBuffer &operator=(const SegBuffer &other) {
        if (this != &other)
        {
            SegBuffer tmp(other);
            tmp.swap(*this);
        }
        return *this;
    }
   
    void push(bytearray_t &&data) {
        _size += data.size();
        buffer.push_back(buffer_entry_t(std::move(data)));
    }
    
    bytearray_t pop(size_t len) {
        bytearray_t res;
        auto i = buffer.begin();
        while (len && i != buffer.end())
        {
            size_t copy_len = std::min(i->length(), len);
            res.insert(res.end(), i->offset, i->offset + copy_len);
            i->offset += copy_len;
            len -= copy_len;
            if (i->offset == i->data.end())
                i++;
        }
        buffer.erase(buffer.begin(), i);
        _size -= res.size();
        return std::move(res);
    }
    
    size_t size() const { return _size; }
    bool empty() const { return buffer.empty(); }
    
    void clear() {
        buffer.clear();
        _size = 0;
    }
};

/** Error type named after ConnPool. It adds no state or behavior of its own
 * and only re-exposes the constructors of SalticidaeError.
 * NOTE(review): the throw sites are not in this header -- presumably the
 * ConnPool implementation raises it; confirm there. */
class ConnPoolError: public SalticidaeError
{
    public:
    using SalticidaeError::SalticidaeError;
};

/** Abstraction for connection management.
 *
 * Keeps a table of connection handles indexed by an int key (presumably the
 * socket file descriptor -- confirm against the implementation file) and
 * offers connect()/listen() to create connections. Concrete pools derive
 * from this class and supply the connection type through create_conn(). */
class ConnPool {
    public:
    class Conn;
    /** The handle to a bi-directional connection. */
    using conn_t = RcObj<Conn>;
    /** Abstraction for a bi-directional connection. */
    class Conn {
        friend ConnPool;
        public:
        enum ConnMode {
            ACTIVE, /**< the connection is established by connect() */
            PASSIVE, /**< the connection is established by accept() */
        };

        private:
        size_t seg_buff_size;
        /** Handle to this very object; cleared in on_close(). */
        conn_t self_ref;
        /** Underlying descriptor, or -1 when there is none. */
        int fd;
        ConnPool *cpool;
        ConnMode mode;
        NetAddr addr;

        SegBuffer send_buffer;
        SegBuffer recv_buffer;

        Event ev_read;
        Event ev_write;
        Event ev_connect;
        /** does not need to wait if true */
        bool ready_send;

        /* Event callbacks; their bodies live outside this header. */
        void recv_data(evutil_socket_t, short);
        void send_data(evutil_socket_t, short);
        void conn_server(evutil_socket_t, short);

        public:
        /** Create a connection that is not yet bound to any descriptor or
         * pool. Every scalar member gets a defined value so that on_close()
         * or the destructor never act on an indeterminate descriptor.
         * NOTE(review): 4096 mirrors ConnPool's default segment size; the
         * pool presumably overwrites these fields during setup -- confirm. */
        Conn():
            seg_buff_size(4096),
            fd(-1),
            cpool(nullptr),
            mode(ACTIVE),
            ready_send(false) {}
        Conn(const Conn &) = delete;
        Conn(Conn &&other) = delete;

        virtual ~Conn() {
            SALTICIDAE_LOG_INFO("destroyed %s", std::string(*this).c_str());
        }

        /** Get the handle to itself. */
        conn_t self() { return self_ref; }
        operator std::string() const;
        int get_fd() const { return fd; }
        const NetAddr &get_addr() const { return addr; }
        ConnMode get_mode() const { return mode; }
        ConnPool *get_pool() const { return cpool; }
        /** Access the buffer holding received data. */
        SegBuffer &read() { return recv_buffer; }
        /** Set the buffer size used for send/receive data. */
        void set_seg_buff_size(size_t size) { seg_buff_size = size; }

        /** Write data to the connection (non-blocking). The data will be sent
         * whenever I/O is available. */
        void write(bytearray_t &&data) {
            send_buffer.push(std::move(data));
            /* try to flush right away when the send path is marked ready */
            if (ready_send)
                send_data(fd, EV_WRITE);
        }

        /** Move the send buffer from the other (old) connection. */
        void move_send_buffer(conn_t other) {
            send_buffer = std::move(other->send_buffer);
        }

        /** Terminate the connection. */
        void terminate();

        protected:
        /** Close the IO and clear all on-going or planned events. */
        virtual void on_close() {
            ev_read.clear();
            ev_write.clear();
            ev_connect.clear();
            /* nothing to release if no descriptor was ever assigned or it
             * has already been closed */
            if (fd != -1)
            {
                ::close(fd);
                fd = -1;
            }
            self_ref = nullptr; /* remove the self-cycle */
        }

        /** Called when new data is available. */
        virtual void on_read() = 0;
        /** Called when the underlying connection is established. */
        virtual void on_setup() = 0;
        /** Called when the underlying connection breaks. */
        virtual void on_teardown() = 0;
    };

    private:
    int max_listen_backlog;
    /* NOTE(review): the unit is not stated in this header -- presumably
     * seconds; confirm against the implementation. */
    double conn_server_timeout;
    size_t seg_buff_size;

    std::unordered_map<int, conn_t> pool;
    /** Listening descriptor, or -1 while none has been assigned. */
    int listen_fd;
    Event ev_listen;

    void accept_client(evutil_socket_t, short);
    conn_t add_conn(conn_t conn);

    protected:
    EventContext ec;
    /** Should be implemented by derived class to return a new Conn object. */
    virtual Conn *create_conn() = 0;

    public:
    /** Store the configuration; no socket is touched here.
     * @param ec event context copied into the pool
     * @param max_listen_backlog stored listen backlog setting
     * @param conn_server_timeout stored connect timeout setting
     * @param seg_buff_size stored segment buffer size setting */
    ConnPool(const EventContext &ec,
            int max_listen_backlog = 10,
            double conn_server_timeout = 2,
            size_t seg_buff_size = 4096):
        max_listen_backlog(max_listen_backlog),
        conn_server_timeout(conn_server_timeout),
        seg_buff_size(seg_buff_size),
        listen_fd(-1),
        ec(ec) {}

    /** Close every connection still registered in the pool. Virtual because
     * the class is a polymorphic base (create_conn() is pure virtual), so
     * destroying a derived pool through a ConnPool pointer must be valid.
     * NOTE(review): listen_fd is not closed here; whether that leaks the
     * listening socket depends on listen(), which is not visible in this
     * header. */
    virtual ~ConnPool() {
        for (const auto &it: pool)
        {
            /* keep an extra reference while on_close() drops the
             * connection's self-reference */
            conn_t conn = it.second;
            conn->on_close();
        }
    }

    ConnPool(const ConnPool &) = delete;
    ConnPool(ConnPool &&) = delete;

    /** Actively connect to remote addr. */
    conn_t connect(const NetAddr &addr);
    /** Listen for passive connections (connection initiated from remote).
     * Does not need to be called if do not want to accept any passive
     * connections. */
    void listen(NetAddr listen_addr);
};

}

#endif