diff options
Diffstat (limited to 'include')
-rw-r--r-- | include/salticidae/conn.h | 4 | ||||
-rw-r--r-- | include/salticidae/endian.h | 134 | ||||
-rw-r--r-- | include/salticidae/event.h | 108 | ||||
-rw-r--r-- | include/salticidae/stream.h | 2 | ||||
-rw-r--r-- | include/salticidae/type.h | 2 |
5 files changed, 201 insertions, 49 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index e20a03c..2f72376 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -52,7 +52,9 @@ namespace salticidae { /** Abstraction for connection management. */ class ConnPool { + protected: class Worker; + public: class Conn; /** The handle to a bi-directional connection. */ @@ -212,6 +214,7 @@ class ConnPool { }); } + protected: class Worker { EventContext ec; ThreadCall tcall; @@ -326,6 +329,7 @@ class ConnPool { void stop_tcall() { tcall.stop(); } }; + private: /* related to workers */ size_t nworker; salticidae::BoxObj<Worker[]> workers; diff --git a/include/salticidae/endian.h b/include/salticidae/endian.h index 06a0663..cfbdaff 100644 --- a/include/salticidae/endian.h +++ b/include/salticidae/endian.h @@ -1,33 +1,119 @@ -#ifndef __FINK_ENDIANDEV_PKG_ENDIAN_H__ -#define __FINK_ENDIANDEV_PKG_ENDIAN_H__ 1 +// Ted: this file is downloaded from https://gist.github.com/panzi/6856583 +// "License": Public Domain +// I, Mathias Panzenböck, place this file hereby into the public domain. Use it at your own risk for whatever you like. +// In case there are jurisdictions that don't support putting things in the public domain you can also consider it to +// be "dual licensed" under the BSD, MIT and Apache licenses, if you want to. This code is trivial anyway. Consider it +// an example on how to get the endian conversion functions on different platforms. -/** compatibility header for endian.h - * This is a simple compatibility shim to convert - * BSD/Linux endian macros to the Mac OS X equivalents. - * It is public domain. - * */ +#ifndef PORTABLE_ENDIAN_H__ +#define PORTABLE_ENDIAN_H__ -#ifndef __APPLE__ - #warning "This header file (endian.h) is MacOS X specific.\n" -#endif /* __APPLE__ */ +#if (defined(_WIN16) || defined(_WIN32) || defined(_WIN64)) && !defined(__WINDOWS__) +# define __WINDOWS__ -#include <libkern/OSByteOrder.h> +#endif -#define htobe16(x) OSSwapHostToBigInt16(x) -#define htole16(x) OSSwapHostToLittleInt16(x) -#define be16toh(x) OSSwapBigToHostInt16(x) -#define le16toh(x) OSSwapLittleToHostInt16(x) +#if defined(__linux__) || defined(__CYGWIN__) -#define htobe32(x) OSSwapHostToBigInt32(x) -#define htole32(x) OSSwapHostToLittleInt32(x) -#define be32toh(x) OSSwapBigToHostInt32(x) -#define le32toh(x) OSSwapLittleToHostInt32(x) +# include <endian.h> -#define htobe64(x) OSSwapHostToBigInt64(x) -#define htole64(x) OSSwapHostToLittleInt64(x) -#define be64toh(x) OSSwapBigToHostInt64(x) -#define le64toh(x) OSSwapLittleToHostInt64(x) +#elif defined(__APPLE__) +# include <libkern/OSByteOrder.h> -#endif /* __FINK_ENDIANDEV_PKG_ENDIAN_H__ */ +# define htobe16(x) OSSwapHostToBigInt16(x) +# define htole16(x) OSSwapHostToLittleInt16(x) +# define be16toh(x) OSSwapBigToHostInt16(x) +# define le16toh(x) OSSwapLittleToHostInt16(x) + +# define htobe32(x) OSSwapHostToBigInt32(x) +# define htole32(x) OSSwapHostToLittleInt32(x) +# define be32toh(x) OSSwapBigToHostInt32(x) +# define le32toh(x) OSSwapLittleToHostInt32(x) + +# define htobe64(x) OSSwapHostToBigInt64(x) +# define htole64(x) OSSwapHostToLittleInt64(x) +# define be64toh(x) OSSwapBigToHostInt64(x) +# define le64toh(x) OSSwapLittleToHostInt64(x) + +# define __BYTE_ORDER BYTE_ORDER +# define __BIG_ENDIAN BIG_ENDIAN +# define __LITTLE_ENDIAN LITTLE_ENDIAN +# define __PDP_ENDIAN PDP_ENDIAN + +#elif defined(__OpenBSD__) + +# include <sys/endian.h> + +#elif defined(__NetBSD__) || defined(__FreeBSD__) || defined(__DragonFly__) + +# include <sys/endian.h> + +# define be16toh(x) betoh16(x) +# define le16toh(x) letoh16(x) + +# define be32toh(x) betoh32(x) +# define le32toh(x) letoh32(x) + +# define be64toh(x) betoh64(x) +# define le64toh(x) letoh64(x) + +#elif defined(__WINDOWS__) + +# include <winsock2.h> +# include <sys/param.h> + +# if BYTE_ORDER == LITTLE_ENDIAN + +# define htobe16(x) htons(x) +# define htole16(x) (x) +# define be16toh(x) ntohs(x) +# define le16toh(x) (x) + +# define htobe32(x) htonl(x) +# define htole32(x) (x) +# define be32toh(x) ntohl(x) +# define le32toh(x) (x) + +# define htobe64(x) htonll(x) +# define htole64(x) (x) +# define be64toh(x) ntohll(x) +# define le64toh(x) (x) + +# elif BYTE_ORDER == BIG_ENDIAN + + /* that would be xbox 360 */ +# define htobe16(x) (x) +# define htole16(x) __builtin_bswap16(x) +# define be16toh(x) (x) +# define le16toh(x) __builtin_bswap16(x) + +# define htobe32(x) (x) +# define htole32(x) __builtin_bswap32(x) +# define be32toh(x) (x) +# define le32toh(x) __builtin_bswap32(x) + +# define htobe64(x) (x) +# define htole64(x) __builtin_bswap64(x) +# define be64toh(x) (x) +# define le64toh(x) __builtin_bswap64(x) + +# else + +# error byte order not supported + +# endif + +# define __BYTE_ORDER BYTE_ORDER +# define __BIG_ENDIAN BIG_ENDIAN +# define __LITTLE_ENDIAN LITTLE_ENDIAN +# define __PDP_ENDIAN PDP_ENDIAN + +#else + +# error platform not supported + +#endif + +#endif diff --git a/include/salticidae/event.h b/include/salticidae/event.h index 2f22e1a..fe94c0a 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -29,7 +29,6 @@ #include <condition_variable> #include <unistd.h> #include <uv.h> -#include <sys/eventfd.h> #include "salticidae/type.h" #include "salticidae/queue.h" @@ -544,29 +543,83 @@ class ThreadNotifier { } }; +#if defined(__linux__) +#include <sys/eventfd.h> + +class NotifyFd { + int fd; + static const uint64_t dummy; + public: + NotifyFd(): fd(eventfd(0, EFD_NONBLOCK)) { + if (fd < 0) throw SalticidaeError(SALTI_ERROR_FD); + } + bool reset() { + uint64_t _; + return read(fd, &_, 8) == 8; + } + void notify() { write(fd, &dummy, 8); } + int read_fd() { return fd; } + ~NotifyFd() { close(fd); } +}; + +#elif defined(__APPLE__) +// NOTE: using kqueue/kevent with EVFILT_USER is optimal, but libuv doesn't +// seem to offer such interface for such user level kevent (and its +// identifier). Thus, we downgrade to pipe-based solution on OSX/BSD system. + +class NotifyFd { + int fds[0]; + uint8_t dummy[8]; + public: + NotifyFd() { + if (pipe(fds) < 0 || + fcntl(fds[0], F_SETFL, O_NONBLOCK)) + throw SalticidaeError(SALTI_ERROR_FD); + } + bool reset() { + return read(fds[0], dummy, 8) == 8; + } + void notify() { + write(fds[1], dummy, 8); + } + int read_fd() { return fds[0]; } + ~NotifyFd() { + close(fds[0]); + close(fds[1]); + } +}; + +#else +#warning "platform not supported!" +#endif + template<typename T> class MPSCQueueEventDriven: public MPSCQueue<T> { private: - const uint64_t dummy = 1; std::atomic<bool> wait_sig; - int fd; + //int fd; + NotifyFd nfd; FdEvent ev; public: MPSCQueueEventDriven(): - wait_sig(true), - fd(eventfd(0, EFD_NONBLOCK)) { - if (fd == -1) throw SalticidaeError(SALTI_ERROR_FD); + wait_sig(true) { + //fd(eventfd(0, EFD_NONBLOCK)) { + //if (fd == -1) throw SalticidaeError(SALTI_ERROR_FD); + } + ~MPSCQueueEventDriven() { + //close(fd); + unreg_handler(); } - ~MPSCQueueEventDriven() { close(fd); unreg_handler(); } template<typename Func> void reg_handler(const EventContext &ec, Func &&func) { - ev = FdEvent(ec, fd, + ev = FdEvent(ec, nfd.read_fd(), [this, func=std::forward<Func>(func)](int, int) { //fprintf(stderr, "%x\n", std::this_thread::get_id()); - uint64_t t; - read(fd, &t, 8); + //uint64_t t; + //read(fd, &t, 8); + nfd.reset(); // the only undesirable case is there are some new items // enqueued before recovering wait_sig to true, so the consumer // won't be notified. In this case, no enqueuing thread will @@ -575,7 +628,8 @@ class MPSCQueueEventDriven: public MPSCQueue<T> { // to see those enqueued values in func() wait_sig.exchange(true, std::memory_order_acq_rel); if (func(*this)) - write(fd, &dummy, 8); + nfd.notify(); + //write(fd, &dummy, 8); }); ev.add(FdEvent::READ); } @@ -584,14 +638,14 @@ class MPSCQueueEventDriven: public MPSCQueue<T> { template<typename U> bool enqueue(U &&e, bool unbounded = true) { - static const uint64_t dummy = 1; if (!MPSCQueue<T>::enqueue(std::forward<U>(e), unbounded)) return false; // memory barrier here, so any load/store in enqueue must be finialized if (wait_sig.exchange(false, std::memory_order_acq_rel)) { //SALTICIDAE_LOG_DEBUG("mpsc notify"); - write(fd, &dummy, 8); + //write(fd, &dummy, 8); + nfd.notify(); } return true; } @@ -599,33 +653,39 @@ class MPSCQueueEventDriven: public MPSCQueue<T> { template<typename U> bool try_enqueue(U &&e) = delete; }; +// NOTE: the MPMC implementation below hasn't been heavily tested. template<typename T> class MPMCQueueEventDriven: public MPMCQueue<T> { private: - const uint64_t dummy = 1; std::atomic<bool> wait_sig; - int fd; + //int fd; + NotifyFd nfd; std::vector<FdEvent> evs; public: MPMCQueueEventDriven(): - wait_sig(true), - fd(eventfd(0, EFD_NONBLOCK)) { - if (fd == -1) throw SalticidaeError(SALTI_ERROR_FD); + wait_sig(true) { + //fd(eventfd(0, EFD_NONBLOCK)) { + //if (fd == -1) throw SalticidaeError(SALTI_ERROR_FD); + } + ~MPMCQueueEventDriven() { + //close(fd); + unreg_handlers(); } - ~MPMCQueueEventDriven() { close(fd); unreg_handlers(); } // this function is *NOT* thread-safe template<typename Func> void reg_handler(const EventContext &ec, Func &&func) { - FdEvent ev(ec, fd, [this, func=std::forward<Func>(func)](int, int) { + FdEvent ev(ec, nfd.read_fd(), [this, func=std::forward<Func>(func)](int, int) { //fprintf(stderr, "%x\n", std::this_thread::get_id()); uint64_t t; - if (read(fd, &t, 8) != 8) return; + //if (read(fd, &t, 8) != 8) return; + if (!nfd.reset()) return; // only one consumer should be here a a time wait_sig.exchange(true, std::memory_order_acq_rel); if (func(*this)) - write(fd, &dummy, 8); + //write(fd, &dummy, 8); + nfd.notify(); }); ev.add(FdEvent::READ); evs.push_back(std::move(ev)); @@ -635,14 +695,14 @@ class MPMCQueueEventDriven: public MPMCQueue<T> { template<typename U> bool enqueue(U &&e, bool unbounded = true) { - static const uint64_t dummy = 1; if (!MPMCQueue<T>::enqueue(std::forward<U>(e), unbounded)) return false; // memory barrier here, so any load/store in enqueue must be finialized if (wait_sig.exchange(false, std::memory_order_acq_rel)) { //SALTICIDAE_LOG_DEBUG("mpmc notify"); - write(fd, &dummy, 8); + //write(fd, &dummy, 8); + nfd.notify(); } return true; } diff --git a/include/salticidae/stream.h b/include/salticidae/stream.h index fb951cb..1ac166a 100644 --- a/include/salticidae/stream.h +++ b/include/salticidae/stream.h @@ -349,7 +349,7 @@ class _Bits { ndata = (nbits + bit_per_datum - 1) / bit_per_datum; data = new _impl_type[ndata]; - uint8_t *end = arr + len; + const uint8_t *end = arr + len; for (_impl_type *ptr = data.get(); ptr < data.get() + ndata;) { _impl_type x = 0; diff --git a/include/salticidae/type.h b/include/salticidae/type.h index 4578837..f95b85c 100644 --- a/include/salticidae/type.h +++ b/include/salticidae/type.h @@ -41,6 +41,8 @@ #include "endian.h" #endif +#include "salticidae/endian.h" + namespace salticidae { const auto _1 = std::placeholders::_1; |