diff options
Diffstat (limited to 'fastnn/threads')
-rw-r--r-- | fastnn/threads/Makefile | 42 | ||||
-rw-r--r-- | fastnn/threads/init.lua | 14 | ||||
-rw-r--r-- | fastnn/threads/lib/THThread.c | 349 | ||||
-rw-r--r-- | fastnn/threads/lib/THThread.h | 36 | ||||
-rw-r--r-- | fastnn/threads/lib/init.c | 27 | ||||
-rw-r--r-- | fastnn/threads/lib/luaTHRD.h | 84 | ||||
-rw-r--r-- | fastnn/threads/lib/threads.c | 355 | ||||
-rw-r--r-- | fastnn/threads/test/test-low-level.lua | 39 | ||||
-rw-r--r-- | fastnn/threads/test/test-threads-async.lua | 66 | ||||
-rw-r--r-- | fastnn/threads/test/test-threads-multiple.lua | 15 | ||||
-rw-r--r-- | fastnn/threads/test/test-threads-shared.lua | 111 | ||||
-rw-r--r-- | fastnn/threads/test/test-threads.lua | 20 | ||||
-rw-r--r-- | fastnn/threads/threads-scm-1.rockspec | 36 | ||||
-rw-r--r-- | fastnn/threads/threads.lua | 14 |
14 files changed, 1208 insertions, 0 deletions
diff --git a/fastnn/threads/Makefile b/fastnn/threads/Makefile new file mode 100644 index 0000000..17958f9 --- /dev/null +++ b/fastnn/threads/Makefile @@ -0,0 +1,42 @@ +.PHONY: build install clean +SHELL := /bin/bash +BUILD_DIR := $(CURDIR)/build +INC_PATH := $(LUA_BINDIR)/../include/nerv/luaT +LUA_DIR = $(INST_LUADIR)/threads +LIB_PATH := $(LUA_BINDIR)/../lib + +OBJ_DIR := $(BUILD_DIR)/objs +SUBDIR := lib test + + +OBJS := lib/init.o lib/threads.o lib/THThread.o +LIBS := $(INST_LIBDIR)/libthreads.so +LUA_LIBS := init.lua threads.lua test/test-threads.lua +INCLUDE := -I $(LUA_INCDIR) -I $(INC_PATH) -DLUA_USE_APICHECK -DUSE_PTHREAD_THREADS + + +OBJS := $(addprefix $(OBJ_DIR)/,$(OBJS)) +OBJ_SUBDIR := $(addprefix $(OBJ_DIR)/,$(SUBDIR)) +LUA_SUBDIR := $(addprefix $(LUA_DIR)/,$(SUBDIR)) +LUA_LIBS := $(addprefix $(LUA_DIR)/,$(LUA_LIBS)) + +CFLAGS := -Wall -Wextra -O0 + +build: $(OBJ_DIR) $(OBJ_SUBDIR) $(OBJS) +install: $(LUA_DIR) $(LUA_SUBDIR) $(LIBS) $(LUA_LIBS) + +$(OBJ_DIR) $(LUA_DIR) $(OBJ_SUBDIR) $(LUA_SUBDIR): + -mkdir -p $@ +$(LUA_DIR)/%.lua: %.lua + cp $< $@ + +$(OBJ_DIR)/%.o: %.c $(patsubst /%.o,/%.c,$@) + gcc -c -o $@ $< $(INCLUDE) -fPIC $(CFLAGS) + +$(LIBS): $(OBJS) + gcc -shared -o $@ $^ $(LDFLAGS) -Wl,-rpath=$(LIB_PATH) -L$(LIB_PATH) -lnervcore -lluaT -lpthread + +clean: + -rm -rf $(OBJ_DIR) + + diff --git a/fastnn/threads/init.lua b/fastnn/threads/init.lua new file mode 100644 index 0000000..6cd3679 --- /dev/null +++ b/fastnn/threads/init.lua @@ -0,0 +1,14 @@ +threads = {} + +local C = require 'libthreads' + +threads.Thread = C.Thread +threads.Mutex = C.Mutex +threads.Condition = C.Condition +threads.Semaphore = C.Semaphore +--threads.Threads = require 'threads.threads' + +-- only for backward compatibility (boo) +-- setmetatable(threads, getmetatable(threads.Threads)) + +-- return threads diff --git a/fastnn/threads/lib/THThread.c b/fastnn/threads/lib/THThread.c new file mode 100644 index 0000000..7abc044 --- /dev/null +++ b/fastnn/threads/lib/THThread.c @@ -0,0 +1,349 @@ +#ifndef TH_THREAD_INC +#define TH_THREAD_INC + +#include <stdlib.h> +#include <string.h> + +/*#include "TH.h" */ +#include "THThread.h" + +#if defined(USE_PTHREAD_THREADS) +#include <pthread.h> + +#elif defined(USE_WIN32_THREADS) + +/* very basic emulation to suit our needs */ + +#include <process.h> +#include <windows.h> + +typedef HANDLE pthread_t; +typedef DWORD pthread_attr_t; +typedef HANDLE pthread_mutex_t; +typedef HANDLE pthread_cond_t; + +static int pthread_create(pthread_t *restrict thread, + const pthread_attr_t *restrict attr, void *(*start_routine)(void *), + void *restrict arg) +{ + *thread = (HANDLE)_beginthreadex(NULL, 0, (THREAD_FUNCTION)start_routine, arg, 0, NULL); + return (int)(*thread == NULL); +} + +static int pthread_join(pthread_t thread, void **value_ptr) +{ + return ((WaitForSingleObject((thread), INFINITE) != WAIT_OBJECT_0) || !CloseHandle(thread)); +} + +static int pthread_mutex_init(pthread_mutex_t *restrict mutex, + const pthread_mutexattr_t *restrict attr) +{ + *mutex = CreateMutex(NULL, FALSE, NULL); + return (int)(*mutex == NULL); +} + +static int pthread_mutex_lock(pthread_mutex_t *mutex) +{ + return WaitForSingleObject(*mutex, INFINITE) == 0; +} + +static int pthread_mutex_unlock(pthread_mutex_t *mutex) +{ + return ReleaseMutex(*mutex) == 0; +} + +static int pthread_mutex_destroy(pthread_mutex_t *mutex) +{ + return CloseHandle(*mutex) == 0; +} + +static int pthread_cond_init(pthread_cond_t *restrict cond, + const pthread_condattr_t *restrict attr) +{ + *cond = CreateEvent(NULL, FALSE, FALSE, NULL); + return (int)(*cond == NULL); +} + +static int pthread_cond_wait(pthread_cond_t *restrict cond, + pthread_mutex_t *restrict mutex) +{ + SignalObjectAndWait(*mutex, *cond, INFINITE, FALSE); + return WaitForSingleObject(*mutex, INFINITE) == 0; +} + +static int pthread_cond_destroy(pthread_cond_t *cond) +{ + return CloseHandle(*cond) == 0; +} + +int pthread_cond_signal(pthread_cond_t *cond) +{ + return SetEvent(*cond) == 0; +} + +#else +#error no thread system available +#endif + +typedef struct THThread_ { + pthread_t id; + int (*func)(void*); + void* data; + int status; +} THThread; + +typedef struct THMutex_{ + pthread_mutex_t id; + int refcount; +} THMutex; + +typedef struct THCondition_ { + pthread_cond_t id; + int refcount; +} THCondition; + +typedef struct THSemaphore_ { + pthread_mutex_t mutex_; + pthread_cond_t cond_; + int counter_; + int refcount; +}THSemaphore; + +static void* thread_closure(void *data) +{ + THThread *thread = data; + thread->status = thread->func(thread->data); + return NULL; +} + +THThread* THThread_new(int (*func)(void*), void *data) +{ + THThread *self = malloc(sizeof(THThread)); + self->func = func; + self->data = data; + self->status = 0; + if(!self) + return NULL; + if(pthread_create(&self->id, NULL, thread_closure, self)) { + free(self); + return NULL; + } + return self; +} + +long THThread_id(THThread *self) +{ + return (long)(self); +} + +int THThread_free(THThread *self) +{ + int status = 1; + if(self) { + if(pthread_join(self->id, NULL)) + return 1; + status = self->status; + free(self); + self = NULL; + } + return status; +} + +THMutex* THMutex_new(void) +{ + THMutex *self = malloc(sizeof(THMutex)); + if(!self) + return NULL; + if(pthread_mutex_init(&self->id, NULL) != 0) { + free(self); + return NULL; + } + self->refcount = 1; + return self; +} + +THMutex* THMutex_newWithId(long id) +{ + THMutex *self = (THMutex*)id; + __sync_fetch_and_add(&self->refcount, 1); + return self; +} + +long THMutex_id(THMutex *self) +{ + return (long)(self); +} + +int THMutex_lock(THMutex *self) +{ + if(pthread_mutex_lock(&self->id) != 0) + return 1; + return 0; +} + +int THMutex_trylock(THMutex *self) +{ + if (pthread_mutex_trylock(&self->id) != 0) + return 1; + return 0; +} + +int THMutex_unlock(THMutex *self) +{ + if(pthread_mutex_unlock(&self->id) != 0) + return 1; + return 0; +} + +void THMutex_free(THMutex *self) +{ + if(self) { + if(__sync_fetch_and_add(&self->refcount, -1) == 1) { + pthread_mutex_destroy(&self->id); + free(self); + self = NULL; + } + } +} + +THCondition* THCondition_new(void) +{ + THCondition *self = malloc(sizeof(THCondition)); + if(!self) + return NULL; + if(pthread_cond_init(&self->id, NULL)) { + free(self); + return NULL; + } + self->refcount = 1; + return self; +} + +THCondition* THCondition_newWithId(long id) +{ + THCondition *self = (THCondition*)id; + __sync_fetch_and_add(&self->refcount, 1); + return self; +} + +long THCondition_id(THCondition *self) +{ + return (long)(self); +} + +int THCondition_signal(THCondition *self) +{ + if(pthread_cond_signal(&self->id)) + return 1; + return 0; +} + +int THCondition_wait(THCondition *self, THMutex *mutex) +{ + if(pthread_cond_wait(&self->id, &mutex->id)) + return 1; + return 0; +} + +void THCondition_free(THCondition *self) +{ + if(self) { + if(__sync_fetch_and_add(&self->refcount, -1) == 1) { + pthread_cond_destroy(&self->id); + free(self); + self = NULL; + } + } +} + +THSemaphore* THSemaphore_new(int initValue) +{ + THSemaphore *self = malloc(sizeof(THSemaphore)); + if(!self) + return NULL; + self->counter_ = initValue; + if(pthread_mutex_init(&self->mutex_, NULL) != 0) + { + free(self); + return NULL; + } + + if(pthread_cond_init(&self->cond_, NULL) != 0) + { + free(self); + return NULL; + } + + self->refcount = 1; + + return self; +} + +THSemaphore* THSemaphore_newWithId(long id) +{ + THSemaphore *self = (THSemaphore*)id; + __sync_fetch_and_add(&self->refcount, 1); + return self; +} + +long THSemaphore_id(THSemaphore *self) +{ + return (long)(self); +} + +int THSemaphore_wait(THSemaphore *self) +{ + int ret = 0; + ret |= pthread_mutex_lock(&self->mutex_); + + while (self->counter_ <= 0) + ret |= pthread_cond_wait(&self->cond_, &self->mutex_); + + self->counter_--; + ret |= pthread_mutex_unlock(&self->mutex_); + + return ret; + +} + +int THSemaphore_signal(THSemaphore *self) +{ + int ret = 0; + ret |= pthread_mutex_lock(&self->mutex_); + self->counter_++; + ret |= pthread_cond_signal(&self->cond_); + ret |= pthread_mutex_unlock(&self->mutex_); + + return ret; +} + +int THSemaphore_trywait(THSemaphore *self) +{ + int ret = 0; + int try_wait_succeeded = 1; + ret |= pthread_mutex_lock(&self->mutex_); + if (self->counter_ > 0) { + self->counter_--; + try_wait_succeeded = 0; + } + ret |= pthread_mutex_unlock(&self->mutex_); + return try_wait_succeeded; +} + +void THSemaphore_free(THSemaphore *self) +{ + if(self) + { + if(__sync_fetch_and_add(&self->refcount, -1) == 1) + { + pthread_mutex_destroy(&self->mutex_); + pthread_cond_destroy(&self->cond_); + free(self); + self = NULL; + } + } + } + + + +#endif diff --git a/fastnn/threads/lib/THThread.h b/fastnn/threads/lib/THThread.h new file mode 100644 index 0000000..75ba417 --- /dev/null +++ b/fastnn/threads/lib/THThread.h @@ -0,0 +1,36 @@ +#ifndef TH_THREAD_INC +#define TH_THREAD_INC + +typedef struct THThread_ THThread; +typedef struct THMutex_ THMutex; +typedef struct THCondition_ THCondition; +typedef struct THSemaphore_ THSemaphore; + +THThread* THThread_new(int (*closure)(void*), void *data); +long THThread_id(THThread *self); +int THThread_free(THThread *self); + +THMutex* THMutex_new(void); +THMutex* THMutex_newWithId(long id); +long THMutex_id(THMutex *self); +int THMutex_lock(THMutex *self); +int THMutex_unlock(THMutex *self); +int THMutex_trylock(THMutex *self); +void THMutex_free(THMutex *self); + +THCondition* THCondition_new(void); +THCondition* THCondition_newWithId(long id); +long THCondition_id(THCondition *self); +int THCondition_signal(THCondition *self); +int THCondition_wait(THCondition *self, THMutex *mutex); +void THCondition_free(THCondition *self); + +THSemaphore* THSemaphore_new(int initValue); +THSemaphore* THSemaphore_newWithId(long id); +long THSemaphore_id(THSemaphore *self); +int THSemaphore_signal(THSemaphore *self); +int THSemaphore_wait(THSemaphore *self); +int THSemaphore_trywait(THSemaphore *self); +void THSemaphore_free(THSemaphore *self); + +#endif diff --git a/fastnn/threads/lib/init.c b/fastnn/threads/lib/init.c new file mode 100644 index 0000000..4042189 --- /dev/null +++ b/fastnn/threads/lib/init.c @@ -0,0 +1,27 @@ +#include <lua.h> +#include <lauxlib.h> + +#if LUA_VERSION_NUM == 501 +static void luaL_setfuncs(lua_State *L, const luaL_Reg *l, int nup) +{ + luaL_checkstack(L, nup+1, "too many upvalues"); + for (; l->name != NULL; l++) { /* fill the table with given functions */ + int i; + lua_pushstring(L, l->name); + for (i = 0; i < nup; i++) /* copy upvalues to the top */ + lua_pushvalue(L, -(nup+1)); + lua_pushcclosure(L, l->func, nup); /* closure with those upvalues */ + lua_settable(L, -(nup + 3)); + } + lua_pop(L, nup); /* remove upvalues */ +} +#endif + +#include "threads.c" + +int luaopen_libthreads(lua_State *L) +{ + lua_newtable(L); + thread_init_pkg(L); + return 1; +} diff --git a/fastnn/threads/lib/luaTHRD.h b/fastnn/threads/lib/luaTHRD.h new file mode 100644 index 0000000..3760bdb --- /dev/null +++ b/fastnn/threads/lib/luaTHRD.h @@ -0,0 +1,84 @@ +#ifndef LUA_THRD_INC +#define LUA_THRD_INC + +static int luaTHRD_pushudata(lua_State *L, void *ptr, const char* typename) +{ + void **udata = lua_newuserdata(L, sizeof(void*)); + if(udata) { + *udata = ptr; + luaL_getmetatable(L, typename); + lua_setmetatable(L, -2); + return 1; + } + return 0; +} + +static void *luaTHRD_checkudata(lua_State *L, int narg, const char *typename) +{ + void **udata = luaL_checkudata(L, narg, typename); + if(udata) + return *udata; + else + return NULL; +} + +static void *luaTHRD_toudata(lua_State *L, int narg, const char *typename) +{ + void **udata = lua_touserdata(L, narg); + if(udata) { + if(lua_getmetatable(L, -1)) { + luaL_getmetatable(L, typename); + if(lua_equal(L, -1, -2)) { + lua_pop(L, 2); + return *udata; + } + else { + lua_pop(L, 2); + return NULL; + } + } + else + return NULL; + } + else + return NULL; +} + +static int luaTHRD_ctor(lua_State *L) +{ + if(!lua_istable(L, 1)) /* dummy ctor table */ + luaL_error(L, "ctor: table expected"); + lua_getmetatable(L, 1); + lua_remove(L, 1); /* dummy ctor table */ + if(!lua_istable(L, -1)) + luaL_error(L, "ctor: no metatable found"); + lua_pushstring(L, "__new"); + lua_rawget(L, -2); + lua_remove(L, -2); /* forget about metatable */ + if(!lua_isfunction(L, -1)) + luaL_error(L, "ctor: __new appears to be not a function"); + lua_insert(L, 1); /* ctor first, arguments follow */ + lua_call(L, lua_gettop(L)-1, LUA_MULTRET); + return lua_gettop(L); +} + +static void luaTHRD_pushctortable(lua_State *L, lua_CFunction ctor, const char* typename) +{ + lua_newtable(L); /* empty useless dude */ + lua_newtable(L); /* metatable of the dude */ + lua_pushstring(L, "__index"); + luaL_getmetatable(L, typename); + lua_rawset(L, -3); + lua_pushstring(L, "__newindex"); + luaL_getmetatable(L, typename); + lua_rawset(L, -3); + lua_pushstring(L, "__new"); /* __call will look into there */ + lua_pushcfunction(L, ctor); + lua_rawset(L, -3); + lua_pushstring(L, "__call"); /* pop the table and calls __new */ + lua_pushcfunction(L, luaTHRD_ctor); + lua_rawset(L, -3); + lua_setmetatable(L, -2); +} + +#endif diff --git a/fastnn/threads/lib/threads.c b/fastnn/threads/lib/threads.c new file mode 100644 index 0000000..bf2a5ec --- /dev/null +++ b/fastnn/threads/lib/threads.c @@ -0,0 +1,355 @@ +#include <stdio.h> +#include <stdlib.h> +#include <luaT.h> +#include <string.h> + +#include "THThread.h" +#include "luaTHRD.h" + +#include <lua.h> +#include <lualib.h> + +static int newthread(void *code_) +{ + char *code = code_; + lua_State *L = luaL_newstate(); + + if(!L) { + printf("THREAD FATAL ERROR: could not create lua state\n"); + return -1; + } + luaL_openlibs(L); + + if(luaL_loadstring(L, code)) { + printf("FATAL THREAD PANIC: (loadstring) %s\n", lua_tolstring(L, -1, NULL)); + free(code); + lua_close(L); + return -1; + } + free(code); + if(lua_pcall(L, 0, 0, 0)) { + printf("FATAL THREAD PANIC: (pcall) %s\n", lua_tolstring(L, -1, NULL)); + lua_close(L); + return -1; + } + + lua_close(L); + return 0; +} + +static int thread_new(lua_State *L) +{ + THThread *thread = NULL; + size_t len = 0; + const char *code = luaL_checklstring(L, 1, &len); + char *code_dup = malloc(len+1); + if(!code_dup) + luaL_error(L, "threads: out of memory"); + memcpy(code_dup, code, len+1); + + thread = THThread_new(newthread, (void*)code_dup); + if(!thread) + luaL_error(L, "threads: thread new failed"); + luaTHRD_pushudata(L, thread, "threads.Thread"); + + return 1; +} + +static int thread_tostring(lua_State *L) +{ + char str[128]; + THThread *thread = luaTHRD_checkudata(L, 1, "threads.Thread"); + snprintf(str, 128, "threads.Thread <%lx>", THThread_id(thread)); + lua_pushstring(L, str); + return 1; +} + +static int thread_id(lua_State *L) +{ + THThread *thread = luaTHRD_checkudata(L, 1, "threads.Thread"); + lua_pushinteger(L, THThread_id(thread)); + return 1; +} + +static int thread_free(lua_State *L) +{ + THThread *thread = luaTHRD_checkudata(L, 1, "threads.Thread"); + THThread_free(thread); + return 0; +} + +static int mutex_new(lua_State *L) +{ + THMutex *mutex = NULL; + if(lua_gettop(L) == 0) { + mutex = THMutex_new(); + } + else if(lua_gettop(L) == 1) { + long id = luaL_checklong(L, 1); + mutex = THMutex_newWithId(id); + } + else + luaL_error(L, "threads: mutex new invalid arguments"); + if(!mutex) + luaL_error(L, "threads: mutex new failed"); + luaTHRD_pushudata(L, mutex, "threads.Mutex"); + return 1; +} + +static int mutex_tostring(lua_State *L) +{ + char str[128]; + THMutex *mutex = luaTHRD_checkudata(L, 1, "threads.Mutex"); + snprintf(str, 128, "threads.Mutex <%lx>", THMutex_id(mutex)); + lua_pushstring(L, str); + return 1; +} + +static int mutex_id(lua_State *L) +{ + THMutex *mutex = luaTHRD_checkudata(L, 1, "threads.Mutex"); + lua_pushinteger(L, THMutex_id(mutex)); + return 1; +} + +static int mutex_lock(lua_State *L) +{ + THMutex *mutex = luaTHRD_checkudata(L, 1, "threads.Mutex"); + if(THMutex_lock(mutex)) + luaL_error(L, "threads: mutex lock failed"); + return 0; +} + +static int mutex_unlock(lua_State *L) +{ + THMutex *mutex = luaTHRD_checkudata(L, 1, "threads.Mutex"); + if(THMutex_unlock(mutex)) + luaL_error(L, "threads: mutex unlock failed"); + return 0; +} + +static int mutex_free(lua_State *L) +{ + THMutex *mutex = luaTHRD_checkudata(L, 1, "threads.Mutex"); + THMutex_free(mutex); + return 0; +} + +static int condition_new(lua_State *L) +{ + THCondition *condition = NULL; + if(lua_gettop(L) == 0) { + condition = THCondition_new(); + } + else if(lua_gettop(L) == 1) { + long id = luaL_checklong(L, 1); + condition = THCondition_newWithId(id); + } + else + luaL_error(L, "threads: condition new invalid arguments"); + if(!condition) + luaL_error(L, "threads: condition new failed"); + luaTHRD_pushudata(L, condition, "threads.Condition"); + return 1; +} + +static int condition_tostring(lua_State *L) +{ + char str[128]; + THCondition *condition = luaTHRD_checkudata(L, 1, "threads.Condition"); + snprintf(str, 128, "threads.Condition <%lx>", THCondition_id(condition)); + lua_pushstring(L, str); + return 1; +} + +static int condition_id(lua_State *L) +{ + THCondition *condition = luaTHRD_checkudata(L, 1, "threads.Condition"); + lua_pushinteger(L, THCondition_id(condition)); + return 1; +} + +static int condition_free(lua_State *L) +{ + THCondition *condition = luaTHRD_checkudata(L, 1, "threads.Condition"); + if(!condition) + luaL_error(L, "threads: condition free failed"); + THCondition_free(condition); + return 0; +} + +static int condition_signal(lua_State *L) +{ + THCondition *condition = luaTHRD_checkudata(L, 1, "threads.Condition"); + if(THCondition_signal(condition)) + luaL_error(L, "threads: condition signal failed"); + return 0; +} + +static int condition_wait(lua_State *L) +{ + THCondition *condition = luaTHRD_checkudata(L, 1, "threads.Condition"); + THMutex *mutex = luaTHRD_checkudata(L, 2, "threads.Mutex"); + if(THCondition_wait(condition, mutex)) + luaL_error(L, "threads: condition wait failed"); + return 0; +} + +static int semaphore_new(lua_State *L) +{ + THSemaphore* semaphore = NULL; + if(lua_gettop(L) == 1) { + int initValue = luaL_checkinteger(L, 1); + semaphore = THSemaphore_new(initValue); + } + else + luaL_error(L, "threads: semaphore new invalid arguments"); + if (!semaphore) + luaL_error(L, "threads: semaphore new failed"); + luaTHRD_pushudata(L, semaphore, "threads.Semaphore"); + return 1; +} + +static int semaphore_newfromid(lua_State *L) +{ + THSemaphore* semaphore = NULL; + if(lua_gettop(L) == 1) { + long id = luaL_checklong(L, 1); + semaphore = THSemaphore_newWithId(id); + } + else + luaL_error(L, "threads: semaphore new invalid arguments"); + if(!semaphore) + luaL_error(L, "threads: semaphore new failed"); + luaTHRD_pushudata(L, semaphore, "threads.Semaphore"); + return 1; +} + +static int semaphore_id(lua_State *L) +{ + THSemaphore *semaphore = luaTHRD_checkudata(L, 1, "threads.Semaphore"); + lua_pushinteger(L, THSemaphore_id(semaphore)); + return 1; +} + +static int semaphore_signal(lua_State *L) +{ + THSemaphore *semaphore = luaTHRD_checkudata(L, 1, "threads.Semaphore"); + if(THSemaphore_signal(semaphore)) + luaL_error(L, "threads: semaphore signal failed"); + return 0; +} + +static int semaphore_wait(lua_State *L) +{ + THSemaphore *semaphore = luaTHRD_checkudata(L, 1, "threads.Semaphore"); + if(THSemaphore_wait(semaphore)) + luaL_error(L, "threads: semaphore wait failed"); + return 0; +} + +static int semaphore_trywait(lua_State *L) +{ + THSemaphore *semaphore = luaTHRD_checkudata(L, 1, "threads.Semaphore"); + int wait = THSemaphore_trywait(semaphore); + lua_pushinteger(L, wait); + return 1; +} + +static void semaphore_free(lua_State *L) +{ + THSemaphore *semaphore = luaTHRD_checkudata(L, 1, "threads.Semaphore"); + if(!semaphore) + luaL_error(L, "threads: semaphore free failed"); + THSemaphore_free(semaphore); +} + +static const struct luaL_Reg thread__ [] = { + {"new", thread_new}, + {"__tostring", thread_tostring}, + {"id", thread_id}, + {"free", thread_free}, + {NULL, NULL} +}; + +static const struct luaL_Reg mutex__ [] = { + {"new", mutex_new}, + {"__tostring", mutex_tostring}, + {"id", mutex_id}, + {"lock", mutex_lock}, + {"unlock", mutex_unlock}, + {"free", mutex_free}, + {NULL, NULL} +}; + +static const struct luaL_Reg condition__ [] = { + {"new", condition_new}, + {"__tostring", condition_tostring}, + {"id", condition_id}, + {"signal", condition_signal}, + {"wait", condition_wait}, + {"free", condition_free}, + {NULL, NULL} +}; + +static const struct luaL_Reg semaphore__ [] = { + {"new", semaphore_new}, + {"newfromid", semaphore_newfromid}, + {"id", semaphore_id}, + {"signal", semaphore_signal}, + {"wait", semaphore_wait}, + {"trywait", semaphore_trywait}, + {"free", semaphore_free}, + {NULL, NULL} +}; + +static void thread_init_pkg(lua_State *L) +{ + if(!luaL_newmetatable(L, "threads.Thread")) + luaL_error(L, "threads: threads.Thread type already exists"); + luaL_setfuncs(L, thread__, 0); + lua_pushstring(L, "__index"); + lua_pushvalue(L, -2); + lua_rawset(L, -3); + lua_pop(L, 1); + + if(!luaL_newmetatable(L, "threads.Mutex")) + luaL_error(L, "threads: threads.Mutex type already exists"); + luaL_setfuncs(L, mutex__, 0); + lua_pushstring(L, "__index"); + lua_pushvalue(L, -2); + lua_rawset(L, -3); + lua_pop(L, 1); + + if(!luaL_newmetatable(L, "threads.Condition")) + luaL_error(L, "threads: threads.Condition type already exists"); + luaL_setfuncs(L, condition__, 0); + lua_pushstring(L, "__index"); + lua_pushvalue(L, -2); + lua_rawset(L, -3); + lua_pop(L, 1); + + if(!luaL_newmetatable(L, "threads.Semaphore")) + luaL_error(L, "threads: threads.Semaphore type already exists"); + luaL_setfuncs(L, semaphore__, 0); + lua_pushstring(L, "__index"); + lua_pushvalue(L, -2); + lua_rawset(L, -3); + lua_pop(L, 1); + + lua_pushstring(L, "Thread"); + luaTHRD_pushctortable(L, thread_new, "threads.Thread"); + lua_rawset(L, -3); + + lua_pushstring(L, "Mutex"); + luaTHRD_pushctortable(L, mutex_new, "threads.Mutex"); + lua_rawset(L, -3); + + lua_pushstring(L, "Condition"); + luaTHRD_pushctortable(L, condition_new, "threads.Condition"); + lua_rawset(L, -3); + + lua_pushstring(L, "Semaphore"); + luaTHRD_pushctortable(L, semaphore_new, "threads.Semaphore"); + lua_rawset(L, -3); +} diff --git a/fastnn/threads/test/test-low-level.lua b/fastnn/threads/test/test-low-level.lua new file mode 100644 index 0000000..aea31db --- /dev/null +++ b/fastnn/threads/test/test-low-level.lua @@ -0,0 +1,39 @@ +local t = require 'libthreads' + +local m = t.Mutex() +local c = t.Condition() +print(string.format('| main thread mutex: 0x%x', m:id())) +print(string.format('| main thread condition: 0x%x', c:id())) + +local code = string.format([[ + local t = require 'libthreads' + local c = t.Condition(%d) + print('|| hello from thread') + print(string.format('|| thread condition: 0x%%x', c:id())) + print('|| doing some stuff in thread...') + local x = 0 + for i=1,10000 do + for i=1,10000 do + x = x + math.sin(i) + end + x = x / 10000 + end + print(string.format('|| ...ok (x=%%f)', x)) + c:signal() +]], c:id()) + +print(code) + +local thread = t.Thread(code) + + +print('| waiting for thread...') +m:lock() +c:wait(m) +print('| thread finished!') + +thread:free() +m:free() +c:free() + +print('| done') diff --git a/fastnn/threads/test/test-threads-async.lua b/fastnn/threads/test/test-threads-async.lua new file mode 100644 index 0000000..68bcd35 --- /dev/null +++ b/fastnn/threads/test/test-threads-async.lua @@ -0,0 +1,66 @@ +local threads = require 'threads' + +local nthread = 4 +local njob = 100 + +local pool = threads.Threads( + nthread, + function(threadid) + print('starting a new thread/state number ' .. threadid) + end +) + + +local jobid = 0 +local result -- DO NOT put this in get +local function get() + + -- fill up the queue as much as we can + -- this will not block + while jobid < njob and pool:acceptsjob() do + jobid = jobid + 1 + + pool:addjob( + function(jobid) + print(string.format('thread ID %d is performing job %d', __threadid, jobid)) + return string.format("job output from job %d", jobid) + end, + + function(jobres) + result = jobres + end, + + jobid + ) + end + + -- is there still something to do? + if pool:hasjob() then + pool:dojob() -- yes? do it! + if pool:haserror() then -- check for errors + pool:synchronize() -- finish everything and throw error + end + return result + end + +end + +local jobdone = 0 +repeat + -- get something asynchronously + local res = get() + + -- do something with res (if any) + if res then + print(res) + jobdone = jobdone + 1 + end + +until not res -- until there is nothing remaining + +assert(jobid == 100) +assert(jobdone == 100) + +print('PASSED') + +pool:terminate() diff --git a/fastnn/threads/test/test-threads-multiple.lua b/fastnn/threads/test/test-threads-multiple.lua new file mode 100644 index 0000000..4429696 --- /dev/null +++ b/fastnn/threads/test/test-threads-multiple.lua @@ -0,0 +1,15 @@ +local threads = require 'threads' + +for i=1,1000 do + io.write(string.format('%04d.', tonumber(i))) + io.flush() + local pool = + threads.Threads( + 4, + function(threadid) + require 'torch' + end + ) +end +print() +print('PASSED') diff --git a/fastnn/threads/test/test-threads-shared.lua b/fastnn/threads/test/test-threads-shared.lua new file mode 100644 index 0000000..3d63851 --- /dev/null +++ b/fastnn/threads/test/test-threads-shared.lua @@ -0,0 +1,111 @@ +require 'torch' + +local threads = require 'threads' +local status, tds = pcall(require, 'tds') +tds = status and tds or nil + +local nthread = 4 +local njob = 10 +local msg = "hello from a satellite thread" + +threads.Threads.serialization('threads.sharedserialize') + +local x = {} +local xh = tds and tds.hash() or {} +local xs = {} +local z = tds and tds.hash() or {} +local D = 10 +local K = tds and 100000 or 100 -- good luck in non-shared (30M) +for i=1,njob do + x[i] = torch.ones(D) + xh[i] = torch.ones(D) + xs[i] = torch.FloatStorage(D):fill(1) + for j=1,K do + z[(i-1)*K+j] = "blah" .. i .. j + end +end +collectgarbage() +collectgarbage() + +print('GO') + +local pool = threads.Threads( + nthread, + function(threadIdx) + pcall(require, 'tds') + print('starting a new thread/state number:', threadIdx) + gmsg = msg -- we copy here an upvalue of the main thread + end +) + +local jobdone = 0 +for i=1,njob do + pool:addjob( + function() + assert(x[i]:sum() == D) + assert(xh[i]:sum() == D) + assert(torch.FloatTensor(xs[i]):sum() == D) + for j=1,K do + assert(z[(i-1)*K+j] == "blah" .. i .. j) + end + x[i]:add(1) + xh[i]:add(1) + torch.FloatTensor(xs[i]):add(1) + print(string.format('%s -- thread ID is %x', gmsg, __threadid)) + collectgarbage() + collectgarbage() + return __threadid + end, + + function(id) + print(string.format("task %d finished (ran on thread ID %x)", i, id)) + jobdone = jobdone + 1 + end + ) +end + +for i=1,njob do + pool:addjob( + function() + collectgarbage() + collectgarbage() + end + ) +end + +pool:synchronize() + +print(string.format('%d jobs done', jobdone)) + +pool:terminate() + +-- did we do the job in shared mode? +for i=1,njob do + assert(x[i]:sum() == 2*D) + assert(xh[i]:sum() == 2*D) + assert(torch.FloatTensor(xs[i]):sum() == 2*D) +end + +-- serialize and zero x +local str = torch.serialize(x) +local strh = torch.serialize(xh) +local strs = torch.serialize(xs) +for i=1,njob do + x[i]:zero() + xh[i]:zero() + xs[i]:fill(0) +end + +-- dude, check that unserialized x does not point on x +local y = torch.deserialize(str) +local yh = torch.deserialize(strh) +local ys = torch.deserialize(strs) +for i=1,njob do + assert(y[i]:sum() == 2*D) + assert(yh[i]:sum() == 2*D) + assert(torch.FloatTensor(ys[i]):sum() == 2*D) +end + +pool:terminate() + +print('PASSED') diff --git a/fastnn/threads/test/test-threads.lua b/fastnn/threads/test/test-threads.lua new file mode 100644 index 0000000..e49a381 --- /dev/null +++ b/fastnn/threads/test/test-threads.lua @@ -0,0 +1,20 @@ +local clib = require 'libthreads' + + +nthread = 1 + +str='ad;alkfakd;af' +code = [[ function func(str) print(str) end; print(str);]] +print(code) + +--thread = clib.Thread(code) + + +--thread:free() + + +require 'threads' + +tt = threads.Thread(code) + + diff --git a/fastnn/threads/threads-scm-1.rockspec b/fastnn/threads/threads-scm-1.rockspec new file mode 100644 index 0000000..02535d4 --- /dev/null +++ b/fastnn/threads/threads-scm-1.rockspec @@ -0,0 +1,36 @@ +package = "threads" +version = "scm-1" +source = { + url = "https://github.com/uphantom/nerv-threads.git" +} +description = { + summary = "Thread control for Nerv fastnn", + detailed = [[ + ]], + homepage = "https://github.com/uphantom/nerv-threads", + license = "BSD" +} +dependencies = { + "nerv >= scm-1", + "lua >= 5.1" +} +build = { + type = "make", + build_variables = { + CFLAGS="$(CFLAGS)", + LIBFLAG="$(LIBFLAG)", + LUA_LIBDIR="$(LUA_LIBDIR)", + LUA_BINDIR="$(LUA_BINDIR)", + LUA_INCDIR="$(LUA_INCDIR)", + INST_PREFIX="$(PREFIX)", + LUA="$(LUA)", + }, + install_variables = { + LUA_BINDIR="$(LUA_BINDIR)", + INST_PREFIX="$(PREFIX)", + INST_BINDIR="$(BINDIR)", + INST_LIBDIR="$(LIBDIR)", + INST_LUADIR="$(LUADIR)", + INST_CONFDIR="$(CONFDIR)", + }, +} diff --git a/fastnn/threads/threads.lua b/fastnn/threads/threads.lua new file mode 100644 index 0000000..2db7d51 --- /dev/null +++ b/fastnn/threads/threads.lua @@ -0,0 +1,14 @@ +local threads = {} + +local C = require 'libthreads' + +threads.Thread = C.Thread +threads.Mutex = C.Mutex +threads.Condition = C.Condition +threads.Semaphore = C.Semaphore +--threads.Threads = require 'threads.threads' + +-- only for backward compatibility (boo) +-- setmetatable(threads, getmetatable(threads.Threads)) + +-- return threads |