Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1156,3 +1156,24 @@ async fn nested_loop_join_selectivity() {
);
}
}

#[tokio::test]
async fn explain_analyze_hash_join() {
let sql = "EXPLAIN ANALYZE \
SELECT * \
FROM generate_series(10) as t1(a) \
JOIN generate_series(20) as t2(b) \
ON t1.a=t2.b";

for (level, needle, should_contain) in [
(ExplainAnalyzeLevel::Summary, "probe_hit_rate", true),
(ExplainAnalyzeLevel::Summary, "avg_fanout", true),
] {
let plan = collect_plan(sql, level).await;
assert_eq!(
plan.contains(needle),
should_contain,
"plan for level {level:?} unexpected content: {plan}"
);
}
}
47 changes: 46 additions & 1 deletion datafusion/physical-plan/src/joins/hash_join/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use crate::{
RecordBatchStream, SendableRecordBatchStream,
};

use arrow::array::{ArrayRef, UInt32Array, UInt64Array};
use arrow::array::{Array, ArrayRef, UInt32Array, UInt64Array};
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::{
Expand Down Expand Up @@ -296,6 +296,35 @@ pub(super) fn lookup_join_hashmap(
Ok((build_indices, probe_indices, next_offset))
}

/// Counts the number of distinct elements in the input array.
///
/// The input array must be sorted (e.g., `[0, 1, 1, 2, 2, ...]`) and contain no null values.
#[inline]
fn count_distinct_sorted_indices(indices: &UInt32Array) -> usize {
if indices.is_empty() {
return 0;
}

debug_assert!(indices.null_count() == 0);

let values_buf = indices.values();
let values = values_buf.as_ref();
let mut iter = values.iter();
let Some(&first) = iter.next() else {
return 0;
};

let mut count = 1usize;
let mut last = first;
for &value in iter {
if value != last {
last = value;
count += 1;
}
}
count
}

impl HashJoinStream {
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
Expand Down Expand Up @@ -483,6 +512,10 @@ impl HashJoinStream {
let state = self.state.try_as_process_probe_batch_mut()?;
let build_side = self.build_side.try_as_ready_mut()?;

self.join_metrics
.probe_hit_rate
.add_total(state.batch.num_rows());

let timer = self.join_metrics.join_time.timer();

// if the left side is empty, we can skip the (potentially expensive) join operation
Expand Down Expand Up @@ -512,6 +545,18 @@ impl HashJoinStream {
state.offset,
)?;

let distinct_right_indices_count = count_distinct_sorted_indices(&right_indices);

self.join_metrics
.probe_hit_rate
.add_part(distinct_right_indices_count);

self.join_metrics.avg_fanout.add_part(left_indices.len());

self.join_metrics
.avg_fanout
.add_total(distinct_right_indices_count);

// apply join filter if exists
let (left_indices, right_indices) = if let Some(filter) = &self.filter {
apply_join_filter_to_indices(
Expand Down
18 changes: 17 additions & 1 deletion datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ use std::sync::Arc;
use std::task::{Context, Poll};

use crate::joins::SharedBitmapBuilder;
use crate::metrics::{self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder};
use crate::metrics::{
self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricType,
};
use crate::projection::{ProjectionExec, ProjectionExpr};
use crate::{
ColumnStatistics, ExecutionPlan, ExecutionPlanProperties, Partitioning, Statistics,
Expand Down Expand Up @@ -1327,6 +1329,10 @@ pub(crate) struct BuildProbeJoinMetrics {
pub(crate) input_batches: metrics::Count,
/// Number of rows consumed by probe-side this operator
pub(crate) input_rows: metrics::Count,
/// Fraction of probe rows that found more than one match
pub(crate) probe_hit_rate: metrics::RatioMetrics,
/// Average number of build matches per matched probe row
pub(crate) avg_fanout: metrics::RatioMetrics,
}

// This Drop implementation updates the elapsed compute part of the metrics.
Expand Down Expand Up @@ -1370,6 +1376,14 @@ impl BuildProbeJoinMetrics {

let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);

let probe_hit_rate = MetricBuilder::new(metrics)
.with_type(MetricType::SUMMARY)
.ratio_metrics("probe_hit_rate", partition);

let avg_fanout = MetricBuilder::new(metrics)
.with_type(MetricType::SUMMARY)
.ratio_metrics("avg_fanout", partition);

Self {
build_time,
build_input_batches,
Expand All @@ -1379,6 +1393,8 @@ impl BuildProbeJoinMetrics {
input_batches,
input_rows,
baseline,
probe_hit_rate,
avg_fanout,
}
}
}
Expand Down