Merged
22 changes: 22 additions & 0 deletions src/query/ast/src/ast/query.rs
@@ -619,6 +619,28 @@ impl Display for WithOptions {
}
}

impl WithOptions {
/// Builds the `WITH (...)` clause for the generated change query.
pub fn to_change_query_with_clause(&self) -> String {
let mut result = String::from(" WITH (");
for (i, (k, v)) in self.options.iter().enumerate() {
if i > 0 {
result.push_str(", ");
}

if k == "consume" {
// Consumed streams are recorded in the QueryContext, so the rewritten
// change query forces 'consume = false' to avoid consuming the stream twice.
result.push_str("consume = false");
} else {
result.push_str(&format!("{k} = '{v}'"));
}
}
result.push(')');
result
}
}

#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
pub struct ChangesInterval {
pub append_only: bool,
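For reference, a minimal runnable sketch of what to_change_query_with_clause produces. The WithOptions stand-in below (a plain BTreeMap) and the option values are illustrative, not the real AST type; the real method walks the parsed WITH options the same way.

use std::collections::BTreeMap;

// Stand-in for ast::WithOptions, which exposes its options as (key, value) pairs.
struct WithOptions {
    options: BTreeMap<String, String>,
}

impl WithOptions {
    fn to_change_query_with_clause(&self) -> String {
        let mut result = String::from(" WITH (");
        for (i, (k, v)) in self.options.iter().enumerate() {
            if i > 0 {
                result.push_str(", ");
            }
            if k == "consume" {
                // Consumption is tracked in the QueryContext, so the change
                // query itself must not consume the stream again.
                result.push_str("consume = false");
            } else {
                result.push_str(&format!("{k} = '{v}'"));
            }
        }
        result.push(')');
        result
    }
}

fn main() {
    let mut options = BTreeMap::new();
    options.insert("consume".to_string(), "true".to_string());
    options.insert("max_batch_size".to_string(), "1000".to_string());
    let with_opts = WithOptions { options };
    // "consume" sorts before "max_batch_size" in the BTreeMap.
    assert_eq!(
        with_opts.to_change_query_with_clause(),
        " WITH (consume = false, max_batch_size = '1000')"
    );
}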
8 changes: 8 additions & 0 deletions src/query/catalog/src/table_context.rs
@@ -376,6 +376,14 @@ pub trait TableContext: Send + Sync {
fn add_m_cte_temp_table(&self, database_name: &str, table_name: &str);

async fn drop_m_cte_temp_table(&self) -> Result<()>;

fn add_streams_ref(&self, _catalog: &str, _database: &str, _stream: &str, _consume: bool) {
unimplemented!()
}

fn get_consume_streams(&self, _query: bool) -> Result<Vec<Arc<dyn Table>>> {
unimplemented!()
}
}

pub type AbortChecker = Arc<dyn CheckAbort + Send + Sync>;
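The query flag on get_consume_streams is what separates the two call sites introduced later in this diff: the SELECT path passes true and only wants streams actually marked for consumption, while the DML path passes false and wants every referenced stream. A stand-alone sketch of that filter, assuming a simple (name, consume-flag) list in place of the real lock-guarded map of Arc<dyn Table>:

// query = true  -> only streams read WITH (consume = true)
// query = false -> every stream the statement referenced
fn filter_streams(streams_refs: &[(&str, bool)], query: bool) -> Vec<String> {
    streams_refs
        .iter()
        .filter(|(_, consume)| !query || *consume)
        .map(|(name, _)| name.to_string())
        .collect()
}

fn main() {
    let refs = [("s1", true), ("s2", false)];
    assert_eq!(filter_streams(&refs, true), vec!["s1"]); // query path
    assert_eq!(filter_streams(&refs, false), vec!["s1", "s2"]); // DML path
}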
35 changes: 3 additions & 32 deletions src/query/service/src/interpreters/common/stream.rs
@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::sync::Arc;

use chrono::Utc;
@@ -24,13 +23,10 @@ use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::UpdateStreamMetaReq;
use databend_common_meta_app::schema::UpdateTableMetaReq;
use databend_common_meta_types::MatchSeq;
use databend_common_sql::MetadataRef;
use databend_common_sql::TableEntry;
use databend_common_storages_factory::Table;
use databend_common_storages_fuse::FuseTable;
use databend_common_storages_fuse::TableContext;
use databend_common_storages_stream::stream_table::StreamTable;
use databend_common_storages_stream::stream_table::STREAM_ENGINE;
use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_ID;
use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_NAME;
use databend_storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION;
@@ -42,9 +38,8 @@ use crate::sessions::QueryContext;

pub async fn dml_build_update_stream_req(
ctx: Arc<QueryContext>,
metadata: &MetadataRef,
) -> Result<Vec<UpdateStreamMetaReq>> {
let tables = get_stream_table(metadata, |t| t.table().engine() == STREAM_ENGINE)?;
let tables = ctx.get_consume_streams(false)?;
if tables.is_empty() {
return Ok(vec![]);
}
@@ -96,38 +91,14 @@ pub async fn dml_build_update_stream_req(
Ok(reqs)
}

fn get_stream_table<F>(metadata: &MetadataRef, pred: F) -> Result<Vec<Arc<dyn Table>>>
where F: Fn(&TableEntry) -> bool {
let r_lock = metadata.read();
let tables = r_lock.tables();
let mut streams = vec![];
let mut streams_ids = HashSet::new();
for t in tables {
if pred(t) {
let stream = t.table();

let stream_id = stream.get_table_info().ident.table_id;
if streams_ids.contains(&stream_id) {
continue;
}
streams_ids.insert(stream_id);

streams.push(stream);
}
}
Ok(streams)
}

pub struct StreamTableUpdates {
pub update_table_metas: Vec<(UpdateTableMetaReq, TableInfo)>,
}

pub async fn query_build_update_stream_req(
ctx: &Arc<QueryContext>,
metadata: &MetadataRef,
) -> Result<Option<StreamTableUpdates>> {
let streams = get_stream_table(metadata, |t| {
t.is_consume() && t.table().engine() == STREAM_ENGINE
})?;
let streams = ctx.get_consume_streams(true)?;
if streams.is_empty() {
return Ok(None);
}
@@ -75,7 +75,7 @@ impl CopyIntoLocationInterpreter {
false,
)?;

let update_stream_meta = dml_build_update_stream_req(self.ctx.clone(), metadata).await?;
let update_stream_meta = dml_build_update_stream_req(self.ctx.clone()).await?;

Ok((select_interpreter, update_stream_meta))
}
@@ -78,7 +78,7 @@ impl CopyIntoTableInterpreter {
v => unreachable!("Input plan must be Query, but it's {}", v),
};

let update_stream_meta = dml_build_update_stream_req(self.ctx.clone(), metadata).await?;
let update_stream_meta = dml_build_update_stream_req(self.ctx.clone()).await?;

let select_interpreter = SelectInterpreter::try_create(
self.ctx.clone(),
3 changes: 1 addition & 2 deletions src/query/service/src/interpreters/interpreter_insert.rs
@@ -167,8 +167,7 @@ impl Interpreter for InsertInterpreter {
.format_pretty()?;
info!("Insert select plan: \n{}", explain_plan);

let update_stream_meta =
dml_build_update_stream_req(self.ctx.clone(), metadata).await?;
let update_stream_meta = dml_build_update_stream_req(self.ctx.clone()).await?;

// Here we remove the last exchange merge plan to trigger distributed insert.
let insert_select_plan = match (select_plan, table.support_distributed_insert()) {
@@ -130,8 +130,8 @@ impl Interpreter for InsertMultiTableInterpreter {

impl InsertMultiTableInterpreter {
pub async fn build_physical_plan(&self) -> Result<PhysicalPlan> {
let (mut root, metadata) = self.build_source_physical_plan().await?;
let update_stream_meta = dml_build_update_stream_req(self.ctx.clone(), &metadata).await?;
let (mut root, _) = self.build_source_physical_plan().await?;
let update_stream_meta = dml_build_update_stream_req(self.ctx.clone()).await?;
let source_schema = root.output_schema()?;
let branches = self.build_insert_into_branches().await?;
let serializable_tables = branches
3 changes: 1 addition & 2 deletions src/query/service/src/interpreters/interpreter_mutation.rs
@@ -190,8 +190,7 @@ impl MutationInterpreter {
table_snapshot: Option<Arc<TableSnapshot>>,
) -> Result<MutationBuildInfo> {
let table_info = fuse_table.get_table_info().clone();
let update_stream_meta =
dml_build_update_stream_req(self.ctx.clone(), &mutation.metadata).await?;
let update_stream_meta = dml_build_update_stream_req(self.ctx.clone()).await?;
let partitions = self
.mutation_source_partitions(mutation, fuse_table, table_snapshot.clone())
.await?;
2 changes: 1 addition & 1 deletion src/query/service/src/interpreters/interpreter_replace.rs
@@ -448,7 +448,7 @@ impl ReplaceInterpreter {
v => unreachable!("Input plan must be Query, but it's {}", v),
};

let update_stream_meta = dml_build_update_stream_req(self.ctx.clone(), metadata).await?;
let update_stream_meta = dml_build_update_stream_req(self.ctx.clone()).await?;

let select_interpreter = SelectInterpreter::try_create(
ctx.clone(),
2 changes: 1 addition & 1 deletion src/query/service/src/interpreters/interpreter_select.rs
@@ -138,7 +138,7 @@ impl SelectInterpreter {
.await?;

// consume stream
let update_stream_metas = query_build_update_stream_req(&self.ctx, &self.metadata).await?;
let update_stream_metas = query_build_update_stream_req(&self.ctx).await?;

let catalog = self.ctx.get_default_catalog()?;
build_res
33 changes: 33 additions & 0 deletions src/query/service/src/sessions/query_ctx.rs
@@ -1511,6 +1511,39 @@ impl TableContext for QueryContext {
m_cte_temp_table.clear();
Ok(())
}

fn add_streams_ref(&self, catalog: &str, database: &str, stream: &str, consume: bool) {
let mut streams = self.shared.streams_refs.write();
let stream_key = (
catalog.to_string(),
database.to_string(),
stream.to_string(),
);
streams
.entry(stream_key)
.and_modify(|v| {
if consume {
*v = true;
}
})
.or_insert(consume);
}

fn get_consume_streams(&self, query: bool) -> Result<Vec<Arc<dyn Table>>> {
let streams_refs = self.shared.streams_refs.read();
let tables = self.shared.tables_refs.lock();
let mut streams_meta = Vec::with_capacity(streams_refs.len());
for (stream_key, consume) in streams_refs.iter() {
if query && !consume {
continue;
}
let stream = tables
.get(stream_key)
.ok_or_else(|| ErrorCode::Internal(format!("stream {stream_key:?} is not cached in the query context")))?;
streams_meta.push(stream.clone());
}
Ok(streams_meta)
}
}

impl TrySpawn for QueryContext {
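The entry().and_modify().or_insert() chain above makes the consume flag sticky: once any reference in the query marks a stream as consumed, a later plain read of the same stream cannot clear the flag. A minimal reproduction of just that merge rule, using the same (catalog, database, stream) key shape but a bare HashMap instead of the RwLock-guarded map on QueryContextShared:

use std::collections::HashMap;

type StreamKey = (String, String, String);

fn add_streams_ref(refs: &mut HashMap<StreamKey, bool>, key: StreamKey, consume: bool) {
    refs.entry(key)
        .and_modify(|v| {
            if consume {
                *v = true;
            }
        })
        .or_insert(consume);
}

fn main() {
    let mut refs = HashMap::new();
    let key = ("default".to_string(), "db1".to_string(), "s1".to_string());
    add_streams_ref(&mut refs, key.clone(), true); // SELECT ... WITH (consume = true)
    add_streams_ref(&mut refs, key.clone(), false); // plain read of the same stream
    assert!(refs[&key]); // the consume flag survives the second reference
}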
3 changes: 2 additions & 1 deletion src/query/service/src/sessions/query_ctx_shared.rs
@@ -97,6 +97,7 @@ pub struct QueryContextShared {
pub(in crate::sessions) running_query_parameterized_hash: Arc<RwLock<Option<String>>>,
pub(in crate::sessions) aborting: Arc<AtomicBool>,
pub(in crate::sessions) tables_refs: Arc<Mutex<HashMap<DatabaseAndTable, Arc<dyn Table>>>>,
pub(in crate::sessions) streams_refs: Arc<RwLock<HashMap<DatabaseAndTable, bool>>>,
pub(in crate::sessions) affect: Arc<Mutex<Option<QueryAffect>>>,
pub(in crate::sessions) catalog_manager: Arc<CatalogManager>,
pub(in crate::sessions) data_operator: DataOperator,
@@ -168,6 +169,7 @@ impl QueryContextShared {
running_query_parameterized_hash: Arc::new(RwLock::new(None)),
aborting: Arc::new(AtomicBool::new(false)),
tables_refs: Arc::new(Mutex::new(HashMap::new())),
streams_refs: Default::default(),
affect: Arc::new(Mutex::new(None)),
executor: Arc::new(RwLock::new(Weak::new())),
stage_attachment: Arc::new(RwLock::new(None)),
@@ -337,7 +339,6 @@
max_batch_size: Option<u64>,
) -> Result<Arc<dyn Table>> {
// Always return the same table metadata within the same query

let table_meta_key = (catalog.to_string(), database.to_string(), table.to_string());

let already_in_cache = { self.tables_refs.lock().contains_key(&table_meta_key) };
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/binder/bind_query/bind.rs
@@ -189,7 +189,7 @@ impl Binder {
let query_id = self.ctx.get_id();
let database = self.ctx.get_current_database();
let mut table_identifier = cte.alias.name.clone();
table_identifier.name = format!("{}_{}", table_identifier.name, query_id.replace("-", "_"));
table_identifier.name = format!("{}${}", table_identifier.name, query_id.replace("-", ""));
let table_name = normalize_identifier(&table_identifier, &self.name_resolution_ctx).name;
self.m_cte_table_name.insert(
normalize_identifier(&cte.alias.name, &self.name_resolution_ctx).name,
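The materialized-CTE temp-table name changes shape here in two ways: the query id keeps no dashes at all (rather than replacing them with underscores), and the separator between CTE name and suffix becomes '$'. A small sketch of the resulting identifier, with a made-up query id:

// Mirrors the format! call above; the query id below is fabricated.
fn m_cte_temp_table_name(cte_name: &str, query_id: &str) -> String {
    format!("{}${}", cte_name, query_id.replace("-", ""))
}

fn main() {
    let name = m_cte_temp_table_name("cte1", "2f8e6d1c-aaaa-bbbb-cccc-1234567890ab");
    assert_eq!(name, "cte1$2f8e6d1caaaabbbbcccc1234567890ab");
}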
@@ -63,7 +63,7 @@ impl Binder {
check_with_opt_valid(with_options)?;
let consume = get_with_opt_consume(with_options)?;
let max_batch_size = get_with_opt_max_batch_size(with_options)?;
let with_opts_str = format!(" {with_options}");
let with_opts_str = with_options.to_change_query_with_clause();
(consume, max_batch_size, with_opts_str)
} else {
(false, None, String::new())
@@ -74,7 +74,7 @@
let cte_map = bind_context.cte_context.cte_map.clone();
if let Some(cte_info) = cte_map.get(&table_name) {
if cte_info.materialized {
cte_suffix_name = Some(self.ctx.get_id().replace("-", "_"));
cte_suffix_name = Some(self.ctx.get_id().replace("-", ""));
} else {
if self
.metadata
@@ -105,7 +105,7 @@
// Resolve table with catalog
let table_meta = {
let table_name = if let Some(cte_suffix_name) = cte_suffix_name.as_ref() {
format!("{}_{}", &table_name, cte_suffix_name)
format!("{}${}", &table_name, cte_suffix_name)
} else {
table_name.clone()
};
@@ -161,7 +161,6 @@ impl Binder {
bind_context.view_info.is_some(),
bind_context.planning_agg_index,
false,
consume,
None,
);
let (s_expr, mut bind_context) = self.bind_base_table(
@@ -186,13 +185,20 @@
&with_opts_str,
))?;

if table_meta.is_stream() {
self.ctx
.add_streams_ref(&catalog, &database, &table_name, consume);
}
let mut new_bind_context = BindContext::with_parent(Box::new(bind_context.clone()));
let tokens = tokenize_sql(query.as_str())?;
let (stmt, _) = parse_sql(&tokens, self.dialect)?;
let Statement::Query(query) = &stmt else {
unreachable!()
};
let (s_expr, mut new_bind_context) = self.bind_query(&mut new_bind_context, query)?;
bind_context
.cte_context
.set_cte_context(new_bind_context.cte_context.clone());

let cols = table_meta
.schema()
@@ -240,7 +246,6 @@
false,
false,
false,
false,
None,
);
let (s_expr, mut new_bind_context) =
@@ -273,7 +278,6 @@
bind_context.view_info.is_some(),
bind_context.planning_agg_index,
false,
false,
cte_suffix_name,
);

@@ -146,7 +146,6 @@ impl Binder {
false,
false,
false,
false,
None,
);

@@ -209,7 +208,6 @@
false,
false,
false,
false,
None,
);

25 changes: 14 additions & 11 deletions src/query/sql/src/planner/binder/binder.rs
@@ -27,7 +27,6 @@ use databend_common_ast::parser::parse_sql;
use databend_common_ast::parser::tokenize_sql;
use databend_common_ast::parser::Dialect;
use databend_common_catalog::catalog::CatalogManager;
use databend_common_catalog::query_kind::QueryKind;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
@@ -48,6 +47,7 @@ use databend_common_metrics::storage::metrics_inc_copy_purge_files_counter;
use databend_common_storage::init_stage_operator;
use databend_storages_common_io::Files;
use databend_storages_common_session::TxnManagerRef;
use databend_storages_common_table_meta::table::is_stream_name;
use log::error;
use log::info;
use log::warn;
@@ -649,17 +649,20 @@ impl<'a> Binder {
}
};

match plan.kind() {
QueryKind::Query | QueryKind::Explain => {}
match &plan {
Plan::Explain { .. }
| Plan::ExplainAnalyze { .. }
| Plan::ExplainAst { .. }
| Plan::ExplainSyntax { .. }
| Plan::Query { .. } => {}
Plan::CreateTable(plan)
if is_stream_name(&plan.table, self.ctx.get_id().replace("-", "").as_str()) => {}
_ => {
let meta_data_guard = self.metadata.read();
let tables = meta_data_guard.tables();
for t in tables {
if t.is_consume() {
return Err(ErrorCode::SyntaxException(
"WITH CONSUME only allowed in query",
));
}
let consume_streams = self.ctx.get_consume_streams(true)?;
if !consume_streams.is_empty() {
return Err(ErrorCode::SyntaxException(
"WITH CONSUME only allowed in query",
));
}
}
}
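The rewritten guard no longer scans binder metadata for consuming table entries; it asks the QueryContext whether any consumed streams were registered, and permits them only for query-like plans plus the one CREATE TABLE that is_stream_name identifies as this query's own change-query materialization. A simplified model of that decision; the Plan enum and the suffix check are stand-ins for the real plan types and is_stream_name:

enum Plan {
    Query,
    Explain,
    CreateTable { table: String },
    Insert,
}

fn check_consume_allowed(plan: &Plan, has_consumed_stream: bool, query_id: &str) -> Result<(), String> {
    let allowed = matches!(plan, Plan::Query | Plan::Explain)
        || matches!(plan, Plan::CreateTable { table } if table.ends_with(query_id));
    if has_consumed_stream && !allowed {
        return Err("WITH CONSUME only allowed in query".to_string());
    }
    Ok(())
}

fn main() {
    let qid = "0192aabbccdd"; // dashes already stripped, as in binder.rs
    assert!(check_consume_allowed(&Plan::Query, true, qid).is_ok());
    let ct = Plan::CreateTable { table: format!("cte1${qid}") };
    assert!(check_consume_allowed(&ct, true, qid).is_ok());
    assert!(check_consume_allowed(&Plan::Insert, true, qid).is_err());
}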