Skip to content

Commit b85bf10

Browse files
committed
Fix regression in updating equi-join optimization (#686)
1 parent d127018 commit b85bf10

File tree

10 files changed

+160
-42
lines changed

10 files changed

+160
-42
lines changed

crates/arroyo-planner/src/extension/join.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,25 +42,29 @@ impl ArroyoExtension for JoinExtension {
4242
join_plan.clone(),
4343
&ArroyoPhysicalExtensionCodec::default(),
4444
)?;
45+
4546
let operator_name = if self.is_instant {
4647
OperatorName::InstantJoin
4748
} else {
4849
OperatorName::Join
4950
};
51+
5052
let config = JoinOperator {
5153
name: format!("join_{}", index),
5254
left_schema: Some(left_schema.as_ref().clone().into()),
5355
right_schema: Some(right_schema.as_ref().clone().into()),
5456
output_schema: Some(self.output_schema().into()),
5557
join_plan: physical_plan_node.encode_to_vec(),
5658
};
59+
5760
let logical_node = LogicalNode {
5861
operator_id: format!("join_{}", index),
5962
description: "join".to_string(),
6063
operator_name,
6164
operator_config: config.encode_to_vec(),
6265
parallelism: 1,
6366
};
67+
6468
let left_edge =
6569
LogicalEdge::project_all(LogicalEdgeType::LeftJoin, left_schema.as_ref().clone());
6670
let right_edge =

crates/arroyo-planner/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ use datafusion::logical_expr::expr_rewriter::FunctionRewrite;
7272
use std::time::{Duration, SystemTime};
7373
use std::{collections::HashMap, sync::Arc};
7474
use syn::Item;
75-
use tracing::{info, warn};
75+
use tracing::{debug, info, warn};
7676
use unicase::UniCase;
7777

7878
const DEFAULT_IDLE_TIME: Option<Duration> = Some(Duration::from_secs(5 * 60));
@@ -553,6 +553,8 @@ pub async fn parse_and_get_arrow_program(
553553

554554
let plan_rewrite = rewrite_plan(plan, &schema_provider)?;
555555

556+
debug!("Plan = {:?}", plan_rewrite);
557+
556558
let mut metadata = SourceMetadataVisitor::new(&schema_provider);
557559
plan_rewrite.visit(&mut metadata)?;
558560
used_connections.extend(metadata.connection_ids.iter());

crates/arroyo-planner/src/plan/join.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ impl JoinRewriter {
8383
name: &'static str,
8484
) -> Result<LogicalPlan> {
8585
let key_count = join_expressions.len();
86+
8687
let mut join_expressions: Vec<_> = join_expressions
8788
.into_iter()
8889
.enumerate()
@@ -263,6 +264,10 @@ impl TreeNodeRewriter for JoinRewriter {
263264
};
264265
Self::check_updating(&left, &right)?;
265266

267+
if on.is_empty() && !is_instant {
268+
return not_impl_err!("Updating joins must include an equijoin condition");
269+
}
270+
266271
let (left_expressions, right_expressions): (Vec<_>, Vec<_>) =
267272
on.clone().into_iter().unzip();
268273

crates/arroyo-planner/src/tables.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,13 @@ use std::{collections::HashMap, time::Duration};
55
use arrow_schema::{DataType, Field, FieldRef, Schema};
66
use arroyo_connectors::connector_for_type;
77

8+
use crate::extension::remote_table::RemoteTableExtension;
9+
use crate::types::convert_data_type;
10+
use crate::{
11+
external::{ProcessingMode, SqlSource},
12+
ArroyoSchemaProvider,
13+
};
14+
use crate::{rewrite_plan, DEFAULT_IDLE_TIME};
815
use arroyo_datastream::default_sink;
916
use arroyo_operator::connector::Connection;
1017
use arroyo_rpc::api_types::connections::{
@@ -29,6 +36,7 @@ use datafusion::optimizer::eliminate_limit::EliminateLimit;
2936
use datafusion::optimizer::eliminate_nested_union::EliminateNestedUnion;
3037
use datafusion::optimizer::eliminate_one_union::EliminateOneUnion;
3138
use datafusion::optimizer::eliminate_outer_join::EliminateOuterJoin;
39+
use datafusion::optimizer::extract_equijoin_predicate::ExtractEquijoinPredicate;
3240
use datafusion::optimizer::filter_null_join_keys::FilterNullJoinKeys;
3341
use datafusion::optimizer::propagate_empty_relation::PropagateEmptyRelation;
3442
use datafusion::optimizer::push_down_filter::PushDownFilter;
@@ -50,14 +58,6 @@ use datafusion::{
5058
},
5159
};
5260

53-
use crate::extension::remote_table::RemoteTableExtension;
54-
use crate::types::convert_data_type;
55-
use crate::{
56-
external::{ProcessingMode, SqlSource},
57-
ArroyoSchemaProvider,
58-
};
59-
use crate::{rewrite_plan, DEFAULT_IDLE_TIME};
60-
6161
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
6262
pub struct ConnectorTable {
6363
pub id: Option<i64>,
@@ -124,8 +124,7 @@ fn produce_optimized_plan(
124124
Arc::new(EliminateJoin::new()),
125125
Arc::new(DecorrelatePredicateSubquery::new()),
126126
Arc::new(ScalarSubqueryToJoin::new()),
127-
// Breaks window joins
128-
// Arc::new(ExtractEquijoinPredicate::new()),
127+
Arc::new(ExtractEquijoinPredicate::new()),
129128
Arc::new(SimplifyExpressions::new()),
130129
Arc::new(RewriteDisjunctivePredicate::new()),
131130
Arc::new(EliminateDuplicatedExpr::new()),
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
--fail=Updating joins must include an equijoin condition
2+
3+
CREATE TABLE cars (
4+
timestamp TIMESTAMP,
5+
car_id TEXT,
6+
driver_id BIGINT,
7+
event_type TEXT,
8+
location TEXT
9+
) WITH (
10+
connector = 'single_file',
11+
path = '$input_dir/cars.json',
12+
format = 'json',
13+
type = 'source',
14+
event_time_field = 'timestamp'
15+
);
16+
17+
CREATE TABLE passengers (
18+
timestamp TIMESTAMP,
19+
passenger_id BIGINT
20+
) WITH (
21+
connector = 'single_file',
22+
path = '$input_dir/cars.json',
23+
format = 'json',
24+
type = 'source',
25+
event_time_field = 'timestamp'
26+
);
27+
28+
select passenger_id, car_id
29+
from passengers
30+
join cars ON passenger_id < car_id;

crates/arroyo-planner/src/test/queries/windowed_inner_join.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,4 @@ INNER JOIN (
3333
COUNT(distinct driver_id) as pickup_drivers FROM cars where event_type = 'pickup'
3434
GROUP BY 1
3535
) pickups
36-
ON dropoffs.window.start = pickups.window.start)
36+
ON dropoffs.window = pickups.window)
Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,50 @@
1-
{"before":null,"after":{"left_counter":1,"counter_mod_2":0,"right_count":1},"op":"c"}
2-
{"before":null,"after":{"left_counter":1,"counter_mod_2":1,"right_count":1},"op":"c"}
3-
{"before":null,"after":{"left_counter":2,"counter_mod_2":0,"right_count":2},"op":"c"}
4-
{"before":{"left_counter":1,"counter_mod_2":0,"right_count":1},"after":null,"op":"d"}
1+
{"before":null,"after":{"left_count":1,"right_count":1},"op":"c"}
2+
{"before":null,"after":{"left_count":3,"right_count":3},"op":"c"}
3+
{"before":null,"after":{"left_count":5,"right_count":5},"op":"c"}
4+
{"before":null,"after":{"left_count":7,"right_count":7},"op":"c"}
5+
{"before":null,"after":{"left_count":9,"right_count":9},"op":"c"}
6+
{"before":null,"after":{"left_count":11,"right_count":11},"op":"c"}
7+
{"before":null,"after":{"left_count":13,"right_count":13},"op":"c"}
8+
{"before":null,"after":{"left_count":15,"right_count":15},"op":"c"}
9+
{"before":null,"after":{"left_count":17,"right_count":17},"op":"c"}
10+
{"before":null,"after":{"left_count":19,"right_count":19},"op":"c"}
11+
{"before":null,"after":{"left_count":21,"right_count":21},"op":"c"}
12+
{"before":null,"after":{"left_count":23,"right_count":23},"op":"c"}
13+
{"before":null,"after":{"left_count":25,"right_count":25},"op":"c"}
14+
{"before":null,"after":{"left_count":27,"right_count":27},"op":"c"}
15+
{"before":null,"after":{"left_count":29,"right_count":29},"op":"c"}
16+
{"before":null,"after":{"left_count":31,"right_count":31},"op":"c"}
17+
{"before":null,"after":{"left_count":33,"right_count":33},"op":"c"}
18+
{"before":null,"after":{"left_count":35,"right_count":35},"op":"c"}
19+
{"before":null,"after":{"left_count":37,"right_count":37},"op":"c"}
20+
{"before":null,"after":{"left_count":39,"right_count":39},"op":"c"}
21+
{"before":null,"after":{"left_count":41,"right_count":41},"op":"c"}
22+
{"before":null,"after":{"left_count":43,"right_count":43},"op":"c"}
23+
{"before":null,"after":{"left_count":45,"right_count":45},"op":"c"}
24+
{"before":null,"after":{"left_count":47,"right_count":47},"op":"c"}
25+
{"before":null,"after":{"left_count":49,"right_count":49},"op":"c"}
26+
{"before":null,"after":{"left_count":51,"right_count":51},"op":"c"}
27+
{"before":null,"after":{"left_count":53,"right_count":53},"op":"c"}
28+
{"before":null,"after":{"left_count":55,"right_count":55},"op":"c"}
29+
{"before":null,"after":{"left_count":57,"right_count":57},"op":"c"}
30+
{"before":null,"after":{"left_count":59,"right_count":59},"op":"c"}
31+
{"before":null,"after":{"left_count":61,"right_count":61},"op":"c"}
32+
{"before":null,"after":{"left_count":63,"right_count":63},"op":"c"}
33+
{"before":null,"after":{"left_count":65,"right_count":65},"op":"c"}
34+
{"before":null,"after":{"left_count":67,"right_count":67},"op":"c"}
35+
{"before":null,"after":{"left_count":69,"right_count":69},"op":"c"}
36+
{"before":null,"after":{"left_count":71,"right_count":71},"op":"c"}
37+
{"before":null,"after":{"left_count":73,"right_count":73},"op":"c"}
38+
{"before":null,"after":{"left_count":75,"right_count":75},"op":"c"}
39+
{"before":null,"after":{"left_count":77,"right_count":77},"op":"c"}
40+
{"before":null,"after":{"left_count":79,"right_count":79},"op":"c"}
41+
{"before":null,"after":{"left_count":81,"right_count":81},"op":"c"}
42+
{"before":null,"after":{"left_count":83,"right_count":83},"op":"c"}
43+
{"before":null,"after":{"left_count":85,"right_count":85},"op":"c"}
44+
{"before":null,"after":{"left_count":87,"right_count":87},"op":"c"}
45+
{"before":null,"after":{"left_count":89,"right_count":89},"op":"c"}
46+
{"before":null,"after":{"left_count":91,"right_count":91},"op":"c"}
47+
{"before":null,"after":{"left_count":93,"right_count":93},"op":"c"}
48+
{"before":null,"after":{"left_count":95,"right_count":95},"op":"c"}
49+
{"before":null,"after":{"left_count":97,"right_count":97},"op":"c"}
50+
{"before":null,"after":{"left_count":99,"right_count":99},"op":"c"}
Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,32 @@
1-
--fail=Error during planning: can't handle updating right side of join
21
CREATE TABLE impulse (
3-
timestamp TIMESTAMP,
4-
counter bigint unsigned not null,
5-
subtask_index bigint unsigned not null
6-
) WITH (
7-
connector = 'single_file',
8-
path = '$input_dir/impulse.json',
9-
format = 'json',
10-
type = 'source',
11-
event_time_field = 'timestamp'
12-
);
2+
timestamp TIMESTAMP,
3+
counter bigint unsigned not null,
4+
subtask_index bigint unsigned not null
5+
) WITH (
6+
connector = 'single_file',
7+
path = '$input_dir/impulse.json',
8+
format = 'json',
9+
type = 'source',
10+
event_time_field = 'timestamp'
11+
);
1312

1413

15-
CREATE TABLE output (
16-
left_counter bigint,
17-
counter_mod_2 bigint,
18-
right_count bigint
19-
) WITH (
20-
connector = 'single_file',
21-
path = '$output_path',
22-
format = 'debezium_json',
23-
type = 'sink'
24-
);
14+
CREATE VIEW impulse_odd AS (
15+
SELECT * FROM impulse
16+
WHERE counter % 2 == 1
17+
);
2518

26-
INSERT INTO output
27-
select counter as left_counter, counter_mod_2, right_count from impulse inner join
28-
(select counter % 2 as counter_mod_2, cast(count(*) as bigint UNSIGNED) as right_count from impulse where counter < 3 group by 1)
29-
on counter = right_count where counter < 3;
19+
CREATE TABLE output (
20+
left_count bigint,
21+
right_count bigint
22+
) WITH (
23+
connector = 'single_file',
24+
path = '$output_path',
25+
format = 'debezium_json',
26+
type = 'sink'
27+
);
28+
29+
INSERT INTO output
30+
SELECT A.counter, B.counter
31+
FROM impulse A
32+
JOIN impulse_odd B ON A.counter = B.counter;
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
--fail=Error during planning: can't handle updating right side of join
2+
CREATE TABLE impulse (
3+
timestamp TIMESTAMP,
4+
counter bigint unsigned not null,
5+
subtask_index bigint unsigned not null
6+
) WITH (
7+
connector = 'single_file',
8+
path = '$input_dir/impulse.json',
9+
format = 'json',
10+
type = 'source',
11+
event_time_field = 'timestamp'
12+
);
13+
14+
15+
CREATE TABLE output (
16+
left_counter bigint,
17+
counter_mod_2 bigint,
18+
right_count bigint
19+
) WITH (
20+
connector = 'single_file',
21+
path = '$output_path',
22+
format = 'debezium_json',
23+
type = 'sink'
24+
);
25+
26+
INSERT INTO output
27+
select counter as left_counter, counter_mod_2, right_count from impulse inner join
28+
(select counter % 2 as counter_mod_2, cast(count(*) as bigint UNSIGNED) as right_count from impulse where counter < 3 group by 1)
29+
on counter = right_count where counter < 3;

crates/arroyo-sql-testing/src/test/queries/windowed_inner_join.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,4 @@ INNER JOIN (
3333
COUNT(distinct driver_id) as pickup_drivers FROM cars where event_type = 'pickup'
3434
GROUP BY 1
3535
) pickups
36-
ON dropoffs.window.start = pickups.window.start)
36+
ON dropoffs.window = pickups.window)

0 commit comments

Comments
 (0)