From b76912fecbf1c3bf6f667cd1146dcde03ef930c8 Mon Sep 17 00:00:00 2001 From: Mahesh Vashishtha Date: Tue, 3 Dec 2024 22:03:21 -0800 Subject: [PATCH] SNOW-1805836: Implement the dataframe interchange protocol. (#2683) Fixes SNOW-1805836 --------- Signed-off-by: sfc-gh-mvashishtha Co-authored-by: Rehan Durrani --- CHANGELOG.md | 2 + .../modin/supported/general_supported.rst | 3 + .../compiler/snowflake_query_compiler.py | 9 +- .../modin/plugin/docstrings/dataframe.py | 12 +- .../plugin/extensions/dataframe_overrides.py | 38 +- .../modin/frame/test_interchange_protocol.py | 628 ++++++++++++++++++ tests/integ/modin/test_scikit.py | 36 +- tests/integ/modin/test_telemetry.py | 25 +- tests/unit/modin/test_unsupported.py | 1 - 9 files changed, 674 insertions(+), 80 deletions(-) create mode 100644 tests/integ/modin/frame/test_interchange_protocol.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 26f746b38bf..14b9f0072aa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -68,6 +68,8 @@ - Added support for `GroupBy.pct_change` with `axis=0`, `freq=None`, and `limit=None`. - Added support for `DataFrameGroupBy.__iter__` and `SeriesGroupBy.__iter__`. - Added support for `np.sqrt`, `np.trunc`, `np.floor`, numpy trig functions, `np.exp`, `np.abs`, `np.positive` and `np.negative`. +- Added partial support for the dataframe interchange protocol method + `DataFrame.__dataframe__()`. #### Dependency Updates diff --git a/docs/source/modin/supported/general_supported.rst b/docs/source/modin/supported/general_supported.rst index 5c97e72476d..8c7ce5120af 100644 --- a/docs/source/modin/supported/general_supported.rst +++ b/docs/source/modin/supported/general_supported.rst @@ -25,6 +25,9 @@ Data manipulations +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ | ``cut`` | P | ``retbins``, ``labels`` | ``N`` if ``retbins=True``or ``labels!=False`` | +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ +| ``__dataframe__`` | P | | ``N`` for columns of type ``Timedelta`` and columns| +| | | | containing list objects | ++-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ | ``factorize`` | N | | | +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ | ``from_dummies`` | N | | | diff --git a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py index b454ae1c3a7..40763fe7b3e 100644 --- a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py +++ b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py @@ -23,6 +23,7 @@ import pandas as native_pd import pandas.core.resample import pandas.io.parsers +from pandas.core.interchange.dataframe_protocol import DataFrame as InterchangeDataframe import pandas.io.parsers.readers import pytz # type: ignore from modin.core.storage_formats import BaseQueryCompiler # type: ignore @@ -817,8 +818,12 @@ def from_pandas( def from_arrow(cls, at: Any, *args: Any, **kwargs: Any) -> "SnowflakeQueryCompiler": return cls(at.to_pandas()) - def to_dataframe(self, nan_as_null: bool = False, allow_copy: bool = True) -> None: - pass + def to_dataframe( + self, nan_as_null: 
bool = False, allow_copy: bool = True
+    ) -> InterchangeDataframe:
+        return self.to_pandas().__dataframe__(
+            nan_as_null=nan_as_null, allow_copy=allow_copy
+        )
 
     @classmethod
     def from_dataframe(cls, df: native_pd.DataFrame, data_cls: Any) -> None:
diff --git a/src/snowflake/snowpark/modin/plugin/docstrings/dataframe.py b/src/snowflake/snowpark/modin/plugin/docstrings/dataframe.py
index 52be038d783..f9b798020ec 100644
--- a/src/snowflake/snowpark/modin/plugin/docstrings/dataframe.py
+++ b/src/snowflake/snowpark/modin/plugin/docstrings/dataframe.py
@@ -5099,10 +5099,20 @@ def __delitem__():
 
     def __dataframe__():
         """
-        Get a Modin DataFrame that implements the dataframe exchange protocol.
+        Get an object that implements the dataframe interchange protocol for this dataframe.
 
         See more about the protocol in https://data-apis.org/dataframe-protocol/latest/index.html.
 
+        Like `DataFrame.to_pandas`, this method triggers a query evaluation
+        and pulls data to the local machine.
+
+        If this dataframe has columns of `Timedelta` type or columns containing
+        list objects, the interchange dataframe that this method returns will
+        raise `NotImplementedError` if you try to check those columns'
+        datatypes, e.g. when converting the interchange dataframe to pandas
+        with `pandas.api.interchange.from_dataframe`. This limitation comes
+        from pandas itself.
+
         Parameters
         ----------
         nan_as_null : bool, default: False
diff --git a/src/snowflake/snowpark/modin/plugin/extensions/dataframe_overrides.py b/src/snowflake/snowpark/modin/plugin/extensions/dataframe_overrides.py
index 956e03fadb3..7ba3eeabd26 100644
--- a/src/snowflake/snowpark/modin/plugin/extensions/dataframe_overrides.py
+++ b/src/snowflake/snowpark/modin/plugin/extensions/dataframe_overrides.py
@@ -31,6 +31,7 @@
 import numpy as np
 import pandas as native_pd
 from modin.pandas import DataFrame, Series
+from pandas.core.interchange.dataframe_protocol import DataFrame as InterchangeDataframe
 from modin.pandas.api.extensions import register_dataframe_accessor
 from modin.pandas.base import BasePandasDataset
 from modin.pandas.io import from_pandas
@@ -729,40 +730,9 @@ def _df_init_list_data_with_snowpark_pandas_values(
 
 
 @register_dataframe_accessor("__dataframe__")
-def __dataframe__(self, nan_as_null: bool = False, allow_copy: bool = True):
-    """
-    Get a Modin DataFrame that implements the dataframe exchange protocol.
-
-    See more about the protocol in https://data-apis.org/dataframe-protocol/latest/index.html.
-
-    Parameters
-    ----------
-    nan_as_null : bool, default: False
-        A keyword intended for the consumer to tell the producer
-        to overwrite null values in the data with ``NaN`` (or ``NaT``).
-        This currently has no effect; once support for nullable extension
-        dtypes is added, this value should be propagated to columns.
-    allow_copy : bool, default: True
-        A keyword that defines whether or not the library is allowed
-        to make a copy of the data. For example, copying data would be necessary
-        if a library supports strided buffers, given that this protocol
-        specifies contiguous buffers. Currently, if the flag is set to ``False``
-        and a copy is needed, a ``RuntimeError`` will be raised.
-
-    Returns
-    -------
-    ProtocolDataframe
-        A dataframe object following the dataframe protocol specification.
-    """
-    # TODO: SNOW-1063346: Modin upgrade - modin.pandas.DataFrame functions
-    ErrorMessage.not_implemented(
-        "Snowpark pandas does not support the DataFrame interchange "
-        + "protocol method `__dataframe__`. 
To use Snowpark pandas "
-        + "DataFrames with third-party libraries that try to call the "
-        + "`__dataframe__` method, please convert this Snowpark pandas "
-        + "DataFrame to pandas with `to_pandas()`."
-    )
-
+def __dataframe__(
+    self, nan_as_null: bool = False, allow_copy: bool = True
+) -> InterchangeDataframe:
     return self._query_compiler.to_dataframe(
         nan_as_null=nan_as_null, allow_copy=allow_copy
     )
diff --git a/tests/integ/modin/frame/test_interchange_protocol.py b/tests/integ/modin/frame/test_interchange_protocol.py
new file mode 100644
index 00000000000..4d5077c0f88
--- /dev/null
+++ b/tests/integ/modin/frame/test_interchange_protocol.py
@@ -0,0 +1,626 @@
+#
+# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved.
+#
+
+import modin.pandas as pd
+import pandas as native_pd
+import pytest
+
+import snowflake.snowpark.modin.plugin  # noqa: F401
+from tests.integ.modin.utils import (
+    eval_snowpark_pandas_result,
+    create_test_dfs,
+    assert_dicts_equal,
+)
+import re
+from tests.integ.utils.sql_counter import sql_count_checker
+import pandas.testing as tm
+from pandas.core.interchange.from_dataframe import (
+    categorical_column_to_series,
+    string_column_to_ndarray,
+    primitive_column_to_ndarray,
+    datetime_column_to_ndarray,
+)
+from pytest import param
+from enum import Enum, auto, unique
+from pandas.api.interchange import from_dataframe
+from pandas.core.interchange.dataframe_protocol import (
+    DtypeKind,
+    Column as InterchangeColumn,
+)  # Import abstract definitions of the DataFrame and Column abstractions in the interchange protocol.
+import numpy as np
+from pandas.core.interchange.dataframe_protocol import DlpackDeviceType
+import pandas._testing as pandas_internal_testing
+from tests.integ.utils.sql_counter import SqlCounter
+
+"""
+To understand the tests in this file, it helps to reference the interchange
+protocol, available in code [here](https://github.com/data-apis/dataframe-api/blob/7e76c386b0d9fda417eac13fbf08a4f73ef3437f/protocol/dataframe_protocol.py)
+and in HTML [here](https://data-apis.org/dataframe-protocol/latest/API.html).
+
+Paraphrasing the overview of concepts from the specification there:
+
+- A Buffer is a contiguous block of memory. It maps to a 1-D array and can be
+  converted to NumPy, CuPy, etc.
+- A Column has a single dtype. It can consist of multiple chunks. A single
+  chunk of a Column (which may be the whole column if column.num_chunks() == 1)
+  is modeled as again a Column instance. It contains one data buffer and
+  (optionally) one mask for missing data.
+- A DataFrame is an ordered collection of columns, which are identified with
+  names that are unique strings. All the DataFrame's rows are the same length.
+  It can consist of multiple chunks. A single chunk of a data frame is modeled
+  as again a DataFrame instance.
+"""
+
+
+def single_chunk_column_to_pandas(column: InterchangeColumn) -> native_pd.Series:
+    """
+    Convert a single-chunk interchange protocol column to a native pandas Series.
+
+    This method is nearly identical to
+    native_pd.core.interchange.from_dataframe.protocol_df_chunk_to_pandas. The
+    difference is that it takes as input a Column instead of a DataFrame object.
+    We need to implement a method like this ourselves, because pandas does not
+    provide a method to convert an interchange column to pandas.
+
+    This method uses pandas methods like `primitive_column_to_ndarray`, which
+    use the `data`, `validity`, and `offsets` buffers returned by each
+    column's `get_buffers()` to fetch data and convert it to pandas. 
+
+    Parameters
+    ----------
+    column : InterchangeColumn
+        The input column.
+
+    Returns
+    -------
+    native_pd.Series
+        The column as a pandas Series.
+    """
+    # The implementation here mostly follows
+    # native_pd.core.interchange.from_dataframe.protocol_df_chunk_to_pandas
+    dtype = column.dtype[0]
+    if dtype in (
+        DtypeKind.INT,
+        DtypeKind.UINT,
+        DtypeKind.FLOAT,
+        DtypeKind.BOOL,
+    ):
+        data, buf = primitive_column_to_ndarray(column)
+    elif dtype is DtypeKind.CATEGORICAL:
+        data, buf = categorical_column_to_series(column)
+    elif dtype is DtypeKind.STRING:
+        data, buf = string_column_to_ndarray(column)
+    elif dtype is DtypeKind.DATETIME:
+        data, buf = datetime_column_to_ndarray(column)
+    else:
+        raise NotImplementedError(f"Data type {dtype} not handled yet")
+    result = native_pd.Series(data)
+    # We need to keep a pointer to `buf` to keep the memory that `result`
+    # points to alive. We copy pandas's solution of storing the pointer to
+    # `buf` in `attrs`.
+    result.attrs["_INTERCHANGE_PROTOCOL_BUFFERS"] = buf
+    return result
+
+
+def column_to_pandas_series(column: InterchangeColumn) -> native_pd.Series:
+    """
+    Convert a column with one or more chunks to a native pandas Series.
+
+    Parameters
+    ----------
+    column : InterchangeColumn
+        The input column.
+
+    Returns
+    -------
+    native_pd.Series
+        The column as a pandas Series.
+    """
+    return native_pd.concat(
+        (single_chunk_column_to_pandas(chunk) for chunk in column.get_chunks()),
+        axis=0,
+        ignore_index=True,
+        copy=False,
+    )
+
+
+@unique
+class TestingDf(Enum):
+    """
+    TestingDf represents an input dataframe for a test case.
+
+    Dataframes are mutable, so we should return them from test fixtures instead
+    of defining a list of dataframes that test cases can mutate. We give the
+    dataframes labels in this enum, but create a new dataframe as required for
+    each test case in the `pandas_df` fixture.
+    """
+
+    MIXED_TYPE = auto()
+    EMPTY_NO_INDEX_OR_COLUMNS = auto()
+    EMPTY_WITH_INDEX_BUT_NO_COLUMNS = auto()
+    EMPTY_WITH_COLUMNS_BUT_NO_INDEX = auto()
+
+
+@pytest.fixture
+def pandas_df(request) -> native_pd.DataFrame:
+    """
+    Create a test dataframe matching the given TestingDf value.
+    """
+    assert isinstance(request.param, TestingDf)
+    if request.param is TestingDf.MIXED_TYPE:
+        result = native_pd.DataFrame(
+            {
+                "int_col": [0, None, 2],
+                0: [3, 4, 5],
+                "string_col": ["string_0", "string_1", None],
+                "datetime_col": [pd.Timestamp(0), pd.Timestamp(1), None],
+                pd.Timestamp(0): [pd.Timestamp(2), pd.Timestamp(3), pd.Timestamp(4)],
+            }
+        )
+        assert len(result.columns) == MIXED_TYPE_DF_WIDTH
+        return result
+    if request.param is TestingDf.EMPTY_NO_INDEX_OR_COLUMNS:
+        return native_pd.DataFrame()
+    if request.param is TestingDf.EMPTY_WITH_INDEX_BUT_NO_COLUMNS:
+        return native_pd.DataFrame(index=["a", "b"])
+    if request.param is TestingDf.EMPTY_WITH_COLUMNS_BUT_NO_INDEX:
+        return native_pd.DataFrame(columns=["a", "b"])
+
+    raise KeyError(f"Missing pandas test dataframe for {request.param}")
+
+
+MIXED_TYPE_DF_WIDTH = 5
+
+
+class TestDataFrame:
+    """
+    Test the API of the DataFrame class that we return from modin.pandas.DataFrame.__dataframe__()
+
+    See the class specification here:
+    https://github.com/data-apis/dataframe-api/blob/7e76c386b0d9fda417eac13fbf08a4f73ef3437f/protocol/dataframe_protocol.py#L362
+    """
+
+    @sql_count_checker(query_count=1)
+    @pytest.mark.parametrize("pandas_df", TestingDf, indirect=True)
+    def test_metadata(self, pandas_df):
+        # `metadata` holds the pandas index, i.e. 
row labels, which are not
+        # part of the interchange protocol. pandas and Snowpark pandas need to
+        # store the row labels as a separate field in the interchange object
+        # so that from_dataframe() can recover the row labels from that field.
+        eval_snowpark_pandas_result(
+            *create_test_dfs(pandas_df),
+            lambda df: df.__dataframe__().metadata,
+            comparator=assert_dicts_equal,
+        )
+
+    @sql_count_checker(query_count=1)
+    @pytest.mark.parametrize("pandas_df", TestingDf, indirect=True)
+    def test_num_columns(self, pandas_df):
+        eval_snowpark_pandas_result(
+            *create_test_dfs(pandas_df),
+            lambda df: df.__dataframe__().num_columns(),
+            comparator=int.__eq__,
+        )
+
+    @sql_count_checker(query_count=1)
+    @pytest.mark.parametrize("pandas_df", TestingDf, indirect=True)
+    def test_num_rows(self, pandas_df):
+        eval_snowpark_pandas_result(
+            *create_test_dfs(pandas_df),
+            lambda df: df.__dataframe__().num_rows(),
+            comparator=int.__eq__,
+        )
+
+    @sql_count_checker(query_count=1)
+    @pytest.mark.parametrize("pandas_df", TestingDf, indirect=True)
+    def test_num_chunks(self, pandas_df):
+        """
+        Test that num_chunks() is a non-negative int equal to the length of get_chunks().
+
+        The interchange protocol doesn't require any particular number of
+        chunks, but num_chunks() should equal the number of chunks that
+        the iterator returned by get_chunks() returns.
+        """
+        interchange = pd.DataFrame(pandas_df).__dataframe__()
+        num_chunks = interchange.num_chunks()
+        assert isinstance(num_chunks, int)
+        assert num_chunks >= 0
+        assert num_chunks == len(list(interchange.get_chunks()))
+
+    @sql_count_checker(query_count=1)
+    @pytest.mark.parametrize("pandas_df", TestingDf, indirect=True)
+    def test_column_names(self, pandas_df):
+        eval_snowpark_pandas_result(
+            *create_test_dfs(pandas_df),
+            lambda df: df.__dataframe__().column_names(),
+            comparator=tm.assert_index_equal,
+        )
+
+    @pytest.mark.parametrize(
+        "get_column_by_name", [True, False], ids=["by_name", "by_id"]
+    )
+    @pytest.mark.parametrize("pandas_df", [TestingDf.MIXED_TYPE], indirect=True)
+    @pytest.mark.parametrize("i", range(MIXED_TYPE_DF_WIDTH))
+    def test_get_column(self, pandas_df, get_column_by_name, i):
+        def column_getter(interchange_df):
+            return (
+                interchange_df.get_column_by_name(str(pandas_df.columns[i]))
+                if get_column_by_name
+                else interchange_df.get_column(i)
+            )
+
+        with SqlCounter(query_count=1):
+            eval_snowpark_pandas_result(
+                *create_test_dfs(pandas_df),
+                lambda df: single_chunk_column_to_pandas(
+                    column_getter(df.__dataframe__())
+                ),
+                comparator=tm.assert_series_equal,
+            )
+
+    @sql_count_checker(query_count=1)
+    @pytest.mark.parametrize("pandas_df", TestingDf, indirect=True)
+    def test_get_columns(self, pandas_df):
+        def eval_columns_iterable(snow_result, pandas_result):
+            snow_list = list(snow_result)
+            pandas_list = list(pandas_result)
+            assert len(snow_list) == len(
+                pandas_list
+            ), "Snowflake columns and pandas columns have different lengths"
+            for i, (snow_column, pandas_column) in enumerate(
+                zip(snow_list, pandas_list)
+            ):
+                try:
+                    tm.assert_series_equal(
+                        single_chunk_column_to_pandas(snow_column),
+                        single_chunk_column_to_pandas(pandas_column),
+                    )
+                except AssertionError as error:
+                    raise AssertionError(
+                        f"Columns at position {i} were not equal"
+                    ) from error
+
+        eval_snowpark_pandas_result(
+            *create_test_dfs(pandas_df),
+            lambda df: df.__dataframe__().get_columns(),
+            comparator=eval_columns_iterable,
+        )
+
+    @pytest.mark.parametrize(
+        
"indices", + [ + (0,), + [], + (0, 1, 2, 3, 4), + [4, 0], + ], + ) + @pytest.mark.parametrize("pandas_df", [TestingDf.MIXED_TYPE], indirect=True) + @pytest.mark.parametrize( + "select_columns_by_name", [True, False], ids=["by_name", "by_id"] + ) + @sql_count_checker(query_count=1) + def test_select_columns(self, pandas_df, indices, select_columns_by_name): + def selector(interchange_df): + return ( + interchange_df.select_columns_by_name( + list(pandas_df.columns[list(indices)].map(str)) + ) + if select_columns_by_name + else interchange_df.select_columns(indices) + ) + + eval_snowpark_pandas_result( + *create_test_dfs(pandas_df), + # select_columns() and select_columns_by_name() return another + # interchange DataFrame object. Check that converting df to + # native pandas via the interchange protocol gives the same result + # whether `df` is a native pandas dataframe or a Snowpark pandas + # dataframe. + lambda df: from_dataframe(selector(df.__dataframe__())), + comparator=tm.assert_frame_equal, + ) + + @pytest.mark.parametrize("pandas_df", TestingDf, indirect=True) + @sql_count_checker(query_count=1) + def test_get_chunks(self, pandas_df): + # Build a pandas dataframe by converting each chunk of the dataframe + # to pandas and concatenating the results. + pandas_dfs = [] + for chunk in pd.DataFrame(pandas_df).__dataframe__().get_chunks(): + pandas_dfs.append(from_dataframe(chunk)) + if len(pandas_dfs) == 1: + chunks_result = pandas_dfs[0] + else: + chunks_result = native_pd.concat( + pandas_dfs, axis=0, ignore_index=True, copy=False + ) + # Check that the resulting pandas dataframe is equal to what we would + # get by converting the pandas dataframe to an interchange dataframe + # and then converting it back to pandas. + tm.assert_frame_equal(chunks_result, from_dataframe(pandas_df.__dataframe__())) + + @pytest.mark.parametrize("pandas_df", TestingDf, indirect=True) + @pytest.mark.parametrize("nan_as_null", [True, False]) + @sql_count_checker(query_count=1) + def test_nan_as_null(self, pandas_df, nan_as_null): + """ + Test the nan_as_null parameter to the __dataframe__ method. + + The interchange protocol has deprecated this parameter, which is no + longer supposed to have any effect. Just check that we match pandas + in a trip to the interchange protocol and then to pandas. + """ + eval_snowpark_pandas_result( + *create_test_dfs(pandas_df), + lambda df: from_dataframe(df.__dataframe__(nan_as_null=nan_as_null)), + comparator=tm.assert_frame_equal, + ) + + @pytest.mark.parametrize("pandas_df", [TestingDf.MIXED_TYPE], indirect=True) + def test_allow_copy_false(self, pandas_df): + """ + Test the allow_copy parameter to the __dataframe__ method. + + Snowpark pandas should never copy the data that each interchange object + stores because it creates a new pandas dataframe object for each + __dataframe__() call. It's difficult to test that we would never make a + copy if we have `allow_copy=False`, so we check that if we call + `__dataframe__()` twice on the same dataframe, we get pointers to 2 + different locations in memory. 
+        """
+        modin_df = pd.DataFrame(pandas_df)
+        with SqlCounter(query_count=1):
+            interchange1 = modin_df.__dataframe__(allow_copy=False)
+        with SqlCounter(query_count=1):
+            interchange2 = modin_df.__dataframe__(allow_copy=False)
+
+        def get_buffer_memory_pointer(interchange_dataframe):
+            column = interchange_dataframe.get_column(0)
+            chunks = list(column.get_chunks())
+            assert len(chunks) == 1
+            buffer, _ = column.get_buffers()["data"]
+            return buffer.ptr
+
+        assert get_buffer_memory_pointer(interchange1) != get_buffer_memory_pointer(
+            interchange2
+        )
+
+
+@pytest.mark.parametrize("pandas_df", [TestingDf.MIXED_TYPE], indirect=True)
+class TestColumn:
+    """
+    Test the API of the interchange column class.
+
+    See the class specification here: https://github.com/data-apis/dataframe-api/blob/7e76c386b0d9fda417eac13fbf08a4f73ef3437f/protocol/dataframe_protocol.py#L172
+    """
+
+    @sql_count_checker(query_count=1)
+    def test_size(self, pandas_df):
+        eval_snowpark_pandas_result(
+            *create_test_dfs(pandas_df),
+            lambda df: df.__dataframe__().get_column(0).size(),
+            comparator=int.__eq__,
+        )
+
+    @sql_count_checker(query_count=1)
+    def test_offset(self, pandas_df):
+        """
+        Test that the column offset is a non-negative integer.
+
+        We don't need the offset to be any value in particular, as long as we
+        can read the data we expect at that offset using
+        column_to_pandas_series(). We test our ability to read the column in
+        TestDataFrame.test_get_column, among other test cases.
+        """
+        offset = pd.DataFrame(pandas_df).__dataframe__().get_column(0).offset
+        assert isinstance(offset, int)
+        assert offset >= 0
+
+    @sql_count_checker(query_count=1)
+    @pytest.mark.parametrize("i", range(MIXED_TYPE_DF_WIDTH))
+    def test_dtype(self, pandas_df, i):
+        eval_snowpark_pandas_result(
+            *create_test_dfs(pandas_df),
+            lambda df: df.__dataframe__().get_column(i).dtype,
+            comparator=tuple.__eq__,
+        )
+
+    @sql_count_checker(query_count=1)
+    @pytest.mark.parametrize("i", range(MIXED_TYPE_DF_WIDTH))
+    def test_describe_null(self, pandas_df, i):
+        eval_snowpark_pandas_result(
+            *create_test_dfs(pandas_df),
+            lambda df: df.__dataframe__().get_column(i).describe_null,
+            comparator=tuple.__eq__,
+        )
+
+    @sql_count_checker(query_count=1)
+    @pytest.mark.parametrize("i", range(MIXED_TYPE_DF_WIDTH))
+    def test_null_count(self, pandas_df, i):
+        eval_snowpark_pandas_result(
+            *create_test_dfs(pandas_df),
+            lambda df: df.__dataframe__().get_column(i).null_count,
+            comparator=int.__eq__,
+        )
+
+    @sql_count_checker(query_count=1)
+    @pytest.mark.parametrize("i", range(MIXED_TYPE_DF_WIDTH))
+    def test_metadata(self, pandas_df, i):
+        eval_snowpark_pandas_result(
+            *create_test_dfs(pandas_df),
+            lambda df: df.__dataframe__().get_column(i).metadata,
+            comparator=assert_dicts_equal,
+        )
+
+    @sql_count_checker(query_count=1)
+    @pytest.mark.parametrize("i", range(MIXED_TYPE_DF_WIDTH))
+    def test_num_chunks(self, pandas_df, i):
+        """
+        Test that num_chunks() is a non-negative int equal to the length of get_chunks().
+
+        We don't need the column to have any particular number of chunks, as
+        long as converting the column to pandas with column_to_pandas_series()
+        gives the data we expect. We test our ability to read the column in
+        TestDataFrame.test_get_column, among other test cases. 
+        """
+        column = pd.DataFrame(pandas_df).__dataframe__().get_column(i)
+        num_chunks = column.num_chunks()
+        assert isinstance(num_chunks, int)
+        assert num_chunks >= 0
+        assert num_chunks == len(list(column.get_chunks()))
+
+
+@pytest.mark.parametrize("pandas_df", [TestingDf.MIXED_TYPE], indirect=True)
+class TestBuffer:
+    """
+    Test the API of the interchange Buffer class.
+
+    See the class specification here:
+    https://github.com/data-apis/dataframe-api/blob/7e76c386b0d9fda417eac13fbf08a4f73ef3437f/protocol/dataframe_protocol.py#L116
+    """
+
+    @sql_count_checker(query_count=1)
+    def test_bufsize(self, pandas_df):
+        """
+        Check that bufsize is a non-negative integer.
+
+        In other test cases, we use `single_chunk_column_to_pandas` to test
+        that we can use `bufsize`, along with other attributes of each
+        buffer, to read the data that we expect.
+        """
+        bufsize = (
+            pd.DataFrame(pandas_df)
+            .__dataframe__()
+            .get_column(0)
+            .get_buffers()["data"][0]
+            .bufsize
+        )
+        assert isinstance(bufsize, int)
+        assert bufsize >= 0
+
+    @sql_count_checker(query_count=1)
+    def test_ptr(self, pandas_df):
+        """
+        Check that ptr is a non-negative integer.
+
+        In other test cases, we use `single_chunk_column_to_pandas` to test
+        that we can use `ptr`, along with other attributes of each buffer,
+        to read the data that we expect.
+        """
+        ptr = (
+            pd.DataFrame(pandas_df)
+            .__dataframe__()
+            .get_column(0)
+            .get_buffers()["data"][0]
+            .ptr
+        )
+        assert isinstance(ptr, int)
+        assert ptr >= 0
+
+    @sql_count_checker(query_count=1)
+    def test___dlpack__(self, pandas_df):
+        """
+        Test that the buffer implements the Python array API standard's __dlpack__ method.
+
+        We test that we can convert a chunk of interchange dataframe data to
+        numpy with np.from_dlpack and get a numpy array that matches pandas.
+
+        See the __dlpack__ specification here:
+        https://github.com/data-apis/array-api/blob/6d205d72dde3db8fc8668ad6aef5d003cc8ef80f/src/array_api_stubs/_draft/array_object.py#L296
+        """
+        eval_snowpark_pandas_result(
+            *create_test_dfs(pandas_df),
+            lambda df: np.from_dlpack(
+                df.__dataframe__().get_column(0).get_buffers()["data"][0]
+            ),
+            comparator=pandas_internal_testing.assert_numpy_array_equal,
+        )
+
+    @sql_count_checker(query_count=1)
+    def test__dlpack_device__(self, pandas_df):
+        """
+        Test that the buffer implements the Python array API standard's __dlpack_device__().
+
+        Check that the results of __dlpack_device__ match those of pandas. 
+ + See the __dlpack_device__() specification here: + https://github.com/data-apis/array-api/blob/6d205d72dde3db8fc8668ad6aef5d003cc8ef80f/src/array_api_stubs/_draft/array_object.py#L469 + """ + + def get_device_type_and_id(df): + return ( + df.__dataframe__() + .get_column(0) + .get_buffers()["data"][0] + .__dlpack_device__() + ) + + snow_device_type, snow_device_id = get_device_type_and_id( + pd.DataFrame(pandas_df) + ) + pandas_device_type, pandas_device_id = get_device_type_and_id(pandas_df) + assert snow_device_id is None + assert pandas_device_id is None + assert snow_device_type is DlpackDeviceType.CPU + assert pandas_device_type is DlpackDeviceType.CPU + + +@pytest.mark.parametrize( + "columns", + [ + param([0, "0"], id="int_0_and_string_0"), + param([0, 0], id="duplicate_int_column"), + param(["0", "0"], id="duplicate_string_column"), + ], +) +@sql_count_checker(query_count=1) +def test_conflicting_string_names(columns): + """ + If a dataframe has two columns whose labels are equal when converted to + string, converting the dataframe to pandas with from_dataframe() raises + an exception in both native pandas and Snowpark pandas. The interchange + protocol only allows string names and cannot handle duplicate column names. + """ + eval_snowpark_pandas_result( + *create_test_dfs(native_pd.DataFrame([[0, 1]], columns=columns)), + lambda df: from_dataframe(df.__dataframe__()), + expect_exception=True, + expect_exception_type=TypeError, + expect_exception_match=re.escape( + "Expected a Series, got a DataFrame. This likely " + + "happened because you called __dataframe__ on a DataFrame " + + "which, after converting column names to string, resulted in " + + f"duplicated names: {repr(native_pd.Index(columns).map(str))}. " + + "Please rename these columns before using the interchange " + + "protocol." + ), + ) + + +@sql_count_checker(query_count=1) +def test_list_column_dtype(): + eval_snowpark_pandas_result( + *create_test_dfs([[list("list_item")]]), + lambda df: from_dataframe(df.__dataframe__()), + expect_exception=True, + expect_exception_type=NotImplementedError, + expect_exception_match=re.escape( + "Non-string object dtypes are not supported yet" + ), + ) + + +@sql_count_checker(query_count=1) +def test_timedelta_dtype(): + eval_snowpark_pandas_result( + *create_test_dfs([[pd.Timedelta(1)]]), + lambda df: from_dataframe(df.__dataframe__()), + expect_exception=True, + expect_exception_type=NotImplementedError, + expect_exception_match=re.escape( + "Conversion of timedelta64[ns] to Arrow C format string is not " + + "implemented." + ), + ) diff --git a/tests/integ/modin/test_scikit.py b/tests/integ/modin/test_scikit.py index bf169f6b275..c28a6f59ba9 100644 --- a/tests/integ/modin/test_scikit.py +++ b/tests/integ/modin/test_scikit.py @@ -7,39 +7,19 @@ from sklearn.preprocessing import MaxAbsScaler import snowflake.snowpark.modin.plugin # noqa: F401 -from tests.integ.utils.sql_counter import SqlCounter +from tests.integ.utils.sql_counter import sql_count_checker -# SNOW-1344931 Support MaxAbsScalar on earlier versions of -# scikit-learn by supporting numpy.may_share_memory -def test_scikit_maxabs(): +@sql_count_checker(query_count=5) +def test_maxabs(): data = [[1.0, -1.0, 2.0], [2.0, 0.0, 0.0], [0.0, 1.0, -1.0]] X = pd.DataFrame(data) - # the following will result in a TypeError on earlier versions - # of scikit-learn if `numpy.may_share_memory' is not implemented as an - # array function. In later versions a NotImplementedError is - # thrown. 
- with SqlCounter(query_count=0): - try: - MaxAbsScaler().fit_transform(X) - raise AssertionError() - except NotImplementedError: - # scikit-learn 1.5.2 will throw an error for the DF - # interchange protocol. - pass + MaxAbsScaler().fit_transform(X) -# SNOW-1518382 Support PCA on earlier versions of -# scikit-learn by supporting numpy.may_share_memory -def test_scikit_pca(): +@sql_count_checker(query_count=3) +def test_pca(): data = [[1.0, -1.0, 2.0], [2.0, 0.0, 0.0], [0.0, 1.0, -1.0]] X = pd.DataFrame(data) - with SqlCounter(query_count=0): - pca = PCA() - try: - pca.fit(X) - raise AssertionError() - except NotImplementedError: - # scikit-learn 1.5.2 will throw an error for the DF - # interchange protocol. - pass + pca = PCA() + pca.fit(X) diff --git a/tests/integ/modin/test_telemetry.py b/tests/integ/modin/test_telemetry.py index 5a5f90db50e..3caa81741b3 100644 --- a/tests/integ/modin/test_telemetry.py +++ b/tests/integ/modin/test_telemetry.py @@ -2,7 +2,6 @@ # # Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved. # -import contextlib import json import sys from typing import Any, Optional @@ -420,37 +419,35 @@ def test_telemetry_getitem_setitem(): @pytest.mark.parametrize( - "name, expected_func_name, method, expected_query_count, method_implemented", + "name, expected_func_name, method, expected_query_count", [ # __repr__ is an extension method, so the class name is shown only once. - ["__repr__", "DataFrame.__repr__", lambda df: df.__repr__(), 1, True], + ["__repr__", "DataFrame.__repr__", lambda df: df.__repr__(), 1], # __iter__ was defined on the DataFrame class, so it is shown twice. - ["__iter__", "DataFrame.DataFrame.__iter__", lambda df: df.__iter__(), 0, True], + ["__iter__", "DataFrame.DataFrame.__iter__", lambda df: df.__iter__(), 0], [ "__dataframe__", "DataFrame.__dataframe__", lambda df: df.__dataframe__(), - 0, - False, + # The interchange protocol method will trigger a query to convert + # the Snowpark pandas dataframe to native pandas. + 1, ], ], ) def test_telemetry_private_method( - name, expected_func_name, method, expected_query_count, method_implemented + name, + expected_func_name, + method, + expected_query_count, ): df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}) # Clear connector telemetry client buffer to avoid flush triggered by the next API call, ensuring log extraction. df._query_compiler._modin_frame.ordered_dataframe.session._conn._telemetry_client.telemetry.send_batch() - - with SqlCounter(query_count=expected_query_count), ( - contextlib.nullcontext() - if method_implemented - else pytest.raises(NotImplementedError) - ): + with SqlCounter(query_count=expected_query_count): method(df) # This trigger eager evaluation and the messages should have been flushed to the connector, so we have to extract # the telemetry log from the connector to validate - data = _extract_snowpark_pandas_telemetry_log_data( expected_func_name=expected_func_name, session=df._query_compiler._modin_frame.ordered_dataframe.session, diff --git a/tests/unit/modin/test_unsupported.py b/tests/unit/modin/test_unsupported.py index 4052daf8053..3fa742074bf 100644 --- a/tests/unit/modin/test_unsupported.py +++ b/tests/unit/modin/test_unsupported.py @@ -114,7 +114,6 @@ def test_unsupported_general(general_method, kwargs): ["transform", {"func": [[], {}]}], ["truncate", {}], ["xs", {"key": ""}], - ["__dataframe__", {}], ], ) def test_unsupported_df(df_method, kwargs):
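
Usage sketch (illustrative, not part of the patch): with `__dataframe__()`
implemented, a Snowpark pandas DataFrame can be handed to any consumer of the
dataframe interchange protocol. The snippet below assumes an already-configured
Snowpark session; the `from_dataframe` round trip and the scikit-learn call
mirror the tests above.

    import modin.pandas as pd
    import snowflake.snowpark.modin.plugin  # noqa: F401
    from pandas.api.interchange import from_dataframe
    from sklearn.decomposition import PCA

    df = pd.DataFrame({"a": [1.0, 2.0, 0.0], "b": [-1.0, 0.0, 1.0]})

    # __dataframe__() triggers a query, pulls the data to the local machine,
    # and returns a pandas interchange object.
    interchange = df.__dataframe__()

    # Any interchange consumer can read that object, e.g. to build a native
    # pandas DataFrame...
    native_df = from_dataframe(interchange)

    # ...so libraries that consume the protocol, like scikit-learn here, now
    # work directly on Snowpark pandas DataFrames (see test_pca above).
    PCA().fit(df)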