Skip to content

Commit

Permalink
Format
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongchun committed Jan 6, 2023
1 parent 94ec1e8 commit 7cccf56
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 5 deletions.
3 changes: 2 additions & 1 deletion mars/services/task/execution/mars/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ def __init__(
self._meta_api = meta_api

self._stage_processors = []
self._stage_id_to_processor = weakref.WeakValueDictionary()
self._stage_tile_progresses = []
self._stage_id_to_processor = weakref.WeakValueDictionary()
self._cur_stage_processor = None
self._result_tileables_lifecycle = None
self._subtask_decref_events = dict()
Expand Down
4 changes: 2 additions & 2 deletions mars/services/task/execution/ray/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1074,8 +1074,8 @@ def gc():
last_check_slow_time = curr_time
# Fast to next loop and give it a chance to update object_ref_to_subtask.
await asyncio.sleep(interval_seconds if len(ready_objects) == 0 else 0)
def get_stage_generation_order(self, stage_id: str):

def get_stage_generation_order(self, stage_id: str):
raise NotImplementedError(
"RayTaskExecutor doesn't support stage generation order."
)
5 changes: 3 additions & 2 deletions mars/services/task/supervisor/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from ....typing import TileableType, ChunkType
from ....utils import Timer
from ...context import FailOverContext
from ...subtask import SubtaskResult, Subtask
from ...subtask import SubtaskResult, SubtaskGraph, Subtask
from ..core import Task, TaskResult, TaskStatus, new_task_id
from ..execution.api import TaskExecutor, ExecutionChunkResult
from .preprocessor import TaskPreprocessor
Expand Down Expand Up @@ -481,8 +481,9 @@ def _finish(self):

def is_done(self) -> bool:
return self.done.is_set()

def get_generation_order(self, stage_id: str):
return self._executor.get_stage_generation_order(stage_id)

def get_subtask(self, chunk_data_key: str):
return self._chunk_data_key_to_subtask.get(chunk_data_key)
return self._chunk_data_key_to_subtask.get(chunk_data_key)

0 comments on commit 7cccf56

Please sign in to comment.