Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,37 @@ pub async fn build_runtime_filter_infos(
max_threads: usize,
) -> Result<HashMap<usize, RuntimeFilterInfo>> {
let total_build_rows = packet.build_rows;
let Some(packets) = packet.packets else {
let Some(mut packets) = packet.packets else {
return Ok(HashMap::new());
};

// If multiple filters target the same scan, keep only the one with lowest selectivity
// Selectivity = build_rows / build_table_rows (same as should_enable_runtime_filter)
// Key: scan_id, Value: (best_filter_id, lowest_selectivity)
let mut best_filter_per_scan: HashMap<usize, (usize, f64)> = HashMap::new();
for pkt in packets.values() {
let desc = runtime_filter_descs.get(&pkt.id).unwrap();
let selectivity = desc
.build_table_rows
.map(|rows| total_build_rows as f64 / rows as f64)
.unwrap_or(f64::MAX);
for (_, scan_id) in &desc.probe_targets {
best_filter_per_scan
.entry(*scan_id)
.and_modify(|e| {
if selectivity < e.1 {
*e = (pkt.id, selectivity);
}
})
.or_insert((pkt.id, selectivity));
}
}

// Remove filters that are not the best for any scan
let best_ids: std::collections::HashSet<usize> =
best_filter_per_scan.values().map(|(id, _)| *id).collect();
packets.retain(|id, _| best_ids.contains(id));

let mut filters: HashMap<usize, RuntimeFilterInfo> = HashMap::new();

// Iterate over all runtime filter packets
Expand Down
Loading