Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use timestamp_tz type in microbatch delete DDL #1257

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20241127-162204.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Use timestamp_tz type in microbatch `delete` DDL
time: 2024-11-27T16:22:04.103212-05:00
custom:
Author: michelleark
Issue: "1256"
4 changes: 2 additions & 2 deletions dbt/include/snowflake/macros/materializations/merge.sql
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@

{#-- Add additional incremental_predicates to filter for batch --#}
{% if model.config.get("__dbt_internal_microbatch_event_time_start") -%}
{% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " >= TIMESTAMP '" ~ model.config.__dbt_internal_microbatch_event_time_start ~ "'") %}
{% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " >= to_timestamp_tz('" ~ model.config.__dbt_internal_microbatch_event_time_start ~ "')") %}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interestingly, just using the raw timestamp (which includes the UTC offset, as model.config.__dbt_internal_microbatch_event_time_start does) works well; i.e., just removing TIMESTAMP would have been enough here. However, using to_timestamp_tz is more intentional, so I've opted for that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like how using TIMESTAMP was causing the issue 🫠 I like the move to using to_timestamp_tz instead of just removing TIMESTAMP 👍

{% endif %}
{% if model.config.__dbt_internal_microbatch_event_time_end -%}
{% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " < TIMESTAMP '" ~ model.config.__dbt_internal_microbatch_event_time_end ~ "'") %}
{% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " < to_timestamp_tz('" ~ model.config.__dbt_internal_microbatch_event_time_end ~ "')") %}
{% endif %}
{% do arg_dict.update({'incremental_predicates': incremental_predicates}) %}

Expand Down
16 changes: 15 additions & 1 deletion tests/functional/adapter/test_incremental_microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,20 @@
BaseMicrobatch,
)

# Create input with UTC timestamps
_input_model_sql = """
{{ config(materialized='table', event_time='event_time') }}
select 1 as id, to_timestamp_tz('2020-01-01 00:00:00-0') as event_time
union all
select 2 as id, to_timestamp_tz('2020-01-02 00:00:00-0') as event_time
union all
select 3 as id, to_timestamp_tz('2020-01-03 00:00:00-0') as event_time
"""


# No requirement for a unique_id for snowflake microbatch!
_microbatch_model_no_unique_id_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0), pre_hook="alter session set timezone = 'America/Los_Angeles'") }}
select * from {{ ref('input_model') }}
"""

Expand All @@ -16,6 +26,10 @@ class TestSnowflakeMicrobatch(BaseMicrobatch):
def microbatch_model_sql(self) -> str:
return _microbatch_model_no_unique_id_sql

@pytest.fixture(scope="class")
def input_model_sql(self) -> str:
return _input_model_sql

@pytest.fixture(scope="class")
def insert_two_rows_sql(self, project) -> str:
test_schema_relation = project.adapter.Relation.create(
Expand Down