diff --git a/mars/dataframe/groupby/aggregation.py b/mars/dataframe/groupby/aggregation.py index 31018729dc..f9ee600ed7 100644 --- a/mars/dataframe/groupby/aggregation.py +++ b/mars/dataframe/groupby/aggregation.py @@ -25,7 +25,7 @@ from ... import opcodes as OperandDef from ...config import options from ...core.custom_log import redirect_custom_log -from ...core import ENTITY_TYPE, OutputType +from ...core import ENTITY_TYPE, OutputType, recursive_tile from ...core.context import get_context from ...core.operand import OperandStage from ...serialization.serializables import ( @@ -64,6 +64,8 @@ _support_get_group_without_as_index = pd_release_version[:2] > (1, 0) +_FUNCS_PREFER_SHUFFLE = {"nunique"} + class SizeRecorder: def __init__(self): @@ -163,6 +165,8 @@ class DataFrameGroupByAgg(DataFrameOperand, DataFrameOperandMixin): method = StringField("method") use_inf_as_na = BoolField("use_inf_as_na") + map_on_shuffle = AnyField("map_on_shuffle") + # for chunk combine_size = Int32Field("combine_size") chunk_store_limit = Int64Field("chunk_store_limit") @@ -421,10 +425,29 @@ def _tile_with_shuffle( in_df: TileableType, out_df: TileableType, func_infos: ReductionSteps, + agg_chunks: List[ChunkType] = None, ): - # First, perform groupby and aggregation on each chunk. - agg_chunks = cls._gen_map_chunks(op, in_df.chunks, out_df, func_infos) - return cls._perform_shuffle(op, agg_chunks, in_df, out_df, func_infos) + if op.map_on_shuffle is None: + op.map_on_shuffle = all( + agg_fun.custom_reduction is None for agg_fun in func_infos.agg_funcs + ) + + if not op.map_on_shuffle: + groupby_params = op.groupby_params.copy() + selection = groupby_params.pop("selection", None) + groupby = in_df.groupby(**groupby_params) + if selection: + groupby = groupby[selection] + result = groupby.transform( + op.raw_func, _call_agg=True, index=out_df.index_value + ) + return (yield from recursive_tile(result)) + else: + # First, perform groupby and aggregation on each chunk. + agg_chunks = agg_chunks or cls._gen_map_chunks( + op, in_df.chunks, out_df, func_infos + ) + return cls._perform_shuffle(op, agg_chunks, in_df, out_df, func_infos) @classmethod def _perform_shuffle( @@ -624,8 +647,10 @@ def _tile_auto( else: # otherwise, use shuffle logger.debug("Choose shuffle method for groupby operand %s", op) - return cls._perform_shuffle( - op, chunks + left_chunks, in_df, out_df, func_infos + return ( + yield from cls._tile_with_shuffle( + op, in_df, out_df, func_infos, chunks + left_chunks + ) ) @classmethod @@ -638,12 +663,16 @@ def tile(cls, op: "DataFrameGroupByAgg"): func_infos = cls._compile_funcs(op, in_df) if op.method == "auto": - if len(in_df.chunks) <= op.combine_size: + if set(op.func) & _FUNCS_PREFER_SHUFFLE: + return ( + yield from cls._tile_with_shuffle(op, in_df, out_df, func_infos) + ) + elif len(in_df.chunks) <= op.combine_size: return cls._tile_with_tree(op, in_df, out_df, func_infos) else: return (yield from cls._tile_auto(op, in_df, out_df, func_infos)) if op.method == "shuffle": - return cls._tile_with_shuffle(op, in_df, out_df, func_infos) + return (yield from cls._tile_with_shuffle(op, in_df, out_df, func_infos)) elif op.method == "tree": return cls._tile_with_tree(op, in_df, out_df, func_infos) else: # pragma: no cover @@ -1075,7 +1104,15 @@ def execute(cls, ctx, op: "DataFrameGroupByAgg"): pd.reset_option("mode.use_inf_as_na") -def agg(groupby, func=None, method="auto", combine_size=None, *args, **kwargs): +def agg( + groupby, + func=None, + method="auto", + combine_size=None, + map_on_shuffle=None, + *args, + **kwargs, +): """ Aggregate using one or more operations on grouped data. @@ -1091,7 +1128,11 @@ def agg(groupby, func=None, method="auto", combine_size=None, *args, **kwargs): in distributed mode and use 'tree' in local mode. combine_size : int The number of chunks to combine when method is 'tree' - + map_on_shuffle : bool + When not specified, will decide whether to perform aggregation on the + map stage of shuffle (currently no aggregation when there is custom + reduction in functions). Otherwise, whether to call map on map stage + of shuffle is determined by the value. Returns ------- @@ -1138,5 +1179,6 @@ def agg(groupby, func=None, method="auto", combine_size=None, *args, **kwargs): combine_size=combine_size or options.combine_size, chunk_store_limit=options.chunk_store_limit, use_inf_as_na=use_inf_as_na, + map_on_shuffle=map_on_shuffle, ) return agg_op(groupby) diff --git a/mars/dataframe/reduction/nunique.py b/mars/dataframe/reduction/nunique.py index 77dfd871cc..6075bc15b9 100644 --- a/mars/dataframe/reduction/nunique.py +++ b/mars/dataframe/reduction/nunique.py @@ -58,6 +58,8 @@ def _drop_duplicates(self, value, explode=False, agg=False): value = value.values if explode: + if len(value) == 0: + return [xp.array([], dtype=object)] value = xp.concatenate(value) value = xdf.unique(value) @@ -79,7 +81,8 @@ def _drop_duplicates(self, value, explode=False, agg=False): def pre(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ xp, xdf = self._get_modules() if isinstance(in_data, xdf.Series): - unique_values = self._drop_duplicates(in_data) + # unique_values = self._drop_duplicates(in_data) + unique_values = [in_data.values] return xdf.Series(unique_values, name=in_data.name, dtype=object) else: if self._axis == 0: