From 95d606396157a968e35bf1170b1e43989dfc97f5 Mon Sep 17 00:00:00 2001 From: rudolfix Date: Sun, 15 Dec 2024 16:49:14 +0100 Subject: [PATCH] Fix/refresh standalone resources (#2140) * drops tables from schema and relational * documents custom sections for sql_database and source rename * clones schema without data tables when resources without source are extracted, adds tests * skips airflow tests if not installed * adds doc on setting up FUSE on bucket * adds doc on setting up FUSE on bucket * adds row key propagation for table when its nested table requires it * fixes tests --- Makefile | 3 +- dlt/common/normalizers/json/__init__.py | 4 + dlt/common/normalizers/json/relational.py | 54 ++++++++++--- dlt/common/schema/schema.py | 2 + dlt/extract/extract.py | 7 +- docs/tools/check_embedded_snippets.py | 9 ++- .../verified-sources/sql_database/advanced.md | 21 +++++ docs/website/docs/general-usage/source.md | 17 +++- docs/website/docs/reference/performance.md | 26 ++++++ .../normalizers/test_json_relational.py | 33 ++++++++ .../airflow_tests/test_airflow_provider.py | 4 + .../airflow_tests/test_airflow_wrapper.py | 2 + .../test_join_airflow_scheduler.py | 3 + tests/helpers/airflow_tests/utils.py | 8 +- tests/load/pipeline/test_drop.py | 55 ++++++++++--- tests/load/pipeline/test_refresh_modes.py | 79 ++++++++++++++----- tests/pipeline/test_pipeline.py | 24 ++++++ 17 files changed, 304 insertions(+), 47 deletions(-) diff --git a/Makefile b/Makefile index 0ca8a2e0c3..975a8a42da 100644 --- a/Makefile +++ b/Makefile @@ -63,7 +63,6 @@ format: lint-snippets: cd docs/tools && poetry run python check_embedded_snippets.py full - lint-and-test-snippets: lint-snippets poetry run mypy --config-file mypy.ini docs/website docs/tools --exclude docs/tools/lint_setup --exclude docs/website/docs_processed poetry run flake8 --max-line-length=200 docs/website docs/tools --exclude docs/website/.dlt-repo @@ -82,7 +81,7 @@ lint-security: poetry run bandit -r dlt/ -n 3 -l test: - (set -a && . 
tests/.env && poetry run pytest tests) + poetry run pytest tests test-load-local: DESTINATION__POSTGRES__CREDENTIALS=postgresql://loader:loader@localhost:5432/dlt_data DESTINATION__DUCKDB__CREDENTIALS=duckdb:///_storage/test_quack.duckdb poetry run pytest tests -k '(postgres or duckdb)' diff --git a/dlt/common/normalizers/json/__init__.py b/dlt/common/normalizers/json/__init__.py index 725f6a8355..ae5e06fe2e 100644 --- a/dlt/common/normalizers/json/__init__.py +++ b/dlt/common/normalizers/json/__init__.py @@ -36,6 +36,10 @@ def extend_schema(self) -> None: def extend_table(self, table_name: str) -> None: pass + @abc.abstractmethod + def remove_table(self, table_name: str) -> None: + pass + @classmethod @abc.abstractmethod def update_normalizer_config(cls, schema: Schema, config: TNormalizerConfig) -> None: diff --git a/dlt/common/normalizers/json/relational.py b/dlt/common/normalizers/json/relational.py index e365017125..36845b2e14 100644 --- a/dlt/common/normalizers/json/relational.py +++ b/dlt/common/normalizers/json/relational.py @@ -1,4 +1,16 @@ -from typing import Dict, List, Mapping, Optional, Sequence, Tuple, cast, TypedDict, Any +from typing import ( + ClassVar, + Dict, + List, + Mapping, + Optional, + Sequence, + Tuple, + Type, + cast, + TypedDict, + Any, +) from dlt.common.normalizers.exceptions import InvalidJsonNormalizer from dlt.common.normalizers.typing import TJSONNormalizer @@ -14,6 +26,9 @@ from dlt.common.schema.utils import ( column_name_validator, is_nested_table, + get_nested_tables, + has_column_with_prop, + get_first_column_name_with_prop, ) from dlt.common.utils import update_dict_nested from dlt.common.normalizers.json import ( @@ -48,6 +63,7 @@ class DataItemNormalizer(DataItemNormalizerBase[RelationalNormalizerConfig]): # other constants EMPTY_KEY_IDENTIFIER = "_empty" # replace empty keys with this + RELATIONAL_CONFIG_TYPE: ClassVar[Type[RelationalNormalizerConfig]] = RelationalNormalizerConfig normalizer_config: 
RelationalNormalizerConfig propagation_config: RelationalNormalizerConfigPropagation @@ -310,20 +326,38 @@ def extend_table(self, table_name: str) -> None: Table name should be normalized. """ table = self.schema.tables.get(table_name) - if not is_nested_table(table) and table.get("write_disposition") == "merge": - DataItemNormalizer.update_normalizer_config( + # add root key prop when merge disposition is used or any of nested tables needs row_key + if not is_nested_table(table) and ( + table.get("write_disposition") == "merge" + or any( + has_column_with_prop(t, "root_key", include_incomplete=True) + for t in get_nested_tables(self.schema.tables, table_name) + ) + ): + # get row id column from table, assume that we propagate it into c_dlt_root_id always + c_dlt_id = get_first_column_name_with_prop(table, "row_key", include_incomplete=True) + self.update_normalizer_config( self.schema, { "propagation": { "tables": { table_name: { - TColumnName(self.c_dlt_id): TColumnName(self.c_dlt_root_id) + TColumnName(c_dlt_id or self.c_dlt_id): TColumnName( + self.c_dlt_root_id + ) } } } }, ) + def remove_table(self, table_name: str) -> None: + """Called by the Schema when table is removed from it.""" + config = self.get_normalizer_config(self.schema) + if propagation := config.get("propagation"): + if tables := propagation.get("tables"): + tables.pop(table_name, None) + def normalize_data_item( self, item: TDataItem, load_id: str, table_name: str ) -> TNormalizedRowIterator: @@ -352,8 +386,8 @@ def normalize_data_item( def ensure_this_normalizer(cls, norm_config: TJSONNormalizer) -> None: # make sure schema has right normalizer present_normalizer = norm_config["module"] - if present_normalizer != __name__: - raise InvalidJsonNormalizer(__name__, present_normalizer) + if present_normalizer != cls.__module__: + raise InvalidJsonNormalizer(cls.__module__, present_normalizer) @classmethod def update_normalizer_config(cls, schema: Schema, config: RelationalNormalizerConfig) -> 
None: @@ -371,8 +405,10 @@ def get_normalizer_config(cls, schema: Schema) -> RelationalNormalizerConfig: cls.ensure_this_normalizer(norm_config) return cast(RelationalNormalizerConfig, norm_config.get("config", {})) - @staticmethod - def _validate_normalizer_config(schema: Schema, config: RelationalNormalizerConfig) -> None: + @classmethod + def _validate_normalizer_config( + cls, schema: Schema, config: RelationalNormalizerConfig + ) -> None: """Normalizes all known column identifiers according to the schema and then validates the configuration""" def _normalize_prop( @@ -397,7 +433,7 @@ def _normalize_prop( ) validate_dict( - RelationalNormalizerConfig, + cls.RELATIONAL_CONFIG_TYPE, config, "./normalizers/json/config", validator_f=column_name_validator(schema.naming), diff --git a/dlt/common/schema/schema.py b/dlt/common/schema/schema.py index 276bbe9c09..f2d75638fe 100644 --- a/dlt/common/schema/schema.py +++ b/dlt/common/schema/schema.py @@ -451,10 +451,12 @@ def drop_tables( ) -> List[TTableSchema]: """Drops tables from the schema and returns the dropped tables""" result = [] + # TODO: make sure all nested tables to table_names are also dropped for table_name in table_names: table = self.get_table(table_name) if table and (not seen_data_only or utils.has_table_seen_data(table)): result.append(self._schema_tables.pop(table_name)) + self.data_item_normalizer.remove_table(table_name) return result def filter_row_with_hint( diff --git a/dlt/extract/extract.py b/dlt/extract/extract.py index 25c3a0dbae..c062a74920 100644 --- a/dlt/extract/extract.py +++ b/dlt/extract/extract.py @@ -87,7 +87,12 @@ def choose_schema() -> Schema: schema_ = schema # take pipeline schema to make newest version visible to the resources elif pipeline.default_schema_name: - schema_ = pipeline.schemas[pipeline.default_schema_name].clone() + # clones with name which will drop previous hashes + schema_ = pipeline.schemas[pipeline.default_schema_name].clone( + 
with_name=pipeline.default_schema_name + ) + # delete data tables + schema_.drop_tables(schema_.data_table_names(include_incomplete=True)) else: schema_ = pipeline._make_schema_with_default_name() return schema_ diff --git a/docs/tools/check_embedded_snippets.py b/docs/tools/check_embedded_snippets.py index e8399fce6e..b917cafee1 100644 --- a/docs/tools/check_embedded_snippets.py +++ b/docs/tools/check_embedded_snippets.py @@ -21,7 +21,7 @@ SNIPPET_MARKER = "```" -ALLOWED_LANGUAGES = ["py", "toml", "json", "yaml", "text", "sh", "bat", "sql"] +ALLOWED_LANGUAGES = ["py", "toml", "json", "yaml", "text", "sh", "bat", "sql", "hcl"] LINT_TEMPLATE = "./lint_setup/template.py" LINT_FILE = "./lint_setup/lint_me.py" @@ -163,8 +163,11 @@ def parse_snippets(snippets: List[Snippet], verbose: bool) -> None: json.loads(snippet.code) elif snippet.language == "yaml": yaml.safe_load(snippet.code) - # ignore text and sh scripts - elif snippet.language in ["text", "sh", "bat", "sql"]: + elif snippet.language == "hcl": + # TODO: implement hcl parsers + pass + # ignore all other scripts + elif snippet.language in ALLOWED_LANGUAGES: pass else: raise ValueError(f"Unknown language {snippet.language}") diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/advanced.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/advanced.md index c532f6d357..954c1fb493 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/advanced.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/advanced.md @@ -256,3 +256,24 @@ SOURCES__SQL_DATABASE__CHUNK_SIZE=1000 SOURCES__SQL_DATABASE__CHAT_MESSAGE__INCREMENTAL__CURSOR_PATH=updated_at ``` +### Configure many sources side by side with custom sections +`dlt` allows you to rename any source to place the source configuration into custom section or to have many instances +of the source created side by side. 
For example: +```py +from dlt.sources.sql_database import sql_database + +my_db = sql_database.with_args(name="my_db", section="my_db")(table_names=["chat_message"]) +print(my_db.name) +``` +Here we create a renamed version of the `sql_database` and then instantiate it. Such a source will read +credentials from: +```toml +[sources.my_db] +credentials="mssql+pyodbc://loader.database.windows.net/dlt_data?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server" +schema="data" +backend="pandas" +chunk_size=1000 + +[sources.my_db.chat_message.incremental] +cursor_path="updated_at" +``` diff --git a/docs/website/docs/general-usage/source.md b/docs/website/docs/general-usage/source.md index a5f1f04dee..87c07a3e44 100644 --- a/docs/website/docs/general-usage/source.md +++ b/docs/website/docs/general-usage/source.md @@ -52,7 +52,6 @@ Do not extract data in the source function. Leave that task to your resources if If this is impractical (for example, you want to reflect a database to create resources for tables), make sure you do not call the source function too often. [See this note if you plan to deploy on Airflow](../walkthroughs/deploy-a-pipeline/deploy-with-airflow-composer.md#2-modify-dag-file) - ## Customize sources ### Access and select resources to load @@ -114,6 +113,22 @@ Note that `add_limit` **does not limit the number of records** but rather the "n Find more on sampling data [here](resource.md#sample-from-large-data). +### Rename the source +`dlt` allows you to rename the source, e.g. to place the source configuration into a custom section or to have many instances +of the source created side by side. For example: +```py +from dlt.sources.sql_database import sql_database + +my_db = sql_database.with_args(name="my_db", section="my_db")(table_names=["table_1"]) +print(my_db.name) +``` +Here we create a renamed version of the `sql_database` and then instantiate it. Such a source will read +credentials from: +```toml +[sources.my_db.my_db.credentials] +password="..." 
+``` + ### Add more resources to existing source You can add a custom resource to a source after it was created. Imagine that you want to score all the deals with a keras model that will tell you if the deal is a fraud or not. In order to do that, you declare a new [transformer that takes the data from](resource.md#feeding-data-from-one-resource-into-another) `deals` resource and add it to the source. diff --git a/docs/website/docs/reference/performance.md b/docs/website/docs/reference/performance.md index 1e58080200..0f536fa786 100644 --- a/docs/website/docs/reference/performance.md +++ b/docs/website/docs/reference/performance.md @@ -265,3 +265,29 @@ DLT_USE_JSON=simplejson Instead of using Python Requests directly, you can use the built-in [requests wrapper](../general-usage/http/requests) or [`RESTClient`](../general-usage/http/rest-client) for API calls. This will make your pipeline more resilient to intermittent network errors and other random glitches. + +## Keep pipeline working folder in a bucket on constrained environments. +`dlt` stores extracted data in load packages in order to load them atomically. In case you extract a lot of data at once (e.g. backfill) or +your runtime env has constrained local storage (e.g. cloud functions) you can keep your data on a bucket by using [FUSE](https://github.com/libfuse/libfuse) or +any other option which your cloud provider supplies. + +`dlt` uses rename when saving files and "committing" packages (folder rename). Those may not be supported on bucket filesystems. Often +`rename` is translated into `copy` automatically. In other cases `dlt` will fall back to copy itself. 
+ +In case of cloud function and gs bucket mounts, increasing the rename limit for folders is possible: +```hcl +volume_mounts { + mount_path = "/usr/src/ingestion/pipeline_storage" + name = "pipeline_bucket" + } +volumes { + name = "pipeline_bucket" + gcs { + bucket = google_storage_bucket.dlt_pipeline_data_bucket.name + read_only = false + mount_options = [ + "rename-dir-limit=100000" + ] + } +} +``` diff --git a/tests/common/normalizers/test_json_relational.py b/tests/common/normalizers/test_json_relational.py index 35bc80add2..c35ecdef7f 100644 --- a/tests/common/normalizers/test_json_relational.py +++ b/tests/common/normalizers/test_json_relational.py @@ -880,6 +880,35 @@ def test_propagation_update_on_table_change(norm: RelationalNormalizer): "table_3" ] == {"_dlt_id": "_dlt_root_id", "prop1": "prop2"} + # force propagation when table has nested table that needs root_key + # also use custom name for row_key + table_4 = new_table( + "table_4", write_disposition="replace", columns=[{"name": "primary_key", "row_key": True}] + ) + table_4_nested = new_table( + "table_4__nested", + parent_table_name="table_4", + columns=[{"name": "_dlt_root_id", "root_key": True}], + ) + # must add table_4 first + norm.schema.update_table(table_4) + norm.schema.update_table(table_4_nested) + # row key table_4 not propagated because it was added before nested that needs that + # TODO: maybe fix it + assert ( + "table_4" not in norm.schema._normalizers_config["json"]["config"]["propagation"]["tables"] + ) + norm.schema.update_table(table_4) + # also custom key was used + assert norm.schema._normalizers_config["json"]["config"]["propagation"]["tables"][ + "table_4" + ] == {"primary_key": "_dlt_root_id"} + # drop table from schema + norm.schema.drop_tables(["table_4"]) + assert ( + "table_4" not in norm.schema._normalizers_config["json"]["config"]["propagation"]["tables"] + ) + def test_caching_perf(norm: RelationalNormalizer) -> None: from time import time @@ -893,6 +922,10 @@ def 
test_caching_perf(norm: RelationalNormalizer) -> None: print(f"{time() - start}") +def test_extend_table(norm: RelationalNormalizer) -> None: + pass + + def set_max_nesting(norm: RelationalNormalizer, max_nesting: int) -> None: RelationalNormalizer.update_normalizer_config(norm.schema, {"max_nesting": max_nesting}) norm._reset() diff --git a/tests/helpers/airflow_tests/test_airflow_provider.py b/tests/helpers/airflow_tests/test_airflow_provider.py index 43fb23e48a..2a8e46e2c8 100644 --- a/tests/helpers/airflow_tests/test_airflow_provider.py +++ b/tests/helpers/airflow_tests/test_airflow_provider.py @@ -1,3 +1,7 @@ +import pytest + +pytest.importorskip("airflow") + from airflow import DAG from airflow.decorators import task, dag from airflow.operators.python import PythonOperator diff --git a/tests/helpers/airflow_tests/test_airflow_wrapper.py b/tests/helpers/airflow_tests/test_airflow_wrapper.py index 69e48733e3..06603ffcec 100644 --- a/tests/helpers/airflow_tests/test_airflow_wrapper.py +++ b/tests/helpers/airflow_tests/test_airflow_wrapper.py @@ -2,6 +2,8 @@ import pytest from unittest import mock from typing import Iterator, List + +pytest.importorskip("airflow") from airflow import DAG from airflow.decorators import dag from airflow.operators.python import PythonOperator, get_current_context diff --git a/tests/helpers/airflow_tests/test_join_airflow_scheduler.py b/tests/helpers/airflow_tests/test_join_airflow_scheduler.py index d737f254e3..503aa62359 100644 --- a/tests/helpers/airflow_tests/test_join_airflow_scheduler.py +++ b/tests/helpers/airflow_tests/test_join_airflow_scheduler.py @@ -1,5 +1,8 @@ +import pytest import datetime from pendulum.tz import UTC + +pytest.importorskip("airflow") from airflow import DAG from airflow.decorators import dag, task from airflow.models import DagRun diff --git a/tests/helpers/airflow_tests/utils.py b/tests/helpers/airflow_tests/utils.py index a98ad4333a..4c1482a2ef 100644 --- a/tests/helpers/airflow_tests/utils.py +++ 
b/tests/helpers/airflow_tests/utils.py @@ -2,9 +2,6 @@ import os import argparse import pytest -from airflow.cli.commands.db_command import resetdb -from airflow.configuration import conf -from airflow.models.variable import Variable from dlt.common.configuration.container import Container from dlt.common.configuration.specs import PluggableRunContext @@ -19,6 +16,8 @@ @pytest.fixture(scope="function", autouse=True) def initialize_airflow_db(): + from airflow.models.variable import Variable + setup_airflow() # backup context providers providers = Container()[PluggableRunContext].providers @@ -35,6 +34,9 @@ def initialize_airflow_db(): def setup_airflow() -> None: + from airflow.cli.commands.db_command import resetdb + from airflow.configuration import conf + # Disable loading examples try: conf.add_section("core") diff --git a/tests/load/pipeline/test_drop.py b/tests/load/pipeline/test_drop.py index 0e44c754e7..330f2606ff 100644 --- a/tests/load/pipeline/test_drop.py +++ b/tests/load/pipeline/test_drop.py @@ -27,13 +27,17 @@ def _attach(pipeline: Pipeline) -> Pipeline: @dlt.source(section="droppable", name="droppable") -def droppable_source() -> List[DltResource]: +def droppable_source(drop_columns: bool = False) -> List[DltResource]: @dlt.resource def droppable_a( - a: dlt.sources.incremental[int] = dlt.sources.incremental("a", 0) + a: dlt.sources.incremental[int] = dlt.sources.incremental("a", 0, range_start="open") ) -> Iterator[Dict[str, Any]]: - yield dict(a=1, b=2, c=3) - yield dict(a=4, b=23, c=24) + if drop_columns: + yield dict(a=1, b=2) + yield dict(a=4, b=23) + else: + yield dict(a=1, b=2, c=3) + yield dict(a=4, b=23, c=24) @dlt.resource def droppable_b( @@ -47,9 +51,17 @@ def droppable_c( qe: dlt.sources.incremental[int] = dlt.sources.incremental("qe"), ) -> Iterator[Dict[str, Any]]: # Grandchild table - yield dict( - asdasd=2424, qe=111, items=[dict(k=2, r=2, labels=[dict(name="abc"), dict(name="www")])] - ) + if drop_columns: + # dropped asdasd, 
items[r], items.labels.value + yield dict(qe=111, items=[dict(k=2, labels=[dict(name="abc"), dict(name="www")])]) + else: + yield dict( + asdasd=2424, + qe=111, + items=[ + dict(k=2, r=2, labels=[dict(name="abc", value=1), dict(name="www", value=2)]) + ], + ) @dlt.resource def droppable_d( @@ -134,11 +146,17 @@ def assert_destination_state_loaded(pipeline: Pipeline) -> None: ), ids=lambda x: x.name, ) -def test_drop_command_resources_and_state(destination_config: DestinationTestConfiguration) -> None: +@pytest.mark.parametrize("in_source", (True, False)) +def test_drop_command_resources_and_state( + destination_config: DestinationTestConfiguration, in_source: bool +) -> None: """Test the drop command with resource and state path options and verify correct data is deleted from destination and locally""" - source = droppable_source() - pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), dev_mode=True) + source: Any = droppable_source() + if not in_source: + source = list(source.selected_resources.values()) + + pipeline = destination_config.setup_pipeline("droppable", dev_mode=True) info = pipeline.run(source, **destination_config.run_kwargs) assert_load_info(info) assert load_table_counts(pipeline, *pipeline.default_schema.tables.keys()) == { @@ -173,6 +191,9 @@ def test_drop_command_resources_and_state(destination_config: DestinationTestCon assert_destination_state_loaded(pipeline) # now run the same droppable_source to see if tables are recreated and they contain right number of items + source = droppable_source(drop_columns=True) + if not in_source: + source = list(source.selected_resources.values()) info = pipeline.run(source, **destination_config.run_kwargs) assert_load_info(info) # 2 versions (one dropped and replaced with schema with dropped tables, then we added missing tables) @@ -192,6 +213,20 @@ def test_drop_command_resources_and_state(destination_config: DestinationTestCon "droppable_c__items": 1, "droppable_c__items__labels": 2, } + # 
check if columns got correctly dropped + droppable_a_schema = pipeline.default_schema.get_table("droppable_a") + # this table was not dropped so column still exists + assert "c" in droppable_a_schema["columns"] + # dropped asdasd, items[r], items.labels.value + droppable_c_schema = pipeline.default_schema.get_table("droppable_c") + assert "asdasd" not in droppable_c_schema["columns"] + assert "qe" in droppable_c_schema["columns"] + droppable_c_i_schema = pipeline.default_schema.get_table("droppable_c__items") + assert "r" not in droppable_c_i_schema["columns"] + assert "k" in droppable_c_i_schema["columns"] + droppable_c_l_schema = pipeline.default_schema.get_table("droppable_c__items__labels") + assert "value" not in droppable_c_l_schema["columns"] + assert "name" in droppable_c_l_schema["columns"] @pytest.mark.parametrize( diff --git a/tests/load/pipeline/test_refresh_modes.py b/tests/load/pipeline/test_refresh_modes.py index 86479acd2b..fb88ba915c 100644 --- a/tests/load/pipeline/test_refresh_modes.py +++ b/tests/load/pipeline/test_refresh_modes.py @@ -1,5 +1,5 @@ from typing import Any, List - +import os import pytest import dlt from dlt.common.destination.exceptions import DestinationUndefinedEntity @@ -12,7 +12,7 @@ from dlt.extract.source import DltSource from dlt.pipeline.state_sync import load_pipeline_state_from_destination -from tests.utils import clean_test_storage +from tests.utils import clean_test_storage, TEST_STORAGE_ROOT from tests.pipeline.utils import ( _is_filesystem, assert_load_info, @@ -106,19 +106,40 @@ def some_data_4(): ), ids=lambda x: x.name, ) -def test_refresh_drop_sources(destination_config: DestinationTestConfiguration): - pipeline = destination_config.setup_pipeline("refresh_full_test", refresh="drop_sources") +@pytest.mark.parametrize("in_source", (True, False)) +@pytest.mark.parametrize("with_wipe", (True, False)) +def test_refresh_drop_sources( + destination_config: DestinationTestConfiguration, in_source: bool, with_wipe: bool 
+): + # do not place duckdb in the working dir, because we may wipe it + os.environ["DESTINATION__DUCKDB__CREDENTIALS"] = os.path.join( + TEST_STORAGE_ROOT, "refresh_source_db.duckdb" + ) + + pipeline = destination_config.setup_pipeline("refresh_source") + + data: Any = refresh_source(first_run=True, drop_sources=True) + if not in_source: + data = list(data.selected_resources.values()) # First run pipeline so destination so tables are created - info = pipeline.run( - refresh_source(first_run=True, drop_sources=True), **destination_config.run_kwargs - ) + info = pipeline.run(data, refresh="drop_sources", **destination_config.run_kwargs) assert_load_info(info) + # Second run of pipeline with only selected resources + if with_wipe: + pipeline._wipe_working_folder() + pipeline = destination_config.setup_pipeline("refresh_source") + + data = refresh_source(first_run=False, drop_sources=True).with_resources( + "some_data_1", "some_data_2" + ) + if not in_source: + data = list(data.selected_resources.values()) + info = pipeline.run( - refresh_source(first_run=False, drop_sources=True).with_resources( - "some_data_1", "some_data_2" - ), + data, + refresh="drop_sources", **destination_config.run_kwargs, ) @@ -199,16 +220,37 @@ def test_existing_schema_hash(destination_config: DestinationTestConfiguration): ), ids=lambda x: x.name, ) -def test_refresh_drop_resources(destination_config: DestinationTestConfiguration): +@pytest.mark.parametrize("in_source", (True, False)) +@pytest.mark.parametrize("with_wipe", (True, False)) +def test_refresh_drop_resources( + destination_config: DestinationTestConfiguration, in_source: bool, with_wipe: bool +): + # do not place duckdb in the working dir, because we may wipe it + os.environ["DESTINATION__DUCKDB__CREDENTIALS"] = os.path.join( + TEST_STORAGE_ROOT, "refresh_source_db.duckdb" + ) # First run pipeline with load to destination so tables are created - pipeline = destination_config.setup_pipeline("refresh_full_test", 
refresh="drop_tables") + pipeline = destination_config.setup_pipeline("refresh_source") - info = pipeline.run(refresh_source(first_run=True), **destination_config.run_kwargs) + data: Any = refresh_source(first_run=True) + if not in_source: + data = list(data.selected_resources.values()) + + info = pipeline.run(data, refresh="drop_resources", **destination_config.run_kwargs) assert_load_info(info) # Second run of pipeline with only selected resources + if with_wipe: + pipeline._wipe_working_folder() + pipeline = destination_config.setup_pipeline("refresh_source") + + data = refresh_source(first_run=False).with_resources("some_data_1", "some_data_2") + if not in_source: + data = list(data.selected_resources.values()) + info = pipeline.run( - refresh_source(first_run=False).with_resources("some_data_1", "some_data_2"), + data, + refresh="drop_resources", **destination_config.run_kwargs, ) @@ -309,7 +351,9 @@ def test_refresh_drop_data_only(destination_config: DestinationTestConfiguration @pytest.mark.parametrize( "destination_config", - destinations_configs(default_sql_configs=True, subset=["duckdb"]), + destinations_configs( + default_sql_configs=True, local_filesystem_configs=True, subset=["duckdb", "filesystem"] + ), ids=lambda x: x.name, ) def test_refresh_drop_sources_multiple_sources(destination_config: DestinationTestConfiguration): @@ -364,7 +408,6 @@ def source_2_data_2(): **destination_config.run_kwargs, ) assert_load_info(info, 2) - # breakpoint() info = pipeline.run( refresh_source_2(first_run=False).with_resources("source_2_data_1"), **destination_config.run_kwargs, @@ -388,7 +431,7 @@ def source_2_data_2(): result = sorted([(row["id"], row["name"]) for row in data["some_data_1"]]) assert result == [(1, "John"), (2, "Jane")] - # # First table from source2 exists, with only first column + # First table from source2 exists, with only first column data = load_tables_to_dicts(pipeline, "source_2_data_1", schema_name="refresh_source_2") 
assert_only_table_columns( pipeline, "source_2_data_1", ["product"], schema_name="refresh_source_2" @@ -396,7 +439,7 @@ def source_2_data_2(): result = sorted([row["product"] for row in data["source_2_data_1"]]) assert result == ["orange", "pear"] - # # Second table from source 2 is gone + # Second table from source 2 is gone assert not table_exists(pipeline, "source_2_data_2", schema_name="refresh_source_2") diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index e58db64e5e..b32854b110 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -1566,6 +1566,30 @@ def test_drop() -> None: pipeline.run([1, 2, 3], table_name="numbers") +def test_source_schema_in_resource() -> None: + run_count = 0 + + @dlt.resource + def schema_inspector(): + schema = dlt.current.source_schema() + if run_count == 0: + assert "schema_inspector" not in schema.tables + if run_count == 1: + assert "schema_inspector" in schema.tables + assert schema.tables["schema_inspector"]["columns"]["value"]["x-custom"] == "X" # type: ignore[typeddict-item] + + yield [1, 2, 3] + + pipeline = dlt.pipeline(pipeline_name="test_inspector", destination="duckdb") + pipeline.run(schema_inspector()) + + # add custom annotation + pipeline.default_schema.tables["schema_inspector"]["columns"]["value"]["x-custom"] = "X" # type: ignore[typeddict-unknown-key] + + run_count += 1 + pipeline.run(schema_inspector()) + + def test_schema_version_increase_and_source_update() -> None: now = pendulum.now()