Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions src/functions/ducklake_add_data_files.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ struct DuckLakeFileProcessor {
};

void DuckLakeFileProcessor::ReadParquetFullMetadata(const string &glob, vector<DuckLakeDataFile> &written_files) {
auto result = transaction.Query(StringUtil::Format(R"(
auto result = result_or_throw(
transaction.Query(StringUtil::Format(R"(
SELECT
list_transform(parquet_file_metadata, lambda x: struct_pack(
file_name := x.file_name,
Expand Down Expand Up @@ -198,10 +199,8 @@ SELECT
)) AS parquet_schema
FROM parquet_full_metadata(%s)
)",
SQLString(glob)));
if (result->HasError()) {
result->GetErrorObject().Throw("Failed to add data files to DuckLake: ");
}
SQLString(glob))),
"Failed to add data files to DuckLake: ");

for (auto &row : *result) {
auto &chunk = row.GetChunk();
Expand Down
9 changes: 4 additions & 5 deletions src/functions/ducklake_flush_inlined_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,8 @@ static void FlushInlinedFileDeletions(ClientContext &context, DuckLakeCatalog &c
}

// Query the inlined deletions with file paths and existing delete file info
auto deletions_result = transaction.Query(snapshot, StringUtil::Format(R"(
auto deletions_result = result_or_throw(
transaction.Query(snapshot, StringUtil::Format(R"(
SELECT del.file_id, data.path, data.path_is_relative, del.row_id, del.begin_snapshot,
existing_del.delete_file_id, existing_del.path as del_path, existing_del.path_is_relative as del_path_is_relative,
existing_del.begin_snapshot as del_begin_snapshot, existing_del.encryption_key as del_encryption_key,
Expand All @@ -442,10 +443,8 @@ LEFT JOIN (
AND ({SNAPSHOT_ID} < end_snapshot OR end_snapshot IS NULL)
) existing_del ON del.file_id = existing_del.data_file_id
)",
inlined_table_name, table_id.index));
if (deletions_result->HasError()) {
deletions_result->GetErrorObject().Throw("Failed to query inlined file deletions for flush: ");
}
inlined_table_name, table_id.index)),
"Failed to query inlined file deletions for flush: ");

unordered_map<idx_t, FileDeleteInfo> files_to_flush;

Expand Down
33 changes: 33 additions & 0 deletions src/include/storage/ducklake_transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@
#include "common/ducklake_data_file.hpp"
#include "common/ducklake_snapshot.hpp"
#include "duckdb/common/case_insensitive_map.hpp"
#include "duckdb/common/exception.hpp"
#include "duckdb/common/types/value_map.hpp"
#include "duckdb/main/connection.hpp"
#include "duckdb/main/query_result.hpp"
#include "duckdb/transaction/transaction.hpp"
#include <utility>
#include "storage/ducklake_catalog_set.hpp"
#include "storage/ducklake_inlined_data.hpp"
#include "storage/ducklake_metadata_manager.hpp"
Expand All @@ -41,6 +44,36 @@ struct DuckLakeCommitState;
class DuckLakeFieldId;
class LocalTableChangeIterationHelper;

//! Checks a metadata query result and converts any stored error into an exception.
//! "err" is the context prefix prepended to the underlying error message.
//! On success the result is handed back unchanged so callers can keep scanning it.
inline unique_ptr<QueryResult> result_or_throw(unique_ptr<QueryResult> result, const string &err) {
	auto &res = *result;
	if (res.HasError()) {
		// Throw raises an exception carrying the original error plus the context prefix
		res.GetErrorObject().Throw(err);
	}
	return result;
}

//! Runs the error check for fire-and-forget queries: throws (with "err" as the
//! context prefix) if the query failed, otherwise discards the result.
inline void execute_or_throw(unique_ptr<QueryResult> result, const string &err) {
	if (result->HasError()) {
		result->GetErrorObject().Throw(err);
	}
	// result is intentionally dropped here - callers only care about success/failure
}

//! Executes the error check for a query expected to produce exactly one scalar
//! value, then returns that value (row 0, column 0) converted to T.
//! Throws with "err" as prefix when the query itself failed, and
//! InvalidInputException when the result set is empty.
template <class T>
T query_scalar_or_throw(unique_ptr<QueryResult> result, const string &err) {
	auto verified = result_or_throw(std::move(result), err);
	auto first_chunk = verified->Fetch();
	const bool has_value = first_chunk && first_chunk->size() > 0;
	if (!has_value) {
		throw InvalidInputException("Expected scalar result");
	}
	return first_chunk->GetValue(0, 0).template GetValue<T>();
}

//! Like query_scalar_or_throw, but tolerates an empty result set: returns the
//! first row's first column converted to T, or a value-initialized T (zero for
//! arithmetic types) when the query produced no rows.
template <class T>
T query_scalar_or_zero(unique_ptr<QueryResult> result, const string &err) {
	auto verified = result_or_throw(std::move(result), err);
	T value {};
	// only the first row matters; stop as soon as we have read it
	for (auto &row : *verified) {
		value = row.template GetValue<T>(0);
		break;
	}
	return value;
}

struct FlushedInlinedTableInfo {
DuckLakeInlinedTableInfo inlined_table;
idx_t flush_snapshot_id;
Expand Down
16 changes: 7 additions & 9 deletions src/storage/ducklake_initializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,13 @@ string DuckLakeInitializer::GetAttachOptions() {
void DuckLakeInitializer::Initialize() {
auto &transaction = DuckLakeTransaction::Get(context, catalog);
// attach the metadata database
auto result = transaction.Query("ATTACH OR REPLACE {METADATA_PATH} AS {METADATA_CATALOG_NAME_IDENTIFIER}" +
GetAttachOptions());
if (result->HasError()) {
auto &error_obj = result->GetErrorObject();
error_obj.Throw("Failed to attach DuckLake MetaData \"" + catalog.MetadataDatabaseName() + "\" at path + \"" +
catalog.MetadataPath() + "\"");
}
execute_or_throw(
transaction.Query("ATTACH OR REPLACE {METADATA_PATH} AS {METADATA_CATALOG_NAME_IDENTIFIER}" +
GetAttachOptions()),
"Failed to attach DuckLake MetaData \"" + catalog.MetadataDatabaseName() + "\" at path + \"" +
catalog.MetadataPath() + "\"");
// explicitly load all secrets - work-around to secret initialization bug
transaction.Query("FROM duckdb_secrets()");
execute_or_throw(transaction.Query("FROM duckdb_secrets()"), "Failed to load DuckDB secrets");

bool has_explicit_schema = !options.metadata_schema.empty();
if (options.metadata_schema.empty()) {
Expand All @@ -94,7 +92,7 @@ void DuckLakeInitializer::Initialize() {
// directly query a known ducklake metadata table to avoid scanning all attached catalogs via duckdb_tables()
// this prevents a corrupted ducklake catalog from blocking initialization of unrelated ducklake databases
// FIXME: verify that all ducklake tables are in the correct format
result = transaction.Query("SELECT NULL FROM {METADATA_CATALOG}.ducklake_metadata LIMIT 1");
auto result = transaction.Query("SELECT NULL FROM {METADATA_CATALOG}.ducklake_metadata LIMIT 1");
if (result->HasError()) {
auto &error_obj = result->GetErrorObject();
if (error_obj.Type() == ExceptionType::CATALOG) {
Expand Down
15 changes: 8 additions & 7 deletions src/storage/ducklake_inlined_data_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,25 +105,26 @@ bool DuckLakeInlinedDataReader::TryInitializeScan(ClientContext &context, Global
virtual_columns.push_back(InlinedVirtualColumn::NONE);
}
}
unique_ptr<QueryResult> query_result;
unique_ptr<QueryResult> scan_result;
switch (read_info.scan_type) {
case DuckLakeScanType::SCAN_TABLE:
query_result = metadata_manager.ReadInlinedData(read_info.snapshot, table_name, columns_to_read);
scan_result = metadata_manager.ReadInlinedData(read_info.snapshot, table_name, columns_to_read);
break;
case DuckLakeScanType::SCAN_INSERTIONS:
query_result = metadata_manager.ReadInlinedDataInsertions(*read_info.start_snapshot, read_info.snapshot,
table_name, columns_to_read);
scan_result = metadata_manager.ReadInlinedDataInsertions(*read_info.start_snapshot, read_info.snapshot,
table_name, columns_to_read);
break;
case DuckLakeScanType::SCAN_DELETIONS:
query_result = metadata_manager.ReadInlinedDataDeletions(*read_info.start_snapshot, read_info.snapshot,
table_name, columns_to_read);
scan_result = metadata_manager.ReadInlinedDataDeletions(*read_info.start_snapshot, read_info.snapshot,
table_name, columns_to_read);
break;
case DuckLakeScanType::SCAN_FOR_FLUSH:
query_result = metadata_manager.ReadAllInlinedDataForFlush(read_info.snapshot, table_name, columns_to_read);
scan_result = metadata_manager.ReadAllInlinedDataForFlush(read_info.snapshot, table_name, columns_to_read);
break;
default:
throw InternalException("Unknown DuckLake scan type");
}
auto query_result = result_or_throw(std::move(scan_result), "Failed to read inlined data from DuckLake: ");
data = metadata_manager.TransformInlinedData(*query_result, expected_types);
if (!virtual_columns.empty()) {
auto scan_types = data->data->Types();
Expand Down
Loading
Loading