Skip to content

Commit

Permalink
Push mapper data to reducers after execution
Browse files Browse the repository at this point in the history
  • Loading branch information
hekaisheng committed May 13, 2022
1 parent acecc9c commit ed5bd35
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 3 deletions.
2 changes: 1 addition & 1 deletion mars/dataframe/merge/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -808,7 +808,7 @@ def merge(
method: str = "auto",
auto_merge: str = "both",
auto_merge_threshold: int = 8,
bloom_filter: Union[bool, str] = "auto",
bloom_filter: Union[bool, str] = False,
bloom_filter_options: Dict[str, Any] = None,
) -> DataFrame:
"""
Expand Down
4 changes: 2 additions & 2 deletions mars/services/storage/api/oscar.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

import sys
from typing import Any, List, Type, TypeVar, Union
from typing import Any, List, Tuple, Type, TypeVar, Union

from .... import oscar as mo
from ....lib.aio import alru_cache
Expand Down Expand Up @@ -163,7 +163,7 @@ async def batch_delete(self, args_list, kwargs_list):
@mo.extensible
async def fetch(
self,
data_key: str,
data_key: Union[str, Tuple],
level: StorageLevel = None,
band_name: str = None,
remote_address: str = None,
Expand Down
26 changes: 26 additions & 0 deletions mars/services/subtask/worker/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
Fetch,
FetchShuffle,
execute,
MapReduceOperand,
OperandStage,
)
from ....metrics import Metrics
from ....optimization.physical import optimize
Expand Down Expand Up @@ -424,6 +426,28 @@ async def set_chunks_meta():
# set result data size
self.result.data_size = result_data_size

async def push_mapper_data(self, chunk_graph):
    """Proactively push this subtask's mapper outputs to reducer bands.

    After execution finishes, every result chunk whose key is a mapper
    key (a tuple, presumably ``(chunk_key, reducer_index)`` — confirm
    against the shuffle operand) is pushed to the storage service on
    the band that will run the matching reducer, so the reducer does
    not have to pull the data later.  Fetch requests are batched per
    destination ``StorageAPI`` and awaited concurrently.

    Parameters
    ----------
    chunk_graph
        The executed chunk graph whose ``result_chunks`` may contain
        mapper outputs.
    """
    # TODO: use task api to get reducer bands
    reducer_idx_to_band = {}
    if not reducer_idx_to_band:
        # No reducer placement information available yet (the TODO
        # above); pushing is a best-effort optimization, so bail out.
        return
    storage_api_to_fetch_tasks = defaultdict(list)
    for result_chunk in chunk_graph.result_chunks:
        key = result_chunk.key
        # Only mapper outputs use tuple keys; test the type BEFORE
        # indexing so plain string keys (non-shuffle chunks, possibly
        # shorter than 2 characters) cannot raise IndexError.
        if isinstance(key, tuple):
            reducer_idx = key[1]
            address, band_name = reducer_idx_to_band[reducer_idx]
            storage_api = StorageAPI(address, self._session_id, band_name)
            # Ask the remote storage service to fetch this mapper chunk
            # from our own band.
            fetch_task = storage_api.fetch.delay(
                key, band_name=self._band[1], remote_address=self._band[0]
            )
            storage_api_to_fetch_tasks[storage_api].append(fetch_task)
    # One batched fetch per destination storage API, run concurrently.
    batch_tasks = [
        asyncio.create_task(storage_api.fetch.batch(*tasks))
        for storage_api, tasks in storage_api_to_fetch_tasks.items()
    ]
    await asyncio.gather(*batch_tasks)

async def done(self):
if self.result.status == SubtaskStatus.running:
self.result.status = SubtaskStatus.succeeded
Expand Down Expand Up @@ -495,6 +519,8 @@ async def run(self):
await self._unpin_data(input_keys)

await self.done()
# after done, we push mapper data to reducers in advance.
await self.push_mapper_data(chunk_graph)
if self.result.status == SubtaskStatus.succeeded:
cost_time_secs = (
self.result.execution_end_time - self.result.execution_start_time
Expand Down

0 comments on commit ed5bd35

Please sign in to comment.