aboutsummaryrefslogtreecommitdiff
path: root/fastnn/threads
diff options
context:
space:
mode:
Diffstat (limited to 'fastnn/threads')
-rw-r--r--fastnn/threads/Makefile42
-rw-r--r--fastnn/threads/init.lua14
-rw-r--r--fastnn/threads/lib/THThread.c349
-rw-r--r--fastnn/threads/lib/THThread.h36
-rw-r--r--fastnn/threads/lib/init.c27
-rw-r--r--fastnn/threads/lib/luaTHRD.h84
-rw-r--r--fastnn/threads/lib/threads.c355
-rw-r--r--fastnn/threads/test/test-low-level.lua39
-rw-r--r--fastnn/threads/test/test-threads-async.lua66
-rw-r--r--fastnn/threads/test/test-threads-multiple.lua15
-rw-r--r--fastnn/threads/test/test-threads-shared.lua111
-rw-r--r--fastnn/threads/test/test-threads.lua20
-rw-r--r--fastnn/threads/threads-scm-1.rockspec36
-rw-r--r--fastnn/threads/threads.lua14
14 files changed, 1208 insertions, 0 deletions
diff --git a/fastnn/threads/Makefile b/fastnn/threads/Makefile
new file mode 100644
index 0000000..17958f9
--- /dev/null
+++ b/fastnn/threads/Makefile
@@ -0,0 +1,42 @@
+.PHONY: build install clean
+SHELL := /bin/bash
+BUILD_DIR := $(CURDIR)/build
+INC_PATH := $(LUA_BINDIR)/../include/nerv/luaT
+LUA_DIR = $(INST_LUADIR)/threads
+LIB_PATH := $(LUA_BINDIR)/../lib
+
+OBJ_DIR := $(BUILD_DIR)/objs
+SUBDIR := lib test
+
+
+OBJS := lib/init.o lib/threads.o lib/THThread.o
+LIBS := $(INST_LIBDIR)/libthreads.so
+LUA_LIBS := init.lua threads.lua test/test-threads.lua
+INCLUDE := -I $(LUA_INCDIR) -I $(INC_PATH) -DLUA_USE_APICHECK -DUSE_PTHREAD_THREADS
+
+
+OBJS := $(addprefix $(OBJ_DIR)/,$(OBJS))
+OBJ_SUBDIR := $(addprefix $(OBJ_DIR)/,$(SUBDIR))
+LUA_SUBDIR := $(addprefix $(LUA_DIR)/,$(SUBDIR))
+LUA_LIBS := $(addprefix $(LUA_DIR)/,$(LUA_LIBS))
+
+CFLAGS := -Wall -Wextra -O0
+
+build: $(OBJ_DIR) $(OBJ_SUBDIR) $(OBJS)
+install: $(LUA_DIR) $(LUA_SUBDIR) $(LIBS) $(LUA_LIBS)
+
+$(OBJ_DIR) $(LUA_DIR) $(OBJ_SUBDIR) $(LUA_SUBDIR):
+ -mkdir -p $@
+$(LUA_DIR)/%.lua: %.lua
+ cp $< $@
+
+$(OBJ_DIR)/%.o: %.c $(patsubst /%.o,/%.c,$@)
+ gcc -c -o $@ $< $(INCLUDE) -fPIC $(CFLAGS)
+
+$(LIBS): $(OBJS)
+ gcc -shared -o $@ $^ $(LDFLAGS) -Wl,-rpath=$(LIB_PATH) -L$(LIB_PATH) -lnervcore -lluaT -lpthread
+
+clean:
+ -rm -rf $(OBJ_DIR)
+
+
diff --git a/fastnn/threads/init.lua b/fastnn/threads/init.lua
new file mode 100644
index 0000000..6cd3679
--- /dev/null
+++ b/fastnn/threads/init.lua
@@ -0,0 +1,14 @@
+threads = {}
+
+local C = require 'libthreads'
+
+threads.Thread = C.Thread
+threads.Mutex = C.Mutex
+threads.Condition = C.Condition
+threads.Semaphore = C.Semaphore
+--threads.Threads = require 'threads.threads'
+
+-- only for backward compatibility (boo)
+-- setmetatable(threads, getmetatable(threads.Threads))
+
+-- return threads
diff --git a/fastnn/threads/lib/THThread.c b/fastnn/threads/lib/THThread.c
new file mode 100644
index 0000000..7abc044
--- /dev/null
+++ b/fastnn/threads/lib/THThread.c
@@ -0,0 +1,349 @@
+#ifndef TH_THREAD_INC
+#define TH_THREAD_INC
+
+#include <stdlib.h>
+#include <string.h>
+
+/*#include "TH.h" */
+#include "THThread.h"
+
+#if defined(USE_PTHREAD_THREADS)
+#include <pthread.h>
+
+#elif defined(USE_WIN32_THREADS)
+
+/* very basic emulation to suit our needs */
+
+#include <process.h>
+#include <windows.h>
+
+typedef HANDLE pthread_t;
+typedef DWORD pthread_attr_t;
+typedef HANDLE pthread_mutex_t;
+typedef HANDLE pthread_cond_t;
+
+static int pthread_create(pthread_t *restrict thread,
+ const pthread_attr_t *restrict attr, void *(*start_routine)(void *),
+ void *restrict arg)
+{
+ *thread = (HANDLE)_beginthreadex(NULL, 0, (THREAD_FUNCTION)start_routine, arg, 0, NULL);
+ return (int)(*thread == NULL);
+}
+
+static int pthread_join(pthread_t thread, void **value_ptr)
+{
+ return ((WaitForSingleObject((thread), INFINITE) != WAIT_OBJECT_0) || !CloseHandle(thread));
+}
+
+static int pthread_mutex_init(pthread_mutex_t *restrict mutex,
+ const pthread_mutexattr_t *restrict attr)
+{
+ *mutex = CreateMutex(NULL, FALSE, NULL);
+ return (int)(*mutex == NULL);
+}
+
+static int pthread_mutex_lock(pthread_mutex_t *mutex)
+{
+ return WaitForSingleObject(*mutex, INFINITE) == 0;
+}
+
+static int pthread_mutex_unlock(pthread_mutex_t *mutex)
+{
+ return ReleaseMutex(*mutex) == 0;
+}
+
+static int pthread_mutex_destroy(pthread_mutex_t *mutex)
+{
+ return CloseHandle(*mutex) == 0;
+}
+
+static int pthread_cond_init(pthread_cond_t *restrict cond,
+ const pthread_condattr_t *restrict attr)
+{
+ *cond = CreateEvent(NULL, FALSE, FALSE, NULL);
+ return (int)(*cond == NULL);
+}
+
+static int pthread_cond_wait(pthread_cond_t *restrict cond,
+ pthread_mutex_t *restrict mutex)
+{
+ SignalObjectAndWait(*mutex, *cond, INFINITE, FALSE);
+ return WaitForSingleObject(*mutex, INFINITE) == 0;
+}
+
+static int pthread_cond_destroy(pthread_cond_t *cond)
+{
+ return CloseHandle(*cond) == 0;
+}
+
+int pthread_cond_signal(pthread_cond_t *cond)
+{
+ return SetEvent(*cond) == 0;
+}
+
+#else
+#error no thread system available
+#endif
+
+typedef struct THThread_ {
+ pthread_t id;
+ int (*func)(void*);
+ void* data;
+ int status;
+} THThread;
+
+typedef struct THMutex_{
+ pthread_mutex_t id;
+ int refcount;
+} THMutex;
+
+typedef struct THCondition_ {
+ pthread_cond_t id;
+ int refcount;
+} THCondition;
+
+typedef struct THSemaphore_ {
+ pthread_mutex_t mutex_;
+ pthread_cond_t cond_;
+ int counter_;
+ int refcount;
+}THSemaphore;
+
+static void* thread_closure(void *data)
+{
+ THThread *thread = data;
+ thread->status = thread->func(thread->data);
+ return NULL;
+}
+
+THThread* THThread_new(int (*func)(void*), void *data)
+{
+ THThread *self = malloc(sizeof(THThread));
+ self->func = func;
+ self->data = data;
+ self->status = 0;
+ if(!self)
+ return NULL;
+ if(pthread_create(&self->id, NULL, thread_closure, self)) {
+ free(self);
+ return NULL;
+ }
+ return self;
+}
+
+long THThread_id(THThread *self)
+{
+ return (long)(self);
+}
+
+int THThread_free(THThread *self)
+{
+ int status = 1;
+ if(self) {
+ if(pthread_join(self->id, NULL))
+ return 1;
+ status = self->status;
+ free(self);
+ self = NULL;
+ }
+ return status;
+}
+
+THMutex* THMutex_new(void)
+{
+ THMutex *self = malloc(sizeof(THMutex));
+ if(!self)
+ return NULL;
+ if(pthread_mutex_init(&self->id, NULL) != 0) {
+ free(self);
+ return NULL;
+ }
+ self->refcount = 1;
+ return self;
+}
+
+THMutex* THMutex_newWithId(long id)
+{
+ THMutex *self = (THMutex*)id;
+ __sync_fetch_and_add(&self->refcount, 1);
+ return self;
+}
+
+long THMutex_id(THMutex *self)
+{
+ return (long)(self);
+}
+
+int THMutex_lock(THMutex *self)
+{
+ if(pthread_mutex_lock(&self->id) != 0)
+ return 1;
+ return 0;
+}
+
+int THMutex_trylock(THMutex *self)
+{
+ if (pthread_mutex_trylock(&self->id) != 0)
+ return 1;
+ return 0;
+}
+
+int THMutex_unlock(THMutex *self)
+{
+ if(pthread_mutex_unlock(&self->id) != 0)
+ return 1;
+ return 0;
+}
+
+void THMutex_free(THMutex *self)
+{
+ if(self) {
+ if(__sync_fetch_and_add(&self->refcount, -1) == 1) {
+ pthread_mutex_destroy(&self->id);
+ free(self);
+ self = NULL;
+ }
+ }
+}
+
+THCondition* THCondition_new(void)
+{
+ THCondition *self = malloc(sizeof(THCondition));
+ if(!self)
+ return NULL;
+ if(pthread_cond_init(&self->id, NULL)) {
+ free(self);
+ return NULL;
+ }
+ self->refcount = 1;
+ return self;
+}
+
+THCondition* THCondition_newWithId(long id)
+{
+ THCondition *self = (THCondition*)id;
+ __sync_fetch_and_add(&self->refcount, 1);
+ return self;
+}
+
+long THCondition_id(THCondition *self)
+{
+ return (long)(self);
+}
+
+int THCondition_signal(THCondition *self)
+{
+ if(pthread_cond_signal(&self->id))
+ return 1;
+ return 0;
+}
+
+int THCondition_wait(THCondition *self, THMutex *mutex)
+{
+ if(pthread_cond_wait(&self->id, &mutex->id))
+ return 1;
+ return 0;
+}
+
+void THCondition_free(THCondition *self)
+{
+ if(self) {
+ if(__sync_fetch_and_add(&self->refcount, -1) == 1) {
+ pthread_cond_destroy(&self->id);
+ free(self);
+ self = NULL;
+ }
+ }
+}
+
+THSemaphore* THSemaphore_new(int initValue)
+{
+ THSemaphore *self = malloc(sizeof(THSemaphore));
+ if(!self)
+ return NULL;
+ self->counter_ = initValue;
+ if(pthread_mutex_init(&self->mutex_, NULL) != 0)
+ {
+ free(self);
+ return NULL;
+ }
+
+ if(pthread_cond_init(&self->cond_, NULL) != 0)
+ {
+ free(self);
+ return NULL;
+ }
+
+ self->refcount = 1;
+
+ return self;
+}
+
+THSemaphore* THSemaphore_newWithId(long id)
+{
+ THSemaphore *self = (THSemaphore*)id;
+ __sync_fetch_and_add(&self->refcount, 1);
+ return self;
+}
+
+long THSemaphore_id(THSemaphore *self)
+{
+ return (long)(self);
+}
+
+int THSemaphore_wait(THSemaphore *self)
+{
+ int ret = 0;
+ ret |= pthread_mutex_lock(&self->mutex_);
+
+ while (self->counter_ <= 0)
+ ret |= pthread_cond_wait(&self->cond_, &self->mutex_);
+
+ self->counter_--;
+ ret |= pthread_mutex_unlock(&self->mutex_);
+
+ return ret;
+
+}
+
+int THSemaphore_signal(THSemaphore *self)
+{
+ int ret = 0;
+ ret |= pthread_mutex_lock(&self->mutex_);
+ self->counter_++;
+ ret |= pthread_cond_signal(&self->cond_);
+ ret |= pthread_mutex_unlock(&self->mutex_);
+
+ return ret;
+}
+
+int THSemaphore_trywait(THSemaphore *self)
+{
+ int ret = 0;
+ int try_wait_succeeded = 1;
+ ret |= pthread_mutex_lock(&self->mutex_);
+ if (self->counter_ > 0) {
+ self->counter_--;
+ try_wait_succeeded = 0;
+ }
+ ret |= pthread_mutex_unlock(&self->mutex_);
+ return try_wait_succeeded;
+}
+
+void THSemaphore_free(THSemaphore *self)
+{
+ if(self)
+ {
+ if(__sync_fetch_and_add(&self->refcount, -1) == 1)
+ {
+ pthread_mutex_destroy(&self->mutex_);
+ pthread_cond_destroy(&self->cond_);
+ free(self);
+ self = NULL;
+ }
+ }
+ }
+
+
+
+#endif
diff --git a/fastnn/threads/lib/THThread.h b/fastnn/threads/lib/THThread.h
new file mode 100644
index 0000000..75ba417
--- /dev/null
+++ b/fastnn/threads/lib/THThread.h
@@ -0,0 +1,36 @@
+#ifndef TH_THREAD_INC
+#define TH_THREAD_INC
+
+typedef struct THThread_ THThread;
+typedef struct THMutex_ THMutex;
+typedef struct THCondition_ THCondition;
+typedef struct THSemaphore_ THSemaphore;
+
+THThread* THThread_new(int (*closure)(void*), void *data);
+long THThread_id(THThread *self);
+int THThread_free(THThread *self);
+
+THMutex* THMutex_new(void);
+THMutex* THMutex_newWithId(long id);
+long THMutex_id(THMutex *self);
+int THMutex_lock(THMutex *self);
+int THMutex_unlock(THMutex *self);
+int THMutex_trylock(THMutex *self);
+void THMutex_free(THMutex *self);
+
+THCondition* THCondition_new(void);
+THCondition* THCondition_newWithId(long id);
+long THCondition_id(THCondition *self);
+int THCondition_signal(THCondition *self);
+int THCondition_wait(THCondition *self, THMutex *mutex);
+void THCondition_free(THCondition *self);
+
+THSemaphore* THSemaphore_new(int initValue);
+THSemaphore* THSemaphore_newWithId(long id);
+long THSemaphore_id(THSemaphore *self);
+int THSemaphore_signal(THSemaphore *self);
+int THSemaphore_wait(THSemaphore *self);
+int THSemaphore_trywait(THSemaphore *self);
+void THSemaphore_free(THSemaphore *self);
+
+#endif
diff --git a/fastnn/threads/lib/init.c b/fastnn/threads/lib/init.c
new file mode 100644
index 0000000..4042189
--- /dev/null
+++ b/fastnn/threads/lib/init.c
@@ -0,0 +1,27 @@
+#include <lua.h>
+#include <lauxlib.h>
+
+#if LUA_VERSION_NUM == 501
+static void luaL_setfuncs(lua_State *L, const luaL_Reg *l, int nup)
+{
+ luaL_checkstack(L, nup+1, "too many upvalues");
+ for (; l->name != NULL; l++) { /* fill the table with given functions */
+ int i;
+ lua_pushstring(L, l->name);
+ for (i = 0; i < nup; i++) /* copy upvalues to the top */
+ lua_pushvalue(L, -(nup+1));
+ lua_pushcclosure(L, l->func, nup); /* closure with those upvalues */
+ lua_settable(L, -(nup + 3));
+ }
+ lua_pop(L, nup); /* remove upvalues */
+}
+#endif
+
+#include "threads.c"
+
+int luaopen_libthreads(lua_State *L)
+{
+ lua_newtable(L);
+ thread_init_pkg(L);
+ return 1;
+}
diff --git a/fastnn/threads/lib/luaTHRD.h b/fastnn/threads/lib/luaTHRD.h
new file mode 100644
index 0000000..3760bdb
--- /dev/null
+++ b/fastnn/threads/lib/luaTHRD.h
@@ -0,0 +1,84 @@
+#ifndef LUA_THRD_INC
+#define LUA_THRD_INC
+
+static int luaTHRD_pushudata(lua_State *L, void *ptr, const char* typename)
+{
+ void **udata = lua_newuserdata(L, sizeof(void*));
+ if(udata) {
+ *udata = ptr;
+ luaL_getmetatable(L, typename);
+ lua_setmetatable(L, -2);
+ return 1;
+ }
+ return 0;
+}
+
+static void *luaTHRD_checkudata(lua_State *L, int narg, const char *typename)
+{
+ void **udata = luaL_checkudata(L, narg, typename);
+ if(udata)
+ return *udata;
+ else
+ return NULL;
+}
+
+static void *luaTHRD_toudata(lua_State *L, int narg, const char *typename)
+{
+ void **udata = lua_touserdata(L, narg);
+ if(udata) {
+ if(lua_getmetatable(L, -1)) {
+ luaL_getmetatable(L, typename);
+ if(lua_equal(L, -1, -2)) {
+ lua_pop(L, 2);
+ return *udata;
+ }
+ else {
+ lua_pop(L, 2);
+ return NULL;
+ }
+ }
+ else
+ return NULL;
+ }
+ else
+ return NULL;
+}
+
+static int luaTHRD_ctor(lua_State *L)
+{
+ if(!lua_istable(L, 1)) /* dummy ctor table */
+ luaL_error(L, "ctor: table expected");
+ lua_getmetatable(L, 1);
+ lua_remove(L, 1); /* dummy ctor table */
+ if(!lua_istable(L, -1))
+ luaL_error(L, "ctor: no metatable found");
+ lua_pushstring(L, "__new");
+ lua_rawget(L, -2);
+ lua_remove(L, -2); /* forget about metatable */
+ if(!lua_isfunction(L, -1))
+ luaL_error(L, "ctor: __new appears to be not a function");
+ lua_insert(L, 1); /* ctor first, arguments follow */
+ lua_call(L, lua_gettop(L)-1, LUA_MULTRET);
+ return lua_gettop(L);
+}
+
+static void luaTHRD_pushctortable(lua_State *L, lua_CFunction ctor, const char* typename)
+{
+ lua_newtable(L); /* empty useless dude */
+ lua_newtable(L); /* metatable of the dude */
+ lua_pushstring(L, "__index");
+ luaL_getmetatable(L, typename);
+ lua_rawset(L, -3);
+ lua_pushstring(L, "__newindex");
+ luaL_getmetatable(L, typename);
+ lua_rawset(L, -3);
+ lua_pushstring(L, "__new"); /* __call will look into there */
+ lua_pushcfunction(L, ctor);
+ lua_rawset(L, -3);
+ lua_pushstring(L, "__call"); /* pop the table and calls __new */
+ lua_pushcfunction(L, luaTHRD_ctor);
+ lua_rawset(L, -3);
+ lua_setmetatable(L, -2);
+}
+
+#endif
diff --git a/fastnn/threads/lib/threads.c b/fastnn/threads/lib/threads.c
new file mode 100644
index 0000000..bf2a5ec
--- /dev/null
+++ b/fastnn/threads/lib/threads.c
@@ -0,0 +1,355 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <luaT.h>
+#include <string.h>
+
+#include "THThread.h"
+#include "luaTHRD.h"
+
+#include <lua.h>
+#include <lualib.h>
+
+static int newthread(void *code_)
+{
+ char *code = code_;
+ lua_State *L = luaL_newstate();
+
+ if(!L) {
+ printf("THREAD FATAL ERROR: could not create lua state\n");
+ return -1;
+ }
+ luaL_openlibs(L);
+
+ if(luaL_loadstring(L, code)) {
+ printf("FATAL THREAD PANIC: (loadstring) %s\n", lua_tolstring(L, -1, NULL));
+ free(code);
+ lua_close(L);
+ return -1;
+ }
+ free(code);
+ if(lua_pcall(L, 0, 0, 0)) {
+ printf("FATAL THREAD PANIC: (pcall) %s\n", lua_tolstring(L, -1, NULL));
+ lua_close(L);
+ return -1;
+ }
+
+ lua_close(L);
+ return 0;
+}
+
+static int thread_new(lua_State *L)
+{
+ THThread *thread = NULL;
+ size_t len = 0;
+ const char *code = luaL_checklstring(L, 1, &len);
+ char *code_dup = malloc(len+1);
+ if(!code_dup)
+ luaL_error(L, "threads: out of memory");
+ memcpy(code_dup, code, len+1);
+
+ thread = THThread_new(newthread, (void*)code_dup);
+ if(!thread)
+ luaL_error(L, "threads: thread new failed");
+ luaTHRD_pushudata(L, thread, "threads.Thread");
+
+ return 1;
+}
+
+static int thread_tostring(lua_State *L)
+{
+ char str[128];
+ THThread *thread = luaTHRD_checkudata(L, 1, "threads.Thread");
+ snprintf(str, 128, "threads.Thread <%lx>", THThread_id(thread));
+ lua_pushstring(L, str);
+ return 1;
+}
+
+static int thread_id(lua_State *L)
+{
+ THThread *thread = luaTHRD_checkudata(L, 1, "threads.Thread");
+ lua_pushinteger(L, THThread_id(thread));
+ return 1;
+}
+
+static int thread_free(lua_State *L)
+{
+ THThread *thread = luaTHRD_checkudata(L, 1, "threads.Thread");
+ THThread_free(thread);
+ return 0;
+}
+
+static int mutex_new(lua_State *L)
+{
+ THMutex *mutex = NULL;
+ if(lua_gettop(L) == 0) {
+ mutex = THMutex_new();
+ }
+ else if(lua_gettop(L) == 1) {
+ long id = luaL_checklong(L, 1);
+ mutex = THMutex_newWithId(id);
+ }
+ else
+ luaL_error(L, "threads: mutex new invalid arguments");
+ if(!mutex)
+ luaL_error(L, "threads: mutex new failed");
+ luaTHRD_pushudata(L, mutex, "threads.Mutex");
+ return 1;
+}
+
+static int mutex_tostring(lua_State *L)
+{
+ char str[128];
+ THMutex *mutex = luaTHRD_checkudata(L, 1, "threads.Mutex");
+ snprintf(str, 128, "threads.Mutex <%lx>", THMutex_id(mutex));
+ lua_pushstring(L, str);
+ return 1;
+}
+
+static int mutex_id(lua_State *L)
+{
+ THMutex *mutex = luaTHRD_checkudata(L, 1, "threads.Mutex");
+ lua_pushinteger(L, THMutex_id(mutex));
+ return 1;
+}
+
+static int mutex_lock(lua_State *L)
+{
+ THMutex *mutex = luaTHRD_checkudata(L, 1, "threads.Mutex");
+ if(THMutex_lock(mutex))
+ luaL_error(L, "threads: mutex lock failed");
+ return 0;
+}
+
+static int mutex_unlock(lua_State *L)
+{
+ THMutex *mutex = luaTHRD_checkudata(L, 1, "threads.Mutex");
+ if(THMutex_unlock(mutex))
+ luaL_error(L, "threads: mutex unlock failed");
+ return 0;
+}
+
+static int mutex_free(lua_State *L)
+{
+ THMutex *mutex = luaTHRD_checkudata(L, 1, "threads.Mutex");
+ THMutex_free(mutex);
+ return 0;
+}
+
+static int condition_new(lua_State *L)
+{
+ THCondition *condition = NULL;
+ if(lua_gettop(L) == 0) {
+ condition = THCondition_new();
+ }
+ else if(lua_gettop(L) == 1) {
+ long id = luaL_checklong(L, 1);
+ condition = THCondition_newWithId(id);
+ }
+ else
+ luaL_error(L, "threads: condition new invalid arguments");
+ if(!condition)
+ luaL_error(L, "threads: condition new failed");
+ luaTHRD_pushudata(L, condition, "threads.Condition");
+ return 1;
+}
+
+static int condition_tostring(lua_State *L)
+{
+ char str[128];
+ THCondition *condition = luaTHRD_checkudata(L, 1, "threads.Condition");
+ snprintf(str, 128, "threads.Condition <%lx>", THCondition_id(condition));
+ lua_pushstring(L, str);
+ return 1;
+}
+
+static int condition_id(lua_State *L)
+{
+ THCondition *condition = luaTHRD_checkudata(L, 1, "threads.Condition");
+ lua_pushinteger(L, THCondition_id(condition));
+ return 1;
+}
+
+static int condition_free(lua_State *L)
+{
+ THCondition *condition = luaTHRD_checkudata(L, 1, "threads.Condition");
+ if(!condition)
+ luaL_error(L, "threads: condition free failed");
+ THCondition_free(condition);
+ return 0;
+}
+
+static int condition_signal(lua_State *L)
+{
+ THCondition *condition = luaTHRD_checkudata(L, 1, "threads.Condition");
+ if(THCondition_signal(condition))
+ luaL_error(L, "threads: condition signal failed");
+ return 0;
+}
+
+static int condition_wait(lua_State *L)
+{
+ THCondition *condition = luaTHRD_checkudata(L, 1, "threads.Condition");
+ THMutex *mutex = luaTHRD_checkudata(L, 2, "threads.Mutex");
+ if(THCondition_wait(condition, mutex))
+ luaL_error(L, "threads: condition wait failed");
+ return 0;
+}
+
+static int semaphore_new(lua_State *L)
+{
+ THSemaphore* semaphore = NULL;
+ if(lua_gettop(L) == 1) {
+ int initValue = luaL_checkinteger(L, 1);
+ semaphore = THSemaphore_new(initValue);
+ }
+ else
+ luaL_error(L, "threads: semaphore new invalid arguments");
+ if (!semaphore)
+ luaL_error(L, "threads: semaphore new failed");
+ luaTHRD_pushudata(L, semaphore, "threads.Semaphore");
+ return 1;
+}
+
+static int semaphore_newfromid(lua_State *L)
+{
+ THSemaphore* semaphore = NULL;
+ if(lua_gettop(L) == 1) {
+ long id = luaL_checklong(L, 1);
+ semaphore = THSemaphore_newWithId(id);
+ }
+ else
+ luaL_error(L, "threads: semaphore new invalid arguments");
+ if(!semaphore)
+ luaL_error(L, "threads: semaphore new failed");
+ luaTHRD_pushudata(L, semaphore, "threads.Semaphore");
+ return 1;
+}
+
+static int semaphore_id(lua_State *L)
+{
+ THSemaphore *semaphore = luaTHRD_checkudata(L, 1, "threads.Semaphore");
+ lua_pushinteger(L, THSemaphore_id(semaphore));
+ return 1;
+}
+
+static int semaphore_signal(lua_State *L)
+{
+ THSemaphore *semaphore = luaTHRD_checkudata(L, 1, "threads.Semaphore");
+ if(THSemaphore_signal(semaphore))
+ luaL_error(L, "threads: semaphore signal failed");
+ return 0;
+}
+
+static int semaphore_wait(lua_State *L)
+{
+ THSemaphore *semaphore = luaTHRD_checkudata(L, 1, "threads.Semaphore");
+ if(THSemaphore_wait(semaphore))
+ luaL_error(L, "threads: semaphore wait failed");
+ return 0;
+}
+
+static int semaphore_trywait(lua_State *L)
+{
+ THSemaphore *semaphore = luaTHRD_checkudata(L, 1, "threads.Semaphore");
+ int wait = THSemaphore_trywait(semaphore);
+ lua_pushinteger(L, wait);
+ return 1;
+}
+
+static void semaphore_free(lua_State *L)
+{
+ THSemaphore *semaphore = luaTHRD_checkudata(L, 1, "threads.Semaphore");
+ if(!semaphore)
+ luaL_error(L, "threads: semaphore free failed");
+ THSemaphore_free(semaphore);
+}
+
+static const struct luaL_Reg thread__ [] = {
+ {"new", thread_new},
+ {"__tostring", thread_tostring},
+ {"id", thread_id},
+ {"free", thread_free},
+ {NULL, NULL}
+};
+
+static const struct luaL_Reg mutex__ [] = {
+ {"new", mutex_new},
+ {"__tostring", mutex_tostring},
+ {"id", mutex_id},
+ {"lock", mutex_lock},
+ {"unlock", mutex_unlock},
+ {"free", mutex_free},
+ {NULL, NULL}
+};
+
+static const struct luaL_Reg condition__ [] = {
+ {"new", condition_new},
+ {"__tostring", condition_tostring},
+ {"id", condition_id},
+ {"signal", condition_signal},
+ {"wait", condition_wait},
+ {"free", condition_free},
+ {NULL, NULL}
+};
+
+static const struct luaL_Reg semaphore__ [] = {
+ {"new", semaphore_new},
+ {"newfromid", semaphore_newfromid},
+ {"id", semaphore_id},
+ {"signal", semaphore_signal},
+ {"wait", semaphore_wait},
+ {"trywait", semaphore_trywait},
+ {"free", semaphore_free},
+ {NULL, NULL}
+};
+
+static void thread_init_pkg(lua_State *L)
+{
+ if(!luaL_newmetatable(L, "threads.Thread"))
+ luaL_error(L, "threads: threads.Thread type already exists");
+ luaL_setfuncs(L, thread__, 0);
+ lua_pushstring(L, "__index");
+ lua_pushvalue(L, -2);
+ lua_rawset(L, -3);
+ lua_pop(L, 1);
+
+ if(!luaL_newmetatable(L, "threads.Mutex"))
+ luaL_error(L, "threads: threads.Mutex type already exists");
+ luaL_setfuncs(L, mutex__, 0);
+ lua_pushstring(L, "__index");
+ lua_pushvalue(L, -2);
+ lua_rawset(L, -3);
+ lua_pop(L, 1);
+
+ if(!luaL_newmetatable(L, "threads.Condition"))
+ luaL_error(L, "threads: threads.Condition type already exists");
+ luaL_setfuncs(L, condition__, 0);
+ lua_pushstring(L, "__index");
+ lua_pushvalue(L, -2);
+ lua_rawset(L, -3);
+ lua_pop(L, 1);
+
+ if(!luaL_newmetatable(L, "threads.Semaphore"))
+ luaL_error(L, "threads: threads.Semaphore type already exists");
+ luaL_setfuncs(L, semaphore__, 0);
+ lua_pushstring(L, "__index");
+ lua_pushvalue(L, -2);
+ lua_rawset(L, -3);
+ lua_pop(L, 1);
+
+ lua_pushstring(L, "Thread");
+ luaTHRD_pushctortable(L, thread_new, "threads.Thread");
+ lua_rawset(L, -3);
+
+ lua_pushstring(L, "Mutex");
+ luaTHRD_pushctortable(L, mutex_new, "threads.Mutex");
+ lua_rawset(L, -3);
+
+ lua_pushstring(L, "Condition");
+ luaTHRD_pushctortable(L, condition_new, "threads.Condition");
+ lua_rawset(L, -3);
+
+ lua_pushstring(L, "Semaphore");
+ luaTHRD_pushctortable(L, semaphore_new, "threads.Semaphore");
+ lua_rawset(L, -3);
+}
diff --git a/fastnn/threads/test/test-low-level.lua b/fastnn/threads/test/test-low-level.lua
new file mode 100644
index 0000000..aea31db
--- /dev/null
+++ b/fastnn/threads/test/test-low-level.lua
@@ -0,0 +1,39 @@
+local t = require 'libthreads'
+
+local m = t.Mutex()
+local c = t.Condition()
+print(string.format('| main thread mutex: 0x%x', m:id()))
+print(string.format('| main thread condition: 0x%x', c:id()))
+
+local code = string.format([[
+ local t = require 'libthreads'
+ local c = t.Condition(%d)
+ print('|| hello from thread')
+ print(string.format('|| thread condition: 0x%%x', c:id()))
+ print('|| doing some stuff in thread...')
+ local x = 0
+ for i=1,10000 do
+ for i=1,10000 do
+ x = x + math.sin(i)
+ end
+ x = x / 10000
+ end
+ print(string.format('|| ...ok (x=%%f)', x))
+ c:signal()
+]], c:id())
+
+print(code)
+
+local thread = t.Thread(code)
+
+
+print('| waiting for thread...')
+m:lock()
+c:wait(m)
+print('| thread finished!')
+
+thread:free()
+m:free()
+c:free()
+
+print('| done')
diff --git a/fastnn/threads/test/test-threads-async.lua b/fastnn/threads/test/test-threads-async.lua
new file mode 100644
index 0000000..68bcd35
--- /dev/null
+++ b/fastnn/threads/test/test-threads-async.lua
@@ -0,0 +1,66 @@
+local threads = require 'threads'
+
+local nthread = 4
+local njob = 100
+
+local pool = threads.Threads(
+ nthread,
+ function(threadid)
+ print('starting a new thread/state number ' .. threadid)
+ end
+)
+
+
+local jobid = 0
+local result -- DO NOT put this in get
+local function get()
+
+ -- fill up the queue as much as we can
+ -- this will not block
+ while jobid < njob and pool:acceptsjob() do
+ jobid = jobid + 1
+
+ pool:addjob(
+ function(jobid)
+ print(string.format('thread ID %d is performing job %d', __threadid, jobid))
+ return string.format("job output from job %d", jobid)
+ end,
+
+ function(jobres)
+ result = jobres
+ end,
+
+ jobid
+ )
+ end
+
+ -- is there still something to do?
+ if pool:hasjob() then
+ pool:dojob() -- yes? do it!
+ if pool:haserror() then -- check for errors
+ pool:synchronize() -- finish everything and throw error
+ end
+ return result
+ end
+
+end
+
+local jobdone = 0
+repeat
+ -- get something asynchronously
+ local res = get()
+
+ -- do something with res (if any)
+ if res then
+ print(res)
+ jobdone = jobdone + 1
+ end
+
+until not res -- until there is nothing remaining
+
+assert(jobid == 100)
+assert(jobdone == 100)
+
+print('PASSED')
+
+pool:terminate()
diff --git a/fastnn/threads/test/test-threads-multiple.lua b/fastnn/threads/test/test-threads-multiple.lua
new file mode 100644
index 0000000..4429696
--- /dev/null
+++ b/fastnn/threads/test/test-threads-multiple.lua
@@ -0,0 +1,15 @@
+local threads = require 'threads'
+
+for i=1,1000 do
+ io.write(string.format('%04d.', tonumber(i)))
+ io.flush()
+ local pool =
+ threads.Threads(
+ 4,
+ function(threadid)
+ require 'torch'
+ end
+ )
+end
+print()
+print('PASSED')
diff --git a/fastnn/threads/test/test-threads-shared.lua b/fastnn/threads/test/test-threads-shared.lua
new file mode 100644
index 0000000..3d63851
--- /dev/null
+++ b/fastnn/threads/test/test-threads-shared.lua
@@ -0,0 +1,111 @@
+require 'torch'
+
+local threads = require 'threads'
+local status, tds = pcall(require, 'tds')
+tds = status and tds or nil
+
+local nthread = 4
+local njob = 10
+local msg = "hello from a satellite thread"
+
+threads.Threads.serialization('threads.sharedserialize')
+
+local x = {}
+local xh = tds and tds.hash() or {}
+local xs = {}
+local z = tds and tds.hash() or {}
+local D = 10
+local K = tds and 100000 or 100 -- good luck in non-shared (30M)
+for i=1,njob do
+ x[i] = torch.ones(D)
+ xh[i] = torch.ones(D)
+ xs[i] = torch.FloatStorage(D):fill(1)
+ for j=1,K do
+ z[(i-1)*K+j] = "blah" .. i .. j
+ end
+end
+collectgarbage()
+collectgarbage()
+
+print('GO')
+
+local pool = threads.Threads(
+ nthread,
+ function(threadIdx)
+ pcall(require, 'tds')
+ print('starting a new thread/state number:', threadIdx)
+ gmsg = msg -- we copy here an upvalue of the main thread
+ end
+)
+
+local jobdone = 0
+for i=1,njob do
+ pool:addjob(
+ function()
+ assert(x[i]:sum() == D)
+ assert(xh[i]:sum() == D)
+ assert(torch.FloatTensor(xs[i]):sum() == D)
+ for j=1,K do
+ assert(z[(i-1)*K+j] == "blah" .. i .. j)
+ end
+ x[i]:add(1)
+ xh[i]:add(1)
+ torch.FloatTensor(xs[i]):add(1)
+ print(string.format('%s -- thread ID is %x', gmsg, __threadid))
+ collectgarbage()
+ collectgarbage()
+ return __threadid
+ end,
+
+ function(id)
+ print(string.format("task %d finished (ran on thread ID %x)", i, id))
+ jobdone = jobdone + 1
+ end
+ )
+end
+
+for i=1,njob do
+ pool:addjob(
+ function()
+ collectgarbage()
+ collectgarbage()
+ end
+ )
+end
+
+pool:synchronize()
+
+print(string.format('%d jobs done', jobdone))
+
+pool:terminate()
+
+-- did we do the job in shared mode?
+for i=1,njob do
+ assert(x[i]:sum() == 2*D)
+ assert(xh[i]:sum() == 2*D)
+ assert(torch.FloatTensor(xs[i]):sum() == 2*D)
+end
+
+-- serialize and zero x
+local str = torch.serialize(x)
+local strh = torch.serialize(xh)
+local strs = torch.serialize(xs)
+for i=1,njob do
+ x[i]:zero()
+ xh[i]:zero()
+ xs[i]:fill(0)
+end
+
+-- dude, check that unserialized x does not point on x
+local y = torch.deserialize(str)
+local yh = torch.deserialize(strh)
+local ys = torch.deserialize(strs)
+for i=1,njob do
+ assert(y[i]:sum() == 2*D)
+ assert(yh[i]:sum() == 2*D)
+ assert(torch.FloatTensor(ys[i]):sum() == 2*D)
+end
+
+pool:terminate()
+
+print('PASSED')
diff --git a/fastnn/threads/test/test-threads.lua b/fastnn/threads/test/test-threads.lua
new file mode 100644
index 0000000..e49a381
--- /dev/null
+++ b/fastnn/threads/test/test-threads.lua
@@ -0,0 +1,20 @@
+local clib = require 'libthreads'
+
+
+nthread = 1
+
+str='ad;alkfakd;af'
+code = [[ function func(str) print(str) end; print(str);]]
+print(code)
+
+--thread = clib.Thread(code)
+
+
+--thread:free()
+
+
+require 'threads'
+
+tt = threads.Thread(code)
+
+
diff --git a/fastnn/threads/threads-scm-1.rockspec b/fastnn/threads/threads-scm-1.rockspec
new file mode 100644
index 0000000..02535d4
--- /dev/null
+++ b/fastnn/threads/threads-scm-1.rockspec
@@ -0,0 +1,36 @@
+package = "threads"
+version = "scm-1"
+source = {
+ url = "https://github.com/uphantom/nerv-threads.git"
+}
+description = {
+ summary = "Thread control for Nerv fastnn",
+ detailed = [[
+ ]],
+ homepage = "https://github.com/uphantom/nerv-threads",
+ license = "BSD"
+}
+dependencies = {
+ "nerv >= scm-1",
+ "lua >= 5.1"
+}
+build = {
+ type = "make",
+ build_variables = {
+ CFLAGS="$(CFLAGS)",
+ LIBFLAG="$(LIBFLAG)",
+ LUA_LIBDIR="$(LUA_LIBDIR)",
+ LUA_BINDIR="$(LUA_BINDIR)",
+ LUA_INCDIR="$(LUA_INCDIR)",
+ INST_PREFIX="$(PREFIX)",
+ LUA="$(LUA)",
+ },
+ install_variables = {
+ LUA_BINDIR="$(LUA_BINDIR)",
+ INST_PREFIX="$(PREFIX)",
+ INST_BINDIR="$(BINDIR)",
+ INST_LIBDIR="$(LIBDIR)",
+ INST_LUADIR="$(LUADIR)",
+ INST_CONFDIR="$(CONFDIR)",
+ },
+}
diff --git a/fastnn/threads/threads.lua b/fastnn/threads/threads.lua
new file mode 100644
index 0000000..2db7d51
--- /dev/null
+++ b/fastnn/threads/threads.lua
@@ -0,0 +1,14 @@
+local threads = {}
+
+local C = require 'libthreads'
+
+threads.Thread = C.Thread
+threads.Mutex = C.Mutex
+threads.Condition = C.Condition
+threads.Semaphore = C.Semaphore
+--threads.Threads = require 'threads.threads'
+
+-- only for backward compatibility (boo)
+-- setmetatable(threads, getmetatable(threads.Threads))
+
+-- return threads