aboutsummaryrefslogblamecommitdiff
path: root/include/salticidae/event.h
blob: b2438654c27fac4246d0202db5718e10b240e768 (plain) (tree)























                                                                                  


                           
                  
                             
                   
               
                        
 
                            
                             


                            



                                                 


                                     

                                                  

                       


     
                                                                    


                                              



                                                         



                                                            

                                  
                                      

                                         

  


                                                             
           
                                                               

                                         
                                     
 

                    
           
                     
                        
 
                                                                      
                        
                            
                                                     


                                           



                                                                 
                                           

                                          

     

















                                                                      

     

                                         
         

                                          
         
                     
     
 


                           
                             
         


                                                                
         





































                                                            
                                           
                                          
                              

     







                                                              
 











                                               
         

                                             
         


                     
                              

                  
                                
         
                                    
                                                                   
                               
         




                                             

     


                                                                                    
     
 
                



                                                         

                                                     






                                                       


                                 


                                                          










                                                                      
     



                                                                               



                                                     
                              

     

                                                    




                                                          
                                  

                               




                                                                                      





                                                              




















                                                   



                                                      


                                                          

                                                             







                                                      

  



                                                       
                    






                                                             
           


                                                          
                                           
                                         




                                        
                                                          
                                                 
                               

                                

     








                                            
 





                                           








                                   
                                                                 









                                             
                                  

                                                            

                               
                                  


                                                                    
                
                                                      


                                                       

                                                     

  














































































                                                                   
                    




                               
           

                                                
              
                                             
                               
     
                            

                            
                                



                        
                    
                                                 
            
                             

                               
               

           
                           


                                                            
     
                                          
 

                                                           
                            
                                                                     








                                                                                                
                                                                       
                                    

                                             
                              

     

                                        
                        
                                                
                                        

                                                                  

                                                                               



                                                


                    
                                                          

  








                                                 
                           


                                                            
     
                                          








                                                                            
                                                               









                                          
                                                
                                        

                                                                  





                                                                               
                    
     

                                                          

  


                         



                                                   
                 
















































                                                                            
                                                                            

                                            
                                                                                                          




                                                                     
                                        









                                                     



















                                                     
                                   

  

 
                           
                                                

                                                           
                                     
                                         
      

     
                   
                           
                                             

                                                       
                               
                                   

      
      

                           
                  
            
      

                                   
                                             

                                                 
 



                                                                                               
                                                                  
 

                                                                                  
                               
                                          


                                                        
 


                                                                                      



                                                
                                                            
 
                  

      
      
 
      
/**
 * Copyright (c) 2018 Cornell University.
 *
 * Author: Ted Yin <[email protected]>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of
 * this software and associated documentation files (the "Software"), to deal in
 * the Software without restriction, including without limitation the rights to
 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
 * of the Software, and to permit persons to whom the Software is furnished to do
 * so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#ifndef _SALTICIDAE_EVENT_H
#define _SALTICIDAE_EVENT_H

#ifdef __cplusplus
#include <condition_variable>
#include <unistd.h>
#include <uv.h>
#include <sys/eventfd.h>

#include "salticidae/type.h"
#include "salticidae/queue.h"
#include "salticidae/util.h"
#include "salticidae/ref.h"

namespace salticidae {

struct _event_context_deleter {
    constexpr _event_context_deleter() = default;
    void operator()(uv_loop_t *ptr) {
        if (ptr != nullptr)
        {
            while (uv_loop_close(ptr) == UV_EBUSY)
                uv_run(ptr, UV_RUN_NOWAIT);
            delete ptr;
        }
    }
};

using _event_context_ot = ArcObj<uv_loop_t, _event_context_deleter>;

class EventContext: public _event_context_ot {
    public:
    EventContext(): _event_context_ot(new uv_loop_t()) {
        uv_loop_init(get());
    }
    EventContext(uv_loop_t *eb): _event_context_ot(eb) {}
    EventContext(const EventContext &) = default;
    EventContext(EventContext &&) = default;
    EventContext &operator=(const EventContext &) = default;
    EventContext &operator=(EventContext &&) = default;
    void dispatch() const {
        // TODO: improve this loop
        uv_run(get(), UV_RUN_DEFAULT);
    }
    void stop() const { uv_stop(get()); }
};

static void _on_uv_handle_close(uv_handle_t *h) { delete h; }

class FdEvent {
    public:
    using callback_t = std::function<void(int fd, int events)>;
    static const int READ = UV_READABLE;
    static const int WRITE = UV_WRITABLE;
    static const int ERROR = 1 << 30;

    protected:
    EventContext ec;
    int fd;
    uv_poll_t *ev_fd;
    callback_t callback;

    static inline void fd_then(uv_poll_t *h, int status, int events) {
        if (status != 0)
            events |= ERROR;
        auto event = static_cast<FdEvent *>(h->data);
        event->callback(event->fd, events);
    }

    public:
    FdEvent(): ec(nullptr), ev_fd(nullptr) {}
    FdEvent(const EventContext &ec, int fd, callback_t callback):
            ec(ec), fd(fd), ev_fd(new uv_poll_t()),
            callback(std::move(callback)) {
        uv_poll_init(ec.get(), ev_fd, fd);
        ev_fd->data = this;
    }

    FdEvent(const FdEvent &) = delete;
    FdEvent(FdEvent &&other):
            ec(std::move(other.ec)), fd(other.fd), ev_fd(other.ev_fd),
            callback(std::move(other.callback)) {
        other.ev_fd = nullptr;
        if (ev_fd != nullptr)
            ev_fd->data = this;
    }
    
    void swap(FdEvent &other) {
        std::swap(ec, other.ec);
        std::swap(fd, other.fd);
        std::swap(ev_fd, other.ev_fd);
        std::swap(callback, other.callback);
        if (ev_fd != nullptr)
            ev_fd->data = this;
        if (other.ev_fd != nullptr)
            other.ev_fd->data = &other;
    }

    FdEvent &operator=(FdEvent &&other) {
        if (this != &other)
        {
            FdEvent tmp(std::move(other));
            tmp.swap(*this);
        }
        return *this;
    }

    ~FdEvent() { clear(); }

    void clear() {
        if (ev_fd != nullptr)
        {
            uv_poll_stop(ev_fd);
            uv_close((uv_handle_t *)ev_fd, _on_uv_handle_close);
            ev_fd = nullptr;
        }
        callback = nullptr;
    }

    void set_callback(callback_t _callback) {
        callback = _callback;
    }

    void add(int events) {
        assert(ev_fd != nullptr);
        uv_poll_start(ev_fd, events, FdEvent::fd_then);
    }

    void del() {
        if (ev_fd != nullptr) uv_poll_stop(ev_fd);
    }

    operator bool() const { return ev_fd != nullptr; }
};


class TimerEvent {
    public:
    using callback_t = std::function<void(TimerEvent &)>;

    protected:
    EventContext ec;
    uv_timer_t *ev_timer;
    callback_t callback;

    static inline void timer_then(uv_timer_t *h) {
        auto event = static_cast<TimerEvent *>(h->data);
        event->callback(*event);
    }

    public:
    TimerEvent(): ec(nullptr), ev_timer(nullptr) {}
    TimerEvent(const EventContext &ec, callback_t callback):
            ec(ec), ev_timer(new uv_timer_t()),
            callback(std::move(callback)) {
        uv_timer_init(ec.get(), ev_timer);
        ev_timer->data = this;
    }

    TimerEvent(const TimerEvent &) = delete;
    TimerEvent(TimerEvent &&other):
            ec(std::move(other.ec)), ev_timer(other.ev_timer),
            callback(std::move(other.callback)) {
        other.ev_timer = nullptr;
        if (ev_timer != nullptr)
            ev_timer->data = this;
    }

    void swap(TimerEvent &other) {
        std::swap(ec, other.ec);
        std::swap(ev_timer, other.ev_timer);
        std::swap(callback, other.callback);
        if (ev_timer != nullptr)
            ev_timer->data = this;
        if (other.ev_timer != nullptr)
            other.ev_timer->data = &other;
    }

    TimerEvent &operator=(TimerEvent &&other) {
        if (this != &other)
        {
            TimerEvent tmp(std::move(other));
            tmp.swap(*this);
        }
        return *this;
    }

    ~TimerEvent() { clear(); }

    void clear() {
        if (ev_timer != nullptr)
        {
            uv_timer_stop(ev_timer);
            uv_close((uv_handle_t *)ev_timer, _on_uv_handle_close);
            ev_timer = nullptr;
        }
        callback = nullptr;
    }

    void set_callback(callback_t _callback) {
        callback = _callback;
    }

    void add(double t_sec) {
        assert(ev_timer != nullptr);
        uv_timer_start(ev_timer, TimerEvent::timer_then, uint64_t(t_sec * 1000), 0);
    }

    void del() {
        if (ev_timer != nullptr) uv_timer_stop(ev_timer);
    }

    operator bool() const { return ev_timer != nullptr; }

    const EventContext &get_ec() const { return ec; }
};

class TimedFdEvent: public FdEvent, public TimerEvent {
    public:
    static const int TIMEOUT = 1 << 29;

    private:
    FdEvent::callback_t callback;
    uint64_t timeout;

    static inline void timer_then(uv_timer_t *h) {
        auto event = static_cast<TimedFdEvent *>(h->data);
        event->FdEvent::del();
        event->callback(event->fd, TIMEOUT);
    }

    static inline void fd_then(uv_poll_t *h, int status, int events) {
        if (status != 0)
            events |= ERROR;
        auto event = static_cast<TimedFdEvent *>(h->data);
        event->TimerEvent::del();
        uv_timer_start(event->ev_timer, TimedFdEvent::timer_then,
                        event->timeout, 0);
        event->callback(event->fd, events);
    }

    public:
    TimedFdEvent() = default;
    TimedFdEvent(const EventContext &ec, int fd, FdEvent::callback_t callback):
            FdEvent(ec, fd, FdEvent::callback_t()),
            TimerEvent(ec, TimerEvent::callback_t()),
            callback(std::move(callback)) {
        ev_fd->data = this;
        ev_timer->data = this;
    }

    TimedFdEvent(TimedFdEvent &&other):
            FdEvent(static_cast<FdEvent &&>(other)),
            TimerEvent(static_cast<TimerEvent &&>(other)),
            callback(std::move(other.callback)),
            timeout(other.timeout) {
        if (ev_fd != nullptr)
        {
            ev_timer->data = this;
            ev_fd->data = this;
        }
    }

    void swap(TimedFdEvent &other) {
        std::swap(static_cast<FdEvent &>(*this), static_cast<FdEvent &>(other));
        std::swap(static_cast<TimerEvent &>(*this), static_cast<TimerEvent &>(other));
        std::swap(callback, other.callback);
        std::swap(timeout, other.timeout);
        if (ev_fd != nullptr)
            ev_fd->data = ev_timer->data = this;
        if (other.ev_fd != nullptr)
            other.ev_fd->data = other.ev_timer->data = &other;
    }

    TimedFdEvent &operator=(TimedFdEvent &&other) {
        if (this != &other)
        {
            TimedFdEvent tmp(std::move(other));
            tmp.swap(*this);
        }
        return *this;
    }

    void clear() {
        TimerEvent::clear();
        FdEvent::clear();
    }

    using FdEvent::set_callback;

    void add(int events) = delete;
    void add(double t_sec) = delete;

    void set_callback(FdEvent::callback_t _callback) {
        callback = _callback;
    }

    void add(int events, double t_sec) {
        assert(ev_fd != nullptr && ev_timer != nullptr);
        uv_timer_start(ev_timer, TimedFdEvent::timer_then,
                        timeout = uint64_t(t_sec * 1000), 0);
        uv_poll_start(ev_fd, events, TimedFdEvent::fd_then);
    }

    void del() {
        TimerEvent::del();
        FdEvent::del();
    }

    operator bool() const { return ev_fd != nullptr; }
};

class SigEvent {
    public:
    using callback_t = std::function<void(int signum)>;
    private:
    EventContext ec;
    uv_signal_t *ev_sig;
    callback_t callback;
    static inline void sig_then(uv_signal_t *h, int signum) {
        auto event = static_cast<SigEvent *>(h->data);
        event->callback(signum);
    }

    public:
    SigEvent(): ec(nullptr), ev_sig(nullptr) {}
    SigEvent(const EventContext &ec, callback_t callback):
            ec(ec), ev_sig(new uv_signal_t()),
            callback(std::move(callback)) {
        uv_signal_init(ec.get(), ev_sig);
        ev_sig->data = this;
    }

    SigEvent(const SigEvent &) = delete;
    SigEvent(SigEvent &&other):
            ec(std::move(other.ec)), ev_sig(other.ev_sig),
            callback(std::move(other.callback)) {
        other.ev_sig = nullptr;
        if (ev_sig != nullptr)
            ev_sig->data = this;
    }

    void swap(SigEvent &other) {
        std::swap(ec, other.ec);
        std::swap(ev_sig, other.ev_sig);
        std::swap(callback, other.callback);
        if (ev_sig != nullptr)
            ev_sig->data = this;
        if (other.ev_sig != nullptr)
            other.ev_sig->data = &other;
    }

    SigEvent &operator=(SigEvent &&other) {
        if (this != &other)
        {
            SigEvent tmp(std::move(other));
            tmp.swap(*this);
        }
        return *this;
    }

    ~SigEvent() { clear(); }

    void clear() {
        if (ev_sig != nullptr)
        {
            uv_signal_stop(ev_sig);
            uv_close((uv_handle_t *)ev_sig, _on_uv_handle_close);
            ev_sig = nullptr;
        }
        callback = nullptr;
    }

    void set_callback(callback_t _callback) {
        callback = _callback;
    }

    void add(int signum) {
        assert(ev_sig != nullptr);
        uv_signal_start(ev_sig, SigEvent::sig_then, signum);
    }

    void add_once(int signum) {
        assert(ev_sig != nullptr);
        uv_signal_start_oneshot(ev_sig, SigEvent::sig_then, signum);
    }

    void del() {
        if (ev_sig != nullptr) uv_signal_stop(ev_sig);
    }

    operator bool() const { return ev_sig != nullptr; }

    const EventContext &get_ec() const { return ec; }
};

class CheckEvent {
    public:
    using callback_t = std::function<void()>;
    private:
    EventContext ec;
    uv_check_t *ev_check;
    callback_t callback;
    static inline void check_then(uv_check_t *h) {
        auto event = static_cast<CheckEvent *>(h->data);
        event->callback();
    }

    public:
    CheckEvent(): ec(nullptr), ev_check(nullptr) {}
    CheckEvent(const EventContext &ec, callback_t callback):
            ec(ec), ev_check(new uv_check_t()),
            callback(std::move(callback)) {
        uv_check_init(ec.get(), ev_check);
        ev_check->data = this;
    }

    CheckEvent(const CheckEvent &) = delete;
    CheckEvent(CheckEvent &&other):
            ec(std::move(other.ec)), ev_check(other.ev_check),
            callback(std::move(other.callback)) {
        other.ev_check = nullptr;
        if (ev_check != nullptr)
            ev_check->data = this;
    }

    void swap(CheckEvent &other) {
        std::swap(ec, other.ec);
        std::swap(ev_check, other.ev_check);
        std::swap(callback, other.callback);
        if (ev_check != nullptr)
            ev_check->data = this;
        if (other.ev_check != nullptr)
            other.ev_check->data = &other;
    }

    CheckEvent &operator=(CheckEvent &&other) {
        if (this != &other)
        {
            CheckEvent tmp(std::move(other));
            tmp.swap(*this);
        }
        return *this;
    }

    ~CheckEvent() { clear(); }

    void clear() {
        if (ev_check != nullptr)
        {
            uv_check_stop(ev_check);
            uv_close((uv_handle_t *)ev_check, _on_uv_handle_close);
            ev_check = nullptr;
        }
        callback = nullptr;
    }

    void set_callback(callback_t _callback) {
        callback = _callback;
    }

    void add() {
        assert(ev_check != nullptr);
        uv_check_start(ev_check, CheckEvent::check_then);
    }

    void del() {
        if (ev_check != nullptr) uv_check_stop(ev_check);
    }

    operator bool() const { return ev_check != nullptr; }

    const EventContext &get_ec() const { return ec; }
};

template<typename T>
class ThreadNotifier {
    std::condition_variable cv;
    std::mutex mlock;
    mutex_ul_t ul;
    bool ready;
    T data;
    public:
    ThreadNotifier(): ul(mlock), ready(false) {}
    T wait() {
        cv.wait(ul, [this]{ return ready; });
        return std::move(data);
    }
    void notify(T &&_data) {
        mutex_lg_t _(mlock);
        ready = true;
        data = std::move(_data);
        cv.notify_all();
    }
};

template<typename T>
class MPSCQueueEventDriven: public MPSCQueue<T> {
    private:
    const uint64_t dummy = 1;
    std::atomic<bool> wait_sig;
    int fd;
    FdEvent ev;

    public:
    MPSCQueueEventDriven():
            wait_sig(true),
            fd(eventfd(0, EFD_NONBLOCK)) {
        if (fd == -1) throw SalticidaeError(SALTI_ERROR_FD);
    }
    ~MPSCQueueEventDriven() { close(fd); }

    template<typename Func>
    void reg_handler(const EventContext &ec, Func &&func) {
        ev = FdEvent(ec, fd,
                    [this, func=std::forward<Func>(func)](int, int) {
                    //fprintf(stderr, "%x\n", std::this_thread::get_id());
                    uint64_t t;
                    read(fd, &t, 8);
                    // the only undesirable case is there are some new items
                    // enqueued before recovering wait_sig to true, so the consumer
                    // won't be notified. In this case, no enqueuing thread will
                    // get to write(fd). Then store(true) must happen after all exchange(false),
                    // since all enqueue operations are finalized, the dequeue should be able
                    // to see those enqueued values in func()
                    wait_sig.exchange(true, std::memory_order_acq_rel);
                    if (func(*this))
                        write(fd, &dummy, 8);
                });
        ev.add(FdEvent::READ);
    }

    void unreg_handler() { ev.clear(); }

    template<typename U>
    bool enqueue(U &&e, bool unbounded = true) {
        static const uint64_t dummy = 1;
        if (!MPSCQueue<T>::enqueue(std::forward<U>(e), unbounded))
            return false;
        // memory barrier here, so any load/store in enqueue must be finialized
        if (wait_sig.exchange(false, std::memory_order_acq_rel))
        {
            SALTICIDAE_LOG_DEBUG("mpsc notify");
            write(fd, &dummy, 8);
        }
        return true;
    }

    template<typename U> bool try_enqueue(U &&e) = delete;
};

template<typename T>
class MPMCQueueEventDriven: public MPMCQueue<T> {
    private:
    const uint64_t dummy = 1;
    std::atomic<bool> wait_sig;
    int fd;
    std::vector<FdEvent> evs;

    public:
    MPMCQueueEventDriven():
            wait_sig(true),
            fd(eventfd(0, EFD_NONBLOCK)) {
        if (fd == -1) throw SalticidaeError(SALTI_ERROR_FD);
    }
    ~MPMCQueueEventDriven() { close(fd); }

    // this function is *NOT* thread-safe
    template<typename Func>
    void reg_handler(const EventContext &ec, Func &&func) {
        FdEvent ev(ec, fd, [this, func=std::forward<Func>(func)](int, int) {
            //fprintf(stderr, "%x\n", std::this_thread::get_id());
            uint64_t t;
            if (read(fd, &t, 8) != 8) return;
            // only one consumer should be here a a time
            wait_sig.exchange(true, std::memory_order_acq_rel);
            if (func(*this))
                write(fd, &dummy, 8);
        });
        ev.add(FdEvent::READ);
        evs.push_back(std::move(ev));
    }

    void unreg_handlers() { evs.clear(); }

    template<typename U>
    bool enqueue(U &&e, bool unbounded = true) {
        static const uint64_t dummy = 1;
        if (!MPMCQueue<T>::enqueue(std::forward<U>(e), unbounded))
            return false;
        // memory barrier here, so any load/store in enqueue must be finialized
        if (wait_sig.exchange(false, std::memory_order_acq_rel))
        {
            SALTICIDAE_LOG_DEBUG("mpsc notify");
            write(fd, &dummy, 8);
        }
        return true;
    }

    template<typename U> bool try_enqueue(U &&e) = delete;
};

class ThreadCall {
    public: class Handle;
    private:
    EventContext ec;
    const size_t burst_size;
    using queue_t = MPSCQueueEventDriven<Handle *>;
    queue_t q;
    bool stopped;

    public:
    struct Result {
        void *data;
        std::function<void(void *)> deleter;
        Result(): data(nullptr) {}
        Result(void *data, std::function<void(void *)> &&deleter):
            data(data), deleter(std::move(deleter)) {}
        ~Result() { if (data != nullptr) deleter(data); }
        Result(const Result &) = delete;
        Result(Result &&other):
                data(other.data), deleter(std::move(other.deleter)) {
            other.data = nullptr;
        }
        void swap(Result &other) {
            std::swap(data, other.data);
            std::swap(deleter, other.deleter);
        }
        Result &operator=(const Result &other) = delete;
        Result &operator=(Result &&other) {
            if (this != &other)
            {
                Result tmp(std::move(other));
                tmp.swap(*this);
            }
            return *this;
        }
        void *get() { return data; }
    };
    class Handle {
        std::function<void(Handle &)> callback;
        ThreadNotifier<Result> * notifier;
        Result result;
        friend ThreadCall;
        public:
        Handle(): notifier(nullptr) {}
        void exec() {
            callback(*this);
            if (notifier)
                notifier->notify(std::move(result));
        }
        template<typename T>
        void set_result(T &&data) {
            using _T = std::remove_reference_t<T>;
            result = Result(new _T(std::forward<T>(data)),
                            [](void *ptr) {delete static_cast<_T *>(ptr);});
        }
    };

    ThreadCall(size_t burst_size): burst_size(burst_size), stopped(false) {}
    ThreadCall(const ThreadCall &) = delete;
    ThreadCall(ThreadCall &&) = delete;
    ThreadCall(EventContext ec, size_t burst_size = 128): ec(ec), burst_size(burst_size), stopped(false) {
        q.reg_handler(ec, [this, burst_size=burst_size](queue_t &q) {
            size_t cnt = 0;
            Handle *h;
            while (q.try_dequeue(h))
            {
                if (!stopped) h->exec();
                delete h;
                if (++cnt == burst_size) return true;
            }
            return false;
        });
    }

    ~ThreadCall() {
        Handle *h;
        while (q.try_dequeue(h)) delete h;
    }

    template<typename Func>
    void async_call(Func callback) {
        auto h = new Handle();
        h->callback = callback;
        q.enqueue(h);
    }

    template<typename Func>
    Result call(Func callback) {
        auto h = new Handle();
        h->callback = callback;
        ThreadNotifier<Result> notifier;
        h->notifier = &notifier;
        q.enqueue(h);
        return notifier.wait();
    }

    const EventContext &get_ec() const { return ec; }
    void stop() { stopped = true; }
};

}

#ifdef SALTICIDAE_CBINDINGS
using eventcontext_t = salticidae::EventContext;
using threadcall_t = salticidae::ThreadCall;
using threadcall_handle_t = salticidae::ThreadCall::Handle;
using sigev_t = salticidae::SigEvent;
using timerev_t = salticidae::TimerEvent;
#endif

#else
#include "config.h"
#ifdef SALTICIDAE_CBINDINGS
typedef struct eventcontext_t eventcontext_t;
typedef struct threadcall_t threadcall_t;
typedef struct threadcall_handle_t threadcall_handle_t;
typedef struct sigev_t sigev_t;
typedef struct timerev_t timerev_t;
#endif

#endif

#ifdef SALTICIDAE_CBINDINGS
#ifdef __cplusplus
extern "C" {
#endif

eventcontext_t *eventcontext_new();
void eventcontext_free(eventcontext_t *self);
void eventcontext_dispatch(eventcontext_t *self);
void eventcontext_stop(eventcontext_t *self);

typedef void (*threadcall_callback_t)(threadcall_handle_t *handle, void *userdata);
threadcall_t *threadcall_new(const eventcontext_t *ec);
void threadcall_free(threadcall_t *self);
void threadcall_async_call(threadcall_t *self, threadcall_callback_t callback, void *userdata);
const eventcontext_t *threadcall_get_ec(const threadcall_t *self);

typedef void (*sigev_callback_t)(int signum, void *);
sigev_t *sigev_new(const eventcontext_t *ec, sigev_callback_t cb, void *userdata);
void sigev_free(sigev_t *self);
void sigev_add(sigev_t *self, int signum);
void sigev_del(sigev_t *self);
void sigev_clear(sigev_t *self);
const eventcontext_t *sigev_get_ec(const sigev_t *self);

typedef void (*timerev_callback_t)(timerev_t *, void *);
timerev_t *timerev_new(const eventcontext_t *ec, timerev_callback_t callback, void *);
void timerev_set_callback(timerev_t *self, timerev_callback_t callback, void *);
void timerev_free(timerev_t *self);
void timerev_add(timerev_t *self, double t_sec);
void timerev_del(timerev_t *self);
void timerev_clear(timerev_t *self);
const eventcontext_t *timerev_get_ec(const timerev_t *self);

#ifdef __cplusplus
}
#endif
#endif

#endif