From 7cccf56364c73c441f46c6b0b4a9ad485701d814 Mon Sep 17 00:00:00 2001 From: zhongchun Date: Fri, 6 Jan 2023 15:44:39 +0800 Subject: [PATCH] Format --- mars/services/task/execution/mars/executor.py | 3 ++- mars/services/task/execution/ray/executor.py | 4 ++-- mars/services/task/supervisor/processor.py | 5 +++-- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/mars/services/task/execution/mars/executor.py b/mars/services/task/execution/mars/executor.py index 0ff0112010..3827cc5c71 100644 --- a/mars/services/task/execution/mars/executor.py +++ b/mars/services/task/execution/mars/executor.py @@ -95,7 +95,8 @@ def __init__( self._meta_api = meta_api self._stage_processors = [] - self._stage_id_to_processor = weakref.WeakValueDictionary() + self._stage_tile_progresses = [] + self._stage_id_to_processor = weakref.WeakValueDictionary() self._cur_stage_processor = None self._result_tileables_lifecycle = None self._subtask_decref_events = dict() diff --git a/mars/services/task/execution/ray/executor.py b/mars/services/task/execution/ray/executor.py index 6b80f3a10d..17df54bf60 100644 --- a/mars/services/task/execution/ray/executor.py +++ b/mars/services/task/execution/ray/executor.py @@ -1074,8 +1074,8 @@ def gc(): last_check_slow_time = curr_time # Fast to next loop and give it a chance to update object_ref_to_subtask. await asyncio.sleep(interval_seconds if len(ready_objects) == 0 else 0) - - def get_stage_generation_order(self, stage_id: str): + + def get_stage_generation_order(self, stage_id: str): raise NotImplementedError( "RayTaskExecutor doesn't support stage generation order." ) diff --git a/mars/services/task/supervisor/processor.py b/mars/services/task/supervisor/processor.py index 1b46440281..a25fe0b9cf 100644 --- a/mars/services/task/supervisor/processor.py +++ b/mars/services/task/supervisor/processor.py @@ -30,7 +30,7 @@ from ....typing import TileableType, ChunkType from ....utils import Timer from ...context import FailOverContext -from ...subtask import SubtaskResult, Subtask +from ...subtask import SubtaskResult, SubtaskGraph, Subtask from ..core import Task, TaskResult, TaskStatus, new_task_id from ..execution.api import TaskExecutor, ExecutionChunkResult from .preprocessor import TaskPreprocessor @@ -481,8 +481,9 @@ def _finish(self): def is_done(self) -> bool: return self.done.is_set() + def get_generation_order(self, stage_id: str): return self._executor.get_stage_generation_order(stage_id) def get_subtask(self, chunk_data_key: str): - return self._chunk_data_key_to_subtask.get(chunk_data_key) \ No newline at end of file + return self._chunk_data_key_to_subtask.get(chunk_data_key)