64 files changed, 5047 insertions, 61 deletions
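The hunks below add a new fastnn module to NERV: a top-level Makefile target, CUDA device-selection bindings (fastnn.CDevice), a semaphore-guarded mini-batch queue shared between threads (fastnn.CExamplesRepo / fastnn.CExample), model-synchronisation primitives (fastnn.ModelSync, fastnn.CXent), and two multi-threaded ASGD trainer scripts. A minimal Lua sketch of how these bindings fit together, assembled from the calls the trainer scripts in this diff actually use (illustrative only, not a tested program; it assumes the fastnn rock built by this change is installed):

    -- sketch: wiring of the fastnn bindings introduced below
    require 'nerv'
    require 'fastnn'
    require 'threads'

    local device = fastnn.CDevice()               -- probe GPUs: free memory, bandwidth
    device:init()
    local gpuid = device:select_gpu()             -- pick the GPU with the largest free-memory ratio

    local repo = fastnn.CExamplesRepo(128, false) -- bounded producer/consumer queue (capacity 128)
    local repo_shareid = repo:id()                -- pass this id to worker threads; they reopen it
                                                  -- with fastnn.CExamplesRepo(repo_shareid, true)

    -- data thread: push mini-batches, then signal completion
    --   local example = fastnn.Example:PrepareData(data, global_transf, feat_id)
    --   repo:accept(example)
    --   repo:done()

    -- train thread: drain the queue
    --   for example in repo.provide, repo do ... end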
@@ -1,7 +1,7 @@ -.PHONY: all clean install luajit luarocks speech +.PHONY: all clean install luajit luarocks speech fastnn SHELL := /bin/bash PREFIX := $(CURDIR)/install/ -all: luajit luarocks install +all: luajit luarocks install fastnn luajit: PREFIX=$(PREFIX) ./tools/build_luajit.sh luarocks: @@ -11,5 +11,8 @@ install: speech: cd speech/speech_utils; $(PREFIX)/bin/luarocks make cd speech/htk_io; $(PREFIX)/bin/luarocks make +fastnn: + cd fastnn/threads; $(PREFIX)/bin/luarocks make + cd fastnn; $(PREFIX)/bin/luarocks make clean: cd nerv && make clean diff --git a/fastnn/Makefile b/fastnn/Makefile new file mode 100644 index 0000000..1f1ba1e --- /dev/null +++ b/fastnn/Makefile @@ -0,0 +1,58 @@ +.PHONY: build install clean +SHELL := /bin/bash +BUILD_DIR := $(CURDIR)/build +LUA_DIR = $(INST_LUADIR)/fastnn +CUDA_BASE := /usr/local/cuda-6.5 + +INC_PATH := -I $(LUA_BINDIR)/../include/nerv +CUDA_INCLUDE := -I $(CUDA_BASE)/include/ +LIB_PATH := $(LUA_BINDIR)/../lib + +OBJ_DIR := $(BUILD_DIR)/objs +SUBDIR := lib io threads device + + +OBJS := init.o lib/modelsync.o lib/ModelSync.o io/example.o io/Example.o device/device.o device/Device.o +LIBS := $(INST_LIBDIR)/libfastnn.so +LUA_LIBS := init.lua lib/modelsync.lua io/example.lua device/device.lua +INCLUDE := -I $(LUA_INCDIR) $(INC_PATH) $(CUDA_INCLUDE) -DLUA_USE_APICHECK -DUSE_PTHREAD_THREADS +LDFLAGS := -L$(CUDA_BASE)/lib64/ -Wl,-rpath=$(CUDA_BASE)/lib64/ -lcudart -lcublas + + +OBJS := $(addprefix $(OBJ_DIR)/,$(OBJS)) +OBJ_SUBDIR := $(addprefix $(OBJ_DIR)/,$(SUBDIR)) +LUA_SUBDIR := $(addprefix $(LUA_DIR)/,$(SUBDIR)) +LUA_LIBS := $(addprefix $(LUA_DIR)/,$(LUA_LIBS)) + +CFLAGS := -Wall -Wextra -g -O0 + +build: $(OBJ_DIR) $(OBJ_SUBDIR) $(OBJS) +install: $(LIBS) $(LUA_DIR) $(LUA_SUBDIR) $(LUA_LIBS) + +#$(SUBMODULE): ECHO +# cd threads; $(LUA_BINDIR)/luarocks make + +$(OBJ_DIR) $(LUA_DIR) $(OBJ_SUBDIR) $(LUA_SUBDIR): + -mkdir -p $@ + +$(LUA_DIR)/%.lua: %.lua + cp $< $@ + +$(OBJ_DIR)/io/Example.o: io/Example.cpp + gcc -c -o $@ $< $(INCLUDE) -fPIC $(CFLAGS) +$(OBJ_DIR)/device/Device.o: device/Device.cpp + gcc -c -o $@ $< $(INCLUDE) -fPIC $(CFLAGS) + +$(OBJ_DIR)/%.o: %.c $(patsubst /%.o,/%.c,$@) + gcc -c -o $@ $< $(INCLUDE) -fPIC $(CFLAGS) + +$(LIBS): $(OBJS) + gcc -shared -o $@ $^ $(LDFLAGS) -Wl,-rpath=$(LIB_PATH) -L$(LIB_PATH) -lthreads -lnervcore -lluaT -lpthread -lstdc++ + +ECHO: + @echo $(SUBMODULE) + +clean: + -rm -rf $(OBJ_DIR) + + diff --git a/fastnn/device/Device.cpp b/fastnn/device/Device.cpp new file mode 100644 index 0000000..3b69086 --- /dev/null +++ b/fastnn/device/Device.cpp @@ -0,0 +1,496 @@ + +#include <vector> +#include <string> +#include <iostream> +#include "../../nerv/lib/matrix/cuda_helper.h" +#include <dlfcn.h> + + +extern "C" { + + +#include "Device.h" +#include "stdlib.h" +#include "string.h" + + +struct GPUInfo +{ + int gpuid; + float d2h_bandwidth; + float h2d_bandwidth; + size_t mem_free; + size_t mem_total; + float mem_ratio; + bool used; +}; + +struct MPIGPUInfo +{ + char hostname[STRLEN]; + int gpuid; + int myid; +}; + +struct Device +{ + std::vector<GPUInfo> gpuinfo_; + int refcount; +}; + +/////////////////////////////////////////// + +extern cublasHandle_t* nerv_get_cublas_handle(); + +CuContext* CuContext_new() +{ + CuContext *context = new CuContext; + cublasCreate(&context->cublas_handle); + cudaEventCreate(&context->profile_start); + cudaEventCreate(&context->profile_stop); + context->profile = hashmap_create(PROFILE_HASHMAP_SIZE, bkdr_hash, strcmp); + context->refcount = 1; + context->pid = pthread_self(); + return 
context; +} + +CuContext* CuContext_newWithId(long id) +{ + CuContext *context = (CuContext*)id; + __sync_fetch_and_add(&context->refcount, 1); + return context; +} + +long CuContext_id(CuContext *context) +{ + return (long)context; +} + +void CuContext_destroy(CuContext *context) +{ + if (NULL != context && __sync_fetch_and_add(&context->refcount, -1) == 1) + { + cublasDestroy(context->cublas_handle); + hashmap_clear(context->profile); + delete context; + context = NULL; + } +} + +Device* Device_new() +{ + Device* device = new Device; + device->refcount = 1; + return device; +} + +Device* Device_newWithId(long id) +{ + Device *device = (Device*)id; + __sync_fetch_and_add(&device->refcount, 1); + return device; +} + +long Device_id(Device *device) +{ + return (long)device; +} + +void Device_destroy(Device *device) +{ + if (NULL != device && __sync_fetch_and_add(&device->refcount, -1) == 1) + { + delete device; + device = NULL; + } +} + +void GetFreeMemory(size_t* free, size_t* total, Status *status) +{ + // WARNING! the CUDA API is inconsistent accross versions! + #if (CUDA_VERSION >= 3020) + //define the function signature type + size_t mem_free, mem_total; +#else + unsigned int mem_free, mem_total; +#endif + { + //we will load the cuMemGetInfo dynamically from libcuda.so + //cuMemGetInfo(&mem_free, &mem_total); + //pre-fill ``safe'' values that will not cause problems + mem_free = 1; mem_total = 1; + //open libcuda.so + void* libcuda = dlopen("libcuda.so",RTLD_LAZY); + if(NULL == libcuda) + { + NERV_EXIT_STATUS(status, MAT_CUDA_ERR, "Cannot open libcuda.so"); + } + else + { + //define the function signature type + //and get the symbol +#if (CUDA_VERSION >= 3020) + typedef CUresult (*cu_fun_ptr)(size_t*, size_t*); + cu_fun_ptr dl_cuMemGetInfo = (cu_fun_ptr)dlsym(libcuda,"cuMemGetInfo_v2"); +#else + typedef CUresult (*cu_fun_ptr)(int*, int*); + cu_fun_ptr dl_cuMemGetInfo = (cu_fun_ptr)dlsym(libcuda,"cuMemGetInfo"); +#endif + if(NULL == dl_cuMemGetInfo) + { + NERV_EXIT_STATUS(status, MAT_CUDA_ERR, "Cannot load cuMemGetInfo from libcuda.so"); + } + else + { + //call the function + dl_cuMemGetInfo(&mem_free, &mem_total); + } + //close the library + dlclose(libcuda); + } + } + // copy the output values outside + if(NULL != free) *free = mem_free; + if(NULL != total) *total = mem_total; +} + +void DeviceGetName(char* name, int len, int dev, Status *status) +{ + //prefill with something reasonable + strncpy(name,"Unknown GPU",len); + //open libcuda.so + void* libcuda = dlopen("libcuda.so",RTLD_LAZY); + if(NULL == libcuda) + { + NERV_EXIT_STATUS(status, MAT_CUDA_ERR, "Cannot open libcuda.so"); + } + else + { + //define the function signature type + typedef CUresult (*cu_fun_ptr)(char*,int,CUdevice); + //get the symbol + cu_fun_ptr cuDeviceGetName_ptr = (cu_fun_ptr)dlsym(libcuda,"cuDeviceGetName"); + if(NULL == cuDeviceGetName_ptr) { + NERV_EXIT_STATUS(status, MAT_CUDA_ERR, "Cannot load cuDeviceGetName from libcuda.so"); + } + else { + //call the function + cuDeviceGetName_ptr(name, len, dev); + } + //close the library + dlclose(libcuda); + } +} + +void GetBandwidth(int gpu_idx, float *d2h, float *h2d, Status *status) +{ + int idx = gpu_idx; + int memSize = 64*1024*1024; + float elapsedTimeInMs = 0.0f; + float bandwidthInMBs = 0.0f; + unsigned char *h_idata = NULL; + unsigned char *h_odata = NULL; + cudaEvent_t start, stop; + bool PINNED = true; + int MEMCOPY_ITERATIONS = 5; + + CUDA_SAFE_SYNC_CALL(cudaEventCreate(&start), status); + CUDA_SAFE_SYNC_CALL(cudaEventCreate(&stop), status); + + 
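+    /* Bandwidth test: copy a 64 MB buffer MEMCOPY_ITERATIONS times in each
+     * direction (device-to-host first, then host-to-device), time the copies
+     * with CUDA events, and report MB/s. Pinned host memory (cudaHostAlloc)
+     * is used when PINNED is set so the async copies can run as true DMA transfers. */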
//allocate host memory + if (PINNED) + { + #if CUDART_VERSION >= 2020 + CUDA_SAFE_SYNC_CALL(cudaHostAlloc((void **)&h_idata, memSize, cudaHostAllocPortable), status); + CUDA_SAFE_SYNC_CALL(cudaHostAlloc((void **)&h_odata, memSize, cudaHostAllocPortable), status); + #else + CUDA_SAFE_SYNC_CALL(cudaMallocHost((void **)&h_idata, memSize), status); + CUDA_SAFE_SYNC_CALL(cudaMallocHost((void **)&h_odata, memSize), status); + #endif + } + else + { + //pageable memory mode - use malloc + h_odata = (unsigned char *)malloc(memSize); + + if (h_odata == 0) + { + NERV_EXIT_STATUS(status, MAT_CUDA_ERR, "Not enough memory available on host to run test!"); + } + } + + //initialize the memory + for (unsigned int i = 0; i < memSize/sizeof(unsigned char); i++) + { + h_idata[i] = (unsigned char)(i & 0xff); + } + + // allocate device memory + unsigned char *d_idata; + CUDA_SAFE_SYNC_CALL(cudaMalloc((void **) &d_idata, memSize), status); + + //initialize the device memory + CUDA_SAFE_SYNC_CALL(cudaMemcpy(d_idata, h_idata, memSize,cudaMemcpyHostToDevice), status); + + //copy data from GPU to Host + + CUDA_SAFE_SYNC_CALL(cudaEventRecord(start, 0), status); + + if (PINNED) + { + for (unsigned int i = 0; i < MEMCOPY_ITERATIONS; i++) + { + CUDA_SAFE_SYNC_CALL(cudaMemcpyAsync(h_odata, d_idata, memSize,cudaMemcpyDeviceToHost, 0), status); + } + } + else + { + for (unsigned int i = 0; i < MEMCOPY_ITERATIONS; i++) + { + CUDA_SAFE_SYNC_CALL(cudaMemcpy(h_odata, d_idata, memSize,cudaMemcpyDeviceToHost), status); + } + } + + CUDA_SAFE_SYNC_CALL(cudaEventRecord(stop, 0), status); + + // make sure GPU has finished copying + CUDA_SAFE_SYNC_CALL(cudaDeviceSynchronize(), status); + //get the the total elapsed time in ms + + CUDA_SAFE_SYNC_CALL(cudaEventElapsedTime(&elapsedTimeInMs, start, stop), status); + + + //calculate bandwidth in MB/s + bandwidthInMBs = (1e3f * memSize * (float)MEMCOPY_ITERATIONS) / (elapsedTimeInMs * (float)(1 << 20)); + *d2h = bandwidthInMBs; + + ///////////////////////////////////////////////////// + //copy data from Host to GPU + CUDA_SAFE_SYNC_CALL(cudaEventRecord(start, 0), status); + + if (PINNED) + { + for (unsigned int i = 0; i < MEMCOPY_ITERATIONS; i++) + { + CUDA_SAFE_SYNC_CALL(cudaMemcpyAsync(d_idata, h_odata, memSize,cudaMemcpyHostToDevice, 0), status); + } + } + else + { + for (unsigned int i = 0; i < MEMCOPY_ITERATIONS; i++) + { + CUDA_SAFE_SYNC_CALL(cudaMemcpy(d_idata, h_odata, memSize,cudaMemcpyHostToDevice), status); + } + } + + CUDA_SAFE_SYNC_CALL(cudaEventRecord(stop, 0), status); + + // make sure GPU has finished copying + CUDA_SAFE_SYNC_CALL(cudaDeviceSynchronize(), status); + //get the the total elapsed time in ms + + CUDA_SAFE_SYNC_CALL(cudaEventElapsedTime(&elapsedTimeInMs, start, stop), status); + + + //calculate bandwidth in MB/s + bandwidthInMBs = (1e3f * memSize * (float)MEMCOPY_ITERATIONS) / (elapsedTimeInMs * (float)(1 << 20)); + *h2d = bandwidthInMBs; + + //clean up memory + CUDA_SAFE_SYNC_CALL(cudaEventDestroy(stop), status); + CUDA_SAFE_SYNC_CALL(cudaEventDestroy(start), status); + + if (PINNED) + { + CUDA_SAFE_SYNC_CALL(cudaFreeHost(h_idata), status); + CUDA_SAFE_SYNC_CALL(cudaFreeHost(h_odata), status); + } + else + { + free(h_idata); + free(h_odata); + } + + CUDA_SAFE_SYNC_CALL(cudaFree(d_idata), status); +} + +int AutoSelectGPU(Device *device, Status *status) +{ + //GPU selection is based on largest proportion of free memory. 
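+    //mem_ratio was filled in by Initialize(); devices already marked "used" are
+    //skipped, and the winner is marked used so later calls pick a different GPU.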
+ + int max_id = 0, n_gpu = 0; + std::vector<GPUInfo> &gpuinfo_ = device->gpuinfo_; + + cudaGetDeviceCount(&n_gpu); + + if(n_gpu == 0) + { + NERV_SET_STATUS(status, MAT_CUDA_ERR, "No CUDA devices found"); + return -1; + } + + if (n_gpu > 0) + { + for (int i = 0; i < gpuinfo_.size(); i++) + { + if (!gpuinfo_[i].used) + { + max_id = i; + break; + } + } + //find GPU with max free memory + for (int n = 1; n < gpuinfo_.size(); n++) + { + if (!gpuinfo_[n].used && gpuinfo_[n].mem_ratio > gpuinfo_[max_id].mem_ratio) + max_id = n; + } + + + std::cerr << "Selected device: " << max_id << " (automatically)" << std::endl; + + std::cerr << "free: " << gpuinfo_[max_id].mem_free/1024/1024 << "M, " + << "total: "<< gpuinfo_[max_id].mem_total/1024/1024 << "M, " + << "ratio: "<< gpuinfo_[max_id].mem_ratio << " " + << "d2h bandwidth: " << gpuinfo_[max_id].d2h_bandwidth << "MB/s, " + << "h2d bandwidth: "<< gpuinfo_[max_id].h2d_bandwidth << "MB/s" << std::endl; + + cudaSetDevice(max_id); + //initialize the CUBLAS + //cublasInit(); + //cublasHandle_t *cublas_handle = nerv_get_cublas_handle(); + //std::cerr << "cublasHandle_t: " << cublas_handle << std::endl; + //cublasCreate(cublas_handle); + + //create the context + cudaError_t e; + e = cudaThreadSynchronize(); //deprecated, but for legacy not cudaDeviceSynchronize + if(e != cudaSuccess) { + std::cerr << "Failed to create CUDA context on a GPU." << std::endl; + } + } + + gpuinfo_[max_id].used = true; + NERV_SET_STATUS(status, NERV_NORMAL, 0); + return max_id; +} + +void SelectGPU(Device *device, int gpu_id, Status *status) +{ + + std::vector<GPUInfo> &gpuinfo_ = device->gpuinfo_; + int n_gpu = 0; + cudaGetDeviceCount(&n_gpu); + if(gpu_id >= n_gpu) + { + NERV_EXIT_STATUS(status, MAT_CUDA_ERR, "Cannot select GPU CUDA capable cards!"); + std::cerr << "Cannot select GPU " << gpu_id + << ", detected " << n_gpu << " CUDA capable cards!" << std::endl; + } + + + std::cerr << "Selected device: " << gpu_id << " (manually)"; + + std::cerr << "free: " << gpuinfo_[gpu_id].mem_free/1024/1024 << "M, " + << "total: "<< gpuinfo_[gpu_id].mem_total/1024/1024 << "M, " + << "ratio: "<< gpuinfo_[gpu_id].mem_ratio << " " + << "d2h bandwidth: " << gpuinfo_[gpu_id].d2h_bandwidth << "MB/s, " + << "h2d bandwidth: "<< gpuinfo_[gpu_id].h2d_bandwidth << "MB/s" << std::endl; + + CUDA_SAFE_SYNC_CALL(cudaSetDevice(gpu_id), status); + //initialize the CUBLAS + //CUBLAS_SAFE_SYNC_CALL(cublasInit(), status); + //cublasHandle_t *cublas_handle = nerv_get_cublas_handle(); + //cublasCreate(cublas_handle); + + //create the context + cudaError_t e; + e = cudaThreadSynchronize(); //deprecated, but for legacy not cudaDeviceSynchronize + if(e != cudaSuccess) { + NERV_EXIT_STATUS(status, MAT_CUDA_ERR, "Failed to create CUDA context on a GPU."); + std::cerr << "Failed to create CUDA context on a GPU."; + } + + gpuinfo_[gpu_id].used = true; + + NERV_SET_STATUS(status, NERV_NORMAL, 0); +} + +void Initialize(Device *device, Status *status) +{ + // Check that we have at least one gpu + int n_gpu = 0; + cudaGetDeviceCount(&n_gpu); + if(n_gpu == 0) + NERV_EXIT_STATUS(status, MAT_CUDA_ERR, "No CUDA devices found"); + + std::vector<GPUInfo> &gpuinfo_ = device->gpuinfo_; + gpuinfo_.resize(n_gpu); + + std::cerr << "gpu information ..." 
<< std::endl; + + // Get ratios of memory use, if possible + for(int n = 0; n < n_gpu; n++) + { + int ret = cudaSetDevice(n); + switch(ret) + { + case cudaSuccess : { + //create the CUDA context for the thread + //cudaThreadSynchronize(); //deprecated, but for legacy not cudaDeviceSynchronize + cudaDeviceSynchronize(); + //get GPU name + char name[STRLEN]; + DeviceGetName(name,STRLEN,n, status); + //get GPU memory stats + size_t free, total; + float d2h, h2d; + + GetFreeMemory(&free, &total, status); + GetBandwidth(n, &d2h, &h2d, status); + + gpuinfo_[n].gpuid = n; + gpuinfo_[n].d2h_bandwidth = d2h; + gpuinfo_[n].h2d_bandwidth = h2d; + gpuinfo_[n].mem_free = free; + gpuinfo_[n].mem_total = total; + gpuinfo_[n].mem_ratio = free/(float)total; + gpuinfo_[n].used = false; + + std::cerr << "gpu: " << n << " ==> " + << "free: " << free/1024/1024 << "M, " + << "total: "<< total/1024/1024 << "M, " + << "ratio: "<< free/(float)total << " " + << "d2h bandwidth: " << d2h << "MB/s, " + << "h2d bandwidth: "<< h2d << "MB/s" << std::endl; + + //destroy the CUDA context for the thread + //cudaThreadExit(); //deprecated, but for legacy reason not cudaDeviceReset + } break; + + #if (CUDA_VERSION > 3020) + case cudaErrorDeviceAlreadyInUse : + std::cerr << "cudaSetDevice(" << n << "): " + << "Device cannot be accessed, used EXCLUSIVE-THREAD mode..." << std::endl; + break; + #endif + case cudaErrorInvalidDevice : + std::cerr << "cudaSetDevice(" << n << "): " + << "Device cannot be accessed, not a VALID CUDA device!" << std::endl; + break; + default : + std::cerr << "cudaSetDevice(" << n << "): " + << "returned " << ret << ", " + << cudaGetErrorString((cudaError_t)ret) << std::endl; + } + } + cudaDeviceReset(); + printf("Result = PASS\n"); + NERV_SET_STATUS(status, NERV_NORMAL, 0); +} + + + + + +} // extern diff --git a/fastnn/device/Device.h b/fastnn/device/Device.h new file mode 100644 index 0000000..9a80128 --- /dev/null +++ b/fastnn/device/Device.h @@ -0,0 +1,51 @@ + +#ifndef NERV_FASTNN_EXAMPLE_H +#define NERV_FASTNN_EXAMPLE_H + + +#ifdef __cplusplus +extern "C" { +#endif + +#include "matrix/matrix.h" +#include "stdbool.h" +#include "../../nerv/lib/matrix/cuda_helper.h" +#include "../../nerv/lib/common.h" +#include "../../nerv/lib/matrix/cumatrix.h" +#define STRLEN 1024 + + + +typedef struct GPUInfo GPUInfo; + +typedef struct MPIGPUInfo MPIGPUInfo; + +typedef struct Device Device; + +CuContext* CuContext_new(); +CuContext* CuContext_newWithId(long id); +long CuContext_id(CuContext *context); +void CuContext_destroy(CuContext *context); + + +Device* Device_new(); +Device* Device_newWithId(long); +long Device_id(Device *device); +void Device_detroy(Device *device); +void Initialize(Device *device, Status *status); +void GetFreeMemory(size_t* free, size_t* total, Status *status); +void DeviceGetName(char* name, int len, int dev, Status *status); +void GetBandwidth(int gpu_idx, float *d2h, float *h2d, Status *status); +int AutoSelectGPU(Device *device, Status *status); +void SelectGPU(Device *device, int gpu_id, Status *status); + + + + +#ifdef __cplusplus +} // closing brace for extern "C" + +#endif // end Device + +#endif + diff --git a/fastnn/device/device.c b/fastnn/device/device.c new file mode 100644 index 0000000..71d6ec1 --- /dev/null +++ b/fastnn/device/device.c @@ -0,0 +1,178 @@ + +#include <stdio.h> +#include <stdlib.h> + +#include <lua.h> +#include <lualib.h> +#include <luaT/luaT.h> + +#include "../threads/lib/luaTHRD.h" +#include "Device.h" + + +const char *fastnn_device_tname = "fastnn.CDevice"; 
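+/* A Device handle is exposed to Lua as userdata carrying the "fastnn.CDevice"
+ * metatable. Handles are shared across threads by the integer id returned from
+ * device_id(); passing that id back to the constructor reopens the same
+ * underlying object via Device_newWithId(). */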
+ + +static int device_new(lua_State *L) +{ + Device *device = NULL; + if(lua_gettop(L) == 0) + { + device = Device_new(); + } + else if(lua_gettop(L) == 1) + { + long id = luaL_checkinteger(L, 1); + device = Device_newWithId(id); + } + else + luaL_error(L, "device: device new invalid arguments"); + if (!device) + luaL_error(L, "device: device failed"); + + luaTHRD_pushudata(L, device, fastnn_device_tname); + + return 1; +} + +static int device_init(lua_State *L) +{ + Device *device = luaTHRD_checkudata(L, 1, fastnn_device_tname); + Status status; + Initialize(device, &status); + NERV_LUA_CHECK_STATUS(L, status); + return 0; +} + +static int device_select_gpu(lua_State *L) +{ + Device *device = luaTHRD_checkudata(L, 1, fastnn_device_tname); + + Status status; + if(lua_gettop(L) == 2) + { + int gpuid = luaL_checkinteger(L, 2); + SelectGPU(device, gpuid, &status); + NERV_LUA_CHECK_STATUS(L, status); + return 0; + } + else if(lua_gettop(L) == 1) + { + int gpuid = AutoSelectGPU(device, &status); + NERV_LUA_CHECK_STATUS(L, status); + lua_pushinteger(L, gpuid); + return 1; + } + else + luaL_error(L, "device: device select gpu failed"); +} + +static int device_id(lua_State *L) +{ + Device *device = luaTHRD_checkudata(L, 1, fastnn_device_tname); + lua_pushinteger(L, Device_id(device)); + return 1; +} + +static int device_tostring(lua_State *L) +{ + char str[STRLEN]; + Device* device = luaTHRD_checkudata(L, 1, fastnn_device_tname); + snprintf(str, STRLEN, "%s <%lx>", fastnn_device_tname, Device_id(device)); + lua_pushstring(L, str); + return 1; +} + +static int device_destroy(lua_State *L) +{ + Device *device = luaTHRD_checkudata(L, 1, fastnn_device_tname); + Device_destroy(device); + //printf("device_destroy ... end\n"); + return 0; +} + + +////////////////////////////////////////////// + +static int context_new(lua_State *L) +{ + CuContext *context = NULL; + if(lua_gettop(L) == 0) + { + context = CuContext_new(); + } + else if(lua_gettop(L) == 1) + { + long id = luaL_checkinteger(L, 1); + context = CuContext_newWithId(id); + } + else + luaL_error(L, "device: context new invalid arguments"); + if (!context) + luaL_error(L, "device: context failed"); + + luaTHRD_pushudata(L, context, nerv_context_tname); + + return 1; +} + + +static int context_id(lua_State *L) +{ + CuContext *context = luaTHRD_checkudata(L, 1, nerv_context_tname); + lua_pushinteger(L, CuContext_id(context)); + return 1; +} + +static int context_tostring(lua_State *L) +{ + char str[STRLEN]; + CuContext* context = luaTHRD_checkudata(L, 1, nerv_context_tname); + snprintf(str, STRLEN, "%s <%lx>", nerv_context_tname, CuContext_id(context)); + lua_pushstring(L, str); + return 1; +} + +static int context_destroy(lua_State *L) +{ + CuContext* context = luaTHRD_checkudata(L, 1, nerv_context_tname); + CuContext_destroy(context); + return 0; +} + + +static const struct luaL_Reg device__ [] = { + {"new", device_new}, + {"__tostring", device_tostring}, + {"id", device_id}, + {"init", device_init}, + {"select_gpu", device_select_gpu}, + {"free", device_destroy}, + {NULL, NULL} +}; + +static const struct luaL_Reg context__ [] = { + {"new", context_new}, + {"__tostring", context_tostring}, + {"id", context_id}, + {"free", context_destroy}, + {NULL, NULL} +}; + + +void fastnn_init_device(lua_State *L) +{ + luaT_newmetatable(L, fastnn_device_tname, NULL, device_new, device_destroy, NULL); + luaL_register(L, NULL, device__); + lua_pop(L, 1); +} + +void fastnn_init_context(lua_State *L) +{ + luaT_newmetatable(L, nerv_context_tname, NULL, context_new, 
context_destroy, NULL); + luaL_register(L, NULL, context__); + lua_pop(L, 1); +} + + + diff --git a/fastnn/device/device.lua b/fastnn/device/device.lua new file mode 100644 index 0000000..d3dea73 --- /dev/null +++ b/fastnn/device/device.lua @@ -0,0 +1,6 @@ + +local C = require 'libfastnn' + +fastnn.CDevice = C.CDevice + + diff --git a/fastnn/example/asgd_data_trainer.lua b/fastnn/example/asgd_data_trainer.lua new file mode 100644 index 0000000..33d579a --- /dev/null +++ b/fastnn/example/asgd_data_trainer.lua @@ -0,0 +1,405 @@ +require 'fastnn' +require 'libhtkio' +require 'threads' + +dofile("fastnn/fastnn_baseline.lua") + +env = string.format([[ +package.path="/home/slhome/wd007/.luarocks/share/lua/5.1/?.lua;/home/slhome/wd007/.luarocks/share/lua/5.1/?/init.lua;/sgfs/users/wd007/src/nerv/install/share/lua/5.1/?.lua;/sgfs/users/wd007/src/nerv/install/share/lua/5.1/?/init.lua;"..package.path; +package.cpath="/home/slhome/wd007/.luarocks/lib/lua/5.1/?.so;/sgfs/users/wd007/src/nerv/install/lib/lua/5.1/?.so;"..package.cpath +local k,l,_=pcall(require,"luarocks.loader") _=k and l.add_context("nerv","scm-1") +]]) + + + +local data_thread_code = [[ +%s + +require 'nerv' +require 'fastnn' +dofile("fastnn/fastnn_baseline.lua") +os.execute("export MALLOC_CHECK_=0") + +local thread_idx = %d +local example_repo_shareid = %d +local data_mutex_shareid = %d +local feat_repo_shareid = %d +local gpu_shareid = %d +local batch_size = %d +local bp = %d +local scp_file = '%s' + +local share_mutex = threads.Mutex(data_mutex_shareid) +local share_example_repo = fastnn.CExamplesRepo(example_repo_shareid, true) +local share_gpu = fastnn.CDevice(gpu_shareid) + +--print(thread_idx) +--print(share_mutex) +--print(share_gpu) +--print(share_example_repo) + +if bp == 0 then + bp = false +else + bp = true +end +gconf.randomize = bp +--print(gconf.randomize) + +share_mutex:lock() +local gpuid = share_example_repo:get_gpuid() +if gpuid < 0 then + gpuid = share_gpu:select_gpu() + share_example_repo:set_gpuid(gpuid) +else + share_gpu:select_gpu(gpuid) +end + +nerv.info_stderr("thread %%d loading transf ...", thread_idx) +local param_transf_repo = nerv.ParamRepo() +param_transf_repo:import(gconf.transf, nil, gconf) +local transf_node_repo = make_transf_node_repo(param_transf_repo) +local transf_layer_repo = make_transf_link_repo(transf_node_repo, param_transf_repo) +local transf = transf_layer_repo:get_layer("global_transf") +share_mutex:unlock() + +local feat_id = get_feat_id() + +local buffer = make_buffer(make_readers(scp_file, transf_layer_repo, feat_repo_shareid, data_mutex_shareid)) + + local t = 1; + for data in buffer.get_data, buffer do + local example = fastnn.Example:PrepareData(data, nil, feat_id) + --print(string.format("Accept NO.%%d %%s", t, example)); t = t+1; + share_example_repo:accept(example) + --print("share_example_repo:accept") + + -- collect garbage in-time to save GPU memory + collectgarbage("collect") + end + share_example_repo:done() +-- print("share_example_repo:done") + +]] + + +train_thread_code = [[ +%s + +require 'nerv' +require 'fastnn' +dofile("fastnn/fastnn_baseline.lua") +os.execute("export MALLOC_CHECK_=0") + +local thread_idx = %d +local example_repo_shareid = %d +local data_mutex_shareid = %d +local master_shareid = %d +local gpu_shareid = %d +local xent_shareid = %d +local batch_size = %d +local lrate = %f +local bp = %d +local nnet_in = '%s' +local nnet_out = '%s' + +local share_example_repo = fastnn.CExamplesRepo(example_repo_shareid, true) +local share_mutex = 
threads.Mutex(data_mutex_shareid) +local share_master = fastnn.ModelSync(master_shareid) +local share_gpu = fastnn.CDevice(gpu_shareid) +local share_xent = fastnn.CXent(xent_shareid) + +if bp == 0 then + bp = false +else + bp = true +end + +gconf.randomize = bp +gconf.lrate = lrate +gconf.batch_size = batch_size +gconf.network[1] = nnet_in +nerv.info_stderr("input network: %%s", gconf.network[1]) +nerv.info_stderr(gconf.randomize) +nerv.info_stderr("input batch_size: %%d", gconf.batch_size) +nerv.info_stderr("input lrate: %%f", gconf.lrate) + +share_mutex:lock() +local gpuid = share_example_repo:get_gpuid() +if gpuid < 0 then + gpuid = share_gpu:select_gpu() + share_example_repo:set_gpuid(gpuid) +else + share_gpu:select_gpu(gpuid) +end + +nerv.context = nerv.CCuContext() +--print(nerv.context) + + +nerv.info_stderr("thread %%d loading network ...", thread_idx) +local param_network_repo = nerv.ParamRepo() +param_network_repo:import(gconf.network, nil, gconf) +local network_node_repo = make_network_node_repo(param_network_repo) +local network_layer_repo = make_network_link_repo(network_node_repo, param_network_repo) +local network = get_network(network_layer_repo) +share_mutex:unlock() + + + local input_order = get_input_order() + + -- initialize the network + network:init(gconf.batch_size) + gconf.cnt = 0 + err_input = {nerv.CuMatrixFloat(gconf.batch_size, 1)} + err_input[1]:fill(1) + + share_master:Initialize(network) + share_master:SyncInc() + + for example in share_example_repo.provide, share_example_repo do + + gconf.cnt = gconf.cnt + 1 + if gconf.cnt == 2000 then + print_stat(network_node_repo) + gconf.cnt = 0 + end + + local input = {} + local n = example:size() + for i = 0, n-1 do + table.insert(input, example:at(i)) + end + + local output = {nerv.CuMatrixFloat(gconf.batch_size, 1)} + err_output = {input[1]:create()} + network:propagate(input, output) + + if bp then + network:back_propagate(err_input, err_output, input, output) + network:gradient(err_input, input, output) + + share_master:LockModel() + share_master:WeightToD(network) + network:update_gradient() + share_master:WeightFromD(network) + share_master:UnLockModel() + end + + -- collect garbage in-time to save GPU memory + collectgarbage("collect") + end + + --print_stat(network_node_repo) + local ce_crit = network_node_repo:get_layer("ce_crit") + local xent = fastnn.CXent(ce_crit.total_frames, ce_crit.total_correct, ce_crit.total_ce, ce_crit.total_ce) + + share_master:LockModel() + share_xent:add(xent) + share_master:SyncDec() + --print(string.format("ThreadCount: %%d", share_master:ThreadCount())) + if share_master:ThreadCount() == 0 and bp then + share_master:WeightToD(network) + local fname = string.format("%%s_tr%%.3f", + nnet_out, frame_acc(share_xent)) + nerv.info_stderr("writing back %%s ...", fname) + network:get_params():export(fname, nil) + end + share_master:UnLockModel() +]] + +function get_data_thread(data_thread_code, env, thread_idx, example_repo_shareid, + data_mutex_shareid, feat_repo_shareid, gpu_shareid, + batch_size, bp, scp_file) + return string.format(data_thread_code, env, thread_idx, example_repo_shareid, + data_mutex_shareid, feat_repo_shareid, gpu_shareid, + batch_size, bp, scp_file) +end + +function get_train_thread(train_thread_code, env, thread_idx, example_repo_shareid, + data_mutex_shareid, master_shareid, gpu_shareid, xent_shareid, + batch_size, lrate, bp, nnet_in, nnet_out) + return string.format(train_thread_code, env, thread_idx, example_repo_shareid, + data_mutex_shareid, master_shareid, 
gpu_shareid, xent_shareid, + batch_size, lrate, bp, nnet_in, nnet_out) +end + +function trainer(batch_size, lrate, bp, scp_file, nnet_in, nnet_out, num_threads) + local train_threads={} + local trainer = {} + local data_threads = {} + local data = {} + local num_threads=num_threads + + local data_mutex = threads.Mutex() + local data_mutex_shareid = data_mutex:id() + + local master = fastnn.CModelSync() + local master_shareid = master:id() + --print(master) + + local xent = fastnn.CXent() + local xent_shareid = xent:id() + --print(xent) + + local gpu = fastnn.CDevice() + local gpu_shareid = gpu:id() + --print(gpu_shareid) + gpu:init() + + local example_repo = {} + local example_repo_shareid = {} + + local feat_repo = nerv.TNetFeatureRepo(scp_file, gconf.htk_conf, gconf.frm_ext) + local feat_repo_shareid = feat_repo:id() + + for i=1,num_threads,1 do + example_repo[i] = fastnn.CExamplesRepo(128, false) + example_repo_shareid[i] = example_repo[i]:id() + + data_threads[i] = get_data_thread(data_thread_code, env, i, example_repo_shareid[i], + data_mutex_shareid, feat_repo_shareid, gpu_shareid, + batch_size, bp, scp_file) + + train_threads[i] = get_train_thread(train_thread_code, env, i, example_repo_shareid[i], + data_mutex_shareid, master_shareid, gpu_shareid, xent_shareid, + batch_size, lrate, bp, nnet_in, nnet_out) + --print(train_threads[i]) + data[i] = threads.Thread(data_threads[i]) + trainer[i] = threads.Thread(train_threads[i]) + end + + nerv.info_stderr('| waiting for thread...') + + for i=1,num_threads,1 do + data[i]:free() + trainer[i]:free() + end + + print_xent(xent) + + nerv.info_stderr('| all thread finished!') + + return frame_acc(xent) +end + +function get_filename(fname) + return string.gsub((string.gsub(fname, "(.*/)(.*)", "%2")),"(.*)%..*", "%1") +end + +function do_sds(tr_scp, sds_scp, sds_rate) + math.randomseed(os.time()) + local scp_file = io.open(tr_scp, "r") + local sds_file = io.open(sds_scp, "w") + for line in scp_file:lines() do + rate = math.random() + if (rate < sds_rate) then + sds_file:write(line.."\n") + end + end + scp_file:close() + sds_file:close() +end + +function print_tag(iter) + io.stderr:write(string.format("########################################################\n")) + io.stderr:write(string.format("# NN TRAINING ITERATION:%d, %s\n", iter, os.date())) + io.stderr:write(string.format("########################################################\n")) +end + + +start_halving_inc = 0.5 +halving_factor = 0.8 +end_halving_inc = 0.1 +min_iter = 1 +max_iter = 20 +min_halving = 0 +gconf.batch_size = 256 +pf0 = get_filename(gconf.network[1]) +nnet_in = gconf.network[1] +nnet_out = "" +sds_scp = "tr_sds_"..string.format("%.4d", math.random()*10000)..".scp" --"tr_sds.scp" +sds_factor = 0.4 +num_threads = 1 +global_option = nil + +print_gconf() +os.execute("export MALLOC_CHECK_=0") + +-- training begin +nerv.info_stderr("begin initial cross validation") +local accu_best = trainer(gconf.batch_size, gconf.lrate, 0, + gconf.cv_scp, nnet_in, nil, num_threads) +local do_halving = false +local accu_new = accu_best + +nerv.info_stderr("initial cross validation: %.3f\n", accu_best) + +for i = 1, max_iter do + + if accu_new >= accu_best then + local sds_rate = math.cos((i-1)*11.0/180*math.pi) + if (sds_rate <= sds_factor) then + sds_rate = sds_factor + end + nerv.info_stderr("iteration %d sds_rate: %.6f", i, sds_rate) + do_sds(gconf.tr_scp, sds_scp, sds_rate) + end + + nnet_out=pf0.."_iter"..i + --print(nnet_out) + print_tag(i) + nerv.info_stderr("[NN] begin iteration %d 
learning_rate: %.3f batch_size: %d.", i, gconf.lrate, gconf.batch_size) + local accu_tr = trainer(gconf.batch_size, gconf.lrate, 1, + sds_scp, nnet_in, nnet_out, num_threads) + nerv.info_stderr("[TR] end iteration %d frame_accuracy: %.3f.\n", i, accu_tr) + os.execute("sleep " .. 3) + + nnet_out = nnet_out.."_tr"..accu_tr + accu_new = trainer(gconf.batch_size, gconf.lrate, 0, + gconf.cv_scp, nnet_out, nil, num_threads) + nerv.info_stderr("[CV] end iteration %d frame_accuracy: %.3f.\n\n", i, accu_new) + os.execute("sleep " .. 3) + + local nnet_tmp = string.format("%s_%s_iter_%d_lr%f_tr%.3f_cv%.3f", + pf0, + os.date("%Y%m%d%H%M%S"), + i, gconf.lrate, accu_tr, accu_new) + + -- TODO: revert the weights + local accu_diff = accu_new - accu_best + local cmd + if accu_new > accu_best then + accu_best = accu_new + nnet_in = nnet_tmp + gconf.batch_size = gconf.batch_size + 128 + if gconf.batch_size > 1024 then + gconf.batch_size = 1024 + end + else + -- reject + nnet_tmp = nnet_tmp.."_rejected" + do_halving = true + end + cmd = "mv "..nnet_out.." "..nnet_tmp + os.execute(cmd) + + if do_halving and accu_diff < end_halving_inc and i > min_iter then + break; + end + + if accu_diff < start_halving_inc and i >= min_halving then + do_halving = true + end + + if do_halving then + gconf.lrate = gconf.lrate * halving_factor + halving_factor = halving_factor - 0.025 + if halving_factor < 0.6 then + halving_factor = 0.6 + end + end + nerv.info_stderr("iteration %d done!", i) +end + + diff --git a/fastnn/example/asgd_sds_trainer.lua b/fastnn/example/asgd_sds_trainer.lua new file mode 100644 index 0000000..cf1c7a6 --- /dev/null +++ b/fastnn/example/asgd_sds_trainer.lua @@ -0,0 +1,343 @@ + +NERV_ROOT = "/sgfs/users/wd007/src/nerv-2" + +env = string.format([[ +package.path="/home/slhome/wd007/.luarocks/share/lua/5.1/?.lua;/home/slhome/wd007/.luarocks/share/lua/5.1/?/init.lua;%s/install/share/lua/5.1/?.lua;%s/install/share/lua/5.1/?/init.lua;"..package.path; +package.cpath="/home/slhome/wd007/.luarocks/lib/lua/5.1/?.so;%s/install/lib/lua/5.1/?.so;"..package.cpath +local k,l,_=pcall(require,"luarocks.loader") _=k and l.add_context("nerv","scm-1") +]], NERV_ROOT, NERV_ROOT, NERV_ROOT) + +loadstring(env)() + +require 'nerv' + +require 'fastnn' +require 'libhtkio' +require 'threads' + +dofile("fastnn/example/fastnn_baseline.lua") + + + +train_thread_code = [[ +%s + +require 'nerv' +require 'fastnn' +require 'libhtkio' + +dofile("fastnn/example/fastnn_baseline.lua") +os.execute("export MALLOC_CHECK_=0") + +local thread_idx = %d +local feat_repo_shareid = %d +local data_mutex_shareid = %d +local master_shareid = %d +local gpu_shareid = %d +local xent_shareid = %d +local batch_size = %d +local lrate = %f +local bp = %d +local scp_file = '%s' +local nnet_in = '%s' +local nnet_out = '%s' + +local share_mutex = threads.Mutex(data_mutex_shareid) +local share_master = fastnn.ModelSync(master_shareid) +local share_gpu = fastnn.CDevice(gpu_shareid) +local share_xent = fastnn.CXent(xent_shareid) + +if bp == 0 then + bp = false +else + bp = true + gconf.tr_scp = scp_file +end + +share_mutex:lock() + +gconf.randomize = bp +gconf.lrate = lrate +gconf.batch_size = batch_size +gconf.initialized_param[2] = nnet_in +nerv.info_stderr("input network: %%s", gconf.initialized_param[2]) +--nerv.info_stderr(gconf.randomize) +nerv.info_stderr("input batch_size: %%d", gconf.batch_size) +nerv.info_stderr("input scp_file: %%s", scp_file) +nerv.info_stderr("input lrate: %%f", gconf.lrate) + + + +share_gpu:select_gpu() + +nerv.context = 
nerv.CCuContext() +--print(nerv.context) + +nerv.info_stderr("thread %%d loading parameters ...", thread_idx) +local param_repo = nerv.ParamRepo() +param_repo:import(gconf.initialized_param, nil, gconf) +local layer_repo = make_layer_repo(param_repo) +local network = get_network(layer_repo) +local global_transf = get_global_transf(layer_repo) + +share_mutex:unlock() + +local buffer = make_buffer(make_readers(nil, layer_repo, feat_repo_shareid, data_mutex_shareid)) + +local input_order = get_input_order() + + -- initialize the network + network:init(gconf.batch_size) + gconf.cnt = 0 + err_input = {nerv.CuMatrixFloat(gconf.batch_size, 1)} + err_input[1]:fill(1) + + share_master:Initialize(network) + share_master:SyncInc() + + for data in buffer.get_data, buffer do + + gconf.cnt = gconf.cnt + 1 + if gconf.cnt == 2000 then + print_stat(layer_repo) + gconf.cnt = 0 + end + + local input = {} + + for i, e in ipairs(input_order) do + local id = e.id + if data[id] == nil then + nerv.error("input data %%s not found", id) + end + local transformed + if e.global_transf then + transformed = nerv.speech_utils.global_transf(data[id], + global_transf, + gconf.frm_ext or 0, 0, + gconf) + else + transformed = data[id] + end + table.insert(input, transformed) + end + + local output = {nerv.CuMatrixFloat(gconf.batch_size, 1)} + err_output = {} + for i = 1, #input do + table.insert(err_output, input[i]:create()) + end + + network:propagate(input, output) + + if bp then + network:back_propagate(err_input, err_output, input, output) + network:gradient(err_input, input, output) + + share_master:LockModel() + share_master:WeightToD(network) + network:update_gradient() + -- network:update(err_input, input, output) + share_master:WeightFromD(network) + share_master:UnLockModel() + end + + -- collect garbage in-time to save GPU memory + collectgarbage("collect") + end + + --print_stat(network_node_repo) + local ce_crit = layer_repo:get_layer("ce_crit") + local xent = fastnn.CXent(ce_crit.total_frames, ce_crit.total_correct, ce_crit.total_ce, ce_crit.total_ce) + + share_master:LockModel() + share_xent:add(xent) + share_master:SyncDec() + --print(string.format("ThreadCount: %%d", share_master:ThreadCount())) + if share_master:ThreadCount() == 0 and bp then + share_master:WeightToD(network) + local fname = string.format("%%s_tr%%.3f", + nnet_out, frame_acc(share_xent)) + nerv.info_stderr("writing back %%s ...", fname) + network:get_params():export(fname, nil) + end + share_master:UnLockModel() +]] + + +function get_train_thread(train_thread_code, env, thread_idx, feat_repo_shareid, + data_mutex_shareid, master_shareid, gpu_shareid, xent_shareid, + batch_size, lrate, bp, scp_file, nnet_in, nnet_out) + return string.format(train_thread_code, env, thread_idx, feat_repo_shareid, + data_mutex_shareid, master_shareid, gpu_shareid, xent_shareid, + batch_size, lrate, bp, scp_file, nnet_in, nnet_out) +end + +function trainer(batch_size, lrate, bp, scp_file, nnet_in, nnet_out, num_threads) + local train_threads={} + local trainer = {} + local num_threads=num_threads + + local data_mutex = threads.Mutex() + local data_mutex_shareid = data_mutex:id() + + local master = fastnn.CModelSync() + local master_shareid = master:id() + --print(master) + + local xent = fastnn.CXent() + local xent_shareid = xent:id() + --print(xent) + + local gpu = fastnn.CDevice() + local gpu_shareid = gpu:id() + --print(gpu_shareid) + gpu:init() + + local feat_repo = nerv.TNetFeatureRepo(scp_file, gconf.htk_conf, gconf.frm_ext) + local feat_repo_shareid = 
feat_repo:id() + + for i=1,num_threads,1 do + + train_threads[i] = get_train_thread(train_thread_code, env, i, feat_repo_shareid, + data_mutex_shareid, master_shareid, gpu_shareid, xent_shareid, + batch_size, lrate, bp, scp_file, nnet_in, nnet_out) + --print(train_threads[i]) + trainer[i] = threads.Thread(train_threads[i]) + end + + nerv.info_stderr('| waiting for thread...') + + for i=1,num_threads,1 do + trainer[i]:free() + end + + print_xent(xent) + + nerv.info_stderr('| all thread finished!') + + return frame_acc(xent) +end + +function get_filename(fname) + return string.gsub((string.gsub(fname, "(.*/)(.*)", "%2")),"(.*)%..*", "%1") +end + +function do_sds(tr_scp, sds_scp, sds_rate) + math.randomseed(os.time()) + local scp_file = io.open(tr_scp, "r") + local sds_file = io.open(sds_scp, "w") + for line in scp_file:lines() do + rate = math.random() + if (rate < sds_rate) then + sds_file:write(line.."\n") + end + end + scp_file:close() + sds_file:close() +end + +function print_tag(iter) + io.stderr:write(string.format("########################################################\n")) + io.stderr:write(string.format("# NN TRAINING ITERATION:%d, %s\n", iter, os.date())) + io.stderr:write(string.format("########################################################\n")) +end + + +start_halving_inc = 0.5 +halving_factor = 0.8 +end_halving_inc = 0.1 +min_iter = 1 +max_iter = 20 +min_halving = 0 +gconf.batch_size = 256 +pf0 = get_filename(gconf.initialized_param[2]) +nnet_in = gconf.initialized_param[2] +nnet_out = "" +sds_scp = "tr_sds_"..string.format("%.4d", math.random()*10000)..".scp" --"tr_sds.scp" +sds_factor = 0.4 +num_threads = 2 +global_option = nil + +os.execute("export MALLOC_CHECK_=0") +print_gconf() + +-- training begin +nerv.info_stderr("begin initial cross validation") +accu_best = trainer(gconf.batch_size, gconf.lrate, 0, + gconf.cv_scp, nnet_in, "", num_threads) +local do_halving = false +local accu_new = accu_best + +nerv.info_stderr("initial cross validation: %.3f\n", accu_best) + +for i = 1, max_iter do + + if accu_new >= accu_best then + local sds_rate = math.cos((i-1)*11.0/180*math.pi) + if (sds_rate <= sds_factor) then + sds_rate = sds_factor + end + nerv.info_stderr("iteration %d sds_rate: %.6f", i, sds_rate) + do_sds(gconf.tr_scp, sds_scp, sds_rate) + end + + nnet_out=pf0.."_iter"..i + --print(nnet_out) + print_tag(i) + nerv.info_stderr("[NN] begin iteration %d learning_rate: %.3f batch_size: %d.", i, gconf.lrate, gconf.batch_size) + accu_tr = trainer(gconf.batch_size, gconf.lrate, 1, + sds_scp, nnet_in, nnet_out, num_threads) + collectgarbage("collect") + nerv.info_stderr("[TR] end iteration %d frame_accuracy: %.3f.\n", i, accu_tr) + os.execute("sleep " .. 3) + + nnet_out = nnet_out.."_tr"..accu_tr + accu_new = trainer(gconf.batch_size, gconf.lrate, 0, + gconf.cv_scp, nnet_out, "", num_threads) + collectgarbage("collect") + nerv.info_stderr("[CV] end iteration %d frame_accuracy: %.3f.\n\n", i, accu_new) + os.execute("sleep " .. 3) + + local nnet_tmp = string.format("%s_%s_iter_%d_lr%f_tr%.3f_cv%.3f", + pf0, + os.date("%Y%m%d%H%M%S"), + i, gconf.lrate, accu_tr, accu_new) + + -- TODO: revert the weights + local accu_diff = accu_new - accu_best + local cmd + if accu_new > accu_best then + accu_best = accu_new + nnet_in = nnet_tmp + gconf.batch_size = gconf.batch_size + 128 + if gconf.batch_size > 1024 then + gconf.batch_size = 1024 + end + else + -- reject + nnet_tmp = nnet_tmp.."_rejected" + do_halving = true + end + cmd = "mv "..nnet_out.." 
"..nnet_tmp + os.execute(cmd) + + if do_halving and accu_diff < end_halving_inc and i > min_iter then + break; + end + + if accu_diff < start_halving_inc and i >= min_halving then + do_halving = true + end + + if do_halving then + gconf.lrate = gconf.lrate * halving_factor + halving_factor = halving_factor - 0.025 + if halving_factor < 0.6 then + halving_factor = 0.6 + end + end + nerv.info_stderr("iteration %d done!", i) +end + + diff --git a/fastnn/example/fastnn_baseline.lua b/fastnn/example/fastnn_baseline.lua new file mode 100644 index 0000000..6e774de --- /dev/null +++ b/fastnn/example/fastnn_baseline.lua @@ -0,0 +1,258 @@ +require 'htk_io' + +gconf = {lrate = 0.2, wcost = 1e-6, momentum = 0.9, + cumat_type = nerv.CuMatrixFloat, + mmat_type = nerv.MMatrixFloat, + frm_ext = 5, + frm_trim = 5, + batch_size = 256, + buffer_size = 81920, + rearrange = true, + tr_scp = "/sgfs/users/wd007/asr/baseline_chn_50h/finetune/finetune_baseline/train.scp", + cv_scp = "/sgfs/users/wd007/asr/baseline_chn_50h/finetune/finetune_baseline/train_cv.scp", + htk_conf = "/sgfs/users/wd007/asr/baseline_chn_50h/finetune/finetune_baseline/fbank_d_a_z.conf", + initialized_param = {"/sgfs/users/wd007/src/nerv/tools/nerv.global.transf", + "/sgfs/users/wd007/src/nerv/tools/nerv.svd0.55_3000h_iter1.init"}, + debug = false} + +function make_layer_repo(param_repo) + local layer_repo = nerv.LayerRepo( + { + -- global transf + ["nerv.BiasLayer"] = + { + blayer1 = {{bias = "bias1"}, {dim_in = {1320}, dim_out = {1320}}}, + }, + ["nerv.WindowLayer"] = + { + wlayer1 = {{window = "window1"}, {dim_in = {1320}, dim_out = {1320}}}, + }, + -- biased linearity + ["nerv.AffineLayer"] = + { + affine0 = {{ltp = "affine0_ltp", bp = "affine0_bp"}, + {dim_in = {1320}, dim_out = {2048}}}, + affine1 = {{ltp = "affine1_ltp", bp = "affine1_bp"}, + {dim_in = {2048}, dim_out = {367}}}, + affine2 = {{ltp = "affine2_ltp", bp = "affine2_bp"}, + {dim_in = {367}, dim_out = {2048}}}, + affine3 = {{ltp = "affine3_ltp", bp = "affine3_bp"}, + {dim_in = {2048}, dim_out = {408}}}, + affine4 = {{ltp = "affine4_ltp", bp = "affine4_bp"}, + {dim_in = {408}, dim_out = {2048}}}, + affine5 = {{ltp = "affine5_ltp", bp = "affine5_bp"}, + {dim_in = {2048}, dim_out = {368}}}, + affine6 = {{ltp = "affine6_ltp", bp = "affine6_bp"}, + {dim_in = {368}, dim_out = {2048}}}, + affine7 = {{ltp = "affine7_ltp", bp = "affine7_bp"}, + {dim_in = {2048}, dim_out = {303}}}, + affine8 = {{ltp = "affine8_ltp", bp = "affine8_bp"}, + {dim_in = {303}, dim_out = {2048}}}, + affine9 = {{ltp = "affine9_ltp", bp = "affine9_bp"}, + {dim_in = {2048}, dim_out = {277}}}, + affine10 = {{ltp = "affine10_ltp", bp = "affine10_bp"}, + {dim_in = {277}, dim_out = {2048}}}, + affine11 = {{ltp = "affine11_ltp", bp = "affine11_bp"}, + {dim_in = {2048}, dim_out = {361}}}, + affine12 = {{ltp = "affine12_ltp", bp = "affine12_bp"}, + {dim_in = {361}, dim_out = {2048}}}, + affine13 = {{ltp = "affine13_ltp", bp = "affine13_bp"}, + {dim_in = {2048}, dim_out = {441}}}, + affine14 = {{ltp = "affine14_ltp", bp = "affine14_bp"}, + {dim_in = {441}, dim_out = {10092}}}, + }, + ["nerv.SigmoidLayer"] = + { + sigmoid0 = {{}, {dim_in = {2048}, dim_out = {2048}}}, + sigmoid1 = {{}, {dim_in = {2048}, dim_out = {2048}}}, + sigmoid2 = {{}, {dim_in = {2048}, dim_out = {2048}}}, + sigmoid3 = {{}, {dim_in = {2048}, dim_out = {2048}}}, + sigmoid4 = {{}, {dim_in = {2048}, dim_out = {2048}}}, + sigmoid5 = {{}, {dim_in = {2048}, dim_out = {2048}}}, + sigmoid6 = {{}, {dim_in = {2048}, dim_out = {2048}}}, + }, + 
["nerv.SoftmaxCELayer"] = -- softmax + ce criterion layer for finetune output + { + ce_crit = {{}, {dim_in = {10092, 1}, dim_out = {1}, compressed = true}} + }, + ["nerv.SoftmaxLayer"] = -- softmax for decode output + { + softmax = {{}, {dim_in = {10092}, dim_out = {10092}}} + } + }, param_repo, gconf) + + layer_repo:add_layers( + { + ["nerv.DAGLayer"] = + { + global_transf = {{}, { + dim_in = {1320}, dim_out = {1320}, + sub_layers = layer_repo, + connections = + { + ["<input>[1]"] = "blayer1[1]", + ["blayer1[1]"] = "wlayer1[1]", + ["wlayer1[1]"] = "<output>[1]" + } + }}, + main = {{}, { + dim_in = {1320}, dim_out = {10092}, + sub_layers = layer_repo, + connections = { + ["<input>[1]"] = "affine0[1]", + ["affine0[1]"] = "sigmoid0[1]", + ["sigmoid0[1]"] = "affine1[1]", + ["affine1[1]"] = "affine2[1]", + ["affine2[1]"] = "sigmoid1[1]", + ["sigmoid1[1]"] = "affine3[1]", + ["affine3[1]"] = "affine4[1]", + ["affine4[1]"] = "sigmoid2[1]", + ["sigmoid2[1]"] = "affine5[1]", + ["affine5[1]"] = "affine6[1]", + ["affine6[1]"] = "sigmoid3[1]", + ["sigmoid3[1]"] = "affine7[1]", + ["affine7[1]"] = "affine8[1]", + ["affine8[1]"] = "sigmoid4[1]", + ["sigmoid4[1]"] = "affine9[1]", + ["affine9[1]"] = "affine10[1]", + ["affine10[1]"] = "sigmoid5[1]", + ["sigmoid5[1]"] = "affine11[1]", + ["affine11[1]"] = "affine12[1]", + ["affine12[1]"] = "sigmoid6[1]", + ["sigmoid6[1]"] = "affine13[1]", + ["affine13[1]"] = "affine14[1]", + ["affine14[1]"] = "<output>[1]", + } + }} + } + }, param_repo, gconf) + + layer_repo:add_layers( + { + ["nerv.DAGLayer"] = + { + ce_output = {{}, { + dim_in = {1320, 1}, dim_out = {1}, + sub_layers = layer_repo, + connections = { + ["<input>[1]"] = "main[1]", + ["main[1]"] = "ce_crit[1]", + ["<input>[2]"] = "ce_crit[2]", + ["ce_crit[1]"] = "<output>[1]" + } + }}, + softmax_output = {{}, { + dim_in = {1320}, dim_out = {10092}, + sub_layers = layer_repo, + connections = { + ["<input>[1]"] = "main[1]", + ["main[1]"] = "softmax[1]", + ["softmax[1]"] = "<output>[1]" + } + }} + } + }, param_repo, gconf) + + return layer_repo +end + + +function get_network(layer_repo) + return layer_repo:get_layer("ce_output") +end + +function get_decode_network(layer_repo) + return layer_repo:get_layer("softmax_output") +end + +function get_global_transf(layer_repo) + return layer_repo:get_layer("global_transf") +end + + + +function make_readers(scp_file, layer_repo, feat_repo_shareid, data_mutex_shareid) + return { + {reader = nerv.TNetReader(gconf, + { + id = "main_scp", + scp_file = scp_file, + conf_file = gconf.htk_conf, + frm_ext = gconf.frm_ext, + mlfs = { + phone_state = { + file = "/sgfs/users/wd007/asr/baseline_chn_50h/finetune/finetune_baseline/ref.mlf", + format = "map", + format_arg = "/sgfs/users/wd007/asr/baseline_chn_50h/finetune/finetune_baseline/dict", + dir = "*/", + ext = "lab" + } + }, + global_transf = layer_repo:get_layer("global_transf") + }, feat_repo_shareid, data_mutex_shareid), + data = {main_scp = 1320, phone_state = 1}} + } +end + +function get_feat_id() + return {main_scp = true} +end + + +function make_buffer(readers) + return nerv.SGDBuffer(gconf, + { + buffer_size = gconf.buffer_size, + randomize = gconf.randomize, + readers = readers, + use_gpu = true + }) +end + +function get_input_order() + return {{id = "main_scp", global_transf = true}, + {id = "phone_state"}} +end + +function get_accuracy(layer_repo) + local ce_crit = layer_repo:get_layer("ce_crit") + return ce_crit.total_correct / ce_crit.total_frames * 100 +end + +function print_stat(layer_repo) + local ce_crit = 
layer_repo:get_layer("ce_crit") + nerv.info("*** training stat begin ***") + nerv.printf("cross entropy:\t\t%.8f\n", ce_crit.total_ce) + nerv.printf("correct:\t\t%d\n", ce_crit.total_correct) + nerv.printf("frames:\t\t\t%d\n", ce_crit.total_frames) + nerv.printf("err/frm:\t\t%.8f\n", ce_crit.total_ce / ce_crit.total_frames) + nerv.printf("accuracy:\t\t%.3f%%\n", get_accuracy(layer_repo)) + nerv.info("*** training stat end ***") +end + +function print_xent(xent) + local totalframes = xent:totalframes() + local loss = xent:loss() + local correct = xent:correct() + nerv.info_stderr("*** training statistics info begin ***") + nerv.info_stderr("total frames:\t\t%d", totalframes) + nerv.info_stderr("cross entropy:\t%.8f", loss/totalframes) + nerv.info_stderr("frame accuracy:\t%.3f%%", 100*correct/totalframes) + nerv.info_stderr("*** training statistics info end ***") +end + +function frame_acc(xent) + local correct = xent:correct() + local totalframes = xent:totalframes() + return string.format("%.3f", 100*correct/totalframes) +end + +function print_gconf() + nerv.info_stderr("%s \t:= %s", "network", gconf.initialized_param[1]) + nerv.info_stderr("%s \t:= %s", "transf", gconf.initialized_param[2]) + nerv.info_stderr("%s \t:= %s", "batch_size", gconf.batch_size) + nerv.info_stderr("%s \t:= %s", "buffer_size", gconf.buffer_size) + nerv.info_stderr("%s \t:= %s", "lrate", gconf.lrate) + nerv.info_stderr("%s \t:= %s", "tr_scp", gconf.tr_scp) + nerv.info_stderr("%s \t:= %s", "cv_scp", gconf.cv_scp) +end diff --git a/fastnn/fastnn-scm-1.rockspec b/fastnn/fastnn-scm-1.rockspec new file mode 100644 index 0000000..69dfb60 --- /dev/null +++ b/fastnn/fastnn-scm-1.rockspec @@ -0,0 +1,36 @@ +package = "fastnn" +version = "scm-1" +source = { + url = "https://github.com/uphantom/nerv-fastnn.git" +} +description = { + summary = "speed up Nerv neural network training.", + detailed = [[ + ]], + homepage = "https://github.com/uphantom/nerv-fastnn", + license = "BSD" +} +dependencies = { + "nerv >= scm-1", + "lua >= 5.1" +} +build = { + type = "make", + build_variables = { + CFLAGS="$(CFLAGS)", + LIBFLAG="$(LIBFLAG)", + LUA_LIBDIR="$(LUA_LIBDIR)", + LUA_BINDIR="$(LUA_BINDIR)", + LUA_INCDIR="$(LUA_INCDIR)", + INST_PREFIX="$(PREFIX)", + LUA="$(LUA)", + }, + install_variables = { + LUA_BINDIR="$(LUA_BINDIR)", + INST_PREFIX="$(PREFIX)", + INST_BINDIR="$(BINDIR)", + INST_LIBDIR="$(LIBDIR)", + INST_LUADIR="$(LUADIR)", + INST_CONFDIR="$(CONFDIR)", + }, +} diff --git a/fastnn/init.c b/fastnn/init.c new file mode 100644 index 0000000..2022293 --- /dev/null +++ b/fastnn/init.c @@ -0,0 +1,43 @@ +#include <lua.h> +#include <lauxlib.h> + +#if LUA_VERSION_NUM == 501 +static void luaL_setfuncs(lua_State *L, const luaL_Reg *l, int nup) +{ + luaL_checkstack(L, nup+1, "too many upvalues"); + for (; l->name != NULL; l++) { /* fill the table with given functions */ + int i; + lua_pushstring(L, l->name); + for (i = 0; i < nup; i++) /* copy upvalues to the top */ + lua_pushvalue(L, -(nup+1)); + lua_pushcclosure(L, l->func, nup); /* closure with those upvalues */ + lua_settable(L, -(nup + 3)); + } + lua_pop(L, nup); /* remove upvalues */ +} +#endif + + +extern void fastnn_init_modelsync(lua_State *L); +extern void fastnn_init_example(lua_State *L); +extern void fastnn_init_example_repo(lua_State *L); +extern void fastnn_init_device(lua_State *L); +extern void fastnn_init_context(lua_State *L); + +int luaopen_libfastnn(lua_State *L) +{ + //printf("%s %lx\n", model_sync__[13].name, model_sync__[13].func); + lua_newtable(L); + /* duplicate 
table */ + lua_pushvalue(L, -1); + /* set table to global index */ + lua_setfield(L, LUA_GLOBALSINDEX, "fastnn"); + + fastnn_init_modelsync(L); + fastnn_init_example(L); + fastnn_init_example_repo(L); + fastnn_init_device(L); + fastnn_init_context(L); + return 1; +} + diff --git a/fastnn/init.lua b/fastnn/init.lua new file mode 100644 index 0000000..5e290ca --- /dev/null +++ b/fastnn/init.lua @@ -0,0 +1,11 @@ +require 'libfastnn' + +function fastnn.include(filename) + local caller = debug.getinfo(2, "S").source:sub(2) + dofile(nerv.dirname(caller) .. filename) +end + +fastnn.include('lib/modelsync.lua') +fastnn.include('io/example.lua') +fastnn.include('device/device.lua') + diff --git a/fastnn/io/Example.cpp b/fastnn/io/Example.cpp new file mode 100644 index 0000000..8f200b7 --- /dev/null +++ b/fastnn/io/Example.cpp @@ -0,0 +1,186 @@ + +#include <deque> +#include <vector> +#include <string> + + +extern "C" { + +#include "../threads/lib/THThread.h" +#include "Example.h" +#include "stdlib.h" +#include "stdio.h" + +#include "../../nerv/lib/matrix/generic/elem_type.h" +#include "common.h" + +extern Matrix* nerv_matrix_cuda_float_create(long nrow, long ncol, Status *status); +void nerv_matrix_cuda_float_copy_fromd(Matrix *a, const Matrix *b, + int a_begin, int b_begin, int b_end, + Status *status); + +struct Example +{ + std::vector<Matrix *> inputs; + std::string id; + int refcount; +}; + +struct ExamplesRepository +{ + int buffer_size_; + THSemaphore *full_semaphore_; + THSemaphore *empty_semaphore_; + THMutex *examples_mutex_; + + std::deque<Example*> examples_; + bool done_; + int refcount; + int gpuid; +}; + +Example* Example_new() +{ + Example *example = new Example; //(Example*)malloc(sizeof(Example)); + example->refcount = 1; + return example; +} + +Example* Example_newWithId(long id) +{ + Example* example = (Example*)(id); + __sync_fetch_and_add(&example->refcount, 1); + return example; +} + +long Example_id(Example *example) +{ + return (long)(example); +} + +void Example_destroy(Example* example) +{ + //printf("Example_destroy: %d\n", example->inputs.size()); + if (NULL != example && __sync_fetch_and_add(&example->refcount, -1) == 1) + { + delete example; + example = NULL; + } +} + +int Example_size(Example* example) +{ + return example->inputs.size(); +} + +Matrix* Example_at(Example* example, int idx) +{ + return example->inputs.at(idx); +} + +void Example_pushback(Example* example, Matrix* m) +{ + Status status; + Matrix *newm = nerv_matrix_cuda_float_create(m->nrow, m->ncol, &status); + nerv_matrix_cuda_float_copy_fromd(newm, m, 0, 0, m->nrow, &status); + //__sync_fetch_and_add(&m->refcount, 1); + return example->inputs.push_back(newm); +} + +////////////////////////////////////////////////////////////// + + +ExamplesRepository* ExamplesRepository_new(int buffersize) +{ + ExamplesRepository *repo = new ExamplesRepository; //(ExamplesRepository*)malloc(sizeof(ExamplesRepository)); + repo->buffer_size_ = buffersize; + repo->full_semaphore_ = THSemaphore_new(0); + repo->empty_semaphore_ = THSemaphore_new(buffersize); + repo->examples_mutex_ = THMutex_new(); +// repo->examples_ = new std::deque<Example*>(); + repo->done_ = false; + repo->refcount = 1; + repo->gpuid = -1; + return repo; +} + +ExamplesRepository* ExamplesRepository_newWithId(long id) +{ + ExamplesRepository *repo = (ExamplesRepository*)(id); + __sync_fetch_and_add(&repo->refcount, 1); + return repo; +} + +long ExamplesRepository_id(ExamplesRepository* repo) +{ + return (long)(repo); +} + +int 
ExamplesRepository_getGpuId(ExamplesRepository* repo) +{ + return repo->gpuid; +} + +void ExamplesRepository_setGpuId(ExamplesRepository* repo, int gpuid) +{ + repo->gpuid = gpuid; +} + +void ExamplesRepository_destroy(ExamplesRepository* repo) +{ + if (NULL != repo && __sync_fetch_and_add(&repo->refcount, -1) == 1) + { + if (repo->full_semaphore_) + THSemaphore_free(repo->full_semaphore_); + if (repo->empty_semaphore_) + THSemaphore_free(repo->empty_semaphore_); + if (repo->examples_mutex_) + THMutex_free(repo->examples_mutex_); + delete repo; + repo = NULL; + } +} + +void AcceptExample(ExamplesRepository *repo, Example *example) +{ + THSemaphore_wait(repo->empty_semaphore_); + THMutex_lock(repo->examples_mutex_); + __sync_fetch_and_add(&example->refcount, 1); + repo->examples_.push_back(example); + THMutex_unlock(repo->examples_mutex_); + THSemaphore_signal(repo->full_semaphore_); +} + +void ExamplesDone(ExamplesRepository *repo) +{ + for (int i = 0; i < repo->buffer_size_; i++) + THSemaphore_wait(repo->empty_semaphore_); + + repo->done_ = true; + THSemaphore_signal(repo->full_semaphore_); +} + +Example* ProvideExample(ExamplesRepository *repo) +{ + Example *ans = NULL; + THSemaphore_wait(repo->full_semaphore_); + if (repo->done_) + { + THSemaphore_signal(repo->full_semaphore_); // Increment the semaphore so + // the call by the next thread will not block. + return NULL; // no examples to return-- all finished. + } + else + { + THMutex_lock(repo->examples_mutex_); + ans = repo->examples_.front(); + repo->examples_.pop_front(); + THMutex_unlock(repo->examples_mutex_); + THSemaphore_signal(repo->empty_semaphore_); + } + return ans; +} + +} + + diff --git a/fastnn/io/Example.h b/fastnn/io/Example.h new file mode 100644 index 0000000..1c462ab --- /dev/null +++ b/fastnn/io/Example.h @@ -0,0 +1,49 @@ +#ifndef NERV_FASTNN_EXAMPLE_H +#define NERV_FASTNN_EXAMPLE_H + + +#ifdef __cplusplus +extern "C" { +#endif + +#include "matrix/matrix.h" +#include "stdbool.h" +#define STRLEN 1024 + + + +typedef struct ExamplesRepository ExamplesRepository; +typedef struct Example Example; + +ExamplesRepository* ExamplesRepository_new(int buffersize); +ExamplesRepository* ExamplesRepository_newWithId(long id); +long ExamplesRepository_id(ExamplesRepository* reop); +void ExamplesRepository_destroy(ExamplesRepository* reop); +void AcceptExample(ExamplesRepository *repo, Example *example); +void ExamplesDone(ExamplesRepository *repo); +Example* ProvideExample(ExamplesRepository *repo); +void ExamplesRepository_setGpuId(ExamplesRepository* repo, int gpuid); +int ExamplesRepository_getGpuId(ExamplesRepository* repo); + + +Example* Example_new(); +Example* Example_newWithId(long id); +long Example_id(Example* example); +void Example_destroy(Example* example); +int Example_size(Example* example); +Matrix* Example_at(Example* example, int idx); +void Example_pushback(Example* example, Matrix* m); + + + + + + + + +#ifdef __cplusplus +} // closing brace for extern "C" + +#endif // end Example + +#endif diff --git a/fastnn/io/example.c b/fastnn/io/example.c new file mode 100644 index 0000000..f2f0916 --- /dev/null +++ b/fastnn/io/example.c @@ -0,0 +1,249 @@ + +#include <stdio.h> +#include <stdlib.h> + +#include <lua.h> +#include <lualib.h> +#include <luaT/luaT.h> + + + +#include "../threads/lib/luaTHRD.h" +#include "Example.h" + + +const char *fastnn_example_repo_tname = "fastnn.CExamplesRepo"; +const char *fastnn_example_tname = "fastnn.CExample"; + +static int example_repo_new(lua_State *L) +{ + ExamplesRepository *repo = 
NULL ; + int buffersize; + bool shared; + long id; + if(lua_gettop(L) == 2 && lua_isboolean(L, 2)) + { + shared = lua_toboolean(L, 2); + if (shared) + { + id = luaL_checkinteger(L, 1); + repo = ExamplesRepository_newWithId(id); + } + else + { + buffersize = luaL_checkinteger(L, 1); + repo = ExamplesRepository_new(buffersize); + } + } + else + luaL_error(L, "example: example repo new invalid arguments"); + if (!repo) + luaL_error(L, "example: example new failed"); + + luaTHRD_pushudata(L, repo, fastnn_example_repo_tname); + + return 1; +} + +static int example_repo_newfromid(lua_State *L) +{ + ExamplesRepository *repo = NULL ; + if(lua_gettop(L) == 1) + { + long id = luaL_checkinteger(L, 1); + repo = ExamplesRepository_newWithId(id); + } + else + luaL_error(L, "example: example repo new invalid arguments"); + if (!repo) + luaL_error(L, "example: example new failed"); + + luaTHRD_pushudata(L, repo, fastnn_example_repo_tname); + + return 1; +} + +static int example_repo_tostring(lua_State *L) +{ + char str[STRLEN]; + ExamplesRepository *repo = luaTHRD_checkudata(L, 1, fastnn_example_repo_tname); + snprintf(str, STRLEN, "%s <%lx>", fastnn_example_repo_tname, ExamplesRepository_id(repo)); + lua_pushstring(L, str); + return 1; +} + +static int example_repo_id(lua_State *L) +{ + ExamplesRepository *repo = luaTHRD_checkudata(L, 1, fastnn_example_repo_tname); + lua_pushinteger(L, ExamplesRepository_id(repo)); + return 1; +} + +static int example_repo_get_gpuid(lua_State *L) +{ + ExamplesRepository *repo = luaTHRD_checkudata(L, 1, fastnn_example_repo_tname); + lua_pushinteger(L, ExamplesRepository_getGpuId(repo)); + return 1; +} + +static int example_repo_set_gpuid(lua_State *L) +{ + ExamplesRepository *repo = luaTHRD_checkudata(L, 1, fastnn_example_repo_tname); + int gpuid = luaL_checkinteger(L, 2); + ExamplesRepository_setGpuId(repo, gpuid); + return 0; +} + +static int example_repo_accept(lua_State *L) +{ + ExamplesRepository *repo = luaTHRD_checkudata(L, 1, fastnn_example_repo_tname); + Example *example = luaTHRD_checkudata(L, 2, fastnn_example_tname); + AcceptExample(repo, example); + return 0; +} + +static int example_repo_provide(lua_State *L) +{ + ExamplesRepository *repo = luaTHRD_checkudata(L, 1, fastnn_example_repo_tname); + Example *example = ProvideExample(repo); + if (NULL != example) + { + luaTHRD_pushudata(L, example, fastnn_example_tname); + return 1; + } + else + return 0; +} + +static int example_repo_done(lua_State *L) +{ + ExamplesRepository *repo = luaTHRD_checkudata(L, 1, fastnn_example_repo_tname); + ExamplesDone(repo); + return 0; +} + +static int example_repo_destroy(lua_State *L) +{ + ExamplesRepository *repo = luaTHRD_checkudata(L, 1, fastnn_example_repo_tname); + ExamplesRepository_destroy(repo); + return 0; +} + +/******************************************/ + +static int example_new(lua_State *L) +{ + + Example *example = NULL; + if(lua_gettop(L) == 0) + { + example = Example_new(); + } + else if(lua_gettop(L) == 1) + { + long id = luaL_checkinteger(L, 1); + example = Example_newWithId(id); + } + else + luaL_error(L, "example: example new invalid arguments"); + if (!example) + luaL_error(L, "example: example failed"); + + luaTHRD_pushudata(L, example, fastnn_example_tname); + + return 1; +} + +static int example_id(lua_State *L) +{ + Example *example = luaTHRD_checkudata(L, 1, fastnn_example_tname); + lua_pushinteger(L, Example_id(example)); + return 1; +} + +static int example_tostring(lua_State *L) +{ + char str[STRLEN]; + Example* example = luaTHRD_checkudata(L, 1, 
fastnn_example_tname); + snprintf(str, STRLEN, "%s <%lx>", fastnn_example_tname, Example_id(example)); + lua_pushstring(L, str); + return 1; +} + +static int example_size(lua_State *L) +{ + Example *example = luaTHRD_checkudata(L, 1, fastnn_example_tname); + lua_pushinteger(L, Example_size(example)); + return 1; +} + +static int example_at(lua_State *L) +{ + Example *example = luaTHRD_checkudata(L, 1, fastnn_example_tname); + int idx = luaL_checkinteger(L, 2); + Matrix *m = Example_at(example, idx); + luaTHRD_pushudata(L, m, "nerv.CuMatrixFloat"); + //luaT_pushudata(L, m, "nerv.CuMatrixFloat"); + return 1; +} + +static int example_pushback(lua_State *L) +{ + Example *example = luaTHRD_checkudata(L, 1, fastnn_example_tname); + Matrix *m = luaTHRD_checkudata(L, 2, "nerv.CuMatrixFloat"); + //Matrix *m = luaT_checkudata(L, 2, "nerv.CuMatrixFloat"); + Example_pushback(example, m); + return 0; +} + +static int example_destroy(lua_State *L) +{ + Example *example = luaTHRD_checkudata(L, 1, fastnn_example_tname); + Example_destroy(example); + //printf("example_destroy ... end\n"); + return 0; +} + + +static const struct luaL_Reg example_repo__ [] = { + {"new", example_repo_new}, + {"newfromid", example_repo_newfromid}, + {"__tostring", example_repo_tostring}, + {"id", example_repo_id}, + {"get_gpuid", example_repo_get_gpuid}, + {"set_gpuid", example_repo_set_gpuid}, + {"accept", example_repo_accept}, + {"provide", example_repo_provide}, + {"done", example_repo_done}, + {"free", example_repo_destroy}, + {NULL, NULL} +}; + +static const struct luaL_Reg example__ [] = { + {"new", example_new}, + {"__tostring", example_tostring}, + {"id", example_id}, + {"size", example_size}, + {"at", example_at}, + {"pushback", example_pushback}, + {"free", example_destroy}, + {NULL, NULL} +}; + +void fastnn_init_example(lua_State *L) +{ + luaT_newmetatable(L, fastnn_example_tname, NULL, example_new, example_destroy, NULL); + luaL_register(L, NULL, example__); + lua_pop(L, 1); +} + +void fastnn_init_example_repo(lua_State *L) +{ + luaT_newmetatable(L, fastnn_example_repo_tname, NULL, example_repo_new, example_repo_destroy, NULL); + luaL_register(L, NULL, example_repo__); + lua_pop(L, 1); +} + + + + diff --git a/fastnn/io/example.lua b/fastnn/io/example.lua new file mode 100644 index 0000000..8808791 --- /dev/null +++ b/fastnn/io/example.lua @@ -0,0 +1,38 @@ + +local T = require 'libthreads' +local C = require 'libfastnn' + +local ExamplesRepo = nerv.class("fastnn.ExamplesRepo") +local Example = nerv.class("fastnn.Example") + +fastnn.CExamplesRepo = C.CExamplesRepo +fastnn.CExample = C.CExample + +function ExamplesRepo:__init(shareid) + if nil ~= shareid then + -- print(shareid) + self.repo = C.CExamplesRepo(shareid, true) + -- print(self.repo:__tostring()) + end +end + + +function Example:PrepareData(data, global_transf, trains_id) + local example = fastnn.CExample() + if nil ~= data then + for id, cm in pairs(data) do + if trains_id[id] and nil ~= global_transf then + local tcm = nerv.speech_utils.normalize(cm, global_transf) + example:pushback(tcm) + else + example:pushback(cm) + end + + end + end + + return example +end + + + diff --git a/fastnn/lib/ModelSync.c b/fastnn/lib/ModelSync.c new file mode 100644 index 0000000..bd511ea --- /dev/null +++ b/fastnn/lib/ModelSync.c @@ -0,0 +1,305 @@ + +#include "ModelSync.h" +#include "../../nerv/lib/matrix/cuda_helper.h" +#include "../../nerv/lib/matrix/generic/elem_type.h" +#include "common.h" +#include <string.h> + + +ModelSync* ModelSync_new(void) +{ + ModelSync *self = 
(ModelSync*)malloc(sizeof(ModelSync)); + if (NULL != self) + { + self->model_mutex = THMutex_new(); + self->state_mutex = THMutex_new(); + self->initialized_ = false; + self->dim_ = 0; + self->pos_ = 0; + self->data_ = NULL; + self->free_data_ = NULL; + self->data_ = NULL; + self->refcount = 1; + self->threadcount = 0; + } + return self; +} + +ModelSync* ModelSync_newWithId(long id) +{ + ModelSync *self = (ModelSync*)id; + __sync_fetch_and_add(&self->refcount, 1); + return self; +} + +long ModelSync_id(ModelSync *self) +{ + return (long)(self); +} + +int ModelSync_lockmodel(ModelSync *self) +{ + if(THMutex_lock(self->model_mutex)) + return 1; + return 0; +} + +int ModelSync_unlockmodel(ModelSync *self) +{ + if(THMutex_unlock(self->model_mutex)) + return 1; + return 0; + +} +int ModelSync_lockstate(ModelSync *self) +{ + if(THMutex_lock(self->state_mutex)) + return 1; + return 0; +} + +int ModelSync_unlockstate(ModelSync *self) +{ + if(THMutex_unlock(self->state_mutex)) + return 1; + return 0; +} + +int ModelSync_free(ModelSync *self) +{ + if (NULL != self && __sync_fetch_and_add(&self->refcount, -1) == 1) + { + free(self->model_mutex); + free(self->state_mutex); + Status status; + CUDA_SAFE_SYNC_CALL(cudaFreeHost(self->free_data_), &status); + free(self); + } +} + +int ModelSync_initBuffer(ModelSync *self) +{ + if (NULL != self) + { + void *free_data = NULL, *data = NULL; + size_t size = self->dim_ * sizeof(float)+16; + Status status; + CUDA_SAFE_SYNC_CALL(cudaHostAlloc((void**) &free_data, size, cudaHostAllocPortable), &status); + NERV_SET_STATUS(&status, NERV_NORMAL, 0); + + data = (free_data ? (void *)( (((unsigned long)*(&free_data)) + 15) & ~0xFUL ) : NULL) ; + if (NULL != data) + { + self->data_ = (float*)(data); + self->free_data_ = (float*)(free_data); + } + return 0; + } + return 1; +} + +int ModelSync_weightfromd(ModelSync *self, Matrix *dm) +{ + + if (NULL != self && NULL != dm) + { + void *host_data_ = (void*)self->data_; + size_t width = dm->ncol * sizeof(float); + size_t src_pitch = dm->stride; + size_t dst_pitch = src_pitch; + Status status; + + CUDA_SAFE_SYNC_CALL(cudaMemcpy2D(host_data_+self->pos_, dst_pitch, dm->data.f, src_pitch, width, dm->nrow, cudaMemcpyDeviceToHost), &status); + NERV_SET_STATUS(&status, NERV_NORMAL, 0); + self->pos_ += dm->nrow * dm->stride; + return 0; + } + return 1; + +} + + +int ModelSync_weighttod(ModelSync *self, Matrix *dm) +{ + + if (NULL != self && NULL != dm) + { + void *host_data_ = (void*)self->data_; + size_t width = dm->ncol * sizeof(float); + size_t dst_pitch = dm->stride; + size_t src_pitch = dst_pitch; + Status status; + + CUDA_SAFE_SYNC_CALL(cudaMemcpy2D(dm->data.f, dst_pitch, host_data_+self->pos_, src_pitch, width, dm->nrow, cudaMemcpyHostToDevice), &status); + NERV_SET_STATUS(&status, NERV_NORMAL, 0); + + self->pos_ += dm->nrow * dm->stride; + self->initialized_ = true; + return 0; + } + return 1; +} + +void ModelSync_syncinc(ModelSync *self) +{ + __sync_fetch_and_add(&self->threadcount, 1); +} + +void ModelSync_syncdec(ModelSync *self) +{ + __sync_fetch_and_add(&self->threadcount, -1); +} + +int ModelSync_threadcount(ModelSync *self) +{ + return self->threadcount; +} + +///////////////////////////////// + +Xent* Xent_new() +{ + Xent *xent = (Xent*)malloc(sizeof(Xent)); + memset(xent, 0, sizeof(Xent)); + xent->refcount = 1; + return xent; +} + +Xent* Xent_newWithId(long id) +{ + Xent *xent = (Xent*)id; + __sync_fetch_and_add(&xent->refcount, 1); + return xent; +} + +Xent* Xent_newWithParm(size_t frames_, size_t correct_, double 
loss_, double entropy_) +{ + Xent *xent = (Xent*)malloc(sizeof(Xent)); + xent->frames_ = frames_; + xent->correct_ = correct_; + xent->loss_ = loss_; + xent->entropy_ = entropy_; + xent->refcount = 1; + return xent; +} + +long Xent_id(Xent *xent) +{ + return (long)(xent); +} + +Xent* Xent_add(Xent *a, Xent *b) +{ + a->frames_ += b->frames_; + a->correct_ += b->correct_; + a->loss_ += b->loss_; + a->entropy_ += b->entropy_; + return a; +} + +void Xent_free(Xent *xent) +{ + if (NULL != xent && __sync_fetch_and_add(&xent->refcount, -1) == 1) + { + free(xent); + xent = NULL; + } +} + + +////////////////////////////////// + +Mse* Mse_new() +{ + Mse *mse = (Mse*)malloc(sizeof(Mse)); + memset(mse, 0, sizeof(Mse)); + mse->refcount = 1; + return mse; +} + +Mse* Mse_newWithId(long id) +{ + Mse *mse = (Mse*)id; + __sync_fetch_and_add(&mse->refcount, 1); + return mse; +} + +Mse* Mse_newWithParm(size_t frames_, double loss_) +{ + Mse *mse = (Mse*)malloc(sizeof(Mse)); + mse->frames_ = frames_; + mse->loss_ = loss_; + mse->refcount = 1; + return mse; +} + + +long Mse_id(Mse *mse) +{ + return (long)(mse); +} + +Mse* Mse_add(Mse *a, Mse *b) +{ + a->frames_ += b->frames_; + a->loss_ += b->loss_; + return a; +} + +void Mse_free(Mse *mse) +{ + if (NULL != mse && __sync_fetch_and_add(&mse->refcount, -1) == 1) + { + free(mse); + mse = NULL; + } +} + +////////////////////////////////// + +GlobalOption* GlobalOption_new() +{ + GlobalOption *option = (GlobalOption*)malloc(sizeof(GlobalOption)); + option->refcount = 1; + return option; +} + +GlobalOption* GlobalOption_newWithParm(int batch_size, float lrate, bool bp,const char *tr_scp, const char *cv_scp, const char *transf, const char *network) +{ + GlobalOption *option = (GlobalOption*)malloc(sizeof(GlobalOption)); + option->batch_size = batch_size; + option->lrate = lrate; + option->bp = bp; + strncpy(option->tr_scp, tr_scp, strlen(tr_scp)+1); + strncpy(option->cv_scp, cv_scp, strlen(cv_scp)+1); + strncpy(option->transf, transf, strlen(transf)+1); + strncpy(option->network, network, strlen(network)+1); + option->refcount = 1; + + return option; +} + +GlobalOption* GlobalOption_newWithId(long id) +{ + GlobalOption *option = (GlobalOption*)id; + __sync_fetch_and_add(&option->refcount, 1); + return option; +} + + + +long GlobalOption_id(GlobalOption *option) +{ + return (long)(option); +} + +void GlobalOption_free(GlobalOption *option) +{ + if (NULL != option && __sync_fetch_and_add(&option->refcount, -1) == 1) + { + free(option); + option = NULL; + } +} + + diff --git a/fastnn/lib/ModelSync.h b/fastnn/lib/ModelSync.h new file mode 100644 index 0000000..71216a0 --- /dev/null +++ b/fastnn/lib/ModelSync.h @@ -0,0 +1,119 @@ + +#ifndef NERV_FASTNN_MODELSYNC_H +#define NERV_FASTNN_MODELSYNC_H + +#define STRLEN 1024 + +#include "../threads/lib/THThread.h" +#include "matrix/matrix.h" +#include "stdlib.h" +#include "stdbool.h" + +typedef struct NnetParallelOptions_ +{ + int num_threads; + int merge_size; + int num_merge; + int num_procs; + int threadid; + int myid; + int thread_level; + char merge_func[STRLEN]; + char log_file[STRLEN]; +} NnetParallelOptions; + + +typedef struct ModelSync_ +{ + THMutex *model_mutex; + THMutex *state_mutex; + bool initialized_; + int dim_; + int pos_; + float *data_; + float *free_data_; + int refcount; + int threadcount; +}ModelSync; + +ModelSync *ModelSync_new(void); +ModelSync *ModelSync_newWithId(long id); +int ModelSync_free(ModelSync *self); +long ModelSync_id(ModelSync *self); +int ModelSync_lockmodel(ModelSync *self); +int 
ModelSync_unlockmodel(ModelSync *self); +int ModelSync_lockstate(ModelSync *self); +int ModelSync_unlockstate(ModelSync *self); +int ModelSync_initBuffer(ModelSync *self); +int ModelSync_weightfromd(ModelSync *self, Matrix *dm); +int ModelSync_weighttod(ModelSync *self, Matrix *dm); +int ModelSync_threadcount(ModelSync *self); +void ModelSync_syncinc(ModelSync *self); +void ModelSync_syncdec(ModelSync *self); + +typedef struct Xent_ +{ + size_t frames_; + size_t correct_; + double loss_; + double entropy_; + int refcount; +} Xent; + +Xent* Xent_new(); +Xent* Xent_newWithId(long id); +Xent* Xent_newWithParm(size_t frames_, size_t correct_, double loss_, double entropy_); +long Xent_id(Xent *xent); +Xent* Xent_add(Xent *a, Xent *b); +void Xent_free(Xent *xent); + +typedef struct Mse_ +{ + size_t frames_; + double loss_; + int refcount; +} Mse; + +Mse* Mse_new(); +Mse* Mse_newWithId(long id); +Mse* Mse_newWithParm(size_t frames_, double loss_); +long Mse_id(Mse *mse); +Mse* Mse_add(Mse *a, Mse *b); +void Mse_free(Mse *mse); + +typedef struct NnetUpdateState_ +{ + int num_utter; + int num_nolabel; + int num_other_error; + long total_frames; + Xent xent; + Mse mse; +} NnetUpdateState; + +typedef struct GlobalOption_ +{ + int batch_size; + float lrate; + bool bp; + char tr_scp[STRLEN]; + char cv_scp[STRLEN]; + char transf[STRLEN]; + char network[STRLEN]; + int refcount; +}GlobalOption; + + +GlobalOption* GlobalOption_new(); +GlobalOption* GlobalOption_newWithParm(int batch_size, float lrate, bool bp, const char *tr_scp, const char *cv_scp, const char *transf, const char *network); +GlobalOption* GlobalOption_newWithId(long id); +long GlobalOption_id(GlobalOption *option); +void GlobalOption_free(GlobalOption *option); + + + + +#endif + + + diff --git a/fastnn/lib/modelsync.c b/fastnn/lib/modelsync.c new file mode 100644 index 0000000..2b52752 --- /dev/null +++ b/fastnn/lib/modelsync.c @@ -0,0 +1,532 @@ + +#include <stdio.h> +#include <stdlib.h> + +#include <lua.h> +#include <lualib.h> +#include <luaT/luaT.h> + + + +#include "ModelSync.h" +#include "../threads/lib/luaTHRD.h" + +const char *fastnn_model_sync_tname = "fastnn.CModelSync"; +const char *fastnn_xent_tname = "fastnn.CXent"; +const char *fastnn_mse_tname = "fastnn.CMse"; +const char *fastnn_global_option_tname = "fastnn.CGlobalOption"; + +static int model_sync_new(lua_State *L) +{ + ModelSync *model_sync = NULL; + if(lua_gettop(L) == 0) + { + model_sync = ModelSync_new(); + } + else if(lua_gettop(L) == 1) + { + long id = luaL_checklong(L, 1); + model_sync = ModelSync_newWithId(id); + } + else + luaL_error(L, "modelsync: modelsync new invalid arguments"); + if(!model_sync) + luaL_error(L, "modelsync: modelsync new failed"); + + luaTHRD_pushudata(L, model_sync, fastnn_model_sync_tname); + return 1; +} + + +static int model_sync_tostring(lua_State *L) +{ + char str[STRLEN]; + ModelSync *model_sync = luaTHRD_checkudata(L, 1, fastnn_model_sync_tname); + snprintf(str, STRLEN, "fastnn.modelsync <%lx>", ModelSync_id(model_sync)); + lua_pushstring(L, str); + return 1; +} + +static int model_sync_id(lua_State *L) +{ + ModelSync *model_sync = luaTHRD_checkudata(L, 1, fastnn_model_sync_tname); + lua_pushinteger(L, ModelSync_id(model_sync)); + return 1; +} + +static int model_sync_lockmodel(lua_State *L) +{ + ModelSync *model_sync = luaTHRD_checkudata(L, 1, fastnn_model_sync_tname); + if (ModelSync_lockmodel(model_sync)) + luaL_error(L, "modelsync: model lock failed"); + return 0; +} + +static int model_sync_unlockmodel(lua_State *L) +{ + ModelSync 
*model_sync = luaTHRD_checkudata(L, 1, fastnn_model_sync_tname); + if (ModelSync_unlockmodel(model_sync)) + luaL_error(L, "modelsync: model unlock failed"); + return 0; +} + +static int model_sync_lockstate(lua_State *L) +{ + ModelSync *model_sync = luaTHRD_checkudata(L, 1, fastnn_model_sync_tname); + if (ModelSync_lockstate(model_sync)) + luaL_error(L, "modelsync: state lock failed"); + return 0; +} + +static int model_sync_unlockstate(lua_State *L) +{ + ModelSync *model_sync = luaTHRD_checkudata(L, 1, fastnn_model_sync_tname); + if (ModelSync_unlockstate(model_sync)) + luaL_error(L, "modelsync: state unlock failed"); + return 0; +} + +static int model_sync_free(lua_State *L) +{ + ModelSync *model_sync = luaTHRD_checkudata(L, 1, fastnn_model_sync_tname); + ModelSync_free(model_sync); + return 0; +} + +static int model_sync_initbuffer(lua_State *L) +{ + ModelSync *model_sync = luaTHRD_checkudata(L, 1, fastnn_model_sync_tname); + model_sync->dim_ = luaL_checkinteger(L, 2); + ModelSync_initBuffer(model_sync); + return 0; +} + +static int model_sync_weightfromd(lua_State *L) +{ + ModelSync *model_sync = luaTHRD_checkudata(L, 1, fastnn_model_sync_tname); + Matrix *dm = luaT_checkudata(L, 2, "nerv.CuMatrixFloat"); + ModelSync_weightfromd(model_sync, dm); + return 0; +} + +static int model_sync_weighttod(lua_State *L) +{ + ModelSync *model_sync = luaTHRD_checkudata(L, 1, fastnn_model_sync_tname); + Matrix *dm = luaT_checkudata(L, 2, "nerv.CuMatrixFloat"); + ModelSync_weighttod(model_sync, dm); + return 0; +} + +static int model_sync_initialized(lua_State *L) +{ + ModelSync *model_sync = luaTHRD_checkudata(L, 1, fastnn_model_sync_tname); + lua_pushboolean(L, model_sync->initialized_); + return 1; +} + +static int model_sync_setpos(lua_State *L) +{ + ModelSync *model_sync = luaTHRD_checkudata(L, 1, fastnn_model_sync_tname); + int pos = luaL_checkinteger(L, 2); + model_sync->pos_ = pos; + return 0; +} + +static int model_sync_threadcount(lua_State *L) +{ + ModelSync *model_sync = luaTHRD_checkudata(L, 1, fastnn_model_sync_tname); + lua_pushinteger(L, ModelSync_threadcount(model_sync)); + return 1; +} + +static int model_sync_syncinc(lua_State *L) +{ + ModelSync *model_sync = luaTHRD_checkudata(L, 1, fastnn_model_sync_tname); + ModelSync_syncinc(model_sync); + return 0; +} + +static int model_sync_syncdec(lua_State *L) +{ + ModelSync *model_sync = luaTHRD_checkudata(L, 1, fastnn_model_sync_tname); + ModelSync_syncdec(model_sync); + return 0; +} +////////////////////////////////////////// + +static int xent_new(lua_State *L) +{ + Xent *xent = NULL; + if(lua_gettop(L) == 0) + { + xent = Xent_new(); + } + else if(lua_gettop(L) == 1) + { + long id = luaL_checklong(L, 1); + xent = Xent_newWithId(id); + } + else if(lua_gettop(L) == 4) + { + size_t frames_, correct_; + double loss_, entropy_ ; + frames_ = luaL_checkinteger(L, 1); + correct_ = luaL_checkinteger(L, 2); + loss_ = luaL_checknumber(L, 3); + entropy_ = luaL_checknumber(L, 4); + xent = Xent_newWithParm(frames_, correct_, loss_, entropy_); + } + else + luaL_error(L, "xent: xent new invalid arguments"); + if(!xent) + luaL_error(L, "xent: xent new failed"); + + luaTHRD_pushudata(L, xent, fastnn_xent_tname); + return 1; +} + +static int xent_tostring(lua_State *L) +{ + char str[STRLEN]; + Xent *xent = luaTHRD_checkudata(L, 1, fastnn_xent_tname); + snprintf(str, STRLEN, "fastnn.xent <%lx>", Xent_id(xent)); + lua_pushstring(L, str); + return 1; +} + +static int xent_totalframes(lua_State *L) +{ + Xent *xent = luaTHRD_checkudata(L, 1, 
fastnn_xent_tname); + lua_pushinteger(L, xent->frames_); + return 1; +} + +static int xent_correct(lua_State *L) +{ + Xent *xent = luaTHRD_checkudata(L, 1, fastnn_xent_tname); + lua_pushinteger(L, xent->correct_); + return 1; +} + +static int xent_loss(lua_State *L) +{ + Xent *xent = luaTHRD_checkudata(L, 1, fastnn_xent_tname); + lua_pushnumber(L, xent->loss_); + return 1; +} + +static int xent_entropy(lua_State *L) +{ + Xent *xent = luaTHRD_checkudata(L, 1, fastnn_xent_tname); + lua_pushnumber(L, xent->entropy_); + return 1; +} + +static int xent_id(lua_State *L) +{ + Xent *xent = luaTHRD_checkudata(L, 1, fastnn_xent_tname); + lua_pushinteger(L, Xent_id(xent)); + return 1; +} + +static int xent_free(lua_State *L) +{ + Xent *xent = luaTHRD_checkudata(L, 1, fastnn_xent_tname); + Xent_free(xent); + return 0; +} + +static int xent_add(lua_State *L) +{ + Xent *a = luaTHRD_checkudata(L, 1, fastnn_xent_tname); + Xent *b = luaTHRD_checkudata(L, 2, fastnn_xent_tname); + Xent_add(a, b); + return 0; +} + +///////////////////////////////////////////// + +static int mse_new(lua_State *L) +{ + Mse *mse = NULL; + if(lua_gettop(L) == 0) + { + mse = Mse_new(); + } + else if(lua_gettop(L) == 1) + { + long id = luaL_checklong(L, 1); + mse = Mse_newWithId(id); + } + else if(lua_gettop(L) == 2) + { + size_t frames_; + double loss_; + frames_ = luaL_checkinteger(L, 1); + loss_ = luaL_checknumber(L, 2); + mse = Mse_newWithParm(frames_, loss_); + } + else + luaL_error(L, "mse: mse new invalid arguments"); + if(!mse) + luaL_error(L, "mse: mse new failed"); + + luaTHRD_pushudata(L, mse, fastnn_mse_tname); + return 1; + +} + +static int mse_tostring(lua_State *L) +{ + char str[STRLEN]; + Mse *mse = luaTHRD_checkudata(L, 1, fastnn_mse_tname); + snprintf(str, STRLEN, "fastnn.mse <%lx>", Mse_id(mse)); + lua_pushstring(L, str); + return 1; +} + +static int mse_totalframes(lua_State *L) +{ + Mse *mse = luaTHRD_checkudata(L, 1, fastnn_mse_tname); + lua_pushinteger(L, mse->frames_); + return 1; +} + +static int mse_loss(lua_State *L) +{ + Mse *mse = luaTHRD_checkudata(L, 1, fastnn_mse_tname); + lua_pushnumber(L, mse->loss_); + return 1; +} + +static int mse_id(lua_State *L) +{ + Mse *mse = luaTHRD_checkudata(L, 1, fastnn_mse_tname); + lua_pushinteger(L, Mse_id(mse)); + return 1; +} + +static int mse_free(lua_State *L) +{ + Mse *mse = luaTHRD_checkudata(L, 1, fastnn_mse_tname); + Mse_free(mse); + return 0; +} + +static int mse_add(lua_State *L) +{ + Mse *a = luaTHRD_checkudata(L, 1, fastnn_mse_tname); + Mse *b = luaTHRD_checkudata(L, 2, fastnn_mse_tname); + Mse_add(a, b); + return 0; +} +///////////////////////////////////////////// + +static int global_option_new(lua_State *L) +{ + GlobalOption *global_option = NULL; + if(lua_gettop(L) == 0) + { + global_option = GlobalOption_new(); + } + else if(lua_gettop(L) == 1) + { + long id = luaL_checklong(L, 1); + global_option = GlobalOption_newWithId(id); + } + else if(lua_gettop(L) > 3) + { + int batch_size = luaL_checkinteger(L, 1); + float lrate = luaL_checknumber(L, 2); + bool bp = lua_toboolean(L, 3); + const char *tr_scp = lua_tostring(L, 4); + const char *cv_scp = lua_tostring(L, 5); + const char *transf = lua_tostring(L, 6); + const char *network = lua_tostring(L, 7); + global_option = GlobalOption_newWithParm(batch_size, lrate, bp, tr_scp, cv_scp, transf, network); + } + else + luaL_error(L, "global_option: global_option new invalid arguments"); + if(!global_option) + luaL_error(L, "global_option: global_option new failed"); + + luaTHRD_pushudata(L, global_option, 
fastnn_global_option_tname); + return 1; + +} + +static int global_option_tostring(lua_State *L) +{ + char str[STRLEN]; + GlobalOption *global_option = luaTHRD_checkudata(L, 1, fastnn_global_option_tname); + snprintf(str, STRLEN, "fastnn.global_option <%lx>", GlobalOption_id(global_option)); + lua_pushstring(L, str); + return 1; +} + +static int global_option_id(lua_State *L) +{ + GlobalOption *global_option = luaTHRD_checkudata(L, 1, fastnn_global_option_tname); + lua_pushinteger(L, GlobalOption_id(global_option)); + return 1; +} + +static int global_option_free(lua_State *L) +{ + GlobalOption *global_option = luaTHRD_checkudata(L, 1, fastnn_global_option_tname); + GlobalOption_free(global_option); + return 0; +} + +static int global_option_batch_size(lua_State *L) +{ + GlobalOption *global_option = luaTHRD_checkudata(L, 1, fastnn_global_option_tname); + lua_pushinteger(L, global_option->batch_size); + return 1; +} + +static int global_option_lrate(lua_State *L) +{ + GlobalOption *global_option = luaTHRD_checkudata(L, 1, fastnn_global_option_tname); + lua_pushnumber(L, global_option->lrate); + return 1; +} + +static int global_option_bp(lua_State *L) +{ + GlobalOption *global_option = luaTHRD_checkudata(L, 1, fastnn_global_option_tname); + lua_pushboolean(L, global_option->bp); + return 1; +} + +static int global_option_tr_scp(lua_State *L) +{ + GlobalOption *global_option = luaTHRD_checkudata(L, 1, fastnn_global_option_tname); + lua_pushstring(L, global_option->tr_scp); + return 1; +} + +static int global_option_cv_scp(lua_State *L) +{ + GlobalOption *global_option = luaTHRD_checkudata(L, 1, fastnn_global_option_tname); + lua_pushstring(L, global_option->cv_scp); + return 1; +} + +static int global_option_transf(lua_State *L) +{ + GlobalOption *global_option = luaTHRD_checkudata(L, 1, fastnn_global_option_tname); + lua_pushstring(L, global_option->transf); + return 1; +} + +static int global_option_network(lua_State *L) +{ + GlobalOption *global_option = luaTHRD_checkudata(L, 1, fastnn_global_option_tname); + lua_pushstring(L, global_option->network); + return 1; +} + + +////////////////////////////////////////////// + +static const struct luaL_Reg model_sync__ [] = { + {"new", model_sync_new}, + {"__tostring", model_sync_tostring}, + {"id", model_sync_id}, + {"lockmodel", model_sync_lockmodel}, + {"unlockmodel", model_sync_unlockmodel}, + {"lockstate", model_sync_lockstate}, + {"unlockstate", model_sync_unlockstate}, + {"initbuffer", model_sync_initbuffer}, + {"setpos", model_sync_setpos}, + {"initialized", model_sync_initialized}, + {"weightfromd", model_sync_weightfromd}, + {"weighttod", model_sync_weighttod}, + {"threadcount", model_sync_threadcount}, + {"syncinc", model_sync_syncinc}, + {"syncdec", model_sync_syncdec}, + {"threadcount", model_sync_threadcount}, + {"free", model_sync_free}, + {NULL, NULL} +}; + + +static const struct luaL_Reg xent__ [] = { + {"new", xent_new}, + {"__tostring", xent_tostring}, + {"id", xent_id}, + {"totalframes", xent_totalframes}, + {"correct", xent_correct}, + {"loss", xent_loss}, + {"entropy", xent_entropy}, + {"add", xent_add}, + {"free", xent_free}, + {NULL, NULL} +}; + +static const struct luaL_Reg mse__ [] = { + {"new", mse_new}, + {"__tostring", mse_tostring}, + {"id", mse_id}, + {"totalframes", mse_totalframes}, + {"loss", mse_loss}, + {"add", mse_add}, + {"free", mse_free}, + {NULL, NULL} +}; + + +static const struct luaL_Reg global_option__ [] = { + {"new", global_option_new}, + {"__tostring", global_option_tostring}, + {"id", 
global_option_id}, + {"batch_size", global_option_batch_size}, + {"lrate", global_option_lrate}, + {"bp", global_option_bp}, + {"tr_scp", global_option_tr_scp}, + {"cv_scp", global_option_cv_scp}, + {"transf", global_option_transf}, + {"network", global_option_network}, + {"free", global_option_free}, + {NULL, NULL} +}; + +void fastnn_init_modelsync(lua_State *L) +{ + luaT_newmetatable(L, fastnn_model_sync_tname, NULL, model_sync_new, NULL, NULL); + luaL_register(L, NULL, model_sync__); + lua_pop(L, 1); + + luaT_newmetatable(L, fastnn_xent_tname, NULL, xent_new, xent_free, NULL); + luaL_register(L, NULL, xent__); + lua_pop(L, 1); + + luaT_newmetatable(L, fastnn_mse_tname, NULL, mse_new, mse_free, NULL); + luaL_register(L, NULL, mse__); + lua_pop(L, 1); + + luaT_newmetatable(L, fastnn_global_option_tname, NULL, global_option_new, global_option_free, NULL); + luaL_register(L, NULL, global_option__); + lua_pop(L, 1); + /* + printf("%s %lx\n", model_sync__[13].name, model_sync__[13].func); + + if(!luaL_newmetatable(L, fastnn_model_sync_tname)) + luaL_error(L, "fastnn: fastnn.modelsync type already exists"); + luaL_setfuncs(L, model_sync__, 0); + lua_pushstring(L, "__index"); + lua_pushvalue(L, -2); + lua_rawset(L, -3); + lua_pop(L, 1); + + printf("%s %lx\n", model_sync__[13].name, model_sync__[13].func); + + lua_pushstring(L, "modelsync"); + luaTHRD_pushctortable(L, model_sync_new, fastnn_model_sync_tname); + lua_rawset(L, -3); + printf("%s %lx\n", model_sync__[13].name, model_sync__[13].func); + */ +} + + diff --git a/fastnn/lib/modelsync.lua b/fastnn/lib/modelsync.lua new file mode 100644 index 0000000..a247562 --- /dev/null +++ b/fastnn/lib/modelsync.lua @@ -0,0 +1,107 @@ + +local C = require 'libfastnn' +local T = require 'libthreads' + +local ModelSync = nerv.class("fastnn.ModelSync") + +fastnn.CModelSync = C.CModelSync +fastnn.Thread = T.Thread + + +function ModelSync:__init(shareid) + self.modelsync = fastnn.CModelSync(shareid) +-- print(self.modelsync.initbuffer) + --print(self.modelsync.setpos) + --print(self.modelsync.initialized) + --print(self.modelsync.weightfromd) +-- print(self.modelsync.weighttod) +-- print(self.modelsync.aaaa) +-- print(self.modelsync.bbbb) +-- print(self.modelsync.cccc) +end + +function ModelSync:GetDim(nnet) + + local repo = nnet:get_params() + local params = repo.params + local dim = 0 + for pid, ref in pairs(params) do + if nerv.is_type(ref.trans, "nerv.Matrix") then + dim = dim + ref.trans:nrow() * ref.trans:nstride() + end + end + + return dim +end + + +function ModelSync:Initialize(nnet) + + self:LockModel() + + if not self.modelsync:initialized() then + dim = self:GetDim(nnet) + self.modelsync:initbuffer(dim) + self:WeightFromD(nnet) + end + + self:UnLockModel() +end + +function ModelSync:WeightFromD(nnet) + local repo = nnet:get_params() + local params = repo.params + self.modelsync:setpos(0) + for pid, ref in pairs(params) do + if nerv.is_type(ref.trans, "nerv.Matrix") then + self.modelsync:weightfromd(ref.trans) + end + end +end + +function ModelSync:WeightToD(nnet) + local repo = nnet:get_params() + local params = repo.params + self.modelsync:setpos(0) + for pid, ref in pairs(params) do + if nerv.is_type(ref.trans, "nerv.Matrix") then + self.modelsync:weighttod(ref.trans) + end + end +end + +function ModelSync:LockState() + self.modelsync:lockstate() +end + +function ModelSync:UnLockState() + self.modelsync:unlockstate() +end + + +function ModelSync:LockModel() + self.modelsync:lockmodel() +end + + +function ModelSync:UnLockModel() + 
self.modelsync:unlockmodel() +end + +function ModelSync:Id() + return self.modelsync:id() +end + +function ModelSync:ThreadCount() + return self.modelsync:threadcount() +end + +function ModelSync:SyncInc() + return self.modelsync:syncinc() +end + +function ModelSync:SyncDec() + return self.modelsync:syncdec() +end + + diff --git a/fastnn/test.lua b/fastnn/test.lua new file mode 100644 index 0000000..3b56eb1 --- /dev/null +++ b/fastnn/test.lua @@ -0,0 +1,57 @@ +require 'fastnn' + +function build_trainer(ifname) + local param_repo = nerv.ParamRepo() + param_repo:import(ifname, nil, gconf) + local sublayer_repo = make_sublayer_repo(param_repo) + local layer_repo = make_layer_repo(sublayer_repo, param_repo) + local nnet = get_network(layer_repo) + local input_order = get_input_order() + + + local iterative_trainer = function (prefix, scp_file, bp) + + -- build buffer + local buffer = make_buffer(make_readers(scp_file, layer_repo)) + + --[[local control = fastnn.modelsync(); + local lua_control = fastnn.ModelSync(control:id()) + print(control:__tostring()) + print(lua_control:GetDim(nnet)) + lua_control:Initialize(nnet) + lua_control:WeightToD(nnet) + lua_control:WeightToD(nnet) + ]] + + local example_repo = fastnn.CExamplesRepo(128, false) + -- print(example_repo) + local share_repo = fastnn.CExamplesRepo(example_repo:id(), true) + feat_id = get_feat_id() + + local t = 1; + for data in buffer.get_data, buffer do + local example = fastnn.Example.PrepareData(data, layer_repo.global_transf, feat_id) + print(example) + share_repo:accept(example) + end + + end + return iterative_trainer +end + + +dofile(arg[1]) +start_halving_inc = 0.5 +halving_factor = 0.6 +end_halving_inc = 0.1 +min_iter = 1 +max_iter = 20 +min_halving = 5 +gconf.batch_size = 256 +gconf.buffer_size = 81920 + +local pf0 = gconf.initialized_param +local trainer = build_trainer(pf0) + +local accu_best = trainer(nil, gconf.cv_scp, false) + diff --git a/fastnn/threads/Makefile b/fastnn/threads/Makefile new file mode 100644 index 0000000..4205adc --- /dev/null +++ b/fastnn/threads/Makefile @@ -0,0 +1,43 @@ +.PHONY: build install clean +SHELL := /bin/bash +BUILD_DIR := $(CURDIR)/build +INC_PATH := $(LUA_BINDIR)/../include/nerv/luaT +LUA_DIR = $(INST_LUADIR)/threads +LIB_PATH := $(LUA_BINDIR)/../lib + +OBJ_DIR := $(BUILD_DIR)/objs +SUBDIR := lib test + + +OBJS := lib/init.o lib/threads.o lib/THThread.o +LIBS := $(INST_LIBDIR)/libthreads.so +LUA_LIBS := init.lua threads.lua test/test-threads.lua +INCLUDE := -I $(LUA_INCDIR) -I $(INC_PATH) -DLUA_USE_APICHECK -DUSE_PTHREAD_THREADS + + +OBJS := $(addprefix $(OBJ_DIR)/,$(OBJS)) +OBJ_SUBDIR := $(addprefix $(OBJ_DIR)/,$(SUBDIR)) +LUA_SUBDIR := $(addprefix $(LUA_DIR)/,$(SUBDIR)) +LUA_LIBS := $(addprefix $(LUA_DIR)/,$(LUA_LIBS)) + +CFLAGS := -Wall -Wextra -O0 + +build: $(OBJ_DIR) $(OBJ_SUBDIR) $(OBJS) +install: $(LUA_DIR) $(LUA_SUBDIR) $(LIBS) $(LUA_LIBS) + +$(OBJ_DIR) $(LUA_DIR) $(OBJ_SUBDIR) $(LUA_SUBDIR): + -mkdir -p $@ +$(LUA_DIR)/%.lua: %.lua + cp $< $@ + +$(OBJ_DIR)/%.o: %.c $(patsubst /%.o,/%.c,$@) + gcc -c -o $@ $< $(INCLUDE) -fPIC $(CFLAGS) + +$(LIBS): $(OBJS) + gcc -shared -o $@ $^ $(LDFLAGS) -Wl,-rpath=$(LIB_PATH) -L$(LIB_PATH) -lnervcore -lluaT -lpthread + cp $@ $(LIB_PATH)/ + +clean: + -rm -rf $(OBJ_DIR) + + diff --git a/fastnn/threads/init.lua b/fastnn/threads/init.lua new file mode 100644 index 0000000..6cd3679 --- /dev/null +++ b/fastnn/threads/init.lua @@ -0,0 +1,14 @@ +threads = {} + +local C = require 'libthreads' + +threads.Thread = C.Thread +threads.Mutex = C.Mutex 
+threads.Condition = C.Condition +threads.Semaphore = C.Semaphore +--threads.Threads = require 'threads.threads' + +-- only for backward compatibility (boo) +-- setmetatable(threads, getmetatable(threads.Threads)) + +-- return threads diff --git a/fastnn/threads/lib/THThread.c b/fastnn/threads/lib/THThread.c new file mode 100644 index 0000000..7abc044 --- /dev/null +++ b/fastnn/threads/lib/THThread.c @@ -0,0 +1,349 @@ +#ifndef TH_THREAD_INC +#define TH_THREAD_INC + +#include <stdlib.h> +#include <string.h> + +/*#include "TH.h" */ +#include "THThread.h" + +#if defined(USE_PTHREAD_THREADS) +#include <pthread.h> + +#elif defined(USE_WIN32_THREADS) + +/* very basic emulation to suit our needs */ + +#include <process.h> +#include <windows.h> + +typedef HANDLE pthread_t; +typedef DWORD pthread_attr_t; +typedef HANDLE pthread_mutex_t; +typedef HANDLE pthread_cond_t; + +static int pthread_create(pthread_t *restrict thread, + const pthread_attr_t *restrict attr, void *(*start_routine)(void *), + void *restrict arg) +{ + *thread = (HANDLE)_beginthreadex(NULL, 0, (THREAD_FUNCTION)start_routine, arg, 0, NULL); + return (int)(*thread == NULL); +} + +static int pthread_join(pthread_t thread, void **value_ptr) +{ + return ((WaitForSingleObject((thread), INFINITE) != WAIT_OBJECT_0) || !CloseHandle(thread)); +} + +static int pthread_mutex_init(pthread_mutex_t *restrict mutex, + const pthread_mutexattr_t *restrict attr) +{ + *mutex = CreateMutex(NULL, FALSE, NULL); + return (int)(*mutex == NULL); +} + +static int pthread_mutex_lock(pthread_mutex_t *mutex) +{ + return WaitForSingleObject(*mutex, INFINITE) == 0; +} + +static int pthread_mutex_unlock(pthread_mutex_t *mutex) +{ + return ReleaseMutex(*mutex) == 0; +} + +static int pthread_mutex_destroy(pthread_mutex_t *mutex) +{ + return CloseHandle(*mutex) == 0; +} + +static int pthread_cond_init(pthread_cond_t *restrict cond, + const pthread_condattr_t *restrict attr) +{ + *cond = CreateEvent(NULL, FALSE, FALSE, NULL); + return (int)(*cond == NULL); +} + +static int pthread_cond_wait(pthread_cond_t *restrict cond, + pthread_mutex_t *restrict mutex) +{ + SignalObjectAndWait(*mutex, *cond, INFINITE, FALSE); + return WaitForSingleObject(*mutex, INFINITE) == 0; +} + +static int pthread_cond_destroy(pthread_cond_t *cond) +{ + return CloseHandle(*cond) == 0; +} + +int pthread_cond_signal(pthread_cond_t *cond) +{ + return SetEvent(*cond) == 0; +} + +#else +#error no thread system available +#endif + +typedef struct THThread_ { + pthread_t id; + int (*func)(void*); + void* data; + int status; +} THThread; + +typedef struct THMutex_{ + pthread_mutex_t id; + int refcount; +} THMutex; + +typedef struct THCondition_ { + pthread_cond_t id; + int refcount; +} THCondition; + +typedef struct THSemaphore_ { + pthread_mutex_t mutex_; + pthread_cond_t cond_; + int counter_; + int refcount; +}THSemaphore; + +static void* thread_closure(void *data) +{ + THThread *thread = data; + thread->status = thread->func(thread->data); + return NULL; +} + +THThread* THThread_new(int (*func)(void*), void *data) +{ + THThread *self = malloc(sizeof(THThread)); + self->func = func; + self->data = data; + self->status = 0; + if(!self) + return NULL; + if(pthread_create(&self->id, NULL, thread_closure, self)) { + free(self); + return NULL; + } + return self; +} + +long THThread_id(THThread *self) +{ + return (long)(self); +} + +int THThread_free(THThread *self) +{ + int status = 1; + if(self) { + if(pthread_join(self->id, NULL)) + return 1; + status = self->status; + free(self); + self = NULL; + } 
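+  /* status defaults to 1; after a successful pthread_join it carries the
+     worker function's return value saved by thread_closure(). */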
+ return status; +} + +THMutex* THMutex_new(void) +{ + THMutex *self = malloc(sizeof(THMutex)); + if(!self) + return NULL; + if(pthread_mutex_init(&self->id, NULL) != 0) { + free(self); + return NULL; + } + self->refcount = 1; + return self; +} + +THMutex* THMutex_newWithId(long id) +{ + THMutex *self = (THMutex*)id; + __sync_fetch_and_add(&self->refcount, 1); + return self; +} + +long THMutex_id(THMutex *self) +{ + return (long)(self); +} + +int THMutex_lock(THMutex *self) +{ + if(pthread_mutex_lock(&self->id) != 0) + return 1; + return 0; +} + +int THMutex_trylock(THMutex *self) +{ + if (pthread_mutex_trylock(&self->id) != 0) + return 1; + return 0; +} + +int THMutex_unlock(THMutex *self) +{ + if(pthread_mutex_unlock(&self->id) != 0) + return 1; + return 0; +} + +void THMutex_free(THMutex *self) +{ + if(self) { + if(__sync_fetch_and_add(&self->refcount, -1) == 1) { + pthread_mutex_destroy(&self->id); + free(self); + self = NULL; + } + } +} + +THCondition* THCondition_new(void) +{ + THCondition *self = malloc(sizeof(THCondition)); + if(!self) + return NULL; + if(pthread_cond_init(&self->id, NULL)) { + free(self); + return NULL; + } + self->refcount = 1; + return self; +} + +THCondition* THCondition_newWithId(long id) +{ + THCondition *self = (THCondition*)id; + __sync_fetch_and_add(&self->refcount, 1); + return self; +} + +long THCondition_id(THCondition *self) +{ + return (long)(self); +} + +int THCondition_signal(THCondition *self) +{ + if(pthread_cond_signal(&self->id)) + return 1; + return 0; +} + +int THCondition_wait(THCondition *self, THMutex *mutex) +{ + if(pthread_cond_wait(&self->id, &mutex->id)) + return 1; + return 0; +} + +void THCondition_free(THCondition *self) +{ + if(self) { + if(__sync_fetch_and_add(&self->refcount, -1) == 1) { + pthread_cond_destroy(&self->id); + free(self); + self = NULL; + } + } +} + +THSemaphore* THSemaphore_new(int initValue) +{ + THSemaphore *self = malloc(sizeof(THSemaphore)); + if(!self) + return NULL; + self->counter_ = initValue; + if(pthread_mutex_init(&self->mutex_, NULL) != 0) + { + free(self); + return NULL; + } + + if(pthread_cond_init(&self->cond_, NULL) != 0) + { + free(self); + return NULL; + } + + self->refcount = 1; + + return self; +} + +THSemaphore* THSemaphore_newWithId(long id) +{ + THSemaphore *self = (THSemaphore*)id; + __sync_fetch_and_add(&self->refcount, 1); + return self; +} + +long THSemaphore_id(THSemaphore *self) +{ + return (long)(self); +} + +int THSemaphore_wait(THSemaphore *self) +{ + int ret = 0; + ret |= pthread_mutex_lock(&self->mutex_); + + while (self->counter_ <= 0) + ret |= pthread_cond_wait(&self->cond_, &self->mutex_); + + self->counter_--; + ret |= pthread_mutex_unlock(&self->mutex_); + + return ret; + +} + +int THSemaphore_signal(THSemaphore *self) +{ + int ret = 0; + ret |= pthread_mutex_lock(&self->mutex_); + self->counter_++; + ret |= pthread_cond_signal(&self->cond_); + ret |= pthread_mutex_unlock(&self->mutex_); + + return ret; +} + +int THSemaphore_trywait(THSemaphore *self) +{ + int ret = 0; + int try_wait_succeeded = 1; + ret |= pthread_mutex_lock(&self->mutex_); + if (self->counter_ > 0) { + self->counter_--; + try_wait_succeeded = 0; + } + ret |= pthread_mutex_unlock(&self->mutex_); + return try_wait_succeeded; +} + +void THSemaphore_free(THSemaphore *self) +{ + if(self) + { + if(__sync_fetch_and_add(&self->refcount, -1) == 1) + { + pthread_mutex_destroy(&self->mutex_); + pthread_cond_destroy(&self->cond_); + free(self); + self = NULL; + } + } + } + + + +#endif diff --git 
a/fastnn/threads/lib/THThread.h b/fastnn/threads/lib/THThread.h new file mode 100644 index 0000000..75ba417 --- /dev/null +++ b/fastnn/threads/lib/THThread.h @@ -0,0 +1,36 @@ +#ifndef TH_THREAD_INC +#define TH_THREAD_INC + +typedef struct THThread_ THThread; +typedef struct THMutex_ THMutex; +typedef struct THCondition_ THCondition; +typedef struct THSemaphore_ THSemaphore; + +THThread* THThread_new(int (*closure)(void*), void *data); +long THThread_id(THThread *self); +int THThread_free(THThread *self); + +THMutex* THMutex_new(void); +THMutex* THMutex_newWithId(long id); +long THMutex_id(THMutex *self); +int THMutex_lock(THMutex *self); +int THMutex_unlock(THMutex *self); +int THMutex_trylock(THMutex *self); +void THMutex_free(THMutex *self); + +THCondition* THCondition_new(void); +THCondition* THCondition_newWithId(long id); +long THCondition_id(THCondition *self); +int THCondition_signal(THCondition *self); +int THCondition_wait(THCondition *self, THMutex *mutex); +void THCondition_free(THCondition *self); + +THSemaphore* THSemaphore_new(int initValue); +THSemaphore* THSemaphore_newWithId(long id); +long THSemaphore_id(THSemaphore *self); +int THSemaphore_signal(THSemaphore *self); +int THSemaphore_wait(THSemaphore *self); +int THSemaphore_trywait(THSemaphore *self); +void THSemaphore_free(THSemaphore *self); + +#endif diff --git a/fastnn/threads/lib/init.c b/fastnn/threads/lib/init.c new file mode 100644 index 0000000..4042189 --- /dev/null +++ b/fastnn/threads/lib/init.c @@ -0,0 +1,27 @@ +#include <lua.h> +#include <lauxlib.h> + +#if LUA_VERSION_NUM == 501 +static void luaL_setfuncs(lua_State *L, const luaL_Reg *l, int nup) +{ + luaL_checkstack(L, nup+1, "too many upvalues"); + for (; l->name != NULL; l++) { /* fill the table with given functions */ + int i; + lua_pushstring(L, l->name); + for (i = 0; i < nup; i++) /* copy upvalues to the top */ + lua_pushvalue(L, -(nup+1)); + lua_pushcclosure(L, l->func, nup); /* closure with those upvalues */ + lua_settable(L, -(nup + 3)); + } + lua_pop(L, nup); /* remove upvalues */ +} +#endif + +#include "threads.c" + +int luaopen_libthreads(lua_State *L) +{ + lua_newtable(L); + thread_init_pkg(L); + return 1; +} diff --git a/fastnn/threads/lib/luaTHRD.h b/fastnn/threads/lib/luaTHRD.h new file mode 100644 index 0000000..3760bdb --- /dev/null +++ b/fastnn/threads/lib/luaTHRD.h @@ -0,0 +1,84 @@ +#ifndef LUA_THRD_INC +#define LUA_THRD_INC + +static int luaTHRD_pushudata(lua_State *L, void *ptr, const char* typename) +{ + void **udata = lua_newuserdata(L, sizeof(void*)); + if(udata) { + *udata = ptr; + luaL_getmetatable(L, typename); + lua_setmetatable(L, -2); + return 1; + } + return 0; +} + +static void *luaTHRD_checkudata(lua_State *L, int narg, const char *typename) +{ + void **udata = luaL_checkudata(L, narg, typename); + if(udata) + return *udata; + else + return NULL; +} + +static void *luaTHRD_toudata(lua_State *L, int narg, const char *typename) +{ + void **udata = lua_touserdata(L, narg); + if(udata) { + if(lua_getmetatable(L, -1)) { + luaL_getmetatable(L, typename); + if(lua_equal(L, -1, -2)) { + lua_pop(L, 2); + return *udata; + } + else { + lua_pop(L, 2); + return NULL; + } + } + else + return NULL; + } + else + return NULL; +} + +static int luaTHRD_ctor(lua_State *L) +{ + if(!lua_istable(L, 1)) /* dummy ctor table */ + luaL_error(L, "ctor: table expected"); + lua_getmetatable(L, 1); + lua_remove(L, 1); /* dummy ctor table */ + if(!lua_istable(L, -1)) + luaL_error(L, "ctor: no metatable found"); + lua_pushstring(L, "__new"); + 
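+  /* fetch the constructor that luaTHRD_pushctortable() registered under the
+     "__new" key of this type's metatable; the __call handler below invokes
+     it with the original arguments */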
lua_rawget(L, -2); + lua_remove(L, -2); /* forget about metatable */ + if(!lua_isfunction(L, -1)) + luaL_error(L, "ctor: __new appears to be not a function"); + lua_insert(L, 1); /* ctor first, arguments follow */ + lua_call(L, lua_gettop(L)-1, LUA_MULTRET); + return lua_gettop(L); +} + +static void luaTHRD_pushctortable(lua_State *L, lua_CFunction ctor, const char* typename) +{ + lua_newtable(L); /* empty useless dude */ + lua_newtable(L); /* metatable of the dude */ + lua_pushstring(L, "__index"); + luaL_getmetatable(L, typename); + lua_rawset(L, -3); + lua_pushstring(L, "__newindex"); + luaL_getmetatable(L, typename); + lua_rawset(L, -3); + lua_pushstring(L, "__new"); /* __call will look into there */ + lua_pushcfunction(L, ctor); + lua_rawset(L, -3); + lua_pushstring(L, "__call"); /* pop the table and calls __new */ + lua_pushcfunction(L, luaTHRD_ctor); + lua_rawset(L, -3); + lua_setmetatable(L, -2); +} + +#endif diff --git a/fastnn/threads/lib/threads.c b/fastnn/threads/lib/threads.c new file mode 100644 index 0000000..bf2a5ec --- /dev/null +++ b/fastnn/threads/lib/threads.c @@ -0,0 +1,355 @@ +#include <stdio.h> +#include <stdlib.h> +#include <luaT.h> +#include <string.h> + +#include "THThread.h" +#include "luaTHRD.h" + +#include <lua.h> +#include <lualib.h> + +static int newthread(void *code_) +{ + char *code = code_; + lua_State *L = luaL_newstate(); + + if(!L) { + printf("THREAD FATAL ERROR: could not create lua state\n"); + return -1; + } + luaL_openlibs(L); + + if(luaL_loadstring(L, code)) { + printf("FATAL THREAD PANIC: (loadstring) %s\n", lua_tolstring(L, -1, NULL)); + free(code); + lua_close(L); + return -1; + } + free(code); + if(lua_pcall(L, 0, 0, 0)) { + printf("FATAL THREAD PANIC: (pcall) %s\n", lua_tolstring(L, -1, NULL)); + lua_close(L); + return -1; + } + + lua_close(L); + return 0; +} + +static int thread_new(lua_State *L) +{ + THThread *thread = NULL; + size_t len = 0; + const char *code = luaL_checklstring(L, 1, &len); + char *code_dup = malloc(len+1); + if(!code_dup) + luaL_error(L, "threads: out of memory"); + memcpy(code_dup, code, len+1); + + thread = THThread_new(newthread, (void*)code_dup); + if(!thread) + luaL_error(L, "threads: thread new failed"); + luaTHRD_pushudata(L, thread, "threads.Thread"); + + return 1; +} + +static int thread_tostring(lua_State *L) +{ + char str[128]; + THThread *thread = luaTHRD_checkudata(L, 1, "threads.Thread"); + snprintf(str, 128, "threads.Thread <%lx>", THThread_id(thread)); + lua_pushstring(L, str); + return 1; +} + +static int thread_id(lua_State *L) +{ + THThread *thread = luaTHRD_checkudata(L, 1, "threads.Thread"); + lua_pushinteger(L, THThread_id(thread)); + return 1; +} + +static int thread_free(lua_State *L) +{ + THThread *thread = luaTHRD_checkudata(L, 1, "threads.Thread"); + THThread_free(thread); + return 0; +} + +static int mutex_new(lua_State *L) +{ + THMutex *mutex = NULL; + if(lua_gettop(L) == 0) { + mutex = THMutex_new(); + } + else if(lua_gettop(L) == 1) { + long id = luaL_checklong(L, 1); + mutex = THMutex_newWithId(id); + } + else + luaL_error(L, "threads: mutex new invalid arguments"); + if(!mutex) + luaL_error(L, "threads: mutex new failed"); + luaTHRD_pushudata(L, mutex, "threads.Mutex"); + return 1; +} + +static int mutex_tostring(lua_State *L) +{ + char str[128]; + THMutex *mutex = luaTHRD_checkudata(L, 1, "threads.Mutex"); + snprintf(str, 128, "threads.Mutex <%lx>", THMutex_id(mutex)); + lua_pushstring(L, str); + return 1; +} + +static int mutex_id(lua_State *L) +{ + THMutex *mutex = 
luaTHRD_checkudata(L, 1, "threads.Mutex"); + lua_pushinteger(L, THMutex_id(mutex)); + return 1; +} + +static int mutex_lock(lua_State *L) +{ + THMutex *mutex = luaTHRD_checkudata(L, 1, "threads.Mutex"); + if(THMutex_lock(mutex)) + luaL_error(L, "threads: mutex lock failed"); + return 0; +} + +static int mutex_unlock(lua_State *L) +{ + THMutex *mutex = luaTHRD_checkudata(L, 1, "threads.Mutex"); + if(THMutex_unlock(mutex)) + luaL_error(L, "threads: mutex unlock failed"); + return 0; +} + +static int mutex_free(lua_State *L) +{ + THMutex *mutex = luaTHRD_checkudata(L, 1, "threads.Mutex"); + THMutex_free(mutex); + return 0; +} + +static int condition_new(lua_State *L) +{ + THCondition *condition = NULL; + if(lua_gettop(L) == 0) { + condition = THCondition_new(); + } + else if(lua_gettop(L) == 1) { + long id = luaL_checklong(L, 1); + condition = THCondition_newWithId(id); + } + else + luaL_error(L, "threads: condition new invalid arguments"); + if(!condition) + luaL_error(L, "threads: condition new failed"); + luaTHRD_pushudata(L, condition, "threads.Condition"); + return 1; +} + +static int condition_tostring(lua_State *L) +{ + char str[128]; + THCondition *condition = luaTHRD_checkudata(L, 1, "threads.Condition"); + snprintf(str, 128, "threads.Condition <%lx>", THCondition_id(condition)); + lua_pushstring(L, str); + return 1; +} + +static int condition_id(lua_State *L) +{ + THCondition *condition = luaTHRD_checkudata(L, 1, "threads.Condition"); + lua_pushinteger(L, THCondition_id(condition)); + return 1; +} + +static int condition_free(lua_State *L) +{ + THCondition *condition = luaTHRD_checkudata(L, 1, "threads.Condition"); + if(!condition) + luaL_error(L, "threads: condition free failed"); + THCondition_free(condition); + return 0; +} + +static int condition_signal(lua_State *L) +{ + THCondition *condition = luaTHRD_checkudata(L, 1, "threads.Condition"); + if(THCondition_signal(condition)) + luaL_error(L, "threads: condition signal failed"); + return 0; +} + +static int condition_wait(lua_State *L) +{ + THCondition *condition = luaTHRD_checkudata(L, 1, "threads.Condition"); + THMutex *mutex = luaTHRD_checkudata(L, 2, "threads.Mutex"); + if(THCondition_wait(condition, mutex)) + luaL_error(L, "threads: condition wait failed"); + return 0; +} + +static int semaphore_new(lua_State *L) +{ + THSemaphore* semaphore = NULL; + if(lua_gettop(L) == 1) { + int initValue = luaL_checkinteger(L, 1); + semaphore = THSemaphore_new(initValue); + } + else + luaL_error(L, "threads: semaphore new invalid arguments"); + if (!semaphore) + luaL_error(L, "threads: semaphore new failed"); + luaTHRD_pushudata(L, semaphore, "threads.Semaphore"); + return 1; +} + +static int semaphore_newfromid(lua_State *L) +{ + THSemaphore* semaphore = NULL; + if(lua_gettop(L) == 1) { + long id = luaL_checklong(L, 1); + semaphore = THSemaphore_newWithId(id); + } + else + luaL_error(L, "threads: semaphore new invalid arguments"); + if(!semaphore) + luaL_error(L, "threads: semaphore new failed"); + luaTHRD_pushudata(L, semaphore, "threads.Semaphore"); + return 1; +} + +static int semaphore_id(lua_State *L) +{ + THSemaphore *semaphore = luaTHRD_checkudata(L, 1, "threads.Semaphore"); + lua_pushinteger(L, THSemaphore_id(semaphore)); + return 1; +} + +static int semaphore_signal(lua_State *L) +{ + THSemaphore *semaphore = luaTHRD_checkudata(L, 1, "threads.Semaphore"); + if(THSemaphore_signal(semaphore)) + luaL_error(L, "threads: semaphore signal failed"); + return 0; +} + +static int semaphore_wait(lua_State *L) +{ + THSemaphore *semaphore 
= luaTHRD_checkudata(L, 1, "threads.Semaphore"); + if(THSemaphore_wait(semaphore)) + luaL_error(L, "threads: semaphore wait failed"); + return 0; +} + +static int semaphore_trywait(lua_State *L) +{ + THSemaphore *semaphore = luaTHRD_checkudata(L, 1, "threads.Semaphore"); + int wait = THSemaphore_trywait(semaphore); + lua_pushinteger(L, wait); + return 1; +} + +static void semaphore_free(lua_State *L) +{ + THSemaphore *semaphore = luaTHRD_checkudata(L, 1, "threads.Semaphore"); + if(!semaphore) + luaL_error(L, "threads: semaphore free failed"); + THSemaphore_free(semaphore); +} + +static const struct luaL_Reg thread__ [] = { + {"new", thread_new}, + {"__tostring", thread_tostring}, + {"id", thread_id}, + {"free", thread_free}, + {NULL, NULL} +}; + +static const struct luaL_Reg mutex__ [] = { + {"new", mutex_new}, + {"__tostring", mutex_tostring}, + {"id", mutex_id}, + {"lock", mutex_lock}, + {"unlock", mutex_unlock}, + {"free", mutex_free}, + {NULL, NULL} +}; + +static const struct luaL_Reg condition__ [] = { + {"new", condition_new}, + {"__tostring", condition_tostring}, + {"id", condition_id}, + {"signal", condition_signal}, + {"wait", condition_wait}, + {"free", condition_free}, + {NULL, NULL} +}; + +static const struct luaL_Reg semaphore__ [] = { + {"new", semaphore_new}, + {"newfromid", semaphore_newfromid}, + {"id", semaphore_id}, + {"signal", semaphore_signal}, + {"wait", semaphore_wait}, + {"trywait", semaphore_trywait}, + {"free", semaphore_free}, + {NULL, NULL} +}; + +static void thread_init_pkg(lua_State *L) +{ + if(!luaL_newmetatable(L, "threads.Thread")) + luaL_error(L, "threads: threads.Thread type already exists"); + luaL_setfuncs(L, thread__, 0); + lua_pushstring(L, "__index"); + lua_pushvalue(L, -2); + lua_rawset(L, -3); + lua_pop(L, 1); + + if(!luaL_newmetatable(L, "threads.Mutex")) + luaL_error(L, "threads: threads.Mutex type already exists"); + luaL_setfuncs(L, mutex__, 0); + lua_pushstring(L, "__index"); + lua_pushvalue(L, -2); + lua_rawset(L, -3); + lua_pop(L, 1); + + if(!luaL_newmetatable(L, "threads.Condition")) + luaL_error(L, "threads: threads.Condition type already exists"); + luaL_setfuncs(L, condition__, 0); + lua_pushstring(L, "__index"); + lua_pushvalue(L, -2); + lua_rawset(L, -3); + lua_pop(L, 1); + + if(!luaL_newmetatable(L, "threads.Semaphore")) + luaL_error(L, "threads: threads.Semaphore type already exists"); + luaL_setfuncs(L, semaphore__, 0); + lua_pushstring(L, "__index"); + lua_pushvalue(L, -2); + lua_rawset(L, -3); + lua_pop(L, 1); + + lua_pushstring(L, "Thread"); + luaTHRD_pushctortable(L, thread_new, "threads.Thread"); + lua_rawset(L, -3); + + lua_pushstring(L, "Mutex"); + luaTHRD_pushctortable(L, mutex_new, "threads.Mutex"); + lua_rawset(L, -3); + + lua_pushstring(L, "Condition"); + luaTHRD_pushctortable(L, condition_new, "threads.Condition"); + lua_rawset(L, -3); + + lua_pushstring(L, "Semaphore"); + luaTHRD_pushctortable(L, semaphore_new, "threads.Semaphore"); + lua_rawset(L, -3); +} diff --git a/fastnn/threads/test/test-low-level.lua b/fastnn/threads/test/test-low-level.lua new file mode 100644 index 0000000..aea31db --- /dev/null +++ b/fastnn/threads/test/test-low-level.lua @@ -0,0 +1,39 @@ +local t = require 'libthreads' + +local m = t.Mutex() +local c = t.Condition() +print(string.format('| main thread mutex: 0x%x', m:id())) +print(string.format('| main thread condition: 0x%x', c:id())) + +local code = string.format([[ + local t = require 'libthreads' + local c = t.Condition(%d) + print('|| hello from thread') + print(string.format('|| thread 
condition: 0x%%x', c:id())) + print('|| doing some stuff in thread...') + local x = 0 + for i=1,10000 do + for i=1,10000 do + x = x + math.sin(i) + end + x = x / 10000 + end + print(string.format('|| ...ok (x=%%f)', x)) + c:signal() +]], c:id()) + +print(code) + +local thread = t.Thread(code) + + +print('| waiting for thread...') +m:lock() +c:wait(m) +print('| thread finished!') + +thread:free() +m:free() +c:free() + +print('| done') diff --git a/fastnn/threads/test/test-threads-async.lua b/fastnn/threads/test/test-threads-async.lua new file mode 100644 index 0000000..68bcd35 --- /dev/null +++ b/fastnn/threads/test/test-threads-async.lua @@ -0,0 +1,66 @@ +local threads = require 'threads' + +local nthread = 4 +local njob = 100 + +local pool = threads.Threads( + nthread, + function(threadid) + print('starting a new thread/state number ' .. threadid) + end +) + + +local jobid = 0 +local result -- DO NOT put this in get +local function get() + + -- fill up the queue as much as we can + -- this will not block + while jobid < njob and pool:acceptsjob() do + jobid = jobid + 1 + + pool:addjob( + function(jobid) + print(string.format('thread ID %d is performing job %d', __threadid, jobid)) + return string.format("job output from job %d", jobid) + end, + + function(jobres) + result = jobres + end, + + jobid + ) + end + + -- is there still something to do? + if pool:hasjob() then + pool:dojob() -- yes? do it! + if pool:haserror() then -- check for errors + pool:synchronize() -- finish everything and throw error + end + return result + end + +end + +local jobdone = 0 +repeat + -- get something asynchronously + local res = get() + + -- do something with res (if any) + if res then + print(res) + jobdone = jobdone + 1 + end + +until not res -- until there is nothing remaining + +assert(jobid == 100) +assert(jobdone == 100) + +print('PASSED') + +pool:terminate() diff --git a/fastnn/threads/test/test-threads-multiple.lua b/fastnn/threads/test/test-threads-multiple.lua new file mode 100644 index 0000000..4429696 --- /dev/null +++ b/fastnn/threads/test/test-threads-multiple.lua @@ -0,0 +1,15 @@ +local threads = require 'threads' + +for i=1,1000 do + io.write(string.format('%04d.', tonumber(i))) + io.flush() + local pool = + threads.Threads( + 4, + function(threadid) + require 'torch' + end + ) +end +print() +print('PASSED') diff --git a/fastnn/threads/test/test-threads-shared.lua b/fastnn/threads/test/test-threads-shared.lua new file mode 100644 index 0000000..3d63851 --- /dev/null +++ b/fastnn/threads/test/test-threads-shared.lua @@ -0,0 +1,111 @@ +require 'torch' + +local threads = require 'threads' +local status, tds = pcall(require, 'tds') +tds = status and tds or nil + +local nthread = 4 +local njob = 10 +local msg = "hello from a satellite thread" + +threads.Threads.serialization('threads.sharedserialize') + +local x = {} +local xh = tds and tds.hash() or {} +local xs = {} +local z = tds and tds.hash() or {} +local D = 10 +local K = tds and 100000 or 100 -- good luck in non-shared (30M) +for i=1,njob do + x[i] = torch.ones(D) + xh[i] = torch.ones(D) + xs[i] = torch.FloatStorage(D):fill(1) + for j=1,K do + z[(i-1)*K+j] = "blah" .. i .. 
j + end +end +collectgarbage() +collectgarbage() + +print('GO') + +local pool = threads.Threads( + nthread, + function(threadIdx) + pcall(require, 'tds') + print('starting a new thread/state number:', threadIdx) + gmsg = msg -- we copy here an upvalue of the main thread + end +) + +local jobdone = 0 +for i=1,njob do + pool:addjob( + function() + assert(x[i]:sum() == D) + assert(xh[i]:sum() == D) + assert(torch.FloatTensor(xs[i]):sum() == D) + for j=1,K do + assert(z[(i-1)*K+j] == "blah" .. i .. j) + end + x[i]:add(1) + xh[i]:add(1) + torch.FloatTensor(xs[i]):add(1) + print(string.format('%s -- thread ID is %x', gmsg, __threadid)) + collectgarbage() + collectgarbage() + return __threadid + end, + + function(id) + print(string.format("task %d finished (ran on thread ID %x)", i, id)) + jobdone = jobdone + 1 + end + ) +end + +for i=1,njob do + pool:addjob( + function() + collectgarbage() + collectgarbage() + end + ) +end + +pool:synchronize() + +print(string.format('%d jobs done', jobdone)) + +pool:terminate() + +-- did we do the job in shared mode? +for i=1,njob do + assert(x[i]:sum() == 2*D) + assert(xh[i]:sum() == 2*D) + assert(torch.FloatTensor(xs[i]):sum() == 2*D) +end + +-- serialize and zero x +local str = torch.serialize(x) +local strh = torch.serialize(xh) +local strs = torch.serialize(xs) +for i=1,njob do + x[i]:zero() + xh[i]:zero() + xs[i]:fill(0) +end + +-- dude, check that unserialized x does not point on x +local y = torch.deserialize(str) +local yh = torch.deserialize(strh) +local ys = torch.deserialize(strs) +for i=1,njob do + assert(y[i]:sum() == 2*D) + assert(yh[i]:sum() == 2*D) + assert(torch.FloatTensor(ys[i]):sum() == 2*D) +end + +pool:terminate() + +print('PASSED') diff --git a/fastnn/threads/test/test-threads.lua b/fastnn/threads/test/test-threads.lua new file mode 100644 index 0000000..e49a381 --- /dev/null +++ b/fastnn/threads/test/test-threads.lua @@ -0,0 +1,20 @@ +local clib = require 'libthreads' + + +nthread = 1 + +str='ad;alkfakd;af' +code = [[ function func(str) print(str) end; print(str);]] +print(code) + +--thread = clib.Thread(code) + + +--thread:free() + + +require 'threads' + +tt = threads.Thread(code) + + diff --git a/fastnn/threads/threads-scm-1.rockspec b/fastnn/threads/threads-scm-1.rockspec new file mode 100644 index 0000000..02535d4 --- /dev/null +++ b/fastnn/threads/threads-scm-1.rockspec @@ -0,0 +1,36 @@ +package = "threads" +version = "scm-1" +source = { + url = "https://github.com/uphantom/nerv-threads.git" +} +description = { + summary = "Thread control for Nerv fastnn", + detailed = [[ + ]], + homepage = "https://github.com/uphantom/nerv-threads", + license = "BSD" +} +dependencies = { + "nerv >= scm-1", + "lua >= 5.1" +} +build = { + type = "make", + build_variables = { + CFLAGS="$(CFLAGS)", + LIBFLAG="$(LIBFLAG)", + LUA_LIBDIR="$(LUA_LIBDIR)", + LUA_BINDIR="$(LUA_BINDIR)", + LUA_INCDIR="$(LUA_INCDIR)", + INST_PREFIX="$(PREFIX)", + LUA="$(LUA)", + }, + install_variables = { + LUA_BINDIR="$(LUA_BINDIR)", + INST_PREFIX="$(PREFIX)", + INST_BINDIR="$(BINDIR)", + INST_LIBDIR="$(LIBDIR)", + INST_LUADIR="$(LUADIR)", + INST_CONFDIR="$(CONFDIR)", + }, +} diff --git a/fastnn/threads/threads.lua b/fastnn/threads/threads.lua new file mode 100644 index 0000000..2db7d51 --- /dev/null +++ b/fastnn/threads/threads.lua @@ -0,0 +1,14 @@ +local threads = {} + +local C = require 'libthreads' + +threads.Thread = C.Thread +threads.Mutex = C.Mutex +threads.Condition = C.Condition +threads.Semaphore = C.Semaphore +--threads.Threads = require 'threads.threads' + +-- 
only for backward compatibility (boo) +-- setmetatable(threads, getmetatable(threads.Threads)) + +-- return threads diff --git a/nerv/init.lua b/nerv/init.lua index 9c1a5c8..406aea6 100644 --- a/nerv/init.lua +++ b/nerv/init.lua @@ -22,6 +22,12 @@ function nerv.mesg_with_timestamp(fmt, ...) os.date("%H:%M:%S %F"), fmt), ...) end +function nerv.info_stderr(fmt, ...) + io.stderr:write(string.format( + string.format("(%s)[nerv] info: %s\n",os.date("%H:%M:%S %F"), fmt), + ...)) +end + function nerv.info(fmt, ...) nerv.printf( string.format("(%s)[nerv] info: %s\n", diff --git a/nerv/io/sgd_buffer.lua b/nerv/io/sgd_buffer.lua index 3f854f0..65d6da1 100644 --- a/nerv/io/sgd_buffer.lua +++ b/nerv/io/sgd_buffer.lua @@ -57,7 +57,7 @@ function SGDBuffer:saturate() buff.data:copy_from(buff.leftover, 0, lrow) buff.leftover = nil end - nerv.info("buffer leftover: %d\n", lrow) + nerv.info("buffer leftover: %d", lrow) reader.tail = lrow reader.has_leftover = false end @@ -107,7 +107,7 @@ function SGDBuffer:get_data() if not self:saturate() then return nil -- the remaining data cannot build a batch end - nerv.info("%.3fs to fill the buffer", os.clock() - t) + --nerv.info("%.3fs to fill the buffer", os.clock() - t) end if self.head + batch_size > self.tail then return nil -- the remaining data cannot build a batch diff --git a/nerv/layer/affine.lua b/nerv/layer/affine.lua index 00cbcfb..56a32f9 100644 --- a/nerv/layer/affine.lua +++ b/nerv/layer/affine.lua @@ -19,19 +19,19 @@ end function MatrixParam:update(gradient) local gconf = self.gconf - self.correction:add(self.correction, gradient, gconf.momentum, 1.0) + self.correction:add(self.correction, gradient, gconf.momentum, 1.0, nerv.context) -- momentum gain local mmt_gain = 1.0 / (1.0 - gconf.momentum); local n = self.gconf.batch_size * mmt_gain -- perform update - self.trans:add(self.trans, self.correction, 1.0, -gconf.lrate / n) + self.trans:add(self.trans, self.correction, 1.0, -gconf.lrate / n, nerv.context) end function LinearTransParam:update(gradient) MatrixParam.update(self, gradient) local gconf = self.gconf -- weight decay - self.trans:add(self.trans, self.trans, 1.0, -gconf.lrate * gconf.wcost) + self.trans:add(self.trans, self.trans, 1.0, -gconf.lrate * gconf.wcost, nerv.context) end function AffineLayer:__init(id, global_conf, layer_conf) @@ -61,29 +61,50 @@ function AffineLayer:init(batch_size) end function AffineLayer:update(bp_err, input, output) + --print(nerv.context) if self.direct_update then - self.ltp.correction:mul(input[1], bp_err[1], 1.0, gconf.momentum, 'T', 'N') + self.ltp.correction:mul(input[1], bp_err[1], 1.0, gconf.momentum, 'T', 'N', nerv.context) -- momentum gain local mmt_gain = 1.0 / (1.0 - gconf.momentum); local n = self.gconf.batch_size * mmt_gain -- perform update - self.ltp.trans:add(self.ltp.trans, self.ltp.correction, 1.0, -gconf.lrate / n) + self.ltp.trans:add(self.ltp.trans, self.ltp.correction, 1.0, -gconf.lrate / n, nerv.context) else - self.ltp_grad:mul(input[1], bp_err[1], 1.0, 0.0, 'T', 'N') + self.ltp_grad:mul(input[1], bp_err[1], 1.0, 0.0, 'T', 'N', nerv.context) self.ltp:update(self.ltp_grad) end self.bp:update(bp_err[1]:colsum()) end +function AffineLayer:gradient(bp_err, input, output) + + self.ltp.correction:mul(input[1], bp_err[1], 1.0, gconf.momentum, 'T', 'N', nerv.context) + self.bp_grad = bp_err[1]:colsum() + self.bp.correction:add(self.bp.correction, self.bp_grad, gconf.momentum, 1.0, nerv.context) +end + +function AffineLayer:update_gradient() + -- momentum gain + local mmt_gain = 1.0 / (1.0 - 
gconf.momentum); + local n = self.gconf.batch_size * mmt_gain + -- perform update + self.ltp.trans:add(self.ltp.trans, self.ltp.correction, 1.0, -gconf.lrate / n, nerv.context) + self.bp.trans:add(self.bp.trans, self.bp.correction, 1.0, -gconf.lrate / n, nerv.context) + + self.ltp.trans:add(self.ltp.trans, self.ltp.trans, 1.0, -gconf.lrate * gconf.wcost, nerv.context) + self.bp.trans:add(self.bp.trans, self.bp.trans, 1.0, -gconf.lrate * gconf.wcost, nerv.context) +end + function AffineLayer:propagate(input, output) -- apply linear transform - output[1]:mul(input[1], self.ltp.trans, 1.0, 0.0, 'N', 'N') + --print(nerv.context) + output[1]:mul(input[1], self.ltp.trans, 1.0, 0.0, 'N', 'N', nerv.context) -- add bias output[1]:add_row(self.bp.trans, 1.0) end function AffineLayer:back_propagate(bp_err, next_bp_err, input, output) - next_bp_err[1]:mul(bp_err[1], self.ltp.trans, 1.0, 0.0, 'N', 'T') + next_bp_err[1]:mul(bp_err[1], self.ltp.trans, 1.0, 0.0, 'N', 'T', nerv.context) end function AffineLayer:get_params() diff --git a/nerv/layer/combiner.lua b/nerv/layer/combiner.lua index 7bd7617..23cf1db 100644 --- a/nerv/layer/combiner.lua +++ b/nerv/layer/combiner.lua @@ -36,7 +36,7 @@ end function CombinerLayer:propagate(input, output) output[1]:fill(0) for i = 1, #self.dim_in do - output[1]:add(output[1], input[i], 1.0, self.lambda[i]) + output[1]:add(output[1], input[i], 1.0, self.lambda[i], nerv.context) end for i = 2, #self.dim_out do output[i]:copy_fromd(output[1]) @@ -47,10 +47,10 @@ function CombinerLayer:back_propagate(bp_err, next_bp_err, input, output) local sum = self.sum sum:copy_fromd(bp_err[1]) for i = 2, #self.dim_out do - sum:add(sum, bp_err[i], 1.0, 1.0) + sum:add(sum, bp_err[i], 1.0, 1.0, nerv.context) end for i = 1, #self.dim_in do - next_bp_err[i]:add(next_bp_err[i], sum, 0.0, self.lambda[i]) + next_bp_err[i]:add(next_bp_err[i], sum, 0.0, self.lambda[i], nerv.context) end end diff --git a/nerv/layer/sigmoid.lua b/nerv/layer/sigmoid.lua index dfd09eb..f6f1417 100644 --- a/nerv/layer/sigmoid.lua +++ b/nerv/layer/sigmoid.lua @@ -18,6 +18,14 @@ function SigmoidLayer:update(bp_err, input, output) -- no params, therefore do nothing end +function SigmoidLayer:gradient(bp_err, input, output) + -- no params, therefore do nothing +end + +function SigmoidLayer:update_gradient() + -- no params, therefore do nothing +end + function SigmoidLayer:propagate(input, output) output[1]:sigmoid(input[1]) end diff --git a/nerv/layer/softmax.lua b/nerv/layer/softmax.lua index e979ebf..7e9c6f0 100644 --- a/nerv/layer/softmax.lua +++ b/nerv/layer/softmax.lua @@ -18,6 +18,15 @@ function SoftmaxLayer:update(bp_err, input, output) -- no params, therefore do nothing end +function SoftmaxLayer:gradient(bp_err, input, output) + -- no params, therefore do nothing +end + +function SoftmaxLayer:update_gradient() + -- no params, therefore do nothing +end + + function SoftmaxLayer:propagate(input, output) output[1]:softmax(input[1]) end diff --git a/nerv/layer/softmax_ce.lua b/nerv/layer/softmax_ce.lua index f878a2f..42adbc6 100644 --- a/nerv/layer/softmax_ce.lua +++ b/nerv/layer/softmax_ce.lua @@ -27,6 +27,14 @@ function SoftmaxCELayer:update(bp_err, input, output) -- no params, therefore do nothing end +function SoftmaxCELayer:gradient(bp_err, input, output) + -- no params, therefore do nothing +end + +function SoftmaxCELayer:update_gradient(bp_err, input, output) + -- no params, therefore do nothing +end + function SoftmaxCELayer:propagate(input, output) local softmax = self.softmax local ce = self.ce diff --git 
a/nerv/lib/common.c b/nerv/lib/common.c index db667b2..1fa1d9f 100644 --- a/nerv/lib/common.c +++ b/nerv/lib/common.c @@ -1,4 +1,5 @@ #include "common.h" +#include "matrix/cuda_helper.h" #include <stdarg.h> int nerv_error(lua_State *L, const char *err_mesg_fmt, ...) { va_list ap; diff --git a/nerv/lib/common.h b/nerv/lib/common.h index 1c588d1..a4e3582 100644 --- a/nerv/lib/common.h +++ b/nerv/lib/common.h @@ -59,6 +59,8 @@ typedef struct Status { nerv_error_status(L, &status); \ } while (0) +#define PROFILE_HASHMAP_SIZE 123457 + typedef struct HashNode { const char *key; void *val; @@ -82,6 +84,8 @@ void hashmap_clear(HashMap *h); size_t bkdr_hash(const char *key); +extern const char *nerv_context_tname; + int nerv_error(lua_State *L, const char *err_mesg_fmt, ...); int nerv_error_status(lua_State *L, Status *status); int nerv_error_method_not_implemented(lua_State *L); diff --git a/nerv/lib/io/chunk_file.c b/nerv/lib/io/chunk_file.c index 4e00b0b..79dbee3 100644 --- a/nerv/lib/io/chunk_file.c +++ b/nerv/lib/io/chunk_file.c @@ -61,7 +61,7 @@ static const char *read_chunk_metadata(FILE *fp, const char *fn, NERV_SET_STATUS(status, (fgets(buff + LUA_RETURN_LEN, LINEBUFF_SIZE, fp) == (buff + LUA_RETURN_LEN) ? \ NERV_NORMAL : CF_INVALID_FORMAT), 0); - fprintf(stderr, "metadata: %s\n", buff); + //fprintf(stderr, "metadata: %s\n", buff); return buff; } @@ -112,7 +112,7 @@ static ChunkFile *open_read(const char *fn, Status *status) { for (i = 0;; offset += chunk_len, i++) { ChunkInfo *cip; - fprintf(stderr, "reading chunk %d from %d\n", i, (int)offset); + //fprintf(stderr, "reading chunk %d from %d\n", i, (int)offset); /* skip to the begining of chunk i */ if (fseeko(fp, offset, SEEK_SET) != 0) { diff --git a/nerv/lib/matrix/cukernel.h b/nerv/lib/matrix/cukernel.h index 2126c6f..31e199b 100644 --- a/nerv/lib/matrix/cukernel.h +++ b/nerv/lib/matrix/cukernel.h @@ -13,7 +13,7 @@ void cudak_(cuda_softmax_final)(const Matrix *a, const Matrix *max, const Matrix void cudak_(cuda_add_row)(const Matrix *a, Matrix *b, double beta); void cudak_(cuda_fill)(Matrix *a, double val); void cudak_(cuda_clip)(Matrix *a, double val_1, double val_2); -void cudak_(cuda_expand_frm)(const Matrix *a, Matrix *b, int context); +void cudak_(cuda_expand_frm)(const Matrix *a, Matrix *b, int context, int a_begin, int a_end); void cudak_(cuda_rearrange_frm)(const Matrix *a, Matrix *b, int step); void cudak_(cuda_scale_rows_by_row)(const Matrix *a, Matrix *b); void cudak_(cuda_scale_rows_by_col)(const Matrix *a, Matrix *b); diff --git a/nerv/lib/matrix/cumatrix.c b/nerv/lib/matrix/cumatrix.c index ff1168d..c913db2 100644 --- a/nerv/lib/matrix/cumatrix.c +++ b/nerv/lib/matrix/cumatrix.c @@ -2,11 +2,13 @@ #include "../common.h" #include "cuda_helper.h" #include <string.h> -#define PROFILE_HASHMAP_SIZE 123457 + static cublasHandle_t cublas_handle; static cudaEvent_t profile_start, profile_stop; static HashMap *profile; +const char *nerv_context_tname = "nerv.CCuContext"; + void nerv_cumatrix_print_profile() { size_t i; fprintf(stderr, "*** [nerv cumatrix profile] **\n"); @@ -35,6 +37,12 @@ void accu_profile(const char *name, float delta) { *val += delta; } +cublasHandle_t* nerv_get_cublas_handle() +{ + return &cublas_handle; +} + + void nerv_cumatrix_init() { cublasCreate(&cublas_handle); cudaEventCreate(&profile_start); diff --git a/nerv/lib/matrix/cumatrix.h b/nerv/lib/matrix/cumatrix.h index e6def66..e53c702 100644 --- a/nerv/lib/matrix/cumatrix.h +++ b/nerv/lib/matrix/cumatrix.h @@ -4,4 +4,18 @@ void 
nerv_cumatrix_print_profile(); void nerv_cumatrix_clear_profile(); void nerv_cumatrix_init(); + +void nerv_set_cublas_handle(); + +typedef struct CuContext +{ + cublasHandle_t cublas_handle; + cudaEvent_t profile_start, profile_stop; + HashMap *profile; + pthread_t pid; + int refcount; +}CuContext; + +extern const char *nerv_context_tname; + #endif diff --git a/nerv/lib/matrix/generic/cukernel.cu b/nerv/lib/matrix/generic/cukernel.cu index 08feb59..707f8fd 100644 --- a/nerv/lib/matrix/generic/cukernel.cu +++ b/nerv/lib/matrix/generic/cukernel.cu @@ -229,14 +229,15 @@ __global__ void cudak_(expand_frm)(const MATRIX_ELEM *a, MATRIX_ELEM *b, int nrow, int ncol, int enrow, int encol, int stride, int estride, - int context) { + int context, + int a_begin, int a_end) { int j = blockIdx.x * blockDim.x + threadIdx.x; int i = blockIdx.y * blockDim.y + threadIdx.y; int ridx; if (i >= enrow || j >= encol) return; ridx = i + j / ncol - context; - if (ridx < 0) ridx = 0; - else if (ridx >= nrow) ridx = nrow - 1; + if (ridx < a_begin) ridx = a_begin; + else if (ridx >= a_end) ridx = a_end - 1; b[j + i * estride] = a[j % ncol + ridx * stride]; } @@ -541,7 +542,7 @@ extern "C" { cudaStreamSynchronize(0); } - void cudak_(cuda_expand_frm)(const Matrix *a, Matrix *b, int context) { + void cudak_(cuda_expand_frm)(const Matrix *a, Matrix *b, int context, int a_begin, int a_end) { dim3 threadsPerBlock(CUDA_THREADS_N, CUDA_THREADS_N); dim3 numBlocks(CEIL_DIV(b->ncol, threadsPerBlock.x), CEIL_DIV(b->nrow, threadsPerBlock.y)); @@ -551,7 +552,8 @@ extern "C" { b->nrow, b->ncol, a->stride / sizeof(MATRIX_ELEM), b->stride / sizeof(MATRIX_ELEM), - context); + context, + a_begin, a_end); cudaStreamSynchronize(0); } diff --git a/nerv/lib/matrix/generic/cumatrix.c b/nerv/lib/matrix/generic/cumatrix.c index 770e503..5b11496 100644 --- a/nerv/lib/matrix/generic/cumatrix.c +++ b/nerv/lib/matrix/generic/cumatrix.c @@ -13,12 +13,13 @@ void nerv_matrix_(add)(Matrix *c, const Matrix *a, const Matrix *b, MATRIX_ELEM alpha, MATRIX_ELEM beta, - Status *status) { + cublasHandle_t *handle, Status *status) { CHECK_SAME_DIMENSION(a, b, status); CHECK_SAME_DIMENSION(a, c, status); - PROFILE_START + cublasHandle_t *cuhandle = (handle == NULL ? &cublas_handle : handle); + PROFILE_START //cublas_handle CUBLAS_SAFE_SYNC_CALL( - NERV_CUBLAS_(geam)(cublas_handle, CUBLAS_OP_N, CUBLAS_OP_N, + NERV_CUBLAS_(geam)(*cuhandle, CUBLAS_OP_N, CUBLAS_OP_N, a->ncol, a->nrow, &alpha, MATRIX_ELEM_PTR(a), a->stride / sizeof(MATRIX_ELEM), @@ -32,7 +33,7 @@ void nerv_matrix_(add)(Matrix *c, const Matrix *a, const Matrix *b, void nerv_matrix_(mul)(Matrix *c, const Matrix *a, const Matrix *b, MATRIX_ELEM alpha, MATRIX_ELEM beta, - int ta, int tb, Status *status) { + int ta, int tb, cublasHandle_t *handle, Status *status) { #define SWAP(a, b) \ do { int t = (a); (a) = (b); (b) = t; } while (0) @@ -42,10 +43,11 @@ void nerv_matrix_(mul)(Matrix *c, const Matrix *a, const Matrix *b, if (tb == CUBLAS_OP_T) SWAP(bm, bn); if (an != bm) NERV_EXIT_STATUS(status, MAT_WRONG_MULT_DIM, 0); + cublasHandle_t *cuhandle = (handle == NULL ? 
&cublas_handle : handle); /* Because matrix in Nerv is row-major, here b comes first */ - PROFILE_START + PROFILE_START //cublas_handle CUBLAS_SAFE_SYNC_CALL( - NERV_CUBLAS_(gemm)(cublas_handle, tb, ta, + NERV_CUBLAS_(gemm)(*cuhandle, tb, ta, bn, am, bm, &alpha, MATRIX_ELEM_PTR(b), b->stride / sizeof(MATRIX_ELEM), @@ -253,15 +255,16 @@ void nerv_matrix_(copy_toh)(Matrix *a, const Matrix *b, NERV_SET_STATUS(status, NERV_NORMAL, 0); } -Matrix *nerv_matrix_(trans)(Matrix *a, Status *status) { +Matrix *nerv_matrix_(trans)(Matrix *a, cublasHandle_t *handle, Status *status) { MATRIX_ELEM alpha = 1, beta = 0; Matrix *b = nerv_matrix_(create)(a->ncol, a->nrow, status); if (status->err_code != NERV_NORMAL) return NULL; + cublasHandle_t *cuhandle = (handle == NULL ? &cublas_handle : handle); /* FIXME: possible memory leak when lua error is raised */ - PROFILE_START + PROFILE_START //cublas_handle CUBLAS_SAFE_SYNC_CALL_RET( - NERV_CUBLAS_(geam)(cublas_handle, CUBLAS_OP_T, CUBLAS_OP_T, + NERV_CUBLAS_(geam)(*cuhandle, CUBLAS_OP_T, CUBLAS_OP_T, a->nrow, a->ncol, &alpha, MATRIX_ELEM_PTR(a), a->stride / sizeof(MATRIX_ELEM), @@ -360,14 +363,16 @@ void nerv_matrix_(copy_rows_fromd_by_idx)(Matrix *a, const Matrix *b, } void nerv_matrix_(expand_frm)(Matrix *a, const Matrix *b, - int context, Status *status) { - if (a->nrow != b->nrow) - NERV_EXIT_STATUS(status, MAT_MISMATCH_DIM, 0); + int context, int b_begin, int b_end, + Status *status) { + if (!(0 <= b_begin && b_begin < b_end && b_end <= b->nrow && + b_end - b_begin == a->nrow)) + NERV_EXIT_STATUS(status, MAT_MISMATCH_DIM, 0); if (a->ncol != b->ncol * (context * 2 + 1)) NERV_EXIT_STATUS(status, MAT_GENERAL_ERR, "the width should be 2 * context + 1"); PROFILE_START - cudak_(cuda_expand_frm)(b, a, context); + cudak_(cuda_expand_frm)(b, a, context, b_begin, b_end); PROFILE_STOP NERV_SET_STATUS(status, NERV_NORMAL, 0); } diff --git a/nerv/lib/matrix/generic/cumatrix.h b/nerv/lib/matrix/generic/cumatrix.h index 04e8c5a..f476414 100644 --- a/nerv/lib/matrix/generic/cumatrix.h +++ b/nerv/lib/matrix/generic/cumatrix.h @@ -1,11 +1,11 @@ #include "../../common.h" void nerv_matrix_(add)(Matrix *c, const Matrix *a, const Matrix *b, - MATRIX_ELEM alpha, MATRIX_ELEM beta, + MATRIX_ELEM alpha, MATRIX_ELEM beta, cublasHandle_t *handle, Status *status); void nerv_matrix_(mul)(Matrix *c, const Matrix *a, const Matrix *b, MATRIX_ELEM alpha, MATRIX_ELEM beta, - int ta, int tb, Status *status); + int ta, int tb, cublasHandle_t *handle, Status *status); void nerv_matrix_(sigmoid)(Matrix *a, const Matrix *b, Status *status); void nerv_matrix_(sigmoid_grad)(Matrix *nerr, const Matrix *err, const Matrix *output, Status *status); @@ -31,7 +31,7 @@ void nerv_matrix_(copy_fromh)(Matrix *a, const Matrix *b, void nerv_matrix_(copy_toh)(Matrix *a, const Matrix *b, int a_begin, int a_end, int b_begin, Status *status); -Matrix *nerv_matrix_(trans)(Matrix *a, Status *status); +Matrix *nerv_matrix_(trans)(Matrix *a, cublasHandle_t *handle, Status *status); void nerv_matrix_(mul_elem)(Matrix *c, const Matrix *a, const Matrix *b, Status *status); @@ -44,7 +44,7 @@ void nerv_matrix_(copy_rows_fromd_by_idx)(Matrix *a, const Matrix *b, const Matrix *idx, int b_begin, Status *status); void nerv_matrix_(expand_frm)(Matrix *a, const Matrix *b, - int context, Status *status); + int context, int b_begin, int b_end, Status *status); void nerv_matrix_(rearrange_frm)(Matrix *a, const Matrix *b, int step, Status *status); void nerv_matrix_(scale_rows_by_col)(Matrix *a, const Matrix *b, diff --git 
a/nerv/lib/matrix/generic/matrix.c b/nerv/lib/matrix/generic/matrix.c index 6cb3dc0..cac1ee7 100644 --- a/nerv/lib/matrix/generic/matrix.c +++ b/nerv/lib/matrix/generic/matrix.c @@ -5,21 +5,29 @@ void nerv_matrix_(data_free)(Matrix *self, Status *status) { assert(*self->data_ref > 0); - if (--(*self->data_ref) == 0) + //if (--(*self->data_ref) == 0) + if (NULL != self && __sync_fetch_and_add(&self->refcount, -1) == 1) { - /* free matrix data */ - MATRIX_DATA_FREE(MATRIX_ELEM_PTR(self), status); - free(self->data_ref); - free(self); - } - else { - free(self); - NERV_SET_STATUS(status, NERV_NORMAL, 0); + if(__sync_fetch_and_add(self->data_ref, -1) == 1) + { + /* free matrix data */ + MATRIX_DATA_FREE(MATRIX_ELEM_PTR(self), status); + free(self->data_ref); + free(self); + self = NULL; + } + else + { + free(self); + self = NULL; + NERV_SET_STATUS(status, NERV_NORMAL, 0); + } } } void nerv_matrix_(data_retain)(Matrix *self) { - (*self->data_ref)++; + __sync_fetch_and_add(self->data_ref, 1); + //(*self->data_ref)++; } Matrix *nerv_matrix_(create)(long nrow, long ncol, Status *status) { @@ -36,6 +44,7 @@ Matrix *nerv_matrix_(create)(long nrow, long ncol, Status *status) { free(self); return NULL; } + self->refcount = 1; self->data_ref = (long *)malloc(sizeof(long)); *self->data_ref = 0; nerv_matrix_(data_retain)(self); @@ -56,6 +65,7 @@ Matrix *nerv_matrix_(getrow)(Matrix *self, int row) { prow->nmax = prow->ncol; MATRIX_ELEM_PTR(prow) = MATRIX_ROW_PTR(self, row); prow->data_ref = self->data_ref; + prow->refcount = 1; nerv_matrix_(data_retain)(prow); return prow; } diff --git a/nerv/lib/matrix/generic/mmatrix.c b/nerv/lib/matrix/generic/mmatrix.c index 225079e..0850c6e 100644 --- a/nerv/lib/matrix/generic/mmatrix.c +++ b/nerv/lib/matrix/generic/mmatrix.c @@ -7,6 +7,7 @@ #define NERV_GENERIC_MATRIX #include "../../common.h" #include "../../io/chunk_file.h" +#include "../cuda_helper.h" #include "string.h" static void host_matrix_(free)(MATRIX_ELEM *ptr, Status *status) { @@ -79,4 +80,47 @@ void nerv_matrix_(copy_from)(Matrix *a, const Matrix *b, NERV_SET_STATUS(status, NERV_NORMAL, 0); } +void nerv_matrix_(expand_frm)(Matrix *a, const Matrix *b, + int context, int b_begin, int b_end, Status *status) { + if (!(0 <= b_begin && b_begin < b_end && b_end <= b->nrow && + b_end - b_begin == a->nrow)) + NERV_EXIT_STATUS(status, MAT_MISMATCH_DIM, 0); + if (a->ncol != b->ncol * (context * 2 + 1)) + NERV_EXIT_STATUS(status, MAT_GENERAL_ERR, + "the width should be 2 * context + 1"); + int i, j, r_off; + for (i = 0; i < a->nrow; i++) + { + for (j = 0; j < context * 2 + 1; j++) + { + r_off = b_begin + i + j - context; + if (r_off < b_begin) r_off = b_begin; + if (r_off >= b_end) r_off = b_end - 1; + memcpy(MATRIX_ROW_PTR(a, i) + j*b->ncol, MATRIX_ROW_PTR(b, r_off), sizeof(MATRIX_ELEM) * b->ncol); + } + } + NERV_SET_STATUS(status, NERV_NORMAL, 0); +} + +void nerv_matrix_(rearrange_frm)(Matrix *a, const Matrix *b, + int step, int b_begin, int b_end, Status *status) { + //CHECK_SAME_DIMENSION(a, b, status); + if (!(0 <= b_begin && b_begin < b_end && b_end <= b->nrow && + b_end - b_begin == a->nrow)) + NERV_EXIT_STATUS(status, MAT_MISMATCH_DIM, 0); + if (b->ncol % step) + NERV_EXIT_STATUS(status, MAT_GENERAL_ERR, + "the dimension of columns is not divisible by step"); + + int i, j; + int stride = a->stride / sizeof(MATRIX_ELEM); + for (i = 0; i < a->nrow; i++) + { + for (j = 0; j < a->ncol; j++) + MATRIX_ELEM_PTR(a)[j + i * stride] = MATRIX_ELEM_PTR(b)[j / step + (j % step) * (b->ncol/step) + (i+b_begin) * stride]; 
+ } + NERV_SET_STATUS(status, NERV_NORMAL, 0); +} + + #endif diff --git a/nerv/lib/matrix/generic/mmatrix.h b/nerv/lib/matrix/generic/mmatrix.h index f00a04d..eb6c4c7 100644 --- a/nerv/lib/matrix/generic/mmatrix.h +++ b/nerv/lib/matrix/generic/mmatrix.h @@ -6,3 +6,9 @@ void nerv_matrix_(save)(Matrix *self, ChunkFile *cfp, Status *status); void nerv_matrix_(copy_from)(Matrix *a, const Matrix *b, int a_begin, int b_begin, int b_end, Status *status); + +void nerv_matrix_(expand_frm)(Matrix *a, const Matrix *b, + int context, int b_begin, int b_end, Status *status); + +void nerv_matrix_(rearrange_frm)(Matrix *a, const Matrix *b, + int step, int b_begin, int b_end, Status *status); diff --git a/nerv/lib/matrix/matrix.h b/nerv/lib/matrix/matrix.h index 67a6e30..51ca736 100644 --- a/nerv/lib/matrix/matrix.h +++ b/nerv/lib/matrix/matrix.h @@ -13,6 +13,7 @@ typedef struct Matrix { long *i; } data; /* pointer to actual storage */ long *data_ref; + int refcount; /* prevent matrix struct double free */ } Matrix; #define MATRIX_ROW_PTR(self, row) \ diff --git a/nerv/lib/matrix/mmatrix.c b/nerv/lib/matrix/mmatrix.c index b8157eb..e54b336 100644 --- a/nerv/lib/matrix/mmatrix.c +++ b/nerv/lib/matrix/mmatrix.c @@ -52,3 +52,21 @@ Matrix *nerv_matrix_(perm_gen)(int ncol, Status *status) { #define host_matrix_(NAME) host_matrix_int_##NAME #define nerv_matrix_(NAME) nerv_matrix_host_int_##NAME #include "generic/mmatrix.c" + +Matrix *nerv_matrix_(perm_gen)(int ncol, Status *status) { + int i; + Matrix *self = nerv_matrix_(create)(1, ncol, status); + if (status->err_code != NERV_NORMAL) + return NULL; + long *prow = self->data.i; + for (i = 0; i < ncol; i++) + prow[i] = i; + for (i = ncol - 1; i >= 0; i--) + { + size_t j = rand() % (i + 1); + long tmp = prow[i]; + prow[i] = prow[j]; + prow[j] = tmp; + } + return self; +} diff --git a/nerv/matrix/cumatrix.c b/nerv/matrix/cumatrix.c index fef03fc..08b7efa 100644 --- a/nerv/matrix/cumatrix.c +++ b/nerv/matrix/cumatrix.c @@ -1,4 +1,5 @@ #define NERV_GENERIC_CUMATRIX +#include "../lib/matrix/cuda_helper.h" #include "../lib/common.h" #include "../lib/matrix/cumatrix.h" #include "../lib/matrix/cuda_helper.h" diff --git a/nerv/matrix/generic/cumatrix.c b/nerv/matrix/generic/cumatrix.c index 08cb4c2..ce7e68a 100644 --- a/nerv/matrix/generic/cumatrix.c +++ b/nerv/matrix/generic/cumatrix.c @@ -6,6 +6,7 @@ #define MATRIX_BASE_TNAME nerv_matrix_cuda_tname #define NERV_GENERIC_MATRIX #define NERV_GENERIC_CUKERNEL +#include "../../lib/matrix/cuda_helper.h" #include "../../lib/common.h" #include "../../lib/matrix/generic/matrix.h" #include "../../lib/matrix/generic/cumatrix.h" @@ -17,7 +18,9 @@ static int nerv_matrix_(lua_add)(lua_State *L) { const Matrix *b = luaT_checkudata(L, 3, nerv_matrix_(tname)); MATRIX_ELEM alpha = luaL_checknumber(L, 4); MATRIX_ELEM beta = luaL_checknumber(L, 5); - nerv_matrix_(add)(c, a, b, alpha, beta, &status); + int nargs = lua_gettop(L); + CuContext *context = nargs > 5 ? luaT_toudata(L, 6, nerv_context_tname) : NULL; + nerv_matrix_(add)(c, a, b, alpha, beta, &context->cublas_handle, &status); NERV_LUA_CHECK_STATUS(L, status); return 0; } @@ -38,11 +41,21 @@ static int nerv_matrix_(lua_mul)(lua_State *L) { : CUBLAS_OP_N; int tb = nargs > 6 ? nerv_matrix_(lua_get_cublas_op)(*luaL_checkstring(L, 7)) \ : CUBLAS_OP_N; - nerv_matrix_(mul)(c, a, b, alpha, beta, ta, tb, &status); + CuContext *context = nargs > 7 ? 
luaT_toudata(L, 8, nerv_context_tname) : NULL; + nerv_matrix_(mul)(c, a, b, alpha, beta, ta, tb, &context->cublas_handle, &status); NERV_LUA_CHECK_STATUS(L, status); return 0; } +static int nerv_matrix_(lua_create)(lua_State *L) { + Status status; + Matrix *a = luaT_checkudata(L, 1, nerv_matrix_(tname)); + Matrix *b = nerv_matrix_(create)(a->nrow, a->ncol, &status); + NERV_LUA_CHECK_STATUS(L, status); + luaT_pushudata(L, b, nerv_matrix_(tname)); + return 1; +} + static int nerv_matrix_(lua_sigmoid)(lua_State *L) { Status status; Matrix *a = luaT_checkudata(L, 1, nerv_matrix_(tname)); @@ -193,7 +206,9 @@ static int nerv_matrix_(lua_copy_toh)(lua_State *L) { static int nerv_matrix_(lua_trans)(lua_State *L) { Status status; Matrix *a = luaT_checkudata(L, 1, nerv_matrix_(tname)); - Matrix *b = nerv_matrix_(trans)(a, &status); + int nargs = lua_gettop(L); + CuContext *context = nargs > 1 ? luaT_toudata(L, 2, nerv_context_tname) : NULL; + Matrix *b = nerv_matrix_(trans)(a, &context->cublas_handle, &status); NERV_LUA_CHECK_STATUS(L, status); luaT_pushudata(L, b, nerv_matrix_(tname)); return 1; @@ -258,7 +273,11 @@ static int nerv_matrix_(lua_expand_frm)(lua_State *L) { Matrix *a = luaT_checkudata(L, 1, nerv_matrix_(tname)); const Matrix *b = luaT_checkudata(L, 2, nerv_matrix_(tname)); int context = luaL_checkinteger(L, 3); - nerv_matrix_(expand_frm)(a, b, context, &status); + + int nargs = lua_gettop(L); + int b_begin = nargs > 3 ? luaL_checkinteger(L, 4) : 0; + int b_end = nargs > 4 ? luaL_checkinteger(L, 5) : b->nrow; + nerv_matrix_(expand_frm)(a, b, context, b_begin, b_end, &status); NERV_LUA_CHECK_STATUS(L, status); return 0; } diff --git a/nerv/matrix/generic/matrix.c b/nerv/matrix/generic/matrix.c index 8efe608..70b098c 100644 --- a/nerv/matrix/generic/matrix.c +++ b/nerv/matrix/generic/matrix.c @@ -86,6 +86,12 @@ static int nerv_matrix_(lua_ncol)(lua_State *L) { return 1; } +static int nerv_matrix_(lua_nstride)(lua_State *L) { + Matrix *self = luaT_checkudata(L, 1, nerv_matrix_(tname)); + lua_pushinteger(L, self->stride / sizeof(MATRIX_ELEM)); + return 1; +} + static int nerv_matrix_(lua_dim)(lua_State *L) { Matrix *self = luaT_checkudata(L, 1, nerv_matrix_(tname)); lua_pushinteger(L, self->dim); @@ -108,6 +114,7 @@ static const luaL_Reg nerv_matrix_(methods)[] = { {"get_elem", nerv_matrix_(lua_get_elem)}, {"set_elem", nerv_matrix_(lua_set_elem)}, {"ncol", nerv_matrix_(lua_ncol)}, + {"nstride", nerv_matrix_(lua_nstride)}, {"nrow", nerv_matrix_(lua_nrow)}, {"dim", nerv_matrix_(lua_dim)}, {"get_dataref_value", nerv_matrix_(lua_get_dataref_value)}, diff --git a/nerv/matrix/generic/mmatrix.c b/nerv/matrix/generic/mmatrix.c index 01dd9e5..709c08e 100644 --- a/nerv/matrix/generic/mmatrix.c +++ b/nerv/matrix/generic/mmatrix.c @@ -72,10 +72,44 @@ int nerv_matrix_(lua_copy_from)(lua_State *L) { return 0; } +int nerv_matrix_(lua_expand_frm)(lua_State *L) +{ + Status status; + Matrix *a = luaT_checkudata(L, 1, nerv_matrix_(tname)); + const Matrix *b = luaT_checkudata(L, 2, nerv_matrix_(tname)); + int context = luaL_checkinteger(L, 3); + + int nargs = lua_gettop(L); + int b_begin = nargs > 3 ? luaL_checkinteger(L, 4) : 0; + int b_end = nargs > 4 ? 
luaL_checkinteger(L, 5) : b->nrow; + nerv_matrix_(expand_frm)(a, b, context, b_begin, b_end, &status); + NERV_LUA_CHECK_STATUS(L, status); + return 0; +} + +int nerv_matrix_(lua_rearrange_frm)(lua_State *L) +{ + Status status; + Matrix *a = luaT_checkudata(L, 1, nerv_matrix_(tname)); + const Matrix *b = luaT_checkudata(L, 2, nerv_matrix_(tname)); + int step = luaL_checkinteger(L, 3); + + int nargs = lua_gettop(L); + int b_begin = nargs > 3 ? luaL_checkinteger(L, 4) : 0; + int b_end = nargs > 4 ? luaL_checkinteger(L, 5) : b->nrow; + nerv_matrix_(rearrange_frm)(a, b, step, b_begin, b_end, &status); + NERV_LUA_CHECK_STATUS(L, status); + return 0; + + +} + static const luaL_Reg nerv_matrix_(extra_methods)[] = { {"load", nerv_matrix_(lua_load)}, {"save", nerv_matrix_(lua_save)}, {"copy_from", nerv_matrix_(lua_copy_from)}, + {"expand_frm", nerv_matrix_(lua_expand_frm)}, + {"rearrange_frm", nerv_matrix_(lua_rearrange_frm)}, {NULL, NULL} }; diff --git a/nerv/matrix/init.lua b/nerv/matrix/init.lua index 1091d7e..50bdb11 100644 --- a/nerv/matrix/init.lua +++ b/nerv/matrix/init.lua @@ -53,19 +53,19 @@ nerv.MMatrixInt.fmt = "%d " function nerv.CuMatrix:__add__(b) c = self:create() - c:add(self, b, 1.0, 1.0) + c:add(self, b, 1.0, 1.0, nerv.context) return c end function nerv.CuMatrix:__sub__(b) c = self:create() - c:add(self, b, 1.0, -1.0) + c:add(self, b, 1.0, -1.0, nerv.context) return c end function nerv.CuMatrix:__mul__(b) c = nerv.get_type(self.__typename)(self:nrow(), b:ncol()) - c:mul(self, b, 1.0, 0.0, 'N', 'N') + c:mul(self, b, 1.0, 0.0, 'N', 'N', nerv.context) return c end diff --git a/nerv/nn/layer_dag.lua b/nerv/nn/layer_dag.lua index 25297c2..a262a72 100644 --- a/nerv/nn/layer_dag.lua +++ b/nerv/nn/layer_dag.lua @@ -92,7 +92,7 @@ function DAGLayer:__init(id, global_conf, layer_conf) for id, ref in pairs(layers) do if ref.in_deg == 0 then table.insert(queue, ref) - nerv.info("adding source layer: %s", id) + --nerv.info("adding source layer: %s", id) r = r + 1 end end @@ -112,7 +112,7 @@ function DAGLayer:__init(id, global_conf, layer_conf) end end for i = 1, #queue do - nerv.info("enqueued layer: %s %s", queue[i].layer, queue[i].layer.id) + --nerv.info("enqueued layer: %s %s", queue[i].layer, queue[i].layer.id) end for id, ref in pairs(layers) do @@ -225,6 +225,25 @@ function DAGLayer:update(bp_err, input, output) end end +function DAGLayer:gradient(bp_err, input, output) + self:set_err_inputs(bp_err) + self:set_inputs(input) + self:set_outputs(output) + -- print("gradient") + for id, ref in pairs(self.queue) do + -- print(ref.layer.id) + ref.layer:gradient(ref.err_inputs, ref.inputs, ref.outputs) + end +end + +function DAGLayer:update_gradient() + -- print("update gradient") + for id, ref in pairs(self.queue) do + -- print(ref.layer.id) + ref.layer:update_gradient() + end +end + function DAGLayer:propagate(input, output) self:set_inputs(input) self:set_outputs(output) diff --git a/nerv/nn/layer_repo.lua b/nerv/nn/layer_repo.lua index ef333a7..8473727 100644 --- a/nerv/nn/layer_repo.lua +++ b/nerv/nn/layer_repo.lua @@ -13,7 +13,7 @@ function LayerRepo:add_layers(layer_spec, param_repo, global_conf) if layers[id] ~= nil then nerv.error("a layer with id %s already exists", id) end - nerv.info("create layer: %s", id) + --nerv.info("create layer: %s", id) if type(spec[2]) ~= "table" then nerv.error("layer config table is need") end @@ -0,0 +1,2 @@ + +./install/bin/nerv fastnn/example/asgd_sds_trainer.lua |
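
Editor's note: the hunks above split parameter updates into a gradient-accumulation step (Layer:gradient) and a separate application step (Layer:update_gradient), and thread an optional nerv.context through the cumatrix add/mul/trans calls so each worker thread can use its own cuBLAS handle. The sketch below is a minimal illustration of how a driver script might call the new API; it is not taken from the patch. The names network, input, output and bp_err are assumptions for illustration only (the real driver, fastnn/example/asgd_sds_trainer.lua, is not included in this section).

-- minimal sketch, assuming `network` is a nerv.DAGLayer whose input/output
-- matrices have already been allocated by the caller
require 'nerv'

local function train_one_batch(network, bp_err, input, output)
    -- forward pass; with this patch the underlying cumatrix mul/add accept
    -- nerv.context as a trailing argument
    network:propagate(input, output)
    -- accumulate momentum-smoothed gradients without touching the weights
    network:gradient(bp_err, input, output)
    -- apply the accumulated corrections (learning rate, momentum gain and
    -- weight decay) in one step, e.g. after gradients have been exchanged
    -- between worker threads
    network:update_gradient()
end

Splitting update into gradient/update_gradient is what allows several threads to compute gradients concurrently and then apply a synchronized update, which is the pattern the fastnn ModelSync and threads modules added in this commit are built around.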