Skip to content

Commit

Permalink
Support exchange type hint to allow broadcast join
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang committed Jan 16, 2025
1 parent f37bc3d commit c00a392
Show file tree
Hide file tree
Showing 6 changed files with 257 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pinot.calcite.rel.hint;

import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.hint.RelHint;
Expand Down Expand Up @@ -83,6 +85,9 @@ public static class JoinHintOptions {
// "lookup" can be used when the right table is a dimension table replicated to all workers
public static final String LOOKUP_JOIN_STRATEGY = "lookup";

public static final String LEFT_EXCHANGE_TYPE = "left_exchange_type";
public static final String RIGHT_EXCHANGE_TYPE = "right_exchange_type";

/**
* Max rows allowed to build the right table hash collection.
*/
Expand All @@ -105,11 +110,66 @@ public static class JoinHintOptions {
*/
public static final String APPEND_DISTINCT_TO_SEMI_JOIN_PROJECT = "append_distinct_to_semi_join_project";

@Nullable
public static String getJoinStrategyHint(Join join) {
return PinotHintStrategyTable.getHintOption(join.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS,
PinotHintOptions.JoinHintOptions.JOIN_STRATEGY);
}

// TODO: Consider adding a Join implementation with join strategy.
public static boolean useLookupJoinStrategy(Join join) {
return LOOKUP_JOIN_STRATEGY.equalsIgnoreCase(
return LOOKUP_JOIN_STRATEGY.equalsIgnoreCase(getJoinStrategyHint(join));
}

@Nullable
public static ExchangeTypes.Type getLeftExchangeType(Join join) {
return ExchangeTypes.getType(
PinotHintStrategyTable.getHintOption(join.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS,
PinotHintOptions.JoinHintOptions.JOIN_STRATEGY));
PinotHintOptions.JoinHintOptions.LEFT_EXCHANGE_TYPE));
}

@Nullable
public static ExchangeTypes.Type getRightExchangeType(Join join) {
return ExchangeTypes.getType(
PinotHintStrategyTable.getHintOption(join.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS,
PinotHintOptions.JoinHintOptions.RIGHT_EXCHANGE_TYPE));
}
}

/**
* Similar to {@link RelDistribution.Type}, this class contains the exchange types to be used to shuffle data.
*/
public static class ExchangeTypes {
public static final String LOCAL = "local";
public static final String HASH = "hash";
public static final String BROADCAST = "broadcast";
public static final String RANDOM = "random";

public enum Type {
LOCAL, // Exchange data locally without ser/de
HASH, // Exchange data by hash partitioning
BROADCAST, // Exchange data by broadcasting the data to all workers
RANDOM // Exchange data randomly
}

@Nullable
public static Type getType(@Nullable String hint) {
if (hint == null) {
return null;
}
if (hint.equalsIgnoreCase(LOCAL)) {
return Type.LOCAL;
}
if (hint.equalsIgnoreCase(HASH)) {
return Type.HASH;
}
if (hint.equalsIgnoreCase(BROADCAST)) {
return Type.BROADCAST;
}
if (hint.equalsIgnoreCase(RANDOM)) {
return Type.RANDOM;
}
throw new IllegalArgumentException("Unsupported exchange type hint: " + hint);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.calcite.rel.rules;

import com.google.common.base.Preconditions;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelDistributions;
Expand Down Expand Up @@ -52,22 +53,124 @@ public void onMatch(RelOptRuleCall call) {
RelNode left = PinotRuleUtils.unboxRel(join.getInput(0));
RelNode right = PinotRuleUtils.unboxRel(join.getInput(1));
JoinInfo joinInfo = join.analyzeCondition();
PinotHintOptions.ExchangeTypes.Type leftExchangeType = PinotHintOptions.JoinHintOptions.getLeftExchangeType(join);
PinotHintOptions.ExchangeTypes.Type rightExchangeType = PinotHintOptions.JoinHintOptions.getRightExchangeType(join);
RelNode newLeft;
RelNode newRight;
if (PinotHintOptions.JoinHintOptions.useLookupJoinStrategy(join)) {
// Lookup join - add local exchange on the left side
newLeft = PinotLogicalExchange.create(left, RelDistributions.SINGLETON);
// Lookup join
if (leftExchangeType == null) {
// By default, use local exchange for the left side
newLeft = PinotLogicalExchange.create(left, RelDistributions.SINGLETON);
} else {
switch (leftExchangeType) {
case LOCAL:
newLeft = PinotLogicalExchange.create(left, RelDistributions.SINGLETON);
break;
case HASH:
Preconditions.checkArgument(!joinInfo.leftKeys.isEmpty(), "Hash exchange requires join keys");
newLeft = PinotLogicalExchange.create(left, RelDistributions.hash(joinInfo.leftKeys));
break;
case RANDOM:
newLeft = PinotLogicalExchange.create(left, RelDistributions.RANDOM_DISTRIBUTED);
break;
default:
throw new IllegalArgumentException(
"Unsupported left exchange type: " + leftExchangeType + " for lookup join");
}
}
Preconditions.checkArgument(rightExchangeType == null,
"Right exchange type hint is not supported for lookup join");
newRight = right;
} else {
// Regular join - add exchange on both sides
// Hash join
// TODO: Validate if the configured exchange types are valid
if (joinInfo.leftKeys.isEmpty()) {
// Broadcast the right side if there is no join key
newLeft = PinotLogicalExchange.create(left, RelDistributions.RANDOM_DISTRIBUTED);
newRight = PinotLogicalExchange.create(right, RelDistributions.BROADCAST_DISTRIBUTED);
// No join key, cannot use hash exchange
if (leftExchangeType == null) {
// By default, randomly distribute the left side
newLeft = PinotLogicalExchange.create(left, RelDistributions.RANDOM_DISTRIBUTED);
} else {
switch (leftExchangeType) {
case LOCAL:
newLeft = PinotLogicalExchange.create(left, RelDistributions.SINGLETON);
break;
case RANDOM:
newLeft = PinotLogicalExchange.create(left, RelDistributions.RANDOM_DISTRIBUTED);
break;
case BROADCAST:
newLeft = PinotLogicalExchange.create(left, RelDistributions.BROADCAST_DISTRIBUTED);
break;
default:
throw new IllegalArgumentException(
"Unsupported left exchange type: " + leftExchangeType + " for hash join without join keys");
}
}
if (rightExchangeType == null) {
// By default, broadcast the right side
newRight = PinotLogicalExchange.create(right, RelDistributions.BROADCAST_DISTRIBUTED);
} else {
switch (rightExchangeType) {
case LOCAL:
newRight = PinotLogicalExchange.create(right, RelDistributions.SINGLETON);
break;
case RANDOM:
newRight = PinotLogicalExchange.create(right, RelDistributions.RANDOM_DISTRIBUTED);
break;
case BROADCAST:
newRight = PinotLogicalExchange.create(right, RelDistributions.BROADCAST_DISTRIBUTED);
break;
default:
throw new IllegalStateException(
"Unsupported right exchange type: " + rightExchangeType + " for hash join without join keys");
}
}
} else {
// Use hash exchange when there are join keys
newLeft = PinotLogicalExchange.create(left, RelDistributions.hash(joinInfo.leftKeys));
newRight = PinotLogicalExchange.create(right, RelDistributions.hash(joinInfo.rightKeys));
// There are join keys, hash exchange is supported
if (leftExchangeType == null) {
// By default, hash distribute the left side
newLeft = PinotLogicalExchange.create(left, RelDistributions.hash(joinInfo.leftKeys));
} else {
switch (leftExchangeType) {
case LOCAL:
newLeft = PinotLogicalExchange.create(left, RelDistributions.SINGLETON);
break;
case HASH:
newLeft = PinotLogicalExchange.create(left, RelDistributions.hash(joinInfo.leftKeys));
break;
case BROADCAST:
newLeft = PinotLogicalExchange.create(left, RelDistributions.BROADCAST_DISTRIBUTED);
break;
case RANDOM:
newLeft = PinotLogicalExchange.create(left, RelDistributions.RANDOM_DISTRIBUTED);
break;
default:
throw new IllegalArgumentException(
"Unsupported left exchange type: " + leftExchangeType + " for hash join with join keys");
}
}
if (rightExchangeType == null) {
// By default, hash distribute the right side
newRight = PinotLogicalExchange.create(right, RelDistributions.hash(joinInfo.rightKeys));
} else {
switch (rightExchangeType) {
case LOCAL:
newRight = PinotLogicalExchange.create(right, RelDistributions.SINGLETON);
break;
case HASH:
newRight = PinotLogicalExchange.create(right, RelDistributions.hash(joinInfo.rightKeys));
break;
case BROADCAST:
newRight = PinotLogicalExchange.create(right, RelDistributions.BROADCAST_DISTRIBUTED);
break;
case RANDOM:
newRight = PinotLogicalExchange.create(right, RelDistributions.RANDOM_DISTRIBUTED);
break;
default:
throw new IllegalStateException(
"Unsupported right exchange type: " + rightExchangeType + " for hash join with join keys");
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,7 @@ public boolean matches(RelOptRuleCall call) {
Join join = call.rel(0);

// Do not apply this rule if join strategy is explicitly set to something other than dynamic broadcast
String joinStrategy = PinotHintStrategyTable.getHintOption(join.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS,
PinotHintOptions.JoinHintOptions.JOIN_STRATEGY);
String joinStrategy = PinotHintOptions.JoinHintOptions.getJoinStrategyHint(join);
if (joinStrategy != null && !joinStrategy.equals(
PinotHintOptions.JoinHintOptions.DYNAMIC_BROADCAST_JOIN_STRATEGY)) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,14 @@ private void assignWorkersToNonRootFragment(PlanFragment fragment, DispatchableP
Map<Integer, DispatchablePlanMetadata> metadataMap = context.getDispatchablePlanMetadataMap();
DispatchablePlanMetadata metadata = metadataMap.get(fragment.getFragmentId());
boolean leafPlan = isLeafPlan(metadata);
if (isLocalExchange(children)) {
// If it is a local exchange (single child with SINGLETON distribution), use the same worker assignment to avoid
int childIdWithLocalExchange = findLocalExchange(children);
if (childIdWithLocalExchange >= 0) {
// If there is a local exchange (child with SINGLETON distribution), use the same worker assignment to avoid
// shuffling data.
// TODO: Support partition parallelism
DispatchablePlanMetadata childMetadata = metadataMap.get(children.get(0).getFragmentId());
// TODO:
// 1. Support partition parallelism
// 2. Check if there are conflicts (multiple children with different local exchange)
DispatchablePlanMetadata childMetadata = metadataMap.get(children.get(childIdWithLocalExchange).getFragmentId());
metadata.setWorkerIdToServerInstanceMap(childMetadata.getWorkerIdToServerInstanceMap());
metadata.setPartitionFunction(childMetadata.getPartitionFunction());
if (leafPlan) {
Expand All @@ -121,13 +124,19 @@ private void assignWorkersToNonRootFragment(PlanFragment fragment, DispatchableP
}
}

private boolean isLocalExchange(List<PlanFragment> children) {
if (children.size() != 1) {
return false;
/**
* Returns the index of the child fragment that has a local exchange (SINGLETON distribution), or -1 if none exists.
*/
private int findLocalExchange(List<PlanFragment> children) {
int numChildren = children.size();
for (int i = 0; i < numChildren; i++) {
PlanNode childPlanNode = children.get(i).getFragmentRoot();
if (childPlanNode instanceof MailboxSendNode
&& ((MailboxSendNode) childPlanNode).getDistributionType() == RelDistribution.Type.SINGLETON) {
return i;
}
}
PlanNode childPlanNode = children.get(0).getFragmentRoot();
return childPlanNode instanceof MailboxSendNode
&& ((MailboxSendNode) childPlanNode).getDistributionType() == RelDistribution.Type.SINGLETON;
return -1;
}

private static boolean isLeafPlan(DispatchablePlanMetadata metadata) {
Expand Down
55 changes: 55 additions & 0 deletions pinot-query-planner/src/test/resources/queries/JoinPlans.json
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,61 @@
}
]
},
"broadcast_join_planning_tests": {
"queries": [
{
"description": "Simple broadcast join",
"sql": "EXPLAIN PLAN FOR SELECT /*+ joinOptions(left_exchange_type = 'local', right_exchange_type = 'broadcast') */ a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1",
"output": [
"Execution Plan",
"\nLogicalProject(col1=[$0], col2=[$2])",
"\n LogicalJoin(condition=[=($0, $1)], joinType=[inner])",
"\n PinotLogicalExchange(distribution=[single])",
"\n LogicalProject(col1=[$0])",
"\n LogicalTableScan(table=[[default, a]])",
"\n PinotLogicalExchange(distribution=[broadcast])",
"\n LogicalProject(col1=[$0], col2=[$1])",
"\n LogicalTableScan(table=[[default, b]])",
"\n"
]
},
{
"description": "Broadcast join with filter on both left and right table",
"sql": "EXPLAIN PLAN FOR SELECT /*+ joinOptions(left_exchange_type = 'local', right_exchange_type = 'broadcast') */ a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1 WHERE a.col2 = 'foo' AND b.col2 = 'bar'",
"output": [
"Execution Plan",
"\nLogicalProject(col1=[$0], col2=[$2])",
"\n LogicalJoin(condition=[=($0, $1)], joinType=[inner])",
"\n PinotLogicalExchange(distribution=[single])",
"\n LogicalProject(col1=[$0])",
"\n LogicalFilter(condition=[=($1, _UTF-8'foo')])",
"\n LogicalTableScan(table=[[default, a]])",
"\n PinotLogicalExchange(distribution=[broadcast])",
"\n LogicalProject(col1=[$0], col2=[$1])",
"\n LogicalFilter(condition=[=($1, _UTF-8'bar')])",
"\n LogicalTableScan(table=[[default, b]])",
"\n"
]
},
{
"description": "Broadcast join with transformation on both left and right table joined key",
"sql": "EXPLAIN PLAN FOR SELECT /*+ joinOptions(left_exchange_type = 'local', right_exchange_type = 'broadcast') */ a.col1, b.col2 FROM a JOIN b ON upper(a.col1) = upper(b.col1)",
"output": [
"Execution Plan",
"\nLogicalProject(col1=[$0], col2=[$2])",
"\n LogicalJoin(condition=[=($1, $3)], joinType=[inner])",
"\n PinotLogicalExchange(distribution=[single])",
"\n LogicalProject(col1=[$0], $f8=[UPPER($0)])",
"\n LogicalTableScan(table=[[default, a]])",
"\n PinotLogicalExchange(distribution=[broadcast])",
"\n LogicalProject(col2=[$1], $f8=[UPPER($0)])",
"\n LogicalTableScan(table=[[default, b]])",
"\n"
]
}
]

},
"exception_throwing_join_planning_tests": {
"queries": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,14 @@
"description": "Colocated JOIN with partition column and group by non-partitioned column with stage parallelism",
"sql": "SET stageParallelism=2; SELECT {tbl1}.name, SUM({tbl2}.num) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num GROUP BY {tbl1}.name"
},
{
"description": "Broadcast JOIN without partition hint",
"sql": "SELECT /*+ joinOptions(left_exchange_type = 'local', right_exchange_type = 'broadcast') */ {tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM {tbl1} JOIN {tbl2} ON {tbl1}.num = {tbl2}.num"
},
{
"description": "Broadcast JOIN with partition hint",
"sql": "SELECT /*+ joinOptions(left_exchange_type = 'local', right_exchange_type = 'broadcast') */ {tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ JOIN {tbl2} ON {tbl1}.num = {tbl2}.num"
},
{
"description": "Colocated, Dynamic broadcast SEMI-JOIN with partition column",
"sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ {tbl1}.num, {tbl1}.name FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.val IN ('xxx', 'yyy'))"
Expand Down

0 comments on commit c00a392

Please sign in to comment.