aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorQi Liu <[email protected]>2016-03-14 16:10:59 +0800
committerQi Liu <[email protected]>2016-03-14 16:10:59 +0800
commit35496b6a648d98dc41d6226c1d43650aba58cdfc (patch)
tree72d3667cff52b32894608b9929508b257837cbf8
parent2660af7f6a9ac243a8ad38bf3375ef0fd292bf52 (diff)
add seq_buffer
-rw-r--r--nerv/examples/network_debug/config.lua2
-rw-r--r--nerv/examples/network_debug/main.lua19
-rw-r--r--nerv/examples/network_debug/network.lua114
-rw-r--r--nerv/examples/network_debug/reader.lua78
-rw-r--r--nerv/io/seq_buffer.lua105
5 files changed, 182 insertions, 136 deletions
diff --git a/nerv/examples/network_debug/config.lua b/nerv/examples/network_debug/config.lua
index e20d5a9..9025b78 100644
--- a/nerv/examples/network_debug/config.lua
+++ b/nerv/examples/network_debug/config.lua
@@ -12,7 +12,7 @@ function get_global_conf()
layer_num = 1,
chunk_size = 15,
batch_size = 20,
- max_iter = 35,
+ max_iter = 3,
param_random = function() return (math.random() / 5 - 0.1) end,
dropout_rate = 0.5,
timer = nerv.Timer(),
diff --git a/nerv/examples/network_debug/main.lua b/nerv/examples/network_debug/main.lua
index 790c404..1bee43c 100644
--- a/nerv/examples/network_debug/main.lua
+++ b/nerv/examples/network_debug/main.lua
@@ -6,29 +6,20 @@ nerv.include(arg[1])
local global_conf = get_global_conf()
local timer = global_conf.timer
-timer:tic('IO')
-
local data_path = 'examples/lmptb/PTBdata/'
-local train_reader = nerv.Reader(data_path .. 'vocab', data_path .. 'ptb.train.txt.adds')
-local val_reader = nerv.Reader(data_path .. 'vocab', data_path .. 'ptb.valid.txt.adds')
-
-local train_data = train_reader:get_all_batch(global_conf)
-local val_data = val_reader:get_all_batch(global_conf)
local layers = get_layers(global_conf)
local connections = get_connections(global_conf)
-local NN = nerv.NN(global_conf, train_data, val_data, layers, connections)
-
-timer:toc('IO')
-timer:check('IO')
-io.flush()
+local NN = nerv.NN(global_conf, layers, connections)
timer:tic('global')
local best_cv = 1e10
for i = 1, global_conf.max_iter do
timer:tic('Epoch' .. i)
- local train_ppl, val_ppl = NN:epoch()
+ local train_reader = nerv.Reader(data_path .. 'vocab', data_path .. 'ptb.train.txt.adds')
+ local val_reader = nerv.Reader(data_path .. 'vocab', data_path .. 'ptb.valid.txt.adds')
+ local train_ppl, val_ppl = NN:epoch(train_reader, val_reader)
if val_ppl < best_cv then
best_cv = val_ppl
else
@@ -43,3 +34,5 @@ timer:toc('global')
timer:check('global')
timer:check('network')
timer:check('gc')
+timer:check('IO')
+global_conf.cumat_type.print_profile()
diff --git a/nerv/examples/network_debug/network.lua b/nerv/examples/network_debug/network.lua
index 5518e27..1841d21 100644
--- a/nerv/examples/network_debug/network.lua
+++ b/nerv/examples/network_debug/network.lua
@@ -2,11 +2,17 @@ nerv.include('select_linear.lua')
local nn = nerv.class('nerv.NN')
-function nn:__init(global_conf, train_data, val_data, layers, connections)
+function nn:__init(global_conf, layers, connections)
self.gconf = global_conf
self.network = self:get_network(layers, connections)
- self.train_data = self:get_data(train_data)
- self.val_data = self:get_data(val_data)
+
+ self.output = {}
+ self.err_output = {}
+ for i = 1, self.gconf.chunk_size do
+ self.output[i] = {self.gconf.cumat_type(self.gconf.batch_size, 1)}
+ self.err_output[i] = {self.gconf.cumat_type(self.gconf.batch_size, 1)}
+ self.err_output[i][2] = self.gconf.cumat_type(self.gconf.batch_size, 1)
+ end
end
function nn:get_network(layers, connections)
@@ -20,79 +26,67 @@ function nn:get_network(layers, connections)
return network
end
-function nn:get_data(data)
- local err_output = {}
- local softmax_output = {}
- local output = {}
- for i = 1, self.gconf.chunk_size do
- err_output[i] = self.gconf.cumat_type(self.gconf.batch_size, 1)
- softmax_output[i] = self.gconf.cumat_type(self.gconf.batch_size, self.gconf.vocab_size)
- output[i] = self.gconf.cumat_type(self.gconf.batch_size, 1)
- end
- local ret = {}
- for i = 1, #data do
- ret[i] = {}
- ret[i].input = {}
- ret[i].output = {}
- ret[i].err_input = {}
- ret[i].err_output = {}
- for t = 1, self.gconf.chunk_size do
- ret[i].input[t] = {}
- ret[i].output[t] = {}
- ret[i].err_input[t] = {}
- ret[i].err_output[t] = {}
- ret[i].input[t][1] = data[i].input[t]
- ret[i].input[t][2] = data[i].output[t]
- ret[i].output[t][1] = output[t]
- local err_input = self.gconf.mmat_type(self.gconf.batch_size, 1)
- for j = 1, self.gconf.batch_size do
- if t <= data[i].seq_len[j] then
- err_input[j - 1][0] = 1
- else
- err_input[j - 1][0] = 0
+function nn:process(data, do_train, reader)
+ local timer = self.gconf.timer
+ local buffer = nerv.SeqBuffer(self.gconf, {
+ batch_size = self.gconf.batch_size, chunk_size = self.gconf.chunk_size,
+ readers = {reader},
+ })
+ local total_err = 0
+ local total_frame = 0
+ self.network:epoch_init()
+ while true do
+ timer:tic('IO')
+ data = buffer:get_data()
+ if data == nil then
+ break
+ end
+ local err_input = {}
+ if do_train then
+ for t = 1, self.gconf.chunk_size do
+ local tmp = self.gconf.mmat_type(self.gconf.batch_size, 1)
+ for i = 1, self.gconf.batch_size do
+ if t <= data.seq_length[i] then
+ tmp[i - 1][0] = 1
+ else
+ tmp[i - 1][0] = 0
+ end
end
+ err_input[t] = {self.gconf.cumat_type.new_from_host(tmp)}
end
- ret[i].err_input[t][1] = self.gconf.cumat_type.new_from_host(err_input)
- ret[i].err_output[t][1] = err_output[t]
- ret[i].err_output[t][2] = softmax_output[t]
end
- ret[i].seq_length = data[i].seq_len
- ret[i].new_seq = {}
- for j = 1, self.gconf.batch_size do
- if data[i].seq_start[j] then
- table.insert(ret[i].new_seq, j)
- end
+ local info = {input = {}, output = self.output, err_input = err_input, do_train = do_train,
+ err_output = self.err_output, seq_length = data.seq_length, new_seq = data.new_seq}
+ for t = 1, self.gconf.chunk_size do
+ info.input[t] = {data.data['input'][t]}
+ info.input[t][2] = data.data['label'][t]
end
- end
- return ret
-end
+ timer:toc('IO')
-function nn:process(data, do_train)
- local timer = self.gconf.timer
- local total_err = 0
- local total_frame = 0
- self.network:epoch_init()
- for id = 1, #data do
- data[id].do_train = do_train
timer:tic('network')
- self.network:mini_batch_init(data[id])
+ self.network:mini_batch_init(info)
self.network:propagate()
timer:toc('network')
+
+ timer:tic('IO')
for t = 1, self.gconf.chunk_size do
- local tmp = data[id].output[t][1]:new_to_host()
+ local tmp = info.output[t][1]:new_to_host()
for i = 1, self.gconf.batch_size do
- if t <= data[id].seq_length[i] then
+ if t <= info.seq_length[i] then
total_err = total_err + math.log10(math.exp(tmp[i - 1][0]))
total_frame = total_frame + 1
end
end
end
+ timer:toc('IO')
+
+ timer:tic('network')
if do_train then
- timer:tic('network')
self.network:back_propagate()
self.network:update()
- timer:toc('network')
end
+ timer:toc('network')
+
timer:tic('gc')
collectgarbage('collect')
timer:toc('gc')
@@ -100,11 +94,11 @@ function nn:process(data, do_train)
return math.pow(10, - total_err / total_frame)
end
-function nn:epoch()
- local train_error = self:process(self.train_data, true)
+function nn:epoch(train_reader, val_reader)
+ local train_error = self:process(self.train_data, true, train_reader)
local tmp = self.gconf.dropout_rate
self.gconf.dropout_rate = 0
- local val_error = self:process(self.val_data, false)
+ local val_error = self:process(self.val_data, false, val_reader)
self.gconf.dropout_rate = tmp
return train_error, val_error
end
diff --git a/nerv/examples/network_debug/reader.lua b/nerv/examples/network_debug/reader.lua
index b10baaf..70c0c97 100644
--- a/nerv/examples/network_debug/reader.lua
+++ b/nerv/examples/network_debug/reader.lua
@@ -3,6 +3,7 @@ local Reader = nerv.class('nerv.Reader')
function Reader:__init(vocab_file, input_file)
self:get_vocab(vocab_file)
self:get_seq(input_file)
+ self.offset = 1
end
function Reader:get_vocab(vocab_file)
@@ -31,7 +32,8 @@ end
function Reader:get_seq(input_file)
local f = io.open(input_file, 'r')
self.seq = {}
- while true do
+ -- while true do
+ for i = 1, 26 do
local seq = f:read()
if seq == nil then
break
@@ -47,67 +49,19 @@ function Reader:get_seq(input_file)
end
end
-function Reader:get_in_out(id, pos)
- return self.seq[id][pos], self.seq[id][pos + 1], pos + 1 == #self.seq[id]
-end
-
-function Reader:get_all_batch(global_conf)
- local data = {}
- local pos = {}
- local offset = 1
- for i = 1, global_conf.batch_size do
- pos[i] = nil
+function Reader:get_data()
+ if self.offset > #self.seq then
+ return nil
end
- while true do
- -- for i = 1, 26 do
- local input = {}
- local output = {}
- for i = 1, global_conf.chunk_size do
- input[i] = global_conf.mmat_type(global_conf.batch_size, 1)
- input[i]:fill(global_conf.nn_act_default)
- output[i] = global_conf.mmat_type(global_conf.batch_size, 1)
- output[i]:fill(global_conf.nn_act_default)
- end
- local seq_start = {}
- local seq_end = {}
- local seq_len = {}
- for i = 1, global_conf.batch_size do
- seq_start[i] = false
- seq_end[i] = false
- seq_len[i] = 0
- end
- local has_new = false
- for i = 1, global_conf.batch_size do
- if pos[i] == nil then
- if offset < #self.seq then
- seq_start[i] = true
- pos[i] = {offset, 1}
- offset = offset + 1
- end
- end
- if pos[i] ~= nil then
- has_new = true
- for j = 1, global_conf.chunk_size do
- local final
- input[j][i-1][0], output[j][i-1][0], final = self:get_in_out(pos[i][1], pos[i][2])
- seq_len[i] = j
- if final then
- seq_end[i] = true
- pos[i] = nil
- break
- end
- pos[i][2] = pos[i][2] + 1
- end
- end
- end
- if not has_new then
- break
- end
- for i = 1, global_conf.chunk_size do
- input[i] = global_conf.cumat_type.new_from_host(input[i])
- output[i] = global_conf.cumat_type.new_from_host(output[i])
- end
- table.insert(data, {input = input, output = output, seq_start = seq_start, seq_end = seq_end, seq_len = seq_len})
+ local tmp = self.seq[self.offset]
+ local res = {
+ input = nerv.MMatrixFloat(#tmp - 1, 1),
+ label = nerv.MMatrixFloat(#tmp - 1, 1),
+ }
+ for i = 1, #tmp - 1 do
+ res.input[i - 1][0] = tmp[i]
+ res.label[i - 1][0] = tmp[i + 1]
end
- return data
+ self.offset = self.offset + 1
+ return res
end
diff --git a/nerv/io/seq_buffer.lua b/nerv/io/seq_buffer.lua
index e69de29..ad1b3f7 100644
--- a/nerv/io/seq_buffer.lua
+++ b/nerv/io/seq_buffer.lua
@@ -0,0 +1,105 @@
+local SeqBuffer = nerv.class('nerv.SeqBuffer', 'nerv.DataBuffer')
+
+function SeqBuffer:__init(global_conf, buffer_conf)
+ self.gconf = global_conf
+
+ self.batch_size = buffer_conf.batch_size
+ self.chunk_size = buffer_conf.chunk_size
+ self.readers = buffer_conf.readers
+ self.nn_act_default = buffer_conf.nn_act_default
+ if self.nn_act_default == nil then
+ self.nn_act_default = 0
+ end
+
+ self.mat_type = self.gconf.mmat_type
+ self.queue = {}
+ self.head = 1
+ self.tail = 0
+end
+
+function SeqBuffer:new_mini_batch()
+ local res = {}
+ res.data = {}
+ res.new_seq = {}
+ res.seq_length = {}
+ for i = 1, self.batch_size do
+ res.seq_length[i] = 0
+ end
+ return res
+end
+
+function SeqBuffer:saturate(batch)
+ if self.queue[self.head] ~= nil and self.queue[self.head].seq_length[batch] ~= 0 then
+ return true
+ end
+ local data = {}
+ local drow = nil
+ for i = 1, #self.readers do
+ local tmp = self.readers[i]:get_data()
+ if tmp == nil then
+ return false
+ end
+ for id, d in pairs(tmp) do
+ if drow == nil then
+ drow = d:nrow()
+ elseif d:nrow() ~= drow then
+ nerv.error('readers provides with inconsistent rows of data')
+ end
+ data[id] = d
+ end
+ end
+ local offset = 0
+ local head = self.head
+ while offset < drow do
+ local last = math.min(offset + self.chunk_size, drow)
+ if head > self.tail then
+ self.tail = self.tail + 1
+ self.queue[self.tail] = self:new_mini_batch()
+ end
+ self.queue[head].seq_length[batch] = last - offset
+ if offset == 0 then
+ table.insert(self.queue[head].new_seq, batch)
+ end
+ local mini_batch = self.queue[head].data
+ for id, d in pairs(data) do
+ if mini_batch[id] == nil then
+ mini_batch[id] = {}
+ end
+ local tmp = mini_batch[id]
+ for i = offset + 1, last do
+ local chunk = i - offset
+ if tmp[chunk] == nil then
+ tmp[chunk] = self.mat_type(self.batch_size, d:ncol())
+ tmp[chunk]:fill(self.nn_act_default)
+ end
+ tmp[chunk]:copy_from(d, i - 1, i, batch - 1)
+ end
+ end
+ head = head + 1
+ offset = last
+ end
+ return true
+end
+
+function SeqBuffer:get_data()
+ local has_data = false
+ for i = 1, self.batch_size do
+ if self:saturate(i) then
+ has_data = true
+ end
+ end
+ if not has_data then
+ return nil
+ end
+ local res = self.queue[self.head]
+ self.queue[self.head] = nil
+ self.head = self.head + 1
+ if not self.gconf.use_cpu then
+ for id, d in pairs(res.data) do
+ for i = 1, #d do
+ d[i] = self.gconf.cumat_type.new_from_host(d[i])
+ end
+ end
+ end
+ return res
+end