aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/salticidae/conn.h4
-rw-r--r--include/salticidae/endian.h119
-rw-r--r--include/salticidae/event.h108
-rw-r--r--include/salticidae/stream.h2
-rw-r--r--include/salticidae/type.h2
-rw-r--r--src/conn.cpp12
-rw-r--r--src/event.cpp4
7 files changed, 224 insertions, 27 deletions
diff --git a/include/salticidae/conn.h b/include/salticidae/conn.h
index e20a03c..2f72376 100644
--- a/include/salticidae/conn.h
+++ b/include/salticidae/conn.h
@@ -52,7 +52,9 @@ namespace salticidae {
/** Abstraction for connection management. */
class ConnPool {
+ protected:
class Worker;
+
public:
class Conn;
/** The handle to a bi-directional connection. */
@@ -212,6 +214,7 @@ class ConnPool {
});
}
+ protected:
class Worker {
EventContext ec;
ThreadCall tcall;
@@ -326,6 +329,7 @@ class ConnPool {
void stop_tcall() { tcall.stop(); }
};
+ private:
/* related to workers */
size_t nworker;
salticidae::BoxObj<Worker[]> workers;
diff --git a/include/salticidae/endian.h b/include/salticidae/endian.h
new file mode 100644
index 0000000..cfbdaff
--- /dev/null
+++ b/include/salticidae/endian.h
@@ -0,0 +1,119 @@
+// Ted: this file is downloaded from https://gist.github.com/panzi/6856583
+// "License": Public Domain
+// I, Mathias Panzenböck, place this file hereby into the public domain. Use it at your own risk for whatever you like.
+// In case there are jurisdictions that don't support putting things in the public domain you can also consider it to
+// be "dual licensed" under the BSD, MIT and Apache licenses, if you want to. This code is trivial anyway. Consider it
+// an example on how to get the endian conversion functions on different platforms.
+
+#ifndef PORTABLE_ENDIAN_H__
+#define PORTABLE_ENDIAN_H__
+
+#if (defined(_WIN16) || defined(_WIN32) || defined(_WIN64)) && !defined(__WINDOWS__)
+
+# define __WINDOWS__
+
+#endif
+
+#if defined(__linux__) || defined(__CYGWIN__)
+
+# include <endian.h>
+
+#elif defined(__APPLE__)
+
+# include <libkern/OSByteOrder.h>
+
+# define htobe16(x) OSSwapHostToBigInt16(x)
+# define htole16(x) OSSwapHostToLittleInt16(x)
+# define be16toh(x) OSSwapBigToHostInt16(x)
+# define le16toh(x) OSSwapLittleToHostInt16(x)
+
+# define htobe32(x) OSSwapHostToBigInt32(x)
+# define htole32(x) OSSwapHostToLittleInt32(x)
+# define be32toh(x) OSSwapBigToHostInt32(x)
+# define le32toh(x) OSSwapLittleToHostInt32(x)
+
+# define htobe64(x) OSSwapHostToBigInt64(x)
+# define htole64(x) OSSwapHostToLittleInt64(x)
+# define be64toh(x) OSSwapBigToHostInt64(x)
+# define le64toh(x) OSSwapLittleToHostInt64(x)
+
+# define __BYTE_ORDER BYTE_ORDER
+# define __BIG_ENDIAN BIG_ENDIAN
+# define __LITTLE_ENDIAN LITTLE_ENDIAN
+# define __PDP_ENDIAN PDP_ENDIAN
+
+#elif defined(__OpenBSD__)
+
+# include <sys/endian.h>
+
+#elif defined(__NetBSD__) || defined(__FreeBSD__) || defined(__DragonFly__)
+
+# include <sys/endian.h>
+
+# define be16toh(x) betoh16(x)
+# define le16toh(x) letoh16(x)
+
+# define be32toh(x) betoh32(x)
+# define le32toh(x) letoh32(x)
+
+# define be64toh(x) betoh64(x)
+# define le64toh(x) letoh64(x)
+
+#elif defined(__WINDOWS__)
+
+# include <winsock2.h>
+# include <sys/param.h>
+
+# if BYTE_ORDER == LITTLE_ENDIAN
+
+# define htobe16(x) htons(x)
+# define htole16(x) (x)
+# define be16toh(x) ntohs(x)
+# define le16toh(x) (x)
+
+# define htobe32(x) htonl(x)
+# define htole32(x) (x)
+# define be32toh(x) ntohl(x)
+# define le32toh(x) (x)
+
+# define htobe64(x) htonll(x)
+# define htole64(x) (x)
+# define be64toh(x) ntohll(x)
+# define le64toh(x) (x)
+
+# elif BYTE_ORDER == BIG_ENDIAN
+
+ /* that would be xbox 360 */
+# define htobe16(x) (x)
+# define htole16(x) __builtin_bswap16(x)
+# define be16toh(x) (x)
+# define le16toh(x) __builtin_bswap16(x)
+
+# define htobe32(x) (x)
+# define htole32(x) __builtin_bswap32(x)
+# define be32toh(x) (x)
+# define le32toh(x) __builtin_bswap32(x)
+
+# define htobe64(x) (x)
+# define htole64(x) __builtin_bswap64(x)
+# define be64toh(x) (x)
+# define le64toh(x) __builtin_bswap64(x)
+
+# else
+
+# error byte order not supported
+
+# endif
+
+# define __BYTE_ORDER BYTE_ORDER
+# define __BIG_ENDIAN BIG_ENDIAN
+# define __LITTLE_ENDIAN LITTLE_ENDIAN
+# define __PDP_ENDIAN PDP_ENDIAN
+
+#else
+
+# error platform not supported
+
+#endif
+
+#endif
diff --git a/include/salticidae/event.h b/include/salticidae/event.h
index 2f22e1a..fe94c0a 100644
--- a/include/salticidae/event.h
+++ b/include/salticidae/event.h
@@ -29,7 +29,6 @@
#include <condition_variable>
#include <unistd.h>
#include <uv.h>
-#include <sys/eventfd.h>
#include "salticidae/type.h"
#include "salticidae/queue.h"
@@ -544,29 +543,83 @@ class ThreadNotifier {
}
};
+#if defined(__linux__)
+#include <sys/eventfd.h>
+
+class NotifyFd {
+ int fd;
+ static const uint64_t dummy;
+ public:
+ NotifyFd(): fd(eventfd(0, EFD_NONBLOCK)) {
+ if (fd < 0) throw SalticidaeError(SALTI_ERROR_FD);
+ }
+ bool reset() {
+ uint64_t _;
+ return read(fd, &_, 8) == 8;
+ }
+ void notify() { write(fd, &dummy, 8); }
+ int read_fd() { return fd; }
+ ~NotifyFd() { close(fd); }
+};
+
+#elif defined(__APPLE__)
+// NOTE: using kqueue/kevent with EVFILT_USER is optimal, but libuv doesn't
+// seem to offer such interface for such user level kevent (and its
+// identifier). Thus, we downgrade to pipe-based solution on OSX/BSD system.
+
+class NotifyFd {
+ int fds[0];
+ uint8_t dummy[8];
+ public:
+ NotifyFd() {
+ if (pipe(fds) < 0 ||
+ fcntl(fds[0], F_SETFL, O_NONBLOCK))
+ throw SalticidaeError(SALTI_ERROR_FD);
+ }
+ bool reset() {
+ return read(fds[0], dummy, 8) == 8;
+ }
+ void notify() {
+ write(fds[1], dummy, 8);
+ }
+ int read_fd() { return fds[0]; }
+ ~NotifyFd() {
+ close(fds[0]);
+ close(fds[1]);
+ }
+};
+
+#else
+#warning "platform not supported!"
+#endif
+
template<typename T>
class MPSCQueueEventDriven: public MPSCQueue<T> {
private:
- const uint64_t dummy = 1;
std::atomic<bool> wait_sig;
- int fd;
+ //int fd;
+ NotifyFd nfd;
FdEvent ev;
public:
MPSCQueueEventDriven():
- wait_sig(true),
- fd(eventfd(0, EFD_NONBLOCK)) {
- if (fd == -1) throw SalticidaeError(SALTI_ERROR_FD);
+ wait_sig(true) {
+ //fd(eventfd(0, EFD_NONBLOCK)) {
+ //if (fd == -1) throw SalticidaeError(SALTI_ERROR_FD);
+ }
+ ~MPSCQueueEventDriven() {
+ //close(fd);
+ unreg_handler();
}
- ~MPSCQueueEventDriven() { close(fd); unreg_handler(); }
template<typename Func>
void reg_handler(const EventContext &ec, Func &&func) {
- ev = FdEvent(ec, fd,
+ ev = FdEvent(ec, nfd.read_fd(),
[this, func=std::forward<Func>(func)](int, int) {
//fprintf(stderr, "%x\n", std::this_thread::get_id());
- uint64_t t;
- read(fd, &t, 8);
+ //uint64_t t;
+ //read(fd, &t, 8);
+ nfd.reset();
// the only undesirable case is there are some new items
// enqueued before recovering wait_sig to true, so the consumer
// won't be notified. In this case, no enqueuing thread will
@@ -575,7 +628,8 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
// to see those enqueued values in func()
wait_sig.exchange(true, std::memory_order_acq_rel);
if (func(*this))
- write(fd, &dummy, 8);
+ nfd.notify();
+ //write(fd, &dummy, 8);
});
ev.add(FdEvent::READ);
}
@@ -584,14 +638,14 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
template<typename U>
bool enqueue(U &&e, bool unbounded = true) {
- static const uint64_t dummy = 1;
if (!MPSCQueue<T>::enqueue(std::forward<U>(e), unbounded))
return false;
// memory barrier here, so any load/store in enqueue must be finialized
if (wait_sig.exchange(false, std::memory_order_acq_rel))
{
//SALTICIDAE_LOG_DEBUG("mpsc notify");
- write(fd, &dummy, 8);
+ //write(fd, &dummy, 8);
+ nfd.notify();
}
return true;
}
@@ -599,33 +653,39 @@ class MPSCQueueEventDriven: public MPSCQueue<T> {
template<typename U> bool try_enqueue(U &&e) = delete;
};
+// NOTE: the MPMC implementation below hasn't been heavily tested.
template<typename T>
class MPMCQueueEventDriven: public MPMCQueue<T> {
private:
- const uint64_t dummy = 1;
std::atomic<bool> wait_sig;
- int fd;
+ //int fd;
+ NotifyFd nfd;
std::vector<FdEvent> evs;
public:
MPMCQueueEventDriven():
- wait_sig(true),
- fd(eventfd(0, EFD_NONBLOCK)) {
- if (fd == -1) throw SalticidaeError(SALTI_ERROR_FD);
+ wait_sig(true) {
+ //fd(eventfd(0, EFD_NONBLOCK)) {
+ //if (fd == -1) throw SalticidaeError(SALTI_ERROR_FD);
+ }
+ ~MPMCQueueEventDriven() {
+ //close(fd);
+ unreg_handlers();
}
- ~MPMCQueueEventDriven() { close(fd); unreg_handlers(); }
// this function is *NOT* thread-safe
template<typename Func>
void reg_handler(const EventContext &ec, Func &&func) {
- FdEvent ev(ec, fd, [this, func=std::forward<Func>(func)](int, int) {
+ FdEvent ev(ec, nfd.read_fd(), [this, func=std::forward<Func>(func)](int, int) {
//fprintf(stderr, "%x\n", std::this_thread::get_id());
uint64_t t;
- if (read(fd, &t, 8) != 8) return;
+ //if (read(fd, &t, 8) != 8) return;
+ if (!nfd.reset()) return;
// only one consumer should be here a a time
wait_sig.exchange(true, std::memory_order_acq_rel);
if (func(*this))
- write(fd, &dummy, 8);
+ //write(fd, &dummy, 8);
+ nfd.notify();
});
ev.add(FdEvent::READ);
evs.push_back(std::move(ev));
@@ -635,14 +695,14 @@ class MPMCQueueEventDriven: public MPMCQueue<T> {
template<typename U>
bool enqueue(U &&e, bool unbounded = true) {
- static const uint64_t dummy = 1;
if (!MPMCQueue<T>::enqueue(std::forward<U>(e), unbounded))
return false;
// memory barrier here, so any load/store in enqueue must be finialized
if (wait_sig.exchange(false, std::memory_order_acq_rel))
{
//SALTICIDAE_LOG_DEBUG("mpmc notify");
- write(fd, &dummy, 8);
+ //write(fd, &dummy, 8);
+ nfd.notify();
}
return true;
}
diff --git a/include/salticidae/stream.h b/include/salticidae/stream.h
index fb951cb..1ac166a 100644
--- a/include/salticidae/stream.h
+++ b/include/salticidae/stream.h
@@ -349,7 +349,7 @@ class _Bits {
ndata = (nbits + bit_per_datum - 1) / bit_per_datum;
data = new _impl_type[ndata];
- uint8_t *end = arr + len;
+ const uint8_t *end = arr + len;
for (_impl_type *ptr = data.get(); ptr < data.get() + ndata;)
{
_impl_type x = 0;
diff --git a/include/salticidae/type.h b/include/salticidae/type.h
index 2d97ae9..8080d6b 100644
--- a/include/salticidae/type.h
+++ b/include/salticidae/type.h
@@ -38,6 +38,8 @@
#include <functional>
#include <mutex>
+#include "salticidae/endian.h"
+
namespace salticidae {
const auto _1 = std::placeholders::_1;
diff --git a/src/conn.cpp b/src/conn.cpp
index dab10dc..311b1f8 100644
--- a/src/conn.cpp
+++ b/src/conn.cpp
@@ -33,6 +33,14 @@
#include "salticidae/util.h"
#include "salticidae/conn.h"
+#if !defined(SOL_TCP) && defined(IPPROTO_TCP)
+#define SOL_TCP IPPROTO_TCP
+#endif
+
+#if !defined(TCP_KEEPIDLE) && defined(TCP_KEEPALIVE)
+#define TCP_KEEPIDLE TCP_KEEPALIVE
+#endif
+
namespace salticidae {
ConnPool::Conn::operator std::string() const {
@@ -64,7 +72,7 @@ void ConnPool::Conn::_send_data(const conn_t &conn, int fd, int events) {
bytearray_t buff_seg = conn->send_buffer.move_pop();
ssize_t size = buff_seg.size();
if (!size) break;
- ret = send(fd, buff_seg.data(), size, MSG_NOSIGNAL);
+ ret = send(fd, buff_seg.data(), size, 0);
SALTICIDAE_LOG_DEBUG("socket sent %zd bytes", ret);
size -= ret;
if (size > 0)
@@ -330,7 +338,7 @@ void ConnPool::accept_client(int fd, int) {
void ConnPool::conn_server(const conn_t &conn, int fd, int events) {
try {
- if (send(fd, "", 0, MSG_NOSIGNAL) == 0)
+ if (send(fd, "", 0, 0) == 0)
{
conn->ev_connect.del();
SALTICIDAE_LOG_INFO("connected to remote %s", std::string(*conn).c_str());
diff --git a/src/event.cpp b/src/event.cpp
index 18aca18..16a3c41 100644
--- a/src/event.cpp
+++ b/src/event.cpp
@@ -2,6 +2,10 @@
#ifdef SALTICIDAE_CBINDINGS
#include "salticidae/event.h"
+#if defined(__linux__)
+const uint64_t salticidae::NotifyFd::dummy = 1;
+#endif
+
extern "C" {
eventcontext_t *eventcontext_new() { return new eventcontext_t(); }