diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs index 90f2a8a7737fa..0e620c733afae 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs @@ -55,9 +55,37 @@ pub async fn build_runtime_filter_infos( max_threads: usize, ) -> Result> { let total_build_rows = packet.build_rows; - let Some(packets) = packet.packets else { + let Some(mut packets) = packet.packets else { return Ok(HashMap::new()); }; + + // If multiple filters target the same scan, keep only the one with lowest selectivity + // Selectivity = build_rows / build_table_rows (same as should_enable_runtime_filter) + // Key: scan_id, Value: (best_filter_id, lowest_selectivity) + let mut best_filter_per_scan: HashMap = HashMap::new(); + for pkt in packets.values() { + let desc = runtime_filter_descs.get(&pkt.id).unwrap(); + let selectivity = desc + .build_table_rows + .map(|rows| total_build_rows as f64 / rows as f64) + .unwrap_or(f64::MAX); + for (_, scan_id) in &desc.probe_targets { + best_filter_per_scan + .entry(*scan_id) + .and_modify(|e| { + if selectivity < e.1 { + *e = (pkt.id, selectivity); + } + }) + .or_insert((pkt.id, selectivity)); + } + } + + // Remove filters that are not the best for any scan + let best_ids: std::collections::HashSet = + best_filter_per_scan.values().map(|(id, _)| *id).collect(); + packets.retain(|id, _| best_ids.contains(id)); + let mut filters: HashMap = HashMap::new(); // Iterate over all runtime filter packets