Support local replicated join and local exchange parallelism
Jackie-Jiang committed Jan 23, 2025
1 parent 4967780 commit 96ec407
Showing 15 changed files with 649 additions and 289 deletions.
@@ -665,6 +665,14 @@ private Map<ServerInstance, Pair<List<String>, List<String>>> getServerInstanceT
return merged;
}

@Nullable
@Override
public List<String> getSegments(BrokerRequest brokerRequest) {
String tableNameWithType = brokerRequest.getQuerySource().getTableName();
RoutingEntry routingEntry = _routingEntryMap.get(tableNameWithType);
return routingEntry != null ? routingEntry.getSegments(brokerRequest) : null;
}

@Override
public Map<String, ServerInstance> getEnabledServerInstanceMap() {
return _enabledServerInstanceMap;
@@ -848,5 +856,15 @@ InstanceSelector.SelectionResult calculateRouting(BrokerRequest brokerRequest, l
Collections.emptyList(), numPrunedSegments);
}
}

List<String> getSegments(BrokerRequest brokerRequest) {
Set<String> selectedSegments = _segmentSelector.select(brokerRequest);
if (!selectedSegments.isEmpty()) {
for (SegmentPruner segmentPruner : _segmentPruners) {
selectedSegments = segmentPruner.prune(brokerRequest, selectedSegments);
}
}
return new ArrayList<>(selectedSegments);
}
}
}
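The new getSegments path above selects and prunes segments without building a per-server routing table. A hedged usage sketch (not part of this commit's diff; class and method names are illustrative): how a planner-side caller might fetch the full segment list for a replicated table so that every worker can be assigned all of its segments.

import java.util.List;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.core.routing.RoutingManager;

public class ReplicatedSegmentLookup {
  private final RoutingManager _routingManager;

  public ReplicatedSegmentLookup(RoutingManager routingManager) {
    _routingManager = routingManager;
  }

  /** Selected-then-pruned segments for the queried table; throws if no routing entry exists. */
  public List<String> segmentsForReplicatedTable(BrokerRequest brokerRequest) {
    List<String> segments = _routingManager.getSegments(brokerRequest);
    if (segments == null) {
      throw new IllegalStateException(
          "No routing entry for table: " + brokerRequest.getQuerySource().getTableName());
    }
    return segments;
  }
}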
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.routing;

import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -55,6 +56,12 @@ public interface RoutingManager {
@Nullable
RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId);

/**
* Returns the segments that are relevant for the given broker request.
*/
@Nullable
List<String> getSegments(BrokerRequest brokerRequest);

/**
* Validate routing exists for a table
*
@@ -173,28 +173,39 @@ public static DistributionType fromHint(@Nullable String hint) {
}

public static class TableHintOptions {

/**
* The column the table is partitioned by.
* This must be equal to the partition column of the table in
* {@code tableIndexConfig.segmentPartitionConfig.columnPartitionMap}.
*/
public static final String PARTITION_KEY = "partition_key";

/**
* The function to use to partition the table.
* This must be equal to {@code functionName} in {@code tableIndexConfig.segmentPartitionConfig.columnPartitionMap}.
*/
public static final String PARTITION_FUNCTION = "partition_function";

/**
* The size of each partition.
* This must be equal to {@code numPartition} in {@code tableIndexConfig.segmentPartitionConfig.columnPartitionMap}.
*/
public static final String PARTITION_SIZE = "partition_size";

/**
* The number of workers per partition.
*
* Determines how many threads to use in the stage that follows the partitioned join.
* When partition info is set, each partition is processed as a separate query in the leaf stage.
* When partition info is not set, we count all data processed in the leaf stage as the same partition.
*/
public static final String PARTITION_PARALLELISM = "partition_parallelism";

/**
* Indicates whether the table is replicated across all workers. When it is, we can execute the same query on
* all workers to achieve broadcast behavior without a network shuffle.
*/
public static final String IS_REPLICATED = "is_replicated";
}
}
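The hint keys above are plain strings. A hedged sketch (not part of this commit): describing a partitioned, fully replicated table as a map of table options, e.g. for a planner test. The import path, column name, partition function, and sizes are illustrative only and must match the table's tableIndexConfig.segmentPartitionConfig for the hints to be honored.

import java.util.Map;
// Import path assumed for illustration:
import org.apache.pinot.calcite.rel.hint.PinotHintOptions.TableHintOptions;

public class ReplicatedTableHintExample {
  public static Map<String, String> replicatedTableOptions() {
    return Map.of(
        TableHintOptions.PARTITION_KEY, "memberId",       // partition column of the table
        TableHintOptions.PARTITION_FUNCTION, "Murmur",    // functionName in columnPartitionMap
        TableHintOptions.PARTITION_SIZE, "4",             // numPartition in columnPartitionMap
        TableHintOptions.PARTITION_PARALLELISM, "2",      // workers per partition in the following stage
        TableHintOptions.IS_REPLICATED, "true");          // replicated on all workers, so joins stay local
  }
}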
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.calcite.rel.logical;

import java.util.List;
import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
@@ -35,38 +36,43 @@
*/
public class PinotLogicalExchange extends Exchange {
private final PinotRelExchangeType _exchangeType;
// NOTE: We might change distribution type for local exchange. Store the keys separately.
private final List<Integer> _keys;

private PinotLogicalExchange(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, RelDistribution distribution,
- PinotRelExchangeType exchangeType) {
+ PinotRelExchangeType exchangeType, List<Integer> keys) {
super(cluster, traitSet, input, distribution);
_exchangeType = exchangeType;
_keys = keys;
assert traitSet.containsIfApplicable(Convention.NONE);
}

public static PinotLogicalExchange create(RelNode input, RelDistribution distribution) {
- return create(input, distribution, PinotRelExchangeType.getDefaultExchangeType());
+ return create(input, distribution, PinotRelExchangeType.getDefaultExchangeType(), distribution.getKeys());
}

public static PinotLogicalExchange create(RelNode input, RelDistribution distribution, List<Integer> keys) {
return create(input, distribution, PinotRelExchangeType.getDefaultExchangeType(), keys);
}

/**
* Creates a LogicalExchange.
*
* @param input Input relational expression
* @param distribution Distribution specification
* @param exchangeType RelExchangeType specification
*/
public static PinotLogicalExchange create(RelNode input, RelDistribution distribution,
PinotRelExchangeType exchangeType) {
return create(input, distribution, exchangeType, distribution.getKeys());
}

public static PinotLogicalExchange create(RelNode input, RelDistribution distribution,
PinotRelExchangeType exchangeType, List<Integer> keys) {
RelOptCluster cluster = input.getCluster();
distribution = RelDistributionTraitDef.INSTANCE.canonize(distribution);
RelTraitSet traitSet = input.getTraitSet().replace(Convention.NONE).replace(distribution);
- return new PinotLogicalExchange(cluster, traitSet, input, distribution, exchangeType);
+ return new PinotLogicalExchange(cluster, traitSet, input, distribution, exchangeType, keys);
}

//~ Methods ----------------------------------------------------------------

@Override
public Exchange copy(RelTraitSet traitSet, RelNode newInput, RelDistribution newDistribution) {
- return new PinotLogicalExchange(getCluster(), traitSet, newInput, newDistribution, _exchangeType);
+ return new PinotLogicalExchange(getCluster(), traitSet, newInput, newDistribution, _exchangeType, _keys);
}

@Override
Expand All @@ -86,4 +92,8 @@ public RelWriter explainTerms(RelWriter pw) {
public PinotRelExchangeType getExchangeType() {
return _exchangeType;
}

public List<Integer> getKeys() {
return _keys;
}
}
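A minimal sketch (not from the diff) of why the keys are stored separately: a local exchange is modeled as SINGLETON distribution, but creating it with the new keys-carrying overload lets a later pass such as MailboxAssignmentVisitor switch it to HASH distribution for extra parallelism without re-deriving the join keys.

import java.util.List;
import org.apache.calcite.rel.RelDistributions;
import org.apache.calcite.rel.RelNode;
import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange;

public final class LocalExchangeSketch {
  private LocalExchangeSketch() {
  }

  /** Local (SINGLETON) exchange that still remembers the join keys. */
  public static PinotLogicalExchange localExchange(RelNode child, List<Integer> joinKeys) {
    return PinotLogicalExchange.create(child, RelDistributions.SINGLETON, joinKeys);
  }
}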
@@ -102,7 +102,9 @@ private static PinotLogicalExchange createExchangeForLookupJoin(PinotHintOptions
List<Integer> keys, RelNode child) {
switch (distributionType) {
case LOCAL:
- return PinotLogicalExchange.create(child, RelDistributions.SINGLETON);
+ // NOTE: We use SINGLETON to represent local distribution. Add keys to the exchange because we might want to
+ // switch it to HASH distribution to increase parallelism. See MailboxAssignmentVisitor for details.
+ return PinotLogicalExchange.create(child, RelDistributions.SINGLETON, keys);
case HASH:
Preconditions.checkArgument(!keys.isEmpty(), "Hash distribution requires join keys");
return PinotLogicalExchange.create(child, RelDistributions.hash(keys));
@@ -117,7 +119,9 @@ private static PinotLogicalExchange createExchangeForHashJoin(PinotHintOptions.D
List<Integer> keys, RelNode child) {
switch (distributionType) {
case LOCAL:
- return PinotLogicalExchange.create(child, RelDistributions.SINGLETON);
+ // NOTE: We use SINGLETON to represent local distribution. Add keys to the exchange because we might want to
+ // switch it to HASH distribution to increase parallelism. See MailboxAssignmentVisitor for details.
+ return PinotLogicalExchange.create(child, RelDistributions.SINGLETON, keys);
case HASH:
Preconditions.checkArgument(!keys.isEmpty(), "Hash distribution requires join keys");
return PinotLogicalExchange.create(child, RelDistributions.hash(keys));
@@ -147,34 +147,38 @@ public PlanNode toPlanNode(RelNode node) {
}

private ExchangeNode convertLogicalExchange(Exchange node) {
+ RelDistribution distribution = node.getDistribution();
+ RelDistribution.Type distributionType = distribution.getType();
+ boolean prePartitioned;
+ if (distributionType == RelDistribution.Type.HASH_DISTRIBUTED) {
+ RelDistribution inputDistributionTrait = node.getInputs().get(0).getTraitSet().getDistribution();
+ prePartitioned = distribution.equals(inputDistributionTrait);
+ } else {
+ prePartitioned = false;
+ }
PinotRelExchangeType exchangeType;
+ List<Integer> keys;
List<RelFieldCollation> collations;
boolean sortOnSender;
boolean sortOnReceiver;
if (node instanceof PinotLogicalSortExchange) {
PinotLogicalSortExchange sortExchange = (PinotLogicalSortExchange) node;
exchangeType = sortExchange.getExchangeType();
+ keys = distribution.getKeys();
collations = sortExchange.getCollation().getFieldCollations();
sortOnSender = sortExchange.isSortOnSender();
sortOnReceiver = sortExchange.isSortOnReceiver();
} else {
assert node instanceof PinotLogicalExchange;
- exchangeType = ((PinotLogicalExchange) node).getExchangeType();
+ PinotLogicalExchange exchange = (PinotLogicalExchange) node;
+ exchangeType = exchange.getExchangeType();
+ keys = exchange.getKeys();
collations = null;
sortOnSender = false;
sortOnReceiver = false;
}
- RelDistribution distribution = node.getDistribution();
- RelDistribution.Type distributionType = distribution.getType();
- List<Integer> keys;
- boolean prePartitioned;
- if (distributionType == RelDistribution.Type.HASH_DISTRIBUTED) {
- keys = distribution.getKeys();
- RelDistribution inputDistributionTrait = node.getInputs().get(0).getTraitSet().getDistribution();
- prePartitioned = distribution.equals(inputDistributionTrait);
- } else {
+ if (keys.isEmpty()) {
keys = null;
- prePartitioned = false;
}
return new ExchangeNode(DEFAULT_STAGE_ID, toDataSchema(node.getRowType()), convertInputs(node.getInputs()),
exchangeType, distributionType, keys, prePartitioned, collations, sortOnSender, sortOnReceiver, null);
@@ -46,46 +46,39 @@ public class DispatchablePlanMetadata implements Serializable {
// --------------------------------------------------------------------------
// Fields extracted with {@link DispatchablePlanVisitor}
// --------------------------------------------------------------------------
- // info from TableNode
- private final List<String> _scannedTables;

+ // Info from TableNode
+ private final List<String> _scannedTables = new ArrayList<>();
private Map<String, String> _tableOptions;
- // info from MailboxSendNode - whether a stage is pre-partitioned by the same way the sending exchange desires

+ // Info from MailboxSendNode - whether a stage is pre-partitioned by the same way the sending exchange desires
private boolean _isPrePartitioned;
- // info from PlanNode that requires singleton (e.g. SortNode/AggregateNode)

+ // Info from PlanNode that requires singleton (e.g. SortNode/AggregateNode)
private boolean _requiresSingletonInstance;

+ // TODO: Change the following maps to lists

// --------------------------------------------------------------------------
// Fields extracted with {@link PinotDispatchPlanner}
// --------------------------------------------------------------------------
- // used for assigning server/worker nodes.
- private Map<Integer, QueryServerInstance> _workerIdToServerInstanceMap;

- // used for table scan stage - we use ServerInstance instead of VirtualServer
- // here because all virtual servers that share a server instance will have the
- // same segments on them
+ // The following fields are calculated in {@link WorkerManager}
+ // Available for both leaf and intermediate stage
+ private Map<Integer, QueryServerInstance> _workerIdToServerInstanceMap;
+ private String _partitionFunction;
+ // Available for leaf stage only
// Map from workerId -> {tableType -> segments}
private Map<Integer, Map<String, List<String>>> _workerIdToSegmentsMap;

- // used for build mailboxes between workers.
- // workerId -> {planFragmentId -> mailbox list}
- private final Map<Integer, Map<Integer, MailboxInfos>> _workerIdToMailboxesMap;

- // used for tracking unavailable segments from routing table, then assemble missing segments exception.
- private final Map<String, Set<String>> _tableToUnavailableSegmentsMap;

- // time boundary info
+ // Map from tableType -> segments, available when 'is_replicated' hint is set to true
+ private Map<String, List<String>> _replicatedSegments;
private TimeBoundaryInfo _timeBoundaryInfo;
+ private int _partitionParallelism = 1;
+ private final Map<String, Set<String>> _tableToUnavailableSegmentsMap = new HashMap<>();

- // physical partition info
- private String _partitionFunction;
- private int _partitionParallelism;

- public DispatchablePlanMetadata() {
- _scannedTables = new ArrayList<>();
- _workerIdToMailboxesMap = new HashMap<>();
- _tableToUnavailableSegmentsMap = new HashMap<>();
- }
+ // Calculated in {@link MailboxAssignmentVisitor}
+ // Map from workerId -> {planFragmentId -> mailboxes}
+ private final Map<Integer, Map<Integer, MailboxInfos>> _workerIdToMailboxesMap = new HashMap<>();

public List<String> getScannedTables() {
return _scannedTables;
@@ -125,6 +118,15 @@ public void setWorkerIdToSegmentsMap(Map<Integer, Map<String, List<String>>> wor
_workerIdToSegmentsMap = workerIdToSegmentsMap;
}

@Nullable
public Map<String, List<String>> getReplicatedSegments() {
return _replicatedSegments;
}

public void setReplicatedSegments(Map<String, List<String>> replicatedSegments) {
_replicatedSegments = replicatedSegments;
}

public Map<Integer, Map<Integer, MailboxInfos>> getWorkerIdToMailboxesMap() {
return _workerIdToMailboxesMap;
}
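A hedged sketch (not part of the diff shown) of how the new replicated-segments field could be populated by a worker-assignment step when the 'is_replicated' hint is set; the table type key and segment names are illustrative only.

import java.util.List;
import java.util.Map;

public class ReplicatedSegmentsExample {
  public static Map<String, List<String>> recordReplicatedSegments() {
    DispatchablePlanMetadata metadata = new DispatchablePlanMetadata();
    // Every worker of the stage can then be assigned the full segment list, so the join
    // runs locally without a network shuffle.
    metadata.setReplicatedSegments(Map.of(
        "OFFLINE", List.of("dimTable_OFFLINE_0", "dimTable_OFFLINE_1")));
    return metadata.getReplicatedSegments();
  }
}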
