require 'fastnn'
require 'libhtkio'
require 'threads'
dofile("fastnn/fastnn_baseline.lua")
env = string.format([[
package.path="/home/slhome/wd007/.luarocks/share/lua/5.1/?.lua;/home/slhome/wd007/.luarocks/share/lua/5.1/?/init.lua;/sgfs/users/wd007/src/nerv/install/share/lua/5.1/?.lua;/sgfs/users/wd007/src/nerv/install/share/lua/5.1/?/init.lua;"..package.path;
package.cpath="/home/slhome/wd007/.luarocks/lib/lua/5.1/?.so;/sgfs/users/wd007/src/nerv/install/lib/lua/5.1/?.so;"..package.cpath
local k,l,_=pcall(require,"luarocks.loader") _=k and l.add_context("nerv","scm-1")
]])
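-- Body of a data-loading thread, instantiated via string.format (see
-- get_data_thread below). Each data thread reads features through the shared
-- TNet feature repo, applies the global transform, packs minibatches into
-- fastnn.Example objects and pushes them into the shared examples repo.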
local data_thread_code = [[
%s
require 'nerv'
require 'fastnn'
dofile("fastnn/fastnn_baseline.lua")
os.execute("export MALLOC_CHECK_=0")
local thread_idx = %d
local example_repo_shareid = %d
local data_mutex_shareid = %d
local feat_repo_shareid = %d
local gpu_shareid = %d
local batch_size = %d
local bp = %d
local scp_file = '%s'
local share_mutex = threads.Mutex(data_mutex_shareid)
local share_example_repo = fastnn.CExamplesRepo(example_repo_shareid, true)
local share_gpu = fastnn.CDevice(gpu_shareid)
--print(thread_idx)
--print(share_mutex)
--print(share_gpu)
--print(share_example_repo)
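-- bp arrives as an integer flag (spliced in through the numeric placeholder
-- above); convert it back to a boolean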
if bp == 0 then
bp = false
else
bp = true
end
gconf.randomize = bp
--print(gconf.randomize)
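-- GPU binding: the first thread to arrive selects a device and records its
-- id in the shared repo; later threads attach to the same device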
share_mutex:lock()
local gpuid = share_example_repo:get_gpuid()
if gpuid < 0 then
gpuid = share_gpu:select_gpu()
share_example_repo:set_gpuid(gpuid)
else
share_gpu:select_gpu(gpuid)
end
nerv.info_stderr("thread %%d loading transf ...", thread_idx)
local param_transf_repo = nerv.ParamRepo()
param_transf_repo:import(gconf.transf, nil, gconf)
local transf_node_repo = make_transf_node_repo(param_transf_repo)
local transf_layer_repo = make_transf_link_repo(transf_node_repo, param_transf_repo)
local transf = transf_layer_repo:get_layer("global_transf")
share_mutex:unlock()
local feat_id = get_feat_id()
local buffer = make_buffer(make_readers(scp_file, transf_layer_repo, feat_repo_shareid, data_mutex_shareid))
local t = 1;
for data in buffer.get_data, buffer do
local example = fastnn.Example:PrepareData(data, nil, feat_id)
--print(string.format("Accept NO.%%d %%s", t, example)); t = t+1;
share_example_repo:accept(example)
--print("share_example_repo:accept")
-- collect garbage promptly to free GPU memory
collectgarbage("collect")
end
share_example_repo:done()
-- print("share_example_repo:done")
]]
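-- Body of a training thread. Each training thread pulls minibatches from the
-- shared examples repo, propagates them through the network and, when bp is
-- set, back-propagates and exchanges weights with the shared master model
-- under a lock; the last thread to finish writes the updated network to disk.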
local train_thread_code = [[
%s
require 'nerv'
require 'fastnn'
dofile("fastnn/fastnn_baseline.lua")
os.execute("export MALLOC_CHECK_=0")
local thread_idx = %d
local example_repo_shareid = %d
local data_mutex_shareid = %d
local master_shareid = %d
local gpu_shareid = %d
local xent_shareid = %d
local batch_size = %d
local lrate = %f
local bp = %d
local nnet_in = '%s'
local nnet_out = '%s'
local share_example_repo = fastnn.CExamplesRepo(example_repo_shareid, true)
local share_mutex = threads.Mutex(data_mutex_shareid)
local share_master = fastnn.ModelSync(master_shareid)
local share_gpu = fastnn.CDevice(gpu_shareid)
local share_xent = fastnn.CXent(xent_shareid)
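-- convert the integer bp flag back to a boolean, as in the data thread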
if bp == 0 then
bp = false
else
bp = true
end
gconf.randomize = bp
gconf.lrate = lrate
gconf.batch_size = batch_size
gconf.network[1] = nnet_in
nerv.info_stderr("input network: %%s", gconf.network[1])
nerv.info_stderr("input randomize: %%s", tostring(gconf.randomize))
nerv.info_stderr("input batch_size: %%d", gconf.batch_size)
nerv.info_stderr("input lrate: %%f", gconf.lrate)
share_mutex:lock()
local gpuid = share_example_repo:get_gpuid()
if gpuid < 0 then
gpuid = share_gpu:select_gpu()
share_example_repo:set_gpuid(gpuid)
else
share_gpu:select_gpu(gpuid)
end
nerv.context = nerv.CCuContext()
--print(nerv.context)
nerv.info_stderr("thread %%d loading network ...", thread_idx)
local param_network_repo = nerv.ParamRepo()
param_network_repo:import(gconf.network, nil, gconf)
local network_node_repo = make_network_node_repo(param_network_repo)
local network_layer_repo = make_network_link_repo(network_node_repo, param_network_repo)
local network = get_network(network_layer_repo)
share_mutex:unlock()
local input_order = get_input_order()
-- initialize the network
network:init(gconf.batch_size)
gconf.cnt = 0
local err_input = {nerv.CuMatrixFloat(gconf.batch_size, 1)}
err_input[1]:fill(1)
share_master:Initialize(network)
share_master:SyncInc()
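-- training loop: consume minibatches from the shared repo until the data
-- thread signals done(); print statistics every 2000 minibatches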
for example in share_example_repo.provide, share_example_repo do
gconf.cnt = gconf.cnt + 1
if gconf.cnt == 2000 then
print_stat(network_node_repo)
gconf.cnt = 0
end
local input = {}
local n = example:size()
for i = 0, n-1 do
table.insert(input, example:at(i))
end
local output = {nerv.CuMatrixFloat(gconf.batch_size, 1)}
local err_output = {input[1]:create()}
network:propagate(input, output)
if bp then
network:back_propagate(err_input, err_output, input, output)
network:gradient(err_input, input, output)
share_master:LockModel()
share_master:WeightToD(network)
network:update_gradient()
share_master:WeightFromD(network)
share_master:UnLockModel()
end
-- collect garbage promptly to free GPU memory
collectgarbage("collect")
end
--print_stat(network_node_repo)
local ce_crit = network_node_repo:get_layer("ce_crit")
local xent = fastnn.CXent(ce_crit.total_frames, ce_crit.total_correct, ce_crit.total_ce, ce_crit.total_ce)
share_master:LockModel()
share_xent:add(xent)
share_master:SyncDec()
--print(string.format("ThreadCount: %%d", share_master:ThreadCount()))
if share_master:ThreadCount() == 0 and bp then
share_master:WeightToD(network)
local fname = string.format("%%s_tr%%.3f",
nnet_out, frame_acc(share_xent))
nerv.info_stderr("writing back %%s ...", fname)
network:get_params():export(fname, nil)
end
share_master:UnLockModel()
]]
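-- Fill the %d/%s/%f placeholders of a thread template with the shared-object
-- ids and hyperparameters, yielding the final chunk passed to threads.Thread.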
function get_data_thread(data_thread_code, env, thread_idx, example_repo_shareid,
data_mutex_shareid, feat_repo_shareid, gpu_shareid,
batch_size, bp, scp_file)
return string.format(data_thread_code, env, thread_idx, example_repo_shareid,
data_mutex_shareid, feat_repo_shareid, gpu_shareid,
batch_size, bp, scp_file)
end
function get_train_thread(train_thread_code, env, thread_idx, example_repo_shareid,
data_mutex_shareid, master_shareid, gpu_shareid, xent_shareid,
batch_size, lrate, bp, nnet_in, nnet_out)
return string.format(train_thread_code, env, thread_idx, example_repo_shareid,
data_mutex_shareid, master_shareid, gpu_shareid, xent_shareid,
batch_size, lrate, bp, nnet_in, nnet_out)
end
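-- Spawn num_threads (data thread, train thread) pairs that communicate through
-- shared ids (examples repo, mutex, master model, xent counters, GPU), wait
-- for all of them to finish, and return the overall frame accuracy.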
function trainer(batch_size, lrate, bp, scp_file, nnet_in, nnet_out, num_threads)
local train_threads={}
local trainer = {}
local data_threads = {}
local data = {}
local data_mutex = threads.Mutex()
local data_mutex_shareid = data_mutex:id()
local master = fastnn.CModelSync()
local master_shareid = master:id()
--print(master)
local xent = fastnn.CXent()
local xent_shareid = xent:id()
--print(xent)
local gpu = fastnn.CDevice()
local gpu_shareid = gpu:id()
--print(gpu_shareid)
gpu:init()
local example_repo = {}
local example_repo_shareid = {}
local feat_repo = nerv.TNetFeatureRepo(scp_file, gconf.htk_conf, gconf.frm_ext)
local feat_repo_shareid = feat_repo:id()
for i=1,num_threads,1 do
example_repo[i] = fastnn.CExamplesRepo(128, false)
example_repo_shareid[i] = example_repo[i]:id()
data_threads[i] = get_data_thread(data_thread_code, env, i, example_repo_shareid[i],
data_mutex_shareid, feat_repo_shareid, gpu_shareid,
batch_size, bp, scp_file)
train_threads[i] = get_train_thread(train_thread_code, env, i, example_repo_shareid[i],
data_mutex_shareid, master_shareid, gpu_shareid, xent_shareid,
batch_size, lrate, bp, nnet_in, nnet_out)
--print(train_threads[i])
data[i] = threads.Thread(data_threads[i])
trainer[i] = threads.Thread(train_threads[i])
end
nerv.info_stderr('| waiting for threads ...')
for i=1,num_threads,1 do
data[i]:free()
trainer[i]:free()
end
print_xent(xent)
nerv.info_stderr('| all threads finished!')
return frame_acc(xent)
end
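-- strip directory and extension from a path, e.g. "dir/nnet.init" -> "nnet"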
function get_filename(fname)
return string.gsub((string.gsub(fname, "(.*/)(.*)", "%2")),"(.*)%..*", "%1")
end
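-- stochastic data selection: keep each line of tr_scp with probability
-- sds_rate and write the kept lines to sds_scp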
function do_sds(tr_scp, sds_scp, sds_rate)
math.randomseed(os.time())
local scp_file = io.open(tr_scp, "r")
local sds_file = io.open(sds_scp, "w")
for line in scp_file:lines() do
local rate = math.random()
if (rate < sds_rate) then
sds_file:write(line.."\n")
end
end
scp_file:close()
sds_file:close()
end
function print_tag(iter)
io.stderr:write(string.format("########################################################\n"))
io.stderr:write(string.format("# NN TRAINING ITERATION:%d, %s\n", iter, os.date()))
io.stderr:write(string.format("########################################################\n"))
end
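-- learning-rate schedule constants: halving begins once the CV improvement
-- falls below start_halving_inc and training stops once it falls below
-- end_halving_inc; the halving factor itself decays from 0.8 to a floor of 0.6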
start_halving_inc = 0.5
halving_factor = 0.8
end_halving_inc = 0.1
min_iter = 1
max_iter = 20
min_halving = 0
gconf.batch_size = 256
pf0 = get_filename(gconf.network[1])
nnet_in = gconf.network[1]
nnet_out = ""
sds_scp = "tr_sds_"..string.format("%.4d", math.random()*10000)..".scp" --"tr_sds.scp"
sds_factor = 0.4
num_threads = 1
global_option = nil
print_gconf()
os.execute("export MALLOC_CHECK_=0")
-- training begin
nerv.info_stderr("begin initial cross validation")
-- cross validation writes no model, so pass an empty nnet_out ("" rather
-- than nil, which stock Lua 5.1 string.format rejects for a '%s' slot)
local accu_best = trainer(gconf.batch_size, gconf.lrate, 0,
gconf.cv_scp, nnet_in, "", num_threads)
local do_halving = false
local accu_new = accu_best
nerv.info_stderr("initial cross validation: %.3f\n", accu_best)
for i = 1, max_iter do
if accu_new >= accu_best then
local sds_rate = math.cos((i-1)*11.0/180*math.pi)
if (sds_rate <= sds_factor) then
sds_rate = sds_factor
end
nerv.info_stderr("iteration %d sds_rate: %.6f", i, sds_rate)
do_sds(gconf.tr_scp, sds_scp, sds_rate)
end
nnet_out=pf0.."_iter"..i
--print(nnet_out)
print_tag(i)
nerv.info_stderr("[NN] begin iteration %d learning_rate: %.3f batch_size: %d.", i, gconf.lrate, gconf.batch_size)
local accu_tr = trainer(gconf.batch_size, gconf.lrate, 1,
sds_scp, nnet_in, nnet_out, num_threads)
nerv.info_stderr("[TR] end iteration %d frame_accuracy: %.3f.\n", i, accu_tr)
os.execute("sleep " .. 3)
nnet_out = nnet_out.."_tr"..accu_tr
-- CV pass again: no model is written, so pass an empty nnet_out
accu_new = trainer(gconf.batch_size, gconf.lrate, 0,
gconf.cv_scp, nnet_out, "", num_threads)
nerv.info_stderr("[CV] end iteration %d frame_accuracy: %.3f.\n\n", i, accu_new)
os.execute("sleep " .. 3)
local nnet_tmp = string.format("%s_%s_iter_%d_lr%f_tr%.3f_cv%.3f",
pf0,
os.date("%Y%m%d%H%M%S"),
i, gconf.lrate, accu_tr, accu_new)
-- TODO: revert the weights
local accu_diff = accu_new - accu_best
local cmd
if accu_new > accu_best then
accu_best = accu_new
nnet_in = nnet_tmp
gconf.batch_size = gconf.batch_size + 128
if gconf.batch_size > 1024 then
gconf.batch_size = 1024
end
else
-- reject
nnet_tmp = nnet_tmp.."_rejected"
do_halving = true
end
cmd = "mv "..nnet_out.." "..nnet_tmp
os.execute(cmd)
if do_halving and accu_diff < end_halving_inc and i > min_iter then
break;
end
if accu_diff < start_halving_inc and i >= min_halving then
do_halving = true
end
if do_halving then
gconf.lrate = gconf.lrate * halving_factor
halving_factor = halving_factor - 0.025
if halving_factor < 0.6 then
halving_factor = 0.6
end
end
nerv.info_stderr("iteration %d done!", i)
end