Skip to content

Commit

Permalink
Implements accessor for categorical types
Browse files Browse the repository at this point in the history
  • Loading branch information
wjsi committed Nov 2, 2023
1 parent bcc0005 commit fd6a0f1
Show file tree
Hide file tree
Showing 10 changed files with 326 additions and 69 deletions.
4 changes: 2 additions & 2 deletions docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,13 @@
# Example configuration for intersphinx: refer to the Python standard library.
intersphinx_mapping = {
"dateutil": ("https://dateutil.readthedocs.io/en/latest/", None),
"matplotlib": ("https://matplotlib.org/", None),
"matplotlib": ("https://matplotlib.org/stable/", None),
"numpy": ("https://numpy.org/doc/stable/", None),
"pandas": ("https://pandas.pydata.org/docs/", None),
"pandas-gbq": ("https://pandas-gbq.readthedocs.io/en/latest/", None),
"py": ("https://pylib.readthedocs.io/en/latest/", None),
"python": ("https://docs.python.org/3/", None),
"scipy": ("https://docs.scipy.org/doc/scipy/reference/", None),
"scipy": ("https://docs.scipy.org/doc/scipy/", None),
"statsmodels": ("https://www.statsmodels.org/devel/", None),
"pyarrow": ("https://arrow.apache.org/docs/", None),
}
Expand Down
25 changes: 25 additions & 0 deletions docs/source/reference/dataframe/series.rst
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,31 @@ strings and apply several methods to it. These can be accessed like
Series.str
Series.dt


.. _generated.series.cat:

Categorical accessor
~~~~~~~~~~~~~~~~~~~~

Categorical-dtype specific methods and attributes are available under
the ``Series.cat`` accessor.

.. autosummary::
:toctree: generated/
:template: accessor_method.rst

Series.cat.categories
Series.cat.ordered
Series.cat.codes
Series.cat.rename_categories
Series.cat.reorder_categories
Series.cat.add_categories
Series.cat.remove_categories
Series.cat.set_categories
Series.cat.as_ordered
Series.cat.as_unordered


Plotting
--------
``Series.plot`` is both a callable method and a namespace attribute for
Expand Down
1 change: 1 addition & 0 deletions mars/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# noinspection PyUnresolvedReferences
from ..typing import ChunkType, TileableType, EntityType, OperandType
from .base import ExecutionError
from .context import get_context
from .entity import (
Entity,
EntityData,
Expand Down
22 changes: 11 additions & 11 deletions mars/dataframe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,32 +45,32 @@

from . import arithmetic
from . import base
from . import datastore
from . import groupby
from . import indexing
from . import merge as merge_
from . import missing
from . import plotting
from . import reduction
from . import statistics
from . import sort
from . import groupby
from . import statistics
from . import ufunc
from . import datastore
from . import window
from . import plotting

del (
reduction,
statistics,
arithmetic,
indexing,
merge_,
base,
datastore,
groupby,
indexing,
merge_,
missing,
ufunc,
datastore,
plotting,
reduction,
sort,
statistics,
ufunc,
window,
plotting,
)
del DataFrameFetch, DataFrameFetchShuffle

Expand Down
15 changes: 13 additions & 2 deletions mars/dataframe/base/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,15 @@
def _install():
from ..core import DATAFRAME_TYPE, SERIES_TYPE, INDEX_TYPE
from .standardize_range_index import ChunkStandardizeRangeIndex
from .categorical import _categorical_method_to_handlers
from .string_ import _string_method_to_handlers
from .datetimes import _datetime_method_to_handlers
from .accessor import StringAccessor, DatetimeAccessor, CachedAccessor
from .accessor import (
CachedAccessor,
CategoricalAccessor,
DatetimeAccessor,
StringAccessor,
)

for t in DATAFRAME_TYPE:
setattr(t, "to_gpu", to_gpu)
Expand Down Expand Up @@ -134,6 +140,10 @@ def _install():
setattr(t, "is_monotonic_increasing", property(fget=is_monotonic_increasing))
setattr(t, "is_monotonic_decreasing", property(fget=is_monotonic_decreasing))

for method in _categorical_method_to_handlers:
if not hasattr(CategoricalAccessor, method):
CategoricalAccessor._register(method)

for method in _string_method_to_handlers:
if not hasattr(StringAccessor, method):
StringAccessor._register(method)
Expand All @@ -143,8 +153,9 @@ def _install():
DatetimeAccessor._register(method)

for series in SERIES_TYPE:
series.str = CachedAccessor("str", StringAccessor)
series.cat = CachedAccessor("cat", CategoricalAccessor)
series.dt = CachedAccessor("dt", DatetimeAccessor)
series.str = CachedAccessor("str", StringAccessor)


_install()
Expand Down
53 changes: 51 additions & 2 deletions mars/dataframe/base/accessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@

import pandas as pd
from pandas.api.types import (
is_categorical_dtype,
is_datetime64_dtype,
is_datetime64tz_dtype,
is_timedelta64_dtype,
is_period_dtype,
is_timedelta64_dtype,
)

from ...utils import adapt_mars_docstring
from .string_ import _string_method_to_handlers, SeriesStringMethod
from .categorical import _categorical_method_to_handlers, SeriesCategoricalMethod
from .datetimes import _datetime_method_to_handlers, SeriesDatetimeMethod
from .string_ import _string_method_to_handlers, SeriesStringMethod


class StringAccessor:
Expand Down Expand Up @@ -262,6 +264,53 @@ def __dir__(self) -> Iterable[str]:
return list(s)


class CategoricalAccessor:
    """
    Accessor for categorical properties of Series values, exposed as
    ``Series.cat``.

    Most members are not defined statically: package initialization calls
    :meth:`_register` for every name in ``_categorical_method_to_handlers``,
    attaching a proxy that builds a ``SeriesCategoricalMethod`` operand on
    the wrapped series.
    """

    def __init__(self, series):
        # ``isinstance`` check instead of the deprecated
        # ``pandas.api.types.is_categorical_dtype`` (FutureWarning since
        # pandas 2.1); behavior is identical for series dtypes.
        if not isinstance(series.dtype, pd.CategoricalDtype):
            raise AttributeError("Can only use .cat accessor with categorical values")
        self._series = series

    @property
    def ordered(self):
        # The ordered flag is metadata on the dtype itself, so it can be
        # answered locally without creating an operand.
        return self._series.dtype.ordered

    @property
    def categories(self):
        # Delegates to the dynamically-registered ``_get_categories`` member
        # (installed via ``_register``); raises AttributeError if the
        # accessor methods have not been registered yet.
        return getattr(self, "_get_categories")()

    @classmethod
    def _gen_func(cls, method, is_property):
        """Build a proxy for ``Series.cat.<method>`` that emits a Mars operand.

        Parameters
        ----------
        method : str
            Name of the pandas ``Series.cat`` member to proxy.
        is_property : bool
            Whether the member is a property (no-arg) rather than a method.
        """

        def _inner(self, *args, **kwargs):
            op = SeriesCategoricalMethod(
                method=method,
                is_property=is_property,
                method_args=args,
                method_kwargs=kwargs,
            )
            return op(self._series)

        if hasattr(pd.Series.cat, method):
            # Mirror pandas' metadata and adapt its docstring for Mars docs.
            _inner = wraps(getattr(pd.Series.cat, method))(_inner)
            _inner.__doc__ = adapt_mars_docstring(
                getattr(pd.Series.cat, method).__doc__
            )
        return _inner

    @classmethod
    def _register(cls, method):
        """Attach ``method`` to this class as a method or property proxy."""
        # non-existing members are considered methods by default
        is_property = not callable(getattr(pd.Series.cat, method, lambda: None))
        func = cls._gen_func(method, is_property)
        if is_property:
            func = property(func)
        setattr(cls, method, func)

    def __dir__(self) -> Iterable[str]:
        # Surface lazily-registered members for interactive tab-completion.
        s = set(super().__dir__())
        s.update(_categorical_method_to_handlers.keys())
        return list(s)

Check warning on line 311 in mars/dataframe/base/accessor.py

View check run for this annotation

Codecov / codecov/patch

mars/dataframe/base/accessor.py#L309-L311

Added lines #L309 - L311 were not covered by tests


class CachedAccessor:
def __init__(self, name: str, accessor) -> None:
self._name = name
Expand Down
86 changes: 34 additions & 52 deletions mars/dataframe/base/astype.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
import pandas as pd
from pandas.api.types import CategoricalDtype

from ... import opcodes as OperandDef
from ... import opcodes
from ...core import recursive_tile
from ...serialization.serializables import AnyField, StringField, ListField
from ...serialization.serializables import AnyField, ListField, StringField
from ...tensor.base import sort
from ...utils import pd_release_version
from ..core import DATAFRAME_TYPE, SERIES_TYPE
Expand All @@ -29,42 +29,23 @@


class DataFrameAstype(DataFrameOperand, DataFrameOperandMixin):
_op_type_ = OperandDef.ASTYPE

_dtype_values = AnyField("dtype_values")
_errors = StringField("errors")
_category_cols = ListField("category_cols")

def __init__(
self,
dtype_values=None,
errors=None,
category_cols=None,
output_types=None,
**kw
):
super().__init__(
_dtype_values=dtype_values,
_errors=errors,
_category_cols=category_cols,
_output_types=output_types,
**kw
)
_op_type_ = opcodes.ASTYPE

@property
def dtype_values(self):
return self._dtype_values
dtype_values = AnyField("dtype_values", default=None)
errors = StringField("errors", default=None)
category_cols = ListField("category_cols", default=None)

@property
def errors(self):
return self._errors
def __init__(self, output_types=None, **kw):
super().__init__(_output_types=output_types, **kw)

@property
def category_cols(self):
return self._category_cols
@classmethod
def _is_categories_missing(cls, dtype):
    # True when ``dtype`` requests a categorical conversion without explicit
    # categories — either the plain string "category" or a CategoricalDtype
    # whose ``categories`` is None. Such columns need their unique values
    # computed (and sorted) at tile time before the cast can run per chunk.
    return (isinstance(dtype, str) and dtype == "category") or (
        isinstance(dtype, pd.CategoricalDtype) and dtype.categories is None
    )

@classmethod
def _tile_one_chunk(cls, op):
def _tile_one_chunk(cls, op: "DataFrameAstype"):
c = op.inputs[0].chunks[0]
chunk_op = op.copy().reset_key()
chunk_params = op.outputs[0].params.copy()
Expand All @@ -80,23 +61,22 @@ def _tile_one_chunk(cls, op):
)

@classmethod
def _tile_series_index(cls, op):
def _tile_series_index(cls, op: "DataFrameAstype"):
in_series = op.inputs[0]
out = op.outputs[0]

unique_chunk = None
if op.dtype_values == "category" and isinstance(op.dtype_values, str):
unique_chunk = (yield from recursive_tile(sort(in_series.unique()))).chunks[
0
]
if cls._is_categories_missing(op.dtype_values):
unique = yield from recursive_tile(sort(in_series.unique()))
unique_chunk = unique.chunks[0]

chunks = []
for c in in_series.chunks:
chunk_op = op.copy().reset_key()
params = c.params.copy()
params["dtype"] = out.dtype
if unique_chunk is not None:
chunk_op._category_cols = [in_series.name]
chunk_op.category_cols = [in_series.name]
new_chunk = chunk_op.new_chunk([c, unique_chunk], **params)
else:
new_chunk = chunk_op.new_chunk([c], **params)
Expand All @@ -108,13 +88,13 @@ def _tile_series_index(cls, op):
)

@classmethod
def _tile_dataframe(cls, op):
def _tile_dataframe(cls, op: "DataFrameAstype"):
in_df = op.inputs[0]
out = op.outputs[0]
cum_nsplits = np.cumsum((0,) + in_df.nsplits[1])
out_chunks = []

if op.dtype_values == "category":
if cls._is_categories_missing(op.dtype_values):
# all columns need unique values
for c in in_df.chunks:
chunk_op = op.copy().reset_key()
Expand All @@ -123,21 +103,19 @@ def _tile_dataframe(cls, op):
cum_nsplits[c.index[1]] : cum_nsplits[c.index[1] + 1]
]
params["dtypes"] = dtypes
chunk_op._category_cols = list(c.columns_value.to_pandas())
chunk_op.category_cols = list(c.columns_value.to_pandas())
unique_chunks = []
for col in c.columns_value.to_pandas():
unique = yield from recursive_tile(sort(in_df[col].unique()))
unique_chunks.append(unique.chunks[0])
new_chunk = chunk_op.new_chunk([c] + unique_chunks, **params)
out_chunks.append(new_chunk)
elif (
isinstance(op.dtype_values, dict) and "category" in op.dtype_values.values()
elif isinstance(op.dtype_values, dict) and any(
cls._is_categories_missing(t) for t in op.dtype_values.values()
):
# some columns' types are category
category_cols = [
c
for c, v in op.dtype_values.items()
if isinstance(v, str) and v == "category"
c for c, v in op.dtype_values.items() if cls._is_categories_missing(v)
]
unique_chunks = dict()
for col in category_cols:
Expand All @@ -156,7 +134,7 @@ def _tile_dataframe(cls, op):
if col in category_cols:
chunk_category_cols.append(col)
chunk_unique_chunks.append(unique_chunks[col])
chunk_op._category_cols = chunk_category_cols
chunk_op.category_cols = chunk_category_cols
new_chunk = chunk_op.new_chunk([c] + chunk_unique_chunks, **params)
out_chunks.append(new_chunk)
else:
Expand All @@ -176,7 +154,7 @@ def _tile_dataframe(cls, op):
)

@classmethod
def tile(cls, op):
def tile(cls, op: "DataFrameAstype"):
if len(op.inputs[0].chunks) == 1:
return cls._tile_one_chunk(op)
elif isinstance(op.inputs[0], DATAFRAME_TYPE):
Expand All @@ -185,13 +163,14 @@ def tile(cls, op):
return (yield from cls._tile_series_index(op))

@classmethod
def execute(cls, ctx, op):
def execute(cls, ctx, op: "DataFrameAstype"):
in_data = ctx[op.inputs[0].key]
if not isinstance(op.dtype_values, dict):
if op.category_cols is not None:
uniques = [ctx[c.key] for c in op.inputs[1:]]
ordered = getattr(op.dtype_values, "ordered", None)
dtype = dict(
(col, CategoricalDtype(unique_values))
(col, CategoricalDtype(unique_values, ordered=ordered))
for col, unique_values in zip(op.category_cols, uniques)
)
ctx[op.outputs[0].key] = in_data.astype(dtype, errors=op.errors)
Expand All @@ -212,7 +191,10 @@ def execute(cls, ctx, op):
if op.category_cols is not None:
uniques = [ctx[c.key] for c in op.inputs[1:]]
for col, unique_values in zip(op.category_cols, uniques):
selected_dtype[col] = CategoricalDtype(unique_values)
ordered = getattr(selected_dtype[col], "ordered", None)
selected_dtype[col] = CategoricalDtype(
unique_values, ordered=ordered
)
ctx[op.outputs[0].key] = in_data.astype(selected_dtype, errors=op.errors)

def __call__(self, df):
Expand Down
Loading

0 comments on commit fd6a0f1

Please sign in to comment.