SNOW-1458135 Implement DataFrame and Series initialization with lazy Index objects (#2137)


1. Which Jira issue is this PR addressing? Make sure that there is an
accompanying issue to your PR.


   Fixes SNOW-1458135

2. Fill out the following pre-review checklist:

   - [x] I am adding a new automated test(s) to verify correctness of my new code
   - [ ] If this test skips Local Testing mode, I'm requesting review from @snowflakedb/local-testing
   - [ ] I am adding new logging messages
   - [ ] I am adding a new telemetry message
   - [ ] I am adding new credentials
   - [ ] I am adding a new dependency
   - [ ] If this is a new feature/behavior, I'm adding the Local Testing parity changes.
3. Please describe how your code solves the related issue.

- Implemented functionality to enable creating Series and DataFrame objects with a lazy Index object as the `data`, `index`, and/or `columns` argument.
- This also covers creating Series and DataFrames with rows/columns that don't exist in the given `data` (see the sketch below).
- A special case is when `data` is a Series or DataFrame object: the new Series or DataFrame object is created by filtering `data` with the provided `index` and `columns`.
  - If some values in `index` don't exist in `data`'s index, these values are added as new rows whose data values are `NaN`.
  - If some values in `columns` don't exist in `data`'s columns, these values are added as new `NaN` columns.
- A right outer join is used to add the new index values, and the new `NaN` columns are created and appended in the same logic.
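
A minimal usage sketch of the behavior described above (illustrative only; it assumes a configured Snowpark pandas session, and the labels and values are made up):

```python
import modin.pandas as pd
import snowflake.snowpark.modin.plugin  # noqa: F401 -- registers the Snowpark pandas plugin

# A lazy Index object can now be passed as `data` and/or `index`.
idx = pd.Index(["a", "b", "c"])
ser = pd.Series([1, 2, 3], index=idx)

# Filtering case: `data` is a DataFrame, and `index`/`columns` contain
# labels not present in it; the missing row "c" and column "z" are
# added to the result and filled with NaN.
df = pd.DataFrame({"x": [1, 2], "y": [3, 4]}, index=["a", "b"])
result = pd.DataFrame(df, index=["a", "c"], columns=["x", "z"])
#      x   z
# a  1.0 NaN
# c  NaN NaN
```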
sfc-gh-vbudati authored Sep 25, 2024
1 parent 47ff444 commit 6ddffdf
Showing 36 changed files with 2,392 additions and 521 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,8 @@
- Added support for using Snowflake Interval constants with `Window.range_between()` when the order by column is TIMESTAMP or DATE type.
- Added support for file writes. This feature is currently in private preview.
- Added support for `DataFrameGroupBy.fillna` and `SeriesGroupBy.fillna`.
- Added support for constructing `Series` and `DataFrame` objects with the lazy `Index` object as `data`, `index`, and `columns` arguments.
- Added support for constructing `Series` and `DataFrame` objects with `index` and `column` values not present in `DataFrame`/`Series` `data`.

#### Improvements

@@ -122,6 +122,7 @@ def get_valid_index_values(
    -------
    Optional[Row]: The desired index (a Snowpark Row) if it exists, else None.
    """
    frame = frame.ensure_row_position_column()
    index_quoted_identifier = frame.index_column_snowflake_quoted_identifiers
    data_quoted_identifier = frame.data_column_snowflake_quoted_identifiers
    row_position_quoted_identifier = frame.row_position_snowflake_quoted_identifier
162 changes: 161 additions & 1 deletion src/snowflake/snowpark/modin/plugin/_internal/utils.py
@@ -12,8 +12,10 @@
import modin.pandas as pd
import numpy as np
import pandas as native_pd
-from pandas._typing import Scalar
+from pandas._typing import AnyArrayLike, Scalar
from pandas.core.dtypes.base import ExtensionDtype
from pandas.core.dtypes.common import is_integer_dtype, is_object_dtype, is_scalar
from pandas.core.dtypes.inference import is_list_like

import snowflake.snowpark.modin.plugin._internal.statement_params_constants as STATEMENT_PARAMS
from snowflake.snowpark._internal.analyzer.analyzer_utils import (
@@ -84,6 +86,9 @@

if TYPE_CHECKING:
    from snowflake.snowpark.modin.plugin._internal import frame
    from snowflake.snowpark.modin.plugin.compiler.snowflake_query_compiler import (
        SnowflakeQueryCompiler,
    )

ROW_POSITION_COLUMN_LABEL = "row_position"
MAX_ROW_POSITION_COLUMN_LABEL = f"MAX_{ROW_POSITION_COLUMN_LABEL}"
Expand Down Expand Up @@ -2007,3 +2012,158 @@ def create_frame_with_data_columns(
def rindex(lst: list, value: int) -> int:
"""Find the last index in the list of item value."""
return len(lst) - lst[::-1].index(value) - 1


def error_checking_for_init(
    index: Any, dtype: Union[str, np.dtype, ExtensionDtype]
) -> None:
    """
    Common error messages for the Series and DataFrame constructors.

    Parameters
    ----------
    index: Any
        The index to check.
    dtype: str, numpy.dtype, or ExtensionDtype
        The dtype to check.
    """
    from modin.pandas import DataFrame

    if isinstance(index, DataFrame):  # pandas raises the same error
        raise ValueError("Index data must be 1-dimensional")

    if dtype == "category":
        raise NotImplementedError("pandas type category is not implemented")


def assert_fields_are_none(
    class_name: str, data: Any, index: Any, dtype: Any, columns: Any = None
) -> None:
    assert (
        data is None
    ), f"Invalid {class_name} construction! The `data` parameter is not supported when `query_compiler` is given."
    assert (
        index is None
    ), f"Invalid {class_name} construction! The `index` parameter is not supported when `query_compiler` is given."
    assert (
        dtype is None
    ), f"Invalid {class_name} construction! The `dtype` parameter is not supported when `query_compiler` is given."
    assert (
        columns is None
    ), f"Invalid {class_name} construction! The `columns` parameter is not supported when `query_compiler` is given."


def convert_index_to_qc(index: Any) -> "SnowflakeQueryCompiler":
    """
    Method to convert an object representing an index into a query compiler for set_index or reindex.

    Parameters
    ----------
    index: Any
        The object to convert to a query compiler.

    Returns
    -------
    SnowflakeQueryCompiler
        The converted query compiler.
    """
    from modin.pandas import Series

    from snowflake.snowpark.modin.plugin.extensions.index import Index

    if isinstance(index, Index):
        idx_qc = index.to_series()._query_compiler
    elif isinstance(index, Series):
        # The name of the index comes from the Series' name, not the index name. `reindex` does not handle this,
        # so we need to set the name of the index to the name of the Series.
        index.index.name = index.name
        idx_qc = index._query_compiler
    else:
        idx_qc = Series(index)._query_compiler
    return idx_qc


def convert_index_to_list_of_qcs(index: Any) -> list:
    """
    Method to convert an object representing an index into a list of query compilers for set_index.

    Parameters
    ----------
    index: Any
        The object to convert to a list of query compilers.

    Returns
    -------
    list
        The list of query compilers.
    """
    from modin.pandas import Series

    from snowflake.snowpark.modin.plugin.extensions.index import Index

    if (
        not isinstance(index, (native_pd.MultiIndex, Series, Index))
        and is_list_like(index)
        and len(index) > 0
        and all((is_list_like(i) and not isinstance(i, tuple)) for i in index)
    ):
        # If given a list of lists, convert it to a MultiIndex.
        index = native_pd.MultiIndex.from_arrays(index)
    if isinstance(index, native_pd.MultiIndex):
        index_qc_list = [
            s._query_compiler
            for s in [
                Series(index.get_level_values(level)) for level in range(index.nlevels)
            ]
        ]
    else:
        index_qc_list = [convert_index_to_qc(index)]
    return index_qc_list


def add_extra_columns_and_select_required_columns(
    query_compiler: "SnowflakeQueryCompiler",
    columns: Union[AnyArrayLike, list],
) -> "SnowflakeQueryCompiler":
    """
    Method to add extra columns to and select the required columns from the provided query compiler.

    This is used in DataFrame construction in the following cases:
    - general case when data is a DataFrame
    - data is a named Series, and this name is in `columns`

    Parameters
    ----------
    query_compiler: SnowflakeQueryCompiler
        The query compiler to select columns from, i.e., data's query compiler.
    columns: AnyArrayLike or list
        The columns to select from the query compiler.
    """
    from modin.pandas import DataFrame

    data_columns = query_compiler.get_columns().to_list()
    # The `columns` parameter is used to select the columns from `data` that will be in the resultant DataFrame.
    # If a value in `columns` is not present in data's columns, it will be added as a new column filled with NaN
    # values. These columns are tracked by the `extra_columns` variable.
    if data_columns is not None and columns is not None:
        extra_columns = [col for col in columns if col not in data_columns]
        if extra_columns:
            # To add these new columns to the DataFrame, perform `__setitem__` only with the extra columns
            # and set them to None.
            extra_columns_df = DataFrame(query_compiler=query_compiler)
            # In the case that the columns are MultiIndex but not all extra columns are tuples, we need to flatten
            # the columns to ensure that the columns are a single-level index. If not, `__setitem__` will raise an
            # error when trying to add new columns that are not in the expected tuple format.
            if not all(isinstance(col, tuple) for col in extra_columns) and isinstance(
                query_compiler.get_columns(), native_pd.MultiIndex
            ):
                flattened_columns = extra_columns_df.columns.to_flat_index()
                extra_columns_df.columns = flattened_columns
            extra_columns_df[extra_columns] = None
            query_compiler = extra_columns_df._query_compiler

    # To select the columns for the resultant DataFrame, perform `take_2d_labels` on the created query compiler.
    # This is the equivalent of `__getitem__` for a DataFrame.
    # This step is performed to ensure that the right columns are picked from the InternalFrame since we never
    # explicitly drop the unwanted columns. This also ensures that the columns in the resultant DataFrame are in the
    # same order as the columns in the `columns` parameter.
    return query_compiler.take_2d_labels(slice(None), columns)
@@ -2337,7 +2337,7 @@ def any(
    def reindex(
        self,
        axis: int,
-        labels: Union[pandas.Index, "pd.Index", list[Any]],
+        labels: Union[pandas.Index, "pd.Index", list[Any], "SnowflakeQueryCompiler"],
        **kwargs: dict[str, Any],
    ) -> "SnowflakeQueryCompiler":
        """
@@ -2347,7 +2347,7 @@ def reindex(
        ----------
        axis : {0, 1}
            Axis to align labels along. 0 is for index, 1 is for columns.
-        labels : list-like
+        labels : list-like, SnowflakeQueryCompiler
            Index-labels to align with.
        method : {None, "backfill"/"bfill", "pad"/"ffill", "nearest"}
            Method to use for filling holes in reindexed frame.
@@ -2545,15 +2545,15 @@ def _add_columns_for_monotonicity_checks(

    def _reindex_axis_0(
        self,
-        labels: Union[pandas.Index, "pd.Index", list[Any]],
+        labels: Union[pandas.Index, "pd.Index", list[Any], "SnowflakeQueryCompiler"],
        **kwargs: dict[str, Any],
    ) -> "SnowflakeQueryCompiler":
        """
        Align QueryCompiler data with a new index.

        Parameters
        ----------
-        labels : list-like
+        labels : list-like, SnowflakeQueryCompiler
            Index-labels to align with.
        method : {None, "backfill"/"bfill", "pad"/"ffill", "nearest"}
            Method to use for filling holes in reindexed frame.
@@ -2571,12 +2571,15 @@ def _reindex_axis_0(
"""
self._raise_not_implemented_error_for_timedelta()

-        if isinstance(labels, native_pd.Index):
-            labels = pd.Index(labels)
-        if isinstance(labels, pd.Index):
-            new_index_qc = labels.to_series()._query_compiler
-        else:
-            new_index_qc = pd.Series(labels)._query_compiler
+        if isinstance(labels, SnowflakeQueryCompiler):
+            new_index_qc = labels
+        else:
+            if isinstance(labels, native_pd.Index):
+                labels = pd.Index(labels)
+            if isinstance(labels, pd.Index):
+                new_index_qc = labels.to_series()._query_compiler
+            else:
+                new_index_qc = pd.Series(labels)._query_compiler

        new_index_modin_frame = new_index_qc._modin_frame
        modin_frame = self._modin_frame
@@ -6515,7 +6518,7 @@ def insert(
        # 'loc'
        def move_last_element(arr: list, index: int) -> None:
            if replace:
-                # swap element at loc with new colun at end, then drop last element
+                # swap element at loc with new column at end, then drop last element
                arr[index], arr[-1] = arr[-1], arr[index]
                arr.pop()
            else:
15 changes: 6 additions & 9 deletions src/snowflake/snowpark/modin/plugin/docstrings/dataframe.py
@@ -82,16 +82,13 @@ class DataFrame(BasePandasDataset):
    Notes
    -----
    ``DataFrame`` can be created either from passed `data` or `query_compiler`. If both
-    parameters are provided, data source will be prioritized in the next order:
+    parameters are provided, an assertion error will be raised. `query_compiler` can only
+    be specified when the `data`, `index`, and `columns` are None.
-    1) Modin ``DataFrame`` or ``Series`` passed with `data` parameter.
-    2) Query compiler from the `query_compiler` parameter.
-    3) Various pandas/NumPy/Python data structures passed with `data` parameter.
-    The last option is less desirable since import of such data structures is very
-    inefficient, please use previously created Modin structures from the fist two
-    options or import data using highly efficient Modin IO tools (for example
-    ``pd.read_csv``).
+    Using pandas/NumPy/Python data structures as the `data` parameter is less desirable since
+    importing such data structures is very inefficient.
+    Please use previously created Modin structures or import data using highly efficient Modin IO
+    tools (for example ``pd.read_csv``).
    Examples
    --------