diff --git a/.changes/unreleased/Features-20240917-100505.yaml b/.changes/unreleased/Features-20240917-100505.yaml new file mode 100644 index 000000000..22cabc904 --- /dev/null +++ b/.changes/unreleased/Features-20240917-100505.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Add support for Iceberg table format in Dynamic Tables +time: 2024-09-17T10:05:05.609859-04:00 +custom: + Author: mikealfare + Issue: "1183" diff --git a/dbt/adapters/snowflake/relation.py b/dbt/adapters/snowflake/relation.py index 224b2b75e..b6924b9b3 100644 --- a/dbt/adapters/snowflake/relation.py +++ b/dbt/adapters/snowflake/relation.py @@ -17,6 +17,7 @@ from dbt_common.events.functions import fire_event, warn_or_error from dbt.adapters.snowflake.relation_configs import ( + SnowflakeCatalogConfigChange, SnowflakeDynamicTableConfig, SnowflakeDynamicTableConfigChangeset, SnowflakeDynamicTableRefreshModeConfigChange, @@ -114,6 +115,12 @@ def dynamic_table_config_changeset( context=new_dynamic_table.refresh_mode, ) + if new_dynamic_table.catalog != existing_dynamic_table.catalog: + config_change_collection.catalog = SnowflakeCatalogConfigChange( + action=RelationConfigChangeAction.create, + context=new_dynamic_table.catalog, + ) + if config_change_collection.has_changes: return config_change_collection return None @@ -132,6 +139,14 @@ def as_case_sensitive(self) -> "SnowflakeRelation": return self.replace_path(**path_part_map) + @property + def can_be_renamed(self) -> bool: + """ + Standard tables and dynamic tables can be renamed, but Snowflake does not support renaming iceberg relations. + The iceberg standard does support renaming, so this may change in the future. + """ + return self.type in self.renameable_relations and not self.is_iceberg_format + def get_ddl_prefix_for_create(self, config: RelationConfig, temporary: bool) -> str: """ This macro renders the appropriate DDL prefix during the create_table_as diff --git a/dbt/adapters/snowflake/relation_configs/__init__.py b/dbt/adapters/snowflake/relation_configs/__init__.py index 61941ab50..fec9d8a54 100644 --- a/dbt/adapters/snowflake/relation_configs/__init__.py +++ b/dbt/adapters/snowflake/relation_configs/__init__.py @@ -1,3 +1,7 @@ +from dbt.adapters.snowflake.relation_configs.catalog import ( + SnowflakeCatalogConfig, + SnowflakeCatalogConfigChange, +) from dbt.adapters.snowflake.relation_configs.dynamic_table import ( SnowflakeDynamicTableConfig, SnowflakeDynamicTableConfigChangeset, @@ -5,9 +9,9 @@ SnowflakeDynamicTableWarehouseConfigChange, SnowflakeDynamicTableTargetLagConfigChange, ) +from dbt.adapters.snowflake.relation_configs.formats import TableFormat from dbt.adapters.snowflake.relation_configs.policies import ( SnowflakeIncludePolicy, SnowflakeQuotePolicy, SnowflakeRelationType, ) -from dbt.adapters.snowflake.relation_configs.formats import TableFormat diff --git a/dbt/adapters/snowflake/relation_configs/catalog.py b/dbt/adapters/snowflake/relation_configs/catalog.py new file mode 100644 index 000000000..09e338635 --- /dev/null +++ b/dbt/adapters/snowflake/relation_configs/catalog.py @@ -0,0 +1,123 @@ +from dataclasses import dataclass +from typing import Any, Dict, Optional, TYPE_CHECKING, Set + +if TYPE_CHECKING: + import agate + +from dbt.adapters.relation_configs import ( + RelationConfigChange, + RelationResults, + RelationConfigValidationMixin, + RelationConfigValidationRule, +) +from dbt.adapters.contracts.relation import RelationConfig +from dbt_common.exceptions import DbtConfigError +from typing_extensions import Self + +from dbt.adapters.snowflake.relation_configs.base import SnowflakeRelationConfigBase +from dbt.adapters.snowflake.relation_configs.formats import TableFormat + + +@dataclass(frozen=True, eq=True, unsafe_hash=True) +class SnowflakeCatalogConfig(SnowflakeRelationConfigBase, RelationConfigValidationMixin): + """ + This config follow the specs found here: + https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table + https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table#create-dynamic-iceberg-table + + The following parameters are configurable by dbt: + - table_format: format for interfacing with the table, e.g. default, iceberg + - external_volume: name of the external volume in Snowflake + - base_location: the directory within the external volume that contains the data + *Note*: This directory can’t be changed after you create a table. + + The following parameters are not currently configurable by dbt: + - name: snowflake + """ + + table_format: Optional[TableFormat] = TableFormat.default() + name: Optional[str] = "SNOWFLAKE" + external_volume: Optional[str] = None + base_location: Optional[str] = None + + @property + def validation_rules(self) -> Set[RelationConfigValidationRule]: + return { + RelationConfigValidationRule( + (self.table_format == "default") + or (self.table_format == "iceberg" and self.base_location is not None), + DbtConfigError("Please provide a `base_location` when using iceberg"), + ), + RelationConfigValidationRule( + (self.table_format == "default") + or (self.table_format == "iceberg" and self.name == "SNOWFLAKE"), + DbtConfigError( + "Only Snowflake catalogs are currently supported when using iceberg" + ), + ), + } + + @classmethod + def from_dict(cls, config_dict: Dict[str, Any]) -> Self: + kwargs_dict = { + "name": config_dict.get("name"), + "external_volume": config_dict.get("external_volume"), + "base_location": config_dict.get("base_location"), + } + if table_format := config_dict.get("table_format"): + kwargs_dict["table_format"] = TableFormat(table_format) + return super().from_dict(kwargs_dict) + + @classmethod + def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any]: + + if relation_config.config.extra.get("table_format") is None: + return {} + + config_dict = { + "table_format": relation_config.config.extra.get("table_format"), + "name": "SNOWFLAKE", # this is not currently configurable + } + + if external_volume := relation_config.config.extra.get("external_volume"): + config_dict["external_volume"] = external_volume + + if base_location := relation_config.config.extra.get("base_location_subpath"): + config_dict["base_location"] = base_location + + return config_dict + + @classmethod + def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]: + # this try block can be removed once enable_iceberg_materializations is retired + try: + catalog_results: "agate.Table" = relation_results["catalog"] + except KeyError: + # this happens when `enable_iceberg_materializations` is turned off + return {} + + if len(catalog_results) == 0: + # this happens when the dynamic table is a standard dynamic table (e.g. not iceberg) + return {} + + # for now, if we get catalog results, it's because this is an iceberg table + # this is because we only run `show iceberg tables` to get catalog metadata + # this will need to be updated once this is in `show objects` + catalog: "agate.Row" = catalog_results.rows[0] + config_dict = { + "table_format": "iceberg", + "name": catalog.get("catalog_name"), + "external_volume": catalog.get("external_volume_name"), + "base_location": catalog.get("base_location"), + } + + return config_dict + + +@dataclass(frozen=True, eq=True, unsafe_hash=True) +class SnowflakeCatalogConfigChange(RelationConfigChange): + context: Optional[SnowflakeCatalogConfig] = None + + @property + def requires_full_refresh(self) -> bool: + return True diff --git a/dbt/adapters/snowflake/relation_configs/dynamic_table.py b/dbt/adapters/snowflake/relation_configs/dynamic_table.py index 2e227d3a4..7361df80a 100644 --- a/dbt/adapters/snowflake/relation_configs/dynamic_table.py +++ b/dbt/adapters/snowflake/relation_configs/dynamic_table.py @@ -8,6 +8,11 @@ from typing_extensions import Self from dbt.adapters.snowflake.relation_configs.base import SnowflakeRelationConfigBase +from dbt.adapters.snowflake.relation_configs.catalog import ( + SnowflakeCatalogConfig, + SnowflakeCatalogConfigChange, +) + if TYPE_CHECKING: import agate @@ -55,11 +60,12 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase): query: str target_lag: str snowflake_warehouse: str + catalog: SnowflakeCatalogConfig refresh_mode: Optional[RefreshMode] = RefreshMode.default() initialize: Optional[Initialize] = Initialize.default() @classmethod - def from_dict(cls, config_dict) -> "SnowflakeDynamicTableConfig": + def from_dict(cls, config_dict: Dict[str, Any]) -> Self: kwargs_dict = { "name": cls._render_part(ComponentName.Identifier, config_dict.get("name")), "schema_name": cls._render_part(ComponentName.Schema, config_dict.get("schema_name")), @@ -69,12 +75,12 @@ def from_dict(cls, config_dict) -> "SnowflakeDynamicTableConfig": "query": config_dict.get("query"), "target_lag": config_dict.get("target_lag"), "snowflake_warehouse": config_dict.get("snowflake_warehouse"), + "catalog": SnowflakeCatalogConfig.from_dict(config_dict["catalog"]), "refresh_mode": config_dict.get("refresh_mode"), "initialize": config_dict.get("initialize"), } - dynamic_table: "SnowflakeDynamicTableConfig" = super().from_dict(kwargs_dict) - return dynamic_table + return super().from_dict(kwargs_dict) @classmethod def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any]: @@ -85,18 +91,19 @@ def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any "query": relation_config.compiled_code, "target_lag": relation_config.config.extra.get("target_lag"), "snowflake_warehouse": relation_config.config.extra.get("snowflake_warehouse"), + "catalog": SnowflakeCatalogConfig.parse_relation_config(relation_config), } if refresh_mode := relation_config.config.extra.get("refresh_mode"): - config_dict.update(refresh_mode=refresh_mode.upper()) + config_dict["refresh_mode"] = refresh_mode.upper() if initialize := relation_config.config.extra.get("initialize"): - config_dict.update(initialize=initialize.upper()) + config_dict["initialize"] = initialize.upper() return config_dict @classmethod - def parse_relation_results(cls, relation_results: RelationResults) -> Dict: + def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]: dynamic_table: "agate.Row" = relation_results["dynamic_table"].rows[0] config_dict = { @@ -106,6 +113,7 @@ def parse_relation_results(cls, relation_results: RelationResults) -> Dict: "query": dynamic_table.get("text"), "target_lag": dynamic_table.get("target_lag"), "snowflake_warehouse": dynamic_table.get("warehouse"), + "catalog": SnowflakeCatalogConfig.parse_relation_results(relation_results), "refresh_mode": dynamic_table.get("refresh_mode"), # we don't get initialize since that's a one-time scheduler attribute, not a DT attribute } @@ -145,6 +153,7 @@ class SnowflakeDynamicTableConfigChangeset: target_lag: Optional[SnowflakeDynamicTableTargetLagConfigChange] = None snowflake_warehouse: Optional[SnowflakeDynamicTableWarehouseConfigChange] = None refresh_mode: Optional[SnowflakeDynamicTableRefreshModeConfigChange] = None + catalog: Optional[SnowflakeCatalogConfigChange] = None @property def requires_full_refresh(self) -> bool: @@ -157,9 +166,10 @@ def requires_full_refresh(self) -> bool: else False ), self.refresh_mode.requires_full_refresh if self.refresh_mode else False, + self.catalog.requires_full_refresh if self.catalog else False, ] ) @property def has_changes(self) -> bool: - return any([self.target_lag, self.snowflake_warehouse, self.refresh_mode]) + return any([self.target_lag, self.snowflake_warehouse, self.refresh_mode, self.catalog]) diff --git a/dbt/adapters/snowflake/relation_configs/formats.py b/dbt/adapters/snowflake/relation_configs/formats.py index 460241d9d..b6bb0bdda 100644 --- a/dbt/adapters/snowflake/relation_configs/formats.py +++ b/dbt/adapters/snowflake/relation_configs/formats.py @@ -1,4 +1,5 @@ from dbt_common.dataclass_schema import StrEnum # doesn't exist in standard library until py3.11 +from typing_extensions import Self class TableFormat(StrEnum): @@ -10,5 +11,9 @@ class TableFormat(StrEnum): DEFAULT = "default" ICEBERG = "iceberg" + @classmethod + def default(cls) -> Self: + return cls("default") + def __str__(self): return self.value diff --git a/dbt/include/snowflake/macros/relations/dynamic_table/create.sql b/dbt/include/snowflake/macros/relations/dynamic_table/create.sql index 253788779..0bd190dcc 100644 --- a/dbt/include/snowflake/macros/relations/dynamic_table/create.sql +++ b/dbt/include/snowflake/macros/relations/dynamic_table/create.sql @@ -1,16 +1,83 @@ {% macro snowflake__get_create_dynamic_table_as_sql(relation, sql) -%} +{#- +-- Produce DDL that creates a dynamic table +-- +-- Args: +-- - relation: Union[SnowflakeRelation, str] +-- - SnowflakeRelation - required for relation.render() +-- - str - is already the rendered relation name +-- - sql: str - the code defining the model +-- Globals: +-- - config: NodeConfig - contains the attribution required to produce a SnowflakeDynamicTableConfig +-- Returns: +-- A valid DDL statement which will result in a new dynamic table. +-#} {%- set dynamic_table = relation.from_config(config.model) -%} + {%- if dynamic_table.catalog.table_format == 'iceberg' -%} + {{ _get_create_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) }} + {%- else -%} + {{ _get_create_dynamic_standard_table_as_sql(dynamic_table, relation, sql) }} + {%- endif -%} + +{%- endmacro %} + + +{% macro _get_create_dynamic_standard_table_as_sql(dynamic_table, relation, sql) -%} +{#- +-- Produce DDL that creates a standard dynamic table +-- +-- This follows the syntax outlined here: +-- https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table#syntax +-- +-- Args: +-- - dynamic_table: SnowflakeDynamicTableConfig - contains all of the configuration for the dynamic table +-- - relation: Union[SnowflakeRelation, str] +-- - SnowflakeRelation - required for relation.render() +-- - str - is already the rendered relation name +-- - sql: str - the code defining the model +-- Returns: +-- A valid DDL statement which will result in a new dynamic standard table. +-#} + create dynamic table {{ relation }} target_lag = '{{ dynamic_table.target_lag }}' warehouse = {{ dynamic_table.snowflake_warehouse }} - {% if dynamic_table.refresh_mode %} - refresh_mode = {{ dynamic_table.refresh_mode }} - {% endif %} - {% if dynamic_table.initialize %} - initialize = {{ dynamic_table.initialize }} - {% endif %} + {{ optional('refresh_mode', dynamic_table.refresh_mode) }} + {{ optional('initialize', dynamic_table.initialize) }} + as ( + {{ sql }} + ) + +{%- endmacro %} + + +{% macro _get_create_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) -%} +{#- +-- Produce DDL that creates a dynamic iceberg table +-- +-- This follows the syntax outlined here: +-- https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table#create-dynamic-iceberg-table +-- +-- Args: +-- - dynamic_table: SnowflakeDynamicTableConfig - contains all of the configuration for the dynamic table +-- - relation: Union[SnowflakeRelation, str] +-- - SnowflakeRelation - required for relation.render() +-- - str - is already the rendered relation name +-- - sql: str - the code defining the model +-- Returns: +-- A valid DDL statement which will result in a new dynamic iceberg table. +-#} + + create dynamic iceberg table {{ relation }} + target_lag = '{{ dynamic_table.target_lag }}' + warehouse = {{ dynamic_table.snowflake_warehouse }} + {{ optional('external_volume', dynamic_table.catalog.external_volume) }} + {{ optional('catalog', dynamic_table.catalog.name) }} + base_location = {{ dynamic_table.catalog.base_location }} + {{ optional('refresh_mode', dynamic_table.refresh_mode) }} + {{ optional('initialize', dynamic_table.initialize) }} as ( {{ sql }} ) diff --git a/dbt/include/snowflake/macros/relations/dynamic_table/describe.sql b/dbt/include/snowflake/macros/relations/dynamic_table/describe.sql index cc79328fe..b5c49ad37 100644 --- a/dbt/include/snowflake/macros/relations/dynamic_table/describe.sql +++ b/dbt/include/snowflake/macros/relations/dynamic_table/describe.sql @@ -1,4 +1,14 @@ {% macro snowflake__describe_dynamic_table(relation) %} +{#- +-- Get all relevant metadata about a dynamic table +-- +-- Args: +-- - relation: SnowflakeRelation - the relation to describe +-- Returns: +-- A dictionary with one or two entries depending on whether iceberg is enabled: +-- - dynamic_table: the metadata associated with a standard dynamic table +-- - catalog: the metadata associated with the iceberg catalog +-#} {%- set _dynamic_table_sql -%} show dynamic tables like '{{ relation.identifier }}' @@ -14,7 +24,32 @@ "refresh_mode" from table(result_scan(last_query_id())) {%- endset %} - {% set _dynamic_table = run_query(_dynamic_table_sql) %} + {% set results = {'dynamic_table': run_query(_dynamic_table_sql)} %} - {% do return({'dynamic_table': _dynamic_table}) %} + {% if adapter.behavior.enable_iceberg_materializations.no_warn %} + {% set _ = results.update({'catalog': run_query(_get_describe_iceberg_catalog_sql(relation))}) %} + {% endif %} + + {% do return(results) %} +{% endmacro %} + + +{% macro _get_describe_iceberg_catalog_sql(relation) %} +{#- +-- Produce DQL that returns all relevant metadata about an iceberg catalog +-- +-- Args: +-- - relation: SnowflakeRelation - the relation to describe +-- Returns: +-- A valid DQL statement that will return metadata associated with an iceberg catalog +-#} + show iceberg tables + like '{{ relation.identifier }}' + in schema {{ relation.database }}.{{ relation.schema }} + ; + select + "catalog_name", + "external_volume_name", + "base_location" + from table(result_scan(last_query_id())) {% endmacro %} diff --git a/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql b/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql index dbe27d66e..f9ba1275a 100644 --- a/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql +++ b/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql @@ -1,16 +1,82 @@ {% macro snowflake__get_replace_dynamic_table_sql(relation, sql) -%} +{#- +-- Produce DDL that replaces a dynamic table with a new dynamic table +-- +-- Args: +-- - relation: Union[SnowflakeRelation, str] +-- - SnowflakeRelation - required for relation.render() +-- - str - is already the rendered relation name +-- - sql: str - the code defining the model +-- Globals: +-- - config: NodeConfig - contains the attribution required to produce a SnowflakeDynamicTableConfig +-- Returns: +-- A valid DDL statement which will result in a new dynamic table. +-#} {%- set dynamic_table = relation.from_config(config.model) -%} + {%- if dynamic_table.catalog.table_format == 'iceberg' -%} + {{ _get_replace_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) }} + {%- else -%} + {{ _get_replace_dynamic_standard_table_as_sql(dynamic_table, relation, sql) }} + {%- endif -%} + +{%- endmacro %} + +{% macro _get_replace_dynamic_standard_table_as_sql(dynamic_table, relation, sql) -%} +{#- +-- Produce DDL that replaces a standard dynamic table with a new standard dynamic table +-- +-- This follows the syntax outlined here: +-- https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table#syntax +-- +-- Args: +-- - dynamic_table: SnowflakeDynamicTableConfig - contains all of the configuration for the dynamic table +-- - relation: Union[SnowflakeRelation, str] +-- - SnowflakeRelation - required for relation.render() +-- - str - is already the rendered relation name +-- - sql: str - the code defining the model +-- Returns: +-- A valid DDL statement which will result in a new dynamic standard table. +-#} + create or replace dynamic table {{ relation }} target_lag = '{{ dynamic_table.target_lag }}' warehouse = {{ dynamic_table.snowflake_warehouse }} - {% if dynamic_table.refresh_mode %} - refresh_mode = {{ dynamic_table.refresh_mode }} - {% endif %} - {% if dynamic_table.initialize %} - initialize = {{ dynamic_table.initialize }} - {% endif %} + {{ optional('refresh_mode', dynamic_table.refresh_mode) }} + {{ optional('initialize', dynamic_table.initialize) }} + as ( + {{ sql }} + ) + +{%- endmacro %} + + +{% macro _get_replace_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) -%} +{#- +-- Produce DDL that replaces a dynamic iceberg table with a new dynamic iceberg table +-- +-- This follows the syntax outlined here: +-- https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table#create-dynamic-iceberg-table +-- +-- Args: +-- - dynamic_table: SnowflakeDynamicTableConfig - contains all of the configuration for the dynamic table +-- - relation: Union[SnowflakeRelation, str] +-- - SnowflakeRelation - required for relation.render() +-- - str - is already the rendered relation name +-- - sql: str - the code defining the model +-- Returns: +-- A valid DDL statement which will result in a new dynamic iceberg table. +-#} + + create or replace dynamic iceberg table {{ relation }} + target_lag = '{{ dynamic_table.target_lag }}' + warehouse = {{ dynamic_table.snowflake_warehouse }} + {{ optional('external_volume', dynamic_table.catalog.external_volume) }} + {{ optional('catalog', dynamic_table.catalog.name) }} + base_location = {{ dynamic_table.catalog.base_location }} + {{ optional('refresh_mode', dynamic_table.refresh_mode) }} + {{ optional('initialize', dynamic_table.initialize) }} as ( {{ sql }} ) diff --git a/dbt/include/snowflake/macros/utils/optional.sql b/dbt/include/snowflake/macros/utils/optional.sql new file mode 100644 index 000000000..0758ca59f --- /dev/null +++ b/dbt/include/snowflake/macros/utils/optional.sql @@ -0,0 +1,14 @@ +{% macro optional(name, value, quote_char = '') %} +{#- +-- Insert optional DDL parameters only when their value is provided; makes DDL statements more readable +-- +-- Args: +-- - name: the name of the DDL option +-- - value: the value of the DDL option, may be None +-- - quote_char: the quote character to use (e.g. string), leave blank if unnecessary (e.g. integer or bool) +-- Returns: +-- If the value is not None (e.g. provided by the user), return the option setting DDL +-- If the value is None, return an empty string +-#} +{% if value is not none %}{{ name }} = {{ quote_char }}{{ value }}{{ quote_char }}{% endif %} +{% endmacro %} diff --git a/tests/functional/relation_tests/dynamic_table_tests/models.py b/tests/functional/relation_tests/dynamic_table_tests/models.py index 5e46bed53..4dcd6cf48 100644 --- a/tests/functional/relation_tests/dynamic_table_tests/models.py +++ b/tests/functional/relation_tests/dynamic_table_tests/models.py @@ -10,7 +10,7 @@ {{ config( materialized='dynamic_table', snowflake_warehouse='DBT_TESTING', - target_lag='2 minutes', + target_lag='2 minutes', refresh_mode='INCREMENTAL', ) }} select * from {{ ref('my_seed') }} @@ -28,11 +28,25 @@ """ +DYNAMIC_ICEBERG_TABLE = """ +{{ config( + materialized='dynamic_table', + snowflake_warehouse='DBT_TESTING', + target_lag='2 minutes', + refresh_mode='INCREMENTAL', + table_format="iceberg", + external_volume="s3_iceberg_snow", + base_location_subpath="subpath", +) }} +select * from {{ ref('my_seed') }} +""" + + DYNAMIC_TABLE_ALTER = """ {{ config( materialized='dynamic_table', snowflake_warehouse='DBT_TESTING', - target_lag='5 minutes', + target_lag='5 minutes', refresh_mode='INCREMENTAL', ) }} select * from {{ ref('my_seed') }} @@ -43,8 +57,36 @@ {{ config( materialized='dynamic_table', snowflake_warehouse='DBT_TESTING', - target_lag='2 minutes', + target_lag='2 minutes', + refresh_mode='FULL', +) }} +select * from {{ ref('my_seed') }} +""" + + +DYNAMIC_ICEBERG_TABLE_ALTER = """ +{{ config( + materialized='dynamic_table', + snowflake_warehouse='DBT_TESTING', + target_lag='5 minutes', + refresh_mode='INCREMENTAL', + table_format="iceberg", + external_volume="s3_iceberg_snow", + base_location_subpath="subpath", +) }} +select * from {{ ref('my_seed') }} +""" + + +DYNAMIC_ICEBERG_TABLE_REPLACE = """ +{{ config( + materialized='dynamic_table', + snowflake_warehouse='DBT_TESTING', + target_lag='2 minutes', refresh_mode='FULL', + table_format="iceberg", + external_volume="s3_iceberg_snow", + base_location_subpath="subpath", ) }} select * from {{ ref('my_seed') }} """ diff --git a/tests/functional/relation_tests/dynamic_table_tests/test_basic.py b/tests/functional/relation_tests/dynamic_table_tests/test_basic.py index 2406e1c14..79a2241ca 100644 --- a/tests/functional/relation_tests/dynamic_table_tests/test_basic.py +++ b/tests/functional/relation_tests/dynamic_table_tests/test_basic.py @@ -7,6 +7,7 @@ class TestBasic: + iceberg: bool = False @pytest.fixture(scope="class", autouse=True) def seeds(self): @@ -14,10 +15,17 @@ def seeds(self): @pytest.fixture(scope="class", autouse=True) def models(self): - yield { + my_models = { "my_dynamic_table.sql": models.DYNAMIC_TABLE, "my_dynamic_table_downstream.sql": models.DYNAMIC_TABLE_DOWNSTREAM, } + if self.iceberg: + my_models.update( + { + "my_dynamic_iceberg_table.sql": models.DYNAMIC_ICEBERG_TABLE, + } + ) + yield my_models @pytest.fixture(scope="class", autouse=True) def setup(self, project): @@ -28,3 +36,13 @@ def test_dynamic_table_full_refresh(self, project): run_dbt(["run", "--full-refresh"]) assert query_relation_type(project, "my_dynamic_table") == "dynamic_table" assert query_relation_type(project, "my_dynamic_table_downstream") == "dynamic_table" + if self.iceberg: + assert query_relation_type(project, "my_dynamic_iceberg_table") == "dynamic_table" + + +class TestBasicIcebergOn(TestBasic): + iceberg = True + + @pytest.fixture(scope="class") + def project_config_update(self): + return {"flags": {"enable_iceberg_materializations": True}} diff --git a/tests/functional/relation_tests/dynamic_table_tests/test_configuration_changes.py b/tests/functional/relation_tests/dynamic_table_tests/test_configuration_changes.py index 3c4f65a87..f389344e0 100644 --- a/tests/functional/relation_tests/dynamic_table_tests/test_configuration_changes.py +++ b/tests/functional/relation_tests/dynamic_table_tests/test_configuration_changes.py @@ -7,6 +7,7 @@ class Changes: + iceberg: bool = False @pytest.fixture(scope="class", autouse=True) def seeds(self): @@ -14,10 +15,18 @@ def seeds(self): @pytest.fixture(scope="class", autouse=True) def models(self): - yield { + my_models = { "dynamic_table_alter.sql": models.DYNAMIC_TABLE, "dynamic_table_replace.sql": models.DYNAMIC_TABLE, } + if self.iceberg: + my_models.update( + { + "dynamic_table_iceberg_alter.sql": models.DYNAMIC_ICEBERG_TABLE, + "dynamic_table_iceberg_replace.sql": models.DYNAMIC_ICEBERG_TABLE, + } + ) + yield my_models @pytest.fixture(scope="function", autouse=True) def setup_class(self, project): @@ -33,14 +42,23 @@ def setup_method(self, project, setup_class): update_model(project, "dynamic_table_alter", models.DYNAMIC_TABLE_ALTER) update_model(project, "dynamic_table_replace", models.DYNAMIC_TABLE_REPLACE) + if self.iceberg: + update_model( + project, "dynamic_table_iceberg_alter", models.DYNAMIC_ICEBERG_TABLE_ALTER + ) + update_model( + project, "dynamic_table_iceberg_replace", models.DYNAMIC_ICEBERG_TABLE_REPLACE + ) yield update_model(project, "dynamic_table_alter", models.DYNAMIC_TABLE) update_model(project, "dynamic_table_replace", models.DYNAMIC_TABLE) + if self.iceberg: + update_model(project, "dynamic_table_iceberg_alter", models.DYNAMIC_ICEBERG_TABLE) + update_model(project, "dynamic_table_iceberg_replace", models.DYNAMIC_ICEBERG_TABLE) - @staticmethod - def assert_changes_are_applied(project): + def assert_changes_are_applied(self, project): altered = describe_dynamic_table(project, "dynamic_table_alter") assert altered.snowflake_warehouse == "DBT_TESTING" assert altered.target_lag == "5 minutes" # this updated @@ -51,8 +69,18 @@ def assert_changes_are_applied(project): assert replaced.target_lag == "2 minutes" assert replaced.refresh_mode == "FULL" # this updated - @staticmethod - def assert_changes_are_not_applied(project): + if self.iceberg: + altered_iceberg = describe_dynamic_table(project, "dynamic_table_iceberg_alter") + assert altered_iceberg.snowflake_warehouse == "DBT_TESTING" + assert altered_iceberg.target_lag == "5 minutes" # this updated + assert altered_iceberg.refresh_mode == "INCREMENTAL" + + replaced_iceberg = describe_dynamic_table(project, "dynamic_table_iceberg_replace") + assert replaced_iceberg.snowflake_warehouse == "DBT_TESTING" + assert replaced_iceberg.target_lag == "2 minutes" + assert replaced_iceberg.refresh_mode == "FULL" # this updated + + def assert_changes_are_not_applied(self, project): altered = describe_dynamic_table(project, "dynamic_table_alter") assert altered.snowflake_warehouse == "DBT_TESTING" assert altered.target_lag == "2 minutes" # this would have updated, but didn't @@ -63,6 +91,19 @@ def assert_changes_are_not_applied(project): assert replaced.target_lag == "2 minutes" assert replaced.refresh_mode == "INCREMENTAL" # this would have updated, but didn't + if self.iceberg: + altered_iceberg = describe_dynamic_table(project, "dynamic_table_iceberg_alter") + assert altered_iceberg.snowflake_warehouse == "DBT_TESTING" + assert altered_iceberg.target_lag == "2 minutes" # this would have updated, but didn't + assert altered_iceberg.refresh_mode == "INCREMENTAL" + + replaced_iceberg = describe_dynamic_table(project, "dynamic_table_iceberg_replace") + assert replaced_iceberg.snowflake_warehouse == "DBT_TESTING" + assert replaced_iceberg.target_lag == "2 minutes" + assert ( + replaced_iceberg.refresh_mode == "INCREMENTAL" + ) # this would have updated, but didn't + def test_full_refresh_is_always_successful(self, project): # this always passes and always changes the configuration, regardless of on_configuration_change # and regardless of whether the changes require a replace versus an alter @@ -81,6 +122,17 @@ def test_changes_are_applied(self, project): self.assert_changes_are_applied(project) +class TestChangesApplyIcebergOn(TestChangesApply): + iceberg = True + + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "models": {"on_configuration_change": "apply"}, + "flags": {"enable_iceberg_materializations": True}, + } + + class TestChangesContinue(Changes): @pytest.fixture(scope="class") def project_config_update(self): @@ -92,6 +144,17 @@ def test_changes_are_not_applied(self, project): self.assert_changes_are_not_applied(project) +class TestChangesContinueIcebergOn(TestChangesContinue): + iceberg = True + + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "models": {"on_configuration_change": "continue"}, + "flags": {"enable_iceberg_materializations": True}, + } + + class TestChangesFail(Changes): @pytest.fixture(scope="class") def project_config_update(self): @@ -101,3 +164,14 @@ def test_changes_are_not_applied(self, project): # this fails and does not change the configuration run_dbt(["run"], expect_pass=False) self.assert_changes_are_not_applied(project) + + +class TestChangesFailIcebergOn(TestChangesFail): + iceberg = True + + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "models": {"on_configuration_change": "fail"}, + "flags": {"enable_iceberg_materializations": True}, + } diff --git a/tests/functional/relation_tests/models.py b/tests/functional/relation_tests/models.py index 63dfff045..7b0050d11 100644 --- a/tests/functional/relation_tests/models.py +++ b/tests/functional/relation_tests/models.py @@ -55,7 +55,7 @@ select * from {{ ref('my_seed') }} """ -ICEBERG_INCREMENTAL_TABLE = """ +INCREMENTAL_ICEBERG_TABLE = """ {{ config( materialized='incremental', table_format='iceberg', @@ -65,3 +65,13 @@ ) }} select * from {{ ref('my_seed') }} """ + + +INCREMENTAL_TABLE = """ +{{ config( + materialized='incremental', + incremental_strategy='append', + unique_key="id", +) }} +select * from {{ ref('my_seed') }} +""" diff --git a/tests/functional/relation_tests/test_relation_type_change.py b/tests/functional/relation_tests/test_relation_type_change.py index c2886ad04..1024a92ca 100644 --- a/tests/functional/relation_tests/test_relation_type_change.py +++ b/tests/functional/relation_tests/test_relation_type_change.py @@ -13,16 +13,25 @@ class Model: model: str relation_type: str - table_format: Optional[str] = None - incremental: Optional[bool] = None + table_format: Optional[str] = "default" + is_incremental: Optional[bool] = False @property def name(self) -> str: - name = f"{self.relation_type}" - if self.table_format: - name += f"_{self.table_format}" + if self.is_incremental: + name = f"{self.relation_type}_{self.table_format}_incremental" + else: + name = f"{self.relation_type}_{self.table_format}" return name + @property + def is_iceberg(self) -> bool: + return self.table_format == "iceberg" + + @property + def is_standard_table(self) -> bool: + return self.relation_type == "table" and not self.is_incremental + @dataclass class Scenario: @@ -37,26 +46,47 @@ def name(self) -> str: def error_message(self) -> str: return f"Failed when migrating from: {self.initial.name} to: {self.final.name}" + @property + def uses_iceberg(self) -> bool: + return any([self.initial.is_iceberg, self.final.is_iceberg]) + relations = [ Model(models.VIEW, "view"), Model(models.TABLE, "table", "default"), - # to be activated upon merge of dynamic table support PR - # Model(models.DYNAMIC_TABLE, "dynamic_table", "default"), - # Model(models.DYNAMIC_ICEBERG_TABLE, "dynamic_table", "iceberg"), + Model(models.INCREMENTAL_TABLE, "table", "default", is_incremental=True), + Model(models.DYNAMIC_TABLE, "dynamic_table", "default"), Model(models.ICEBERG_TABLE, "table", "iceberg"), - Model(models.ICEBERG_INCREMENTAL_TABLE, "table", "iceberg", incremental=True), + Model(models.INCREMENTAL_ICEBERG_TABLE, "table", "iceberg", is_incremental=True), + Model(models.DYNAMIC_ICEBERG_TABLE, "dynamic_table", "iceberg"), ] scenarios = [Scenario(*scenario) for scenario in product(relations, relations)] +def requires_full_refresh(scenario) -> bool: + return any( + [ + # we can only swap incremental to table and back if both are iceberg + scenario.initial.is_incremental + and scenario.final.is_standard_table + and scenario.initial.table_format != scenario.final.table_format, + scenario.initial.is_standard_table + and scenario.final.is_incremental + and scenario.initial.table_format != scenario.final.table_format, + # we can't swap from an incremental to a dynamic table because the materialization does not handle this case + scenario.initial.relation_type == "dynamic_table" and scenario.final.is_incremental, + ] + ) + + class TestRelationTypeChange: + @pytest.fixture(scope="class") + def project_config_update(self): + return {"flags": {"enable_iceberg_materializations": False}} @staticmethod def include(scenario) -> bool: - return ( - scenario.initial.table_format != "iceberg" and scenario.final.table_format != "iceberg" - ) + return not scenario.uses_iceberg and not requires_full_refresh(scenario) @pytest.fixture(scope="class", autouse=True) def seeds(self): @@ -77,7 +107,11 @@ def setup(self, project): for scenario in scenarios: if self.include(scenario): update_model(project, scenario.name, scenario.final.model) - run_dbt(["run"]) + # allow for dbt to fail so that we can see which scenarios pass and which scenarios fail + try: + run_dbt(["run"], expect_pass=False) + except Exception: + pass @pytest.mark.parametrize("scenario", scenarios, ids=[scenario.name for scenario in scenarios]) def test_replace(self, project, scenario): @@ -91,9 +125,17 @@ def test_replace(self, project, scenario): pytest.skip() -""" -Upon adding the logic needed for seamless transitions to and from incremental models without data loss, we can coalesce these test cases. -""" +class TestRelationTypeChangeFullRefreshRequired(TestRelationTypeChange): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": {"enable_iceberg_materializations": False}, + "models": {"full_refresh": True}, + } + + @staticmethod + def include(scenario) -> bool: + return not scenario.uses_iceberg and requires_full_refresh(scenario) class TestRelationTypeChangeIcebergOn(TestRelationTypeChange): @@ -103,21 +145,17 @@ def project_config_update(self): @staticmethod def include(scenario) -> bool: - return any( - ( - # scenario 1: Everything that doesn't include incremental relations on Iceberg - ( - ( - scenario.initial.table_format == "iceberg" - or scenario.final.table_format == "iceberg" - ) - and not scenario.initial.incremental - and not scenario.final.incremental - ), - # scenario 2: Iceberg Incremental swaps allowed - ( - scenario.initial.table_format == "iceberg" - and scenario.final.table_format == "iceberg" - ), - ) - ) + return scenario.uses_iceberg and not requires_full_refresh(scenario) + + +class TestRelationTypeChangeIcebergOnFullRefreshRequired(TestRelationTypeChange): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": {"enable_iceberg_materializations": True}, + "models": {"full_refresh": True}, + } + + @staticmethod + def include(scenario) -> bool: + return scenario.uses_iceberg and requires_full_refresh(scenario)