Make list_relations_without_caching pagination configurable #1235

Merged: 6 commits, Nov 18, 2024
7 changes: 7 additions & 0 deletions .changes/unreleased/Features-20241107-170307.yaml
@@ -0,0 +1,7 @@
kind: Features
body: 'Allow configurable pagination on list_relations_without_caching to support
users with a large number of objects per schema'
time: 2024-11-07T17:03:07.826352-05:00
custom:
Author: mikealfare
Issue: "1234"
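
The two settings named in this changelog, `list_relations_per_page` and `list_relations_page_limit`, are read from project flags by the macro change below. As an illustration only (these values are not the defaults), a project could raise the overall cap as in this sketch, written in the same `project_config_update` fixture form the functional tests in this PR use:

```python
# Illustrative sketch: with these flags the macro pages through at most
# 10,000 * 100 = 1,000,000 objects per schema before raising an error.
# In a real project the keys would sit under `flags:` in dbt_project.yml;
# here they are written as the mapping the functional tests pass to dbt.
def project_config_update():
    return {
        "flags": {
            "list_relations_per_page": 10000,  # objects returned per SHOW TERSE OBJECTS page
            "list_relations_page_limit": 100,  # maximum number of pages to request
        }
    }
```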
10 changes: 7 additions & 3 deletions dbt/include/snowflake/macros/adapters.sql
@@ -111,9 +111,10 @@

{%- if loop.index == max_iter -%}
{%- set msg -%}
dbt will list a maximum of {{ max_total_results }} objects in schema {{ schema_relation }}.
Your schema exceeds this limit. Please contact [email protected] for troubleshooting tips,
or review and reduce the number of objects contained.
dbt is currently configured to list a maximum of {{ max_total_results }} objects per schema.
{{ schema_relation }} exceeds this limit. If this is expected, you may configure this limit
by setting list_relations_per_page and list_relations_page_limit in your project flags.
It is recommended to start by increasing list_relations_page_limit to something more than the default of 10.
{%- endset -%}

{% do exceptions.raise_compiler_error(msg) %}
@@ -135,6 +136,9 @@
{% endmacro %}

{% macro snowflake__list_relations_without_caching(schema_relation, max_iter=10, max_results_per_iter=10000) %}

{%- set max_results_per_iter = adapter.config.flags.get('list_relations_per_page', max_results_per_iter) -%}
{%- set max_iter = adapter.config.flags.get('list_relations_page_limit', max_iter) -%}
{%- set max_total_results = max_results_per_iter * max_iter -%}
{%- set sql -%}
{% if schema_relation is string %}
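
For readers who do not want to trace the Jinja, here is a rough Python sketch, not the macro itself, of how the loop above bounds its pagination once the two flags are read. `fetch_page` is a hypothetical stand-in for a single `SHOW TERSE OBJECTS ... LIMIT ...` call, and the exact raise condition in the macro may differ slightly:

```python
from typing import Callable, List, Optional


def list_relations_without_caching(
    fetch_page: Callable[[Optional[str], int], List[dict]],
    per_page: int = 10_000,
    page_limit: int = 10,
) -> List[dict]:
    """Sketch of the macro's pagination bound.

    per_page / page_limit mirror list_relations_per_page / list_relations_page_limit;
    fetch_page(watermark, limit) stands in for one SHOW TERSE OBJECTS query.
    """
    max_total_results = per_page * page_limit
    relations: List[dict] = []
    watermark: Optional[str] = None
    for page in range(1, page_limit + 1):
        batch = fetch_page(watermark, per_page)
        relations.extend(batch)
        if len(batch) < per_page:
            return relations  # short page: nothing left to list
        if page == page_limit:
            # the macro calls exceptions.raise_compiler_error(msg) here instead
            raise RuntimeError(
                f"dbt is currently configured to list a maximum of "
                f"{max_total_results} objects per schema."
            )
        watermark = batch[-1]["name"]  # assumes each row exposes its object name
    return relations
```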
247 changes: 71 additions & 176 deletions tests/functional/adapter/list_relations_tests/test_pagination.py
@@ -1,34 +1,31 @@
import os

import pytest
import json
from dbt.tests.util import run_dbt, run_dbt_and_capture
from dbt.adapters.snowflake import SnowflakeRelation

# Testing rationale:
# - snowflake SHOW TERSE OBJECTS command returns at max 10K objects in a single call
# - when dbt attempts to write into a schema with more than 10K objects, compilation will fail
# unless we paginate the result
# - however, testing this process is difficult at a full scale of 10K actual objects populated
# into a fresh testing schema
# - accordingly, we create a smaller set of views and test the looping iteration logic in
# smaller chunks

NUM_VIEWS = 90
NUM_DYNAMIC_TABLES = 10
# the total number should be between the numbers referenced in the "passing" and "failing" macros below
# - MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING (11 iter * 10 results per iter -> 110 objects)
# - MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING_RAISE_ERROR (33 iter * 3 results per iter -> 99 objects)
NUM_EXPECTED_RELATIONS = 1 + NUM_VIEWS + NUM_DYNAMIC_TABLES

TABLE_BASE_SQL = """
{{ config(materialized='table') }}

from dbt_common.exceptions import CompilationError
from dbt.tests.util import run_dbt

"""
Testing rationale:
- snowflake SHOW TERSE OBJECTS command returns at max 10K objects in a single call
- when dbt attempts to write into a schema with more than 10K objects, compilation will fail
unless we paginate the result
- pagination defaults to 10 pages, but users need to be able to configure this
- these tests shrink that configuration so pagination (and its failure mode) can be exercised with a small number of objects
"""


TABLE = """
{{ config(materialized='table') }}
select 1 as id
""".lstrip()
"""


VIEW_X_SQL = """
VIEW = """
{{ config(materialized='view') }}
select id from {{ ref('my_model_base') }}
""".lstrip()
"""


DYNAMIC_TABLE = (
"""
@@ -44,173 +41,71 @@
"""
)

MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING = """
{% macro validate_list_relations_without_caching(schema_relation) %}
{% set relation_list_result = snowflake__list_relations_without_caching(schema_relation, max_iter=11, max_results_per_iter=10) %}
{% set n_relations = relation_list_result | length %}
{{ log("n_relations: " ~ n_relations) }}
{% endmacro %}
"""

MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING_RAISE_ERROR = """
{% macro validate_list_relations_without_caching_raise_error(schema_relation) %}
{{ snowflake__list_relations_without_caching(schema_relation, max_iter=33, max_results_per_iter=3) }}
{% endmacro %}
"""


def parse_json_logs(json_log_output):
parsed_logs = []
for line in json_log_output.split("\n"):
try:
log = json.loads(line)
except ValueError:
continue

parsed_logs.append(log)

return parsed_logs

class BaseConfig:
VIEWS = 90
DYNAMIC_TABLES = 10

def find_result_in_parsed_logs(parsed_logs, result_name):
return next(
(
item["data"]["msg"]
for item in parsed_logs
if result_name in item["data"].get("msg", "msg")
),
False,
)


def find_exc_info_in_parsed_logs(parsed_logs, exc_info_name):
return next(
(
item["data"]["exc_info"]
for item in parsed_logs
if exc_info_name in item["data"].get("exc_info", "exc_info")
),
False,
)


class TestListRelationsWithoutCachingSingle:
@pytest.fixture(scope="class")
def models(self):
my_models = {"my_model_base.sql": TABLE_BASE_SQL}
for view in range(0, NUM_VIEWS):
my_models.update({f"my_model_{view}.sql": VIEW_X_SQL})
for dynamic_table in range(0, NUM_DYNAMIC_TABLES):
my_models.update({f"my_dynamic_table_{dynamic_table}.sql": DYNAMIC_TABLE})
my_models = {"my_model_base.sql": TABLE}
for view in range(0, self.VIEWS):
my_models[f"my_model_{view}.sql"] = VIEW
for dynamic_table in range(0, self.DYNAMIC_TABLES):
my_models[f"my_dynamic_table_{dynamic_table}.sql"] = DYNAMIC_TABLE
return my_models

@pytest.fixture(scope="class")
def macros(self):
return {
"validate_list_relations_without_caching.sql": MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING,
}
@pytest.fixture(scope="class", autouse=True)
def setup(self, project):
run_dbt(["run"])

def test__snowflake__list_relations_without_caching_termination(self, project):
"""
validates that we do NOT trigger pagination logic snowflake__list_relations_without_caching
macro when there are fewer than max_results_per_iter relations in the target schema
"""
run_dbt(["run", "-s", "my_model_base"])

database = project.database
schemas = project.created_schemas

for schema in schemas:
schema_relation = SnowflakeRelation.create(database=database, schema=schema)
kwargs = {"schema_relation": schema_relation.render()}
_, log_output = run_dbt_and_capture(
[
"--debug",
"--log-format=json",
"run-operation",
"validate_list_relations_without_caching",
"--args",
str(kwargs),
]
def test_list_relations(self, project):
kwargs = {"schema_relation": project.test_schema}
with project.adapter.connection_named("__test"):
relations = project.adapter.execute_macro(
"snowflake__list_relations_without_caching", kwargs=kwargs
)
assert len(relations) == self.VIEWS + self.DYNAMIC_TABLES + 1

parsed_logs = parse_json_logs(log_output)
n_relations = find_result_in_parsed_logs(parsed_logs, "n_relations")
assert n_relations == "n_relations: 1"

class TestListRelationsWithoutCachingSmall(BaseConfig):
pass

class TestListRelationsWithoutCachingFull:
@pytest.fixture(scope="class")
def models(self):
my_models = {"my_model_base.sql": TABLE_BASE_SQL}
for view in range(0, NUM_VIEWS):
my_models.update({f"my_model_{view}.sql": VIEW_X_SQL})
for dynamic_table in range(0, NUM_DYNAMIC_TABLES):
my_models.update({f"my_dynamic_table_{dynamic_table}.sql": DYNAMIC_TABLE})
return my_models

class TestListRelationsWithoutCachingLarge(BaseConfig):
@pytest.fixture(scope="class")
def macros(self):
def profiles_config_update(self):
return {
"validate_list_relations_without_caching.sql": MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING,
"validate_list_relations_without_caching_raise_error.sql": MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING_RAISE_ERROR,
"flags": {
"list_relations_per_page": 10,
"list_relations_page_limit": 20,
}
}

def test__snowflake__list_relations_without_caching(self, project):
"""
validates pagination logic in snowflake__list_relations_without_caching macro counts
the correct number of objects in the target schema when having to make multiple looped
calls of SHOW TERSE OBJECTS.
"""
# purpose of the first run is to create the replicated views in the target schema
run_dbt(["run"])

database = project.database
schemas = project.created_schemas

for schema in schemas:
schema_relation = SnowflakeRelation.create(database=database, schema=schema)
kwargs = {"schema_relation": schema_relation.render()}
_, log_output = run_dbt_and_capture(
[
"--debug",
"--log-format=json",
"run-operation",
"validate_list_relations_without_caching",
"--args",
str(kwargs),
]
)
parsed_logs = parse_json_logs(log_output)
n_relations = find_result_in_parsed_logs(parsed_logs, "n_relations")
class TestListRelationsWithoutCachingTooLarge(BaseConfig):

assert n_relations == f"n_relations: {NUM_EXPECTED_RELATIONS}"

def test__snowflake__list_relations_without_caching_raise_error(self, project):
"""
validates pagination logic terminates and raises a compilation error
when exceeding the limit of how many results to return.
"""
run_dbt(["run"])
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"flags": {
"list_relations_per_page": 10,
"list_relations_page_limit": 5,
}
}

database = project.database
schemas = project.created_schemas

for schema in schemas:
schema_relation = SnowflakeRelation.create(database=database, schema=schema)

kwargs = {"schema_relation": schema_relation.render()}
_, log_output = run_dbt_and_capture(
[
"--debug",
"--log-format=json",
"run-operation",
"validate_list_relations_without_caching_raise_error",
"--args",
str(kwargs),
],
expect_pass=False,
)
parsed_logs = parse_json_logs(log_output)
traceback = find_exc_info_in_parsed_logs(parsed_logs, "Traceback")
assert "dbt will list a maximum of 99 objects in schema " in traceback
def test_list_relations(self, project):
kwargs = {"schema_relation": project.test_schema}
with project.adapter.connection_named("__test"):
with pytest.raises(CompilationError) as error:
project.adapter.execute_macro(
"snowflake__list_relations_without_caching", kwargs=kwargs
)
assert "list_relations_per_page" in error.value.msg
assert "list_relations_page_limit" in error.value.msg

def test_on_run(self, project):
with pytest.raises(CompilationError) as error:
run_dbt(["run"])
assert "list_relations_per_page" in error.value.msg
assert "list_relations_page_limit" in error.value.msg