Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adap 321/add iceberg incremental models #1194

Merged
merged 6 commits into from
Sep 27, 2024
15 changes: 7 additions & 8 deletions tests/functional/iceberg/test_incremental_models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pytest
import time

from pathlib import Path

Expand Down Expand Up @@ -57,6 +58,8 @@


class TestIcebergIncrementalStrategies:
append: str = f"append_{hash(time.time())}"

@pytest.fixture(scope="class")
def project_config_update(self):
return {"flags": {"enable_iceberg_materializations": True}}
Expand All @@ -76,23 +79,19 @@ def setup_class(self, project):
def models(self):
return {
"upstream_table.sql": _MODEL_BASIC_TABLE_MODEL,
"append.sql": _MODEL_INCREMENTAL_ICEBERG_APPEND,
f"{self.append}.sql": _MODEL_INCREMENTAL_ICEBERG_APPEND,
"merge.sql": _MODEL_INCREMENTAL_ICEBERG_MERGE,
"delete_insert.sql": _MODEL_INCREMENTAL_ICEBERG_DELETE_INSERT,
}

def test_incremental_strategies_build(self, project, setup_class):
run_results = run_dbt()
assert len(run_results) == 4

def __check_correct_operations(self, model_name, /, rows_affected, status="SUCCESS"):
run_results = run_dbt(
["show", "--inline", f"select * from {{{{ ref('{model_name}') }}}} where world_id = 4"]
)
assert run_results[0].adapter_response["rows_affected"] == rows_affected
assert run_results[0].adapter_response["code"] == status

if model_name != "append":
if "append" not in model_name:
run_results, stdout = run_dbt_and_capture(
[
"show",
Expand All @@ -118,9 +117,9 @@ def test_incremental_strategies_with_update(self, project, setup_class):
)
)

run_results = run_dbt(["run", "-s", "append", "merge", "delete_insert"])
run_results = run_dbt(["run", "-s", self.append, "merge", "delete_insert"])
assert len(run_results) == 3

self.__check_correct_operations("append", rows_affected=3)
self.__check_correct_operations(self.append, rows_affected=2)
self.__check_correct_operations("merge", rows_affected=1)
self.__check_correct_operations("delete_insert", rows_affected=1)