Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DO NOT MERGE] Dev/groupby order preservation poc #3000

Draft
wants to merge 44 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
3cae9e7
PoC complete. need to test
sak2002 Apr 19, 2022
73b61f9
Merge branch 'master' of https://github.com/sak2002/mars into dev/gro…
sak2002 Apr 19, 2022
aa39493
method auto will also use sort. added changes so that if partition do…
sak2002 Apr 20, 2022
9823ddf
Merge branch 'master' of https://github.com/mars-project/mars into de…
sak2002 Apr 21, 2022
98b995a
fixing unit tests
sak2002 Apr 22, 2022
0fcbc47
added unit tests for group by sort
sak2002 Apr 22, 2022
8186e64
fixed conflicts
sak2002 Apr 25, 2022
a7684d5
fixed conflicts
sak2002 Apr 25, 2022
f9f4e9c
sample output chunk shape does not rely on by
sakshamkumar-byt Apr 26, 2022
2f2f4d9
order preserve poc
sak2002 Apr 25, 2022
c93adb0
impl execute for indexing phase
sakshamkumar-byt Apr 28, 2022
ef8bfaa
Merge remote-tracking branch 'upstream/master' into dev/groupby-sort-poc
May 5, 2022
e75bfcf
Fix output_type of DataFrameGroupbyAgg map
May 5, 2022
3966d05
removed redundant comments
sakshamkumar-byt May 5, 2022
dd37724
changes for order preservation
sakshamkumar-byt May 5, 2022
0b379c9
pulled origin master. fixed merge conflict
sakshamkumar-byt May 5, 2022
83c49ad
Merge branch 'dev/groupby-sort-poc' of https://github.com/sak2002/mar…
sakshamkumar-byt May 5, 2022
dfbb6da
Made changes for sampling on the basis of series or dataframe output
sakshamkumar-byt May 5, 2022
03fb0e1
code cleanup
sakshamkumar-byt May 5, 2022
ff58e50
rebased to groupby-sort-poc. working poc for order preservation for d…
sakshamkumar-byt May 5, 2022
59c447f
Fix ut
May 6, 2022
19dc420
merge back groupby-sort-poc
sakshamkumar-byt May 8, 2022
e658810
fixed imports. removed redundant code
sakshamkumar-byt May 8, 2022
1126278
reformat code
sakshamkumar-byt May 8, 2022
49ee66f
code style check
sakshamkumar-byt May 8, 2022
3bd71ea
code style check
sakshamkumar-byt May 8, 2022
10591b6
added apache 2.0 licence
sakshamkumar-byt May 8, 2022
19cedfb
code cleanup
sakshamkumar-byt May 8, 2022
72c6c89
Merge branch 'master' of https://github.com/mars-project/mars into de…
sakshamkumar-byt May 8, 2022
df6f6fa
Added test for empty df
sakshamkumar-byt May 8, 2022
86bab10
removed unused import
sakshamkumar-byt May 8, 2022
68ef1ac
fixed code style issues
sakshamkumar-byt May 8, 2022
f893fc3
removed redundant code
sakshamkumar-byt May 8, 2022
9f88148
Merge branch 'dev/groupby-sort-poc' of https://github.com/sak2002/mar…
sakshamkumar-byt May 8, 2022
515f692
made changes to map and reduce stage to support series and dataframe
sakshamkumar-byt May 9, 2022
bb0236d
resolved a few review comments
sakshamkumar-byt May 9, 2022
daf4319
code style checks
sakshamkumar-byt May 9, 2022
0c8ca0b
code style checks
sakshamkumar-byt May 9, 2022
e98e585
resolved a few review comments
sakshamkumar-byt May 9, 2022
8ac2555
resolved review comment to direct output type to init
sakshamkumar-byt May 9, 2022
b09f47c
removed unused sort
sakshamkumar-byt May 9, 2022
ccbdebd
Merge remote-tracking branch 'upstream/master' into dev/groupby-sort-poc
May 9, 2022
ea997b2
Fix ut
May 9, 2022
6b01acb
merge back sort poc
sakshamkumar-byt May 10, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
444 changes: 422 additions & 22 deletions mars/dataframe/groupby/aggregation.py

Large diffs are not rendered by default.

14 changes: 13 additions & 1 deletion mars/dataframe/groupby/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class DataFrameGroupByOperand(MapReduceOperand, DataFrameOperandMixin):
_level = AnyField("level")
_as_index = BoolField("as_index")
_sort = BoolField("sort")
_preserve_order = BoolField("preserve_order")
_group_keys = BoolField("group_keys")

_shuffle_size = Int32Field("shuffle_size")
Expand All @@ -61,6 +62,7 @@ def __init__(
group_keys=None,
shuffle_size=None,
output_types=None,
preserve_order=None,
**kw
):
super().__init__(
Expand All @@ -71,8 +73,13 @@ def __init__(
_group_keys=group_keys,
_shuffle_size=shuffle_size,
_output_types=output_types,
_preserve_order=preserve_order,
**kw
)
if sort:
self._preserve_order = False
else:
self._preserve_order = preserve_order
if output_types:
if self.stage in (OperandStage.map, OperandStage.reduce):
if output_types[0] in (
Expand Down Expand Up @@ -108,6 +115,10 @@ def as_index(self):
def sort(self):
return self._sort

@property
def preserve_order(self):
return self._preserve_order

@property
def group_keys(self):
return self._group_keys
Expand Down Expand Up @@ -485,7 +496,7 @@ def execute(cls, ctx, op: "DataFrameGroupByOperand"):
)


def groupby(df, by=None, level=None, as_index=True, sort=True, group_keys=True):
def groupby(df, by=None, level=None, as_index=True, sort=True, group_keys=True, preserve_order=False):
if not as_index and df.op.output_types[0] == OutputType.series:
raise TypeError("as_index=False only valid with DataFrame")

Expand All @@ -505,5 +516,6 @@ def groupby(df, by=None, level=None, as_index=True, sort=True, group_keys=True):
sort=sort,
group_keys=group_keys,
output_types=output_types,
preserve_order=preserve_order,
)
return op(df)
12 changes: 9 additions & 3 deletions mars/dataframe/groupby/getitem.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from ... import opcodes
from ...core import OutputType
from ...serialization.serializables import AnyField
from ...serialization.serializables import AnyField, BoolField
from ..operands import DataFrameOperandMixin, DataFrameOperand
from ..utils import parse_index

Expand All @@ -27,8 +27,10 @@ class GroupByIndex(DataFrameOperandMixin, DataFrameOperand):

_selection = AnyField("selection")

def __init__(self, selection=None, output_types=None, **kw):
super().__init__(_selection=selection, _output_types=output_types, **kw)
_preserve_order = BoolField("preserve_order")

def __init__(self, selection=None, output_types=None, preserve_order=None, **kw):
super().__init__(_selection=selection, _output_types=output_types, _preserve_order=preserve_order, **kw)

@property
def selection(self):
Expand All @@ -40,6 +42,10 @@ def groupby_params(self):
params["selection"] = self.selection
return params

@property
def preserve_order(self):
return self._preserve_order

def build_mock_groupby(self, **kwargs):
groupby_op = self.inputs[0].op
return groupby_op.build_mock_groupby(**kwargs)[self.selection]
Expand Down
251 changes: 251 additions & 0 deletions mars/dataframe/groupby/preserve_order.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
import numpy as np
import pandas as pd
from pandas import MultiIndex

from ... import opcodes as OperandDef
from mars.dataframe.operands import DataFrameOperandMixin, DataFrameOperand
from mars.utils import lazy_import
from ...core.operand import OperandStage, MapReduceOperand
from ...serialization.serializables import Int32Field, AnyField, StringField, ListField, BoolField

cudf = lazy_import("cudf", globals=globals())


class DataFrameOrderPreserveIndexOperand(DataFrameOperand, DataFrameOperandMixin):
    """Tag each row of a chunk with its original position.

    Emits a per-chunk table whose ``min_col`` column is the row's ordinal
    position inside the chunk and whose ``chunk_index`` column is the
    chunk's ordinal (``index_prefix``), keyed by the chunk's own index.
    Downstream stages join on this table to recover first-appearance order.
    """

    _op_type_ = OperandDef.GROUPBY_SORT_ORDER_INDEX

    _index_prefix = Int32Field("index_prefix")

    def __init__(self, output_types=None, index_prefix=None, *args, **kwargs):
        super().__init__(
            _output_types=output_types, _index_prefix=index_prefix, *args, **kwargs
        )

    @property
    def index_prefix(self):
        # ordinal of the chunk this operand is applied to
        return self._index_prefix

    @property
    def output_limit(self):
        return 1

    @classmethod
    def execute(cls, ctx, op):
        chunk_data = ctx[op.inputs[0].key][0]
        # choose the backend matching the input (pandas on CPU, cudf on GPU)
        xdf = pd if isinstance(chunk_data, (pd.DataFrame, pd.Series)) else cudf

        if len(chunk_data) == 0:
            # when chunk is empty, return the empty chunk itself
            ctx[op.outputs[0].key] = chunk_data
            return

        positions = np.arange(0, len(chunk_data))
        order_table = xdf.DataFrame(
            {"min_col": positions, "chunk_index": op.index_prefix},
            index=chunk_data.index,
        )
        ctx[op.outputs[-1].key] = order_table


class DataFrameOrderPreservePivotOperand(DataFrameOperand, DataFrameOperandMixin):
    """Compute shuffle pivots that preserve group first-appearance order.

    Consumes the per-chunk order tables produced by
    ``DataFrameOrderPreserveIndexOperand``, reduces them to one
    ``(chunk_index, min_col)`` pair per group key (the earliest occurrence
    of the key across chunks), and derives ``n_partition - 1`` evenly
    spaced pivot pairs used by the shuffle stage to split rows while
    keeping groups in order of first appearance.

    Outputs:
        0: the per-group minimal ``(chunk_index, min_col)`` table.
        1: the pivot array of shape ``(len(op.inputs) - 1, 2)``.
    """

    _op_type_ = OperandDef.GROUPBY_SORT_ORDER_PIVOT

    _n_partition = Int32Field("n_partition")
    _by = AnyField("by")

    def __init__(self, n_partition=None, output_types=None, by=None, *args, **kwargs):
        super().__init__(
            _n_partition=n_partition, _output_types=output_types, _by=by, *args, **kwargs
        )

    @property
    def by(self):
        # groupby key(s) used to collapse the order tables per group
        return self._by

    @property
    def output_limit(self):
        return 2

    @classmethod
    def execute(cls, ctx, op):
        # drop empty chunks up-front; they contribute no order information
        inputs = [ctx[c.key] for c in op.inputs if len(ctx[c.key]) > 0]
        if len(inputs) == 0:
            # corner case: nothing sampled, we need to do nothing
            ctx[op.outputs[0].key] = ctx[op.outputs[-1].key] = ctx[op.inputs[0].key]
            return

        xdf = pd if isinstance(inputs[0], (pd.DataFrame, pd.Series)) else cudf

        a = xdf.concat(inputs, axis=0)
        a = a.sort_index()
        a_group = a.groupby(op.by).groups
        a_list = []
        for g in a_group:
            group_df = a.loc[g]
            # earliest chunk in which this group key appears ...
            group_min_index = group_df["chunk_index"].min()
            # ... and the earliest row position within that chunk
            group_min_col = group_df.loc[group_df["chunk_index"] == group_min_index][
                "min_col"
            ].min()
            if isinstance(a.axes[0], MultiIndex):
                index = pd.MultiIndex.from_tuples([g], names=group_df.index.names)
            else:
                # BUGFIX: Index(name=...) requires a hashable scalar; passing the
                # FrozenList returned by ``index.names`` raises a TypeError.
                index = pd.Index([g], name=group_df.index.name)
            a_list_df = pd.DataFrame(
                {"chunk_index": group_min_index, "min_col": group_min_col}, index=index
            )
            a_list.append(a_list_df)

        a = pd.concat(a_list)

        ctx[op.outputs[0].key] = a

        sort_values_df = a.sort_values(["chunk_index", "min_col"])

        p = len(inputs)
        if len(sort_values_df) < p:
            # fewer distinct groups than partitions: replicate the table so
            # that pivot selection below always has enough rows to index
            num = p // len(sort_values_df) + 1
            # BUGFIX: ``DataFrame.append`` was removed in pandas 2.0; build the
            # same replicated frame (original + (num - 1) copies) via concat.
            sort_values_df = pd.concat([sort_values_df] * num)

        sort_values_df = sort_values_df.sort_values(["chunk_index", "min_col"])

        # ideal partition width; pivots are taken at evenly spaced offsets
        w = sort_values_df.shape[0] * 1.0 / (p + 1)

        values = sort_values_df[["chunk_index", "min_col"]].values

        slc = np.linspace(
            max(w - 1, 0), len(sort_values_df) - 1, num=len(op.inputs) - 1, endpoint=False
        ).astype(int)
        out = values[slc]
        ctx[op.outputs[-1].key] = out

class DataFrameGroupbyOrderPresShuffle(MapReduceOperand, DataFrameOperandMixin):
    """Shuffle that redistributes rows by order-preserving pivots.

    Map stage: joins each data chunk with its order table
    (``chunk_index``/``min_col`` columns from the index operand), then slices
    the joined frame into ``n_partition`` pieces using the pivot pairs
    computed by ``DataFrameOrderPreservePivotOperand``, so that rows belonging
    to earlier-appearing groups land in earlier partitions.

    Reduce stage: concatenates the mapper outputs, restores first-appearance
    order by sorting on ``(chunk_index, min_col)``, drops the helper columns
    and forwards ``by`` alongside the data for the aggregation stage.
    """

    _op_type_ = OperandDef.GROUPBY_SORT_SHUFFLE

    _by = ListField("by")
    _n_partition = Int32Field("n_partition")

    def __init__(
        self,
        by=None,
        n_partition=None,
        output_types=None,
        **kw
    ):
        super().__init__(
            _by=by,
            _n_partition=n_partition,
            _output_types=output_types,
            **kw
        )

    @property
    def by(self):
        # groupby key(s), forwarded to the reducer output
        return self._by

    @property
    def n_partition(self):
        # number of reducer partitions the mapper splits into
        return self._n_partition

    @property
    def output_limit(self):
        return 1


    @classmethod
    def _execute_dataframe_map(cls, ctx, op):
        # inputs: the data chunk, the pivot array, and this chunk's order table
        df, pivots, min_table = [ctx[c.key] for c in op.inputs]
        out = op.outputs[0]
        # ``df`` may be a tuple of frames (e.g. pre-split agg intermediates)
        if isinstance(df, tuple):
            ijoin_df = tuple(x.join(min_table, how="inner") for x in df)
        else:
            ijoin_df = df.join(min_table, how="inner")

        # the join can clobber index names; restore them from the source frame
        if isinstance(df, tuple):
            for i in range(len(df)):
                ijoin_df[i].index = ijoin_df[i].index.rename(df[i].index.names) if isinstance(df[i].index, MultiIndex) else ijoin_df[i].index.rename(df[i].index.name)
        else:
            ijoin_df.index = ijoin_df.index.rename(df.index.names) if isinstance(df.index, MultiIndex) else ijoin_df.index.rename(df.index.name)

        def _get_out_df(p_index, in_df):
            # Select the rows of ``in_df`` falling into partition ``p_index``.
            # A pivot pair (chunk_index, min_col) is an exclusive upper bound
            # for its partition and an inclusive lower bound for the next one.
            if p_index == 0:
                index_upper = pivots[p_index][0]+1
                intermediary_dfs = []
                for i in range(0, index_upper):
                    if i == index_upper-1:
                        # boundary chunk: keep only rows before the pivot row
                        intermediary_dfs.append(in_df.loc[in_df['chunk_index'] == i].loc[in_df['min_col'] < pivots[p_index][1]])
                    else:
                        intermediary_dfs.append(in_df.loc[in_df['chunk_index'] == i])
            elif p_index == op.n_partition - 1:
                # last partition: everything at or after the previous pivot
                intermediary_dfs = []
                index_lower = pivots[p_index-1][0]
                index_upper = in_df['chunk_index'].max() + 1
                for i in range(index_lower, index_upper):
                    if i == index_lower:
                        intermediary_dfs.append(in_df.loc[in_df['chunk_index'] == i].loc[in_df['min_col'] >= pivots[p_index-1][1]])
                    else:
                        intermediary_dfs.append(in_df.loc[in_df['chunk_index'] == i])
            else:
                # middle partition: bounded below by pivots[p_index - 1] and
                # above by pivots[p_index]
                intermediary_dfs = []
                index_lower = pivots[p_index - 1][0]
                index_upper = pivots[p_index][0]+1
                if index_upper == index_lower + 1:
                    # both pivots fall in the same chunk: single band filter
                    intermediary_dfs.append(
                        in_df.loc[in_df['chunk_index'] == index_lower].loc[
                            (in_df['min_col'] >= pivots[p_index - 1][1]) & (in_df['min_col'] < pivots[p_index][1])])
                else:
                    for i in range(index_lower, index_upper):
                        if i == index_lower:
                            if index_lower != index_upper:
                                intermediary_dfs.append(in_df.loc[in_df['chunk_index'] == i].loc[in_df['min_col'] >= pivots[p_index-1][1]])
                        elif i == index_upper-1:
                            intermediary_dfs.append(in_df.loc[in_df['chunk_index'] == i].loc[in_df['min_col'] < pivots[p_index][1]])
                        else:
                            intermediary_dfs.append(in_df.loc[in_df['chunk_index'] == i])
            if len(intermediary_dfs) > 0:
                out_df = pd.concat(intermediary_dfs)
            else:
                out_df = None
            return out_df

        for i in range(op.n_partition):
            index = (i, 0)
            if isinstance(df, tuple):
                out_df = tuple(_get_out_df(i, x) for x in ijoin_df)
            else:
                out_df = _get_out_df(i, ijoin_df)
            if out_df is not None:
                # address each reducer by its (partition, 0) shuffle index
                ctx[out.key, index] = out_df

    @classmethod
    def _execute_map(cls, ctx, op):
        cls._execute_dataframe_map(ctx, op)

    @classmethod
    def _execute_reduce(cls, ctx, op: "DataFrameGroupbyOrderPresShuffle"):
        # BUGFIX: annotation previously named the unrelated PSRS sort operand
        # ("DataFramePSRSShuffle"), copied from the sort shuffle implementation.
        out_chunk = op.outputs[0]
        raw_inputs = list(op.iter_mapper_data(ctx, pop=False))
        by = op.by
        xdf = cudf if op.gpu else pd

        r = []

        if isinstance(raw_inputs[0], tuple):
            # tuple inputs: sort/strip each positional member independently
            tuple_len = len(raw_inputs[0])
            for i in range(tuple_len):
                concat_df = xdf.concat([inp[i] for inp in raw_inputs], axis=0)
                concat_df = concat_df.sort_values(["chunk_index", "min_col"]).drop(columns=["chunk_index", "min_col"])
                r.append(concat_df)
            r = tuple(r)
        else:
            concat_df = xdf.concat(raw_inputs, axis=0)
            # restore first-appearance order, then drop the helper columns
            concat_df = concat_df.sort_values(["chunk_index", "min_col"]).drop(columns=["chunk_index", "min_col"])
            r = concat_df

        # forward ``by`` together with the data for the downstream agg stage
        if isinstance(r, tuple):
            ctx[op.outputs[0].key] = r + (by,)
        else:
            ctx[op.outputs[0].key] = (r, by)

    @classmethod
    def estimate_size(cls, ctx, op):
        super().estimate_size(ctx, op)
        result = ctx[op.outputs[0].key]
        if op.stage == OperandStage.reduce:
            # reducer output carries extra sort/concat overhead; pad estimate
            ctx[op.outputs[0].key] = (result[0], result[1] * 1.5)
        else:
            ctx[op.outputs[0].key] = result

    @classmethod
    def execute(cls, ctx, op):
        if op.stage == OperandStage.map:
            cls._execute_map(ctx, op)
        else:
            cls._execute_reduce(ctx, op)
Loading