Skip to content

Commit

Permalink
Add storage_options to DeltaTable.create (#1686)
Browse files Browse the repository at this point in the history
* add storage_options to delta table create statement
  • Loading branch information
jorritsandbrink authored Aug 14, 2024
1 parent e42f4d7 commit a9c2958
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 6 deletions.
9 changes: 6 additions & 3 deletions dlt/common/schema/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,12 +246,15 @@ def __init__(self, schema_name: str, table_name: str, column: TColumnSchemaBase)
elif column.get("primary_key"):
key_type = "primary key"

msg = f"The column {column['name']} in table {table_name} did not receive any data during this load. "
msg = (
f"The column {column['name']} in table {table_name} did not receive any data during"
" this load. "
)
if key_type or not nullable:
msg += f"It is marked as non-nullable{' '+key_type} and it must have values. "

msg += (
"This can happen if you specify the column manually, for example using the 'merge_key', 'primary_key' or 'columns' argument "
"but it does not exist in the data."
"This can happen if you specify the column manually, for example using the 'merge_key',"
" 'primary_key' or 'columns' argument but it does not exist in the data."
)
super().__init__(schema_name, msg)
4 changes: 3 additions & 1 deletion dlt/common/schema/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,9 @@ def is_nullable_column(col: TColumnSchemaBase) -> bool:
return col.get("nullable", True)


def find_incomplete_columns(tables: List[TTableSchema]) -> Iterable[Tuple[str, TColumnSchemaBase, bool]]:
def find_incomplete_columns(
tables: List[TTableSchema],
) -> Iterable[Tuple[str, TColumnSchemaBase, bool]]:
"""Yields (table_name, column, nullable) for all incomplete columns in `tables`"""
for table in tables:
for col in table["columns"].values():
Expand Down
1 change: 1 addition & 0 deletions dlt/destinations/impl/filesystem/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ def run(self) -> None:
table_uri=dt_path,
schema=ensure_delta_compatible_arrow_schema(arrow_ds.schema),
mode="overwrite",
storage_options=storage_options,
)
return

Expand Down
5 changes: 4 additions & 1 deletion dlt/normalize/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
from dlt.common.schema.exceptions import UnboundColumnException
from dlt.common import logger


def verify_normalized_schema(schema: Schema) -> None:
"""Verify the schema is valid for next stage after normalization.
1. Log warning if any incomplete nullable columns are in any data tables
2. Raise `UnboundColumnException` on incomplete non-nullable columns (e.g. missing merge/primary key)
"""
for table_name, column, nullable in find_incomplete_columns(schema.data_tables(seen_data_only=True)):
for table_name, column, nullable in find_incomplete_columns(
schema.data_tables(seen_data_only=True)
):
exc = UnboundColumnException(schema.name, table_name, column)
if nullable:
logger.warning(str(exc))
Expand Down
2 changes: 1 addition & 1 deletion tests/load/pipeline/test_filesystem_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ def complex_table():
destinations_configs(
table_format_filesystem_configs=True,
table_format="delta",
bucket_subset=(FILE_BUCKET),
bucket_subset=(FILE_BUCKET, AZ_BUCKET),
),
ids=lambda x: x.name,
)
Expand Down

0 comments on commit a9c2958

Please sign in to comment.