Skip to content

Commit ac02e60

Browse files
committed
Support local replicated join and local exchange parallelism
1 parent 132aead commit ac02e60

File tree

15 files changed

+652
-287
lines changed

15 files changed

+652
-287
lines changed

Diff for: pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java

+18
Original file line numberDiff line numberDiff line change
@@ -665,6 +665,14 @@ private Map<ServerInstance, Pair<List<String>, List<String>>> getServerInstanceT
665665
return merged;
666666
}
667667

668+
/**
 * Returns the segments for the given broker request, using the routing entry registered under the table name
 * carried in the request's query source.
 *
 * @param brokerRequest request whose query source supplies the table name (with type) used for the lookup
 * @return the segments reported by the table's routing entry, or {@code null} when no routing entry is registered
 *         under that table name
 */
@Nullable
669+
@Override
670+
public List<String> getSegments(BrokerRequest brokerRequest) {
671+
String tableNameWithType = brokerRequest.getQuerySource().getTableName();
672+
RoutingEntry routingEntry = _routingEntryMap.get(tableNameWithType);
673+
// A missing routing entry is reported as null rather than as an empty list
return routingEntry != null ? routingEntry.getSegments(brokerRequest) : null;
674+
}
675+
668676
@Override
669677
public Map<String, ServerInstance> getEnabledServerInstanceMap() {
670678
return _enabledServerInstanceMap;
@@ -848,5 +856,15 @@ InstanceSelector.SelectionResult calculateRouting(BrokerRequest brokerRequest, l
848856
Collections.emptyList(), numPrunedSegments);
849857
}
850858
}
859+
860+
/**
 * Selects the segments for the given broker request and applies every configured segment pruner to the selection.
 *
 * @param brokerRequest request passed to the segment selector and to each segment pruner
 * @return a new list holding the segments that remain after selection and pruning
 */
List<String> getSegments(BrokerRequest brokerRequest) {
861+
Set<String> selectedSegments = _segmentSelector.select(brokerRequest);
862+
// Pruners are skipped entirely when the selector returns nothing
if (!selectedSegments.isEmpty()) {
863+
// Each pruner receives the output of the previous one
for (SegmentPruner segmentPruner : _segmentPruners) {
864+
selectedSegments = segmentPruner.prune(brokerRequest, selectedSegments);
865+
}
866+
}
867+
// Copy the resulting set into a fresh list for the caller
return new ArrayList<>(selectedSegments);
868+
}
851869
}
852870
}

Diff for: pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java

+7
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pinot.core.routing;
2020

21+
import java.util.List;
2122
import java.util.Map;
2223
import java.util.Set;
2324
import javax.annotation.Nullable;
@@ -55,6 +56,12 @@ public interface RoutingManager {
5556
@Nullable
5657
RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId);
5758

59+
/**
60+
* Returns the segments that are relevant for the given broker request.
61+
*
* @param brokerRequest the broker request to find segments for
* @return the relevant segments; may be {@code null} (for example, {@code BrokerRoutingManager} returns
*         {@code null} when it has no routing entry for the request's table)
*/
62+
@Nullable
63+
List<String> getSegments(BrokerRequest brokerRequest);
64+
5865
/**
5966
* Validates that routing exists for a table
6067
*

Diff for: pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java

+11
Original file line numberDiff line numberDiff line change
@@ -173,28 +173,39 @@ public static DistributionType fromHint(@Nullable String hint) {
173173
}
174174

175175
/**
 * Option keys of the table hint. Each constant holds the literal key string.
 */
public static class TableHintOptions {
176+
176177
/**
177178
* Indicates how many partitions the table must be partitioned by.
178179
* This must be equal to the partition count of the table in
179180
* {@code tableIndexConfig.segmentPartitionConfig.columnPartitionMap}.
* <p>
* NOTE(review): this wording describes a partition count, which {@code PARTITION_SIZE} below also claims (via
* {@code numPartition}); presumably this option names the partition key (column) instead - confirm and reword.
180181
*/
181182
public static final String PARTITION_KEY = "partition_key";
183+
182184
/**
183185
* The function to use to partition the table.
184186
* This must be equal to {@code functionName} in {@code tableIndexConfig.segmentPartitionConfig.columnPartitionMap}.
185187
*/
186188
public static final String PARTITION_FUNCTION = "partition_function";
189+
187190
/**
188191
* The size of each partition.
189192
* This must be equal to {@code numPartition} in {@code tableIndexConfig.segmentPartitionConfig.columnPartitionMap}.
190193
*/
191194
public static final String PARTITION_SIZE = "partition_size";
195+
192196
/**
193197
* The number of workers per partition.
194198
*
195199
* How many threads to use in the following stage after partition is joined.
196200
* When partition info is set, each partition is processed as a separate query in the leaf stage.
201+
* When partition info is not set, we count all data processed in the leaf stage as the same partition.
197202
*/
198203
public static final String PARTITION_PARALLELISM = "partition_parallelism";
204+
205+
/**
206+
* Indicates whether the table is replicated across all workers. When the table is replicated across all workers, we can
207+
* execute the same query on all workers to achieve broadcast without network shuffle.
208+
*/
209+
public static final String IS_REPLICATED = "is_replicated";
199210
}
200211
}

Diff for: pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/logical/PinotLogicalExchange.java

+21-11
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pinot.calcite.rel.logical;
2020

21+
import java.util.List;
2122
import org.apache.calcite.plan.Convention;
2223
import org.apache.calcite.plan.RelOptCluster;
2324
import org.apache.calcite.plan.RelTraitSet;
@@ -35,38 +36,43 @@
3536
*/
3637
public class PinotLogicalExchange extends Exchange {
3738
private final PinotRelExchangeType _exchangeType;
39+
// NOTE: We might change distribution type for local exchange. Store the keys separately.
40+
private final List<Integer> _keys;
3841

3942
/**
 * Private constructor; within this class it is invoked by the static {@code create} factory and by {@code copy}.
 *
 * @param cluster cluster passed to the {@code Exchange} super constructor
 * @param traitSet trait set; asserted to contain {@code Convention.NONE} where applicable
 * @param input input relational expression
 * @param distribution distribution specification
 * @param exchangeType exchange type stored on this node
 * @param keys keys stored on this node separately from {@code distribution}
 */
private PinotLogicalExchange(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, RelDistribution distribution,
40-
PinotRelExchangeType exchangeType) {
43+
PinotRelExchangeType exchangeType, List<Integer> keys) {
4144
super(cluster, traitSet, input, distribution);
4245
_exchangeType = exchangeType;
46+
_keys = keys;
4347
assert traitSet.containsIfApplicable(Convention.NONE);
4448
}
4549

4650
/**
 * Creates an exchange with the default exchange type, taking the keys from the given distribution.
 *
 * @param input Input relational expression
 * @param distribution Distribution specification; its keys become the keys of the exchange
 */
public static PinotLogicalExchange create(RelNode input, RelDistribution distribution) {
47-
return create(input, distribution, PinotRelExchangeType.getDefaultExchangeType());
51+
return create(input, distribution, PinotRelExchangeType.getDefaultExchangeType(), distribution.getKeys());
52+
}
53+
54+
/**
 * Creates an exchange with the default exchange type and explicitly supplied keys. The keys are passed through
 * as given instead of being read from {@code distribution}.
 *
 * @param input Input relational expression
 * @param distribution Distribution specification
 * @param keys keys to store on the exchange
 */
public static PinotLogicalExchange create(RelNode input, RelDistribution distribution, List<Integer> keys) {
55+
return create(input, distribution, PinotRelExchangeType.getDefaultExchangeType(), keys);
4856
}
4957

50-
/**
51-
* Creates a LogicalExchange.
52-
*
53-
* @param input Input relational expression
54-
* @param distribution Distribution specification
55-
* @param exchangeType RelExchangeType specification
56-
*/
5758
/**
 * Creates an exchange with the given exchange type, taking the keys from the given distribution.
 *
 * @param input Input relational expression
 * @param distribution Distribution specification; its keys become the keys of the exchange
 * @param exchangeType RelExchangeType specification
 */
public static PinotLogicalExchange create(RelNode input, RelDistribution distribution,
5859
PinotRelExchangeType exchangeType) {
60+
return create(input, distribution, exchangeType, distribution.getKeys());
61+
}
62+
63+
/**
 * Creates a PinotLogicalExchange.
 *
 * @param input Input relational expression
 * @param distribution Distribution specification; canonized before use
 * @param exchangeType RelExchangeType specification
 * @param keys keys to store on the exchange, passed to the constructor as given
 */
public static PinotLogicalExchange create(RelNode input, RelDistribution distribution,
64+
PinotRelExchangeType exchangeType, List<Integer> keys) {
5965
RelOptCluster cluster = input.getCluster();
6066
distribution = RelDistributionTraitDef.INSTANCE.canonize(distribution);
6167
// Start from the input's traits, then replace the convention with NONE and the distribution with the canonized one
RelTraitSet traitSet = input.getTraitSet().replace(Convention.NONE).replace(distribution);
62-
return new PinotLogicalExchange(cluster, traitSet, input, distribution, exchangeType);
68+
return new PinotLogicalExchange(cluster, traitSet, input, distribution, exchangeType, keys);
6369
}
6470

6571
//~ Methods ----------------------------------------------------------------
6672

6773
/**
 * Returns a new PinotLogicalExchange over the given traits, input and distribution that keeps this node's cluster,
 * exchange type and keys.
 */
@Override
6874
public Exchange copy(RelTraitSet traitSet, RelNode newInput, RelDistribution newDistribution) {
69-
return new PinotLogicalExchange(getCluster(), traitSet, newInput, newDistribution, _exchangeType);
75+
return new PinotLogicalExchange(getCluster(), traitSet, newInput, newDistribution, _exchangeType, _keys);
7076
}
7177

7278
@Override
@@ -86,4 +92,8 @@ public RelWriter explainTerms(RelWriter pw) {
8692
/**
 * Returns the exchange type this node was constructed with.
 */
public PinotRelExchangeType getExchangeType() {
8793
return _exchangeType;
8894
}
95+
96+
/**
 * Returns the keys this node was constructed with. They are stored separately from the distribution, so they are
 * not necessarily the distribution's own keys.
 */
public List<Integer> getKeys() {
97+
return _keys;
98+
}
8999
}

Diff for: pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,9 @@ private static PinotLogicalExchange createExchangeForLookupJoin(PinotHintOptions
102102
List<Integer> keys, RelNode child) {
103103
switch (distributionType) {
104104
case LOCAL:
105-
return PinotLogicalExchange.create(child, RelDistributions.SINGLETON);
105+
// NOTE: We use SINGLETON to represent local distribution. Add keys to the exchange because we might want to
106+
// switch it to HASH distribution to increase parallelism. See MailboxAssignmentVisitor for details.
107+
return PinotLogicalExchange.create(child, RelDistributions.SINGLETON, keys);
106108
case HASH:
107109
Preconditions.checkArgument(!keys.isEmpty(), "Hash distribution requires join keys");
108110
return PinotLogicalExchange.create(child, RelDistributions.hash(keys));
@@ -117,7 +119,9 @@ private static PinotLogicalExchange createExchangeForHashJoin(PinotHintOptions.D
117119
List<Integer> keys, RelNode child) {
118120
switch (distributionType) {
119121
case LOCAL:
120-
return PinotLogicalExchange.create(child, RelDistributions.SINGLETON);
122+
// NOTE: We use SINGLETON to represent local distribution. Add keys to the exchange because we might want to
123+
// switch it to HASH distribution to increase parallelism. See MailboxAssignmentVisitor for details.
124+
return PinotLogicalExchange.create(child, RelDistributions.SINGLETON, keys);
121125
case HASH:
122126
Preconditions.checkArgument(!keys.isEmpty(), "Hash distribution requires join keys");
123127
return PinotLogicalExchange.create(child, RelDistributions.hash(keys));

Diff for: pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java

+15-11
Original file line numberDiff line numberDiff line change
@@ -143,34 +143,38 @@ public PlanNode toPlanNode(RelNode node) {
143143
}
144144

145145
private ExchangeNode convertLogicalExchange(Exchange node) {
146+
RelDistribution distribution = node.getDistribution();
147+
RelDistribution.Type distributionType = distribution.getType();
148+
boolean prePartitioned;
149+
if (distributionType == RelDistribution.Type.HASH_DISTRIBUTED) {
150+
RelDistribution inputDistributionTrait = node.getInputs().get(0).getTraitSet().getDistribution();
151+
prePartitioned = distribution.equals(inputDistributionTrait);
152+
} else {
153+
prePartitioned = false;
154+
}
146155
PinotRelExchangeType exchangeType;
156+
List<Integer> keys;
147157
List<RelFieldCollation> collations;
148158
boolean sortOnSender;
149159
boolean sortOnReceiver;
150160
if (node instanceof PinotLogicalSortExchange) {
151161
PinotLogicalSortExchange sortExchange = (PinotLogicalSortExchange) node;
152162
exchangeType = sortExchange.getExchangeType();
163+
keys = distribution.getKeys();
153164
collations = sortExchange.getCollation().getFieldCollations();
154165
sortOnSender = sortExchange.isSortOnSender();
155166
sortOnReceiver = sortExchange.isSortOnReceiver();
156167
} else {
157168
assert node instanceof PinotLogicalExchange;
158-
exchangeType = ((PinotLogicalExchange) node).getExchangeType();
169+
PinotLogicalExchange exchange = (PinotLogicalExchange) node;
170+
exchangeType = exchange.getExchangeType();
171+
keys = exchange.getKeys();
159172
collations = null;
160173
sortOnSender = false;
161174
sortOnReceiver = false;
162175
}
163-
RelDistribution distribution = node.getDistribution();
164-
RelDistribution.Type distributionType = distribution.getType();
165-
List<Integer> keys;
166-
boolean prePartitioned;
167-
if (distributionType == RelDistribution.Type.HASH_DISTRIBUTED) {
168-
keys = distribution.getKeys();
169-
RelDistribution inputDistributionTrait = node.getInputs().get(0).getTraitSet().getDistribution();
170-
prePartitioned = distribution.equals(inputDistributionTrait);
171-
} else {
176+
if (keys.isEmpty()) {
172177
keys = null;
173-
prePartitioned = false;
174178
}
175179
return new ExchangeNode(DEFAULT_STAGE_ID, toDataSchema(node.getRowType()), convertInputs(node.getInputs()),
176180
exchangeType, distributionType, keys, prePartitioned, collations, sortOnSender, sortOnReceiver, null);

Diff for: pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java

+29-27
Original file line numberDiff line numberDiff line change
@@ -46,46 +46,39 @@ public class DispatchablePlanMetadata implements Serializable {
4646
// --------------------------------------------------------------------------
4747
// Fields extracted with {@link DispatchablePlanVisitor}
4848
// --------------------------------------------------------------------------
49-
// info from TableNode
50-
private final List<String> _scannedTables;
49+
50+
// Info from TableNode
51+
private final List<String> _scannedTables = new ArrayList<>();
5152
private Map<String, String> _tableOptions;
52-
// info from MailboxSendNode - whether a stage is pre-partitioned by the same way the sending exchange desires
53+
54+
// Info from MailboxSendNode - whether a stage is pre-partitioned by the same way the sending exchange desires
5355
private boolean _isPrePartitioned;
54-
// info from PlanNode that requires singleton (e.g. SortNode/AggregateNode)
56+
57+
// Info from PlanNode that requires singleton (e.g. SortNode/AggregateNode)
5558
private boolean _requiresSingletonInstance;
5659

5760
// TODO: Change the following maps to lists
5861

5962
// --------------------------------------------------------------------------
6063
// Fields extracted with {@link PinotDispatchPlanner}
6164
// --------------------------------------------------------------------------
62-
// used for assigning server/worker nodes.
63-
private Map<Integer, QueryServerInstance> _workerIdToServerInstanceMap;
6465

65-
// used for table scan stage - we use ServerInstance instead of VirtualServer
66-
// here because all virtual servers that share a server instance will have the
67-
// same segments on them
66+
// The following fields are calculated in {@link WorkerManager}
67+
// Available for both leaf and intermediate stage
68+
private Map<Integer, QueryServerInstance> _workerIdToServerInstanceMap;
69+
private String _partitionFunction;
70+
// Available for leaf stage only
71+
// Map from workerId -> {tableType -> segments}
6872
private Map<Integer, Map<String, List<String>>> _workerIdToSegmentsMap;
69-
70-
// used for build mailboxes between workers.
71-
// workerId -> {planFragmentId -> mailbox list}
72-
private final Map<Integer, Map<Integer, MailboxInfos>> _workerIdToMailboxesMap;
73-
74-
// used for tracking unavailable segments from routing table, then assemble missing segments exception.
75-
private final Map<String, Set<String>> _tableToUnavailableSegmentsMap;
76-
77-
// time boundary info
73+
// Map from tableType -> segments, available when 'is_replicated' hint is set to true
74+
private Map<String, List<String>> _replicatedSegments;
7875
private TimeBoundaryInfo _timeBoundaryInfo;
76+
private int _partitionParallelism = 1;
77+
private final Map<String, Set<String>> _tableToUnavailableSegmentsMap = new HashMap<>();
7978

80-
// physical partition info
81-
private String _partitionFunction;
82-
private int _partitionParallelism;
83-
84-
public DispatchablePlanMetadata() {
85-
_scannedTables = new ArrayList<>();
86-
_workerIdToMailboxesMap = new HashMap<>();
87-
_tableToUnavailableSegmentsMap = new HashMap<>();
88-
}
79+
// Calculated in {@link MailboxAssignmentVisitor}
80+
// Map from workerId -> {planFragmentId -> mailboxes}
81+
private final Map<Integer, Map<Integer, MailboxInfos>> _workerIdToMailboxesMap = new HashMap<>();
8982

9083
public List<String> getScannedTables() {
9184
return _scannedTables;
@@ -125,6 +118,15 @@ public void setWorkerIdToSegmentsMap(Map<Integer, Map<String, List<String>>> wor
125118
_workerIdToSegmentsMap = workerIdToSegmentsMap;
126119
}
127120

121+
/**
 * Returns the replicated segments as a map from tableType to segments.
 *
 * @return the map last passed to {@code setReplicatedSegments}, or {@code null} if none has been set
 */
@Nullable
122+
public Map<String, List<String>> getReplicatedSegments() {
123+
return _replicatedSegments;
124+
}
125+
126+
/**
 * Sets the replicated segments. The given map is stored as is (no copy is made).
 *
 * @param replicatedSegments map from tableType to segments
 */
public void setReplicatedSegments(Map<String, List<String>> replicatedSegments) {
127+
_replicatedSegments = replicatedSegments;
128+
}
129+
128130
/**
 * Returns the map from workerId to {planFragmentId -> mailboxes}. The internal map itself is returned, not a copy.
 */
public Map<Integer, Map<Integer, MailboxInfos>> getWorkerIdToMailboxesMap() {
129131
return _workerIdToMailboxesMap;
130132
}

0 commit comments

Comments
 (0)