From ed5bd35ee2a4d0d37593d279b7a22826bf5b1402 Mon Sep 17 00:00:00 2001
From: hekaisheng
Date: Fri, 13 May 2022 17:29:00 +0800
Subject: [PATCH] Push mapper data to reducers after execution

---
 mars/dataframe/merge/merge.py             |  2 +-
 mars/services/storage/api/oscar.py        |  4 ++--
 mars/services/subtask/worker/processor.py | 26 +++++++++++++++++++++++
 3 files changed, 29 insertions(+), 3 deletions(-)

diff --git a/mars/dataframe/merge/merge.py b/mars/dataframe/merge/merge.py
index dfe24698af..3973bf44f5 100644
--- a/mars/dataframe/merge/merge.py
+++ b/mars/dataframe/merge/merge.py
@@ -808,7 +808,7 @@ def merge(
     method: str = "auto",
     auto_merge: str = "both",
     auto_merge_threshold: int = 8,
-    bloom_filter: Union[bool, str] = "auto",
+    bloom_filter: Union[bool, str] = False,
     bloom_filter_options: Dict[str, Any] = None,
 ) -> DataFrame:
     """
diff --git a/mars/services/storage/api/oscar.py b/mars/services/storage/api/oscar.py
index 8ca077e43a..8590589724 100644
--- a/mars/services/storage/api/oscar.py
+++ b/mars/services/storage/api/oscar.py
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 import sys
-from typing import Any, List, Type, TypeVar, Union
+from typing import Any, List, Tuple, Type, TypeVar, Union
 
 from .... import oscar as mo
 from ....lib.aio import alru_cache
@@ -163,7 +163,7 @@ async def batch_delete(self, args_list, kwargs_list):
     @mo.extensible
     async def fetch(
         self,
-        data_key: str,
+        data_key: Union[str, Tuple],
         level: StorageLevel = None,
         band_name: str = None,
         remote_address: str = None,
diff --git a/mars/services/subtask/worker/processor.py b/mars/services/subtask/worker/processor.py
index d69b0aecd6..eeb1ff2cf7 100644
--- a/mars/services/subtask/worker/processor.py
+++ b/mars/services/subtask/worker/processor.py
@@ -26,6 +26,8 @@
     Fetch,
     FetchShuffle,
     execute,
+    MapReduceOperand,
+    OperandStage,
 )
 from ....metrics import Metrics
 from ....optimization.physical import optimize
@@ -424,6 +426,28 @@ async def set_chunks_meta():
         # set result data size
         self.result.data_size = result_data_size
 
+    async def push_mapper_data(self, chunk_graph):
+        # TODO: use task api to get reducer bands
+        reducer_idx_to_band = dict()
+        if not reducer_idx_to_band:
+            return
+        storage_api_to_fetch_tasks = defaultdict(list)
+        for result_chunk in chunk_graph.result_chunks:
+            key = result_chunk.key
+            if isinstance(key, tuple):
+                # mapper key is a tuple of (chunk key, reducer index)
+                reducer_idx = key[1]
+                address, band_name = reducer_idx_to_band[reducer_idx]
+                storage_api = StorageAPI(address, self._session_id, band_name)
+                fetch_task = storage_api.fetch.delay(
+                    key, band_name=self._band[1], remote_address=self._band[0]
+                )
+                storage_api_to_fetch_tasks[storage_api].append(fetch_task)
+        batch_tasks = []
+        for storage_api, tasks in storage_api_to_fetch_tasks.items():
+            batch_tasks.append(asyncio.create_task(storage_api.fetch.batch(*tasks)))
+        await asyncio.gather(*batch_tasks)
+
     async def done(self):
         if self.result.status == SubtaskStatus.running:
             self.result.status = SubtaskStatus.succeeded
@@ -495,6 +519,8 @@ async def run(self):
             await self._unpin_data(input_keys)
 
         await self.done()
+        # after the subtask is done, push mapper data to reducers in advance
+        await self.push_mapper_data(chunk_graph)
         if self.result.status == SubtaskStatus.succeeded:
             cost_time_secs = (
                 self.result.execution_end_time - self.result.execution_start_time
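
Note: the sketch below illustrates the batching pattern the new push_mapper_data
hook relies on: mapper result chunks whose keys are (chunk key, reducer index)
tuples are grouped by the reducer's destination band, one delayed call is queued
per chunk via the @mo.extensible fetch.delay API, and each group is submitted
concurrently with fetch.batch. The names prefetch_mapper_chunks, make_storage_api,
source_band and reducer_idx_to_band are hypothetical placeholders for this
illustration, not part of the patch; in the patch itself reducer_idx_to_band is
still an empty placeholder pending a task API, so the hook is currently a no-op.

    import asyncio
    from collections import defaultdict


    async def prefetch_mapper_chunks(
        mapper_keys, reducer_idx_to_band, make_storage_api, source_band
    ):
        # group delayed fetch calls by the storage API of the destination band,
        # so every reducer-side worker receives a single batched fetch request
        api_to_tasks = defaultdict(list)
        for key in mapper_keys:
            if not isinstance(key, tuple):
                # only shuffle mapper outputs use (chunk key, reducer index) keys
                continue
            address, band_name = reducer_idx_to_band[key[1]]
            storage_api = make_storage_api(address, band_name)
            api_to_tasks[storage_api].append(
                storage_api.fetch.delay(
                    key, band_name=source_band[1], remote_address=source_band[0]
                )
            )
        # submit one batched fetch per destination band and wait for all of them
        await asyncio.gather(
            *(api.fetch.batch(*tasks) for api, tasks in api_to_tasks.items())
        )

With this grouping, pushing N mapper chunks to K reducer bands costs K batched
remote calls rather than N individual ones.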