diff --git a/mars/deploy/oscar/base_config.yml b/mars/deploy/oscar/base_config.yml index 8fe463234d..23dcf327a5 100644 --- a/mars/deploy/oscar/base_config.yml +++ b/mars/deploy/oscar/base_config.yml @@ -57,6 +57,8 @@ scheduling: # Max number of concurrent speculative run for a subtask. max_concurrent_run: 3 subtask_cancel_timeout: 5 + failover: + enable_lineage: no metrics: backend: console # If backend is prometheus, then we can add prometheus config as follows: diff --git a/mars/deploy/oscar/ray.py b/mars/deploy/oscar/ray.py index 1f918d87bb..a7b6be8e12 100644 --- a/mars/deploy/oscar/ray.py +++ b/mars/deploy/oscar/ray.py @@ -632,7 +632,10 @@ async def stop(self): for worker_address in self._worker_addresses: await stop_worker(worker_address, self._config) for pool in self._worker_pools: - await pool.actor_pool.remote("stop") + try: + await pool.actor_pool.remote("stop") + except: + pass if self._supervisor_pool is not None: await stop_supervisor(self.supervisor_address, self._config) await self._supervisor_pool.actor_pool.remote("stop") diff --git a/mars/deploy/oscar/tests/fault_injection_config_with_fo.yml b/mars/deploy/oscar/tests/fault_injection_config_with_fo.yml new file mode 100644 index 0000000000..ad2f74236a --- /dev/null +++ b/mars/deploy/oscar/tests/fault_injection_config_with_fo.yml @@ -0,0 +1,8 @@ +"@inherits": '@default' +third_party_modules: + - mars.services.tests.fault_injection_patch +scheduling: + subtask_max_retries: 2 + subtask_max_reschedules: 2 + failover: + enable_lineage: yes diff --git a/mars/deploy/oscar/tests/test_fault_injection.py b/mars/deploy/oscar/tests/test_fault_injection.py index 3050529f26..b13ccd81e1 100644 --- a/mars/deploy/oscar/tests/test_fault_injection.py +++ b/mars/deploy/oscar/tests/test_fault_injection.py @@ -23,12 +23,12 @@ from ....oscar.errors import ServerClosed from ....remote import spawn from ....services.tests.fault_injection_manager import ( - AbstractFaultInjectionManager, ExtraConfigKey, FaultInjectionError, FaultInjectionUnhandledError, FaultPosition, FaultType, + create_fault_injection_manager, ) from ....tensor.base.psrs import PSRSConcatPivot from ..local import new_cluster @@ -54,32 +54,6 @@ async def fault_cluster(request): yield client -async def create_fault_injection_manager( - session_id, address, fault_count, fault_type, fault_op_types=None -): - class FaultInjectionManager(AbstractFaultInjectionManager): - def __init__(self): - self._fault_count = fault_count - - def set_fault_count(self, count): - self._fault_count = count - - def get_fault_count(self): - return self._fault_count - - def get_fault(self, pos: FaultPosition, ctx=None) -> FaultType: - # Check op types if fault_op_types provided. - if fault_op_types and type(ctx.get("operand")) not in fault_op_types: - return FaultType.NoFault - if self._fault_count.get(pos, 0) > 0: - self._fault_count[pos] -= 1 - return fault_type - return FaultType.NoFault - - await FaultInjectionManager.create(session_id, address) - return FaultInjectionManager.name - - @pytest.mark.parametrize( "fault_and_exception", [ diff --git a/mars/deploy/oscar/tests/test_ray_error_recovery.py b/mars/deploy/oscar/tests/test_ray_error_recovery.py new file mode 100644 index 0000000000..a44061ba74 --- /dev/null +++ b/mars/deploy/oscar/tests/test_ray_error_recovery.py @@ -0,0 +1,114 @@ +# Copyright 1999-2021 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import numpy as np +import pandas as pd +import pytest + +from ....oscar.errors import ServerClosed +from ....remote import spawn +from ....services.tests.fault_injection_manager import ( + FaultType, + FaultPosition, + create_fault_injection_manager, + ExtraConfigKey, +) +from ....tests.core import require_ray +from ....utils import lazy_import + +from .... import tensor as mt +from .... import dataframe as md +from ..ray import new_cluster + +ray = lazy_import("ray") + +CONFIG_FILE = os.path.join(os.path.dirname(__file__), "local_test_with_ray_config.yml") +FAULT_INJECTION_CONFIG_FILE = os.path.join( + os.path.dirname(__file__), "fault_injection_config_with_fo.yml" +) + + +@pytest.fixture +async def fault_ray_cluster(request): + param = getattr(request, "param", {}) + client = await new_cluster( + config=param.get("config", CONFIG_FILE), + worker_num=2, + worker_cpu=2, + ) + async with client: + yield client + + +@pytest.mark.parametrize( + "fault_ray_cluster", [{"config": FAULT_INJECTION_CONFIG_FILE}], indirect=True +) +@pytest.mark.parametrize( + "fault_config", + [ + [ + FaultType.ProcessExit, + {FaultPosition.ON_RUN_SUBTASK: 1}, + pytest.raises(ServerClosed), + ["_UnretryableException", "*"], + ], + ], +) +@require_ray +@pytest.mark.asyncio +async def test_node_failover(fault_ray_cluster, fault_config): + fault_type, fault_count, expect_raises, exception_match = fault_config + name = await create_fault_injection_manager( + session_id=fault_ray_cluster.session.session_id, + address=fault_ray_cluster.session.address, + fault_count=fault_count, + fault_type=fault_type, + ) + + columns = list("ABCDEFGHIJ") + width = len(columns) + + df1 = md.DataFrame( + mt.random.randint( + 1, + 100, + size=(100, width), + chunk_size=(20, width), + ), + columns=columns, + ) + df2 = df1.execute() + pd1 = df2.to_pandas() + + df3 = df2.rechunk(chunk_size=(10, width)) + df4 = df3.execute() + + def f(x): + return x + 1 + + r = spawn(f, args=(1,), retry_when_fail=False) + with expect_raises: + r.execute(extra_config={ExtraConfigKey.FAULT_INJECTION_MANAGER_NAME: name}) + + df5 = df4.apply( + f, + axis=1, + dtypes=pd.Series([np.dtype(np.int64)] * width, index=columns), + output_type="dataframe", + ) + df6 = df5.execute() + pd2 = df6.to_pandas() + + pd.testing.assert_frame_equal(pd1 + 1, pd2) diff --git a/mars/oscar/backends/core.py b/mars/oscar/backends/core.py index a575bb4212..18cc3eb92e 100644 --- a/mars/oscar/backends/core.py +++ b/mars/oscar/backends/core.py @@ -66,7 +66,8 @@ async def _listen(self, client: Client): # close failed, ignore it pass raise ServerClosed( - f"Remote server {client.dest_address} closed" + f"Remote server {client.dest_address} closed", + address=client.dest_address, ) from None future = self._client_to_message_futures[client].pop(message.message_id) future.set_result(message) @@ -94,7 +95,10 @@ async def _listen(self, client: Client): message_futures = self._client_to_message_futures.get(client) self._client_to_message_futures[client] = dict() - error = ServerClosed(f"Remote server {client.dest_address} closed") + error = ServerClosed( + f"Remote server 
{client.dest_address} closed", + address=client.dest_address, + ) for future in message_futures.values(): future.set_exception(copy.copy(error)) @@ -119,7 +123,10 @@ async def call( except ConnectionError: # close failed, ignore it pass - raise ServerClosed(f"Remote server {client.dest_address} closed") + raise ServerClosed( + f"Remote server {client.dest_address} closed", + address=client.dest_address, + ) if not wait: r = wait_response diff --git a/mars/oscar/backends/ray/communication.py b/mars/oscar/backends/ray/communication.py index 85f2f069be..99352291ba 100644 --- a/mars/oscar/backends/ray/communication.py +++ b/mars/oscar/backends/ray/communication.py @@ -505,7 +505,8 @@ async def __on_ray_recv__(self, channel_id: ChannelID, message): if self.stopped: raise ServerClosed( f"Remote server {self.address} closed, but got message {message} " - f"from channel {channel_id}" + f"from channel {channel_id}", + address=self.address, ) channel = self._channels.get(channel_id) if not channel: diff --git a/mars/oscar/backends/ray/pool.py b/mars/oscar/backends/ray/pool.py index 820b3757da..c03c93131e 100644 --- a/mars/oscar/backends/ray/pool.py +++ b/mars/oscar/backends/ray/pool.py @@ -246,7 +246,10 @@ async def __on_ray_recv__(self, channel_id: ChannelID, message): """Method for communication based on ray actors""" try: if self._ray_server is None: - raise ServerClosed(f"Remote server {channel_id.dest_address} closed") + raise ServerClosed( + f"Remote server {channel_id.dest_address} closed", + address=channel_id.dest_address, + ) return await self._ray_server.__on_ray_recv__(channel_id, message) except Exception: # pragma: no cover return RayChannelException(*sys.exc_info()) diff --git a/mars/oscar/errors.py b/mars/oscar/errors.py index 966d26d185..10e3493479 100644 --- a/mars/oscar/errors.py +++ b/mars/oscar/errors.py @@ -46,6 +46,20 @@ class SlotStateError(MarsError): class ServerClosed(MarsError): + def __init__(self, *args, **kwargs): + self._address = kwargs.pop("address", None) + super().__init__(*args, **kwargs) + + @property + def address(self): + return self._address + + +class DataNotExist(MarsError): + pass + + +class DuplicatedSubtaskError(MarsError): pass diff --git a/mars/services/cluster/supervisor/node_info.py b/mars/services/cluster/supervisor/node_info.py index d024e60bfe..041822cb1d 100644 --- a/mars/services/cluster/supervisor/node_info.py +++ b/mars/services/cluster/supervisor/node_info.py @@ -140,20 +140,26 @@ def get_all_bands( statuses = statuses or {NodeStatus.READY} role = role or NodeRole.WORKER nodes = self._role_to_nodes.get(role, []) + return self.get_bands(nodes, statuses) + + def get_bands( + self, addresses: List[str], statuses: Set[NodeStatus] = None + ) -> Dict[BandType, Resource]: + statuses = statuses or {NodeStatus.READY} band_resource = dict() - for node in nodes: - if self._node_infos[node].status not in statuses: + for address in addresses: + if self._node_infos[address].status not in statuses: continue - node_resource = self._node_infos[node].resource + node_resource = self._node_infos[address].resource for resource_type, info in node_resource.items(): if resource_type.startswith("numa"): # cpu - band_resource[(node, resource_type)] = Resource( + band_resource[(address, resource_type)] = Resource( num_cpus=info["cpu_total"], mem_bytes=info["memory_total"] ) else: # pragma: no cover assert resource_type.startswith("gpu") - band_resource[(node, resource_type)] = Resource( + band_resource[(address, resource_type)] = Resource( num_gpus=info["gpu_total"] ) 
return band_resource diff --git a/mars/services/context.py b/mars/services/context.py index b2d9095fcf..bd4efad61c 100644 --- a/mars/services/context.py +++ b/mars/services/context.py @@ -295,3 +295,21 @@ def wrap(*args, **kwargs): return fut.result() return wrap + + +class _FailOverContext: + def __init__(self): + self._enable_lineage = False + self.subtask_to_dependency_subtasks = defaultdict(set) + + def enable_lineage(self): + self._enable_lineage = True + + def is_lineage_enabled(self): + return self._enable_lineage + + def cleanup(self): + self.subtask_to_dependency_subtasks.clear() + + +FailOverContext = _FailOverContext() diff --git a/mars/services/lifecycle/supervisor/tests/test_tracker.py b/mars/services/lifecycle/supervisor/tests/test_tracker.py index 3e1f8a6249..1de9627465 100644 --- a/mars/services/lifecycle/supervisor/tests/test_tracker.py +++ b/mars/services/lifecycle/supervisor/tests/test_tracker.py @@ -18,10 +18,11 @@ from ..... import oscar as mo from ..... import tensor as mt from .....core import tile +from .....oscar.errors import DataNotExist from ....cluster import MockClusterAPI from ....meta import MockMetaAPI from ....session import MockSessionAPI -from ....storage import MockStorageAPI, DataNotExist +from ....storage import MockStorageAPI from ....task.supervisor.manager import TaskManagerActor from ... import TileableNotTracked from ...supervisor.tracker import LifecycleTrackerActor diff --git a/mars/services/scheduling/api/oscar.py b/mars/services/scheduling/api/oscar.py index 40a78c935d..249a4099dd 100644 --- a/mars/services/scheduling/api/oscar.py +++ b/mars/services/scheduling/api/oscar.py @@ -71,7 +71,10 @@ async def get_subtask_schedule_summaries( return await self._manager_ref.get_schedule_summaries(task_id) async def add_subtasks( - self, subtasks: List[Subtask], priorities: Optional[List[Tuple]] = None + self, + subtasks: List[Subtask], + priorities: Optional[List[Tuple]] = None, + schedule_lost_objects: bool = False, ): """ Submit subtasks into scheduling service @@ -85,7 +88,9 @@ async def add_subtasks( """ if priorities is None: priorities = [subtask.priority or tuple() for subtask in subtasks] - await self._manager_ref.add_subtasks(subtasks, priorities) + await self._manager_ref.add_subtasks( + subtasks, priorities, schedule_lost_objects + ) @mo.extensible async def update_subtask_priority(self, subtask_id: str, priority: Tuple): @@ -155,6 +160,10 @@ async def try_enable_autoscale_in(self): `disable_autoscale_in` has been invoked.""" await self._autoscaler.try_enable_autoscale_in() + @property + def address(self): + return self._address + class MockSchedulingAPI(SchedulingAPI): @classmethod diff --git a/mars/services/scheduling/supervisor/assigner.py b/mars/services/scheduling/supervisor/assigner.py index ad478cac25..bdcea64200 100644 --- a/mars/services/scheduling/supervisor/assigner.py +++ b/mars/services/scheduling/supervisor/assigner.py @@ -216,6 +216,8 @@ async def assign_subtasks( async def reassign_subtasks( self, band_to_queued_num: Dict[BandType, int] ) -> Dict[BandType, int]: + # Note: bands may change + self._update_bands(list(await self._cluster_api.get_all_bands(NodeRole.WORKER))) move_queued_subtasks = {} for is_gpu in (False, True): band_name_prefix = "numa" if not is_gpu else "gpu" diff --git a/mars/services/scheduling/supervisor/manager.py b/mars/services/scheduling/supervisor/manager.py index 8c24713bb0..8e2f6e32fe 100644 --- a/mars/services/scheduling/supervisor/manager.py +++ b/mars/services/scheduling/supervisor/manager.py @@ 
-19,11 +19,14 @@ from dataclasses import dataclass, field from typing import Dict, List, Optional, Tuple, Union +from ... import NodeRole +from ...cluster.core import NodeStatus +from ...cluster.supervisor.node_info import NodeInfoCollectorActor from .... import oscar as mo from ....lib.aio import alru_cache from ....metrics import Metrics from ....oscar.backends.context import ProfilingContext -from ....oscar.errors import MarsError +from ....oscar.errors import MarsError, ServerClosed, DuplicatedSubtaskError from ....oscar.profiling import ProfilingData, MARS_ENABLE_PROFILING from ....typing import BandType from ....utils import dataslots, Timer @@ -123,6 +126,11 @@ async def __post_create__(self): ) await self._speculation_execution_scheduler.start() + self._node_info_ref = await mo.actor_ref( + NodeInfoCollectorActor.default_uid(), + address=self.address, + ) + async def __pre_destroy__(self): await self._speculation_execution_scheduler.stop() @@ -130,7 +138,12 @@ async def __pre_destroy__(self): async def _get_task_api(self): return await TaskAPI.create(self._session_id, self.address) - async def add_subtasks(self, subtasks: List[Subtask], priorities: List[Tuple]): + async def add_subtasks( + self, + subtasks: List[Subtask], + priorities: List[Tuple], + schedule_lost_objects: bool = False, + ): async with redirect_subtask_errors(self, subtasks): for subtask in subtasks: # the extra_config may be None. the extra config overwrites the default value. @@ -142,7 +155,9 @@ async def add_subtasks(self, subtasks: List[Subtask], priorities: List[Tuple]): if subtask_max_reschedules is None: subtask_max_reschedules = self._subtask_max_reschedules if subtask.subtask_id in self._subtask_infos: # pragma: no cover - raise KeyError(f"Subtask {subtask.subtask_id} already added.") + raise DuplicatedSubtaskError( + f"Subtask {subtask.subtask_id} already added." + ) self._subtask_infos[subtask.subtask_id] = SubtaskScheduleInfo( subtask, max_reschedules=subtask_max_reschedules ) @@ -161,7 +176,9 @@ async def add_subtasks(self, subtasks: List[Subtask], priorities: List[Tuple]): ) ) await self._queueing_ref.add_subtasks( - [subtask for subtask in subtasks if not subtask.virtual], priorities + [subtask for subtask in subtasks if not subtask.virtual], + priorities, + schedule_lost_objects=schedule_lost_objects, ) await self._queueing_ref.submit_subtasks.tell() @@ -177,7 +194,7 @@ async def finish_subtasks( bands: List[BandType] = None, schedule_next: bool = True, ): - logger.debug("Finished subtasks %s.", subtask_ids) + logger.debug("Finished subtasks %s in bands %s.", subtask_ids, bands) band_tasks = defaultdict(lambda: 0) bands = bands or [None] * len(subtask_ids) for subtask_id, subtask_band in zip(subtask_ids, bands): @@ -202,7 +219,7 @@ async def finish_subtasks( await aio_task if schedule_next: band_tasks[subtask_band] += 1 - if subtask_info.band_futures: + if subtask_band is not None and subtask_info.band_futures: # Cancel subtask here won't change subtask status. 
# See more in `TaskProcessorActor.set_subtask_result` logger.info( @@ -268,7 +285,12 @@ async def submit_subtask_to_band(self, subtask_id: str, band: BandType): "stage_id": subtask_info.subtask.stage_id, }, ) - logger.debug("Start run subtask %s in band %s.", subtask_id, band) + logger.debug( + "Start run subtask %s of stage %s in band %s.", + subtask_id, + subtask_info.subtask.stage_id, + band, + ) with Timer() as timer: task = asyncio.create_task( execution_ref.run_subtask.options( @@ -283,10 +305,44 @@ async def submit_subtask_to_band(self, subtask_id: str, band: BandType): subtask_info.subtask, band, timer.duration ) task_api = await self._get_task_api() - logger.debug("Finished subtask %s with result %s.", subtask_id, result) + logger.debug( + "Finished subtask %s of stage %s with result %s in band %s.", + subtask_id, + subtask_info.subtask.stage_id, + result.status, + band, + ) await task_api.set_subtask_result(result) except (OSError, MarsError) as ex: - # TODO: We should handle ServerClosed Error. + if isinstance(ex, ServerClosed): + # 1. reassign subtasks of closed node + band_resource = await self._node_info_ref.get_bands([ex.address]) + logger.debug("Got bands %s of node %s.", band_resource, ex.address) + for band in band_resource.keys(): + reassigned_queue = await self._queueing_ref.get_band_queue(band) + reassigned_subtasks = [] + priorities = [] + for item in reassigned_queue: + reassigned_subtasks.append(item.subtask) + priorities.append(item.subtask.priority) + await self._queueing_ref.add_subtasks( + reassigned_subtasks, + priorities, + exclude_bands=set([band]), + ) + await self._queueing_ref.remove_queue_from_band(band) + logger.info( + "Reassigned %d subtasks of band %s to other nodes.", + len(reassigned_subtasks), + band, + ) + # 2. mark closed node stopped + await self._node_info_ref.update_node_info( + address=ex.address, + role=NodeRole.WORKER, + status=NodeStatus.STOPPED, + ) + logger.info("Node %s marked as stopped.", ex.address) if ( subtask_info.subtask.retryable and subtask_info.num_reschedules < subtask_info.max_reschedules diff --git a/mars/services/scheduling/supervisor/queueing.py b/mars/services/scheduling/supervisor/queueing.py index 97c5d5f6b3..ab77e628b9 100644 --- a/mars/services/scheduling/supervisor/queueing.py +++ b/mars/services/scheduling/supervisor/queueing.py @@ -44,9 +44,20 @@ def __lt__(self, other: "HeapItem"): return self.priority > other.priority +@dataslots +@dataclass +class LostHeapItem: + subtask: Subtask + priority: Tuple + + def __lt__(self, other: "LostHeapItem"): + return self.priority < other.priority + + class SubtaskQueueingActor(mo.Actor): _stid_to_bands: DefaultDict[str, List[Tuple]] _stid_to_items: Dict[str, HeapItem] + _lost_subtask_items: List[LostHeapItem] _band_queues: DefaultDict[Tuple, List[HeapItem]] @classmethod @@ -57,6 +68,7 @@ def __init__(self, session_id: str, submit_period: Union[float, int] = None): self._session_id = session_id self._stid_to_bands = defaultdict(list) self._stid_to_items = dict() + self._lost_subtask_queue = list() # Note that we need to ensure top item in every band heap queue is valid, # so that we can ensure band queue is busy if the band queue is not empty. 
self._band_queues = defaultdict(list) @@ -167,7 +179,27 @@ async def add_subtasks( priorities: List[Tuple], exclude_bands: Set[Tuple] = None, random_when_unavailable: bool = True, + schedule_lost_objects: bool = False, ): + if schedule_lost_objects: + for subtask, priority in zip(subtasks, priorities): + if subtask not in self._lost_subtask_queue: + heap_item = self._stid_to_items[subtask.subtask_id] = LostHeapItem( + subtask, priority + ) + heapq.heappush(self._lost_subtask_queue, heap_item) + logger.info( + "Subtask %s with priority %s enqueued to lost subtask queue.", + subtask.subtask_id, + priority, + ) + else: + logger.info( + "Subtask %s has been in lost subtask queue, so do not enqueue it again.", + subtask.subtask_id, + ) + return + bands = await self._assigner_ref.assign_subtasks( subtasks, exclude_bands, random_when_unavailable ) @@ -206,7 +238,7 @@ async def submit_subtasks(self, band: Tuple = None, limit: Optional[int] = None) self._band_to_resource[band].num_cpus or self._band_to_resource[band].num_gpus ) - task_queue = self._band_queues[band] + task_queue = self._lost_subtask_queue or self._band_queues.get(band, list()) submit_items = dict() while ( self._ensure_top_item_valid(task_queue) @@ -251,8 +283,11 @@ async def submit_subtasks(self, band: Tuple = None, limit: Optional[int] = None) submitted_bands, submit_items_list, submitted_ids_list ): subtask_ids = list(submit_items) - task_queue = self._band_queues[band] - + task_queue = ( + self._lost_subtask_queue + if isinstance(list(submit_items.values())[0], LostHeapItem) + else self._band_queues[band] + ) async with redirect_subtask_errors( self, [item.subtask for item in submit_items.values()] ): @@ -323,6 +358,12 @@ async def all_bands_busy(self) -> bool: return all(len(self._band_queues[band]) > 0 for band in bands) return False + def get_band_queue(self, band: str): + return self._band_queues.get(band, list()) + + def remove_queue_from_band(self, band: str): + self._band_queues.pop(band, None) + async def balance_queued_subtasks(self): # record length of band queues band_num_queued_subtasks = { @@ -342,6 +383,11 @@ async def balance_queued_subtasks(self): item = heapq.heappop(task_queue) self._stid_to_bands[item.subtask.subtask_id].remove(band) items.append(item) + logger.debug( + "Removed subtask %s from band %s.", + item.subtask.subtask_id, + band, + ) elif move > 0: item = items.pop() self._stid_to_bands[item.subtask.subtask_id].append(band) diff --git a/mars/services/scheduling/supervisor/service.py b/mars/services/scheduling/supervisor/service.py index db026c412c..cd6004b3d3 100644 --- a/mars/services/scheduling/supervisor/service.py +++ b/mars/services/scheduling/supervisor/service.py @@ -15,6 +15,7 @@ import asyncio from .... 
import oscar as mo +from ...context import FailOverContext from ...core import AbstractService from .autoscale import AutoscalerActor from .manager import DEFAULT_SUBTASK_MAX_RESCHEDULES @@ -126,6 +127,11 @@ async def create_session(self, session_id: str): ) await autoscaler_ref.register_session(session_id, self._address) + # Initialize failover context + failover_config = scheduling_config.get("failover", {}) + if failover_config.get("enable_lineage"): + FailOverContext.enable_lineage() + async def destroy_session(self, session_id: str): from .queueing import SubtaskQueueingActor from .manager import SubtaskManagerActor diff --git a/mars/services/scheduling/supervisor/tests/test_manager.py b/mars/services/scheduling/supervisor/tests/test_manager.py index c4ad5c895c..2f0a7fc94e 100644 --- a/mars/services/scheduling/supervisor/tests/test_manager.py +++ b/mars/services/scheduling/supervisor/tests/test_manager.py @@ -53,6 +53,7 @@ def add_subtasks( priorities: List[Tuple], exclude_bands: Set[Tuple] = None, random_when_unavailable: bool = True, + schedule_lost_objects: bool = False, ): if self._error is not None: raise self._error diff --git a/mars/services/scheduling/worker/execution.py b/mars/services/scheduling/worker/execution.py index f811fbf8db..c64f1d0e96 100644 --- a/mars/services/scheduling/worker/execution.py +++ b/mars/services/scheduling/worker/execution.py @@ -28,7 +28,7 @@ from ....core.operand import Fetch, FetchShuffle from ....lib.aio import alru_cache from ....metrics import Metrics -from ....oscar.errors import MarsError +from ....oscar.errors import MarsError, DuplicatedSubtaskError from ....storage import StorageLevel from ....utils import dataslots, get_chunk_key_to_data_keys, wrap_exception from ...cluster import ClusterAPI @@ -519,7 +519,7 @@ async def run_subtask( self, subtask: Subtask, band_name: str, supervisor_address: str ): if subtask.subtask_id in self._subtask_info: # pragma: no cover - raise Exception( + raise DuplicatedSubtaskError( f"Subtask {subtask.subtask_id} is already running on this band[{self.address}]." ) logger.debug( diff --git a/mars/services/storage/__init__.py b/mars/services/storage/__init__.py index 435237e25f..82654ddfcd 100644 --- a/mars/services/storage/__init__.py +++ b/mars/services/storage/__init__.py @@ -14,4 +14,3 @@ from .api import StorageAPI, MockStorageAPI, WebStorageAPI from .core import DataInfo -from .errors import DataNotExist diff --git a/mars/services/storage/core.py b/mars/services/storage/core.py index eedc375389..0c6a7834f0 100644 --- a/mars/services/storage/core.py +++ b/mars/services/storage/core.py @@ -21,12 +21,13 @@ from ... import oscar as mo from ...lib.aio import AioFileObject from ...oscar.backends.allocate_strategy import IdleLabel, NoIdleSlot +from ...oscar.errors import DataNotExist from ...resource import cuda_card_stats from ...storage import StorageLevel, get_storage_backend from ...storage.base import ObjectInfo, StorageBackend from ...storage.core import StorageFileObject from ...utils import dataslots -from .errors import DataNotExist, StorageFull +from .errors import StorageFull logger = logging.getLogger(__name__) diff --git a/mars/services/storage/errors.py b/mars/services/storage/errors.py index 5ff7586c80..7e4103d72f 100644 --- a/mars/services/storage/errors.py +++ b/mars/services/storage/errors.py @@ -13,10 +13,6 @@ # limitations under the License. 
from ...core.base import MarsError -from ...storage.errors import DataNotExist - - -DataNotExist = DataNotExist class NoDataToSpill(MarsError): diff --git a/mars/services/storage/handler.py b/mars/services/storage/handler.py index 631d616a6e..57a7c77e34 100644 --- a/mars/services/storage/handler.py +++ b/mars/services/storage/handler.py @@ -18,6 +18,7 @@ from typing import Any, Dict, List, Union from ... import oscar as mo +from ...oscar.errors import DataNotExist from ...storage import StorageLevel, get_storage_backend from ...storage.core import StorageFileObject from ...typing import BandType @@ -31,7 +32,7 @@ build_data_info, WrappedStorageFileObject, ) -from .errors import DataNotExist, NoDataToSpill +from .errors import NoDataToSpill cupy = lazy_import("cupy") cudf = lazy_import("cudf") diff --git a/mars/services/storage/tests/test_transfer.py b/mars/services/storage/tests/test_transfer.py index c3228fca16..c1ad786308 100644 --- a/mars/services/storage/tests/test_transfer.py +++ b/mars/services/storage/tests/test_transfer.py @@ -22,9 +22,9 @@ from .... import oscar as mo from ....oscar.backends.allocate_strategy import IdleLabel +from ....oscar.errors import DataNotExist from ....storage import StorageLevel from ..core import DataManagerActor, StorageManagerActor, StorageQuotaActor -from ..errors import DataNotExist from ..handler import StorageHandlerActor from ..transfer import ReceiverManagerActor, SenderManagerActor diff --git a/mars/services/task/core.py b/mars/services/task/core.py index efb65b2bb2..5f565ebe80 100644 --- a/mars/services/task/core.py +++ b/mars/services/task/core.py @@ -33,6 +33,7 @@ class TaskStatus(Enum): pending = 0 running = 1 terminated = 2 + errored = 3 class Task(Serializable): @@ -41,6 +42,7 @@ class Task(Serializable): tileable_graph: TileableGraph = ReferenceField("tileable_graph", TileableGraph) fuse_enabled: bool = BoolField("fuse_enabled") extra_config: dict = DictField("extra_config") + counter: object = AnyField("counter") def __init__( self, @@ -49,6 +51,7 @@ def __init__( tileable_graph: TileableGraph = None, fuse_enabled: bool = True, extra_config: dict = None, + counter: object = None, ): super().__init__( task_id=task_id, @@ -56,6 +59,7 @@ def __init__( tileable_graph=tileable_graph, fuse_enabled=fuse_enabled, extra_config=extra_config, + counter=counter, ) diff --git a/mars/services/task/execution/api.py b/mars/services/task/execution/api.py index 312c22a371..f0fb5a4041 100644 --- a/mars/services/task/execution/api.py +++ b/mars/services/task/execution/api.py @@ -215,6 +215,10 @@ async def set_subtask_result(self, subtask_result: SubtaskResult): def get_stage_processors(self): """Get stage processors.""" + @abstractmethod + def get_stage_generation_order(self, stage_id: str): + """Get stage generation order.""" + _name_to_task_executor_cls: Dict[str, Type[TaskExecutor]] = {} diff --git a/mars/services/task/execution/mars/executor.py b/mars/services/task/execution/mars/executor.py index 0a30cca3a8..16f513b6dc 100644 --- a/mars/services/task/execution/mars/executor.py +++ b/mars/services/task/execution/mars/executor.py @@ -15,6 +15,7 @@ import asyncio import logging import sys +import weakref from collections import defaultdict from typing import Dict, List, Optional, Set @@ -94,6 +95,7 @@ def __init__( self._stage_processors = [] self._stage_tile_progresses = [] + self._stage_id_to_processor = weakref.WeakValueDictionary() self._cur_stage_processor = None self._result_tileables_lifecycle = None self._subtask_decref_events = dict() @@ -192,6 
+194,7 @@ async def execute_subtask_graph( await self._incref_stage(stage_processor) await self._resource_evaluator.evaluate(stage_processor) self._stage_processors.append(stage_processor) + self._stage_id_to_processor[stage_id] = stage_processor self._cur_stage_processor = stage_processor # get the tiled progress for current stage prev_progress = sum(self._stage_tile_progresses) @@ -250,50 +253,43 @@ async def cancel(self): await self._cur_stage_processor.cancel() async def set_subtask_result(self, subtask_result: SubtaskResult): - if self._cur_stage_processor is None or ( - subtask_result.stage_id - and self._cur_stage_processor.stage_id != subtask_result.stage_id - ): - logger.warning( - "Stage %s for subtask %s not exists, got stale subtask result %s which may be " - "speculative execution from previous stages, just ignore it.", - subtask_result.stage_id, - subtask_result.subtask_id, - subtask_result, - ) - return - stage_processor = self._cur_stage_processor + stage_processor = ( + self._cur_stage_processor + or self._stage_id_to_processor[subtask_result.stage_id] + ) subtask = stage_processor.subtask_id_to_subtask[subtask_result.subtask_id] prev_result = stage_processor.subtask_results.get(subtask) - if prev_result and ( - prev_result.status == SubtaskStatus.succeeded - or prev_result.progress > subtask_result.progress - ): - logger.info( - "Skip set subtask %s with result %s, previous result is %s.", - subtask.subtask_id, - subtask_result, - prev_result, + if prev_result and prev_result.bands and prev_result.bands[0] is not None: + logger.debug( + "Previous result of subtask %s is %s.", subtask.subtask_id, prev_result ) - # For duplicate run of subtasks, if the progress is smaller or the subtask has finished or canceled - # in task speculation, just do nothing. - # TODO(chaokunyang) If duplicate run of subtasks failed, it may be the fault in worker node, - # print the exception, and if multiple failures on the same node, remove the node from the cluster. - return + if ( + subtask_result.status == SubtaskStatus.cancelled + or prev_result.progress > subtask_result.progress + ): + logger.info( + "Skip set subtask %s with result %s, previous result is %s.", + subtask.subtask_id, + subtask_result, + prev_result, + ) + # For duplicate run of subtasks, if the progress is smaller or the subtask has finished or canceled + # in task speculation, just do nothing. + # TODO(chaokunyang) If duplicate run of subtasks failed, it may be the fault in worker node, + # print the exception, and if multiple failures on the same node, remove the node from the cluster. + return + if subtask_result.bands: [band] = subtask_result.bands else: band = None - stage_processor.subtask_snapshots[subtask] = subtask_result.update( - stage_processor.subtask_snapshots.get(subtask) - ) + + stage_processor.subtask_snapshots[subtask] = subtask_result if subtask_result.status.is_done: # update stage_processor.subtask_results to avoid concurrent set_subtask_result # since we release lock when `_decref_input_subtasks`. 
- stage_processor.subtask_results[subtask] = subtask_result.update( - stage_processor.subtask_results.get(subtask) - ) + stage_processor.subtask_results[subtask] = subtask_result try: # Since every worker will call supervisor to set subtask result, # we need to release actor lock to make `decref_chunks` parallel to avoid blocking @@ -323,6 +319,9 @@ async def set_subtask_result(self, subtask_result: SubtaskResult): def get_stage_processors(self): return self._stage_processors + def get_stage_generation_order(self, stage_id): + return self._stage_id_to_processor[stage_id].generation_order + async def _incref_fetch_tileables(self): # incref fetch tileables in tileable graph to prevent them from deleting to_incref_tileable_keys = [ diff --git a/mars/services/task/execution/mars/stage.py b/mars/services/task/execution/mars/stage.py index 6cced7300b..9d1f4f274f 100644 --- a/mars/services/task/execution/mars/stage.py +++ b/mars/services/task/execution/mars/stage.py @@ -13,6 +13,7 @@ # limitations under the License. import asyncio +import importlib import itertools import logging import time @@ -22,11 +23,15 @@ from ..... import oscar as mo from .....core import ChunkGraph, Chunk from .....core.operand import Fuse, Fetch +from .....oscar import ServerClosed from .....metrics import Metrics +from .....oscar.errors import DuplicatedSubtaskError, DataNotExist from .....utils import get_chunk_params from .....typing import BandType, TileableType +from ....context import FailOverContext from ....meta import MetaAPI, WorkerMetaAPI from ....scheduling import SchedulingAPI + from ....subtask import Subtask, SubtaskGraph, SubtaskResult, SubtaskStatus from ....task.core import Task, TaskResult, TaskStatus from ..api import ExecutionChunkResult @@ -50,6 +55,7 @@ def __init__( self.task = task self.chunk_graph = chunk_graph self.subtask_graph = subtask_graph + self.generation_order = next(self.task.counter) self._bands = bands self._tile_context = tile_context @@ -140,71 +146,146 @@ async def set_subtask_result(self, result: SubtaskResult, band: BandType = None) # update subtask_results in `TaskProcessorActor.set_subtask_result` self._submitted_subtask_ids.difference_update([result.subtask_id]) - all_done = len(self.subtask_results) == len(self.subtask_graph) - error_or_cancelled = result.status in ( - SubtaskStatus.errored, - SubtaskStatus.cancelled, + finished = len(self.subtask_results) == len(self.subtask_graph) and all( + r.status == SubtaskStatus.succeeded for r in self.subtask_results.values() + ) + errored = result.status == SubtaskStatus.errored + cancelled = result.status == SubtaskStatus.cancelled + + logger.debug( + "Setting subtask %s, finished: %s, errored: %s, cancelled: %s, " + "subtask_results len: %d, subtask_graph len: %d, " + "result status: %s, self.result.status: %s, " + "task id: %s, stage id: %s.", + subtask, + finished, + errored, + cancelled, + len(self.subtask_results), + len(self.subtask_graph), + result.status, + self.result.status, + self.task.task_id, + self.stage_id, ) - if all_done or error_or_cancelled: - # tell scheduling to finish subtasks + if finished and self.result.status != TaskStatus.terminated: + logger.info( + "Stage %s of task %s is finished.", self.stage_id, self.task.task_id + ) await self._scheduling_api.finish_subtasks( - [result.subtask_id], bands=[band], schedule_next=not error_or_cancelled + [result.subtask_id], bands=[band], schedule_next=False ) - if self.result.status != TaskStatus.terminated: - self.result = TaskResult( - self.task.task_id, - 
self.task.session_id, - self.stage_id, - start_time=self.result.start_time, - end_time=time.time(), - status=TaskStatus.terminated, - error=result.error, - traceback=result.traceback, + + # Note: Should trigger subtasks which dependent on the finished subtask + await self._try_trigger_subtasks(subtask) + + self.result = TaskResult( + self.task.task_id, + self.task.session_id, + self.stage_id, + start_time=self.result.start_time, + end_time=time.time(), + status=TaskStatus.terminated, + error=result.error, + traceback=result.traceback, + ) + self._schedule_done() + cost_time_secs = self.result.end_time - self.result.start_time + logger.info( + "Time consuming to execute a stage is %ss with " + "session id %s, task id %s, stage id %s", + cost_time_secs, + self.result.session_id, + self.result.task_id, + self.result.stage_id, + ) + self._stage_execution_time.record( + cost_time_secs, + { + "session_id": self.result.session_id, + "task_id": self.result.task_id, + "stage_id": self.result.stage_id, + }, + ) + elif errored: + self.result = TaskResult( + self.task.task_id, + self.task.session_id, + self.stage_id, + start_time=self.result.start_time, + end_time=time.time(), + status=TaskStatus.errored, + error=result.error, + traceback=result.traceback, + ) + logger.exception( + "Subtask %s errored", + subtask.subtask_id, + exc_info=( + type(result.error), + result.error, + result.traceback, + ), + ) + ret = await self._detect_error( + subtask, + result.error, + ( + ServerClosed, + DataNotExist, + ), + ) + if ret: + await self._scheduling_api.finish_subtasks( + [result.subtask_id], + bands=[band], + schedule_next=True, + ) + return + else: + await self._scheduling_api.finish_subtasks( + [result.subtask_id], + bands=[band], + schedule_next=False, ) - if not all_done and error_or_cancelled: - if result.status == SubtaskStatus.errored: - logger.exception( - "Subtask %s errored", - subtask.subtask_id, - exc_info=( - type(result.error), - result.error, - result.traceback, - ), - ) - if result.status == SubtaskStatus.cancelled: # pragma: no cover - logger.warning( - "Subtask %s from band %s canceled.", - subtask.subtask_id, - band, - ) - logger.info( - "Start to cancel stage %s of task %s.", self.stage_id, self.task - ) - # if error or cancel, cancel all submitted subtasks - await self._scheduling_api.cancel_subtasks( - list(self._submitted_subtask_ids) - ) - self._schedule_done() - cost_time_secs = self.result.end_time - self.result.start_time logger.info( - "Time consuming to execute a stage is %ss with " - "session id %s, task id %s, stage id %s", - cost_time_secs, - self.result.session_id, - self.result.task_id, - self.result.stage_id, + "Unable to recover data and start to " + "cancel stage %s of task %s.", + self.stage_id, + self.task, ) - self._stage_execution_time.record( - cost_time_secs, - { - "session_id": self.result.session_id, - "task_id": self.result.task_id, - "stage_id": self.result.stage_id, - }, + await self.cancel() + elif cancelled: + await self._scheduling_api.finish_subtasks( + [result.subtask_id], bands=[band], schedule_next=False + ) + logger.warning( + "Subtask %s from band %s cancelled.", + subtask.subtask_id, + band, + ) + if self.result.status != TaskStatus.terminated: + self.result.status = TaskStatus.terminated + if not self.result.error: + self.result.end_time = time.time() + self.result.error = result.error + self.result.traceback = result.traceback + + logger.info( + "Start to cancel stage %s of task %s.", + self.stage_id, + self.task, ) + await self.cancel() else: + 
await self._scheduling_api.finish_subtasks( + [result.subtask_id], bands=[band] + ) + logger.debug( + "Continue to schedule subtasks after subtask %s finished.", + subtask.subtask_id, + ) # not terminated, push success subtasks to queue if they are ready to_schedule_subtasks = [] for succ_subtask in self.subtask_graph.successors(subtask): @@ -215,12 +296,22 @@ async def set_subtask_result(self, result: SubtaskResult, band: BandType = None) pred_subtask in self.subtask_results for pred_subtask in pred_subtasks ): - # all predecessors finished to_schedule_subtasks.append(succ_subtask) - await self._schedule_subtasks(to_schedule_subtasks) - await self._scheduling_api.finish_subtasks( - [result.subtask_id], bands=[band] - ) + + for to_schedule_subtask in to_schedule_subtasks: + try: + logger.info( + "Start to schedule subtask %s, task id: %s, stage id: %s.", + to_schedule_subtask, + self.task.task_id, + self.stage_id, + ) + await self._schedule_subtasks([to_schedule_subtask]) + except KeyError: + logger.exception("Got KeyError.") + + if not to_schedule_subtasks: + await self._try_trigger_subtasks(subtask) async def run(self): try: @@ -254,10 +345,10 @@ async def _run(self): return await self._get_stage_result() async def cancel(self): - logger.info("Start to cancel stage %s of task %s.", self.stage_id, self.task) - if self._done.is_set(): # pragma: no cover + if self._done.is_set() or self._cancelled.is_set(): # pragma: no cover # already finished, ignore cancel return + logger.info("Start to cancel stage %s of task %s.", self.stage_id, self.task) self._cancelled.set() # cancel running subtasks await self._scheduling_api.cancel_subtasks(list(self._submitted_subtask_ids)) @@ -349,3 +440,164 @@ def _get_params_fields(cls, tileable: TileableType): for _ in tileable.chunks: params_fields.append(list(fields)) return params_fields + + async def _detect_error(self, subtask, error, expect_error_cls_tuple): + """ + Detect error and trigger error recovery. + For example: `ServerClosed`, `DataNotExist`. + Parameters + ---------- + subtask: subtask + error: subtask execution error + expect_error_cls_tuple: error class or error class tuple + + Returns + ------- + False if could not rerun subtask else True. + """ + if not FailOverContext.is_lineage_enabled(): + logger.info("Lineage of failover is not enabled.") + return False + + # Note: There are some error that do not need to be handled, + # like `DuplicatedSubtaskError`. + if isinstance(error, DuplicatedSubtaskError): + logger.info("Ignored error %s of subtask %s.", error, subtask.subtask_id) + return True + + if isinstance(error, expect_error_cls_tuple): + logger.info( + "%s detected of subtask %s and try to recover it.", + error, + subtask.subtask_id, + ) + try: + # 1. find dependency subtasks + dependency_subtasks = self.subtask_graph.predecessors(subtask) + dependency_subtasks_chunk_data_keys = set( + chunk.key + for s in dependency_subtasks + for chunk in s.chunk_graph.result_chunks + ) + subtask_chunk_data_keys = set( + chunk.key + for chunk in subtask.chunk_graph.iter_indep() + if isinstance(chunk.op, Fetch) + ) + diff_chunk_data_keys = ( + subtask_chunk_data_keys - dependency_subtasks_chunk_data_keys + ) + + dependency_subtask_graph_orders = [self.generation_order] * len( + dependency_subtasks + ) + + # Note: Could not import `TaskManagerActor` directly because of + # circular dependency, so use import_module. 
+ task_manager_actor_cls = getattr( + importlib.import_module("mars.services.task.supervisor.manager"), + "TaskManagerActor", + ) + task_manager_ref = await mo.actor_ref( + self._scheduling_api.address, + task_manager_actor_cls.gen_uid(subtask.session_id), + ) + for chunk_data_key in diff_chunk_data_keys: + s = await task_manager_ref.get_subtask(chunk_data_key) + if not s.retryable: + logger.info( + "Subtask %s is not retryable, so cannot " + "recover subtask %s.", + s, + subtask, + ) + FailOverContext.cleanup() + return False + if s not in dependency_subtasks: + order = await task_manager_ref.get_generation_order( + s.task_id, s.stage_id + ) + dependency_subtasks.append(s) + dependency_subtask_graph_orders.append(order) + FailOverContext.subtask_to_dependency_subtasks[subtask].add(s) + + # 2. submit the subtask and it's dependency subtasks + if not dependency_subtasks: + logger.warning( + "No dependent subtasks to restore of subtask %s.", + subtask.subtask_id, + ) + FailOverContext.cleanup() + return False + priorities = [ + (pri,) + s.priority + for s, pri in zip( + dependency_subtasks, dependency_subtask_graph_orders + ) + ] + + logger.info( + "Rescheduling subtasks %s with priorities %s to restore " + "data, because subtask %s need them.", + dependency_subtasks, + priorities, + subtask, + ) + # Note: May add duplicated subtasks, so should catch exception + for s, p in zip(dependency_subtasks, priorities): + try: + await self._scheduling_api.add_subtasks([s], [p], True) + except DuplicatedSubtaskError: + logger.exception( + "Adding dependency subtask %s failed.", s.subtask_id + ) + return True + except: + FailOverContext.cleanup() + logger.exception("Error recovery failed.") + return False + else: + FailOverContext.cleanup() + logger.error("Could not to recover the error: %s", error) + return False + + async def _try_trigger_subtasks(self, subtask: Subtask): + to_schedule_subtasks = [] + if FailOverContext.subtask_to_dependency_subtasks: + logger.info( + "Finding subtasks to be scheduled in history set after subtask %s finished.", + subtask.subtask_id, + ) + for ( + succ_subtask, + pred_subtasks, + ) in FailOverContext.subtask_to_dependency_subtasks.items(): + try: + pred_subtasks.remove(subtask) + except KeyError: + pass + if not pred_subtasks: + to_schedule_subtasks.append(succ_subtask) + if to_schedule_subtasks: + logger.info( + "Found subtasks %s to be scheduled after subtask %s finished.", + to_schedule_subtasks, + subtask.subtask_id, + ) + for s in to_schedule_subtasks: + FailOverContext.subtask_to_dependency_subtasks.pop(s, None) + else: + logger.info("No subtasks found to be scheduled.") + for to_schedule_subtask in to_schedule_subtasks: + try: + logger.info( + "Start to schedule subtask %s, task id: %s, stage id: %s.", + to_schedule_subtask, + self.task.task_id, + self.stage_id, + ) + await self._scheduling_api.add_subtasks( + [to_schedule_subtask], [to_schedule_subtask.priority] + ) + except KeyError: + logger.exception("Got KeyError.") diff --git a/mars/services/task/execution/ray/executor.py b/mars/services/task/execution/ray/executor.py index ccb99264bf..17df54bf60 100644 --- a/mars/services/task/execution/ray/executor.py +++ b/mars/services/task/execution/ray/executor.py @@ -1074,3 +1074,8 @@ def gc(): last_check_slow_time = curr_time # Fast to next loop and give it a chance to update object_ref_to_subtask. 
await asyncio.sleep(interval_seconds if len(ready_objects) == 0 else 0) + + def get_stage_generation_order(self, stage_id: str): + raise NotImplementedError( + "RayTaskExecutor doesn't support stage generation order." + ) diff --git a/mars/services/task/supervisor/manager.py b/mars/services/task/supervisor/manager.py index 2a5eb6b2d4..38e004b863 100644 --- a/mars/services/task/supervisor/manager.py +++ b/mars/services/task/supervisor/manager.py @@ -15,6 +15,7 @@ import asyncio import contextlib import importlib +import itertools import logging import time import weakref @@ -87,6 +88,7 @@ def __init__(self, session_id: str): self._task_id_to_processor_ref = dict() self._result_tileable_key_to_info = defaultdict(list) + self._counter = itertools.count() async def __post_create__(self): # get config @@ -154,6 +156,7 @@ async def submit_tileable_graph( graph, fuse_enabled=fuse_enabled, extra_config=extra_config, + counter=self._counter, ) # gen task processor tiled_context = await self._gen_tiled_context(graph) @@ -377,3 +380,45 @@ async def remove_tileables(self, tileable_keys: List[str]): if not is_done ] self._result_tileable_key_to_info[key] = not_done_info + + def get_generation_order(self, task_id, stage_id): + """ + Gets generation order of a stage subtask graph in a task. + + Parameters + ---------- + task_id: task id + stage_id: stage id + + Returns + ------- + out: an integer + + Exceptions + ------- + May throw KeyError + + """ + return self._task_id_to_processor_ref[task_id].get_generation_order( + task_id, stage_id + ) + + async def get_subtask(self, chunk_data_key: str): + """ + Gets subtask by task id and chunk data key, i.e. the subtask who + generated the chunk data. + Parameters + ---------- + task_id: task id + chunk_data_key: key of a chunk data + + Returns + ------- + a subtask + """ + for task_id, processor_ref in self._task_id_to_processor_ref.items(): + subtask = await processor_ref.get_subtask(task_id, chunk_data_key) + if subtask is not None: + return subtask + + raise KeyError(f"Chunk data key {chunk_data_key} does not exist.") diff --git a/mars/services/task/supervisor/processor.py b/mars/services/task/supervisor/processor.py index 0ce692b2ce..a25fe0b9cf 100644 --- a/mars/services/task/supervisor/processor.py +++ b/mars/services/task/supervisor/processor.py @@ -29,7 +29,8 @@ ) from ....typing import TileableType, ChunkType from ....utils import Timer -from ...subtask import SubtaskResult, Subtask +from ...context import FailOverContext +from ...subtask import SubtaskResult, SubtaskGraph, Subtask from ..core import Task, TaskResult, TaskStatus, new_task_id from ..execution.api import TaskExecutor, ExecutionChunkResult from .preprocessor import TaskPreprocessor @@ -58,6 +59,10 @@ def __init__( self._tileable_id_to_tileable = dict() self._chunk_to_subtasks = dict() self._stage_tileables = set() + # Note: Record subtask lineage info for fault recovery, key is output + # chunk data of a `SubtaskGraph`, value is subtask that generates the + # chunk data. 
+ self._chunk_data_key_to_subtask = dict() if MARS_ENABLE_PROFILING: ProfilingData.init(task.task_id) @@ -241,6 +246,8 @@ async def _process_stage_chunk_graph( }, ) + self._record_subtask_lineage_info(subtask_graph) + tile_context = await asyncio.to_thread( self._get_stage_tile_context, {c for c in chunk_graph.result_chunks if not isinstance(c.op, Fetch)}, @@ -261,6 +268,12 @@ async def _process_stage_chunk_graph( optimization_records = None self._update_stage_meta(chunk_to_result, tile_context, optimization_records) + def _record_subtask_lineage_info(self, subtask_graph: SubtaskGraph): + if FailOverContext.is_lineage_enabled(): + for subtask in subtask_graph.iter_indep(reverse=True): + for chunk_data in subtask.chunk_graph.result_chunks: + self._chunk_data_key_to_subtask[chunk_data.key] = subtask + def _get_stage_tile_context(self, result_chunks: Set[Chunk]) -> TileContext: collected = self._stage_tileables tile_context = TileContext() @@ -468,3 +481,9 @@ def _finish(self): def is_done(self) -> bool: return self.done.is_set() + + def get_generation_order(self, stage_id: str): + return self._executor.get_stage_generation_order(stage_id) + + def get_subtask(self, chunk_data_key: str): + return self._chunk_data_key_to_subtask.get(chunk_data_key) diff --git a/mars/services/task/supervisor/task.py b/mars/services/task/supervisor/task.py index c4bdf76e03..cb4c3125d5 100644 --- a/mars/services/task/supervisor/task.py +++ b/mars/services/task/supervisor/task.py @@ -416,9 +416,24 @@ async def set_subtask_result(self, subtask_result: SubtaskResult): ) if self._cur_processor is not None: yield self._cur_processor.set_subtask_result(subtask_result) + else: + logger.info( + "Could not set subtask %s result because current processor is " + "None, maybe the subtask is in the history task, so try to " + "use the history task processor.", + subtask_result.subtask_id, + ) + task_processor = self._task_id_to_processor[subtask_result.task_id] + yield task_processor.set_subtask_result(subtask_result) def is_done(self) -> bool: for processor in self._task_id_to_processor.values(): if not processor.is_done(): return False return True + + def get_generation_order(self, task_id, stage_id): + return self._task_id_to_processor[task_id].get_generation_order(stage_id) + + def get_subtask(self, task_id, chunk_data_key): + return self._task_id_to_processor[task_id].get_subtask(chunk_data_key) diff --git a/mars/services/task/supervisor/tests/task_preprocessor.py b/mars/services/task/supervisor/tests/task_preprocessor.py index 2567f5bb2a..e200170049 100644 --- a/mars/services/task/supervisor/tests/task_preprocessor.py +++ b/mars/services/task/supervisor/tests/task_preprocessor.py @@ -177,6 +177,7 @@ def analyze( self._config, chunk_to_subtasks, shuffle_fetch_type=shuffle_fetch_type, + stage_id=stage_id, ) subtask_graph = analyzer.gen_subtask_graph() results = set( diff --git a/mars/services/tests/fault_injection_manager.py b/mars/services/tests/fault_injection_manager.py index 06fa7fb310..88efcbce31 100644 --- a/mars/services/tests/fault_injection_manager.py +++ b/mars/services/tests/fault_injection_manager.py @@ -96,3 +96,29 @@ async def create(cls, session_id, supervisor_address): """ session_api = await SessionAPI.create(supervisor_address) await session_api.create_remote_object(session_id, cls.name, cls) + + +async def create_fault_injection_manager( + session_id, address, fault_count, fault_type, fault_op_types=None +): + class FaultInjectionManager(AbstractFaultInjectionManager): + def __init__(self): + 
self._fault_count = fault_count + + def set_fault_count(self, count): + self._fault_count = count + + def get_fault_count(self): + return self._fault_count + + def get_fault(self, pos: FaultPosition, ctx=None) -> FaultType: + # Check op types if fault_op_types provided. + if fault_op_types and type(ctx.get("operand")) not in fault_op_types: + return FaultType.NoFault + if self._fault_count.get(pos, 0) > 0: + self._fault_count[pos] -= 1 + return fault_type + return FaultType.NoFault + + await FaultInjectionManager.create(session_id, address) + return FaultInjectionManager.name diff --git a/mars/storage/errors.py b/mars/storage/errors.py deleted file mode 100644 index 7c0bc1bf72..0000000000 --- a/mars/storage/errors.py +++ /dev/null @@ -1,19 +0,0 @@ -# Copyright 1999-2021 Alibaba Group Holding Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ..core.base import MarsError - - -class DataNotExist(MarsError): - pass diff --git a/mars/storage/plasma.py b/mars/storage/plasma.py index a3ede1da6e..1d532b0ddb 100644 --- a/mars/storage/plasma.py +++ b/mars/storage/plasma.py @@ -23,12 +23,12 @@ import psutil import pyarrow as pa +from ..oscar.errors import DataNotExist from ..resource import virtual_memory from ..serialization import AioSerializer, AioDeserializer from ..utils import implements, dataslots, calc_size_by_str, lazy_import from .base import StorageBackend, StorageLevel, ObjectInfo, register_storage_backend from .core import BufferWrappedFileObject, StorageFileObject -from .errors import DataNotExist plasma = lazy_import("pyarrow.plasma", rename="plasma") if sys.platform.startswith("win"): diff --git a/mars/storage/ray.py b/mars/storage/ray.py index b39a839e8c..a427f86819 100644 --- a/mars/storage/ray.py +++ b/mars/storage/ray.py @@ -13,10 +13,12 @@ # limitations under the License. import inspect +import logging from typing import Any, Dict, List, Tuple from ..lib import sparse from ..oscar.debug import debug_async_timeout +from ..oscar.errors import DataNotExist from ..utils import ( lazy_import, implements, @@ -27,6 +29,7 @@ from .core import BufferWrappedFileObject, StorageFileObject ray = lazy_import("ray") +logger = logging.getLogger(__name__) # TODO(fyrestone): make the SparseMatrix pickleable. @@ -207,8 +210,15 @@ async def get(self, object_id, **kwargs) -> object: "Storage get object timeout, ObjectRef: %s", object_id, ): - with record_time_cost_percentile(self._storage_get_metrics): - return await object_id + try: + with record_time_cost_percentile(self._storage_get_metrics): + return await object_id + except ray.exceptions.ObjectLostError: + logger.exception("ray.get failed.") + raise DataNotExist(f"Object {object_id} is lost due to node failure.") + except Exception: + logger.exception("ray.get failed.") + raise @implements(StorageBackend.put) async def put(self, obj, importance=0) -> ObjectInfo:
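
The new scheduling.failover.enable_lineage switch flows from the YAML config into the global FailOverContext when a session is created (see the service.py hunk above). Below is a minimal, self-contained sketch of that flow; apply_failover_config and the inline YAML string are illustrative stand-ins rather than the actual Mars config loader.

    # Minimal sketch of how the scheduling.failover.enable_lineage flag could be
    # read and applied. The inline YAML and apply_failover_config are illustrative
    # stand-ins, not the real Mars config machinery.
    from collections import defaultdict

    import yaml  # assumption: PyYAML is available, as for Mars YAML configs


    class _FailOverContext:
        """Simplified copy of the context added in mars/services/context.py."""

        def __init__(self):
            self._enable_lineage = False
            self.subtask_to_dependency_subtasks = defaultdict(set)

        def enable_lineage(self):
            self._enable_lineage = True

        def is_lineage_enabled(self):
            return self._enable_lineage

        def cleanup(self):
            self.subtask_to_dependency_subtasks.clear()


    FailOverContext = _FailOverContext()

    CONFIG_YAML = """
    scheduling:
      failover:
        enable_lineage: yes
    """


    def apply_failover_config(raw_yaml: str) -> None:
        # Mirrors the create_session change: read the failover section and flip
        # the global context when lineage tracking is enabled.
        scheduling_config = yaml.safe_load(raw_yaml).get("scheduling", {})
        failover_config = scheduling_config.get("failover", {})
        if failover_config.get("enable_lineage"):
            FailOverContext.enable_lineage()


    apply_failover_config(CONFIG_YAML)
    print(FailOverContext.is_lineage_enabled())  # True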
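
ServerClosed now carries the address of the node that went away, which is what lets the subtask manager reassign the queued subtasks of exactly that node's bands. A small sketch of the pattern, with run_on and submit as hypothetical placeholders for the actor calls:

    # Sketch of the ServerClosed change: the error keeps the failing address so a
    # caller can reschedule work queued for that node. MarsError, run_on and
    # submit are simplified placeholders.
    class MarsError(Exception):
        pass


    class ServerClosed(MarsError):
        def __init__(self, *args, **kwargs):
            self._address = kwargs.pop("address", None)
            super().__init__(*args, **kwargs)

        @property
        def address(self):
            return self._address


    def run_on(address: str) -> None:
        # Placeholder for submitting a subtask to a worker; always fails here.
        raise ServerClosed(f"Remote server {address} closed", address=address)


    def submit(address: str) -> None:
        try:
            run_on(address)
        except ServerClosed as ex:
            # Mirrors submit_subtask_to_band: the address on the exception tells
            # the manager whose queued subtasks must be moved to other nodes.
            print(f"node {ex.address} is gone, reassigning its queued subtasks")


    submit("worker-1")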
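
Lineage recovery rests on two pieces of bookkeeping: the task processor records which subtask produced each result chunk key, and FailOverContext remembers which re-submitted dependencies a failed subtask is still waiting on. A condensed model of both maps, with plain strings standing in for Subtask objects and helper names chosen only for illustration:

    # Condensed model of the lineage bookkeeping in this patch. Real Mars keys
    # these maps by Subtask and chunk objects; strings are used here for brevity.
    from collections import defaultdict

    # chunk data key -> id of the subtask that produced it
    # (cf. TaskProcessor._chunk_data_key_to_subtask)
    chunk_key_to_subtask = {"chunk-a": "subtask-1", "chunk-b": "subtask-2"}

    # failed subtask id -> dependency subtask ids still being re-run
    # (cf. FailOverContext.subtask_to_dependency_subtasks)
    subtask_to_pending_deps = defaultdict(set)


    def on_data_lost(failed_subtask: str, lost_chunk_keys: list) -> list:
        # Mirrors _detect_error: map lost chunk data back to the subtasks that
        # can regenerate it, and remember that failed_subtask waits for them.
        producers = [chunk_key_to_subtask[k] for k in lost_chunk_keys]
        subtask_to_pending_deps[failed_subtask].update(producers)
        return producers  # these get rescheduled first


    def on_subtask_finished(finished: str) -> list:
        # Mirrors _try_trigger_subtasks: once every regenerating dependency of a
        # waiting subtask is done, that subtask becomes schedulable again.
        ready = []
        for waiting, deps in list(subtask_to_pending_deps.items()):
            deps.discard(finished)
            if not deps:
                ready.append(waiting)
                del subtask_to_pending_deps[waiting]
        return ready


    print(on_data_lost("subtask-3", ["chunk-a", "chunk-b"]))  # ['subtask-1', 'subtask-2']
    print(on_subtask_finished("subtask-1"))                   # []
    print(on_subtask_finished("subtask-2"))                   # ['subtask-3']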
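
The lost-subtask queue deliberately inverts the comparison used by the regular band queues: HeapItem turns heapq into a max-heap on priority, while LostHeapItem keeps min-heap order so that subtasks whose priority is prefixed with a smaller generation order are rebuilt first. A self-contained illustration of the two orderings, simplified from queueing.py with strings in place of subtasks:

    # Illustration of the two heap orderings in queueing.py. heapq is a min-heap,
    # so HeapItem flips __lt__ to pop the highest priority, while LostHeapItem
    # keeps natural order to pop the smallest (generation_order, ...) tuple first.
    import heapq
    from dataclasses import dataclass


    @dataclass
    class HeapItem:
        subtask: str
        priority: tuple

        def __lt__(self, other: "HeapItem") -> bool:
            return self.priority > other.priority  # max-heap behaviour


    @dataclass
    class LostHeapItem:
        subtask: str
        priority: tuple

        def __lt__(self, other: "LostHeapItem") -> bool:
            return self.priority < other.priority  # min-heap behaviour


    band_queue = []
    for name, pri in [("s1", (1,)), ("s2", (3,)), ("s3", (2,))]:
        heapq.heappush(band_queue, HeapItem(name, pri))
    print(heapq.heappop(band_queue).subtask)  # s2: highest priority pops first

    lost_queue = []
    for name, pri in [("old", (0, 1)), ("new", (2, 1))]:
        heapq.heappush(lost_queue, LostHeapItem(name, pri))
    print(heapq.heappop(lost_queue).subtask)  # old: earliest generation pops first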