diff --git a/src/include/metadata_manager/postgres_metadata_manager.hpp b/src/include/metadata_manager/postgres_metadata_manager.hpp index aad3cd7ced9..aa06bc445c9 100644 --- a/src/include/metadata_manager/postgres_metadata_manager.hpp +++ b/src/include/metadata_manager/postgres_metadata_manager.hpp @@ -37,11 +37,26 @@ class PostgresMetadataManager : public DuckLakeMetadataManager { unique_ptr Query(DuckLakeSnapshot snapshot, string &query) override; + idx_t AllocateNextSnapshotId(idx_t current_snapshot_id) override; + idx_t AllocateNextCatalogId(idx_t current_next_catalog_id) override; + idx_t AllocateNextFileId(idx_t current_next_file_id) override; + idx_t AllocateNextSchemaVersion(idx_t current_schema_version) override; + void AcquireCommitLock() override; + + void EnsureIdSequences(); + protected: string GetLatestSnapshotQuery() const override; private: unique_ptr ExecuteQuery(DuckLakeSnapshot snapshot, string &query, string command); + + idx_t FetchScalarSequenceValue(const string &seq_name); + + // classid half is hashtext(schema) so multiple DuckLake catalogs on one + // pg instance do not share the key. + static constexpr int32_t DUCKLAKE_COMMIT_ADVISORY_SUBKEY = 0x44754C4B; + optional_idx commit_lock_classid; }; } // namespace duckdb diff --git a/src/include/storage/ducklake_catalog.hpp b/src/include/storage/ducklake_catalog.hpp index f3d5cb014ab..0128e4dd31f 100644 --- a/src/include/storage/ducklake_catalog.hpp +++ b/src/include/storage/ducklake_catalog.hpp @@ -214,7 +214,10 @@ class DuckLakeCatalog : public Catalog { void SetCommittedSnapshotId(idx_t value) { lock_guard guard(commit_lock); - last_committed_snapshot = value; + // Max-update: sequence allocation decoupled from commit order. 
+ if (!last_committed_snapshot.IsValid() || value > last_committed_snapshot.GetIndex()) { + last_committed_snapshot = value; + } } Value GetLastCommittedSnapshotId() const { diff --git a/src/include/storage/ducklake_metadata_info.hpp b/src/include/storage/ducklake_metadata_info.hpp index 000a71faaaf..b3c093e1d10 100644 --- a/src/include/storage/ducklake_metadata_info.hpp +++ b/src/include/storage/ducklake_metadata_info.hpp @@ -282,6 +282,9 @@ struct DuckLakeGlobalStatsInfo { idx_t record_count; idx_t next_row_id; idx_t table_size_bytes; + // CAS predicate / target. Post-verified in FlushChanges. + idx_t stats_version = 0; + idx_t new_stats_version = 0; vector column_stats; }; diff --git a/src/include/storage/ducklake_metadata_manager.hpp b/src/include/storage/ducklake_metadata_manager.hpp index 596e221fc7f..208f697bc2f 100644 --- a/src/include/storage/ducklake_metadata_manager.hpp +++ b/src/include/storage/ducklake_metadata_manager.hpp @@ -211,6 +211,17 @@ class DuckLakeMetadataManager { virtual string WriteMergeAdjacent(const vector &compactions); virtual string WriteDeleteRewrites(const vector &compactions); virtual string WriteCompactions(const vector &compactions, CompactionType type); + virtual idx_t AllocateNextSnapshotId(idx_t current_snapshot_id); + virtual idx_t AllocateNextCatalogId(idx_t current_next_catalog_id); + virtual idx_t AllocateNextFileId(idx_t current_next_file_id); + virtual idx_t AllocateNextSchemaVersion(idx_t current_schema_version) { + return current_schema_version + 1; + } + // Backends with non-transactional id allocation MUST override: serialise + // allocation, INSERT, and COMMIT, or break MAX(snapshot_id)-as-horizon. 
+ virtual void AcquireCommitLock() { + } + virtual string InsertSnapshot(); virtual string WriteSnapshotChanges(const SnapshotChangeInfo &change_info, const DuckLakeSnapshotCommit &commit_info); diff --git a/src/include/storage/ducklake_transaction.hpp b/src/include/storage/ducklake_transaction.hpp index 015130a411d..1867dac6174 100644 --- a/src/include/storage/ducklake_transaction.hpp +++ b/src/include/storage/ducklake_transaction.hpp @@ -316,7 +316,8 @@ class DuckLakeTransaction : public Transaction, public enable_shared_from_this &overwritten_delete_files) const; DuckLakeDeleteFileInfo GetNewDeleteFile(TableIndex table_id, const DuckLakeCommitState &commit_state, const DuckLakeDeleteFile &file) const; - string UpdateGlobalTableStats(TableIndex table_id, const DuckLakeNewGlobalStats &new_stats); + string UpdateGlobalTableStats(TableIndex table_id, const DuckLakeNewGlobalStats &new_stats, + idx_t expected_stats_version, idx_t new_stats_version); SnapshotAndStats CheckForConflicts(DuckLakeSnapshot transaction_snapshot, const TransactionChangeInformation &changes); void CheckForConflicts(const TransactionChangeInformation &changes, const SnapshotChangeInformation &other_changes, diff --git a/src/metadata_manager/postgres_metadata_manager.cpp b/src/metadata_manager/postgres_metadata_manager.cpp index 6c5150b4edf..ac74e2b99a8 100644 --- a/src/metadata_manager/postgres_metadata_manager.cpp +++ b/src/metadata_manager/postgres_metadata_manager.cpp @@ -127,6 +127,142 @@ string PostgresMetadataManager::GetLatestSnapshotQuery() const { )"; } +idx_t PostgresMetadataManager::FetchScalarSequenceValue(const string &seq_name) { + DuckLakeSnapshot dummy {0, 0, 0, 0}; + string query = "SELECT nextval('{METADATA_SCHEMA_ESCAPED}." 
+ seq_name + "')"; + auto result = Query(dummy, query); + if (result->HasError()) { + result->GetErrorObject().Throw("Failed to allocate next value from " + seq_name + ": "); + } + auto chunk = result->Fetch(); + if (!chunk || chunk->size() == 0) { + throw InternalException("ducklake: %s returned no value from nextval()", seq_name); + } + auto v = chunk->data[0].GetValue(0).GetValue(); + if (v < 0) { + throw InternalException("ducklake: %s returned negative value: %lld", seq_name, (long long)v); + } + return static_cast(v); +} + +idx_t PostgresMetadataManager::AllocateNextSnapshotId(idx_t /*advisory*/) { + return FetchScalarSequenceValue("ducklake_snapshot_id_seq"); +} + +idx_t PostgresMetadataManager::AllocateNextCatalogId(idx_t /*advisory*/) { + return FetchScalarSequenceValue("ducklake_catalog_id_seq"); +} + +idx_t PostgresMetadataManager::AllocateNextFileId(idx_t /*advisory*/) { + return FetchScalarSequenceValue("ducklake_file_id_seq"); +} + +idx_t PostgresMetadataManager::AllocateNextSchemaVersion(idx_t /*advisory*/) { + // Sequence-allocated: catalog cache keys on schema_version; collisions from + // concurrent commits cause stale-cache reuse across transactions. 
+ return FetchScalarSequenceValue("ducklake_schema_version_seq"); +} + +void PostgresMetadataManager::AcquireCommitLock() { + DuckLakeSnapshot dummy {}; + if (!commit_lock_classid.IsValid()) { + string probe = "SELECT hashtext({METADATA_CATALOG_NAME_LITERAL})::int4"; + auto probe_result = Query(dummy, probe); + if (probe_result->HasError()) { + probe_result->GetErrorObject().Throw( + "concurrent: failed to compute DuckLake commit lock classid: "); + } + auto chunk = probe_result->Fetch(); + if (!chunk || chunk->size() == 0) { + throw InternalException("ducklake: hashtext probe returned no row"); + } + commit_lock_classid = static_cast( + static_cast(chunk->data[0].GetValue(0).GetValue())); + } + + string set_timeout = "SET LOCAL lock_timeout = '30s'"; + auto timeout_res = Execute(dummy, set_timeout); + if (timeout_res->HasError()) { + timeout_res->GetErrorObject().Throw("concurrent: failed to set lock_timeout: "); + } + + // "concurrent:" prefix -> RetryOnError matches, transient pg error retries. + string query = "SELECT pg_advisory_xact_lock(" + + std::to_string(static_cast(commit_lock_classid.GetIndex())) + ", " + + std::to_string(DUCKLAKE_COMMIT_ADVISORY_SUBKEY) + ")"; + auto result = Execute(dummy, query); + if (result->HasError()) { + result->GetErrorObject().Throw("concurrent: DuckLake commit serialisation lock failed: "); + } +} + +void PostgresMetadataManager::EnsureIdSequences() { + // One statement per call: postgres_execute drops all but the first of a batch. + DuckLakeSnapshot dummy {0, 0, 0, 0}; + + auto run = [&](string query) { + auto result = Execute(dummy, query); + if (result->HasError()) { + result->GetErrorObject().Throw("Failed to ensure DuckLake id sequences: "); + } + }; + + // file_id_seq needs MINVALUE 0: bootstrap next_file_id=0 and first allocation must return 0. + // CACHE > 1 breaks MAX(snapshot_id)-as-horizon (pre-backend-cached nextval). 
+ run("CREATE SEQUENCE IF NOT EXISTS {METADATA_SCHEMA_ESCAPED}.ducklake_snapshot_id_seq CACHE 1"); + run("CREATE SEQUENCE IF NOT EXISTS {METADATA_SCHEMA_ESCAPED}.ducklake_catalog_id_seq CACHE 1"); + run("CREATE SEQUENCE IF NOT EXISTS {METADATA_SCHEMA_ESCAPED}.ducklake_file_id_seq MINVALUE 0 START WITH 0 CACHE 1"); + run("CREATE SEQUENCE IF NOT EXISTS {METADATA_SCHEMA_ESCAPED}.ducklake_schema_version_seq CACHE 1"); + run(R"(SELECT setval( + '{METADATA_SCHEMA_ESCAPED}.ducklake_snapshot_id_seq', + GREATEST( + (SELECT last_value FROM {METADATA_SCHEMA_ESCAPED}.ducklake_snapshot_id_seq), + GREATEST(1, COALESCE((SELECT MAX(snapshot_id) FROM {METADATA_SCHEMA_ESCAPED}.ducklake_snapshot), 0)) + ), + COALESCE((SELECT MAX(snapshot_id) FROM {METADATA_SCHEMA_ESCAPED}.ducklake_snapshot), 1) >= 1 +))"); + run(R"(SELECT setval( + '{METADATA_SCHEMA_ESCAPED}.ducklake_catalog_id_seq', + GREATEST( + (SELECT last_value FROM {METADATA_SCHEMA_ESCAPED}.ducklake_catalog_id_seq), + GREATEST(1, COALESCE((SELECT MAX(next_catalog_id) - 1 FROM {METADATA_SCHEMA_ESCAPED}.ducklake_snapshot), 0)) + ), + COALESCE((SELECT MAX(next_catalog_id) - 1 FROM {METADATA_SCHEMA_ESCAPED}.ducklake_snapshot), 0) >= 1 +))"); + run(R"(SELECT setval( + '{METADATA_SCHEMA_ESCAPED}.ducklake_file_id_seq', + GREATEST( + (SELECT last_value FROM {METADATA_SCHEMA_ESCAPED}.ducklake_file_id_seq), + GREATEST(0, COALESCE((SELECT MAX(next_file_id) - 1 FROM {METADATA_SCHEMA_ESCAPED}.ducklake_snapshot), 0)) + ), + COALESCE((SELECT MAX(next_file_id) - 1 FROM {METADATA_SCHEMA_ESCAPED}.ducklake_snapshot), 0) >= 1 +))"); + run(R"(SELECT setval( + '{METADATA_SCHEMA_ESCAPED}.ducklake_schema_version_seq', + GREATEST( + (SELECT last_value FROM {METADATA_SCHEMA_ESCAPED}.ducklake_schema_version_seq), + GREATEST(1, COALESCE((SELECT MAX(schema_version) FROM {METADATA_SCHEMA_ESCAPED}.ducklake_snapshot), 0)) + ), + COALESCE((SELECT MAX(schema_version) FROM {METADATA_SCHEMA_ESCAPED}.ducklake_snapshot), 1) >= 1 +))"); + + // Replace the 
PK-violation signal the sequence allocator eliminates. + run("CREATE UNIQUE INDEX IF NOT EXISTS ducklake_schema_name_active_uidx " + "ON {METADATA_SCHEMA_ESCAPED}.ducklake_schema (schema_name) " + "WHERE end_snapshot IS NULL"); + run("CREATE UNIQUE INDEX IF NOT EXISTS ducklake_table_name_active_uidx " + "ON {METADATA_SCHEMA_ESCAPED}.ducklake_table (schema_id, table_name) " + "WHERE end_snapshot IS NULL"); + run("CREATE UNIQUE INDEX IF NOT EXISTS ducklake_view_name_active_uidx " + "ON {METADATA_SCHEMA_ESCAPED}.ducklake_view (schema_id, view_name) " + "WHERE end_snapshot IS NULL"); + // One live delete_file per data_file: without it, concurrent DELETEs + // double-count rows by applying both delete masks as independent variants. + run("CREATE UNIQUE INDEX IF NOT EXISTS ducklake_delete_file_active_uidx " + "ON {METADATA_SCHEMA_ESCAPED}.ducklake_delete_file (data_file_id) " + "WHERE end_snapshot IS NULL"); +} + // We need a specialized function here to do a reinterpret for postgres from BLOB to VARCHAR shared_ptr PostgresMetadataManager::TransformInlinedData(QueryResult &result, const vector &expected_types) { diff --git a/src/storage/ducklake_catalog.cpp b/src/storage/ducklake_catalog.cpp index c2c7e1f8a5a..69345e7e14f 100644 --- a/src/storage/ducklake_catalog.cpp +++ b/src/storage/ducklake_catalog.cpp @@ -222,7 +222,7 @@ idx_t DuckLakeCatalog::GetBeginSnapshotForSchemaVersion(TableIndex table_id, idx shared_ptr DuckLakeCatalog::GetSchemaCacheEntry(DuckLakeTransaction &transaction, DuckLakeSnapshot snapshot) { auto &cache = GetObjectCacheInstance(); - auto key = SchemaCacheKey(snapshot.schema_version); + auto key = SchemaCacheKey(snapshot.snapshot_id); auto cached = cache.Get(key); if (cached) { return cached; diff --git a/src/storage/ducklake_initializer.cpp b/src/storage/ducklake_initializer.cpp index 3b930baceb5..c56a5193b44 100644 --- a/src/storage/ducklake_initializer.cpp +++ b/src/storage/ducklake_initializer.cpp @@ -36,13 +36,22 @@ string 
DuckLakeInitializer::GetAttachOptions() { throw InternalException("Unsupported access mode in DuckLake attach"); } } + bool has_isolation_level_override = false; for (auto &option : options.metadata_parameters) { attach_options.push_back(option.first + " " + option.second.ToSQLString()); + if (StringUtil::Lower(option.first) == "isolation_level") { + has_isolation_level_override = true; + } } const string metadata_type = catalog.MetadataType(); if (metadata_type.empty() || metadata_type == "duckdb") { // this is duckdb, we always do latest storage attach_options.push_back(StringUtil::Format("STORAGE_VERSION '%s'", "latest")); + } else if ((metadata_type == "postgres" || metadata_type == "postgres_scanner") && + !has_isolation_level_override) { + // REPEATABLE READ pins one pg snapshot per DuckLake tx, hiding concurrent + // committers from CheckForConflicts. Override via meta_isolation_level. + attach_options.push_back("isolation_level 'read committed'"); } if (attach_options.empty()) { @@ -159,6 +168,9 @@ void DuckLakeInitializer::InitializeNewDuckLake(DuckLakeTransaction &transaction SetVersionedMetadataManager(transaction, version); auto &metadata_manager = transaction.GetMetadataManager(); metadata_manager.InitializeDuckLake(has_explicit_schema, catalog.Encryption()); + if (auto *pg_mgr = dynamic_cast(&metadata_manager)) { + pg_mgr->EnsureIdSequences(); + } if (catalog.Encryption() == DuckLakeEncryption::AUTOMATIC) { // default to unencrypted catalog.SetEncryption(DuckLakeEncryption::UNENCRYPTED); @@ -256,7 +268,11 @@ void DuckLakeInitializer::LoadExistingDuckLake(DuckLakeTransaction &transaction) for (auto &entry : metadata.table_settings) { options.table_options[entry.table_id][entry.tag.key] = entry.tag.value; } - // set correct version metadata manager + if (options.access_mode != AccessMode::READ_ONLY) { + if (auto *pg_mgr = dynamic_cast(&transaction.GetMetadataManager())) { + pg_mgr->EnsureIdSequences(); + } + } if (resolved_version != 
DuckLakeVersion::UNSET) { SetVersionedMetadataManager(transaction, resolved_version); } diff --git a/src/storage/ducklake_metadata_manager.cpp b/src/storage/ducklake_metadata_manager.cpp index 9b4f6cbf8ae..3c2cf392be0 100644 --- a/src/storage/ducklake_metadata_manager.cpp +++ b/src/storage/ducklake_metadata_manager.cpp @@ -185,8 +185,8 @@ CREATE TABLE {METADATA_CATALOG}.ducklake_file_column_stats(data_file_id BIGINT, CREATE TABLE {METADATA_CATALOG}.ducklake_file_variant_stats(data_file_id BIGINT, table_id BIGINT, column_id BIGINT, variant_path VARCHAR, shredded_type VARCHAR, column_size_bytes BIGINT, value_count BIGINT, null_count BIGINT, min_value VARCHAR, max_value VARCHAR, contains_nan BOOLEAN, extra_stats VARCHAR); CREATE TABLE {METADATA_CATALOG}.ducklake_delete_file(delete_file_id BIGINT PRIMARY KEY, table_id BIGINT, begin_snapshot BIGINT, end_snapshot BIGINT, data_file_id BIGINT, path VARCHAR, path_is_relative BOOLEAN, format VARCHAR, delete_count BIGINT, file_size_bytes BIGINT, footer_size BIGINT, encryption_key VARCHAR, partial_max BIGINT); CREATE TABLE {METADATA_CATALOG}.ducklake_column(column_id BIGINT, begin_snapshot BIGINT, end_snapshot BIGINT, table_id BIGINT, column_order BIGINT, column_name VARCHAR, column_type VARCHAR, initial_default VARCHAR, default_value VARCHAR, nulls_allowed BOOLEAN, parent_column BIGINT, default_value_type VARCHAR, default_value_dialect VARCHAR); -CREATE TABLE {METADATA_CATALOG}.ducklake_table_stats(table_id BIGINT, record_count BIGINT, next_row_id BIGINT, file_size_bytes BIGINT); -CREATE TABLE {METADATA_CATALOG}.ducklake_table_column_stats(table_id BIGINT, column_id BIGINT, contains_null BOOLEAN, contains_nan BOOLEAN, min_value VARCHAR, max_value VARCHAR, extra_stats VARCHAR); +CREATE TABLE {METADATA_CATALOG}.ducklake_table_stats(table_id BIGINT PRIMARY KEY, record_count BIGINT, next_row_id BIGINT, file_size_bytes BIGINT, stats_version BIGINT NOT NULL DEFAULT 0); +CREATE TABLE 
{METADATA_CATALOG}.ducklake_table_column_stats(table_id BIGINT, column_id BIGINT, contains_null BOOLEAN, contains_nan BOOLEAN, min_value VARCHAR, max_value VARCHAR, extra_stats VARCHAR, PRIMARY KEY (table_id, column_id)); CREATE TABLE {METADATA_CATALOG}.ducklake_partition_info(partition_id BIGINT, table_id BIGINT, begin_snapshot BIGINT, end_snapshot BIGINT); CREATE TABLE {METADATA_CATALOG}.ducklake_partition_column(partition_id BIGINT, table_id BIGINT, partition_key_index BIGINT, column_id BIGINT, transform VARCHAR); CREATE TABLE {METADATA_CATALOG}.ducklake_file_partition_value(data_file_id BIGINT, table_id BIGINT, partition_key_index BIGINT, partition_value VARCHAR); @@ -327,7 +327,14 @@ UPDATE {METADATA_CATALOG}.ducklake_metadata SET value = '1.0' WHERE key = 'versi } void DuckLakeMetadataManager::MigrateV10() { + // Stats PKs deliberately NOT added: pg ADD PRIMARY KEY is not idempotent, + // concurrent init runs would race. Existing catalogs rely on CAS alone. + // duckdb backend rejects ALTER ADD COLUMN with NOT NULL/DEFAULT constraints, + // so add nullable then backfill. Subsequent INSERT/UPDATE always set + // stats_version explicitly so NULL only exists during this migration window. 
auto result = transaction.Query(R"( +ALTER TABLE {METADATA_CATALOG}.ducklake_table_stats ADD COLUMN IF NOT EXISTS stats_version BIGINT; +UPDATE {METADATA_CATALOG}.ducklake_table_stats SET stats_version = 0 WHERE stats_version IS NULL; UPDATE {METADATA_CATALOG}.ducklake_metadata SET value = '1.1-dev1' WHERE key = 'version'; )"); if (result->HasError()) { @@ -855,6 +862,10 @@ void TransformGlobalStatsRow(const ROW &row, vector &gl new_entry.record_count = row.template GetValue(2 + from_column); new_entry.next_row_id = row.template GetValue(3 + from_column); new_entry.table_size_bytes = row.template GetValue(4 + from_column); + const idx_t STATS_VERSION_COL = 10 + from_column; + if (!row.IsNull(STATS_VERSION_COL)) { // NULL on pre-migration rows + new_entry.stats_version = row.template GetValue(STATS_VERSION_COL); + } global_stats.push_back(std::move(new_entry)); } @@ -920,7 +931,7 @@ vector TransformGlobalStats(QueryResult &result) { vector DuckLakeMetadataManager::GetGlobalTableStats(DuckLakeSnapshot snapshot) { // query the most recent stats auto result = transaction.Query(snapshot, R"( -SELECT table_id, column_id, record_count, next_row_id, file_size_bytes, contains_null, contains_nan, min_value, max_value, extra_stats +SELECT table_id, column_id, record_count, next_row_id, file_size_bytes, contains_null, contains_nan, min_value, max_value, extra_stats, stats_version FROM {METADATA_CATALOG}.ducklake_table_stats LEFT JOIN {METADATA_CATALOG}.ducklake_table_column_stats USING (table_id) WHERE record_count IS NOT NULL AND file_size_bytes IS NOT NULL @@ -2520,7 +2531,9 @@ WHERE table_id = %d AND schema_version=( // write the new inlined table string inlined_tables; string inlined_table_queries; - commit_snapshot.schema_version++; + // Race-free: physical name `ducklake_inlined_data__` would + // collide under concurrent CREATE-inlined paths with a local ++. 
+ commit_snapshot.schema_version = AllocateNextSchemaVersion(commit_snapshot.schema_version); inlined_table_name = GetInlinedTableQueries(commit_snapshot, table_info, inlined_tables, inlined_table_queries); batch_query += "INSERT INTO {METADATA_CATALOG}.ducklake_inlined_data_tables VALUES " + inlined_tables + ";"; @@ -3531,6 +3544,18 @@ string DuckLakeMetadataManager::InsertSnapshot() { return R"(INSERT INTO {METADATA_CATALOG}.ducklake_snapshot VALUES ({SNAPSHOT_ID}, NOW(), {SCHEMA_VERSION}, {NEXT_CATALOG_ID}, {NEXT_FILE_ID});)"; } +idx_t DuckLakeMetadataManager::AllocateNextSnapshotId(idx_t current_snapshot_id) { + return current_snapshot_id + 1; +} + +idx_t DuckLakeMetadataManager::AllocateNextCatalogId(idx_t current_next_catalog_id) { + return current_next_catalog_id; +} + +idx_t DuckLakeMetadataManager::AllocateNextFileId(idx_t current_next_file_id) { + return current_next_file_id; +} + static string SQLStringOrNull(const string &str) { if (str.empty()) { return "NULL"; @@ -3570,7 +3595,8 @@ SELECT NULL AS contains_nan, NULL AS min_value, NULL AS max_value, - NULL AS extra_stats + NULL AS extra_stats, + NULL AS stats_version FROM {METADATA_CATALOG}.ducklake_snapshot WHERE snapshot_id = ( SELECT MAX(snapshot_id) @@ -3591,7 +3617,8 @@ SELECT contains_nan, min_value, max_value, - extra_stats + extra_stats, + stats_version FROM {METADATA_CATALOG}.ducklake_table_stats LEFT JOIN {METADATA_CATALOG}.ducklake_table_column_stats USING (table_id) @@ -4029,7 +4056,7 @@ string DuckLakeMetadataManager::UpdateGlobalTableStats(const DuckLakeGlobalStats if (!stats.initialized) { batch_query += - StringUtil::Format("INSERT INTO {METADATA_CATALOG}.ducklake_table_stats VALUES (%d, %d, %d, %d);", + StringUtil::Format("INSERT INTO {METADATA_CATALOG}.ducklake_table_stats VALUES (%d, %d, %d, %d, 0);", stats.table_id.index, stats.record_count, stats.next_row_id, stats.table_size_bytes); string column_stats_values; for (auto &col_stats : stats.column_stats) { @@ -4044,11 +4071,13 @@ 
string DuckLakeMetadataManager::UpdateGlobalTableStats(const DuckLakeGlobalStats batch_query += StringUtil::Format("INSERT INTO {METADATA_CATALOG}.ducklake_table_column_stats VALUES %s;", column_stats_values); } else { - // stats have been initialized - update them + // CAS: stats_version=expected, set to unique new_stats_version. + // FlushChanges post-verifies via SELECT to detect silent rowcount=0. batch_query += StringUtil::Format( "UPDATE {METADATA_CATALOG}.ducklake_table_stats SET record_count=%d, file_size_bytes=%d, " - "next_row_id=%d WHERE table_id=%d;", - stats.record_count, stats.table_size_bytes, stats.next_row_id, stats.table_id.index); + "next_row_id=%d, stats_version=%d WHERE table_id=%d AND stats_version=%d;", + stats.record_count, stats.table_size_bytes, stats.next_row_id, stats.new_stats_version, + stats.table_id.index, stats.stats_version); for (auto &col_stats : stats.column_stats) { auto sql = ColumnStatsSQL::FromColumnStats(col_stats); batch_query += diff --git a/src/storage/ducklake_transaction.cpp b/src/storage/ducklake_transaction.cpp index 6a7f3ae064c..e6048b15f55 100644 --- a/src/storage/ducklake_transaction.cpp +++ b/src/storage/ducklake_transaction.cpp @@ -920,16 +920,37 @@ TransactionChangeInformation DuckLakeTransaction::GetTransactionChanges() const } struct DuckLakeCommitState { - explicit DuckLakeCommitState(DuckLakeSnapshot &snapshot) : commit_snapshot(snapshot) { + DuckLakeCommitState(DuckLakeSnapshot &snapshot, DuckLakeMetadataManager &metadata_manager) + : commit_snapshot(snapshot), metadata_manager(metadata_manager) { } DuckLakeSnapshot &commit_snapshot; + DuckLakeMetadataManager &metadata_manager; map committed_schemas; map committed_tables; map committed_partition_ids; map committed_mapping_indexes; map> local_delete_files; + // (table_id, expected new stats_version). FlushChanges post-verifies via + // SELECT to catch CAS UPDATEs that silently matched zero rows. 
+ vector> stats_verification; + + idx_t AllocateCatalogId() const { + idx_t id = metadata_manager.AllocateNextCatalogId(commit_snapshot.next_catalog_id); + if (id + 1 > commit_snapshot.next_catalog_id) { + commit_snapshot.next_catalog_id = id + 1; + } + return id; + } + idx_t AllocateFileId() const { + idx_t id = metadata_manager.AllocateNextFileId(commit_snapshot.next_file_id); + if (id + 1 > commit_snapshot.next_file_id) { + commit_snapshot.next_file_id = id + 1; + } + return id; + } + void RemapIdentifier(SchemaIndex &schema_id) const { auto entry = committed_schemas.find(schema_id); if (entry != committed_schemas.end()) { @@ -1207,6 +1228,7 @@ void DuckLakeTransaction::CheckForConflicts(const TransactionChangeInformation & // check if we are dropping the same view as another transaction for (auto &dropped_idx : changes.dropped_views) { ConflictCheck(dropped_idx, other_changes.dropped_views, "drop view", "dropped it already"); + ConflictCheck(dropped_idx, other_changes.altered_views, "drop view", "altered it"); } // check if we are dropping the same macro as another transaction for (auto &dropped_idx : changes.dropped_scalar_macros) { @@ -1324,19 +1346,24 @@ void DuckLakeTransaction::CheckForConflicts(const TransactionChangeInformation & ConflictCheck(table_id, other_changes.tables_deleted_from, "compact table", "deleted from it"); ConflictCheck(table_id, other_changes.tables_merge_adjacent, "compact table", "compacted it"); ConflictCheck(table_id, other_changes.tables_rewrite_delete, "compact table", "compacted it"); + ConflictCheck(table_id, other_changes.altered_tables, "compact table", "altered it"); } for (auto &table_id : changes.tables_rewrite_delete) { ConflictCheck(table_id, other_changes.dropped_tables, "compact table", "dropped it"); ConflictCheck(table_id, other_changes.tables_deleted_from, "compact table", "deleted from it"); ConflictCheck(table_id, other_changes.tables_merge_adjacent, "compact table", "compacted it"); ConflictCheck(table_id, 
other_changes.tables_rewrite_delete, "compact table", "compacted it"); + ConflictCheck(table_id, other_changes.altered_tables, "compact table", "altered it"); } for (auto &table_id : changes.altered_tables) { ConflictCheck(table_id, other_changes.dropped_tables, "alter table", "dropped it"); ConflictCheck(table_id, other_changes.altered_tables, "alter table", "altered it"); + ConflictCheck(table_id, other_changes.tables_merge_adjacent, "alter table", "compacted it"); + ConflictCheck(table_id, other_changes.tables_rewrite_delete, "alter table", "compacted it"); } for (auto &view_id : changes.altered_views) { ConflictCheck(view_id, other_changes.altered_views, "alter view", "altered it"); + ConflictCheck(view_id, other_changes.dropped_views, "alter view", "dropped it"); } } @@ -1360,7 +1387,7 @@ vector DuckLakeTransaction::GetNewSchemas(DuckLakeCommitStat auto &schema_entry = entry.second->Cast(); auto old_id = schema_entry.GetSchemaId(); DuckLakeSchemaInfo schema_info; - schema_info.id = SchemaIndex(commit_state.commit_snapshot.next_catalog_id++); + schema_info.id = SchemaIndex(commit_state.AllocateCatalogId()); schema_info.uuid = schema_entry.GetSchemaUUID(); schema_info.name = schema_entry.name; schema_info.path = schema_entry.DataPath(); @@ -1388,7 +1415,7 @@ DuckLakePartitionInfo DuckLakeTransaction::GetNewPartitionKey(DuckLakeCommitStat return partition_key; } auto local_partition_id = partition_data->partition_id; - auto partition_id = commit_state.commit_snapshot.next_catalog_id++; + auto partition_id = commit_state.AllocateCatalogId(); partition_key.id = partition_id; partition_data->partition_id = partition_id; for (auto &field : partition_data->fields) { @@ -1437,7 +1464,7 @@ DuckLakeSortInfo DuckLakeTransaction::GetNewSortKey(DuckLakeCommitState &commit_ return sort_key; } - auto sort_id = commit_state.commit_snapshot.next_catalog_id++; + auto sort_id = commit_state.AllocateCatalogId(); sort_key.id = sort_id; sort_data->sort_id = sort_id; for (auto &field 
: sort_data->fields) { @@ -1484,7 +1511,7 @@ DuckLakeTableInfo DuckLakeTransaction::GetNewTable(DuckLakeCommitState &commit_s auto original_id = table_entry.id; bool is_new_table; if (IsTransactionLocal(original_id.index)) { - table_entry.id = TableIndex(commit_state.commit_snapshot.next_catalog_id++); + table_entry.id = TableIndex(commit_state.AllocateCatalogId()); is_new_table = true; } else { // this table already has an id - keep it @@ -1783,7 +1810,7 @@ DuckLakeViewInfo DuckLakeTransaction::GetNewView(DuckLakeCommitState &commit_sta DuckLakeViewInfo view_entry; auto original_id = view.GetViewId(); if (IsTransactionLocal(original_id.index)) { - view_entry.id = TableIndex(commit_state.commit_snapshot.next_catalog_id++); + view_entry.id = TableIndex(commit_state.AllocateCatalogId()); } else { // this view already has an id - keep it // this happens if e.g. this view is renamed @@ -1804,7 +1831,7 @@ void DuckLakeTransaction::GetNewMacroInfo(DuckLakeCommitState &commit_state, ref auto &macro_entry = entry.get().Cast(); auto &ducklake_schema = macro_entry.schema.Cast(); - new_macro_info.macro_id = MacroIndex(commit_state.commit_snapshot.next_catalog_id++); + new_macro_info.macro_id = MacroIndex(commit_state.AllocateCatalogId()); new_macro_info.macro_name = macro_entry.name; new_macro_info.schema_id = commit_state.GetSchemaId(ducklake_schema); // Let's do the implementations @@ -1957,9 +1984,13 @@ struct DuckLakeNewGlobalStats { }; string DuckLakeTransaction::UpdateGlobalTableStats(TableIndex table_id, - const DuckLakeNewGlobalStats &new_global_stats) { + const DuckLakeNewGlobalStats &new_global_stats, + idx_t expected_stats_version, + idx_t new_stats_version) { DuckLakeGlobalStatsInfo stats; stats.table_id = table_id; + stats.stats_version = expected_stats_version; + stats.new_stats_version = new_stats_version; stats.initialized = new_global_stats.initialized; auto &new_stats = new_global_stats.stats; @@ -2035,7 +2066,8 @@ DuckLakeFileInfo
DuckLakeTransaction::GetNewDataFile(const DuckLakeDataFile &fil TableIndex table_id, optional_idx row_id_start) { auto &commit_snapshot = commit_state.commit_snapshot; DuckLakeFileInfo data_file; - data_file.id = DataFileIndex(commit_snapshot.next_file_id++); + data_file.id = DataFileIndex(commit_state.AllocateFileId()); + (void)commit_snapshot; data_file.table_id = table_id; data_file.file_name = file.file_name; data_file.row_count = file.row_count; @@ -2165,11 +2197,25 @@ NewDataInfo DuckLakeTransaction::GetNewDataFiles(string &batch_query, DuckLakeCo if (table_changes.new_data_files.empty()) { // force an increment of file_id to signal a data change if we have only inlined data changes - commit_state.commit_snapshot.next_file_id++; + (void)commit_state.AllocateFileId(); } } // update the global stats for this table based on the newly written data - batch_query += UpdateGlobalTableStats(table_id, new_globals); + // CAS: expected = observed pre-commit version (0 if row uninitialised). + idx_t expected_stats_version = 0; + if (stats) { + for (auto &s : *stats) { + if (s.table_id == table_id) { + expected_stats_version = s.stats_version; + break; + } + } + } + idx_t new_stats_version = commit_state.commit_snapshot.snapshot_id; + batch_query += UpdateGlobalTableStats(table_id, new_globals, expected_stats_version, new_stats_version); + if (new_globals.initialized) { + commit_state.stats_verification.emplace_back(table_id, new_stats_version); + } } return result; } @@ -2178,7 +2224,7 @@ DuckLakeDeleteFileInfo DuckLakeTransaction::GetNewDeleteFile(TableIndex table_id const DuckLakeCommitState &commit_state, const DuckLakeDeleteFile &file) const { DuckLakeDeleteFileInfo delete_file; - delete_file.id = DataFileIndex(commit_state.commit_snapshot.next_file_id++); + delete_file.id = DataFileIndex(commit_state.AllocateFileId()); delete_file.table_id = table_id; delete_file.data_file_id = file.data_file_id; delete_file.path = file.file_name; @@ -2256,7 +2302,7 @@ 
NewNameMapInfo DuckLakeTransaction::GetNewNameMaps(DuckLakeCommitState &commit_s // generate a new mapping id auto local_map_id = entry.first; auto &mapping = *entry.second; - MappingIndex new_map_id(commit_state.commit_snapshot.next_file_id++); + MappingIndex new_map_id(commit_state.AllocateFileId()); DuckLakeColumnMappingInfo map_info; map_info.table_id = commit_state.GetTableId(mapping.table_id); @@ -2553,6 +2599,14 @@ bool RetryOnError(const string &original_message) { if (StringUtil::Contains(message, "concurrent")) { return true; } + if (StringUtil::Contains(message, "server closed the connection") || + StringUtil::Contains(message, "connection to server") || + StringUtil::Contains(message, "connection reset")) { + return true; + } + if (StringUtil::Contains(message, "deadlock")) { + return true; + } return false; } @@ -2577,7 +2631,6 @@ void DuckLakeTransaction::FlushChanges() { } auto transaction_snapshot = GetSnapshot(); - auto transaction_changes = GetTransactionChanges(); SnapshotAndStats commit_stats_snapshot; auto &commit_snapshot = commit_stats_snapshot.snapshot; optional_ptr> stats; @@ -2585,21 +2638,28 @@ void DuckLakeTransaction::FlushChanges() { bool can_retry; try { can_retry = false; - if (i > 0) { - // we failed our first commit due to another transaction committing - // retry - but first check for conflicts - commit_stats_snapshot = CheckForConflicts(transaction_snapshot, transaction_changes); - stats = &commit_stats_snapshot.stats; - } else { - commit_stats_snapshot.snapshot = GetSnapshot(); - } - commit_snapshot.snapshot_id++; + // Re-capture every attempt: tx-local state may have changed and + // stale tx_changes make CheckForConflicts miss real conflicts. + auto transaction_changes = GetTransactionChanges(); + can_retry = true; + // Must share the pg tx with AllocateNextSnapshotId below; release + // tied to COMMIT/ROLLBACK. Split breaks MAX(snapshot_id)-as-horizon. 
+ metadata_manager->AcquireCommitLock(); + // Conflict check under the commit lock: any committer between + // transaction start and now has released the lock and is visible + // under read-committed, so the result is authoritative. Logical + // conflicts (Transaction conflict ...) are not retriable — same tx + // state will throw again — so gate retry off across the call. + can_retry = false; + commit_stats_snapshot = CheckForConflicts(transaction_snapshot, transaction_changes); + stats = &commit_stats_snapshot.stats; + can_retry = true; + commit_snapshot.snapshot_id = metadata_manager->AllocateNextSnapshotId(commit_snapshot.snapshot_id); if (SchemaChangesMade()) { - // we changed the schema - need to get a new schema version - commit_snapshot.schema_version++; + commit_snapshot.schema_version = + metadata_manager->AllocateNextSchemaVersion(commit_snapshot.schema_version); } - can_retry = true; - DuckLakeCommitState commit_state(commit_snapshot); + DuckLakeCommitState commit_state(commit_snapshot, *metadata_manager); // write the new snapshot string batch_queries = metadata_manager->InsertSnapshot(); batch_queries += CommitChanges(commit_state, transaction_changes, stats); @@ -2609,6 +2669,30 @@ void DuckLakeTransaction::FlushChanges() { if (res->HasError()) { res->GetErrorObject().Throw("Failed to flush changes into DuckLake: "); } + // Own-writes visible pre-COMMIT. Row with a different stats_version + // means a concurrent committer's UPDATE won; our CAS matched zero. 
+ for (auto &entry : commit_state.stats_verification) { + string check_sql = StringUtil::Format( + "SELECT stats_version FROM {METADATA_CATALOG}.ducklake_table_stats WHERE table_id = %d", + entry.first.index); + auto check_res = metadata_manager->Query(commit_snapshot, check_sql); + if (check_res->HasError()) { + check_res->GetErrorObject().Throw("Failed to verify stats_version post-commit: "); + } + auto chunk = check_res->Fetch(); + if (!chunk || chunk->size() == 0) { + throw TransactionException( + "concurrent update to ducklake_table_stats: row missing for table_id=%d, retrying", + entry.first.index); + } + idx_t actual = chunk->data[0].GetValue(0).GetValue(); + if (actual != entry.second) { + throw TransactionException( + "concurrent update to ducklake_table_stats detected (stats_version=%llu, " + "expected=%llu for table_id=%d), retrying", + (unsigned long long)actual, (unsigned long long)entry.second, entry.first.index); + } + } connection->Commit(); catalog_version = commit_snapshot.schema_version; diff --git a/test/sql/concurrent/concurrent_insert_pg_sequences.test b/test/sql/concurrent/concurrent_insert_pg_sequences.test new file mode 100644 index 00000000000..aceff882787 --- /dev/null +++ b/test/sql/concurrent/concurrent_insert_pg_sequences.test @@ -0,0 +1,67 @@ +# name: test/sql/concurrent/concurrent_insert_pg_sequences.test +# description: verify Postgres sequence-backed ID allocators are adopted and advance +# under concurrent load (fixes ducklake_snapshot_pkey race). 
+# group: [concurrent] + +require notwindows + +require ducklake + +require parquet + +require postgres_scanner + +test-env DUCKLAKE_CONNECTION postgres:dbname=ducklakedb + +test-env PG_METADATA_CONN dbname=ducklakedb + +test-env DATA_PATH __TEST_DIR__ + +statement ok +ATTACH '{PG_METADATA_CONN}' AS _clean_pg (TYPE postgres); + +statement ok +CALL postgres_execute('_clean_pg', + 'DROP SCHEMA IF EXISTS metadata_pg_seq_test CASCADE'); + +query I +SELECT COUNT(*)::BIGINT FROM postgres_query('_clean_pg', + 'SELECT 1 FROM pg_namespace WHERE nspname = ''metadata_pg_seq_test'''); +---- +0 + +statement ok +DETACH _clean_pg; + +statement ok +ATTACH 'ducklake:{DUCKLAKE_CONNECTION}' AS dl_pg_seq (DATA_PATH '{DATA_PATH}/ducklake_pg_seq_files', METADATA_SCHEMA 'metadata_pg_seq_test'); + +statement ok +ATTACH '{PG_METADATA_CONN}' AS pg_meta (TYPE postgres); + +statement ok +CREATE TABLE dl_pg_seq.tbl(key INTEGER); + +concurrentloop i 0 4 + +query I +INSERT INTO dl_pg_seq.tbl VALUES ({i}) +---- +1 + +endloop + +# is_called=true proves nextval() fired during commits (vs setval-only bootstrap). 
+query III +SELECT is_called_snap, is_called_cat, is_called_fil +FROM postgres_query('pg_meta', + 'SELECT (SELECT is_called FROM metadata_pg_seq_test.ducklake_snapshot_id_seq) AS is_called_snap, + (SELECT is_called FROM metadata_pg_seq_test.ducklake_catalog_id_seq) AS is_called_cat, + (SELECT is_called FROM metadata_pg_seq_test.ducklake_file_id_seq) AS is_called_fil'); +---- +true true true + +query II +SELECT COUNT(*), SUM(key) FROM dl_pg_seq.tbl; +---- +4 6 diff --git a/test/sql/concurrent/concurrent_pg_conflict_detection.test b/test/sql/concurrent/concurrent_pg_conflict_detection.test new file mode 100644 index 00000000000..dcf1e8fbc62 --- /dev/null +++ b/test/sql/concurrent/concurrent_pg_conflict_detection.test @@ -0,0 +1,695 @@ +# name: test/sql/concurrent/concurrent_pg_conflict_detection.test +# description: deterministic conflict-detection coverage for the postgres +# metadata backend. Each scenario forces a specific conflict +# path to fire and asserts on the resulting error message, +# so passing this test means the path actually executed. 
+# group: [concurrent] + +require notwindows + +require ducklake + +require parquet + +require postgres_scanner + +test-env DUCKLAKE_CONNECTION postgres:dbname=ducklakedb + +test-env PG_METADATA_CONN dbname=ducklakedb + +test-env DATA_PATH {TEST_DIR} + +statement ok +ATTACH '{PG_METADATA_CONN}' AS _clean_pg (TYPE postgres); + +statement ok +CALL postgres_execute('_clean_pg', + 'DROP SCHEMA IF EXISTS metadata_pg_conflict_test CASCADE'); + +statement ok +DETACH _clean_pg; + +statement ok +ATTACH 'ducklake:{DUCKLAKE_CONNECTION}' AS dl (DATA_PATH '{DATA_PATH}/ducklake_pg_conflict_files', METADATA_SCHEMA 'metadata_pg_conflict_test', DATA_INLINING_ROW_LIMIT 0) + +statement ok +ATTACH '{PG_METADATA_CONN}' AS pg_meta (TYPE postgres) + +statement ok con1 +SET ducklake_retry_wait_ms=20 + +statement ok con1 +SET ducklake_max_retry_count=10 + +statement ok con2 +SET ducklake_retry_wait_ms=20 + +statement ok con2 +SET ducklake_max_retry_count=10 + + +# Regression guard: ducklake-internal metadata catalog must run at +# `read committed`. `repeatable read` pins the PG snapshot per DuckLake tx +# and silently disables CheckForConflicts cross-action detection. + +# touch metadata to ensure the internal connection has issued BEGIN +statement ok +CREATE TABLE dl.iso_probe(x INTEGER) + +query I +SELECT * FROM postgres_query('__ducklake_metadata_dl', + 'SELECT current_setting(''transaction_isolation'')') +---- +read committed + + +# Scenario A: CREATE TABLE same name from two transactions. +# Exercises ConflictCheck on changes.created_tables. +statement ok con1 +BEGIN + +statement ok con2 +BEGIN + +statement ok con1 +CREATE TABLE dl.create_dup(x INTEGER) + +statement ok con2 +CREATE TABLE dl.create_dup(y INTEGER) + +statement ok con1 +COMMIT + +statement error con2 +COMMIT +---- +:.*Transaction conflict.*create.*create_dup.* + +# Catalog must reflect exactly the winner (con1's column 'x'), not con2. 
+query I +SELECT COUNT(*)::BIGINT FROM postgres_query('pg_meta', + 'SELECT 1 FROM metadata_pg_conflict_test.ducklake_table + WHERE table_name = ''create_dup'' AND end_snapshot IS NULL') +---- +1 + + +# Scenario B: INSERT vs ALTER on the same table. +# Exercises tables_inserted_into ∩ altered_tables. +statement ok +CREATE TABLE dl.shape(a INTEGER) + +statement ok +INSERT INTO dl.shape VALUES (1) + +statement ok con1 +BEGIN + +statement ok con2 +BEGIN + +statement ok con1 +ALTER TABLE dl.shape ADD COLUMN b INTEGER + +statement ok con2 +INSERT INTO dl.shape(a) VALUES (10) + +statement ok con1 +COMMIT + +statement error con2 +COMMIT +---- +:.*Transaction conflict.*insert into table.*altered.* + + +# Scenario C: INSERT vs DROP on the same table. +# Exercises tables_inserted_into ∩ dropped_tables. +statement ok +CREATE TABLE dl.doomed(a INTEGER) + +statement ok +INSERT INTO dl.doomed VALUES (1) + +statement ok con1 +BEGIN + +statement ok con2 +BEGIN + +statement ok con1 +DROP TABLE dl.doomed + +statement ok con2 +INSERT INTO dl.doomed VALUES (10) + +statement ok con1 +COMMIT + +statement error con2 +COMMIT +---- +:.*Transaction conflict.*insert into table.*dropped.* + + +# Scenario D: concurrent DELETE on the same data file. +# Exercises file-level ConflictCheck OR ducklake_delete_file_active_uidx. 
+statement ok +CREATE TABLE dl.del_target(k INTEGER) + +statement ok +INSERT INTO dl.del_target SELECT i FROM range(64) t(i) + +query I +SELECT COUNT(*)::BIGINT FROM postgres_query('pg_meta', + 'SELECT 1 FROM metadata_pg_conflict_test.ducklake_data_file df + JOIN metadata_pg_conflict_test.ducklake_table t USING (table_id) + WHERE t.table_name = ''del_target'' AND df.end_snapshot IS NULL') +---- +1 + +statement ok con1 +BEGIN + +statement ok con2 +BEGIN + +statement ok con1 +DELETE FROM dl.del_target WHERE k = 0 + +statement ok con2 +DELETE FROM dl.del_target WHERE k = 1 + +statement ok con1 +COMMIT + +statement error con2 +COMMIT +---- +:.*(Transaction conflict.*delete|duplicate key|ducklake_delete_file_active_uidx).* + +# Active delete_file uniqueness invariant must hold post-conflict. +# Without it, both delete masks apply as independent variants → double-count. +query I +SELECT COUNT(*)::BIGINT FROM postgres_query('pg_meta', + 'SELECT data_file_id FROM metadata_pg_conflict_test.ducklake_delete_file + WHERE end_snapshot IS NULL + GROUP BY data_file_id + HAVING COUNT(*) > 1') +---- +0 + + +# Scenario F: ALTER vs ALTER same table. +# Exercises altered_tables ∩ altered_tables. +statement ok +CREATE TABLE dl.alter_dup(a INTEGER) + +statement ok con1 +BEGIN + +statement ok con2 +BEGIN + +statement ok con1 +ALTER TABLE dl.alter_dup ADD COLUMN x INTEGER + +statement ok con2 +ALTER TABLE dl.alter_dup ADD COLUMN y INTEGER + +statement ok con1 +COMMIT + +statement error con2 +COMMIT +---- +:.*Transaction conflict.*alter table.*altered.* + + +# Scenario G: DROP vs ALTER on same table (con1 DROPs first). +# Exercises altered_tables ∩ dropped_tables. 
+statement ok +CREATE TABLE dl.drop_alter(a INTEGER) + +statement ok con1 +BEGIN + +statement ok con2 +BEGIN + +statement ok con1 +DROP TABLE dl.drop_alter + +statement ok con2 +ALTER TABLE dl.drop_alter ADD COLUMN x INTEGER + +statement ok con1 +COMMIT + +statement error con2 +COMMIT +---- +:.*Transaction conflict.*alter table.*dropped.* + + +# Scenario H: DROP vs DROP same table. +# Exercises dropped_tables ∩ dropped_tables. +statement ok +CREATE TABLE dl.drop_dup(a INTEGER) + +statement ok con1 +BEGIN + +statement ok con2 +BEGIN + +statement ok con1 +DROP TABLE dl.drop_dup + +statement ok con2 +DROP TABLE dl.drop_dup + +statement ok con1 +COMMIT + +statement error con2 +COMMIT +---- +:.*Transaction conflict.*drop table.*dropped.* + + +# Scenario I: ALTER vs DELETE same table (con1 ALTERs first). +# Exercises tables_deleted_from ∩ altered_tables. +statement ok +CREATE TABLE dl.alter_del(k INTEGER) + +statement ok +INSERT INTO dl.alter_del SELECT i FROM range(8) t(i) + +statement ok con1 +BEGIN + +statement ok con2 +BEGIN + +statement ok con1 +ALTER TABLE dl.alter_del ADD COLUMN x INTEGER + +statement ok con2 +DELETE FROM dl.alter_del WHERE k = 0 + +statement ok con1 +COMMIT + +statement error con2 +COMMIT +---- +:.*Transaction conflict.*delete from table.*altered.* + + +# Scenario J: DROP vs DELETE same table (con1 DROPs first). +# Exercises tables_deleted_from ∩ dropped_tables. +statement ok +CREATE TABLE dl.drop_del(k INTEGER) + +statement ok +INSERT INTO dl.drop_del SELECT i FROM range(8) t(i) + +statement ok con1 +BEGIN + +statement ok con2 +BEGIN + +statement ok con1 +DROP TABLE dl.drop_del + +statement ok con2 +DELETE FROM dl.drop_del WHERE k = 0 + +statement ok con1 +COMMIT + +statement error con2 +COMMIT +---- +:.*Transaction conflict.*delete from table.*dropped.* + + +# Scenario K: concurrent INSERTs into same table — negative control. +# insert/insert is commutative; both commits succeed. 
+statement ok +CREATE TABLE dl.ins_ins(k INTEGER) + +statement ok con1 +BEGIN + +statement ok con2 +BEGIN + +statement ok con1 +INSERT INTO dl.ins_ins VALUES (1) + +statement ok con2 +INSERT INTO dl.ins_ins VALUES (2) + +statement ok con1 +COMMIT + +statement ok con2 +COMMIT + +query I +SELECT COUNT(*)::BIGINT FROM dl.ins_ins +---- +2 + + +# Scenario E: stats_version CAS advances on each commit. +# Sequence-allocated snapshot ids decouple from commit order, so +# stats_version is what keeps ducklake_table_stats convergent. +statement ok +CREATE TABLE dl.stats_target(k INTEGER) + +statement ok +INSERT INTO dl.stats_target VALUES (1) + +statement ok +CREATE TEMP TABLE _v1 AS +SELECT stats_version AS v FROM postgres_query('pg_meta', + 'SELECT stats_version FROM metadata_pg_conflict_test.ducklake_table_stats + WHERE table_id = (SELECT table_id FROM metadata_pg_conflict_test.ducklake_table + WHERE table_name = ''stats_target'' AND end_snapshot IS NULL)') + +statement ok +INSERT INTO dl.stats_target VALUES (2) + +query I +SELECT (SELECT stats_version FROM postgres_query('pg_meta', + 'SELECT stats_version FROM metadata_pg_conflict_test.ducklake_table_stats + WHERE table_id = (SELECT table_id FROM metadata_pg_conflict_test.ducklake_table + WHERE table_name = ''stats_target'' AND end_snapshot IS NULL)')) + > (SELECT v FROM _v1) +---- +true + + +# Scenario L: ALTER vs ALTER on same view. +# Exercises altered_views ∩ altered_views. +statement ok +CREATE TABLE dl.view_base(a INTEGER) + +statement ok +CREATE VIEW dl.dup_view AS SELECT a FROM dl.view_base + +statement ok con1 +BEGIN + +statement ok con2 +BEGIN + +statement ok con1 +COMMENT ON VIEW dl.dup_view IS 'comment-1' + +statement ok con2 +COMMENT ON VIEW dl.dup_view IS 'comment-2' + +statement ok con1 +COMMIT + +statement error con2 +COMMIT +---- +:.*Transaction conflict.*alter view.*altered.* + + +# Scenario M: DROP vs DROP same view. +# Exercises dropped_views ∩ dropped_views. 
+statement ok +CREATE VIEW dl.drop_dup_view AS SELECT a FROM dl.view_base + +statement ok con1 +BEGIN + +statement ok con2 +BEGIN + +statement ok con1 +DROP VIEW dl.drop_dup_view + +statement ok con2 +DROP VIEW dl.drop_dup_view + +statement ok con1 +COMMIT + +statement error con2 +COMMIT +---- +:.*Transaction conflict.*drop view.*dropped.* + + +# Scenario N: COMPACT vs DELETE on same table (con1 compacts first). +# Exercises tables_deleted_from ∩ tables_merge_adjacent. +statement ok +CREATE TABLE dl.compact_del(k INTEGER) + +statement ok +INSERT INTO dl.compact_del SELECT i FROM range(8) t(i) + +statement ok +INSERT INTO dl.compact_del SELECT i FROM range(8, 16) t(i) + +statement ok con1 +BEGIN + +statement ok con2 +BEGIN + +query III con1 +SELECT table_name, files_processed, files_created FROM ducklake_merge_adjacent_files('dl', 'compact_del') +---- +compact_del 2 1 + +statement ok con2 +DELETE FROM dl.compact_del WHERE k = 0 + +statement ok con1 +COMMIT + +statement error con2 +COMMIT +---- +:.*Transaction conflict.*delete from table.*compacted.* + + +# Scenario O: DELETE vs COMPACT on same table (con1 deletes first). +# Exercises tables_merge_adjacent ∩ tables_deleted_from. +statement ok +CREATE TABLE dl.del_compact(k INTEGER) + +statement ok +INSERT INTO dl.del_compact SELECT i FROM range(8) t(i) + +statement ok +INSERT INTO dl.del_compact SELECT i FROM range(8, 16) t(i) + +statement ok con1 +BEGIN + +statement ok con2 +BEGIN + +statement ok con1 +DELETE FROM dl.del_compact WHERE k = 0 + +query III con2 +SELECT table_name, files_processed, files_created FROM ducklake_merge_adjacent_files('dl', 'del_compact') +---- +del_compact 2 1 + +statement ok con1 +COMMIT + +statement error con2 +COMMIT +---- +:.*Transaction conflict.*compact table.*deleted.* + + +# Scenario P: COMPACT vs COMPACT on same table. +# Exercises tables_merge_adjacent ∩ tables_merge_adjacent. 
+statement ok +CREATE TABLE dl.compact_dup(k INTEGER) + +statement ok +INSERT INTO dl.compact_dup SELECT i FROM range(8) t(i) + +statement ok +INSERT INTO dl.compact_dup SELECT i FROM range(8, 16) t(i) + +statement ok con1 +BEGIN + +statement ok con2 +BEGIN + +query III con1 +SELECT table_name, files_processed, files_created FROM ducklake_merge_adjacent_files('dl', 'compact_dup') +---- +compact_dup 2 1 + +query III con2 +SELECT table_name, files_processed, files_created FROM ducklake_merge_adjacent_files('dl', 'compact_dup') +---- +compact_dup 2 1 + +statement ok con1 +COMMIT + +statement error con2 +COMMIT +---- +:.*Transaction conflict.*compact table.*compacted.* + + +# Scenario Q: COMPACT vs ALTER on same table (con1 compacts first). +# Exercises altered_tables ∩ tables_merge_adjacent. +statement ok +CREATE TABLE dl.compact_alter(k INTEGER) + +statement ok +INSERT INTO dl.compact_alter SELECT i FROM range(8) t(i) + +statement ok +INSERT INTO dl.compact_alter SELECT i FROM range(8, 16) t(i) + +statement ok con1 +BEGIN + +statement ok con2 +BEGIN + +query III con1 +SELECT table_name, files_processed, files_created FROM ducklake_merge_adjacent_files('dl', 'compact_alter') +---- +compact_alter 2 1 + +statement ok con2 +ALTER TABLE dl.compact_alter ADD COLUMN x INTEGER + +statement ok con1 +COMMIT + +statement error con2 +COMMIT +---- +:.*Transaction conflict.*alter table.*compacted.* + + +# Scenario R: ALTER vs COMPACT on same table (con1 alters first). +# Exercises tables_merge_adjacent ∩ altered_tables. 
+statement ok +CREATE TABLE dl.alter_compact(k INTEGER) + +statement ok +INSERT INTO dl.alter_compact SELECT i FROM range(8) t(i) + +statement ok +INSERT INTO dl.alter_compact SELECT i FROM range(8, 16) t(i) + +statement ok con1 +BEGIN + +statement ok con2 +BEGIN + +statement ok con1 +ALTER TABLE dl.alter_compact ADD COLUMN x INTEGER + +query III con2 +SELECT table_name, files_processed, files_created FROM ducklake_merge_adjacent_files('dl', 'alter_compact') +---- +alter_compact 2 1 + +statement ok con1 +COMMIT + +statement error con2 +COMMIT +---- +:.*Transaction conflict.*compact table.*altered.* + + +# Scenario S: ALTER view vs DROP view (con1 alters first). +# Exercises dropped_views ∩ altered_views. +statement ok +CREATE TABLE dl.s_view_base(a INTEGER) + +statement ok +CREATE VIEW dl.alter_drop_view AS SELECT a FROM dl.s_view_base + +statement ok con1 +BEGIN + +statement ok con2 +BEGIN + +statement ok con1 +COMMENT ON VIEW dl.alter_drop_view IS 'comment-1' + +statement ok con2 +DROP VIEW dl.alter_drop_view + +statement ok con1 +COMMIT + +statement error con2 +COMMIT +---- +:.*Transaction conflict.*drop view.*altered.* + + +# Scenario T: DROP view vs ALTER view (con1 drops first). +# Exercises altered_views ∩ dropped_views. +statement ok +CREATE VIEW dl.drop_alter_view AS SELECT a FROM dl.s_view_base + +statement ok con1 +BEGIN + +statement ok con2 +BEGIN + +statement ok con1 +DROP VIEW dl.drop_alter_view + +statement ok con2 +COMMENT ON VIEW dl.drop_alter_view IS 'comment-2' + +statement ok con1 +COMMIT + +statement error con2 +COMMIT +---- +:.*Transaction conflict.*alter view.*dropped.* + + +# Smoke: concurrent CREATE TABLE collisions converge under load. +# Tests outcome, not mechanism — catches contention regressions the +# deterministic scenarios miss. 
+concurrentloop i 0 8 + +statement maybe +CREATE TABLE dl.stress_dup(x INTEGER) +---- + +endloop + +query I +SELECT COUNT(*)::BIGINT FROM postgres_query('pg_meta', + 'SELECT 1 FROM metadata_pg_conflict_test.ducklake_table + WHERE table_name = ''stress_dup'' AND end_snapshot IS NULL') +---- +1 diff --git a/test/sql/migration/pg_sequences_bootstrap.test b/test/sql/migration/pg_sequences_bootstrap.test new file mode 100644 index 00000000000..cde73db69e9 --- /dev/null +++ b/test/sql/migration/pg_sequences_bootstrap.test @@ -0,0 +1,90 @@ +# name: test/sql/migration/pg_sequences_bootstrap.test +# description: verify EnsureIdSequences bootstraps from a pre-sequence catalog state +# group: [migration] + +require notwindows + +require ducklake + +require parquet + +require postgres_scanner + +test-env DUCKLAKE_CONNECTION postgres:dbname=ducklakedb + +test-env PG_METADATA_CONN dbname=ducklakedb + +test-env DATA_PATH __TEST_DIR__ + +statement ok +ATTACH 'ducklake:{DUCKLAKE_CONNECTION}' AS ducklake (DATA_PATH '{DATA_PATH}/ducklake_pg_seq_bootstrap', METADATA_SCHEMA 'metadata_s1') + +statement ok +CREATE TABLE ducklake.tbl(key INTEGER); + +statement ok +INSERT INTO ducklake.tbl VALUES (1), (2), (3); + +statement ok +DETACH ducklake + +# Simulate pre-sequence catalog: drop sequences, leave tables intact. +# EnsureIdSequences must re-create + setval on next attach. 
+statement ok +ATTACH '{PG_METADATA_CONN}' AS pg_raw (TYPE postgres); + +statement ok +CALL postgres_execute('pg_raw', + 'DROP SEQUENCE IF EXISTS metadata_s1.ducklake_snapshot_id_seq; + DROP SEQUENCE IF EXISTS metadata_s1.ducklake_catalog_id_seq; + DROP SEQUENCE IF EXISTS metadata_s1.ducklake_file_id_seq;'); + +query I +SELECT COUNT(*) +FROM pg_raw.pg_catalog.pg_class c +JOIN pg_raw.pg_catalog.pg_namespace n ON n.oid = c.relnamespace +WHERE c.relkind = 'S' + AND n.nspname = 'metadata_s1' + AND c.relname IN ('ducklake_snapshot_id_seq', 'ducklake_catalog_id_seq', 'ducklake_file_id_seq'); +---- +0 + +statement ok +DETACH pg_raw + +statement ok +ATTACH 'ducklake:{DUCKLAKE_CONNECTION}' AS ducklake (DATA_PATH '{DATA_PATH}/ducklake_pg_seq_bootstrap', METADATA_SCHEMA 'metadata_s1') + +statement ok +ATTACH '{PG_METADATA_CONN}' AS pg_meta (TYPE postgres); + +query I +SELECT COUNT(*) +FROM pg_meta.pg_catalog.pg_class c +JOIN pg_meta.pg_catalog.pg_namespace n ON n.oid = c.relnamespace +WHERE c.relkind = 'S' + AND n.nspname = 'metadata_s1' + AND c.relname IN ('ducklake_snapshot_id_seq', 'ducklake_catalog_id_seq', 'ducklake_file_id_seq'); +---- +3 + +# Read via postgres_query: postgres_scanner does not expose sequences as duckdb tables. 
+query III +SELECT snap_ok, cat_ok, fil_ok FROM postgres_query('pg_meta', + 'SELECT + (SELECT last_value FROM metadata_s1.ducklake_snapshot_id_seq) >= + COALESCE((SELECT MAX(snapshot_id) FROM metadata_s1.ducklake_snapshot), 0) AS snap_ok, + (SELECT last_value FROM metadata_s1.ducklake_catalog_id_seq) >= + COALESCE((SELECT MAX(next_catalog_id) - 1 FROM metadata_s1.ducklake_snapshot), 0) AS cat_ok, + (SELECT last_value FROM metadata_s1.ducklake_file_id_seq) >= + COALESCE((SELECT MAX(next_file_id) - 1 FROM metadata_s1.ducklake_snapshot), 0) AS fil_ok'); +---- +true true true + +statement ok +INSERT INTO ducklake.tbl VALUES (4); + +query II +SELECT COUNT(*), SUM(key) FROM ducklake.tbl; +---- +4 10 diff --git a/vcpkg.json b/vcpkg.json index fa76d317076..b6417cfc093 100644 --- a/vcpkg.json +++ b/vcpkg.json @@ -1,5 +1,6 @@ { "dependencies": [ - "roaring" + "roaring", + "openssl" ] -} \ No newline at end of file +}