Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/query/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ wiremock = { workspace = true }

[build-dependencies]
databend-common-building = { workspace = true }
walkdir = { workspace = true }

[lints]
workspace = true
Expand Down
169 changes: 169 additions & 0 deletions src/query/service/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
// limitations under the License.

use std::env;
use std::fs;
use std::io::Write;
use std::path::Path;
use std::path::PathBuf;

fn main() {
// Keep build script rerun behavior explicit for the feature list we expose.
Expand Down Expand Up @@ -49,4 +53,169 @@ fn main() {

let features = features.join(",");
println!("cargo:rustc-env=DATABEND_QUERY_CARGO_FEATURES={features}");

let manifest_dir = env::var("CARGO_MANIFEST_DIR").expect("CARGO_MANIFEST_DIR not set");
let src_dir = Path::new(&manifest_dir).join("src");
let out_dir = PathBuf::from(env::var("OUT_DIR").expect("OUT_DIR not set"));

let mut entries = collect_impls(&src_dir);
entries.sort_by(|a, b| a.1.cmp(&b.1));

let impls_out = out_dir.join("physical_plan_impls.rs");
write_impls(&impls_out, &entries);

let dispatch_out = out_dir.join("physical_plan_dispatch.rs");
write_dispatch(&dispatch_out, &entries);

println!("cargo:rerun-if-changed={}", src_dir.display());
}

fn collect_impls(src_dir: &Path) -> Vec<(Option<String>, String, String)> {
let mut entries = Vec::new();

for entry in walkdir::WalkDir::new(src_dir)
.into_iter()
.filter_map(Result::ok)
.filter(|e| e.file_type().is_file())
{
if entry.path().extension().and_then(|s| s.to_str()) != Some("rs") {
continue;
}

let content = fs::read_to_string(entry.path())
.unwrap_or_else(|e| panic!("read {}: {e}", entry.path().display()));

for line in content.lines() {
if let Some(e) = parse_impl_line(src_dir, entry.path(), line) {
entries.push(e);
}
}
}

entries
}

fn parse_impl_line(
src_root: &Path,
path: &Path,
line: &str,
) -> Option<(Option<String>, String, String)> {
// match `impl ... IPhysicalPlan for Type ... {`
let marker = "IPhysicalPlan for";
let idx = line.find(marker)?;
let after = &line[idx + marker.len()..];
let type_part = after
.split_whitespace()
.next()
.unwrap_or("")
.trim_end_matches('{')
.trim_end_matches(';');
if type_part.is_empty() {
return None;
}

let type_name = type_part
.split("::")
.last()
.unwrap_or(type_part)
.split('<')
.next()
.unwrap_or(type_part)
.to_string();

if type_name == "PhysicalPlanDeserialize" {
return None;
}

let mut module_path = module_path_from_file(src_root, path);
let mut meta = None;

if type_name == "StackDepthPlan" && path.ends_with("physical_plan.rs") {
module_path.push_str("::tests");
meta = Some("#[cfg(test)]".to_string());
}

let full_path = format!("{module_path}::{type_part}");

Some((meta, type_name, full_path))
}

fn module_path_from_file(src_root: &Path, path: &Path) -> String {
let mut module_path = String::from("crate");
let rel = path
.strip_prefix(src_root)
.unwrap_or(path)
.with_extension("");
for comp in rel.components() {
let c = comp.as_os_str().to_string_lossy();
module_path.push_str("::");
module_path.push_str(&c);
}
module_path
}

fn write_impls(out_path: &Path, entries: &[(Option<String>, String, String)]) {
let mut out = String::new();
out.push_str("define_physical_plan_serde!(\n");

for (meta, variant, path) in entries {
if let Some(meta) = meta {
out.push_str(" ");
out.push_str(meta);
out.push('\n');
}
out.push_str(" ");
out.push_str(variant);
out.push_str(" => ");
out.push_str(path);
out.push_str(",\n");
}

out.push_str(");\n");

let mut file = fs::File::create(out_path).expect("create physical_plan_impls.rs");
file.write_all(out.as_bytes())
.expect("write physical_plan_impls.rs");
}

fn write_dispatch(out_path: &Path, entries: &[(Option<String>, String, String)]) {
let mut out = String::new();

out.push_str("macro_rules! dispatch_plan_ref {\n");
out.push_str(" ($s:expr, $plan:ident => $body:expr) => {\n");
out.push_str(" match $s {\n");
for (meta, variant, _) in entries {
if let Some(meta) = meta {
out.push_str(" ");
out.push_str(meta);
out.push('\n');
}
out.push_str(" PhysicalPlanDeserialize::");
out.push_str(variant);
out.push_str("($plan) => $body,\n");
}
out.push_str(" }\n");
out.push_str(" };\n");
out.push_str("}\n\n");

out.push_str("macro_rules! dispatch_plan_mut {\n");
out.push_str(" ($s:expr, $plan:ident => $body:expr) => {\n");
out.push_str(" match $s {\n");
for (meta, variant, _) in entries {
if let Some(meta) = meta {
out.push_str(" ");
out.push_str(meta);
out.push('\n');
}
out.push_str(" PhysicalPlanDeserialize::");
out.push_str(variant);
out.push_str("($plan) => $body,\n");
}
out.push_str(" }\n");
out.push_str(" };\n");
out.push_str("}\n");

let mut file = fs::File::create(out_path).expect("create physical_plan_dispatch.rs");
file.write_all(out.as_bytes())
.expect("write physical_plan_dispatch.rs");
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ pub struct AddStreamColumn {
pub stream_columns: Vec<StreamColumn>,
}

#[typetag::serde]
impl IPhysicalPlan for AddStreamColumn {
fn as_any(&self) -> &dyn Any {
self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ pub struct AggregateExpand {
pub stat_info: Option<PlanStatsInfo>,
}

#[typetag::serde]
impl IPhysicalPlan for AggregateExpand {
fn as_any(&self) -> &dyn Any {
self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ pub struct AggregateFinal {
pub stat_info: Option<PlanStatsInfo>,
}

#[typetag::serde]
impl IPhysicalPlan for AggregateFinal {
fn as_any(&self) -> &dyn Any {
self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ pub struct AggregatePartial {
pub stat_info: Option<PlanStatsInfo>,
}

#[typetag::serde]
impl IPhysicalPlan for AggregatePartial {
fn as_any(&self) -> &dyn Any {
self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ pub struct AsyncFunction {
pub stat_info: Option<PlanStatsInfo>,
}

#[typetag::serde]
impl IPhysicalPlan for AsyncFunction {
fn as_any(&self) -> &dyn Any {
self
Expand Down
2 changes: 0 additions & 2 deletions src/query/service/src/physical_plans/physical_broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ pub struct BroadcastSource {
pub broadcast_id: u32,
}

#[typetag::serde]
impl IPhysicalPlan for BroadcastSource {
fn as_any(&self) -> &dyn Any {
self
Expand Down Expand Up @@ -75,7 +74,6 @@ pub struct BroadcastSink {
pub input: PhysicalPlan,
}

#[typetag::serde]
impl IPhysicalPlan for BroadcastSink {
fn as_any(&self) -> &dyn Any {
self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ pub struct CacheScan {
pub output_schema: DataSchemaRef,
}

#[typetag::serde]
impl IPhysicalPlan for CacheScan {
fn as_any(&self) -> &dyn Any {
self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ pub struct ColumnMutation {
pub udf_col_num: usize,
}

#[typetag::serde]
impl IPhysicalPlan for ColumnMutation {
fn as_any(&self) -> &dyn Any {
self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ pub struct CommitSink {
pub recluster_info: Option<ReclusterInfoSideCar>,
}

#[typetag::serde]
impl IPhysicalPlan for CommitSink {
fn as_any(&self) -> &dyn Any {
self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ pub struct CompactSource {
pub table_meta_timestamps: TableMetaTimestamps,
}

#[typetag::serde]
impl IPhysicalPlan for CompactSource {
fn as_any(&self) -> &dyn Any {
self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ pub struct ConstantTableScan {
pub output_schema: DataSchemaRef,
}

#[typetag::serde]
impl IPhysicalPlan for ConstantTableScan {
fn as_any(&self) -> &dyn Any {
self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ pub struct CopyIntoLocation {
pub info: CopyIntoLocationInfo,
}

#[typetag::serde]
impl IPhysicalPlan for CopyIntoLocation {
fn as_any(&self) -> &dyn Any {
self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ pub struct CopyIntoTable {
pub table_meta_timestamps: TableMetaTimestamps,
}

#[typetag::serde]
impl IPhysicalPlan for CopyIntoTable {
fn as_any(&self) -> &dyn Any {
self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ pub struct MaterializeCTERef {
pub meta: PhysicalPlanMeta,
}

#[typetag::serde]
impl IPhysicalPlan for MaterializeCTERef {
fn as_any(&self) -> &dyn Any {
self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ pub struct DistributedInsertSelect {
pub table_meta_timestamps: TableMetaTimestamps,
}

#[typetag::serde]
impl IPhysicalPlan for DistributedInsertSelect {
fn as_any(&self) -> &dyn Any {
self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ pub struct EvalScalar {
pub stat_info: Option<PlanStatsInfo>,
}

#[typetag::serde]
impl IPhysicalPlan for EvalScalar {
fn as_any(&self) -> &dyn Any {
self
Expand Down
1 change: 0 additions & 1 deletion src/query/service/src/physical_plans/physical_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ pub struct Exchange {
pub allow_adjust_parallelism: bool,
}

#[typetag::serde]
impl IPhysicalPlan for Exchange {
fn as_any(&self) -> &dyn Any {
self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ pub struct ExchangeSink {
pub allow_adjust_parallelism: bool,
}

#[typetag::serde]
impl IPhysicalPlan for ExchangeSink {
fn as_any(&self) -> &dyn Any {
self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ pub struct ExchangeSource {
pub query_id: String,
}

#[typetag::serde]
impl IPhysicalPlan for ExchangeSource {
fn as_any(&self) -> &dyn Any {
self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ pub struct ExpressionScan {
pub output_schema: DataSchemaRef,
}

#[typetag::serde]
impl IPhysicalPlan for ExpressionScan {
fn as_any(&self) -> &dyn Any {
self
Expand Down
1 change: 0 additions & 1 deletion src/query/service/src/physical_plans/physical_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ pub struct Filter {
pub stat_info: Option<PlanStatsInfo>,
}

#[typetag::serde]
impl IPhysicalPlan for Filter {
fn as_any(&self) -> &dyn Any {
self
Expand Down
1 change: 0 additions & 1 deletion src/query/service/src/physical_plans/physical_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ pub struct HashJoin {
pub broadcast_id: Option<u32>,
}

#[typetag::serde]
impl IPhysicalPlan for HashJoin {
fn as_any(&self) -> &dyn Any {
self
Expand Down
3 changes: 1 addition & 2 deletions src/query/service/src/physical_plans/physical_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::pipelines::PipelineBuilder;

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct Limit {
meta: PhysicalPlanMeta,
pub(crate) meta: PhysicalPlanMeta,
pub input: PhysicalPlan,
pub limit: Option<usize>,
pub offset: usize,
Expand All @@ -47,7 +47,6 @@ pub struct Limit {
pub stat_info: Option<PlanStatsInfo>,
}

#[typetag::serde]
impl IPhysicalPlan for Limit {
fn as_any(&self) -> &dyn Any {
self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ pub struct MaterializedCTE {
pub meta: PhysicalPlanMeta,
}

#[typetag::serde]
impl IPhysicalPlan for MaterializedCTE {
fn as_any(&self) -> &dyn Any {
self
Expand Down
Loading
Loading