Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions src/query/sql/src/planner/plans/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -793,17 +793,28 @@ impl Operator for Join {

let settings = ctx.get_settings();
if !matches!(self.join_type, JoinType::Cross) && !settings.get_enforce_broadcast_join()? {
// (Hash, Hash)
children_required.extend(self.equi_conditions.iter().map(|condition| {
vec![
// (Hash, Hash) – use full equi-join key set to avoid single-column hash shuffle
let left_keys: Vec<_> = self
.equi_conditions
.iter()
.map(|condition| condition.left.clone())
.collect();
let right_keys: Vec<_> = self
.equi_conditions
.iter()
.map(|condition| condition.right.clone())
.collect();

if !left_keys.is_empty() {
children_required.push(vec![
RequiredProperty {
distribution: Distribution::NodeToNodeHash(vec![condition.left.clone()]),
distribution: Distribution::NodeToNodeHash(left_keys),
},
RequiredProperty {
distribution: Distribution::NodeToNodeHash(vec![condition.right.clone()]),
distribution: Distribution::NodeToNodeHash(right_keys),
},
]
}));
]);
}
}

if !matches!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ where t_10.a = t_1000.a and t_100.a = t_1000.a
----
Memo
├── root group: #8
├── estimated memory: 10.69 KiB
├── estimated memory: 10.09 KiB
├── Group #0
│ ├── Best properties
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: []
│ │ └── { dist: Hash(t_1000.a (#2)::Int32 NULL) }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
│ │ └── { dist: Hash(t_1000.a (#2)::Int32 NULL, t_1000.a (#2)::Int32 NULL) }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
│ ├── #0 Scan []
│ └── #1 Exchange: (Hash(t_1000.a (#2)::Int32 NULL)) [#0]
│ └── #1 Exchange: (Hash(t_1000.a (#2)::Int32 NULL,t_1000.a (#2)::Int32 NULL)) [#0]
├── Group #1
│ ├── Best properties
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: []
Expand All @@ -51,12 +51,10 @@ Memo
│ ├── Best properties
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: [{ dist: Any }, { dist: Broadcast }]
│ │ ├── { dist: Broadcast }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
│ │ ├── { dist: Hash(t_10.a (#0)::Int32 NULL) }: expr: #2, cost: <slt:ignore>, children: [{ dist: Any }]
│ │ └── { dist: Hash(t_100.a (#1)::Int32 NULL) }: expr: #0, cost: <slt:ignore>, children: [{ dist: Hash(t_100.a (#1)::Int32 NULL) }, { dist: Hash(t_10.a (#0)::Int32 NULL) }]
│ │ └── { dist: Hash(t_10.a (#0)::Int32 NULL, t_100.a (#1)::Int32 NULL) }: expr: #2, cost: 1820.000, children: [{ dist: Any }]
│ ├── #0 Join [#1, #2]
│ ├── #1 Exchange: (Broadcast) [#3]
│ ├── #2 Exchange: (Hash(t_10.a (#0)::Int32 NULL)) [#3]
│ └── #3 Exchange: (Hash(t_100.a (#1)::Int32 NULL)) [#3]
│ └── #2 Exchange: (Hash(t_10.a (#0)::Int32 NULL,t_100.a (#1)::Int32 NULL)) [#3]
├── Group #4
│ ├── Best properties
│ │ └── { dist: Any }: expr: #0, cost: <slt:ignore>, children: [{ dist: Any }, { dist: Broadcast }]
Expand Down Expand Up @@ -89,13 +87,13 @@ group by t_10.a, t_100.a
----
Memo
├── root group: #8
├── estimated memory: 26.72 KiB
├── estimated memory: 26.12 KiB
├── Group #0
│ ├── Best properties
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: []
│ │ └── { dist: Hash(t_1000.a (#2)::Int32 NULL) }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
│ │ └── { dist: Hash(t_1000.a (#2)::Int32 NULL, t_1000.a (#2)::Int32 NULL) }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
│ ├── #0 Scan []
│ └── #1 Exchange: (Hash(t_1000.a (#2)::Int32 NULL)) [#0]
│ └── #1 Exchange: (Hash(t_1000.a (#2)::Int32 NULL,t_1000.a (#2)::Int32 NULL)) [#0]
├── Group #1
│ ├── Best properties
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: []
Expand All @@ -114,12 +112,12 @@ Memo
│ ├── Best properties
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: [{ dist: Any }, { dist: Broadcast }]
│ │ ├── { dist: Broadcast }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
│ │ ├── { dist: Hash(t_10.a (#0)::Int32 NULL) }: expr: #2, cost: <slt:ignore>, children: [{ dist: Any }]
│ │ └── { dist: Hash(t_100.a (#1)::Int32 NULL) }: expr: #0, cost: <slt:ignore>, children: [{ dist: Hash(t_100.a (#1)::Int32 NULL) }, { dist: Hash(t_10.a (#0)::Int32 NULL) }]
│ │ ├── { dist: Hash(t_10.a (#0)::Int32 NULL) }: expr: #3, cost: <slt:ignore>, children: [{ dist: Any }]
│ │ └── { dist: Hash(t_10.a (#0)::Int32 NULL, t_100.a (#1)::Int32 NULL) }: expr: #2, cost: <slt:ignore>, children: [{ dist: Any }]
│ ├── #0 Join [#1, #2]
│ ├── #1 Exchange: (Broadcast) [#3]
│ ├── #2 Exchange: (Hash(t_10.a (#0)::Int32 NULL)) [#3]
│ └── #3 Exchange: (Hash(t_100.a (#1)::Int32 NULL)) [#3]
│ ├── #2 Exchange: (Hash(t_10.a (#0)::Int32 NULL,t_100.a (#1)::Int32 NULL)) [#3]
│ └── #3 Exchange: (Hash(t_10.a (#0)::Int32 NULL)) [#3]
├── Group #4
│ ├── Best properties
│ │ └── { dist: Any }: expr: #0, cost: <slt:ignore>, children: [{ dist: Any }, { dist: Broadcast }]
Expand Down Expand Up @@ -160,9 +158,9 @@ Memo
├── Group #11
│ ├── Best properties
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: [{ dist: Any }]
│ │ └── { dist: Hash(t_1000.a (#2)::Int32 NULL) }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
│ │ └── { dist: Hash(t_1000.a (#2)::Int32 NULL, t_1000.a (#2)::Int32 NULL) }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
│ ├── #0 Aggregate [#10]
│ └── #1 Exchange: (Hash(t_1000.a (#2)::Int32 NULL)) [#11]
│ └── #1 Exchange: (Hash(t_1000.a (#2)::Int32 NULL,t_1000.a (#2)::Int32 NULL)) [#11]
├── Group #12
│ ├── Best properties
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: [{ dist: Any }, { dist: Broadcast }]
Expand All @@ -184,13 +182,11 @@ Memo
├── Group #16
│ ├── Best properties
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: [{ dist: Any }]
│ │ ├── { dist: Broadcast }: expr: #3, cost: <slt:ignore>, children: [{ dist: Any }]
│ │ ├── { dist: Hash(t_10.a (#0)::Int32 NULL) }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
│ │ └── { dist: Hash(t_100.a (#1)::Int32 NULL) }: expr: #2, cost: <slt:ignore>, children: [{ dist: Any }]
│ │ ├── { dist: Broadcast }: expr: #2, cost: <slt:ignore>, children: [{ dist: Any }]
│ │ └── { dist: Hash(t_10.a (#0)::Int32 NULL, t_100.a (#1)::Int32 NULL) }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
│ ├── #0 Aggregate [#15]
│ ├── #1 Exchange: (Hash(t_10.a (#0)::Int32 NULL)) [#16]
│ ├── #2 Exchange: (Hash(t_100.a (#1)::Int32 NULL)) [#16]
│ └── #3 Exchange: (Broadcast) [#16]
│ ├── #1 Exchange: (Hash(t_10.a (#0)::Int32 NULL,t_100.a (#1)::Int32 NULL)) [#16]
│ └── #2 Exchange: (Broadcast) [#16]
├── Group #17
│ ├── Best properties
│ │ └── { dist: Any }: expr: #0, cost: <slt:ignore>, children: [{ dist: Any }, { dist: Broadcast }]
Expand Down
12 changes: 5 additions & 7 deletions tests/sqllogictests/suites/mode/cluster/memo/join_property.test
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ select * from t_10, t_100, t_1000 where t_10.a = t_1000.a and t_100.a = t_1000.a
----
Memo
├── root group: #5
├── estimated memory: 8.31 KiB
├── estimated memory: 7.72 KiB
├── Group #0
│ ├── Best properties
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: []
│ │ └── { dist: Hash(t_1000.a (#2)::Int32 NULL) }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
│ │ └── { dist: Hash(t_1000.a (#2)::Int32 NULL, t_1000.a (#2)::Int32 NULL) }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
│ ├── #0 Scan []
│ └── #1 Exchange: (Hash(t_1000.a (#2)::Int32 NULL)) [#0]
│ └── #1 Exchange: (Hash(t_1000.a (#2)::Int32 NULL,t_1000.a (#2)::Int32 NULL)) [#0]
├── Group #1
│ ├── Best properties
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: []
Expand All @@ -50,12 +50,10 @@ Memo
│ ├── Best properties
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: [{ dist: Any }, { dist: Broadcast }]
│ │ ├── { dist: Broadcast }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
│ │ ├── { dist: Hash(t_10.a (#0)::Int32 NULL) }: expr: #2, cost: <slt:ignore>, children: [{ dist: Any }]
│ │ └── { dist: Hash(t_100.a (#1)::Int32 NULL) }: expr: #0, cost: <slt:ignore>, children: [{ dist: Hash(t_100.a (#1)::Int32 NULL) }, { dist: Hash(t_10.a (#0)::Int32 NULL) }]
│ │ └── { dist: Hash(t_10.a (#0)::Int32 NULL, t_100.a (#1)::Int32 NULL) }: expr: #2, cost: <slt:ignore>, children: [{ dist: Any }]
│ ├── #0 Join [#1, #2]
│ ├── #1 Exchange: (Broadcast) [#3]
│ ├── #2 Exchange: (Hash(t_10.a (#0)::Int32 NULL)) [#3]
│ └── #3 Exchange: (Hash(t_100.a (#1)::Int32 NULL)) [#3]
│ └── #2 Exchange: (Hash(t_10.a (#0)::Int32 NULL,t_100.a (#1)::Int32 NULL)) [#3]
├── Group #4
│ ├── Best properties
│ │ └── { dist: Any }: expr: #0, cost: <slt:ignore>, children: [{ dist: Any }, { dist: Broadcast }]
Expand Down
24 changes: 11 additions & 13 deletions tests/sqllogictests/suites/mode/cluster/memo/mix_property.test
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ limit 10
----
Memo
├── root group: #10
├── estimated memory: 27.91 KiB
├── estimated memory: 27.31 KiB
├── Group #0
│ ├── Best properties
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: []
│ │ └── { dist: Hash(t_1000.a (#0)::Int32 NULL) }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
│ │ └── { dist: Hash(t_1000.a (#0)::Int32 NULL, t_1000.a (#0)::Int32 NULL) }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
│ ├── #0 Scan []
│ └── #1 Exchange: (Hash(t_1000.a (#0)::Int32 NULL)) [#0]
│ └── #1 Exchange: (Hash(t_1000.a (#0)::Int32 NULL,t_1000.a (#0)::Int32 NULL)) [#0]
├── Group #1
│ ├── Best properties
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: []
Expand All @@ -54,11 +54,11 @@ Memo
│ ├── Best properties
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: [{ dist: Any }, { dist: Broadcast }]
│ │ ├── { dist: Broadcast }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
│ │ ├── { dist: Hash(t_10.a (#2)::Int32 NULL) }: expr: #2, cost: <slt:ignore>, children: [{ dist: Any }]
│ │ ├── { dist: Hash(t_10.a (#2)::Int32 NULL, t_100.a (#1)::Int32 NULL) }: expr: #2, cost: <slt:ignore>, children: [{ dist: Any }]
│ │ └── { dist: Hash(t_100.a (#1)::Int32 NULL) }: expr: #0, cost: <slt:ignore>, children: [{ dist: Hash(t_100.a (#1)::Int32 NULL) }, { dist: Hash(t_10.a (#2)::Int32 NULL) }]
│ ├── #0 Join [#1, #2]
│ ├── #1 Exchange: (Broadcast) [#3]
│ ├── #2 Exchange: (Hash(t_10.a (#2)::Int32 NULL)) [#3]
│ ├── #2 Exchange: (Hash(t_10.a (#2)::Int32 NULL,t_100.a (#1)::Int32 NULL)) [#3]
│ └── #3 Exchange: (Hash(t_100.a (#1)::Int32 NULL)) [#3]
├── Group #4
│ ├── Best properties
Expand Down Expand Up @@ -108,9 +108,9 @@ Memo
├── Group #13
│ ├── Best properties
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: [{ dist: Any }]
│ │ └── { dist: Hash(t_1000.a (#0)::Int32 NULL) }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
│ │ └── { dist: Hash(t_1000.a (#0)::Int32 NULL, t_1000.a (#0)::Int32 NULL) }: expr: #1, cost: 111816.000, children: [{ dist: Any }]
│ ├── #0 Aggregate [#12]
│ └── #1 Exchange: (Hash(t_1000.a (#0)::Int32 NULL)) [#13]
│ └── #1 Exchange: (Hash(t_1000.a (#0)::Int32 NULL,t_1000.a (#0)::Int32 NULL)) [#13]
├── Group #14
│ ├── Best properties
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: [{ dist: Any }, { dist: Broadcast }]
Expand All @@ -132,13 +132,11 @@ Memo
├── Group #18
│ ├── Best properties
│ │ ├── { dist: Any }: expr: #0, cost: <slt:ignore>, children: [{ dist: Any }]
│ │ ├── { dist: Broadcast }: expr: #3, cost: <slt:ignore>, children: [{ dist: Any }]
│ │ ├── { dist: Hash(t_10.a (#2)::Int32 NULL) }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
│ │ └── { dist: Hash(t_100.a (#1)::Int32 NULL) }: expr: #2, cost: <slt:ignore>, children: [{ dist: Any }]
│ │ ├── { dist: Broadcast }: expr: #2, cost: <slt:ignore>, children: [{ dist: Any }]
│ │ └── { dist: Hash(t_10.a (#2)::Int32 NULL, t_100.a (#1)::Int32 NULL) }: expr: #1, cost: <slt:ignore>, children: [{ dist: Any }]
│ ├── #0 Aggregate [#17]
│ ├── #1 Exchange: (Hash(t_10.a (#2)::Int32 NULL)) [#18]
│ ├── #2 Exchange: (Hash(t_100.a (#1)::Int32 NULL)) [#18]
│ └── #3 Exchange: (Broadcast) [#18]
│ ├── #1 Exchange: (Hash(t_10.a (#2)::Int32 NULL,t_100.a (#1)::Int32 NULL)) [#18]
│ └── #2 Exchange: (Broadcast) [#18]
├── Group #19
│ ├── Best properties
│ │ └── { dist: Any }: expr: #0, cost: <slt:ignore>, children: [{ dist: Any }, { dist: Broadcast }]
Expand Down
Loading