diff --git a/kedro-datasets/RELEASE.md b/kedro-datasets/RELEASE.md index d20e91858..42c5bba60 100755 --- a/kedro-datasets/RELEASE.md +++ b/kedro-datasets/RELEASE.md @@ -1,4 +1,5 @@ # Upcoming Release + ## Major features and improvements - Group datasets documentation according to the dependencies to clean up the nav bar. @@ -29,14 +30,16 @@ | ------------------------------ | ------------------------------------------------------------- | ------------------------------------ | | `polars.PolarsDatabaseDataset` | A dataset to load and save data to a SQL backend using Polars | `kedro_datasets_experimental.polars` | +- Added `mode` save argument to `ibis.TableDataset`, supporting "append", "overwrite", "error"/"errorifexists", and "ignore" save modes. The deprecated `overwrite` save argument is mapped to `mode` for backward compatibility and will be removed in a future release. Specifying both `mode` and `overwrite` results in an error. + ## Bug fixes and other changes - Added primary key constraint to BaseTable. - Added save/load with `use_pyarrow=True` save_args for LazyPolarsDataset partitioned parquet files. - Updated the json schema for Kedro 1.0.0. 
-## Breaking Changes ## Community contributions + - [Minura Punchihewa](https://github.com/MinuraPunchihewa) - [gitgud5000](https://github.com/gitgud5000) @@ -70,7 +73,6 @@ Many thanks to the following Kedroids for contributing PRs to this release: - [Seohyun Park](https://github.com/soyamimi) - [Daniel Russell-Brain](https://github.com/killerfridge) - # Release 7.0.0 ## Major features and improvements diff --git a/kedro-datasets/kedro_datasets/ibis/table_dataset.py b/kedro-datasets/kedro_datasets/ibis/table_dataset.py index 4fdac3080..879efa78a 100644 --- a/kedro-datasets/kedro_datasets/ibis/table_dataset.py +++ b/kedro-datasets/kedro_datasets/ibis/table_dataset.py @@ -1,18 +1,37 @@ """Provide data loading and saving functionality for Ibis's backends.""" from __future__ import annotations +import sys from copy import deepcopy +from enum import auto from typing import TYPE_CHECKING, Any, ClassVar +from warnings import warn + +if sys.version_info >= (3, 11): + from enum import StrEnum # pragma: no cover +else: + from backports.strenum import StrEnum # pragma: no cover import ibis.expr.types as ir -from kedro.io import AbstractDataset +from kedro.io import AbstractDataset, DatasetError +from kedro_datasets import KedroDeprecationWarning from kedro_datasets._utils import ConnectionMixin if TYPE_CHECKING: from ibis import BaseBackend +class SaveMode(StrEnum): + """`SaveMode` is used to specify the expected behavior of saving a table.""" + + APPEND = auto() + OVERWRITE = auto() + ERROR = auto() + ERRORIFEXISTS = auto() + IGNORE = auto() + + class TableDataset(ConnectionMixin, AbstractDataset[ir.Table, ir.Table]): """`TableDataset` loads/saves data from/to Ibis table expressions. 
@@ -28,6 +47,7 @@ class TableDataset(ConnectionMixin, AbstractDataset[ir.Table, ir.Table]): database: company.db save_args: materialized: table + mode: append motorbikes: type: ibis.TableDataset @@ -35,7 +55,10 @@ class TableDataset(ConnectionMixin, AbstractDataset[ir.Table, ir.Table]): connection: backend: duckdb database: company.db - ``` + save_args: + materialized: view + mode: overwrite + ``` Using the [Python API](https://docs.kedro.org/en/stable/catalog-data/advanced_data_catalog_usage/): @@ -62,7 +85,7 @@ class TableDataset(ConnectionMixin, AbstractDataset[ir.Table, ir.Table]): DEFAULT_LOAD_ARGS: ClassVar[dict[str, Any]] = {} DEFAULT_SAVE_ARGS: ClassVar[dict[str, Any]] = { "materialized": "view", - "overwrite": True, + "mode": "overwrite", } _CONNECTION_GROUP: ClassVar[str] = "ibis" @@ -109,7 +132,12 @@ def __init__( # noqa: PLR0913 `create_{materialized}` method. By default, ``ir.Table`` objects are materialized as views. To save a table using a different materialization strategy, supply a value for - `materialized` in `save_args`. + `materialized` in `save_args`. The `mode` parameter controls + the behavior when saving data: + - _"overwrite"_: Overwrite existing data in the table. + - _"append"_: Append contents of the new data to the existing table (does not overwrite). + - _"error"_ or _"errorifexists"_: Throw an exception if the table already exists. + - _"ignore"_: Silently ignore the operation if the table already exists. metadata: Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins. """ @@ -134,6 +162,28 @@ def __init__( # noqa: PLR0913 self._materialized = self._save_args.pop("materialized") + # Handle mode/overwrite conflict. + if save_args and "mode" in save_args and "overwrite" in self._save_args: + raise ValueError("Cannot specify both 'mode' and deprecated 'overwrite'.") + + # Map legacy overwrite if present. 
+ if "overwrite" in self._save_args: + warn( + "'overwrite' is deprecated and will be removed in a future release. " + "Please use 'mode' instead.", + KedroDeprecationWarning, + stacklevel=2, + ) + legacy = self._save_args.pop("overwrite") + # Remove any lingering 'mode' key from defaults to avoid + # leaking into writer kwargs. + del self._save_args["mode"] + mode = "overwrite" if legacy else "error" + else: + mode = self._save_args.pop("mode") + + self._mode = SaveMode(mode) + def _connect(self) -> BaseBackend: import ibis # noqa: PLC0415 @@ -151,7 +201,21 @@ def load(self) -> ir.Table: def save(self, data: ir.Table) -> None: writer = getattr(self.connection, f"create_{self._materialized}") - writer(self._table_name, data, **self._save_args) + if self._mode == "append": + if not self._exists(): + writer(self._table_name, data, overwrite=False, **self._save_args) + elif hasattr(self.connection, "insert"): + self.connection.insert(self._table_name, data, **self._save_args) + else: + raise DatasetError( + f"The {self.connection.name} backend for Ibis does not support inserts." 
+ ) + elif self._mode == "overwrite": + writer(self._table_name, data, overwrite=True, **self._save_args) + elif self._mode in {"error", "errorifexists"}: + writer(self._table_name, data, overwrite=False, **self._save_args) + elif self._mode == "ignore" and not self._exists(): + writer(self._table_name, data, overwrite=False, **self._save_args) def _describe(self) -> dict[str, Any]: load_args = deepcopy(self._load_args) @@ -165,6 +229,7 @@ def _describe(self) -> dict[str, Any]: "load_args": load_args, "save_args": save_args, "materialized": self._materialized, + "mode": self._mode, } def _exists(self) -> bool: diff --git a/kedro-datasets/pyproject.toml b/kedro-datasets/pyproject.toml index 6622cd670..4f3bab0bf 100644 --- a/kedro-datasets/pyproject.toml +++ b/kedro-datasets/pyproject.toml @@ -11,6 +11,7 @@ description = "Kedro-Datasets is where you can find all of Kedro's data connecto requires-python = ">=3.10" license = {text = "Apache Software License (Apache 2.0)"} dependencies = [ + "backports.strenum; python_version < '3.11'", "kedro>=1.0.0rc1, <2.0.0", "lazy_loader", ] diff --git a/kedro-datasets/tests/ibis/test_table_dataset.py b/kedro-datasets/tests/ibis/test_table_dataset.py index 481c011db..0cfcac7de 100644 --- a/kedro-datasets/tests/ibis/test_table_dataset.py +++ b/kedro-datasets/tests/ibis/test_table_dataset.py @@ -1,9 +1,12 @@ import duckdb import ibis +import pandas as pd import pytest +from kedro.io import DatasetError from packaging.version import Version from pandas.testing import assert_frame_equal +from kedro_datasets import KedroDeprecationWarning from kedro_datasets.ibis import FileDataset, TableDataset _SENTINEL = object() @@ -37,13 +40,15 @@ def connection_config(request, database): @pytest.fixture def table_dataset(database_name, connection_config, load_args, save_args): - return TableDataset( + ds = TableDataset( table_name="test", database=database_name, connection=connection_config, load_args=load_args, save_args=save_args, ) + yield ds + 
getattr(ds._connection, f"drop_{ds._materialized}")("test", force=True) @pytest.fixture @@ -77,9 +82,10 @@ def test_save_and_load(self, table_dataset, dummy_table, database): assert "test" in con.sql("SELECT * FROM duckdb_views").fetchnumpy()["view_name"] @pytest.mark.parametrize( - "connection_config", [{"backend": "polars"}], indirect=True + ("connection_config", "save_args"), + [({"backend": "polars"}, {"materialized": "table"})], + indirect=True, ) - @pytest.mark.parametrize("save_args", [{"materialized": "table"}], indirect=True) def test_save_and_load_polars( self, table_dataset, connection_config, save_args, dummy_table ): @@ -102,6 +108,139 @@ def test_exists(self, table_dataset, dummy_table): table_dataset.save(dummy_table) assert table_dataset.exists() + @pytest.mark.parametrize( + "save_args", [{"materialized": "table", "mode": "append"}], indirect=True + ) + def test_save_mode_append(self, table_dataset, dummy_table): + """Saving with mode=append should add rows to an existing table.""" + df1 = dummy_table + df2 = dummy_table + + table_dataset.save(df1) + table_dataset.save(df2) + + df1 = df1.execute() + df2 = df2.execute() + reloaded = table_dataset.load().execute() + assert len(reloaded) == len(df1) + len(df2) + + @pytest.mark.parametrize( + "save_args", + [ + {"materialized": "table", "mode": "error"}, + {"materialized": "table", "mode": "errorifexists"}, + ], + indirect=True, + ) + def test_save_mode_error_variants(self, table_dataset, dummy_table): + """Saving with error/errorifexists should raise when table exists.""" + table_dataset.save(dummy_table) + with pytest.raises(DatasetError, match='Table with name "test" already exists'): + table_dataset.save(dummy_table) + + @pytest.mark.parametrize( + "save_args", [{"materialized": "table", "mode": "ignore"}], indirect=True + ) + def test_save_mode_ignore(self, table_dataset, dummy_table): + """Saving with ignore should not change existing table.""" + df1 = dummy_table + df2 = dummy_table + + 
table_dataset.save(df1) + table_dataset.save(df2) + df1 = df1.execute() + + reloaded = table_dataset.load().execute() + # Should remain as first save only + assert_frame_equal(reloaded.reset_index(drop=True), df1.reset_index(drop=True)) + + def test_unsupported_save_mode_raises(self, database_name, connection_config): + """Providing an unsupported save mode should raise a ValueError.""" + with pytest.raises( + ValueError, match="'unsupported_mode' is not a valid SaveMode" + ): + TableDataset( + table_name="unsupported_mode", + database=database_name, + connection=connection_config, + save_args={"materialized": "table", "mode": "unsupported_mode"}, + ) + + def test_legacy_overwrite_conflict_raises(self, database_name, connection_config): + """Providing both mode and overwrite should raise a ValueError.""" + with pytest.raises(ValueError): + TableDataset( + table_name="conflict", + database=database_name, + connection=connection_config, + save_args={ + "materialized": "table", + "mode": "append", + "overwrite": True, + }, + ) + + def test_legacy_overwrite_deprecation_warning( + self, database_name, connection_config + ): + """Using legacy overwrite should raise a deprecation warning.""" + with pytest.warns(KedroDeprecationWarning, match="'overwrite' is deprecated"): + TableDataset( + table_name="deprecated_overwrite", + database=database_name, + connection=connection_config, + save_args={"overwrite": True}, + ) + + @pytest.mark.parametrize( + ("connection_config", "save_args"), + [({"backend": "polars"}, {"materialized": "table", "mode": "append"})], + indirect=True, + ) + def test_append_mode_no_insert_raises(self, table_dataset, dummy_table): + """Test that saving with mode=append on a backend without 'insert' raises DatasetError (polars backend).""" + # Save once to create the table + table_dataset.save(dummy_table) + # Try to append again, should raise DatasetError + with pytest.raises(DatasetError, match="does not support inserts"): + 
table_dataset.save(dummy_table) + + @pytest.mark.parametrize( + "save_args", + [ + {"materialized": "table", "overwrite": True}, + {"materialized": "table", "overwrite": False}, + ], + indirect=True, + ) + def test_legacy_overwrite_behavior(self, table_dataset, save_args, dummy_table): + """Legacy overwrite should map to overwrite or error behavior.""" + legacy_overwrite = save_args["overwrite"] + df2 = ibis.memtable(pd.DataFrame({"col1": [7], "col2": [8], "col3": [9]})) + + table_dataset.save(dummy_table) # First save should always work + if legacy_overwrite: + # Should overwrite existing table with new contents + table_dataset.save(df2) + df2 = df2.execute() + out = table_dataset.load().execute().reset_index(drop=True) + assert_frame_equal(out, df2.reset_index(drop=True)) + else: + # Should raise on second save when table exists + with pytest.raises(DatasetError): + table_dataset.save(df2) + + def test_describe_includes_backend_mode_and_materialized(self, table_dataset): + """_describe should expose backend, mode and materialized; nested args exclude database.""" + + desc = table_dataset._describe() + + assert {"backend", "mode", "materialized"} <= desc.keys() + assert "database" in desc + # database key should not be duplicated inside nested args + assert "database" not in desc["load_args"] + assert "database" not in desc["save_args"] + @pytest.mark.parametrize("load_args", [{"database": "test"}], indirect=True) def test_load_extra_params(self, table_dataset, load_args): """Test overriding the default load arguments.""" @@ -109,7 +248,7 @@ def test_load_extra_params(self, table_dataset, load_args): assert table_dataset._load_args[key] == value @pytest.mark.parametrize("save_args", [{"materialized": "table"}], indirect=True) - def test_save_extra_params(self, table_dataset, save_args, dummy_table, database): + def test_save_extra_params(self, table_dataset, dummy_table, database): """Test overriding the default save arguments.""" table_dataset.save(dummy_table)