diff --git a/mars/services/task/analyzer/analyzer.py b/mars/services/task/analyzer/analyzer.py index f797419440..95b694ae86 100644 --- a/mars/services/task/analyzer/analyzer.py +++ b/mars/services/task/analyzer/analyzer.py @@ -273,11 +273,13 @@ def _gen_subtask_info( chunk_priority = chunk.op.priority # process input chunks inp_chunks = [] + input_changed = False build_fetch_index_to_chunks = dict() for i, inp_chunk in enumerate(chunk.inputs): if inp_chunk in chunks_set: inp_chunks.append(chunk_to_copied[inp_chunk]) else: + input_changed = True build_fetch_index_to_chunks[i] = inp_chunk inp_chunks.append(None) if not isinstance(inp_chunk.op, Fetch): @@ -287,14 +289,23 @@ def _gen_subtask_info( ) for i, fetch_chunk in zip(build_fetch_index_to_chunks, fetch_chunks): inp_chunks[i] = fetch_chunk - copied_op = chunk.op.copy() - copied_op._key = chunk.op.key - out_chunks = [ - c.data - for c in copied_op.new_chunks( - inp_chunks, kws=[c.params.copy() for c in chunk.op.outputs] - ) - ] + + if input_changed: + copied_op = chunk.op.copy() + copied_op._key = chunk.op.key + out_chunks = [ + c.data + for c in copied_op.new_chunks( + inp_chunks, kws=[c.params.copy() for c in chunk.op.outputs] + ) + ] + else: + out_chunks = chunk.op.outputs + # Note: `dtypes`, `index_value`, and `columns_value` are lazily + # initialized, so we should call property `params` to initialize + # these fields. + [o.params for o in out_chunks] + for src_chunk, out_chunk in zip(chunk.op.outputs, out_chunks): processed.add(src_chunk) out_chunk._key = src_chunk.key