Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add storage_options to DeltaTable.create #1686

Merged
2 commits merged on Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions dlt/common/schema/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,12 +246,15 @@ def __init__(self, schema_name: str, table_name: str, column: TColumnSchemaBase)
elif column.get("primary_key"):
key_type = "primary key"

msg = f"The column {column['name']} in table {table_name} did not receive any data during this load. "
msg = (
f"The column {column['name']} in table {table_name} did not receive any data during"
" this load. "
)
if key_type or not nullable:
msg += f"It is marked as non-nullable{' '+key_type} and it must have values. "

msg += (
"This can happen if you specify the column manually, for example using the 'merge_key', 'primary_key' or 'columns' argument "
"but it does not exist in the data."
"This can happen if you specify the column manually, for example using the 'merge_key',"
" 'primary_key' or 'columns' argument but it does not exist in the data."
)
super().__init__(schema_name, msg)
4 changes: 3 additions & 1 deletion dlt/common/schema/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,9 @@ def is_nullable_column(col: TColumnSchemaBase) -> bool:
return col.get("nullable", True)


def find_incomplete_columns(tables: List[TTableSchema]) -> Iterable[Tuple[str, TColumnSchemaBase, bool]]:
def find_incomplete_columns(
tables: List[TTableSchema],
) -> Iterable[Tuple[str, TColumnSchemaBase, bool]]:
"""Yields (table_name, column, nullable) for all incomplete columns in `tables`"""
for table in tables:
for col in table["columns"].values():
Expand Down
1 change: 1 addition & 0 deletions dlt/destinations/impl/filesystem/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ def run(self) -> None:
table_uri=dt_path,
schema=ensure_delta_compatible_arrow_schema(arrow_ds.schema),
mode="overwrite",
storage_options=storage_options,
)
return

Expand Down
5 changes: 4 additions & 1 deletion dlt/normalize/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
from dlt.common.schema.exceptions import UnboundColumnException
from dlt.common import logger


def verify_normalized_schema(schema: Schema) -> None:
"""Verify the schema is valid for next stage after normalization.

1. Log warning if any incomplete nullable columns are in any data tables
2. Raise `UnboundColumnException` on incomplete non-nullable columns (e.g. missing merge/primary key)
"""
for table_name, column, nullable in find_incomplete_columns(schema.data_tables(seen_data_only=True)):
for table_name, column, nullable in find_incomplete_columns(
schema.data_tables(seen_data_only=True)
):
exc = UnboundColumnException(schema.name, table_name, column)
if nullable:
logger.warning(str(exc))
Expand Down
2 changes: 1 addition & 1 deletion tests/load/pipeline/test_filesystem_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ def complex_table():
destinations_configs(
table_format_filesystem_configs=True,
table_format="delta",
bucket_subset=(FILE_BUCKET),
bucket_subset=(FILE_BUCKET, AZ_BUCKET),
),
ids=lambda x: x.name,
)
Expand Down
Loading