20 changes: 19 additions & 1 deletion dlt/common/destination/client.py
@@ -33,12 +33,16 @@
from dlt.common.metrics import LoadJobMetrics
from dlt.common.normalizers.naming import NamingConvention

from dlt.common.schema import Schema, TSchemaTables
from dlt.common.schema import Schema, TSchemaTables, TSchemaDrop
from dlt.common.schema.typing import (
C_DLT_ID,
C_DLT_LOAD_ID,
TLoaderReplaceStrategy,
TTableFormat,
TTableSchemaColumns,
TPartialTableSchema,
)
from dlt.common.schema.utils import get_nested_tables
from dlt.common.destination.capabilities import DestinationCapabilitiesContext
from dlt.common.destination.exceptions import (
DestinationSchemaTampered,
@@ -608,6 +612,20 @@ def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]:
pass


class WithTableReflection(ABC):
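"""Adds capability to reflect tables and columns that physically exist in the destination"""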
@abstractmethod
def get_storage_tables(
self, table_names: Iterable[str]
) -> Iterable[Tuple[str, TTableSchemaColumns]]:
"""Retrieves table and column information for the specified tables.

Returns an iterator of tuples (table_name, columns_dict) where columns_dict
contains column schemas for existing tables, or is empty for non-existent tables.
Implementations use database introspection (INFORMATION_SCHEMA, table reflection) or file metadata.
"""
pass


class WithStagingDataset(ABC):
"""Adds capability to use staging dataset and request it from the loader"""

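For orientation, a minimal consumer-side sketch (not part of this diff; the helper name is an assumption) showing how the new interface and the DestinationTableReflectionNotSupported exception added below in exceptions.py fit together:

# Hypothetical helper (not in this PR): reflect destination columns when supported
from typing import Dict, Iterable

from dlt.common.destination.client import JobClientBase, WithTableReflection
from dlt.common.destination.exceptions import DestinationTableReflectionNotSupported
from dlt.common.schema.typing import TTableSchemaColumns


def reflect_storage_tables(
    client: JobClientBase, table_names: Iterable[str]
) -> Dict[str, TTableSchemaColumns]:
    # clients that cannot introspect their storage simply do not implement the interface
    if not isinstance(client, WithTableReflection):
        raise DestinationTableReflectionNotSupported(client.config.destination_name)
    # empty column dicts mark tables that do not exist in the destination
    return dict(client.get_storage_tables(table_names))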
8 changes: 8 additions & 0 deletions dlt/common/destination/exceptions.py
@@ -243,3 +243,11 @@ def __init__(self, pipeline_name: str, destination_name: str) -> None:
f"Filesystem Client not available for destination `{destination_name}` in pipeline"
f" `{pipeline_name}`",
)


class DestinationTableReflectionNotSupported(DestinationTerminalException):
def __init__(self, destination_name: str) -> None:
super().__init__(
f"Destination `{destination_name}` does not support table reflection. "
"Schema synchronization from destination is not available for this destination type."
)
9 changes: 7 additions & 2 deletions dlt/common/libs/deltalake.py
@@ -6,9 +6,9 @@
from dlt import version, Pipeline
from dlt.common import logger
from dlt.common.libs.pyarrow import pyarrow as pa
from dlt.common.libs.pyarrow import cast_arrow_schema_types
from dlt.common.libs.pyarrow import cast_arrow_schema_types, py_arrow_to_table_schema_columns
from dlt.common.libs.utils import load_open_tables
from dlt.common.schema.typing import TWriteDisposition, TTableSchema
from dlt.common.schema.typing import TWriteDisposition, TTableSchema, TTableSchemaColumns
from dlt.common.schema.utils import get_first_column_name_with_prop, get_columns_names_with_prop
from dlt.common.exceptions import MissingDependencyException, ValueErrorWithKnownValues
from dlt.common.storages import FilesystemConfiguration
@@ -217,3 +217,8 @@ def evolve_delta_table_schema(delta_table: DeltaTable, arrow_schema: pa.Schema)
if new_fields:
delta_table.alter.add_columns(new_fields)
return delta_table


def get_table_columns(table: DeltaTable) -> TTableSchemaColumns:
arrow_schema = table.schema().to_pyarrow()
return py_arrow_to_table_schema_columns(arrow_schema)
9 changes: 7 additions & 2 deletions dlt/common/libs/pyiceberg.py
@@ -7,10 +7,10 @@
from dlt.common import logger
from dlt.common.destination.exceptions import DestinationUndefinedEntity
from dlt.common.time import precise_time
from dlt.common.libs.pyarrow import cast_arrow_schema_types
from dlt.common.libs.pyarrow import cast_arrow_schema_types, py_arrow_to_table_schema_columns
from dlt.common.libs.utils import load_open_tables
from dlt.common.pipeline import SupportsPipeline
from dlt.common.schema.typing import TWriteDisposition, TTableSchema
from dlt.common.schema.typing import TWriteDisposition, TTableSchema, TTableSchemaColumns
from dlt.common.schema.utils import get_first_column_name_with_prop, get_columns_names_with_prop
from dlt.common.utils import assert_min_pkg_version
from dlt.common.exceptions import MissingDependencyException
@@ -248,3 +248,8 @@ def make_location(path: str, config: FilesystemConfiguration) -> str:
# pyiceberg cannot deal with windows absolute urls
location = location.replace("file:///", "file://")
return location


def get_table_columns(table: IcebergTable) -> TTableSchemaColumns:
arrow_schema = table.schema().as_arrow()
return py_arrow_to_table_schema_columns(arrow_schema)
2 changes: 2 additions & 0 deletions dlt/common/schema/__init__.py
@@ -8,13 +8,15 @@
TColumnHint,
TColumnSchema,
TColumnSchemaBase,
TSchemaDrop,
)
from dlt.common.schema.typing import COLUMN_HINTS
from dlt.common.schema.schema import Schema, DEFAULT_SCHEMA_CONTRACT_MODE
from dlt.common.schema.exceptions import DataValidationError
from dlt.common.schema.utils import verify_schema_hash

__all__ = [
"TSchemaDrop",
"TSchemaUpdate",
"TSchemaTables",
"TTableSchema",
14 changes: 13 additions & 1 deletion dlt/common/schema/schema.py
@@ -367,14 +367,26 @@ def drop_tables(
) -> List[TTableSchema]:
"""Drops tables from the schema and returns the dropped tables"""
result = []
# TODO: make sure all nested tables to table_names are also dropped
# TODO: make sure all nested tables to table_names are also dropped,
for table_name in table_names:
table = self.get_table(table_name)
if table and (not seen_data_only or utils.has_table_seen_data(table)):
result.append(self._schema_tables.pop(table_name))
self.data_item_normalizer.remove_table(table_name)
return result

def drop_columns(self, table_name: str, column_names: Sequence[str]) -> TPartialTableSchema:
"""Drops columns from the table schema in place and returns the table schema with the dropped columns"""
table: TPartialTableSchema = {"name": table_name}
dropped_col_schemas: TTableSchemaColumns = {}

for col in column_names:
col_schema = self._schema_tables[table["name"]]["columns"].pop(col)
dropped_col_schemas[col] = col_schema

table["columns"] = dropped_col_schemas
return table

def filter_row_with_hint(
self, table_name: str, hint_type: TColumnDefaultHint, row: StrAny
) -> StrAny:
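A brief usage sketch of the new drop_columns helper (the schema, table, and columns below are made up for illustration):

# Hypothetical example of Schema.drop_columns (names are illustrative)
from dlt.common.schema import Schema

schema = Schema("example")
schema.update_table(
    {
        "name": "events",
        "columns": {
            "id": {"name": "id", "data_type": "bigint"},
            "legacy": {"name": "legacy", "data_type": "text"},
        },
    }
)

partial = schema.drop_columns("events", ["legacy"])
# the column is removed from the live schema in place...
assert "legacy" not in schema.get_table_columns("events")
# ...and the returned partial table contains only the dropped column schemas
assert list(partial["columns"]) == ["legacy"]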
1 change: 1 addition & 0 deletions dlt/common/schema/typing.py
@@ -315,6 +315,7 @@ class TPartialTableSchema(TTableSchema):

TSchemaTables = Dict[str, TTableSchema]
TSchemaUpdate = Dict[str, List[TPartialTableSchema]]
TSchemaDrop = Dict[str, TPartialTableSchema]
TColumnDefaultHint = Literal["not_null", TColumnHint]
"""Allows using not_null in default hints setting section"""

9 changes: 9 additions & 0 deletions dlt/common/warnings.py
@@ -65,6 +65,15 @@ def __init__(self, message: str, *args: typing.Any, expected_due: VersionString
)


class Dlt1160DeprecationWarning(DltDeprecationWarning):
V1160 = semver.Version.parse("1.16.0")

def __init__(self, message: str, *args: typing.Any, expected_due: VersionString = None) -> None:
super().__init__(
message, *args, since=Dlt1160DeprecationWarning.V1160, expected_due=expected_due
)


# show dlt deprecations once
warnings.simplefilter("once", DltDeprecationWarning)

6 changes: 5 additions & 1 deletion dlt/destinations/impl/athena/athena.py
@@ -37,7 +37,11 @@
TSortOrder,
)
from dlt.common.destination import DestinationCapabilitiesContext, PreparedTableSchema
from dlt.common.destination.client import FollowupJobRequest, SupportsStagingDestination, LoadJob
from dlt.common.destination.client import (
FollowupJobRequest,
SupportsStagingDestination,
LoadJob,
)
from dlt.destinations.sql_jobs import (
SqlStagingCopyFollowupJob,
SqlStagingReplaceFollowupJob,
2 changes: 1 addition & 1 deletion dlt/destinations/impl/destination/destination.py
@@ -8,7 +8,7 @@
from dlt.common.storages.load_storage import ParsedLoadJobFileName
from dlt.common.configuration import create_resolved_partial

from dlt.common.schema import Schema, TSchemaTables
from dlt.common.schema import Schema, TSchemaTables, TSchemaDrop
from dlt.common.destination import DestinationCapabilitiesContext

from dlt.destinations.impl.destination.configuration import CustomDestinationClientConfiguration
2 changes: 1 addition & 1 deletion dlt/destinations/impl/dummy/dummy.py
@@ -15,7 +15,7 @@
import time
from dlt.common.metrics import LoadJobMetrics
from dlt.common.pendulum import pendulum
from dlt.common.schema import Schema, TSchemaTables
from dlt.common.schema import Schema, TSchemaTables, TSchemaDrop
from dlt.common.storages import FileStorage
from dlt.common.storages.load_package import LoadJobInfo
from dlt.common.destination import DestinationCapabilitiesContext
41 changes: 39 additions & 2 deletions dlt/destinations/impl/filesystem/filesystem.py
@@ -23,10 +23,13 @@
from dlt.common.metrics import LoadJobMetrics
from dlt.common.schema.exceptions import TableNotFound
from dlt.common.schema.typing import (
C_DLT_ID,
C_DLT_LOAD_ID,
C_DLT_LOADS_TABLE_LOAD_ID,
TTableFormat,
TTableSchemaColumns,
TSchemaDrop,
TPartialTableSchema,
)
from dlt.common.storages.exceptions import (
CurrentLoadPackageStateNotAvailable,
@@ -60,6 +63,7 @@
StorageSchemaInfo,
StateInfo,
LoadJob,
WithTableReflection,
)
from dlt.common.destination.exceptions import (
DestinationUndefinedEntity,
@@ -279,6 +283,7 @@ class FilesystemClient(
WithStagingDataset,
WithStateSync,
SupportsOpenTables,
WithTableReflection,
):
fs_client: AbstractFileSystem
# a path (without the scheme) to a location in the bucket where dataset is present
@@ -468,7 +473,12 @@ def drop_tables(self, *tables: str, delete_schema: bool = True) -> None:
def get_storage_tables(
self, table_names: Iterable[str]
) -> Iterable[Tuple[str, TTableSchemaColumns]]:
"""Yields tables that have files in storage, returns columns from current schema"""
"""Yield (table_name, column_schemas) pairs for tables that have files in storage.

For Delta and Iceberg tables, the columns present in the actual table metadata
are returned. For tables using regular file formats, the column schemas come from the
dlt schema instead, since their real schema cannot be reflected directly.
"""
for table_name in table_names:
table_dir = self.get_table_dir(table_name)
if (
@@ -478,7 +488,34 @@ def get_storage_tables(
and len(self.list_table_files(table_name)) > 0
):
if table_name in self.schema.tables:
yield (table_name, self.schema.get_table_columns(table_name))
# If it's an open table, reflect only the columns that actually exist in the table metadata
if self.is_open_table("iceberg", table_name):
from dlt.common.libs.pyiceberg import (
get_table_columns as get_iceberg_table_columns,
)

iceberg_table = self.load_open_table("iceberg", table_name)
col_schemas = get_iceberg_table_columns(iceberg_table)
yield (table_name, col_schemas)

elif self.is_open_table("delta", table_name):
from dlt.common.libs.deltalake import (
get_table_columns as get_delta_table_columns,
)

delta_table = self.load_open_table("delta", table_name)
col_schemas = get_delta_table_columns(delta_table)
yield (table_name, col_schemas)

else:
logger.warning(
f"Table '{table_name}' does not use a table format and does not support"
" true schema reflection. Returning column schemas from the dlt"
" schema, which may be stale if the underlying files were manually"
" modified. "
)
yield (table_name, self.schema.get_table_columns(table_name))

Comment on lines +511 to +518 (Contributor Author):

Just realized that for parquet files we can also just use pyarrow and read actual metadata 👀, but I still don't think people drop columns in parquet files...

else:
yield (table_name, {"_column": {}})
else:
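Regarding the review comment above on parquet metadata: a minimal sketch (not part of this PR; the helper name and signature are assumptions) of how a parquet footer could be reflected with pyarrow and converted with the same helper the open-table branches use:

# Hypothetical helper (not in this PR): reflect columns from a parquet file footer
import pyarrow.parquet as pq

from dlt.common.libs.pyarrow import py_arrow_to_table_schema_columns
from dlt.common.schema.typing import TTableSchemaColumns


def get_parquet_table_columns(file_path: str) -> TTableSchemaColumns:
    # read_schema only reads the footer metadata; no row groups are loaded
    arrow_schema = pq.read_schema(file_path)
    return py_arrow_to_table_schema_columns(arrow_schema)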
13 changes: 11 additions & 2 deletions dlt/destinations/job_client_impl.py
@@ -27,6 +27,7 @@
from dlt.common.destination.utils import resolve_replace_strategy
from dlt.common.json import json
from dlt.common.schema.typing import (
C_DLT_ID,
C_DLT_LOAD_ID,
C_DLT_LOADS_TABLE_LOAD_ID,
COLUMN_HINTS,
@@ -44,7 +45,13 @@
from dlt.common.utils import read_dialect_and_sql
from dlt.common.storages import FileStorage
from dlt.common.storages.load_package import LoadJobInfo, ParsedLoadJobFileName
from dlt.common.schema import TColumnSchema, Schema, TTableSchemaColumns, TSchemaTables
from dlt.common.schema import (
TColumnSchema,
Schema,
TTableSchemaColumns,
TSchemaTables,
TSchemaDrop,
)
from dlt.common.schema import TColumnHint
from dlt.common.destination.client import (
PreparedTableSchema,
@@ -60,6 +67,7 @@
JobClientBase,
HasFollowupJobs,
CredentialsConfiguration,
WithTableReflection,
)

from dlt.destinations.exceptions import DatabaseUndefinedRelation
@@ -74,6 +82,7 @@
info_schema_null_to_bool,
verify_schema_merge_disposition,
verify_schema_replace_disposition,
update_dlt_schema,
)

import sqlglot
@@ -240,7 +249,7 @@ def __init__(
self._bucket_path = ReferenceFollowupJobRequest.resolve_reference(file_path)


class SqlJobClientBase(WithSqlClient, JobClientBase, WithStateSync):
class SqlJobClientBase(WithSqlClient, JobClientBase, WithStateSync, WithTableReflection):
def __init__(
self,
schema: Schema,