diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java index a52d3038b50..36684249391 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java @@ -22,6 +22,7 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.hint.RelHint; +import org.apache.pinot.query.planner.plannode.JoinNode; /** @@ -114,21 +115,27 @@ public static String getJoinStrategyHint(Join join) { PinotHintOptions.JoinHintOptions.JOIN_STRATEGY); } - public static boolean useLookupJoinStrategy(@Nullable String joinStrategyHint) { - return LOOKUP_JOIN_STRATEGY.equalsIgnoreCase(joinStrategyHint); + @Nullable + public static JoinNode.JoinStrategy getJoinStrategy(Join join) { + String joinStrategyHint = getJoinStrategyHint(join); + if (joinStrategyHint == null) { + return null; + } + if (HASH_JOIN_STRATEGY.equalsIgnoreCase(joinStrategyHint)) { + return JoinNode.JoinStrategy.HASH; + } + if (LOOKUP_JOIN_STRATEGY.equalsIgnoreCase(joinStrategyHint)) { + return JoinNode.JoinStrategy.LOOKUP; + } + if (BROADCAST_JOIN_STRATEGY.equalsIgnoreCase(joinStrategyHint)) { + return JoinNode.JoinStrategy.BROADCAST; + } + throw new IllegalArgumentException("Unsupported join strategy hint: " + joinStrategyHint); } // TODO: Consider adding a Join implementation with join strategy. public static boolean useLookupJoinStrategy(Join join) { - return useLookupJoinStrategy(getJoinStrategyHint(join)); - } - - public static boolean useBroadcastJoinStrategy(@Nullable String joinStrategyHint) { - return BROADCAST_JOIN_STRATEGY.equalsIgnoreCase(joinStrategyHint); - } - - public static boolean useBroadcastJoinStrategy(Join join) { - return useBroadcastJoinStrategy(getJoinStrategyHint(join)); + return LOOKUP_JOIN_STRATEGY.equalsIgnoreCase(getJoinStrategyHint(join)); } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java index e331e5ba308..9664fb988b1 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.calcite.rel.rules; +import com.google.common.base.Preconditions; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.rel.RelDistributions; @@ -27,6 +28,7 @@ import org.apache.calcite.tools.RelBuilderFactory; import org.apache.pinot.calcite.rel.hint.PinotHintOptions; import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange; +import org.apache.pinot.query.planner.plannode.JoinNode; /** @@ -54,19 +56,11 @@ public void onMatch(RelOptRuleCall call) { JoinInfo joinInfo = join.analyzeCondition(); RelNode newLeft; RelNode newRight; - String joinStrategyHint = PinotHintOptions.JoinHintOptions.getJoinStrategyHint(join); - if (PinotHintOptions.JoinHintOptions.useLookupJoinStrategy(joinStrategyHint)) { - // Lookup join - add local exchange on the left side - newLeft = PinotLogicalExchange.create(left, RelDistributions.SINGLETON); - newRight = right; - } else if (PinotHintOptions.JoinHintOptions.useBroadcastJoinStrategy(joinStrategyHint)) { - // Broadcast join - add local exchange on the left side, broadcast exchange on the right side - newLeft = PinotLogicalExchange.create(left, RelDistributions.SINGLETON); - newRight = PinotLogicalExchange.create(right, RelDistributions.BROADCAST_DISTRIBUTED); - } else { - // Regular join - add exchange on both sides + JoinNode.JoinStrategy joinStrategy = PinotHintOptions.JoinHintOptions.getJoinStrategy(join); + if (joinStrategy == null) { + // Default join - add exchange on both sides if (joinInfo.leftKeys.isEmpty()) { - // Broadcast the right side if there is no join key + // Randomly distribute the left side, broadcast the right side if there is no join key newLeft = PinotLogicalExchange.create(left, RelDistributions.RANDOM_DISTRIBUTED); newRight = PinotLogicalExchange.create(right, RelDistributions.BROADCAST_DISTRIBUTED); } else { @@ -74,6 +68,28 @@ public void onMatch(RelOptRuleCall call) { newLeft = PinotLogicalExchange.create(left, RelDistributions.hash(joinInfo.leftKeys)); newRight = PinotLogicalExchange.create(right, RelDistributions.hash(joinInfo.rightKeys)); } + } else { + switch (joinStrategy) { + case HASH: + // Hash join - use hash exchange on both sides + Preconditions.checkState(!joinInfo.leftKeys.isEmpty(), "Hash join requires join keys"); + newLeft = PinotLogicalExchange.create(left, RelDistributions.hash(joinInfo.leftKeys)); + newRight = PinotLogicalExchange.create(right, RelDistributions.hash(joinInfo.rightKeys)); + break; + case LOOKUP: + // Lookup join - use local exchange on the left side, no exchange on the right side + Preconditions.checkState(!joinInfo.leftKeys.isEmpty(), "Lookup join requires join keys"); + newLeft = PinotLogicalExchange.create(left, RelDistributions.SINGLETON); + newRight = right; + break; + case BROADCAST: + // Broadcast join - use local exchange on the left side, broadcast exchange on the right side + newLeft = PinotLogicalExchange.create(left, RelDistributions.SINGLETON); + newRight = PinotLogicalExchange.create(right, RelDistributions.BROADCAST_DISTRIBUTED); + break; + default: + throw new IllegalStateException("Unsupported join strategy: " + joinStrategy); + } } // TODO: Consider creating different JOIN Rel for each join strategy diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java index 694cf5d21a1..b18cdea7645 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java @@ -312,13 +312,11 @@ private JoinNode convertLogicalJoin(LogicalJoin join) { } // Check if the join hint specifies the join strategy - JoinNode.JoinStrategy joinStrategy; - String joinStrategyHint = PinotHintOptions.JoinHintOptions.getJoinStrategyHint(join); - if (PinotHintOptions.JoinHintOptions.useLookupJoinStrategy(joinStrategyHint)) { - joinStrategy = JoinNode.JoinStrategy.LOOKUP; - - // Run some validations for lookup join - Preconditions.checkArgument(!joinInfo.leftKeys.isEmpty(), "Lookup join requires join keys"); + JoinNode.JoinStrategy joinStrategy = PinotHintOptions.JoinHintOptions.getJoinStrategy(join); + if (joinStrategy == null) { + joinStrategy = JoinNode.JoinStrategy.HASH; + } else if (joinStrategy == JoinNode.JoinStrategy.LOOKUP) { + // Run some validations for lookup join: // Right table should be a dimension table, and the right input should be an identifier only ProjectNode over // TableScanNode. RelNode rightInput = PinotRuleUtils.unboxRel(join.getRight()); @@ -334,11 +332,6 @@ private JoinNode convertLogicalJoin(LogicalJoin join) { Preconditions.checkState(projectInput instanceof TableScan, "Right input for lookup join must be a Project over TableScan, got Project over: %s", projectInput.getClass().getSimpleName()); - } else if (PinotHintOptions.JoinHintOptions.useBroadcastJoinStrategy(joinStrategyHint)) { - joinStrategy = JoinNode.JoinStrategy.BROADCAST; - } else { - // TODO: Consider adding DYNAMIC_BROADCAST as a separate join strategy - joinStrategy = JoinNode.JoinStrategy.HASH; } return new JoinNode(DEFAULT_STAGE_ID, dataSchema, NodeHint.fromRelHints(join.getHints()), inputs, joinType, diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java index 5d55e43b4f2..c415528d137 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java @@ -101,6 +101,7 @@ public int hashCode() { return Objects.hash(super.hashCode(), _joinType, _leftKeys, _rightKeys, _nonEquiConditions, _joinStrategy); } + // TODO: Consider adding DYNAMIC_BROADCAST as a separate join strategy public enum JoinStrategy { // HASH is the default equi-join strategy, where both left and right tables are hash partitioned on join keys, then // shuffled to the same worker to perform the join. diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java index 0218282e21d..7556993876a 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java @@ -97,8 +97,8 @@ private void assignWorkersToNonRootFragment(PlanFragment fragment, DispatchableP Map metadataMap = context.getDispatchablePlanMetadataMap(); DispatchablePlanMetadata metadata = metadataMap.get(fragment.getFragmentId()); boolean leafPlan = isLeafPlan(metadata); - Integer childIdWithLocalExchange = findLocalExchange(children); - if (childIdWithLocalExchange != null) { + int childIdWithLocalExchange = findLocalExchange(children); + if (childIdWithLocalExchange >= 0) { // If there is a local exchange (child with SINGLETON distribution), use the same worker assignment to avoid // shuffling data. // TODO: @@ -125,11 +125,9 @@ private void assignWorkersToNonRootFragment(PlanFragment fragment, DispatchableP } /** - * Returns the index of the child fragment that has a local exchange (SINGLETON distribution), or {@code null} if none - * exists. + * Returns the index of the child fragment that has a local exchange (SINGLETON distribution), or -1 if none exists. */ - @Nullable - private Integer findLocalExchange(List children) { + private int findLocalExchange(List children) { int numChildren = children.size(); for (int i = 0; i < numChildren; i++) { PlanNode childPlanNode = children.get(i).getFragmentRoot(); @@ -138,7 +136,7 @@ private Integer findLocalExchange(List children) { return i; } } - return null; + return -1; } private static boolean isLeafPlan(DispatchablePlanMetadata metadata) { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java index 98a35b1c89f..e0f62907cac 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java @@ -141,7 +141,6 @@ public ObjectNode visitJoin(JoinNode node, Void context) { if (node.getJoinStrategy() == JoinNode.JoinStrategy.LOOKUP) { return recursiveCase(node, MultiStageOperator.Type.LOOKUP_JOIN); } else { - // TODO: Consider renaming this operator type. It handles multiple join strategies. return recursiveCase(node, MultiStageOperator.Type.HASH_JOIN); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java index 37664ebeea6..e546df42296 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java @@ -181,7 +181,6 @@ public MultiStageOperator visitJoin(JoinNode node, OpChainExecutionContext conte if (node.getJoinStrategy() == JoinNode.JoinStrategy.LOOKUP) { return new LookupJoinOperator(context, leftOperator, rightOperator, node); } else { - // TODO: Consider renaming this operator. It handles multiple join strategies. return new HashJoinOperator(context, leftOperator, left.getDataSchema(), rightOperator, node); } }