Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang committed Jan 15, 2025
1 parent bd0befc commit 01309b4
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.pinot.query.planner.plannode.JoinNode;


/**
Expand Down Expand Up @@ -114,21 +115,30 @@ public static String getJoinStrategyHint(Join join) {
PinotHintOptions.JoinHintOptions.JOIN_STRATEGY);
}

public static boolean useLookupJoinStrategy(@Nullable String joinStrategyHint) {
return LOOKUP_JOIN_STRATEGY.equalsIgnoreCase(joinStrategyHint);
/**
 * Resolves the join strategy requested through the join strategy hint of the given {@link Join}.
 *
 * <p>The hint value is read via {@code getJoinStrategyHint(join)} and compared case-insensitively against the hash,
 * lookup and broadcast strategy names.
 *
 * @param join the join whose strategy hint is inspected
 * @return the matching {@link JoinNode.JoinStrategy}, or {@code null} when no strategy hint is present or the hint
 *         value is not one of the three names checked here
 */
@Nullable
public static JoinNode.JoinStrategy getJoinStrategy(Join join) {
  String hint = getJoinStrategyHint(join);
  JoinNode.JoinStrategy strategy;
  if (hint == null) {
    // No join strategy hint on this join
    strategy = null;
  } else if (HASH_JOIN_STRATEGY.equalsIgnoreCase(hint)) {
    strategy = JoinNode.JoinStrategy.HASH;
  } else if (LOOKUP_JOIN_STRATEGY.equalsIgnoreCase(hint)) {
    strategy = JoinNode.JoinStrategy.LOOKUP;
  } else if (BROADCAST_JOIN_STRATEGY.equalsIgnoreCase(hint)) {
    strategy = JoinNode.JoinStrategy.BROADCAST;
  } else {
    // NOTE: Falling back to null instead of throwing exception here because "dynamic_broadcast" is also a valid
    //       join strategy hint
    // TODO: Consider adding DYNAMIC_BROADCAST as a separate join strategy
    strategy = null;
  }
  return strategy;
}

// TODO: Consider adding a Join implementation with join strategy.
public static boolean useLookupJoinStrategy(Join join) {
return useLookupJoinStrategy(getJoinStrategyHint(join));
}

public static boolean useBroadcastJoinStrategy(@Nullable String joinStrategyHint) {
return BROADCAST_JOIN_STRATEGY.equalsIgnoreCase(joinStrategyHint);
}

public static boolean useBroadcastJoinStrategy(Join join) {
return useBroadcastJoinStrategy(getJoinStrategyHint(join));
return LOOKUP_JOIN_STRATEGY.equalsIgnoreCase(getJoinStrategyHint(join));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.calcite.rel.rules;

import com.google.common.base.Preconditions;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelDistributions;
Expand All @@ -27,6 +28,7 @@
import org.apache.calcite.tools.RelBuilderFactory;
import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange;
import org.apache.pinot.query.planner.plannode.JoinNode;


/**
Expand Down Expand Up @@ -54,26 +56,40 @@ public void onMatch(RelOptRuleCall call) {
JoinInfo joinInfo = join.analyzeCondition();
RelNode newLeft;
RelNode newRight;
String joinStrategyHint = PinotHintOptions.JoinHintOptions.getJoinStrategyHint(join);
if (PinotHintOptions.JoinHintOptions.useLookupJoinStrategy(joinStrategyHint)) {
// Lookup join - add local exchange on the left side
newLeft = PinotLogicalExchange.create(left, RelDistributions.SINGLETON);
newRight = right;
} else if (PinotHintOptions.JoinHintOptions.useBroadcastJoinStrategy(joinStrategyHint)) {
// Broadcast join - add local exchange on the left side, broadcast exchange on the right side
newLeft = PinotLogicalExchange.create(left, RelDistributions.SINGLETON);
newRight = PinotLogicalExchange.create(right, RelDistributions.BROADCAST_DISTRIBUTED);
} else {
// Regular join - add exchange on both sides
JoinNode.JoinStrategy joinStrategy = PinotHintOptions.JoinHintOptions.getJoinStrategy(join);
if (joinStrategy == null) {
// Default join - add exchange on both sides
if (joinInfo.leftKeys.isEmpty()) {
// Broadcast the right side if there is no join key
// Randomly distribute the left side, broadcast the right side if there is no join key
newLeft = PinotLogicalExchange.create(left, RelDistributions.RANDOM_DISTRIBUTED);
newRight = PinotLogicalExchange.create(right, RelDistributions.BROADCAST_DISTRIBUTED);
} else {
// Use hash exchange when there are join keys
newLeft = PinotLogicalExchange.create(left, RelDistributions.hash(joinInfo.leftKeys));
newRight = PinotLogicalExchange.create(right, RelDistributions.hash(joinInfo.rightKeys));
}
} else {
switch (joinStrategy) {
case HASH:
// Hash join - use hash exchange on both sides
Preconditions.checkState(!joinInfo.leftKeys.isEmpty(), "Hash join requires join keys");
newLeft = PinotLogicalExchange.create(left, RelDistributions.hash(joinInfo.leftKeys));
newRight = PinotLogicalExchange.create(right, RelDistributions.hash(joinInfo.rightKeys));
break;
case LOOKUP:
// Lookup join - use local exchange on the left side, no exchange on the right side
Preconditions.checkState(!joinInfo.leftKeys.isEmpty(), "Lookup join requires join keys");
newLeft = PinotLogicalExchange.create(left, RelDistributions.SINGLETON);
newRight = right;
break;
case BROADCAST:
// Broadcast join - use local exchange on the left side, broadcast exchange on the right side
newLeft = PinotLogicalExchange.create(left, RelDistributions.SINGLETON);
newRight = PinotLogicalExchange.create(right, RelDistributions.BROADCAST_DISTRIBUTED);
break;
default:
throw new IllegalStateException("Unsupported join strategy: " + joinStrategy);
}
}

// TODO: Consider creating different JOIN Rel for each join strategy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,7 @@ public boolean matches(RelOptRuleCall call) {
Join join = call.rel(0);

// Do not apply this rule if join strategy is explicitly set to something other than dynamic broadcast
String joinStrategy = PinotHintStrategyTable.getHintOption(join.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS,
PinotHintOptions.JoinHintOptions.JOIN_STRATEGY);
String joinStrategy = PinotHintOptions.JoinHintOptions.getJoinStrategyHint(join);
if (joinStrategy != null && !joinStrategy.equals(
PinotHintOptions.JoinHintOptions.DYNAMIC_BROADCAST_JOIN_STRATEGY)) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,13 +312,11 @@ private JoinNode convertLogicalJoin(LogicalJoin join) {
}

// Check if the join hint specifies the join strategy
JoinNode.JoinStrategy joinStrategy;
String joinStrategyHint = PinotHintOptions.JoinHintOptions.getJoinStrategyHint(join);
if (PinotHintOptions.JoinHintOptions.useLookupJoinStrategy(joinStrategyHint)) {
joinStrategy = JoinNode.JoinStrategy.LOOKUP;

// Run some validations for lookup join
Preconditions.checkArgument(!joinInfo.leftKeys.isEmpty(), "Lookup join requires join keys");
JoinNode.JoinStrategy joinStrategy = PinotHintOptions.JoinHintOptions.getJoinStrategy(join);
if (joinStrategy == null) {
joinStrategy = JoinNode.JoinStrategy.HASH;
} else if (joinStrategy == JoinNode.JoinStrategy.LOOKUP) {
// Run some validations for lookup join:
// Right table should be a dimension table, and the right input should be an identifier only ProjectNode over
// TableScanNode.
RelNode rightInput = PinotRuleUtils.unboxRel(join.getRight());
Expand All @@ -334,11 +332,6 @@ private JoinNode convertLogicalJoin(LogicalJoin join) {
Preconditions.checkState(projectInput instanceof TableScan,
"Right input for lookup join must be a Project over TableScan, got Project over: %s",
projectInput.getClass().getSimpleName());
} else if (PinotHintOptions.JoinHintOptions.useBroadcastJoinStrategy(joinStrategyHint)) {
joinStrategy = JoinNode.JoinStrategy.BROADCAST;
} else {
// TODO: Consider adding DYNAMIC_BROADCAST as a separate join strategy
joinStrategy = JoinNode.JoinStrategy.HASH;
}

return new JoinNode(DEFAULT_STAGE_ID, dataSchema, NodeHint.fromRelHints(join.getHints()), inputs, joinType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public int hashCode() {
return Objects.hash(super.hashCode(), _joinType, _leftKeys, _rightKeys, _nonEquiConditions, _joinStrategy);
}

// TODO: Consider adding DYNAMIC_BROADCAST as a separate join strategy
public enum JoinStrategy {
// HASH is the default equi-join strategy, where both left and right tables are hash partitioned on join keys, then
// shuffled to the same worker to perform the join.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ private void assignWorkersToNonRootFragment(PlanFragment fragment, DispatchableP
Map<Integer, DispatchablePlanMetadata> metadataMap = context.getDispatchablePlanMetadataMap();
DispatchablePlanMetadata metadata = metadataMap.get(fragment.getFragmentId());
boolean leafPlan = isLeafPlan(metadata);
Integer childIdWithLocalExchange = findLocalExchange(children);
if (childIdWithLocalExchange != null) {
int childIdWithLocalExchange = findLocalExchange(children);
if (childIdWithLocalExchange >= 0) {
// If there is a local exchange (child with SINGLETON distribution), use the same worker assignment to avoid
// shuffling data.
// TODO:
Expand All @@ -125,11 +125,9 @@ private void assignWorkersToNonRootFragment(PlanFragment fragment, DispatchableP
}

/**
* Returns the index of the child fragment that has a local exchange (SINGLETON distribution), or {@code null} if none
* exists.
* Returns the index of the child fragment that has a local exchange (SINGLETON distribution), or -1 if none exists.
*/
@Nullable
private Integer findLocalExchange(List<PlanFragment> children) {
private int findLocalExchange(List<PlanFragment> children) {
int numChildren = children.size();
for (int i = 0; i < numChildren; i++) {
PlanNode childPlanNode = children.get(i).getFragmentRoot();
Expand All @@ -138,7 +136,7 @@ private Integer findLocalExchange(List<PlanFragment> children) {
return i;
}
}
return null;
return -1;
}

private static boolean isLeafPlan(DispatchablePlanMetadata metadata) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ public ObjectNode visitJoin(JoinNode node, Void context) {
if (node.getJoinStrategy() == JoinNode.JoinStrategy.LOOKUP) {
return recursiveCase(node, MultiStageOperator.Type.LOOKUP_JOIN);
} else {
// TODO: Consider renaming this operator type. It handles multiple join strategies.
return recursiveCase(node, MultiStageOperator.Type.HASH_JOIN);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ public MultiStageOperator visitJoin(JoinNode node, OpChainExecutionContext conte
if (node.getJoinStrategy() == JoinNode.JoinStrategy.LOOKUP) {
return new LookupJoinOperator(context, leftOperator, rightOperator, node);
} else {
// TODO: Consider renaming this operator. It handles multiple join strategies.
return new HashJoinOperator(context, leftOperator, left.getDataSchema(), rightOperator, node);
}
}
Expand Down

0 comments on commit 01309b4

Please sign in to comment.