Skip to content

Commit

Permalink
Format
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongchun committed Jan 6, 2023
1 parent 94ec1e8 commit 70f8786
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 10 deletions.
4 changes: 2 additions & 2 deletions mars/services/task/execution/mars/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
from .....utils import Timer
from ....context import ThreadedServiceContext
from ....cluster.api import ClusterAPI
from ....cluster.core import NodeStatus
from ....lifecycle.api import LifecycleAPI
from ....meta.api import MetaAPI
from ....scheduling import SchedulingAPI
Expand Down Expand Up @@ -95,7 +94,8 @@ def __init__(
self._meta_api = meta_api

self._stage_processors = []
self._stage_id_to_processor = weakref.WeakValueDictionary()
self._stage_tile_progresses = []
self._stage_id_to_processor = weakref.WeakValueDictionary()
self._cur_stage_processor = None
self._result_tileables_lifecycle = None
self._subtask_decref_events = dict()
Expand Down
6 changes: 3 additions & 3 deletions mars/services/task/execution/mars/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,8 @@ async def set_subtask_result(self, result: SubtaskResult, band: BandType = None)
)
await self.cancel()
else:
await async_call(
self._scheduling_api.finish_subtasks([result.subtask_id], bands=[band])
await self._scheduling_api.finish_subtasks(
[result.subtask_id], bands=[band]
)
logger.debug(
"Continue to schedule subtasks after subtask %s finished.",
Expand All @@ -306,7 +306,7 @@ async def set_subtask_result(self, result: SubtaskResult, band: BandType = None)
self.task.task_id,
self.stage_id,
)
await async_call(self._schedule_subtasks([to_schedule_subtask]))
await self._schedule_subtasks([to_schedule_subtask])
except KeyError:
logger.exception("Got KeyError.")

Expand Down
4 changes: 2 additions & 2 deletions mars/services/task/execution/ray/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1074,8 +1074,8 @@ def gc():
last_check_slow_time = curr_time
# Fast to next loop and give it a chance to update object_ref_to_subtask.
await asyncio.sleep(interval_seconds if len(ready_objects) == 0 else 0)
def get_stage_generation_order(self, stage_id: str):

def get_stage_generation_order(self, stage_id: str):
raise NotImplementedError(
"RayTaskExecutor doesn't support stage generation order."
)
5 changes: 3 additions & 2 deletions mars/services/task/supervisor/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from ....typing import TileableType, ChunkType
from ....utils import Timer
from ...context import FailOverContext
from ...subtask import SubtaskResult, Subtask
from ...subtask import SubtaskResult, SubtaskGraph, Subtask
from ..core import Task, TaskResult, TaskStatus, new_task_id
from ..execution.api import TaskExecutor, ExecutionChunkResult
from .preprocessor import TaskPreprocessor
Expand Down Expand Up @@ -481,8 +481,9 @@ def _finish(self):

def is_done(self) -> bool:
return self.done.is_set()

def get_generation_order(self, stage_id: str):
return self._executor.get_stage_generation_order(stage_id)

def get_subtask(self, chunk_data_key: str):
return self._chunk_data_key_to_subtask.get(chunk_data_key)
return self._chunk_data_key_to_subtask.get(chunk_data_key)
2 changes: 1 addition & 1 deletion mars/services/task/supervisor/tests/task_preprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def analyze(
self._config,
chunk_to_subtasks,
shuffle_fetch_type=shuffle_fetch_type,
stage_id=stage_id,
stage_id=stage_id,
)
subtask_graph = analyzer.gen_subtask_graph()
results = set(
Expand Down
2 changes: 2 additions & 0 deletions mars/storage/ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import inspect
import logging

from typing import Any, Dict, List, Tuple
from ..lib import sparse
Expand All @@ -28,6 +29,7 @@
from .core import BufferWrappedFileObject, StorageFileObject

ray = lazy_import("ray")
logger = logging.getLogger(__name__)


# TODO(fyrestone): make the SparseMatrix pickleable.
Expand Down

0 comments on commit 70f8786

Please sign in to comment.