-- DAGLayerT: a container layer that wires sub-layers into a directed acyclic
-- graph and runs them in topological order, one timestep (chunk) at a time.
local DAGLayerT = nerv.class("nerv.DAGLayerT", "nerv.LayerT")

-- parse a connection endpoint of the form "id[port]" into the layer id and
-- the numeric port; "<input>" and "<output>" are accepted as pseudo ids
local function parse_id(str)
    local id, port, _
    _, _, id, port = string.find(str, "([a-zA-Z0-9_.]+)%[([0-9]+)%]")
    if id == nil or port == nil then
        _, _, id, port = string.find(str, "(.+)%[([0-9]+)%]")
        if not (id == "<input>" or id == "<output>") then
            nerv.error("wrong format of connection id")
        end
    end
    port = tonumber(port)
    return id, port
end

-- look up (or create) the bookkeeping entry for layer `id` in `layers`;
-- the "<input>"/"<output>" pseudo layers have no entry and yield nil
local function discover(id, layers, layer_repo)
    local ref = layers[id]
    if id == "<input>" or id == "<output>" then
        return nil
    end
    if ref == nil then
        local layer = layer_repo:get_layer(id)
        local dim_in, dim_out = layer:get_dim()
        ref = {
            id = layer.id,
            layer = layer,
            inputs = {},
            outputs = {},
            err_inputs = {},
            err_outputs = {},
            next_layers = {},
            input_len = #dim_in,
            output_len = #dim_out,
            in_deg = 0,
            visited = false
        }
        layers[id] = ref
    end
    return ref
end

function DAGLayerT:__init(id, global_conf, layer_conf)
    local layers = {}
    local inputs = {}
    local outputs = {}
    local dim_in = layer_conf.dim_in
    local dim_out = layer_conf.dim_out
    local parsed_conn = {}
    for from, to in pairs(layer_conf.connections) do
        local id_from, port_from = parse_id(from)
        local id_to, port_to = parse_id(to)
        local ref_from = discover(id_from, layers, layer_conf.sub_layers)
        local ref_to = discover(id_to, layers, layer_conf.sub_layers)
        local input_dim, output_dim, _
        if id_from == "<input>" then
            input_dim, _ = ref_to.layer:get_dim()
            if dim_in[port_from] ~= input_dim[port_to] then
                nerv.error("mismatching data dimension between %s and %s", from, to)
            end
            inputs[port_from] = {ref_to, port_to}
            if ref_to.inputs[1] == nil then
                ref_to.inputs[1] = {}
            end
            if ref_to.inputs[1][port_to] ~= nil then
                nerv.error("port(%d) for layer(%s) already attached", port_to, to)
            end
            ref_to.inputs[1][port_to] = inputs -- just a place holder
        elseif id_to == "<output>" then
            _, output_dim = ref_from.layer:get_dim()
            if output_dim[port_from] ~= dim_out[port_to] then
                nerv.error("mismatching data dimension between %s and %s", from, to)
            end
            outputs[port_to] = {ref_from, port_from}
            if ref_from.outputs[1] == nil then
                ref_from.outputs[1] = {}
            end
            if ref_from.outputs[1][port_from] ~= nil then
                nerv.error("port(%d) for layer(%s) already attached", port_from, from)
            end
            ref_from.outputs[1][port_from] = outputs -- just a place holder
        else
            _, output_dim = ref_from.layer:get_dim()
            input_dim, _ = ref_to.layer:get_dim()
            if output_dim[port_from] ~= input_dim[port_to] then
                nerv.error("mismatching data dimension between %s and %s", from, to)
            end
            table.insert(parsed_conn, {{ref_from, port_from}, {ref_to, port_to}})
            table.insert(ref_from.next_layers, ref_to) -- add an edge
            ref_to.in_deg = ref_to.in_deg + 1 -- increase the in-degree of the target layer
        end
    end

    -- topological sort (Kahn's algorithm over the in-degrees)
    local queue = {}
    local l = 1
    local r = 1
    for id, ref in pairs(layers) do
        if ref.in_deg == 0 then
            table.insert(queue, ref)
            nerv.info("adding source layer: %s", id)
            r = r + 1
        end
    end
    if l == r then
        nerv.error("loop detected")
    end
    while l < r do
        local cur = queue[l]
        cur.visited = true
        l = l + 1
        for _, nl in pairs(cur.next_layers) do
            nl.in_deg = nl.in_deg - 1
            if nl.in_deg == 0 then
                table.insert(queue, nl)
                r = r + 1
            end
        end
    end
    for i = 1, #queue do
        nerv.info("enqueued layer: %s %s", queue[i].layer, queue[i].layer.id)
    end
    for id, ref in pairs(layers) do
        -- check whether the graph is connected
        if ref.visited == false then
            nerv.warning("layer %s is ignored", id)
        end
    end

    self.layers = layers
    self.inputs = inputs
    self.outputs = outputs
    self.id = id
    self.dim_in = dim_in
    self.dim_out = dim_out
    self.parsed_conn = parsed_conn
    self.queue = queue
    self.gconf = global_conf
end

-- allocate the intermediate matrices shared by connected ports (one pair of
-- activation/error buffers per connection and timestep), then init sub-layers
function DAGLayerT:init(batch_size, chunk_size)
    nerv.info("initing DAGLayerT %s...", self.id)
    if chunk_size == nil then
        chunk_size = 1
        nerv.info("(initing DAGLayerT) chunk_size is nil, defaulting to 1")
    end
    self.chunk_size = chunk_size
    for i, conn in ipairs(self.parsed_conn) do
        local _, output_dim
        local ref_from, port_from, ref_to, port_to
        ref_from, port_from = unpack(conn[1])
        ref_to, port_to = unpack(conn[2])
        _, output_dim = ref_from.layer:get_dim()
        local dim = 1
        if output_dim[port_from] > 0 then
            dim = output_dim[port_from]
        end
        for t = 1, chunk_size do
            local mid = self.gconf.cumat_type(batch_size, dim)
            local err_mid = mid:create()
            if ref_from.outputs[t] == nil then
                ref_from.outputs[t] = {}
            end
            if ref_to.inputs[t] == nil then
                ref_to.inputs[t] = {}
            end
            if ref_to.err_outputs[t] == nil then
                ref_to.err_outputs[t] = {}
            end
            if ref_from.err_inputs[t] == nil then
                ref_from.err_inputs[t] = {}
            end
            ref_from.outputs[t][port_from] = mid
            ref_to.inputs[t][port_to] = mid
            ref_from.err_inputs[t][port_from] = err_mid
            ref_to.err_outputs[t][port_to] = err_mid
        end
    end
    for id, ref in pairs(self.layers) do
        for i = 1, ref.input_len do
            if ref.inputs[1][i] == nil then -- peek at time 1
                nerv.error("dangling input port %d of layer %s", i, id)
            end
        end
        for i = 1, ref.output_len do
            if ref.outputs[1][i] == nil then -- peek at time 1
                nerv.error("dangling output port %d of layer %s", i, id)
            end
        end
        -- initialize sub-layers
        ref.layer:init(batch_size, chunk_size)
    end
    for i = 1, #self.dim_in do
        if self.inputs[i] == nil then
            nerv.error("dangling input port %d of layer %s", i, self.id)
        end
    end
    for i = 1, #self.dim_out do
        if self.outputs[i] == nil then
            nerv.error("dangling output port %d of layer %s", i, self.id)
        end
    end
end

-- reallocate the intermediate buffers for a new batch/chunk size
function DAGLayerT:batch_resize(batch_size, chunk_size)
    if chunk_size == nil then
        chunk_size = 1
    end
    if batch_size ~= self.gconf.batch_size or chunk_size ~= self.gconf.chunk_size then
        nerv.printf("warn: in DAGLayerT:batch_resize, batch_size or chunk_size differs from the values in gconf\n")
    end
    self.gconf.batch_size = batch_size
    self.gconf.chunk_size = chunk_size

    for i, conn in ipairs(self.parsed_conn) do
        local _, output_dim
        local ref_from, port_from, ref_to, port_to
        ref_from, port_from = unpack(conn[1])
        ref_to, port_to = unpack(conn[2])
        _, output_dim = ref_from.layer:get_dim()
        local dim = 1
        if output_dim[port_from] > 0 then
            dim = output_dim[port_from]
        end
        for t = 1, chunk_size do
            if ref_from.outputs[t] == nil then
                ref_from.outputs[t] = {}
            end
            if ref_to.inputs[t] == nil then
                ref_to.inputs[t] = {}
            end
            if ref_to.err_outputs[t] == nil then
                ref_to.err_outputs[t] = {}
            end
            if ref_from.err_inputs[t] == nil then
                ref_from.err_inputs[t] = {}
            end
            local mid = self.gconf.cumat_type(batch_size, dim)
            local err_mid = mid:create()
            ref_from.outputs[t][port_from] = mid
            ref_to.inputs[t][port_to] = mid
            ref_from.err_inputs[t][port_from] = err_mid
            ref_to.err_outputs[t][port_to] = err_mid
        end
    end
    for id, ref in pairs(self.layers) do
        ref.layer:batch_resize(batch_size, chunk_size)
    end
    collectgarbage("collect")
end

-- bind the caller-provided matrices of timestep `t` to the boundary ports
function DAGLayerT:set_inputs(input, t)
    for i = 1, #self.dim_in do
        if input[i] == nil then
            nerv.error("some input is not provided")
        end
        local layer = self.inputs[i][1]
        local port = self.inputs[i][2]
        if layer.inputs[t] == nil then
            layer.inputs[t] = {}
        end
        layer.inputs[t][port] = input[i]
    end
end

function DAGLayerT:set_outputs(output, t)
    for i = 1, #self.dim_out do
        if output[i] == nil then
            nerv.error("some output is not provided")
        end
        local layer = self.outputs[i][1]
        local port = self.outputs[i][2]
        if layer.outputs[t] == nil then
            layer.outputs[t] = {}
        end
        layer.outputs[t][port] = output[i]
    end
end

function DAGLayerT:set_err_inputs(bp_err, t)
    for i = 1, #self.dim_out do
        local layer = self.outputs[i][1]
        local port = self.outputs[i][2]
        if layer.err_inputs[t] == nil then
            layer.err_inputs[t] = {}
        end
        layer.err_inputs[t][port] = bp_err[i]
    end
end

function DAGLayerT:set_err_outputs(next_bp_err, t)
    for i = 1, #self.dim_in do
        local layer = self.inputs[i][1]
        local port = self.inputs[i][2]
        if layer.err_outputs[t] == nil then
            layer.err_outputs[t] = {}
        end
        layer.err_outputs[t][port] = next_bp_err[i]
    end
end

function DAGLayerT:update(bp_err, input, output, t)
    if t == nil then
        t = 1
    end
    self:set_err_inputs(bp_err, t)
    self:set_inputs(input, t)
    self:set_outputs(output, t)
    for _, ref in ipairs(self.queue) do
        ref.layer:update(ref.err_inputs[t], ref.inputs[t], ref.outputs[t], t)
    end
end

-- run the sub-layers forward in topological order for timestep `t`
function DAGLayerT:propagate(input, output, t)
    if t == nil then
        t = 1
    end
    self:set_inputs(input, t)
    self:set_outputs(output, t)
    local ret = false
    for i = 1, #self.queue do
        local ref = self.queue[i]
        ret = ref.layer:propagate(ref.inputs[t], ref.outputs[t], t)
    end
    return ret
end

-- run the sub-layers backward in reverse topological order for timestep `t`
function DAGLayerT:back_propagate(bp_err, next_bp_err, input, output, t)
    if t == nil then
        t = 1
    end
    self:set_err_outputs(next_bp_err, t)
    self:set_err_inputs(bp_err, t)
    self:set_inputs(input, t)
    self:set_outputs(output, t)
    for i = #self.queue, 1, -1 do
        local ref = self.queue[i]
        ref.layer:back_propagate(ref.err_inputs[t], ref.err_outputs[t],
                                 ref.inputs[t], ref.outputs[t], t)
    end
end

function DAGLayerT:get_params()
    local param_repos = {}
    for _, ref in ipairs(self.queue) do
        table.insert(param_repos, ref.layer:get_params())
    end
    return nerv.ParamRepo.merge(param_repos)
end

DAGLayerT.PORT_TYPES = {
    INPUT = {},
    OUTPUT = {},
    ERR_INPUT = {},
    ERR_OUTPUT = {}
}

-- expose the per-timestep buffer tables of a sub-layer for inspection
function DAGLayerT:get_intermediate(id, port_type)
    if id == "<input>" or id == "<output>" then
        nerv.error("an actual real layer id is expected")
    end
    local layer = self.layers[id]
    if layer == nil then
        nerv.error("layer id %s not found", id)
    end
    if port_type == DAGLayerT.PORT_TYPES.INPUT then
        return layer.inputs
    elseif port_type == DAGLayerT.PORT_TYPES.OUTPUT then
        return layer.outputs
    elseif port_type == DAGLayerT.PORT_TYPES.ERR_INPUT then
        return layer.err_inputs
    elseif port_type == DAGLayerT.PORT_TYPES.ERR_OUTPUT then
        return layer.err_outputs
    end
    nerv.error("unrecognized port type")
end
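
-- Usage sketch (not part of the module): a minimal example of how a
-- DAGLayerT might be wired up and driven. The sub-layer ids ("affine0",
-- "sigmoid0"), the dimensions, and the pre-built `layer_repo` and `gconf`
-- are hypothetical; only the "<input>"/"<output>" pseudo layers and the
-- "id[port]" connection syntax come from the code above.
--[==[
-- assume `gconf` (with gconf.cumat_type, batch_size, chunk_size) and a
-- nerv.LayerRepo `layer_repo` holding "affine0" and "sigmoid0" already exist
local dag = nerv.DAGLayerT("dag", gconf,
    {
        dim_in = {429}, dim_out = {2048},
        sub_layers = layer_repo,
        connections = {
            ["<input>[1]"]  = "affine0[1]",  -- DAG input port 1 feeds affine0 port 1
            ["affine0[1]"]  = "sigmoid0[1]", -- internal edge; drives the topological sort
            ["sigmoid0[1]"] = "<output>[1]", -- sigmoid0 port 1 becomes DAG output port 1
        },
    })
dag:init(gconf.batch_size, gconf.chunk_size)
-- per-timestep calls; `t` defaults to 1 when omitted
dag:propagate(input, output, t)
dag:back_propagate(bp_err, next_bp_err, input, output, t)
dag:update(bp_err, input, output, t)
]==]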