Skip to content

refactor(frontend): rearrange column binding steps in create table handler #19917

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,9 @@ use crate::error::ErrorCode::{self, Deprecated, InvalidInputSyntax, NotSupported
use crate::error::{Result, RwError};
use crate::expr::Expr;
use crate::handler::create_table::{
bind_pk_and_row_id_on_relation, bind_sql_column_constraints, bind_sql_columns,
bind_sql_pk_names, bind_table_constraints, ColumnIdGenerator,
bind_pk_and_row_id_on_relation, bind_sql_columns,
bind_sql_columns_generated_and_default_constraints, bind_sql_pk_names, bind_table_constraints,
ColumnIdGenerator,
};
use crate::handler::util::{
check_connector_match_connection_type, ensure_connection_type_allowed, SourceSchemaCompatExt,
Expand Down Expand Up @@ -746,6 +747,14 @@ pub async fn bind_create_source_or_table_with_connector(
}
debug_assert_column_ids_distinct(&columns);

bind_sql_columns_generated_and_default_constraints(
session,
source_name.clone(),
&mut columns,
// TODO(st1page): pass the ref
sql_columns_defs.to_vec(),
)?;

let must_need_pk = if is_create_source {
with_properties.connector_need_pk()
} else {
Expand All @@ -756,22 +765,14 @@ pub async fn bind_create_source_or_table_with_connector(
true
};

let (mut columns, pk_col_ids, row_id_index) =
let (columns, pk_col_ids, row_id_index) =
bind_pk_and_row_id_on_relation(columns, pk_names, must_need_pk)?;

let watermark_descs =
bind_source_watermark(session, source_name.clone(), source_watermarks, &columns)?;
// TODO(yuhao): allow multiple watermark on source.
assert!(watermark_descs.len() <= 1);

bind_sql_column_constraints(
session,
source_name.clone(),
&mut columns,
// TODO(st1page): pass the ref
sql_columns_defs.to_vec(),
&pk_col_ids,
)?;
check_format_encode(&with_properties, row_id_index, &columns).await?;

let definition = handler_args.normalized_sql.clone();
Expand Down
144 changes: 66 additions & 78 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,50 +261,15 @@ pub fn bind_sql_columns(column_defs: &[ColumnDef]) -> Result<Vec<ColumnCatalog>>
Ok(columns)
}

fn check_generated_column_constraints(
column_name: &String,
column_id: ColumnId,
expr: &ExprImpl,
column_catalogs: &[ColumnCatalog],
generated_column_names: &[String],
pk_column_ids: &[ColumnId],
) -> Result<()> {
let input_refs = expr.collect_input_refs(column_catalogs.len());
for idx in input_refs.ones() {
let referred_generated_column = &column_catalogs[idx].column_desc.name;
if generated_column_names
.iter()
.any(|c| c == referred_generated_column)
{
return Err(ErrorCode::BindError(format!(
"Generated can not reference another generated column. \
But here generated column \"{}\" referenced another generated column \"{}\"",
column_name, referred_generated_column
))
.into());
}
}

if pk_column_ids.contains(&column_id) && expr.is_impure() {
return Err(ErrorCode::BindError(format!(
"Generated columns with impure expressions should not be part of the primary key. \
Here column \"{}\" is defined as part of the primary key.",
column_name
))
.into());
}

Ok(())
}

/// Binds constraints that can be only specified in column definitions,
/// currently generated columns and default columns.
pub fn bind_sql_column_constraints(
///
/// `generated_or_default_column` field in [`ColumnDesc`] will be set.
pub fn bind_sql_columns_generated_and_default_constraints(
session: &SessionImpl,
table_name: String,
column_catalogs: &mut [ColumnCatalog],
columns: Vec<ColumnDef>,
pk_column_ids: &[ColumnId],
) -> Result<()> {
let generated_column_names = {
let mut names = vec![];
Expand Down Expand Up @@ -336,14 +301,19 @@ pub fn bind_sql_column_constraints(
)
})?;

check_generated_column_constraints(
&column.name.real_value(),
column_catalogs[idx].column_id(),
&expr_impl,
column_catalogs,
&generated_column_names,
pk_column_ids,
)?;
// Check if generated column references another generated column.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This inlines the function.

let input_refs = expr_impl.collect_input_refs(column_catalogs.len());
for idx in input_refs.ones() {
let referred_generated_column = &column_catalogs[idx].column_desc.name;
if generated_column_names.contains(referred_generated_column) {
return Err(ErrorCode::BindError(format!(
"Generated can not reference another generated column. \
But here generated column \"{}\" referenced another generated column \"{}\"",
column.name.real_value(), referred_generated_column
))
.into());
}
}

column_catalogs[idx].column_desc.generated_or_default_column = Some(
GeneratedOrDefaultColumn::GeneratedColumn(GeneratedColumnDesc {
Expand Down Expand Up @@ -444,8 +414,14 @@ fn multiple_pk_definition_err() -> RwError {

/// Binds primary keys defined in SQL.
///
/// It returns the columns together with `pk_column_ids`, and an optional row id column index if
/// added.
/// If `must_need_pk` is true and no primary key is specified, a `_row_id` column is added to the
/// given `columns` vec as the primary key.
///
/// Returns the (maybe) updated columns together with `pk_column_ids`, and an optional `_row_id`
/// column index if added.
///
/// Should be called after calling [`bind_sql_columns_generated_and_default_constraints`] so that
/// it can check whether a generated column is valid for being part of the primary key, if any.
pub fn bind_pk_and_row_id_on_relation(
mut columns: Vec<ColumnCatalog>,
pk_names: Vec<String>,
Expand Down Expand Up @@ -487,6 +463,23 @@ pub fn bind_pk_and_row_id_on_relation(
)))?;
}

// Check if impure generated columns are part of the primary key.
for column in &columns {
Comment on lines +466 to +467
Copy link
Member Author

@BugenZhao BugenZhao Dec 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This step used to be in bind_sql_column_constraints, now moved into bind_pk_and_row_id_on_relation as a post check.

let Some(expr) = column.generated_expr() else {
continue;
};
let expr = ExprImpl::from_expr_proto(&expr)?;

if pk_column_ids.contains(&column.column_id()) && expr.is_impure() {
return Err(ErrorCode::BindError(format!(
"Generated columns with impure expressions should not be part of the primary key. \
Here column \"{}\" is defined as part of the primary key.",
column.name()
))
.into());
}
}

Ok((columns, pk_column_ids, row_id_index))
}

Expand Down Expand Up @@ -568,7 +561,6 @@ pub(crate) async fn gen_create_table_plan_with_source(

/// `gen_create_table_plan` generates the plan for creating a table without an external stream
/// source.
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan(
context: OptimizerContext,
table_name: ObjectName,
Expand All @@ -583,53 +575,50 @@ pub(crate) fn gen_create_table_plan(
c.column_desc.column_id = col_id_gen.generate(&*c)?;
}

bind_sql_columns_generated_and_default_constraints(
context.session_ctx(),
table_name.real_value(),
&mut columns,
column_defs.clone(),
)?;

let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
if !secret_refs.is_empty() || !connection_refs.is_empty() {
return Err(crate::error::ErrorCode::InvalidParameterValue("Secret reference and Connection reference are not allowed in options when creating table without external source".to_owned()).into());
}

let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;

let watermark_descs = bind_source_watermark(
context.session_ctx(),
table_name.real_value(),
source_watermarks,
&columns,
)?;

gen_create_table_plan_without_source(
context,
table_name,
columns,
column_defs,
constraints,
source_watermarks,
pk_names,
watermark_descs,
col_id_gen.into_version(),
props,
)
}

#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_without_source(
context: OptimizerContext,
table_name: ObjectName,
columns: Vec<ColumnCatalog>,
column_defs: Vec<ColumnDef>,
constraints: Vec<TableConstraint>,
source_watermarks: Vec<SourceWatermark>,
pk_names: Vec<String>,
watermark_descs: Vec<WatermarkDesc>,
version: TableVersion,
props: CreateTableProps,
) -> Result<(PlanRef, PbTable)> {
// XXX: Why not bind outside?
let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
let (mut columns, pk_column_ids, row_id_index) =
let (columns, pk_column_ids, row_id_index) =
bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

let watermark_descs: Vec<WatermarkDesc> = bind_source_watermark(
context.session_ctx(),
table_name.real_value(),
source_watermarks,
&columns,
)?;

bind_sql_column_constraints(
context.session_ctx(),
table_name.real_value(),
&mut columns,
column_defs,
&pk_column_ids,
)?;
let session = context.session_ctx().clone();

let db_name = session.database();
Expand Down Expand Up @@ -857,18 +846,17 @@ pub(crate) fn gen_create_table_plan_for_cdc_table(
c.column_desc.column_id = col_id_gen.generate(&*c)?;
}

let (mut columns, pk_column_ids, _row_id_index) =
bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

// NOTES: In auto schema change, default value is not provided in column definition.
bind_sql_column_constraints(
bind_sql_columns_generated_and_default_constraints(
context.session_ctx(),
table_name.real_value(),
&mut columns,
column_defs,
&pk_column_ids,
)?;

let (columns, pk_column_ids, _row_id_index) =
bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

let definition = context.normalized_sql().to_owned();

let pk_column_indices = {
Expand Down
3 changes: 1 addition & 2 deletions src/frontend/src/handler/create_table_as.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ pub async fn handle_create_as(
context,
table_name.clone(),
columns,
vec![],
vec![],
vec![], // No primary key, will generate a hidden `row_id` column.
vec![], // No watermark should be defined in for `CREATE TABLE AS`
col_id_gen.into_version(),
CreateTableProps {
Expand Down
Loading