From b767644b4d5a205fc071eeddc9902470401b8146 Mon Sep 17 00:00:00 2001 From: hekaisheng Date: Fri, 13 May 2022 18:01:00 +0800 Subject: [PATCH] Use tell for prefetch --- mars/services/subtask/worker/processor.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/mars/services/subtask/worker/processor.py b/mars/services/subtask/worker/processor.py index 78b916eee9..db0e99f168 100644 --- a/mars/services/subtask/worker/processor.py +++ b/mars/services/subtask/worker/processor.py @@ -22,13 +22,7 @@ from .... import oscar as mo from ....core import ChunkGraph, OperandType, enter_mode, ExecutionError from ....core.context import get_context, set_context -from ....core.operand import ( - Fetch, - FetchShuffle, - execute, - MapReduceOperand, - OperandStage, -) +from ....core.operand import Fetch, FetchShuffle, execute from ....metrics import Metrics from ....optimization.physical import optimize from ....typing import BandType, ChunkType @@ -426,7 +420,7 @@ async def set_chunks_meta(): # set result data size self.result.data_size = result_data_size - async def push_mapper_data(self, chunk_graph): + async def _push_mapper_data(self, chunk_graph): # TODO: use task api to get reducer bands reducer_idx_to_band = dict() if not reducer_idx_to_band: @@ -520,7 +514,7 @@ async def run(self): await self.done() # after done, we push mapper data to reducers in advance. - await self.push_mapper_data(chunk_graph) + await self.ref()._push_mapper_data.tell(chunk_graph) if self.result.status == SubtaskStatus.succeeded: cost_time_secs = ( self.result.execution_end_time - self.result.execution_start_time