diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 929de7a5304d..1a323d07490d 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -1156,3 +1156,24 @@ async fn nested_loop_join_selectivity() { ); } } + +#[tokio::test] +async fn explain_analyze_hash_join() { + let sql = "EXPLAIN ANALYZE \ + SELECT * \ + FROM generate_series(10) as t1(a) \ + JOIN generate_series(20) as t2(b) \ + ON t1.a=t2.b"; + + for (level, needle, should_contain) in [ + (ExplainAnalyzeLevel::Summary, "probe_hit_rate", true), + (ExplainAnalyzeLevel::Summary, "avg_fanout", true), + ] { + let plan = collect_plan(sql, level).await; + assert_eq!( + plan.contains(needle), + should_contain, + "plan for level {level:?} unexpected content: {plan}" + ); + } +} diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 1f4aeecb2972..ae23b8d90118 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -42,7 +42,7 @@ use crate::{ RecordBatchStream, SendableRecordBatchStream, }; -use arrow::array::{ArrayRef, UInt32Array, UInt64Array}; +use arrow::array::{Array, ArrayRef, UInt32Array, UInt64Array}; use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::{ @@ -296,6 +296,35 @@ pub(super) fn lookup_join_hashmap( Ok((build_indices, probe_indices, next_offset)) } +/// Counts the number of distinct elements in the input array. +/// +/// The input array must be sorted (e.g., `[0, 1, 1, 2, 2, ...]`) and contain no null values. +#[inline] +fn count_distinct_sorted_indices(indices: &UInt32Array) -> usize { + if indices.is_empty() { + return 0; + } + + debug_assert!(indices.null_count() == 0); + + let values_buf = indices.values(); + let values = values_buf.as_ref(); + let mut iter = values.iter(); + let Some(&first) = iter.next() else { + return 0; + }; + + let mut count = 1usize; + let mut last = first; + for &value in iter { + if value != last { + last = value; + count += 1; + } + } + count +} + impl HashJoinStream { #[allow(clippy::too_many_arguments)] pub(super) fn new( @@ -483,6 +512,10 @@ impl HashJoinStream { let state = self.state.try_as_process_probe_batch_mut()?; let build_side = self.build_side.try_as_ready_mut()?; + self.join_metrics + .probe_hit_rate + .add_total(state.batch.num_rows()); + let timer = self.join_metrics.join_time.timer(); // if the left side is empty, we can skip the (potentially expensive) join operation @@ -512,6 +545,18 @@ impl HashJoinStream { state.offset, )?; + let distinct_right_indices_count = count_distinct_sorted_indices(&right_indices); + + self.join_metrics + .probe_hit_rate + .add_part(distinct_right_indices_count); + + self.join_metrics.avg_fanout.add_part(left_indices.len()); + + self.join_metrics + .avg_fanout + .add_total(distinct_right_indices_count); + // apply join filter if exists let (left_indices, right_indices) = if let Some(filter) = &self.filter { apply_join_filter_to_indices( diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 6ff829815451..8877acc333eb 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -27,7 +27,9 @@ use std::sync::Arc; use std::task::{Context, Poll}; use crate::joins::SharedBitmapBuilder; -use crate::metrics::{self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder}; +use crate::metrics::{ + self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricType, +}; use crate::projection::{ProjectionExec, ProjectionExpr}; use crate::{ ColumnStatistics, ExecutionPlan, ExecutionPlanProperties, Partitioning, Statistics, @@ -1327,6 +1329,10 @@ pub(crate) struct BuildProbeJoinMetrics { pub(crate) input_batches: metrics::Count, /// Number of rows consumed by probe-side this operator pub(crate) input_rows: metrics::Count, + /// Fraction of probe rows that found more than one match + pub(crate) probe_hit_rate: metrics::RatioMetrics, + /// Average number of build matches per matched probe row + pub(crate) avg_fanout: metrics::RatioMetrics, } // This Drop implementation updates the elapsed compute part of the metrics. @@ -1370,6 +1376,14 @@ impl BuildProbeJoinMetrics { let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition); + let probe_hit_rate = MetricBuilder::new(metrics) + .with_type(MetricType::SUMMARY) + .ratio_metrics("probe_hit_rate", partition); + + let avg_fanout = MetricBuilder::new(metrics) + .with_type(MetricType::SUMMARY) + .ratio_metrics("avg_fanout", partition); + Self { build_time, build_input_batches, @@ -1379,6 +1393,8 @@ impl BuildProbeJoinMetrics { input_batches, input_rows, baseline, + probe_hit_rate, + avg_fanout, } } }