From b351b6d7bd553b4c17ef33341b6e5c78265c431d Mon Sep 17 00:00:00 2001 From: anuunchin <88698977+anuunchin@users.noreply.github.com> Date: Thu, 17 Jul 2025 17:33:10 +0200 Subject: [PATCH 1/7] Initial impl of sync_schema_destructively --- dlt/common/destination/client.py | 31 ++++ dlt/common/libs/deltalake.py | 6 + dlt/common/libs/pyiceberg.py | 5 + dlt/common/schema/schema.py | 7 + .../impl/filesystem/filesystem.py | 20 +++ dlt/destinations/job_client_impl.py | 4 + dlt/pipeline/pipeline.py | 17 +- .../test_sync_schema_destructively.py | 149 ++++++++++++++++++ 8 files changed, 238 insertions(+), 1 deletion(-) create mode 100644 tests/load/pipeline/test_sync_schema_destructively.py diff --git a/dlt/common/destination/client.py b/dlt/common/destination/client.py index c4b6eddf6a..0f7d0c1290 100644 --- a/dlt/common/destination/client.py +++ b/dlt/common/destination/client.py @@ -539,6 +539,37 @@ def update_stored_schema( ) return expected_update + def update_stored_schema_destructively( + self, + ) -> None: + """ + Compare the schema we think we should have (`self.schema`) + with what actually exists in the destination, and drop any + columns that disappeared. + """ + for table in self.schema.data_tables(): + table_name = table["name"] + + actual_columns = self._get_actual_columns(table_name) + schema_columns = self.schema.get_table_columns(table_name) + dropped_columns = set(schema_columns.keys()) - set(actual_columns) + if dropped_columns: + for dropped_col in dropped_columns: + if schema_columns[dropped_col].get("increment"): + logger.warning( + "An incremental field is being removed from schema." + "You should unset the" + " incremental with `incremental=dlt.sources.incremental.EMPTY`" + ) + self.schema.drop_columns(table_name, list(dropped_columns)) + + def _get_actual_columns(self, table_name: str) -> List[str]: # noqa: B027, optional override + """ + Return a list of column names that currently exist in the + destination for `table_name`. + """ + pass + def prepare_load_table(self, table_name: str) -> PreparedTableSchema: """Prepares a table schema to be loaded by filling missing hints and doing other modifications requires by given destination. 
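The hunk above adds the client-level hook; the `pipeline.py` hunk later in this patch exposes it as `Pipeline.sync_schema_destructively()`. A minimal usage sketch of the intended flow, assuming a pipeline that has already loaded `my_table` and whose `age` column was then dropped by hand in the destination (the pipeline and resource names are illustrative, not part of this patch):

    import dlt

    @dlt.resource(table_name="my_table")
    def my_resource():
        # "age" is no longer produced by the source and was already dropped in the destination
        yield {"id": 1, "name": "Liuwen"}

    # re-attach to the existing pipeline so its stored dlt schema is available
    pipeline = dlt.pipeline(pipeline_name="demo_pipeline", destination="duckdb")

    # drop the vanished column from the dlt schema so it matches the destination before the next run
    pipeline.sync_schema_destructively()
    pipeline.run(my_resource())

This mirrors the test added at the end of this patch: run, drop the column externally, sync destructively, then run again.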
diff --git a/dlt/common/libs/deltalake.py b/dlt/common/libs/deltalake.py index 7004b09d42..ac5958a4ed 100644 --- a/dlt/common/libs/deltalake.py +++ b/dlt/common/libs/deltalake.py @@ -217,3 +217,9 @@ def evolve_delta_table_schema(delta_table: DeltaTable, arrow_schema: pa.Schema) if new_fields: delta_table.alter.add_columns(new_fields) return delta_table + + +def get_table_columns(table: DeltaTable) -> List[str]: + fields = table.schema().fields + column_names = [field.name for field in fields] + return column_names diff --git a/dlt/common/libs/pyiceberg.py b/dlt/common/libs/pyiceberg.py index e4c0e9bee8..2b5d2929d5 100644 --- a/dlt/common/libs/pyiceberg.py +++ b/dlt/common/libs/pyiceberg.py @@ -248,3 +248,8 @@ def make_location(path: str, config: FilesystemConfiguration) -> str: # pyiceberg cannot deal with windows absolute urls location = location.replace("file:///", "file://") return location + + +def get_table_columns(table: IcebergTable) -> List[str]: + column_names = table.schema().column_names + return column_names diff --git a/dlt/common/schema/schema.py b/dlt/common/schema/schema.py index d20c755ee3..c9b22b4fd1 100644 --- a/dlt/common/schema/schema.py +++ b/dlt/common/schema/schema.py @@ -375,6 +375,13 @@ def drop_tables( self.data_item_normalizer.remove_table(table_name) return result + def drop_columns(self, table_name: str, column_names: Sequence[str]) -> List[TColumnSchema]: + """Drops columns from the table schema and returns the dropped columns""" + result = [] + for col_name in column_names: + result.append(self._schema_tables[table_name]["columns"].pop(col_name)) + return result + def filter_row_with_hint( self, table_name: str, hint_type: TColumnDefaultHint, row: StrAny ) -> StrAny: diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index 97d156044c..462202e0a7 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -566,6 +566,26 @@ def update_stored_schema( # externally changed return applied_update + def _get_actual_columns(self, table_name: str) -> List[str]: + """Get actual column names from files in storage for regular (non-delta/iceberg) tables + or column names from schema""" + + if self.is_open_table("iceberg", table_name): + from dlt.common.libs.pyiceberg import get_table_columns as get_iceberg_table_columns + + iceberg_table = self.load_open_table("iceberg", table_name) + return get_iceberg_table_columns(iceberg_table) + + elif self.is_open_table("delta", table_name): + from dlt.common.libs.deltalake import get_table_columns as get_delta_table_columns + + delta_table = self.load_open_table("delta", table_name) + return get_delta_table_columns(delta_table) + + else: + schema_columns = self.schema.get_table_columns(table_name) + return list(schema_columns.keys()) + def prepare_load_table(self, table_name: str) -> PreparedTableSchema: table = super().prepare_load_table(table_name) if self.config.as_staging_destination: diff --git a/dlt/destinations/job_client_impl.py b/dlt/destinations/job_client_impl.py index 7707e5159b..fb670d1682 100644 --- a/dlt/destinations/job_client_impl.py +++ b/dlt/destinations/job_client_impl.py @@ -319,6 +319,10 @@ def update_stored_schema( ) return applied_update + def _get_actual_columns(self, table_name: str) -> List[str]: + actual_columns = list(self.get_storage_table(table_name)[1].keys()) + return actual_columns + def drop_tables(self, *tables: str, delete_schema: bool = True) -> None: """Drop tables in destination 
database and optionally delete the stored schema as well. Clients that support ddl transactions will have both operations performed in a single transaction. diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index 764e9b315e..f22137a22b 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -1022,7 +1022,7 @@ def drop_pending_packages(self, with_partial_loads: bool = True) -> None: @with_schemas_sync def sync_schema(self, schema_name: str = None) -> TSchemaTables: - """Synchronizes the schema `schema_name` with the destination. If no name is provided, the default schema will be synchronized.""" + """Synchronizes the destination with the schema `schema_name`. If no name is provided, the default schema will be synchronized.""" if not schema_name and not self.default_schema_name: raise PipelineConfigMissing( self.pipeline_name, @@ -1037,6 +1037,21 @@ def sync_schema(self, schema_name: str = None) -> TSchemaTables: client.initialize_storage() return client.update_stored_schema() + @with_schemas_sync + def sync_schema_destructively(self, schema_name: str = None) -> None: + """Synchronizes the schema `schema_name` with the destination. If no name is provided, the default schema will be synchronized.""" + if not schema_name and not self.default_schema_name: + raise PipelineConfigMissing( + self.pipeline_name, + "default_schema_name", + "load", + "Pipeline contains no schemas. Please extract any data with `extract` or `run`" + " methods.", + ) + schema = self.schemas[schema_name] if schema_name else self.default_schema + with self._get_destination_clients(schema)[0] as client: + client.update_stored_schema_destructively() + def set_local_state_val(self, key: str, value: Any) -> None: """Sets value in local state. Local state is not synchronized with destination.""" try: diff --git a/tests/load/pipeline/test_sync_schema_destructively.py b/tests/load/pipeline/test_sync_schema_destructively.py new file mode 100644 index 0000000000..e2babf19ce --- /dev/null +++ b/tests/load/pipeline/test_sync_schema_destructively.py @@ -0,0 +1,149 @@ +import json +from typing import cast +from collections.abc import Mapping + +import pytest +import dlt +from dlt.common import pendulum +from dlt.common.utils import uniq_id +from dlt.common.destination.exceptions import DestinationUndefinedEntity +from dlt.destinations.impl.filesystem.filesystem import FilesystemClient +from dlt.load import Load +from dlt.pipeline.pipeline import Pipeline +from dlt.pipeline.state_sync import load_pipeline_state_from_destination, state_resource + +from dlt.common.libs.deltalake import write_deltalake, get_table_columns, deltalake_storage_options +from tests.utils import TEST_STORAGE_ROOT +from tests.cases import JSON_TYPED_DICT, JSON_TYPED_DICT_DECODED +from tests.common.utils import IMPORTED_VERSION_HASH_ETH_V10, yml_case_path as common_yml_case_path +from tests.common.configuration.utils import environment +from tests.pipeline.utils import assert_query_column, assert_load_info +from tests.load.utils import ( + destinations_configs, + DestinationTestConfiguration, +) + + +def _drop_column_in_filesystem( + pipeline: Pipeline, table_name: str, col: str, table_format: str +) -> None: + """Iceberg / Delta live under the Filesystem destination.""" + client = cast(FilesystemClient, pipeline._fs_client()) + if table_format == "iceberg": + tbl = client.load_open_table("iceberg", table_name) + with tbl.update_schema(allow_incompatible_changes=True) as upd: + upd.delete_column(col) + elif table_format == "delta": + 
delta_tbl = client.load_open_table("delta", table_name) + keep_cols = [c for c in get_table_columns(delta_tbl) if c != col] + arrow_tbl = delta_tbl.to_pyarrow_table(columns=keep_cols) + storage_options = deltalake_storage_options(client.config) + write_deltalake( + table_or_uri=client.get_open_table_location("delta", table_name), + data=arrow_tbl, + mode="overwrite", + schema_mode="overwrite", + storage_options=storage_options, + ) + else: + file_path = client.list_table_files(table_name)[0] + new_lines = [] + with client.fs_client.open(file_path, "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if line: + data = json.loads(line) + if isinstance(data, dict) and col in data: + del data[col] + new_lines.append(json.dumps(data)) + + # Write back the modified content + with client.fs_client.open(file_path, "w", encoding="utf-8") as f: + for line in new_lines: + f.write(line + "\n") + + +def _drop_column_in_sql( + pipeline: Pipeline, + destination_config: DestinationTestConfiguration, + table_name: str, + col: str, +) -> None: + """All non-filesystem destinations end up here.""" + with pipeline.sql_client() as client: + destination_type, table_format = ( + destination_config.destination_type, + destination_config.table_format, + ) + + if destination_type == "athena" and table_format != "iceberg": + # Athena Hive table need REPLACE COLUMNS syntax + col_defs = [ + f"{client.escape_ddl_identifier('id')} bigint", + f"{client.escape_ddl_identifier('name')} string", + ] + ddl = ( + f"ALTER TABLE {client.make_qualified_ddl_table_name(table_name)} " + f"REPLACE COLUMNS ({','.join(col_defs)})" + ) + elif destination_type == "databricks": + # Enable column-mapping once, then DROP + client.execute_sql( + f"ALTER TABLE {client.make_qualified_table_name(table_name)} " + "SET TBLPROPERTIES(" + "'delta.columnMapping.mode'='name'," + "'delta.minReaderVersion'='2'," + "'delta.minWriterVersion'='5')" + ) + ddl = ( + f"ALTER TABLE {client.make_qualified_table_name(table_name)} " + f"DROP COLUMN {client.escape_column_name(col)}" + ) + else: + qualified = ( + client.make_qualified_ddl_table_name(table_name) + if destination_type == "athena" and table_format == "iceberg" + else client.make_qualified_table_name(table_name) + ) + ddl = f"ALTER TABLE {qualified} DROP COLUMN {client.escape_column_name(col)}" + + client.execute_sql(ddl) + + +@dlt.resource(table_name="my_table") +def my_resource(with_col: bool = True): + row = {"id": 1, "name": "Liuwen"} + if with_col: + row["age"] = 40 + yield row + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs( + default_staging_configs=True, + default_sql_configs=True, + table_format_filesystem_configs=True, + table_format_local_configs=True, + ), + ids=lambda x: x.name, +) +def test_sync_schema_destructively(destination_config: DestinationTestConfiguration) -> None: + table, col_to_drop = "my_table", "age" + + pipeline = destination_config.setup_pipeline(pipeline_name=f"pipe_{uniq_id()}") + + assert_load_info(pipeline.run(my_resource(), **destination_config.run_kwargs)) + + # simulate "user" manually dropped the column” + if destination_config.destination_type == "filesystem": + _drop_column_in_filesystem(pipeline, table, col_to_drop, destination_config.table_format) + else: + _drop_column_in_sql(pipeline, destination_config, table, col_to_drop) + + # sync schema with destination make sure next run succeeds + pipeline.sync_schema_destructively() + assert_load_info(pipeline.run(my_resource(with_col=False), 
**destination_config.run_kwargs)) + + # ensure schema doesn't have the dropped column + assert col_to_drop not in pipeline.default_schema.tables[table]["columns"] From 40a4e17aa0adc70448dd0091f5334769ee3e1ac7 Mon Sep 17 00:00:00 2001 From: anuunchin <88698977+anuunchin@users.noreply.github.com> Date: Thu, 24 Jul 2025 13:53:20 +0200 Subject: [PATCH 2/7] Formalising dlt schema sync --- dlt/common/destination/client.py | 161 +++++++++--- dlt/common/schema/__init__.py | 2 + dlt/common/schema/schema.py | 21 +- dlt/common/schema/typing.py | 1 + dlt/destinations/impl/athena/athena.py | 9 +- dlt/destinations/impl/bigquery/bigquery.py | 5 +- .../impl/clickhouse/clickhouse.py | 5 +- .../impl/databricks/databricks.py | 5 +- dlt/destinations/impl/dremio/dremio.py | 3 +- dlt/destinations/impl/duckdb/duck.py | 3 +- .../impl/filesystem/filesystem.py | 107 ++++++-- dlt/destinations/impl/mssql/mssql.py | 3 +- dlt/destinations/impl/postgres/postgres.py | 3 +- dlt/destinations/impl/redshift/redshift.py | 3 +- dlt/destinations/impl/snowflake/snowflake.py | 5 +- .../impl/sqlalchemy/sqlalchemy_job_client.py | 3 +- dlt/destinations/impl/synapse/synapse.py | 1 + dlt/destinations/job_client_impl.py | 29 ++- dlt/pipeline/pipeline.py | 9 +- tests/load/pipeline/test_sync_dlt_schema.py | 229 ++++++++++++++++++ .../test_sync_schema_destructively.py | 149 ------------ 21 files changed, 526 insertions(+), 230 deletions(-) create mode 100644 tests/load/pipeline/test_sync_dlt_schema.py delete mode 100644 tests/load/pipeline/test_sync_schema_destructively.py diff --git a/dlt/common/destination/client.py b/dlt/common/destination/client.py index 0f7d0c1290..44105ebaa7 100644 --- a/dlt/common/destination/client.py +++ b/dlt/common/destination/client.py @@ -33,11 +33,14 @@ from dlt.common.metrics import LoadJobMetrics from dlt.common.normalizers.naming import NamingConvention -from dlt.common.schema import Schema, TSchemaTables +from dlt.common.schema import Schema, TSchemaTables, TSchemaDrop from dlt.common.schema.typing import ( + C_DLT_ID, C_DLT_LOAD_ID, TLoaderReplaceStrategy, TTableFormat, + TTableSchemaColumns, + TPartialTableSchema, ) from dlt.common.destination.capabilities import DestinationCapabilitiesContext from dlt.common.destination.exceptions import ( @@ -539,36 +542,120 @@ def update_stored_schema( ) return expected_update - def update_stored_schema_destructively( + def update_dlt_schema( self, - ) -> None: - """ - Compare the schema we think we should have (`self.schema`) - with what actually exists in the destination, and drop any - columns that disappeared. - """ - for table in self.schema.data_tables(): - table_name = table["name"] - - actual_columns = self._get_actual_columns(table_name) - schema_columns = self.schema.get_table_columns(table_name) - dropped_columns = set(schema_columns.keys()) - set(actual_columns) - if dropped_columns: - for dropped_col in dropped_columns: - if schema_columns[dropped_col].get("increment"): - logger.warning( - "An incremental field is being removed from schema." - "You should unset the" - " incremental with `incremental=dlt.sources.incremental.EMPTY`" - ) - self.schema.drop_columns(table_name, list(dropped_columns)) - - def _get_actual_columns(self, table_name: str) -> List[str]: # noqa: B027, optional override - """ - Return a list of column names that currently exist in the - destination for `table_name`. + table_names: Iterable[str] = None, + dry_run: bool = False, + ) -> Optional[TSchemaDrop]: + """Updates schema to the storage. 
+ + Compare the schema we think we should have (`self.schema`) with what actually exists in the destination, + and drop any tables and/or columns that disappeared. + + Args: + table_names (Iterable[str], optional): Check only listed tables. Defaults to None and checks all tables. + + Returns: + Optional[TSchemaTables]: Returns an update that was applied to the schema. """ - pass + from dlt.destinations.sql_client import WithSqlClient + + if not (isinstance(self, WithTableReflection) and isinstance(self, WithSqlClient)): + raise NotImplementedError + + def _diff_between_actual_and_dlt_schema( + table_name: str, actual_col_names: set[str], disregard_dlt_columns: bool = True + ) -> TPartialTableSchema: + """Returns a partial table schema containing columns that exist in the dlt schema + but are missing from the actual table. Skips dlt internal columns by default. + """ + col_schemas = self.schema.get_table_columns(table_name) + + # Map escaped -> original names (actual_col_names are escaped) + escaped_to_original = { + self.sql_client.escape_column_name(col, quote=False): col + for col in col_schemas.keys() + } + dropped_col_names = set(escaped_to_original.keys()) - actual_col_names + + if not dropped_col_names: + return {} + + partial_table: TPartialTableSchema = {"name": table_name, "columns": {}} + + for esc_name in dropped_col_names: + orig_name = escaped_to_original[esc_name] + + # Athena doesn't have dlt columns in actual columns. Don't drop them anyway. + if disregard_dlt_columns and orig_name in [C_DLT_ID, C_DLT_LOAD_ID]: + continue + + col_schema = col_schemas[orig_name] + if col_schema.get("increment"): + # We can warn within the for loop, + # since there's only one incremental field per table + logger.warning( + f"An incremental field {orig_name} is being removed from schema." + "You should unset the" + " incremental with `incremental=dlt.sources.incremental.EMPTY`" + ) + partial_table["columns"][orig_name] = col_schema + + return partial_table if partial_table["columns"] else {} + + tables = table_names if table_names else self.schema.data_table_names() + + table_drops: TSchemaDrop = {} # includes entire tables to drop + column_drops: TSchemaDrop = {} # includes parts of tables to drop as partial tables + + # 1. Detect what needs to be dropped + for table_name in tables: + _, actual_col_schemas = list(self.get_storage_tables([table_name]))[0] + + # no actual column schemas -> + # table doesn't exist -> + # we take entire table schema as a schema drop + if not actual_col_schemas: + table = self.schema.get_table(table_name) + table_drops[table_name] = table + continue + + # actual column schemas present -> + # we compare actual schemas with dlt ones -> + # we take the difference as a partial table + else: + partial_table = _diff_between_actual_and_dlt_schema( + table_name, + set(actual_col_schemas.keys()), + ) + if partial_table: + column_drops[table_name] = partial_table + + # 2. For entire table drops, we make sure no orphaned tables remain + for table_name in table_drops.copy(): + child_tables = self.schema.get_child_tables(table_name) + orphaned_table_names: List[str] = [] + for child_table in child_tables: + if child_table["name"] not in table_drops: + orphaned_table_names.append(child_table["name"]) + if orphaned_table_names: + table_drops.pop(table_name) + logger.warning( + f"Removing table '{table_name}' from the dlt schema would leave orphan" + f" table(s): {'.'.join(repr(t) for t in orphaned_table_names)}. 
Drop these" + " child tables in the destination and sync the dlt schema again." + ) + + # 3. If it's not a dry run, we actually drop fromt the dlt schema + if not dry_run: + for table_name in table_drops: + self.schema.tables.pop(table_name) + for table_name, partial_table in column_drops.items(): + col_schemas = partial_table["columns"] + col_names = [col for col in col_schemas] + self.schema.drop_columns(table_name, col_names) + + return {**table_drops, **column_drops} def prepare_load_table(self, table_name: str) -> PreparedTableSchema: """Prepares a table schema to be loaded by filling missing hints and doing other modifications requires by given destination. @@ -639,6 +726,22 @@ def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]: pass +class WithTableReflection(ABC): + @abstractmethod + def get_storage_tables( + self, table_names: Iterable[str] + ) -> Iterable[Tuple[str, TTableSchemaColumns]]: + """Uses INFORMATION_SCHEMA to retrieve table and column information for tables in `table_names` iterator. + Table names should be normalized according to naming convention and will be further converted to desired casing + in order to (in most cases) create case-insensitive name suitable for search in information schema. + + The column names are returned as in information schema. To match those with columns in existing table, you'll need to use + `schema.get_new_table_columns` method and pass the correct casing. Most of the casing function are irreversible so it is not + possible to convert identifiers into INFORMATION SCHEMA back into case sensitive dlt schema. + """ + pass + + class WithStagingDataset(ABC): """Adds capability to use staging dataset and request it from the loader""" diff --git a/dlt/common/schema/__init__.py b/dlt/common/schema/__init__.py index 9cb5e2ab76..1a9f7bf8ed 100644 --- a/dlt/common/schema/__init__.py +++ b/dlt/common/schema/__init__.py @@ -8,6 +8,7 @@ TColumnHint, TColumnSchema, TColumnSchemaBase, + TSchemaDrop, ) from dlt.common.schema.typing import COLUMN_HINTS from dlt.common.schema.schema import Schema, DEFAULT_SCHEMA_CONTRACT_MODE @@ -15,6 +16,7 @@ from dlt.common.schema.utils import verify_schema_hash __all__ = [ + "TSchemaDrop", "TSchemaUpdate", "TSchemaTables", "TTableSchema", diff --git a/dlt/common/schema/schema.py b/dlt/common/schema/schema.py index c9b22b4fd1..7a3d9a660a 100644 --- a/dlt/common/schema/schema.py +++ b/dlt/common/schema/schema.py @@ -375,11 +375,24 @@ def drop_tables( self.data_item_normalizer.remove_table(table_name) return result - def drop_columns(self, table_name: str, column_names: Sequence[str]) -> List[TColumnSchema]: - """Drops columns from the table schema and returns the dropped columns""" + def drop_columns(self, table_name: str, column_names: Sequence[str]) -> TPartialTableSchema: + """Drops columns from the table schema and returns the table schema with the dropped columns""" + table: TPartialTableSchema = {"name": table_name} + dropped_col_schemas: TTableSchemaColumns = {} + + for col in column_names: + col_schema = self._schema_tables[table["name"]]["columns"].pop(col) + dropped_col_schemas[col] = col_schema + + table["columns"] = dropped_col_schemas + return table + + def get_child_tables(self, table_name: str) -> List[TTableSchema]: + """Returns child tables""" result = [] - for col_name in column_names: - result.append(self._schema_tables[table_name]["columns"].pop(col_name)) + for table in self.data_tables(): + if table.get("parent", None) == table_name: + result.append(table) return result def 
filter_row_with_hint( diff --git a/dlt/common/schema/typing.py b/dlt/common/schema/typing.py index ad1c8f90c8..83a695408e 100644 --- a/dlt/common/schema/typing.py +++ b/dlt/common/schema/typing.py @@ -315,6 +315,7 @@ class TPartialTableSchema(TTableSchema): TSchemaTables = Dict[str, TTableSchema] TSchemaUpdate = Dict[str, List[TPartialTableSchema]] +TSchemaDrop = Dict[str, TPartialTableSchema] TColumnDefaultHint = Literal["not_null", TColumnHint] """Allows using not_null in default hints setting section""" diff --git a/dlt/destinations/impl/athena/athena.py b/dlt/destinations/impl/athena/athena.py index d48ac07bbe..4d7de9aead 100644 --- a/dlt/destinations/impl/athena/athena.py +++ b/dlt/destinations/impl/athena/athena.py @@ -37,7 +37,12 @@ TSortOrder, ) from dlt.common.destination import DestinationCapabilitiesContext, PreparedTableSchema -from dlt.common.destination.client import FollowupJobRequest, SupportsStagingDestination, LoadJob +from dlt.common.destination.client import ( + FollowupJobRequest, + SupportsStagingDestination, + LoadJob, + WithTableReflection, +) from dlt.destinations.sql_jobs import ( SqlStagingCopyFollowupJob, SqlStagingReplaceFollowupJob, @@ -191,7 +196,7 @@ def _parse_and_log_lf_response( logger.debug(f"Success: {verb} LF tags {lf_tags} to " + resource_msg) -class AthenaClient(SqlJobClientWithStagingDataset, SupportsStagingDestination): +class AthenaClient(SqlJobClientWithStagingDataset, SupportsStagingDestination, WithTableReflection): def __init__( self, schema: Schema, diff --git a/dlt/destinations/impl/bigquery/bigquery.py b/dlt/destinations/impl/bigquery/bigquery.py index 412ad72610..6c467c65c9 100644 --- a/dlt/destinations/impl/bigquery/bigquery.py +++ b/dlt/destinations/impl/bigquery/bigquery.py @@ -16,6 +16,7 @@ RunnableLoadJob, SupportsStagingDestination, LoadJob, + WithTableReflection, ) from dlt.common.json import json from dlt.common.runtime.signals import sleep @@ -175,7 +176,9 @@ def gen_key_table_clauses( return sql -class BigQueryClient(SqlJobClientWithStagingDataset, SupportsStagingDestination): +class BigQueryClient( + SqlJobClientWithStagingDataset, SupportsStagingDestination, WithTableReflection +): def __init__( self, schema: Schema, diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index aadaf7a067..650d1de3d2 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -21,6 +21,7 @@ RunnableLoadJob, FollowupJobRequest, LoadJob, + WithTableReflection, ) from dlt.common.schema import Schema, TColumnSchema from dlt.common.schema.typing import ( @@ -210,7 +211,9 @@ def requires_temp_table_for_delete(cls) -> bool: return True -class ClickHouseClient(SqlJobClientWithStagingDataset, SupportsStagingDestination): +class ClickHouseClient( + SqlJobClientWithStagingDataset, SupportsStagingDestination, WithTableReflection +): def __init__( self, schema: Schema, diff --git a/dlt/destinations/impl/databricks/databricks.py b/dlt/destinations/impl/databricks/databricks.py index 7740bdd0b6..bc4303ceef 100644 --- a/dlt/destinations/impl/databricks/databricks.py +++ b/dlt/destinations/impl/databricks/databricks.py @@ -13,6 +13,7 @@ RunnableLoadJob, SupportsStagingDestination, LoadJob, + WithTableReflection, ) from dlt.common.configuration.specs import ( AwsCredentialsWithoutDefaults, @@ -291,7 +292,9 @@ def gen_delete_from_sql( """ -class DatabricksClient(SqlJobClientWithStagingDataset, SupportsStagingDestination): +class DatabricksClient( + 
SqlJobClientWithStagingDataset, SupportsStagingDestination, WithTableReflection +): def __init__( self, schema: Schema, diff --git a/dlt/destinations/impl/dremio/dremio.py b/dlt/destinations/impl/dremio/dremio.py index b18ad4a812..ce4c1f5908 100644 --- a/dlt/destinations/impl/dremio/dremio.py +++ b/dlt/destinations/impl/dremio/dremio.py @@ -10,6 +10,7 @@ SupportsStagingDestination, FollowupJobRequest, LoadJob, + WithTableReflection, ) from dlt.common.schema import TColumnSchema, Schema from dlt.common.schema.typing import TColumnType, TTableFormat @@ -97,7 +98,7 @@ def run(self) -> None: """) -class DremioClient(SqlJobClientWithStagingDataset, SupportsStagingDestination): +class DremioClient(SqlJobClientWithStagingDataset, SupportsStagingDestination, WithTableReflection): def __init__( self, schema: Schema, diff --git a/dlt/destinations/impl/duckdb/duck.py b/dlt/destinations/impl/duckdb/duck.py index ee3ea6601b..5e9584e46a 100644 --- a/dlt/destinations/impl/duckdb/duck.py +++ b/dlt/destinations/impl/duckdb/duck.py @@ -9,6 +9,7 @@ RunnableLoadJob, HasFollowupJobs, LoadJob, + WithTableReflection, ) from dlt.common.schema.typing import TColumnSchema, TColumnType, TTableFormat from dlt.common.schema.utils import has_default_column_prop_value @@ -52,7 +53,7 @@ def run(self) -> None: ) -class DuckDbClient(InsertValuesJobClient): +class DuckDbClient(InsertValuesJobClient, WithTableReflection): def __init__( self, schema: Schema, diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index 462202e0a7..f7bb2846e9 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -23,10 +23,13 @@ from dlt.common.metrics import LoadJobMetrics from dlt.common.schema.exceptions import TableNotFound from dlt.common.schema.typing import ( + C_DLT_ID, C_DLT_LOAD_ID, C_DLT_LOADS_TABLE_LOAD_ID, TTableFormat, TTableSchemaColumns, + TSchemaDrop, + TPartialTableSchema, ) from dlt.common.storages.exceptions import ( CurrentLoadPackageStateNotAvailable, @@ -60,6 +63,7 @@ StorageSchemaInfo, StateInfo, LoadJob, + WithTableReflection, ) from dlt.common.destination.exceptions import ( DestinationUndefinedEntity, @@ -279,6 +283,7 @@ class FilesystemClient( WithStagingDataset, WithStateSync, SupportsOpenTables, + WithTableReflection, ): fs_client: AbstractFileSystem # a path (without the scheme) to a location in the bucket where dataset is present @@ -468,7 +473,8 @@ def drop_tables(self, *tables: str, delete_schema: bool = True) -> None: def get_storage_tables( self, table_names: Iterable[str] ) -> Iterable[Tuple[str, TTableSchemaColumns]]: - """Yields tables that have files in storage, returns columns from current schema""" + """Yields tables that have files in storage, returns columns from files in storage for regular delta/iceberg tables, + or from schema for regular tables without table format""" for table_name in table_names: table_dir = self.get_table_dir(table_name) if ( @@ -478,7 +484,40 @@ def get_storage_tables( and len(self.list_table_files(table_name)) > 0 ): if table_name in self.schema.tables: - yield (table_name, self.schema.get_table_columns(table_name)) + # If it's an open table, only actually exsiting columns + if self.is_open_table("iceberg", table_name): + from dlt.common.libs.pyiceberg import ( + get_table_columns as get_iceberg_table_columns, + ) + + iceberg_table = self.load_open_table("iceberg", table_name) + actual_column_names = get_iceberg_table_columns(iceberg_table) + + col_schemas = { + 
col: schema + for col, schema in self.schema.get_table_columns(table_name).items() + if col in actual_column_names + } + yield (table_name, col_schemas) + + elif self.is_open_table("delta", table_name): + from dlt.common.libs.deltalake import ( + get_table_columns as get_delta_table_columns, + ) + + delta_table = self.load_open_table("delta", table_name) + actual_column_names = get_delta_table_columns(delta_table) + + col_schemas = { + col: schema + for col, schema in self.schema.get_table_columns(table_name).items() + if col in actual_column_names + } + yield (table_name, col_schemas) + + else: + yield (table_name, self.schema.get_table_columns(table_name)) + else: yield (table_name, {"_column": {}}) else: @@ -538,6 +577,45 @@ def verify_schema( raise exceptions[0] return loaded_tables + def _diff_between_actual_and_dlt_schema( + self, table_name: str, actual_col_names: set[str], disregard_dlt_columns: bool = True + ) -> TPartialTableSchema: + """Returns a partial table schema containing columns that exist in the dlt schema + but are missing from the actual table. Skips dlt internal columns by default. + """ + col_schemas = self.schema.get_table_columns(table_name) + + # Map escaped -> original names (actual_col_names are escaped) + escaped_to_original = { + self.sql_client.escape_column_name(col, quote=False): col for col in col_schemas.keys() + } + dropped_col_names = set(escaped_to_original.keys()) - actual_col_names + + if not dropped_col_names: + return {} + + partial_table: TPartialTableSchema = {"name": table_name, "columns": {}} + + for esc_name in dropped_col_names: + orig_name = escaped_to_original[esc_name] + + # Athena doesn't have dlt columns in actual columns. Don't drop them anyway. + if disregard_dlt_columns and orig_name in [C_DLT_ID, C_DLT_LOAD_ID]: + continue + + col_schema = col_schemas[orig_name] + if col_schema.get("increment"): + # We can warn within the for loop, + # since there's only one incremental field per table + logger.warning( + f"An incremental field {orig_name} is being removed from schema." 
+ "You should unset the" + " incremental with `incremental=dlt.sources.incremental.EMPTY`" + ) + partial_table["columns"][orig_name] = col_schema + + return partial_table if partial_table["columns"] else {} + def update_stored_schema( self, only_tables: Iterable[str] = None, @@ -566,25 +644,12 @@ def update_stored_schema( # externally changed return applied_update - def _get_actual_columns(self, table_name: str) -> List[str]: - """Get actual column names from files in storage for regular (non-delta/iceberg) tables - or column names from schema""" - - if self.is_open_table("iceberg", table_name): - from dlt.common.libs.pyiceberg import get_table_columns as get_iceberg_table_columns - - iceberg_table = self.load_open_table("iceberg", table_name) - return get_iceberg_table_columns(iceberg_table) - - elif self.is_open_table("delta", table_name): - from dlt.common.libs.deltalake import get_table_columns as get_delta_table_columns - - delta_table = self.load_open_table("delta", table_name) - return get_delta_table_columns(delta_table) - - else: - schema_columns = self.schema.get_table_columns(table_name) - return list(schema_columns.keys()) + def update_dlt_schema( + self, + table_names: Iterable[str] = None, + dry_run: bool = False, + ) -> Optional[TSchemaDrop]: + return super().update_dlt_schema(table_names, dry_run) def prepare_load_table(self, table_name: str) -> PreparedTableSchema: table = super().prepare_load_table(table_name) diff --git a/dlt/destinations/impl/mssql/mssql.py b/dlt/destinations/impl/mssql/mssql.py index 5f190da561..3710f7e5ff 100644 --- a/dlt/destinations/impl/mssql/mssql.py +++ b/dlt/destinations/impl/mssql/mssql.py @@ -3,6 +3,7 @@ from dlt.common.destination.client import ( FollowupJobRequest, PreparedTableSchema, + WithTableReflection, ) from dlt.common.destination import DestinationCapabilitiesContext from dlt.common.schema import TColumnSchema, TColumnHint, Schema @@ -76,7 +77,7 @@ def _new_temp_table_name(cls, table_name: str, op: str, sql_client: SqlClientBas return SqlMergeFollowupJob._new_temp_table_name("#" + table_name, op, sql_client) -class MsSqlJobClient(InsertValuesJobClient): +class MsSqlJobClient(InsertValuesJobClient, WithTableReflection): def __init__( self, schema: Schema, diff --git a/dlt/destinations/impl/postgres/postgres.py b/dlt/destinations/impl/postgres/postgres.py index 5105f51cfd..57192a572e 100644 --- a/dlt/destinations/impl/postgres/postgres.py +++ b/dlt/destinations/impl/postgres/postgres.py @@ -12,6 +12,7 @@ RunnableLoadJob, FollowupJobRequest, LoadJob, + WithTableReflection, ) from dlt.common.schema import TColumnSchema, TColumnHint, Schema from dlt.common.schema.typing import TColumnType @@ -167,7 +168,7 @@ def run(self) -> None: cursor.copy_expert(copy_sql, f, size=8192) -class PostgresClient(InsertValuesJobClient): +class PostgresClient(InsertValuesJobClient, WithTableReflection): def __init__( self, schema: Schema, diff --git a/dlt/destinations/impl/redshift/redshift.py b/dlt/destinations/impl/redshift/redshift.py index 28a0fcf999..393a5a92dd 100644 --- a/dlt/destinations/impl/redshift/redshift.py +++ b/dlt/destinations/impl/redshift/redshift.py @@ -19,6 +19,7 @@ PreparedTableSchema, SupportsStagingDestination, LoadJob, + WithTableReflection, ) from dlt.common.destination.capabilities import DestinationCapabilitiesContext from dlt.common.schema import TColumnSchema, TColumnHint, Schema @@ -160,7 +161,7 @@ def gen_key_table_clauses( ) -class RedshiftClient(InsertValuesJobClient, SupportsStagingDestination): +class 
RedshiftClient(InsertValuesJobClient, SupportsStagingDestination, WithTableReflection): def __init__( self, schema: Schema, diff --git a/dlt/destinations/impl/snowflake/snowflake.py b/dlt/destinations/impl/snowflake/snowflake.py index b50b6def64..5ecd02c135 100644 --- a/dlt/destinations/impl/snowflake/snowflake.py +++ b/dlt/destinations/impl/snowflake/snowflake.py @@ -10,6 +10,7 @@ RunnableLoadJob, CredentialsConfiguration, SupportsStagingDestination, + WithTableReflection, ) from dlt.common.schema.utils import get_columns_names_with_prop from dlt.common.storages.file_storage import FileStorage @@ -116,7 +117,9 @@ def run(self) -> None: self._sql_client.execute_sql(f"REMOVE {stage_file_path}") -class SnowflakeClient(SqlJobClientWithStagingDataset, SupportsStagingDestination): +class SnowflakeClient( + SqlJobClientWithStagingDataset, SupportsStagingDestination, WithTableReflection +): def __init__( self, schema: Schema, diff --git a/dlt/destinations/impl/sqlalchemy/sqlalchemy_job_client.py b/dlt/destinations/impl/sqlalchemy/sqlalchemy_job_client.py index 1e6c3adcdd..7d8e9cbf4e 100644 --- a/dlt/destinations/impl/sqlalchemy/sqlalchemy_job_client.py +++ b/dlt/destinations/impl/sqlalchemy/sqlalchemy_job_client.py @@ -13,6 +13,7 @@ StateInfo, PreparedTableSchema, FollowupJobRequest, + WithTableReflection, ) from dlt.destinations.job_client_impl import SqlJobClientWithStagingDataset, SqlLoadJob from dlt.common.destination.capabilities import DestinationCapabilitiesContext @@ -41,7 +42,7 @@ ) -class SqlalchemyJobClient(SqlJobClientWithStagingDataset): +class SqlalchemyJobClient(SqlJobClientWithStagingDataset, WithTableReflection): sql_client: SqlalchemyClient # type: ignore[assignment] def __init__( diff --git a/dlt/destinations/impl/synapse/synapse.py b/dlt/destinations/impl/synapse/synapse.py index 0f46050d2f..2c807458cf 100644 --- a/dlt/destinations/impl/synapse/synapse.py +++ b/dlt/destinations/impl/synapse/synapse.py @@ -10,6 +10,7 @@ SupportsStagingDestination, FollowupJobRequest, LoadJob, + WithTableReflection, ) from dlt.common.schema import TColumnSchema, Schema, TColumnHint diff --git a/dlt/destinations/job_client_impl.py b/dlt/destinations/job_client_impl.py index fb670d1682..e001428de4 100644 --- a/dlt/destinations/job_client_impl.py +++ b/dlt/destinations/job_client_impl.py @@ -27,6 +27,7 @@ from dlt.common.destination.utils import resolve_replace_strategy from dlt.common.json import json from dlt.common.schema.typing import ( + C_DLT_ID, C_DLT_LOAD_ID, C_DLT_LOADS_TABLE_LOAD_ID, COLUMN_HINTS, @@ -44,7 +45,13 @@ from dlt.common.utils import read_dialect_and_sql from dlt.common.storages import FileStorage from dlt.common.storages.load_package import LoadJobInfo, ParsedLoadJobFileName -from dlt.common.schema import TColumnSchema, Schema, TTableSchemaColumns, TSchemaTables +from dlt.common.schema import ( + TColumnSchema, + Schema, + TTableSchemaColumns, + TSchemaTables, + TSchemaDrop, +) from dlt.common.schema import TColumnHint from dlt.common.destination.client import ( PreparedTableSchema, @@ -60,6 +67,7 @@ JobClientBase, HasFollowupJobs, CredentialsConfiguration, + WithTableReflection, ) from dlt.destinations.exceptions import DatabaseUndefinedRelation @@ -240,7 +248,7 @@ def __init__( self._bucket_path = ReferenceFollowupJobRequest.resolve_reference(file_path) -class SqlJobClientBase(WithSqlClient, JobClientBase, WithStateSync): +class SqlJobClientBase(WithSqlClient, JobClientBase, WithStateSync, WithTableReflection): def __init__( self, schema: Schema, @@ -319,9 +327,12 @@ def 
update_stored_schema( ) return applied_update - def _get_actual_columns(self, table_name: str) -> List[str]: - actual_columns = list(self.get_storage_table(table_name)[1].keys()) - return actual_columns + def update_dlt_schema( + self, + table_names: Iterable[str] = None, + dry_run: bool = False, + ) -> Optional[TSchemaDrop]: + return super().update_dlt_schema() def drop_tables(self, *tables: str, delete_schema: bool = True) -> None: """Drop tables in destination database and optionally delete the stored schema as well. @@ -436,14 +447,6 @@ def __exit__( def get_storage_tables( self, table_names: Iterable[str] ) -> Iterable[Tuple[str, TTableSchemaColumns]]: - """Uses INFORMATION_SCHEMA to retrieve table and column information for tables in `table_names` iterator. - Table names should be normalized according to naming convention and will be further converted to desired casing - in order to (in most cases) create case-insensitive name suitable for search in information schema. - - The column names are returned as in information schema. To match those with columns in existing table, you'll need to use - `schema.get_new_table_columns` method and pass the correct casing. Most of the casing function are irreversible so it is not - possible to convert identifiers into INFORMATION SCHEMA back into case sensitive dlt schema. - """ table_names = list(table_names) if len(table_names) == 0: # empty generator diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index f22137a22b..b2e3af9731 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -17,6 +17,7 @@ ContextManager, Union, TYPE_CHECKING, + Iterable, ) import dlt @@ -44,6 +45,7 @@ TWriteDispositionConfig, TAnySchemaColumns, TSchemaContract, + TSchemaDrop, ) from dlt.common.schema.utils import normalize_schema_name from dlt.common.storages.exceptions import LoadPackageNotFound @@ -1038,7 +1040,9 @@ def sync_schema(self, schema_name: str = None) -> TSchemaTables: return client.update_stored_schema() @with_schemas_sync - def sync_schema_destructively(self, schema_name: str = None) -> None: + def sync_dlt_schema( + self, schema_name: str = None, table_names: Iterable[str] = None, dry_run: bool = False + ) -> Optional[TSchemaDrop]: """Synchronizes the schema `schema_name` with the destination. If no name is provided, the default schema will be synchronized.""" if not schema_name and not self.default_schema_name: raise PipelineConfigMissing( @@ -1050,7 +1054,8 @@ def sync_schema_destructively(self, schema_name: str = None) -> None: ) schema = self.schemas[schema_name] if schema_name else self.default_schema with self._get_destination_clients(schema)[0] as client: - client.update_stored_schema_destructively() + client.initialize_storage() + return client.update_dlt_schema(table_names=table_names, dry_run=dry_run) def set_local_state_val(self, key: str, value: Any) -> None: """Sets value in local state. 
Local state is not synchronized with destination.""" diff --git a/tests/load/pipeline/test_sync_dlt_schema.py b/tests/load/pipeline/test_sync_dlt_schema.py new file mode 100644 index 0000000000..1c65b88c9e --- /dev/null +++ b/tests/load/pipeline/test_sync_dlt_schema.py @@ -0,0 +1,229 @@ +import json +from typing import cast + +import pytest +from pytest_mock import MockerFixture + +import dlt +from dlt.common.utils import uniq_id +from dlt.destinations.impl.filesystem.filesystem import FilesystemClient +from dlt.pipeline.pipeline import Pipeline + +from dlt.common import logger +from tests.pipeline.utils import assert_load_info +from tests.load.utils import ( + destinations_configs, + DestinationTestConfiguration, +) + + +def _drop_column_in_filesystem( + pipeline: Pipeline, table_name: str, col: str, table_format: str +) -> None: + client = cast(FilesystemClient, pipeline._fs_client()) + if table_format == "iceberg": + tbl = client.load_open_table("iceberg", table_name) + with tbl.update_schema(allow_incompatible_changes=True) as upd: + upd.delete_column(col) + elif table_format == "delta": + from dlt.common.libs.deltalake import ( + write_deltalake, + get_table_columns, + deltalake_storage_options, + ) + + delta_tbl = client.load_open_table("delta", table_name) + keep_cols = [c for c in get_table_columns(delta_tbl) if c != col] + arrow_tbl = delta_tbl.to_pyarrow_table(columns=keep_cols) + storage_options = deltalake_storage_options(client.config) + write_deltalake( + table_or_uri=client.get_open_table_location("delta", table_name), + data=arrow_tbl, + mode="overwrite", + schema_mode="overwrite", + storage_options=storage_options, + ) + else: + # We don't simulate removal of a column in a plain filesystem destination, + # because it's unlikely that users do it. + # Additionally, the dlt schema sync doesn't support it. 
+ return + + +def _drop_table_in_filesystem( + pipeline: Pipeline, + destination_config: DestinationTestConfiguration, + table_name: str, +) -> None: + client = cast(FilesystemClient, pipeline._fs_client()) + client.drop_tables(table_name) + + +def _drop_column_in_sql( + pipeline: Pipeline, + destination_config: DestinationTestConfiguration, + table_name: str, + col: str, +) -> None: + """All non-filesystem destinations end up here.""" + with pipeline.sql_client() as client: + destination_type, table_format = ( + destination_config.destination_type, + destination_config.table_format, + ) + + if destination_type == "athena" and table_format != "iceberg": + # Athena Hive table need REPLACE COLUMNS syntax + col_defs = [ + f"{client.escape_ddl_identifier('id')} bigint", + f"{client.escape_ddl_identifier('name')} string", + ] + ddl = ( + f"ALTER TABLE {client.make_qualified_ddl_table_name(table_name)} " + f"REPLACE COLUMNS ({','.join(col_defs)})" + ) + elif destination_type == "databricks": + # Enable column-mapping once, then DROP + client.execute_sql( + f"ALTER TABLE {client.make_qualified_table_name(table_name)} " + "SET TBLPROPERTIES(" + "'delta.columnMapping.mode'='name'," + "'delta.minReaderVersion'='2'," + "'delta.minWriterVersion'='5')" + ) + ddl = ( + f"ALTER TABLE {client.make_qualified_table_name(table_name)} " + f"DROP COLUMN {client.escape_column_name(col)}" + ) + else: + qualitified_tbl_name = ( + client.make_qualified_ddl_table_name(table_name) + if destination_type == "athena" and table_format == "iceberg" + else client.make_qualified_table_name(table_name) + ) + qualified_col_name = ( + client.escape_ddl_identifier(col) + if destination_type == "athena" and table_format == "iceberg" + else client.escape_column_name(col) + ) + ddl = f"ALTER TABLE {qualitified_tbl_name} DROP COLUMN {qualified_col_name}" + + client.execute_sql(ddl) + + +def _drop_table_in_sql( + pipeline: Pipeline, + destination_config: DestinationTestConfiguration, + table_name: str, +) -> None: + """All non-filesystem destinations end up here.""" + with pipeline.sql_client() as client: + destination_type = destination_config.destination_type + qualified = ( + client.make_qualified_ddl_table_name(table_name) + if destination_type == "athena" + else client.make_qualified_table_name(table_name) + ) + if destination_type == "clickhouse": + query = f"DROP TABLE {qualified} SYNC;" + else: + query = f"DROP TABLE {qualified};" + + client.execute_sql(query) + + +@dlt.resource(table_name="my_table") +def my_resource(with_col: bool = True): + row = {"id": 1, "name": "Liuwen"} + if with_col: + row["age"] = 40 + yield row + + +@dlt.resource(table_name="my_other_table") +def my_other_resource(): + row = {"id": 1, "name": "Liuwen", "height": 180} + yield row + + +@dlt.resource(table_name="my_last_table") +def my_last_resource(): + row = { + "id": 1, + "name": "Liuwen", + "children": [{"id": 2, "name": "Dawei"}, {"id": 3, "name": "Xiaoyun"}], + } + yield row + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs( + default_sql_configs=True, + table_format_filesystem_configs=True, + ), + ids=lambda x: x.name, +) +def test_sync_dlt_schema( + destination_config: DestinationTestConfiguration, mocker: MockerFixture +) -> None: + pipeline = destination_config.setup_pipeline(pipeline_name=f"pipe_{uniq_id()}") + + assert_load_info( + pipeline.run( + [my_resource(), my_other_resource(), my_last_resource()], + **destination_config.run_kwargs, + ) + ) + + # Simulate a scenario where the user manually drops + # 1. 
a column in a table + # 2. a table with a child table + if destination_config.destination_type == "filesystem": + _drop_column_in_filesystem(pipeline, "my_table", "age", destination_config.table_format) + _drop_table_in_filesystem(pipeline, destination_config, "my_last_table") + else: + _drop_column_in_sql(pipeline, destination_config, "my_table", "age") + _drop_table_in_sql(pipeline, destination_config, "my_last_table") + + # Make sure the warning about orphaned tables is emitted + logger_spy = mocker.spy(logger, "warning") + + schema_drops = pipeline.sync_dlt_schema() + + logger_spy.assert_called() + assert logger_spy.call_count == 1 + expected_warning = ( + "Removing table 'my_last_table' from the dlt schema would leave orphan table(s):" + " 'my_last_table__children'. Drop these child tables in the destination and sync the dlt" + " schema again." + ) + assert expected_warning in logger_spy.call_args_list[0][0][0] + + # Schema drop should only include the "age" column of "my_table" + assert len(schema_drops) == 1 + assert "my_table" in schema_drops + assert len(schema_drops["my_table"]["columns"]) == 1 + assert "age" in schema_drops["my_table"]["columns"] + + # ensure schema doesn't have the "age" column in "my_table" anymore + assert "age" not in pipeline.default_schema.tables["my_table"]["columns"] + # ensure "my_other_table" was NOT dropped from schema + assert "my_last_table" in pipeline.default_schema.tables + # sanity check that the child table is still there + assert "my_last_table__children" in pipeline.default_schema.tables + + # now the user drops the child table as instructed in the warning + if destination_config.destination_type == "filesystem": + _drop_table_in_filesystem(pipeline, destination_config, "my_last_table__children") + else: + _drop_table_in_sql(pipeline, destination_config, "my_last_table__children") + + schema_drops = pipeline.sync_dlt_schema() + # Schema drop should include the "my_last_table" with the child table + assert len(schema_drops) == 2 + assert "my_last_table" in schema_drops + assert "my_last_table__children" in schema_drops + + assert "my_last_table" not in pipeline.default_schema.tables + assert "my_last_table__children" not in pipeline.default_schema.tables diff --git a/tests/load/pipeline/test_sync_schema_destructively.py b/tests/load/pipeline/test_sync_schema_destructively.py deleted file mode 100644 index e2babf19ce..0000000000 --- a/tests/load/pipeline/test_sync_schema_destructively.py +++ /dev/null @@ -1,149 +0,0 @@ -import json -from typing import cast -from collections.abc import Mapping - -import pytest -import dlt -from dlt.common import pendulum -from dlt.common.utils import uniq_id -from dlt.common.destination.exceptions import DestinationUndefinedEntity -from dlt.destinations.impl.filesystem.filesystem import FilesystemClient -from dlt.load import Load -from dlt.pipeline.pipeline import Pipeline -from dlt.pipeline.state_sync import load_pipeline_state_from_destination, state_resource - -from dlt.common.libs.deltalake import write_deltalake, get_table_columns, deltalake_storage_options -from tests.utils import TEST_STORAGE_ROOT -from tests.cases import JSON_TYPED_DICT, JSON_TYPED_DICT_DECODED -from tests.common.utils import IMPORTED_VERSION_HASH_ETH_V10, yml_case_path as common_yml_case_path -from tests.common.configuration.utils import environment -from tests.pipeline.utils import assert_query_column, assert_load_info -from tests.load.utils import ( - destinations_configs, - DestinationTestConfiguration, -) - - -def 
_drop_column_in_filesystem( - pipeline: Pipeline, table_name: str, col: str, table_format: str -) -> None: - """Iceberg / Delta live under the Filesystem destination.""" - client = cast(FilesystemClient, pipeline._fs_client()) - if table_format == "iceberg": - tbl = client.load_open_table("iceberg", table_name) - with tbl.update_schema(allow_incompatible_changes=True) as upd: - upd.delete_column(col) - elif table_format == "delta": - delta_tbl = client.load_open_table("delta", table_name) - keep_cols = [c for c in get_table_columns(delta_tbl) if c != col] - arrow_tbl = delta_tbl.to_pyarrow_table(columns=keep_cols) - storage_options = deltalake_storage_options(client.config) - write_deltalake( - table_or_uri=client.get_open_table_location("delta", table_name), - data=arrow_tbl, - mode="overwrite", - schema_mode="overwrite", - storage_options=storage_options, - ) - else: - file_path = client.list_table_files(table_name)[0] - new_lines = [] - with client.fs_client.open(file_path, "r", encoding="utf-8") as f: - for line in f: - line = line.strip() - if line: - data = json.loads(line) - if isinstance(data, dict) and col in data: - del data[col] - new_lines.append(json.dumps(data)) - - # Write back the modified content - with client.fs_client.open(file_path, "w", encoding="utf-8") as f: - for line in new_lines: - f.write(line + "\n") - - -def _drop_column_in_sql( - pipeline: Pipeline, - destination_config: DestinationTestConfiguration, - table_name: str, - col: str, -) -> None: - """All non-filesystem destinations end up here.""" - with pipeline.sql_client() as client: - destination_type, table_format = ( - destination_config.destination_type, - destination_config.table_format, - ) - - if destination_type == "athena" and table_format != "iceberg": - # Athena Hive table need REPLACE COLUMNS syntax - col_defs = [ - f"{client.escape_ddl_identifier('id')} bigint", - f"{client.escape_ddl_identifier('name')} string", - ] - ddl = ( - f"ALTER TABLE {client.make_qualified_ddl_table_name(table_name)} " - f"REPLACE COLUMNS ({','.join(col_defs)})" - ) - elif destination_type == "databricks": - # Enable column-mapping once, then DROP - client.execute_sql( - f"ALTER TABLE {client.make_qualified_table_name(table_name)} " - "SET TBLPROPERTIES(" - "'delta.columnMapping.mode'='name'," - "'delta.minReaderVersion'='2'," - "'delta.minWriterVersion'='5')" - ) - ddl = ( - f"ALTER TABLE {client.make_qualified_table_name(table_name)} " - f"DROP COLUMN {client.escape_column_name(col)}" - ) - else: - qualified = ( - client.make_qualified_ddl_table_name(table_name) - if destination_type == "athena" and table_format == "iceberg" - else client.make_qualified_table_name(table_name) - ) - ddl = f"ALTER TABLE {qualified} DROP COLUMN {client.escape_column_name(col)}" - - client.execute_sql(ddl) - - -@dlt.resource(table_name="my_table") -def my_resource(with_col: bool = True): - row = {"id": 1, "name": "Liuwen"} - if with_col: - row["age"] = 40 - yield row - - -@pytest.mark.parametrize( - "destination_config", - destinations_configs( - default_staging_configs=True, - default_sql_configs=True, - table_format_filesystem_configs=True, - table_format_local_configs=True, - ), - ids=lambda x: x.name, -) -def test_sync_schema_destructively(destination_config: DestinationTestConfiguration) -> None: - table, col_to_drop = "my_table", "age" - - pipeline = destination_config.setup_pipeline(pipeline_name=f"pipe_{uniq_id()}") - - assert_load_info(pipeline.run(my_resource(), **destination_config.run_kwargs)) - - # simulate "user" manually 
dropped the column” - if destination_config.destination_type == "filesystem": - _drop_column_in_filesystem(pipeline, table, col_to_drop, destination_config.table_format) - else: - _drop_column_in_sql(pipeline, destination_config, table, col_to_drop) - - # sync schema with destination make sure next run succeeds - pipeline.sync_schema_destructively() - assert_load_info(pipeline.run(my_resource(with_col=False), **destination_config.run_kwargs)) - - # ensure schema doesn't have the dropped column - assert col_to_drop not in pipeline.default_schema.tables[table]["columns"] From f70077e818bbb1ef4525e0b4d8bcb4e3537b2d26 Mon Sep 17 00:00:00 2001 From: anuunchin <88698977+anuunchin@users.noreply.github.com> Date: Mon, 1 Sep 2025 13:08:21 +0200 Subject: [PATCH 3/7] Unnecessary inheritance removed, functions moved --- dlt/common/destination/client.py | 124 ++--------------- dlt/common/schema/schema.py | 10 +- dlt/destinations/impl/athena/athena.py | 3 +- dlt/destinations/impl/bigquery/bigquery.py | 5 +- .../impl/clickhouse/clickhouse.py | 5 +- .../impl/databricks/databricks.py | 5 +- dlt/destinations/impl/dremio/dremio.py | 3 +- dlt/destinations/impl/duckdb/duck.py | 3 +- .../impl/filesystem/filesystem.py | 5 +- dlt/destinations/impl/mssql/mssql.py | 3 +- dlt/destinations/impl/postgres/postgres.py | 3 +- dlt/destinations/impl/redshift/redshift.py | 3 +- dlt/destinations/impl/snowflake/snowflake.py | 5 +- .../impl/sqlalchemy/sqlalchemy_job_client.py | 3 +- dlt/destinations/impl/synapse/synapse.py | 1 - dlt/destinations/job_client_impl.py | 13 +- dlt/destinations/utils.py | 126 +++++++++++++++++- dlt/pipeline/pipeline.py | 2 +- 18 files changed, 159 insertions(+), 163 deletions(-) diff --git a/dlt/common/destination/client.py b/dlt/common/destination/client.py index 44105ebaa7..905ad51c70 100644 --- a/dlt/common/destination/client.py +++ b/dlt/common/destination/client.py @@ -42,6 +42,7 @@ TTableSchemaColumns, TPartialTableSchema, ) +from dlt.common.schema.utils import get_nested_tables from dlt.common.destination.capabilities import DestinationCapabilitiesContext from dlt.common.destination.exceptions import ( DestinationSchemaTampered, @@ -542,120 +543,13 @@ def update_stored_schema( ) return expected_update - def update_dlt_schema( + @abstractmethod + def update_from_stored_schema( self, table_names: Iterable[str] = None, dry_run: bool = False, ) -> Optional[TSchemaDrop]: - """Updates schema to the storage. - - Compare the schema we think we should have (`self.schema`) with what actually exists in the destination, - and drop any tables and/or columns that disappeared. - - Args: - table_names (Iterable[str], optional): Check only listed tables. Defaults to None and checks all tables. - - Returns: - Optional[TSchemaTables]: Returns an update that was applied to the schema. - """ - from dlt.destinations.sql_client import WithSqlClient - - if not (isinstance(self, WithTableReflection) and isinstance(self, WithSqlClient)): - raise NotImplementedError - - def _diff_between_actual_and_dlt_schema( - table_name: str, actual_col_names: set[str], disregard_dlt_columns: bool = True - ) -> TPartialTableSchema: - """Returns a partial table schema containing columns that exist in the dlt schema - but are missing from the actual table. Skips dlt internal columns by default. 
- """ - col_schemas = self.schema.get_table_columns(table_name) - - # Map escaped -> original names (actual_col_names are escaped) - escaped_to_original = { - self.sql_client.escape_column_name(col, quote=False): col - for col in col_schemas.keys() - } - dropped_col_names = set(escaped_to_original.keys()) - actual_col_names - - if not dropped_col_names: - return {} - - partial_table: TPartialTableSchema = {"name": table_name, "columns": {}} - - for esc_name in dropped_col_names: - orig_name = escaped_to_original[esc_name] - - # Athena doesn't have dlt columns in actual columns. Don't drop them anyway. - if disregard_dlt_columns and orig_name in [C_DLT_ID, C_DLT_LOAD_ID]: - continue - - col_schema = col_schemas[orig_name] - if col_schema.get("increment"): - # We can warn within the for loop, - # since there's only one incremental field per table - logger.warning( - f"An incremental field {orig_name} is being removed from schema." - "You should unset the" - " incremental with `incremental=dlt.sources.incremental.EMPTY`" - ) - partial_table["columns"][orig_name] = col_schema - - return partial_table if partial_table["columns"] else {} - - tables = table_names if table_names else self.schema.data_table_names() - - table_drops: TSchemaDrop = {} # includes entire tables to drop - column_drops: TSchemaDrop = {} # includes parts of tables to drop as partial tables - - # 1. Detect what needs to be dropped - for table_name in tables: - _, actual_col_schemas = list(self.get_storage_tables([table_name]))[0] - - # no actual column schemas -> - # table doesn't exist -> - # we take entire table schema as a schema drop - if not actual_col_schemas: - table = self.schema.get_table(table_name) - table_drops[table_name] = table - continue - - # actual column schemas present -> - # we compare actual schemas with dlt ones -> - # we take the difference as a partial table - else: - partial_table = _diff_between_actual_and_dlt_schema( - table_name, - set(actual_col_schemas.keys()), - ) - if partial_table: - column_drops[table_name] = partial_table - - # 2. For entire table drops, we make sure no orphaned tables remain - for table_name in table_drops.copy(): - child_tables = self.schema.get_child_tables(table_name) - orphaned_table_names: List[str] = [] - for child_table in child_tables: - if child_table["name"] not in table_drops: - orphaned_table_names.append(child_table["name"]) - if orphaned_table_names: - table_drops.pop(table_name) - logger.warning( - f"Removing table '{table_name}' from the dlt schema would leave orphan" - f" table(s): {'.'.join(repr(t) for t in orphaned_table_names)}. Drop these" - " child tables in the destination and sync the dlt schema again." - ) - - # 3. If it's not a dry run, we actually drop fromt the dlt schema - if not dry_run: - for table_name in table_drops: - self.schema.tables.pop(table_name) - for table_name, partial_table in column_drops.items(): - col_schemas = partial_table["columns"] - col_names = [col for col in col_schemas] - self.schema.drop_columns(table_name, col_names) - - return {**table_drops, **column_drops} + pass def prepare_load_table(self, table_name: str) -> PreparedTableSchema: """Prepares a table schema to be loaded by filling missing hints and doing other modifications requires by given destination. 
@@ -731,13 +625,11 @@ class WithTableReflection(ABC): def get_storage_tables( self, table_names: Iterable[str] ) -> Iterable[Tuple[str, TTableSchemaColumns]]: - """Uses INFORMATION_SCHEMA to retrieve table and column information for tables in `table_names` iterator. - Table names should be normalized according to naming convention and will be further converted to desired casing - in order to (in most cases) create case-insensitive name suitable for search in information schema. + """Retrieves table and column information for the specified tables. - The column names are returned as in information schema. To match those with columns in existing table, you'll need to use - `schema.get_new_table_columns` method and pass the correct casing. Most of the casing function are irreversible so it is not - possible to convert identifiers into INFORMATION SCHEMA back into case sensitive dlt schema. + Returns an iterator of tuples (table_name, columns_dict) where columns_dict + contains column schemas for existing tables, or is empty for non-existent tables. + Implementations use database introspection (INFORMATION_SCHEMA, table reflection) or file metadata. """ pass diff --git a/dlt/common/schema/schema.py b/dlt/common/schema/schema.py index 7a3d9a660a..3580b95502 100644 --- a/dlt/common/schema/schema.py +++ b/dlt/common/schema/schema.py @@ -376,7 +376,7 @@ def drop_tables( return result def drop_columns(self, table_name: str, column_names: Sequence[str]) -> TPartialTableSchema: - """Drops columns from the table schema and returns the table schema with the dropped columns""" + """Drops columns from the table schema in place and returns the table schema with the dropped columns""" table: TPartialTableSchema = {"name": table_name} dropped_col_schemas: TTableSchemaColumns = {} @@ -387,14 +387,6 @@ def drop_columns(self, table_name: str, column_names: Sequence[str]) -> TPartial table["columns"] = dropped_col_schemas return table - def get_child_tables(self, table_name: str) -> List[TTableSchema]: - """Returns child tables""" - result = [] - for table in self.data_tables(): - if table.get("parent", None) == table_name: - result.append(table) - return result - def filter_row_with_hint( self, table_name: str, hint_type: TColumnDefaultHint, row: StrAny ) -> StrAny: diff --git a/dlt/destinations/impl/athena/athena.py b/dlt/destinations/impl/athena/athena.py index 4d7de9aead..3decb5b3f2 100644 --- a/dlt/destinations/impl/athena/athena.py +++ b/dlt/destinations/impl/athena/athena.py @@ -41,7 +41,6 @@ FollowupJobRequest, SupportsStagingDestination, LoadJob, - WithTableReflection, ) from dlt.destinations.sql_jobs import ( SqlStagingCopyFollowupJob, @@ -196,7 +195,7 @@ def _parse_and_log_lf_response( logger.debug(f"Success: {verb} LF tags {lf_tags} to " + resource_msg) -class AthenaClient(SqlJobClientWithStagingDataset, SupportsStagingDestination, WithTableReflection): +class AthenaClient(SqlJobClientWithStagingDataset, SupportsStagingDestination): def __init__( self, schema: Schema, diff --git a/dlt/destinations/impl/bigquery/bigquery.py b/dlt/destinations/impl/bigquery/bigquery.py index 6c467c65c9..412ad72610 100644 --- a/dlt/destinations/impl/bigquery/bigquery.py +++ b/dlt/destinations/impl/bigquery/bigquery.py @@ -16,7 +16,6 @@ RunnableLoadJob, SupportsStagingDestination, LoadJob, - WithTableReflection, ) from dlt.common.json import json from dlt.common.runtime.signals import sleep @@ -176,9 +175,7 @@ def gen_key_table_clauses( return sql -class BigQueryClient( - SqlJobClientWithStagingDataset, 
SupportsStagingDestination, WithTableReflection -): +class BigQueryClient(SqlJobClientWithStagingDataset, SupportsStagingDestination): def __init__( self, schema: Schema, diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index 650d1de3d2..aadaf7a067 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -21,7 +21,6 @@ RunnableLoadJob, FollowupJobRequest, LoadJob, - WithTableReflection, ) from dlt.common.schema import Schema, TColumnSchema from dlt.common.schema.typing import ( @@ -211,9 +210,7 @@ def requires_temp_table_for_delete(cls) -> bool: return True -class ClickHouseClient( - SqlJobClientWithStagingDataset, SupportsStagingDestination, WithTableReflection -): +class ClickHouseClient(SqlJobClientWithStagingDataset, SupportsStagingDestination): def __init__( self, schema: Schema, diff --git a/dlt/destinations/impl/databricks/databricks.py b/dlt/destinations/impl/databricks/databricks.py index bc4303ceef..7740bdd0b6 100644 --- a/dlt/destinations/impl/databricks/databricks.py +++ b/dlt/destinations/impl/databricks/databricks.py @@ -13,7 +13,6 @@ RunnableLoadJob, SupportsStagingDestination, LoadJob, - WithTableReflection, ) from dlt.common.configuration.specs import ( AwsCredentialsWithoutDefaults, @@ -292,9 +291,7 @@ def gen_delete_from_sql( """ -class DatabricksClient( - SqlJobClientWithStagingDataset, SupportsStagingDestination, WithTableReflection -): +class DatabricksClient(SqlJobClientWithStagingDataset, SupportsStagingDestination): def __init__( self, schema: Schema, diff --git a/dlt/destinations/impl/dremio/dremio.py b/dlt/destinations/impl/dremio/dremio.py index ce4c1f5908..b18ad4a812 100644 --- a/dlt/destinations/impl/dremio/dremio.py +++ b/dlt/destinations/impl/dremio/dremio.py @@ -10,7 +10,6 @@ SupportsStagingDestination, FollowupJobRequest, LoadJob, - WithTableReflection, ) from dlt.common.schema import TColumnSchema, Schema from dlt.common.schema.typing import TColumnType, TTableFormat @@ -98,7 +97,7 @@ def run(self) -> None: """) -class DremioClient(SqlJobClientWithStagingDataset, SupportsStagingDestination, WithTableReflection): +class DremioClient(SqlJobClientWithStagingDataset, SupportsStagingDestination): def __init__( self, schema: Schema, diff --git a/dlt/destinations/impl/duckdb/duck.py b/dlt/destinations/impl/duckdb/duck.py index 5e9584e46a..ee3ea6601b 100644 --- a/dlt/destinations/impl/duckdb/duck.py +++ b/dlt/destinations/impl/duckdb/duck.py @@ -9,7 +9,6 @@ RunnableLoadJob, HasFollowupJobs, LoadJob, - WithTableReflection, ) from dlt.common.schema.typing import TColumnSchema, TColumnType, TTableFormat from dlt.common.schema.utils import has_default_column_prop_value @@ -53,7 +52,7 @@ def run(self) -> None: ) -class DuckDbClient(InsertValuesJobClient, WithTableReflection): +class DuckDbClient(InsertValuesJobClient): def __init__( self, schema: Schema, diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index f7bb2846e9..de298579e8 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -82,6 +82,7 @@ from dlt.destinations.utils import ( verify_schema_merge_disposition, verify_schema_replace_disposition, + update_dlt_schema, ) CURRENT_VERSION: int = 2 @@ -644,12 +645,12 @@ def update_stored_schema( # externally changed return applied_update - def update_dlt_schema( + def update_from_stored_schema( self, table_names: Iterable[str] = 
None, dry_run: bool = False, ) -> Optional[TSchemaDrop]: - return super().update_dlt_schema(table_names, dry_run) + return update_dlt_schema(self, self.schema, table_names, dry_run) def prepare_load_table(self, table_name: str) -> PreparedTableSchema: table = super().prepare_load_table(table_name) diff --git a/dlt/destinations/impl/mssql/mssql.py b/dlt/destinations/impl/mssql/mssql.py index 3710f7e5ff..5f190da561 100644 --- a/dlt/destinations/impl/mssql/mssql.py +++ b/dlt/destinations/impl/mssql/mssql.py @@ -3,7 +3,6 @@ from dlt.common.destination.client import ( FollowupJobRequest, PreparedTableSchema, - WithTableReflection, ) from dlt.common.destination import DestinationCapabilitiesContext from dlt.common.schema import TColumnSchema, TColumnHint, Schema @@ -77,7 +76,7 @@ def _new_temp_table_name(cls, table_name: str, op: str, sql_client: SqlClientBas return SqlMergeFollowupJob._new_temp_table_name("#" + table_name, op, sql_client) -class MsSqlJobClient(InsertValuesJobClient, WithTableReflection): +class MsSqlJobClient(InsertValuesJobClient): def __init__( self, schema: Schema, diff --git a/dlt/destinations/impl/postgres/postgres.py b/dlt/destinations/impl/postgres/postgres.py index 57192a572e..5105f51cfd 100644 --- a/dlt/destinations/impl/postgres/postgres.py +++ b/dlt/destinations/impl/postgres/postgres.py @@ -12,7 +12,6 @@ RunnableLoadJob, FollowupJobRequest, LoadJob, - WithTableReflection, ) from dlt.common.schema import TColumnSchema, TColumnHint, Schema from dlt.common.schema.typing import TColumnType @@ -168,7 +167,7 @@ def run(self) -> None: cursor.copy_expert(copy_sql, f, size=8192) -class PostgresClient(InsertValuesJobClient, WithTableReflection): +class PostgresClient(InsertValuesJobClient): def __init__( self, schema: Schema, diff --git a/dlt/destinations/impl/redshift/redshift.py b/dlt/destinations/impl/redshift/redshift.py index 393a5a92dd..28a0fcf999 100644 --- a/dlt/destinations/impl/redshift/redshift.py +++ b/dlt/destinations/impl/redshift/redshift.py @@ -19,7 +19,6 @@ PreparedTableSchema, SupportsStagingDestination, LoadJob, - WithTableReflection, ) from dlt.common.destination.capabilities import DestinationCapabilitiesContext from dlt.common.schema import TColumnSchema, TColumnHint, Schema @@ -161,7 +160,7 @@ def gen_key_table_clauses( ) -class RedshiftClient(InsertValuesJobClient, SupportsStagingDestination, WithTableReflection): +class RedshiftClient(InsertValuesJobClient, SupportsStagingDestination): def __init__( self, schema: Schema, diff --git a/dlt/destinations/impl/snowflake/snowflake.py b/dlt/destinations/impl/snowflake/snowflake.py index 5ecd02c135..b50b6def64 100644 --- a/dlt/destinations/impl/snowflake/snowflake.py +++ b/dlt/destinations/impl/snowflake/snowflake.py @@ -10,7 +10,6 @@ RunnableLoadJob, CredentialsConfiguration, SupportsStagingDestination, - WithTableReflection, ) from dlt.common.schema.utils import get_columns_names_with_prop from dlt.common.storages.file_storage import FileStorage @@ -117,9 +116,7 @@ def run(self) -> None: self._sql_client.execute_sql(f"REMOVE {stage_file_path}") -class SnowflakeClient( - SqlJobClientWithStagingDataset, SupportsStagingDestination, WithTableReflection -): +class SnowflakeClient(SqlJobClientWithStagingDataset, SupportsStagingDestination): def __init__( self, schema: Schema, diff --git a/dlt/destinations/impl/sqlalchemy/sqlalchemy_job_client.py b/dlt/destinations/impl/sqlalchemy/sqlalchemy_job_client.py index 7d8e9cbf4e..1e6c3adcdd 100644 --- a/dlt/destinations/impl/sqlalchemy/sqlalchemy_job_client.py +++ 
b/dlt/destinations/impl/sqlalchemy/sqlalchemy_job_client.py @@ -13,7 +13,6 @@ StateInfo, PreparedTableSchema, FollowupJobRequest, - WithTableReflection, ) from dlt.destinations.job_client_impl import SqlJobClientWithStagingDataset, SqlLoadJob from dlt.common.destination.capabilities import DestinationCapabilitiesContext @@ -42,7 +41,7 @@ ) -class SqlalchemyJobClient(SqlJobClientWithStagingDataset, WithTableReflection): +class SqlalchemyJobClient(SqlJobClientWithStagingDataset): sql_client: SqlalchemyClient # type: ignore[assignment] def __init__( diff --git a/dlt/destinations/impl/synapse/synapse.py b/dlt/destinations/impl/synapse/synapse.py index 2c807458cf..0f46050d2f 100644 --- a/dlt/destinations/impl/synapse/synapse.py +++ b/dlt/destinations/impl/synapse/synapse.py @@ -10,7 +10,6 @@ SupportsStagingDestination, FollowupJobRequest, LoadJob, - WithTableReflection, ) from dlt.common.schema import TColumnSchema, Schema, TColumnHint diff --git a/dlt/destinations/job_client_impl.py b/dlt/destinations/job_client_impl.py index e001428de4..c23b7844ff 100644 --- a/dlt/destinations/job_client_impl.py +++ b/dlt/destinations/job_client_impl.py @@ -82,6 +82,7 @@ info_schema_null_to_bool, verify_schema_merge_disposition, verify_schema_replace_disposition, + update_dlt_schema, ) import sqlglot @@ -327,12 +328,12 @@ def update_stored_schema( ) return applied_update - def update_dlt_schema( + def update_from_stored_schema( self, table_names: Iterable[str] = None, dry_run: bool = False, ) -> Optional[TSchemaDrop]: - return super().update_dlt_schema() + return update_dlt_schema(self, self.schema, table_names, dry_run) def drop_tables(self, *tables: str, delete_schema: bool = True) -> None: """Drop tables in destination database and optionally delete the stored schema as well. @@ -447,6 +448,14 @@ def __exit__( def get_storage_tables( self, table_names: Iterable[str] ) -> Iterable[Tuple[str, TTableSchemaColumns]]: + """Uses INFORMATION_SCHEMA to retrieve table and column information for tables in `table_names` iterator. + Table names should be normalized according to naming convention and will be further converted to desired casing + in order to (in most cases) create case-insensitive name suitable for search in information schema. + + The column names are returned as in information schema. To match those with columns in existing table, you'll need to use + `schema.get_new_table_columns` method and pass the correct casing. Most of the casing function are irreversible so it is not + possible to convert identifiers into INFORMATION SCHEMA back into case sensitive dlt schema. 
+ """ table_names = list(table_names) if len(table_names) == 0: # empty generator diff --git a/dlt/destinations/utils.py b/dlt/destinations/utils.py index d82b0f1812..d247dedc16 100644 --- a/dlt/destinations/utils.py +++ b/dlt/destinations/utils.py @@ -1,18 +1,22 @@ import re -from typing import Any, List, Dict, Type, Optional, Sequence, Tuple, cast +from typing import Any, List, Dict, Type, Optional, Sequence, Tuple, cast, Iterable from dlt.common import logger from dlt.common.destination.capabilities import DestinationCapabilitiesContext from dlt.common.destination.typing import PreparedTableSchema from dlt.common.destination.utils import resolve_merge_strategy, resolve_replace_strategy -from dlt.common.schema import Schema +from dlt.common.destination.client import WithTableReflection +from dlt.common.schema import Schema, TSchemaDrop from dlt.common.schema.exceptions import SchemaCorruptedException from dlt.common.schema.typing import ( MERGE_STRATEGIES, TColumnType, TLoaderReplaceStrategy, TTableSchema, + TPartialTableSchema, + C_DLT_ID, + C_DLT_LOAD_ID, ) from dlt.common.schema.utils import ( get_columns_names_with_prop, @@ -20,6 +24,7 @@ has_column_with_prop, is_nested_table, pipeline_state_table, + get_nested_tables, ) from dlt.destinations.exceptions import DatabaseTransientException @@ -292,3 +297,120 @@ def get_deterministic_temp_table_name(table_name: str, op: str) -> str: op_name = f"{table_name}_{op}" return f"{op_name}_{NamingConvention._compute_tag(op_name, 0.001)}" + + +def update_dlt_schema( + client: WithTableReflection, + schema: Schema, + table_names: Iterable[str] = None, + dry_run: bool = False, +) -> Optional[TSchemaDrop]: + """Updates schema to the storage. + + Compare the schema we think we should have with what actually exists in the destination, + and drop any tables and/or columns that disappeared. + + Args: + table_names (Iterable[str], optional): Check only listed tables. Defaults to None and checks all tables. + + Returns: + Optional[TSchemaTables]: Returns an update that was applied to the schema. + """ + from dlt.destinations.sql_client import WithSqlClient + + if not isinstance(client, WithSqlClient): + raise NotImplementedError + + def _diff_between_actual_and_dlt_schema( + table_name: str, actual_col_names: set[str], disregard_dlt_columns: bool = True + ) -> TPartialTableSchema: + """Returns a partial table schema containing columns that exist in the dlt schema + but are missing from the actual table. Skips dlt internal columns by default. + """ + col_schemas = schema.get_table_columns(table_name) + + # Map escaped -> original names (actual_col_names are escaped) + escaped_to_original = { + client.sql_client.escape_column_name(col, quote=False): col + for col in col_schemas.keys() + } + dropped_col_names = set(escaped_to_original.keys()) - actual_col_names + + if not dropped_col_names: + return {} + + partial_table: TPartialTableSchema = {"name": table_name, "columns": {}} + + for esc_name in dropped_col_names: + orig_name = escaped_to_original[esc_name] + + # Athena doesn't have dlt columns in actual columns. Don't drop them anyway. + if disregard_dlt_columns and orig_name in [C_DLT_ID, C_DLT_LOAD_ID]: + continue + + col_schema = col_schemas[orig_name] + if col_schema.get("increment"): + # We can warn within the for loop, + # since there's only one incremental field per table + logger.warning( + f"An incremental field {orig_name} is being removed from schema." 
+ "You should unset the" + " incremental with `incremental=dlt.sources.incremental.EMPTY`" + ) + partial_table["columns"][orig_name] = col_schema + + return partial_table if partial_table["columns"] else {} + + tables = table_names if table_names else schema.data_table_names() + + table_drops: TSchemaDrop = {} # includes entire tables to drop + column_drops: TSchemaDrop = {} # includes parts of tables to drop as partial tables + + # 1. Detect what needs to be dropped + for table_name in tables: + _, actual_col_schemas = list(client.get_storage_tables([table_name]))[0] + + # no actual column schemas -> + # table doesn't exist -> + # we take entire table schema as a schema drop + if not actual_col_schemas: + table = schema.get_table(table_name) + table_drops[table_name] = table + continue + + # actual column schemas present -> + # we compare actual schemas with dlt ones -> + # we take the difference as a partial table + else: + partial_table = _diff_between_actual_and_dlt_schema( + table_name, + set(actual_col_schemas.keys()), + ) + if partial_table: + column_drops[table_name] = partial_table + + # 2. For entire table drops, we make sure no orphaned tables remain + for table_name in table_drops.copy(): + child_tables = get_nested_tables(schema.tables, table_name) + orphaned_table_names: List[str] = [] + for child_table in child_tables: + if child_table["name"] not in table_drops: + orphaned_table_names.append(child_table["name"]) + if orphaned_table_names: + table_drops.pop(table_name) + logger.warning( + f"Removing table '{table_name}' from the dlt schema would leave orphan" + f" table(s): {'.'.join(repr(t) for t in orphaned_table_names)}. Drop these" + " child tables in the destination and sync the dlt schema again." + ) + + # 3. If it's not a dry run, we actually drop fromt the dlt schema + if not dry_run: + for table_name in table_drops: + schema.tables.pop(table_name) + for table_name, partial_table in column_drops.items(): + col_schemas = partial_table["columns"] + col_names = [col for col in col_schemas] + schema.drop_columns(table_name, col_names) + + return {**table_drops, **column_drops} diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index b2e3af9731..02c0c76afa 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -1055,7 +1055,7 @@ def sync_dlt_schema( schema = self.schemas[schema_name] if schema_name else self.default_schema with self._get_destination_clients(schema)[0] as client: client.initialize_storage() - return client.update_dlt_schema(table_names=table_names, dry_run=dry_run) + return client.update_from_stored_schema(table_names=table_names, dry_run=dry_run) def set_local_state_val(self, key: str, value: Any) -> None: """Sets value in local state. 
Local state is not synchronized with destination.""" From c0d028a8a0663e365d7a4800214e1ea7cc28f039 Mon Sep 17 00:00:00 2001 From: anuunchin <88698977+anuunchin@users.noreply.github.com> Date: Mon, 1 Sep 2025 16:19:50 +0200 Subject: [PATCH 4/7] Duplicate function removed, dummy implements empty update_from_stored_schema --- dlt/destinations/impl/dummy/dummy.py | 9 ++++- .../impl/filesystem/filesystem.py | 39 ------------------- dlt/destinations/utils.py | 2 +- 3 files changed, 9 insertions(+), 41 deletions(-) diff --git a/dlt/destinations/impl/dummy/dummy.py b/dlt/destinations/impl/dummy/dummy.py index dafe722215..d4980cd791 100644 --- a/dlt/destinations/impl/dummy/dummy.py +++ b/dlt/destinations/impl/dummy/dummy.py @@ -15,7 +15,7 @@ import time from dlt.common.metrics import LoadJobMetrics from dlt.common.pendulum import pendulum -from dlt.common.schema import Schema, TSchemaTables +from dlt.common.schema import Schema, TSchemaTables, TSchemaDrop from dlt.common.storages import FileStorage from dlt.common.storages.load_package import LoadJobInfo from dlt.common.destination import DestinationCapabilitiesContext @@ -160,6 +160,13 @@ def update_stored_schema( ) return applied_update + def update_from_stored_schema( + self, + table_names: Iterable[str] = None, + dry_run: bool = False, + ) -> Optional[TSchemaDrop]: + return None + def create_load_job( self, table: PreparedTableSchema, file_path: str, load_id: str, restore: bool = False ) -> LoadJob: diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index de298579e8..b42f79ed12 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -578,45 +578,6 @@ def verify_schema( raise exceptions[0] return loaded_tables - def _diff_between_actual_and_dlt_schema( - self, table_name: str, actual_col_names: set[str], disregard_dlt_columns: bool = True - ) -> TPartialTableSchema: - """Returns a partial table schema containing columns that exist in the dlt schema - but are missing from the actual table. Skips dlt internal columns by default. - """ - col_schemas = self.schema.get_table_columns(table_name) - - # Map escaped -> original names (actual_col_names are escaped) - escaped_to_original = { - self.sql_client.escape_column_name(col, quote=False): col for col in col_schemas.keys() - } - dropped_col_names = set(escaped_to_original.keys()) - actual_col_names - - if not dropped_col_names: - return {} - - partial_table: TPartialTableSchema = {"name": table_name, "columns": {}} - - for esc_name in dropped_col_names: - orig_name = escaped_to_original[esc_name] - - # Athena doesn't have dlt columns in actual columns. Don't drop them anyway. - if disregard_dlt_columns and orig_name in [C_DLT_ID, C_DLT_LOAD_ID]: - continue - - col_schema = col_schemas[orig_name] - if col_schema.get("increment"): - # We can warn within the for loop, - # since there's only one incremental field per table - logger.warning( - f"An incremental field {orig_name} is being removed from schema." 
- "You should unset the" - " incremental with `incremental=dlt.sources.incremental.EMPTY`" - ) - partial_table["columns"][orig_name] = col_schema - - return partial_table if partial_table["columns"] else {} - def update_stored_schema( self, only_tables: Iterable[str] = None, diff --git a/dlt/destinations/utils.py b/dlt/destinations/utils.py index d247dedc16..3db92f5bba 100644 --- a/dlt/destinations/utils.py +++ b/dlt/destinations/utils.py @@ -404,7 +404,7 @@ def _diff_between_actual_and_dlt_schema( " child tables in the destination and sync the dlt schema again." ) - # 3. If it's not a dry run, we actually drop fromt the dlt schema + # 3. If it's not a dry run, we actually drop from the dlt schema if not dry_run: for table_name in table_drops: schema.tables.pop(table_name) From ceb5f0423be5e6beb24dc02349ee636568ae75b4 Mon Sep 17 00:00:00 2001 From: anuunchin <88698977+anuunchin@users.noreply.github.com> Date: Mon, 1 Sep 2025 17:35:23 +0200 Subject: [PATCH 5/7] sync_schema deprecated, storage initialization check --- dlt/common/schema/schema.py | 2 +- dlt/common/warnings.py | 9 +++++++++ dlt/destinations/utils.py | 3 +-- dlt/pipeline/pipeline.py | 16 +++++++++++++--- tests/load/pipeline/test_pipelines.py | 2 +- tests/load/pipeline/test_restore_state.py | 8 ++++---- tests/load/pipeline/test_sync_dlt_schema.py | 4 ++-- tests/pipeline/test_dlt_versions.py | 2 +- 8 files changed, 32 insertions(+), 14 deletions(-) diff --git a/dlt/common/schema/schema.py b/dlt/common/schema/schema.py index 3580b95502..6345c09266 100644 --- a/dlt/common/schema/schema.py +++ b/dlt/common/schema/schema.py @@ -367,7 +367,7 @@ def drop_tables( ) -> List[TTableSchema]: """Drops tables from the schema and returns the dropped tables""" result = [] - # TODO: make sure all nested tables to table_names are also dropped + # TODO: make sure all nested tables to table_names are also dropped, for table_name in table_names: table = self.get_table(table_name) if table and (not seen_data_only or utils.has_table_seen_data(table)): diff --git a/dlt/common/warnings.py b/dlt/common/warnings.py index 243a8e2dfa..8ac733385e 100644 --- a/dlt/common/warnings.py +++ b/dlt/common/warnings.py @@ -65,6 +65,15 @@ def __init__(self, message: str, *args: typing.Any, expected_due: VersionString ) +class Dlt1160DeprecationWarning(DltDeprecationWarning): + V1160 = semver.Version.parse("1.16.0") + + def __init__(self, message: str, *args: typing.Any, expected_due: VersionString = None) -> None: + super().__init__( + message, *args, since=Dlt1160DeprecationWarning.V1160, expected_due=expected_due + ) + + # show dlt deprecations once warnings.simplefilter("once", DltDeprecationWarning) diff --git a/dlt/destinations/utils.py b/dlt/destinations/utils.py index 3db92f5bba..6bcd8a5447 100644 --- a/dlt/destinations/utils.py +++ b/dlt/destinations/utils.py @@ -406,8 +406,7 @@ def _diff_between_actual_and_dlt_schema( # 3. 
If it's not a dry run, we actually drop from the dlt schema if not dry_run: - for table_name in table_drops: - schema.tables.pop(table_name) + schema.drop_tables(list(table_drops.keys())) for table_name, partial_table in column_drops.items(): col_schemas = partial_table["columns"] col_names = [col for col in col_schemas] diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index 02c0c76afa..19b585942e 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -99,7 +99,7 @@ ) from dlt.common.schema import Schema from dlt.common.utils import is_interactive, simple_repr, without_none -from dlt.common.warnings import deprecated, Dlt04DeprecationWarning +from dlt.common.warnings import deprecated, Dlt04DeprecationWarning, Dlt1160DeprecationWarning from dlt.common.versioned_state import json_encode_state, json_decode_state from dlt.extract import DltSource @@ -1022,8 +1022,17 @@ def drop_pending_packages(self, with_partial_loads: bool = True) -> None: for load_id in normalize_storage.extracted_packages.list_packages(): normalize_storage.extracted_packages.delete_package(load_id) + @deprecated( + "Please use sync_schema_to_destination instead. The sync_schema is deprecated due to" + " ambiguous naming.", + category=Dlt1160DeprecationWarning, + ) @with_schemas_sync def sync_schema(self, schema_name: str = None) -> TSchemaTables: + return self.sync_schema_to_destination(schema_name) + + @with_schemas_sync + def sync_schema_to_destination(self, schema_name: str = None) -> TSchemaTables: """Synchronizes the destination with the schema `schema_name`. If no name is provided, the default schema will be synchronized.""" if not schema_name and not self.default_schema_name: raise PipelineConfigMissing( @@ -1040,7 +1049,7 @@ def sync_schema(self, schema_name: str = None) -> TSchemaTables: return client.update_stored_schema() @with_schemas_sync - def sync_dlt_schema( + def sync_schema_from_destination( self, schema_name: str = None, table_names: Iterable[str] = None, dry_run: bool = False ) -> Optional[TSchemaDrop]: """Synchronizes the schema `schema_name` with the destination. 
If no name is provided, the default schema will be synchronized.""" @@ -1054,7 +1063,8 @@ def sync_dlt_schema( ) schema = self.schemas[schema_name] if schema_name else self.default_schema with self._get_destination_clients(schema)[0] as client: - client.initialize_storage() + if not client.is_storage_initialized(): + raise DestinationUndefinedEntity() return client.update_from_stored_schema(table_names=table_names, dry_run=dry_run) def set_local_state_val(self, key: str, value: Any) -> None: diff --git a/tests/load/pipeline/test_pipelines.py b/tests/load/pipeline/test_pipelines.py index f565698495..bdc8ed1402 100644 --- a/tests/load/pipeline/test_pipelines.py +++ b/tests/load/pipeline/test_pipelines.py @@ -279,7 +279,7 @@ def _data(): assert "data_table" in schema.tables assert schema.tables["data_table"]["columns"] == {} - p.sync_schema() + p.sync_schema_to_destination() with p._get_destination_clients(schema)[0] as job_client: # there's some data at all diff --git a/tests/load/pipeline/test_restore_state.py b/tests/load/pipeline/test_restore_state.py index b16ffbc381..0fe096436a 100644 --- a/tests/load/pipeline/test_restore_state.py +++ b/tests/load/pipeline/test_restore_state.py @@ -85,7 +85,7 @@ def test_restore_state_utils(destination_config: DestinationTestConfiguration) - with pytest.raises(DestinationUndefinedEntity): load_pipeline_state_from_destination(p.pipeline_name, job_client) # sync the schema - p.sync_schema() + p.sync_schema_to_destination() # check if schema exists with p.destination_client(p.default_schema.name) as job_client: # type: ignore[assignment] stored_schema = job_client.get_stored_schema(job_client.schema.name) @@ -110,7 +110,7 @@ def test_restore_state_utils(destination_config: DestinationTestConfiguration) - # so dlt in normalize stage infers _state_version table again but with different column order and the column order in schema is different # then in database. parquet is created in schema order and in Redshift it must exactly match the order. 
# schema.bump_version() - p.sync_schema() + p.sync_schema_to_destination() with p.destination_client(p.default_schema.name) as job_client: # type: ignore[assignment] stored_schema = job_client.get_stored_schema(job_client.schema.name) assert stored_schema is not None @@ -257,7 +257,7 @@ def _make_dn_name(schema_name: str) -> str: p._inject_schema(default_schema) # just sync schema without name - will use default schema - p.sync_schema() + p.sync_schema_to_destination() with p.destination_client() as job_client: assert get_normalized_dataset_name( job_client @@ -275,7 +275,7 @@ def _make_dn_name(schema_name: str) -> str: schema_three = Schema("three") p._inject_schema(schema_three) # sync schema with a name - p.sync_schema(schema_three.name) + p.sync_schema_to_destination(schema_three.name) with p._get_destination_clients(schema_three)[0] as job_client: assert get_normalized_dataset_name( job_client diff --git a/tests/load/pipeline/test_sync_dlt_schema.py b/tests/load/pipeline/test_sync_dlt_schema.py index 1c65b88c9e..a117659d90 100644 --- a/tests/load/pipeline/test_sync_dlt_schema.py +++ b/tests/load/pipeline/test_sync_dlt_schema.py @@ -189,7 +189,7 @@ def test_sync_dlt_schema( # Make sure the warning about orphaned tables is emitted logger_spy = mocker.spy(logger, "warning") - schema_drops = pipeline.sync_dlt_schema() + schema_drops = pipeline.sync_schema_from_destination() logger_spy.assert_called() assert logger_spy.call_count == 1 @@ -219,7 +219,7 @@ def test_sync_dlt_schema( else: _drop_table_in_sql(pipeline, destination_config, "my_last_table__children") - schema_drops = pipeline.sync_dlt_schema() + schema_drops = pipeline.sync_schema_from_destination() # Schema drop should include the "my_last_table" with the child table assert len(schema_drops) == 2 assert "my_last_table" in schema_drops diff --git a/tests/pipeline/test_dlt_versions.py b/tests/pipeline/test_dlt_versions.py index f8ba739c0c..4d5ecafbe9 100644 --- a/tests/pipeline/test_dlt_versions.py +++ b/tests/pipeline/test_dlt_versions.py @@ -461,7 +461,7 @@ def test_load_package_with_dlt_update(test_storage: FileStorage) -> None: with pipeline.managed_state(extract_state=True): pass # this will sync schema to destination - pipeline.sync_schema() + pipeline.sync_schema_to_destination() # we have hash now rows = client.execute_sql(f"SELECT * FROM {PIPELINE_STATE_TABLE_NAME}") assert len(rows[0]) == 6 + 2 From 327c25e6fb5d33ca08b3b0d943fba262e8f457f7 Mon Sep 17 00:00:00 2001 From: anuunchin <88698977+anuunchin@users.noreply.github.com> Date: Tue, 2 Sep 2025 09:43:45 +0200 Subject: [PATCH 6/7] Unnecessary abstract class impls removed, no table reflection exception --- dlt/common/destination/client.py | 8 -------- dlt/common/destination/exceptions.py | 8 ++++++++ dlt/destinations/impl/destination/destination.py | 2 +- dlt/destinations/impl/dummy/dummy.py | 7 ------- dlt/destinations/impl/filesystem/filesystem.py | 7 ------- dlt/destinations/job_client_impl.py | 7 ------- dlt/pipeline/pipeline.py | 12 +++++++++--- 7 files changed, 18 insertions(+), 33 deletions(-) diff --git a/dlt/common/destination/client.py b/dlt/common/destination/client.py index 905ad51c70..210911a1c6 100644 --- a/dlt/common/destination/client.py +++ b/dlt/common/destination/client.py @@ -543,14 +543,6 @@ def update_stored_schema( ) return expected_update - @abstractmethod - def update_from_stored_schema( - self, - table_names: Iterable[str] = None, - dry_run: bool = False, - ) -> Optional[TSchemaDrop]: - pass - def prepare_load_table(self, table_name: str) -> 
PreparedTableSchema: """Prepares a table schema to be loaded by filling missing hints and doing other modifications requires by given destination. diff --git a/dlt/common/destination/exceptions.py b/dlt/common/destination/exceptions.py index bba9416513..edacb18317 100644 --- a/dlt/common/destination/exceptions.py +++ b/dlt/common/destination/exceptions.py @@ -243,3 +243,11 @@ def __init__(self, pipeline_name: str, destination_name: str) -> None: f"Filesystem Client not available for destination `{destination_name}` in pipeline" f" `{pipeline_name}`", ) + + +class DestinationTableReflectionNotSupported(DestinationTerminalException): + def __init__(self, destination_name: str) -> None: + super().__init__( + f"Destination `{destination_name}` does not support table reflection. " + "Schema synchronization from destination is not available for this destination type." + ) diff --git a/dlt/destinations/impl/destination/destination.py b/dlt/destinations/impl/destination/destination.py index 25e4b93969..4815235f08 100644 --- a/dlt/destinations/impl/destination/destination.py +++ b/dlt/destinations/impl/destination/destination.py @@ -8,7 +8,7 @@ from dlt.common.storages.load_storage import ParsedLoadJobFileName from dlt.common.configuration import create_resolved_partial -from dlt.common.schema import Schema, TSchemaTables +from dlt.common.schema import Schema, TSchemaTables, TSchemaDrop from dlt.common.destination import DestinationCapabilitiesContext from dlt.destinations.impl.destination.configuration import CustomDestinationClientConfiguration diff --git a/dlt/destinations/impl/dummy/dummy.py b/dlt/destinations/impl/dummy/dummy.py index d4980cd791..0ea3c0387a 100644 --- a/dlt/destinations/impl/dummy/dummy.py +++ b/dlt/destinations/impl/dummy/dummy.py @@ -160,13 +160,6 @@ def update_stored_schema( ) return applied_update - def update_from_stored_schema( - self, - table_names: Iterable[str] = None, - dry_run: bool = False, - ) -> Optional[TSchemaDrop]: - return None - def create_load_job( self, table: PreparedTableSchema, file_path: str, load_id: str, restore: bool = False ) -> LoadJob: diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index b42f79ed12..f718135cd3 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -606,13 +606,6 @@ def update_stored_schema( # externally changed return applied_update - def update_from_stored_schema( - self, - table_names: Iterable[str] = None, - dry_run: bool = False, - ) -> Optional[TSchemaDrop]: - return update_dlt_schema(self, self.schema, table_names, dry_run) - def prepare_load_table(self, table_name: str) -> PreparedTableSchema: table = super().prepare_load_table(table_name) if self.config.as_staging_destination: diff --git a/dlt/destinations/job_client_impl.py b/dlt/destinations/job_client_impl.py index c23b7844ff..d84c8e8c17 100644 --- a/dlt/destinations/job_client_impl.py +++ b/dlt/destinations/job_client_impl.py @@ -328,13 +328,6 @@ def update_stored_schema( ) return applied_update - def update_from_stored_schema( - self, - table_names: Iterable[str] = None, - dry_run: bool = False, - ) -> Optional[TSchemaDrop]: - return update_dlt_schema(self, self.schema, table_names, dry_run) - def drop_tables(self, *tables: str, delete_schema: bool = True) -> None: """Drop tables in destination database and optionally delete the stored schema as well. 
Clients that support ddl transactions will have both operations performed in a single transaction. diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index 19b585942e..5950278485 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -37,6 +37,7 @@ DestinationIncompatibleLoaderFileFormatException, DestinationNoStagingMode, DestinationUndefinedEntity, + DestinationTableReflectionNotSupported, ) from dlt.common.runtime import signals, apply_runtime_config from dlt.common.schema.typing import ( @@ -79,6 +80,7 @@ WithStateSync, JobClientBase, DestinationClientStagingConfiguration, + WithTableReflection, ) from dlt.common.destination.exceptions import SqlClientNotAvailable, FSClientNotAvailable from dlt.common.normalizers.naming import NamingConvention @@ -109,8 +111,8 @@ from dlt.normalize.configuration import NormalizeConfiguration from dlt.destinations.sql_client import SqlClientBase, WithSqlClient from dlt.destinations.fs_client import FSClientBase -from dlt.destinations.job_client_impl import SqlJobClientBase from dlt.destinations.dataset import get_destination_clients +from dlt.destinations.utils import update_dlt_schema from dlt.load.configuration import LoaderConfiguration from dlt.load import Load @@ -121,7 +123,6 @@ CannotRestorePipelineException, InvalidPipelineName, PipelineConfigMissing, - PipelineNeverRan, PipelineNotActive, PipelineStepFailed, ) @@ -1065,7 +1066,12 @@ def sync_schema_from_destination( with self._get_destination_clients(schema)[0] as client: if not client.is_storage_initialized(): raise DestinationUndefinedEntity() - return client.update_from_stored_schema(table_names=table_names, dry_run=dry_run) + if isinstance(client, WithTableReflection): + return update_dlt_schema( + client=client, schema=schema, table_names=table_names, dry_run=dry_run + ) + else: + raise DestinationTableReflectionNotSupported(self._destination.destination_name) def set_local_state_val(self, key: str, value: Any) -> None: """Sets value in local state. 
Local state is not synchronized with destination.""" From d2ad80e7356aac1e56ec69d37a3635118ec0bd2f Mon Sep 17 00:00:00 2001 From: anuunchin <88698977+anuunchin@users.noreply.github.com> Date: Tue, 2 Sep 2025 15:13:51 +0200 Subject: [PATCH 7/7] Better docstrings, var names --- dlt/common/libs/deltalake.py | 11 +- dlt/common/libs/pyiceberg.py | 10 +- .../impl/filesystem/filesystem.py | 31 ++-- dlt/destinations/utils.py | 145 ++++++++++++------ dlt/pipeline/pipeline.py | 9 +- tests/load/pipeline/test_sync_dlt_schema.py | 128 +++++++++++++++- 6 files changed, 248 insertions(+), 86 deletions(-) diff --git a/dlt/common/libs/deltalake.py b/dlt/common/libs/deltalake.py index ac5958a4ed..68c413ec38 100644 --- a/dlt/common/libs/deltalake.py +++ b/dlt/common/libs/deltalake.py @@ -6,9 +6,9 @@ from dlt import version, Pipeline from dlt.common import logger from dlt.common.libs.pyarrow import pyarrow as pa -from dlt.common.libs.pyarrow import cast_arrow_schema_types +from dlt.common.libs.pyarrow import cast_arrow_schema_types, py_arrow_to_table_schema_columns from dlt.common.libs.utils import load_open_tables -from dlt.common.schema.typing import TWriteDisposition, TTableSchema +from dlt.common.schema.typing import TWriteDisposition, TTableSchema, TTableSchemaColumns from dlt.common.schema.utils import get_first_column_name_with_prop, get_columns_names_with_prop from dlt.common.exceptions import MissingDependencyException, ValueErrorWithKnownValues from dlt.common.storages import FilesystemConfiguration @@ -219,7 +219,6 @@ def evolve_delta_table_schema(delta_table: DeltaTable, arrow_schema: pa.Schema) return delta_table -def get_table_columns(table: DeltaTable) -> List[str]: - fields = table.schema().fields - column_names = [field.name for field in fields] - return column_names +def get_table_columns(table: DeltaTable) -> TTableSchemaColumns: + arrow_schema = table.schema().to_pyarrow() + return py_arrow_to_table_schema_columns(arrow_schema) diff --git a/dlt/common/libs/pyiceberg.py b/dlt/common/libs/pyiceberg.py index 2b5d2929d5..5d5c8a5e8a 100644 --- a/dlt/common/libs/pyiceberg.py +++ b/dlt/common/libs/pyiceberg.py @@ -7,10 +7,10 @@ from dlt.common import logger from dlt.common.destination.exceptions import DestinationUndefinedEntity from dlt.common.time import precise_time -from dlt.common.libs.pyarrow import cast_arrow_schema_types +from dlt.common.libs.pyarrow import cast_arrow_schema_types, py_arrow_to_table_schema_columns from dlt.common.libs.utils import load_open_tables from dlt.common.pipeline import SupportsPipeline -from dlt.common.schema.typing import TWriteDisposition, TTableSchema +from dlt.common.schema.typing import TWriteDisposition, TTableSchema, TTableSchemaColumns from dlt.common.schema.utils import get_first_column_name_with_prop, get_columns_names_with_prop from dlt.common.utils import assert_min_pkg_version from dlt.common.exceptions import MissingDependencyException @@ -250,6 +250,6 @@ def make_location(path: str, config: FilesystemConfiguration) -> str: return location -def get_table_columns(table: IcebergTable) -> List[str]: - column_names = table.schema().column_names - return column_names +def get_table_columns(table: IcebergTable) -> TTableSchemaColumns: + arrow_schema = table.schema().as_arrow() + return py_arrow_to_table_schema_columns(arrow_schema) diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index f718135cd3..12682f3754 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ 
b/dlt/destinations/impl/filesystem/filesystem.py @@ -82,7 +82,6 @@ from dlt.destinations.utils import ( verify_schema_merge_disposition, verify_schema_replace_disposition, - update_dlt_schema, ) CURRENT_VERSION: int = 2 @@ -474,8 +473,12 @@ def drop_tables(self, *tables: str, delete_schema: bool = True) -> None: def get_storage_tables( self, table_names: Iterable[str] ) -> Iterable[Tuple[str, TTableSchemaColumns]]: - """Yields tables that have files in storage, returns columns from files in storage for regular delta/iceberg tables, - or from schema for regular tables without table format""" + """Yield (table_name, column_schemas) pairs for tables that have files in storage. + + For Delta and Iceberg tables, the columns present in the actual table metadata + are returned. For tables using regular file formats, the column schemas come from the + dlt schema instead, since their real schema cannot be reflected directly. + """ for table_name in table_names: table_dir = self.get_table_dir(table_name) if ( @@ -492,13 +495,7 @@ def get_storage_tables( ) iceberg_table = self.load_open_table("iceberg", table_name) - actual_column_names = get_iceberg_table_columns(iceberg_table) - - col_schemas = { - col: schema - for col, schema in self.schema.get_table_columns(table_name).items() - if col in actual_column_names - } + col_schemas = get_iceberg_table_columns(iceberg_table) yield (table_name, col_schemas) elif self.is_open_table("delta", table_name): @@ -507,16 +504,16 @@ def get_storage_tables( ) delta_table = self.load_open_table("delta", table_name) - actual_column_names = get_delta_table_columns(delta_table) - - col_schemas = { - col: schema - for col, schema in self.schema.get_table_columns(table_name).items() - if col in actual_column_names - } + col_schemas = get_delta_table_columns(delta_table) yield (table_name, col_schemas) else: + logger.warning( + f"Table '{table_name}' does not use a table format and does not support" + " true schema reflection. Returning column schemas from the dlt" + " schema, which may be stale if the underlying files were manually" + " modified. " + ) yield (table_name, self.schema.get_table_columns(table_name)) else: diff --git a/dlt/destinations/utils.py b/dlt/destinations/utils.py index 6bcd8a5447..9c2fa35c1c 100644 --- a/dlt/destinations/utils.py +++ b/dlt/destinations/utils.py @@ -28,6 +28,7 @@ ) from dlt.destinations.exceptions import DatabaseTransientException +from dlt.destinations.sql_client import WithSqlClient from dlt.extract import DltResource, resource as make_resource, DltSource RE_DATA_TYPE = re.compile(r"([A-Z]+)\((\d+)(?:,\s?(\d+))?\)") @@ -299,76 +300,118 @@ def get_deterministic_temp_table_name(table_name: str, op: str) -> str: return f"{op_name}_{NamingConvention._compute_tag(op_name, 0.001)}" +class WithTableReflectionAndSql(WithTableReflection, WithSqlClient): + pass + + +def _diff_between_actual_and_dlt_schema( + client: WithTableReflectionAndSql, + schema: Schema, + table_name: str, + actual_col_names: set[str], + disregard_dlt_columns: bool = True, +) -> TPartialTableSchema: + """Compares dlt schema with destination table schema and returns columns that appear to be missing. + + This function identifies columns that exist in the dlt schema but are missing from the actual + destination table. It's used during schema synchronization to detect when columns may have + been dropped from the destination and need to be removed from the dlt schema as well. + + However, dlt internal columns (_dlt_id, _dlt_load_id) are treated specially because: + + 1. 
Users rarely drop dlt internal columns manually, and if they did, + dlt cannot recover from this situation anyway. + + 2. Athena has a constraint where dlt columns exist in the data but not in the table metadata: + + - Athena external tables have fixed schemas defined at CREATE TABLE time + - These columns exist in the actual data files but don't appear in INFORMATION_SCHEMA + - This causes false positives where dlt columns appear "missing" when they're not + + Args: + client (WithTableReflectionAndSql): The destination client with table reflection capabilities. + schema (Schema): The dlt schema to compare against the destination. + table_name (str): Name of the table to analyze. + actual_col_names (set[str]): Column names that actually exist in the destination table, + typically obtained from INFORMATION_SCHEMA queries. For Athena, + this may not include dlt columns present in the underlying data files. + disregard_dlt_columns: Whether to ignore apparent mismatches for dlt internal + columns (_dlt_id, _dlt_load_id). Defaults to True to prevent incorrect + removal of essential dlt columns from the schema. + + Returns: + TPartialTableSchema: Returns a partial table schema containing columns that exist in the dlt schema + but are missing from the actual table. + + Example: + If dlt schema has [user_id, name, _dlt_id, _dlt_load_id] but destination + INFORMATION_SCHEMA only shows [user_id, name], this function would return + an empty dict (assuming disregard_dlt_columns=True) rather than suggesting + the dlt columns should be dropped. + """ + col_schemas = schema.get_table_columns(table_name) + + # Map escaped (like actual_col_names) -> original names (what appears in the dlt schema) + escaped_to_dlt = { + client.sql_client.escape_column_name(col, quote=False): col for col in col_schemas.keys() + } + + possibly_dropped_col_names = set(escaped_to_dlt.keys()) - actual_col_names + + if not possibly_dropped_col_names: + return {} + + partial_table: TPartialTableSchema = {"name": table_name, "columns": {}} + + for esc_name in possibly_dropped_col_names: + name_in_dlt = escaped_to_dlt[esc_name] + + if disregard_dlt_columns and name_in_dlt in [C_DLT_ID, C_DLT_LOAD_ID]: + continue + + col_schema = col_schemas[name_in_dlt] + if col_schema.get("increment"): + # We can warn within the for loop, + # since there's only one incremental field per table + logger.warning( + f"An incremental field {name_in_dlt} is being removed from schema." + "You should unset the" + " incremental with `incremental=dlt.sources.incremental.EMPTY`" + ) + partial_table["columns"][name_in_dlt] = col_schema + + return partial_table if partial_table["columns"] else {} + + def update_dlt_schema( - client: WithTableReflection, + client: WithTableReflectionAndSql, schema: Schema, table_names: Iterable[str] = None, dry_run: bool = False, ) -> Optional[TSchemaDrop]: - """Updates schema to the storage. + """Updates the dlt schema from destination. Compare the schema we think we should have with what actually exists in the destination, and drop any tables and/or columns that disappeared. Args: + client (WithTableReflectionAndSql): The destination client with table reflection capabilities. + schema (Schema): The dlt schema to compare against the destination. table_names (Iterable[str], optional): Check only listed tables. Defaults to None and checks all tables. + dry_run (bool, optional): Whether to actually update the dlt schema. Defaults to False. Returns: - Optional[TSchemaTables]: Returns an update that was applied to the schema. 
+        Optional[TSchemaDrop]: Returns the update that was applied to the schema.
     """
-    from dlt.destinations.sql_client import WithSqlClient
-
-    if not isinstance(client, WithSqlClient):
-        raise NotImplementedError
-
-    def _diff_between_actual_and_dlt_schema(
-        table_name: str, actual_col_names: set[str], disregard_dlt_columns: bool = True
-    ) -> TPartialTableSchema:
-        """Returns a partial table schema containing columns that exist in the dlt schema
-        but are missing from the actual table. Skips dlt internal columns by default.
-        """
-        col_schemas = schema.get_table_columns(table_name)
-
-        # Map escaped -> original names (actual_col_names are escaped)
-        escaped_to_original = {
-            client.sql_client.escape_column_name(col, quote=False): col
-            for col in col_schemas.keys()
-        }
-        dropped_col_names = set(escaped_to_original.keys()) - actual_col_names
-
-        if not dropped_col_names:
-            return {}
-
-        partial_table: TPartialTableSchema = {"name": table_name, "columns": {}}
-
-        for esc_name in dropped_col_names:
-            orig_name = escaped_to_original[esc_name]
-
-            # Athena doesn't have dlt columns in actual columns. Don't drop them anyway.
-            if disregard_dlt_columns and orig_name in [C_DLT_ID, C_DLT_LOAD_ID]:
-                continue
-
-            col_schema = col_schemas[orig_name]
-            if col_schema.get("increment"):
-                # We can warn within the for loop,
-                # since there's only one incremental field per table
-                logger.warning(
-                    f"An incremental field {orig_name} is being removed from schema."
-                    "You should unset the"
-                    " incremental with `incremental=dlt.sources.incremental.EMPTY`"
-                )
-            partial_table["columns"][orig_name] = col_schema
-
-        return partial_table if partial_table["columns"] else {}
-
     tables = table_names if table_names else schema.data_table_names()
     table_drops: TSchemaDrop = {}  # includes entire tables to drop
     column_drops: TSchemaDrop = {}  # includes parts of tables to drop as partial tables
 
     # 1. Detect what needs to be dropped
+    actual_table_col_schemas = dict(client.get_storage_tables(tables))
     for table_name in tables:
-        _, actual_col_schemas = list(client.get_storage_tables([table_name]))[0]
+        actual_col_schemas = actual_table_col_schemas[table_name]
 
         # no actual column schemas ->
         # table doesn't exist ->
@@ -379,10 +422,12 @@ def _diff_between_actual_and_dlt_schema(
             continue
 
         # actual column schemas present ->
-        # we compare actual schemas with dlt ones ->
+        # we compare actual column schemas with dlt ones ->
        # we take the difference as a partial table
         else:
             partial_table = _diff_between_actual_and_dlt_schema(
+                client,
+                schema,
                 table_name,
                 set(actual_col_schemas.keys()),
             )
diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py
index 5950278485..63f15396fb 100644
--- a/dlt/pipeline/pipeline.py
+++ b/dlt/pipeline/pipeline.py
@@ -112,7 +112,7 @@
 from dlt.destinations.sql_client import SqlClientBase, WithSqlClient
 from dlt.destinations.fs_client import FSClientBase
 from dlt.destinations.dataset import get_destination_clients
-from dlt.destinations.utils import update_dlt_schema
+from dlt.destinations.utils import update_dlt_schema, WithTableReflectionAndSql
 from dlt.load.configuration import LoaderConfiguration
 from dlt.load import Load
 
@@ -1066,9 +1066,12 @@ def sync_schema_from_destination(
         with self._get_destination_clients(schema)[0] as client:
             if not client.is_storage_initialized():
                 raise DestinationUndefinedEntity()
-            if isinstance(client, WithTableReflection):
+            if isinstance(client, WithTableReflection) and isinstance(client, WithSqlClient):
                 return update_dlt_schema(
-                    client=client, schema=schema, table_names=table_names, dry_run=dry_run
+                    client=cast(WithTableReflectionAndSql, client),
+                    schema=schema,
+                    table_names=table_names,
+                    dry_run=dry_run,
                 )
             else:
                 raise DestinationTableReflectionNotSupported(self._destination.destination_name)
diff --git a/tests/load/pipeline/test_sync_dlt_schema.py b/tests/load/pipeline/test_sync_dlt_schema.py
index a117659d90..5f27e18f71 100644
--- a/tests/load/pipeline/test_sync_dlt_schema.py
+++ b/tests/load/pipeline/test_sync_dlt_schema.py
@@ -1,17 +1,24 @@
 import json
+import os
 from typing import cast
+import tempfile
 
 import pytest
 from pytest_mock import MockerFixture
 
 import dlt
 from dlt.common.utils import uniq_id
+from dlt.common.libs.pyarrow import remove_columns
+from dlt.common.libs.pyarrow import pyarrow as pa
 from dlt.destinations.impl.filesystem.filesystem import FilesystemClient
+from dlt.destinations.job_client_impl import SqlJobClientBase
 from dlt.pipeline.pipeline import Pipeline
 from dlt.common import logger
 
 from tests.pipeline.utils import assert_load_info
+from tests.utils import TEST_STORAGE_ROOT
 from tests.load.utils import (
+    FILE_BUCKET,
     destinations_configs,
     DestinationTestConfiguration,
 )
@@ -132,6 +139,32 @@ def _drop_table_in_sql(
     client.execute_sql(query)
 
 
+def _drop_col_in_filesystem_json(file_path: str, col_name: str) -> None:
+    """Removes a specific column from a jsonl file"""
+    dir_ = os.path.dirname(file_path)
+    with open(file_path, "r", encoding="utf-8") as r:
+        with tempfile.NamedTemporaryFile("w", encoding="utf-8", dir=dir_, delete=False) as w:
+            tmp_path = w.name
+            for line in r:
+                obj = json.loads(line)
+                obj.pop(col_name, None)
+                json.dump(obj, w)
+                w.write("\n")
+    os.replace(tmp_path, file_path)
+
+
+def _drop_col_in_filesystem_parquet(file_path: str, col_name: str) -> None:
+    """Removes a specific column from a parquet file"""
+    dir_ = os.path.dirname(file_path)
+    table = pa.parquet.read_table(file_path)
+    modified_table = remove_columns(table, [col_name])
+    with tempfile.NamedTemporaryFile(suffix=".parquet", dir=dir_, delete=False) as tmp_file:
+        tmp_path = tmp_file.name
+
+    pa.parquet.write_table(modified_table, tmp_path)
+    os.replace(tmp_path, file_path)
+
+
 @dlt.resource(table_name="my_table")
 def my_resource(with_col: bool = True):
     row = {"id": 1, "name": "Liuwen"}
@@ -186,6 +219,23 @@ def test_sync_dlt_schema(
     _drop_column_in_sql(pipeline, destination_config, "my_table", "age")
     _drop_table_in_sql(pipeline, destination_config, "my_last_table")
 
+    # Prove a mismatch between what dlt knows (has in the schema) and what's in the destination
+    table_names = ["my_table", "my_other_table", "my_last_table", "my_last_table__children"]
+    dlt_knows = {
+        table_name: set(pipeline.default_schema.get_table_columns(table_name))
+        for table_name in table_names
+    }
+    assert "age" in dlt_knows["my_table"]
+    assert "my_last_table" in dlt_knows
+    assert dlt_knows["my_last_table"] != set()
+
+    client: SqlJobClientBase
+    with pipeline.destination_client() as client:  # type: ignore[assignment]
+        actual_tables = dict(client.get_storage_tables(table_names))
+        dest_has = {table_name: set(actual_tables[table_name]) for table_name in table_names}
+        assert "age" not in dest_has["my_table"]
+        assert dest_has["my_last_table"] == set()
+
     # Make sure the warning about orphaned tables is emitted
     logger_spy = mocker.spy(logger, "warning")
 
@@ -206,24 +256,92 @@
     assert len(schema_drops["my_table"]["columns"]) == 1
     assert "age" in schema_drops["my_table"]["columns"]
 
-    # ensure schema doesn't have the "age" column in "my_table" anymore
+    # Ensure schema doesn't have the "age" column in "my_table" anymore
     assert "age" not in pipeline.default_schema.tables["my_table"]["columns"]
 
-    # ensure "my_other_table" was NOT dropped from schema
+    # Ensure "my_last_table" was NOT dropped from schema
     assert "my_last_table" in pipeline.default_schema.tables
-    # sanity check that the child table is still there
+    # Sanity check that the child table is still there
     assert "my_last_table__children" in pipeline.default_schema.tables
 
-    # now the user drops the child table as instructed in the warning
+    # Now, the user drops the child table as instructed in the warning
     if destination_config.destination_type == "filesystem":
         _drop_table_in_filesystem(pipeline, destination_config, "my_last_table__children")
     else:
         _drop_table_in_sql(pipeline, destination_config, "my_last_table__children")
 
+    # Prove a mismatch between what dlt knows about my_last_table__children
+    # and what's in the destination
+    dlt_knows = {
+        "my_last_table__children": set(
+            pipeline.default_schema.get_table_columns("my_last_table__children")
+        )
+    }
+    assert dlt_knows["my_last_table__children"] != set()
+
+    with pipeline.destination_client() as client:  # type: ignore[assignment]
+        _, col_schemas = list(client.get_storage_tables(["my_last_table__children"]))[0]
+        dest_has = {"my_last_table__children": set(col_schemas)}
+        assert dest_has["my_last_table__children"] == set()
+
+    # Schema drop should now include the "my_last_table" with the child table
     schema_drops = pipeline.sync_schema_from_destination()
-    # Schema drop should include the "my_last_table" with the child table
     assert len(schema_drops) == 2
     assert "my_last_table" in schema_drops
     assert "my_last_table__children" in schema_drops
+    # Ensure schema doesn't have the "my_last_table" and "my_last_table__children" anymore
     assert "my_last_table" not in pipeline.default_schema.tables
     assert "my_last_table__children" not in pipeline.default_schema.tables
+
+
+@pytest.mark.parametrize(
+    "destination_config",
+    destinations_configs(
+        local_filesystem_configs=True,
+    ),
+    ids=lambda x: x.name,
+)
+def test_regular_filesystem_tables(
+    destination_config: DestinationTestConfiguration, mocker: MockerFixture
+) -> None:
+    pipeline = destination_config.setup_pipeline(pipeline_name=f"pipe_{uniq_id()}")
+
+    assert_load_info(
+        pipeline.run([my_resource(), my_other_resource()], **destination_config.run_kwargs)
+    )
+
+    # Simulate a scenario where the user manually drops an entire table
+    _drop_table_in_filesystem(pipeline, destination_config, "my_table")
+
+    # Prove a mismatch between what dlt knows about "my_table" and what's in the destination
+    dlt_knows = {"my_table": set(pipeline.default_schema.get_table_columns("my_table"))}
+    assert dlt_knows["my_table"] != set()
+
+    with pipeline.destination_client() as client:
+        client = cast(FilesystemClient, client)
+        _, col_schemas = list(client.get_storage_tables(["my_table"]))[0]
+        dest_has = {"my_table": set(col_schemas)}
+        assert dest_has["my_table"] == set()
+
+    schema_drops = pipeline.sync_schema_from_destination()
+
+    # Schema drop should include "my_table"
+    assert len(schema_drops) == 1
+    assert "my_table" in schema_drops
+    # Ensure schema doesn't have the "my_table" anymore
+    assert "my_table" not in pipeline.default_schema.tables
+
+    # Simulate a scenario where the user manually drops a column
+    with pipeline.destination_client() as client:
+        client = cast(FilesystemClient, client)
+        table_file = client.list_table_files("my_other_table")[0]
+        if destination_config.run_kwargs.get("loader_file_format") == "jsonl":
+            _drop_col_in_filesystem_json(table_file, "height")
+        else:
+            _drop_col_in_filesystem_parquet(table_file, "height")
+
+    # Make sure syncing from destination does nothing
+    # because we don't support individual column schema syncing from destination
+    # for regular filesystem without table format
+    schema_drops = pipeline.sync_schema_from_destination()
+    assert not schema_drops
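
Usage note: a minimal sketch of how the API added in this series is meant to be called,
assuming an existing pipeline whose destination was altered out of band. The pipeline and
destination names below are illustrative placeholders; only `sync_schema_from_destination()`
and its `dry_run` / `table_names` parameters come from the patch.

    import dlt

    # Attach to a previously run pipeline (names are placeholders, not from the patch)
    pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")

    # Preview which tables/columns would be dropped from the dlt schema, without mutating it
    pending_drops = pipeline.sync_schema_from_destination(dry_run=True)
    print(pending_drops)

    # Apply the drops so the dlt schema matches what actually exists in the destination
    pipeline.sync_schema_from_destination()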