diff --git a/.github/workflows/MainDistributionPipeline.yml b/.github/workflows/MainDistributionPipeline.yml
index a44f45c5..8ce67153 100644
--- a/.github/workflows/MainDistributionPipeline.yml
+++ b/.github/workflows/MainDistributionPipeline.yml
@@ -17,7 +17,7 @@ jobs:
     uses: duckdb/extension-ci-tools/.github/workflows/_extension_distribution.yml@main
     with:
       extension_name: iceberg
-      duckdb_version: main
+      duckdb_version: v1.4.0
       ci_tools_version: main
       exclude_archs: 'windows_amd64_mingw'
       extra_toolchains: 'python3'
@@ -29,7 +29,7 @@ jobs:
     secrets: inherit
     with:
       extension_name: iceberg
-      duckdb_version: main
+      duckdb_version: v1.4.0
       ci_tools_version: main
       exclude_archs: 'windows_amd64_mingw'
       deploy_latest: ${{ startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main' }}
diff --git a/extension_config.cmake b/extension_config.cmake
index e70c9c64..6cc4fdb1 100644
--- a/extension_config.cmake
+++ b/extension_config.cmake
@@ -17,7 +17,7 @@ duckdb_extension_load(icu)
 
 duckdb_extension_load(ducklake
     LOAD_TESTS
     GIT_URL https://github.com/duckdb/ducklake
-    GIT_TAG c1ebd032eb4c763910551c08f4b61bdb8168f209
+    GIT_TAG dbb022506e21c27fc4d4cd3d14995af89955401a
 )
@@ -32,4 +32,3 @@ if (NOT MINGW)
     endif ()
 endif()
 
-
diff --git a/src/iceberg_functions/iceberg_to_ducklake.cpp b/src/iceberg_functions/iceberg_to_ducklake.cpp
index ffb080c1..a4b22a75 100644
--- a/src/iceberg_functions/iceberg_to_ducklake.cpp
+++ b/src/iceberg_functions/iceberg_to_ducklake.cpp
@@ -439,7 +439,7 @@ struct DuckLakeDataFile {
 	}
 
 public:
-	//! Contains the stats used to write the 'ducklake_file_column_statistics'
+	//! Contains the stats used to write the 'ducklake_file_column_stats'
 	IcebergManifestEntry manifest_entry;
 
 	DuckLakePartition &partition;
@@ -1001,7 +1001,7 @@ struct IcebergToDuckLakeBindData : public TableFunctionData {
 public:
 	vector<string> CreateSQLStatements() {
 		//! Order to process in:
-		// - snapshot
+		// - snapshot + schema_versions
 		// - schema
 		// - table
 		// - partition_info
@@ -1026,6 +1026,13 @@ struct IcebergToDuckLakeBindData : public TableFunctionData {
 			auto &snapshot = it.second;
 
 			auto values = snapshot.FinalizeEntry(serializer);
+			if (snapshot.catalog_changes) {
+				auto snapshot_id = snapshot.snapshot_id;
+				auto schema_version = snapshot.base_schema_version;
+				sql.push_back(
+				    StringUtil::Format("INSERT INTO {METADATA_CATALOG}.ducklake_schema_versions VALUES (%llu, %llu);",
+				                       snapshot_id.GetIndex(), schema_version));
+			}
 			sql.push_back(StringUtil::Format("INSERT INTO {METADATA_CATALOG}.ducklake_snapshot %s", values));
 		}
 
@@ -1083,7 +1090,7 @@ struct IcebergToDuckLakeBindData : public TableFunctionData {
 				auto data_file_id = data_file.data_file_id.GetIndex();
 				auto &start_snapshot = snapshots.at(data_file.start_snapshot);
 
-				//! ducklake_file_column_statistics
+				//! ducklake_file_column_stats
 				auto columns = table.GetColumnsAtSnapshot(start_snapshot);
 				for (auto &it : columns) {
 					auto column_id = it.first;
@@ -1130,11 +1137,11 @@ struct IcebergToDuckLakeBindData : public TableFunctionData {
 					auto contains_nan = stats.has_nan ? "true" : "false";
 					auto min_value = stats.lower_bound.IsNull() ? "NULL" : "'" + stats.lower_bound.ToString() + "'";
 					auto max_value = stats.upper_bound.IsNull() ? "NULL" : "'" + stats.upper_bound.ToString() + "'";
-					auto values = StringUtil::Format("VALUES(%d, %d, %d, %s, %s, %s, %s, %s, %s);", data_file_id,
+					auto values = StringUtil::Format("VALUES(%d, %d, %d, %s, %s, %s, %s, %s, %s, NULL);", data_file_id,
 					                                 table_id, column_id, column_size_bytes, value_count,
 					                                 null_count.ToString(), min_value, max_value, contains_nan);
-					sql.push_back(StringUtil::Format(
-					    "INSERT INTO {METADATA_CATALOG}.ducklake_file_column_statistics %s", values));
+					sql.push_back(
+					    StringUtil::Format("INSERT INTO {METADATA_CATALOG}.ducklake_file_column_stats %s", values));
 
 					if (!data_file.has_end && !column.has_end && !column.IsNested()) {
 						//! This data file is currently active, collect stats for it
@@ -1218,8 +1225,8 @@ struct IcebergToDuckLakeBindData : public TableFunctionData {
 				auto contains_nan = stats.contains_nan ? "true" : "false";
 				auto min_value = stats.min_value.IsNull() ? "NULL" : "'" + stats.min_value.ToString() + "'";
 				auto max_value = stats.max_value.IsNull() ? "NULL" : "'" + stats.max_value.ToString() + "'";
-				auto values = StringUtil::Format("VALUES(%d, %d, %s, %s, %s, %s);", table_id, column_id, contains_null,
-				                                 contains_nan, min_value, max_value);
+				auto values = StringUtil::Format("VALUES(%d, %d, %s, %s, %s, %s, NULL);", table_id, column_id,
+				                                 contains_null, contains_nan, min_value, max_value);
 				sql.push_back(
 				    StringUtil::Format("INSERT INTO {METADATA_CATALOG}.ducklake_table_column_stats %s", values));
 			}
@@ -1286,7 +1293,8 @@ struct IcebergToDuckLakeBindData : public TableFunctionData {
 				changes.push_back(StringUtil::Format("altered_table:%d", table_id));
 			}
 			auto snapshot_id = snapshot.snapshot_id.GetIndex();
-			auto values = StringUtil::Format("VALUES(%d, '%s');", snapshot_id, StringUtil::Join(changes, ","));
+			auto values =
+			    StringUtil::Format("VALUES(%d, '%s', NULL, NULL, NULL);", snapshot_id, StringUtil::Join(changes, ","));
 			sql.push_back(StringUtil::Format("INSERT INTO {METADATA_CATALOG}.ducklake_snapshot_changes %s", values));
 		}
 		sql.push_back("COMMIT TRANSACTION;");
@@ -1452,8 +1460,9 @@ struct IcebergToDuckLakeGlobalTableFunctionState : public GlobalTableFunctionSta
 			    "DuckLake version metadata is corrupt, the value can't be NULL and has to be of type VARCHAR");
 		}
 		auto version_string = value.GetValue<string>();
-		if (version_string != "0.2") {
-			throw InvalidInputException("'iceberg_to_ducklake' only support version 0.2 currently");
+		if (!StringUtil::StartsWith(version_string, "0.3")) {
+			throw InvalidInputException(
+			    "'iceberg_to_ducklake' only supports version 0.3 currently, detected '%s' instead", version_string);
 		}
 	}
 
diff --git a/test/sql/iceberg_to_ducklake.test_slow b/test/sql/iceberg_to_ducklake.test_slow
new file mode 100644
index 00000000..57db5d3e
--- /dev/null
+++ b/test/sql/iceberg_to_ducklake.test_slow
@@ -0,0 +1,84 @@
+# name: test/sql/iceberg_to_ducklake.test_slow
+# group: [sql]
+
+require-env ICEBERG_SERVER_AVAILABLE
+
+require avro
+
+require parquet
+
+require iceberg
+
+require httpfs
+
+require ducklake
+
+# Do not ignore 'HTTP' error messages!
+set ignore_error_messages
+
+statement ok
+set enable_logging=true
+
+statement ok
+set logging_level='debug'
+
+statement ok
+CREATE SECRET (
+    TYPE S3,
+    KEY_ID 'admin',
+    SECRET 'password',
+    ENDPOINT '127.0.0.1:9000',
+    URL_STYLE 'path',
+    USE_SSL 0
+);
+
+
+statement ok
+ATTACH '' AS my_datalake (
+    TYPE ICEBERG,
+    CLIENT_ID 'admin',
+    CLIENT_SECRET 'password',
+    ENDPOINT 'http://127.0.0.1:8181'
+);
+
+statement ok
+ATTACH 'ducklake:duckdb:__TEST_DIR__/ducklake.duckdb' as my_ducklake (DATA_PATH '__TEST_DIR__/data_path');
+
+statement ok
+call iceberg_to_ducklake(
+    'my_datalake',
+    'my_ducklake',
+    skip_tables := [
+        'pyspark_iceberg_table_v2',
+        'deletion_vectors',
+        'variant_column',
+        'simple_v3_table'
+    ]
+)
+
+# These are empty, so they are omitted:
+# - insert_all_types
+# - simple_v3_table
+# - test_not_null
+# - tpch
+
+# These have an ALTER at the end, with no new snapshot added afterwards.
+# Without a snapshot, that change isn't recorded by the conversion,
+# so the results will not match:
+# - schema_evolve_struct_in_list
+# - schema_evolve_struct_in_map
+
+foreach table_name lineitem_partitioned_l_shipmode_deletes all_types_table day_timestamp day_timestamptz empty_insert filtering_on_bounds filtering_on_partition_bounds issue_328 lineitem_001_deletes lineitem_partitioned_l_shipmode lineitem_sf_01_1_delete lineitem_sf_01_no_deletes lineitem_sf1_deletes many_adds_deletes nested_types pyspark_iceberg_table_v1 quickstart_table schema_evolve_float_to_double schema_evolve_int_to_bigint schema_evolve_struct schema_evolve_widen_decimal table_more_deletes table_partitioned table_unpartitioned table_with_deletes year_timestamp year_timestamptz
+
+query I rowsort expected_res
+select * from my_datalake.default.${table_name}
+----
+
+query I rowsort expected_res
+select * from my_ducklake.default.${table_name}
+----
+
+reset label expected_res
+
+# table_name
+endloop
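
A note on the metadata writes in iceberg_to_ducklake.cpp above: per snapshot, the converter now emits one extra INSERT (into ducklake_schema_versions, when the snapshot carried catalog changes) and pads the stats/changes rows with trailing NULLs. Below is a minimal sketch of the SQL those format strings produce, using made-up ids (snapshot 3, base schema version 1, table 2, column 0, data file 7) and example stats, not output captured from a real run; {METADATA_CATALOG} is the placeholder the function substitutes later, as in the source:

    -- Emitted only when the snapshot carried catalog changes:
    INSERT INTO {METADATA_CATALOG}.ducklake_schema_versions VALUES (3, 1);
    -- The trailing NULLs pad the rows out to the columns the DuckLake 0.3 metadata schema added:
    INSERT INTO {METADATA_CATALOG}.ducklake_file_column_stats VALUES(7, 2, 0, 1024, 100, 0, '1', '100', false, NULL);
    INSERT INTO {METADATA_CATALOG}.ducklake_table_column_stats VALUES(2, 0, false, false, '1', '100', NULL);
    INSERT INTO {METADATA_CATALOG}.ducklake_snapshot_changes VALUES(3, 'altered_table:2', NULL, NULL, NULL);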