Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/include/storage/ducklake_transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ class DuckLakeTransaction : public Transaction, public enable_shared_from_this<D
private:
void CleanupFiles();
void FlushChanges();
string RollbackAndResetConnection();
string CommitChanges(DuckLakeCommitState &commit_state, TransactionChangeInformation &transaction_changes,
optional_ptr<vector<DuckLakeGlobalStatsInfo>> stats);
void CommitCompaction(DuckLakeSnapshot &commit_snapshot, TransactionChangeInformation &transaction_changes);
Expand Down
38 changes: 28 additions & 10 deletions src/storage/ducklake_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -694,13 +694,31 @@ void DuckLakeTransaction::Commit() {
}

void DuckLakeTransaction::Rollback() {
if (connection) {
// rollback any changes made to the metadata catalog
connection->Rollback();
connection.reset();
}
auto rollback_error = RollbackAndResetConnection();
CleanupFiles();
local_changes.Clear();
if (!rollback_error.empty()) {
throw TransactionException("Failed to rollback metadata transaction: %s", rollback_error);
}
}

string DuckLakeTransaction::RollbackAndResetConnection() {
string rollback_error;
if (!connection) {
return rollback_error;
}
try {
// rollback any changes made to the metadata catalog
auto has_active_transaction = connection->context->transaction.HasActiveTransaction();
if (has_active_transaction) {
connection->Rollback();
}
} catch (std::exception &ex) {
ErrorData error(ex);
rollback_error = error.Message();
}
connection.reset();
return rollback_error;
}

Connection &DuckLakeTransaction::GetConnection() {
Expand Down Expand Up @@ -2617,10 +2635,7 @@ void DuckLakeTransaction::FlushChanges() {
} catch (std::exception &ex) {
ErrorData error(ex);
// rollback if there is an active transaction
auto has_active_transaction = connection->context->transaction.HasActiveTransaction();
if (has_active_transaction) {
connection->Rollback();
}
auto rollback_error = RollbackAndResetConnection();
bool retry_on_error = RetryOnError(error.Message());
bool finished_retrying = i + 1 >= max_retry_count;
if (!can_retry || !retry_on_error || finished_retrying) {
Expand All @@ -2629,6 +2644,9 @@ void DuckLakeTransaction::FlushChanges() {
// Add additional information on the number of retries and suggest to increase it
std::ostringstream error_message;
error_message << "Failed to commit DuckLake transaction." << '\n';
if (!rollback_error.empty()) {
error_message << "Failed to rollback metadata transaction: " << rollback_error << '\n';
}
if (finished_retrying) {
error_message << "Exceeded the maximum retry count of " << max_retry_count
<< " set by the ducklake_max_retry_count setting." << '\n'
Expand All @@ -2650,7 +2668,7 @@ void DuckLakeTransaction::FlushChanges() {
// retry the transaction (with a new snapshot id)
// clear the inlined table caches - the rollback undid any table creation from the previous attempt
metadata_manager->ClearInlinedTableCaches();
connection->BeginTransaction();
GetConnection();
snapshot.reset();
}
}
Expand Down
16 changes: 13 additions & 3 deletions src/storage/ducklake_transaction_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,19 @@ ErrorData DuckLakeTransactionManager::CommitTransaction(ClientContext &context,

void DuckLakeTransactionManager::RollbackTransaction(Transaction &transaction) {
auto &ducklake_transaction = transaction.Cast<DuckLakeTransaction>();
ducklake_transaction.Rollback();
lock_guard<mutex> l(transaction_lock);
transactions.erase(transaction);
ErrorData rollback_error;
try {
ducklake_transaction.Rollback();
} catch (std::exception &ex) {
rollback_error = ErrorData(ex);
}
{
lock_guard<mutex> l(transaction_lock);
transactions.erase(transaction);
}
if (rollback_error.HasError()) {
rollback_error.Throw();
}
}

} // namespace duckdb
6 changes: 4 additions & 2 deletions test/configs/sqlite.json
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@
},{
"reason": "Test only for Postgres as catalog",
"paths": [
"test/sql/metadata/ducklake_settings_postgres.test"
"test/sql/metadata/ducklake_settings_postgres.test",
"test/sql/transaction/postgres_metadata_explicit_rollback_error.test",
"test/sql/transaction/postgres_metadata_rollback_error.test"
]
},
{
Expand All @@ -118,4 +120,4 @@
]
}
]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# name: test/sql/transaction/postgres_metadata_explicit_rollback_error.test
# description: Explicit rollback reports metadata rollback failures after cleanup
# group: [transaction]

require ducklake

require parquet

require postgres_scanner

test-env DUCKLAKE_CONNECTION {TEST_DIR}/{UUID}.db

test-env DATA_PATH {TEST_DIR}

statement ok
ATTACH 'dbname=ducklakedb' AS pg (TYPE postgres)

statement ok
CALL postgres_execute('pg', 'DROP SCHEMA IF EXISTS postgres_metadata_explicit_rollback_error CASCADE', use_transaction=false)

statement ok
ATTACH 'ducklake:{DUCKLAKE_CONNECTION} application_name=ducklake_metadata_explicit_rollback_error' AS ducklake (DATA_PATH '{DATA_PATH}/postgres_metadata_explicit_rollback_error', METADATA_SCHEMA 'postgres_metadata_explicit_rollback_error')

statement ok
USE ducklake

statement ok
CREATE TABLE t(i INTEGER)

statement ok
BEGIN

statement ok
INSERT INTO t VALUES (1)

statement ok
CALL postgres_execute('pg', 'SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE application_name = ''ducklake_metadata_explicit_rollback_error''', use_transaction=false)

statement error
ROLLBACK
----
Failed to rollback metadata transaction

statement ok
CREATE TABLE after_rollback(i INTEGER)
50 changes: 50 additions & 0 deletions test/sql/transaction/postgres_metadata_rollback_error.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# name: test/sql/transaction/postgres_metadata_rollback_error.test
# description: Postgres metadata rollback failures are reported when commit cleanup fails
# group: [transaction]

require ducklake

require parquet

require postgres_scanner

test-env DUCKLAKE_CONNECTION {TEST_DIR}/{UUID}.db

test-env DATA_PATH {TEST_DIR}

statement ok
ATTACH 'dbname=ducklakedb' AS pg (TYPE postgres)

statement ok
CALL postgres_execute('pg', 'DROP SCHEMA IF EXISTS postgres_metadata_rollback_error CASCADE', use_transaction=false)

statement ok
ATTACH 'ducklake:{DUCKLAKE_CONNECTION}' AS ducklake (DATA_PATH '{DATA_PATH}/postgres_metadata_rollback_error', METADATA_SCHEMA 'postgres_metadata_rollback_error', DATA_INLINING_ROW_LIMIT 0)

statement ok
USE ducklake

statement ok
CREATE TABLE t(i INTEGER)

statement ok
CALL postgres_execute('pg', 'CREATE SEQUENCE postgres_metadata_rollback_error.disconnect_once_seq', use_transaction=false)

statement ok
CALL postgres_execute('pg', 'CREATE OR REPLACE FUNCTION postgres_metadata_rollback_error.disconnect_once() RETURNS trigger LANGUAGE plpgsql AS $$ BEGIN IF nextval(''postgres_metadata_rollback_error.disconnect_once_seq'') = 1 THEN PERFORM pg_terminate_backend(pg_backend_pid()); END IF; RETURN NEW; END; $$', use_transaction=false)

statement ok
CALL postgres_execute('pg', 'CREATE TRIGGER disconnect_once_before_snapshot_insert BEFORE INSERT ON postgres_metadata_rollback_error.ducklake_snapshot FOR EACH ROW EXECUTE FUNCTION postgres_metadata_rollback_error.disconnect_once()', use_transaction=false)

statement ok
SET ducklake_max_retry_count = 0

statement error
INSERT INTO t VALUES (1), (2), (3)
----
<REGEX>:.*Failed to rollback metadata transaction.*Failed to flush changes into DuckLake.*

query I
SELECT COUNT(*) FROM glob('{DATA_PATH}/postgres_metadata_rollback_error/**/*.parquet')
----
0
Loading