diff options
-rw-r--r-- | include/salticidae/conn.h | 4 | ||||
-rw-r--r-- | include/salticidae/endian.h | 119 | ||||
-rw-r--r-- | include/salticidae/event.h | 108 | ||||
-rw-r--r-- | include/salticidae/stream.h | 2 | ||||
-rw-r--r-- | include/salticidae/type.h | 2 | ||||
-rw-r--r-- | src/conn.cpp | 12 | ||||
-rw-r--r-- | src/event.cpp | 4 |
7 files changed, 224 insertions, 27 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h index e20a03c..2f72376 100644 --- a/include/salticidae/conn.h +++ b/include/salticidae/conn.h @@ -52,7 +52,9 @@ namespace salticidae { /** Abstraction for connection management. */ class ConnPool { + protected: class Worker; + public: class Conn; /** The handle to a bi-directional connection. */ @@ -212,6 +214,7 @@ class ConnPool { }); } + protected: class Worker { EventContext ec; ThreadCall tcall; @@ -326,6 +329,7 @@ class ConnPool { void stop_tcall() { tcall.stop(); } }; + private: /* related to workers */ size_t nworker; salticidae::BoxObj<Worker[]> workers; diff --git a/include/salticidae/endian.h b/include/salticidae/endian.h new file mode 100644 index 0000000..cfbdaff --- /dev/null +++ b/include/salticidae/endian.h @@ -0,0 +1,119 @@ +// Ted: this file is downloaded from https://gist.github.com/panzi/6856583 +// "License": Public Domain +// I, Mathias Panzenböck, place this file hereby into the public domain. Use it at your own risk for whatever you like. +// In case there are jurisdictions that don't support putting things in the public domain you can also consider it to +// be "dual licensed" under the BSD, MIT and Apache licenses, if you want to. This code is trivial anyway. Consider it +// an example on how to get the endian conversion functions on different platforms. + +#ifndef PORTABLE_ENDIAN_H__ +#define PORTABLE_ENDIAN_H__ + +#if (defined(_WIN16) || defined(_WIN32) || defined(_WIN64)) && !defined(__WINDOWS__) + +# define __WINDOWS__ + +#endif + +#if defined(__linux__) || defined(__CYGWIN__) + +# include <endian.h> + +#elif defined(__APPLE__) + +# include <libkern/OSByteOrder.h> + +# define htobe16(x) OSSwapHostToBigInt16(x) +# define htole16(x) OSSwapHostToLittleInt16(x) +# define be16toh(x) OSSwapBigToHostInt16(x) +# define le16toh(x) OSSwapLittleToHostInt16(x) + +# define htobe32(x) OSSwapHostToBigInt32(x) +# define htole32(x) OSSwapHostToLittleInt32(x) +# define be32toh(x) OSSwapBigToHostInt32(x) +# define le32toh(x) OSSwapLittleToHostInt32(x) + +# define htobe64(x) OSSwapHostToBigInt64(x) +# define htole64(x) OSSwapHostToLittleInt64(x) +# define be64toh(x) OSSwapBigToHostInt64(x) +# define le64toh(x) OSSwapLittleToHostInt64(x) + +# define __BYTE_ORDER BYTE_ORDER +# define __BIG_ENDIAN BIG_ENDIAN +# define __LITTLE_ENDIAN LITTLE_ENDIAN +# define __PDP_ENDIAN PDP_ENDIAN + +#elif defined(__OpenBSD__) + +# include <sys/endian.h> + +#elif defined(__NetBSD__) || defined(__FreeBSD__) || defined(__DragonFly__) + +# include <sys/endian.h> + +# define be16toh(x) betoh16(x) +# define le16toh(x) letoh16(x) + +# define be32toh(x) betoh32(x) +# define le32toh(x) letoh32(x) + +# define be64toh(x) betoh64(x) +# define le64toh(x) letoh64(x) + +#elif defined(__WINDOWS__) + +# include <winsock2.h> +# include <sys/param.h> + +# if BYTE_ORDER == LITTLE_ENDIAN + +# define htobe16(x) htons(x) +# define htole16(x) (x) +# define be16toh(x) ntohs(x) +# define le16toh(x) (x) + +# define htobe32(x) htonl(x) +# define htole32(x) (x) +# define be32toh(x) ntohl(x) +# define le32toh(x) (x) + +# define htobe64(x) htonll(x) +# define htole64(x) (x) +# define be64toh(x) ntohll(x) +# define le64toh(x) (x) + +# elif BYTE_ORDER == BIG_ENDIAN + + /* that would be xbox 360 */ +# define htobe16(x) (x) +# define htole16(x) __builtin_bswap16(x) +# define be16toh(x) (x) +# define le16toh(x) __builtin_bswap16(x) + +# define htobe32(x) (x) +# define htole32(x) __builtin_bswap32(x) +# define be32toh(x) (x) +# define le32toh(x) __builtin_bswap32(x) + +# define htobe64(x) (x) +# define htole64(x) __builtin_bswap64(x) +# define be64toh(x) (x) +# define le64toh(x) __builtin_bswap64(x) + +# else + +# error byte order not supported + +# endif + +# define __BYTE_ORDER BYTE_ORDER +# define __BIG_ENDIAN BIG_ENDIAN +# define __LITTLE_ENDIAN LITTLE_ENDIAN +# define __PDP_ENDIAN PDP_ENDIAN + +#else + +# error platform not supported + +#endif + +#endif diff --git a/include/salticidae/event.h b/include/salticidae/event.h index 2f22e1a..fe94c0a 100644 --- a/include/salticidae/event.h +++ b/include/salticidae/event.h @@ -29,7 +29,6 @@ #include <condition_variable> #include <unistd.h> #include <uv.h> -#include <sys/eventfd.h> #include "salticidae/type.h" #include "salticidae/queue.h" @@ -544,29 +543,83 @@ class ThreadNotifier { } }; +#if defined(__linux__) +#include <sys/eventfd.h> + +class NotifyFd { + int fd; + static const uint64_t dummy; + public: + NotifyFd(): fd(eventfd(0, EFD_NONBLOCK)) { + if (fd < 0) throw SalticidaeError(SALTI_ERROR_FD); + } + bool reset() { + uint64_t _; + return read(fd, &_, 8) == 8; + } + void notify() { write(fd, &dummy, 8); } + int read_fd() { return fd; } + ~NotifyFd() { close(fd); } +}; + +#elif defined(__APPLE__) +// NOTE: using kqueue/kevent with EVFILT_USER is optimal, but libuv doesn't +// seem to offer such interface for such user level kevent (and its +// identifier). Thus, we downgrade to pipe-based solution on OSX/BSD system. + +class NotifyFd { + int fds[0]; + uint8_t dummy[8]; + public: + NotifyFd() { + if (pipe(fds) < 0 || + fcntl(fds[0], F_SETFL, O_NONBLOCK)) + throw SalticidaeError(SALTI_ERROR_FD); + } + bool reset() { + return read(fds[0], dummy, 8) == 8; + } + void notify() { + write(fds[1], dummy, 8); + } + int read_fd() { return fds[0]; } + ~NotifyFd() { + close(fds[0]); + close(fds[1]); + } +}; + +#else +#warning "platform not supported!" +#endif + template<typename T> class MPSCQueueEventDriven: public MPSCQueue<T> { private: - const uint64_t dummy = 1; std::atomic<bool> wait_sig; - int fd; + //int fd; + NotifyFd nfd; FdEvent ev; public: MPSCQueueEventDriven(): - wait_sig(true), - fd(eventfd(0, EFD_NONBLOCK)) { - if (fd == -1) throw SalticidaeError(SALTI_ERROR_FD); + wait_sig(true) { + //fd(eventfd(0, EFD_NONBLOCK)) { + //if (fd == -1) throw SalticidaeError(SALTI_ERROR_FD); + } + ~MPSCQueueEventDriven() { + //close(fd); + unreg_handler(); } - ~MPSCQueueEventDriven() { close(fd); unreg_handler(); } template<typename Func> void reg_handler(const EventContext &ec, Func &&func) { - ev = FdEvent(ec, fd, + ev = FdEvent(ec, nfd.read_fd(), [this, func=std::forward<Func>(func)](int, int) { //fprintf(stderr, "%x\n", std::this_thread::get_id()); - uint64_t t; - read(fd, &t, 8); + //uint64_t t; + //read(fd, &t, 8); + nfd.reset(); // the only undesirable case is there are some new items // enqueued before recovering wait_sig to true, so the consumer // won't be notified. In this case, no enqueuing thread will @@ -575,7 +628,8 @@ class MPSCQueueEventDriven: public MPSCQueue<T> { // to see those enqueued values in func() wait_sig.exchange(true, std::memory_order_acq_rel); if (func(*this)) - write(fd, &dummy, 8); + nfd.notify(); + //write(fd, &dummy, 8); }); ev.add(FdEvent::READ); } @@ -584,14 +638,14 @@ class MPSCQueueEventDriven: public MPSCQueue<T> { template<typename U> bool enqueue(U &&e, bool unbounded = true) { - static const uint64_t dummy = 1; if (!MPSCQueue<T>::enqueue(std::forward<U>(e), unbounded)) return false; // memory barrier here, so any load/store in enqueue must be finialized if (wait_sig.exchange(false, std::memory_order_acq_rel)) { //SALTICIDAE_LOG_DEBUG("mpsc notify"); - write(fd, &dummy, 8); + //write(fd, &dummy, 8); + nfd.notify(); } return true; } @@ -599,33 +653,39 @@ class MPSCQueueEventDriven: public MPSCQueue<T> { template<typename U> bool try_enqueue(U &&e) = delete; }; +// NOTE: the MPMC implementation below hasn't been heavily tested. template<typename T> class MPMCQueueEventDriven: public MPMCQueue<T> { private: - const uint64_t dummy = 1; std::atomic<bool> wait_sig; - int fd; + //int fd; + NotifyFd nfd; std::vector<FdEvent> evs; public: MPMCQueueEventDriven(): - wait_sig(true), - fd(eventfd(0, EFD_NONBLOCK)) { - if (fd == -1) throw SalticidaeError(SALTI_ERROR_FD); + wait_sig(true) { + //fd(eventfd(0, EFD_NONBLOCK)) { + //if (fd == -1) throw SalticidaeError(SALTI_ERROR_FD); + } + ~MPMCQueueEventDriven() { + //close(fd); + unreg_handlers(); } - ~MPMCQueueEventDriven() { close(fd); unreg_handlers(); } // this function is *NOT* thread-safe template<typename Func> void reg_handler(const EventContext &ec, Func &&func) { - FdEvent ev(ec, fd, [this, func=std::forward<Func>(func)](int, int) { + FdEvent ev(ec, nfd.read_fd(), [this, func=std::forward<Func>(func)](int, int) { //fprintf(stderr, "%x\n", std::this_thread::get_id()); uint64_t t; - if (read(fd, &t, 8) != 8) return; + //if (read(fd, &t, 8) != 8) return; + if (!nfd.reset()) return; // only one consumer should be here a a time wait_sig.exchange(true, std::memory_order_acq_rel); if (func(*this)) - write(fd, &dummy, 8); + //write(fd, &dummy, 8); + nfd.notify(); }); ev.add(FdEvent::READ); evs.push_back(std::move(ev)); @@ -635,14 +695,14 @@ class MPMCQueueEventDriven: public MPMCQueue<T> { template<typename U> bool enqueue(U &&e, bool unbounded = true) { - static const uint64_t dummy = 1; if (!MPMCQueue<T>::enqueue(std::forward<U>(e), unbounded)) return false; // memory barrier here, so any load/store in enqueue must be finialized if (wait_sig.exchange(false, std::memory_order_acq_rel)) { //SALTICIDAE_LOG_DEBUG("mpmc notify"); - write(fd, &dummy, 8); + //write(fd, &dummy, 8); + nfd.notify(); } return true; } diff --git a/include/salticidae/stream.h b/include/salticidae/stream.h index fb951cb..1ac166a 100644 --- a/include/salticidae/stream.h +++ b/include/salticidae/stream.h @@ -349,7 +349,7 @@ class _Bits { ndata = (nbits + bit_per_datum - 1) / bit_per_datum; data = new _impl_type[ndata]; - uint8_t *end = arr + len; + const uint8_t *end = arr + len; for (_impl_type *ptr = data.get(); ptr < data.get() + ndata;) { _impl_type x = 0; diff --git a/include/salticidae/type.h b/include/salticidae/type.h index 2d97ae9..8080d6b 100644 --- a/include/salticidae/type.h +++ b/include/salticidae/type.h @@ -38,6 +38,8 @@ #include <functional> #include <mutex> +#include "salticidae/endian.h" + namespace salticidae { const auto _1 = std::placeholders::_1; diff --git a/src/conn.cpp b/src/conn.cpp index dab10dc..311b1f8 100644 --- a/src/conn.cpp +++ b/src/conn.cpp @@ -33,6 +33,14 @@ #include "salticidae/util.h" #include "salticidae/conn.h" +#if !defined(SOL_TCP) && defined(IPPROTO_TCP) +#define SOL_TCP IPPROTO_TCP +#endif + +#if !defined(TCP_KEEPIDLE) && defined(TCP_KEEPALIVE) +#define TCP_KEEPIDLE TCP_KEEPALIVE +#endif + namespace salticidae { ConnPool::Conn::operator std::string() const { @@ -64,7 +72,7 @@ void ConnPool::Conn::_send_data(const conn_t &conn, int fd, int events) { bytearray_t buff_seg = conn->send_buffer.move_pop(); ssize_t size = buff_seg.size(); if (!size) break; - ret = send(fd, buff_seg.data(), size, MSG_NOSIGNAL); + ret = send(fd, buff_seg.data(), size, 0); SALTICIDAE_LOG_DEBUG("socket sent %zd bytes", ret); size -= ret; if (size > 0) @@ -330,7 +338,7 @@ void ConnPool::accept_client(int fd, int) { void ConnPool::conn_server(const conn_t &conn, int fd, int events) { try { - if (send(fd, "", 0, MSG_NOSIGNAL) == 0) + if (send(fd, "", 0, 0) == 0) { conn->ev_connect.del(); SALTICIDAE_LOG_INFO("connected to remote %s", std::string(*conn).c_str()); diff --git a/src/event.cpp b/src/event.cpp index 18aca18..16a3c41 100644 --- a/src/event.cpp +++ b/src/event.cpp @@ -2,6 +2,10 @@ #ifdef SALTICIDAE_CBINDINGS #include "salticidae/event.h" +#if defined(__linux__) +const uint64_t salticidae::NotifyFd::dummy = 1; +#endif + extern "C" { eventcontext_t *eventcontext_new() { return new eventcontext_t(); } |