From 35496b6a648d98dc41d6226c1d43650aba58cdfc Mon Sep 17 00:00:00 2001 From: Qi Liu Date: Mon, 14 Mar 2016 16:10:59 +0800 Subject: add seq_buffer --- nerv/examples/network_debug/config.lua | 2 +- nerv/examples/network_debug/main.lua | 19 ++---- nerv/examples/network_debug/network.lua | 114 +++++++++++++++----------------- nerv/examples/network_debug/reader.lua | 78 +++++----------------- nerv/io/seq_buffer.lua | 105 +++++++++++++++++++++++++++++ 5 files changed, 182 insertions(+), 136 deletions(-) diff --git a/nerv/examples/network_debug/config.lua b/nerv/examples/network_debug/config.lua index e20d5a9..9025b78 100644 --- a/nerv/examples/network_debug/config.lua +++ b/nerv/examples/network_debug/config.lua @@ -12,7 +12,7 @@ function get_global_conf() layer_num = 1, chunk_size = 15, batch_size = 20, - max_iter = 35, + max_iter = 3, param_random = function() return (math.random() / 5 - 0.1) end, dropout_rate = 0.5, timer = nerv.Timer(), diff --git a/nerv/examples/network_debug/main.lua b/nerv/examples/network_debug/main.lua index 790c404..1bee43c 100644 --- a/nerv/examples/network_debug/main.lua +++ b/nerv/examples/network_debug/main.lua @@ -6,29 +6,20 @@ nerv.include(arg[1]) local global_conf = get_global_conf() local timer = global_conf.timer -timer:tic('IO') - local data_path = 'examples/lmptb/PTBdata/' -local train_reader = nerv.Reader(data_path .. 'vocab', data_path .. 'ptb.train.txt.adds') -local val_reader = nerv.Reader(data_path .. 'vocab', data_path .. 'ptb.valid.txt.adds') - -local train_data = train_reader:get_all_batch(global_conf) -local val_data = val_reader:get_all_batch(global_conf) local layers = get_layers(global_conf) local connections = get_connections(global_conf) -local NN = nerv.NN(global_conf, train_data, val_data, layers, connections) - -timer:toc('IO') -timer:check('IO') -io.flush() +local NN = nerv.NN(global_conf, layers, connections) timer:tic('global') local best_cv = 1e10 for i = 1, global_conf.max_iter do timer:tic('Epoch' .. i) - local train_ppl, val_ppl = NN:epoch() + local train_reader = nerv.Reader(data_path .. 'vocab', data_path .. 'ptb.train.txt.adds') + local val_reader = nerv.Reader(data_path .. 'vocab', data_path .. 'ptb.valid.txt.adds') + local train_ppl, val_ppl = NN:epoch(train_reader, val_reader) if val_ppl < best_cv then best_cv = val_ppl else @@ -43,3 +34,5 @@ timer:toc('global') timer:check('global') timer:check('network') timer:check('gc') +timer:check('IO') +global_conf.cumat_type.print_profile() diff --git a/nerv/examples/network_debug/network.lua b/nerv/examples/network_debug/network.lua index 5518e27..1841d21 100644 --- a/nerv/examples/network_debug/network.lua +++ b/nerv/examples/network_debug/network.lua @@ -2,11 +2,17 @@ nerv.include('select_linear.lua') local nn = nerv.class('nerv.NN') -function nn:__init(global_conf, train_data, val_data, layers, connections) +function nn:__init(global_conf, layers, connections) self.gconf = global_conf self.network = self:get_network(layers, connections) - self.train_data = self:get_data(train_data) - self.val_data = self:get_data(val_data) + + self.output = {} + self.err_output = {} + for i = 1, self.gconf.chunk_size do + self.output[i] = {self.gconf.cumat_type(self.gconf.batch_size, 1)} + self.err_output[i] = {self.gconf.cumat_type(self.gconf.batch_size, 1)} + self.err_output[i][2] = self.gconf.cumat_type(self.gconf.batch_size, 1) + end end function nn:get_network(layers, connections) @@ -20,79 +26,67 @@ function nn:get_network(layers, connections) return network end -function nn:get_data(data) - local err_output = {} - local softmax_output = {} - local output = {} - for i = 1, self.gconf.chunk_size do - err_output[i] = self.gconf.cumat_type(self.gconf.batch_size, 1) - softmax_output[i] = self.gconf.cumat_type(self.gconf.batch_size, self.gconf.vocab_size) - output[i] = self.gconf.cumat_type(self.gconf.batch_size, 1) - end - local ret = {} - for i = 1, #data do - ret[i] = {} - ret[i].input = {} - ret[i].output = {} - ret[i].err_input = {} - ret[i].err_output = {} - for t = 1, self.gconf.chunk_size do - ret[i].input[t] = {} - ret[i].output[t] = {} - ret[i].err_input[t] = {} - ret[i].err_output[t] = {} - ret[i].input[t][1] = data[i].input[t] - ret[i].input[t][2] = data[i].output[t] - ret[i].output[t][1] = output[t] - local err_input = self.gconf.mmat_type(self.gconf.batch_size, 1) - for j = 1, self.gconf.batch_size do - if t <= data[i].seq_len[j] then - err_input[j - 1][0] = 1 - else - err_input[j - 1][0] = 0 +function nn:process(data, do_train, reader) + local timer = self.gconf.timer + local buffer = nerv.SeqBuffer(self.gconf, { + batch_size = self.gconf.batch_size, chunk_size = self.gconf.chunk_size, + readers = {reader}, + }) + local total_err = 0 + local total_frame = 0 + self.network:epoch_init() + while true do + timer:tic('IO') + data = buffer:get_data() + if data == nil then + break + end + local err_input = {} + if do_train then + for t = 1, self.gconf.chunk_size do + local tmp = self.gconf.mmat_type(self.gconf.batch_size, 1) + for i = 1, self.gconf.batch_size do + if t <= data.seq_length[i] then + tmp[i - 1][0] = 1 + else + tmp[i - 1][0] = 0 + end end + err_input[t] = {self.gconf.cumat_type.new_from_host(tmp)} end - ret[i].err_input[t][1] = self.gconf.cumat_type.new_from_host(err_input) - ret[i].err_output[t][1] = err_output[t] - ret[i].err_output[t][2] = softmax_output[t] end - ret[i].seq_length = data[i].seq_len - ret[i].new_seq = {} - for j = 1, self.gconf.batch_size do - if data[i].seq_start[j] then - table.insert(ret[i].new_seq, j) - end + local info = {input = {}, output = self.output, err_input = err_input, do_train = do_train, + err_output = self.err_output, seq_length = data.seq_length, new_seq = data.new_seq} + for t = 1, self.gconf.chunk_size do + info.input[t] = {data.data['input'][t]} + info.input[t][2] = data.data['label'][t] end - end - return ret -end + timer:toc('IO') -function nn:process(data, do_train) - local timer = self.gconf.timer - local total_err = 0 - local total_frame = 0 - self.network:epoch_init() - for id = 1, #data do - data[id].do_train = do_train timer:tic('network') - self.network:mini_batch_init(data[id]) + self.network:mini_batch_init(info) self.network:propagate() timer:toc('network') + + timer:tic('IO') for t = 1, self.gconf.chunk_size do - local tmp = data[id].output[t][1]:new_to_host() + local tmp = info.output[t][1]:new_to_host() for i = 1, self.gconf.batch_size do - if t <= data[id].seq_length[i] then + if t <= info.seq_length[i] then total_err = total_err + math.log10(math.exp(tmp[i - 1][0])) total_frame = total_frame + 1 end end end + timer:toc('IO') + + timer:tic('network') if do_train then - timer:tic('network') self.network:back_propagate() self.network:update() - timer:toc('network') end + timer:toc('network') + timer:tic('gc') collectgarbage('collect') timer:toc('gc') @@ -100,11 +94,11 @@ function nn:process(data, do_train) return math.pow(10, - total_err / total_frame) end -function nn:epoch() - local train_error = self:process(self.train_data, true) +function nn:epoch(train_reader, val_reader) + local train_error = self:process(self.train_data, true, train_reader) local tmp = self.gconf.dropout_rate self.gconf.dropout_rate = 0 - local val_error = self:process(self.val_data, false) + local val_error = self:process(self.val_data, false, val_reader) self.gconf.dropout_rate = tmp return train_error, val_error end diff --git a/nerv/examples/network_debug/reader.lua b/nerv/examples/network_debug/reader.lua index b10baaf..70c0c97 100644 --- a/nerv/examples/network_debug/reader.lua +++ b/nerv/examples/network_debug/reader.lua @@ -3,6 +3,7 @@ local Reader = nerv.class('nerv.Reader') function Reader:__init(vocab_file, input_file) self:get_vocab(vocab_file) self:get_seq(input_file) + self.offset = 1 end function Reader:get_vocab(vocab_file) @@ -31,7 +32,8 @@ end function Reader:get_seq(input_file) local f = io.open(input_file, 'r') self.seq = {} - while true do + -- while true do + for i = 1, 26 do local seq = f:read() if seq == nil then break @@ -47,67 +49,19 @@ function Reader:get_seq(input_file) end end -function Reader:get_in_out(id, pos) - return self.seq[id][pos], self.seq[id][pos + 1], pos + 1 == #self.seq[id] -end - -function Reader:get_all_batch(global_conf) - local data = {} - local pos = {} - local offset = 1 - for i = 1, global_conf.batch_size do - pos[i] = nil +function Reader:get_data() + if self.offset > #self.seq then + return nil end - while true do - -- for i = 1, 26 do - local input = {} - local output = {} - for i = 1, global_conf.chunk_size do - input[i] = global_conf.mmat_type(global_conf.batch_size, 1) - input[i]:fill(global_conf.nn_act_default) - output[i] = global_conf.mmat_type(global_conf.batch_size, 1) - output[i]:fill(global_conf.nn_act_default) - end - local seq_start = {} - local seq_end = {} - local seq_len = {} - for i = 1, global_conf.batch_size do - seq_start[i] = false - seq_end[i] = false - seq_len[i] = 0 - end - local has_new = false - for i = 1, global_conf.batch_size do - if pos[i] == nil then - if offset < #self.seq then - seq_start[i] = true - pos[i] = {offset, 1} - offset = offset + 1 - end - end - if pos[i] ~= nil then - has_new = true - for j = 1, global_conf.chunk_size do - local final - input[j][i-1][0], output[j][i-1][0], final = self:get_in_out(pos[i][1], pos[i][2]) - seq_len[i] = j - if final then - seq_end[i] = true - pos[i] = nil - break - end - pos[i][2] = pos[i][2] + 1 - end - end - end - if not has_new then - break - end - for i = 1, global_conf.chunk_size do - input[i] = global_conf.cumat_type.new_from_host(input[i]) - output[i] = global_conf.cumat_type.new_from_host(output[i]) - end - table.insert(data, {input = input, output = output, seq_start = seq_start, seq_end = seq_end, seq_len = seq_len}) + local tmp = self.seq[self.offset] + local res = { + input = nerv.MMatrixFloat(#tmp - 1, 1), + label = nerv.MMatrixFloat(#tmp - 1, 1), + } + for i = 1, #tmp - 1 do + res.input[i - 1][0] = tmp[i] + res.label[i - 1][0] = tmp[i + 1] end - return data + self.offset = self.offset + 1 + return res end diff --git a/nerv/io/seq_buffer.lua b/nerv/io/seq_buffer.lua index e69de29..ad1b3f7 100644 --- a/nerv/io/seq_buffer.lua +++ b/nerv/io/seq_buffer.lua @@ -0,0 +1,105 @@ +local SeqBuffer = nerv.class('nerv.SeqBuffer', 'nerv.DataBuffer') + +function SeqBuffer:__init(global_conf, buffer_conf) + self.gconf = global_conf + + self.batch_size = buffer_conf.batch_size + self.chunk_size = buffer_conf.chunk_size + self.readers = buffer_conf.readers + self.nn_act_default = buffer_conf.nn_act_default + if self.nn_act_default == nil then + self.nn_act_default = 0 + end + + self.mat_type = self.gconf.mmat_type + self.queue = {} + self.head = 1 + self.tail = 0 +end + +function SeqBuffer:new_mini_batch() + local res = {} + res.data = {} + res.new_seq = {} + res.seq_length = {} + for i = 1, self.batch_size do + res.seq_length[i] = 0 + end + return res +end + +function SeqBuffer:saturate(batch) + if self.queue[self.head] ~= nil and self.queue[self.head].seq_length[batch] ~= 0 then + return true + end + local data = {} + local drow = nil + for i = 1, #self.readers do + local tmp = self.readers[i]:get_data() + if tmp == nil then + return false + end + for id, d in pairs(tmp) do + if drow == nil then + drow = d:nrow() + elseif d:nrow() ~= drow then + nerv.error('readers provides with inconsistent rows of data') + end + data[id] = d + end + end + local offset = 0 + local head = self.head + while offset < drow do + local last = math.min(offset + self.chunk_size, drow) + if head > self.tail then + self.tail = self.tail + 1 + self.queue[self.tail] = self:new_mini_batch() + end + self.queue[head].seq_length[batch] = last - offset + if offset == 0 then + table.insert(self.queue[head].new_seq, batch) + end + local mini_batch = self.queue[head].data + for id, d in pairs(data) do + if mini_batch[id] == nil then + mini_batch[id] = {} + end + local tmp = mini_batch[id] + for i = offset + 1, last do + local chunk = i - offset + if tmp[chunk] == nil then + tmp[chunk] = self.mat_type(self.batch_size, d:ncol()) + tmp[chunk]:fill(self.nn_act_default) + end + tmp[chunk]:copy_from(d, i - 1, i, batch - 1) + end + end + head = head + 1 + offset = last + end + return true +end + +function SeqBuffer:get_data() + local has_data = false + for i = 1, self.batch_size do + if self:saturate(i) then + has_data = true + end + end + if not has_data then + return nil + end + local res = self.queue[self.head] + self.queue[self.head] = nil + self.head = self.head + 1 + if not self.gconf.use_cpu then + for id, d in pairs(res.data) do + for i = 1, #d do + d[i] = self.gconf.cumat_type.new_from_host(d[i]) + end + end + end + return res +end -- cgit v1.2.3-70-g09d2