From d0ce2add9587519dc8d616b6ac06f2092a8df6be Mon Sep 17 00:00:00 2001 From: Teague Sterling Date: Sun, 4 Aug 2024 14:53:29 -0700 Subject: [PATCH 1/8] Adding default printf format string for metatdata version file name Signed-off-by: Teague Sterling --- src/include/iceberg_metadata.hpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/include/iceberg_metadata.hpp b/src/include/iceberg_metadata.hpp index a6b7b319..43bc4e04 100644 --- a/src/include/iceberg_metadata.hpp +++ b/src/include/iceberg_metadata.hpp @@ -16,6 +16,9 @@ using namespace duckdb_yyjson; namespace duckdb { +// First arg is version string, arg is either empty or ".gz" if gzip +static string DEFAULT_TABLE_VERSION_FORMAT = "v%s%s.metadata.json"; + struct IcebergColumnDefinition { public: static IcebergColumnDefinition ParseFromJson(yyjson_val *val); @@ -124,4 +127,4 @@ struct IcebergTable { string path; }; -} // namespace duckdb \ No newline at end of file +} // namespace duckdb From 47a52254f809e1b75cb6aab911c5a1265c1c2142 Mon Sep 17 00:00:00 2001 From: Teague Sterling Date: Sun, 4 Aug 2024 14:54:05 -0700 Subject: [PATCH 2/8] Replace hard-coded format with printf-style format function and default format string Signed-off-by: Teague Sterling --- src/common/iceberg.cpp | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/common/iceberg.cpp b/src/common/iceberg.cpp index 669ebe4d..b491201b 100644 --- a/src/common/iceberg.cpp +++ b/src/common/iceberg.cpp @@ -167,14 +167,18 @@ IcebergSnapshot IcebergSnapshot::GetSnapshotByTimestamp(const string &path, File return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas, metadata_compression_codec, skip_schema_inference); } -// Function to generate a metadata file url -string GenerateMetaDataUrl(FileSystem &fs, const string &meta_path, const string &table_version, const string &metadata_compression_codec) { - if (metadata_compression_codec != "gzip") { - return fs.JoinPath(meta_path, "v" + table_version + ".metadata.json"); +// Function to generate a metadata file url from version and format string +// default format is "v%s%s.metadata.json" -> v00###-xxxxxxxxx-.gz.metadata.json" +string GenerateMetaDataUrl(FileSystem &fs, const string &meta_path, const string &table_version, const string &metadata_compression_codec, const string &version_format = DEFAULT_TABLE_VERSION_FORMAT) { + // TODO: Need to URL Encode table_version + string compression_suffix = ""; + if (metadata_compression_codec == "gzip") { + compression_suffix = ".gz"; } - return fs.JoinPath(meta_path, "v" + table_version + ".gz.metadata.json"); + return fs.JoinPath(meta_path, StringUtil::Format(version_format, table_version, compression_suffix)); } + string IcebergSnapshot::ReadMetaData(const string &path, FileSystem &fs, string metadata_compression_codec) { string metadata_file_path; if (StringUtil::EndsWith(path, ".json")) { @@ -288,4 +292,4 @@ yyjson_val *IcebergSnapshot::IcebergSnapshot::FindSnapshotByIdTimestampInternal( return max_snapshot; } -} // namespace duckdb \ No newline at end of file +} // namespace duckdb From 1ff816fc394679d510aaef4a468b64a093099c78 Mon Sep 17 00:00:00 2001 From: Teague Sterling Date: Sat, 10 Aug 2024 14:32:31 -0700 Subject: [PATCH 3/8] Adding parameters for table_version and version_file_format to iceberg functions Signed-off-by: Teague Sterling --- src/common/iceberg.cpp | 30 +++++++++++++-------- src/iceberg_functions/iceberg_metadata.cpp | 20 +++++++++++--- src/iceberg_functions/iceberg_scan.cpp | 19 ++++++++++--- src/iceberg_functions/iceberg_snapshots.cpp | 17 +++++++++++- src/include/iceberg_metadata.hpp | 13 +++++---- 5 files changed, 76 insertions(+), 23 deletions(-) diff --git a/src/common/iceberg.cpp b/src/common/iceberg.cpp index b491201b..3f4db898 100644 --- a/src/common/iceberg.cpp +++ b/src/common/iceberg.cpp @@ -169,7 +169,7 @@ IcebergSnapshot IcebergSnapshot::GetSnapshotByTimestamp(const string &path, File // Function to generate a metadata file url from version and format string // default format is "v%s%s.metadata.json" -> v00###-xxxxxxxxx-.gz.metadata.json" -string GenerateMetaDataUrl(FileSystem &fs, const string &meta_path, const string &table_version, const string &metadata_compression_codec, const string &version_format = DEFAULT_TABLE_VERSION_FORMAT) { +string GenerateMetaDataUrl(FileSystem &fs, const string &meta_path, string &table_version, string &metadata_compression_codec, string &version_format = DEFAULT_TABLE_VERSION_FORMAT) { // TODO: Need to URL Encode table_version string compression_suffix = ""; if (metadata_compression_codec == "gzip") { @@ -179,22 +179,30 @@ string GenerateMetaDataUrl(FileSystem &fs, const string &meta_path, const string } -string IcebergSnapshot::ReadMetaData(const string &path, FileSystem &fs, string metadata_compression_codec) { - string metadata_file_path; +string IcebergSnapshot::GetMetaDataPath(const string &path, FileSystem &fs, string metadata_compression_codec, string table_version = DEFAULT_VERSION_HINT_FILE, string version_format = DEFAULT_TABLE_VERSION_FORMAT) { if (StringUtil::EndsWith(path, ".json")) { - metadata_file_path = path; + return path; + } + + auto meta_path = fs.JoinPath(path, "metadata"); + string version_hint; + if(StringUtil::EndsWith(table_version, ".txt")) { + version_hint = GetTableVersion(path, fs, version_format); } else { - auto table_version = GetTableVersion(path, fs); - auto meta_path = fs.JoinPath(path, "metadata"); - metadata_file_path = GenerateMetaDataUrl(fs, meta_path, table_version, metadata_compression_codec); + version_hint = table_version; } + return GenerateMetaDataUrl(fs, meta_path, version_hint, metadata_compression_codec, version_format); +} + +string IcebergSnapshot::ReadMetaData(const string &path, FileSystem &fs, string metadata_compression_codec) { if (metadata_compression_codec == "gzip") { - return IcebergUtils::GzFileToString(metadata_file_path, fs); + return IcebergUtils::GzFileToString(path, fs); } - return IcebergUtils::FileToString(metadata_file_path, fs); + return IcebergUtils::FileToString(path, fs); } + IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot, idx_t iceberg_format_version, idx_t schema_id, vector &schemas, string metadata_compression_codec, bool skip_schema_inference) { @@ -221,9 +229,9 @@ IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot, idx_t icebe return ret; } -string IcebergSnapshot::GetTableVersion(const string &path, FileSystem &fs) { +string IcebergSnapshot::GetTableVersion(const string &path, FileSystem &fs, string version_file = DEFAULT_VERSION_HINT_FILE) { auto meta_path = fs.JoinPath(path, "metadata"); - auto version_file_path = fs.JoinPath(meta_path, "version-hint.text"); + auto version_file_path = fs.JoinPath(meta_path, version_file); auto version_file_content = IcebergUtils::FileToString(version_file_path, fs); try { diff --git a/src/iceberg_functions/iceberg_metadata.cpp b/src/iceberg_functions/iceberg_metadata.cpp index 2b5f857a..d2664ac9 100644 --- a/src/iceberg_functions/iceberg_metadata.cpp +++ b/src/iceberg_functions/iceberg_metadata.cpp @@ -57,6 +57,8 @@ static unique_ptr IcebergMetaDataBind(ClientContext &context, Tabl bool allow_moved_paths = false; string metadata_compression_codec = "none"; bool skip_schema_inference = false; + string table_version = DEFAULT_VERSION_HINT_FILE; + string version_name_format = DEFAULT_TABLE_VERSION_FORMAT; for (auto &kv : input.named_parameters) { auto loption = StringUtil::Lower(kv.first); @@ -66,20 +68,26 @@ static unique_ptr IcebergMetaDataBind(ClientContext &context, Tabl metadata_compression_codec = StringValue::Get(kv.second); } else if (loption == "skip_schema_inference") { skip_schema_inference = BooleanValue::Get(kv.second); + } else if (loption == "version") { + table_version = StringValue::Get(kv.second); + } else if (loption == "version_name_format") { + version_name_format = StringValue::Get(kv.second); } } + + auto iceberg_meta_path = IcebergSnapshot::GetMetaDataPath(iceberg_path, fs, metadata_compression_codec, table_version, version_name_format); IcebergSnapshot snapshot_to_scan; if (input.inputs.size() > 1) { if (input.inputs[1].type() == LogicalType::UBIGINT) { - snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue(), metadata_compression_codec, skip_schema_inference); + snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_meta_path, fs, input.inputs[1].GetValue(), metadata_compression_codec, skip_schema_inference); } else if (input.inputs[1].type() == LogicalType::TIMESTAMP) { snapshot_to_scan = - IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue(), metadata_compression_codec, skip_schema_inference); + IcebergSnapshot::GetSnapshotByTimestamp(iceberg_meta_path, fs, input.inputs[1].GetValue(), metadata_compression_codec, skip_schema_inference); } else { throw InvalidInputException("Unknown argument type in IcebergScanBindReplace."); } } else { - snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs, metadata_compression_codec, skip_schema_inference); + snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_meta_path, fs, metadata_compression_codec, skip_schema_inference); } ret->iceberg_table = @@ -146,6 +154,8 @@ TableFunctionSet IcebergFunctions::GetIcebergMetadataFunction() { fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN; fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN; fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR; + fun.named_parameters["version"] = LogicalType::VARCHAR; + fun.named_parameters["version_name_format"] = LogicalType::VARCHAR; function_set.AddFunction(fun); fun = TableFunction({LogicalType::VARCHAR, LogicalType::UBIGINT}, IcebergMetaDataFunction, IcebergMetaDataBind, @@ -153,6 +163,8 @@ TableFunctionSet IcebergFunctions::GetIcebergMetadataFunction() { fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN; fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN; fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR; + fun.named_parameters["version"] = LogicalType::VARCHAR; + fun.named_parameters["version_name_format"] = LogicalType::VARCHAR; function_set.AddFunction(fun); fun = TableFunction({LogicalType::VARCHAR, LogicalType::TIMESTAMP}, IcebergMetaDataFunction, IcebergMetaDataBind, @@ -160,6 +172,8 @@ TableFunctionSet IcebergFunctions::GetIcebergMetadataFunction() { fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN; fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN; fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR; + fun.named_parameters["version"] = LogicalType::VARCHAR; + fun.named_parameters["version_name_format"] = LogicalType::VARCHAR; function_set.AddFunction(fun); return function_set; diff --git a/src/iceberg_functions/iceberg_scan.cpp b/src/iceberg_functions/iceberg_scan.cpp index fdb4934f..4e0b5cc9 100644 --- a/src/iceberg_functions/iceberg_scan.cpp +++ b/src/iceberg_functions/iceberg_scan.cpp @@ -214,6 +214,8 @@ static unique_ptr IcebergScanBindReplace(ClientContext &context, Table bool skip_schema_inference = false; string mode = "default"; string metadata_compression_codec = "none"; + string table_version = DEFAULT_VERSION_HINT_FILE; + string version_name_format = DEFAULT_TABLE_VERSION_FORMAT; for (auto &kv : input.named_parameters) { auto loption = StringUtil::Lower(kv.first); @@ -229,20 +231,25 @@ static unique_ptr IcebergScanBindReplace(ClientContext &context, Table metadata_compression_codec = StringValue::Get(kv.second); } else if (loption == "skip_schema_inference") { skip_schema_inference = BooleanValue::Get(kv.second); + } else if (loption == "version") { + table_version = StringValue::Get(kv.second); + } else if (loption == "version_name_format") { + version_name_format = StringValue::Get(kv.second); } } + auto iceberg_meta_path = IcebergSnapshot::GetMetaDataPath(iceberg_path, fs, metadata_compression_codec, table_version, version_name_format); IcebergSnapshot snapshot_to_scan; if (input.inputs.size() > 1) { if (input.inputs[1].type() == LogicalType::UBIGINT) { - snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue(), metadata_compression_codec, skip_schema_inference); + snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_meta_path, fs, input.inputs[1].GetValue(), metadata_compression_codec, skip_schema_inference); } else if (input.inputs[1].type() == LogicalType::TIMESTAMP) { snapshot_to_scan = - IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue(), metadata_compression_codec, skip_schema_inference); + IcebergSnapshot::GetSnapshotByTimestamp(iceberg_meta_path, fs, input.inputs[1].GetValue(), metadata_compression_codec, skip_schema_inference); } else { throw InvalidInputException("Unknown argument type in IcebergScanBindReplace."); } } else { - snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs, metadata_compression_codec, skip_schema_inference); + snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_meta_path, fs, metadata_compression_codec, skip_schema_inference); } IcebergTable iceberg_table = IcebergTable::Load(iceberg_path, snapshot_to_scan, fs, allow_moved_paths, metadata_compression_codec); @@ -277,6 +284,8 @@ TableFunctionSet IcebergFunctions::GetIcebergScanFunction() { fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN; fun.named_parameters["mode"] = LogicalType::VARCHAR; fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR; + fun.named_parameters["version"] = LogicalType::VARCHAR; + fun.named_parameters["version_name_format"] = LogicalType::VARCHAR; function_set.AddFunction(fun); fun = TableFunction({LogicalType::VARCHAR, LogicalType::UBIGINT}, nullptr, nullptr, @@ -286,6 +295,8 @@ TableFunctionSet IcebergFunctions::GetIcebergScanFunction() { fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN; fun.named_parameters["mode"] = LogicalType::VARCHAR; fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR; + fun.named_parameters["version"] = LogicalType::VARCHAR; + fun.named_parameters["version_name_format"] = LogicalType::VARCHAR; function_set.AddFunction(fun); fun = TableFunction({LogicalType::VARCHAR, LogicalType::TIMESTAMP}, nullptr, nullptr, @@ -295,6 +306,8 @@ TableFunctionSet IcebergFunctions::GetIcebergScanFunction() { fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN; fun.named_parameters["mode"] = LogicalType::VARCHAR; fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR; + fun.named_parameters["version"] = LogicalType::VARCHAR; + fun.named_parameters["version_name_format"] = LogicalType::VARCHAR; function_set.AddFunction(fun); return function_set; diff --git a/src/iceberg_functions/iceberg_snapshots.cpp b/src/iceberg_functions/iceberg_snapshots.cpp index 4dd5dd78..7d44d485 100644 --- a/src/iceberg_functions/iceberg_snapshots.cpp +++ b/src/iceberg_functions/iceberg_snapshots.cpp @@ -13,6 +13,8 @@ struct IcebergSnaphotsBindData : public TableFunctionData { IcebergSnaphotsBindData() {}; string filename; string metadata_compression_codec; + string table_version; + string version_name_format; bool skip_schema_inference = false; }; @@ -29,7 +31,10 @@ struct IcebergSnapshotGlobalTableFunctionState : public GlobalTableFunctionState auto global_state = make_uniq(); FileSystem &fs = FileSystem::GetFileSystem(context); - global_state->metadata_file = IcebergSnapshot::ReadMetaData(bind_data.filename, fs, bind_data.metadata_compression_codec); + + auto iceberg_meta_path = IcebergSnapshot::GetMetaDataPath( + bind_data.filename, fs, bind_data.metadata_compression_codec, bind_data.table_version, bind_data.version_name_format); + global_state->metadata_file = IcebergSnapshot::ReadMetaData(iceberg_meta_path, fs, bind_data.metadata_compression_codec); global_state->metadata_doc = yyjson_read(global_state->metadata_file.c_str(), global_state->metadata_file.size(), 0); auto root = yyjson_doc_get_root(global_state->metadata_doc); @@ -51,6 +56,8 @@ static unique_ptr IcebergSnapshotsBind(ClientContext &context, Tab string metadata_compression_codec = "none"; bool skip_schema_inference = false; + string table_version = DEFAULT_VERSION_HINT_FILE; + string version_name_format = DEFAULT_TABLE_VERSION_FORMAT; for (auto &kv : input.named_parameters) { auto loption = StringUtil::Lower(kv.first); @@ -58,11 +65,17 @@ static unique_ptr IcebergSnapshotsBind(ClientContext &context, Tab metadata_compression_codec = StringValue::Get(kv.second); } else if (loption == "skip_schema_inference") { skip_schema_inference = BooleanValue::Get(kv.second); + } else if (loption == "version") { + table_version = StringValue::Get(kv.second); + } else if (loption == "version_name_format") { + version_name_format = StringValue::Get(kv.second); } } bind_data->filename = input.inputs[0].ToString(); bind_data->metadata_compression_codec = metadata_compression_codec; bind_data->skip_schema_inference = skip_schema_inference; + bind_data->table_version = table_version; + bind_data->version_name_format = version_name_format; names.emplace_back("sequence_number"); return_types.emplace_back(LogicalType::UBIGINT); @@ -115,6 +128,8 @@ TableFunctionSet IcebergFunctions::GetIcebergSnapshotsFunction() { TableFunction table_function({LogicalType::VARCHAR}, IcebergSnapshotsFunction, IcebergSnapshotsBind, IcebergSnapshotGlobalTableFunctionState::Init); table_function.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR; + table_function.named_parameters["version"] = LogicalType::VARCHAR; + table_function.named_parameters["version_name_format"] = LogicalType::VARCHAR; table_function.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN; function_set.AddFunction(table_function); return function_set; diff --git a/src/include/iceberg_metadata.hpp b/src/include/iceberg_metadata.hpp index 43bc4e04..c56038c4 100644 --- a/src/include/iceberg_metadata.hpp +++ b/src/include/iceberg_metadata.hpp @@ -19,6 +19,8 @@ namespace duckdb { // First arg is version string, arg is either empty or ".gz" if gzip static string DEFAULT_TABLE_VERSION_FORMAT = "v%s%s.metadata.json"; +static string DEFAULT_VERSION_HINT_FILE = "version-hint.text"; + struct IcebergColumnDefinition { public: static IcebergColumnDefinition ParseFromJson(yyjson_val *val); @@ -64,19 +66,20 @@ class IcebergSnapshot { vector schema; string metadata_compression_codec = "none"; - static IcebergSnapshot GetLatestSnapshot(const string &path, FileSystem &fs, string GetSnapshotByTimestamp, bool skip_schema_inference); - static IcebergSnapshot GetSnapshotById(const string &path, FileSystem &fs, idx_t snapshot_id, string GetSnapshotByTimestamp, bool skip_schema_inference); - static IcebergSnapshot GetSnapshotByTimestamp(const string &path, FileSystem &fs, timestamp_t timestamp, string GetSnapshotByTimestamp, bool skip_schema_inference); + static IcebergSnapshot GetLatestSnapshot(const string &path, FileSystem &fs, string metadata_compression_codec, bool skip_schema_inference); + static IcebergSnapshot GetSnapshotById(const string &path, FileSystem &fs, idx_t snapshot_id, string metadata_compression_codec, bool skip_schema_inference); + static IcebergSnapshot GetSnapshotByTimestamp(const string &path, FileSystem &fs, timestamp_t timestamp, string metadata_compression_codec, bool skip_schema_inference); static IcebergSnapshot ParseSnapShot(yyjson_val *snapshot, idx_t iceberg_format_version, idx_t schema_id, vector &schemas, string metadata_compression_codec, bool skip_schema_inference); - static string ReadMetaData(const string &path, FileSystem &fs, string GetSnapshotByTimestamp); + static string GetMetaDataPath(const string &path, FileSystem &fs, string metadata_compression_codec, string table_version, string version_format); + static string ReadMetaData(const string &path, FileSystem &fs, string metadata_compression_codec); static yyjson_val *GetSnapshots(const string &path, FileSystem &fs, string GetSnapshotByTimestamp); static unique_ptr GetParseInfo(yyjson_doc &metadata_json); protected: //! Internal JSON parsing functions - static string GetTableVersion(const string &path, FileSystem &fs); + static string GetTableVersion(const string &path, FileSystem &fs, string version_format); static yyjson_val *FindLatestSnapshotInternal(yyjson_val *snapshots); static yyjson_val *FindSnapshotByIdInternal(yyjson_val *snapshots, idx_t target_id); static yyjson_val *FindSnapshotByIdTimestampInternal(yyjson_val *snapshots, timestamp_t timestamp); From 92c21954a730b656cf5c1bf82f6bd61ad9e3f56f Mon Sep 17 00:00:00 2001 From: Teague Sterling Date: Sat, 10 Aug 2024 15:11:22 -0700 Subject: [PATCH 4/8] Fixing version file formatting Signed-off-by: Teague Sterling --- src/common/iceberg.cpp | 17 ++++++++++++++--- src/include/iceberg_metadata.hpp | 3 ++- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/src/common/iceberg.cpp b/src/common/iceberg.cpp index 3f4db898..22984dc7 100644 --- a/src/common/iceberg.cpp +++ b/src/common/iceberg.cpp @@ -172,10 +172,21 @@ IcebergSnapshot IcebergSnapshot::GetSnapshotByTimestamp(const string &path, File string GenerateMetaDataUrl(FileSystem &fs, const string &meta_path, string &table_version, string &metadata_compression_codec, string &version_format = DEFAULT_TABLE_VERSION_FORMAT) { // TODO: Need to URL Encode table_version string compression_suffix = ""; + string url; if (metadata_compression_codec == "gzip") { compression_suffix = ".gz"; } - return fs.JoinPath(meta_path, StringUtil::Format(version_format, table_version, compression_suffix)); + for(auto try_format : StringUtil::Split(version_format, ',')) { + url = fs.JoinPath(meta_path, StringUtil::Format(try_format, table_version, compression_suffix)); + if(fs.FileExists(url)) { + return url; + } + } + + throw IOException( + "Iceberg metadata file could not be found for version '%s' using format(s) '%s'", + table_version, version_format + ); } @@ -186,8 +197,8 @@ string IcebergSnapshot::GetMetaDataPath(const string &path, FileSystem &fs, stri auto meta_path = fs.JoinPath(path, "metadata"); string version_hint; - if(StringUtil::EndsWith(table_version, ".txt")) { - version_hint = GetTableVersion(path, fs, version_format); + if(StringUtil::EndsWith(table_version, ".text")||StringUtil::EndsWith(table_version, ".txt")) { + version_hint = GetTableVersion(path, fs, table_version); } else { version_hint = table_version; } diff --git a/src/include/iceberg_metadata.hpp b/src/include/iceberg_metadata.hpp index c56038c4..d7d44782 100644 --- a/src/include/iceberg_metadata.hpp +++ b/src/include/iceberg_metadata.hpp @@ -17,7 +17,8 @@ using namespace duckdb_yyjson; namespace duckdb { // First arg is version string, arg is either empty or ".gz" if gzip -static string DEFAULT_TABLE_VERSION_FORMAT = "v%s%s.metadata.json"; +// Allows for both "v###.gz.metadata.json" and "###.metadata.json" styles +static string DEFAULT_TABLE_VERSION_FORMAT = "v%s%s.metadata.json,%s%s.metadata.json"; static string DEFAULT_VERSION_HINT_FILE = "version-hint.text"; From ff17250b218712bf37a090a40c7f92795153fb45 Mon Sep 17 00:00:00 2001 From: Teague Sterling Date: Sat, 10 Aug 2024 15:48:19 -0700 Subject: [PATCH 5/8] troubleshooting named_parameters Signed-off-by: Teague Sterling --- src/common/iceberg.cpp | 12 +++++++----- src/iceberg_functions/iceberg_metadata.cpp | 6 +++--- src/iceberg_functions/iceberg_snapshots.cpp | 6 +++--- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/src/common/iceberg.cpp b/src/common/iceberg.cpp index 22984dc7..80684c05 100644 --- a/src/common/iceberg.cpp +++ b/src/common/iceberg.cpp @@ -176,16 +176,19 @@ string GenerateMetaDataUrl(FileSystem &fs, const string &meta_path, string &tabl if (metadata_compression_codec == "gzip") { compression_suffix = ".gz"; } + string attempts = ""; for(auto try_format : StringUtil::Split(version_format, ',')) { url = fs.JoinPath(meta_path, StringUtil::Format(try_format, table_version, compression_suffix)); if(fs.FileExists(url)) { return url; + } else { + attempts = attempts + ", " + url; } } throw IOException( - "Iceberg metadata file could not be found for version '%s' using format(s) '%s'", - table_version, version_format + "Iceberg metadata file could not be found for version '%s' using %s and format(s) '%s'. Attempted: %s", + table_version, metadata_compression_codec, version_format, attempts ); } @@ -198,7 +201,7 @@ string IcebergSnapshot::GetMetaDataPath(const string &path, FileSystem &fs, stri auto meta_path = fs.JoinPath(path, "metadata"); string version_hint; if(StringUtil::EndsWith(table_version, ".text")||StringUtil::EndsWith(table_version, ".txt")) { - version_hint = GetTableVersion(path, fs, table_version); + version_hint = GetTableVersion(meta_path, fs, table_version); } else { version_hint = table_version; } @@ -240,8 +243,7 @@ IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot, idx_t icebe return ret; } -string IcebergSnapshot::GetTableVersion(const string &path, FileSystem &fs, string version_file = DEFAULT_VERSION_HINT_FILE) { - auto meta_path = fs.JoinPath(path, "metadata"); +string IcebergSnapshot::GetTableVersion(const string &meta_path, FileSystem &fs, string version_file = DEFAULT_VERSION_HINT_FILE) { auto version_file_path = fs.JoinPath(meta_path, version_file); auto version_file_content = IcebergUtils::FileToString(version_file_path, fs); diff --git a/src/iceberg_functions/iceberg_metadata.cpp b/src/iceberg_functions/iceberg_metadata.cpp index d2664ac9..1ffff2c5 100644 --- a/src/iceberg_functions/iceberg_metadata.cpp +++ b/src/iceberg_functions/iceberg_metadata.cpp @@ -151,8 +151,8 @@ TableFunctionSet IcebergFunctions::GetIcebergMetadataFunction() { auto fun = TableFunction({LogicalType::VARCHAR}, IcebergMetaDataFunction, IcebergMetaDataBind, IcebergMetaDataGlobalTableFunctionState::Init); - fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN; fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN; + fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN; fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR; fun.named_parameters["version"] = LogicalType::VARCHAR; fun.named_parameters["version_name_format"] = LogicalType::VARCHAR; @@ -160,8 +160,8 @@ TableFunctionSet IcebergFunctions::GetIcebergMetadataFunction() { fun = TableFunction({LogicalType::VARCHAR, LogicalType::UBIGINT}, IcebergMetaDataFunction, IcebergMetaDataBind, IcebergMetaDataGlobalTableFunctionState::Init); - fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN; fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN; + fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN; fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR; fun.named_parameters["version"] = LogicalType::VARCHAR; fun.named_parameters["version_name_format"] = LogicalType::VARCHAR; @@ -169,8 +169,8 @@ TableFunctionSet IcebergFunctions::GetIcebergMetadataFunction() { fun = TableFunction({LogicalType::VARCHAR, LogicalType::TIMESTAMP}, IcebergMetaDataFunction, IcebergMetaDataBind, IcebergMetaDataGlobalTableFunctionState::Init); - fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN; fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN; + fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN; fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR; fun.named_parameters["version"] = LogicalType::VARCHAR; fun.named_parameters["version_name_format"] = LogicalType::VARCHAR; diff --git a/src/iceberg_functions/iceberg_snapshots.cpp b/src/iceberg_functions/iceberg_snapshots.cpp index 7d44d485..e6e40037 100644 --- a/src/iceberg_functions/iceberg_snapshots.cpp +++ b/src/iceberg_functions/iceberg_snapshots.cpp @@ -55,20 +55,20 @@ static unique_ptr IcebergSnapshotsBind(ClientContext &context, Tab auto bind_data = make_uniq(); string metadata_compression_codec = "none"; - bool skip_schema_inference = false; string table_version = DEFAULT_VERSION_HINT_FILE; string version_name_format = DEFAULT_TABLE_VERSION_FORMAT; + bool skip_schema_inference = false; for (auto &kv : input.named_parameters) { auto loption = StringUtil::Lower(kv.first); if (loption == "metadata_compression_codec") { metadata_compression_codec = StringValue::Get(kv.second); - } else if (loption == "skip_schema_inference") { - skip_schema_inference = BooleanValue::Get(kv.second); } else if (loption == "version") { table_version = StringValue::Get(kv.second); } else if (loption == "version_name_format") { version_name_format = StringValue::Get(kv.second); + } else if (loption == "skip_schema_inference") { + skip_schema_inference = BooleanValue::Get(kv.second); } } bind_data->filename = input.inputs[0].ToString(); From 5f13a6d6df28a9ae8fa4fceeea710f007c972a62 Mon Sep 17 00:00:00 2001 From: Teague Sterling Date: Sat, 10 Aug 2024 16:45:18 -0700 Subject: [PATCH 6/8] Updating error messages Signed-off-by: Teague Sterling --- src/common/iceberg.cpp | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/common/iceberg.cpp b/src/common/iceberg.cpp index 80684c05..6e639691 100644 --- a/src/common/iceberg.cpp +++ b/src/common/iceberg.cpp @@ -176,20 +176,15 @@ string GenerateMetaDataUrl(FileSystem &fs, const string &meta_path, string &tabl if (metadata_compression_codec == "gzip") { compression_suffix = ".gz"; } - string attempts = ""; for(auto try_format : StringUtil::Split(version_format, ',')) { url = fs.JoinPath(meta_path, StringUtil::Format(try_format, table_version, compression_suffix)); if(fs.FileExists(url)) { return url; - } else { - attempts = attempts + ", " + url; } } throw IOException( - "Iceberg metadata file could not be found for version '%s' using %s and format(s) '%s'. Attempted: %s", - table_version, metadata_compression_codec, version_format, attempts - ); + "Iceberg metadata file not found for table version '%s' using '%s' compression and format(s): '%s'", table_version, metadata_compression_codec, version_format); } From b5cb2816770343351db857ef2c6b45b02b1d6883 Mon Sep 17 00:00:00 2001 From: Teague Sterling Date: Sat, 10 Aug 2024 16:45:31 -0700 Subject: [PATCH 7/8] Updating test cases Signed-off-by: Teague Sterling --- test/sql/iceberg_metadata.test | 26 ++++++++++++++++++++++++-- test/sql/iceberg_scan.test | 12 +++++++++++- test/sql/iceberg_snapshots.test | 31 ++++++++++++++++++++++++++++--- 3 files changed, 63 insertions(+), 6 deletions(-) diff --git a/test/sql/iceberg_metadata.test b/test/sql/iceberg_metadata.test index 3238c1cd..2ee1fbd6 100644 --- a/test/sql/iceberg_metadata.test +++ b/test/sql/iceberg_metadata.test @@ -16,10 +16,32 @@ SELECT * FROM ICEBERG_METADATA('data/iceberg/lineitem_iceberg', ALLOW_MOVED_PATH lineitem_iceberg/metadata/10eaca8a-1e1c-421e-ad6d-b232e5ee23d3-m1.avro 2 DATA ADDED EXISTING lineitem_iceberg/data/00041-414-f3c73457-bbd6-4b92-9c15-17b241171b16-00001.parquet PARQUET 51793 lineitem_iceberg/metadata/10eaca8a-1e1c-421e-ad6d-b232e5ee23d3-m0.avro 2 DATA DELETED EXISTING lineitem_iceberg/data/00000-411-0792dcfe-4e25-4ca3-8ada-175286069a47-00001.parquet PARQUET 60175 +query IIIIIIII +SELECT * FROM ICEBERG_METADATA('data/iceberg/lineitem_iceberg', ALLOW_MOVED_PATHS=TRUE, version='1'); +---- +lineitem_iceberg/metadata/cf3d0be5-cf70-453d-ad8f-48fdc412e608-m0.avro 1 DATA ADDED EXISTING lineitem_iceberg/data/00000-411-0792dcfe-4e25-4ca3-8ada-175286069a47-00001.parquet PARQUET 60175 + +query IIIIIIII +SELECT * FROM ICEBERG_METADATA('data/iceberg/lineitem_iceberg', ALLOW_MOVED_PATHS=TRUE, version_name_format='v%s%s.metadata.json'); +---- +lineitem_iceberg/metadata/10eaca8a-1e1c-421e-ad6d-b232e5ee23d3-m1.avro 2 DATA ADDED EXISTING lineitem_iceberg/data/00041-414-f3c73457-bbd6-4b92-9c15-17b241171b16-00001.parquet PARQUET 51793 +lineitem_iceberg/metadata/10eaca8a-1e1c-421e-ad6d-b232e5ee23d3-m0.avro 2 DATA DELETED EXISTING lineitem_iceberg/data/00000-411-0792dcfe-4e25-4ca3-8ada-175286069a47-00001.parquet PARQUET 60175 + +query IIIIIIII +SELECT * FROM ICEBERG_METADATA('data/iceberg/lineitem_iceberg', ALLOW_MOVED_PATHS=TRUE, version='2', version_name_format='v%s%s.metadata.json'); +---- +lineitem_iceberg/metadata/10eaca8a-1e1c-421e-ad6d-b232e5ee23d3-m1.avro 2 DATA ADDED EXISTING lineitem_iceberg/data/00041-414-f3c73457-bbd6-4b92-9c15-17b241171b16-00001.parquet PARQUET 51793 +lineitem_iceberg/metadata/10eaca8a-1e1c-421e-ad6d-b232e5ee23d3-m0.avro 2 DATA DELETED EXISTING lineitem_iceberg/data/00000-411-0792dcfe-4e25-4ca3-8ada-175286069a47-00001.parquet PARQUET 60175 + statement error SELECT * FROM ICEBERG_METADATA('data/iceberg/lineitem_iceberg_gz', ALLOW_MOVED_PATHS=TRUE); ---- -IO Error: Cannot open file +IO Error: Iceberg metadata file not found for table version '2' using 'none' compression and format(s): 'v%s%s.metadata.json,%s%s.metadata.json' + +statement error +SELECT * FROM ICEBERG_METADATA('data/iceberg/lineitem_iceberg_gz', ALLOW_MOVED_PATHS=TRUE, METADATA_COMPRESSION_CODEC="blarg", version_name_format='blat%s%s'); +---- +IO Error: Iceberg metadata file not found for table version '2' using 'blarg' compression and format(s): 'blat%s%s' query IIIIIIII SELECT * FROM ICEBERG_METADATA('data/iceberg/lineitem_iceberg_gz', ALLOW_MOVED_PATHS=TRUE, METADATA_COMPRESSION_CODEC="gzip"); @@ -29,4 +51,4 @@ lineitem_iceberg_gz/metadata/23f9dbea-1e7f-4694-a82c-dc3c9a94953e-m0.avro 0 DATA statement error SELECT * FROM ICEBERG_METADATA('data/iceberg/lineitem_iceberg_nonexistent'); ---- -IO Error: Cannot open file \ No newline at end of file +IO Error: Cannot open file diff --git a/test/sql/iceberg_scan.test b/test/sql/iceberg_scan.test index 7847b53e..5d809660 100644 --- a/test/sql/iceberg_scan.test +++ b/test/sql/iceberg_scan.test @@ -57,9 +57,19 @@ IO Error: Could not find latest snapshots for timestamp 2023-02-15 15:07:54.503 statement error SELECT * FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg_gz', ALLOW_MOVED_PATHS=TRUE); ---- -IO Error: Cannot open file +IO Error: Iceberg metadata file not found for table version '2' using 'none' compression and format(s): 'v%s%s.metadata.json,%s%s.metadata.json' query I SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg_gz', ALLOW_MOVED_PATHS=TRUE, METADATA_COMPRESSION_CODEC="gzip"); ---- 111968 + +statement error +SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg_gz', ALLOW_MOVED_PATHS=TRUE, METADATA_COMPRESSION_CODEC="gzip", version='1'); +---- +IO Error: No snapshots found + +query I +SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg_gz', ALLOW_MOVED_PATHS=TRUE, METADATA_COMPRESSION_CODEC="gzip", version='2', version_name_format='v%s%s.metadata.json'); +---- +111968 diff --git a/test/sql/iceberg_snapshots.test b/test/sql/iceberg_snapshots.test index 39d29d83..f260c91a 100644 --- a/test/sql/iceberg_snapshots.test +++ b/test/sql/iceberg_snapshots.test @@ -18,6 +18,26 @@ SELECT * FROM ICEBERG_SNAPSHOTS('data/iceberg/lineitem_iceberg'); 1 3776207205136740581 2023-02-15 15:07:54.504 lineitem_iceberg/metadata/snap-3776207205136740581-1-cf3d0be5-cf70-453d-ad8f-48fdc412e608.avro 2 7635660646343998149 2023-02-15 15:08:14.73 lineitem_iceberg/metadata/snap-7635660646343998149-1-10eaca8a-1e1c-421e-ad6d-b232e5ee23d3.avro +query IIII +SELECT * FROM ICEBERG_SNAPSHOTS('data/iceberg/lineitem_iceberg', version='1'); +---- +1 3776207205136740581 2023-02-15 15:07:54.504 lineitem_iceberg/metadata/snap-3776207205136740581-1-cf3d0be5-cf70-453d-ad8f-48fdc412e608.avro + +statement error +SELECT * FROM ICEBERG_SNAPSHOTS('data/iceberg/lineitem_iceberg', version="1", version_name_format='v%s%s.metadata.gz'); +---- +IO Error: Iceberg metadata file not found for table version '1' using 'none' compression and format(s): 'v%s%s.metadata.gz' + +query IIII +SELECT * FROM ICEBERG_SNAPSHOTS('data/iceberg/lineitem_iceberg', version="1", version_name_format='v%s%s.metadata.json'); +---- +1 3776207205136740581 2023-02-15 15:07:54.504 lineitem_iceberg/metadata/snap-3776207205136740581-1-cf3d0be5-cf70-453d-ad8f-48fdc412e608.avro + +query IIII +SELECT * FROM ICEBERG_SNAPSHOTS('data/iceberg/lineitem_iceberg', version='1'); +---- +1 3776207205136740581 2023-02-15 15:07:54.504 lineitem_iceberg/metadata/snap-3776207205136740581-1-cf3d0be5-cf70-453d-ad8f-48fdc412e608.avro + statement error SELECT * FROM ICEBERG_SNAPSHOTS('data/iceberg/lineitem_iceberg_nonexistent'); ---- @@ -26,9 +46,14 @@ IO Error: Cannot open file "data/iceberg/lineitem_iceberg_nonexistent/metadata/v statement error SELECT * FROM ICEBERG_SNAPSHOTS('data/iceberg/lineitem_iceberg_gz'); ---- -IO Error: Cannot open file +IO Error: Iceberg metadata file not found for table version '2' using 'none' compression and format(s): 'v%s%s.metadata.json,%s%s.metadata.json' + +query IIII +SELECT * FROM ICEBERG_SNAPSHOTS('data/iceberg/lineitem_iceberg_gz', metadata_compression_codec="gzip"); +---- +0 4468019210336628573 2024-03-13 18:38:58.602 lineitem_iceberg_gz/metadata/snap-4468019210336628573-1-23f9dbea-1e7f-4694-a82c-dc3c9a94953e.avro query IIII -SELECT * FROM ICEBERG_SNAPSHOTS('data/iceberg/lineitem_iceberg_gz', METADATA_COMPRESSION_CODEC="gzip"); +SELECT * FROM ICEBERG_SNAPSHOTS('data/iceberg/lineitem_iceberg_gz', metadata_compression_codec="gzip", version='2'); ---- -0 4468019210336628573 2024-03-13 18:38:58.602 lineitem_iceberg_gz/metadata/snap-4468019210336628573-1-23f9dbea-1e7f-4694-a82c-dc3c9a94953e.avro \ No newline at end of file +0 4468019210336628573 2024-03-13 18:38:58.602 lineitem_iceberg_gz/metadata/snap-4468019210336628573-1-23f9dbea-1e7f-4694-a82c-dc3c9a94953e.avro From e74e77366670746fae7b2c7987e3aba433026f2a Mon Sep 17 00:00:00 2001 From: Teague Sterling Date: Sat, 10 Aug 2024 17:19:58 -0700 Subject: [PATCH 8/8] Adding additional test Signed-off-by: Teague Sterling --- test/sql/iceberg_scan.test | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/test/sql/iceberg_scan.test b/test/sql/iceberg_scan.test index 5d809660..4f84bb79 100644 --- a/test/sql/iceberg_scan.test +++ b/test/sql/iceberg_scan.test @@ -30,6 +30,12 @@ SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg', 7635660646343 ---- 51793 +# Scanning 2nd snapshot +query I +SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg', version='2', ALLOW_MOVED_PATHS=TRUE); +---- +51793 + # Scanning latest snapshot at specific moment in time # note in the data we have: # 1 = 2023-02-15 15:07:54.504