Implements accessor for categorical types #3360

Open
wants to merge 1 commit into master
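
Not part of the diff below: a minimal usage sketch of what the new accessor enables, assuming the usual Mars deferred-execution workflow (`.execute()` triggers computation) and that the registered members mirror their pandas counterparts.

```python
import mars.dataframe as md

# Illustrative data; any values castable to a categorical dtype work.
s = md.Series(["a", "b", "a", "c"]).astype("category")

# `ordered` is read straight from the dtype; `categories` is computed lazily.
print(s.cat.ordered)
print(s.cat.categories.execute())

# Registered methods return new Mars Series and only run on execute().
renamed = s.cat.rename_categories({"a": "x"})
print(renamed.execute())
```
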
4 changes: 2 additions & 2 deletions docs/source/conf.py
@@ -183,13 +183,13 @@
# Example configuration for intersphinx: refer to the Python standard library.
intersphinx_mapping = {
"dateutil": ("https://dateutil.readthedocs.io/en/latest/", None),
"matplotlib": ("https://matplotlib.org/", None),
"matplotlib": ("https://matplotlib.org/stable/", None),
"numpy": ("https://numpy.org/doc/stable/", None),
"pandas": ("https://pandas.pydata.org/docs/", None),
"pandas-gbq": ("https://pandas-gbq.readthedocs.io/en/latest/", None),
"py": ("https://pylib.readthedocs.io/en/latest/", None),
"python": ("https://docs.python.org/3/", None),
"scipy": ("https://docs.scipy.org/doc/scipy/reference/", None),
"scipy": ("https://docs.scipy.org/doc/scipy/", None),
"statsmodels": ("https://www.statsmodels.org/devel/", None),
"pyarrow": ("https://arrow.apache.org/docs/", None),
}
25 changes: 25 additions & 0 deletions docs/source/reference/dataframe/series.rst
@@ -386,6 +386,31 @@ strings and apply several methods to it. These can be accessed like
Series.str
Series.dt


.. _generated.series.cat:

Categorical accessor
~~~~~~~~~~~~~~~~~~~~

Categorical-dtype specific methods and attributes are available under
the ``Series.cat`` accessor.

.. autosummary::
:toctree: generated/
:template: accessor_method.rst

Series.cat.categories
Series.cat.ordered
Series.cat.codes
Series.cat.rename_categories
Series.cat.reorder_categories
Series.cat.add_categories
Series.cat.remove_categories
Series.cat.set_categories
Series.cat.as_ordered
Series.cat.as_unordered
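
The listing above maps to the pandas ``Series.cat`` API. As a hedged illustration (assuming the standard Mars ``execute()`` workflow), the documented members are used like this:

```python
import mars.dataframe as md

s = md.Series(["low", "high", "low"]).astype("category")

# Rebuild the category set in a meaningful order (values are illustrative).
s2 = s.cat.set_categories(["low", "medium", "high"])
print(s2.execute())

# Integer codes of each element within the category order.
print(s.cat.codes.execute())
```
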


Plotting
--------
``Series.plot`` is both a callable method and a namespace attribute for
1 change: 1 addition & 0 deletions mars/core/__init__.py
@@ -15,6 +15,7 @@
# noinspection PyUnresolvedReferences
from ..typing import ChunkType, TileableType, EntityType, OperandType
from .base import ExecutionError
from .context import get_context
from .entity import (
Entity,
EntityData,
22 changes: 11 additions & 11 deletions mars/dataframe/__init__.py
@@ -45,32 +45,32 @@

from . import arithmetic
from . import base
from . import datastore
from . import groupby
from . import indexing
from . import merge as merge_
from . import missing
from . import plotting
from . import reduction
from . import statistics
from . import sort
from . import groupby
from . import statistics
from . import ufunc
from . import datastore
from . import window
from . import plotting

del (
reduction,
statistics,
arithmetic,
indexing,
merge_,
base,
datastore,
groupby,
indexing,
merge_,
missing,
ufunc,
datastore,
plotting,
reduction,
sort,
statistics,
ufunc,
window,
plotting,
)
del DataFrameFetch, DataFrameFetchShuffle

15 changes: 13 additions & 2 deletions mars/dataframe/base/__init__.py
@@ -55,9 +55,15 @@
def _install():
from ..core import DATAFRAME_TYPE, SERIES_TYPE, INDEX_TYPE
from .standardize_range_index import ChunkStandardizeRangeIndex
from .categorical import _categorical_method_to_handlers
from .string_ import _string_method_to_handlers
from .datetimes import _datetime_method_to_handlers
from .accessor import StringAccessor, DatetimeAccessor, CachedAccessor
from .accessor import (
CachedAccessor,
CategoricalAccessor,
DatetimeAccessor,
StringAccessor,
)

for t in DATAFRAME_TYPE:
setattr(t, "to_gpu", to_gpu)
@@ -134,6 +140,10 @@ def _install():
setattr(t, "is_monotonic_increasing", property(fget=is_monotonic_increasing))
setattr(t, "is_monotonic_decreasing", property(fget=is_monotonic_decreasing))

for method in _categorical_method_to_handlers:
if not hasattr(CategoricalAccessor, method):
CategoricalAccessor._register(method)

for method in _string_method_to_handlers:
if not hasattr(StringAccessor, method):
StringAccessor._register(method)
@@ -143,8 +153,9 @@ def _install():
DatetimeAccessor._register(method)

for series in SERIES_TYPE:
series.str = CachedAccessor("str", StringAccessor)
series.cat = CachedAccessor("cat", CategoricalAccessor)
series.dt = CachedAccessor("dt", DatetimeAccessor)
series.str = CachedAccessor("str", StringAccessor)


_install()
53 changes: 51 additions & 2 deletions mars/dataframe/base/accessor.py
@@ -17,15 +17,17 @@

import pandas as pd
from pandas.api.types import (
is_categorical_dtype,
is_datetime64_dtype,
is_datetime64tz_dtype,
is_timedelta64_dtype,
is_period_dtype,
is_timedelta64_dtype,
)

from ...utils import adapt_mars_docstring
from .string_ import _string_method_to_handlers, SeriesStringMethod
from .categorical import _categorical_method_to_handlers, SeriesCategoricalMethod
from .datetimes import _datetime_method_to_handlers, SeriesDatetimeMethod
from .string_ import _string_method_to_handlers, SeriesStringMethod


class StringAccessor:
@@ -262,6 +264,53 @@ def __dir__(self) -> Iterable[str]:
return list(s)


class CategoricalAccessor:
def __init__(self, series):
if not is_categorical_dtype(series.dtype):
raise AttributeError("Can only use .cat accessor with categorical values")
self._series = series

@property
def ordered(self):
return self._series.dtype.ordered

@property
def categories(self):
return getattr(self, "_get_categories")()

@classmethod
def _gen_func(cls, method, is_property):
def _inner(self, *args, **kwargs):
op = SeriesCategoricalMethod(
method=method,
is_property=is_property,
method_args=args,
method_kwargs=kwargs,
)
return op(self._series)

if hasattr(pd.Series.cat, method):
_inner = wraps(getattr(pd.Series.cat, method))(_inner)
_inner.__doc__ = adapt_mars_docstring(
getattr(pd.Series.cat, method).__doc__
)
return _inner

@classmethod
def _register(cls, method):
# non-existing members are considered methods by default
is_property = not callable(getattr(pd.Series.cat, method, lambda: None))
func = cls._gen_func(method, is_property)
if is_property:
func = property(func)
setattr(cls, method, func)

def __dir__(self) -> Iterable[str]:
s = set(super().__dir__())
s.update(_categorical_method_to_handlers.keys())
return list(s)


class CachedAccessor:
def __init__(self, name: str, accessor) -> None:
self._name = name
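
For context on how the accessor above is exposed: ``CachedAccessor`` (already in this module) instantiates the accessor on first attribute access, and ``_register`` inspects the pandas counterpart to decide between a property and a method. The following is a condensed, standalone sketch of that pattern, not the exact Mars code; names such as ``_CachedAccessorSketch`` and ``_is_property`` are invented for illustration.

```python
import pandas as pd


class _CachedAccessorSketch:
    """Descriptor that builds an accessor on first access and caches it."""

    def __init__(self, name, accessor_cls):
        self._name = name
        self._accessor_cls = accessor_cls

    def __get__(self, obj, owner):
        if obj is None:
            # class-level access returns the accessor type itself
            return self._accessor_cls
        accessor = self._accessor_cls(obj)   # dtype is validated in __init__
        object.__setattr__(obj, self._name, accessor)  # cache on the instance
        return accessor


def _is_property(method_name):
    # Mirrors the PR's rule: if the pandas counterpart is not callable
    # (e.g. `codes`, `categories`), expose it as a property; missing
    # members default to being treated as methods.
    member = getattr(pd.Series.cat, method_name, lambda: None)
    return not callable(member)


print(_is_property("codes"))           # True: exposed as a property
print(_is_property("add_categories"))  # False: exposed as a method
```
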
86 changes: 34 additions & 52 deletions mars/dataframe/base/astype.py
@@ -16,9 +16,9 @@
import pandas as pd
from pandas.api.types import CategoricalDtype

from ... import opcodes as OperandDef
from ... import opcodes
from ...core import recursive_tile
from ...serialization.serializables import AnyField, StringField, ListField
from ...serialization.serializables import AnyField, ListField, StringField
from ...tensor.base import sort
from ...utils import pd_release_version
from ..core import DATAFRAME_TYPE, SERIES_TYPE
@@ -29,42 +29,23 @@


class DataFrameAstype(DataFrameOperand, DataFrameOperandMixin):
_op_type_ = OperandDef.ASTYPE

_dtype_values = AnyField("dtype_values")
_errors = StringField("errors")
_category_cols = ListField("category_cols")

def __init__(
self,
dtype_values=None,
errors=None,
category_cols=None,
output_types=None,
**kw
):
super().__init__(
_dtype_values=dtype_values,
_errors=errors,
_category_cols=category_cols,
_output_types=output_types,
**kw
)
_op_type_ = opcodes.ASTYPE

@property
def dtype_values(self):
return self._dtype_values
dtype_values = AnyField("dtype_values", default=None)
errors = StringField("errors", default=None)
category_cols = ListField("category_cols", default=None)

@property
def errors(self):
return self._errors
def __init__(self, output_types=None, **kw):
super().__init__(_output_types=output_types, **kw)

@property
def category_cols(self):
return self._category_cols
@classmethod
def _is_categories_missing(cls, dtype):
return (isinstance(dtype, str) and dtype == "category") or (
isinstance(dtype, pd.CategoricalDtype) and dtype.categories is None
)

@classmethod
def _tile_one_chunk(cls, op):
def _tile_one_chunk(cls, op: "DataFrameAstype"):
c = op.inputs[0].chunks[0]
chunk_op = op.copy().reset_key()
chunk_params = op.outputs[0].params.copy()
@@ -80,23 +61,22 @@ def _tile_one_chunk(cls, op):
)

@classmethod
def _tile_series_index(cls, op):
def _tile_series_index(cls, op: "DataFrameAstype"):
in_series = op.inputs[0]
out = op.outputs[0]

unique_chunk = None
if op.dtype_values == "category" and isinstance(op.dtype_values, str):
unique_chunk = (yield from recursive_tile(sort(in_series.unique()))).chunks[
0
]
if cls._is_categories_missing(op.dtype_values):
unique = yield from recursive_tile(sort(in_series.unique()))
unique_chunk = unique.chunks[0]

chunks = []
for c in in_series.chunks:
chunk_op = op.copy().reset_key()
params = c.params.copy()
params["dtype"] = out.dtype
if unique_chunk is not None:
chunk_op._category_cols = [in_series.name]
chunk_op.category_cols = [in_series.name]
new_chunk = chunk_op.new_chunk([c, unique_chunk], **params)
else:
new_chunk = chunk_op.new_chunk([c], **params)
@@ -108,13 +88,13 @@ def _tile_series_index(cls, op):
)

@classmethod
def _tile_dataframe(cls, op):
def _tile_dataframe(cls, op: "DataFrameAstype"):
in_df = op.inputs[0]
out = op.outputs[0]
cum_nsplits = np.cumsum((0,) + in_df.nsplits[1])
out_chunks = []

if op.dtype_values == "category":
if cls._is_categories_missing(op.dtype_values):
# all columns need unique values
for c in in_df.chunks:
chunk_op = op.copy().reset_key()
@@ -123,21 +103,19 @@ def _tile_dataframe(cls, op):
cum_nsplits[c.index[1]] : cum_nsplits[c.index[1] + 1]
]
params["dtypes"] = dtypes
chunk_op._category_cols = list(c.columns_value.to_pandas())
chunk_op.category_cols = list(c.columns_value.to_pandas())
unique_chunks = []
for col in c.columns_value.to_pandas():
unique = yield from recursive_tile(sort(in_df[col].unique()))
unique_chunks.append(unique.chunks[0])
new_chunk = chunk_op.new_chunk([c] + unique_chunks, **params)
out_chunks.append(new_chunk)
elif (
isinstance(op.dtype_values, dict) and "category" in op.dtype_values.values()
elif isinstance(op.dtype_values, dict) and any(
cls._is_categories_missing(t) for t in op.dtype_values.values()
):
# some columns' types are category
category_cols = [
c
for c, v in op.dtype_values.items()
if isinstance(v, str) and v == "category"
c for c, v in op.dtype_values.items() if cls._is_categories_missing(v)
]
unique_chunks = dict()
for col in category_cols:
@@ -156,7 +134,7 @@ def _tile_dataframe(cls, op):
if col in category_cols:
chunk_category_cols.append(col)
chunk_unique_chunks.append(unique_chunks[col])
chunk_op._category_cols = chunk_category_cols
chunk_op.category_cols = chunk_category_cols
new_chunk = chunk_op.new_chunk([c] + chunk_unique_chunks, **params)
out_chunks.append(new_chunk)
else:
@@ -176,7 +154,7 @@ def _tile_dataframe(cls, op):
)

@classmethod
def tile(cls, op):
def tile(cls, op: "DataFrameAstype"):
if len(op.inputs[0].chunks) == 1:
return cls._tile_one_chunk(op)
elif isinstance(op.inputs[0], DATAFRAME_TYPE):
@@ -185,13 +163,14 @@ def tile(cls, op):
return (yield from cls._tile_series_index(op))

@classmethod
def execute(cls, ctx, op):
def execute(cls, ctx, op: "DataFrameAstype"):
in_data = ctx[op.inputs[0].key]
if not isinstance(op.dtype_values, dict):
if op.category_cols is not None:
uniques = [ctx[c.key] for c in op.inputs[1:]]
ordered = getattr(op.dtype_values, "ordered", None)
dtype = dict(
(col, CategoricalDtype(unique_values))
(col, CategoricalDtype(unique_values, ordered=ordered))
for col, unique_values in zip(op.category_cols, uniques)
)
ctx[op.outputs[0].key] = in_data.astype(dtype, errors=op.errors)
@@ -212,7 +191,10 @@ def execute(cls, ctx, op):
if op.category_cols is not None:
uniques = [ctx[c.key] for c in op.inputs[1:]]
for col, unique_values in zip(op.category_cols, uniques):
selected_dtype[col] = CategoricalDtype(unique_values)
ordered = getattr(selected_dtype[col], "ordered", None)
selected_dtype[col] = CategoricalDtype(
unique_values, ordered=ordered
)
ctx[op.outputs[0].key] = in_data.astype(selected_dtype, errors=op.errors)

def __call__(self, df):
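
Why the tile step gathers sorted unique values: when casting to ``category`` without an explicit category list, every chunk must end up with the same ``CategoricalDtype``, otherwise combining chunk results would yield inconsistent dtypes and codes. A hedged, pandas-only sketch of what each chunk effectively does once the globally collected uniques arrive (data and category values are illustrative):

```python
import pandas as pd
from pandas.api.types import CategoricalDtype

chunk = pd.Series(["b", "a", "b"])    # one chunk's slice of the column
global_uniques = ["a", "b", "c"]      # sorted uniques gathered from all chunks

# Build the dtype from the shared categories so every chunk agrees on codes.
dtype = CategoricalDtype(global_uniques, ordered=None)
converted = chunk.astype(dtype)
print(converted.cat.categories)       # the full, shared category index
```
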