diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml index ee9f61da27358..8e896aeb07adf 100644 --- a/src/query/service/Cargo.toml +++ b/src/query/service/Cargo.toml @@ -200,6 +200,7 @@ wiremock = { workspace = true } [build-dependencies] databend-common-building = { workspace = true } +walkdir = { workspace = true } [lints] workspace = true diff --git a/src/query/service/build.rs b/src/query/service/build.rs index aaf4c53f857c6..7a2890be45739 100644 --- a/src/query/service/build.rs +++ b/src/query/service/build.rs @@ -13,6 +13,10 @@ // limitations under the License. use std::env; +use std::fs; +use std::io::Write; +use std::path::Path; +use std::path::PathBuf; fn main() { // Keep build script rerun behavior explicit for the feature list we expose. @@ -49,4 +53,169 @@ fn main() { let features = features.join(","); println!("cargo:rustc-env=DATABEND_QUERY_CARGO_FEATURES={features}"); + + let manifest_dir = env::var("CARGO_MANIFEST_DIR").expect("CARGO_MANIFEST_DIR not set"); + let src_dir = Path::new(&manifest_dir).join("src"); + let out_dir = PathBuf::from(env::var("OUT_DIR").expect("OUT_DIR not set")); + + let mut entries = collect_impls(&src_dir); + entries.sort_by(|a, b| a.1.cmp(&b.1)); + + let impls_out = out_dir.join("physical_plan_impls.rs"); + write_impls(&impls_out, &entries); + + let dispatch_out = out_dir.join("physical_plan_dispatch.rs"); + write_dispatch(&dispatch_out, &entries); + + println!("cargo:rerun-if-changed={}", src_dir.display()); +} + +fn collect_impls(src_dir: &Path) -> Vec<(Option, String, String)> { + let mut entries = Vec::new(); + + for entry in walkdir::WalkDir::new(src_dir) + .into_iter() + .filter_map(Result::ok) + .filter(|e| e.file_type().is_file()) + { + if entry.path().extension().and_then(|s| s.to_str()) != Some("rs") { + continue; + } + + let content = fs::read_to_string(entry.path()) + .unwrap_or_else(|e| panic!("read {}: {e}", entry.path().display())); + + for line in content.lines() { + if let Some(e) = parse_impl_line(src_dir, entry.path(), line) { + entries.push(e); + } + } + } + + entries +} + +fn parse_impl_line( + src_root: &Path, + path: &Path, + line: &str, +) -> Option<(Option, String, String)> { + // match `impl ... IPhysicalPlan for Type ... {` + let marker = "IPhysicalPlan for"; + let idx = line.find(marker)?; + let after = &line[idx + marker.len()..]; + let type_part = after + .split_whitespace() + .next() + .unwrap_or("") + .trim_end_matches('{') + .trim_end_matches(';'); + if type_part.is_empty() { + return None; + } + + let type_name = type_part + .split("::") + .last() + .unwrap_or(type_part) + .split('<') + .next() + .unwrap_or(type_part) + .to_string(); + + if type_name == "PhysicalPlanDeserialize" { + return None; + } + + let mut module_path = module_path_from_file(src_root, path); + let mut meta = None; + + if type_name == "StackDepthPlan" && path.ends_with("physical_plan.rs") { + module_path.push_str("::tests"); + meta = Some("#[cfg(test)]".to_string()); + } + + let full_path = format!("{module_path}::{type_part}"); + + Some((meta, type_name, full_path)) +} + +fn module_path_from_file(src_root: &Path, path: &Path) -> String { + let mut module_path = String::from("crate"); + let rel = path + .strip_prefix(src_root) + .unwrap_or(path) + .with_extension(""); + for comp in rel.components() { + let c = comp.as_os_str().to_string_lossy(); + module_path.push_str("::"); + module_path.push_str(&c); + } + module_path +} + +fn write_impls(out_path: &Path, entries: &[(Option, String, String)]) { + let mut out = String::new(); + out.push_str("define_physical_plan_serde!(\n"); + + for (meta, variant, path) in entries { + if let Some(meta) = meta { + out.push_str(" "); + out.push_str(meta); + out.push('\n'); + } + out.push_str(" "); + out.push_str(variant); + out.push_str(" => "); + out.push_str(path); + out.push_str(",\n"); + } + + out.push_str(");\n"); + + let mut file = fs::File::create(out_path).expect("create physical_plan_impls.rs"); + file.write_all(out.as_bytes()) + .expect("write physical_plan_impls.rs"); +} + +fn write_dispatch(out_path: &Path, entries: &[(Option, String, String)]) { + let mut out = String::new(); + + out.push_str("macro_rules! dispatch_plan_ref {\n"); + out.push_str(" ($s:expr, $plan:ident => $body:expr) => {\n"); + out.push_str(" match $s {\n"); + for (meta, variant, _) in entries { + if let Some(meta) = meta { + out.push_str(" "); + out.push_str(meta); + out.push('\n'); + } + out.push_str(" PhysicalPlanDeserialize::"); + out.push_str(variant); + out.push_str("($plan) => $body,\n"); + } + out.push_str(" }\n"); + out.push_str(" };\n"); + out.push_str("}\n\n"); + + out.push_str("macro_rules! dispatch_plan_mut {\n"); + out.push_str(" ($s:expr, $plan:ident => $body:expr) => {\n"); + out.push_str(" match $s {\n"); + for (meta, variant, _) in entries { + if let Some(meta) = meta { + out.push_str(" "); + out.push_str(meta); + out.push('\n'); + } + out.push_str(" PhysicalPlanDeserialize::"); + out.push_str(variant); + out.push_str("($plan) => $body,\n"); + } + out.push_str(" }\n"); + out.push_str(" };\n"); + out.push_str("}\n"); + + let mut file = fs::File::create(out_path).expect("create physical_plan_dispatch.rs"); + file.write_all(out.as_bytes()) + .expect("write physical_plan_dispatch.rs"); } diff --git a/src/query/service/src/physical_plans/physical_add_stream_column.rs b/src/query/service/src/physical_plans/physical_add_stream_column.rs index 6b91b0703470b..be64e12bdb81e 100644 --- a/src/query/service/src/physical_plans/physical_add_stream_column.rs +++ b/src/query/service/src/physical_plans/physical_add_stream_column.rs @@ -55,7 +55,6 @@ pub struct AddStreamColumn { pub stream_columns: Vec, } -#[typetag::serde] impl IPhysicalPlan for AddStreamColumn { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_aggregate_expand.rs b/src/query/service/src/physical_plans/physical_aggregate_expand.rs index 9f5a6ce5069d2..bc89ea8d4a84f 100644 --- a/src/query/service/src/physical_plans/physical_aggregate_expand.rs +++ b/src/query/service/src/physical_plans/physical_aggregate_expand.rs @@ -45,7 +45,6 @@ pub struct AggregateExpand { pub stat_info: Option, } -#[typetag::serde] impl IPhysicalPlan for AggregateExpand { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_aggregate_final.rs b/src/query/service/src/physical_plans/physical_aggregate_final.rs index a87b6b6b8cc6f..ae2c7ed5d43aa 100644 --- a/src/query/service/src/physical_plans/physical_aggregate_final.rs +++ b/src/query/service/src/physical_plans/physical_aggregate_final.rs @@ -64,7 +64,6 @@ pub struct AggregateFinal { pub stat_info: Option, } -#[typetag::serde] impl IPhysicalPlan for AggregateFinal { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_aggregate_partial.rs b/src/query/service/src/physical_plans/physical_aggregate_partial.rs index fbaeb0157d5a1..e041e64ca2480 100644 --- a/src/query/service/src/physical_plans/physical_aggregate_partial.rs +++ b/src/query/service/src/physical_plans/physical_aggregate_partial.rs @@ -67,7 +67,6 @@ pub struct AggregatePartial { pub stat_info: Option, } -#[typetag::serde] impl IPhysicalPlan for AggregatePartial { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_async_func.rs b/src/query/service/src/physical_plans/physical_async_func.rs index d3595c2436225..fb2a49b4a41b9 100644 --- a/src/query/service/src/physical_plans/physical_async_func.rs +++ b/src/query/service/src/physical_plans/physical_async_func.rs @@ -46,7 +46,6 @@ pub struct AsyncFunction { pub stat_info: Option, } -#[typetag::serde] impl IPhysicalPlan for AsyncFunction { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_broadcast.rs b/src/query/service/src/physical_plans/physical_broadcast.rs index 88f961c5103ed..369c2f66a16c3 100644 --- a/src/query/service/src/physical_plans/physical_broadcast.rs +++ b/src/query/service/src/physical_plans/physical_broadcast.rs @@ -35,7 +35,6 @@ pub struct BroadcastSource { pub broadcast_id: u32, } -#[typetag::serde] impl IPhysicalPlan for BroadcastSource { fn as_any(&self) -> &dyn Any { self @@ -75,7 +74,6 @@ pub struct BroadcastSink { pub input: PhysicalPlan, } -#[typetag::serde] impl IPhysicalPlan for BroadcastSink { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_cache_scan.rs b/src/query/service/src/physical_plans/physical_cache_scan.rs index 6988ab483fbdf..8ce47add85ef6 100644 --- a/src/query/service/src/physical_plans/physical_cache_scan.rs +++ b/src/query/service/src/physical_plans/physical_cache_scan.rs @@ -41,7 +41,6 @@ pub struct CacheScan { pub output_schema: DataSchemaRef, } -#[typetag::serde] impl IPhysicalPlan for CacheScan { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_column_mutation.rs b/src/query/service/src/physical_plans/physical_column_mutation.rs index c1bb9b709ec8b..791fb423920c1 100644 --- a/src/query/service/src/physical_plans/physical_column_mutation.rs +++ b/src/query/service/src/physical_plans/physical_column_mutation.rs @@ -52,7 +52,6 @@ pub struct ColumnMutation { pub udf_col_num: usize, } -#[typetag::serde] impl IPhysicalPlan for ColumnMutation { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_commit_sink.rs b/src/query/service/src/physical_plans/physical_commit_sink.rs index c8f584484b0f0..ac4928807af7e 100644 --- a/src/query/service/src/physical_plans/physical_commit_sink.rs +++ b/src/query/service/src/physical_plans/physical_commit_sink.rs @@ -58,7 +58,6 @@ pub struct CommitSink { pub recluster_info: Option, } -#[typetag::serde] impl IPhysicalPlan for CommitSink { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_compact_source.rs b/src/query/service/src/physical_plans/physical_compact_source.rs index 52fa41a4b8533..50c4723f28dac 100644 --- a/src/query/service/src/physical_plans/physical_compact_source.rs +++ b/src/query/service/src/physical_plans/physical_compact_source.rs @@ -59,7 +59,6 @@ pub struct CompactSource { pub table_meta_timestamps: TableMetaTimestamps, } -#[typetag::serde] impl IPhysicalPlan for CompactSource { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_constant_table_scan.rs b/src/query/service/src/physical_plans/physical_constant_table_scan.rs index fd9865d9e3c8b..86b18bc3254b6 100644 --- a/src/query/service/src/physical_plans/physical_constant_table_scan.rs +++ b/src/query/service/src/physical_plans/physical_constant_table_scan.rs @@ -38,7 +38,6 @@ pub struct ConstantTableScan { pub output_schema: DataSchemaRef, } -#[typetag::serde] impl IPhysicalPlan for ConstantTableScan { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_copy_into_location.rs b/src/query/service/src/physical_plans/physical_copy_into_location.rs index 982beaa49d70c..ee4ed64edb7ad 100644 --- a/src/query/service/src/physical_plans/physical_copy_into_location.rs +++ b/src/query/service/src/physical_plans/physical_copy_into_location.rs @@ -45,7 +45,6 @@ pub struct CopyIntoLocation { pub info: CopyIntoLocationInfo, } -#[typetag::serde] impl IPhysicalPlan for CopyIntoLocation { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_copy_into_table.rs b/src/query/service/src/physical_plans/physical_copy_into_table.rs index a5b3bdaa34e3a..8e45028be76d1 100644 --- a/src/query/service/src/physical_plans/physical_copy_into_table.rs +++ b/src/query/service/src/physical_plans/physical_copy_into_table.rs @@ -51,7 +51,6 @@ pub struct CopyIntoTable { pub table_meta_timestamps: TableMetaTimestamps, } -#[typetag::serde] impl IPhysicalPlan for CopyIntoTable { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_cte_consumer.rs b/src/query/service/src/physical_plans/physical_cte_consumer.rs index 9d221cf5fee05..6304d2563db06 100644 --- a/src/query/service/src/physical_plans/physical_cte_consumer.rs +++ b/src/query/service/src/physical_plans/physical_cte_consumer.rs @@ -42,7 +42,6 @@ pub struct MaterializeCTERef { pub meta: PhysicalPlanMeta, } -#[typetag::serde] impl IPhysicalPlan for MaterializeCTERef { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_distributed_insert_select.rs b/src/query/service/src/physical_plans/physical_distributed_insert_select.rs index 3c7280778f4a6..66a628e74a86a 100644 --- a/src/query/service/src/physical_plans/physical_distributed_insert_select.rs +++ b/src/query/service/src/physical_plans/physical_distributed_insert_select.rs @@ -40,7 +40,6 @@ pub struct DistributedInsertSelect { pub table_meta_timestamps: TableMetaTimestamps, } -#[typetag::serde] impl IPhysicalPlan for DistributedInsertSelect { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_eval_scalar.rs b/src/query/service/src/physical_plans/physical_eval_scalar.rs index dfa77296fbf31..7dd418ee01e7a 100644 --- a/src/query/service/src/physical_plans/physical_eval_scalar.rs +++ b/src/query/service/src/physical_plans/physical_eval_scalar.rs @@ -65,7 +65,6 @@ pub struct EvalScalar { pub stat_info: Option, } -#[typetag::serde] impl IPhysicalPlan for EvalScalar { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_exchange.rs b/src/query/service/src/physical_plans/physical_exchange.rs index ba19faecbb359..62ee27d393daa 100644 --- a/src/query/service/src/physical_plans/physical_exchange.rs +++ b/src/query/service/src/physical_plans/physical_exchange.rs @@ -41,7 +41,6 @@ pub struct Exchange { pub allow_adjust_parallelism: bool, } -#[typetag::serde] impl IPhysicalPlan for Exchange { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_exchange_sink.rs b/src/query/service/src/physical_plans/physical_exchange_sink.rs index 17f2e3b51d9e5..5a30fb0bff35a 100644 --- a/src/query/service/src/physical_plans/physical_exchange_sink.rs +++ b/src/query/service/src/physical_plans/physical_exchange_sink.rs @@ -45,7 +45,6 @@ pub struct ExchangeSink { pub allow_adjust_parallelism: bool, } -#[typetag::serde] impl IPhysicalPlan for ExchangeSink { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_exchange_source.rs b/src/query/service/src/physical_plans/physical_exchange_source.rs index 79076f92fb2b4..89421c7650d73 100644 --- a/src/query/service/src/physical_plans/physical_exchange_source.rs +++ b/src/query/service/src/physical_plans/physical_exchange_source.rs @@ -37,7 +37,6 @@ pub struct ExchangeSource { pub query_id: String, } -#[typetag::serde] impl IPhysicalPlan for ExchangeSource { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_expression_scan.rs b/src/query/service/src/physical_plans/physical_expression_scan.rs index 8d13ffbe9ed7f..a92073ec66b74 100644 --- a/src/query/service/src/physical_plans/physical_expression_scan.rs +++ b/src/query/service/src/physical_plans/physical_expression_scan.rs @@ -41,7 +41,6 @@ pub struct ExpressionScan { pub output_schema: DataSchemaRef, } -#[typetag::serde] impl IPhysicalPlan for ExpressionScan { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_filter.rs b/src/query/service/src/physical_plans/physical_filter.rs index d9d0f15d9edff..ca63475d6e1c5 100644 --- a/src/query/service/src/physical_plans/physical_filter.rs +++ b/src/query/service/src/physical_plans/physical_filter.rs @@ -48,7 +48,6 @@ pub struct Filter { pub stat_info: Option, } -#[typetag::serde] impl IPhysicalPlan for Filter { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_hash_join.rs b/src/query/service/src/physical_plans/physical_hash_join.rs index 9a2e5802f105f..979b594c47aa0 100644 --- a/src/query/service/src/physical_plans/physical_hash_join.rs +++ b/src/query/service/src/physical_plans/physical_hash_join.rs @@ -142,7 +142,6 @@ pub struct HashJoin { pub broadcast_id: Option, } -#[typetag::serde] impl IPhysicalPlan for HashJoin { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_limit.rs b/src/query/service/src/physical_plans/physical_limit.rs index 33dd52a60d495..b752fc2c5ebd5 100644 --- a/src/query/service/src/physical_plans/physical_limit.rs +++ b/src/query/service/src/physical_plans/physical_limit.rs @@ -38,7 +38,7 @@ use crate::pipelines::PipelineBuilder; #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct Limit { - meta: PhysicalPlanMeta, + pub(crate) meta: PhysicalPlanMeta, pub input: PhysicalPlan, pub limit: Option, pub offset: usize, @@ -47,7 +47,6 @@ pub struct Limit { pub stat_info: Option, } -#[typetag::serde] impl IPhysicalPlan for Limit { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_materialized_cte.rs b/src/query/service/src/physical_plans/physical_materialized_cte.rs index fc6f6b476db38..85aa1591ef68a 100644 --- a/src/query/service/src/physical_plans/physical_materialized_cte.rs +++ b/src/query/service/src/physical_plans/physical_materialized_cte.rs @@ -44,7 +44,6 @@ pub struct MaterializedCTE { pub meta: PhysicalPlanMeta, } -#[typetag::serde] impl IPhysicalPlan for MaterializedCTE { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_multi_table_insert.rs b/src/query/service/src/physical_plans/physical_multi_table_insert.rs index d9959e5b95a11..97177e4c03e75 100644 --- a/src/query/service/src/physical_plans/physical_multi_table_insert.rs +++ b/src/query/service/src/physical_plans/physical_multi_table_insert.rs @@ -59,7 +59,6 @@ pub struct Duplicate { pub n: usize, } -#[typetag::serde] impl IPhysicalPlan for Duplicate { fn as_any(&self) -> &dyn Any { self @@ -108,7 +107,6 @@ pub struct Shuffle { pub strategy: ShuffleStrategy, } -#[typetag::serde] impl IPhysicalPlan for Shuffle { fn as_any(&self) -> &dyn Any { self @@ -191,7 +189,6 @@ pub struct ChunkFilter { pub predicates: Vec>, } -#[typetag::serde] impl IPhysicalPlan for ChunkFilter { fn as_any(&self) -> &dyn Any { self @@ -256,7 +253,6 @@ pub struct ChunkEvalScalar { pub eval_scalars: Vec>, } -#[typetag::serde] impl IPhysicalPlan for ChunkEvalScalar { fn as_any(&self) -> &dyn Any { self @@ -328,7 +324,6 @@ pub struct ChunkCastSchema { pub cast_schemas: Vec>, } -#[typetag::serde] impl IPhysicalPlan for ChunkCastSchema { fn as_any(&self) -> &dyn Any { self @@ -401,7 +396,6 @@ pub struct ChunkFillAndReorder { pub fill_and_reorders: Vec>, } -#[typetag::serde] impl IPhysicalPlan for ChunkFillAndReorder { fn as_any(&self) -> &dyn Any { self @@ -478,7 +472,6 @@ pub struct ChunkAppendData { pub target_tables: Vec, } -#[typetag::serde] impl IPhysicalPlan for ChunkAppendData { fn as_any(&self) -> &dyn Any { self @@ -637,7 +630,6 @@ pub struct ChunkMerge { pub group_ids: Vec, } -#[typetag::serde] impl IPhysicalPlan for ChunkMerge { fn as_any(&self) -> &dyn Any { self @@ -709,7 +701,6 @@ pub struct ChunkCommitInsert { pub targets: Vec, } -#[typetag::serde] impl IPhysicalPlan for ChunkCommitInsert { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_mutation.rs b/src/query/service/src/physical_plans/physical_mutation.rs index 8ce678a6d1c5a..44233c54749ce 100644 --- a/src/query/service/src/physical_plans/physical_mutation.rs +++ b/src/query/service/src/physical_plans/physical_mutation.rs @@ -108,7 +108,6 @@ pub struct Mutation { pub table_meta_timestamps: TableMetaTimestamps, } -#[typetag::serde] impl IPhysicalPlan for Mutation { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_mutation_into_organize.rs b/src/query/service/src/physical_plans/physical_mutation_into_organize.rs index b5770451d56bf..109008955e1b3 100644 --- a/src/query/service/src/physical_plans/physical_mutation_into_organize.rs +++ b/src/query/service/src/physical_plans/physical_mutation_into_organize.rs @@ -31,7 +31,6 @@ pub struct MutationOrganize { pub strategy: MutationStrategy, } -#[typetag::serde] impl IPhysicalPlan for MutationOrganize { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_mutation_into_split.rs b/src/query/service/src/physical_plans/physical_mutation_into_split.rs index e86c70641ab64..ae1386499e434 100644 --- a/src/query/service/src/physical_plans/physical_mutation_into_split.rs +++ b/src/query/service/src/physical_plans/physical_mutation_into_split.rs @@ -33,7 +33,6 @@ pub struct MutationSplit { pub split_index: IndexType, } -#[typetag::serde] impl IPhysicalPlan for MutationSplit { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_mutation_manipulate.rs b/src/query/service/src/physical_plans/physical_mutation_manipulate.rs index cad8392e9133a..097be7b288812 100644 --- a/src/query/service/src/physical_plans/physical_mutation_manipulate.rs +++ b/src/query/service/src/physical_plans/physical_mutation_manipulate.rs @@ -54,7 +54,6 @@ pub struct MutationManipulate { pub target_table_index: usize, } -#[typetag::serde] impl IPhysicalPlan for MutationManipulate { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_mutation_source.rs b/src/query/service/src/physical_plans/physical_mutation_source.rs index 68ab0c632f610..44f725a28654a 100644 --- a/src/query/service/src/physical_plans/physical_mutation_source.rs +++ b/src/query/service/src/physical_plans/physical_mutation_source.rs @@ -74,7 +74,6 @@ pub struct MutationSource { pub statistics: PartStatistics, } -#[typetag::serde] impl IPhysicalPlan for MutationSource { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_plan.rs b/src/query/service/src/physical_plans/physical_plan.rs index 27cc04c134d07..d5df2445057f0 100644 --- a/src/query/service/src/physical_plans/physical_plan.rs +++ b/src/query/service/src/physical_plans/physical_plan.rs @@ -16,8 +16,6 @@ use std::any::Any; use std::collections::HashMap; use std::fmt::Debug; use std::fmt::Formatter; -use std::ops::Deref; -use std::ops::DerefMut; use std::sync::Arc; use databend_common_ast::ast::FormatTreeNode; @@ -33,6 +31,8 @@ use databend_common_sql::Metadata; use dyn_clone::DynClone; use serde::Deserializer; use serde::Serializer; +use serde::de::Error as DeError; +use serde_json::Value as JsonValue; use crate::physical_plans::ExchangeSink; use crate::physical_plans::MutationSource; @@ -70,8 +70,23 @@ pub trait DeriveHandle: Send + Sync + 'static { ) -> std::result::Result>; } -#[typetag::serde] -pub trait IPhysicalPlan: DynClone + Debug + Send + Sync + 'static { +pub(crate) trait PhysicalPlanSerdeSerialization { + fn to_physical_plan_serde_serialize(&self) -> PhysicalPlanSerdeSerialize<'_>; +} + +pub(crate) trait PhysicalPlanSerdeDeserialization { + fn from_physical_plan_deserialize(v: PhysicalPlanDeserialize) -> Self; +} + +pub(crate) trait IPhysicalPlan: + PhysicalPlanSerdeSerialization + + PhysicalPlanSerdeSerialization + + DynClone + + Debug + + Send + + Sync + + 'static +{ fn as_any(&self) -> &dyn Any; fn get_meta(&self) -> &PhysicalPlanMeta; @@ -269,6 +284,145 @@ impl PhysicalPlanCast for T { } } +macro_rules! define_physical_plan_serde { + ( $( $(#[$meta:meta])? $variant:ident => $path:path ),+ $(,)? ) => { + #[derive(Clone, Debug, serde::Deserialize)] + /// owned enum for deserialization; serialization uses PhysicalPlanSerdeRef to avoid cloning + pub(crate) enum PhysicalPlanDeserialize { + $( $(#[$meta])? $variant($path), )+ + } + + #[derive(Debug, serde::Serialize)] + pub(crate) enum PhysicalPlanSerdeSerialize<'a> { + $( $(#[$meta])? $variant(&'a $path), )+ + } + + $( $(#[$meta])? impl From<$path> for PhysicalPlanDeserialize { + fn from(v: $path) -> Self { + PhysicalPlanDeserialize::$variant(v) + } + })+ + + $( $(#[$meta])? impl<'a> From<&'a $path> for PhysicalPlanSerdeSerialize<'a> { + fn from(v: &'a $path) -> Self { + PhysicalPlanSerdeSerialize::$variant(v) + } + })+ + + $( $(#[$meta])? impl PhysicalPlanSerdeSerialization for $path { + fn to_physical_plan_serde_serialize(&self) -> PhysicalPlanSerdeSerialize<'_> { + PhysicalPlanSerdeSerialize::from(self) + } + })+ + }; +} + +include!(concat!(env!("OUT_DIR"), "/physical_plan_impls.rs")); + +include!(concat!(env!("OUT_DIR"), "/physical_plan_dispatch.rs")); + +impl PhysicalPlanSerdeSerialization for PhysicalPlanDeserialize { + fn to_physical_plan_serde_serialize(&self) -> PhysicalPlanSerdeSerialize<'_> { + dispatch_plan_ref!(self, v => PhysicalPlanSerdeSerialize::from(v)) + } +} + +impl PhysicalPlanSerdeDeserialization for PhysicalPlan { + fn from_physical_plan_deserialize(v: PhysicalPlanDeserialize) -> Self { + PhysicalPlan { inner: Box::new(v) } + } +} + +impl IPhysicalPlan for PhysicalPlanDeserialize { + fn as_any(&self) -> &dyn Any { + dispatch_plan_ref!(self, v => v.as_any()) + } + + fn get_meta(&self) -> &PhysicalPlanMeta { + dispatch_plan_ref!(self, v => v.get_meta()) + } + + fn get_meta_mut(&mut self) -> &mut PhysicalPlanMeta { + dispatch_plan_mut!(self, v => v.get_meta_mut()) + } + + fn get_id(&self) -> u32 { + dispatch_plan_ref!(self, v => v.get_id()) + } + + fn get_name(&self) -> String { + dispatch_plan_ref!(self, v => v.get_name()) + } + + fn adjust_plan_id(&mut self, next_id: &mut u32) { + dispatch_plan_mut!(self, v => v.adjust_plan_id(next_id)) + } + + fn output_schema(&self) -> Result { + dispatch_plan_ref!(self, v => v.output_schema()) + } + + fn children(&self) -> Box + '_> { + dispatch_plan_ref!(self, v => v.children()) + } + + fn children_mut(&mut self) -> Box + '_> { + dispatch_plan_mut!(self, v => v.children_mut()) + } + + fn formatter(&self) -> Result> { + dispatch_plan_ref!(self, v => v.formatter()) + } + + fn try_find_single_data_source(&self) -> Option<&DataSourcePlan> { + dispatch_plan_ref!(self, v => v.try_find_single_data_source()) + } + + fn try_find_mutation_source(&self) -> Option { + dispatch_plan_ref!(self, v => v.try_find_mutation_source()) + } + + fn get_all_data_source(&self, sources: &mut Vec<(u32, Box)>) { + dispatch_plan_ref!(self, v => v.get_all_data_source(sources)) + } + + fn set_pruning_stats(&mut self, stats: &mut HashMap) { + dispatch_plan_mut!(self, v => v.set_pruning_stats(stats)) + } + + fn is_distributed_plan(&self) -> bool { + dispatch_plan_ref!(self, v => v.is_distributed_plan()) + } + + fn is_warehouse_distributed_plan(&self) -> bool { + dispatch_plan_ref!(self, v => v.is_warehouse_distributed_plan()) + } + + fn display_in_profile(&self) -> bool { + dispatch_plan_ref!(self, v => v.display_in_profile()) + } + + fn get_desc(&self) -> Result { + dispatch_plan_ref!(self, v => v.get_desc()) + } + + fn get_labels(&self) -> Result>> { + dispatch_plan_ref!(self, v => v.get_labels()) + } + + fn derive(&self, children: Vec) -> PhysicalPlan { + dispatch_plan_ref!(self, v => v.derive(children)) + } + + fn build_pipeline(&self, builder: &mut PipelineBuilder) -> Result<()> { + dispatch_plan_ref!(self, v => v.build_pipeline(builder)) + } + + fn build_pipeline2(&self, builder: &mut PipelineBuilder) -> Result<()> { + dispatch_plan_ref!(self, v => v.build_pipeline2(builder)) + } +} + pub struct PhysicalPlan { inner: Box, } @@ -291,43 +445,124 @@ impl Debug for PhysicalPlan { } } -impl Deref for PhysicalPlan { - type Target = dyn IPhysicalPlan; - - fn deref(&self) -> &Self::Target { - self.inner.deref() - } -} - -impl DerefMut for PhysicalPlan { - fn deref_mut(&mut self) -> &mut Self::Target { - self.inner.deref_mut() - } -} - impl serde::Serialize for PhysicalPlan { #[recursive::recursive] fn serialize(&self, serializer: S) -> std::result::Result { - self.inner.serialize(serializer) + self.inner + .to_physical_plan_serde_serialize() + .serialize(serializer) } } impl<'de> serde::Deserialize<'de> for PhysicalPlan { #[recursive::recursive] fn deserialize>(deserializer: D) -> std::result::Result { - Ok(PhysicalPlan { - inner: Box::::deserialize(deserializer)?, - }) + // Deserialize to JSON first to avoid backtracking failures in streaming deserializers. + let value = JsonValue::deserialize(deserializer)?; + let inner: PhysicalPlanDeserialize = + serde_json::from_value(value).map_err(DeError::custom)?; + + Ok(PhysicalPlan::from_physical_plan_deserialize(inner)) } } impl PhysicalPlan { - pub fn new(inner: T) -> PhysicalPlan { + #[allow(private_bounds)] + pub fn new(inner: T) -> PhysicalPlan + where PhysicalPlanDeserialize: From { PhysicalPlan { - inner: Box::new(inner), + inner: Box::::new(inner.into()), } } + pub fn as_any(&self) -> &dyn Any { + self.inner.as_any() + } + + pub fn get_meta(&self) -> &PhysicalPlanMeta { + self.inner.get_meta() + } + + pub fn get_meta_mut(&mut self) -> &mut PhysicalPlanMeta { + self.inner.get_meta_mut() + } + + pub fn get_id(&self) -> u32 { + self.inner.get_id() + } + + pub fn get_name(&self) -> String { + self.inner.get_name() + } + + pub fn adjust_plan_id(&mut self, next_id: &mut u32) { + self.inner.adjust_plan_id(next_id) + } + + pub fn output_schema(&self) -> Result { + self.inner.output_schema() + } + + pub fn children(&self) -> Box + '_> { + self.inner.children() + } + + pub fn children_mut(&mut self) -> Box + '_> { + self.inner.children_mut() + } + + pub fn formatter(&self) -> Result> { + self.inner.formatter() + } + + pub fn try_find_single_data_source(&self) -> Option<&DataSourcePlan> { + self.inner.try_find_single_data_source() + } + + pub fn try_find_mutation_source(&self) -> Option { + self.inner.try_find_mutation_source() + } + + pub fn get_all_data_source(&self, sources: &mut Vec<(u32, Box)>) { + self.inner.get_all_data_source(sources) + } + + pub fn set_pruning_stats(&mut self, stats: &mut HashMap) { + self.inner.set_pruning_stats(stats) + } + + pub fn is_distributed_plan(&self) -> bool { + self.inner.is_distributed_plan() + } + + pub fn is_warehouse_distributed_plan(&self) -> bool { + self.inner.is_warehouse_distributed_plan() + } + + pub fn display_in_profile(&self) -> bool { + self.inner.display_in_profile() + } + + pub fn get_desc(&self) -> Result { + self.inner.get_desc() + } + + pub fn get_labels(&self) -> Result>> { + self.inner.get_labels() + } + + pub fn derive(&self, children: Vec) -> PhysicalPlan { + self.inner.derive(children) + } + + pub fn build_pipeline(&self, builder: &mut PipelineBuilder) -> Result<()> { + self.inner.build_pipeline(builder) + } + + pub fn build_pipeline2(&self, builder: &mut PipelineBuilder) -> Result<()> { + self.inner.build_pipeline2(builder) + } + #[recursive::recursive] pub fn derive_with(&self, handle: &mut Box) -> PhysicalPlan { let mut children = vec![]; @@ -365,3 +600,109 @@ impl PhysicalPlan { self.formatter()?.format(&mut context) } } + +#[cfg(test)] +mod tests { + use std::any::Any; + use std::backtrace::Backtrace; + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering; + + use serde::ser::SerializeStruct; + use serde_json::{self}; + + use super::*; + + static STACK_DEPTH: AtomicUsize = AtomicUsize::new(0); + static STACK_DELTA: AtomicUsize = AtomicUsize::new(0); + static BASELINE_DEPTH: AtomicUsize = AtomicUsize::new(0); + + #[derive(Clone, Debug, serde::Deserialize)] + pub(crate) struct StackDepthPlan { + meta: PhysicalPlanMeta, + } + + impl StackDepthPlan { + fn new(name: impl Into) -> Self { + StackDepthPlan { + meta: PhysicalPlanMeta::new(name), + } + } + } + + impl serde::Serialize for StackDepthPlan { + fn serialize(&self, serializer: S) -> std::result::Result + where S: serde::Serializer { + let depth = current_stack_depth(); + STACK_DEPTH.store(depth, Ordering::Relaxed); + let baseline = BASELINE_DEPTH.load(Ordering::Relaxed); + STACK_DELTA.store(depth.saturating_sub(baseline), Ordering::Relaxed); + + // Serialize in the same shape as a normal plan node. + let mut state = serializer.serialize_struct("StackDepthPlan", 1)?; + state.serialize_field("meta", &self.meta)?; + state.end() + } + } + + impl IPhysicalPlan for StackDepthPlan { + fn as_any(&self) -> &dyn Any { + self + } + + fn get_meta(&self) -> &PhysicalPlanMeta { + &self.meta + } + + fn get_meta_mut(&mut self) -> &mut PhysicalPlanMeta { + &mut self.meta + } + + fn derive(&self, _children: Vec) -> PhysicalPlan { + PhysicalPlan::new(self.clone()) + } + + fn build_pipeline2(&self, builder: &mut PipelineBuilder) -> Result<()> { + let _ = builder; + Ok(()) + } + } + + // Used to compare the serialization stack depth of different versions + #[test] + fn typetag_serialize_stack_depth_is_measured() { + STACK_DEPTH.store(0, Ordering::Relaxed); + STACK_DELTA.store(0, Ordering::Relaxed); + let baseline = current_stack_depth(); + BASELINE_DEPTH.store(baseline, Ordering::Relaxed); + + let plan = PhysicalPlan::new(StackDepthPlan::new("stack_depth_plan")); + serde_json::to_vec(&plan).expect("serialize typetag plan"); + + let depth = STACK_DEPTH.load(Ordering::Relaxed); + let delta = STACK_DELTA.load(Ordering::Relaxed); + + assert!(depth > 0, "backtrace depth was not captured"); + assert!( + delta > 0, + "delta between typetag serialize and baseline should be > 0" + ); + eprintln!( + "typetag serialize stack depth: {}, delta from baseline: {}", + depth, delta + ); + } + + fn current_stack_depth() -> usize { + Backtrace::force_capture() + .to_string() + .lines() + .filter(|line| { + line.trim_start() + .chars() + .next() + .is_some_and(|c| c.is_ascii_digit()) + }) + .count() + } +} diff --git a/src/query/service/src/physical_plans/physical_project_set.rs b/src/query/service/src/physical_plans/physical_project_set.rs index 7b3caf514ea24..8b8809ed5a25d 100644 --- a/src/query/service/src/physical_plans/physical_project_set.rs +++ b/src/query/service/src/physical_plans/physical_project_set.rs @@ -50,7 +50,6 @@ pub struct ProjectSet { pub stat_info: Option, } -#[typetag::serde] impl IPhysicalPlan for ProjectSet { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_r_cte_scan.rs b/src/query/service/src/physical_plans/physical_r_cte_scan.rs index 94d478dadb0c0..9e6792005714a 100644 --- a/src/query/service/src/physical_plans/physical_r_cte_scan.rs +++ b/src/query/service/src/physical_plans/physical_r_cte_scan.rs @@ -35,7 +35,6 @@ pub struct RecursiveCteScan { pub stat: PlanStatsInfo, } -#[typetag::serde] impl IPhysicalPlan for RecursiveCteScan { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_range_join.rs b/src/query/service/src/physical_plans/physical_range_join.rs index b96091b477d84..eb719459521b8 100644 --- a/src/query/service/src/physical_plans/physical_range_join.rs +++ b/src/query/service/src/physical_plans/physical_range_join.rs @@ -66,7 +66,6 @@ pub struct RangeJoin { pub stat_info: Option, } -#[typetag::serde] impl IPhysicalPlan for RangeJoin { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_recluster.rs b/src/query/service/src/physical_plans/physical_recluster.rs index f86247763f24c..0a6c80bede960 100644 --- a/src/query/service/src/physical_plans/physical_recluster.rs +++ b/src/query/service/src/physical_plans/physical_recluster.rs @@ -67,7 +67,6 @@ pub struct Recluster { pub table_meta_timestamps: TableMetaTimestamps, } -#[typetag::serde] impl IPhysicalPlan for Recluster { fn as_any(&self) -> &dyn Any { self @@ -266,7 +265,6 @@ pub struct HilbertPartition { pub rows_per_block: usize, } -#[typetag::serde] impl IPhysicalPlan for HilbertPartition { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_replace_async_source.rs b/src/query/service/src/physical_plans/physical_replace_async_source.rs index e57f78f04fb6f..cc1379fc1f233 100644 --- a/src/query/service/src/physical_plans/physical_replace_async_source.rs +++ b/src/query/service/src/physical_plans/physical_replace_async_source.rs @@ -35,7 +35,6 @@ pub struct ReplaceAsyncSourcer { pub source: InsertValue, } -#[typetag::serde] impl IPhysicalPlan for ReplaceAsyncSourcer { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_replace_deduplicate.rs b/src/query/service/src/physical_plans/physical_replace_deduplicate.rs index 29f1ce6d2ec89..b28712e83354b 100644 --- a/src/query/service/src/physical_plans/physical_replace_deduplicate.rs +++ b/src/query/service/src/physical_plans/physical_replace_deduplicate.rs @@ -57,7 +57,6 @@ pub struct ReplaceDeduplicate { pub delete_when: Option<(RemoteExpr, String)>, } -#[typetag::serde] impl IPhysicalPlan for ReplaceDeduplicate { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_replace_into.rs b/src/query/service/src/physical_plans/physical_replace_into.rs index 9e502fb467a81..09a75dd9847b8 100644 --- a/src/query/service/src/physical_plans/physical_replace_into.rs +++ b/src/query/service/src/physical_plans/physical_replace_into.rs @@ -57,7 +57,6 @@ pub struct ReplaceInto { pub table_meta_timestamps: TableMetaTimestamps, } -#[typetag::serde] impl IPhysicalPlan for ReplaceInto { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_row_fetch.rs b/src/query/service/src/physical_plans/physical_row_fetch.rs index 11fef083c340a..32eb1667e9a0b 100644 --- a/src/query/service/src/physical_plans/physical_row_fetch.rs +++ b/src/query/service/src/physical_plans/physical_row_fetch.rs @@ -54,7 +54,6 @@ pub struct RowFetch { pub stat_info: Option, } -#[typetag::serde] impl IPhysicalPlan for RowFetch { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_secure_filter.rs b/src/query/service/src/physical_plans/physical_secure_filter.rs index 851952279b83f..1480ea2e5a6c4 100644 --- a/src/query/service/src/physical_plans/physical_secure_filter.rs +++ b/src/query/service/src/physical_plans/physical_secure_filter.rs @@ -48,7 +48,6 @@ pub struct SecureFilter { pub stat_info: Option, } -#[typetag::serde] impl IPhysicalPlan for SecureFilter { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_sequence.rs b/src/query/service/src/physical_plans/physical_sequence.rs index 4642019c961f5..a0923d68cd655 100644 --- a/src/query/service/src/physical_plans/physical_sequence.rs +++ b/src/query/service/src/physical_plans/physical_sequence.rs @@ -39,7 +39,6 @@ pub struct Sequence { pub meta: PhysicalPlanMeta, } -#[typetag::serde] impl IPhysicalPlan for Sequence { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_sort.rs b/src/query/service/src/physical_plans/physical_sort.rs index 531e93b955b55..a4ade17b55d00 100644 --- a/src/query/service/src/physical_plans/physical_sort.rs +++ b/src/query/service/src/physical_plans/physical_sort.rs @@ -99,7 +99,6 @@ impl Display for SortStep { } } -#[typetag::serde] impl IPhysicalPlan for Sort { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_table_scan.rs b/src/query/service/src/physical_plans/physical_table_scan.rs index ecca1c5f005cf..7cd3896a26a7b 100644 --- a/src/query/service/src/physical_plans/physical_table_scan.rs +++ b/src/query/service/src/physical_plans/physical_table_scan.rs @@ -97,7 +97,6 @@ pub struct TableScan { pub stat_info: Option, } -#[typetag::serde] impl IPhysicalPlan for TableScan { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_udf.rs b/src/query/service/src/physical_plans/physical_udf.rs index 89f75767e8acb..0ec99673786db 100644 --- a/src/query/service/src/physical_plans/physical_udf.rs +++ b/src/query/service/src/physical_plans/physical_udf.rs @@ -52,7 +52,6 @@ pub struct Udf { pub stat_info: Option, } -#[typetag::serde] impl IPhysicalPlan for Udf { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_union_all.rs b/src/query/service/src/physical_plans/physical_union_all.rs index c2f311951a588..dd669de1d18a8 100644 --- a/src/query/service/src/physical_plans/physical_union_all.rs +++ b/src/query/service/src/physical_plans/physical_union_all.rs @@ -55,7 +55,6 @@ pub struct UnionAll { pub stat_info: Option, } -#[typetag::serde] impl IPhysicalPlan for UnionAll { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_window.rs b/src/query/service/src/physical_plans/physical_window.rs index 6682adddbfcde..437dc9fe34092 100644 --- a/src/query/service/src/physical_plans/physical_window.rs +++ b/src/query/service/src/physical_plans/physical_window.rs @@ -73,7 +73,6 @@ pub struct Window { pub limit: Option, } -#[typetag::serde] impl IPhysicalPlan for Window { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/physical_plans/physical_window_partition.rs b/src/query/service/src/physical_plans/physical_window_partition.rs index e3c9f1f0228d4..e8c652a995d0b 100644 --- a/src/query/service/src/physical_plans/physical_window_partition.rs +++ b/src/query/service/src/physical_plans/physical_window_partition.rs @@ -54,7 +54,6 @@ pub struct WindowPartition { pub stat_info: Option, } -#[typetag::serde] impl IPhysicalPlan for WindowPartition { fn as_any(&self) -> &dyn Any { self diff --git a/src/query/service/src/servers/flight/v1/packets/mod.rs b/src/query/service/src/servers/flight/v1/packets/mod.rs index ca44d46afde5e..062a539857f8c 100644 --- a/src/query/service/src/servers/flight/v1/packets/mod.rs +++ b/src/query/service/src/servers/flight/v1/packets/mod.rs @@ -15,7 +15,7 @@ mod packet_data; mod packet_data_progressinfo; mod packet_executor; -mod packet_fragment; +pub mod packet_fragment; mod packet_publisher; pub use packet_data::DataPacket; diff --git a/src/query/service/src/servers/flight/v1/packets/packet_fragment.rs b/src/query/service/src/servers/flight/v1/packets/packet_fragment.rs index 963363e7267d1..07f5dd4a7b0e2 100644 --- a/src/query/service/src/servers/flight/v1/packets/packet_fragment.rs +++ b/src/query/service/src/servers/flight/v1/packets/packet_fragment.rs @@ -50,9 +50,8 @@ impl QueryFragment { } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -struct SerializedPhysicalPlanRef(u32); +pub struct SerializedPhysicalPlanRef(u32); -#[typetag::serde] impl IPhysicalPlan for SerializedPhysicalPlanRef { fn as_any(&self) -> &dyn Any { self @@ -71,13 +70,20 @@ impl IPhysicalPlan for SerializedPhysicalPlanRef { } } -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +#[derive(Clone, Debug, serde::Serialize)] struct SerializeQueryFragment { pub fragment_id: usize, pub data_exchange: Option, pub flatten_plan: VecDeque, } +#[derive(Clone, Debug, serde::Deserialize)] +struct SerializeQueryFragmentJson { + pub fragment_id: usize, + pub data_exchange: Option, + pub flatten_plan: VecDeque, +} + impl serde::Serialize for QueryFragment { #[recursive::recursive] fn serialize(&self, serializer: S) -> Result { @@ -108,7 +114,22 @@ impl serde::Serialize for QueryFragment { impl<'de> serde::Deserialize<'de> for QueryFragment { #[recursive::recursive] fn deserialize>(deserializer: D) -> Result { - let mut fragment = SerializeQueryFragment::deserialize(deserializer)?; + let fragment = SerializeQueryFragmentJson::deserialize(deserializer)?; + + // Deserialize PhysicalPlan node-by-node using Value to avoid streaming issues. + let flatten_plan: VecDeque = fragment + .flatten_plan + .into_iter() + .map(|plan| { + serde_json::from_value(plan) + .map_err(|e| D::Error::custom(format!("deserialize physical plan: {e}"))) + }) + .collect::>()?; + let mut fragment = SerializeQueryFragment { + fragment_id: fragment.fragment_id, + data_exchange: fragment.data_exchange, + flatten_plan, + }; let mut flatten_storage = HashMap::new();