make duckdb handle Iceberg table with nested types (#2141)
* make duckdb handle iceberg table with nested types

* replace duckdb views for iceberg tables

* remove unnecessary context closing and opening

* replace duckdb views for abfss protocol

* restore original destination for write path

* use dev_mode to work around leftover data from previous tests
  (leftover data caused by #2148)
jorritsandbrink authored Dec 15, 2024
1 parent 39c0a01 commit fd5ba0b
Showing 2 changed files with 50 additions and 17 deletions.
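
The core of the change, reduced to plain DuckDB (the iceberg extension must be available; the metadata path below is made up):

import duckdb

conn = duckdb.connect()
conn.execute("INSTALL iceberg; LOAD iceberg;")

# duckdb_iceberg's schema inference mishandles nested types
# (https://github.com/duckdb/duckdb_iceberg/issues/47); skipping it lets the
# Parquet data files drive the schema, and CREATE OR REPLACE keeps the view
# current when the table's metadata moves on.
conn.execute(
    "CREATE OR REPLACE VIEW nested_items AS SELECT * FROM "
    "iceberg_scan('s3://bucket/dataset/nested_items/metadata/v2.metadata.json', "
    "skip_schema_inference=True)"
)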
25 changes: 16 additions & 9 deletions dlt/destinations/impl/filesystem/sql_client.py
@@ -214,14 +214,17 @@ def create_views_for_tables(self, tables: Dict[str, str]) -> None:
                # unknown views will not be created
                continue

-            # only create view if it does not exist in the current schema yet
-            existing_tables = [tname[0] for tname in self._conn.execute("SHOW TABLES").fetchall()]
-            if view_name in existing_tables:
-                continue
-
            # NOTE: if this is staging configuration then `prepare_load_table` will remove some info
            # from table schema, if we ever extend this to handle staging destination, this needs to change
            schema_table = self.fs_client.prepare_load_table(table_name)
+            table_format = schema_table.get("table_format")
+
+            # skip if view already exists and does not need to be replaced each time
+            existing_tables = [tname[0] for tname in self._conn.execute("SHOW TABLES").fetchall()]
+            needs_replace = table_format == "iceberg" or self.fs_client.config.protocol == "abfss"
+            if view_name in existing_tables and not needs_replace:
+                continue
+
            # discover file type
            folder = self.fs_client.get_table_dir(table_name)
            files = self.fs_client.list_table_files(table_name)
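
The net effect of this hunk: views are still created once per connection, except for iceberg tables and the abfss protocol, which are recreated every time. Restated as a standalone predicate (the function name and signature are ours, not dlt's):

# Standalone restatement of the skip rule above; `should_skip_view` is a
# made-up name for illustration.
def should_skip_view(view_name: str, existing_tables: list, table_format: str, protocol: str) -> bool:
    needs_replace = table_format == "iceberg" or protocol == "abfss"
    return view_name in existing_tables and not needs_replace

assert should_skip_view("items", ["items"], "delta", "s3")        # plain view: create once
assert not should_skip_view("items", ["items"], "iceberg", "s3")  # iceberg: always replace
assert not should_skip_view("items", [], "delta", "s3")           # not there yet: create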
@@ -258,15 +261,17 @@ def create_views_for_tables(self, tables: Dict[str, str]) -> None:

            # create from statement
            from_statement = ""
-            if schema_table.get("table_format") == "delta":
+            if table_format == "delta":
                from_statement = f"delta_scan('{resolved_folder}')"
-            elif schema_table.get("table_format") == "iceberg":
+            elif table_format == "iceberg":
                from dlt.common.libs.pyiceberg import _get_last_metadata_file

                self._setup_iceberg(self._conn)
                metadata_path = f"{resolved_folder}/metadata"
                last_metadata_file = _get_last_metadata_file(metadata_path, self.fs_client)
-                from_statement = f"iceberg_scan('{last_metadata_file}')"
+                # skip schema inference to make nested data types work
+                # https://github.com/duckdb/duckdb_iceberg/issues/47
+                from_statement = f"iceberg_scan('{last_metadata_file}', skip_schema_inference=True)"
            elif first_file_type == "parquet":
                from_statement = f"read_parquet([{resolved_files_string}])"
            elif first_file_type == "jsonl":
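
`_get_last_metadata_file` is imported from `dlt.common.libs.pyiceberg` and is not part of this diff. Conceptually it returns the newest versioned metadata JSON under the table's `metadata/` folder; a hypothetical stand-in (assuming an fsspec-style listing, which dlt's client is not exactly):

# Hypothetical stand-in for dlt's _get_last_metadata_file, for orientation
# only: Iceberg writes one JSON file per metadata version and the newest one
# describes the current table state. The real helper resolves version order
# more carefully than a plain max() over names.
def last_metadata_file(metadata_path: str, fs) -> str:
    files = [f for f in fs.ls(metadata_path) if f.endswith(".metadata.json")]
    if not files:
        raise FileNotFoundError(f"no Iceberg metadata under {metadata_path}")
    return max(files)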
@@ -281,7 +286,9 @@ def create_views_for_tables(self, tables: Dict[str, str]) -> None:

            # create table
            view_name = self.make_qualified_table_name(view_name)
-            create_table_sql_base = f"CREATE VIEW {view_name} AS SELECT * FROM {from_statement}"
+            create_table_sql_base = (
+                f"CREATE OR REPLACE VIEW {view_name} AS SELECT * FROM {from_statement}"
+            )
            self._conn.execute(create_table_sql_base)

    @contextmanager
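
Switching CREATE VIEW to CREATE OR REPLACE VIEW is what makes the relaxed skip check safe: recreating an existing view overwrites it instead of raising. A quick DuckDB illustration:

import duckdb

conn = duckdb.connect()
conn.execute("CREATE OR REPLACE VIEW v AS SELECT 1 AS x")
conn.execute("CREATE OR REPLACE VIEW v AS SELECT 2 AS x")  # no "already exists" error
assert conn.execute("SELECT x FROM v").fetchone()[0] == 2   # latest definition wins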
42 changes: 34 additions & 8 deletions tests/load/filesystem/test_sql_client.py
@@ -22,6 +22,7 @@
)
from dlt.destinations import filesystem
from tests.utils import TEST_STORAGE_ROOT
+from tests.cases import arrow_table_all_data_types
from dlt.destinations.exceptions import DatabaseUndefinedRelation


@@ -81,12 +82,17 @@ def double_items():
                for i in range(total_records)
            ]

-        return [items, double_items]
+        @dlt.resource(table_format=table_format)
+        def arrow_all_types():
+            yield arrow_table_all_data_types("arrow-table", num_rows=total_records)[0]
+
+        return [items, double_items, arrow_all_types]

    # run source
    pipeline.run(source(), loader_file_format=destination_config.file_format)

    if alternate_access_pipeline:
+        orig_dest = pipeline.destination
        pipeline.destination = alternate_access_pipeline.destination

    import duckdb
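
`arrow_table_all_data_types` comes from dlt's test helpers and is not shown in this diff; the point is that it includes nested columns, which are what used to trip duckdb_iceberg's schema inference. A rough, much smaller stand-in:

# Rough stand-in for tests.cases.arrow_table_all_data_types (the real helper
# covers far more types); the nested struct/list column is the part that
# exercises the skip_schema_inference fix.
import pyarrow as pa

def tiny_all_types(num_rows: int = 3) -> pa.Table:
    return pa.table({
        "id": list(range(num_rows)),
        "nested": [{"a": i, "bs": [i, i + 1]} for i in range(num_rows)],
    })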
@@ -96,8 +102,11 @@ def double_items():
        DuckDbCredentials,
    )

-    # check we can create new tables from the views
    with pipeline.sql_client() as c:
+        # check if all data types are handled properly
+        c.execute_sql("SELECT * FROM arrow_all_types;")
+
+        # check we can create new tables from the views
        c.execute_sql(
            "CREATE TABLE items_joined AS (SELECT i.id, di.double_id FROM items as i JOIN"
            " double_items as di ON (i.id = di.id));"
@@ -109,16 +118,14 @@ def double_items():
        assert list(joined_table[5]) == [5, 10]
        assert list(joined_table[10]) == [10, 20]

-    # inserting values into a view should fail gracefully
-    with pipeline.sql_client() as c:
+        # inserting values into a view should fail gracefully
        try:
            c.execute_sql("INSERT INTO double_items VALUES (1, 2)")
        except Exception as exc:
            assert "double_items is not an table" in str(exc)

-    # check that no automated views are created for a schema different than
-    # the known one
-    with pipeline.sql_client() as c:
+        # check that no automated views are created for a schema different than
+        # the known one
        c.execute_sql("CREATE SCHEMA other_schema;")
        with pytest.raises(DatabaseUndefinedRelation):
            with c.execute_query("SELECT * FROM other_schema.items ORDER BY id ASC;") as cursor:
@@ -172,6 +179,24 @@ def _fs_sql_client_for_external_db(
    # views exist
    assert len(external_db.sql("SELECT * FROM second.referenced_items").fetchall()) == total_records
    assert len(external_db.sql("SELECT * FROM first.items").fetchall()) == 3
+
+    # test if view reflects source table accurately after it has changed
+    # concretely, this tests if an existing view is replaced for formats that need it, such as
+    # the `iceberg` table format
+    with fs_sql_client as sql_client:
+        sql_client.create_views_for_tables({"arrow_all_types": "arrow_all_types"})
+    assert external_db.sql("FROM second.arrow_all_types;").arrow().num_rows == total_records
+    if alternate_access_pipeline:
+        # switch back for the write path
+        pipeline.destination = orig_dest
+    pipeline.run(  # run pipeline again to add rows to source table
+        source().with_resources("arrow_all_types"),
+        loader_file_format=destination_config.file_format,
+    )
+    with fs_sql_client as sql_client:
+        sql_client.create_views_for_tables({"arrow_all_types": "arrow_all_types"})
+    assert external_db.sql("FROM second.arrow_all_types;").arrow().num_rows == (2 * total_records)
+
    external_db.close()

    # in case we are not connecting to a bucket that needs secrets, views should still be here after connection reopen
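
The second `create_views_for_tables` call is the actual regression check: an iceberg view is bound to the specific metadata file it was created from, so without CREATE OR REPLACE it would keep serving the older snapshot after the second pipeline run. Schematically (file names made up):

# What the view text embeds before and after the second run (paths made up):
first_run  = "iceberg_scan('s3://b/t/metadata/v2.metadata.json', skip_schema_inference=True)"
second_run = "iceberg_scan('s3://b/t/metadata/v3.metadata.json', skip_schema_inference=True)"
# Only a replaced view picks up second_run; a view created once stays on v2.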
Expand Down Expand Up @@ -298,6 +323,7 @@ def test_table_formats(
    pipeline = destination_config.setup_pipeline(
        "read_pipeline",
        dataset_name="read_test",
+        dev_mode=True,
    )
# in case of gcs we use the s3 compat layer for reading
@@ -310,7 +336,7 @@
            GCS_BUCKET.replace("gs://", "s3://"), destination_name="filesystem_s3_gcs_comp"
        )
        access_pipeline = destination_config.setup_pipeline(
-            "read_pipeline", dataset_name="read_test", destination=gcp_bucket
+            "read_pipeline", dataset_name="read_test", dev_mode=True, destination=gcp_bucket
        )

    _run_dataset_checks(
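
`dev_mode=True` gives each test run a fresh dataset instead of reusing `read_test`, sidestepping the leftover data tracked in #2148. In plain dlt usage it looks like this (exact dataset suffixing is dlt-internal):

import dlt

# dev_mode appends a unique suffix to the dataset name on each run, so every
# test starts from an empty location.
pipeline = dlt.pipeline(
    "read_pipeline", destination="filesystem", dataset_name="read_test", dev_mode=True
)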