
feat(optimizer): introduce watermark group #19894

Open · stdrc wants to merge 15 commits into main from rc/watermark-group

Conversation

stdrc (Member) commented on Dec 23, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Background

Previously we didn't record the derivation relation among different watermark columns. For instance, window_start and window_end generated by tumble/hop time window functions are always related to each other and to the original event time column. However, when we planned an EOWC query, we lost this information and hence couldn't plan successfully if more than one of these columns survived (i.e., was not pruned) at the same time.

Changes

This PR introduces a new way to represent watermark columns in plan nodes: watermark groups. We organize watermark columns into several groups, within each of which all watermark columns are related. With this, when planning EOWC queries, we can handle multiple watermark columns as long as they all belong to the same group. For example, for the following query, we no longer warn the user that there is more than one watermark column.

```sql
create materialized view mv as
select
  foo, ts, window_start, window_end
from tumble(t, ts, interval '5 mins')
emit on window close;
```
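To make the grouping concrete, here is a minimal Rust sketch of the idea. This is not the PR's actual code: `WatermarkColumns`, `WatermarkGroupId`, and `same_group` are illustrative stand-ins for whatever lives in src/frontend/src/optimizer/property/watermark_columns.rs.

```rust
use std::collections::BTreeMap;

/// Identifier of a watermark group; columns sharing a group ID are
/// known to be derived from the same source watermark. (Hypothetical.)
type WatermarkGroupId = u32;

/// Watermark columns of a plan node, keyed by output column index.
#[derive(Clone, Debug, Default)]
struct WatermarkColumns {
    col_to_group: BTreeMap<usize, WatermarkGroupId>,
}

impl WatermarkColumns {
    /// Record that output column `col_idx` carries a watermark
    /// belonging to `group`.
    fn insert(&mut self, col_idx: usize, group: WatermarkGroupId) {
        self.col_to_group.insert(col_idx, group);
    }

    /// Two watermark columns can be handled together in an EOWC plan
    /// iff they belong to the same group.
    fn same_group(&self, a: usize, b: usize) -> bool {
        match (self.col_to_group.get(&a), self.col_to_group.get(&b)) {
            (Some(ga), Some(gb)) => ga == gb,
            _ => false,
        }
    }
}

fn main() {
    let mut wc = WatermarkColumns::default();
    // `ts`, `window_start`, `window_end` from the same tumble() call
    // all land in one group, so EOWC planning can keep all of them.
    wc.insert(1, 0); // ts
    wc.insert(2, 0); // window_start
    wc.insert(3, 0); // window_end
    assert!(wc.same_group(2, 3));
}
```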

Of the changes, ~400 LoC are substantive; the rest are planner tests. The core changes all happen in StreamXxx::new, with the watermark derivation logic itself unchanged. The main contribution is to determine, for every stream node, which of its output watermark columns are related, as sketched below.
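As a hedged illustration of that per-node derivation, a pass-through node (think of a StreamProject that forwards columns unchanged) could propagate groups like this. Again hypothetical code, reusing the `WatermarkColumns` sketch above; the real logic in StreamXxx::new is more involved:

```rust
/// Hypothetical helper: derive a projection-like node's output
/// watermark groups from its input. `mapping[out_idx] = Some(in_idx)`
/// means output column `out_idx` simply forwards input column `in_idx`.
fn derive_output_watermarks(
    input: &WatermarkColumns,
    mapping: &[Option<usize>],
) -> WatermarkColumns {
    let mut out = WatermarkColumns::default();
    for (out_idx, in_idx) in mapping.iter().enumerate() {
        if let Some(in_idx) = in_idx {
            // A pass-through column keeps its group ID, so relatedness
            // established upstream (e.g. by tumble()) is preserved.
            if let Some(&group) = input.col_to_group.get(in_idx) {
                out.insert(out_idx, group);
            }
        }
    }
    out
}
```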

Future work

  1. We still use FixedBitSet to record a flattened watermark_columns field in TableCatalog and TableDesc, and hence in the Table protobuf. As a result, when we create an MV on top of another MV, all the watermark columns of the base MV are considered to belong to different watermark groups, regardless of whether they are actually related (see the sketch after this list). I will change the table catalog field later to fix this.
  2. We still can't do `create mv as select window_start, window_end, count(*) as cnt from tumble(t, ts, ...) group by window_start, window_end emit on window close`, because I don't want to pack too many changes into this single PR. I will modify StreamHashAgg later to support it. (Resolved in feat(eowc): allow multiple watermark columns in eowc hash agg #19998)
  3. Since we don't support reusing duplicated plan nodes in streaming plans, when the two sides of a join both scan the same table with join condition left.window = right.window, we cannot know that the two window columns are derived from the same source watermark column. This is because the watermark group ID is allocated and assigned during the optimization phase and is not persisted.
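To illustrate item 1: flattening into a bitset keeps which columns carry watermarks but drops their group IDs, so a downstream MV can no longer tell which columns are related. A minimal sketch of the information loss, assuming the `fixedbitset` crate and the hypothetical `WatermarkColumns` type sketched earlier:

```rust
use fixedbitset::FixedBitSet;

/// Hypothetical flattening, mirroring what TableCatalog stores today:
/// each watermark column sets one bit, and the group ID is discarded,
/// which is exactly the information loss described in item 1.
fn flatten(wc: &WatermarkColumns, num_cols: usize) -> FixedBitSet {
    let mut bits = FixedBitSet::with_capacity(num_cols);
    for &col in wc.col_to_group.keys() {
        bits.insert(col); // group membership is lost here
    }
    bits
}
```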

Checklist

  • I have written necessary rustdoc comments.
  • I have added necessary unit tests and integration tests.
  • I have added test labels as necessary.
  • I have added fuzzing tests or opened an issue to track them.
  • My PR contains breaking changes.
  • My PR changes performance-critical code, so I will run (micro) benchmarks and present the results.
  • My PR contains critical fixes that are necessary to be merged into the latest release.

Documentation

  • My PR needs documentation updates.
Release note


@stdrc stdrc force-pushed the rc/watermark-group branch from 7711584 to 68ac534 Compare December 23, 2024 07:48
@stdrc stdrc changed the title reorder feat(optimizer): introduce watermark group Dec 23, 2024
@stdrc stdrc force-pushed the rc/watermark-group branch from 43dcde2 to 60ee293 Compare December 31, 2024 07:34
@stdrc stdrc marked this pull request as ready for review December 31, 2024 17:00
@graphite-app graphite-app bot requested a review from a team December 31, 2024 17:21
@chenzl25 (Contributor) left a comment:

> Since we don't support shared node in streaming plan (IIRC)

Do you want this self-join plan with StreamShare?

```yaml
- id: self_join
before:
- create_sources
sql: |
select count(*) cnt from auction A join auction B on A.id = B.id where A.initial_bid = 1 and B.initial_bid = 2;
batch_plan: |-
BatchSimpleAgg { aggs: [sum0(count)] }
└─BatchExchange { order: [], dist: Single }
└─BatchSimpleAgg { aggs: [count] }
└─BatchHashJoin { type: Inner, predicate: id = id, output: [] }
├─BatchExchange { order: [], dist: HashShard(id) }
│ └─BatchFilter { predicate: (initial_bid = 1:Int32) }
│ └─BatchSource { source: auction, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id] }
└─BatchExchange { order: [], dist: HashShard(id) }
└─BatchFilter { predicate: (initial_bid = 2:Int32) }
└─BatchSource { source: auction, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id] }
stream_plan: |-
StreamMaterialize { columns: [cnt], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamProject { exprs: [sum0(count)] }
└─StreamSimpleAgg [append_only] { aggs: [sum0(count), count] }
└─StreamExchange { dist: Single }
└─StreamStatelessSimpleAgg { aggs: [count] }
└─StreamHashJoin [append_only] { type: Inner, predicate: id = id, output: [_row_id, id, _row_id] }
├─StreamExchange { dist: HashShard(id) }
│ └─StreamFilter { predicate: (initial_bid = 1:Int32) }
│ └─StreamShare { id: 4 }
│ └─StreamProject { exprs: [id, initial_bid, _row_id] }
│ └─StreamFilter { predicate: ((initial_bid = 1:Int32) OR (initial_bid = 2:Int32)) }
│ └─StreamRowIdGen { row_id_index: 10 }
│ └─StreamSource { source: auction, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id] }
└─StreamExchange { dist: HashShard(id) }
└─StreamFilter { predicate: (initial_bid = 2:Int32) }
└─StreamShare { id: 4 }
└─StreamProject { exprs: [id, initial_bid, _row_id] }
└─StreamFilter { predicate: ((initial_bid = 1:Int32) OR (initial_bid = 2:Int32)) }
└─StreamRowIdGen { row_id_index: 10 }
└─StreamSource { source: auction, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id] }
```

@stdrc stdrc force-pushed the rc/watermark-group branch from 6a5c106 to 0f81283 Compare January 2, 2025 07:34
@github-actions bot left a comment:

license-eye has checked 5536 files.

| Valid | Invalid | Ignored | Fixed |
| ----- | ------- | ------- | ----- |
| 2332  | 2       | 3202    | 0     |
Invalid files:
  • src/common/src/util/functional.rs
  • src/frontend/src/optimizer/property/watermark_columns.rs
Use this command to fix any missing license headers:

```bash
docker run -it --rm -v $(pwd):/github/workspace apache/skywalking-eyes header fix
```

src/common/src/util/functional.rs (review thread outdated, resolved)
src/frontend/src/optimizer/property/watermark_columns.rs (review thread outdated, resolved)
Comment on lines +87 to +97
```yaml
eowc_stream_plan: |-
StreamMaterialize { columns: [foo, win1, win2, t._row_id(hidden), t._row_id#1(hidden)], stream_key: [t._row_id, t._row_id#1, win1], pk_columns: [t._row_id, t._row_id#1, win1], pk_conflict: NoCheck, watermark_columns: [win1] }
└─StreamEowcSort { sort_column: $expr1 }
└─StreamProject { exprs: [(t.foo + $expr2) as $expr4, $expr1, $expr3, t._row_id, t._row_id], output_watermarks: [[$expr1], [$expr3]] }
└─StreamHashJoin [window, append_only] { type: Inner, predicate: $expr1 = $expr3, output_watermarks: [[$expr1], [$expr3]], output: [t.foo, $expr1, $expr2, $expr3, t._row_id, t._row_id] }
├─StreamExchange { dist: HashShard($expr1) }
│ └─StreamProject { exprs: [t.foo, TumbleStart(t.ts, '00:05:00':Interval) as $expr1, t._row_id], output_watermarks: [[$expr1]] }
│ └─StreamTableScan { table: t, columns: [t.foo, t.ts, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamExchange { dist: HashShard($expr3) }
└─StreamProject { exprs: [(t.foo + 1:Int32) as $expr2, TumbleStart(t.ts, '00:10:00':Interval) as $expr3, t._row_id], output_watermarks: [[$expr3]] }
└─StreamTableScan { table: t, columns: [t.foo, t.ts, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
```
stdrc (Member Author):

@chenzl25 In this plan, is it possible to use a shared StreamTableScan on the two sides?

chenzl25 (Contributor):

You can use a CTE to construct a share operator:

```sql
create materialized view v as
with cte as (select foo, window_start as win from tumble(t, ts, interval '5 mins'))
select l.foo + r.foo as foo, l.win as win1, r.win as win2
from cte as l join cte as r on l.win = r.win;
```

stdrc (Member Author):

Thanks!

stdrc added 2 commits January 3, 2025 13:51, signed off by Richard Chien <[email protected]>.
@graphite-app graphite-app bot requested a review from a team January 3, 2025 06:50