Skip to content

Commit 1d50bfe

Browse files
authored
fix: StreamVersionMismatched when using CTAS in multi statement trans… (#16889)
fix: StreamVersionMismatched when using CTAS in multi statement transaction
1 parent bb5e2e6 commit 1d50bfe

File tree

10 files changed

+335
-87
lines changed

10 files changed

+335
-87
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/ast/src/ast/statements/statement.rs

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,176 @@ impl Statement {
376376
_ => format!("{}", self),
377377
}
378378
}
379+
380+
pub fn allowed_in_multi_statement(&self) -> bool {
381+
match self {
382+
Statement::Query(..)
383+
| Statement::Explain { .. }
384+
| Statement::ExplainAnalyze { .. }
385+
| Statement::CopyIntoTable(..)
386+
| Statement::CopyIntoLocation(..)
387+
| Statement::Call(..)
388+
| Statement::ShowSettings { .. }
389+
| Statement::ShowProcessList { .. }
390+
| Statement::ShowMetrics { .. }
391+
| Statement::ShowEngines { .. }
392+
| Statement::ShowFunctions { .. }
393+
| Statement::ShowUserFunctions { .. }
394+
| Statement::ShowTableFunctions { .. }
395+
| Statement::ShowIndexes { .. }
396+
| Statement::ShowLocks(..)
397+
| Statement::SetPriority { .. }
398+
| Statement::System(..)
399+
| Statement::KillStmt { .. }
400+
| Statement::SetStmt { .. }
401+
| Statement::UnSetStmt { .. }
402+
| Statement::ShowVariables { .. }
403+
| Statement::SetRole { .. }
404+
| Statement::SetSecondaryRoles { .. }
405+
| Statement::Insert(..)
406+
| Statement::InsertMultiTable(..)
407+
| Statement::Replace(..)
408+
| Statement::MergeInto(..)
409+
| Statement::Delete(..)
410+
| Statement::Update(..)
411+
| Statement::ShowCatalogs(..)
412+
| Statement::ShowCreateCatalog(..)
413+
| Statement::ShowDatabases(..)
414+
| Statement::ShowDropDatabases(..)
415+
| Statement::ShowCreateDatabase(..)
416+
| Statement::UseDatabase { .. }
417+
| Statement::ShowTables(..)
418+
| Statement::ShowCreateTable(..)
419+
| Statement::DescribeTable(..)
420+
| Statement::ShowTablesStatus(..)
421+
| Statement::ShowDropTables(..)
422+
| Statement::OptimizeTable(..)
423+
| Statement::VacuumTable(..)
424+
| Statement::VacuumDropTable(..)
425+
| Statement::VacuumTemporaryFiles(..)
426+
| Statement::AnalyzeTable(..)
427+
| Statement::ExistsTable(..)
428+
| Statement::ShowCreateDictionary(..)
429+
| Statement::ShowDictionaries(..)
430+
| Statement::ShowColumns(..)
431+
| Statement::ShowViews(..)
432+
| Statement::DescribeView(..)
433+
| Statement::ShowStreams(..)
434+
| Statement::DescribeStream(..)
435+
| Statement::RefreshIndex(..)
436+
| Statement::RefreshInvertedIndex(..)
437+
| Statement::RefreshVirtualColumn(..)
438+
| Statement::ShowVirtualColumns(..)
439+
| Statement::ShowUsers
440+
| Statement::DescribeUser { .. }
441+
| Statement::ShowRoles
442+
| Statement::ShowGrants { .. }
443+
| Statement::ShowObjectPrivileges(..)
444+
| Statement::ShowStages
445+
| Statement::DescribeStage { .. }
446+
| Statement::RemoveStage { .. }
447+
| Statement::ListStage { .. }
448+
| Statement::DescribeConnection(..)
449+
| Statement::ShowConnections(..)
450+
| Statement::ShowFileFormats
451+
| Statement::Presign(..)
452+
| Statement::DescDatamaskPolicy(..)
453+
| Statement::DescNetworkPolicy(..)
454+
| Statement::ShowNetworkPolicies
455+
| Statement::DescPasswordPolicy(..)
456+
| Statement::ShowPasswordPolicies { .. }
457+
| Statement::ExecuteTask(..)
458+
| Statement::DescribeTask(..)
459+
| Statement::ShowTasks(..)
460+
| Statement::DescribePipe(..)
461+
| Statement::Begin
462+
| Statement::Commit
463+
| Statement::Abort
464+
| Statement::DescribeNotification(..)
465+
| Statement::ExecuteImmediate(..)
466+
| Statement::ShowProcedures { .. }
467+
| Statement::DescProcedure(..)
468+
| Statement::CallProcedure(..) => true,
469+
470+
Statement::CreateDatabase(..)
471+
| Statement::CreateTable(..)
472+
| Statement::CreateView(..)
473+
| Statement::CreateIndex(..)
474+
| Statement::CreateStage(..)
475+
| Statement::CreateSequence(..)
476+
| Statement::CreateDictionary(..)
477+
| Statement::CreateConnection(..)
478+
| Statement::CreatePipe(..)
479+
| Statement::AlterTable(..)
480+
| Statement::AlterView(..)
481+
| Statement::AlterUser(..)
482+
| Statement::AlterDatabase(..)
483+
| Statement::DropDatabase(..)
484+
| Statement::DropTable(..)
485+
| Statement::DropView(..)
486+
| Statement::DropIndex(..)
487+
| Statement::DropSequence(..)
488+
| Statement::DropDictionary(..)
489+
| Statement::TruncateTable(..)
490+
| Statement::AttachTable(..)
491+
| Statement::RenameTable(..)
492+
| Statement::CreateCatalog(..)
493+
| Statement::DropCatalog(..)
494+
| Statement::UndropDatabase(..)
495+
| Statement::UndropTable(..)
496+
| Statement::RenameDictionary(..)
497+
| Statement::CreateStream(..)
498+
| Statement::DropStream(..)
499+
| Statement::CreateInvertedIndex(..)
500+
| Statement::DropInvertedIndex(..)
501+
| Statement::CreateVirtualColumn(..)
502+
| Statement::AlterVirtualColumn(..)
503+
| Statement::DropVirtualColumn(..)
504+
| Statement::CreateUser(..)
505+
| Statement::DropUser { .. }
506+
| Statement::CreateRole { .. }
507+
| Statement::DropRole { .. }
508+
| Statement::Grant(..)
509+
| Statement::Revoke(..)
510+
| Statement::CreateUDF(..)
511+
| Statement::DropUDF { .. }
512+
| Statement::AlterUDF(..)
513+
| Statement::DropStage { .. }
514+
| Statement::DropConnection(..)
515+
| Statement::CreateFileFormat { .. }
516+
| Statement::DropFileFormat { .. }
517+
| Statement::CreateDatamaskPolicy(..)
518+
| Statement::DropDatamaskPolicy(..)
519+
| Statement::CreateNetworkPolicy(..)
520+
| Statement::AlterNetworkPolicy(..)
521+
| Statement::DropNetworkPolicy(..)
522+
| Statement::CreatePasswordPolicy(..)
523+
| Statement::AlterPasswordPolicy(..)
524+
| Statement::DropPasswordPolicy(..)
525+
| Statement::CreateTask(..)
526+
| Statement::AlterTask(..)
527+
| Statement::DropTask(..)
528+
| Statement::CreateDynamicTable(..)
529+
| Statement::DropPipe(..)
530+
| Statement::AlterPipe(..)
531+
| Statement::CreateNotification(..)
532+
| Statement::AlterNotification(..)
533+
| Statement::DropNotification(..)
534+
| Statement::CreateProcedure(..)
535+
| Statement::DropProcedure(..) => false,
536+
537+
Statement::StatementWithSettings { stmt, settings: _ } => {
538+
stmt.allowed_in_multi_statement()
539+
}
540+
}
541+
}
542+
543+
pub fn is_transaction_command(&self) -> bool {
544+
matches!(
545+
self,
546+
Statement::Commit | Statement::Abort | Statement::Begin
547+
)
548+
}
379549
}
380550

381551
impl Display for Statement {

src/query/service/src/interpreters/interpreter.rs

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ use md5::Md5;
5151

5252
use super::hook::vacuum_hook::hook_disk_temp_dir;
5353
use super::hook::vacuum_hook::hook_vacuum_temp_files;
54-
use super::interpreter_txn_commit::CommitInterpreter;
5554
use super::InterpreterMetrics;
5655
use super::InterpreterQueryLog;
5756
use crate::pipelines::executor::ExecutorSettings;
@@ -100,18 +99,7 @@ pub trait Interpreter: Sync + Send {
10099

101100
ctx.set_status_info("building pipeline");
102101
ctx.check_aborting().with_context(make_error)?;
103-
if self.is_ddl() {
104-
CommitInterpreter::try_create(ctx.clone())?
105-
.execute2()
106-
.await?;
107-
ctx.clear_tables_cache();
108-
}
109-
if !self.is_txn_command() && ctx.txn_mgr().lock().is_fail() {
110-
let err = ErrorCode::CurrentTransactionIsAborted(
111-
"current transaction is aborted, commands ignored until end of transaction block",
112-
);
113-
return Err(err);
114-
}
102+
115103
let mut build_res = match self.execute2().await {
116104
Ok(build_res) => build_res,
117105
Err(err) => {

src/query/service/src/interpreters/interpreter_txn_commit.rs

Lines changed: 2 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,11 @@
1414

1515
use std::sync::Arc;
1616

17-
use databend_common_exception::ErrorCode;
1817
use databend_common_exception::Result;
19-
use databend_common_storages_fuse::TableContext;
20-
use databend_storages_common_session::TxnManagerRef;
21-
use log::error;
22-
use log::info;
18+
use databend_common_sql::execute_commit_statement;
2319

2420
use crate::interpreters::Interpreter;
2521
use crate::pipelines::PipelineBuildResult;
26-
use crate::pipelines::PipelineBuilder;
2722
use crate::sessions::QueryContext;
2823
pub struct CommitInterpreter {
2924
ctx: Arc<QueryContext>,
@@ -51,73 +46,7 @@ impl Interpreter for CommitInterpreter {
5146

5247
#[async_backtrace::framed]
5348
async fn execute2(&self) -> Result<PipelineBuildResult> {
54-
// After commit statement, current session should be in auto commit mode, no matter update meta success or not.
55-
// Use this guard to clear txn manager before return.
56-
let _guard = ClearTxnManagerGuard(self.ctx.txn_mgr().clone());
57-
let is_active = self.ctx.txn_mgr().lock().is_active();
58-
if is_active {
59-
let catalog = self.ctx.get_default_catalog()?;
60-
61-
let req = self.ctx.txn_mgr().lock().req();
62-
63-
let update_summary = {
64-
let table_descriptions = req
65-
.update_table_metas
66-
.iter()
67-
.map(|(req, _)| (req.table_id, req.seq, req.new_table_meta.engine.clone()))
68-
.collect::<Vec<_>>();
69-
let stream_descriptions = req
70-
.update_stream_metas
71-
.iter()
72-
.map(|s| (s.stream_id, s.seq, "stream"))
73-
.collect::<Vec<_>>();
74-
(table_descriptions, stream_descriptions)
75-
};
76-
77-
let mismatched_tids = {
78-
self.ctx.txn_mgr().lock().set_auto_commit();
79-
let ret = catalog.retryable_update_multi_table_meta(req).await;
80-
if let Err(ref e) = ret {
81-
// other errors may occur, especially the version mismatch of streams,
82-
// let's log it here for the convenience of diagnostics
83-
error!(
84-
"Non-recoverable fault occurred during updating tables. {}",
85-
e
86-
);
87-
}
88-
ret?
89-
};
90-
91-
match &mismatched_tids {
92-
Ok(_) => {
93-
info!(
94-
"COMMIT: Commit explicit transaction success, targets updated {:?}",
95-
update_summary
96-
);
97-
}
98-
Err(e) => {
99-
let err_msg = format!(
100-
"COMMIT: Table versions mismatched in multi statement transaction, conflict tables: {:?}",
101-
e.iter()
102-
.map(|(tid, seq, meta)| (tid, seq, &meta.engine))
103-
.collect::<Vec<_>>()
104-
);
105-
return Err(ErrorCode::TableVersionMismatched(err_msg));
106-
}
107-
}
108-
let need_purge_files = self.ctx.txn_mgr().lock().need_purge_files();
109-
for (stage_info, files) in need_purge_files {
110-
PipelineBuilder::try_purge_files(self.ctx.clone(), &stage_info, &files).await;
111-
}
112-
}
49+
execute_commit_statement(self.ctx.clone()).await?;
11350
Ok(PipelineBuildResult::create())
11451
}
11552
}
116-
117-
struct ClearTxnManagerGuard(TxnManagerRef);
118-
119-
impl Drop for ClearTxnManagerGuard {
120-
fn drop(&mut self) {
121-
self.0.lock().clear();
122-
}
123-
}

src/query/service/tests/it/sql/exec/get_table_bind_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -499,7 +499,7 @@ impl TableContext for CtxDelegation {
499499
}
500500

501501
fn txn_mgr(&self) -> TxnManagerRef {
502-
todo!()
502+
self.ctx.txn_mgr()
503503
}
504504

505505
fn incr_total_scan_value(&self, _value: ProgressValues) {

src/query/sql/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ databend-common-storages-view = { workspace = true }
3636
databend-common-users = { workspace = true }
3737
databend-enterprise-data-mask-feature = { workspace = true }
3838
databend-storages-common-cache = { workspace = true }
39+
databend-storages-common-io = { workspace = true }
40+
databend-storages-common-session = { workspace = true }
3941
databend-storages-common-table-meta = { workspace = true }
4042

4143
ahash = { workspace = true, features = ["no-rng"] }

0 commit comments

Comments
 (0)