Skip to content

feat(optimizer): introduce watermark group #19894

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Jan 6, 2025
Merged
25 changes: 25 additions & 0 deletions src/common/src/util/functional.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#[easy_ext::ext(SameOrElseExt)]
pub impl<T: Eq> T {
/// Check if `self` and `other` are equal, if so, return `self`, otherwise return the result of `f()`.
fn same_or_else(self, other: T, f: impl FnOnce() -> T) -> T {
if self == other {
self
} else {
f()
}
}
}
1 change: 1 addition & 0 deletions src/common/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub mod pretty_bytes;
pub mod prost;
pub mod query_log;
pub use rw_resource_util as resource_util;
pub mod functional;
pub mod recursive;
pub mod row_id;
pub mod row_serde;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
- name: explicit watermark definition + implicit proctime watermark
sql: |
create table t (
foo int,
event_time timestamp,
proc_time timestamp as proctime(),
watermark for event_time as event_time - interval '5 minutes'
) append only;
select * from t;
expected_outputs:
- stream_plan
- eowc_stream_plan
- name: tumble window_start + window_end
sql: |
create table t (
foo int,
ts timestamp,
watermark for ts as ts - interval '5 minutes'
) append only;
select
foo, window_start, window_end
from tumble(t, ts, interval '1 hour');
expected_outputs:
- stream_plan
- eowc_stream_plan
- name: hop window_start + window_end
sql: |
create table t (
foo int,
ts timestamp,
watermark for ts as ts - interval '5 minutes'
) append only;
select
foo, window_start, window_end
from hop(t, ts, interval '20 mins', interval '1 hour');
expected_outputs:
- stream_plan
- eowc_stream_plan
- name: join window_start = window_start
sql: |
create table t (
foo int,
ts timestamp,
watermark for ts as ts - interval '5 minutes'
) append only;
select
foo1 + foo2 as foo,
win1, win2
from (
select foo as foo1, window_start as win1 from tumble(t, ts, interval '5 mins')
) as left
join (
select foo + 1 as foo2, window_start as win2 from tumble(t, ts, interval '10 mins')
) as right
on win1 = win2;
expected_outputs:
- stream_plan
- eowc_stream_plan
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,32 @@
select v1, min(v2), count(distinct v3) as agg from t group by v1;
stream_plan: |-
StreamMaterialize { columns: [v1, min, agg], stream_key: [v1], pk_columns: [v1], pk_conflict: NoCheck, watermark_columns: [v1] }
└─StreamProject { exprs: [v1, min(v2), count(distinct v3)], output_watermarks: [v1] }
└─StreamHashAgg [append_only] { group_key: [v1], aggs: [min(v2), count(distinct v3), count], output_watermarks: [v1] }
└─StreamProject { exprs: [v1, min(v2), count(distinct v3)], output_watermarks: [[v1]] }
└─StreamHashAgg [append_only] { group_key: [v1], aggs: [min(v2), count(distinct v3), count], output_watermarks: [[v1]] }
└─StreamExchange { dist: HashShard(v1) }
└─StreamRowIdGen { row_id_index: 3 }
└─StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - 10:Int32) }], output_watermarks: [v1] }
└─StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - 10:Int32) }], output_watermarks: [[v1]] }
└─StreamSource { source: t, columns: [v1, v2, v3, _row_id] }
eowc_stream_plan: |-
StreamMaterialize { columns: [v1, min, agg], stream_key: [v1], pk_columns: [v1], pk_conflict: NoCheck, watermark_columns: [v1] }
└─StreamProject { exprs: [v1, min(v2), count(distinct v3)], output_watermarks: [v1] }
└─StreamHashAgg [append_only, eowc] { group_key: [v1], aggs: [min(v2), count(distinct v3), count], output_watermarks: [v1] }
└─StreamProject { exprs: [v1, min(v2), count(distinct v3)], output_watermarks: [[v1]] }
└─StreamHashAgg [append_only, eowc] { group_key: [v1], aggs: [min(v2), count(distinct v3), count], output_watermarks: [[v1]] }
└─StreamExchange { dist: HashShard(v1) }
└─StreamRowIdGen { row_id_index: 3 }
└─StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - 10:Int32) }], output_watermarks: [v1] }
└─StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - 10:Int32) }], output_watermarks: [[v1]] }
└─StreamSource { source: t, columns: [v1, v2, v3, _row_id] }
eowc_stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [v1, min, agg], stream_key: [v1], pk_columns: [v1], pk_conflict: NoCheck, watermark_columns: [v1] }
├── tables: [ Materialize: 4294967294 ]
└── StreamProject { exprs: [v1, min(v2), count(distinct v3)], output_watermarks: [v1] }
└── StreamHashAgg [append_only, eowc] { group_key: [v1], aggs: [min(v2), count(distinct v3), count], output_watermarks: [v1] }
└── StreamProject { exprs: [v1, min(v2), count(distinct v3)], output_watermarks: [[v1]] }
└── StreamHashAgg [append_only, eowc] { group_key: [v1], aggs: [min(v2), count(distinct v3), count], output_watermarks: [[v1]] }
├── tables: [ HashAggState: 0, HashAggDedupForCol2: 1 ]
└── StreamExchange Hash([0]) from 1

Fragment 1
StreamRowIdGen { row_id_index: 3 }
└── StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - 10:Int32) }], output_watermarks: [v1] }
└── StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - 10:Int32) }], output_watermarks: [[v1]] }
├── tables: [ WatermarkFilter: 2 ]
└── StreamSource { source: t, columns: [v1, v2, v3, _row_id] } { tables: [ Source: 3 ] }

Expand Down Expand Up @@ -89,28 +89,28 @@
GROUP BY window_start;
stream_plan: |-
StreamMaterialize { columns: [window_start, max], stream_key: [window_start], pk_columns: [window_start], pk_conflict: NoCheck, watermark_columns: [window_start] }
└─StreamProject { exprs: [$expr1, max(t.b)], output_watermarks: [$expr1] }
└─StreamHashAgg [append_only] { group_key: [$expr1], aggs: [max(t.b), count], output_watermarks: [$expr1] }
└─StreamProject { exprs: [$expr1, max(t.b)], output_watermarks: [[$expr1]] }
└─StreamHashAgg [append_only] { group_key: [$expr1], aggs: [max(t.b), count], output_watermarks: [[$expr1]] }
└─StreamExchange { dist: HashShard($expr1) }
└─StreamProject { exprs: [TumbleStart(t.a, '01:00:00':Interval) as $expr1, t.b, t._row_id], output_watermarks: [$expr1] }
└─StreamProject { exprs: [TumbleStart(t.a, '01:00:00':Interval) as $expr1, t.b, t._row_id], output_watermarks: [[$expr1]] }
└─StreamTableScan { table: t, columns: [t.a, t.b, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
eowc_stream_plan: |-
StreamMaterialize { columns: [window_start, max], stream_key: [window_start], pk_columns: [window_start], pk_conflict: NoCheck, watermark_columns: [window_start] }
└─StreamProject { exprs: [$expr1, max(t.b)], output_watermarks: [$expr1] }
└─StreamHashAgg [append_only, eowc] { group_key: [$expr1], aggs: [max(t.b), count], output_watermarks: [$expr1] }
└─StreamProject { exprs: [$expr1, max(t.b)], output_watermarks: [[$expr1]] }
└─StreamHashAgg [append_only, eowc] { group_key: [$expr1], aggs: [max(t.b), count], output_watermarks: [[$expr1]] }
└─StreamExchange { dist: HashShard($expr1) }
└─StreamProject { exprs: [TumbleStart(t.a, '01:00:00':Interval) as $expr1, t.b, t._row_id], output_watermarks: [$expr1] }
└─StreamProject { exprs: [TumbleStart(t.a, '01:00:00':Interval) as $expr1, t.b, t._row_id], output_watermarks: [[$expr1]] }
└─StreamTableScan { table: t, columns: [t.a, t.b, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
eowc_stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [window_start, max], stream_key: [window_start], pk_columns: [window_start], pk_conflict: NoCheck, watermark_columns: [window_start] }
├── tables: [ Materialize: 4294967294 ]
└── StreamProject { exprs: [$expr1, max(t.b)], output_watermarks: [$expr1] }
└── StreamHashAgg [append_only, eowc] { group_key: [$expr1], aggs: [max(t.b), count], output_watermarks: [$expr1] } { tables: [ HashAggState: 0 ] }
└── StreamProject { exprs: [$expr1, max(t.b)], output_watermarks: [[$expr1]] }
└── StreamHashAgg [append_only, eowc] { group_key: [$expr1], aggs: [max(t.b), count], output_watermarks: [[$expr1]] } { tables: [ HashAggState: 0 ] }
└── StreamExchange Hash([0]) from 1

Fragment 1
StreamProject { exprs: [TumbleStart(t.a, '01:00:00':Interval) as $expr1, t.b, t._row_id], output_watermarks: [$expr1] }
StreamProject { exprs: [TumbleStart(t.a, '01:00:00':Interval) as $expr1, t.b, t._row_id], output_watermarks: [[$expr1]] }
└── StreamTableScan { table: t, columns: [t.a, t.b, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
├── tables: [ StreamScan: 1 ]
├── Upstream
Expand All @@ -137,9 +137,9 @@
└─StreamEowcOverWindow { window_functions: [first_value(a) OVER(PARTITION BY b ORDER BY tm ASC ROWS BETWEEN 2 PRECEDING AND 2 PRECEDING)] }
└─StreamEowcSort { sort_column: tm }
└─StreamExchange { dist: HashShard(b) }
└─StreamProject { exprs: [a, b, tm, _row_id], output_watermarks: [tm] }
└─StreamProject { exprs: [a, b, tm, _row_id], output_watermarks: [[tm]] }
└─StreamRowIdGen { row_id_index: 3 }
└─StreamWatermarkFilter { watermark_descs: [Desc { column: tm, expr: (tm - '00:05:00':Interval) }], output_watermarks: [tm] }
└─StreamWatermarkFilter { watermark_descs: [Desc { column: tm, expr: (tm - '00:05:00':Interval) }], output_watermarks: [[tm]] }
└─StreamSource { source: t, columns: [a, b, tm, _row_id] }
eowc_stream_dist_plan: |+
Fragment 0
Expand All @@ -152,9 +152,9 @@
└── StreamExchange Hash([1]) from 1

Fragment 1
StreamProject { exprs: [a, b, tm, _row_id], output_watermarks: [tm] }
StreamProject { exprs: [a, b, tm, _row_id], output_watermarks: [[tm]] }
└── StreamRowIdGen { row_id_index: 3 }
└── StreamWatermarkFilter { watermark_descs: [Desc { column: tm, expr: (tm - '00:05:00':Interval) }], output_watermarks: [tm] }
└── StreamWatermarkFilter { watermark_descs: [Desc { column: tm, expr: (tm - '00:05:00':Interval) }], output_watermarks: [[tm]] }
├── tables: [ WatermarkFilter: 2 ]
└── StreamSource { source: t, columns: [a, b, tm, _row_id] } { tables: [ Source: 3 ] }

Expand Down Expand Up @@ -228,9 +228,9 @@
StreamMaterialize { columns: [id1, value1, id2, value2, ts1, ts2, s1._row_id(hidden), s2._row_id(hidden), count], stream_key: [s1._row_id, s2._row_id, id1, value2], pk_columns: [s1._row_id, s2._row_id, id1, value2], pk_conflict: NoCheck }
└─StreamOverWindow { window_functions: [count() OVER(PARTITION BY s2.value ORDER BY s2.ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─StreamExchange { dist: HashShard(s2.value) }
└─StreamHashJoin [interval, append_only] { type: Inner, predicate: s1.id = s2.id AND (s1.ts >= s2.ts) AND ($expr1 <= s2.ts), conditions_to_clean_left_state_table: (s1.ts >= s2.ts), conditions_to_clean_right_state_table: ($expr1 <= s2.ts), output_watermarks: [s1.ts, s2.ts], output: [s1.id, s1.value, s2.id, s2.value, s1.ts, s2.ts, s1._row_id, s2._row_id] }
└─StreamHashJoin [interval, append_only] { type: Inner, predicate: s1.id = s2.id AND (s1.ts >= s2.ts) AND ($expr1 <= s2.ts), conditions_to_clean_left_state_table: (s1.ts >= s2.ts), conditions_to_clean_right_state_table: ($expr1 <= s2.ts), output_watermarks: [[s1.ts], [s2.ts]], output: [s1.id, s1.value, s2.id, s2.value, s1.ts, s2.ts, s1._row_id, s2._row_id] }
├─StreamExchange { dist: HashShard(s1.id) }
│ └─StreamProject { exprs: [s1.id, s1.value, s1.ts, (s1.ts - '00:01:00':Interval) as $expr1, s1._row_id], output_watermarks: [s1.ts, $expr1] }
│ └─StreamProject { exprs: [s1.id, s1.value, s1.ts, (s1.ts - '00:01:00':Interval) as $expr1, s1._row_id], output_watermarks: [[s1.ts, $expr1]] }
│ └─StreamTableScan { table: s1, columns: [s1.id, s1.value, s1.ts, s1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [s1._row_id], pk: [_row_id], dist: UpstreamHashShard(s1._row_id) }
└─StreamExchange { dist: HashShard(s2.id) }
└─StreamTableScan { table: s2, columns: [s2.id, s2.value, s2.ts, s2._row_id], stream_scan_type: ArrangementBackfill, stream_key: [s2._row_id], pk: [_row_id], dist: UpstreamHashShard(s2._row_id) }
Expand All @@ -239,9 +239,9 @@
└─StreamEowcOverWindow { window_functions: [count() OVER(PARTITION BY s2.value ORDER BY s2.ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─StreamEowcSort { sort_column: s2.ts }
└─StreamExchange { dist: HashShard(s2.value) }
└─StreamHashJoin [interval, append_only] { type: Inner, predicate: s1.id = s2.id AND (s1.ts >= s2.ts) AND ($expr1 <= s2.ts), conditions_to_clean_left_state_table: (s1.ts >= s2.ts), conditions_to_clean_right_state_table: ($expr1 <= s2.ts), output_watermarks: [s1.ts, s2.ts], output: [s1.id, s1.value, s2.id, s2.value, s1.ts, s2.ts, s1._row_id, s2._row_id] }
└─StreamHashJoin [interval, append_only] { type: Inner, predicate: s1.id = s2.id AND (s1.ts >= s2.ts) AND ($expr1 <= s2.ts), conditions_to_clean_left_state_table: (s1.ts >= s2.ts), conditions_to_clean_right_state_table: ($expr1 <= s2.ts), output_watermarks: [[s1.ts], [s2.ts]], output: [s1.id, s1.value, s2.id, s2.value, s1.ts, s2.ts, s1._row_id, s2._row_id] }
├─StreamExchange { dist: HashShard(s1.id) }
│ └─StreamProject { exprs: [s1.id, s1.value, s1.ts, (s1.ts - '00:01:00':Interval) as $expr1, s1._row_id], output_watermarks: [s1.ts, $expr1] }
│ └─StreamProject { exprs: [s1.id, s1.value, s1.ts, (s1.ts - '00:01:00':Interval) as $expr1, s1._row_id], output_watermarks: [[s1.ts, $expr1]] }
│ └─StreamTableScan { table: s1, columns: [s1.id, s1.value, s1.ts, s1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [s1._row_id], pk: [_row_id], dist: UpstreamHashShard(s1._row_id) }
└─StreamExchange { dist: HashShard(s2.id) }
└─StreamTableScan { table: s2, columns: [s2.id, s2.value, s2.ts, s2._row_id], stream_scan_type: ArrangementBackfill, stream_key: [s2._row_id], pk: [_row_id], dist: UpstreamHashShard(s2._row_id) }
14 changes: 7 additions & 7 deletions src/frontend/planner_test/tests/testdata/output/expr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@
└─LogicalScan { table: t, columns: [t.v1, t._row_id, t._rw_timestamp] }
stream_plan: |-
StreamMaterialize { columns: [v1, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: NoCheck, watermark_columns: [v1] }
└─StreamDynamicFilter { predicate: (t.v1 >= now), output_watermarks: [t.v1], output: [t.v1, t._row_id], cleaned_by_watermark: true }
└─StreamDynamicFilter { predicate: (t.v1 >= now), output_watermarks: [[t.v1]], output: [t.v1, t._row_id], cleaned_by_watermark: true }
├─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamExchange { dist: Broadcast }
└─StreamNow { output: [now] }
Expand All @@ -498,19 +498,19 @@
└─LogicalScan { table: t, columns: [t.v1, t._row_id, t._rw_timestamp] }
stream_plan: |-
StreamMaterialize { columns: [v1, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: NoCheck, watermark_columns: [v1] }
└─StreamDynamicFilter { predicate: (t.v1 >= $expr1), output_watermarks: [t.v1], output: [t.v1, t._row_id], cleaned_by_watermark: true }
└─StreamDynamicFilter { predicate: (t.v1 >= $expr1), output_watermarks: [[t.v1]], output: [t.v1, t._row_id], cleaned_by_watermark: true }
├─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamExchange { dist: Broadcast }
└─StreamProject { exprs: [SubtractWithTimeZone(now, '00:00:02':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] }
└─StreamProject { exprs: [SubtractWithTimeZone(now, '00:00:02':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [[$expr1]] }
└─StreamNow { output: [now] }
- name: and of two now expression condition
sql: |
create table t (v1 timestamp with time zone, v2 timestamp with time zone);
select * from t where v1 >= now() and v2 >= now();
stream_plan: |-
StreamMaterialize { columns: [v1, v2, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: NoCheck, watermark_columns: [v2] }
└─StreamDynamicFilter { predicate: (t.v2 >= now), output_watermarks: [t.v2], output: [t.v1, t.v2, t._row_id], cleaned_by_watermark: true }
├─StreamDynamicFilter { predicate: (t.v1 >= now), output_watermarks: [t.v1], output: [t.v1, t.v2, t._row_id], cleaned_by_watermark: true }
└─StreamDynamicFilter { predicate: (t.v2 >= now), output_watermarks: [[t.v2]], output: [t.v1, t.v2, t._row_id], cleaned_by_watermark: true }
├─StreamDynamicFilter { predicate: (t.v1 >= now), output_watermarks: [[t.v1]], output: [t.v1, t.v2, t._row_id], cleaned_by_watermark: true }
│ ├─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
│ └─StreamExchange { dist: Broadcast }
│ └─StreamNow { output: [now] }
Expand All @@ -527,8 +527,8 @@
select max(v1) as max_time from t group by v2 having max(v1) >= now();
stream_plan: |-
StreamMaterialize { columns: [max_time, t.v2(hidden)], stream_key: [t.v2], pk_columns: [t.v2], pk_conflict: NoCheck, watermark_columns: [max_time] }
└─StreamProject { exprs: [max(t.v1), t.v2], output_watermarks: [max(t.v1)] }
└─StreamDynamicFilter { predicate: (max(t.v1) >= now), output_watermarks: [max(t.v1)], output: [t.v2, max(t.v1)], cleaned_by_watermark: true }
└─StreamProject { exprs: [max(t.v1), t.v2], output_watermarks: [[max(t.v1)]] }
└─StreamDynamicFilter { predicate: (max(t.v1) >= now), output_watermarks: [[max(t.v1)]], output: [t.v2, max(t.v1)], cleaned_by_watermark: true }
├─StreamProject { exprs: [t.v2, max(t.v1)] }
│ └─StreamHashAgg { group_key: [t.v2], aggs: [max(t.v1), count] }
│ └─StreamExchange { dist: HashShard(t.v2) }
Expand Down
Loading
Loading