diff --git a/mars/services/subtask/worker/processor.py b/mars/services/subtask/worker/processor.py
index db0e99f168..8ee04e80ed 100644
--- a/mars/services/subtask/worker/processor.py
+++ b/mars/services/subtask/worker/processor.py
@@ -17,12 +17,17 @@
 import sys
 import time
 from collections import defaultdict
-from typing import Any, Dict, List, Optional, Set, Type
+from typing import Any, Dict, List, Optional, Set, Type, Tuple
 
 from .... import oscar as mo
 from ....core import ChunkGraph, OperandType, enter_mode, ExecutionError
 from ....core.context import get_context, set_context
-from ....core.operand import Fetch, FetchShuffle, execute
+from ....core.operand import (
+    Fetch,
+    FetchShuffle,
+    execute,
+)
+from ....lib.aio import alru_cache
 from ....metrics import Metrics
 from ....optimization.physical import optimize
 from ....typing import BandType, ChunkType
@@ -420,26 +425,56 @@ async def set_chunks_meta():
         # set result data size
         self.result.data_size = result_data_size
 
-    async def _push_mapper_data(self, chunk_graph):
-        # TODO: use task api to get reducer bands
-        reducer_idx_to_band = dict()
-        if not reducer_idx_to_band:
-            return
+    @classmethod
+    @alru_cache(cache_exceptions=False)
+    async def _gen_reducer_index_to_bands(
+        cls, session_id: str, supervisor_address: str, task_id: str, map_reduce_id: int
+    ) -> Dict[Tuple[int], BandType]:
+        task_api = await TaskAPI.create(session_id, supervisor_address)
+        map_reduce_info = await task_api.get_map_reduce_info(task_id, map_reduce_id)
+        assert len(map_reduce_info.reducer_indexes) == len(
+            map_reduce_info.reducer_bands
+        )
+        return {
+            reducer_index: band
+            for reducer_index, band in zip(
+                map_reduce_info.reducer_indexes, map_reduce_info.reducer_bands
+            )
+        }
+
+    async def _push_mapper_data(self):
         storage_api_to_fetch_tasks = defaultdict(list)
-        for result_chunk in chunk_graph.result_chunks:
-            key = result_chunk.key
-            reducer_idx = key[1]
-            if isinstance(key, tuple):
+        skip = True
+        for result_chunk in self._chunk_graph.result_chunks:
+            map_reduce_id = getattr(result_chunk.op, "extra_params", dict()).get(
+                "analyzer_map_reduce_id"
+            )
+            if map_reduce_id is None:
+                continue
+            skip = False
+            reducer_index_to_bands = await self._gen_reducer_index_to_bands(
+                self._session_id,
+                self._supervisor_address,
+                self.subtask.task_id,
+                map_reduce_id,
+            )
+            for reducer_index, band in reducer_index_to_bands.items():
                 # mapper key is a tuple
-                address, band_name = reducer_idx_to_band[reducer_idx]
-                storage_api = StorageAPI(address, self._session_id, band_name)
+                address, band_name = band
+                storage_api = await StorageAPI.create(
+                    self._session_id, address, band_name
+                )
                 fetch_task = storage_api.fetch.delay(
-                    key, band_name=self._band[1], remote_address=self._band[0]
+                    (result_chunk.key, reducer_index),
+                    band_name=self._band[1],
+                    remote_address=self._band[0],
                 )
                 storage_api_to_fetch_tasks[storage_api].append(fetch_task)
+        if skip:
+            return
         batch_tasks = []
         for storage_api, tasks in storage_api_to_fetch_tasks.items():
-            batch_tasks.append(asyncio.create_task(storage_api.fetch.batch(*tasks)))
+            batch_tasks.append(storage_api.fetch.batch(*tasks))
         await asyncio.gather(*batch_tasks)
 
     async def done(self):
@@ -513,8 +548,6 @@ async def run(self):
             await self._unpin_data(input_keys)
         await self.done()
-        # after done, we push mapper data to reducers in advance.
-        await self.ref()._push_mapper_data.tell(chunk_graph)
         if self.result.status == SubtaskStatus.succeeded:
             cost_time_secs = (
                 self.result.execution_end_time - self.result.execution_start_time
             )
@@ -536,6 +569,9 @@
                 pass
         return self.result
 
+    async def post_run(self):
+        await self._push_mapper_data()
+
     async def report_progress_periodically(self, interval=0.5, eps=0.001):
         last_progress = self.result.progress
         while not self.result.status.is_done:
@@ -618,7 +654,7 @@ async def _init_context(self, session_id: str):
         await context.init()
         set_context(context)
 
-    async def run(self, subtask: Subtask):
+    async def run(self, subtask: Subtask, wait_post_run: bool = False):
         logger.info(
             "Start to run subtask: %r on %s. chunk graph contains %s",
             subtask,
@@ -644,10 +680,18 @@ async def run(self, subtask: Subtask):
         try:
             result = yield self._running_aio_task
             logger.info("Finished subtask: %s", subtask.subtask_id)
+            # post run with actor tell which will not block
+            if not wait_post_run:
+                await self.ref().post_run.tell(processor)
+            else:
+                await self.post_run(processor)
             raise mo.Return(result)
         finally:
             self._processor = self._running_aio_task = None
 
+    async def post_run(self, processor: SubtaskProcessor):
+        await processor.post_run()
+
     async def wait(self):
         return self._processor.is_done.wait()
 
diff --git a/mars/services/subtask/worker/runner.py b/mars/services/subtask/worker/runner.py
index dbe7ac7236..15cd46fab3 100644
--- a/mars/services/subtask/worker/runner.py
+++ b/mars/services/subtask/worker/runner.py
@@ -89,7 +89,7 @@ async def _get_supervisor_address(self, session_id: str):
         [address] = await self._cluster_api.get_supervisors_by_keys([session_id])
         return address
 
-    async def run_subtask(self, subtask: Subtask):
+    async def run_subtask(self, subtask: Subtask, wait_post_run: bool = False):
         if self._running_processor is not None:  # pragma: no cover
             running_subtask_id = await self._running_processor.get_running_subtask_id()
             # current subtask is still running
@@ -122,7 +122,9 @@ async def run_subtask(self, subtask: Subtask):
         processor = self._session_id_to_processors[session_id]
         try:
             self._running_processor = self._last_processor = processor
-            result = yield self._running_processor.run(subtask)
+            result = yield self._running_processor.run(
+                subtask, wait_post_run=wait_post_run
+            )
         finally:
             self._running_processor = None
         raise mo.Return(result)
diff --git a/mars/services/subtask/worker/tests/test_subtask.py b/mars/services/subtask/worker/tests/test_subtask.py
index 0302ceff1c..067f44dc7d 100644
--- a/mars/services/subtask/worker/tests/test_subtask.py
+++ b/mars/services/subtask/worker/tests/test_subtask.py
@@ -18,14 +18,17 @@
 import time
 
 import numpy as np
+import pandas as pd
 import pytest
 
 from ..... import oscar as mo
+from ..... import dataframe as md
 from ..... import tensor as mt
 from ..... import remote as mr
-from .....core import ExecutionError
+from .....core import ExecutionError, ChunkGraph
 from .....core.context import get_context
 from .....core.graph import TileableGraph, TileableGraphBuilder, ChunkGraphBuilder
+from .....core.operand import OperandStage
 from .....resource import Resource
 from .....utils import Timer
 from ....cluster import MockClusterAPI
@@ -34,7 +37,7 @@
 from ....scheduling import MockSchedulingAPI
 from ....session import MockSessionAPI
 from ....storage import MockStorageAPI
-from ....task import new_task_id
+from ....task import new_task_id, MapReduceInfo
 from ....task.supervisor.manager import TaskManagerActor, TaskConfigurationActor
 from ....mutable import MockMutableAPI
 from ... import Subtask, SubtaskStatus, SubtaskResult
@@ -46,6 +49,13 @@ class FakeTaskManager(TaskManagerActor):
     def set_subtask_result(self, subtask_result: SubtaskResult):
         return
 
+    def get_map_reduce_info(self, task_id: str, map_reduce_id: int) -> MapReduceInfo:
+        return MapReduceInfo(
+            map_reduce_id=0,
+            reducer_indexes=[(0, 0)],
+            reducer_bands=[(self.address, "numa-0")],
+        )
+
 
 @pytest.fixture
 async def actor_pool():
@@ -142,6 +152,39 @@ async def test_subtask_success(actor_pool):
     assert await subtask_runner.is_runner_free() is True
 
 
+@pytest.mark.asyncio
+async def test_shuffle_subtask(actor_pool):
+    pool, session_id, meta_api, storage_api, manager = actor_pool
+
+    pdf = pd.DataFrame({"f1": ["a", "b", "a"], "f2": [1, 2, 3]})
+    df = md.DataFrame(pdf)
+    result = df.groupby("f1").sum(method="shuffle")
+
+    graph = TileableGraph([result.data])
+    next(TileableGraphBuilder(graph).build())
+    chunk_graph = next(ChunkGraphBuilder(graph, fuse_enabled=False).build())
+    result_chunks = []
+    new_chunk_graph = ChunkGraph(result_chunks)
+    chunk_graph_iter = chunk_graph.topological_iter()
+    curr = None
+    for _ in range(3):
+        prev = curr
+        curr = next(chunk_graph_iter)
+        new_chunk_graph.add_node(curr)
+        if prev is not None:
+            new_chunk_graph.add_edge(prev, curr)
+    assert curr.op.stage == OperandStage.map
+    curr.op.extra_params = {"analyzer_map_reduce_id": 0}
+    result_chunks.append(curr)
+    subtask = Subtask(new_task_id(), session_id, new_task_id(), new_chunk_graph)
+    subtask_runner: SubtaskRunnerRef = await mo.actor_ref(
+        SubtaskRunnerActor.gen_uid("numa-0", 0), address=pool.external_address
+    )
+    await subtask_runner.run_subtask(subtask, wait_post_run=True)
+    result = await subtask_runner.get_subtask_result()
+    assert result.status == SubtaskStatus.succeeded
+
+
 @pytest.mark.asyncio
 async def test_subtask_failure(actor_pool):
     pool, session_id, meta_api, storage_api, manager = actor_pool
diff --git a/mars/services/task/__init__.py b/mars/services/task/__init__.py
index 6851dbfd56..1b5656cf34 100644
--- a/mars/services/task/__init__.py
+++ b/mars/services/task/__init__.py
@@ -14,5 +14,5 @@
 
 from .api import AbstractTaskAPI, TaskAPI, WebTaskAPI
 from .config import task_options
-from .core import Task, TaskStatus, TaskResult, new_task_id
+from .core import Task, TaskStatus, TaskResult, new_task_id, MapReduceInfo
 from .errors import TaskNotExist
diff --git a/mars/services/task/analyzer/analyzer.py b/mars/services/task/analyzer/analyzer.py
index 86e95840ae..4a63b961ba 100644
--- a/mars/services/task/analyzer/analyzer.py
+++ b/mars/services/task/analyzer/analyzer.py
@@ -25,12 +25,14 @@
     LogicKeyGenerator,
     MapReduceOperand,
     OperandStage,
+    ShuffleProxy,
 )
+from ....lib.ordered_set import OrderedSet
 from ....resource import Resource
 from ....typing import BandType, OperandType
 from ....utils import build_fetch, tokenize
 from ...subtask import SubtaskGraph, Subtask
-from ..core import Task, new_task_id
+from ..core import Task, new_task_id, MapReduceInfo
 from .assigner import AbstractGraphAssigner, GraphAssigner
 from .fusion import Coloring
 
@@ -50,6 +52,8 @@ def need_reassign_worker(op: OperandType) -> bool:
 
 
 class GraphAnalyzer:
+    _map_reduce_id = itertools.count()
+
     def __init__(
         self,
         chunk_graph: ChunkGraph,
@@ -59,6 +63,7 @@ def __init__(
         chunk_to_subtasks: Dict[ChunkType, Subtask],
         graph_assigner_cls: Type[AbstractGraphAssigner] = None,
         stage_id: str = None,
+        map_reduce_id_to_infos: Dict[int, MapReduceInfo] = None,
     ):
         self._chunk_graph = chunk_graph
         self._band_resource = band_resource
@@ -68,12 +73,17 @@ def __init__(
         self._fuse_enabled = task.fuse_enabled
         self._extra_config = task.extra_config
         self._chunk_to_subtasks = chunk_to_subtasks
+        self._map_reduce_id_to_infos = map_reduce_id_to_infos
         if graph_assigner_cls is None:
             graph_assigner_cls = GraphAssigner
         self._graph_assigner_cls = graph_assigner_cls
         self._chunk_to_copied = dict()
         self._logic_key_generator = LogicKeyGenerator()
 
+    @classmethod
+    def next_map_reduce_id(cls) -> int:
+        return next(cls._map_reduce_id)
+
     @classmethod
     def _iter_start_ops(cls, chunk_graph: ChunkGraph):
         visited = set()
@@ -300,6 +310,38 @@ def _gen_logic_key(self, chunks: List[ChunkType]):
             *[self._logic_key_generator.get_logic_key(chunk.op) for chunk in chunks]
         )
 
+    def _gen_map_reduce_info(
+        self, chunk: ChunkType, assign_results: Dict[ChunkType, BandType]
+    ):
+        reducer_ops = OrderedSet(
+            [
+                c.op
+                for c in self._chunk_graph.successors(chunk)
+                if c.op.stage == OperandStage.reduce
+            ]
+        )
+        map_chunks = [
+            c
+            for c in self._chunk_graph.predecessors(chunk)
+            if c.op.stage == OperandStage.map
+        ]
+        map_reduce_id = self.next_map_reduce_id()
+        for map_chunk in map_chunks:
+            # record analyzer map reduce id for mapper op
+            # copied chunk exists because map chunk must have
+            # been processed before shuffle proxy
+            copied_map_chunk_op = self._chunk_to_copied[map_chunk].op
+            if not hasattr(copied_map_chunk_op, "extra_params"):
+                copied_map_chunk_op.extra_params = dict()
+            copied_map_chunk_op.extra_params["analyzer_map_reduce_id"] = map_reduce_id
+        reducer_bands = [assign_results[r.outputs[0]] for r in reducer_ops]
+        map_reduce_info = MapReduceInfo(
+            map_reduce_id=map_reduce_id,
+            reducer_indexes=[reducer_op.reducer_index for reducer_op in reducer_ops],
+            reducer_bands=reducer_bands,
+        )
+        self._map_reduce_id_to_infos[map_reduce_id] = map_reduce_info
+
     @enter_mode(build=True)
     def gen_subtask_graph(
         self, op_to_bands: Dict[str, BandType] = None
@@ -420,6 +462,10 @@ def gen_subtask_graph(
 
             for c in same_color_chunks:
                 chunk_to_subtask[c] = subtask
+            if self._map_reduce_id_to_infos is not None and isinstance(
+                chunk.op, ShuffleProxy
+            ):
+                self._gen_map_reduce_info(chunk, chunk_to_bands)
             visited.update(same_color_chunks)
 
         for subtasks in logic_key_to_subtasks.values():
diff --git a/mars/services/task/api/oscar.py b/mars/services/task/api/oscar.py
index 8127730f59..6505d7f8d1 100644
--- a/mars/services/task/api/oscar.py
+++ b/mars/services/task/api/oscar.py
@@ -18,7 +18,7 @@
 from ....core import Tileable
 from ....lib.aio import alru_cache
 from ...subtask import SubtaskResult
-from ..core import TileableGraph, TaskResult
+from ..core import TileableGraph, TaskResult, MapReduceInfo
 from ..supervisor.manager import TaskManagerActor
 from .core import AbstractTaskAPI
 
@@ -104,3 +104,8 @@ async def set_subtask_result(self, subtask_result: SubtaskResult):
 
     async def get_last_idle_time(self) -> Union[float, None]:
         return await self._task_manager_ref.get_last_idle_time()
+
+    async def get_map_reduce_info(
+        self, task_id: str, map_reduce_id: int
+    ) -> MapReduceInfo:
+        return await self._task_manager_ref.get_map_reduce_info(task_id, map_reduce_id)
diff --git a/mars/services/task/core.py b/mars/services/task/core.py
index 13353c6aa3..bc4a76a0ca 100644
--- a/mars/services/task/core.py
+++ b/mars/services/task/core.py
@@ -15,17 +15,20 @@
 import random
 from enum import Enum
 from string import ascii_letters, digits
-from typing import Any, Optional, Dict
+from typing import Any, Optional, Dict, List, Tuple
 
 from ...core import TileableGraph
+from ...typing import BandType
 from ...serialization.serializables import (
     Serializable,
+    FieldTypes,
     StringField,
     ReferenceField,
     Int32Field,
     BoolField,
     AnyField,
     DictField,
+    ListField,
     Float64Field,
 )
 
@@ -110,3 +113,17 @@ def __init__(
 
 def new_task_id():
     return "".join(random.choice(ascii_letters + digits) for _ in range(24))
+
+
+class MapReduceInfo(Serializable):
+    # record map reduce info during analyzing
+    # record reducer indexes, and assigned bands
+    map_reduce_id: int = Int32Field("map_reduce_id")
+    reducer_indexes: List[Tuple[int]] = ListField(
+        "reducer_indexes", FieldTypes.tuple(FieldTypes.int64), default_factory=list
+    )
+    reducer_bands: List[BandType] = ListField(
+        "reducer_bands",
+        FieldTypes.tuple(FieldTypes.string, FieldTypes.string),
+        default_factory=list,
+    )
diff --git a/mars/services/task/supervisor/manager.py b/mars/services/task/supervisor/manager.py
index d81ec5854f..416cdc3a68 100644
--- a/mars/services/task/supervisor/manager.py
+++ b/mars/services/task/supervisor/manager.py
@@ -25,7 +25,7 @@
 from ....core.operand import Fetch
 from ...subtask import SubtaskResult, SubtaskGraph
 from ..config import task_options
-from ..core import Task, new_task_id, TaskStatus
+from ..core import Task, new_task_id, TaskStatus, MapReduceInfo
 from ..errors import TaskNotExist
 from .preprocessor import TaskPreprocessor
 from .processor import TaskProcessor
@@ -326,3 +326,13 @@ async def get_last_idle_time(self):
         else:
             self._last_idle_time = time.time()
         return self._last_idle_time
+
+    async def get_map_reduce_info(
+        self, task_id: str, map_reduce_id: int
+    ) -> MapReduceInfo:
+        try:
+            processor_ref = self._task_id_to_processor_ref[task_id]
+        except KeyError:  # pragma: no cover
+            raise TaskNotExist(f"Task {task_id} does not exist")
+
+        return await processor_ref.get_map_reduce_info(map_reduce_id)
diff --git a/mars/services/task/supervisor/preprocessor.py b/mars/services/task/supervisor/preprocessor.py
index e64b7da6aa..1d7f00ad6b 100644
--- a/mars/services/task/supervisor/preprocessor.py
+++ b/mars/services/task/supervisor/preprocessor.py
@@ -26,7 +26,7 @@
 from ....typing import BandType, TileableType, ChunkType
 from ...subtask import Subtask, SubtaskGraph
 from ..analyzer import GraphAnalyzer
-from ..core import Task
+from ..core import Task, MapReduceInfo
 
 logger = logging.getLogger(__name__)
 
@@ -112,9 +112,11 @@ class TaskPreprocessor:
         "chunk_optimization_records_list",
         "_cancelled",
         "_done",
+        "map_reduce_id_to_infos",
     )
 
     tile_context: TileContext
+    map_reduce_id_to_infos: Dict[int, MapReduceInfo]
 
     def __init__(
         self,
@@ -129,6 +131,7 @@ def __init__(
         self.tile_context = tiled_context
         self.tileable_optimization_records = None
         self.chunk_optimization_records_list = []
+        self.map_reduce_id_to_infos = dict()
         self._cancelled = asyncio.Event()
         self._done = asyncio.Event()
 
@@ -221,6 +224,7 @@
             self._config,
             chunk_to_subtasks,
             stage_id=stage_id,
+            map_reduce_id_to_infos=self.map_reduce_id_to_infos,
         )
         graph = analyzer.gen_subtask_graph(op_to_bands)
         logger.debug(
@@ -248,5 +252,8 @@ def get_tiled(self, tileable: TileableType):
         tileable = tileable.data if hasattr(tileable, "data") else tileable
         return self.tile_context[tileable]
 
+    def get_map_reduce_info(self, map_reduce_id: int) -> MapReduceInfo:
+        return self.map_reduce_id_to_infos[map_reduce_id]
+
     def __await__(self):
         return self._done.wait().__await__()
diff --git a/mars/services/task/supervisor/processor.py b/mars/services/task/supervisor/processor.py
index cfe239490d..c4cf78b743 100644
--- a/mars/services/task/supervisor/processor.py
+++ b/mars/services/task/supervisor/processor.py
@@ -30,7 +30,7 @@
 from ....typing import TileableType, ChunkType
 from ....utils import Timer
 from ...subtask import SubtaskResult, Subtask
-from ..core import Task, TaskResult, TaskStatus, new_task_id
+from ..core import Task, TaskResult, TaskStatus, new_task_id, MapReduceInfo
 from ..execution.api import TaskExecutor, ExecutionChunkResult
 from .preprocessor import TaskPreprocessor
 
@@ -407,6 +407,9 @@ def _gen_result(self):
             {"session_id": self._task.session_id, "task_id": self._task.task_id},
         )
 
+    def get_map_reduce_info(self, map_reduce_id: int) -> MapReduceInfo:
+        return self._preprocessor.get_map_reduce_info(map_reduce_id)
+
     def dump_subtask_graph(self):
         from .graph_visualizer import GraphVisualizer
 
diff --git a/mars/services/task/supervisor/task.py b/mars/services/task/supervisor/task.py
index 722f9fe50d..516fecc06b 100644
--- a/mars/services/task/supervisor/task.py
+++ b/mars/services/task/supervisor/task.py
@@ -25,7 +25,7 @@
 from ....typing import TileableType
 from ....utils import build_fetch
 from ...subtask import SubtaskResult, SubtaskStatus, SubtaskGraph
-from ..core import Task, TaskStatus
+from ..core import Task, TaskStatus, MapReduceInfo
 from ..execution.api import TaskExecutor
 from .preprocessor import TaskPreprocessor
 from .processor import TaskProcessor
@@ -417,6 +417,10 @@ async def set_subtask_result(self, subtask_result: SubtaskResult):
         if self._cur_processor is not None:
             await self._cur_processor.set_subtask_result(subtask_result)
 
+    async def get_map_reduce_info(self, map_reduce_id: int) -> MapReduceInfo:
+        for processor in self._task_id_to_processor.values():
+            return processor.get_map_reduce_info(map_reduce_id)
+
     def is_done(self) -> bool:
         for processor in self._task_id_to_processor.values():
             if not processor.is_done():
diff --git a/mars/services/task/supervisor/tests/test_task_manager.py b/mars/services/task/supervisor/tests/test_task_manager.py
index 749e381a8d..51735333c1 100644
--- a/mars/services/task/supervisor/tests/test_task_manager.py
+++ b/mars/services/task/supervisor/tests/test_task_manager.py
@@ -499,6 +499,25 @@ async def test_shuffle(actor_pool):
     )
     np.testing.assert_array_equal(result, expect)
 
+    # test generating map reduce info
+    subtask_graphs = (await manager.get_subtask_graphs(task_id))[0]
+    map_reduce_ids = []
+    for subtask in subtask_graphs:
+        for chunk in subtask.chunk_graph.result_chunks:
+            map_reduce_id = getattr(chunk.op, "extra_params", dict()).get(
+                "analyzer_map_reduce_id"
+            )
+            if map_reduce_id is not None:
+                map_reduce_ids.append(map_reduce_id)
+    assert len(map_reduce_ids) > 0
+    map_reduce_info = await manager.get_map_reduce_info(task_id, map_reduce_ids[0])
+    assert (
+        len(set(map_reduce_info.reducer_indexes))
+        == len(map_reduce_info.reducer_indexes)
+        == len(map_reduce_info.reducer_bands)
+        > 0
+    )
+
     # test ref counts
     assert (await lifecycle_api.get_tileable_ref_counts([c.key]))[0] == 1
     assert (
diff --git a/setup.cfg b/setup.cfg
index 00d405dda5..d08b1e0bb0 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -61,7 +61,7 @@ dev =
     pytest-cov>=2.5.0
    pytest-timeout>=1.2.0
    pytest-forked>=1.0
-    pytest-asyncio>=0.14.0
+    pytest-asyncio>=0.17.0
    mock>=4.0.0; python_version<"3.8"
    flake8>=3.8.0
    black