Fix errors when calling df.agg("nunique") #2970

Draft · wants to merge 5 commits into base: master
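For context, a minimal reproduction sketch of the calls this change targets. This is a hedged example, assuming a default local Mars session; the data is hypothetical and not taken from the PR:

```python
# Minimal repro sketch for the "nunique" aggregation paths touched below.
# Assumes a default local Mars session; the example data is hypothetical.
import numpy as np
import pandas as pd
import mars.dataframe as md

raw = pd.DataFrame({
    "a": np.random.randint(0, 5, size=20),
    "b": np.random.randint(0, 3, size=20),
})
df = md.DataFrame(raw, chunk_size=7)

# DataFrame-level aggregation (mars/dataframe/reduction/aggregation.py)
print(df.agg("nunique").execute())

# Groupby aggregation (mars/dataframe/groupby/aggregation.py)
print(df.groupby("b").agg("nunique").execute())
```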
172 changes: 136 additions & 36 deletions mars/dataframe/groupby/aggregation.py
@@ -25,7 +25,7 @@
from ... import opcodes as OperandDef
from ...config import options
from ...core.custom_log import redirect_custom_log
from ...core import ENTITY_TYPE, OutputType
from ...core import ENTITY_TYPE, OutputType, recursive_tile
from ...core.context import get_context
from ...core.operand import OperandStage
from ...serialization.serializables import (
@@ -51,6 +51,7 @@
ReductionCompiler,
ReductionSteps,
ReductionAggStep,
CustomReduction,
)
from ..reduction.aggregation import is_funcs_aggregate, normalize_reduction_funcs
from ..utils import parse_index, build_concatenated_rows_frame, is_cudf
@@ -63,6 +64,8 @@

_support_get_group_without_as_index = pd_release_version[:2] > (1, 0)

_FUNCS_PREFER_SHUFFLE = {"nunique"}


class SizeRecorder:
def __init__(self):
@@ -94,6 +97,7 @@ def get(self):
"skew": lambda x, bias=False: x.skew(bias=bias),
"kurt": lambda x, bias=False: x.kurt(bias=bias),
"kurtosis": lambda x, bias=False: x.kurtosis(bias=bias),
"nunique": lambda x: x.nunique(),
}
_series_col_name = "col_name"

@@ -161,6 +165,8 @@ class DataFrameGroupByAgg(DataFrameOperand, DataFrameOperandMixin):
method = StringField("method")
use_inf_as_na = BoolField("use_inf_as_na")

map_on_shuffle = AnyField("map_on_shuffle")

# for chunk
combine_size = Int32Field("combine_size")
chunk_store_limit = Int64Field("chunk_store_limit")
@@ -419,10 +425,29 @@ def _tile_with_shuffle(
in_df: TileableType,
out_df: TileableType,
func_infos: ReductionSteps,
agg_chunks: List[ChunkType] = None,
):
# First, perform groupby and aggregation on each chunk.
agg_chunks = cls._gen_map_chunks(op, in_df.chunks, out_df, func_infos)
return cls._perform_shuffle(op, agg_chunks, in_df, out_df, func_infos)
if op.map_on_shuffle is None:
op.map_on_shuffle = all(
agg_fun.custom_reduction is None for agg_fun in func_infos.agg_funcs
)

if not op.map_on_shuffle:
groupby_params = op.groupby_params.copy()
selection = groupby_params.pop("selection", None)
groupby = in_df.groupby(**groupby_params)
if selection:
groupby = groupby[selection]
result = groupby.transform(
op.raw_func, _call_agg=True, index=out_df.index_value
)
return (yield from recursive_tile(result))
else:
# First, perform groupby and aggregation on each chunk.
agg_chunks = agg_chunks or cls._gen_map_chunks(
op, in_df.chunks, out_df, func_infos
)
return cls._perform_shuffle(op, agg_chunks, in_df, out_df, func_infos)

@classmethod
def _perform_shuffle(
@@ -622,8 +647,10 @@ def _tile_auto(
else:
# otherwise, use shuffle
logger.debug("Choose shuffle method for groupby operand %s", op)
return cls._perform_shuffle(
op, chunks + left_chunks, in_df, out_df, func_infos
return (
yield from cls._tile_with_shuffle(
op, in_df, out_df, func_infos, chunks + left_chunks
)
)

@classmethod
@@ -636,12 +663,16 @@ def tile(cls, op: "DataFrameGroupByAgg"):
func_infos = cls._compile_funcs(op, in_df)

if op.method == "auto":
if len(in_df.chunks) <= op.combine_size:
if set(op.func) & _FUNCS_PREFER_SHUFFLE:
return (
yield from cls._tile_with_shuffle(op, in_df, out_df, func_infos)
)
elif len(in_df.chunks) <= op.combine_size:
return cls._tile_with_tree(op, in_df, out_df, func_infos)
else:
return (yield from cls._tile_auto(op, in_df, out_df, func_infos))
if op.method == "shuffle":
return cls._tile_with_shuffle(op, in_df, out_df, func_infos)
return (yield from cls._tile_with_shuffle(op, in_df, out_df, func_infos))
elif op.method == "tree":
return cls._tile_with_tree(op, in_df, out_df, func_infos)
else: # pragma: no cover
@@ -694,39 +725,66 @@ def _pack_inputs(agg_funcs: List[ReductionAggStep], in_data):
return out_dict

@staticmethod
def _do_custom_agg(op, custom_reduction, *input_objs):
def _do_custom_agg_single(op, custom_reduction: CustomReduction, input_obj):
if op.stage == OperandStage.map:
if custom_reduction.pre_with_agg:
apply_fun = custom_reduction.pre
else:

def apply_fun(obj):
return custom_reduction.agg(custom_reduction.pre(obj))

elif op.stage == OperandStage.agg:
if custom_reduction.post_with_agg:
apply_fun = custom_reduction.post
else:

def apply_fun(obj):
return custom_reduction.post(custom_reduction.agg(obj))

else:
apply_fun = custom_reduction.agg

res = input_obj.apply(apply_fun)
return (res,)

@staticmethod
def _do_custom_agg_multiple(op, custom_reduction: CustomReduction, *input_objs):
xdf = cudf if op.gpu else pd
results = []
out = op.outputs[0]
for group_key in input_objs[0].groups.keys():
group_objs = [o.get_group(group_key) for o in input_objs]

agg_done = False
if op.stage == OperandStage.map:
result = custom_reduction.pre(group_objs[0])
res_tuple = custom_reduction.pre(group_objs[0])
agg_done = custom_reduction.pre_with_agg
if not isinstance(result, tuple):
result = (result,)
if not isinstance(res_tuple, tuple):
res_tuple = (res_tuple,)
else:
result = group_objs
res_tuple = group_objs

if not agg_done:
result = custom_reduction.agg(*result)
if not isinstance(result, tuple):
result = (result,)
res_tuple = custom_reduction.agg(*res_tuple)
if not isinstance(res_tuple, tuple):
res_tuple = (res_tuple,)

if op.stage == OperandStage.agg:
result = custom_reduction.post(*result)
if not isinstance(result, tuple):
result = (result,)

if out.ndim == 2:
result = tuple(r.to_frame().T for r in result)
if op.stage == OperandStage.agg:
result = tuple(r.astype(out.dtypes) for r in result)
else:
result = tuple(xdf.Series(r) for r in result)
res_tuple = custom_reduction.post(*res_tuple)
if not isinstance(res_tuple, tuple):
res_tuple = (res_tuple,)

new_res_list = []
for r in res_tuple:
if out.ndim == 2 and r.ndim == 1:
r = r.to_frame().T
elif out.ndim < 2:
if getattr(r, "ndim", 0) == 2:
r = r.iloc[0, :]
else:
r = xdf.Series(r)

for r in result:
if len(input_objs[0].grouper.names) == 1:
r.index = xdf.Index(
[group_key], name=input_objs[0].grouper.names[0]
@@ -735,7 +793,21 @@ def _do_custom_agg(op, custom_reduction, *input_objs):
r.index = xdf.MultiIndex.from_tuples(
[group_key], names=input_objs[0].grouper.names
)
results.append(result)

if op.groupby_params.get("selection"):
# correct columns for groupby-selection-agg paradigms
selection = op.groupby_params["selection"]
r.columns = [selection] if input_objs[0].ndim == 1 else selection

if out.ndim == 2 and op.stage == OperandStage.agg:
dtype_cols = set(out.dtypes.index) & set(r.columns)
conv_dtypes = {
k: v for k, v in out.dtypes.items() if k in dtype_cols
}
r = r.astype(conv_dtypes)
new_res_list.append(r)

results.append(tuple(new_res_list))
if not results and op.stage == OperandStage.agg:
empty_df = pd.DataFrame(
[], columns=out.dtypes.index, index=out.index_value.to_pandas()[:0]
@@ -745,6 +817,13 @@ def _do_custom_agg(op, custom_reduction, *input_objs):
concat_result = tuple(xdf.concat(parts) for parts in zip(*results))
return concat_result

@classmethod
def _do_custom_agg(cls, op, custom_reduction, *input_objs, output_limit: int = 1):
if output_limit == 1:
return cls._do_custom_agg_single(op, custom_reduction, input_objs[0])
else:
return cls._do_custom_agg_multiple(op, custom_reduction, *input_objs)

@staticmethod
def _do_predefined_agg(input_obj, agg_func, single_func=False, **kwds):
ndim = getattr(input_obj, "ndim", None) or input_obj.obj.ndim
@@ -839,12 +918,16 @@ def _wrapped_func(col):
_agg_func_name,
custom_reduction,
_output_key,
_output_limit,
output_limit,
kwds,
) in op.agg_funcs:
input_obj = ret_map_groupbys[input_key]
if map_func_name == "custom_reduction":
agg_dfs.extend(cls._do_custom_agg(op, custom_reduction, input_obj))
agg_dfs.extend(
cls._do_custom_agg(
op, custom_reduction, input_obj, output_limit=output_limit
)
)
else:
single_func = map_func_name == op.raw_func
agg_dfs.append(
@@ -885,12 +968,16 @@ def _execute_combine(cls, ctx, op: "DataFrameGroupByAgg"):
agg_func_name,
custom_reduction,
output_key,
_output_limit,
output_limit,
kwds,
) in op.agg_funcs:
input_obj = in_data_dict[output_key]
if agg_func_name == "custom_reduction":
combines.extend(cls._do_custom_agg(op, custom_reduction, *input_obj))
combines.extend(
cls._do_custom_agg(
op, custom_reduction, *input_obj, output_limit=output_limit
)
)
else:
combines.append(
cls._do_predefined_agg(input_obj, agg_func_name, **kwds)
@@ -925,15 +1012,15 @@ def _execute_agg(cls, ctx, op: "DataFrameGroupByAgg"):
agg_func_name,
custom_reduction,
output_key,
_output_limit,
output_limit,
kwds,
) in op.agg_funcs:
if agg_func_name == "custom_reduction":
input_obj = tuple(
cls._get_grouped(op, o, ctx) for o in in_data_dict[output_key]
)
in_data_dict[output_key] = cls._do_custom_agg(
op, custom_reduction, *input_obj
op, custom_reduction, *input_obj, output_limit=output_limit
)[0]
else:
input_obj = cls._get_grouped(op, in_data_dict[output_key], ctx)
@@ -1017,7 +1104,15 @@ def execute(cls, ctx, op: "DataFrameGroupByAgg"):
pd.reset_option("mode.use_inf_as_na")


def agg(groupby, func=None, method="auto", combine_size=None, *args, **kwargs):
def agg(
groupby,
func=None,
method="auto",
combine_size=None,
map_on_shuffle=None,
*args,
**kwargs,
):
"""
Aggregate using one or more operations on grouped data.

@@ -1033,7 +1128,11 @@ def agg(groupby, func=None, method="auto", combine_size=None, *args, **kwargs):
in distributed mode and use 'tree' in local mode.
combine_size : int
The number of chunks to combine when method is 'tree'

map_on_shuffle : bool
Whether to pre-aggregate on the map stage of the shuffle. When not
specified, pre-aggregation is enabled unless one of the aggregation
functions carries a custom reduction; otherwise the given value is
used as-is.

Returns
-------
@@ -1080,5 +1179,6 @@ def agg(groupby, func=None, method="auto", combine_size=None, *args, **kwargs):
combine_size=combine_size or options.combine_size,
chunk_store_limit=options.chunk_store_limit,
use_inf_as_na=use_inf_as_na,
map_on_shuffle=map_on_shuffle,
)
return agg_op(groupby)
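A hedged usage sketch of the new `map_on_shuffle` parameter documented above. The data and local-session setup are hypothetical; only the `agg` signature follows the diff:

```python
# Sketch of the new map_on_shuffle knob; follows the signature added above.
# Data and session setup are hypothetical.
import pandas as pd
import mars.dataframe as md

df = md.DataFrame(
    pd.DataFrame({"a": [1, 2, 1, 3], "b": [0, 0, 1, 1]}),
    chunk_size=2,
)

# Force the shuffle method and disable pre-aggregation on the map stage,
# mirroring what tile() now chooses automatically for custom reductions.
res = df.groupby("b").agg("nunique", method="shuffle", map_on_shuffle=False)
print(res.execute())
```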
19 changes: 11 additions & 8 deletions mars/dataframe/groupby/tests/test_groupby_execution.py
@@ -1241,13 +1241,16 @@ def test_groupby_nunique(setup):
# test with as_index=False
mdf = md.DataFrame(df1, chunk_size=13)
if _agg_size_as_frame:
res = mdf.groupby("b", as_index=False)["a"].nunique().execute().fetch()
expected = df1.groupby("b", as_index=False)["a"].nunique()
pd.testing.assert_frame_equal(
mdf.groupby("b", as_index=False)["a"]
.nunique()
.execute()
.fetch()
.sort_values(by="b", ignore_index=True),
df1.groupby("b", as_index=False)["a"]
.nunique()
.sort_values(by="b", ignore_index=True),
res.sort_values(by="b", ignore_index=True),
expected.sort_values(by="b", ignore_index=True),
)

res = mdf.groupby("b", as_index=False)[["a", "c"]].nunique().execute().fetch()
expected = df1.groupby("b", as_index=False)[["a", "c"]].nunique()
pd.testing.assert_frame_equal(
res.sort_values(by="b", ignore_index=True),
expected.sort_values(by="b", ignore_index=True),
)
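The tests above sort both sides before comparing because row order is not guaranteed after a chunked groupby with `as_index=False`. The pattern in plain pandas, with hypothetical frames:

```python
# Order-insensitive frame comparison, as used in the tests above.
# The frames here are hypothetical stand-ins for computed/expected results.
import pandas as pd

res = pd.DataFrame({"b": [2, 1], "a": [3, 4]})       # arbitrary row order
expected = pd.DataFrame({"b": [1, 2], "a": [4, 3]})  # canonical order

pd.testing.assert_frame_equal(
    res.sort_values(by="b", ignore_index=True),
    expected.sort_values(by="b", ignore_index=True),
)
```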
5 changes: 4 additions & 1 deletion mars/dataframe/merge/concat.py
@@ -324,7 +324,10 @@ def _auto_concat_dataframe_chunks(chunk, inputs):
)

if chunk.op.axis is not None:
return xdf.concat(inputs, axis=op.axis)
try:
return xdf.concat(inputs, axis=op.axis)
except:
raise

# auto generated concat when executing a DataFrame
if len(inputs) == 1:
1 change: 1 addition & 0 deletions mars/dataframe/reduction/aggregation.py
@@ -78,6 +78,7 @@ def where_function(cond, var1, var2):
"skew": lambda x, skipna=True, bias=False: x.skew(skipna=skipna, bias=bias),
"kurt": lambda x, skipna=True, bias=False: x.kurt(skipna=skipna, bias=bias),
"kurtosis": lambda x, skipna=True, bias=False: x.kurtosis(skipna=skipna, bias=bias),
"nunique": lambda x: x.nunique(),
}


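For reference, the pandas semantics the new `"nunique"` entry delegates to (plain pandas, no Mars involved):

```python
import pandas as pd

df = pd.DataFrame({"a": [1, 1, 2], "b": [3, 3, 3]})
print(df["a"].nunique())  # 2: a scalar for a Series
print(df.nunique())       # per-column distinct counts: a -> 2, b -> 1
```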
6 changes: 5 additions & 1 deletion mars/dataframe/reduction/core.py
@@ -655,6 +655,8 @@ class CustomReduction:

# set to True when pre() already performs aggregation
pre_with_agg = False
# set to True when post() already performs aggregation
post_with_agg = False

def __init__(self, name=None, is_gpu=None):
self.name = name or "<custom>"
@@ -972,13 +974,15 @@ def _compile_function(self, func, func_name=None, ndim=1) -> ReductionSteps:
else:
map_func_name, agg_func_name = step_func_name, step_func_name

op_custom_reduction = getattr(t.op, "custom_reduction", None)

# build agg description
agg_funcs.append(
ReductionAggStep(
agg_input_key,
map_func_name,
agg_func_name,
custom_reduction,
op_custom_reduction or custom_reduction,
t.key,
output_limit,
t.op.get_reduction_args(axis=self._axis),
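The `pre_with_agg` / `post_with_agg` flags above let a `CustomReduction` fuse aggregation into its pre or post step. A hedged sketch of a subclass using the pre/agg/post protocol these diffs rely on; the class is illustrative only, not the actual nunique reduction:

```python
# Illustrative CustomReduction subclass; not the actual nunique reduction.
import pandas as pd
from mars.dataframe.reduction.core import CustomReduction


class DistinctCount(CustomReduction):
    # pre() already aggregates within a chunk, so the map stage can skip
    # the separate agg() call (see _do_custom_agg_single above).
    pre_with_agg = True

    def pre(self, obj):
        # map stage: reduce each chunk to its distinct values
        return obj.drop_duplicates()

    def agg(self, *objs):
        # combine stage: merge partial results and deduplicate again
        return pd.concat(objs).drop_duplicates()

    def post(self, *objs):
        # agg stage: count the remaining distinct values
        return objs[0].count()
```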