From d2782dc6a74ad37bee94df18b2793000420b40a9 Mon Sep 17 00:00:00 2001 From: Anders Swanson Date: Wed, 15 May 2024 15:39:42 -0400 Subject: [PATCH 1/2] add external table --- .../snowflake/relation_configs/policies.py | 1 + .../external_table/external_table.sql | 77 +++++++++++ .../relations/external_table/helpers.sql | 125 ++++++++++++++++++ 3 files changed, 203 insertions(+) create mode 100644 dbt/include/snowflake/macros/relations/external_table/external_table.sql create mode 100644 dbt/include/snowflake/macros/relations/external_table/helpers.sql diff --git a/dbt/adapters/snowflake/relation_configs/policies.py b/dbt/adapters/snowflake/relation_configs/policies.py index 75195f9a3..2ad42dc0b 100644 --- a/dbt/adapters/snowflake/relation_configs/policies.py +++ b/dbt/adapters/snowflake/relation_configs/policies.py @@ -10,6 +10,7 @@ class SnowflakeRelationType(StrEnum): CTE = "cte" External = "external" DynamicTable = "dynamic_table" + ExternalTable = "external_table" class SnowflakeIncludePolicy(Policy): diff --git a/dbt/include/snowflake/macros/relations/external_table/external_table.sql b/dbt/include/snowflake/macros/relations/external_table/external_table.sql new file mode 100644 index 000000000..02fce568a --- /dev/null +++ b/dbt/include/snowflake/macros/relations/external_table/external_table.sql @@ -0,0 +1,77 @@ +{% macro snowflake__create_external_table(source_node) %} + + {%- set columns = source_node.columns.values() -%} + {%- set external = source_node.external -%} + {%- set partitions = external.partitions -%} + {%- set infer_schema = external.infer_schema -%} + + {% if infer_schema %} + {% set query_infer_schema %} + select * from table( infer_schema( location=>'{{external.location}}', file_format=>'{{external.file_format}}') ) + {% endset %} + {% if execute %} + {% set columns_infer = run_query(query_infer_schema) %} + {% endif %} + {% endif %} + + {%- set is_csv = dbt_external_tables.is_csv(external.file_format) -%} + +{# https://docs.snowflake.net/manuals/sql-reference/sql/create-external-table.html #} +{# This assumes you have already created an external stage #} + create or replace external table {{source(source_node.source_name, source_node.name)}} + {%- if columns or partitions or infer_schema -%} + ( + {%- if partitions -%}{%- for partition in partitions %} + {{partition.name}} {{partition.data_type}} as {{partition.expression}}{{- ',' if not loop.last or columns|length > 0 or infer_schema -}} + {%- endfor -%}{%- endif -%} + {%- if not infer_schema -%} + {%- for column in columns %} + {%- set column_quoted = adapter.quote(column.name) if column.quote else column.name %} + {%- set column_alias -%} + {%- if 'alias' in column and column.quote -%} + {{adapter.quote(column.alias)}} + {%- elif 'alias' in column -%} + {{column.alias}} + {%- else -%} + {{column_quoted}} + {%- endif -%} + {%- endset %} + {%- set col_expression -%} + {%- if column.expression -%} + {{column.expression}} + {%- else -%} + {%- set col_id = 'value:c' ~ loop.index if is_csv else 'value:' ~ column_alias -%} + (case when is_null_value({{col_id}}) or lower({{col_id}}) = 'null' then null else {{col_id}} end) + {%- endif -%} + {%- endset %} + {{column_alias}} {{column.data_type}} as ({{col_expression}}::{{column.data_type}}) + {{- ',' if not loop.last -}} + {% endfor %} + {% else %} + {%- for column in columns_infer %} + {%- set col_expression -%} + {%- set col_id = 'value:' ~ column[0] -%} + (case when is_null_value({{col_id}}) or lower({{col_id}}) = 'null' then null else {{col_id}} end) + {%- endset %} + {{column[0]}} {{column[1]}} as ({{col_expression}}::{{column[1]}}) + {{- ',' if not loop.last -}} + {% endfor %} + {%- endif -%} + ) + {%- endif -%} + {% if partitions %} partition by ({{partitions|map(attribute='name')|join(', ')}}) {% endif %} + location = {{external.location}} {# stage #} + {% if external.auto_refresh in (true, false) -%} + auto_refresh = {{external.auto_refresh}} + {%- endif %} + {% if external.aws_sns_topic -%} + aws_sns_topic = '{{external.aws_sns_topic}}' + {%- endif %} + {% if external.table_format | lower == "delta" %} + refresh_on_create = false + {% endif %} + {% if external.pattern -%} pattern = '{{external.pattern}}' {%- endif %} + {% if external.integration -%} integration = '{{external.integration}}' {%- endif %} + file_format = {{external.file_format}} + {% if external.table_format -%} table_format = '{{external.table_format}}' {%- endif %} +{% endmacro %} diff --git a/dbt/include/snowflake/macros/relations/external_table/helpers.sql b/dbt/include/snowflake/macros/relations/external_table/helpers.sql new file mode 100644 index 000000000..257f35b54 --- /dev/null +++ b/dbt/include/snowflake/macros/relations/external_table/helpers.sql @@ -0,0 +1,125 @@ +{% macro snowflake__create_external_schema(source_node) %} + + {% set schema_exists_query %} + show terse schemas like '{{ source_node.schema }}' in database {{ source_node.database }} limit 1; + {% endset %} + {% if execute %} + {% set schema_exists = run_query(schema_exists_query)|length > 0 %} + {% else %} + {% set schema_exists = false %} + {% endif %} + + {% if schema_exists %} + {% set ddl %} + select 'Schema {{ source_node.schema }} exists' from dual; + {% endset %} + {% else %} + {% set fqn %} + {% if source_node.database %} + {{ source_node.database }}.{{ source_node.schema }} + {% else %} + {{ source_node.schema }} + {% endif %} + {% endset %} + + {% set ddl %} + create schema if not exists {{ fqn }}; + {% endset %} + {% endif %} + + {% do return(ddl) %} + +{% endmacro %} + +{% macro snowflake__refresh_external_table(source_node) %} + + {% set external = source_node.external %} + {% set snowpipe = source_node.external.get('snowpipe', none) %} + + {% set auto_refresh = external.get('auto_refresh', false) %} + {% set partitions = external.get('partitions', none) %} + {% set delta_format = (external.table_format | lower == "delta") %} + + {% set manual_refresh = not auto_refresh %} + + {% if manual_refresh %} + + {% set ddl %} + begin; + alter external table {{source(source_node.source_name, source_node.name)}} refresh; + commit; + {% endset %} + + {% do return([ddl]) %} + + {% else %} + + {% do return([]) %} + + {% endif %} + +{% endmacro %} + +{% macro is_csv(file_format) %} + +{# From https://docs.snowflake.net/manuals/sql-reference/sql/create-external-table.html: + +Important: The external table does not inherit the file format, if any, in the +stage definition. You must explicitly specify any file format options for the +external table using the FILE_FORMAT parameter. + +Note: FORMAT_NAME and TYPE are mutually exclusive; to avoid unintended behavior, +you should only specify one or the other when creating an external table. + +#} + + {% set ff_ltrimmed = file_format|lower|replace(' ','') %} + + {% if 'type=' in ff_ltrimmed %} + + {% if 'type=csv' in ff_ltrimmed %} + + {{return(true)}} + + {% else %} + + {{return(false)}} + + {% endif %} + + {% else %} + + {% set ff_standardized = ff_ltrimmed + | replace('(','') | replace(')','') + | replace('format_name=','') %} + {% set fqn = ff_standardized.split('.') %} + + {% if fqn | length == 3 %} + {% set ff_database, ff_schema, ff_identifier = fqn[0], fqn[1], fqn[2] %} + {% elif fqn | length == 2 %} + {% set ff_database, ff_schema, ff_identifier = target.database, fqn[0], fqn[1] %} + {% else %} + {% set ff_database, ff_schema, ff_identifier = target.database, target.schema, fqn[0] %} + {% endif %} + + {% call statement('get_file_format', fetch_result = True) %} + show file formats in {{ff_database}}.{{ff_schema}} + {% endcall %} + + {% set ffs = load_result('get_file_format').table %} + + {% for ff in ffs %} + + {% if ff['name']|lower == ff_identifier and ff['type']|lower == 'csv' %} + + {{return(true)}} + + {% endif %} + + {% endfor %} + + {{return(false)}} + + {% endif %} + +{% endmacro %} From bd49b63599fcb0836af401cb24b6b22adc0e2480 Mon Sep 17 00:00:00 2001 From: Anders Swanson Date: Wed, 15 May 2024 17:48:18 -0400 Subject: [PATCH 2/2] materialize exactly one external table --- .../external_table/external_table.sql | 88 ++++++------------- 1 file changed, 27 insertions(+), 61 deletions(-) diff --git a/dbt/include/snowflake/macros/relations/external_table/external_table.sql b/dbt/include/snowflake/macros/relations/external_table/external_table.sql index 02fce568a..edf127abc 100644 --- a/dbt/include/snowflake/macros/relations/external_table/external_table.sql +++ b/dbt/include/snowflake/macros/relations/external_table/external_table.sql @@ -3,75 +3,41 @@ {%- set columns = source_node.columns.values() -%} {%- set external = source_node.external -%} {%- set partitions = external.partitions -%} - {%- set infer_schema = external.infer_schema -%} - {% if infer_schema %} - {% set query_infer_schema %} - select * from table( infer_schema( location=>'{{external.location}}', file_format=>'{{external.file_format}}') ) - {% endset %} - {% if execute %} - {% set columns_infer = run_query(query_infer_schema) %} - {% endif %} - {% endif %} + {# {{ log('XXX: columns: ' ~ columns, info=True) }} + {{ log('XXX: partitions: ' ~ columns, info=True) }} + {% set partition_map = partitions|map(attribute='name')|join(', ') %} + {{ log('XXX: partition_map: ' ~ partition_map, info=True) }} #} + + + {%- set is_csv = dbt_external_tables.is_csv(external.file_format) -%} + {%- set relation = api.Relation.create( + database=source_node.database, schema=source_node.schema, identifier=source_node.name, + type='external_table') -%} + {# https://docs.snowflake.net/manuals/sql-reference/sql/create-external-table.html #} {# This assumes you have already created an external stage #} - create or replace external table {{source(source_node.source_name, source_node.name)}} - {%- if columns or partitions or infer_schema -%} + + create or replace external table {{ relation }} ( - {%- if partitions -%}{%- for partition in partitions %} - {{partition.name}} {{partition.data_type}} as {{partition.expression}}{{- ',' if not loop.last or columns|length > 0 or infer_schema -}} - {%- endfor -%}{%- endif -%} - {%- if not infer_schema -%} - {%- for column in columns %} - {%- set column_quoted = adapter.quote(column.name) if column.quote else column.name %} - {%- set column_alias -%} - {%- if 'alias' in column and column.quote -%} - {{adapter.quote(column.alias)}} - {%- elif 'alias' in column -%} - {{column.alias}} - {%- else -%} - {{column_quoted}} - {%- endif -%} - {%- endset %} - {%- set col_expression -%} - {%- if column.expression -%} - {{column.expression}} - {%- else -%} - {%- set col_id = 'value:c' ~ loop.index if is_csv else 'value:' ~ column_alias -%} - (case when is_null_value({{col_id}}) or lower({{col_id}}) = 'null' then null else {{col_id}} end) - {%- endif -%} - {%- endset %} - {{column_alias}} {{column.data_type}} as ({{col_expression}}::{{column.data_type}}) - {{- ',' if not loop.last -}} - {% endfor %} - {% else %} - {%- for column in columns_infer %} - {%- set col_expression -%} - {%- set col_id = 'value:' ~ column[0] -%} - (case when is_null_value({{col_id}}) or lower({{col_id}}) = 'null' then null else {{col_id}} end) - {%- endset %} - {{column[0]}} {{column[1]}} as ({{col_expression}}::{{column[1]}}) - {{- ',' if not loop.last -}} - {% endfor %} - {%- endif -%} + + {%- for column in columns %} + {{ log('column: ' ~ column.name, info=True) }} + {%- set column_alias = column.name %} + {%- set col_expression -%} + {%- set col_id = 'value:c' ~ loop.index if is_csv else 'value:' ~ column_alias -%} + (case when is_null_value({{col_id}}) or lower({{col_id}}) = 'null' then null else {{col_id}} end) + {%- endset %} + {{column_alias}} {{column.data_type}} as ({{col_expression}}::{{column.data_type}}) + {{- ',' if not loop.last -}} + {% endfor %} + ) - {%- endif -%} - {% if partitions %} partition by ({{partitions|map(attribute='name')|join(', ')}}) {% endif %} location = {{external.location}} {# stage #} - {% if external.auto_refresh in (true, false) -%} - auto_refresh = {{external.auto_refresh}} - {%- endif %} - {% if external.aws_sns_topic -%} - aws_sns_topic = '{{external.aws_sns_topic}}' - {%- endif %} - {% if external.table_format | lower == "delta" %} - refresh_on_create = false - {% endif %} - {% if external.pattern -%} pattern = '{{external.pattern}}' {%- endif %} - {% if external.integration -%} integration = '{{external.integration}}' {%- endif %} + file_format = {{external.file_format}} - {% if external.table_format -%} table_format = '{{external.table_format}}' {%- endif %} + {% endmacro %}