[proposal] Reuse common expressions in a query #14196

Open
5 of 8 tasks
gortiz opened this issue Oct 9, 2024 · 10 comments
@gortiz
Contributor

gortiz commented Oct 9, 2024

Introduction

In the multi-stage query engine it is easy to write queries that end up reading from the same table or executing the same join twice. This is especially easy when writing a SQL query using WITH expressions, but that is not the only case.

For example, a query like:

SELECT * 
FROM T1 
JOIN T2 as t2first 
    ON T1.col1 = t2first.col2 
JOIN T2 as t2second 
    ON t2first.col3 = t2second.col3

Generates the following plan:

flowchart BT
    J2([JOIN 2])
    J1([JOIN 1])
    S1([Scan T1])
    S2([Scan T2])
    S22([Scan T2])
    
    J2 --> J1
    S1 --> J2
    S2 --> J2
    S22 --> J1

In this case, we have two instances of Scan T2, which means the scan is executed twice and therefore the table is read twice. That may already be expensive, but it gets even worse when the repeated subtree is more complex, for example a join or an aggregation.
To avoid this problem, we can reuse the subtree and therefore read T2 only once. In our example, the plan should be something like:

flowchart BT
    J2([JOIN 2])
    J1([JOIN 1])
    S1([Scan T1])
    S2([Scan T2])
    
    J2 --> J1
    S1 --> J2
    S2 --> J2
    S2 --> J1

This feature is supported in some databases and it is known as Spool in Calcite.
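As a rough illustration of what "finding a repeated subtree" means, the sketch below models the example plan with a hypothetical `PlanNode` class (not a Pinot or Calcite type) and counts structurally equal subtrees. It assumes that two scans of the same table compare as equal, which ignores details such as aliases, filters, and projections that a real planner would have to take into account.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class PlanNode:
    """Hypothetical, simplified plan node (not a Pinot or Calcite class)."""
    op: str              # operator label, e.g. "Scan T2" or "JOIN 1"
    inputs: tuple = ()   # child PlanNode instances


def find_repeated_subtrees(root):
    """Count structurally equal subtrees and return those seen more than once."""
    occurrences = defaultdict(int)

    def visit(node):
        # Frozen dataclasses hash and compare by value, so equal subtrees collide.
        occurrences[node] += 1
        for child in node.inputs:
            visit(child)

    visit(root)
    return {node: count for node, count in occurrences.items() if count > 1}


# Plan from the example above: each join reads T2 through its own scan.
scan_t1 = PlanNode("Scan T1")
join2 = PlanNode("JOIN 2", (scan_t1, PlanNode("Scan T2")))
join1 = PlanNode("JOIN 1", (join2, PlanNode("Scan T2")))

repeated = find_repeated_subtrees(join1)
for node, count in repeated.items():
    print(f"{node.op} appears {count} times")
```

Here only `Scan T2` is reported, which is the subtree the second plan shares between both joins.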

Design

The design document can be found here. This document includes some issues we can expect and how we could resolve them.

Current proposal

The proposal treats the stage as the unit of equivalence. We are not going to look for equivalent individual expressions in the tree, but for equivalent stages. If at least two stages are found to be equivalent, a new stage is created. This new stage matches the equivalent stages in every operator except the mailbox send operator. The equivalent stages are then replaced by new stages with the same id, simplified to a pair of receive and send mailboxes. The newly created stage broadcasts all rows to the simplified stages.
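The rewrite described in this paragraph can be sketched as follows. This is a minimal model under stated assumptions, not the actual stage replacer: `Stage`, `reuse_equivalent_stages`, and the string labels are hypothetical, equivalence is reduced to comparing the operators below the send, and the shared stage simply gets the next free id.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class Stage:
    """Hypothetical stage model: the operators below the send, plus the send."""
    stage_id: int
    body: tuple   # every operator except the mailbox send, e.g. ("Scan T2",)
    send: tuple   # ((distribution, target receiver), ...)


def reuse_equivalent_stages(stages):
    """Replace stages that differ only in their send by one shared stage."""
    groups = defaultdict(list)
    for stage in stages:
        groups[stage.body].append(stage)

    next_id = max(s.stage_id for s in stages) + 1
    result = []
    for body, group in groups.items():
        if len(group) < 2:
            result.extend(group)
            continue
        # The shared stage does the work once and broadcasts to each replaced stage.
        shared_send = tuple(("broadcast", f"Receive {s.stage_id}") for s in group)
        result.append(Stage(next_id, body, shared_send))
        next_id += 1
        # Each replaced stage keeps its id and its original send, but its body
        # shrinks to a single receive.
        for s in group:
            result.append(Stage(s.stage_id, (f"Receive {s.stage_id}",), s.send))
    return sorted(result, key=lambda s: s.stage_id)


stages = [
    Stage(4, ("Scan T1",), (("random", "Receive 3.1"),)),
    Stage(5, ("Scan T2",), (("random", "Receive 2.2"),)),
    Stage(6, ("Scan T2",), (("broadcast", "Receive 3.2"),)),
]
rewritten = reuse_equivalent_stages(stages)
for stage in rewritten:
    print(stage)
```

With this input, stages 5 and 6 become receive/send pairs that keep their original targets, and a new stage 7 scans T2 once and broadcasts to both. The numbering of the new stage is a choice of this sketch only.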

For example, this query:

flowchart RL
    R([Root])
    J2([JOIN 2])
    J1([JOIN 1])
    S1([Scan T1])
    S2([Scan T2])

    subgraph Stage 1
        R
        Receiver1([Receive 1])
        Receiver1 --> R
    end

    subgraph Stage 2
        J1
        Sender2([Send 2])
        Receiver21([Receive 2.1])
        Receiver22([Receive 2.2])
        J1 --> Sender2
        Sender2 --> Receiver1
        Receiver21 -- hashtable --> J1
        Receiver22 -- stream --> J1
    end

    subgraph Stage 3
        J2
        Sender3([Send 3])
        Receiver31([Receive 3.1])
        Receiver32([Receive 3.2])

        J2 --> Sender3
        Sender3 -- broadcast --> Receiver21
        Receiver31 -- stream --> J2
        Receiver32 -- hashtable --> J2
    end

    subgraph Stage 4
        S1
        Sender4([Send 4])
        Sender4 -- random --> Receiver31
        S1 --> Sender4
    end

    subgraph Stage 5
        S2 --> Sender5
        Sender5([Send 5])
        
        Sender5 -- random --> Receiver22
    end
    
    subgraph Stage 6
        T22([Scan T2])
        Sender6([Send 6])
        Sender6 -- broadcast --> Receiver32
        T22 --> Sender6
    end

Will be transformed into:

flowchart RL
    R([Root])
    J2([JOIN 2])
    J1([JOIN 1])
    S1([Scan T1])
    S2([Scan T2])

    subgraph Stage 1
        R
        Receiver1([Receive 1])
        Receiver1 --> R
    end

    subgraph Stage 2
        J1
        Sender2([Send 2])
        Receiver21([Receive 2.1])
        Receiver22([Receive 2.2])
        J1 --> Sender2
        Sender2 --> Receiver1
        Receiver21 -- hashtable --> J1
        Receiver22 -- stream --> J1
    end
    
    subgraph Stage 3
        J2
        Sender3([Send 3])
        Receiver31([Receive 3.1])
        Receiver32([Receive 3.2])

        J2 --> Sender3
        Sender3 -- broadcast --> Receiver21
        Receiver31 -- stream --> J2
        Receiver32 -- hashtable --> J2
    end

    subgraph Stage 4
        S1
        Sender4([Send 4])
        Sender4 -- random --> Receiver31
        S1 --> Sender4
    end

    subgraph Same Server

    subgraph Stage 6
        Receiver6([Receiver 6])
        Sender6([Send 6])

        Receiver6 --> Sender6
        Sender6 -- broadcast --> Receiver32
    end

    subgraph Stage 7
        Receiver7([Receiver 7])
        Sender7([Send 7])

        Receiver7 --> Sender7
        Sender7 -- random --> Receiver22
    end

    subgraph Stage 5
        Sender5([Send 5])
        
        S2 --> Sender5
        Sender5 -- broadcast in memory --> Receiver7
        Sender5 -- broadcast in memory --> Receiver6
    end
    end

Challenges

Blocking and buffering

Given the blocking nature of some of our operators, some queries may end up in a deadlock when this feature is used. The design document describes one of these cases. One way to deal with this problem is to buffer the data, probably off-heap or even on disk. Alternatively, we could just fail in these scenarios.
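To make the risk concrete, here is a small single-threaded simulation of one scenario that could block. It is an assumed scenario for illustration, not necessarily the one in the design document: a shared sender broadcasts every block to both the build side and the probe side of a blocking hash join, and the probe mailbox has a bounded capacity. The function name and parameters are hypothetical.

```python
from collections import deque


def run_shared_sender(num_blocks, probe_capacity):
    """Simulate one sender broadcasting blocks to a build and a probe mailbox.

    The consumer is a blocking hash join: it drains the build mailbox until
    end-of-stream before it reads anything from the probe mailbox.
    Returns "finished" or "deadlock". probe_capacity=None means unbounded.
    """
    build, probe = deque(), deque()
    sent = 0
    build_done = False
    while True:
        progressed = False
        # Sender: a block is broadcast only if the probe mailbox has room for it.
        if sent < num_blocks and (probe_capacity is None or len(probe) < probe_capacity):
            build.append(sent)
            probe.append(sent)
            sent += 1
            progressed = True
        if not build_done:
            # Build phase: consume build blocks until the sender has sent them all.
            if build:
                build.popleft()
                progressed = True
            elif sent == num_blocks:
                build_done = True
                progressed = True
        elif probe:
            # Probe phase: only starts once the build side is complete.
            probe.popleft()
            progressed = True
        elif sent == num_blocks:
            return "finished"
        if not progressed:
            return "deadlock"


print(run_shared_sender(num_blocks=4, probe_capacity=2))     # deadlock
print(run_shared_sender(num_blocks=4, probe_capacity=None))  # finished
```

With a bounded probe mailbox the sender stalls before the build side is complete, so neither side can make progress. Unbounded buffering (off-heap or on disk) lets the run finish, which is the trade-off described above.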

TODO list

@gortiz gortiz self-assigned this Oct 9, 2024
@gortiz gortiz added Design Review multi-stage Related to the multi-stage query engine labels Oct 9, 2024
@ankitsultana
Contributor

@gortiz : can you enable comment access on the doc?

@gortiz
Contributor Author

gortiz commented Oct 11, 2024

Let's try to trust the internet and open comments for everyone :D

@ankitsultana
Contributor

@gortiz : do you folks have some numbers which show the performance gains with this approach? And specifically which queries this will help?

I have seen CTE reuse in query engines like Presto, but the bottlenecks there would be very different than what we have in most of the Pinot workloads (at least speaking from an Uber point of view). I suspect this can help if you have tiered storage (esp. for the Startree tiered storage plugin), but are there any other cases?

@gortiz
Contributor Author

gortiz commented Oct 28, 2024

I've seen several cases where this feature would help in Pinot queries written by our customers. Remember that this is not limited to WITH expressions: it applies to any common expression in the relational algebra, and that includes intermediate stages. For example, there are cases where people use the same join twice in a query. If we were able to reuse these computations, we would divide the number of rows we need to read from tables, but also the number of workers and allocations we need in intermediate stages.

We don't have actual numbers we can share, but we have some real cases where an ideal reuse system would improve latency (and/or the number of workers used) linearly.

@gortiz
Contributor Author

gortiz commented Oct 28, 2024

BTW, in the document I also mention apache/hive#5249, which tries to apply the same in Hive

I'm not familiar enough with how materialized views are implemented in Hive and Presto, but I'm assuming these are batch processes that will end up adding milliseconds to the latency of the query. In our case we will try to reuse the current mailbox system and only buffer in cases like the one explained in the document. Even in that case, latency should not be affected much, given that the buffer will be consumed as a queue, so we should be able to apply different techniques to reduce the disk IO (if it is even needed).

@gortiz
Contributor Author

gortiz commented Nov 19, 2024

In #14495 the stage replacer is introduced. The next PR will probably include the changes in the worker protocol and operators that will let us actually apply the optimization

@siddharthteotia
Contributor

How far are we with this feature? I am interested in helping with reviews (while not slowing it down)

@gortiz
Contributor Author

gortiz commented Jan 16, 2025

It was finally implemented in #14507. #14672 has not been merged yet, but it is a cheap way to solve the theoretical blocking issue documented in the doc. We didn't find a case where that blocking issue happened, so we may not need it (or we can wait until we have a better way to store buffer blocks offline).

Spools/reuse CTE is still an experimental feature. Right now, it can be enabled with a hint, so it is safe to try everywhere. In the future, we also want to create a new join optimization that merges this feature and pipeline breaker to create something similar to Trino's dynamic filtering. At StarTree, we are still deciding what we will focus on during the following months, but this may be one of the features we want to implement. If this interests you, working together on this dynamic filtering feature would be great.

@ankitsultana
Contributor

Don't we already support dynamic filtering with broadcast? Or do you folks want it to be done automatically via a CBO? https://github.com/apache/pinot/blob/master/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java#L123

@gortiz
Contributor Author

gortiz commented Jan 16, 2025

Please let me know if I'm wrong, but the dynamic broadcast is currently limited to semi-joins. Am I right?

What we would like to do is to apply dynamic filtering even for inner-joins where the right relation is being used to project columns (as it is used in Trino). For example:

flowchart BT
    InnerJoin
    ExchangeLeft
    ExchangeRight

    ScanA
    ScanB

    ExtraB[...]

    ExchangeLeft --> InnerJoin
    ScanA --> ExchangeLeft
    ExchangeRight --> InnerJoin
    ExtraB --> ExchangeRight
    ScanB --> ExtraB


Will be transformed into something like:

flowchart BT
    InnerJoin
    ExchangeLeft
    ExchangeRight["ExchangeRight (spool)"]

    ScanA
    ScanB

    ExtraB[...]

    ExchangeLeft --> InnerJoin
    ExchangeRight --> InnerJoin
    ExtraB --> ExchangeRight
    ScanB --> ExtraB

    FilterIn --> ExchangeLeft
    ScanA --> FilterIn
    ExchangeRight -->|only keys and pipeline breaker| FilterIn
   

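The data flow in the second diagram can be sketched in a few lines. This is only an in-memory illustration of the idea, assuming rows are plain dictionaries; the function and column names are hypothetical, and nothing here reflects how Pinot or Trino implement it.

```python
def inner_join_with_dynamic_filter(left_rows, right_rows, left_key, right_key):
    """Inner join that prunes the left side with the keys seen on the right side."""
    # Pipeline breaker: the right (build) side is fully materialized first.
    build = {}
    for row in right_rows:
        build.setdefault(row[right_key], []).append(row)

    # "FilterIn": only the keys are needed to drop left rows before the exchange.
    keys = set(build)
    filtered_left = [row for row in left_rows if row[left_key] in keys]

    # The join still uses the full right rows, so their columns can be projected.
    joined = [
        {**left, **right}
        for left in filtered_left
        for right in build[left[left_key]]
    ]
    return joined, len(filtered_left)


scan_a = [
    {"a_id": 1, "a_val": "x"},
    {"a_id": 2, "a_val": "y"},
    {"a_id": 3, "a_val": "z"},
]
scan_b = [
    {"b_id": 2, "b_val": "p"},
    {"b_id": 3, "b_val": "q"},
    {"b_id": 3, "b_val": "r"},
]

joined, shipped = inner_join_with_dynamic_filter(scan_a, scan_b, "a_id", "b_id")
print(f"left rows sent through the exchange: {shipped} of {len(scan_a)}")
for row in joined:
    print(row)
```

The right side is read once and used twice, as the key set for the filter and as the build side of the join, which is where the spool comes in.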
