Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[spool] Define equivalence between stages #14296

Merged
merged 14 commits into from
Nov 18, 2024
Merged

[spool] Define equivalence between stages #14296

merged 14 commits into from
Nov 18, 2024

Conversation

gortiz
Copy link
Contributor

@gortiz gortiz commented Oct 24, 2024

This PR includes the code that defines when two stages are equivalent and it is the first step to implement #14196.

Instead of implementing the ability to reuse common expressions in a single and very large PR, I decided to create this first one where:

  • The new code is not being called by production code (yet)
  • Classes are well tested and documented

My hope is that this will be easier to review.

@gortiz
Copy link
Contributor Author

gortiz commented Oct 24, 2024

cc @bziobrowski @vrajat

@codecov-commenter
Copy link

codecov-commenter commented Oct 24, 2024

Codecov Report

Attention: Patch coverage is 44.60784% with 113 lines in your changes missing coverage. Please review.

Project coverage is 63.75%. Comparing base (59551e4) to head (7758934).
Report is 1307 commits behind head on master.

Files with missing lines Patch % Lines
.../query/planner/logical/EquivalentStagesFinder.java 40.17% 52 Missing and 18 partials ⚠️
.../pinot/query/planner/plannode/PlanNodeVisitor.java 40.00% 20 Missing and 1 partial ⚠️
...che/pinot/query/planner/logical/GroupedStages.java 75.00% 7 Missing and 3 partials ⚠️
...anner/logical/ParentToChildrenStageCalculator.java 0.00% 8 Missing ⚠️
.../apache/pinot/query/planner/plannode/PlanNode.java 0.00% 3 Missing ⚠️
.../pinot/query/planner/plannode/MailboxSendNode.java 0.00% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #14296      +/-   ##
============================================
+ Coverage     61.75%   63.75%   +2.00%     
- Complexity      207     1569    +1362     
============================================
  Files          2436     2665     +229     
  Lines        133233   146223   +12990     
  Branches      20636    22432    +1796     
============================================
+ Hits          82274    93221   +10947     
- Misses        44911    46087    +1176     
- Partials       6048     6915     +867     
Flag Coverage Δ
custom-integration1 100.00% <ø> (+99.99%) ⬆️
integration 100.00% <ø> (+99.99%) ⬆️
integration1 100.00% <ø> (+99.99%) ⬆️
integration2 0.00% <ø> (ø)
java-11 63.71% <44.60%> (+2.00%) ⬆️
java-21 63.64% <44.60%> (+2.01%) ⬆️
skip-bytebuffers-false 63.74% <44.60%> (+1.99%) ⬆️
skip-bytebuffers-true 63.59% <44.60%> (+35.87%) ⬆️
temurin 63.75% <44.60%> (+2.00%) ⬆️
unittests 63.74% <44.60%> (+2.00%) ⬆️
unittests1 55.51% <44.60%> (+8.62%) ⬆️
unittests2 34.09% <0.00%> (+6.36%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

import static org.testng.Assert.*;


public class EquivalentStagesFinderTest extends StagesTestBase {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confused by the these tests. Are the tests dependent on each other ? Every test adds another stage to _stages ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is explained in StagesTestBase. Basically each test starts from an clean environment where these _stageRoots is empty. That way each test is self-contained and easier to read.

Suggestion: We can rename plan() method to when(), which is a more common name in Given-When-Then tests. Do you think that would make it easier to read?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think plan() is better? It is worth the change?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get the logic now. when() is fine. What confused me initially is that I missed _stageRoots.clear() call after every test.

@@ -63,4 +63,150 @@ public interface PlanNodeVisitor<T, C> {
T visitExchange(ExchangeNode exchangeNode, C context);

T visitExplained(ExplainedNode node, C context);

/**
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you choose to add this class to the same file ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it is closely related to this visitor. Also I don't need to call it PlanNodeDepthFirstVisitor given it is already contained inside PlanNodeVisitor. But I'm open to move as a regular class if it is preferred.

* same nodes multiple times. The side effect of that is that the second argument for {@link #areEquivalent} must be
* a node that was already visited.
*/
private class NodeEquivalence implements PlanNodeVisitor<Boolean, PlanNode> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to step through patch to understand. Where is the return value (Boolean) checked ?
Also when are methods like visitAggregate called as I cannot find visitChildren calls in the NodeEquivalence class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is the return value (Boolean) checked ?

The entry point here is areEquivalent. That method returns the Boolean returned by each visit method. And areEquivalent is being called by Visitor in its visitMailboxSend.

Also when are methods like visitAggregate called as I cannot find visitChildren calls in the NodeEquivalence class.

There is no visitChildren in this method. NodeEquivalence is a normal visitor, not a DepthFirstVisitor. Recursion is done in baseNode, which is called by all visit specific methods.

Copy link
Collaborator

@yashmayya yashmayya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Posting a partial review, I'm yet to take a look at the tests.

return group;
}

public Mutable removeStage(MailboxSendNode stage) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When would we need to remove a stage from the set of grouped stages?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the prune phase I was talking about before, although I'm not sure if that will be done in a copy fashion or actually modifying the object.

Basically in the next phase we are going to need to transforma a GroupedStages into a simplified expression tree where for each equivalent group G we create a new synthetic stage that send data to each stage g that belongs to G. But imagine a case like:

flowchart LR
    root --> S1
    S1 --> S2
    S1 --> S5
    S2 --> S3[S3 Read A]
    S2 --> S4[S4 Read B]
    S5 --> S6[S6 Read A]
    S5 --> S7[S7 Read B]
Loading

Where:

  • Stage 3 is equivalent to Stage 6
  • Stage 4 is equivalent to Stage 7
  • Stage 2 is equivalent to Stage 5

We wouldn't want to create synthetic classes for each of these equivalence groups because in the end all (S3, S6) and (S4, S7) could be pruned once (S2, S5) creates its own equivalence.

The final plan should be something like:

flowchart LR
    root --> S1
    S1 --> S2
    S1 --> S5
    S2 --> N1
    S5 --> N1

    N1 --> N2[N2 Read A]
    N1 --> N3[N2 Read B]
Loading

Where Ns are the new synthetic stages.

One way to implement this is to remove stages that will be pruned it its parent is equivalent. In our case S3, S4, S6 and S7 would be pruned.

I'm not 100% sure if this method will be called in the final, but at least in my first draft it is useful

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If S2 and S5 are equivalent, why will both of them be retained in the final plan?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. We need to keep them, but we also modify them. It is explained in the design document, but basically there are cases where we need to keep them. For example imagine that the sender node in S2 (the one that sends to S1) uses hash distribution while the sender of S5 sends using random. We could consider them not equivalent but it is better to just keep both stages, schedule them in a way they are running in the same servers than N1 and then apply each distribution.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't they still be replaced by new stages? The new ones will retain the sender nodes from the original but the rest of the stage will be replaced by a single receive node which gets data from the synthetic stage that contains all the other nodes (apart from the sender) from the equivalent stages right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, S2 and S5 will change internally so we could consider them new as well. It helps me to think that they are not new but they have been modified, but this is just a concept level, not programatically (they will be different objects)

Copy link
Collaborator

@yashmayya yashmayya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the new test framework for query plan stages @gortiz! The tests using the framework are so pleasant to read 😄

return group;
}

public Mutable removeStage(MailboxSendNode stage) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If S2 and S5 are equivalent, why will both of them be retained in the final plan?

Comment on lines +61 to +71
when(
join(
exchange(1, tableScan("T1")),
exchange(2, tableScan("T2"))
)
);
GroupedStages.Mutable mutable = new GroupedStages.Mutable()
.addNewGroup(stage(0))
.addNewGroup(stage(1))
.addToGroup(stage(0), stage(2));
assertEquals(mutable.toString(), "[[0, 2], [1]]");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tests in these classes probably don't need to use the StagesTestBase framework right? For instance, here it's pretty confusing because stage 0 and stage 2 are not actually equivalent?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't follow you. StagesTestBase is just an API to create these nodes. It doesn't care about correctness. Same here. GroupedStagesTest doesn't care about what being equivalent mean. It is just a matemathical partition of a set of nodes. How nodes are partition is defined in EquivalentStagesFinder.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that but my point was that the test looks misleading because we're asserting that the send node above the join node is equivalent to the send node above one of the join node's input table scan nodes. Since we're only testing GroupedStages here and we don't care about the definition of equivalence like you said, wouldn't it be clearer to use synthetic MailboxSendNodes here (mocks or otherwise)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we have a different perspective. This tests is not testing the semantic of the equivalence. It is just testing the API of GroupedStages. In here you can do crazy things that doesn't make any sense. We are just testing that the API works in the same way we can create a test on a java.util.list that verifies that if we have a list of [1, 2] and we call add(1, 3) the result will be [1, 3, 2]. Whether that list is later being used to store ints order by their value or not is not important for the list and the same happens in this GroupedStages class.

* {@link #tableScan(String)} and others to chain instances of this class.
*/
public interface ChildBuilder<P extends PlanNode> {
P build(int stageId, @Nullable DataSchema dataSchema, @Nullable PlanNode.NodeHint hints);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't currently being used anywhere directly? All external uses seem to be through SimpleChildBuilder?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it is not used externally and probably should never be. But we need it to create the machinary that runs withDataSchema and withHints

@yashmayya
Copy link
Collaborator

I think this branch needs to be rebased / merged with master, all the builds are currently failing (at least one reason seems to be conflict with #14294).

return group;
}

public Mutable removeStage(MailboxSendNode stage) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't they still be replaced by new stages? The new ones will retain the sender nodes from the original but the rest of the stage will be replaced by a single receive node which gets data from the synthetic stage that contains all the other nodes (apart from the sender) from the equivalent stages right?

Copy link
Collaborator

@yashmayya yashmayya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, I just have one optional suggestion on GroupedStagesTest.

Comment on lines +61 to +71
when(
join(
exchange(1, tableScan("T1")),
exchange(2, tableScan("T2"))
)
);
GroupedStages.Mutable mutable = new GroupedStages.Mutable()
.addNewGroup(stage(0))
.addNewGroup(stage(1))
.addToGroup(stage(0), stage(2));
assertEquals(mutable.toString(), "[[0, 2], [1]]");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that but my point was that the test looks misleading because we're asserting that the send node above the join node is equivalent to the send node above one of the join node's input table scan nodes. Since we're only testing GroupedStages here and we don't care about the definition of equivalence like you said, wouldn't it be clearer to use synthetic MailboxSendNodes here (mocks or otherwise)?

)
);
GroupedStages result = EquivalentStagesFinder.findEquivalentStages(stage(0));
assertEquals(result.toString(), "[[0], [1, 2]]");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this notation mean that 1, 2 are equivalent and therefore grouped together ? In the previous test, [[0], [1], [2]] implies that there no equivalent stages ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly (to both questions). And each group is sorted by their first element (that is why [0] is shown before [1, 2], because 0 < 1).

I hope tests are easy to read once that is clear.

@Jackie-Jiang Jackie-Jiang added feature multi-stage Related to the multi-stage query engine labels Nov 16, 2024
@gortiz gortiz merged commit f61f47b into apache:master Nov 18, 2024
21 checks passed
davecromberge pushed a commit to davecromberge/pinot that referenced this pull request Nov 22, 2024
Add some classes to calculate stage equivalence
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature multi-stage Related to the multi-stage query engine
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants