
Support local replicated join and local exchange parallelism #14893

Open · wants to merge 1 commit into master
Conversation

Jackie-Jiang (Contributor) commented Jan 22, 2025

Related to #14518

Added a new table hint:

  • is_replicated (boolean)

Support local replicated join by configuring both sides with local distribution, and also hinting the right table as replicated:

SELECT /*+ joinOptions(left_distribution_type = 'local', right_distribution_type = 'local') */ a.col1, b.col2
FROM a
JOIN b /*+ tableOptions(is_replicated='true') */ ON a.col1 = b.col1

Also support parallelism for local exchange, to increase the parallelism of the intermediate stage, via the table hint partition_parallelism.
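For illustration, the two hints can be combined; a sketch (the table names a and b and the parallelism factor 4 are illustrative, and partition_parallelism follows the same tableOptions syntax as is_replicated above):

SELECT /*+ joinOptions(left_distribution_type = 'local', right_distribution_type = 'local') */ a.col1, b.col2
FROM a /*+ tableOptions(partition_parallelism='4') */
JOIN b /*+ tableOptions(is_replicated='true') */ ON a.col1 = b.col1

The intent is that each server joins its local partitions of a against its local replica of b, with the local exchange fanning data out to multiple intermediate operators on the same server.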

@Jackie-Jiang added labels on Jan 22, 2025: feature, documentation, release-notes (referenced by PRs that need attention when compiling the next release notes), multi-stage (related to the multi-stage query engine).
codecov-commenter commented Jan 22, 2025

Codecov Report

Attention: Patch coverage is 80.25210% with 47 lines in your changes missing coverage. Please review.

Project coverage is 63.73%. Comparing base (59551e4) to head (f5095e3).
Report is 1621 commits behind head on master.

Files with missing lines | Patch % | Lines
.../org/apache/pinot/query/routing/WorkerManager.java | 76.05% | 23 Missing and 11 partials ⚠️
...che/pinot/broker/routing/BrokerRoutingManager.java | 0.00% | 9 Missing ⚠️
...ery/planner/physical/MailboxAssignmentVisitor.java | 93.22% | 0 Missing and 4 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #14893      +/-   ##
============================================
+ Coverage     61.75%   63.73%   +1.98%     
- Complexity      207     1471    +1264     
============================================
  Files          2436     2708     +272     
  Lines        133233   151631   +18398     
  Branches      20636    23423    +2787     
============================================
+ Hits          82274    96638   +14364     
- Misses        44911    47724    +2813     
- Partials       6048     7269    +1221     
Flag | Coverage Δ
custom-integration1 | 100.00% <ø> (+99.99%) ⬆️
integration | 100.00% <ø> (+99.99%) ⬆️
integration1 | 100.00% <ø> (+99.99%) ⬆️
integration2 | 0.00% <ø> (ø)
java-11 | 63.70% <80.25%> (+1.99%) ⬆️
java-21 | 63.61% <80.25%> (+1.98%) ⬆️
skip-bytebuffers-false | 63.71% <80.25%> (+1.97%) ⬆️
skip-bytebuffers-true | 63.59% <80.25%> (+35.86%) ⬆️
temurin | 63.73% <80.25%> (+1.98%) ⬆️
unittests | 63.72% <80.25%> (+1.98%) ⬆️
unittests1 | 56.30% <83.40%> (+9.41%) ⬆️
unittests2 | 34.00% <0.00%> (+6.27%) ⬆️

Flags with carried forward coverage won't be shown.



List<String> getSegments(BrokerRequest brokerRequest) {
  Set<String> selectedSegments = _segmentSelector.select(brokerRequest);
  if (!selectedSegments.isEmpty()) {
Contributor

nit: isn't this if a bit redundant?

Contributor Author (Jackie-Jiang)

We want to short-circuit it. This is the same as calculateRouting().

@@ -55,6 +56,12 @@ public interface RoutingManager {
  @Nullable
  RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId);

  /**
   * Returns the segments that are relevant for the given broker request.
Contributor

Let's specify here what null means.

Contributor Author (Jackie-Jiang)

Done

@gortiz (Contributor) left a comment

I would need more time to review the code and, ideally, some explanation of the decisions you made here. The changes look to me more complex than I would have expected. We are deviating from the standard Calcite semantics here (i.e., with singleton + parallelism), and I'm not sure why we need to do that.

What I would expect in this situation is that the join node uses the broadcast distribution for its right side (meaning that each incarnation of the join will see all the data). The main difference with the regular broadcast is that instead of picking one server per segment and broadcasting from them, we pick all servers that will execute the left side and read from them, sending the information to its own node.


private PinotLogicalExchange(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, RelDistribution distribution,
-    PinotRelExchangeType exchangeType) {
+    PinotRelExchangeType exchangeType, List<Integer> keys) {
Contributor

In which cases will the keys be different from distribution.getKeys()? What is, in fact, the meaning of having keys = {X, Y, Z} with a distribution like random that doesn't support keys? Wouldn't it be better to change the distribution value depending on the keys? If we want to use a distribution + keys combination that is not permitted by Calcite, we can create our own implementation of RelDistribution.

Contributor Author (Jackie-Jiang)

Let me put more comments explaining this. We use SINGLETON to represent local exchange, but we also want to support parallelism for local exchange, where keys are needed. We can revisit this as we add more custom distribution types.

Comment on lines +105 to +106
// NOTE: We use SINGLETON to represent local distribution. Add keys to the exchange because we might want to
// switch it to HASH distribution to increase parallelism. See MailboxAssignmentVisitor for details.
Contributor

Reading RelDistribution.Type, shouldn't this be broadcast? The definition of broadcast is:

There are multiple instances of the stream, and all records appear in each instance

While the definition of singleton is:

There is only one instance of the stream. It sees all records.

Contributor

BTW, I don't get why we set DistributionType as SINGLETON in cases where we want to use HASH.

Contributor Author (Jackie-Jiang)

This is not broadcast because we don't want to send data to other servers. This is not strictly SINGLETON if we want to add parallelism to local exchange (split one block into multiple and spread them into multiple operators). If there is no extra parallelism (1-to-1 distribution), then it is SINGLETON.

@@ -42,7 +40,7 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
  public Void process(PlanNode node, DispatchablePlanContext context) {
    if (node instanceof MailboxSendNode) {
      MailboxSendNode sendNode = (MailboxSendNode) node;
-     int senderStageId = sendNode.getStageId();
+     Integer senderStageId = sendNode.getStageId();
Contributor

Why Integer? BaseNode.getStageId() always returns an int, right?

Contributor Author (Jackie-Jiang)

Correct, but using Integer can avoid a lot of boxing. I changed this to Integer to align with receiverStageId.

Jackie-Jiang (Contributor Author) replied, quoting @gortiz:

> I would need more time to review the code and, ideally, some explanation of the decisions you made here. The changes look to me more complex than I would have expected. We are deviating from the standard Calcite semantics here (i.e., with singleton + parallelism), and I'm not sure why we need to do that.
>
> What I would expect in this situation is that the join node uses the broadcast distribution for its right side (meaning that each incarnation of the join will see all the data). The main difference with the regular broadcast is that instead of picking one server per segment and broadcasting from them, we pick all servers that will execute the left side and read from them, sending the information to its own node.

Broadcast is supported in #14797, but there could still be data shuffling.
With this PR, we can completely eliminate data shuffling, and the right table is always served from the same server.
Regarding singleton + parallelism: this is needed to increase the parallelism of the intermediate stage. If we do singleton (1-to-1 exchange), there will be the same number of intermediate operators as leaf operators, which is not good enough in a lot of cases. We usually run only one leaf operator per server, but we want to run more intermediate operators to fully utilize the CPU.
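As a concrete illustration of the arithmetic (the numbers are made up): with 8 servers each running one leaf operator, a plain singleton (1-to-1) local exchange produces 8 intermediate operators in total, whereas a local exchange with partition_parallelism = 4 produces 32, still without shuffling any data across servers.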

@Jackie-Jiang requested a review from xiangfu0 on January 24, 2025.