Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/include/metadata_manager/postgres_metadata_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,26 @@ class PostgresMetadataManager : public DuckLakeMetadataManager {

unique_ptr<QueryResult> Query(DuckLakeSnapshot snapshot, string &query) override;

idx_t AllocateNextSnapshotId(idx_t current_snapshot_id) override;
idx_t AllocateNextCatalogId(idx_t current_next_catalog_id) override;
idx_t AllocateNextFileId(idx_t current_next_file_id) override;
idx_t AllocateNextSchemaVersion(idx_t current_schema_version) override;
void AcquireCommitLock() override;

void EnsureIdSequences();

protected:
string GetLatestSnapshotQuery() const override;

private:
unique_ptr<QueryResult> ExecuteQuery(DuckLakeSnapshot snapshot, string &query, string command);

idx_t FetchScalarSequenceValue(const string &seq_name);

// classid half is hashtext(schema) so multiple DuckLake catalogs on one
// pg instance do not share the key.
static constexpr int32_t DUCKLAKE_COMMIT_ADVISORY_SUBKEY = 0x44754C4B;
optional_idx commit_lock_classid;
};

} // namespace duckdb
5 changes: 4 additions & 1 deletion src/include/storage/ducklake_catalog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,10 @@ class DuckLakeCatalog : public Catalog {

void SetCommittedSnapshotId(idx_t value) {
	// Record a newly committed snapshot id under the commit lock.
	// Max-update rather than plain assignment: snapshot ids are allocated from a
	// sequence decoupled from commit order, so a later-finishing commit can carry
	// a smaller id and must not move the committed-snapshot watermark backwards.
	// (The flattened diff retained the old unconditional assignment before the
	// guard, which made the max-update dead code — removed here.)
	lock_guard<mutex> guard(commit_lock);
	if (!last_committed_snapshot.IsValid() || value > last_committed_snapshot.GetIndex()) {
		last_committed_snapshot = value;
	}
}

Value GetLastCommittedSnapshotId() const {
Expand Down
3 changes: 3 additions & 0 deletions src/include/storage/ducklake_metadata_info.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,9 @@ struct DuckLakeGlobalStatsInfo {
idx_t record_count;
idx_t next_row_id;
idx_t table_size_bytes;
// CAS predicate / target. Post-verified in FlushChanges.
idx_t stats_version = 0;
idx_t new_stats_version = 0;
vector<DuckLakeGlobalColumnStatsInfo> column_stats;
};

Expand Down
11 changes: 11 additions & 0 deletions src/include/storage/ducklake_metadata_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,17 @@ class DuckLakeMetadataManager {
virtual string WriteMergeAdjacent(const vector<DuckLakeCompactedFileInfo> &compactions);
virtual string WriteDeleteRewrites(const vector<DuckLakeCompactedFileInfo> &compactions);
virtual string WriteCompactions(const vector<DuckLakeCompactedFileInfo> &compactions, CompactionType type);
virtual idx_t AllocateNextSnapshotId(idx_t current_snapshot_id);
virtual idx_t AllocateNextCatalogId(idx_t current_next_catalog_id);
virtual idx_t AllocateNextFileId(idx_t current_next_file_id);
virtual idx_t AllocateNextSchemaVersion(idx_t current_schema_version) {
	// Default policy: the next schema version is one past the current one.
	// Backends with concurrent committers may override with a sequence allocator.
	idx_t next_schema_version = current_schema_version;
	return ++next_schema_version;
}
// Backends with non-transactional id allocation MUST override: serialise
// allocation, INSERT, and COMMIT, or break MAX(snapshot_id)-as-horizon.
virtual void AcquireCommitLock() {
// Default is a deliberate no-op: backends whose id allocation is transactional
// need no extra commit serialisation.
}

virtual string InsertSnapshot();
virtual string WriteSnapshotChanges(const SnapshotChangeInfo &change_info,
const DuckLakeSnapshotCommit &commit_info);
Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/ducklake_transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,8 @@ class DuckLakeTransaction : public Transaction, public enable_shared_from_this<D
vector<DuckLakeOverwrittenDeleteFile> &overwritten_delete_files) const;
DuckLakeDeleteFileInfo GetNewDeleteFile(TableIndex table_id, const DuckLakeCommitState &commit_state,
const DuckLakeDeleteFile &file) const;
string UpdateGlobalTableStats(TableIndex table_id, const DuckLakeNewGlobalStats &new_stats);
string UpdateGlobalTableStats(TableIndex table_id, const DuckLakeNewGlobalStats &new_stats,
idx_t expected_stats_version, idx_t new_stats_version);
SnapshotAndStats CheckForConflicts(DuckLakeSnapshot transaction_snapshot,
const TransactionChangeInformation &changes);
void CheckForConflicts(const TransactionChangeInformation &changes, const SnapshotChangeInformation &other_changes,
Expand Down
136 changes: 136 additions & 0 deletions src/metadata_manager/postgres_metadata_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,142 @@ string PostgresMetadataManager::GetLatestSnapshotQuery() const {
)";
}

idx_t PostgresMetadataManager::FetchScalarSequenceValue(const string &seq_name) {
	// Allocate and return the next value of the named metadata-schema sequence.
	// Throws if the query fails, returns no row, or yields a negative value.
	// NOTE(review): the snapshot appears to be used only for template
	// substitution of {METADATA_SCHEMA_ESCAPED}; the id fields are dummies —
	// confirm against Query()'s implementation.
	DuckLakeSnapshot dummy {0, 0, 0, 0};
	string query = "SELECT nextval('{METADATA_SCHEMA_ESCAPED}." + seq_name + "')";
	auto result = Query(dummy, query);
	if (result->HasError()) {
		result->GetErrorObject().Throw("Failed to allocate next value from " + seq_name + ": ");
	}
	auto chunk = result->Fetch();
	if (!chunk || chunk->size() == 0) {
		throw InternalException("ducklake: %s returned no value from nextval()", seq_name);
	}
	auto v = chunk->data[0].GetValue(0).GetValue<int64_t>();
	if (v < 0) {
		// nextval() returns int64; a negative value cannot be represented as idx_t.
		// static_cast instead of the original C-style cast.
		throw InternalException("ducklake: %s returned negative value: %lld", seq_name,
		                        static_cast<long long>(v));
	}
	return static_cast<idx_t>(v);
}

idx_t PostgresMetadataManager::AllocateNextSnapshotId(idx_t /*advisory*/) {
	// Snapshot ids come from a Postgres sequence; the caller-supplied advisory
	// value is intentionally ignored.
	const string sequence_name = "ducklake_snapshot_id_seq";
	return FetchScalarSequenceValue(sequence_name);
}

idx_t PostgresMetadataManager::AllocateNextCatalogId(idx_t /*advisory*/) {
	// Catalog ids come from a Postgres sequence; the caller-supplied advisory
	// value is intentionally ignored.
	const string sequence_name = "ducklake_catalog_id_seq";
	return FetchScalarSequenceValue(sequence_name);
}

idx_t PostgresMetadataManager::AllocateNextFileId(idx_t /*advisory*/) {
	// File ids come from a Postgres sequence; the caller-supplied advisory
	// value is intentionally ignored.
	const string sequence_name = "ducklake_file_id_seq";
	return FetchScalarSequenceValue(sequence_name);
}

idx_t PostgresMetadataManager::AllocateNextSchemaVersion(idx_t /*advisory*/) {
	// Schema versions are drawn from a dedicated sequence instead of the base
	// class's +1 bump: the catalog cache is keyed on schema_version, and two
	// concurrent commits computing the same incremented value would alias
	// stale cache entries across transactions.
	const string sequence_name = "ducklake_schema_version_seq";
	return FetchScalarSequenceValue(sequence_name);
}

void PostgresMetadataManager::AcquireCommitLock() {
	// Serialise DuckLake commits against this metadata catalog with a Postgres
	// transaction-scoped advisory lock (pg_advisory_xact_lock): released
	// automatically at COMMIT/ROLLBACK, so no explicit unlock is needed.
	// Consistency fix: initialise the dummy snapshot the same way as the other
	// methods in this file ({0, 0, 0, 0} instead of {}); it is only used for
	// query template substitution.
	DuckLakeSnapshot dummy {0, 0, 0, 0};
	if (!commit_lock_classid.IsValid()) {
		// Lazily derive (and cache) the classid half of the advisory key from the
		// metadata catalog name, so distinct DuckLake catalogs on one Postgres
		// instance do not contend on the same lock key.
		string probe = "SELECT hashtext({METADATA_CATALOG_NAME_LITERAL})::int4";
		auto probe_result = Query(dummy, probe);
		if (probe_result->HasError()) {
			probe_result->GetErrorObject().Throw(
			    "concurrent: failed to compute DuckLake commit lock classid: ");
		}
		auto chunk = probe_result->Fetch();
		if (!chunk || chunk->size() == 0) {
			throw InternalException("ducklake: hashtext probe returned no row");
		}
		// Round-trip through uint32_t so the int4 hash is stored without sign issues.
		commit_lock_classid = static_cast<idx_t>(
		    static_cast<uint32_t>(chunk->data[0].GetValue(0).GetValue<int32_t>()));
	}

	// Bound the wait: a stuck committer fails this transaction after 30s
	// instead of blocking indefinitely.
	string set_timeout = "SET LOCAL lock_timeout = '30s'";
	auto timeout_res = Execute(dummy, set_timeout);
	if (timeout_res->HasError()) {
		timeout_res->GetErrorObject().Throw("concurrent: failed to set lock_timeout: ");
	}

	// "concurrent:" prefix -> RetryOnError matches, transient pg error retries.
	string query = "SELECT pg_advisory_xact_lock(" +
	               std::to_string(static_cast<int32_t>(commit_lock_classid.GetIndex())) + ", " +
	               std::to_string(DUCKLAKE_COMMIT_ADVISORY_SUBKEY) + ")";
	auto result = Execute(dummy, query);
	if (result->HasError()) {
		result->GetErrorObject().Throw("concurrent: DuckLake commit serialisation lock failed: ");
	}
}

// Idempotently create the id-allocation sequences and concurrency-guard
// indexes DuckLake needs in the metadata schema, and advance each sequence to
// at least the maximum id already recorded in ducklake_snapshot so freshly
// attached catalogs never re-issue an existing id. Safe to call repeatedly.
void PostgresMetadataManager::EnsureIdSequences() {
// One statement per call: postgres_execute drops all but the first of a batch.
DuckLakeSnapshot dummy {0, 0, 0, 0};

// Helper: run a single statement, converting any error into a throw.
auto run = [&](string query) {
auto result = Execute(dummy, query);
if (result->HasError()) {
result->GetErrorObject().Throw("Failed to ensure DuckLake id sequences: ");
}
};

// file_id_seq needs MINVALUE 0: bootstrap next_file_id=0 and first allocation must return 0.
// CACHE > 1 breaks MAX(snapshot_id)-as-horizon (pre-backend-cached nextval).
run("CREATE SEQUENCE IF NOT EXISTS {METADATA_SCHEMA_ESCAPED}.ducklake_snapshot_id_seq CACHE 1");
run("CREATE SEQUENCE IF NOT EXISTS {METADATA_SCHEMA_ESCAPED}.ducklake_catalog_id_seq CACHE 1");
run("CREATE SEQUENCE IF NOT EXISTS {METADATA_SCHEMA_ESCAPED}.ducklake_file_id_seq MINVALUE 0 START WITH 0 CACHE 1");
run("CREATE SEQUENCE IF NOT EXISTS {METADATA_SCHEMA_ESCAPED}.ducklake_schema_version_seq CACHE 1");
// Catch-up: setval(seq, GREATEST(current, observed max), is_called). The third
// argument is setval()'s is_called flag — false only when the table is empty,
// so the very first nextval() returns the seed value itself rather than
// seed + 1. Never moves a sequence backwards (GREATEST with last_value).
run(R"(SELECT setval(
'{METADATA_SCHEMA_ESCAPED}.ducklake_snapshot_id_seq',
GREATEST(
(SELECT last_value FROM {METADATA_SCHEMA_ESCAPED}.ducklake_snapshot_id_seq),
GREATEST(1, COALESCE((SELECT MAX(snapshot_id) FROM {METADATA_SCHEMA_ESCAPED}.ducklake_snapshot), 0))
),
COALESCE((SELECT MAX(snapshot_id) FROM {METADATA_SCHEMA_ESCAPED}.ducklake_snapshot), 1) >= 1
))");
// next_catalog_id / next_file_id columns store the NEXT id to hand out, hence
// the "- 1" to recover the highest id actually consumed.
run(R"(SELECT setval(
'{METADATA_SCHEMA_ESCAPED}.ducklake_catalog_id_seq',
GREATEST(
(SELECT last_value FROM {METADATA_SCHEMA_ESCAPED}.ducklake_catalog_id_seq),
GREATEST(1, COALESCE((SELECT MAX(next_catalog_id) - 1 FROM {METADATA_SCHEMA_ESCAPED}.ducklake_snapshot), 0))
),
COALESCE((SELECT MAX(next_catalog_id) - 1 FROM {METADATA_SCHEMA_ESCAPED}.ducklake_snapshot), 0) >= 1
))");
run(R"(SELECT setval(
'{METADATA_SCHEMA_ESCAPED}.ducklake_file_id_seq',
GREATEST(
(SELECT last_value FROM {METADATA_SCHEMA_ESCAPED}.ducklake_file_id_seq),
GREATEST(0, COALESCE((SELECT MAX(next_file_id) - 1 FROM {METADATA_SCHEMA_ESCAPED}.ducklake_snapshot), 0))
),
COALESCE((SELECT MAX(next_file_id) - 1 FROM {METADATA_SCHEMA_ESCAPED}.ducklake_snapshot), 0) >= 1
))");
run(R"(SELECT setval(
'{METADATA_SCHEMA_ESCAPED}.ducklake_schema_version_seq',
GREATEST(
(SELECT last_value FROM {METADATA_SCHEMA_ESCAPED}.ducklake_schema_version_seq),
GREATEST(1, COALESCE((SELECT MAX(schema_version) FROM {METADATA_SCHEMA_ESCAPED}.ducklake_snapshot), 0))
),
COALESCE((SELECT MAX(schema_version) FROM {METADATA_SCHEMA_ESCAPED}.ducklake_snapshot), 1) >= 1
))");

// Replace the PK-violation signal the sequence allocator eliminates.
// Partial unique indexes over end_snapshot IS NULL enforce at most one *live*
// row per logical name, so concurrent CREATEs of the same object conflict at
// the database instead of racing.
run("CREATE UNIQUE INDEX IF NOT EXISTS ducklake_schema_name_active_uidx "
"ON {METADATA_SCHEMA_ESCAPED}.ducklake_schema (schema_name) "
"WHERE end_snapshot IS NULL");
run("CREATE UNIQUE INDEX IF NOT EXISTS ducklake_table_name_active_uidx "
"ON {METADATA_SCHEMA_ESCAPED}.ducklake_table (schema_id, table_name) "
"WHERE end_snapshot IS NULL");
run("CREATE UNIQUE INDEX IF NOT EXISTS ducklake_view_name_active_uidx "
"ON {METADATA_SCHEMA_ESCAPED}.ducklake_view (schema_id, view_name) "
"WHERE end_snapshot IS NULL");
// One live delete_file per data_file: without it, concurrent DELETEs
// double-count rows by applying both delete masks as independent variants.
run("CREATE UNIQUE INDEX IF NOT EXISTS ducklake_delete_file_active_uidx "
"ON {METADATA_SCHEMA_ESCAPED}.ducklake_delete_file (data_file_id) "
"WHERE end_snapshot IS NULL");
}

// We need a specialized function here to do a reinterpret for postgres from BLOB to VARCHAR
shared_ptr<DuckLakeInlinedData>
PostgresMetadataManager::TransformInlinedData(QueryResult &result, const vector<LogicalType> &expected_types) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/ducklake_catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ idx_t DuckLakeCatalog::GetBeginSnapshotForSchemaVersion(TableIndex table_id, idx
shared_ptr<DuckLakeSchemaCacheEntry> DuckLakeCatalog::GetSchemaCacheEntry(DuckLakeTransaction &transaction,
DuckLakeSnapshot snapshot) {
auto &cache = GetObjectCacheInstance();
auto key = SchemaCacheKey(snapshot.schema_version);
auto key = SchemaCacheKey(snapshot.snapshot_id);
auto cached = cache.Get<DuckLakeSchemaCacheEntry>(key);
if (cached) {
return cached;
Expand Down
18 changes: 17 additions & 1 deletion src/storage/ducklake_initializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,22 @@ string DuckLakeInitializer::GetAttachOptions() {
throw InternalException("Unsupported access mode in DuckLake attach");
}
}
bool has_isolation_level_override = false;
for (auto &option : options.metadata_parameters) {
attach_options.push_back(option.first + " " + option.second.ToSQLString());
if (StringUtil::Lower(option.first) == "isolation_level") {
has_isolation_level_override = true;
}
}
const string metadata_type = catalog.MetadataType();
if (metadata_type.empty() || metadata_type == "duckdb") {
// this is duckdb, we always do latest storage
attach_options.push_back(StringUtil::Format("STORAGE_VERSION '%s'", "latest"));
} else if ((metadata_type == "postgres" || metadata_type == "postgres_scanner") &&
!has_isolation_level_override) {
// REPEATABLE READ pins one pg snapshot per DuckLake tx, hiding concurrent
// committers from CheckForConflicts. Override via meta_isolation_level.
attach_options.push_back("isolation_level 'read committed'");
}

if (attach_options.empty()) {
Expand Down Expand Up @@ -159,6 +168,9 @@ void DuckLakeInitializer::InitializeNewDuckLake(DuckLakeTransaction &transaction
SetVersionedMetadataManager(transaction, version);
auto &metadata_manager = transaction.GetMetadataManager();
metadata_manager.InitializeDuckLake(has_explicit_schema, catalog.Encryption());
if (auto *pg_mgr = dynamic_cast<PostgresMetadataManager *>(&metadata_manager)) {
pg_mgr->EnsureIdSequences();
}
if (catalog.Encryption() == DuckLakeEncryption::AUTOMATIC) {
// default to unencrypted
catalog.SetEncryption(DuckLakeEncryption::UNENCRYPTED);
Expand Down Expand Up @@ -256,7 +268,11 @@ void DuckLakeInitializer::LoadExistingDuckLake(DuckLakeTransaction &transaction)
for (auto &entry : metadata.table_settings) {
options.table_options[entry.table_id][entry.tag.key] = entry.tag.value;
}
// set correct version metadata manager
if (options.access_mode != AccessMode::READ_ONLY) {
if (auto *pg_mgr = dynamic_cast<PostgresMetadataManager *>(&transaction.GetMetadataManager())) {
pg_mgr->EnsureIdSequences();
}
}
if (resolved_version != DuckLakeVersion::UNSET) {
SetVersionedMetadataManager(transaction, resolved_version);
}
Expand Down
Loading
Loading