Skip to content

Commit

Permalink
add/update planner tests
Browse files (browse the repository at this point in the history)
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Dec 31, 2024
1 parent a2e7512 commit 6a5c106
Show file tree
Hide file tree
Showing 15 changed files with 698 additions and 543 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Planner test case: an append-only table that declares an explicit watermark
# on `event_time` (event_time minus 5 minutes) and also has a generated column
# `proc_time` computed by proctime(). The query simply selects every column.
# Per the case name, `proc_time` is expected to carry an implicit watermark —
# confirm against the recorded plan output.
# Requested outputs: `stream_plan` and `eowc_stream_plan` (eowc presumably
# stands for emit-on-window-close — TODO confirm).
- name: explicit watermark definition + implicit proctime watermark
sql: |
create table t (
foo int,
event_time timestamp,
proc_time timestamp as proctime(),
watermark for event_time as event_time - interval '5 minutes'
) append only;
select * from t;
expected_outputs:
- stream_plan
- eowc_stream_plan
# Planner test case: applies tumble() with a 1-hour interval to an append-only
# table whose `ts` column has a watermark of ts minus 5 minutes, and projects
# `foo` together with both generated window columns (`window_start` and
# `window_end`).
# Requested outputs: `stream_plan` and `eowc_stream_plan`.
- name: tumble window_start + window_end
sql: |
create table t (
foo int,
ts timestamp,
watermark for ts as ts - interval '5 minutes'
) append only;
select
foo, window_start, window_end
from tumble(t, ts, interval '1 hour');
expected_outputs:
- stream_plan
- eowc_stream_plan
# Planner test case: same table shape as the tumble case, but windowed with
# hop() using the intervals '20 mins' and '1 hour', projecting `foo`,
# `window_start` and `window_end`.
# NOTE(review): the two intervals are presumably the hop slide followed by the
# window size — confirm against the hop() signature.
# Requested outputs: `stream_plan` and `eowc_stream_plan`.
- name: hop window_start + window_end
sql: |
create table t (
foo int,
ts timestamp,
watermark for ts as ts - interval '5 minutes'
) append only;
select
foo, window_start, window_end
from hop(t, ts, interval '20 mins', interval '1 hour');
expected_outputs:
- stream_plan
- eowc_stream_plan
# Planner test case: self-join of table `t` windowed two ways. The left side
# uses tumble() with 5-minute windows (exposing foo as foo1, window_start as
# win1); the right side uses tumble() with 10-minute windows (exposing foo + 1
# as foo2, window_start as win2). The join condition is equality of the two
# window_start columns, and the output is foo1 + foo2 plus both window starts.
# NOTE(review): the subquery aliases `left` and `right` are keywords in many
# SQL dialects; verify they parse as plain identifiers here, or whether this
# case intentionally records a parse/bind error.
# Requested outputs: `stream_plan` and `eowc_stream_plan`.
- name: join window_start = window_start
sql: |
create table t (
foo int,
ts timestamp,
watermark for ts as ts - interval '5 minutes'
) append only;
select
foo1 + foo2 as foo,
win1, win2
from (
select foo as foo1, window_start as win1 from tumble(t, ts, interval '5 mins')
) as left
join (
select foo + 1 as foo2, window_start as win2 from tumble(t, ts, interval '10 mins')
) as right
on win1 = win2;
expected_outputs:
- stream_plan
- eowc_stream_plan
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,32 @@
select v1, min(v2), count(distinct v3) as agg from t group by v1;
stream_plan: |-
StreamMaterialize { columns: [v1, min, agg], stream_key: [v1], pk_columns: [v1], pk_conflict: NoCheck, watermark_columns: [v1] }
└─StreamProject { exprs: [v1, min(v2), count(distinct v3)], output_watermarks: [v1] }
└─StreamHashAgg [append_only] { group_key: [v1], aggs: [min(v2), count(distinct v3), count], output_watermarks: [v1] }
└─StreamProject { exprs: [v1, min(v2), count(distinct v3)], output_watermarks: [[v1]] }
└─StreamHashAgg [append_only] { group_key: [v1], aggs: [min(v2), count(distinct v3), count], output_watermarks: [[v1]] }
└─StreamExchange { dist: HashShard(v1) }
└─StreamRowIdGen { row_id_index: 3 }
└─StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - 10:Int32) }], output_watermarks: [v1] }
└─StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - 10:Int32) }], output_watermarks: [[v1]] }
└─StreamSource { source: t, columns: [v1, v2, v3, _row_id] }
eowc_stream_plan: |-
StreamMaterialize { columns: [v1, min, agg], stream_key: [v1], pk_columns: [v1], pk_conflict: NoCheck, watermark_columns: [v1] }
└─StreamProject { exprs: [v1, min(v2), count(distinct v3)], output_watermarks: [v1] }
└─StreamHashAgg [append_only, eowc] { group_key: [v1], aggs: [min(v2), count(distinct v3), count], output_watermarks: [v1] }
└─StreamProject { exprs: [v1, min(v2), count(distinct v3)], output_watermarks: [[v1]] }
└─StreamHashAgg [append_only, eowc] { group_key: [v1], aggs: [min(v2), count(distinct v3), count], output_watermarks: [[v1]] }
└─StreamExchange { dist: HashShard(v1) }
└─StreamRowIdGen { row_id_index: 3 }
└─StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - 10:Int32) }], output_watermarks: [v1] }
└─StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - 10:Int32) }], output_watermarks: [[v1]] }
└─StreamSource { source: t, columns: [v1, v2, v3, _row_id] }
eowc_stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [v1, min, agg], stream_key: [v1], pk_columns: [v1], pk_conflict: NoCheck, watermark_columns: [v1] }
├── tables: [ Materialize: 4294967294 ]
└── StreamProject { exprs: [v1, min(v2), count(distinct v3)], output_watermarks: [v1] }
└── StreamHashAgg [append_only, eowc] { group_key: [v1], aggs: [min(v2), count(distinct v3), count], output_watermarks: [v1] }
└── StreamProject { exprs: [v1, min(v2), count(distinct v3)], output_watermarks: [[v1]] }
└── StreamHashAgg [append_only, eowc] { group_key: [v1], aggs: [min(v2), count(distinct v3), count], output_watermarks: [[v1]] }
├── tables: [ HashAggState: 0, HashAggDedupForCol2: 1 ]
└── StreamExchange Hash([0]) from 1
Fragment 1
StreamRowIdGen { row_id_index: 3 }
└── StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - 10:Int32) }], output_watermarks: [v1] }
└── StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - 10:Int32) }], output_watermarks: [[v1]] }
├── tables: [ WatermarkFilter: 2 ]
└── StreamSource { source: t, columns: [v1, v2, v3, _row_id] } { tables: [ Source: 3 ] }
Expand Down Expand Up @@ -89,28 +89,28 @@
GROUP BY window_start;
stream_plan: |-
StreamMaterialize { columns: [window_start, max], stream_key: [window_start], pk_columns: [window_start], pk_conflict: NoCheck, watermark_columns: [window_start] }
└─StreamProject { exprs: [$expr1, max(t.b)], output_watermarks: [$expr1] }
└─StreamHashAgg [append_only] { group_key: [$expr1], aggs: [max(t.b), count], output_watermarks: [$expr1] }
└─StreamProject { exprs: [$expr1, max(t.b)], output_watermarks: [[$expr1]] }
└─StreamHashAgg [append_only] { group_key: [$expr1], aggs: [max(t.b), count], output_watermarks: [[$expr1]] }
└─StreamExchange { dist: HashShard($expr1) }
└─StreamProject { exprs: [TumbleStart(t.a, '01:00:00':Interval) as $expr1, t.b, t._row_id], output_watermarks: [$expr1] }
└─StreamProject { exprs: [TumbleStart(t.a, '01:00:00':Interval) as $expr1, t.b, t._row_id], output_watermarks: [[$expr1]] }
└─StreamTableScan { table: t, columns: [t.a, t.b, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
eowc_stream_plan: |-
StreamMaterialize { columns: [window_start, max], stream_key: [window_start], pk_columns: [window_start], pk_conflict: NoCheck, watermark_columns: [window_start] }
└─StreamProject { exprs: [$expr1, max(t.b)], output_watermarks: [$expr1] }
└─StreamHashAgg [append_only, eowc] { group_key: [$expr1], aggs: [max(t.b), count], output_watermarks: [$expr1] }
└─StreamProject { exprs: [$expr1, max(t.b)], output_watermarks: [[$expr1]] }
└─StreamHashAgg [append_only, eowc] { group_key: [$expr1], aggs: [max(t.b), count], output_watermarks: [[$expr1]] }
└─StreamExchange { dist: HashShard($expr1) }
└─StreamProject { exprs: [TumbleStart(t.a, '01:00:00':Interval) as $expr1, t.b, t._row_id], output_watermarks: [$expr1] }
└─StreamProject { exprs: [TumbleStart(t.a, '01:00:00':Interval) as $expr1, t.b, t._row_id], output_watermarks: [[$expr1]] }
└─StreamTableScan { table: t, columns: [t.a, t.b, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
eowc_stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [window_start, max], stream_key: [window_start], pk_columns: [window_start], pk_conflict: NoCheck, watermark_columns: [window_start] }
├── tables: [ Materialize: 4294967294 ]
└── StreamProject { exprs: [$expr1, max(t.b)], output_watermarks: [$expr1] }
└── StreamHashAgg [append_only, eowc] { group_key: [$expr1], aggs: [max(t.b), count], output_watermarks: [$expr1] } { tables: [ HashAggState: 0 ] }
└── StreamProject { exprs: [$expr1, max(t.b)], output_watermarks: [[$expr1]] }
└── StreamHashAgg [append_only, eowc] { group_key: [$expr1], aggs: [max(t.b), count], output_watermarks: [[$expr1]] } { tables: [ HashAggState: 0 ] }
└── StreamExchange Hash([0]) from 1
Fragment 1
StreamProject { exprs: [TumbleStart(t.a, '01:00:00':Interval) as $expr1, t.b, t._row_id], output_watermarks: [$expr1] }
StreamProject { exprs: [TumbleStart(t.a, '01:00:00':Interval) as $expr1, t.b, t._row_id], output_watermarks: [[$expr1]] }
└── StreamTableScan { table: t, columns: [t.a, t.b, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
├── tables: [ StreamScan: 1 ]
├── Upstream
Expand All @@ -137,9 +137,9 @@
└─StreamEowcOverWindow { window_functions: [first_value(a) OVER(PARTITION BY b ORDER BY tm ASC ROWS BETWEEN 2 PRECEDING AND 2 PRECEDING)] }
└─StreamEowcSort { sort_column: tm }
└─StreamExchange { dist: HashShard(b) }
└─StreamProject { exprs: [a, b, tm, _row_id], output_watermarks: [tm] }
└─StreamProject { exprs: [a, b, tm, _row_id], output_watermarks: [[tm]] }
└─StreamRowIdGen { row_id_index: 3 }
└─StreamWatermarkFilter { watermark_descs: [Desc { column: tm, expr: (tm - '00:05:00':Interval) }], output_watermarks: [tm] }
└─StreamWatermarkFilter { watermark_descs: [Desc { column: tm, expr: (tm - '00:05:00':Interval) }], output_watermarks: [[tm]] }
└─StreamSource { source: t, columns: [a, b, tm, _row_id] }
eowc_stream_dist_plan: |+
Fragment 0
Expand All @@ -152,9 +152,9 @@
└── StreamExchange Hash([1]) from 1
Fragment 1
StreamProject { exprs: [a, b, tm, _row_id], output_watermarks: [tm] }
StreamProject { exprs: [a, b, tm, _row_id], output_watermarks: [[tm]] }
└── StreamRowIdGen { row_id_index: 3 }
└── StreamWatermarkFilter { watermark_descs: [Desc { column: tm, expr: (tm - '00:05:00':Interval) }], output_watermarks: [tm] }
└── StreamWatermarkFilter { watermark_descs: [Desc { column: tm, expr: (tm - '00:05:00':Interval) }], output_watermarks: [[tm]] }
├── tables: [ WatermarkFilter: 2 ]
└── StreamSource { source: t, columns: [a, b, tm, _row_id] } { tables: [ Source: 3 ] }
Expand Down Expand Up @@ -228,9 +228,9 @@
StreamMaterialize { columns: [id1, value1, id2, value2, ts1, ts2, s1._row_id(hidden), s2._row_id(hidden), count], stream_key: [s1._row_id, s2._row_id, id1, value2], pk_columns: [s1._row_id, s2._row_id, id1, value2], pk_conflict: NoCheck }
└─StreamOverWindow { window_functions: [count() OVER(PARTITION BY s2.value ORDER BY s2.ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─StreamExchange { dist: HashShard(s2.value) }
└─StreamHashJoin [interval, append_only] { type: Inner, predicate: s1.id = s2.id AND (s1.ts >= s2.ts) AND ($expr1 <= s2.ts), conditions_to_clean_left_state_table: (s1.ts >= s2.ts), conditions_to_clean_right_state_table: ($expr1 <= s2.ts), output_watermarks: [s1.ts, s2.ts], output: [s1.id, s1.value, s2.id, s2.value, s1.ts, s2.ts, s1._row_id, s2._row_id] }
└─StreamHashJoin [interval, append_only] { type: Inner, predicate: s1.id = s2.id AND (s1.ts >= s2.ts) AND ($expr1 <= s2.ts), conditions_to_clean_left_state_table: (s1.ts >= s2.ts), conditions_to_clean_right_state_table: ($expr1 <= s2.ts), output_watermarks: [[s1.ts], [s2.ts]], output: [s1.id, s1.value, s2.id, s2.value, s1.ts, s2.ts, s1._row_id, s2._row_id] }
├─StreamExchange { dist: HashShard(s1.id) }
│ └─StreamProject { exprs: [s1.id, s1.value, s1.ts, (s1.ts - '00:01:00':Interval) as $expr1, s1._row_id], output_watermarks: [s1.ts, $expr1] }
│ └─StreamProject { exprs: [s1.id, s1.value, s1.ts, (s1.ts - '00:01:00':Interval) as $expr1, s1._row_id], output_watermarks: [[s1.ts, $expr1]] }
│ └─StreamTableScan { table: s1, columns: [s1.id, s1.value, s1.ts, s1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [s1._row_id], pk: [_row_id], dist: UpstreamHashShard(s1._row_id) }
└─StreamExchange { dist: HashShard(s2.id) }
└─StreamTableScan { table: s2, columns: [s2.id, s2.value, s2.ts, s2._row_id], stream_scan_type: ArrangementBackfill, stream_key: [s2._row_id], pk: [_row_id], dist: UpstreamHashShard(s2._row_id) }
Expand All @@ -239,9 +239,9 @@
└─StreamEowcOverWindow { window_functions: [count() OVER(PARTITION BY s2.value ORDER BY s2.ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─StreamEowcSort { sort_column: s2.ts }
└─StreamExchange { dist: HashShard(s2.value) }
└─StreamHashJoin [interval, append_only] { type: Inner, predicate: s1.id = s2.id AND (s1.ts >= s2.ts) AND ($expr1 <= s2.ts), conditions_to_clean_left_state_table: (s1.ts >= s2.ts), conditions_to_clean_right_state_table: ($expr1 <= s2.ts), output_watermarks: [s1.ts, s2.ts], output: [s1.id, s1.value, s2.id, s2.value, s1.ts, s2.ts, s1._row_id, s2._row_id] }
└─StreamHashJoin [interval, append_only] { type: Inner, predicate: s1.id = s2.id AND (s1.ts >= s2.ts) AND ($expr1 <= s2.ts), conditions_to_clean_left_state_table: (s1.ts >= s2.ts), conditions_to_clean_right_state_table: ($expr1 <= s2.ts), output_watermarks: [[s1.ts], [s2.ts]], output: [s1.id, s1.value, s2.id, s2.value, s1.ts, s2.ts, s1._row_id, s2._row_id] }
├─StreamExchange { dist: HashShard(s1.id) }
│ └─StreamProject { exprs: [s1.id, s1.value, s1.ts, (s1.ts - '00:01:00':Interval) as $expr1, s1._row_id], output_watermarks: [s1.ts, $expr1] }
│ └─StreamProject { exprs: [s1.id, s1.value, s1.ts, (s1.ts - '00:01:00':Interval) as $expr1, s1._row_id], output_watermarks: [[s1.ts, $expr1]] }
│ └─StreamTableScan { table: s1, columns: [s1.id, s1.value, s1.ts, s1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [s1._row_id], pk: [_row_id], dist: UpstreamHashShard(s1._row_id) }
└─StreamExchange { dist: HashShard(s2.id) }
└─StreamTableScan { table: s2, columns: [s2.id, s2.value, s2.ts, s2._row_id], stream_scan_type: ArrangementBackfill, stream_key: [s2._row_id], pk: [_row_id], dist: UpstreamHashShard(s2._row_id) }
14 changes: 7 additions & 7 deletions src/frontend/planner_test/tests/testdata/output/expr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@
└─LogicalScan { table: t, columns: [t.v1, t._row_id, t._rw_timestamp] }
stream_plan: |-
StreamMaterialize { columns: [v1, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: NoCheck, watermark_columns: [v1] }
└─StreamDynamicFilter { predicate: (t.v1 >= now), output_watermarks: [t.v1], output: [t.v1, t._row_id], cleaned_by_watermark: true }
└─StreamDynamicFilter { predicate: (t.v1 >= now), output_watermarks: [[t.v1]], output: [t.v1, t._row_id], cleaned_by_watermark: true }
├─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamExchange { dist: Broadcast }
└─StreamNow { output: [now] }
Expand All @@ -498,19 +498,19 @@
└─LogicalScan { table: t, columns: [t.v1, t._row_id, t._rw_timestamp] }
stream_plan: |-
StreamMaterialize { columns: [v1, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: NoCheck, watermark_columns: [v1] }
└─StreamDynamicFilter { predicate: (t.v1 >= $expr1), output_watermarks: [t.v1], output: [t.v1, t._row_id], cleaned_by_watermark: true }
└─StreamDynamicFilter { predicate: (t.v1 >= $expr1), output_watermarks: [[t.v1]], output: [t.v1, t._row_id], cleaned_by_watermark: true }
├─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamExchange { dist: Broadcast }
└─StreamProject { exprs: [SubtractWithTimeZone(now, '00:00:02':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] }
└─StreamProject { exprs: [SubtractWithTimeZone(now, '00:00:02':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [[$expr1]] }
└─StreamNow { output: [now] }
- name: and of two now expression condition
sql: |
create table t (v1 timestamp with time zone, v2 timestamp with time zone);
select * from t where v1 >= now() and v2 >= now();
stream_plan: |-
StreamMaterialize { columns: [v1, v2, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: NoCheck, watermark_columns: [v2] }
└─StreamDynamicFilter { predicate: (t.v2 >= now), output_watermarks: [t.v2], output: [t.v1, t.v2, t._row_id], cleaned_by_watermark: true }
├─StreamDynamicFilter { predicate: (t.v1 >= now), output_watermarks: [t.v1], output: [t.v1, t.v2, t._row_id], cleaned_by_watermark: true }
└─StreamDynamicFilter { predicate: (t.v2 >= now), output_watermarks: [[t.v2]], output: [t.v1, t.v2, t._row_id], cleaned_by_watermark: true }
├─StreamDynamicFilter { predicate: (t.v1 >= now), output_watermarks: [[t.v1]], output: [t.v1, t.v2, t._row_id], cleaned_by_watermark: true }
│ ├─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
│ └─StreamExchange { dist: Broadcast }
│ └─StreamNow { output: [now] }
Expand All @@ -527,8 +527,8 @@
select max(v1) as max_time from t group by v2 having max(v1) >= now();
stream_plan: |-
StreamMaterialize { columns: [max_time, t.v2(hidden)], stream_key: [t.v2], pk_columns: [t.v2], pk_conflict: NoCheck, watermark_columns: [max_time] }
└─StreamProject { exprs: [max(t.v1), t.v2], output_watermarks: [max(t.v1)] }
└─StreamDynamicFilter { predicate: (max(t.v1) >= now), output_watermarks: [max(t.v1)], output: [t.v2, max(t.v1)], cleaned_by_watermark: true }
└─StreamProject { exprs: [max(t.v1), t.v2], output_watermarks: [[max(t.v1)]] }
└─StreamDynamicFilter { predicate: (max(t.v1) >= now), output_watermarks: [[max(t.v1)]], output: [t.v2, max(t.v1)], cleaned_by_watermark: true }
├─StreamProject { exprs: [t.v2, max(t.v1)] }
│ └─StreamHashAgg { group_key: [t.v2], aggs: [max(t.v1), count] }
│ └─StreamExchange { dist: HashShard(t.v2) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,5 @@
└─LogicalNow { output: [ts] }
stream_plan: |-
StreamMaterialize { columns: [year, month, constant, ts(hidden)], stream_key: [ts], pk_columns: [ts], pk_conflict: NoCheck, watermark_columns: [ts(hidden)] }
└─StreamProject { exprs: [Extract('YEAR':Varchar, ts, 'UTC':Varchar) as $expr1, Extract('MONTH':Varchar, ts, 'UTC':Varchar) as $expr2, 1:Int32, ts], output_watermarks: [ts] }
└─StreamProject { exprs: [Extract('YEAR':Varchar, ts, 'UTC':Varchar) as $expr1, Extract('MONTH':Varchar, ts, 'UTC':Varchar) as $expr2, 1:Int32, ts], output_watermarks: [[ts]] }
└─StreamNow { output: [ts] }
Loading

0 comments on commit 6a5c106

Please sign in to comment.