From a68d3c982ed0dd4ef5bbc9e0c22b9ecf9565b924 Mon Sep 17 00:00:00 2001 From: uphantom Date: Fri, 28 Aug 2015 17:41:14 +0800 Subject: fastnn version 1.0 --- fastnn/threads/Makefile | 42 +++ fastnn/threads/init.lua | 14 + fastnn/threads/lib/THThread.c | 349 +++++++++++++++++++++++++ fastnn/threads/lib/THThread.h | 36 +++ fastnn/threads/lib/init.c | 27 ++ fastnn/threads/lib/luaTHRD.h | 84 ++++++ fastnn/threads/lib/threads.c | 355 ++++++++++++++++++++++++++ fastnn/threads/test/test-low-level.lua | 39 +++ fastnn/threads/test/test-threads-async.lua | 66 +++++ fastnn/threads/test/test-threads-multiple.lua | 15 ++ fastnn/threads/test/test-threads-shared.lua | 111 ++++++++ fastnn/threads/test/test-threads.lua | 20 ++ fastnn/threads/threads-scm-1.rockspec | 36 +++ fastnn/threads/threads.lua | 14 + 14 files changed, 1208 insertions(+) create mode 100644 fastnn/threads/Makefile create mode 100644 fastnn/threads/init.lua create mode 100644 fastnn/threads/lib/THThread.c create mode 100644 fastnn/threads/lib/THThread.h create mode 100644 fastnn/threads/lib/init.c create mode 100644 fastnn/threads/lib/luaTHRD.h create mode 100644 fastnn/threads/lib/threads.c create mode 100644 fastnn/threads/test/test-low-level.lua create mode 100644 fastnn/threads/test/test-threads-async.lua create mode 100644 fastnn/threads/test/test-threads-multiple.lua create mode 100644 fastnn/threads/test/test-threads-shared.lua create mode 100644 fastnn/threads/test/test-threads.lua create mode 100644 fastnn/threads/threads-scm-1.rockspec create mode 100644 fastnn/threads/threads.lua (limited to 'fastnn/threads') diff --git a/fastnn/threads/Makefile b/fastnn/threads/Makefile new file mode 100644 index 0000000..17958f9 --- /dev/null +++ b/fastnn/threads/Makefile @@ -0,0 +1,42 @@ +.PHONY: build install clean +SHELL := /bin/bash +BUILD_DIR := $(CURDIR)/build +INC_PATH := $(LUA_BINDIR)/../include/nerv/luaT +LUA_DIR = $(INST_LUADIR)/threads +LIB_PATH := $(LUA_BINDIR)/../lib + +OBJ_DIR := $(BUILD_DIR)/objs +SUBDIR := lib test + + +OBJS := lib/init.o lib/threads.o lib/THThread.o +LIBS := $(INST_LIBDIR)/libthreads.so +LUA_LIBS := init.lua threads.lua test/test-threads.lua +INCLUDE := -I $(LUA_INCDIR) -I $(INC_PATH) -DLUA_USE_APICHECK -DUSE_PTHREAD_THREADS + + +OBJS := $(addprefix $(OBJ_DIR)/,$(OBJS)) +OBJ_SUBDIR := $(addprefix $(OBJ_DIR)/,$(SUBDIR)) +LUA_SUBDIR := $(addprefix $(LUA_DIR)/,$(SUBDIR)) +LUA_LIBS := $(addprefix $(LUA_DIR)/,$(LUA_LIBS)) + +CFLAGS := -Wall -Wextra -O0 + +build: $(OBJ_DIR) $(OBJ_SUBDIR) $(OBJS) +install: $(LUA_DIR) $(LUA_SUBDIR) $(LIBS) $(LUA_LIBS) + +$(OBJ_DIR) $(LUA_DIR) $(OBJ_SUBDIR) $(LUA_SUBDIR): + -mkdir -p $@ +$(LUA_DIR)/%.lua: %.lua + cp $< $@ + +$(OBJ_DIR)/%.o: %.c $(patsubst /%.o,/%.c,$@) + gcc -c -o $@ $< $(INCLUDE) -fPIC $(CFLAGS) + +$(LIBS): $(OBJS) + gcc -shared -o $@ $^ $(LDFLAGS) -Wl,-rpath=$(LIB_PATH) -L$(LIB_PATH) -lnervcore -lluaT -lpthread + +clean: + -rm -rf $(OBJ_DIR) + + diff --git a/fastnn/threads/init.lua b/fastnn/threads/init.lua new file mode 100644 index 0000000..6cd3679 --- /dev/null +++ b/fastnn/threads/init.lua @@ -0,0 +1,14 @@ +threads = {} + +local C = require 'libthreads' + +threads.Thread = C.Thread +threads.Mutex = C.Mutex +threads.Condition = C.Condition +threads.Semaphore = C.Semaphore +--threads.Threads = require 'threads.threads' + +-- only for backward compatibility (boo) +-- setmetatable(threads, getmetatable(threads.Threads)) + +-- return threads diff --git a/fastnn/threads/lib/THThread.c b/fastnn/threads/lib/THThread.c new file mode 100644 index 0000000..7abc044 --- /dev/null +++ b/fastnn/threads/lib/THThread.c @@ -0,0 +1,349 @@ +#ifndef TH_THREAD_INC +#define TH_THREAD_INC + +#include +#include + +/*#include "TH.h" */ +#include "THThread.h" + +#if defined(USE_PTHREAD_THREADS) +#include + +#elif defined(USE_WIN32_THREADS) + +/* very basic emulation to suit our needs */ + +#include +#include + +typedef HANDLE pthread_t; +typedef DWORD pthread_attr_t; +typedef HANDLE pthread_mutex_t; +typedef HANDLE pthread_cond_t; + +static int pthread_create(pthread_t *restrict thread, + const pthread_attr_t *restrict attr, void *(*start_routine)(void *), + void *restrict arg) +{ + *thread = (HANDLE)_beginthreadex(NULL, 0, (THREAD_FUNCTION)start_routine, arg, 0, NULL); + return (int)(*thread == NULL); +} + +static int pthread_join(pthread_t thread, void **value_ptr) +{ + return ((WaitForSingleObject((thread), INFINITE) != WAIT_OBJECT_0) || !CloseHandle(thread)); +} + +static int pthread_mutex_init(pthread_mutex_t *restrict mutex, + const pthread_mutexattr_t *restrict attr) +{ + *mutex = CreateMutex(NULL, FALSE, NULL); + return (int)(*mutex == NULL); +} + +static int pthread_mutex_lock(pthread_mutex_t *mutex) +{ + return WaitForSingleObject(*mutex, INFINITE) == 0; +} + +static int pthread_mutex_unlock(pthread_mutex_t *mutex) +{ + return ReleaseMutex(*mutex) == 0; +} + +static int pthread_mutex_destroy(pthread_mutex_t *mutex) +{ + return CloseHandle(*mutex) == 0; +} + +static int pthread_cond_init(pthread_cond_t *restrict cond, + const pthread_condattr_t *restrict attr) +{ + *cond = CreateEvent(NULL, FALSE, FALSE, NULL); + return (int)(*cond == NULL); +} + +static int pthread_cond_wait(pthread_cond_t *restrict cond, + pthread_mutex_t *restrict mutex) +{ + SignalObjectAndWait(*mutex, *cond, INFINITE, FALSE); + return WaitForSingleObject(*mutex, INFINITE) == 0; +} + +static int pthread_cond_destroy(pthread_cond_t *cond) +{ + return CloseHandle(*cond) == 0; +} + +int pthread_cond_signal(pthread_cond_t *cond) +{ + return SetEvent(*cond) == 0; +} + +#else +#error no thread system available +#endif + +typedef struct THThread_ { + pthread_t id; + int (*func)(void*); + void* data; + int status; +} THThread; + +typedef struct THMutex_{ + pthread_mutex_t id; + int refcount; +} THMutex; + +typedef struct THCondition_ { + pthread_cond_t id; + int refcount; +} THCondition; + +typedef struct THSemaphore_ { + pthread_mutex_t mutex_; + pthread_cond_t cond_; + int counter_; + int refcount; +}THSemaphore; + +static void* thread_closure(void *data) +{ + THThread *thread = data; + thread->status = thread->func(thread->data); + return NULL; +} + +THThread* THThread_new(int (*func)(void*), void *data) +{ + THThread *self = malloc(sizeof(THThread)); + self->func = func; + self->data = data; + self->status = 0; + if(!self) + return NULL; + if(pthread_create(&self->id, NULL, thread_closure, self)) { + free(self); + return NULL; + } + return self; +} + +long THThread_id(THThread *self) +{ + return (long)(self); +} + +int THThread_free(THThread *self) +{ + int status = 1; + if(self) { + if(pthread_join(self->id, NULL)) + return 1; + status = self->status; + free(self); + self = NULL; + } + return status; +} + +THMutex* THMutex_new(void) +{ + THMutex *self = malloc(sizeof(THMutex)); + if(!self) + return NULL; + if(pthread_mutex_init(&self->id, NULL) != 0) { + free(self); + return NULL; + } + self->refcount = 1; + return self; +} + +THMutex* THMutex_newWithId(long id) +{ + THMutex *self = (THMutex*)id; + __sync_fetch_and_add(&self->refcount, 1); + return self; +} + +long THMutex_id(THMutex *self) +{ + return (long)(self); +} + +int THMutex_lock(THMutex *self) +{ + if(pthread_mutex_lock(&self->id) != 0) + return 1; + return 0; +} + +int THMutex_trylock(THMutex *self) +{ + if (pthread_mutex_trylock(&self->id) != 0) + return 1; + return 0; +} + +int THMutex_unlock(THMutex *self) +{ + if(pthread_mutex_unlock(&self->id) != 0) + return 1; + return 0; +} + +void THMutex_free(THMutex *self) +{ + if(self) { + if(__sync_fetch_and_add(&self->refcount, -1) == 1) { + pthread_mutex_destroy(&self->id); + free(self); + self = NULL; + } + } +} + +THCondition* THCondition_new(void) +{ + THCondition *self = malloc(sizeof(THCondition)); + if(!self) + return NULL; + if(pthread_cond_init(&self->id, NULL)) { + free(self); + return NULL; + } + self->refcount = 1; + return self; +} + +THCondition* THCondition_newWithId(long id) +{ + THCondition *self = (THCondition*)id; + __sync_fetch_and_add(&self->refcount, 1); + return self; +} + +long THCondition_id(THCondition *self) +{ + return (long)(self); +} + +int THCondition_signal(THCondition *self) +{ + if(pthread_cond_signal(&self->id)) + return 1; + return 0; +} + +int THCondition_wait(THCondition *self, THMutex *mutex) +{ + if(pthread_cond_wait(&self->id, &mutex->id)) + return 1; + return 0; +} + +void THCondition_free(THCondition *self) +{ + if(self) { + if(__sync_fetch_and_add(&self->refcount, -1) == 1) { + pthread_cond_destroy(&self->id); + free(self); + self = NULL; + } + } +} + +THSemaphore* THSemaphore_new(int initValue) +{ + THSemaphore *self = malloc(sizeof(THSemaphore)); + if(!self) + return NULL; + self->counter_ = initValue; + if(pthread_mutex_init(&self->mutex_, NULL) != 0) + { + free(self); + return NULL; + } + + if(pthread_cond_init(&self->cond_, NULL) != 0) + { + free(self); + return NULL; + } + + self->refcount = 1; + + return self; +} + +THSemaphore* THSemaphore_newWithId(long id) +{ + THSemaphore *self = (THSemaphore*)id; + __sync_fetch_and_add(&self->refcount, 1); + return self; +} + +long THSemaphore_id(THSemaphore *self) +{ + return (long)(self); +} + +int THSemaphore_wait(THSemaphore *self) +{ + int ret = 0; + ret |= pthread_mutex_lock(&self->mutex_); + + while (self->counter_ <= 0) + ret |= pthread_cond_wait(&self->cond_, &self->mutex_); + + self->counter_--; + ret |= pthread_mutex_unlock(&self->mutex_); + + return ret; + +} + +int THSemaphore_signal(THSemaphore *self) +{ + int ret = 0; + ret |= pthread_mutex_lock(&self->mutex_); + self->counter_++; + ret |= pthread_cond_signal(&self->cond_); + ret |= pthread_mutex_unlock(&self->mutex_); + + return ret; +} + +int THSemaphore_trywait(THSemaphore *self) +{ + int ret = 0; + int try_wait_succeeded = 1; + ret |= pthread_mutex_lock(&self->mutex_); + if (self->counter_ > 0) { + self->counter_--; + try_wait_succeeded = 0; + } + ret |= pthread_mutex_unlock(&self->mutex_); + return try_wait_succeeded; +} + +void THSemaphore_free(THSemaphore *self) +{ + if(self) + { + if(__sync_fetch_and_add(&self->refcount, -1) == 1) + { + pthread_mutex_destroy(&self->mutex_); + pthread_cond_destroy(&self->cond_); + free(self); + self = NULL; + } + } + } + + + +#endif diff --git a/fastnn/threads/lib/THThread.h b/fastnn/threads/lib/THThread.h new file mode 100644 index 0000000..75ba417 --- /dev/null +++ b/fastnn/threads/lib/THThread.h @@ -0,0 +1,36 @@ +#ifndef TH_THREAD_INC +#define TH_THREAD_INC + +typedef struct THThread_ THThread; +typedef struct THMutex_ THMutex; +typedef struct THCondition_ THCondition; +typedef struct THSemaphore_ THSemaphore; + +THThread* THThread_new(int (*closure)(void*), void *data); +long THThread_id(THThread *self); +int THThread_free(THThread *self); + +THMutex* THMutex_new(void); +THMutex* THMutex_newWithId(long id); +long THMutex_id(THMutex *self); +int THMutex_lock(THMutex *self); +int THMutex_unlock(THMutex *self); +int THMutex_trylock(THMutex *self); +void THMutex_free(THMutex *self); + +THCondition* THCondition_new(void); +THCondition* THCondition_newWithId(long id); +long THCondition_id(THCondition *self); +int THCondition_signal(THCondition *self); +int THCondition_wait(THCondition *self, THMutex *mutex); +void THCondition_free(THCondition *self); + +THSemaphore* THSemaphore_new(int initValue); +THSemaphore* THSemaphore_newWithId(long id); +long THSemaphore_id(THSemaphore *self); +int THSemaphore_signal(THSemaphore *self); +int THSemaphore_wait(THSemaphore *self); +int THSemaphore_trywait(THSemaphore *self); +void THSemaphore_free(THSemaphore *self); + +#endif diff --git a/fastnn/threads/lib/init.c b/fastnn/threads/lib/init.c new file mode 100644 index 0000000..4042189 --- /dev/null +++ b/fastnn/threads/lib/init.c @@ -0,0 +1,27 @@ +#include +#include + +#if LUA_VERSION_NUM == 501 +static void luaL_setfuncs(lua_State *L, const luaL_Reg *l, int nup) +{ + luaL_checkstack(L, nup+1, "too many upvalues"); + for (; l->name != NULL; l++) { /* fill the table with given functions */ + int i; + lua_pushstring(L, l->name); + for (i = 0; i < nup; i++) /* copy upvalues to the top */ + lua_pushvalue(L, -(nup+1)); + lua_pushcclosure(L, l->func, nup); /* closure with those upvalues */ + lua_settable(L, -(nup + 3)); + } + lua_pop(L, nup); /* remove upvalues */ +} +#endif + +#include "threads.c" + +int luaopen_libthreads(lua_State *L) +{ + lua_newtable(L); + thread_init_pkg(L); + return 1; +} diff --git a/fastnn/threads/lib/luaTHRD.h b/fastnn/threads/lib/luaTHRD.h new file mode 100644 index 0000000..3760bdb --- /dev/null +++ b/fastnn/threads/lib/luaTHRD.h @@ -0,0 +1,84 @@ +#ifndef LUA_THRD_INC +#define LUA_THRD_INC + +static int luaTHRD_pushudata(lua_State *L, void *ptr, const char* typename) +{ + void **udata = lua_newuserdata(L, sizeof(void*)); + if(udata) { + *udata = ptr; + luaL_getmetatable(L, typename); + lua_setmetatable(L, -2); + return 1; + } + return 0; +} + +static void *luaTHRD_checkudata(lua_State *L, int narg, const char *typename) +{ + void **udata = luaL_checkudata(L, narg, typename); + if(udata) + return *udata; + else + return NULL; +} + +static void *luaTHRD_toudata(lua_State *L, int narg, const char *typename) +{ + void **udata = lua_touserdata(L, narg); + if(udata) { + if(lua_getmetatable(L, -1)) { + luaL_getmetatable(L, typename); + if(lua_equal(L, -1, -2)) { + lua_pop(L, 2); + return *udata; + } + else { + lua_pop(L, 2); + return NULL; + } + } + else + return NULL; + } + else + return NULL; +} + +static int luaTHRD_ctor(lua_State *L) +{ + if(!lua_istable(L, 1)) /* dummy ctor table */ + luaL_error(L, "ctor: table expected"); + lua_getmetatable(L, 1); + lua_remove(L, 1); /* dummy ctor table */ + if(!lua_istable(L, -1)) + luaL_error(L, "ctor: no metatable found"); + lua_pushstring(L, "__new"); + lua_rawget(L, -2); + lua_remove(L, -2); /* forget about metatable */ + if(!lua_isfunction(L, -1)) + luaL_error(L, "ctor: __new appears to be not a function"); + lua_insert(L, 1); /* ctor first, arguments follow */ + lua_call(L, lua_gettop(L)-1, LUA_MULTRET); + return lua_gettop(L); +} + +static void luaTHRD_pushctortable(lua_State *L, lua_CFunction ctor, const char* typename) +{ + lua_newtable(L); /* empty useless dude */ + lua_newtable(L); /* metatable of the dude */ + lua_pushstring(L, "__index"); + luaL_getmetatable(L, typename); + lua_rawset(L, -3); + lua_pushstring(L, "__newindex"); + luaL_getmetatable(L, typename); + lua_rawset(L, -3); + lua_pushstring(L, "__new"); /* __call will look into there */ + lua_pushcfunction(L, ctor); + lua_rawset(L, -3); + lua_pushstring(L, "__call"); /* pop the table and calls __new */ + lua_pushcfunction(L, luaTHRD_ctor); + lua_rawset(L, -3); + lua_setmetatable(L, -2); +} + +#endif diff --git a/fastnn/threads/lib/threads.c b/fastnn/threads/lib/threads.c new file mode 100644 index 0000000..bf2a5ec --- /dev/null +++ b/fastnn/threads/lib/threads.c @@ -0,0 +1,355 @@ +#include +#include +#include +#include + +#include "THThread.h" +#include "luaTHRD.h" + +#include +#include + +static int newthread(void *code_) +{ + char *code = code_; + lua_State *L = luaL_newstate(); + + if(!L) { + printf("THREAD FATAL ERROR: could not create lua state\n"); + return -1; + } + luaL_openlibs(L); + + if(luaL_loadstring(L, code)) { + printf("FATAL THREAD PANIC: (loadstring) %s\n", lua_tolstring(L, -1, NULL)); + free(code); + lua_close(L); + return -1; + } + free(code); + if(lua_pcall(L, 0, 0, 0)) { + printf("FATAL THREAD PANIC: (pcall) %s\n", lua_tolstring(L, -1, NULL)); + lua_close(L); + return -1; + } + + lua_close(L); + return 0; +} + +static int thread_new(lua_State *L) +{ + THThread *thread = NULL; + size_t len = 0; + const char *code = luaL_checklstring(L, 1, &len); + char *code_dup = malloc(len+1); + if(!code_dup) + luaL_error(L, "threads: out of memory"); + memcpy(code_dup, code, len+1); + + thread = THThread_new(newthread, (void*)code_dup); + if(!thread) + luaL_error(L, "threads: thread new failed"); + luaTHRD_pushudata(L, thread, "threads.Thread"); + + return 1; +} + +static int thread_tostring(lua_State *L) +{ + char str[128]; + THThread *thread = luaTHRD_checkudata(L, 1, "threads.Thread"); + snprintf(str, 128, "threads.Thread <%lx>", THThread_id(thread)); + lua_pushstring(L, str); + return 1; +} + +static int thread_id(lua_State *L) +{ + THThread *thread = luaTHRD_checkudata(L, 1, "threads.Thread"); + lua_pushinteger(L, THThread_id(thread)); + return 1; +} + +static int thread_free(lua_State *L) +{ + THThread *thread = luaTHRD_checkudata(L, 1, "threads.Thread"); + THThread_free(thread); + return 0; +} + +static int mutex_new(lua_State *L) +{ + THMutex *mutex = NULL; + if(lua_gettop(L) == 0) { + mutex = THMutex_new(); + } + else if(lua_gettop(L) == 1) { + long id = luaL_checklong(L, 1); + mutex = THMutex_newWithId(id); + } + else + luaL_error(L, "threads: mutex new invalid arguments"); + if(!mutex) + luaL_error(L, "threads: mutex new failed"); + luaTHRD_pushudata(L, mutex, "threads.Mutex"); + return 1; +} + +static int mutex_tostring(lua_State *L) +{ + char str[128]; + THMutex *mutex = luaTHRD_checkudata(L, 1, "threads.Mutex"); + snprintf(str, 128, "threads.Mutex <%lx>", THMutex_id(mutex)); + lua_pushstring(L, str); + return 1; +} + +static int mutex_id(lua_State *L) +{ + THMutex *mutex = luaTHRD_checkudata(L, 1, "threads.Mutex"); + lua_pushinteger(L, THMutex_id(mutex)); + return 1; +} + +static int mutex_lock(lua_State *L) +{ + THMutex *mutex = luaTHRD_checkudata(L, 1, "threads.Mutex"); + if(THMutex_lock(mutex)) + luaL_error(L, "threads: mutex lock failed"); + return 0; +} + +static int mutex_unlock(lua_State *L) +{ + THMutex *mutex = luaTHRD_checkudata(L, 1, "threads.Mutex"); + if(THMutex_unlock(mutex)) + luaL_error(L, "threads: mutex unlock failed"); + return 0; +} + +static int mutex_free(lua_State *L) +{ + THMutex *mutex = luaTHRD_checkudata(L, 1, "threads.Mutex"); + THMutex_free(mutex); + return 0; +} + +static int condition_new(lua_State *L) +{ + THCondition *condition = NULL; + if(lua_gettop(L) == 0) { + condition = THCondition_new(); + } + else if(lua_gettop(L) == 1) { + long id = luaL_checklong(L, 1); + condition = THCondition_newWithId(id); + } + else + luaL_error(L, "threads: condition new invalid arguments"); + if(!condition) + luaL_error(L, "threads: condition new failed"); + luaTHRD_pushudata(L, condition, "threads.Condition"); + return 1; +} + +static int condition_tostring(lua_State *L) +{ + char str[128]; + THCondition *condition = luaTHRD_checkudata(L, 1, "threads.Condition"); + snprintf(str, 128, "threads.Condition <%lx>", THCondition_id(condition)); + lua_pushstring(L, str); + return 1; +} + +static int condition_id(lua_State *L) +{ + THCondition *condition = luaTHRD_checkudata(L, 1, "threads.Condition"); + lua_pushinteger(L, THCondition_id(condition)); + return 1; +} + +static int condition_free(lua_State *L) +{ + THCondition *condition = luaTHRD_checkudata(L, 1, "threads.Condition"); + if(!condition) + luaL_error(L, "threads: condition free failed"); + THCondition_free(condition); + return 0; +} + +static int condition_signal(lua_State *L) +{ + THCondition *condition = luaTHRD_checkudata(L, 1, "threads.Condition"); + if(THCondition_signal(condition)) + luaL_error(L, "threads: condition signal failed"); + return 0; +} + +static int condition_wait(lua_State *L) +{ + THCondition *condition = luaTHRD_checkudata(L, 1, "threads.Condition"); + THMutex *mutex = luaTHRD_checkudata(L, 2, "threads.Mutex"); + if(THCondition_wait(condition, mutex)) + luaL_error(L, "threads: condition wait failed"); + return 0; +} + +static int semaphore_new(lua_State *L) +{ + THSemaphore* semaphore = NULL; + if(lua_gettop(L) == 1) { + int initValue = luaL_checkinteger(L, 1); + semaphore = THSemaphore_new(initValue); + } + else + luaL_error(L, "threads: semaphore new invalid arguments"); + if (!semaphore) + luaL_error(L, "threads: semaphore new failed"); + luaTHRD_pushudata(L, semaphore, "threads.Semaphore"); + return 1; +} + +static int semaphore_newfromid(lua_State *L) +{ + THSemaphore* semaphore = NULL; + if(lua_gettop(L) == 1) { + long id = luaL_checklong(L, 1); + semaphore = THSemaphore_newWithId(id); + } + else + luaL_error(L, "threads: semaphore new invalid arguments"); + if(!semaphore) + luaL_error(L, "threads: semaphore new failed"); + luaTHRD_pushudata(L, semaphore, "threads.Semaphore"); + return 1; +} + +static int semaphore_id(lua_State *L) +{ + THSemaphore *semaphore = luaTHRD_checkudata(L, 1, "threads.Semaphore"); + lua_pushinteger(L, THSemaphore_id(semaphore)); + return 1; +} + +static int semaphore_signal(lua_State *L) +{ + THSemaphore *semaphore = luaTHRD_checkudata(L, 1, "threads.Semaphore"); + if(THSemaphore_signal(semaphore)) + luaL_error(L, "threads: semaphore signal failed"); + return 0; +} + +static int semaphore_wait(lua_State *L) +{ + THSemaphore *semaphore = luaTHRD_checkudata(L, 1, "threads.Semaphore"); + if(THSemaphore_wait(semaphore)) + luaL_error(L, "threads: semaphore wait failed"); + return 0; +} + +static int semaphore_trywait(lua_State *L) +{ + THSemaphore *semaphore = luaTHRD_checkudata(L, 1, "threads.Semaphore"); + int wait = THSemaphore_trywait(semaphore); + lua_pushinteger(L, wait); + return 1; +} + +static void semaphore_free(lua_State *L) +{ + THSemaphore *semaphore = luaTHRD_checkudata(L, 1, "threads.Semaphore"); + if(!semaphore) + luaL_error(L, "threads: semaphore free failed"); + THSemaphore_free(semaphore); +} + +static const struct luaL_Reg thread__ [] = { + {"new", thread_new}, + {"__tostring", thread_tostring}, + {"id", thread_id}, + {"free", thread_free}, + {NULL, NULL} +}; + +static const struct luaL_Reg mutex__ [] = { + {"new", mutex_new}, + {"__tostring", mutex_tostring}, + {"id", mutex_id}, + {"lock", mutex_lock}, + {"unlock", mutex_unlock}, + {"free", mutex_free}, + {NULL, NULL} +}; + +static const struct luaL_Reg condition__ [] = { + {"new", condition_new}, + {"__tostring", condition_tostring}, + {"id", condition_id}, + {"signal", condition_signal}, + {"wait", condition_wait}, + {"free", condition_free}, + {NULL, NULL} +}; + +static const struct luaL_Reg semaphore__ [] = { + {"new", semaphore_new}, + {"newfromid", semaphore_newfromid}, + {"id", semaphore_id}, + {"signal", semaphore_signal}, + {"wait", semaphore_wait}, + {"trywait", semaphore_trywait}, + {"free", semaphore_free}, + {NULL, NULL} +}; + +static void thread_init_pkg(lua_State *L) +{ + if(!luaL_newmetatable(L, "threads.Thread")) + luaL_error(L, "threads: threads.Thread type already exists"); + luaL_setfuncs(L, thread__, 0); + lua_pushstring(L, "__index"); + lua_pushvalue(L, -2); + lua_rawset(L, -3); + lua_pop(L, 1); + + if(!luaL_newmetatable(L, "threads.Mutex")) + luaL_error(L, "threads: threads.Mutex type already exists"); + luaL_setfuncs(L, mutex__, 0); + lua_pushstring(L, "__index"); + lua_pushvalue(L, -2); + lua_rawset(L, -3); + lua_pop(L, 1); + + if(!luaL_newmetatable(L, "threads.Condition")) + luaL_error(L, "threads: threads.Condition type already exists"); + luaL_setfuncs(L, condition__, 0); + lua_pushstring(L, "__index"); + lua_pushvalue(L, -2); + lua_rawset(L, -3); + lua_pop(L, 1); + + if(!luaL_newmetatable(L, "threads.Semaphore")) + luaL_error(L, "threads: threads.Semaphore type already exists"); + luaL_setfuncs(L, semaphore__, 0); + lua_pushstring(L, "__index"); + lua_pushvalue(L, -2); + lua_rawset(L, -3); + lua_pop(L, 1); + + lua_pushstring(L, "Thread"); + luaTHRD_pushctortable(L, thread_new, "threads.Thread"); + lua_rawset(L, -3); + + lua_pushstring(L, "Mutex"); + luaTHRD_pushctortable(L, mutex_new, "threads.Mutex"); + lua_rawset(L, -3); + + lua_pushstring(L, "Condition"); + luaTHRD_pushctortable(L, condition_new, "threads.Condition"); + lua_rawset(L, -3); + + lua_pushstring(L, "Semaphore"); + luaTHRD_pushctortable(L, semaphore_new, "threads.Semaphore"); + lua_rawset(L, -3); +} diff --git a/fastnn/threads/test/test-low-level.lua b/fastnn/threads/test/test-low-level.lua new file mode 100644 index 0000000..aea31db --- /dev/null +++ b/fastnn/threads/test/test-low-level.lua @@ -0,0 +1,39 @@ +local t = require 'libthreads' + +local m = t.Mutex() +local c = t.Condition() +print(string.format('| main thread mutex: 0x%x', m:id())) +print(string.format('| main thread condition: 0x%x', c:id())) + +local code = string.format([[ + local t = require 'libthreads' + local c = t.Condition(%d) + print('|| hello from thread') + print(string.format('|| thread condition: 0x%%x', c:id())) + print('|| doing some stuff in thread...') + local x = 0 + for i=1,10000 do + for i=1,10000 do + x = x + math.sin(i) + end + x = x / 10000 + end + print(string.format('|| ...ok (x=%%f)', x)) + c:signal() +]], c:id()) + +print(code) + +local thread = t.Thread(code) + + +print('| waiting for thread...') +m:lock() +c:wait(m) +print('| thread finished!') + +thread:free() +m:free() +c:free() + +print('| done') diff --git a/fastnn/threads/test/test-threads-async.lua b/fastnn/threads/test/test-threads-async.lua new file mode 100644 index 0000000..68bcd35 --- /dev/null +++ b/fastnn/threads/test/test-threads-async.lua @@ -0,0 +1,66 @@ +local threads = require 'threads' + +local nthread = 4 +local njob = 100 + +local pool = threads.Threads( + nthread, + function(threadid) + print('starting a new thread/state number ' .. threadid) + end +) + + +local jobid = 0 +local result -- DO NOT put this in get +local function get() + + -- fill up the queue as much as we can + -- this will not block + while jobid < njob and pool:acceptsjob() do + jobid = jobid + 1 + + pool:addjob( + function(jobid) + print(string.format('thread ID %d is performing job %d', __threadid, jobid)) + return string.format("job output from job %d", jobid) + end, + + function(jobres) + result = jobres + end, + + jobid + ) + end + + -- is there still something to do? + if pool:hasjob() then + pool:dojob() -- yes? do it! + if pool:haserror() then -- check for errors + pool:synchronize() -- finish everything and throw error + end + return result + end + +end + +local jobdone = 0 +repeat + -- get something asynchronously + local res = get() + + -- do something with res (if any) + if res then + print(res) + jobdone = jobdone + 1 + end + +until not res -- until there is nothing remaining + +assert(jobid == 100) +assert(jobdone == 100) + +print('PASSED') + +pool:terminate() diff --git a/fastnn/threads/test/test-threads-multiple.lua b/fastnn/threads/test/test-threads-multiple.lua new file mode 100644 index 0000000..4429696 --- /dev/null +++ b/fastnn/threads/test/test-threads-multiple.lua @@ -0,0 +1,15 @@ +local threads = require 'threads' + +for i=1,1000 do + io.write(string.format('%04d.', tonumber(i))) + io.flush() + local pool = + threads.Threads( + 4, + function(threadid) + require 'torch' + end + ) +end +print() +print('PASSED') diff --git a/fastnn/threads/test/test-threads-shared.lua b/fastnn/threads/test/test-threads-shared.lua new file mode 100644 index 0000000..3d63851 --- /dev/null +++ b/fastnn/threads/test/test-threads-shared.lua @@ -0,0 +1,111 @@ +require 'torch' + +local threads = require 'threads' +local status, tds = pcall(require, 'tds') +tds = status and tds or nil + +local nthread = 4 +local njob = 10 +local msg = "hello from a satellite thread" + +threads.Threads.serialization('threads.sharedserialize') + +local x = {} +local xh = tds and tds.hash() or {} +local xs = {} +local z = tds and tds.hash() or {} +local D = 10 +local K = tds and 100000 or 100 -- good luck in non-shared (30M) +for i=1,njob do + x[i] = torch.ones(D) + xh[i] = torch.ones(D) + xs[i] = torch.FloatStorage(D):fill(1) + for j=1,K do + z[(i-1)*K+j] = "blah" .. i .. j + end +end +collectgarbage() +collectgarbage() + +print('GO') + +local pool = threads.Threads( + nthread, + function(threadIdx) + pcall(require, 'tds') + print('starting a new thread/state number:', threadIdx) + gmsg = msg -- we copy here an upvalue of the main thread + end +) + +local jobdone = 0 +for i=1,njob do + pool:addjob( + function() + assert(x[i]:sum() == D) + assert(xh[i]:sum() == D) + assert(torch.FloatTensor(xs[i]):sum() == D) + for j=1,K do + assert(z[(i-1)*K+j] == "blah" .. i .. j) + end + x[i]:add(1) + xh[i]:add(1) + torch.FloatTensor(xs[i]):add(1) + print(string.format('%s -- thread ID is %x', gmsg, __threadid)) + collectgarbage() + collectgarbage() + return __threadid + end, + + function(id) + print(string.format("task %d finished (ran on thread ID %x)", i, id)) + jobdone = jobdone + 1 + end + ) +end + +for i=1,njob do + pool:addjob( + function() + collectgarbage() + collectgarbage() + end + ) +end + +pool:synchronize() + +print(string.format('%d jobs done', jobdone)) + +pool:terminate() + +-- did we do the job in shared mode? +for i=1,njob do + assert(x[i]:sum() == 2*D) + assert(xh[i]:sum() == 2*D) + assert(torch.FloatTensor(xs[i]):sum() == 2*D) +end + +-- serialize and zero x +local str = torch.serialize(x) +local strh = torch.serialize(xh) +local strs = torch.serialize(xs) +for i=1,njob do + x[i]:zero() + xh[i]:zero() + xs[i]:fill(0) +end + +-- dude, check that unserialized x does not point on x +local y = torch.deserialize(str) +local yh = torch.deserialize(strh) +local ys = torch.deserialize(strs) +for i=1,njob do + assert(y[i]:sum() == 2*D) + assert(yh[i]:sum() == 2*D) + assert(torch.FloatTensor(ys[i]):sum() == 2*D) +end + +pool:terminate() + +print('PASSED') diff --git a/fastnn/threads/test/test-threads.lua b/fastnn/threads/test/test-threads.lua new file mode 100644 index 0000000..e49a381 --- /dev/null +++ b/fastnn/threads/test/test-threads.lua @@ -0,0 +1,20 @@ +local clib = require 'libthreads' + + +nthread = 1 + +str='ad;alkfakd;af' +code = [[ function func(str) print(str) end; print(str);]] +print(code) + +--thread = clib.Thread(code) + + +--thread:free() + + +require 'threads' + +tt = threads.Thread(code) + + diff --git a/fastnn/threads/threads-scm-1.rockspec b/fastnn/threads/threads-scm-1.rockspec new file mode 100644 index 0000000..02535d4 --- /dev/null +++ b/fastnn/threads/threads-scm-1.rockspec @@ -0,0 +1,36 @@ +package = "threads" +version = "scm-1" +source = { + url = "https://github.com/uphantom/nerv-threads.git" +} +description = { + summary = "Thread control for Nerv fastnn", + detailed = [[ + ]], + homepage = "https://github.com/uphantom/nerv-threads", + license = "BSD" +} +dependencies = { + "nerv >= scm-1", + "lua >= 5.1" +} +build = { + type = "make", + build_variables = { + CFLAGS="$(CFLAGS)", + LIBFLAG="$(LIBFLAG)", + LUA_LIBDIR="$(LUA_LIBDIR)", + LUA_BINDIR="$(LUA_BINDIR)", + LUA_INCDIR="$(LUA_INCDIR)", + INST_PREFIX="$(PREFIX)", + LUA="$(LUA)", + }, + install_variables = { + LUA_BINDIR="$(LUA_BINDIR)", + INST_PREFIX="$(PREFIX)", + INST_BINDIR="$(BINDIR)", + INST_LIBDIR="$(LIBDIR)", + INST_LUADIR="$(LUADIR)", + INST_CONFDIR="$(CONFDIR)", + }, +} diff --git a/fastnn/threads/threads.lua b/fastnn/threads/threads.lua new file mode 100644 index 0000000..2db7d51 --- /dev/null +++ b/fastnn/threads/threads.lua @@ -0,0 +1,14 @@ +local threads = {} + +local C = require 'libthreads' + +threads.Thread = C.Thread +threads.Mutex = C.Mutex +threads.Condition = C.Condition +threads.Semaphore = C.Semaphore +--threads.Threads = require 'threads.threads' + +-- only for backward compatibility (boo) +-- setmetatable(threads, getmetatable(threads.Threads)) + +-- return threads -- cgit v1.2.3-70-g09d2 From c8c6cc75f26db476ff99d98f707a0294f72e899c Mon Sep 17 00:00:00 2001 From: uphantom Date: Mon, 31 Aug 2015 08:59:35 +0800 Subject: fastnn compile --- fastnn/example/asgd_sds_trainer.lua | 93 ++++++----- fastnn/example/fastnn_baseline.lua | 258 ++++++++++++++++++++++++++++++ fastnn/lib/ModelSync.c | 305 ++++++++++++++++++++++++++++++++++++ fastnn/threads/Makefile | 1 + nerv/io/sgd_buffer.lua | 4 +- nerv/lib/common.h | 2 + nerv/lib/matrix/cumatrix.c | 2 +- nerv/nn/layer_repo.lua | 2 +- 8 files changed, 624 insertions(+), 43 deletions(-) create mode 100644 fastnn/example/fastnn_baseline.lua create mode 100644 fastnn/lib/ModelSync.c (limited to 'fastnn/threads') diff --git a/fastnn/example/asgd_sds_trainer.lua b/fastnn/example/asgd_sds_trainer.lua index 44611b2..cf1c7a6 100644 --- a/fastnn/example/asgd_sds_trainer.lua +++ b/fastnn/example/asgd_sds_trainer.lua @@ -1,19 +1,22 @@ -package.path="/home/slhome/wd007/.luarocks/share/lua/5.1/?.lua;/home/slhome/wd007/.luarocks/share/lua/5.1/?/init.lua;/sgfs/users/wd007/src/nerv/install/share/lua/5.1/?.lua;/sgfs/users/wd007/src/nerv/install/share/lua/5.1/?/init.lua;"..package.path; -package.cpath="/home/slhome/wd007/.luarocks/lib/lua/5.1/?.so;/sgfs/users/wd007/src/nerv/install/lib/lua/5.1/?.so;"..package.cpath + +NERV_ROOT = "/sgfs/users/wd007/src/nerv-2" + +env = string.format([[ +package.path="/home/slhome/wd007/.luarocks/share/lua/5.1/?.lua;/home/slhome/wd007/.luarocks/share/lua/5.1/?/init.lua;%s/install/share/lua/5.1/?.lua;%s/install/share/lua/5.1/?/init.lua;"..package.path; +package.cpath="/home/slhome/wd007/.luarocks/lib/lua/5.1/?.so;%s/install/lib/lua/5.1/?.so;"..package.cpath local k,l,_=pcall(require,"luarocks.loader") _=k and l.add_context("nerv","scm-1") +]], NERV_ROOT, NERV_ROOT, NERV_ROOT) + +loadstring(env)() + require 'nerv' require 'fastnn' require 'libhtkio' require 'threads' -dofile("fastnn/fastnn_baseline.lua") +dofile("fastnn/example/fastnn_baseline.lua") -env = string.format([[ -package.path="/home/slhome/wd007/.luarocks/share/lua/5.1/?.lua;/home/slhome/wd007/.luarocks/share/lua/5.1/?/init.lua;/sgfs/users/wd007/src/nerv/install/share/lua/5.1/?.lua;/sgfs/users/wd007/src/nerv/install/share/lua/5.1/?/init.lua;"..package.path; -package.cpath="/home/slhome/wd007/.luarocks/lib/lua/5.1/?.so;/sgfs/users/wd007/src/nerv/install/lib/lua/5.1/?.so;"..package.cpath -local k,l,_=pcall(require,"luarocks.loader") _=k and l.add_context("nerv","scm-1") -]]) train_thread_code = [[ @@ -21,7 +24,9 @@ train_thread_code = [[ require 'nerv' require 'fastnn' -dofile("fastnn/fastnn_baseline.lua") +require 'libhtkio' + +dofile("fastnn/example/fastnn_baseline.lua") os.execute("export MALLOC_CHECK_=0") local thread_idx = %d @@ -49,39 +54,35 @@ else gconf.tr_scp = scp_file end +share_mutex:lock() + gconf.randomize = bp gconf.lrate = lrate gconf.batch_size = batch_size -gconf.network[1] = nnet_in -nerv.info_stderr("input network: %%s", gconf.network[1]) +gconf.initialized_param[2] = nnet_in +nerv.info_stderr("input network: %%s", gconf.initialized_param[2]) --nerv.info_stderr(gconf.randomize) nerv.info_stderr("input batch_size: %%d", gconf.batch_size) nerv.info_stderr("input scp_file: %%s", scp_file) nerv.info_stderr("input lrate: %%f", gconf.lrate) -share_mutex:lock() + share_gpu:select_gpu() nerv.context = nerv.CCuContext() --print(nerv.context) -nerv.info_stderr("thread %%d loading transf ...", thread_idx) -local param_transf_repo = nerv.ParamRepo() -param_transf_repo:import(gconf.transf, nil, gconf) -local transf_node_repo = make_transf_node_repo(param_transf_repo) -local transf_layer_repo = make_transf_link_repo(transf_node_repo, param_transf_repo) -local transf = transf_layer_repo:get_layer("global_transf") - -nerv.info_stderr("thread %%d loading network ...", thread_idx) -local param_network_repo = nerv.ParamRepo() -param_network_repo:import(gconf.network, nil, gconf) -local network_node_repo = make_network_node_repo(param_network_repo) -local network_layer_repo = make_network_link_repo(network_node_repo, param_network_repo) -local network = get_network(network_layer_repo) +nerv.info_stderr("thread %%d loading parameters ...", thread_idx) +local param_repo = nerv.ParamRepo() +param_repo:import(gconf.initialized_param, nil, gconf) +local layer_repo = make_layer_repo(param_repo) +local network = get_network(layer_repo) +local global_transf = get_global_transf(layer_repo) + share_mutex:unlock() -local buffer = make_buffer(make_readers(nil, transf_layer_repo, feat_repo_shareid, data_mutex_shareid)) +local buffer = make_buffer(make_readers(nil, layer_repo, feat_repo_shareid, data_mutex_shareid)) local input_order = get_input_order() @@ -98,21 +99,35 @@ local input_order = get_input_order() gconf.cnt = gconf.cnt + 1 if gconf.cnt == 2000 then - print_stat(network_node_repo) + print_stat(layer_repo) gconf.cnt = 0 end local input = {} + + for i, e in ipairs(input_order) do + local id = e.id + if data[id] == nil then + nerv.error("input data %%s not found", id) + end + local transformed + if e.global_transf then + transformed = nerv.speech_utils.global_transf(data[id], + global_transf, + gconf.frm_ext or 0, 0, + gconf) + else + transformed = data[id] + end + table.insert(input, transformed) + end + + local output = {nerv.CuMatrixFloat(gconf.batch_size, 1)} + err_output = {} + for i = 1, #input do + table.insert(err_output, input[i]:create()) + end - for i, id in ipairs(input_order) do - if data[id] == nil then - nerv.error("input data %%s not found", id) - end - table.insert(input, data[id]) - end - - local output = {nerv.CuMatrixFloat(gconf.batch_size, 1)} - err_output = {input[1]:create()} network:propagate(input, output) if bp then @@ -132,7 +147,7 @@ local input_order = get_input_order() end --print_stat(network_node_repo) - local ce_crit = network_node_repo:get_layer("ce_crit") + local ce_crit = layer_repo:get_layer("ce_crit") local xent = fastnn.CXent(ce_crit.total_frames, ce_crit.total_correct, ce_crit.total_ce, ce_crit.total_ce) share_master:LockModel() @@ -236,8 +251,8 @@ min_iter = 1 max_iter = 20 min_halving = 0 gconf.batch_size = 256 -pf0 = get_filename(gconf.network[1]) -nnet_in = gconf.network[1] +pf0 = get_filename(gconf.initialized_param[2]) +nnet_in = gconf.initialized_param[2] nnet_out = "" sds_scp = "tr_sds_"..string.format("%.4d", math.random()*10000)..".scp" --"tr_sds.scp" sds_factor = 0.4 diff --git a/fastnn/example/fastnn_baseline.lua b/fastnn/example/fastnn_baseline.lua new file mode 100644 index 0000000..6e774de --- /dev/null +++ b/fastnn/example/fastnn_baseline.lua @@ -0,0 +1,258 @@ +require 'htk_io' + +gconf = {lrate = 0.2, wcost = 1e-6, momentum = 0.9, + cumat_type = nerv.CuMatrixFloat, + mmat_type = nerv.MMatrixFloat, + frm_ext = 5, + frm_trim = 5, + batch_size = 256, + buffer_size = 81920, + rearrange = true, + tr_scp = "/sgfs/users/wd007/asr/baseline_chn_50h/finetune/finetune_baseline/train.scp", + cv_scp = "/sgfs/users/wd007/asr/baseline_chn_50h/finetune/finetune_baseline/train_cv.scp", + htk_conf = "/sgfs/users/wd007/asr/baseline_chn_50h/finetune/finetune_baseline/fbank_d_a_z.conf", + initialized_param = {"/sgfs/users/wd007/src/nerv/tools/nerv.global.transf", + "/sgfs/users/wd007/src/nerv/tools/nerv.svd0.55_3000h_iter1.init"}, + debug = false} + +function make_layer_repo(param_repo) + local layer_repo = nerv.LayerRepo( + { + -- global transf + ["nerv.BiasLayer"] = + { + blayer1 = {{bias = "bias1"}, {dim_in = {1320}, dim_out = {1320}}}, + }, + ["nerv.WindowLayer"] = + { + wlayer1 = {{window = "window1"}, {dim_in = {1320}, dim_out = {1320}}}, + }, + -- biased linearity + ["nerv.AffineLayer"] = + { + affine0 = {{ltp = "affine0_ltp", bp = "affine0_bp"}, + {dim_in = {1320}, dim_out = {2048}}}, + affine1 = {{ltp = "affine1_ltp", bp = "affine1_bp"}, + {dim_in = {2048}, dim_out = {367}}}, + affine2 = {{ltp = "affine2_ltp", bp = "affine2_bp"}, + {dim_in = {367}, dim_out = {2048}}}, + affine3 = {{ltp = "affine3_ltp", bp = "affine3_bp"}, + {dim_in = {2048}, dim_out = {408}}}, + affine4 = {{ltp = "affine4_ltp", bp = "affine4_bp"}, + {dim_in = {408}, dim_out = {2048}}}, + affine5 = {{ltp = "affine5_ltp", bp = "affine5_bp"}, + {dim_in = {2048}, dim_out = {368}}}, + affine6 = {{ltp = "affine6_ltp", bp = "affine6_bp"}, + {dim_in = {368}, dim_out = {2048}}}, + affine7 = {{ltp = "affine7_ltp", bp = "affine7_bp"}, + {dim_in = {2048}, dim_out = {303}}}, + affine8 = {{ltp = "affine8_ltp", bp = "affine8_bp"}, + {dim_in = {303}, dim_out = {2048}}}, + affine9 = {{ltp = "affine9_ltp", bp = "affine9_bp"}, + {dim_in = {2048}, dim_out = {277}}}, + affine10 = {{ltp = "affine10_ltp", bp = "affine10_bp"}, + {dim_in = {277}, dim_out = {2048}}}, + affine11 = {{ltp = "affine11_ltp", bp = "affine11_bp"}, + {dim_in = {2048}, dim_out = {361}}}, + affine12 = {{ltp = "affine12_ltp", bp = "affine12_bp"}, + {dim_in = {361}, dim_out = {2048}}}, + affine13 = {{ltp = "affine13_ltp", bp = "affine13_bp"}, + {dim_in = {2048}, dim_out = {441}}}, + affine14 = {{ltp = "affine14_ltp", bp = "affine14_bp"}, + {dim_in = {441}, dim_out = {10092}}}, + }, + ["nerv.SigmoidLayer"] = + { + sigmoid0 = {{}, {dim_in = {2048}, dim_out = {2048}}}, + sigmoid1 = {{}, {dim_in = {2048}, dim_out = {2048}}}, + sigmoid2 = {{}, {dim_in = {2048}, dim_out = {2048}}}, + sigmoid3 = {{}, {dim_in = {2048}, dim_out = {2048}}}, + sigmoid4 = {{}, {dim_in = {2048}, dim_out = {2048}}}, + sigmoid5 = {{}, {dim_in = {2048}, dim_out = {2048}}}, + sigmoid6 = {{}, {dim_in = {2048}, dim_out = {2048}}}, + }, + ["nerv.SoftmaxCELayer"] = -- softmax + ce criterion layer for finetune output + { + ce_crit = {{}, {dim_in = {10092, 1}, dim_out = {1}, compressed = true}} + }, + ["nerv.SoftmaxLayer"] = -- softmax for decode output + { + softmax = {{}, {dim_in = {10092}, dim_out = {10092}}} + } + }, param_repo, gconf) + + layer_repo:add_layers( + { + ["nerv.DAGLayer"] = + { + global_transf = {{}, { + dim_in = {1320}, dim_out = {1320}, + sub_layers = layer_repo, + connections = + { + ["[1]"] = "blayer1[1]", + ["blayer1[1]"] = "wlayer1[1]", + ["wlayer1[1]"] = "[1]" + } + }}, + main = {{}, { + dim_in = {1320}, dim_out = {10092}, + sub_layers = layer_repo, + connections = { + ["[1]"] = "affine0[1]", + ["affine0[1]"] = "sigmoid0[1]", + ["sigmoid0[1]"] = "affine1[1]", + ["affine1[1]"] = "affine2[1]", + ["affine2[1]"] = "sigmoid1[1]", + ["sigmoid1[1]"] = "affine3[1]", + ["affine3[1]"] = "affine4[1]", + ["affine4[1]"] = "sigmoid2[1]", + ["sigmoid2[1]"] = "affine5[1]", + ["affine5[1]"] = "affine6[1]", + ["affine6[1]"] = "sigmoid3[1]", + ["sigmoid3[1]"] = "affine7[1]", + ["affine7[1]"] = "affine8[1]", + ["affine8[1]"] = "sigmoid4[1]", + ["sigmoid4[1]"] = "affine9[1]", + ["affine9[1]"] = "affine10[1]", + ["affine10[1]"] = "sigmoid5[1]", + ["sigmoid5[1]"] = "affine11[1]", + ["affine11[1]"] = "affine12[1]", + ["affine12[1]"] = "sigmoid6[1]", + ["sigmoid6[1]"] = "affine13[1]", + ["affine13[1]"] = "affine14[1]", + ["affine14[1]"] = "[1]", + } + }} + } + }, param_repo, gconf) + + layer_repo:add_layers( + { + ["nerv.DAGLayer"] = + { + ce_output = {{}, { + dim_in = {1320, 1}, dim_out = {1}, + sub_layers = layer_repo, + connections = { + ["[1]"] = "main[1]", + ["main[1]"] = "ce_crit[1]", + ["[2]"] = "ce_crit[2]", + ["ce_crit[1]"] = "[1]" + } + }}, + softmax_output = {{}, { + dim_in = {1320}, dim_out = {10092}, + sub_layers = layer_repo, + connections = { + ["[1]"] = "main[1]", + ["main[1]"] = "softmax[1]", + ["softmax[1]"] = "[1]" + } + }} + } + }, param_repo, gconf) + + return layer_repo +end + + +function get_network(layer_repo) + return layer_repo:get_layer("ce_output") +end + +function get_decode_network(layer_repo) + return layer_repo:get_layer("softmax_output") +end + +function get_global_transf(layer_repo) + return layer_repo:get_layer("global_transf") +end + + + +function make_readers(scp_file, layer_repo, feat_repo_shareid, data_mutex_shareid) + return { + {reader = nerv.TNetReader(gconf, + { + id = "main_scp", + scp_file = scp_file, + conf_file = gconf.htk_conf, + frm_ext = gconf.frm_ext, + mlfs = { + phone_state = { + file = "/sgfs/users/wd007/asr/baseline_chn_50h/finetune/finetune_baseline/ref.mlf", + format = "map", + format_arg = "/sgfs/users/wd007/asr/baseline_chn_50h/finetune/finetune_baseline/dict", + dir = "*/", + ext = "lab" + } + }, + global_transf = layer_repo:get_layer("global_transf") + }, feat_repo_shareid, data_mutex_shareid), + data = {main_scp = 1320, phone_state = 1}} + } +end + +function get_feat_id() + return {main_scp = true} +end + + +function make_buffer(readers) + return nerv.SGDBuffer(gconf, + { + buffer_size = gconf.buffer_size, + randomize = gconf.randomize, + readers = readers, + use_gpu = true + }) +end + +function get_input_order() + return {{id = "main_scp", global_transf = true}, + {id = "phone_state"}} +end + +function get_accuracy(layer_repo) + local ce_crit = layer_repo:get_layer("ce_crit") + return ce_crit.total_correct / ce_crit.total_frames * 100 +end + +function print_stat(layer_repo) + local ce_crit = layer_repo:get_layer("ce_crit") + nerv.info("*** training stat begin ***") + nerv.printf("cross entropy:\t\t%.8f\n", ce_crit.total_ce) + nerv.printf("correct:\t\t%d\n", ce_crit.total_correct) + nerv.printf("frames:\t\t\t%d\n", ce_crit.total_frames) + nerv.printf("err/frm:\t\t%.8f\n", ce_crit.total_ce / ce_crit.total_frames) + nerv.printf("accuracy:\t\t%.3f%%\n", get_accuracy(layer_repo)) + nerv.info("*** training stat end ***") +end + +function print_xent(xent) + local totalframes = xent:totalframes() + local loss = xent:loss() + local correct = xent:correct() + nerv.info_stderr("*** training statistics info begin ***") + nerv.info_stderr("total frames:\t\t%d", totalframes) + nerv.info_stderr("cross entropy:\t%.8f", loss/totalframes) + nerv.info_stderr("frame accuracy:\t%.3f%%", 100*correct/totalframes) + nerv.info_stderr("*** training statistics info end ***") +end + +function frame_acc(xent) + local correct = xent:correct() + local totalframes = xent:totalframes() + return string.format("%.3f", 100*correct/totalframes) +end + +function print_gconf() + nerv.info_stderr("%s \t:= %s", "network", gconf.initialized_param[1]) + nerv.info_stderr("%s \t:= %s", "transf", gconf.initialized_param[2]) + nerv.info_stderr("%s \t:= %s", "batch_size", gconf.batch_size) + nerv.info_stderr("%s \t:= %s", "buffer_size", gconf.buffer_size) + nerv.info_stderr("%s \t:= %s", "lrate", gconf.lrate) + nerv.info_stderr("%s \t:= %s", "tr_scp", gconf.tr_scp) + nerv.info_stderr("%s \t:= %s", "cv_scp", gconf.cv_scp) +end diff --git a/fastnn/lib/ModelSync.c b/fastnn/lib/ModelSync.c new file mode 100644 index 0000000..bd511ea --- /dev/null +++ b/fastnn/lib/ModelSync.c @@ -0,0 +1,305 @@ + +#include "ModelSync.h" +#include "../../nerv/lib/matrix/cuda_helper.h" +#include "../../nerv/lib/matrix/generic/elem_type.h" +#include "common.h" +#include + + +ModelSync* ModelSync_new(void) +{ + ModelSync *self = (ModelSync*)malloc(sizeof(ModelSync)); + if (NULL != self) + { + self->model_mutex = THMutex_new(); + self->state_mutex = THMutex_new(); + self->initialized_ = false; + self->dim_ = 0; + self->pos_ = 0; + self->data_ = NULL; + self->free_data_ = NULL; + self->data_ = NULL; + self->refcount = 1; + self->threadcount = 0; + } + return self; +} + +ModelSync* ModelSync_newWithId(long id) +{ + ModelSync *self = (ModelSync*)id; + __sync_fetch_and_add(&self->refcount, 1); + return self; +} + +long ModelSync_id(ModelSync *self) +{ + return (long)(self); +} + +int ModelSync_lockmodel(ModelSync *self) +{ + if(THMutex_lock(self->model_mutex)) + return 1; + return 0; +} + +int ModelSync_unlockmodel(ModelSync *self) +{ + if(THMutex_unlock(self->model_mutex)) + return 1; + return 0; + +} +int ModelSync_lockstate(ModelSync *self) +{ + if(THMutex_lock(self->state_mutex)) + return 1; + return 0; +} + +int ModelSync_unlockstate(ModelSync *self) +{ + if(THMutex_unlock(self->state_mutex)) + return 1; + return 0; +} + +int ModelSync_free(ModelSync *self) +{ + if (NULL != self && __sync_fetch_and_add(&self->refcount, -1) == 1) + { + free(self->model_mutex); + free(self->state_mutex); + Status status; + CUDA_SAFE_SYNC_CALL(cudaFreeHost(self->free_data_), &status); + free(self); + } +} + +int ModelSync_initBuffer(ModelSync *self) +{ + if (NULL != self) + { + void *free_data = NULL, *data = NULL; + size_t size = self->dim_ * sizeof(float)+16; + Status status; + CUDA_SAFE_SYNC_CALL(cudaHostAlloc((void**) &free_data, size, cudaHostAllocPortable), &status); + NERV_SET_STATUS(&status, NERV_NORMAL, 0); + + data = (free_data ? (void *)( (((unsigned long)*(&free_data)) + 15) & ~0xFUL ) : NULL) ; + if (NULL != data) + { + self->data_ = (float*)(data); + self->free_data_ = (float*)(free_data); + } + return 0; + } + return 1; +} + +int ModelSync_weightfromd(ModelSync *self, Matrix *dm) +{ + + if (NULL != self && NULL != dm) + { + void *host_data_ = (void*)self->data_; + size_t width = dm->ncol * sizeof(float); + size_t src_pitch = dm->stride; + size_t dst_pitch = src_pitch; + Status status; + + CUDA_SAFE_SYNC_CALL(cudaMemcpy2D(host_data_+self->pos_, dst_pitch, dm->data.f, src_pitch, width, dm->nrow, cudaMemcpyDeviceToHost), &status); + NERV_SET_STATUS(&status, NERV_NORMAL, 0); + self->pos_ += dm->nrow * dm->stride; + return 0; + } + return 1; + +} + + +int ModelSync_weighttod(ModelSync *self, Matrix *dm) +{ + + if (NULL != self && NULL != dm) + { + void *host_data_ = (void*)self->data_; + size_t width = dm->ncol * sizeof(float); + size_t dst_pitch = dm->stride; + size_t src_pitch = dst_pitch; + Status status; + + CUDA_SAFE_SYNC_CALL(cudaMemcpy2D(dm->data.f, dst_pitch, host_data_+self->pos_, src_pitch, width, dm->nrow, cudaMemcpyHostToDevice), &status); + NERV_SET_STATUS(&status, NERV_NORMAL, 0); + + self->pos_ += dm->nrow * dm->stride; + self->initialized_ = true; + return 0; + } + return 1; +} + +void ModelSync_syncinc(ModelSync *self) +{ + __sync_fetch_and_add(&self->threadcount, 1); +} + +void ModelSync_syncdec(ModelSync *self) +{ + __sync_fetch_and_add(&self->threadcount, -1); +} + +int ModelSync_threadcount(ModelSync *self) +{ + return self->threadcount; +} + +///////////////////////////////// + +Xent* Xent_new() +{ + Xent *xent = (Xent*)malloc(sizeof(Xent)); + memset(xent, 0, sizeof(Xent)); + xent->refcount = 1; + return xent; +} + +Xent* Xent_newWithId(long id) +{ + Xent *xent = (Xent*)id; + __sync_fetch_and_add(&xent->refcount, 1); + return xent; +} + +Xent* Xent_newWithParm(size_t frames_, size_t correct_, double loss_, double entropy_) +{ + Xent *xent = (Xent*)malloc(sizeof(Xent)); + xent->frames_ = frames_; + xent->correct_ = correct_; + xent->loss_ = loss_; + xent->entropy_ = entropy_; + xent->refcount = 1; + return xent; +} + +long Xent_id(Xent *xent) +{ + return (long)(xent); +} + +Xent* Xent_add(Xent *a, Xent *b) +{ + a->frames_ += b->frames_; + a->correct_ += b->correct_; + a->loss_ += b->loss_; + a->entropy_ += b->entropy_; + return a; +} + +void Xent_free(Xent *xent) +{ + if (NULL != xent && __sync_fetch_and_add(&xent->refcount, -1) == 1) + { + free(xent); + xent = NULL; + } +} + + +////////////////////////////////// + +Mse* Mse_new() +{ + Mse *mse = (Mse*)malloc(sizeof(Mse)); + memset(mse, 0, sizeof(Mse)); + mse->refcount = 1; + return mse; +} + +Mse* Mse_newWithId(long id) +{ + Mse *mse = (Mse*)id; + __sync_fetch_and_add(&mse->refcount, 1); + return mse; +} + +Mse* Mse_newWithParm(size_t frames_, double loss_) +{ + Mse *mse = (Mse*)malloc(sizeof(Mse)); + mse->frames_ = frames_; + mse->loss_ = loss_; + mse->refcount = 1; + return mse; +} + + +long Mse_id(Mse *mse) +{ + return (long)(mse); +} + +Mse* Mse_add(Mse *a, Mse *b) +{ + a->frames_ += b->frames_; + a->loss_ += b->loss_; + return a; +} + +void Mse_free(Mse *mse) +{ + if (NULL != mse && __sync_fetch_and_add(&mse->refcount, -1) == 1) + { + free(mse); + mse = NULL; + } +} + +////////////////////////////////// + +GlobalOption* GlobalOption_new() +{ + GlobalOption *option = (GlobalOption*)malloc(sizeof(GlobalOption)); + option->refcount = 1; + return option; +} + +GlobalOption* GlobalOption_newWithParm(int batch_size, float lrate, bool bp,const char *tr_scp, const char *cv_scp, const char *transf, const char *network) +{ + GlobalOption *option = (GlobalOption*)malloc(sizeof(GlobalOption)); + option->batch_size = batch_size; + option->lrate = lrate; + option->bp = bp; + strncpy(option->tr_scp, tr_scp, strlen(tr_scp)+1); + strncpy(option->cv_scp, cv_scp, strlen(cv_scp)+1); + strncpy(option->transf, transf, strlen(transf)+1); + strncpy(option->network, network, strlen(network)+1); + option->refcount = 1; + + return option; +} + +GlobalOption* GlobalOption_newWithId(long id) +{ + GlobalOption *option = (GlobalOption*)id; + __sync_fetch_and_add(&option->refcount, 1); + return option; +} + + + +long GlobalOption_id(GlobalOption *option) +{ + return (long)(option); +} + +void GlobalOption_free(GlobalOption *option) +{ + if (NULL != option && __sync_fetch_and_add(&option->refcount, -1) == 1) + { + free(option); + option = NULL; + } +} + + diff --git a/fastnn/threads/Makefile b/fastnn/threads/Makefile index 17958f9..4205adc 100644 --- a/fastnn/threads/Makefile +++ b/fastnn/threads/Makefile @@ -35,6 +35,7 @@ $(OBJ_DIR)/%.o: %.c $(patsubst /%.o,/%.c,$@) $(LIBS): $(OBJS) gcc -shared -o $@ $^ $(LDFLAGS) -Wl,-rpath=$(LIB_PATH) -L$(LIB_PATH) -lnervcore -lluaT -lpthread + cp $@ $(LIB_PATH)/ clean: -rm -rf $(OBJ_DIR) diff --git a/nerv/io/sgd_buffer.lua b/nerv/io/sgd_buffer.lua index 3f854f0..65d6da1 100644 --- a/nerv/io/sgd_buffer.lua +++ b/nerv/io/sgd_buffer.lua @@ -57,7 +57,7 @@ function SGDBuffer:saturate() buff.data:copy_from(buff.leftover, 0, lrow) buff.leftover = nil end - nerv.info("buffer leftover: %d\n", lrow) + nerv.info("buffer leftover: %d", lrow) reader.tail = lrow reader.has_leftover = false end @@ -107,7 +107,7 @@ function SGDBuffer:get_data() if not self:saturate() then return nil -- the remaining data cannot build a batch end - nerv.info("%.3fs to fill the buffer", os.clock() - t) + --nerv.info("%.3fs to fill the buffer", os.clock() - t) end if self.head + batch_size > self.tail then return nil -- the remaining data cannot build a batch diff --git a/nerv/lib/common.h b/nerv/lib/common.h index 6878e34..a4e3582 100644 --- a/nerv/lib/common.h +++ b/nerv/lib/common.h @@ -59,6 +59,8 @@ typedef struct Status { nerv_error_status(L, &status); \ } while (0) +#define PROFILE_HASHMAP_SIZE 123457 + typedef struct HashNode { const char *key; void *val; diff --git a/nerv/lib/matrix/cumatrix.c b/nerv/lib/matrix/cumatrix.c index a5991ab..c913db2 100644 --- a/nerv/lib/matrix/cumatrix.c +++ b/nerv/lib/matrix/cumatrix.c @@ -2,7 +2,7 @@ #include "../common.h" #include "cuda_helper.h" #include -#define PROFILE_HASHMAP_SIZE 123457 + static cublasHandle_t cublas_handle; static cudaEvent_t profile_start, profile_stop; static HashMap *profile; diff --git a/nerv/nn/layer_repo.lua b/nerv/nn/layer_repo.lua index ef333a7..8473727 100644 --- a/nerv/nn/layer_repo.lua +++ b/nerv/nn/layer_repo.lua @@ -13,7 +13,7 @@ function LayerRepo:add_layers(layer_spec, param_repo, global_conf) if layers[id] ~= nil then nerv.error("a layer with id %s already exists", id) end - nerv.info("create layer: %s", id) + --nerv.info("create layer: %s", id) if type(spec[2]) ~= "table" then nerv.error("layer config table is need") end -- cgit v1.2.3-70-g09d2