Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Iceberg table format in Dynamic Tables #1183

Merged
merged 36 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
4f7a40e
changelog
mikealfare Sep 17, 2024
7126592
changelog
mikealfare Sep 17, 2024
7d05939
first draft for iceberg dynamic tables
mikealfare Sep 17, 2024
01000a5
fix comparison operator
mikealfare Sep 17, 2024
1b9c00b
fix validation rules on catalog
mikealfare Sep 17, 2024
60094ed
Merge branch 'main' into dynamic-tables/iceberg-format
mikealfare Sep 18, 2024
cdc90b5
consider the entire catalog for a config change
mikealfare Sep 18, 2024
e65bba2
remove is_dynamic-related guards as that is ga now
mikealfare Sep 18, 2024
2ffe6ab
formatting
mikealfare Sep 18, 2024
f9f1d6a
formatting
mikealfare Sep 18, 2024
24626f9
Merge branch 'main' into dynamic-tables/iceberg-format
mikealfare Sep 18, 2024
3d6a76a
move .select back from agate.Row to agate.Table
mikealfare Sep 18, 2024
7444dff
formatting
mikealfare Sep 18, 2024
b1fa0ad
formatting
mikealfare Sep 18, 2024
0aee10b
fix catalog data marshalling
mikealfare Sep 19, 2024
c74283e
flip the flag for testing
mikealfare Sep 19, 2024
fb9aee1
simplify dynamic table testing
mikealfare Sep 20, 2024
50d75ae
Merge branch 'main' into dynamic-tables/iceberg-format
mikealfare Sep 20, 2024
04ec380
Merge branch 'main' into dynamic-tables/iceberg-format
mikealfare Sep 20, 2024
4373bdd
undo formatting changes to make function changes easier to see
mikealfare Sep 20, 2024
951a2ab
add iceberg dynamic tables to existing dynamic table tests
mikealfare Sep 20, 2024
610d379
update docstrings
mikealfare Sep 23, 2024
d9b8a6b
revert default iceberg to false, update tests to test both scenarios
mikealfare Sep 23, 2024
69d483b
Merge branch 'main' into dynamic-tables/iceberg-format
mikealfare Sep 24, 2024
24f7bee
use direct assignment instead of dict.update
mikealfare Sep 26, 2024
23419dd
update how the catalog gets built in the default scenario
mikealfare Sep 26, 2024
cf2f7be
Merge branch 'main' into dynamic-tables/iceberg-format
mikealfare Sep 26, 2024
8265e5e
remove string building from Model.name
mikealfare Sep 26, 2024
2c91dd4
stop testing odd target lag inputs which we don't want to support anyway
mikealfare Sep 26, 2024
529b875
comments and formatting
mikealfare Sep 26, 2024
f0684bf
comments and formatting
mikealfare Sep 26, 2024
7534895
Merge branch 'main' into dynamic-tables/iceberg-format
VersusFacit Sep 27, 2024
1d89f11
add standard incremental tables into the relation swap scenarios
mikealfare Sep 27, 2024
989c072
account for the fact that snowflake does not support renaming iceberg…
mikealfare Sep 27, 2024
5881986
account for all scenarios when swapping relation types, including tho…
mikealfare Sep 27, 2024
a42adda
make it clearer which scenarios are included in each run and why by p…
mikealfare Sep 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240917-100505.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Add support for Iceberg table format in Dynamic Tables
time: 2024-09-17T10:05:05.609859-04:00
custom:
Author: mikealfare
Issue: "1183"
46 changes: 15 additions & 31 deletions dbt/adapters/snowflake/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,28 +247,19 @@ def list_relations_without_caching(
return []
raise

# this can be reduced to always including `is_dynamic` once bundle `2024_03` is mandatory
columns = ["database_name", "schema_name", "name", "kind"]
if "is_dynamic" in schema_objects.column_names:
columns.append("is_dynamic")
if "is_iceberg" in schema_objects.column_names:
columns.append("is_iceberg")

return [self._parse_list_relations_result(obj) for obj in schema_objects.select(columns)]
return [self._parse_list_relations_result(obj) for obj in schema_objects]

def _parse_list_relations_result(self, result: "agate.Row") -> SnowflakeRelation:
# this can be reduced to always including `is_dynamic` once bundle `2024_03` is mandatory
# this can be reduced to always including `is_iceberg` once Snowflake adds it to show objects
try:
if self.behavior.enable_iceberg_materializations.no_warn:
database, schema, identifier, relation_type, is_dynamic, is_iceberg = result
else:
database, schema, identifier, relation_type, is_dynamic = result
except ValueError:
database, schema, identifier, relation_type = result
is_dynamic = "N"
if self.behavior.enable_iceberg_materializations.no_warn:
is_iceberg = "N"
# this can be collapsed once Snowflake adds is_iceberg to show objects
if self.behavior.enable_iceberg_materializations:
columns = ["database_name", "schema_name", "name", "kind", "is_dynamic", "is_iceberg"]
database, schema, identifier, relation_type, is_dynamic, is_iceberg = result.select(
columns
)
else:
columns = ["database_name", "schema_name", "name", "kind", "is_dynamic"]
database, schema, identifier, relation_type, is_dynamic = result.select(columns)
is_iceberg = "NO"

try:
relation_type = self.Relation.get_relation_type(relation_type.lower())
Expand All @@ -278,22 +269,15 @@ def _parse_list_relations_result(self, result: "agate.Row") -> SnowflakeRelation
if relation_type == self.Relation.Table and is_dynamic == "Y":
relation_type = self.Relation.DynamicTable

# This line is the main gate on supporting Iceberg materializations. Pass forward a default
# table format, and no downstream table macros can build iceberg relations.
table_format: str = (
TableFormat.ICEBERG
if self.behavior.enable_iceberg_materializations.no_warn and is_iceberg in ("Y", "YES")
else TableFormat.DEFAULT
)
quote_policy = {"database": True, "schema": True, "identifier": True}

return self.Relation.create(
database=database,
schema=schema,
identifier=identifier,
type=relation_type,
table_format=table_format,
quote_policy=quote_policy,
table_format=(
TableFormat.ICEBERG if is_iceberg in ("Y", "YES") else TableFormat.DEFAULT
),
quote_policy={"database": True, "schema": True, "identifier": True},
)

def quote_seed_column(self, column: str, quote_config: Optional[bool]) -> str:
Expand Down
7 changes: 7 additions & 0 deletions dbt/adapters/snowflake/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from dbt_common.events.functions import fire_event, warn_or_error

from dbt.adapters.snowflake.relation_configs import (
SnowflakeCatalogConfigChange,
SnowflakeDynamicTableConfig,
SnowflakeDynamicTableConfigChangeset,
SnowflakeDynamicTableRefreshModeConfigChange,
Expand Down Expand Up @@ -114,6 +115,12 @@ def dynamic_table_config_changeset(
context=new_dynamic_table.refresh_mode,
)

if new_dynamic_table.catalog != existing_dynamic_table.catalog:
config_change_collection.catalog = SnowflakeCatalogConfigChange(
action=RelationConfigChangeAction.create,
context=new_dynamic_table.catalog,
)

if config_change_collection.has_changes:
return config_change_collection
return None
Expand Down
6 changes: 5 additions & 1 deletion dbt/adapters/snowflake/relation_configs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
from dbt.adapters.snowflake.relation_configs.catalog import (
SnowflakeCatalogConfig,
SnowflakeCatalogConfigChange,
)
from dbt.adapters.snowflake.relation_configs.dynamic_table import (
SnowflakeDynamicTableConfig,
SnowflakeDynamicTableConfigChangeset,
SnowflakeDynamicTableRefreshModeConfigChange,
SnowflakeDynamicTableWarehouseConfigChange,
SnowflakeDynamicTableTargetLagConfigChange,
)
from dbt.adapters.snowflake.relation_configs.formats import TableFormat
from dbt.adapters.snowflake.relation_configs.policies import (
SnowflakeIncludePolicy,
SnowflakeQuotePolicy,
SnowflakeRelationType,
)
from dbt.adapters.snowflake.relation_configs.formats import TableFormat
121 changes: 121 additions & 0 deletions dbt/adapters/snowflake/relation_configs/catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
from dataclasses import dataclass
from typing import Any, Dict, Optional, TYPE_CHECKING, Set

if TYPE_CHECKING:
import agate

from dbt.adapters.relation_configs import (
RelationConfigChange,
RelationResults,
RelationConfigValidationMixin,
RelationConfigValidationRule,
)
from dbt.adapters.contracts.relation import RelationConfig
from dbt_common.exceptions import DbtConfigError
from typing_extensions import Self

from dbt.adapters.snowflake.relation_configs.base import SnowflakeRelationConfigBase
from dbt.adapters.snowflake.relation_configs.formats import TableFormat


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class SnowflakeCatalogConfig(SnowflakeRelationConfigBase, RelationConfigValidationMixin):
    """
    This config follows the specs found here:
    https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table
    https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table#create-dynamic-iceberg-table

    The following parameters are configurable by dbt:
    - table_format: format for interfacing with the table, e.g. default, iceberg
    - external_volume: name of the external volume in Snowflake
    - base_location: the directory within the external volume that contains the data
        *Note*: This directory can't be changed after you create a table.

    The following parameters are not currently configurable by dbt:
    - name: snowflake
    """

    table_format: Optional[TableFormat] = TableFormat.default()
    name: Optional[str] = "SNOWFLAKE"
    external_volume: Optional[str] = None
    base_location: Optional[str] = None

    @property
    def validation_rules(self) -> Set[RelationConfigValidationRule]:
        """
        Return the rules this config must satisfy.

        Both rules pass trivially for the default table format; for iceberg they
        require a `base_location` and a catalog name of "SNOWFLAKE" respectively.
        """
        return {
            RelationConfigValidationRule(
                (self.table_format == "default")
                or (self.table_format == "iceberg" and self.base_location is not None),
                DbtConfigError("Please provide a `base_location` when using iceberg"),
            ),
            RelationConfigValidationRule(
                (self.table_format == "default")
                or (self.table_format == "iceberg" and self.name == "SNOWFLAKE"),
                DbtConfigError(
                    "Only Snowflake catalogs are currently supported when using iceberg"
                ),
            ),
        }

    @classmethod
    def from_dict(cls, config_dict: Dict[str, Any]) -> Self:
        """
        Build a catalog config from a flat dictionary.

        Only the `name`, `external_volume`, `base_location`, and `table_format`
        keys are read; any other keys in `config_dict` are ignored.
        """
        # NOTE(review): missing keys are forwarded as None; presumably the base
        # `from_dict` drops null values so the dataclass defaults apply - confirm
        kwargs_dict: Dict[str, Any] = {
            "name": config_dict.get("name"),
            "external_volume": config_dict.get("external_volume"),
            "base_location": config_dict.get("base_location"),
        }
        # only coerce to the enum when a (truthy) table format was supplied
        if table_format := config_dict.get("table_format"):
            kwargs_dict["table_format"] = TableFormat(table_format)
        return super().from_dict(kwargs_dict)

    @classmethod
    def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any]:
        """
        Translate the user-provided model config into a catalog config dictionary.

        Returns `{"table_format": "default"}` when no `table_format` is configured.
        """
        extra = relation_config.config.extra

        if extra.get("table_format") is None:
            return {"table_format": "default"}

        config_dict: Dict[str, Any] = {
            "table_format": extra.get("table_format"),
            "name": "SNOWFLAKE",  # this is not currently configurable
        }

        if external_volume := extra.get("external_volume"):
            config_dict["external_volume"] = external_volume

        # the user-facing setting is `base_location_subpath`; it is stored as `base_location`
        if base_location := extra.get("base_location_subpath"):
            config_dict["base_location"] = base_location

        return config_dict

    @classmethod
    def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]:
        """
        Translate catalog metadata queried from Snowflake into a catalog config dictionary.

        Returns `{"table_format": "default"}` when there is no "catalog" entry in
        `relation_results`, or when that entry is None or empty.
        """
        # use `.get` so that a missing "catalog" entry is treated the same as an
        # empty one instead of raising a KeyError before the guard below can run
        catalog_results: Optional["agate.Table"] = relation_results.get("catalog")

        if catalog_results is None or len(catalog_results) == 0:
            return {"table_format": "default"}

        # for now, if we get catalog results, it's because this is an iceberg table
        # this is because we only run `show iceberg tables` to get catalog metadata
        # this will need to be updated once this is in `show objects`
        catalog: "agate.Row" = catalog_results.rows[0]
        config_dict: Dict[str, Any] = {"table_format": "iceberg"}

        if name := catalog.get("catalog_name"):
            config_dict["name"] = name

        if external_volume := catalog.get("external_volume_name"):
            config_dict["external_volume"] = external_volume

        if base_location := catalog.get("base_location"):
            config_dict["base_location"] = base_location

        return config_dict


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class SnowflakeCatalogConfigChange(RelationConfigChange):
    """
    Describes a change to a relation's catalog configuration.

    `context` carries the catalog config associated with the change, if any.
    """

    context: Optional[SnowflakeCatalogConfig] = None

    @property
    def requires_full_refresh(self) -> bool:
        # a catalog change is unconditionally reported as needing a full refresh
        return True
mikealfare marked this conversation as resolved.
Show resolved Hide resolved
20 changes: 15 additions & 5 deletions dbt/adapters/snowflake/relation_configs/dynamic_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@
from typing_extensions import Self

from dbt.adapters.snowflake.relation_configs.base import SnowflakeRelationConfigBase
from dbt.adapters.snowflake.relation_configs.catalog import (
SnowflakeCatalogConfig,
SnowflakeCatalogConfigChange,
)


if TYPE_CHECKING:
import agate
Expand Down Expand Up @@ -55,11 +60,12 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase):
query: str
target_lag: str
snowflake_warehouse: str
catalog: SnowflakeCatalogConfig
refresh_mode: Optional[RefreshMode] = RefreshMode.default()
initialize: Optional[Initialize] = Initialize.default()

@classmethod
def from_dict(cls, config_dict) -> "SnowflakeDynamicTableConfig":
def from_dict(cls, config_dict: Dict[str, Any]) -> Self:
mikealfare marked this conversation as resolved.
Show resolved Hide resolved
kwargs_dict = {
"name": cls._render_part(ComponentName.Identifier, config_dict.get("name")),
"schema_name": cls._render_part(ComponentName.Schema, config_dict.get("schema_name")),
Expand All @@ -69,12 +75,12 @@ def from_dict(cls, config_dict) -> "SnowflakeDynamicTableConfig":
"query": config_dict.get("query"),
"target_lag": config_dict.get("target_lag"),
"snowflake_warehouse": config_dict.get("snowflake_warehouse"),
"catalog": SnowflakeCatalogConfig.from_dict(config_dict),
"refresh_mode": config_dict.get("refresh_mode"),
"initialize": config_dict.get("initialize"),
}

dynamic_table: "SnowflakeDynamicTableConfig" = super().from_dict(kwargs_dict)
return dynamic_table
return super().from_dict(kwargs_dict)

@classmethod
def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any]:
Expand All @@ -85,6 +91,7 @@ def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any
"query": relation_config.compiled_code,
"target_lag": relation_config.config.extra.get("target_lag"),
"snowflake_warehouse": relation_config.config.extra.get("snowflake_warehouse"),
"catalog": SnowflakeCatalogConfig.parse_relation_config(relation_config),
}

if refresh_mode := relation_config.config.extra.get("refresh_mode"):
Expand All @@ -96,7 +103,7 @@ def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any
return config_dict

@classmethod
def parse_relation_results(cls, relation_results: RelationResults) -> Dict:
def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]:
dynamic_table: "agate.Row" = relation_results["dynamic_table"].rows[0]

config_dict = {
Expand All @@ -106,6 +113,7 @@ def parse_relation_results(cls, relation_results: RelationResults) -> Dict:
"query": dynamic_table.get("text"),
"target_lag": dynamic_table.get("target_lag"),
"snowflake_warehouse": dynamic_table.get("warehouse"),
"catalog": SnowflakeCatalogConfig.parse_relation_results(relation_results),
"refresh_mode": dynamic_table.get("refresh_mode"),
# we don't get initialize since that's a one-time scheduler attribute, not a DT attribute
}
Expand Down Expand Up @@ -145,6 +153,7 @@ class SnowflakeDynamicTableConfigChangeset:
target_lag: Optional[SnowflakeDynamicTableTargetLagConfigChange] = None
snowflake_warehouse: Optional[SnowflakeDynamicTableWarehouseConfigChange] = None
refresh_mode: Optional[SnowflakeDynamicTableRefreshModeConfigChange] = None
catalog: Optional[SnowflakeCatalogConfigChange] = None

@property
def requires_full_refresh(self) -> bool:
Expand All @@ -157,9 +166,10 @@ def requires_full_refresh(self) -> bool:
else False
),
self.refresh_mode.requires_full_refresh if self.refresh_mode else False,
self.catalog.requires_full_refresh if self.catalog else False,
]
)

@property
def has_changes(self) -> bool:
return any([self.target_lag, self.snowflake_warehouse, self.refresh_mode])
return any([self.target_lag, self.snowflake_warehouse, self.refresh_mode, self.catalog])
5 changes: 5 additions & 0 deletions dbt/adapters/snowflake/relation_configs/formats.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from dbt_common.dataclass_schema import StrEnum # doesn't exist in standard library until py3.11
from typing_extensions import Self


class TableFormat(StrEnum):
Expand All @@ -10,5 +11,9 @@ class TableFormat(StrEnum):
DEFAULT = "default"
ICEBERG = "iceberg"

@classmethod
def default(cls) -> Self:
return cls("default")

def __str__(self):
return self.value
1 change: 1 addition & 0 deletions dbt/include/snowflake/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@
{% macro snowflake__list_relations_without_caching(schema_relation, max_iter=10, max_results_per_iter=10000) %}

{%- set max_total_results = max_results_per_iter * max_iter -%}

{%- set sql -%}
{% if schema_relation is string %}
show objects in {{ schema_relation }} limit {{ max_results_per_iter }};
Expand Down
24 changes: 12 additions & 12 deletions dbt/include/snowflake/macros/relations/dynamic_table/alter.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,22 @@
target_relation,
sql
) -%}
{{- log('Applying ALTER to: ' ~ existing_relation) -}}
{{- log('Applying ALTER to: ' ~ existing_relation) -}}

{% if configuration_changes.requires_full_refresh %}
{{- get_replace_sql(existing_relation, target_relation, sql) -}}
{% if configuration_changes.requires_full_refresh %}
{{- get_replace_sql(existing_relation, target_relation, sql) -}}

{% else %}
{% else %}

{%- set target_lag = configuration_changes.target_lag -%}
{%- if target_lag -%}{{- log('Applying UPDATE TARGET_LAG to: ' ~ existing_relation) -}}{%- endif -%}
{%- set snowflake_warehouse = configuration_changes.snowflake_warehouse -%}
{%- if snowflake_warehouse -%}{{- log('Applying UPDATE WAREHOUSE to: ' ~ existing_relation) -}}{%- endif -%}
{%- set target_lag = configuration_changes.target_lag -%}
{%- if target_lag -%}{{- log('Applying UPDATE TARGET_LAG to: ' ~ existing_relation) -}}{%- endif -%}
{%- set snowflake_warehouse = configuration_changes.snowflake_warehouse -%}
{%- if snowflake_warehouse -%}{{- log('Applying UPDATE WAREHOUSE to: ' ~ existing_relation) -}}{%- endif -%}

alter dynamic table {{ existing_relation }} set
{% if target_lag %}target_lag = '{{ target_lag.context }}'{% endif %}
{% if snowflake_warehouse %}warehouse = {{ snowflake_warehouse.context }}{% endif %}
alter dynamic table {{ existing_relation }} set
{% if target_lag %}target_lag = '{{ target_lag.context }}'{% endif %}
{% if snowflake_warehouse %}warehouse = {{ snowflake_warehouse.context }}{% endif %}

{%- endif -%}
{%- endif -%}

{%- endmacro %}
Loading
Loading