diff --git a/dlt/common/destination/client.py b/dlt/common/destination/client.py index c4b6eddf6a..210911a1c6 100644 --- a/dlt/common/destination/client.py +++ b/dlt/common/destination/client.py @@ -33,12 +33,16 @@ from dlt.common.metrics import LoadJobMetrics from dlt.common.normalizers.naming import NamingConvention -from dlt.common.schema import Schema, TSchemaTables +from dlt.common.schema import Schema, TSchemaTables, TSchemaDrop from dlt.common.schema.typing import ( + C_DLT_ID, C_DLT_LOAD_ID, TLoaderReplaceStrategy, TTableFormat, + TTableSchemaColumns, + TPartialTableSchema, ) +from dlt.common.schema.utils import get_nested_tables from dlt.common.destination.capabilities import DestinationCapabilitiesContext from dlt.common.destination.exceptions import ( DestinationSchemaTampered, @@ -608,6 +612,20 @@ def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]: pass +class WithTableReflection(ABC): + @abstractmethod + def get_storage_tables( + self, table_names: Iterable[str] + ) -> Iterable[Tuple[str, TTableSchemaColumns]]: + """Retrieves table and column information for the specified tables. + + Returns an iterator of tuples (table_name, columns_dict) where columns_dict + contains column schemas for existing tables, or is empty for non-existent tables. + Implementations use database introspection (INFORMATION_SCHEMA, table reflection) or file metadata. + """ + pass + + class WithStagingDataset(ABC): """Adds capability to use staging dataset and request it from the loader""" diff --git a/dlt/common/destination/exceptions.py b/dlt/common/destination/exceptions.py index bba9416513..edacb18317 100644 --- a/dlt/common/destination/exceptions.py +++ b/dlt/common/destination/exceptions.py @@ -243,3 +243,11 @@ def __init__(self, pipeline_name: str, destination_name: str) -> None: f"Filesystem Client not available for destination `{destination_name}` in pipeline" f" `{pipeline_name}`", ) + + +class DestinationTableReflectionNotSupported(DestinationTerminalException): + def __init__(self, destination_name: str) -> None: + super().__init__( + f"Destination `{destination_name}` does not support table reflection. " + "Schema synchronization from destination is not available for this destination type." 
+ ) diff --git a/dlt/common/libs/deltalake.py b/dlt/common/libs/deltalake.py index 7004b09d42..68c413ec38 100644 --- a/dlt/common/libs/deltalake.py +++ b/dlt/common/libs/deltalake.py @@ -6,9 +6,9 @@ from dlt import version, Pipeline from dlt.common import logger from dlt.common.libs.pyarrow import pyarrow as pa -from dlt.common.libs.pyarrow import cast_arrow_schema_types +from dlt.common.libs.pyarrow import cast_arrow_schema_types, py_arrow_to_table_schema_columns from dlt.common.libs.utils import load_open_tables -from dlt.common.schema.typing import TWriteDisposition, TTableSchema +from dlt.common.schema.typing import TWriteDisposition, TTableSchema, TTableSchemaColumns from dlt.common.schema.utils import get_first_column_name_with_prop, get_columns_names_with_prop from dlt.common.exceptions import MissingDependencyException, ValueErrorWithKnownValues from dlt.common.storages import FilesystemConfiguration @@ -217,3 +217,8 @@ def evolve_delta_table_schema(delta_table: DeltaTable, arrow_schema: pa.Schema) if new_fields: delta_table.alter.add_columns(new_fields) return delta_table + + +def get_table_columns(table: DeltaTable) -> TTableSchemaColumns: + arrow_schema = table.schema().to_pyarrow() + return py_arrow_to_table_schema_columns(arrow_schema) diff --git a/dlt/common/libs/pyiceberg.py b/dlt/common/libs/pyiceberg.py index e4c0e9bee8..5d5c8a5e8a 100644 --- a/dlt/common/libs/pyiceberg.py +++ b/dlt/common/libs/pyiceberg.py @@ -7,10 +7,10 @@ from dlt.common import logger from dlt.common.destination.exceptions import DestinationUndefinedEntity from dlt.common.time import precise_time -from dlt.common.libs.pyarrow import cast_arrow_schema_types +from dlt.common.libs.pyarrow import cast_arrow_schema_types, py_arrow_to_table_schema_columns from dlt.common.libs.utils import load_open_tables from dlt.common.pipeline import SupportsPipeline -from dlt.common.schema.typing import TWriteDisposition, TTableSchema +from dlt.common.schema.typing import TWriteDisposition, TTableSchema, TTableSchemaColumns from dlt.common.schema.utils import get_first_column_name_with_prop, get_columns_names_with_prop from dlt.common.utils import assert_min_pkg_version from dlt.common.exceptions import MissingDependencyException @@ -248,3 +248,8 @@ def make_location(path: str, config: FilesystemConfiguration) -> str: # pyiceberg cannot deal with windows absolute urls location = location.replace("file:///", "file://") return location + + +def get_table_columns(table: IcebergTable) -> TTableSchemaColumns: + arrow_schema = table.schema().as_arrow() + return py_arrow_to_table_schema_columns(arrow_schema) diff --git a/dlt/common/schema/__init__.py b/dlt/common/schema/__init__.py index 9cb5e2ab76..1a9f7bf8ed 100644 --- a/dlt/common/schema/__init__.py +++ b/dlt/common/schema/__init__.py @@ -8,6 +8,7 @@ TColumnHint, TColumnSchema, TColumnSchemaBase, + TSchemaDrop, ) from dlt.common.schema.typing import COLUMN_HINTS from dlt.common.schema.schema import Schema, DEFAULT_SCHEMA_CONTRACT_MODE @@ -15,6 +16,7 @@ from dlt.common.schema.utils import verify_schema_hash __all__ = [ + "TSchemaDrop", "TSchemaUpdate", "TSchemaTables", "TTableSchema", diff --git a/dlt/common/schema/schema.py b/dlt/common/schema/schema.py index d20c755ee3..6345c09266 100644 --- a/dlt/common/schema/schema.py +++ b/dlt/common/schema/schema.py @@ -367,7 +367,7 @@ def drop_tables( ) -> List[TTableSchema]: """Drops tables from the schema and returns the dropped tables""" result = [] - # TODO: make sure all nested tables to table_names are also dropped + # TODO: make 
sure all nested tables to table_names are also dropped, for table_name in table_names: table = self.get_table(table_name) if table and (not seen_data_only or utils.has_table_seen_data(table)): @@ -375,6 +375,18 @@ def drop_tables( self.data_item_normalizer.remove_table(table_name) return result + def drop_columns(self, table_name: str, column_names: Sequence[str]) -> TPartialTableSchema: + """Drops columns from the table schema in place and returns the table schema with the dropped columns""" + table: TPartialTableSchema = {"name": table_name} + dropped_col_schemas: TTableSchemaColumns = {} + + for col in column_names: + col_schema = self._schema_tables[table["name"]]["columns"].pop(col) + dropped_col_schemas[col] = col_schema + + table["columns"] = dropped_col_schemas + return table + def filter_row_with_hint( self, table_name: str, hint_type: TColumnDefaultHint, row: StrAny ) -> StrAny: diff --git a/dlt/common/schema/typing.py b/dlt/common/schema/typing.py index ad1c8f90c8..83a695408e 100644 --- a/dlt/common/schema/typing.py +++ b/dlt/common/schema/typing.py @@ -315,6 +315,7 @@ class TPartialTableSchema(TTableSchema): TSchemaTables = Dict[str, TTableSchema] TSchemaUpdate = Dict[str, List[TPartialTableSchema]] +TSchemaDrop = Dict[str, TPartialTableSchema] TColumnDefaultHint = Literal["not_null", TColumnHint] """Allows using not_null in default hints setting section""" diff --git a/dlt/common/warnings.py b/dlt/common/warnings.py index 243a8e2dfa..8ac733385e 100644 --- a/dlt/common/warnings.py +++ b/dlt/common/warnings.py @@ -65,6 +65,15 @@ def __init__(self, message: str, *args: typing.Any, expected_due: VersionString ) +class Dlt1160DeprecationWarning(DltDeprecationWarning): + V1160 = semver.Version.parse("1.16.0") + + def __init__(self, message: str, *args: typing.Any, expected_due: VersionString = None) -> None: + super().__init__( + message, *args, since=Dlt1160DeprecationWarning.V1160, expected_due=expected_due + ) + + # show dlt deprecations once warnings.simplefilter("once", DltDeprecationWarning) diff --git a/dlt/destinations/impl/athena/athena.py b/dlt/destinations/impl/athena/athena.py index d48ac07bbe..3decb5b3f2 100644 --- a/dlt/destinations/impl/athena/athena.py +++ b/dlt/destinations/impl/athena/athena.py @@ -37,7 +37,11 @@ TSortOrder, ) from dlt.common.destination import DestinationCapabilitiesContext, PreparedTableSchema -from dlt.common.destination.client import FollowupJobRequest, SupportsStagingDestination, LoadJob +from dlt.common.destination.client import ( + FollowupJobRequest, + SupportsStagingDestination, + LoadJob, +) from dlt.destinations.sql_jobs import ( SqlStagingCopyFollowupJob, SqlStagingReplaceFollowupJob, diff --git a/dlt/destinations/impl/destination/destination.py b/dlt/destinations/impl/destination/destination.py index 25e4b93969..4815235f08 100644 --- a/dlt/destinations/impl/destination/destination.py +++ b/dlt/destinations/impl/destination/destination.py @@ -8,7 +8,7 @@ from dlt.common.storages.load_storage import ParsedLoadJobFileName from dlt.common.configuration import create_resolved_partial -from dlt.common.schema import Schema, TSchemaTables +from dlt.common.schema import Schema, TSchemaTables, TSchemaDrop from dlt.common.destination import DestinationCapabilitiesContext from dlt.destinations.impl.destination.configuration import CustomDestinationClientConfiguration diff --git a/dlt/destinations/impl/dummy/dummy.py b/dlt/destinations/impl/dummy/dummy.py index dafe722215..0ea3c0387a 100644 --- a/dlt/destinations/impl/dummy/dummy.py +++ 
b/dlt/destinations/impl/dummy/dummy.py @@ -15,7 +15,7 @@ import time from dlt.common.metrics import LoadJobMetrics from dlt.common.pendulum import pendulum -from dlt.common.schema import Schema, TSchemaTables +from dlt.common.schema import Schema, TSchemaTables, TSchemaDrop from dlt.common.storages import FileStorage from dlt.common.storages.load_package import LoadJobInfo from dlt.common.destination import DestinationCapabilitiesContext diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index 97d156044c..12682f3754 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -23,10 +23,13 @@ from dlt.common.metrics import LoadJobMetrics from dlt.common.schema.exceptions import TableNotFound from dlt.common.schema.typing import ( + C_DLT_ID, C_DLT_LOAD_ID, C_DLT_LOADS_TABLE_LOAD_ID, TTableFormat, TTableSchemaColumns, + TSchemaDrop, + TPartialTableSchema, ) from dlt.common.storages.exceptions import ( CurrentLoadPackageStateNotAvailable, @@ -60,6 +63,7 @@ StorageSchemaInfo, StateInfo, LoadJob, + WithTableReflection, ) from dlt.common.destination.exceptions import ( DestinationUndefinedEntity, @@ -279,6 +283,7 @@ class FilesystemClient( WithStagingDataset, WithStateSync, SupportsOpenTables, + WithTableReflection, ): fs_client: AbstractFileSystem # a path (without the scheme) to a location in the bucket where dataset is present @@ -468,7 +473,12 @@ def drop_tables(self, *tables: str, delete_schema: bool = True) -> None: def get_storage_tables( self, table_names: Iterable[str] ) -> Iterable[Tuple[str, TTableSchemaColumns]]: - """Yields tables that have files in storage, returns columns from current schema""" + """Yield (table_name, column_schemas) pairs for tables that have files in storage. + + For Delta and Iceberg tables, the columns present in the actual table metadata + are returned. For tables using regular file formats, the column schemas come from the + dlt schema instead, since their real schema cannot be reflected directly. + """ for table_name in table_names: table_dir = self.get_table_dir(table_name) if ( @@ -478,7 +488,34 @@ def get_storage_tables( and len(self.list_table_files(table_name)) > 0 ): if table_name in self.schema.tables: + # If it's an open table, reflect only the columns that actually exist in the table metadata + if self.is_open_table("iceberg", table_name): + from dlt.common.libs.pyiceberg import ( + get_table_columns as get_iceberg_table_columns, + ) + + iceberg_table = self.load_open_table("iceberg", table_name) + col_schemas = get_iceberg_table_columns(iceberg_table) + yield (table_name, col_schemas) + + elif self.is_open_table("delta", table_name): + from dlt.common.libs.deltalake import ( + get_table_columns as get_delta_table_columns, + ) + + delta_table = self.load_open_table("delta", table_name) + col_schemas = get_delta_table_columns(delta_table) + yield (table_name, col_schemas) + + else: + logger.warning( + f"Table '{table_name}' does not use a table format and does not support" + " true schema reflection. Returning column schemas from the dlt" + " schema, which may be stale if the underlying files were manually" + " modified. 
" + ) + yield (table_name, self.schema.get_table_columns(table_name)) + else: yield (table_name, {"_column": {}}) else: diff --git a/dlt/destinations/job_client_impl.py b/dlt/destinations/job_client_impl.py index 7707e5159b..d84c8e8c17 100644 --- a/dlt/destinations/job_client_impl.py +++ b/dlt/destinations/job_client_impl.py @@ -27,6 +27,7 @@ from dlt.common.destination.utils import resolve_replace_strategy from dlt.common.json import json from dlt.common.schema.typing import ( + C_DLT_ID, C_DLT_LOAD_ID, C_DLT_LOADS_TABLE_LOAD_ID, COLUMN_HINTS, @@ -44,7 +45,13 @@ from dlt.common.utils import read_dialect_and_sql from dlt.common.storages import FileStorage from dlt.common.storages.load_package import LoadJobInfo, ParsedLoadJobFileName -from dlt.common.schema import TColumnSchema, Schema, TTableSchemaColumns, TSchemaTables +from dlt.common.schema import ( + TColumnSchema, + Schema, + TTableSchemaColumns, + TSchemaTables, + TSchemaDrop, +) from dlt.common.schema import TColumnHint from dlt.common.destination.client import ( PreparedTableSchema, @@ -60,6 +67,7 @@ JobClientBase, HasFollowupJobs, CredentialsConfiguration, + WithTableReflection, ) from dlt.destinations.exceptions import DatabaseUndefinedRelation @@ -74,6 +82,7 @@ info_schema_null_to_bool, verify_schema_merge_disposition, verify_schema_replace_disposition, + update_dlt_schema, ) import sqlglot @@ -240,7 +249,7 @@ def __init__( self._bucket_path = ReferenceFollowupJobRequest.resolve_reference(file_path) -class SqlJobClientBase(WithSqlClient, JobClientBase, WithStateSync): +class SqlJobClientBase(WithSqlClient, JobClientBase, WithStateSync, WithTableReflection): def __init__( self, schema: Schema, diff --git a/dlt/destinations/utils.py b/dlt/destinations/utils.py index d82b0f1812..9c2fa35c1c 100644 --- a/dlt/destinations/utils.py +++ b/dlt/destinations/utils.py @@ -1,18 +1,22 @@ import re -from typing import Any, List, Dict, Type, Optional, Sequence, Tuple, cast +from typing import Any, List, Dict, Type, Optional, Sequence, Tuple, cast, Iterable from dlt.common import logger from dlt.common.destination.capabilities import DestinationCapabilitiesContext from dlt.common.destination.typing import PreparedTableSchema from dlt.common.destination.utils import resolve_merge_strategy, resolve_replace_strategy -from dlt.common.schema import Schema +from dlt.common.destination.client import WithTableReflection +from dlt.common.schema import Schema, TSchemaDrop from dlt.common.schema.exceptions import SchemaCorruptedException from dlt.common.schema.typing import ( MERGE_STRATEGIES, TColumnType, TLoaderReplaceStrategy, TTableSchema, + TPartialTableSchema, + C_DLT_ID, + C_DLT_LOAD_ID, ) from dlt.common.schema.utils import ( get_columns_names_with_prop, @@ -20,9 +24,11 @@ has_column_with_prop, is_nested_table, pipeline_state_table, + get_nested_tables, ) from dlt.destinations.exceptions import DatabaseTransientException +from dlt.destinations.sql_client import WithSqlClient from dlt.extract import DltResource, resource as make_resource, DltSource RE_DATA_TYPE = re.compile(r"([A-Z]+)\((\d+)(?:,\s?(\d+))?\)") @@ -292,3 +298,163 @@ def get_deterministic_temp_table_name(table_name: str, op: str) -> str: op_name = f"{table_name}_{op}" return f"{op_name}_{NamingConvention._compute_tag(op_name, 0.001)}" + + +class WithTableReflectionAndSql(WithTableReflection, WithSqlClient): + pass + + +def _diff_between_actual_and_dlt_schema( + client: WithTableReflectionAndSql, + schema: Schema, + table_name: str, + actual_col_names: set[str], + 
disregard_dlt_columns: bool = True, +) -> TPartialTableSchema: + """Compares the dlt schema with the destination table schema and returns columns that appear to be missing. + + This function identifies columns that exist in the dlt schema but are missing from the actual + destination table. It's used during schema synchronization to detect when columns may have + been dropped from the destination and need to be removed from the dlt schema as well. + + However, dlt internal columns (_dlt_id, _dlt_load_id) are treated specially because: + + 1. Users rarely drop dlt internal columns manually, and if they did, + dlt cannot recover from this situation anyway. + + 2. Athena has a constraint where dlt columns exist in the data but not in the table metadata: + + - Athena external tables have fixed schemas defined at CREATE TABLE time + - These columns exist in the actual data files but don't appear in INFORMATION_SCHEMA + - This causes false positives where dlt columns appear "missing" when they're not + + Args: + client (WithTableReflectionAndSql): The destination client with table reflection capabilities. + schema (Schema): The dlt schema to compare against the destination. + table_name (str): Name of the table to analyze. + actual_col_names (set[str]): Column names that actually exist in the destination table, + typically obtained from INFORMATION_SCHEMA queries. For Athena, + this may not include dlt columns present in the underlying data files. + disregard_dlt_columns (bool): Whether to ignore apparent mismatches for dlt internal + columns (_dlt_id, _dlt_load_id). Defaults to True to prevent incorrect + removal of essential dlt columns from the schema. + + Returns: + TPartialTableSchema: A partial table schema containing the columns that exist in the dlt schema + but are missing from the actual table. + + Example: + If dlt schema has [user_id, name, _dlt_id, _dlt_load_id] but destination + INFORMATION_SCHEMA only shows [user_id, name], this function would return + an empty dict (assuming disregard_dlt_columns=True) rather than suggesting + the dlt columns should be dropped. + """ + col_schemas = schema.get_table_columns(table_name) + + # Map escaped (like actual_col_names) -> original names (what appears in the dlt schema) + escaped_to_dlt = { + client.sql_client.escape_column_name(col, quote=False): col for col in col_schemas.keys() + } + + possibly_dropped_col_names = set(escaped_to_dlt.keys()) - actual_col_names + + if not possibly_dropped_col_names: + return {} + + partial_table: TPartialTableSchema = {"name": table_name, "columns": {}} + + for esc_name in possibly_dropped_col_names: + name_in_dlt = escaped_to_dlt[esc_name] + + if disregard_dlt_columns and name_in_dlt in [C_DLT_ID, C_DLT_LOAD_ID]: + continue + + col_schema = col_schemas[name_in_dlt] + if col_schema.get("increment"): + # We can warn within the for loop, + # since there's only one incremental field per table + logger.warning( + f"An incremental field {name_in_dlt} is being removed from the schema." + " You should unset the" + " incremental with `incremental=dlt.sources.incremental.EMPTY`" + ) + partial_table["columns"][name_in_dlt] = col_schema + + return partial_table if partial_table["columns"] else {} + + +def update_dlt_schema( + client: WithTableReflectionAndSql, + schema: Schema, + table_names: Iterable[str] = None, + dry_run: bool = False, +) -> Optional[TSchemaDrop]: + """Updates the dlt schema from the destination. 
+ + Compares the schema dlt expects with what actually exists in the destination + and drops any tables and/or columns that have disappeared. + + Args: + client (WithTableReflectionAndSql): The destination client with table reflection capabilities. + schema (Schema): The dlt schema to compare against the destination. + table_names (Iterable[str], optional): Check only the listed tables. Defaults to None, which checks all data tables. + dry_run (bool, optional): If True, only report what would be dropped without modifying the dlt schema. Defaults to False. + + Returns: + Optional[TSchemaDrop]: The tables and columns dropped from the schema (or that would be dropped when `dry_run` is True). + """ + tables = table_names if table_names else schema.data_table_names() + + table_drops: TSchemaDrop = {} # includes entire tables to drop + column_drops: TSchemaDrop = {} # includes parts of tables to drop as partial tables + + # 1. Detect what needs to be dropped + actual_table_col_schemas = dict(client.get_storage_tables(tables)) + for table_name in tables: + actual_col_schemas = actual_table_col_schemas[table_name] + + # no actual column schemas -> + # table doesn't exist -> + # we take entire table schema as a schema drop + if not actual_col_schemas: + table = schema.get_table(table_name) + table_drops[table_name] = table + continue + + # actual column schemas present -> + # we compare actual column schemas with dlt ones -> + # we take the difference as a partial table + else: + partial_table = _diff_between_actual_and_dlt_schema( + client, + schema, + table_name, + set(actual_col_schemas.keys()), + ) + if partial_table: + column_drops[table_name] = partial_table + + # 2. For entire table drops, we make sure no orphaned tables remain + for table_name in table_drops.copy(): + child_tables = get_nested_tables(schema.tables, table_name) + orphaned_table_names: List[str] = [] + for child_table in child_tables: + if child_table["name"] not in table_drops: + orphaned_table_names.append(child_table["name"]) + if orphaned_table_names: + table_drops.pop(table_name) + logger.warning( + f"Removing table '{table_name}' from the dlt schema would leave orphan" + f" table(s): {', '.join(repr(t) for t in orphaned_table_names)}. Drop these" + " child tables in the destination and sync the dlt schema again." + ) + + # 3. 
If it's not a dry run, we actually drop from the dlt schema + if not dry_run: + schema.drop_tables(list(table_drops.keys())) + for table_name, partial_table in column_drops.items(): + col_schemas = partial_table["columns"] + col_names = [col for col in col_schemas] + schema.drop_columns(table_name, col_names) + + return {**table_drops, **column_drops} diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index 764e9b315e..63f15396fb 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -17,6 +17,7 @@ ContextManager, Union, TYPE_CHECKING, + Iterable, ) import dlt @@ -36,6 +37,7 @@ DestinationIncompatibleLoaderFileFormatException, DestinationNoStagingMode, DestinationUndefinedEntity, + DestinationTableReflectionNotSupported, ) from dlt.common.runtime import signals, apply_runtime_config from dlt.common.schema.typing import ( @@ -44,6 +46,7 @@ TWriteDispositionConfig, TAnySchemaColumns, TSchemaContract, + TSchemaDrop, ) from dlt.common.schema.utils import normalize_schema_name from dlt.common.storages.exceptions import LoadPackageNotFound @@ -77,6 +80,7 @@ WithStateSync, JobClientBase, DestinationClientStagingConfiguration, + WithTableReflection, ) from dlt.common.destination.exceptions import SqlClientNotAvailable, FSClientNotAvailable from dlt.common.normalizers.naming import NamingConvention @@ -97,7 +101,7 @@ ) from dlt.common.schema import Schema from dlt.common.utils import is_interactive, simple_repr, without_none -from dlt.common.warnings import deprecated, Dlt04DeprecationWarning +from dlt.common.warnings import deprecated, Dlt04DeprecationWarning, Dlt1160DeprecationWarning from dlt.common.versioned_state import json_encode_state, json_decode_state from dlt.extract import DltSource @@ -107,8 +111,8 @@ from dlt.normalize.configuration import NormalizeConfiguration from dlt.destinations.sql_client import SqlClientBase, WithSqlClient from dlt.destinations.fs_client import FSClientBase -from dlt.destinations.job_client_impl import SqlJobClientBase from dlt.destinations.dataset import get_destination_clients +from dlt.destinations.utils import update_dlt_schema, WithTableReflectionAndSql from dlt.load.configuration import LoaderConfiguration from dlt.load import Load @@ -119,7 +123,6 @@ CannotRestorePipelineException, InvalidPipelineName, PipelineConfigMissing, - PipelineNeverRan, PipelineNotActive, PipelineStepFailed, ) @@ -1020,9 +1023,18 @@ def drop_pending_packages(self, with_partial_loads: bool = True) -> None: for load_id in normalize_storage.extracted_packages.list_packages(): normalize_storage.extracted_packages.delete_package(load_id) + @deprecated( + "Please use sync_schema_to_destination instead. The sync_schema is deprecated due to" + " ambiguous naming.", + category=Dlt1160DeprecationWarning, + ) @with_schemas_sync def sync_schema(self, schema_name: str = None) -> TSchemaTables: - """Synchronizes the schema `schema_name` with the destination. If no name is provided, the default schema will be synchronized.""" + return self.sync_schema_to_destination(schema_name) + + @with_schemas_sync + def sync_schema_to_destination(self, schema_name: str = None) -> TSchemaTables: + """Synchronizes the destination with the schema `schema_name`. 
If no name is provided, the default schema will be synchronized.""" if not schema_name and not self.default_schema_name: raise PipelineConfigMissing( self.pipeline_name, @@ -1037,6 +1049,33 @@ def sync_schema(self, schema_name: str = None) -> TSchemaTables: client.initialize_storage() return client.update_stored_schema() + @with_schemas_sync + def sync_schema_from_destination( + self, schema_name: str = None, table_names: Iterable[str] = None, dry_run: bool = False + ) -> Optional[TSchemaDrop]: + """Synchronizes the schema `schema_name` from the destination: tables and columns that no longer exist in the destination are dropped from the dlt schema. If no name is provided, the default schema will be synchronized.""" + if not schema_name and not self.default_schema_name: + raise PipelineConfigMissing( + self.pipeline_name, + "default_schema_name", + "load", + "Pipeline contains no schemas. Please extract any data with `extract` or `run`" + " methods.", + ) + schema = self.schemas[schema_name] if schema_name else self.default_schema + with self._get_destination_clients(schema)[0] as client: + if not client.is_storage_initialized(): + raise DestinationUndefinedEntity() + if isinstance(client, WithTableReflection) and isinstance(client, WithSqlClient): + return update_dlt_schema( + client=cast(WithTableReflectionAndSql, client), + schema=schema, + table_names=table_names, + dry_run=dry_run, + ) + else: + raise DestinationTableReflectionNotSupported(self._destination.destination_name) + def set_local_state_val(self, key: str, value: Any) -> None: """Sets value in local state. Local state is not synchronized with destination.""" try: diff --git a/tests/load/pipeline/test_pipelines.py b/tests/load/pipeline/test_pipelines.py index f565698495..bdc8ed1402 100644 --- a/tests/load/pipeline/test_pipelines.py +++ b/tests/load/pipeline/test_pipelines.py @@ -279,7 +279,7 @@ def _data(): assert "data_table" in schema.tables assert schema.tables["data_table"]["columns"] == {} - p.sync_schema() + p.sync_schema_to_destination() with p._get_destination_clients(schema)[0] as job_client: # there's some data at all diff --git a/tests/load/pipeline/test_restore_state.py b/tests/load/pipeline/test_restore_state.py index b16ffbc381..0fe096436a 100644 --- a/tests/load/pipeline/test_restore_state.py +++ b/tests/load/pipeline/test_restore_state.py @@ -85,7 +85,7 @@ def test_restore_state_utils(destination_config: DestinationTestConfiguration) - with pytest.raises(DestinationUndefinedEntity): load_pipeline_state_from_destination(p.pipeline_name, job_client) # sync the schema - p.sync_schema() + p.sync_schema_to_destination() # check if schema exists with p.destination_client(p.default_schema.name) as job_client: # type: ignore[assignment] stored_schema = job_client.get_stored_schema(job_client.schema.name) @@ -110,7 +110,7 @@ def test_restore_state_utils(destination_config: DestinationTestConfiguration) - # so dlt in normalize stage infers _state_version table again but with different column order and the column order in schema is different # then in database. parquet is created in schema order and in Redshift it must exactly match the order. 
# schema.bump_version() - p.sync_schema() + p.sync_schema_to_destination() with p.destination_client(p.default_schema.name) as job_client: # type: ignore[assignment] stored_schema = job_client.get_stored_schema(job_client.schema.name) assert stored_schema is not None @@ -257,7 +257,7 @@ def _make_dn_name(schema_name: str) -> str: p._inject_schema(default_schema) # just sync schema without name - will use default schema - p.sync_schema() + p.sync_schema_to_destination() with p.destination_client() as job_client: assert get_normalized_dataset_name( job_client @@ -275,7 +275,7 @@ def _make_dn_name(schema_name: str) -> str: schema_three = Schema("three") p._inject_schema(schema_three) # sync schema with a name - p.sync_schema(schema_three.name) + p.sync_schema_to_destination(schema_three.name) with p._get_destination_clients(schema_three)[0] as job_client: assert get_normalized_dataset_name( job_client diff --git a/tests/load/pipeline/test_sync_dlt_schema.py b/tests/load/pipeline/test_sync_dlt_schema.py new file mode 100644 index 0000000000..5f27e18f71 --- /dev/null +++ b/tests/load/pipeline/test_sync_dlt_schema.py @@ -0,0 +1,347 @@ +import json +import os +from typing import cast +import tempfile + +import pytest +from pytest_mock import MockerFixture + +import dlt +from dlt.common.utils import uniq_id +from dlt.common.libs.pyarrow import remove_columns +from dlt.common.libs.pyarrow import pyarrow as pa +from dlt.destinations.impl.filesystem.filesystem import FilesystemClient +from dlt.destinations.job_client_impl import SqlJobClientBase +from dlt.pipeline.pipeline import Pipeline + +from dlt.common import logger +from tests.pipeline.utils import assert_load_info +from tests.utils import TEST_STORAGE_ROOT +from tests.load.utils import ( + FILE_BUCKET, + destinations_configs, + DestinationTestConfiguration, +) + + +def _drop_column_in_filesystem( + pipeline: Pipeline, table_name: str, col: str, table_format: str +) -> None: + client = cast(FilesystemClient, pipeline._fs_client()) + if table_format == "iceberg": + tbl = client.load_open_table("iceberg", table_name) + with tbl.update_schema(allow_incompatible_changes=True) as upd: + upd.delete_column(col) + elif table_format == "delta": + from dlt.common.libs.deltalake import ( + write_deltalake, + get_table_columns, + deltalake_storage_options, + ) + + delta_tbl = client.load_open_table("delta", table_name) + keep_cols = [c for c in get_table_columns(delta_tbl) if c != col] + arrow_tbl = delta_tbl.to_pyarrow_table(columns=keep_cols) + storage_options = deltalake_storage_options(client.config) + write_deltalake( + table_or_uri=client.get_open_table_location("delta", table_name), + data=arrow_tbl, + mode="overwrite", + schema_mode="overwrite", + storage_options=storage_options, + ) + else: + # We don't simulate removal of a column in a plain filesystem destination, + # because it's unlikely that users do it. + # Additionally, the dlt schema sync doesn't support it. 
+ return + + +def _drop_table_in_filesystem( + pipeline: Pipeline, + destination_config: DestinationTestConfiguration, + table_name: str, +) -> None: + client = cast(FilesystemClient, pipeline._fs_client()) + client.drop_tables(table_name) + + +def _drop_column_in_sql( + pipeline: Pipeline, + destination_config: DestinationTestConfiguration, + table_name: str, + col: str, +) -> None: + """All non-filesystem destinations end up here.""" + with pipeline.sql_client() as client: + destination_type, table_format = ( + destination_config.destination_type, + destination_config.table_format, + ) + + if destination_type == "athena" and table_format != "iceberg": + # Athena Hive tables need the REPLACE COLUMNS syntax + col_defs = [ + f"{client.escape_ddl_identifier('id')} bigint", + f"{client.escape_ddl_identifier('name')} string", + ] + ddl = ( + f"ALTER TABLE {client.make_qualified_ddl_table_name(table_name)} " + f"REPLACE COLUMNS ({','.join(col_defs)})" + ) + elif destination_type == "databricks": + # Enable column-mapping once, then DROP + client.execute_sql( + f"ALTER TABLE {client.make_qualified_table_name(table_name)} " + "SET TBLPROPERTIES(" + "'delta.columnMapping.mode'='name'," + "'delta.minReaderVersion'='2'," + "'delta.minWriterVersion'='5')" + ) + ddl = ( + f"ALTER TABLE {client.make_qualified_table_name(table_name)} " + f"DROP COLUMN {client.escape_column_name(col)}" + ) + else: + qualified_tbl_name = ( + client.make_qualified_ddl_table_name(table_name) + if destination_type == "athena" and table_format == "iceberg" + else client.make_qualified_table_name(table_name) + ) + qualified_col_name = ( + client.escape_ddl_identifier(col) + if destination_type == "athena" and table_format == "iceberg" + else client.escape_column_name(col) + ) + ddl = f"ALTER TABLE {qualified_tbl_name} DROP COLUMN {qualified_col_name}" + + client.execute_sql(ddl) + + +def _drop_table_in_sql( + pipeline: Pipeline, + destination_config: DestinationTestConfiguration, + table_name: str, +) -> None: + """All non-filesystem destinations end up here.""" + with pipeline.sql_client() as client: + destination_type = destination_config.destination_type + qualified = ( + client.make_qualified_ddl_table_name(table_name) + if destination_type == "athena" + else client.make_qualified_table_name(table_name) + ) + if destination_type == "clickhouse": + query = f"DROP TABLE {qualified} SYNC;" + else: + query = f"DROP TABLE {qualified};" + + client.execute_sql(query) + + +def _drop_col_in_filesystem_json(file_path: str, col_name: str) -> None: + """Removes a specific column from a jsonl file""" + dir_ = os.path.dirname(file_path) + with open(file_path, "r", encoding="utf-8") as r: + with tempfile.NamedTemporaryFile("w", encoding="utf-8", dir=dir_, delete=False) as w: + tmp_path = w.name + for line in r: + obj = json.loads(line) + obj.pop(col_name, None) + json.dump(obj, w) + w.write("\n") + os.replace(tmp_path, file_path) + + +def _drop_col_in_filesystem_parquet(file_path: str, col_name: str) -> None: + """Removes a specific column from a parquet file""" + dir_ = os.path.dirname(file_path) + table = pa.parquet.read_table(file_path) + modified_table = remove_columns(table, [col_name]) + with tempfile.NamedTemporaryFile(suffix=".parquet", dir=dir_, delete=False) as tmp_file: + tmp_path = tmp_file.name + + pa.parquet.write_table(modified_table, tmp_path) + os.replace(tmp_path, file_path) + + +@dlt.resource(table_name="my_table") +def my_resource(with_col: bool = True): + row = {"id": 1, "name": "Liuwen"} + if with_col: + row["age"] = 
40 + yield row + + +@dlt.resource(table_name="my_other_table") +def my_other_resource(): + row = {"id": 1, "name": "Liuwen", "height": 180} + yield row + + +@dlt.resource(table_name="my_last_table") +def my_last_resource(): + row = { + "id": 1, + "name": "Liuwen", + "children": [{"id": 2, "name": "Dawei"}, {"id": 3, "name": "Xiaoyun"}], + } + yield row + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs( + default_sql_configs=True, + table_format_filesystem_configs=True, + ), + ids=lambda x: x.name, +) +def test_sync_dlt_schema( + destination_config: DestinationTestConfiguration, mocker: MockerFixture +) -> None: + pipeline = destination_config.setup_pipeline(pipeline_name=f"pipe_{uniq_id()}") + + assert_load_info( + pipeline.run( + [my_resource(), my_other_resource(), my_last_resource()], + **destination_config.run_kwargs, + ) + ) + + # Simulate a scenario where the user manually drops + # 1. a column in a table + # 2. a table with a child table + if destination_config.destination_type == "filesystem": + _drop_column_in_filesystem(pipeline, "my_table", "age", destination_config.table_format) + _drop_table_in_filesystem(pipeline, destination_config, "my_last_table") + else: + _drop_column_in_sql(pipeline, destination_config, "my_table", "age") + _drop_table_in_sql(pipeline, destination_config, "my_last_table") + + # Prove a mismatch between what dlt knows (has in the schema) and what's in the destination + table_names = ["my_table", "my_other_table", "my_last_table", "my_last_table__children"] + dlt_knows = { + table_name: set(pipeline.default_schema.get_table_columns(table_name)) + for table_name in table_names + } + assert "age" in dlt_knows["my_table"] + assert "my_last_table" in dlt_knows + assert dlt_knows["my_last_table"] != set() + + client: SqlJobClientBase + with pipeline.destination_client() as client: # type: ignore[assignment] + actual_tables = dict(client.get_storage_tables(table_names)) + dest_has = {table_name: set(actual_tables[table_name]) for table_name in table_names} + assert "age" not in dest_has["my_table"] + assert dest_has["my_last_table"] == set() + + # Make sure the warning about orphaned tables is emitted + logger_spy = mocker.spy(logger, "warning") + + schema_drops = pipeline.sync_schema_from_destination() + + logger_spy.assert_called() + assert logger_spy.call_count == 1 + expected_warning = ( + "Removing table 'my_last_table' from the dlt schema would leave orphan table(s):" + " 'my_last_table__children'. Drop these child tables in the destination and sync the dlt" + " schema again." 
+ ) + assert expected_warning in logger_spy.call_args_list[0][0][0] + + # Schema drop should only include the "age" column of "my_table" + assert len(schema_drops) == 1 + assert "my_table" in schema_drops + assert len(schema_drops["my_table"]["columns"]) == 1 + assert "age" in schema_drops["my_table"]["columns"] + + # Ensure schema doesn't have the "age" column in "my_table" anymore + assert "age" not in pipeline.default_schema.tables["my_table"]["columns"] + # Ensure "my_last_table" was NOT dropped from the schema (its child table still exists in the destination) + assert "my_last_table" in pipeline.default_schema.tables + # Sanity check that the child table is still there + assert "my_last_table__children" in pipeline.default_schema.tables + + # Now, the user drops the child table as instructed in the warning + if destination_config.destination_type == "filesystem": + _drop_table_in_filesystem(pipeline, destination_config, "my_last_table__children") + else: + _drop_table_in_sql(pipeline, destination_config, "my_last_table__children") + + # Prove a mismatch between what dlt knows about my_last_table__children + # and what's in the destination + dlt_knows = { + "my_last_table__children": set( + pipeline.default_schema.get_table_columns("my_last_table__children") + ) + } + assert dlt_knows["my_last_table__children"] != set() + + with pipeline.destination_client() as client: # type: ignore[assignment] + _, col_schemas = list(client.get_storage_tables(["my_last_table__children"]))[0] + dest_has = {"my_last_table__children": set(col_schemas)} + assert dest_has["my_last_table__children"] == set() + + # Schema drop should now include "my_last_table" together with its child table + schema_drops = pipeline.sync_schema_from_destination() + assert len(schema_drops) == 2 + assert "my_last_table" in schema_drops + assert "my_last_table__children" in schema_drops + + # Ensure the schema no longer has "my_last_table" and "my_last_table__children" + assert "my_last_table" not in pipeline.default_schema.tables + assert "my_last_table__children" not in pipeline.default_schema.tables + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs( + local_filesystem_configs=True, + ), + ids=lambda x: x.name, +) +def test_regular_filesystem_tables( + destination_config: DestinationTestConfiguration, mocker: MockerFixture +) -> None: + pipeline = destination_config.setup_pipeline(pipeline_name=f"pipe_{uniq_id()}") + + assert_load_info( + pipeline.run([my_resource(), my_other_resource()], **destination_config.run_kwargs) + ) + + # Simulate a scenario where the user manually drops an entire table + _drop_table_in_filesystem(pipeline, destination_config, "my_table") + + # Prove a mismatch between what dlt knows about "my_table" and what's in the destination + dlt_knows = {"my_table": set(pipeline.default_schema.get_table_columns("my_table"))} + assert dlt_knows["my_table"] != set() + + with pipeline.destination_client() as client: + client = cast(FilesystemClient, client) + _, col_schemas = list(client.get_storage_tables(["my_table"]))[0] + dest_has = {"my_table": set(col_schemas)} + assert dest_has["my_table"] == set() + + schema_drops = pipeline.sync_schema_from_destination() + + # Schema drop should include "my_table" + assert len(schema_drops) == 1 + assert "my_table" in schema_drops + # Ensure the schema no longer has "my_table" + assert "my_table" not in pipeline.default_schema.tables + + # Simulate a scenario where the user manually drops a column + with pipeline.destination_client() as client: + client = cast(FilesystemClient, 
client) + table_file = client.list_table_files("my_other_table")[0] + if destination_config.run_kwargs.get("loader_file_format") == "jsonl": + _drop_col_in_filesystem_json(table_file, "height") + else: + _drop_col_in_filesystem_parquet(table_file, "height") + + # Make sure syncing from destination does nothing + # because we don't support individual column schema syncing from destination + # for regular filesystem without table format + schema_drops = pipeline.sync_schema_from_destination() + assert not schema_drops diff --git a/tests/pipeline/test_dlt_versions.py b/tests/pipeline/test_dlt_versions.py index f8ba739c0c..4d5ecafbe9 100644 --- a/tests/pipeline/test_dlt_versions.py +++ b/tests/pipeline/test_dlt_versions.py @@ -461,7 +461,7 @@ def test_load_package_with_dlt_update(test_storage: FileStorage) -> None: with pipeline.managed_state(extract_state=True): pass # this will sync schema to destination - pipeline.sync_schema() + pipeline.sync_schema_to_destination() # we have hash now rows = client.execute_sql(f"SELECT * FROM {PIPELINE_STATE_TABLE_NAME}") assert len(rows[0]) == 6 + 2
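A minimal usage sketch of the schema-sync API added above, assuming a DuckDB destination (any SQL destination, or the filesystem destination with Delta/Iceberg, behaves the same); the pipeline, dataset, table, and column names are illustrative only:

import dlt

# hypothetical names; any previously loaded pipeline works the same way
pipeline = dlt.pipeline(pipeline_name="demo", destination="duckdb", dataset_name="demo_data")
pipeline.run([{"id": 1, "name": "a", "age": 40}], table_name="my_table")

# suppose the "age" column was later dropped directly in the destination;
# dry_run=True only reports what would be removed from the dlt schema
pending = pipeline.sync_schema_from_destination(dry_run=True)
# e.g. {"my_table": {"name": "my_table", "columns": {"age": {...}}}}

# without dry_run, stale tables and columns are dropped from the dlt schema in place
pipeline.sync_schema_from_destination()

# the destination-facing direction keeps its old behavior under the new, unambiguous name;
# sync_schema() still works but now emits Dlt1160DeprecationWarning
pipeline.sync_schema_to_destination()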