aboutsummaryrefslogtreecommitdiff
path: root/fastnn
diff options
context:
space:
mode:
Diffstat (limited to 'fastnn')
-rw-r--r--fastnn/Makefile58
-rw-r--r--fastnn/device/Device.cpp496
-rw-r--r--fastnn/device/Device.h51
-rw-r--r--fastnn/device/device.c178
-rw-r--r--fastnn/device/device.lua6
-rw-r--r--fastnn/example/asgd_data_trainer.lua405
-rw-r--r--fastnn/example/asgd_sds_trainer.lua343
-rw-r--r--fastnn/example/fastnn_baseline.lua258
-rw-r--r--fastnn/fastnn-scm-1.rockspec36
-rw-r--r--fastnn/init.c43
-rw-r--r--fastnn/init.lua11
-rw-r--r--fastnn/io/Example.cpp186
-rw-r--r--fastnn/io/Example.h49
-rw-r--r--fastnn/io/example.c249
-rw-r--r--fastnn/io/example.lua38
-rw-r--r--fastnn/lib/ModelSync.c305
-rw-r--r--fastnn/lib/ModelSync.h119
-rw-r--r--fastnn/lib/modelsync.c532
-rw-r--r--fastnn/lib/modelsync.lua107
-rw-r--r--fastnn/test.lua57
-rw-r--r--fastnn/threads/Makefile43
-rw-r--r--fastnn/threads/init.lua14
-rw-r--r--fastnn/threads/lib/THThread.c349
-rw-r--r--fastnn/threads/lib/THThread.h36
-rw-r--r--fastnn/threads/lib/init.c27
-rw-r--r--fastnn/threads/lib/luaTHRD.h84
-rw-r--r--fastnn/threads/lib/threads.c355
-rw-r--r--fastnn/threads/test/test-low-level.lua39
-rw-r--r--fastnn/threads/test/test-threads-async.lua66
-rw-r--r--fastnn/threads/test/test-threads-multiple.lua15
-rw-r--r--fastnn/threads/test/test-threads-shared.lua111
-rw-r--r--fastnn/threads/test/test-threads.lua20
-rw-r--r--fastnn/threads/threads-scm-1.rockspec36
-rw-r--r--fastnn/threads/threads.lua14
34 files changed, 4736 insertions, 0 deletions
diff --git a/fastnn/Makefile b/fastnn/Makefile
new file mode 100644
index 0000000..1f1ba1e
--- /dev/null
+++ b/fastnn/Makefile
@@ -0,0 +1,58 @@
+.PHONY: build install clean
+SHELL := /bin/bash
+BUILD_DIR := $(CURDIR)/build
+LUA_DIR = $(INST_LUADIR)/fastnn
+CUDA_BASE := /usr/local/cuda-6.5
+
+INC_PATH := -I $(LUA_BINDIR)/../include/nerv
+CUDA_INCLUDE := -I $(CUDA_BASE)/include/
+LIB_PATH := $(LUA_BINDIR)/../lib
+
+OBJ_DIR := $(BUILD_DIR)/objs
+SUBDIR := lib io threads device
+
+
+OBJS := init.o lib/modelsync.o lib/ModelSync.o io/example.o io/Example.o device/device.o device/Device.o
+LIBS := $(INST_LIBDIR)/libfastnn.so
+LUA_LIBS := init.lua lib/modelsync.lua io/example.lua device/device.lua
+INCLUDE := -I $(LUA_INCDIR) $(INC_PATH) $(CUDA_INCLUDE) -DLUA_USE_APICHECK -DUSE_PTHREAD_THREADS
+LDFLAGS := -L$(CUDA_BASE)/lib64/ -Wl,-rpath=$(CUDA_BASE)/lib64/ -lcudart -lcublas
+
+
+OBJS := $(addprefix $(OBJ_DIR)/,$(OBJS))
+OBJ_SUBDIR := $(addprefix $(OBJ_DIR)/,$(SUBDIR))
+LUA_SUBDIR := $(addprefix $(LUA_DIR)/,$(SUBDIR))
+LUA_LIBS := $(addprefix $(LUA_DIR)/,$(LUA_LIBS))
+
+CFLAGS := -Wall -Wextra -g -O0
+
+build: $(OBJ_DIR) $(OBJ_SUBDIR) $(OBJS)
+install: $(LIBS) $(LUA_DIR) $(LUA_SUBDIR) $(LUA_LIBS)
+
+#$(SUBMODULE): ECHO
+# cd threads; $(LUA_BINDIR)/luarocks make
+
+$(OBJ_DIR) $(LUA_DIR) $(OBJ_SUBDIR) $(LUA_SUBDIR):
+ -mkdir -p $@
+
+$(LUA_DIR)/%.lua: %.lua
+ cp $< $@
+
+$(OBJ_DIR)/io/Example.o: io/Example.cpp
+ gcc -c -o $@ $< $(INCLUDE) -fPIC $(CFLAGS)
+$(OBJ_DIR)/device/Device.o: device/Device.cpp
+ gcc -c -o $@ $< $(INCLUDE) -fPIC $(CFLAGS)
+
+$(OBJ_DIR)/%.o: %.c $(patsubst /%.o,/%.c,$@)
+ gcc -c -o $@ $< $(INCLUDE) -fPIC $(CFLAGS)
+
+$(LIBS): $(OBJS)
+ gcc -shared -o $@ $^ $(LDFLAGS) -Wl,-rpath=$(LIB_PATH) -L$(LIB_PATH) -lthreads -lnervcore -lluaT -lpthread -lstdc++
+
+ECHO:
+ @echo $(SUBMODULE)
+
+clean:
+ -rm -rf $(OBJ_DIR)
+
+
diff --git a/fastnn/device/Device.cpp b/fastnn/device/Device.cpp
new file mode 100644
index 0000000..3b69086
--- /dev/null
+++ b/fastnn/device/Device.cpp
@@ -0,0 +1,496 @@
+
+#include <vector>
+#include <string>
+#include <iostream>
+#include "../../nerv/lib/matrix/cuda_helper.h"
+#include <dlfcn.h>
+
+
+extern "C" {
+
+
+#include "Device.h"
+#include "stdlib.h"
+#include "string.h"
+
+
+struct GPUInfo
+{
+ int gpuid;
+ float d2h_bandwidth;
+ float h2d_bandwidth;
+ size_t mem_free;
+ size_t mem_total;
+ float mem_ratio;
+ bool used;
+};
+
+struct MPIGPUInfo
+{
+ char hostname[STRLEN];
+ int gpuid;
+ int myid;
+};
+
+struct Device
+{
+ std::vector<GPUInfo> gpuinfo_;
+ int refcount;
+};
+
+///////////////////////////////////////////
+
+extern cublasHandle_t* nerv_get_cublas_handle();
+
+CuContext* CuContext_new()
+{
+ CuContext *context = new CuContext;
+ cublasCreate(&context->cublas_handle);
+ cudaEventCreate(&context->profile_start);
+ cudaEventCreate(&context->profile_stop);
+ context->profile = hashmap_create(PROFILE_HASHMAP_SIZE, bkdr_hash, strcmp);
+ context->refcount = 1;
+ context->pid = pthread_self();
+ return context;
+}
+
+CuContext* CuContext_newWithId(long id)
+{
+ CuContext *context = (CuContext*)id;
+ __sync_fetch_and_add(&context->refcount, 1);
+ return context;
+}
+
+long CuContext_id(CuContext *context)
+{
+ return (long)context;
+}
+
+void CuContext_destroy(CuContext *context)
+{
+ if (NULL != context && __sync_fetch_and_add(&context->refcount, -1) == 1)
+ {
+ cublasDestroy(context->cublas_handle);
+ hashmap_clear(context->profile);
+ delete context;
+ context = NULL;
+ }
+}
+
+Device* Device_new()
+{
+ Device* device = new Device;
+ device->refcount = 1;
+ return device;
+}
+
+Device* Device_newWithId(long id)
+{
+ Device *device = (Device*)id;
+ __sync_fetch_and_add(&device->refcount, 1);
+ return device;
+}
+
+long Device_id(Device *device)
+{
+ return (long)device;
+}
+
+void Device_destroy(Device *device)
+{
+ if (NULL != device && __sync_fetch_and_add(&device->refcount, -1) == 1)
+ {
+ delete device;
+ device = NULL;
+ }
+}
+
+void GetFreeMemory(size_t* free, size_t* total, Status *status)
+{
+ // WARNING! the CUDA API is inconsistent accross versions!
+ #if (CUDA_VERSION >= 3020)
+ //define the function signature type
+ size_t mem_free, mem_total;
+#else
+ unsigned int mem_free, mem_total;
+#endif
+ {
+ //we will load the cuMemGetInfo dynamically from libcuda.so
+ //cuMemGetInfo(&mem_free, &mem_total);
+ //pre-fill ``safe'' values that will not cause problems
+ mem_free = 1; mem_total = 1;
+ //open libcuda.so
+ void* libcuda = dlopen("libcuda.so",RTLD_LAZY);
+ if(NULL == libcuda)
+ {
+ NERV_EXIT_STATUS(status, MAT_CUDA_ERR, "Cannot open libcuda.so");
+ }
+ else
+ {
+ //define the function signature type
+ //and get the symbol
+#if (CUDA_VERSION >= 3020)
+ typedef CUresult (*cu_fun_ptr)(size_t*, size_t*);
+ cu_fun_ptr dl_cuMemGetInfo = (cu_fun_ptr)dlsym(libcuda,"cuMemGetInfo_v2");
+#else
+ typedef CUresult (*cu_fun_ptr)(int*, int*);
+ cu_fun_ptr dl_cuMemGetInfo = (cu_fun_ptr)dlsym(libcuda,"cuMemGetInfo");
+#endif
+ if(NULL == dl_cuMemGetInfo)
+ {
+ NERV_EXIT_STATUS(status, MAT_CUDA_ERR, "Cannot load cuMemGetInfo from libcuda.so");
+ }
+ else
+ {
+ //call the function
+ dl_cuMemGetInfo(&mem_free, &mem_total);
+ }
+ //close the library
+ dlclose(libcuda);
+ }
+ }
+ // copy the output values outside
+ if(NULL != free) *free = mem_free;
+ if(NULL != total) *total = mem_total;
+}
+
+void DeviceGetName(char* name, int len, int dev, Status *status)
+{
+ //prefill with something reasonable
+ strncpy(name,"Unknown GPU",len);
+ //open libcuda.so
+ void* libcuda = dlopen("libcuda.so",RTLD_LAZY);
+ if(NULL == libcuda)
+ {
+ NERV_EXIT_STATUS(status, MAT_CUDA_ERR, "Cannot open libcuda.so");
+ }
+ else
+ {
+ //define the function signature type
+ typedef CUresult (*cu_fun_ptr)(char*,int,CUdevice);
+ //get the symbol
+ cu_fun_ptr cuDeviceGetName_ptr = (cu_fun_ptr)dlsym(libcuda,"cuDeviceGetName");
+ if(NULL == cuDeviceGetName_ptr) {
+ NERV_EXIT_STATUS(status, MAT_CUDA_ERR, "Cannot load cuDeviceGetName from libcuda.so");
+ }
+ else {
+ //call the function
+ cuDeviceGetName_ptr(name, len, dev);
+ }
+ //close the library
+ dlclose(libcuda);
+ }
+}
+
+void GetBandwidth(int gpu_idx, float *d2h, float *h2d, Status *status)
+{
+ int idx = gpu_idx;
+ int memSize = 64*1024*1024;
+ float elapsedTimeInMs = 0.0f;
+ float bandwidthInMBs = 0.0f;
+ unsigned char *h_idata = NULL;
+ unsigned char *h_odata = NULL;
+ cudaEvent_t start, stop;
+ bool PINNED = true;
+ int MEMCOPY_ITERATIONS = 5;
+
+ CUDA_SAFE_SYNC_CALL(cudaEventCreate(&start), status);
+ CUDA_SAFE_SYNC_CALL(cudaEventCreate(&stop), status);
+
+ //allocate host memory
+ if (PINNED)
+ {
+ #if CUDART_VERSION >= 2020
+ CUDA_SAFE_SYNC_CALL(cudaHostAlloc((void **)&h_idata, memSize, cudaHostAllocPortable), status);
+ CUDA_SAFE_SYNC_CALL(cudaHostAlloc((void **)&h_odata, memSize, cudaHostAllocPortable), status);
+ #else
+ CUDA_SAFE_SYNC_CALL(cudaMallocHost((void **)&h_idata, memSize), status);
+ CUDA_SAFE_SYNC_CALL(cudaMallocHost((void **)&h_odata, memSize), status);
+ #endif
+ }
+ else
+ {
+ //pageable memory mode - use malloc
+ h_odata = (unsigned char *)malloc(memSize);
+
+ if (h_odata == 0)
+ {
+ NERV_EXIT_STATUS(status, MAT_CUDA_ERR, "Not enough memory available on host to run test!");
+ }
+ }
+
+ //initialize the memory
+ for (unsigned int i = 0; i < memSize/sizeof(unsigned char); i++)
+ {
+ h_idata[i] = (unsigned char)(i & 0xff);
+ }
+
+ // allocate device memory
+ unsigned char *d_idata;
+ CUDA_SAFE_SYNC_CALL(cudaMalloc((void **) &d_idata, memSize), status);
+
+ //initialize the device memory
+ CUDA_SAFE_SYNC_CALL(cudaMemcpy(d_idata, h_idata, memSize,cudaMemcpyHostToDevice), status);
+
+ //copy data from GPU to Host
+
+ CUDA_SAFE_SYNC_CALL(cudaEventRecord(start, 0), status);
+
+ if (PINNED)
+ {
+ for (unsigned int i = 0; i < MEMCOPY_ITERATIONS; i++)
+ {
+ CUDA_SAFE_SYNC_CALL(cudaMemcpyAsync(h_odata, d_idata, memSize,cudaMemcpyDeviceToHost, 0), status);
+ }
+ }
+ else
+ {
+ for (unsigned int i = 0; i < MEMCOPY_ITERATIONS; i++)
+ {
+ CUDA_SAFE_SYNC_CALL(cudaMemcpy(h_odata, d_idata, memSize,cudaMemcpyDeviceToHost), status);
+ }
+ }
+
+ CUDA_SAFE_SYNC_CALL(cudaEventRecord(stop, 0), status);
+
+ // make sure GPU has finished copying
+ CUDA_SAFE_SYNC_CALL(cudaDeviceSynchronize(), status);
+ //get the the total elapsed time in ms
+
+ CUDA_SAFE_SYNC_CALL(cudaEventElapsedTime(&elapsedTimeInMs, start, stop), status);
+
+
+ //calculate bandwidth in MB/s
+ bandwidthInMBs = (1e3f * memSize * (float)MEMCOPY_ITERATIONS) / (elapsedTimeInMs * (float)(1 << 20));
+ *d2h = bandwidthInMBs;
+
+ /////////////////////////////////////////////////////
+ //copy data from Host to GPU
+ CUDA_SAFE_SYNC_CALL(cudaEventRecord(start, 0), status);
+
+ if (PINNED)
+ {
+ for (unsigned int i = 0; i < MEMCOPY_ITERATIONS; i++)
+ {
+ CUDA_SAFE_SYNC_CALL(cudaMemcpyAsync(d_idata, h_odata, memSize,cudaMemcpyHostToDevice, 0), status);
+ }
+ }
+ else
+ {
+ for (unsigned int i = 0; i < MEMCOPY_ITERATIONS; i++)
+ {
+ CUDA_SAFE_SYNC_CALL(cudaMemcpy(d_idata, h_odata, memSize,cudaMemcpyHostToDevice), status);
+ }
+ }
+
+ CUDA_SAFE_SYNC_CALL(cudaEventRecord(stop, 0), status);
+
+ // make sure GPU has finished copying
+ CUDA_SAFE_SYNC_CALL(cudaDeviceSynchronize(), status);
+ //get the the total elapsed time in ms
+
+ CUDA_SAFE_SYNC_CALL(cudaEventElapsedTime(&elapsedTimeInMs, start, stop), status);
+
+
+ //calculate bandwidth in MB/s
+ bandwidthInMBs = (1e3f * memSize * (float)MEMCOPY_ITERATIONS) / (elapsedTimeInMs * (float)(1 << 20));
+ *h2d = bandwidthInMBs;
+
+ //clean up memory
+ CUDA_SAFE_SYNC_CALL(cudaEventDestroy(stop), status);
+ CUDA_SAFE_SYNC_CALL(cudaEventDestroy(start), status);
+
+ if (PINNED)
+ {
+ CUDA_SAFE_SYNC_CALL(cudaFreeHost(h_idata), status);
+ CUDA_SAFE_SYNC_CALL(cudaFreeHost(h_odata), status);
+ }
+ else
+ {
+ free(h_idata);
+ free(h_odata);
+ }
+
+ CUDA_SAFE_SYNC_CALL(cudaFree(d_idata), status);
+}
+
+int AutoSelectGPU(Device *device, Status *status)
+{
+ //GPU selection is based on largest proportion of free memory.
+
+ int max_id = 0, n_gpu = 0;
+ std::vector<GPUInfo> &gpuinfo_ = device->gpuinfo_;
+
+ cudaGetDeviceCount(&n_gpu);
+
+ if(n_gpu == 0)
+ {
+ NERV_SET_STATUS(status, MAT_CUDA_ERR, "No CUDA devices found");
+ return -1;
+ }
+
+ if (n_gpu > 0)
+ {
+ for (int i = 0; i < gpuinfo_.size(); i++)
+ {
+ if (!gpuinfo_[i].used)
+ {
+ max_id = i;
+ break;
+ }
+ }
+ //find GPU with max free memory
+ for (int n = 1; n < gpuinfo_.size(); n++)
+ {
+ if (!gpuinfo_[n].used && gpuinfo_[n].mem_ratio > gpuinfo_[max_id].mem_ratio)
+ max_id = n;
+ }
+
+
+ std::cerr << "Selected device: " << max_id << " (automatically)" << std::endl;
+
+ std::cerr << "free: " << gpuinfo_[max_id].mem_free/1024/1024 << "M, "
+ << "total: "<< gpuinfo_[max_id].mem_total/1024/1024 << "M, "
+ << "ratio: "<< gpuinfo_[max_id].mem_ratio << " "
+ << "d2h bandwidth: " << gpuinfo_[max_id].d2h_bandwidth << "MB/s, "
+ << "h2d bandwidth: "<< gpuinfo_[max_id].h2d_bandwidth << "MB/s" << std::endl;
+
+ cudaSetDevice(max_id);
+ //initialize the CUBLAS
+ //cublasInit();
+ //cublasHandle_t *cublas_handle = nerv_get_cublas_handle();
+ //std::cerr << "cublasHandle_t: " << cublas_handle << std::endl;
+ //cublasCreate(cublas_handle);
+
+ //create the context
+ cudaError_t e;
+ e = cudaThreadSynchronize(); //deprecated, but for legacy not cudaDeviceSynchronize
+ if(e != cudaSuccess) {
+ std::cerr << "Failed to create CUDA context on a GPU." << std::endl;
+ }
+ }
+
+ gpuinfo_[max_id].used = true;
+ NERV_SET_STATUS(status, NERV_NORMAL, 0);
+ return max_id;
+}
+
+void SelectGPU(Device *device, int gpu_id, Status *status)
+{
+
+ std::vector<GPUInfo> &gpuinfo_ = device->gpuinfo_;
+ int n_gpu = 0;
+ cudaGetDeviceCount(&n_gpu);
+ if(gpu_id >= n_gpu)
+ {
+ NERV_EXIT_STATUS(status, MAT_CUDA_ERR, "Cannot select GPU CUDA capable cards!");
+ std::cerr << "Cannot select GPU " << gpu_id
+ << ", detected " << n_gpu << " CUDA capable cards!" << std::endl;
+ }
+
+
+ std::cerr << "Selected device: " << gpu_id << " (manually)";
+
+ std::cerr << "free: " << gpuinfo_[gpu_id].mem_free/1024/1024 << "M, "
+ << "total: "<< gpuinfo_[gpu_id].mem_total/1024/1024 << "M, "
+ << "ratio: "<< gpuinfo_[gpu_id].mem_ratio << " "
+ << "d2h bandwidth: " << gpuinfo_[gpu_id].d2h_bandwidth << "MB/s, "
+ << "h2d bandwidth: "<< gpuinfo_[gpu_id].h2d_bandwidth << "MB/s" << std::endl;
+
+ CUDA_SAFE_SYNC_CALL(cudaSetDevice(gpu_id), status);
+ //initialize the CUBLAS
+ //CUBLAS_SAFE_SYNC_CALL(cublasInit(), status);
+ //cublasHandle_t *cublas_handle = nerv_get_cublas_handle();
+ //cublasCreate(cublas_handle);
+
+ //create the context
+ cudaError_t e;
+ e = cudaThreadSynchronize(); //deprecated, but for legacy not cudaDeviceSynchronize
+ if(e != cudaSuccess) {
+ NERV_EXIT_STATUS(status, MAT_CUDA_ERR, "Failed to create CUDA context on a GPU.");
+ std::cerr << "Failed to create CUDA context on a GPU.";
+ }
+
+ gpuinfo_[gpu_id].used = true;
+
+ NERV_SET_STATUS(status, NERV_NORMAL, 0);
+}
+
+void Initialize(Device *device, Status *status)
+{
+ // Check that we have at least one gpu
+ int n_gpu = 0;
+ cudaGetDeviceCount(&n_gpu);
+ if(n_gpu == 0)
+ NERV_EXIT_STATUS(status, MAT_CUDA_ERR, "No CUDA devices found");
+
+ std::vector<GPUInfo> &gpuinfo_ = device->gpuinfo_;
+ gpuinfo_.resize(n_gpu);
+
+ std::cerr << "gpu information ..." << std::endl;
+
+ // Get ratios of memory use, if possible
+ for(int n = 0; n < n_gpu; n++)
+ {
+ int ret = cudaSetDevice(n);
+ switch(ret)
+ {
+ case cudaSuccess : {
+ //create the CUDA context for the thread
+ //cudaThreadSynchronize(); //deprecated, but for legacy not cudaDeviceSynchronize
+ cudaDeviceSynchronize();
+ //get GPU name
+ char name[STRLEN];
+ DeviceGetName(name,STRLEN,n, status);
+ //get GPU memory stats
+ size_t free, total;
+ float d2h, h2d;
+
+ GetFreeMemory(&free, &total, status);
+ GetBandwidth(n, &d2h, &h2d, status);
+
+ gpuinfo_[n].gpuid = n;
+ gpuinfo_[n].d2h_bandwidth = d2h;
+ gpuinfo_[n].h2d_bandwidth = h2d;
+ gpuinfo_[n].mem_free = free;
+ gpuinfo_[n].mem_total = total;
+ gpuinfo_[n].mem_ratio = free/(float)total;
+ gpuinfo_[n].used = false;
+
+ std::cerr << "gpu: " << n << " ==> "
+ << "free: " << free/1024/1024 << "M, "
+ << "total: "<< total/1024/1024 << "M, "
+ << "ratio: "<< free/(float)total << " "
+ << "d2h bandwidth: " << d2h << "MB/s, "
+ << "h2d bandwidth: "<< h2d << "MB/s" << std::endl;
+
+ //destroy the CUDA context for the thread
+ //cudaThreadExit(); //deprecated, but for legacy reason not cudaDeviceReset
+ } break;
+
+ #if (CUDA_VERSION > 3020)
+ case cudaErrorDeviceAlreadyInUse :
+ std::cerr << "cudaSetDevice(" << n << "): "
+ << "Device cannot be accessed, used EXCLUSIVE-THREAD mode..." << std::endl;
+ break;
+ #endif
+ case cudaErrorInvalidDevice :
+ std::cerr << "cudaSetDevice(" << n << "): "
+ << "Device cannot be accessed, not a VALID CUDA device!" << std::endl;
+ break;
+ default :
+ std::cerr << "cudaSetDevice(" << n << "): "
+ << "returned " << ret << ", "
+ << cudaGetErrorString((cudaError_t)ret) << std::endl;
+ }
+ }
+ cudaDeviceReset();
+ printf("Result = PASS\n");
+ NERV_SET_STATUS(status, NERV_NORMAL, 0);
+}
+
+
+
+
+
+} // extern
diff --git a/fastnn/device/Device.h b/fastnn/device/Device.h
new file mode 100644
index 0000000..9a80128
--- /dev/null
+++ b/fastnn/device/Device.h
@@ -0,0 +1,51 @@
+
+#ifndef NERV_FASTNN_EXAMPLE_H
+#define NERV_FASTNN_EXAMPLE_H
+
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "matrix/matrix.h"
+#include "stdbool.h"
+#include "../../nerv/lib/matrix/cuda_helper.h"
+#include "../../nerv/lib/common.h"
+#include "../../nerv/lib/matrix/cumatrix.h"
+#define STRLEN 1024
+
+
+
+typedef struct GPUInfo GPUInfo;
+
+typedef struct MPIGPUInfo MPIGPUInfo;
+
+typedef struct Device Device;
+
+CuContext* CuContext_new();
+CuContext* CuContext_newWithId(long id);
+long CuContext_id(CuContext *context);
+void CuContext_destroy(CuContext *context);
+
+
+Device* Device_new();
+Device* Device_newWithId(long);
+long Device_id(Device *device);
+void Device_detroy(Device *device);
+void Initialize(Device *device, Status *status);
+void GetFreeMemory(size_t* free, size_t* total, Status *status);
+void DeviceGetName(char* name, int len, int dev, Status *status);
+void GetBandwidth(int gpu_idx, float *d2h, float *h2d, Status *status);
+int AutoSelectGPU(Device *device, Status *status);
+void SelectGPU(Device *device, int gpu_id, Status *status);
+
+
+
+
+#ifdef __cplusplus
+} // closing brace for extern "C"
+
+#endif // end Device
+
+#endif
+
diff --git a/fastnn/device/device.c b/fastnn/device/device.c
new file mode 100644
index 0000000..71d6ec1
--- /dev/null
+++ b/fastnn/device/device.c
@@ -0,0 +1,178 @@
+
+#include <stdio.h>
+#include <stdlib.h>
+
+#include <lua.h>
+#include <lualib.h>
+#include <luaT/luaT.h>
+
+#include "../threads/lib/luaTHRD.h"
+#include "Device.h"
+
+
+const char *fastnn_device_tname = "fastnn.CDevice";
+
+
+static int device_new(lua_State *L)
+{
+ Device *device = NULL;
+ if(lua_gettop(L) == 0)
+ {
+ device = Device_new();
+ }
+ else if(lua_gettop(L) == 1)
+ {
+ long id = luaL_checkinteger(L, 1);
+ device = Device_newWithId(id);
+ }
+ else
+ luaL_error(L, "device: device new invalid arguments");
+ if (!device)
+ luaL_error(L, "device: device failed");
+
+ luaTHRD_pushudata(L, device, fastnn_device_tname);
+
+ return 1;
+}
+
+static int device_init(lua_State *L)
+{
+ Device *device = luaTHRD_checkudata(L, 1, fastnn_device_tname);
+ Status status;
+ Initialize(device, &status);
+ NERV_LUA_CHECK_STATUS(L, status);
+ return 0;
+}
+
+static int device_select_gpu(lua_State *L)
+{
+ Device *device = luaTHRD_checkudata(L, 1, fastnn_device_tname);
+
+ Status status;
+ if(lua_gettop(L) == 2)
+ {
+ int gpuid = luaL_checkinteger(L, 2);
+ SelectGPU(device, gpuid, &status);
+ NERV_LUA_CHECK_STATUS(L, status);
+ return 0;
+ }
+ else if(lua_gettop(L) == 1)
+ {
+ int gpuid = AutoSelectGPU(device, &status);
+ NERV_LUA_CHECK_STATUS(L, status);
+ lua_pushinteger(L, gpuid);
+ return 1;
+ }
+ else
+ luaL_error(L, "device: device select gpu failed");
+}
+
+static int device_id(lua_State *L)
+{
+ Device *device = luaTHRD_checkudata(L, 1, fastnn_device_tname);
+ lua_pushinteger(L, Device_id(device));
+ return 1;
+}
+
+static int device_tostring(lua_State *L)
+{
+ char str[STRLEN];
+ Device* device = luaTHRD_checkudata(L, 1, fastnn_device_tname);
+ snprintf(str, STRLEN, "%s <%lx>", fastnn_device_tname, Device_id(device));
+ lua_pushstring(L, str);
+ return 1;
+}
+
+static int device_destroy(lua_State *L)
+{
+ Device *device = luaTHRD_checkudata(L, 1, fastnn_device_tname);
+ Device_destroy(device);
+ //printf("device_destroy ... end\n");
+ return 0;
+}
+
+
+//////////////////////////////////////////////
+
+static int context_new(lua_State *L)
+{
+ CuContext *context = NULL;
+ if(lua_gettop(L) == 0)
+ {
+ context = CuContext_new();
+ }
+ else if(lua_gettop(L) == 1)
+ {
+ long id = luaL_checkinteger(L, 1);
+ context = CuContext_newWithId(id);
+ }
+ else
+ luaL_error(L, "device: context new invalid arguments");
+ if (!context)
+ luaL_error(L, "device: context failed");
+
+ luaTHRD_pushudata(L, context, nerv_context_tname);
+
+ return 1;
+}
+
+
+static int context_id(lua_State *L)
+{
+ CuContext *context = luaTHRD_checkudata(L, 1, nerv_context_tname);
+ lua_pushinteger(L, CuContext_id(context));
+ return 1;
+}
+
+static int context_tostring(lua_State *L)
+{
+ char str[STRLEN];
+ CuContext* context = luaTHRD_checkudata(L, 1, nerv_context_tname);
+ snprintf(str, STRLEN, "%s <%lx>", nerv_context_tname, CuContext_id(context));
+ lua_pushstring(L, str);
+ return 1;
+}
+
+static int context_destroy(lua_State *L)
+{
+ CuContext* context = luaTHRD_checkudata(L, 1, nerv_context_tname);
+ CuContext_destroy(context);
+ return 0;
+}
+
+
+static const struct luaL_Reg device__ [] = {
+ {"new", device_new},
+ {"__tostring", device_tostring},
+ {"id", device_id},
+ {"init", device_init},
+ {"select_gpu", device_select_gpu},
+ {"free", device_destroy},
+ {NULL, NULL}
+};
+
+static const struct luaL_Reg context__ [] = {
+ {"new", context_new},
+ {"__tostring", context_tostring},
+ {"id", context_id},
+ {"free", context_destroy},
+ {NULL, NULL}
+};
+
+
+void fastnn_init_device(lua_State *L)
+{
+ luaT_newmetatable(L, fastnn_device_tname, NULL, device_new, device_destroy, NULL);
+ luaL_register(L, NULL, device__);
+ lua_pop(L, 1);
+}
+
+void fastnn_init_context(lua_State *L)
+{
+ luaT_newmetatable(L, nerv_context_tname, NULL, context_new, context_destroy, NULL);
+ luaL_register(L, NULL, context__);
+ lua_pop(L, 1);
+}
+
+
+
diff --git a/fastnn/device/device.lua b/fastnn/device/device.lua
new file mode 100644
index 0000000..d3dea73
--- /dev/null
+++ b/fastnn/device/device.lua
@@ -0,0 +1,6 @@
+
+local C = require 'libfastnn'
+
+fastnn.CDevice = C.CDevice
+
+
diff --git a/fastnn/example/asgd_data_trainer.lua b/fastnn/example/asgd_data_trainer.lua
new file mode 100644
index 0000000..33d579a
--- /dev/null
+++ b/fastnn/example/asgd_data_trainer.lua
@@ -0,0 +1,405 @@
+require 'fastnn'
+require 'libhtkio'
+require 'threads'
+
+dofile("fastnn/fastnn_baseline.lua")
+
+env = string.format([[
+package.path="/home/slhome/wd007/.luarocks/share/lua/5.1/?.lua;/home/slhome/wd007/.luarocks/share/lua/5.1/?/init.lua;/sgfs/users/wd007/src/nerv/install/share/lua/5.1/?.lua;/sgfs/users/wd007/src/nerv/install/share/lua/5.1/?/init.lua;"..package.path;
+package.cpath="/home/slhome/wd007/.luarocks/lib/lua/5.1/?.so;/sgfs/users/wd007/src/nerv/install/lib/lua/5.1/?.so;"..package.cpath
+local k,l,_=pcall(require,"luarocks.loader") _=k and l.add_context("nerv","scm-1")
+]])
+
+
+
+local data_thread_code = [[
+%s
+
+require 'nerv'
+require 'fastnn'
+dofile("fastnn/fastnn_baseline.lua")
+os.execute("export MALLOC_CHECK_=0")
+
+local thread_idx = %d
+local example_repo_shareid = %d
+local data_mutex_shareid = %d
+local feat_repo_shareid = %d
+local gpu_shareid = %d
+local batch_size = %d
+local bp = %d
+local scp_file = '%s'
+
+local share_mutex = threads.Mutex(data_mutex_shareid)
+local share_example_repo = fastnn.CExamplesRepo(example_repo_shareid, true)
+local share_gpu = fastnn.CDevice(gpu_shareid)
+
+--print(thread_idx)
+--print(share_mutex)
+--print(share_gpu)
+--print(share_example_repo)
+
+if bp == 0 then
+ bp = false
+else
+ bp = true
+end
+gconf.randomize = bp
+--print(gconf.randomize)
+
+share_mutex:lock()
+local gpuid = share_example_repo:get_gpuid()
+if gpuid < 0 then
+ gpuid = share_gpu:select_gpu()
+ share_example_repo:set_gpuid(gpuid)
+else
+ share_gpu:select_gpu(gpuid)
+end
+
+nerv.info_stderr("thread %%d loading transf ...", thread_idx)
+local param_transf_repo = nerv.ParamRepo()
+param_transf_repo:import(gconf.transf, nil, gconf)
+local transf_node_repo = make_transf_node_repo(param_transf_repo)
+local transf_layer_repo = make_transf_link_repo(transf_node_repo, param_transf_repo)
+local transf = transf_layer_repo:get_layer("global_transf")
+share_mutex:unlock()
+
+local feat_id = get_feat_id()
+
+local buffer = make_buffer(make_readers(scp_file, transf_layer_repo, feat_repo_shareid, data_mutex_shareid))
+
+ local t = 1;
+ for data in buffer.get_data, buffer do
+ local example = fastnn.Example:PrepareData(data, nil, feat_id)
+ --print(string.format("Accept NO.%%d %%s", t, example)); t = t+1;
+ share_example_repo:accept(example)
+ --print("share_example_repo:accept")
+
+ -- collect garbage in-time to save GPU memory
+ collectgarbage("collect")
+ end
+ share_example_repo:done()
+-- print("share_example_repo:done")
+
+]]
+
+
+train_thread_code = [[
+%s
+
+require 'nerv'
+require 'fastnn'
+dofile("fastnn/fastnn_baseline.lua")
+os.execute("export MALLOC_CHECK_=0")
+
+local thread_idx = %d
+local example_repo_shareid = %d
+local data_mutex_shareid = %d
+local master_shareid = %d
+local gpu_shareid = %d
+local xent_shareid = %d
+local batch_size = %d
+local lrate = %f
+local bp = %d
+local nnet_in = '%s'
+local nnet_out = '%s'
+
+local share_example_repo = fastnn.CExamplesRepo(example_repo_shareid, true)
+local share_mutex = threads.Mutex(data_mutex_shareid)
+local share_master = fastnn.ModelSync(master_shareid)
+local share_gpu = fastnn.CDevice(gpu_shareid)
+local share_xent = fastnn.CXent(xent_shareid)
+
+if bp == 0 then
+ bp = false
+else
+ bp = true
+end
+
+gconf.randomize = bp
+gconf.lrate = lrate
+gconf.batch_size = batch_size
+gconf.network[1] = nnet_in
+nerv.info_stderr("input network: %%s", gconf.network[1])
+nerv.info_stderr(gconf.randomize)
+nerv.info_stderr("input batch_size: %%d", gconf.batch_size)
+nerv.info_stderr("input lrate: %%f", gconf.lrate)
+
+share_mutex:lock()
+local gpuid = share_example_repo:get_gpuid()
+if gpuid < 0 then
+ gpuid = share_gpu:select_gpu()
+ share_example_repo:set_gpuid(gpuid)
+else
+ share_gpu:select_gpu(gpuid)
+end
+
+nerv.context = nerv.CCuContext()
+--print(nerv.context)
+
+
+nerv.info_stderr("thread %%d loading network ...", thread_idx)
+local param_network_repo = nerv.ParamRepo()
+param_network_repo:import(gconf.network, nil, gconf)
+local network_node_repo = make_network_node_repo(param_network_repo)
+local network_layer_repo = make_network_link_repo(network_node_repo, param_network_repo)
+local network = get_network(network_layer_repo)
+share_mutex:unlock()
+
+
+ local input_order = get_input_order()
+
+ -- initialize the network
+ network:init(gconf.batch_size)
+ gconf.cnt = 0
+ err_input = {nerv.CuMatrixFloat(gconf.batch_size, 1)}
+ err_input[1]:fill(1)
+
+ share_master:Initialize(network)
+ share_master:SyncInc()
+
+ for example in share_example_repo.provide, share_example_repo do
+
+ gconf.cnt = gconf.cnt + 1
+ if gconf.cnt == 2000 then
+ print_stat(network_node_repo)
+ gconf.cnt = 0
+ end
+
+ local input = {}
+ local n = example:size()
+ for i = 0, n-1 do
+ table.insert(input, example:at(i))
+ end
+
+ local output = {nerv.CuMatrixFloat(gconf.batch_size, 1)}
+ err_output = {input[1]:create()}
+ network:propagate(input, output)
+
+ if bp then
+ network:back_propagate(err_input, err_output, input, output)
+ network:gradient(err_input, input, output)
+
+ share_master:LockModel()
+ share_master:WeightToD(network)
+ network:update_gradient()
+ share_master:WeightFromD(network)
+ share_master:UnLockModel()
+ end
+
+ -- collect garbage in-time to save GPU memory
+ collectgarbage("collect")
+ end
+
+ --print_stat(network_node_repo)
+ local ce_crit = network_node_repo:get_layer("ce_crit")
+ local xent = fastnn.CXent(ce_crit.total_frames, ce_crit.total_correct, ce_crit.total_ce, ce_crit.total_ce)
+
+ share_master:LockModel()
+ share_xent:add(xent)
+ share_master:SyncDec()
+ --print(string.format("ThreadCount: %%d", share_master:ThreadCount()))
+ if share_master:ThreadCount() == 0 and bp then
+ share_master:WeightToD(network)
+ local fname = string.format("%%s_tr%%.3f",
+ nnet_out, frame_acc(share_xent))
+ nerv.info_stderr("writing back %%s ...", fname)
+ network:get_params():export(fname, nil)
+ end
+ share_master:UnLockModel()
+]]
+
+function get_data_thread(data_thread_code, env, thread_idx, example_repo_shareid,
+ data_mutex_shareid, feat_repo_shareid, gpu_shareid,
+ batch_size, bp, scp_file)
+ return string.format(data_thread_code, env, thread_idx, example_repo_shareid,
+ data_mutex_shareid, feat_repo_shareid, gpu_shareid,
+ batch_size, bp, scp_file)
+end
+
+function get_train_thread(train_thread_code, env, thread_idx, example_repo_shareid,
+ data_mutex_shareid, master_shareid, gpu_shareid, xent_shareid,
+ batch_size, lrate, bp, nnet_in, nnet_out)
+ return string.format(train_thread_code, env, thread_idx, example_repo_shareid,
+ data_mutex_shareid, master_shareid, gpu_shareid, xent_shareid,
+ batch_size, lrate, bp, nnet_in, nnet_out)
+end
+
+function trainer(batch_size, lrate, bp, scp_file, nnet_in, nnet_out, num_threads)
+ local train_threads={}
+ local trainer = {}
+ local data_threads = {}
+ local data = {}
+ local num_threads=num_threads
+
+ local data_mutex = threads.Mutex()
+ local data_mutex_shareid = data_mutex:id()
+
+ local master = fastnn.CModelSync()
+ local master_shareid = master:id()
+ --print(master)
+
+ local xent = fastnn.CXent()
+ local xent_shareid = xent:id()
+ --print(xent)
+
+ local gpu = fastnn.CDevice()
+ local gpu_shareid = gpu:id()
+ --print(gpu_shareid)
+ gpu:init()
+
+ local example_repo = {}
+ local example_repo_shareid = {}
+
+ local feat_repo = nerv.TNetFeatureRepo(scp_file, gconf.htk_conf, gconf.frm_ext)
+ local feat_repo_shareid = feat_repo:id()
+
+ for i=1,num_threads,1 do
+ example_repo[i] = fastnn.CExamplesRepo(128, false)
+ example_repo_shareid[i] = example_repo[i]:id()
+
+ data_threads[i] = get_data_thread(data_thread_code, env, i, example_repo_shareid[i],
+ data_mutex_shareid, feat_repo_shareid, gpu_shareid,
+ batch_size, bp, scp_file)
+
+ train_threads[i] = get_train_thread(train_thread_code, env, i, example_repo_shareid[i],
+ data_mutex_shareid, master_shareid, gpu_shareid, xent_shareid,
+ batch_size, lrate, bp, nnet_in, nnet_out)
+ --print(train_threads[i])
+ data[i] = threads.Thread(data_threads[i])
+ trainer[i] = threads.Thread(train_threads[i])
+ end
+
+ nerv.info_stderr('| waiting for thread...')
+
+ for i=1,num_threads,1 do
+ data[i]:free()
+ trainer[i]:free()
+ end
+
+ print_xent(xent)
+
+ nerv.info_stderr('| all thread finished!')
+
+ return frame_acc(xent)
+end
+
+function get_filename(fname)
+ return string.gsub((string.gsub(fname, "(.*/)(.*)", "%2")),"(.*)%..*", "%1")
+end
+
+function do_sds(tr_scp, sds_scp, sds_rate)
+ math.randomseed(os.time())
+ local scp_file = io.open(tr_scp, "r")
+ local sds_file = io.open(sds_scp, "w")
+ for line in scp_file:lines() do
+ rate = math.random()
+ if (rate < sds_rate) then
+ sds_file:write(line.."\n")
+ end
+ end
+ scp_file:close()
+ sds_file:close()
+end
+
+function print_tag(iter)
+ io.stderr:write(string.format("########################################################\n"))
+ io.stderr:write(string.format("# NN TRAINING ITERATION:%d, %s\n", iter, os.date()))
+ io.stderr:write(string.format("########################################################\n"))
+end
+
+
+start_halving_inc = 0.5
+halving_factor = 0.8
+end_halving_inc = 0.1
+min_iter = 1
+max_iter = 20
+min_halving = 0
+gconf.batch_size = 256
+pf0 = get_filename(gconf.network[1])
+nnet_in = gconf.network[1]
+nnet_out = ""
+sds_scp = "tr_sds_"..string.format("%.4d", math.random()*10000)..".scp" --"tr_sds.scp"
+sds_factor = 0.4
+num_threads = 1
+global_option = nil
+
+print_gconf()
+os.execute("export MALLOC_CHECK_=0")
+
+-- training begin
+nerv.info_stderr("begin initial cross validation")
+local accu_best = trainer(gconf.batch_size, gconf.lrate, 0,
+ gconf.cv_scp, nnet_in, nil, num_threads)
+local do_halving = false
+local accu_new = accu_best
+
+nerv.info_stderr("initial cross validation: %.3f\n", accu_best)
+
+for i = 1, max_iter do
+
+ if accu_new >= accu_best then
+ local sds_rate = math.cos((i-1)*11.0/180*math.pi)
+ if (sds_rate <= sds_factor) then
+ sds_rate = sds_factor
+ end
+ nerv.info_stderr("iteration %d sds_rate: %.6f", i, sds_rate)
+ do_sds(gconf.tr_scp, sds_scp, sds_rate)
+ end
+
+ nnet_out=pf0.."_iter"..i
+ --print(nnet_out)
+ print_tag(i)
+ nerv.info_stderr("[NN] begin iteration %d learning_rate: %.3f batch_size: %d.", i, gconf.lrate, gconf.batch_size)
+ local accu_tr = trainer(gconf.batch_size, gconf.lrate, 1,
+ sds_scp, nnet_in, nnet_out, num_threads)
+ nerv.info_stderr("[TR] end iteration %d frame_accuracy: %.3f.\n", i, accu_tr)
+ os.execute("sleep " .. 3)
+
+ nnet_out = nnet_out.."_tr"..accu_tr
+ accu_new = trainer(gconf.batch_size, gconf.lrate, 0,
+ gconf.cv_scp, nnet_out, nil, num_threads)
+ nerv.info_stderr("[CV] end iteration %d frame_accuracy: %.3f.\n\n", i, accu_new)
+ os.execute("sleep " .. 3)
+
+ local nnet_tmp = string.format("%s_%s_iter_%d_lr%f_tr%.3f_cv%.3f",
+ pf0,
+ os.date("%Y%m%d%H%M%S"),
+ i, gconf.lrate, accu_tr, accu_new)
+
+ -- TODO: revert the weights
+ local accu_diff = accu_new - accu_best
+ local cmd
+ if accu_new > accu_best then
+ accu_best = accu_new
+ nnet_in = nnet_tmp
+ gconf.batch_size = gconf.batch_size + 128
+ if gconf.batch_size > 1024 then
+ gconf.batch_size = 1024
+ end
+ else
+ -- reject
+ nnet_tmp = nnet_tmp.."_rejected"
+ do_halving = true
+ end
+ cmd = "mv "..nnet_out.." "..nnet_tmp
+ os.execute(cmd)
+
+ if do_halving and accu_diff < end_halving_inc and i > min_iter then
+ break;
+ end
+
+ if accu_diff < start_halving_inc and i >= min_halving then
+ do_halving = true
+ end
+
+ if do_halving then
+ gconf.lrate = gconf.lrate * halving_factor
+ halving_factor = halving_factor - 0.025
+ if halving_factor < 0.6 then
+ halving_factor = 0.6
+ end
+ end
+ nerv.info_stderr("iteration %d done!", i)
+end
+
+
diff --git a/fastnn/example/asgd_sds_trainer.lua b/fastnn/example/asgd_sds_trainer.lua
new file mode 100644
index 0000000..cf1c7a6
--- /dev/null
+++ b/fastnn/example/asgd_sds_trainer.lua
@@ -0,0 +1,343 @@
+
+NERV_ROOT = "/sgfs/users/wd007/src/nerv-2"
+
+env = string.format([[
+package.path="/home/slhome/wd007/.luarocks/share/lua/5.1/?.lua;/home/slhome/wd007/.luarocks/share/lua/5.1/?/init.lua;%s/install/share/lua/5.1/?.lua;%s/install/share/lua/5.1/?/init.lua;"..package.path;
+package.cpath="/home/slhome/wd007/.luarocks/lib/lua/5.1/?.so;%s/install/lib/lua/5.1/?.so;"..package.cpath
+local k,l,_=pcall(require,"luarocks.loader") _=k and l.add_context("nerv","scm-1")
+]], NERV_ROOT, NERV_ROOT, NERV_ROOT)
+
+loadstring(env)()
+
+require 'nerv'
+
+require 'fastnn'
+require 'libhtkio'
+require 'threads'
+
+dofile("fastnn/example/fastnn_baseline.lua")
+
+
+
+train_thread_code = [[
+%s
+
+require 'nerv'
+require 'fastnn'
+require 'libhtkio'
+
+dofile("fastnn/example/fastnn_baseline.lua")
+os.execute("export MALLOC_CHECK_=0")
+
+local thread_idx = %d
+local feat_repo_shareid = %d
+local data_mutex_shareid = %d
+local master_shareid = %d
+local gpu_shareid = %d
+local xent_shareid = %d
+local batch_size = %d
+local lrate = %f
+local bp = %d
+local scp_file = '%s'
+local nnet_in = '%s'
+local nnet_out = '%s'
+
+local share_mutex = threads.Mutex(data_mutex_shareid)
+local share_master = fastnn.ModelSync(master_shareid)
+local share_gpu = fastnn.CDevice(gpu_shareid)
+local share_xent = fastnn.CXent(xent_shareid)
+
+if bp == 0 then
+ bp = false
+else
+ bp = true
+ gconf.tr_scp = scp_file
+end
+
+share_mutex:lock()
+
+gconf.randomize = bp
+gconf.lrate = lrate
+gconf.batch_size = batch_size
+gconf.initialized_param[2] = nnet_in
+nerv.info_stderr("input network: %%s", gconf.initialized_param[2])
+--nerv.info_stderr(gconf.randomize)
+nerv.info_stderr("input batch_size: %%d", gconf.batch_size)
+nerv.info_stderr("input scp_file: %%s", scp_file)
+nerv.info_stderr("input lrate: %%f", gconf.lrate)
+
+
+
+share_gpu:select_gpu()
+
+nerv.context = nerv.CCuContext()
+--print(nerv.context)
+
+nerv.info_stderr("thread %%d loading parameters ...", thread_idx)
+local param_repo = nerv.ParamRepo()
+param_repo:import(gconf.initialized_param, nil, gconf)
+local layer_repo = make_layer_repo(param_repo)
+local network = get_network(layer_repo)
+local global_transf = get_global_transf(layer_repo)
+
+share_mutex:unlock()
+
+local buffer = make_buffer(make_readers(nil, layer_repo, feat_repo_shareid, data_mutex_shareid))
+
+local input_order = get_input_order()
+
+ -- initialize the network
+ network:init(gconf.batch_size)
+ gconf.cnt = 0
+ err_input = {nerv.CuMatrixFloat(gconf.batch_size, 1)}
+ err_input[1]:fill(1)
+
+ share_master:Initialize(network)
+ share_master:SyncInc()
+
+ for data in buffer.get_data, buffer do
+
+ gconf.cnt = gconf.cnt + 1
+ if gconf.cnt == 2000 then
+ print_stat(layer_repo)
+ gconf.cnt = 0
+ end
+
+ local input = {}
+
+ for i, e in ipairs(input_order) do
+ local id = e.id
+ if data[id] == nil then
+ nerv.error("input data %%s not found", id)
+ end
+ local transformed
+ if e.global_transf then
+ transformed = nerv.speech_utils.global_transf(data[id],
+ global_transf,
+ gconf.frm_ext or 0, 0,
+ gconf)
+ else
+ transformed = data[id]
+ end
+ table.insert(input, transformed)
+ end
+
+ local output = {nerv.CuMatrixFloat(gconf.batch_size, 1)}
+ err_output = {}
+ for i = 1, #input do
+ table.insert(err_output, input[i]:create())
+ end
+
+ network:propagate(input, output)
+
+ if bp then
+ network:back_propagate(err_input, err_output, input, output)
+ network:gradient(err_input, input, output)
+
+ share_master:LockModel()
+ share_master:WeightToD(network)
+ network:update_gradient()
+ -- network:update(err_input, input, output)
+ share_master:WeightFromD(network)
+ share_master:UnLockModel()
+ end
+
+ -- collect garbage in-time to save GPU memory
+ collectgarbage("collect")
+ end
+
+ --print_stat(network_node_repo)
+ local ce_crit = layer_repo:get_layer("ce_crit")
+ local xent = fastnn.CXent(ce_crit.total_frames, ce_crit.total_correct, ce_crit.total_ce, ce_crit.total_ce)
+
+ share_master:LockModel()
+ share_xent:add(xent)
+ share_master:SyncDec()
+ --print(string.format("ThreadCount: %%d", share_master:ThreadCount()))
+ if share_master:ThreadCount() == 0 and bp then
+ share_master:WeightToD(network)
+ local fname = string.format("%%s_tr%%.3f",
+ nnet_out, frame_acc(share_xent))
+ nerv.info_stderr("writing back %%s ...", fname)
+ network:get_params():export(fname, nil)
+ end
+ share_master:UnLockModel()
+]]
+
+
+function get_train_thread(train_thread_code, env, thread_idx, feat_repo_shareid,
+ data_mutex_shareid, master_shareid, gpu_shareid, xent_shareid,
+ batch_size, lrate, bp, scp_file, nnet_in, nnet_out)
+ return string.format(train_thread_code, env, thread_idx, feat_repo_shareid,
+ data_mutex_shareid, master_shareid, gpu_shareid, xent_shareid,
+ batch_size, lrate, bp, scp_file, nnet_in, nnet_out)
+end
+
+function trainer(batch_size, lrate, bp, scp_file, nnet_in, nnet_out, num_threads)
+ local train_threads={}
+ local trainer = {}
+ local num_threads=num_threads
+
+ local data_mutex = threads.Mutex()
+ local data_mutex_shareid = data_mutex:id()
+
+ local master = fastnn.CModelSync()
+ local master_shareid = master:id()
+ --print(master)
+
+ local xent = fastnn.CXent()
+ local xent_shareid = xent:id()
+ --print(xent)
+
+ local gpu = fastnn.CDevice()
+ local gpu_shareid = gpu:id()
+ --print(gpu_shareid)
+ gpu:init()
+
+ local feat_repo = nerv.TNetFeatureRepo(scp_file, gconf.htk_conf, gconf.frm_ext)
+ local feat_repo_shareid = feat_repo:id()
+
+ for i=1,num_threads,1 do
+
+ train_threads[i] = get_train_thread(train_thread_code, env, i, feat_repo_shareid,
+ data_mutex_shareid, master_shareid, gpu_shareid, xent_shareid,
+ batch_size, lrate, bp, scp_file, nnet_in, nnet_out)
+ --print(train_threads[i])
+ trainer[i] = threads.Thread(train_threads[i])
+ end
+
+ nerv.info_stderr('| waiting for thread...')
+
+ for i=1,num_threads,1 do
+ trainer[i]:free()
+ end
+
+ print_xent(xent)
+
+ nerv.info_stderr('| all thread finished!')
+
+ return frame_acc(xent)
+end
+
+function get_filename(fname)
+ return string.gsub((string.gsub(fname, "(.*/)(.*)", "%2")),"(.*)%..*", "%1")
+end
+
+function do_sds(tr_scp, sds_scp, sds_rate)
+ math.randomseed(os.time())
+ local scp_file = io.open(tr_scp, "r")
+ local sds_file = io.open(sds_scp, "w")
+ for line in scp_file:lines() do
+ rate = math.random()
+ if (rate < sds_rate) then
+ sds_file:write(line.."\n")
+ end
+ end
+ scp_file:close()
+ sds_file:close()
+end
+
+function print_tag(iter)
+ io.stderr:write(string.format("########################################################\n"))
+ io.stderr:write(string.format("# NN TRAINING ITERATION:%d, %s\n", iter, os.date()))
+ io.stderr:write(string.format("########################################################\n"))
+end
+
+
+start_halving_inc = 0.5
+halving_factor = 0.8
+end_halving_inc = 0.1
+min_iter = 1
+max_iter = 20
+min_halving = 0
+gconf.batch_size = 256
+pf0 = get_filename(gconf.initialized_param[2])
+nnet_in = gconf.initialized_param[2]
+nnet_out = ""
+sds_scp = "tr_sds_"..string.format("%.4d", math.random()*10000)..".scp" --"tr_sds.scp"
+sds_factor = 0.4
+num_threads = 2
+global_option = nil
+
+os.execute("export MALLOC_CHECK_=0")
+print_gconf()
+
+-- training begin
+nerv.info_stderr("begin initial cross validation")
+accu_best = trainer(gconf.batch_size, gconf.lrate, 0,
+ gconf.cv_scp, nnet_in, "", num_threads)
+local do_halving = false
+local accu_new = accu_best
+
+nerv.info_stderr("initial cross validation: %.3f\n", accu_best)
+
+for i = 1, max_iter do
+
+ if accu_new >= accu_best then
+ local sds_rate = math.cos((i-1)*11.0/180*math.pi)
+ if (sds_rate <= sds_factor) then
+ sds_rate = sds_factor
+ end
+ nerv.info_stderr("iteration %d sds_rate: %.6f", i, sds_rate)
+ do_sds(gconf.tr_scp, sds_scp, sds_rate)
+ end
+
+ nnet_out=pf0.."_iter"..i
+ --print(nnet_out)
+ print_tag(i)
+ nerv.info_stderr("[NN] begin iteration %d learning_rate: %.3f batch_size: %d.", i, gconf.lrate, gconf.batch_size)
+ accu_tr = trainer(gconf.batch_size, gconf.lrate, 1,
+ sds_scp, nnet_in, nnet_out, num_threads)
+ collectgarbage("collect")
+ nerv.info_stderr("[TR] end iteration %d frame_accuracy: %.3f.\n", i, accu_tr)
+ os.execute("sleep " .. 3)
+
+ nnet_out = nnet_out.."_tr"..accu_tr
+ accu_new = trainer(gconf.batch_size, gconf.lrate, 0,
+ gconf.cv_scp, nnet_out, "", num_threads)
+ collectgarbage("collect")
+ nerv.info_stderr("[CV] end iteration %d frame_accuracy: %.3f.\n\n", i, accu_new)
+ os.execute("sleep " .. 3)
+
+ local nnet_tmp = string.format("%s_%s_iter_%d_lr%f_tr%.3f_cv%.3f",
+ pf0,
+ os.date("%Y%m%d%H%M%S"),
+ i, gconf.lrate, accu_tr, accu_new)
+
+ -- TODO: revert the weights
+ local accu_diff = accu_new - accu_best
+ local cmd
+ if accu_new > accu_best then
+ accu_best = accu_new
+ nnet_in = nnet_tmp
+ gconf.batch_size = gconf.batch_size + 128
+ if gconf.batch_size > 1024 then
+ gconf.batch_size = 1024
+ end
+ else
+ -- reject
+ nnet_tmp = nnet_tmp.."_rejected"
+ do_halving = true
+ end
+ cmd = "mv "..nnet_out.." "..nnet_tmp
+ os.execute(cmd)
+
+ if do_halving and accu_diff < end_halving_inc and i > min_iter then
+ break;
+ end
+
+ if accu_diff < start_halving_inc and i >= min_halving then
+ do_halving = true
+ end
+
+ if do_halving then
+ gconf.lrate = gconf.lrate * halving_factor
+ halving_factor = halving_factor - 0.025
+ if halving_factor < 0.6 then
+ halving_factor = 0.6
+ end
+ end
+ nerv.info_stderr("iteration %d done!", i)
+end
+
+
diff --git a/fastnn/example/fastnn_baseline.lua b/fastnn/example/fastnn_baseline.lua
new file mode 100644
index 0000000..6e774de
--- /dev/null
+++ b/fastnn/example/fastnn_baseline.lua
@@ -0,0 +1,258 @@
+require 'htk_io'
+
+gconf = {lrate = 0.2, wcost = 1e-6, momentum = 0.9,
+ cumat_type = nerv.CuMatrixFloat,
+ mmat_type = nerv.MMatrixFloat,
+ frm_ext = 5,
+ frm_trim = 5,
+ batch_size = 256,
+ buffer_size = 81920,
+ rearrange = true,
+ tr_scp = "/sgfs/users/wd007/asr/baseline_chn_50h/finetune/finetune_baseline/train.scp",
+ cv_scp = "/sgfs/users/wd007/asr/baseline_chn_50h/finetune/finetune_baseline/train_cv.scp",
+ htk_conf = "/sgfs/users/wd007/asr/baseline_chn_50h/finetune/finetune_baseline/fbank_d_a_z.conf",
+ initialized_param = {"/sgfs/users/wd007/src/nerv/tools/nerv.global.transf",
+ "/sgfs/users/wd007/src/nerv/tools/nerv.svd0.55_3000h_iter1.init"},
+ debug = false}
+
+function make_layer_repo(param_repo)
+ local layer_repo = nerv.LayerRepo(
+ {
+ -- global transf
+ ["nerv.BiasLayer"] =
+ {
+ blayer1 = {{bias = "bias1"}, {dim_in = {1320}, dim_out = {1320}}},
+ },
+ ["nerv.WindowLayer"] =
+ {
+ wlayer1 = {{window = "window1"}, {dim_in = {1320}, dim_out = {1320}}},
+ },
+ -- biased linearity
+ ["nerv.AffineLayer"] =
+ {
+ affine0 = {{ltp = "affine0_ltp", bp = "affine0_bp"},
+ {dim_in = {1320}, dim_out = {2048}}},
+ affine1 = {{ltp = "affine1_ltp", bp = "affine1_bp"},
+ {dim_in = {2048}, dim_out = {367}}},
+ affine2 = {{ltp = "affine2_ltp", bp = "affine2_bp"},
+ {dim_in = {367}, dim_out = {2048}}},
+ affine3 = {{ltp = "affine3_ltp", bp = "affine3_bp"},
+ {dim_in = {2048}, dim_out = {408}}},
+ affine4 = {{ltp = "affine4_ltp", bp = "affine4_bp"},
+ {dim_in = {408}, dim_out = {2048}}},
+ affine5 = {{ltp = "affine5_ltp", bp = "affine5_bp"},
+ {dim_in = {2048}, dim_out = {368}}},
+ affine6 = {{ltp = "affine6_ltp", bp = "affine6_bp"},
+ {dim_in = {368}, dim_out = {2048}}},
+ affine7 = {{ltp = "affine7_ltp", bp = "affine7_bp"},
+ {dim_in = {2048}, dim_out = {303}}},
+ affine8 = {{ltp = "affine8_ltp", bp = "affine8_bp"},
+ {dim_in = {303}, dim_out = {2048}}},
+ affine9 = {{ltp = "affine9_ltp", bp = "affine9_bp"},
+ {dim_in = {2048}, dim_out = {277}}},
+ affine10 = {{ltp = "affine10_ltp", bp = "affine10_bp"},
+ {dim_in = {277}, dim_out = {2048}}},
+ affine11 = {{ltp = "affine11_ltp", bp = "affine11_bp"},
+ {dim_in = {2048}, dim_out = {361}}},
+ affine12 = {{ltp = "affine12_ltp", bp = "affine12_bp"},
+ {dim_in = {361}, dim_out = {2048}}},
+ affine13 = {{ltp = "affine13_ltp", bp = "affine13_bp"},
+ {dim_in = {2048}, dim_out = {441}}},
+ affine14 = {{ltp = "affine14_ltp", bp = "affine14_bp"},
+ {dim_in = {441}, dim_out = {10092}}},
+ },
+ ["nerv.SigmoidLayer"] =
+ {
+ sigmoid0 = {{}, {dim_in = {2048}, dim_out = {2048}}},
+ sigmoid1 = {{}, {dim_in = {2048}, dim_out = {2048}}},
+ sigmoid2 = {{}, {dim_in = {2048}, dim_out = {2048}}},
+ sigmoid3 = {{}, {dim_in = {2048}, dim_out = {2048}}},
+ sigmoid4 = {{}, {dim_in = {2048}, dim_out = {2048}}},
+ sigmoid5 = {{}, {dim_in = {2048}, dim_out = {2048}}},
+ sigmoid6 = {{}, {dim_in = {2048}, dim_out = {2048}}},
+ },
+ ["nerv.SoftmaxCELayer"] = -- softmax + ce criterion layer for finetune output
+ {
+ ce_crit = {{}, {dim_in = {10092, 1}, dim_out = {1}, compressed = true}}
+ },
+ ["nerv.SoftmaxLayer"] = -- softmax for decode output
+ {
+ softmax = {{}, {dim_in = {10092}, dim_out = {10092}}}
+ }
+ }, param_repo, gconf)
+
+ layer_repo:add_layers(
+ {
+ ["nerv.DAGLayer"] =
+ {
+ global_transf = {{}, {
+ dim_in = {1320}, dim_out = {1320},
+ sub_layers = layer_repo,
+ connections =
+ {
+ ["<input>[1]"] = "blayer1[1]",
+ ["blayer1[1]"] = "wlayer1[1]",
+ ["wlayer1[1]"] = "<output>[1]"
+ }
+ }},
+ main = {{}, {
+ dim_in = {1320}, dim_out = {10092},
+ sub_layers = layer_repo,
+ connections = {
+ ["<input>[1]"] = "affine0[1]",
+ ["affine0[1]"] = "sigmoid0[1]",
+ ["sigmoid0[1]"] = "affine1[1]",
+ ["affine1[1]"] = "affine2[1]",
+ ["affine2[1]"] = "sigmoid1[1]",
+ ["sigmoid1[1]"] = "affine3[1]",
+ ["affine3[1]"] = "affine4[1]",
+ ["affine4[1]"] = "sigmoid2[1]",
+ ["sigmoid2[1]"] = "affine5[1]",
+ ["affine5[1]"] = "affine6[1]",
+ ["affine6[1]"] = "sigmoid3[1]",
+ ["sigmoid3[1]"] = "affine7[1]",
+ ["affine7[1]"] = "affine8[1]",
+ ["affine8[1]"] = "sigmoid4[1]",
+ ["sigmoid4[1]"] = "affine9[1]",
+ ["affine9[1]"] = "affine10[1]",
+ ["affine10[1]"] = "sigmoid5[1]",
+ ["sigmoid5[1]"] = "affine11[1]",
+ ["affine11[1]"] = "affine12[1]",
+ ["affine12[1]"] = "sigmoid6[1]",
+ ["sigmoid6[1]"] = "affine13[1]",
+ ["affine13[1]"] = "affine14[1]",
+ ["affine14[1]"] = "<output>[1]",
+ }
+ }}
+ }
+ }, param_repo, gconf)
+
+ layer_repo:add_layers(
+ {
+ ["nerv.DAGLayer"] =
+ {
+ ce_output = {{}, {
+ dim_in = {1320, 1}, dim_out = {1},
+ sub_layers = layer_repo,
+ connections = {
+ ["<input>[1]"] = "main[1]",
+ ["main[1]"] = "ce_crit[1]",
+ ["<input>[2]"] = "ce_crit[2]",
+ ["ce_crit[1]"] = "<output>[1]"
+ }
+ }},
+ softmax_output = {{}, {
+ dim_in = {1320}, dim_out = {10092},
+ sub_layers = layer_repo,
+ connections = {
+ ["<input>[1]"] = "main[1]",
+ ["main[1]"] = "softmax[1]",
+ ["softmax[1]"] = "<output>[1]"
+ }
+ }}
+ }
+ }, param_repo, gconf)
+
+ return layer_repo
+end
+
+
+function get_network(layer_repo)
+ return layer_repo:get_layer("ce_output")
+end
+
+function get_decode_network(layer_repo)
+ return layer_repo:get_layer("softmax_output")
+end
+
+function get_global_transf(layer_repo)
+ return layer_repo:get_layer("global_transf")
+end
+
+
+
+function make_readers(scp_file, layer_repo, feat_repo_shareid, data_mutex_shareid)
+ return {
+ {reader = nerv.TNetReader(gconf,
+ {
+ id = "main_scp",
+ scp_file = scp_file,
+ conf_file = gconf.htk_conf,
+ frm_ext = gconf.frm_ext,
+ mlfs = {
+ phone_state = {
+ file = "/sgfs/users/wd007/asr/baseline_chn_50h/finetune/finetune_baseline/ref.mlf",
+ format = "map",
+ format_arg = "/sgfs/users/wd007/asr/baseline_chn_50h/finetune/finetune_baseline/dict",
+ dir = "*/",
+ ext = "lab"
+ }
+ },
+ global_transf = layer_repo:get_layer("global_transf")
+ }, feat_repo_shareid, data_mutex_shareid),
+ data = {main_scp = 1320, phone_state = 1}}
+ }
+end
+
+function get_feat_id()
+ return {main_scp = true}
+end
+
+
+function make_buffer(readers)
+ return nerv.SGDBuffer(gconf,
+ {
+ buffer_size = gconf.buffer_size,
+ randomize = gconf.randomize,
+ readers = readers,
+ use_gpu = true
+ })
+end
+
+function get_input_order()
+ return {{id = "main_scp", global_transf = true},
+ {id = "phone_state"}}
+end
+
+function get_accuracy(layer_repo)
+ local ce_crit = layer_repo:get_layer("ce_crit")
+ return ce_crit.total_correct / ce_crit.total_frames * 100
+end
+
+function print_stat(layer_repo)
+ local ce_crit = layer_repo:get_layer("ce_crit")
+ nerv.info("*** training stat begin ***")
+ nerv.printf("cross entropy:\t\t%.8f\n", ce_crit.total_ce)
+ nerv.printf("correct:\t\t%d\n", ce_crit.total_correct)
+ nerv.printf("frames:\t\t\t%d\n", ce_crit.total_frames)
+ nerv.printf("err/frm:\t\t%.8f\n", ce_crit.total_ce / ce_crit.total_frames)
+ nerv.printf("accuracy:\t\t%.3f%%\n", get_accuracy(layer_repo))
+ nerv.info("*** training stat end ***")
+end
+
+function print_xent(xent)
+ local totalframes = xent:totalframes()
+ local loss = xent:loss()
+ local correct = xent:correct()
+ nerv.info_stderr("*** training statistics info begin ***")
+ nerv.info_stderr("total frames:\t\t%d", totalframes)
+ nerv.info_stderr("cross entropy:\t%.8f", loss/totalframes)
+ nerv.info_stderr("frame accuracy:\t%.3f%%", 100*correct/totalframes)
+ nerv.info_stderr("*** training statistics info end ***")
+end
+
+function frame_acc(xent)
+ local correct = xent:correct()
+ local totalframes = xent:totalframes()
+ return string.format("%.3f", 100*correct/totalframes)
+end
+
+function print_gconf()
+ nerv.info_stderr("%s \t:= %s", "network", gconf.initialized_param[1])
+ nerv.info_stderr("%s \t:= %s", "transf", gconf.initialized_param[2])
+ nerv.info_stderr("%s \t:= %s", "batch_size", gconf.batch_size)
+ nerv.info_stderr("%s \t:= %s", "buffer_size", gconf.buffer_size)
+ nerv.info_stderr("%s \t:= %s", "lrate", gconf.lrate)
+ nerv.info_stderr("%s \t:= %s", "tr_scp", gconf.tr_scp)
+ nerv.info_stderr("%s \t:= %s", "cv_scp", gconf.cv_scp)
+end
diff --git a/fastnn/fastnn-scm-1.rockspec b/fastnn/fastnn-scm-1.rockspec
new file mode 100644
index 0000000..69dfb60
--- /dev/null
+++ b/fastnn/fastnn-scm-1.rockspec
@@ -0,0 +1,36 @@
+package = "fastnn"
+version = "scm-1"
+source = {
+ url = "https://github.com/uphantom/nerv-fastnn.git"
+}
+description = {
+ summary = "speed up Nerv neural network training.",
+ detailed = [[
+ ]],
+ homepage = "https://github.com/uphantom/nerv-fastnn",
+ license = "BSD"
+}
+dependencies = {
+ "nerv >= scm-1",
+ "lua >= 5.1"
+}
+build = {
+ type = "make",
+ build_variables = {
+ CFLAGS="$(CFLAGS)",
+ LIBFLAG="$(LIBFLAG)",
+ LUA_LIBDIR="$(LUA_LIBDIR)",
+ LUA_BINDIR="$(LUA_BINDIR)",
+ LUA_INCDIR="$(LUA_INCDIR)",
+ INST_PREFIX="$(PREFIX)",
+ LUA="$(LUA)",
+ },
+ install_variables = {
+ LUA_BINDIR="$(LUA_BINDIR)",
+ INST_PREFIX="$(PREFIX)",
+ INST_BINDIR="$(BINDIR)",
+ INST_LIBDIR="$(LIBDIR)",
+ INST_LUADIR="$(LUADIR)",
+ INST_CONFDIR="$(CONFDIR)",
+ },
+}
diff --git a/fastnn/init.c b/fastnn/init.c
new file mode 100644
index 0000000..2022293
--- /dev/null
+++ b/fastnn/init.c
@@ -0,0 +1,43 @@
+#include <lua.h>
+#include <lauxlib.h>
+
+#if LUA_VERSION_NUM == 501
+static void luaL_setfuncs(lua_State *L, const luaL_Reg *l, int nup)
+{
+ luaL_checkstack(L, nup+1, "too many upvalues");
+ for (; l->name != NULL; l++) { /* fill the table with given functions */
+ int i;
+ lua_pushstring(L, l->name);
+ for (i = 0; i < nup; i++) /* copy upvalues to the top */
+ lua_pushvalue(L, -(nup+1));
+ lua_pushcclosure(L, l->func, nup); /* closure with those upvalues */
+ lua_settable(L, -(nup + 3));
+ }
+ lua_pop(L, nup); /* remove upvalues */
+}
+#endif
+
+
+extern void fastnn_init_modelsync(lua_State *L);
+extern void fastnn_init_example(lua_State *L);
+extern void fastnn_init_example_repo(lua_State *L);
+extern void fastnn_init_device(lua_State *L);
+extern void fastnn_init_context(lua_State *L);
+
+int luaopen_libfastnn(lua_State *L)
+{
+ //printf("%s %lx\n", model_sync__[13].name, model_sync__[13].func);
+ lua_newtable(L);
+ /* duplicate table */
+ lua_pushvalue(L, -1);
+ /* set table to global index */
+ lua_setfield(L, LUA_GLOBALSINDEX, "fastnn");
+
+ fastnn_init_modelsync(L);
+ fastnn_init_example(L);
+ fastnn_init_example_repo(L);
+ fastnn_init_device(L);
+ fastnn_init_context(L);
+ return 1;
+}
+
diff --git a/fastnn/init.lua b/fastnn/init.lua
new file mode 100644
index 0000000..5e290ca
--- /dev/null
+++ b/fastnn/init.lua
@@ -0,0 +1,11 @@
+require 'libfastnn'
+
+function fastnn.include(filename)
+ local caller = debug.getinfo(2, "S").source:sub(2)
+ dofile(nerv.dirname(caller) .. filename)
+end
+
+fastnn.include('lib/modelsync.lua')
+fastnn.include('io/example.lua')
+fastnn.include('device/device.lua')
+
diff --git a/fastnn/io/Example.cpp b/fastnn/io/Example.cpp
new file mode 100644
index 0000000..8f200b7
--- /dev/null
+++ b/fastnn/io/Example.cpp
@@ -0,0 +1,186 @@
+
+#include <deque>
+#include <vector>
+#include <string>
+
+
+extern "C" {
+
+#include "../threads/lib/THThread.h"
+#include "Example.h"
+#include "stdlib.h"
+#include "stdio.h"
+
+#include "../../nerv/lib/matrix/generic/elem_type.h"
+#include "common.h"
+
+extern Matrix* nerv_matrix_cuda_float_create(long nrow, long ncol, Status *status);
+void nerv_matrix_cuda_float_copy_fromd(Matrix *a, const Matrix *b,
+ int a_begin, int b_begin, int b_end,
+ Status *status);
+
+struct Example
+{
+ std::vector<Matrix *> inputs;
+ std::string id;
+ int refcount;
+};
+
+struct ExamplesRepository
+{
+ int buffer_size_;
+ THSemaphore *full_semaphore_;
+ THSemaphore *empty_semaphore_;
+ THMutex *examples_mutex_;
+
+ std::deque<Example*> examples_;
+ bool done_;
+ int refcount;
+ int gpuid;
+};
+
+Example* Example_new()
+{
+ Example *example = new Example; //(Example*)malloc(sizeof(Example));
+ example->refcount = 1;
+ return example;
+}
+
+Example* Example_newWithId(long id)
+{
+ Example* example = (Example*)(id);
+ __sync_fetch_and_add(&example->refcount, 1);
+ return example;
+}
+
+long Example_id(Example *example)
+{
+ return (long)(example);
+}
+
+void Example_destroy(Example* example)
+{
+ //printf("Example_destroy: %d\n", example->inputs.size());
+ if (NULL != example && __sync_fetch_and_add(&example->refcount, -1) == 1)
+ {
+ delete example;
+ example = NULL;
+ }
+}
+
+int Example_size(Example* example)
+{
+ return example->inputs.size();
+}
+
+Matrix* Example_at(Example* example, int idx)
+{
+ return example->inputs.at(idx);
+}
+
+void Example_pushback(Example* example, Matrix* m)
+{
+ Status status;
+ Matrix *newm = nerv_matrix_cuda_float_create(m->nrow, m->ncol, &status);
+ nerv_matrix_cuda_float_copy_fromd(newm, m, 0, 0, m->nrow, &status);
+ //__sync_fetch_and_add(&m->refcount, 1);
+ return example->inputs.push_back(newm);
+}
+
+//////////////////////////////////////////////////////////////
+
+
+ExamplesRepository* ExamplesRepository_new(int buffersize)
+{
+ ExamplesRepository *repo = new ExamplesRepository; //(ExamplesRepository*)malloc(sizeof(ExamplesRepository));
+ repo->buffer_size_ = buffersize;
+ repo->full_semaphore_ = THSemaphore_new(0);
+ repo->empty_semaphore_ = THSemaphore_new(buffersize);
+ repo->examples_mutex_ = THMutex_new();
+// repo->examples_ = new std::deque<Example*>();
+ repo->done_ = false;
+ repo->refcount = 1;
+ repo->gpuid = -1;
+ return repo;
+}
+
+ExamplesRepository* ExamplesRepository_newWithId(long id)
+{
+ ExamplesRepository *repo = (ExamplesRepository*)(id);
+ __sync_fetch_and_add(&repo->refcount, 1);
+ return repo;
+}
+
+long ExamplesRepository_id(ExamplesRepository* repo)
+{
+ return (long)(repo);
+}
+
+int ExamplesRepository_getGpuId(ExamplesRepository* repo)
+{
+ return repo->gpuid;
+}
+
+void ExamplesRepository_setGpuId(ExamplesRepository* repo, int gpuid)
+{
+ repo->gpuid = gpuid;
+}
+
+void ExamplesRepository_destroy(ExamplesRepository* repo)
+{
+ if (NULL != repo && __sync_fetch_and_add(&repo->refcount, -1) == 1)
+ {
+ if (repo->full_semaphore_)
+ THSemaphore_free(repo->full_semaphore_);
+ if (repo->empty_semaphore_)
+ THSemaphore_free(repo->empty_semaphore_);
+ if (repo->examples_mutex_)
+ THMutex_free(repo->examples_mutex_);
+ delete repo;
+ repo = NULL;
+ }
+}
+
+void AcceptExample(ExamplesRepository *repo, Example *example)
+{
+ THSemaphore_wait(repo->empty_semaphore_);
+ THMutex_lock(repo->examples_mutex_);
+ __sync_fetch_and_add(&example->refcount, 1);
+ repo->examples_.push_back(example);
+ THMutex_unlock(repo->examples_mutex_);
+ THSemaphore_signal(repo->full_semaphore_);
+}
+
+void ExamplesDone(ExamplesRepository *repo)
+{
+ for (int i = 0; i < repo->buffer_size_; i++)
+ THSemaphore_wait(repo->empty_semaphore_);
+
+ repo->done_ = true;
+ THSemaphore_signal(repo->full_semaphore_);
+}
+
+Example* ProvideExample(ExamplesRepository *repo)
+{
+ Example *ans = NULL;
+ THSemaphore_wait(repo->full_semaphore_);
+ if (repo->done_)
+ {
+ THSemaphore_signal(repo->full_semaphore_); // Increment the semaphore so
+ // the call by the next thread will not block.
+ return NULL; // no examples to return-- all finished.
+ }
+ else
+ {
+ THMutex_lock(repo->examples_mutex_);
+ ans = repo->examples_.front();
+ repo->examples_.pop_front();
+ THMutex_unlock(repo->examples_mutex_);
+ THSemaphore_signal(repo->empty_semaphore_);
+ }
+ return ans;
+}
+
+}
+
+
diff --git a/fastnn/io/Example.h b/fastnn/io/Example.h
new file mode 100644
index 0000000..1c462ab
--- /dev/null
+++ b/fastnn/io/Example.h
@@ -0,0 +1,49 @@
+#ifndef NERV_FASTNN_EXAMPLE_H
+#define NERV_FASTNN_EXAMPLE_H
+
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "matrix/matrix.h"
+#include "stdbool.h"
+#define STRLEN 1024
+
+
+
+typedef struct ExamplesRepository ExamplesRepository;
+typedef struct Example Example;
+
+ExamplesRepository* ExamplesRepository_new(int buffersize);
+ExamplesRepository* ExamplesRepository_newWithId(long id);
+long ExamplesRepository_id(ExamplesRepository* reop);
+void ExamplesRepository_destroy(ExamplesRepository* reop);
+void AcceptExample(ExamplesRepository *repo, Example *example);
+void ExamplesDone(ExamplesRepository *repo);
+Example* ProvideExample(ExamplesRepository *repo);
+void ExamplesRepository_setGpuId(ExamplesRepository* repo, int gpuid);
+int ExamplesRepository_getGpuId(ExamplesRepository* repo);
+
+
+Example* Example_new();
+Example* Example_newWithId(long id);
+long Example_id(Example* example);
+void Example_destroy(Example* example);
+int Example_size(Example* example);
+Matrix* Example_at(Example* example, int idx);
+void Example_pushback(Example* example, Matrix* m);
+
+
+
+
+
+
+
+
+#ifdef __cplusplus
+} // closing brace for extern "C"
+
+#endif // end Example
+
+#endif
diff --git a/fastnn/io/example.c b/fastnn/io/example.c
new file mode 100644
index 0000000..f2f0916
--- /dev/null
+++ b/fastnn/io/example.c
@@ -0,0 +1,249 @@
+
+#include <stdio.h>
+#include <stdlib.h>
+
+#include <lua.h>
+#include <lualib.h>
+#include <luaT/luaT.h>
+
+
+
+#include "../threads/lib/luaTHRD.h"
+#include "Example.h"
+
+
+const char *fastnn_example_repo_tname = "fastnn.CExamplesRepo";
+const char *fastnn_example_tname = "fastnn.CExample";
+
+static int example_repo_new(lua_State *L)
+{
+ ExamplesRepository *repo = NULL ;
+ int buffersize;
+ bool shared;
+ long id;
+ if(lua_gettop(L) == 2 && lua_isboolean(L, 2))
+ {
+ shared = lua_toboolean(L, 2);
+ if (shared)
+ {
+ id = luaL_checkinteger(L, 1);
+ repo = ExamplesRepository_newWithId(id);
+ }
+ else
+ {
+ buffersize = luaL_checkinteger(L, 1);
+ repo = ExamplesRepository_new(buffersize);
+ }
+ }
+ else
+ luaL_error(L, "example: example repo new invalid arguments");
+ if (!repo)
+ luaL_error(L, "example: example new failed");
+
+ luaTHRD_pushudata(L, repo, fastnn_example_repo_tname);
+
+ return 1;
+}
+
+static int example_repo_newfromid(lua_State *L)
+{
+ ExamplesRepository *repo = NULL ;
+ if(lua_gettop(L) == 1)
+ {
+ long id = luaL_checkinteger(L, 1);
+ repo = ExamplesRepository_newWithId(id);
+ }
+ else
+ luaL_error(L, "example: example repo new invalid arguments");
+ if (!repo)
+ luaL_error(L, "example: example new failed");
+
+ luaTHRD_pushudata(L, repo, fastnn_example_repo_tname);
+
+ return 1;
+}
+
+static int example_repo_tostring(lua_State *L)
+{
+ char str[STRLEN];
+ ExamplesRepository *repo = luaTHRD_checkudata(L, 1, fastnn_example_repo_tname);
+ snprintf(str, STRLEN, "%s <%lx>", fastnn_example_repo_tname, ExamplesRepository_id(repo));
+ lua_pushstring(L, str);
+ return 1;
+}
+
+static int example_repo_id(lua_State *L)
+{
+ ExamplesRepository *repo = luaTHRD_checkudata(L, 1, fastnn_example_repo_tname);
+ lua_pushinteger(L, ExamplesRepository_id(repo));
+ return 1;
+}
+
+static int example_repo_get_gpuid(lua_State *L)
+{
+ ExamplesRepository *repo = luaTHRD_checkudata(L, 1, fastnn_example_repo_tname);
+ lua_pushinteger(L, ExamplesRepository_getGpuId(repo));
+ return 1;
+}
+
+static int example_repo_set_gpuid(lua_State *L)
+{
+ ExamplesRepository *repo = luaTHRD_checkudata(L, 1, fastnn_example_repo_tname);
+ int gpuid = luaL_checkinteger(L, 2);
+ ExamplesRepository_setGpuId(repo, gpuid);
+ return 0;
+}
+
+static int example_repo_accept(lua_State *L)
+{
+ ExamplesRepository *repo = luaTHRD_checkudata(L, 1, fastnn_example_repo_tname);
+ Example *example = luaTHRD_checkudata(L, 2, fastnn_example_tname);
+ AcceptExample(repo, example);
+ return 0;
+}
+
+static int example_repo_provide(lua_State *L)
+{
+ ExamplesRepository *repo = luaTHRD_checkudata(L, 1, fastnn_example_repo_tname);
+ Example *example = ProvideExample(repo);
+ if (NULL != example)
+ {
+ luaTHRD_pushudata(L, example, fastnn_example_tname);
+ return 1;
+ }
+ else
+ return 0;
+}
+
+static int example_repo_done(lua_State *L)
+{
+ ExamplesRepository *repo = luaTHRD_checkudata(L, 1, fastnn_example_repo_tname);
+ ExamplesDone(repo);
+ return 0;
+}
+
+static int example_repo_destroy(lua_State *L)
+{
+ ExamplesRepository *repo = luaTHRD_checkudata(L, 1, fastnn_example_repo_tname);
+ ExamplesRepository_destroy(repo);
+ return 0;
+}
+
+/******************************************/
+
+static int example_new(lua_State *L)
+{
+
+ Example *example = NULL;
+ if(lua_gettop(L) == 0)
+ {
+ example = Example_new();
+ }
+ else if(lua_gettop(L) == 1)
+ {
+ long id = luaL_checkinteger(L, 1);
+ example = Example_newWithId(id);
+ }
+ else
+ luaL_error(L, "example: example new invalid arguments");
+ if (!example)
+ luaL_error(L, "example: example failed");
+
+ luaTHRD_pushudata(L, example, fastnn_example_tname);
+
+ return 1;
+}
+
+static int example_id(lua_State *L)
+{
+ Example *example = luaTHRD_checkudata(L, 1, fastnn_example_tname);
+ lua_pushinteger(L, Example_id(example));
+ return 1;
+}
+
+static int example_tostring(lua_State *L)
+{
+ char str[STRLEN];
+ Example* example = luaTHRD_checkudata(L, 1, fastnn_example_tname);
+ snprintf(str, STRLEN, "%s <%lx>", fastnn_example_tname, Example_id(example));
+ lua_pushstring(L, str);
+ return 1;
+}
+
+static int example_size(lua_State *L)
+{
+ Example *example = luaTHRD_checkudata(L, 1, fastnn_example_tname);
+ lua_pushinteger(L, Example_size(example));
+ return 1;
+}
+
+static int example_at(lua_State *L)
+{
+ Example *example = luaTHRD_checkudata(L, 1, fastnn_example_tname);
+ int idx = luaL_checkinteger(L, 2);
+ Matrix *m = Example_at(example, idx);
+ luaTHRD_pushudata(L, m, "nerv.CuMatrixFloat");
+ //luaT_pushudata(L, m, "nerv.CuMatrixFloat");
+ return 1;
+}
+
+static int example_pushback(lua_State *L)
+{
+ Example *example = luaTHRD_checkudata(L, 1, fastnn_example_tname);
+ Matrix *m = luaTHRD_checkudata(L, 2, "nerv.CuMatrixFloat");
+ //Matrix *m = luaT_checkudata(L, 2, "nerv.CuMatrixFloat");
+ Example_pushback(example, m);
+ return 0;
+}
+
+static int example_destroy(lua_State *L)
+{
+ Example *example = luaTHRD_checkudata(L, 1, fastnn_example_tname);
+ Example_destroy(example);
+ //printf("example_destroy ... end\n");
+ return 0;
+}
+
+
+static const struct luaL_Reg example_repo__ [] = {
+ {"new", example_repo_new},
+ {"newfromid", example_repo_newfromid},
+ {"__tostring", example_repo_tostring},
+ {"id", example_repo_id},
+ {"get_gpuid", example_repo_get_gpuid},
+ {"set_gpuid", example_repo_set_gpuid},
+ {"accept", example_repo_accept},
+ {"provide", example_repo_provide},
+ {"done", example_repo_done},
+ {"free", example_repo_destroy},
+ {NULL, NULL}
+};
+
+static const struct luaL_Reg example__ [] = {
+ {"new", example_new},
+ {"__tostring", example_tostring},
+ {"id", example_id},
+ {"size", example_size},
+ {"at", example_at},
+ {"pushback", example_pushback},
+ {"free", example_destroy},
+ {NULL, NULL}
+};
+
+void fastnn_init_example(lua_State *L)
+{
+ luaT_newmetatable(L, fastnn_example_tname, NULL, example_new, example_destroy, NULL);
+ luaL_register(L, NULL, example__);
+ lua_pop(L, 1);
+}
+
+void fastnn_init_example_repo(lua_State *L)
+{
+ luaT_newmetatable(L, fastnn_example_repo_tname, NULL, example_repo_new, example_repo_destroy, NULL);
+ luaL_register(L, NULL, example_repo__);
+ lua_pop(L, 1);
+}
+
+
+
+
diff --git a/fastnn/io/example.lua b/fastnn/io/example.lua
new file mode 100644
index 0000000..8808791
--- /dev/null
+++ b/fastnn/io/example.lua
@@ -0,0 +1,38 @@
+
+local T = require 'libthreads'
+local C = require 'libfastnn'
+
+local ExamplesRepo = nerv.class("fastnn.ExamplesRepo")
+local Example = nerv.class("fastnn.Example")
+
+fastnn.CExamplesRepo = C.CExamplesRepo
+fastnn.CExample = C.CExample
+
+function ExamplesRepo:__init(shareid)
+ if nil ~= shareid then
+ -- print(shareid)
+ self.repo = C.CExamplesRepo(shareid, true)
+ -- print(self.repo:__tostring())
+ end
+end
+
+
+function Example:PrepareData(data, global_transf, trains_id)
+ local example = fastnn.CExample()
+ if nil ~= data then
+ for id, cm in pairs(data) do
+ if trains_id[id] and nil ~= global_transf then
+ local tcm = nerv.speech_utils.normalize(cm, global_transf)
+ example:pushback(tcm)
+ else
+ example:pushback(cm)
+ end
+
+ end
+ end
+
+ return example
+end
+
+
+
diff --git a/fastnn/lib/ModelSync.c b/fastnn/lib/ModelSync.c
new file mode 100644
index 0000000..bd511ea
--- /dev/null
+++ b/fastnn/lib/ModelSync.c
@@ -0,0 +1,305 @@
+
+#include "ModelSync.h"
+#include "../../nerv/lib/matrix/cuda_helper.h"
+#include "../../nerv/lib/matrix/generic/elem_type.h"
+#include "common.h"
+#include <string.h>
+
+
+ModelSync* ModelSync_new(void)
+{
+ ModelSync *self = (ModelSync*)malloc(sizeof(ModelSync));
+ if (NULL != self)
+ {
+ self->model_mutex = THMutex_new();
+ self->state_mutex = THMutex_new();
+ self->initialized_ = false;
+ self->dim_ = 0;
+ self->pos_ = 0;
+ self->data_ = NULL;
+ self->free_data_ = NULL;
+ self->data_ = NULL;
+ self->refcount = 1;
+ self->threadcount = 0;
+ }
+ return self;
+}
+
+ModelSync* ModelSync_newWithId(long id)
+{
+ ModelSync *self = (ModelSync*)id;
+ __sync_fetch_and_add(&self->refcount, 1);
+ return self;
+}
+
+long ModelSync_id(ModelSync *self)
+{
+ return (long)(self);
+}
+
+int ModelSync_lockmodel(ModelSync *self)
+{
+ if(THMutex_lock(self->model_mutex))
+ return 1;
+ return 0;
+}
+
+int ModelSync_unlockmodel(ModelSync *self)
+{
+ if(THMutex_unlock(self->model_mutex))
+ return 1;
+ return 0;
+
+}
+int ModelSync_lockstate(ModelSync *self)
+{
+ if(THMutex_lock(self->state_mutex))
+ return 1;
+ return 0;
+}
+
+int ModelSync_unlockstate(ModelSync *self)
+{
+ if(THMutex_unlock(self->state_mutex))
+ return 1;
+ return 0;
+}
+
+int ModelSync_free(ModelSync *self)
+{
+ if (NULL != self && __sync_fetch_and_add(&self->refcount, -1) == 1)
+ {
+ free(self->model_mutex);
+ free(self->state_mutex);
+ Status status;
+ CUDA_SAFE_SYNC_CALL(cudaFreeHost(self->free_data_), &status);
+ free(self);
+ }
+}
+
+int ModelSync_initBuffer(ModelSync *self)
+{
+ if (NULL != self)
+ {
+ void *free_data = NULL, *data = NULL;
+ size_t size = self->dim_ * sizeof(float)+16;
+ Status status;
+ CUDA_SAFE_SYNC_CALL(cudaHostAlloc((void**) &free_data, size, cudaHostAllocPortable), &status);
+ NERV_SET_STATUS(&status, NERV_NORMAL, 0);
+
+ data = (free_data ? (void *)( (((unsigned long)*(&free_data)) + 15) & ~0xFUL ) : NULL) ;
+ if (NULL != data)
+ {
+ self->data_ = (float*)(data);
+ self->free_data_ = (float*)(free_data);
+ }
+ return 0;
+ }
+ return 1;
+}
+
+int ModelSync_weightfromd(ModelSync *self, Matrix *dm)
+{
+
+ if (NULL != self && NULL != dm)
+ {
+ void *host_data_ = (void*)self->data_;
+ size_t width = dm->ncol * sizeof(float);
+ size_t src_pitch = dm->stride;
+ size_t dst_pitch = src_pitch;
+ Status status;
+
+ CUDA_SAFE_SYNC_CALL(cudaMemcpy2D(host_data_+self->pos_, dst_pitch, dm->data.f, src_pitch, width, dm->nrow, cudaMemcpyDeviceToHost), &status);
+ NERV_SET_STATUS(&status, NERV_NORMAL, 0);
+ self->pos_ += dm->nrow * dm->stride;
+ return 0;
+ }
+ return 1;
+
+}
+
+
+int ModelSync_weighttod(ModelSync *self, Matrix *dm)
+{
+
+ if (NULL != self && NULL != dm)
+ {
+ void *host_data_ = (void*)self->data_;
+ size_t width = dm->ncol * sizeof(float);
+ size_t dst_pitch = dm->stride;
+ size_t src_pitch = dst_pitch;
+ Status status;
+
+ CUDA_SAFE_SYNC_CALL(cudaMemcpy2D(dm->data.f, dst_pitch, host_data_+self->pos_, src_pitch, width, dm->nrow, cudaMemcpyHostToDevice), &status);
+ NERV_SET_STATUS(&status, NERV_NORMAL, 0);
+
+ self->pos_ += dm->nrow * dm->stride;
+ self->initialized_ = true;
+ return 0;
+ }
+ return 1;
+}
+
+void ModelSync_syncinc(ModelSync *self)
+{
+ __sync_fetch_and_add(&self->threadcount, 1);
+}
+
+void ModelSync_syncdec(ModelSync *self)
+{
+ __sync_fetch_and_add(&self->threadcount, -1);
+}
+
+int ModelSync_threadcount(ModelSync *self)
+{
+ return self->threadcount;
+}
+
+/////////////////////////////////
+
+Xent* Xent_new()
+{
+ Xent *xent = (Xent*)malloc(sizeof(Xent));
+ memset(xent, 0, sizeof(Xent));
+ xent->refcount = 1;
+ return xent;
+}
+
+Xent* Xent_newWithId(long id)
+{
+ Xent *xent = (Xent*)id;
+ __sync_fetch_and_add(&xent->refcount, 1);
+ return xent;
+}
+
+Xent* Xent_newWithParm(size_t frames_, size_t correct_, double loss_, double entropy_)
+{
+ Xent *xent = (Xent*)malloc(sizeof(Xent));
+ xent->frames_ = frames_;
+ xent->correct_ = correct_;
+ xent->loss_ = loss_;
+ xent->entropy_ = entropy_;
+ xent->refcount = 1;
+ return xent;
+}
+
+long Xent_id(Xent *xent)
+{
+ return (long)(xent);
+}
+
+Xent* Xent_add(Xent *a, Xent *b)
+{
+ a->frames_ += b->frames_;
+ a->correct_ += b->correct_;
+ a->loss_ += b->loss_;
+ a->entropy_ += b->entropy_;
+ return a;
+}
+
+void Xent_free(Xent *xent)
+{
+ if (NULL != xent && __sync_fetch_and_add(&xent->refcount, -1) == 1)
+ {
+ free(xent);
+ xent = NULL;
+ }
+}
+
+
+//////////////////////////////////
+
+Mse* Mse_new()
+{
+ Mse *mse = (Mse*)malloc(sizeof(Mse));
+ memset(mse, 0, sizeof(Mse));
+ mse->refcount = 1;
+ return mse;
+}
+
+Mse* Mse_newWithId(long id)
+{
+ Mse *mse = (Mse*)id;
+ __sync_fetch_and_add(&mse->refcount, 1);
+ return mse;
+}
+
+Mse* Mse_newWithParm(size_t frames_, double loss_)
+{
+ Mse *mse = (Mse*)malloc(sizeof(Mse));
+ mse->frames_ = frames_;
+ mse->loss_ = loss_;
+ mse->refcount = 1;
+ return mse;
+}
+
+
+long Mse_id(Mse *mse)
+{
+ return (long)(mse);
+}
+
+Mse* Mse_add(Mse *a, Mse *b)
+{
+ a->frames_ += b->frames_;
+ a->loss_ += b->loss_;
+ return a;
+}
+
+void Mse_free(Mse *mse)
+{
+ if (NULL != mse && __sync_fetch_and_add(&mse->refcount, -1) == 1)
+ {
+ free(mse);
+ mse = NULL;
+ }
+}
+
+//////////////////////////////////
+
+GlobalOption* GlobalOption_new()
+{
+ GlobalOption *option = (GlobalOption*)malloc(sizeof(GlobalOption));
+ option->refcount = 1;
+ return option;
+}
+
+GlobalOption* GlobalOption_newWithParm(int batch_size, float lrate, bool bp,const char *tr_scp, const char *cv_scp, const char *transf, const char *network)
+{
+ GlobalOption *option = (GlobalOption*)malloc(sizeof(GlobalOption));
+ option->batch_size = batch_size;
+ option->lrate = lrate;
+ option->bp = bp;
+ strncpy(option->tr_scp, tr_scp, strlen(tr_scp)+1);
+ strncpy(option->cv_scp, cv_scp, strlen(cv_scp)+1);
+ strncpy(option->transf, transf, strlen(transf)+1);
+ strncpy(option->network, network, strlen(network)+1);
+ option->refcount = 1;
+
+ return option;
+}
+
+GlobalOption* GlobalOption_newWithId(long id)
+{
+ GlobalOption *option = (GlobalOption*)id;
+ __sync_fetch_and_add(&option->refcount, 1);
+ return option;
+}
+
+
+
+long GlobalOption_id(GlobalOption *option)
+{
+ return (long)(option);
+}
+
+void GlobalOption_free(GlobalOption *option)
+{
+ if (NULL != option && __sync_fetch_and_add(&option->refcount, -1) == 1)
+ {
+ free(option);
+ option = NULL;
+ }
+}
+
+
diff --git a/fastnn/lib/ModelSync.h b/fastnn/lib/ModelSync.h
new file mode 100644
index 0000000..71216a0
--- /dev/null
+++ b/fastnn/lib/ModelSync.h
@@ -0,0 +1,119 @@
+
+#ifndef NERV_FASTNN_MODELSYNC_H
+#define NERV_FASTNN_MODELSYNC_H
+
+#define STRLEN 1024
+
+#include "../threads/lib/THThread.h"
+#include "matrix/matrix.h"
+#include "stdlib.h"
+#include "stdbool.h"
+
+typedef struct NnetParallelOptions_
+{
+ int num_threads;
+ int merge_size;
+ int num_merge;
+ int num_procs;
+ int threadid;
+ int myid;
+ int thread_level;
+ char merge_func[STRLEN];
+ char log_file[STRLEN];
+} NnetParallelOptions;
+
+
+typedef struct ModelSync_
+{
+ THMutex *model_mutex;
+ THMutex *state_mutex;
+ bool initialized_;
+ int dim_;
+ int pos_;
+ float *data_;
+ float *free_data_;
+ int refcount;
+ int threadcount;
+}ModelSync;
+
+ModelSync *ModelSync_new(void);
+ModelSync *ModelSync_newWithId(long id);
+int ModelSync_free(ModelSync *self);
+long ModelSync_id(ModelSync *self);
+int ModelSync_lockmodel(ModelSync *self);
+int ModelSync_unlockmodel(ModelSync *self);
+int ModelSync_lockstate(ModelSync *self);
+int ModelSync_unlockstate(ModelSync *self);
+int ModelSync_initBuffer(ModelSync *self);
+int ModelSync_weightfromd(ModelSync *self, Matrix *dm);
+int ModelSync_weighttod(ModelSync *self, Matrix *dm);
+int ModelSync_threadcount(ModelSync *self);
+void ModelSync_syncinc(ModelSync *self);
+void ModelSync_syncdec(ModelSync *self);
+
+typedef struct Xent_
+{
+ size_t frames_;
+ size_t correct_;
+ double loss_;
+ double entropy_;
+ int refcount;
+} Xent;
+
+Xent* Xent_new();
+Xent* Xent_newWithId(long id);
+Xent* Xent_newWithParm(size_t frames_, size_t correct_, double loss_, double entropy_);
+long Xent_id(Xent *xent);
+Xent* Xent_add(Xent *a, Xent *b);
+void Xent_free(Xent *xent);
+
+typedef struct Mse_
+{
+ size_t frames_;
+ double loss_;
+ int refcount;
+} Mse;
+
+Mse* Mse_new();
+Mse* Mse_newWithId(long id);
+Mse* Mse_newWithParm(size_t frames_, double loss_);
+long Mse_id(Mse *mse);
+Mse* Mse_add(Mse *a, Mse *b);
+void Mse_free(Mse *mse);
+
+typedef struct NnetUpdateState_
+{
+ int num_utter;
+ int num_nolabel;
+ int num_other_error;
+ long total_frames;
+ Xent xent;
+ Mse mse;
+} NnetUpdateState;
+
+typedef struct GlobalOption_
+{
+ int batch_size;
+ float lrate;
+ bool bp;
+ char tr_scp[STRLEN];
+ char cv_scp[STRLEN];
+ char transf[STRLEN];
+ char network[STRLEN];
+ int refcount;
+}GlobalOption;
+
+
+GlobalOption* GlobalOption_new();
+GlobalOption* GlobalOption_newWithParm(int batch_size, float lrate, bool bp, const char *tr_scp, const char *cv_scp, const char *transf, const char *network);
+GlobalOption* GlobalOption_newWithId(long id);
+long GlobalOption_id(GlobalOption *option);
+void GlobalOption_free(GlobalOption *option);
+
+
+
+
+#endif
+
+
+
diff --git a/fastnn/lib/modelsync.c b/fastnn/lib/modelsync.c
new file mode 100644
index 0000000..2b52752
--- /dev/null
+++ b/fastnn/lib/modelsync.c
@@ -0,0 +1,532 @@
+
+#include <stdio.h>
+#include <stdlib.h>
+
+#include <lua.h>
+#include <lualib.h>
+#include <luaT/luaT.h>
+
+
+
+#include "ModelSync.h"
+#include "../threads/lib/luaTHRD.h"
+
+const char *fastnn_model_sync_tname = "fastnn.CModelSync";
+const char *fastnn_xent_tname = "fastnn.CXent";
+const char *fastnn_mse_tname = "fastnn.CMse";
+const char *fastnn_global_option_tname = "fastnn.CGlobalOption";
+
+static int model_sync_new(lua_State *L)
+{
+ ModelSync *model_sync = NULL;
+ if(lua_gettop(L) == 0)
+ {
+ model_sync = ModelSync_new();
+ }
+ else if(lua_gettop(L) == 1)
+ {
+ long id = luaL_checklong(L, 1);
+ model_sync = ModelSync_newWithId(id);
+ }
+ else
+ luaL_error(L, "modelsync: modelsync new invalid arguments");
+ if(!model_sync)
+ luaL_error(L, "modelsync: modelsync new failed");
+
+ luaTHRD_pushudata(L, model_sync, fastnn_model_sync_tname);
+ return 1;
+}
+
+
+static int model_sync_tostring(lua_State *L)
+{
+ char str[STRLEN];
+ ModelSync *model_sync = luaTHRD_checkudata(L, 1, fastnn_model_sync_tname);
+ snprintf(str, STRLEN, "fastnn.modelsync <%lx>", ModelSync_id(model_sync));
+ lua_pushstring(L, str);
+ return 1;
+}
+
+static int model_sync_id(lua_State *L)
+{
+ ModelSync *model_sync = luaTHRD_checkudata(L, 1, fastnn_model_sync_tname);
+ lua_pushinteger(L, ModelSync_id(model_sync));
+ return 1;
+}
+
+static int model_sync_lockmodel(lua_State *L)
+{
+ ModelSync *model_sync = luaTHRD_checkudata(L, 1, fastnn_model_sync_tname);
+ if (ModelSync_lockmodel(model_sync))
+ luaL_error(L, "modelsync: model lock failed");
+ return 0;
+}
+
+static int model_sync_unlockmodel(lua_State *L)
+{
+ ModelSync *model_sync = luaTHRD_checkudata(L, 1, fastnn_model_sync_tname);
+ if (ModelSync_unlockmodel(model_sync))
+ luaL_error(L, "modelsync: model unlock failed");
+ return 0;
+}
+
+static int model_sync_lockstate(lua_State *L)
+{
+ ModelSync *model_sync = luaTHRD_checkudata(L, 1, fastnn_model_sync_tname);
+ if (ModelSync_lockstate(model_sync))
+ luaL_error(L, "modelsync: state lock failed");
+ return 0;
+}
+
+static int model_sync_unlockstate(lua_State *L)
+{
+ ModelSync *model_sync = luaTHRD_checkudata(L, 1, fastnn_model_sync_tname);
+ if (ModelSync_unlockstate(model_sync))
+ luaL_error(L, "modelsync: state unlock failed");
+ return 0;
+}
+
+static int model_sync_free(lua_State *L)
+{
+ ModelSync *model_sync = luaTHRD_checkudata(L, 1, fastnn_model_sync_tname);
+ ModelSync_free(model_sync);
+ return 0;
+}
+
+static int model_sync_initbuffer(lua_State *L)
+{
+ ModelSync *model_sync = luaTHRD_checkudata(L, 1, fastnn_model_sync_tname);
+ model_sync->dim_ = luaL_checkinteger(L, 2);
+ ModelSync_initBuffer(model_sync);
+ return 0;
+}
+
+static int model_sync_weightfromd(lua_State *L)
+{
+ ModelSync *model_sync = luaTHRD_checkudata(L, 1, fastnn_model_sync_tname);
+ Matrix *dm = luaT_checkudata(L, 2, "nerv.CuMatrixFloat");
+ ModelSync_weightfromd(model_sync, dm);
+ return 0;
+}
+
+static int model_sync_weighttod(lua_State *L)
+{
+ ModelSync *model_sync = luaTHRD_checkudata(L, 1, fastnn_model_sync_tname);
+ Matrix *dm = luaT_checkudata(L, 2, "nerv.CuMatrixFloat");
+ ModelSync_weighttod(model_sync, dm);
+ return 0;
+}
+
+static int model_sync_initialized(lua_State *L)
+{
+ ModelSync *model_sync = luaTHRD_checkudata(L, 1, fastnn_model_sync_tname);
+ lua_pushboolean(L, model_sync->initialized_);
+ return 1;
+}
+
+static int model_sync_setpos(lua_State *L)
+{
+ ModelSync *model_sync = luaTHRD_checkudata(L, 1, fastnn_model_sync_tname);
+ int pos = luaL_checkinteger(L, 2);
+ model_sync->pos_ = pos;
+ return 0;
+}
+
+static int model_sync_threadcount(lua_State *L)
+{
+ ModelSync *model_sync = luaTHRD_checkudata(L, 1, fastnn_model_sync_tname);
+ lua_pushinteger(L, ModelSync_threadcount(model_sync));
+ return 1;
+}
+
+static int model_sync_syncinc(lua_State *L)
+{
+ ModelSync *model_sync = luaTHRD_checkudata(L, 1, fastnn_model_sync_tname);
+ ModelSync_syncinc(model_sync);
+ return 0;
+}
+
+static int model_sync_syncdec(lua_State *L)
+{
+ ModelSync *model_sync = luaTHRD_checkudata(L, 1, fastnn_model_sync_tname);
+ ModelSync_syncdec(model_sync);
+ return 0;
+}
+//////////////////////////////////////////
+
+static int xent_new(lua_State *L)
+{
+ Xent *xent = NULL;
+ if(lua_gettop(L) == 0)
+ {
+ xent = Xent_new();
+ }
+ else if(lua_gettop(L) == 1)
+ {
+ long id = luaL_checklong(L, 1);
+ xent = Xent_newWithId(id);
+ }
+ else if(lua_gettop(L) == 4)
+ {
+ size_t frames_, correct_;
+ double loss_, entropy_ ;
+ frames_ = luaL_checkinteger(L, 1);
+ correct_ = luaL_checkinteger(L, 2);
+ loss_ = luaL_checknumber(L, 3);
+ entropy_ = luaL_checknumber(L, 4);
+ xent = Xent_newWithParm(frames_, correct_, loss_, entropy_);
+ }
+ else
+ luaL_error(L, "xent: xent new invalid arguments");
+ if(!xent)
+ luaL_error(L, "xent: xent new failed");
+
+ luaTHRD_pushudata(L, xent, fastnn_xent_tname);
+ return 1;
+}
+
+static int xent_tostring(lua_State *L)
+{
+ char str[STRLEN];
+ Xent *xent = luaTHRD_checkudata(L, 1, fastnn_xent_tname);
+ snprintf(str, STRLEN, "fastnn.xent <%lx>", Xent_id(xent));
+ lua_pushstring(L, str);
+ return 1;
+}
+
+static int xent_totalframes(lua_State *L)
+{
+ Xent *xent = luaTHRD_checkudata(L, 1, fastnn_xent_tname);
+ lua_pushinteger(L, xent->frames_);
+ return 1;
+}
+
+static int xent_correct(lua_State *L)
+{
+ Xent *xent = luaTHRD_checkudata(L, 1, fastnn_xent_tname);
+ lua_pushinteger(L, xent->correct_);
+ return 1;
+}
+
+static int xent_loss(lua_State *L)
+{
+ Xent *xent = luaTHRD_checkudata(L, 1, fastnn_xent_tname);
+ lua_pushnumber(L, xent->loss_);
+ return 1;
+}
+
+static int xent_entropy(lua_State *L)
+{
+ Xent *xent = luaTHRD_checkudata(L, 1, fastnn_xent_tname);
+ lua_pushnumber(L, xent->entropy_);
+ return 1;
+}
+
+static int xent_id(lua_State *L)
+{
+ Xent *xent = luaTHRD_checkudata(L, 1, fastnn_xent_tname);
+ lua_pushinteger(L, Xent_id(xent));
+ return 1;
+}
+
+static int xent_free(lua_State *L)
+{
+ Xent *xent = luaTHRD_checkudata(L, 1, fastnn_xent_tname);
+ Xent_free(xent);
+ return 0;
+}
+
+static int xent_add(lua_State *L)
+{
+ Xent *a = luaTHRD_checkudata(L, 1, fastnn_xent_tname);
+ Xent *b = luaTHRD_checkudata(L, 2, fastnn_xent_tname);
+ Xent_add(a, b);
+ return 0;
+}
+
+/////////////////////////////////////////////
+
+static int mse_new(lua_State *L)
+{
+ Mse *mse = NULL;
+ if(lua_gettop(L) == 0)
+ {
+ mse = Mse_new();
+ }
+ else if(lua_gettop(L) == 1)
+ {
+ long id = luaL_checklong(L, 1);
+ mse = Mse_newWithId(id);
+ }
+ else if(lua_gettop(L) == 2)
+ {
+ size_t frames_;
+ double loss_;
+ frames_ = luaL_checkinteger(L, 1);
+ loss_ = luaL_checknumber(L, 2);
+ mse = Mse_newWithParm(frames_, loss_);
+ }
+ else
+ luaL_error(L, "mse: mse new invalid arguments");
+ if(!mse)
+ luaL_error(L, "mse: mse new failed");
+
+ luaTHRD_pushudata(L, mse, fastnn_mse_tname);
+ return 1;
+
+}
+
+static int mse_tostring(lua_State *L)
+{
+ char str[STRLEN];
+ Mse *mse = luaTHRD_checkudata(L, 1, fastnn_mse_tname);
+ snprintf(str, STRLEN, "fastnn.mse <%lx>", Mse_id(mse));
+ lua_pushstring(L, str);
+ return 1;
+}
+
+static int mse_totalframes(lua_State *L)
+{
+ Mse *mse = luaTHRD_checkudata(L, 1, fastnn_mse_tname);
+ lua_pushinteger(L, mse->frames_);
+ return 1;
+}
+
+static int mse_loss(lua_State *L)
+{
+ Mse *mse = luaTHRD_checkudata(L, 1, fastnn_mse_tname);
+ lua_pushnumber(L, mse->loss_);
+ return 1;
+}
+
+static int mse_id(lua_State *L)
+{
+ Mse *mse = luaTHRD_checkudata(L, 1, fastnn_mse_tname);
+ lua_pushinteger(L, Mse_id(mse));
+ return 1;
+}
+
+static int mse_free(lua_State *L)
+{
+ Mse *mse = luaTHRD_checkudata(L, 1, fastnn_mse_tname);
+ Mse_free(mse);
+ return 0;
+}
+
+static int mse_add(lua_State *L)
+{
+ Mse *a = luaTHRD_checkudata(L, 1, fastnn_mse_tname);
+ Mse *b = luaTHRD_checkudata(L, 2, fastnn_mse_tname);
+ Mse_add(a, b);
+ return 0;
+}
+/////////////////////////////////////////////
+
+static int global_option_new(lua_State *L)
+{
+ GlobalOption *global_option = NULL;
+ if(lua_gettop(L) == 0)
+ {
+ global_option = GlobalOption_new();
+ }
+ else if(lua_gettop(L) == 1)
+ {
+ long id = luaL_checklong(L, 1);
+ global_option = GlobalOption_newWithId(id);
+ }
+ else if(lua_gettop(L) > 3)
+ {
+ int batch_size = luaL_checkinteger(L, 1);
+ float lrate = luaL_checknumber(L, 2);
+ bool bp = lua_toboolean(L, 3);
+ const char *tr_scp = lua_tostring(L, 4);
+ const char *cv_scp = lua_tostring(L, 5);
+ const char *transf = lua_tostring(L, 6);
+ const char *network = lua_tostring(L, 7);
+ global_option = GlobalOption_newWithParm(batch_size, lrate, bp, tr_scp, cv_scp, transf, network);
+ }
+ else
+ luaL_error(L, "global_option: global_option new invalid arguments");
+ if(!global_option)
+ luaL_error(L, "global_option: global_option new failed");
+
+ luaTHRD_pushudata(L, global_option, fastnn_global_option_tname);
+ return 1;
+
+}
+
+static int global_option_tostring(lua_State *L)
+{
+ char str[STRLEN];
+ GlobalOption *global_option = luaTHRD_checkudata(L, 1, fastnn_global_option_tname);
+ snprintf(str, STRLEN, "fastnn.global_option <%lx>", GlobalOption_id(global_option));
+ lua_pushstring(L, str);
+ return 1;
+}
+
+static int global_option_id(lua_State *L)
+{
+ GlobalOption *global_option = luaTHRD_checkudata(L, 1, fastnn_global_option_tname);
+ lua_pushinteger(L, GlobalOption_id(global_option));
+ return 1;
+}
+
+static int global_option_free(lua_State *L)
+{
+ GlobalOption *global_option = luaTHRD_checkudata(L, 1, fastnn_global_option_tname);
+ GlobalOption_free(global_option);
+ return 0;
+}
+
+static int global_option_batch_size(lua_State *L)
+{
+ GlobalOption *global_option = luaTHRD_checkudata(L, 1, fastnn_global_option_tname);
+ lua_pushinteger(L, global_option->batch_size);
+ return 1;
+}
+
+static int global_option_lrate(lua_State *L)
+{
+ GlobalOption *global_option = luaTHRD_checkudata(L, 1, fastnn_global_option_tname);
+ lua_pushnumber(L, global_option->lrate);
+ return 1;
+}
+
+static int global_option_bp(lua_State *L)
+{
+ GlobalOption *global_option = luaTHRD_checkudata(L, 1, fastnn_global_option_tname);
+ lua_pushboolean(L, global_option->bp);
+ return 1;
+}
+
+static int global_option_tr_scp(lua_State *L)
+{
+ GlobalOption *global_option = luaTHRD_checkudata(L, 1, fastnn_global_option_tname);
+ lua_pushstring(L, global_option->tr_scp);
+ return 1;
+}
+
+static int global_option_cv_scp(lua_State *L)
+{
+ GlobalOption *global_option = luaTHRD_checkudata(L, 1, fastnn_global_option_tname);
+ lua_pushstring(L, global_option->cv_scp);
+ return 1;
+}
+
+static int global_option_transf(lua_State *L)
+{
+ GlobalOption *global_option = luaTHRD_checkudata(L, 1, fastnn_global_option_tname);
+ lua_pushstring(L, global_option->transf);
+ return 1;
+}
+
+static int global_option_network(lua_State *L)
+{
+ GlobalOption *global_option = luaTHRD_checkudata(L, 1, fastnn_global_option_tname);
+ lua_pushstring(L, global_option->network);
+ return 1;
+}
+
+
+//////////////////////////////////////////////
+
+static const struct luaL_Reg model_sync__ [] = {
+ {"new", model_sync_new},
+ {"__tostring", model_sync_tostring},
+ {"id", model_sync_id},
+ {"lockmodel", model_sync_lockmodel},
+ {"unlockmodel", model_sync_unlockmodel},
+ {"lockstate", model_sync_lockstate},
+ {"unlockstate", model_sync_unlockstate},
+ {"initbuffer", model_sync_initbuffer},
+ {"setpos", model_sync_setpos},
+ {"initialized", model_sync_initialized},
+ {"weightfromd", model_sync_weightfromd},
+ {"weighttod", model_sync_weighttod},
+ {"threadcount", model_sync_threadcount},
+ {"syncinc", model_sync_syncinc},
+ {"syncdec", model_sync_syncdec},
+ {"threadcount", model_sync_threadcount},
+ {"free", model_sync_free},
+ {NULL, NULL}
+};
+
+
+static const struct luaL_Reg xent__ [] = {
+ {"new", xent_new},
+ {"__tostring", xent_tostring},
+ {"id", xent_id},
+ {"totalframes", xent_totalframes},
+ {"correct", xent_correct},
+ {"loss", xent_loss},
+ {"entropy", xent_entropy},
+ {"add", xent_add},
+ {"free", xent_free},
+ {NULL, NULL}
+};
+
+static const struct luaL_Reg mse__ [] = {
+ {"new", mse_new},
+ {"__tostring", mse_tostring},
+ {"id", mse_id},
+ {"totalframes", mse_totalframes},
+ {"loss", mse_loss},
+ {"add", mse_add},
+ {"free", mse_free},
+ {NULL, NULL}
+};
+
+
+static const struct luaL_Reg global_option__ [] = {
+ {"new", global_option_new},
+ {"__tostring", global_option_tostring},
+ {"id", global_option_id},
+ {"batch_size", global_option_batch_size},
+ {"lrate", global_option_lrate},
+ {"bp", global_option_bp},
+ {"tr_scp", global_option_tr_scp},
+ {"cv_scp", global_option_cv_scp},
+ {"transf", global_option_transf},
+ {"network", global_option_network},
+ {"free", global_option_free},
+ {NULL, NULL}
+};
+
+void fastnn_init_modelsync(lua_State *L)
+{
+ luaT_newmetatable(L, fastnn_model_sync_tname, NULL, model_sync_new, NULL, NULL);
+ luaL_register(L, NULL, model_sync__);
+ lua_pop(L, 1);
+
+ luaT_newmetatable(L, fastnn_xent_tname, NULL, xent_new, xent_free, NULL);
+ luaL_register(L, NULL, xent__);
+ lua_pop(L, 1);
+
+ luaT_newmetatable(L, fastnn_mse_tname, NULL, mse_new, mse_free, NULL);
+ luaL_register(L, NULL, mse__);
+ lua_pop(L, 1);
+
+ luaT_newmetatable(L, fastnn_global_option_tname, NULL, global_option_new, global_option_free, NULL);
+ luaL_register(L, NULL, global_option__);
+ lua_pop(L, 1);
+ /*
+ printf("%s %lx\n", model_sync__[13].name, model_sync__[13].func);
+
+ if(!luaL_newmetatable(L, fastnn_model_sync_tname))
+ luaL_error(L, "fastnn: fastnn.modelsync type already exists");
+ luaL_setfuncs(L, model_sync__, 0);
+ lua_pushstring(L, "__index");
+ lua_pushvalue(L, -2);
+ lua_rawset(L, -3);
+ lua_pop(L, 1);
+
+ printf("%s %lx\n", model_sync__[13].name, model_sync__[13].func);
+
+ lua_pushstring(L, "modelsync");
+ luaTHRD_pushctortable(L, model_sync_new, fastnn_model_sync_tname);
+ lua_rawset(L, -3);
+ printf("%s %lx\n", model_sync__[13].name, model_sync__[13].func);
+ */
+}
+
+
diff --git a/fastnn/lib/modelsync.lua b/fastnn/lib/modelsync.lua
new file mode 100644
index 0000000..a247562
--- /dev/null
+++ b/fastnn/lib/modelsync.lua
@@ -0,0 +1,107 @@
+
+local C = require 'libfastnn'
+local T = require 'libthreads'
+
+local ModelSync = nerv.class("fastnn.ModelSync")
+
+fastnn.CModelSync = C.CModelSync
+fastnn.Thread = T.Thread
+
+
+function ModelSync:__init(shareid)
+ self.modelsync = fastnn.CModelSync(shareid)
+-- print(self.modelsync.initbuffer)
+ --print(self.modelsync.setpos)
+ --print(self.modelsync.initialized)
+ --print(self.modelsync.weightfromd)
+-- print(self.modelsync.weighttod)
+-- print(self.modelsync.aaaa)
+-- print(self.modelsync.bbbb)
+-- print(self.modelsync.cccc)
+end
+
+function ModelSync:GetDim(nnet)
+
+ local repo = nnet:get_params()
+ local params = repo.params
+ local dim = 0
+ for pid, ref in pairs(params) do
+ if nerv.is_type(ref.trans, "nerv.Matrix") then
+ dim = dim + ref.trans:nrow() * ref.trans:nstride()
+ end
+ end
+
+ return dim
+end
+
+
+function ModelSync:Initialize(nnet)
+
+ self:LockModel()
+
+ if not self.modelsync:initialized() then
+ dim = self:GetDim(nnet)
+ self.modelsync:initbuffer(dim)
+ self:WeightFromD(nnet)
+ end
+
+ self:UnLockModel()
+end
+
+function ModelSync:WeightFromD(nnet)
+ local repo = nnet:get_params()
+ local params = repo.params
+ self.modelsync:setpos(0)
+ for pid, ref in pairs(params) do
+ if nerv.is_type(ref.trans, "nerv.Matrix") then
+ self.modelsync:weightfromd(ref.trans)
+ end
+ end
+end
+
+function ModelSync:WeightToD(nnet)
+ local repo = nnet:get_params()
+ local params = repo.params
+ self.modelsync:setpos(0)
+ for pid, ref in pairs(params) do
+ if nerv.is_type(ref.trans, "nerv.Matrix") then
+ self.modelsync:weighttod(ref.trans)
+ end
+ end
+end
+
+function ModelSync:LockState()
+ self.modelsync:lockstate()
+end
+
+function ModelSync:UnLockState()
+ self.modelsync:unlockstate()
+end
+
+
+function ModelSync:LockModel()
+ self.modelsync:lockmodel()
+end
+
+
+function ModelSync:UnLockModel()
+ self.modelsync:unlockmodel()
+end
+
+function ModelSync:Id()
+ return self.modelsync:id()
+end
+
+function ModelSync:ThreadCount()
+ return self.modelsync:threadcount()
+end
+
+function ModelSync:SyncInc()
+ return self.modelsync:syncinc()
+end
+
+function ModelSync:SyncDec()
+ return self.modelsync:syncdec()
+end
+
+
diff --git a/fastnn/test.lua b/fastnn/test.lua
new file mode 100644
index 0000000..3b56eb1
--- /dev/null
+++ b/fastnn/test.lua
@@ -0,0 +1,57 @@
+require 'fastnn'
+
+function build_trainer(ifname)
+ local param_repo = nerv.ParamRepo()
+ param_repo:import(ifname, nil, gconf)
+ local sublayer_repo = make_sublayer_repo(param_repo)
+ local layer_repo = make_layer_repo(sublayer_repo, param_repo)
+ local nnet = get_network(layer_repo)
+ local input_order = get_input_order()
+
+
+ local iterative_trainer = function (prefix, scp_file, bp)
+
+ -- build buffer
+ local buffer = make_buffer(make_readers(scp_file, layer_repo))
+
+ --[[local control = fastnn.modelsync();
+ local lua_control = fastnn.ModelSync(control:id())
+ print(control:__tostring())
+ print(lua_control:GetDim(nnet))
+ lua_control:Initialize(nnet)
+ lua_control:WeightToD(nnet)
+ lua_control:WeightToD(nnet)
+ ]]
+
+ local example_repo = fastnn.CExamplesRepo(128, false)
+ -- print(example_repo)
+ local share_repo = fastnn.CExamplesRepo(example_repo:id(), true)
+ feat_id = get_feat_id()
+
+ local t = 1;
+ for data in buffer.get_data, buffer do
+ local example = fastnn.Example.PrepareData(data, layer_repo.global_transf, feat_id)
+ print(example)
+ share_repo:accept(example)
+ end
+
+ end
+ return iterative_trainer
+end
+
+
+dofile(arg[1])
+start_halving_inc = 0.5
+halving_factor = 0.6
+end_halving_inc = 0.1
+min_iter = 1
+max_iter = 20
+min_halving = 5
+gconf.batch_size = 256
+gconf.buffer_size = 81920
+
+local pf0 = gconf.initialized_param
+local trainer = build_trainer(pf0)
+
+local accu_best = trainer(nil, gconf.cv_scp, false)
+
diff --git a/fastnn/threads/Makefile b/fastnn/threads/Makefile
new file mode 100644
index 0000000..4205adc
--- /dev/null
+++ b/fastnn/threads/Makefile
@@ -0,0 +1,43 @@
+.PHONY: build install clean
+SHELL := /bin/bash
+BUILD_DIR := $(CURDIR)/build
+INC_PATH := $(LUA_BINDIR)/../include/nerv/luaT
+LUA_DIR = $(INST_LUADIR)/threads
+LIB_PATH := $(LUA_BINDIR)/../lib
+
+OBJ_DIR := $(BUILD_DIR)/objs
+SUBDIR := lib test
+
+
+OBJS := lib/init.o lib/threads.o lib/THThread.o
+LIBS := $(INST_LIBDIR)/libthreads.so
+LUA_LIBS := init.lua threads.lua test/test-threads.lua
+INCLUDE := -I $(LUA_INCDIR) -I $(INC_PATH) -DLUA_USE_APICHECK -DUSE_PTHREAD_THREADS
+
+
+OBJS := $(addprefix $(OBJ_DIR)/,$(OBJS))
+OBJ_SUBDIR := $(addprefix $(OBJ_DIR)/,$(SUBDIR))
+LUA_SUBDIR := $(addprefix $(LUA_DIR)/,$(SUBDIR))
+LUA_LIBS := $(addprefix $(LUA_DIR)/,$(LUA_LIBS))
+
+CFLAGS := -Wall -Wextra -O0
+
+build: $(OBJ_DIR) $(OBJ_SUBDIR) $(OBJS)
+install: $(LUA_DIR) $(LUA_SUBDIR) $(LIBS) $(LUA_LIBS)
+
+$(OBJ_DIR) $(LUA_DIR) $(OBJ_SUBDIR) $(LUA_SUBDIR):
+ -mkdir -p $@
+$(LUA_DIR)/%.lua: %.lua
+ cp $< $@
+
+$(OBJ_DIR)/%.o: %.c $(patsubst /%.o,/%.c,$@)
+ gcc -c -o $@ $< $(INCLUDE) -fPIC $(CFLAGS)
+
+$(LIBS): $(OBJS)
+ gcc -shared -o $@ $^ $(LDFLAGS) -Wl,-rpath=$(LIB_PATH) -L$(LIB_PATH) -lnervcore -lluaT -lpthread
+ cp $@ $(LIB_PATH)/
+
+clean:
+ -rm -rf $(OBJ_DIR)
+
+
diff --git a/fastnn/threads/init.lua b/fastnn/threads/init.lua
new file mode 100644
index 0000000..6cd3679
--- /dev/null
+++ b/fastnn/threads/init.lua
@@ -0,0 +1,14 @@
+threads = {}
+
+local C = require 'libthreads'
+
+threads.Thread = C.Thread
+threads.Mutex = C.Mutex
+threads.Condition = C.Condition
+threads.Semaphore = C.Semaphore
+--threads.Threads = require 'threads.threads'
+
+-- only for backward compatibility (boo)
+-- setmetatable(threads, getmetatable(threads.Threads))
+
+-- return threads
diff --git a/fastnn/threads/lib/THThread.c b/fastnn/threads/lib/THThread.c
new file mode 100644
index 0000000..7abc044
--- /dev/null
+++ b/fastnn/threads/lib/THThread.c
@@ -0,0 +1,349 @@
+#ifndef TH_THREAD_INC
+#define TH_THREAD_INC
+
+#include <stdlib.h>
+#include <string.h>
+
+/*#include "TH.h" */
+#include "THThread.h"
+
+#if defined(USE_PTHREAD_THREADS)
+#include <pthread.h>
+
+#elif defined(USE_WIN32_THREADS)
+
+/* very basic emulation to suit our needs */
+
+#include <process.h>
+#include <windows.h>
+
+typedef HANDLE pthread_t;
+typedef DWORD pthread_attr_t;
+typedef HANDLE pthread_mutex_t;
+typedef HANDLE pthread_cond_t;
+
+static int pthread_create(pthread_t *restrict thread,
+ const pthread_attr_t *restrict attr, void *(*start_routine)(void *),
+ void *restrict arg)
+{
+ *thread = (HANDLE)_beginthreadex(NULL, 0, (THREAD_FUNCTION)start_routine, arg, 0, NULL);
+ return (int)(*thread == NULL);
+}
+
+static int pthread_join(pthread_t thread, void **value_ptr)
+{
+ return ((WaitForSingleObject((thread), INFINITE) != WAIT_OBJECT_0) || !CloseHandle(thread));
+}
+
+static int pthread_mutex_init(pthread_mutex_t *restrict mutex,
+ const pthread_mutexattr_t *restrict attr)
+{
+ *mutex = CreateMutex(NULL, FALSE, NULL);
+ return (int)(*mutex == NULL);
+}
+
+static int pthread_mutex_lock(pthread_mutex_t *mutex)
+{
+ return WaitForSingleObject(*mutex, INFINITE) == 0;
+}
+
+static int pthread_mutex_unlock(pthread_mutex_t *mutex)
+{
+ return ReleaseMutex(*mutex) == 0;
+}
+
+static int pthread_mutex_destroy(pthread_mutex_t *mutex)
+{
+ return CloseHandle(*mutex) == 0;
+}
+
+static int pthread_cond_init(pthread_cond_t *restrict cond,
+ const pthread_condattr_t *restrict attr)
+{
+ *cond = CreateEvent(NULL, FALSE, FALSE, NULL);
+ return (int)(*cond == NULL);
+}
+
+static int pthread_cond_wait(pthread_cond_t *restrict cond,
+ pthread_mutex_t *restrict mutex)
+{
+ SignalObjectAndWait(*mutex, *cond, INFINITE, FALSE);
+ return WaitForSingleObject(*mutex, INFINITE) == 0;
+}
+
+static int pthread_cond_destroy(pthread_cond_t *cond)
+{
+ return CloseHandle(*cond) == 0;
+}
+
+int pthread_cond_signal(pthread_cond_t *cond)
+{
+ return SetEvent(*cond) == 0;
+}
+
+#else
+#error no thread system available
+#endif
+
+typedef struct THThread_ {
+ pthread_t id;
+ int (*func)(void*);
+ void* data;
+ int status;
+} THThread;
+
+typedef struct THMutex_{
+ pthread_mutex_t id;
+ int refcount;
+} THMutex;
+
+typedef struct THCondition_ {
+ pthread_cond_t id;
+ int refcount;
+} THCondition;
+
+typedef struct THSemaphore_ {
+ pthread_mutex_t mutex_;
+ pthread_cond_t cond_;
+ int counter_;
+ int refcount;
+}THSemaphore;
+
+static void* thread_closure(void *data)
+{
+ THThread *thread = data;
+ thread->status = thread->func(thread->data);
+ return NULL;
+}
+
+THThread* THThread_new(int (*func)(void*), void *data)
+{
+ THThread *self = malloc(sizeof(THThread));
+ self->func = func;
+ self->data = data;
+ self->status = 0;
+ if(!self)
+ return NULL;
+ if(pthread_create(&self->id, NULL, thread_closure, self)) {
+ free(self);
+ return NULL;
+ }
+ return self;
+}
+
+long THThread_id(THThread *self)
+{
+ return (long)(self);
+}
+
+int THThread_free(THThread *self)
+{
+ int status = 1;
+ if(self) {
+ if(pthread_join(self->id, NULL))
+ return 1;
+ status = self->status;
+ free(self);
+ self = NULL;
+ }
+ return status;
+}
+
+THMutex* THMutex_new(void)
+{
+ THMutex *self = malloc(sizeof(THMutex));
+ if(!self)
+ return NULL;
+ if(pthread_mutex_init(&self->id, NULL) != 0) {
+ free(self);
+ return NULL;
+ }
+ self->refcount = 1;
+ return self;
+}
+
+THMutex* THMutex_newWithId(long id)
+{
+ THMutex *self = (THMutex*)id;
+ __sync_fetch_and_add(&self->refcount, 1);
+ return self;
+}
+
+long THMutex_id(THMutex *self)
+{
+ return (long)(self);
+}
+
+int THMutex_lock(THMutex *self)
+{
+ if(pthread_mutex_lock(&self->id) != 0)
+ return 1;
+ return 0;
+}
+
+int THMutex_trylock(THMutex *self)
+{
+ if (pthread_mutex_trylock(&self->id) != 0)
+ return 1;
+ return 0;
+}
+
+int THMutex_unlock(THMutex *self)
+{
+ if(pthread_mutex_unlock(&self->id) != 0)
+ return 1;
+ return 0;
+}
+
+void THMutex_free(THMutex *self)
+{
+ if(self) {
+ if(__sync_fetch_and_add(&self->refcount, -1) == 1) {
+ pthread_mutex_destroy(&self->id);
+ free(self);
+ self = NULL;
+ }
+ }
+}
+
+THCondition* THCondition_new(void)
+{
+ THCondition *self = malloc(sizeof(THCondition));
+ if(!self)
+ return NULL;
+ if(pthread_cond_init(&self->id, NULL)) {
+ free(self);
+ return NULL;
+ }
+ self->refcount = 1;
+ return self;
+}
+
+THCondition* THCondition_newWithId(long id)
+{
+ THCondition *self = (THCondition*)id;
+ __sync_fetch_and_add(&self->refcount, 1);
+ return self;
+}
+
+long THCondition_id(THCondition *self)
+{
+ return (long)(self);
+}
+
+int THCondition_signal(THCondition *self)
+{
+ if(pthread_cond_signal(&self->id))
+ return 1;
+ return 0;
+}
+
+int THCondition_wait(THCondition *self, THMutex *mutex)
+{
+ if(pthread_cond_wait(&self->id, &mutex->id))
+ return 1;
+ return 0;
+}
+
+void THCondition_free(THCondition *self)
+{
+ if(self) {
+ if(__sync_fetch_and_add(&self->refcount, -1) == 1) {
+ pthread_cond_destroy(&self->id);
+ free(self);
+ self = NULL;
+ }
+ }
+}
+
+THSemaphore* THSemaphore_new(int initValue)
+{
+ THSemaphore *self = malloc(sizeof(THSemaphore));
+ if(!self)
+ return NULL;
+ self->counter_ = initValue;
+ if(pthread_mutex_init(&self->mutex_, NULL) != 0)
+ {
+ free(self);
+ return NULL;
+ }
+
+ if(pthread_cond_init(&self->cond_, NULL) != 0)
+ {
+ free(self);
+ return NULL;
+ }
+
+ self->refcount = 1;
+
+ return self;
+}
+
+THSemaphore* THSemaphore_newWithId(long id)
+{
+ THSemaphore *self = (THSemaphore*)id;
+ __sync_fetch_and_add(&self->refcount, 1);
+ return self;
+}
+
+long THSemaphore_id(THSemaphore *self)
+{
+ return (long)(self);
+}
+
+int THSemaphore_wait(THSemaphore *self)
+{
+ int ret = 0;
+ ret |= pthread_mutex_lock(&self->mutex_);
+
+ while (self->counter_ <= 0)
+ ret |= pthread_cond_wait(&self->cond_, &self->mutex_);
+
+ self->counter_--;
+ ret |= pthread_mutex_unlock(&self->mutex_);
+
+ return ret;
+
+}
+
+int THSemaphore_signal(THSemaphore *self)
+{
+ int ret = 0;
+ ret |= pthread_mutex_lock(&self->mutex_);
+ self->counter_++;
+ ret |= pthread_cond_signal(&self->cond_);
+ ret |= pthread_mutex_unlock(&self->mutex_);
+
+ return ret;
+}
+
+int THSemaphore_trywait(THSemaphore *self)
+{
+ int ret = 0;
+ int try_wait_succeeded = 1;
+ ret |= pthread_mutex_lock(&self->mutex_);
+ if (self->counter_ > 0) {
+ self->counter_--;
+ try_wait_succeeded = 0;
+ }
+ ret |= pthread_mutex_unlock(&self->mutex_);
+ return try_wait_succeeded;
+}
+
+void THSemaphore_free(THSemaphore *self)
+{
+ if(self)
+ {
+ if(__sync_fetch_and_add(&self->refcount, -1) == 1)
+ {
+ pthread_mutex_destroy(&self->mutex_);
+ pthread_cond_destroy(&self->cond_);
+ free(self);
+ self = NULL;
+ }
+ }
+ }
+
+
+
+#endif
diff --git a/fastnn/threads/lib/THThread.h b/fastnn/threads/lib/THThread.h
new file mode 100644
index 0000000..75ba417
--- /dev/null
+++ b/fastnn/threads/lib/THThread.h
@@ -0,0 +1,36 @@
+#ifndef TH_THREAD_INC
+#define TH_THREAD_INC
+
+typedef struct THThread_ THThread;
+typedef struct THMutex_ THMutex;
+typedef struct THCondition_ THCondition;
+typedef struct THSemaphore_ THSemaphore;
+
+THThread* THThread_new(int (*closure)(void*), void *data);
+long THThread_id(THThread *self);
+int THThread_free(THThread *self);
+
+THMutex* THMutex_new(void);
+THMutex* THMutex_newWithId(long id);
+long THMutex_id(THMutex *self);
+int THMutex_lock(THMutex *self);
+int THMutex_unlock(THMutex *self);
+int THMutex_trylock(THMutex *self);
+void THMutex_free(THMutex *self);
+
+THCondition* THCondition_new(void);
+THCondition* THCondition_newWithId(long id);
+long THCondition_id(THCondition *self);
+int THCondition_signal(THCondition *self);
+int THCondition_wait(THCondition *self, THMutex *mutex);
+void THCondition_free(THCondition *self);
+
+THSemaphore* THSemaphore_new(int initValue);
+THSemaphore* THSemaphore_newWithId(long id);
+long THSemaphore_id(THSemaphore *self);
+int THSemaphore_signal(THSemaphore *self);
+int THSemaphore_wait(THSemaphore *self);
+int THSemaphore_trywait(THSemaphore *self);
+void THSemaphore_free(THSemaphore *self);
+
+#endif
diff --git a/fastnn/threads/lib/init.c b/fastnn/threads/lib/init.c
new file mode 100644
index 0000000..4042189
--- /dev/null
+++ b/fastnn/threads/lib/init.c
@@ -0,0 +1,27 @@
+#include <lua.h>
+#include <lauxlib.h>
+
+#if LUA_VERSION_NUM == 501
+static void luaL_setfuncs(lua_State *L, const luaL_Reg *l, int nup)
+{
+ luaL_checkstack(L, nup+1, "too many upvalues");
+ for (; l->name != NULL; l++) { /* fill the table with given functions */
+ int i;
+ lua_pushstring(L, l->name);
+ for (i = 0; i < nup; i++) /* copy upvalues to the top */
+ lua_pushvalue(L, -(nup+1));
+ lua_pushcclosure(L, l->func, nup); /* closure with those upvalues */
+ lua_settable(L, -(nup + 3));
+ }
+ lua_pop(L, nup); /* remove upvalues */
+}
+#endif
+
+#include "threads.c"
+
+int luaopen_libthreads(lua_State *L)
+{
+ lua_newtable(L);
+ thread_init_pkg(L);
+ return 1;
+}
diff --git a/fastnn/threads/lib/luaTHRD.h b/fastnn/threads/lib/luaTHRD.h
new file mode 100644
index 0000000..3760bdb
--- /dev/null
+++ b/fastnn/threads/lib/luaTHRD.h
@@ -0,0 +1,84 @@
+#ifndef LUA_THRD_INC
+#define LUA_THRD_INC
+
+static int luaTHRD_pushudata(lua_State *L, void *ptr, const char* typename)
+{
+ void **udata = lua_newuserdata(L, sizeof(void*));
+ if(udata) {
+ *udata = ptr;
+ luaL_getmetatable(L, typename);
+ lua_setmetatable(L, -2);
+ return 1;
+ }
+ return 0;
+}
+
+static void *luaTHRD_checkudata(lua_State *L, int narg, const char *typename)
+{
+ void **udata = luaL_checkudata(L, narg, typename);
+ if(udata)
+ return *udata;
+ else
+ return NULL;
+}
+
+static void *luaTHRD_toudata(lua_State *L, int narg, const char *typename)
+{
+ void **udata = lua_touserdata(L, narg);
+ if(udata) {
+ if(lua_getmetatable(L, -1)) {
+ luaL_getmetatable(L, typename);
+ if(lua_equal(L, -1, -2)) {
+ lua_pop(L, 2);
+ return *udata;
+ }
+ else {
+ lua_pop(L, 2);
+ return NULL;
+ }
+ }
+ else
+ return NULL;
+ }
+ else
+ return NULL;
+}
+
+static int luaTHRD_ctor(lua_State *L)
+{
+ if(!lua_istable(L, 1)) /* dummy ctor table */
+ luaL_error(L, "ctor: table expected");
+ lua_getmetatable(L, 1);
+ lua_remove(L, 1); /* dummy ctor table */
+ if(!lua_istable(L, -1))
+ luaL_error(L, "ctor: no metatable found");
+ lua_pushstring(L, "__new");
+ lua_rawget(L, -2);
+ lua_remove(L, -2); /* forget about metatable */
+ if(!lua_isfunction(L, -1))
+ luaL_error(L, "ctor: __new appears to be not a function");
+ lua_insert(L, 1); /* ctor first, arguments follow */
+ lua_call(L, lua_gettop(L)-1, LUA_MULTRET);
+ return lua_gettop(L);
+}
+
+static void luaTHRD_pushctortable(lua_State *L, lua_CFunction ctor, const char* typename)
+{
+ lua_newtable(L); /* empty useless dude */
+ lua_newtable(L); /* metatable of the dude */
+ lua_pushstring(L, "__index");
+ luaL_getmetatable(L, typename);
+ lua_rawset(L, -3);
+ lua_pushstring(L, "__newindex");
+ luaL_getmetatable(L, typename);
+ lua_rawset(L, -3);
+ lua_pushstring(L, "__new"); /* __call will look into there */
+ lua_pushcfunction(L, ctor);
+ lua_rawset(L, -3);
+ lua_pushstring(L, "__call"); /* pop the table and calls __new */
+ lua_pushcfunction(L, luaTHRD_ctor);
+ lua_rawset(L, -3);
+ lua_setmetatable(L, -2);
+}
+
+#endif
diff --git a/fastnn/threads/lib/threads.c b/fastnn/threads/lib/threads.c
new file mode 100644
index 0000000..bf2a5ec
--- /dev/null
+++ b/fastnn/threads/lib/threads.c
@@ -0,0 +1,355 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <luaT.h>
+#include <string.h>
+
+#include "THThread.h"
+#include "luaTHRD.h"
+
+#include <lua.h>
+#include <lualib.h>
+
+static int newthread(void *code_)
+{
+ char *code = code_;
+ lua_State *L = luaL_newstate();
+
+ if(!L) {
+ printf("THREAD FATAL ERROR: could not create lua state\n");
+ return -1;
+ }
+ luaL_openlibs(L);
+
+ if(luaL_loadstring(L, code)) {
+ printf("FATAL THREAD PANIC: (loadstring) %s\n", lua_tolstring(L, -1, NULL));
+ free(code);
+ lua_close(L);
+ return -1;
+ }
+ free(code);
+ if(lua_pcall(L, 0, 0, 0)) {
+ printf("FATAL THREAD PANIC: (pcall) %s\n", lua_tolstring(L, -1, NULL));
+ lua_close(L);
+ return -1;
+ }
+
+ lua_close(L);
+ return 0;
+}
+
+static int thread_new(lua_State *L)
+{
+ THThread *thread = NULL;
+ size_t len = 0;
+ const char *code = luaL_checklstring(L, 1, &len);
+ char *code_dup = malloc(len+1);
+ if(!code_dup)
+ luaL_error(L, "threads: out of memory");
+ memcpy(code_dup, code, len+1);
+
+ thread = THThread_new(newthread, (void*)code_dup);
+ if(!thread)
+ luaL_error(L, "threads: thread new failed");
+ luaTHRD_pushudata(L, thread, "threads.Thread");
+
+ return 1;
+}
+
+static int thread_tostring(lua_State *L)
+{
+ char str[128];
+ THThread *thread = luaTHRD_checkudata(L, 1, "threads.Thread");
+ snprintf(str, 128, "threads.Thread <%lx>", THThread_id(thread));
+ lua_pushstring(L, str);
+ return 1;
+}
+
+static int thread_id(lua_State *L)
+{
+ THThread *thread = luaTHRD_checkudata(L, 1, "threads.Thread");
+ lua_pushinteger(L, THThread_id(thread));
+ return 1;
+}
+
+static int thread_free(lua_State *L)
+{
+ THThread *thread = luaTHRD_checkudata(L, 1, "threads.Thread");
+ THThread_free(thread);
+ return 0;
+}
+
+static int mutex_new(lua_State *L)
+{
+ THMutex *mutex = NULL;
+ if(lua_gettop(L) == 0) {
+ mutex = THMutex_new();
+ }
+ else if(lua_gettop(L) == 1) {
+ long id = luaL_checklong(L, 1);
+ mutex = THMutex_newWithId(id);
+ }
+ else
+ luaL_error(L, "threads: mutex new invalid arguments");
+ if(!mutex)
+ luaL_error(L, "threads: mutex new failed");
+ luaTHRD_pushudata(L, mutex, "threads.Mutex");
+ return 1;
+}
+
+static int mutex_tostring(lua_State *L)
+{
+ char str[128];
+ THMutex *mutex = luaTHRD_checkudata(L, 1, "threads.Mutex");
+ snprintf(str, 128, "threads.Mutex <%lx>", THMutex_id(mutex));
+ lua_pushstring(L, str);
+ return 1;
+}
+
+static int mutex_id(lua_State *L)
+{
+ THMutex *mutex = luaTHRD_checkudata(L, 1, "threads.Mutex");
+ lua_pushinteger(L, THMutex_id(mutex));
+ return 1;
+}
+
+static int mutex_lock(lua_State *L)
+{
+ THMutex *mutex = luaTHRD_checkudata(L, 1, "threads.Mutex");
+ if(THMutex_lock(mutex))
+ luaL_error(L, "threads: mutex lock failed");
+ return 0;
+}
+
+static int mutex_unlock(lua_State *L)
+{
+ THMutex *mutex = luaTHRD_checkudata(L, 1, "threads.Mutex");
+ if(THMutex_unlock(mutex))
+ luaL_error(L, "threads: mutex unlock failed");
+ return 0;
+}
+
+static int mutex_free(lua_State *L)
+{
+ THMutex *mutex = luaTHRD_checkudata(L, 1, "threads.Mutex");
+ THMutex_free(mutex);
+ return 0;
+}
+
+static int condition_new(lua_State *L)
+{
+ THCondition *condition = NULL;
+ if(lua_gettop(L) == 0) {
+ condition = THCondition_new();
+ }
+ else if(lua_gettop(L) == 1) {
+ long id = luaL_checklong(L, 1);
+ condition = THCondition_newWithId(id);
+ }
+ else
+ luaL_error(L, "threads: condition new invalid arguments");
+ if(!condition)
+ luaL_error(L, "threads: condition new failed");
+ luaTHRD_pushudata(L, condition, "threads.Condition");
+ return 1;
+}
+
+static int condition_tostring(lua_State *L)
+{
+ char str[128];
+ THCondition *condition = luaTHRD_checkudata(L, 1, "threads.Condition");
+ snprintf(str, 128, "threads.Condition <%lx>", THCondition_id(condition));
+ lua_pushstring(L, str);
+ return 1;
+}
+
+static int condition_id(lua_State *L)
+{
+ THCondition *condition = luaTHRD_checkudata(L, 1, "threads.Condition");
+ lua_pushinteger(L, THCondition_id(condition));
+ return 1;
+}
+
+static int condition_free(lua_State *L)
+{
+ THCondition *condition = luaTHRD_checkudata(L, 1, "threads.Condition");
+ if(!condition)
+ luaL_error(L, "threads: condition free failed");
+ THCondition_free(condition);
+ return 0;
+}
+
+static int condition_signal(lua_State *L)
+{
+ THCondition *condition = luaTHRD_checkudata(L, 1, "threads.Condition");
+ if(THCondition_signal(condition))
+ luaL_error(L, "threads: condition signal failed");
+ return 0;
+}
+
+static int condition_wait(lua_State *L)
+{
+ THCondition *condition = luaTHRD_checkudata(L, 1, "threads.Condition");
+ THMutex *mutex = luaTHRD_checkudata(L, 2, "threads.Mutex");
+ if(THCondition_wait(condition, mutex))
+ luaL_error(L, "threads: condition wait failed");
+ return 0;
+}
+
+static int semaphore_new(lua_State *L)
+{
+ THSemaphore* semaphore = NULL;
+ if(lua_gettop(L) == 1) {
+ int initValue = luaL_checkinteger(L, 1);
+ semaphore = THSemaphore_new(initValue);
+ }
+ else
+ luaL_error(L, "threads: semaphore new invalid arguments");
+ if (!semaphore)
+ luaL_error(L, "threads: semaphore new failed");
+ luaTHRD_pushudata(L, semaphore, "threads.Semaphore");
+ return 1;
+}
+
+static int semaphore_newfromid(lua_State *L)
+{
+ THSemaphore* semaphore = NULL;
+ if(lua_gettop(L) == 1) {
+ long id = luaL_checklong(L, 1);
+ semaphore = THSemaphore_newWithId(id);
+ }
+ else
+ luaL_error(L, "threads: semaphore new invalid arguments");
+ if(!semaphore)
+ luaL_error(L, "threads: semaphore new failed");
+ luaTHRD_pushudata(L, semaphore, "threads.Semaphore");
+ return 1;
+}
+
+static int semaphore_id(lua_State *L)
+{
+ THSemaphore *semaphore = luaTHRD_checkudata(L, 1, "threads.Semaphore");
+ lua_pushinteger(L, THSemaphore_id(semaphore));
+ return 1;
+}
+
+static int semaphore_signal(lua_State *L)
+{
+ THSemaphore *semaphore = luaTHRD_checkudata(L, 1, "threads.Semaphore");
+ if(THSemaphore_signal(semaphore))
+ luaL_error(L, "threads: semaphore signal failed");
+ return 0;
+}
+
+static int semaphore_wait(lua_State *L)
+{
+ THSemaphore *semaphore = luaTHRD_checkudata(L, 1, "threads.Semaphore");
+ if(THSemaphore_wait(semaphore))
+ luaL_error(L, "threads: semaphore wait failed");
+ return 0;
+}
+
+static int semaphore_trywait(lua_State *L)
+{
+ THSemaphore *semaphore = luaTHRD_checkudata(L, 1, "threads.Semaphore");
+ int wait = THSemaphore_trywait(semaphore);
+ lua_pushinteger(L, wait);
+ return 1;
+}
+
+static void semaphore_free(lua_State *L)
+{
+ THSemaphore *semaphore = luaTHRD_checkudata(L, 1, "threads.Semaphore");
+ if(!semaphore)
+ luaL_error(L, "threads: semaphore free failed");
+ THSemaphore_free(semaphore);
+}
+
+static const struct luaL_Reg thread__ [] = {
+ {"new", thread_new},
+ {"__tostring", thread_tostring},
+ {"id", thread_id},
+ {"free", thread_free},
+ {NULL, NULL}
+};
+
+static const struct luaL_Reg mutex__ [] = {
+ {"new", mutex_new},
+ {"__tostring", mutex_tostring},
+ {"id", mutex_id},
+ {"lock", mutex_lock},
+ {"unlock", mutex_unlock},
+ {"free", mutex_free},
+ {NULL, NULL}
+};
+
+static const struct luaL_Reg condition__ [] = {
+ {"new", condition_new},
+ {"__tostring", condition_tostring},
+ {"id", condition_id},
+ {"signal", condition_signal},
+ {"wait", condition_wait},
+ {"free", condition_free},
+ {NULL, NULL}
+};
+
+static const struct luaL_Reg semaphore__ [] = {
+ {"new", semaphore_new},
+ {"newfromid", semaphore_newfromid},
+ {"id", semaphore_id},
+ {"signal", semaphore_signal},
+ {"wait", semaphore_wait},
+ {"trywait", semaphore_trywait},
+ {"free", semaphore_free},
+ {NULL, NULL}
+};
+
+static void thread_init_pkg(lua_State *L)
+{
+ if(!luaL_newmetatable(L, "threads.Thread"))
+ luaL_error(L, "threads: threads.Thread type already exists");
+ luaL_setfuncs(L, thread__, 0);
+ lua_pushstring(L, "__index");
+ lua_pushvalue(L, -2);
+ lua_rawset(L, -3);
+ lua_pop(L, 1);
+
+ if(!luaL_newmetatable(L, "threads.Mutex"))
+ luaL_error(L, "threads: threads.Mutex type already exists");
+ luaL_setfuncs(L, mutex__, 0);
+ lua_pushstring(L, "__index");
+ lua_pushvalue(L, -2);
+ lua_rawset(L, -3);
+ lua_pop(L, 1);
+
+ if(!luaL_newmetatable(L, "threads.Condition"))
+ luaL_error(L, "threads: threads.Condition type already exists");
+ luaL_setfuncs(L, condition__, 0);
+ lua_pushstring(L, "__index");
+ lua_pushvalue(L, -2);
+ lua_rawset(L, -3);
+ lua_pop(L, 1);
+
+ if(!luaL_newmetatable(L, "threads.Semaphore"))
+ luaL_error(L, "threads: threads.Semaphore type already exists");
+ luaL_setfuncs(L, semaphore__, 0);
+ lua_pushstring(L, "__index");
+ lua_pushvalue(L, -2);
+ lua_rawset(L, -3);
+ lua_pop(L, 1);
+
+ lua_pushstring(L, "Thread");
+ luaTHRD_pushctortable(L, thread_new, "threads.Thread");
+ lua_rawset(L, -3);
+
+ lua_pushstring(L, "Mutex");
+ luaTHRD_pushctortable(L, mutex_new, "threads.Mutex");
+ lua_rawset(L, -3);
+
+ lua_pushstring(L, "Condition");
+ luaTHRD_pushctortable(L, condition_new, "threads.Condition");
+ lua_rawset(L, -3);
+
+ lua_pushstring(L, "Semaphore");
+ luaTHRD_pushctortable(L, semaphore_new, "threads.Semaphore");
+ lua_rawset(L, -3);
+}
diff --git a/fastnn/threads/test/test-low-level.lua b/fastnn/threads/test/test-low-level.lua
new file mode 100644
index 0000000..aea31db
--- /dev/null
+++ b/fastnn/threads/test/test-low-level.lua
@@ -0,0 +1,39 @@
+local t = require 'libthreads'
+
+local m = t.Mutex()
+local c = t.Condition()
+print(string.format('| main thread mutex: 0x%x', m:id()))
+print(string.format('| main thread condition: 0x%x', c:id()))
+
+local code = string.format([[
+ local t = require 'libthreads'
+ local c = t.Condition(%d)
+ print('|| hello from thread')
+ print(string.format('|| thread condition: 0x%%x', c:id()))
+ print('|| doing some stuff in thread...')
+ local x = 0
+ for i=1,10000 do
+ for i=1,10000 do
+ x = x + math.sin(i)
+ end
+ x = x / 10000
+ end
+ print(string.format('|| ...ok (x=%%f)', x))
+ c:signal()
+]], c:id())
+
+print(code)
+
+local thread = t.Thread(code)
+
+
+print('| waiting for thread...')
+m:lock()
+c:wait(m)
+print('| thread finished!')
+
+thread:free()
+m:free()
+c:free()
+
+print('| done')
diff --git a/fastnn/threads/test/test-threads-async.lua b/fastnn/threads/test/test-threads-async.lua
new file mode 100644
index 0000000..68bcd35
--- /dev/null
+++ b/fastnn/threads/test/test-threads-async.lua
@@ -0,0 +1,66 @@
+local threads = require 'threads'
+
+local nthread = 4
+local njob = 100
+
+local pool = threads.Threads(
+ nthread,
+ function(threadid)
+ print('starting a new thread/state number ' .. threadid)
+ end
+)
+
+
+local jobid = 0
+local result -- DO NOT put this in get
+local function get()
+
+ -- fill up the queue as much as we can
+ -- this will not block
+ while jobid < njob and pool:acceptsjob() do
+ jobid = jobid + 1
+
+ pool:addjob(
+ function(jobid)
+ print(string.format('thread ID %d is performing job %d', __threadid, jobid))
+ return string.format("job output from job %d", jobid)
+ end,
+
+ function(jobres)
+ result = jobres
+ end,
+
+ jobid
+ )
+ end
+
+ -- is there still something to do?
+ if pool:hasjob() then
+ pool:dojob() -- yes? do it!
+ if pool:haserror() then -- check for errors
+ pool:synchronize() -- finish everything and throw error
+ end
+ return result
+ end
+
+end
+
+local jobdone = 0
+repeat
+ -- get something asynchronously
+ local res = get()
+
+ -- do something with res (if any)
+ if res then
+ print(res)
+ jobdone = jobdone + 1
+ end
+
+until not res -- until there is nothing remaining
+
+assert(jobid == 100)
+assert(jobdone == 100)
+
+print('PASSED')
+
+pool:terminate()
diff --git a/fastnn/threads/test/test-threads-multiple.lua b/fastnn/threads/test/test-threads-multiple.lua
new file mode 100644
index 0000000..4429696
--- /dev/null
+++ b/fastnn/threads/test/test-threads-multiple.lua
@@ -0,0 +1,15 @@
+local threads = require 'threads'
+
+for i=1,1000 do
+ io.write(string.format('%04d.', tonumber(i)))
+ io.flush()
+ local pool =
+ threads.Threads(
+ 4,
+ function(threadid)
+ require 'torch'
+ end
+ )
+end
+print()
+print('PASSED')
diff --git a/fastnn/threads/test/test-threads-shared.lua b/fastnn/threads/test/test-threads-shared.lua
new file mode 100644
index 0000000..3d63851
--- /dev/null
+++ b/fastnn/threads/test/test-threads-shared.lua
@@ -0,0 +1,111 @@
+require 'torch'
+
+local threads = require 'threads'
+local status, tds = pcall(require, 'tds')
+tds = status and tds or nil
+
+local nthread = 4
+local njob = 10
+local msg = "hello from a satellite thread"
+
+threads.Threads.serialization('threads.sharedserialize')
+
+local x = {}
+local xh = tds and tds.hash() or {}
+local xs = {}
+local z = tds and tds.hash() or {}
+local D = 10
+local K = tds and 100000 or 100 -- good luck in non-shared (30M)
+for i=1,njob do
+ x[i] = torch.ones(D)
+ xh[i] = torch.ones(D)
+ xs[i] = torch.FloatStorage(D):fill(1)
+ for j=1,K do
+ z[(i-1)*K+j] = "blah" .. i .. j
+ end
+end
+collectgarbage()
+collectgarbage()
+
+print('GO')
+
+local pool = threads.Threads(
+ nthread,
+ function(threadIdx)
+ pcall(require, 'tds')
+ print('starting a new thread/state number:', threadIdx)
+ gmsg = msg -- we copy here an upvalue of the main thread
+ end
+)
+
+local jobdone = 0
+for i=1,njob do
+ pool:addjob(
+ function()
+ assert(x[i]:sum() == D)
+ assert(xh[i]:sum() == D)
+ assert(torch.FloatTensor(xs[i]):sum() == D)
+ for j=1,K do
+ assert(z[(i-1)*K+j] == "blah" .. i .. j)
+ end
+ x[i]:add(1)
+ xh[i]:add(1)
+ torch.FloatTensor(xs[i]):add(1)
+ print(string.format('%s -- thread ID is %x', gmsg, __threadid))
+ collectgarbage()
+ collectgarbage()
+ return __threadid
+ end,
+
+ function(id)
+ print(string.format("task %d finished (ran on thread ID %x)", i, id))
+ jobdone = jobdone + 1
+ end
+ )
+end
+
+for i=1,njob do
+ pool:addjob(
+ function()
+ collectgarbage()
+ collectgarbage()
+ end
+ )
+end
+
+pool:synchronize()
+
+print(string.format('%d jobs done', jobdone))
+
+pool:terminate()
+
+-- did we do the job in shared mode?
+for i=1,njob do
+ assert(x[i]:sum() == 2*D)
+ assert(xh[i]:sum() == 2*D)
+ assert(torch.FloatTensor(xs[i]):sum() == 2*D)
+end
+
+-- serialize and zero x
+local str = torch.serialize(x)
+local strh = torch.serialize(xh)
+local strs = torch.serialize(xs)
+for i=1,njob do
+ x[i]:zero()
+ xh[i]:zero()
+ xs[i]:fill(0)
+end
+
+-- dude, check that unserialized x does not point on x
+local y = torch.deserialize(str)
+local yh = torch.deserialize(strh)
+local ys = torch.deserialize(strs)
+for i=1,njob do
+ assert(y[i]:sum() == 2*D)
+ assert(yh[i]:sum() == 2*D)
+ assert(torch.FloatTensor(ys[i]):sum() == 2*D)
+end
+
+pool:terminate()
+
+print('PASSED')
diff --git a/fastnn/threads/test/test-threads.lua b/fastnn/threads/test/test-threads.lua
new file mode 100644
index 0000000..e49a381
--- /dev/null
+++ b/fastnn/threads/test/test-threads.lua
@@ -0,0 +1,20 @@
+local clib = require 'libthreads'
+
+
+nthread = 1
+
+str='ad;alkfakd;af'
+code = [[ function func(str) print(str) end; print(str);]]
+print(code)
+
+--thread = clib.Thread(code)
+
+
+--thread:free()
+
+
+require 'threads'
+
+tt = threads.Thread(code)
+
+
diff --git a/fastnn/threads/threads-scm-1.rockspec b/fastnn/threads/threads-scm-1.rockspec
new file mode 100644
index 0000000..02535d4
--- /dev/null
+++ b/fastnn/threads/threads-scm-1.rockspec
@@ -0,0 +1,36 @@
+package = "threads"
+version = "scm-1"
+source = {
+ url = "https://github.com/uphantom/nerv-threads.git"
+}
+description = {
+ summary = "Thread control for Nerv fastnn",
+ detailed = [[
+ ]],
+ homepage = "https://github.com/uphantom/nerv-threads",
+ license = "BSD"
+}
+dependencies = {
+ "nerv >= scm-1",
+ "lua >= 5.1"
+}
+build = {
+ type = "make",
+ build_variables = {
+ CFLAGS="$(CFLAGS)",
+ LIBFLAG="$(LIBFLAG)",
+ LUA_LIBDIR="$(LUA_LIBDIR)",
+ LUA_BINDIR="$(LUA_BINDIR)",
+ LUA_INCDIR="$(LUA_INCDIR)",
+ INST_PREFIX="$(PREFIX)",
+ LUA="$(LUA)",
+ },
+ install_variables = {
+ LUA_BINDIR="$(LUA_BINDIR)",
+ INST_PREFIX="$(PREFIX)",
+ INST_BINDIR="$(BINDIR)",
+ INST_LIBDIR="$(LIBDIR)",
+ INST_LUADIR="$(LUADIR)",
+ INST_CONFDIR="$(CONFDIR)",
+ },
+}
diff --git a/fastnn/threads/threads.lua b/fastnn/threads/threads.lua
new file mode 100644
index 0000000..2db7d51
--- /dev/null
+++ b/fastnn/threads/threads.lua
@@ -0,0 +1,14 @@
+local threads = {}
+
+local C = require 'libthreads'
+
+threads.Thread = C.Thread
+threads.Mutex = C.Mutex
+threads.Condition = C.Condition
+threads.Semaphore = C.Semaphore
+--threads.Threads = require 'threads.threads'
+
+-- only for backward compatibility (boo)
+-- setmetatable(threads, getmetatable(threads.Threads))
+
+-- return threads