Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spool4 #14672

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open

Spool4 #14672

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
982ac13
Support stage replacement
gortiz Nov 19, 2024
d02b5cb
Verify the case where a spool tries to send twice to the same stage
gortiz Nov 19, 2024
05b894e
Add tests that verifies that different distribution breaks equivalence
gortiz Nov 19, 2024
1597f5b
Fix root mailbox send node build using null receiver stage
gortiz Nov 20, 2024
91b1b55
Remove unused variable
gortiz Nov 20, 2024
37168a0
Fix MailboxReceiveNode.equals, broken in previous spool commits
gortiz Nov 20, 2024
fe22305
Hide the receiver stage bit set in MailboxSendNode
gortiz Nov 20, 2024
a72157f
Apply suggestions from code review
gortiz Nov 21, 2024
edf8542
Make _senderStageId mutable to simplify getSenderStageId() logic
gortiz Nov 21, 2024
5bff873
Make StagesTestBase.Spool static
gortiz Nov 21, 2024
8263de3
Rename Spool as SpoolBuilder and add javadoc explaining the class
gortiz Nov 21, 2024
3b252cf
Fix error added in a previous commit
gortiz Nov 21, 2024
32fa712
Support spool in runtime. Still not complete.
gortiz Nov 20, 2024
f3bffb3
Fix design issues in multi-sender MailboxSendNode
gortiz Nov 20, 2024
f06138a
Add some explain physical plans with spool
gortiz Nov 21, 2024
0b45a95
Merge branch 'master' into spool3
gortiz Dec 13, 2024
cb7ff1f
Add some logs in trace mode
gortiz Dec 13, 2024
7d63fbf
Rename spool resource tests
gortiz Dec 13, 2024
0479a2b
Fix error detected (and only affecting) tests. Improve logs
gortiz Dec 16, 2024
53cebc8
Merge remote-tracking branch 'origin/master' into spool3
gortiz Dec 16, 2024
5d81c6a
Buffer multi-output on heap to avoid blocks
gortiz Dec 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,15 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
database = DatabaseUtils.extractDatabaseFromQueryRequest(queryOptions, httpHeaders);
boolean inferPartitionHint = _config.getProperty(CommonConstants.Broker.CONFIG_OF_INFER_PARTITION_HINT,
CommonConstants.Broker.DEFAULT_INFER_PARTITION_HINT);
//@formatter:off
boolean defaultUseSpool = _config.getProperty(CommonConstants.Broker.CONFIG_OF_SPOOLS,
CommonConstants.Broker.DEFAULT_OF_SPOOLS);
QueryEnvironment queryEnvironment = new QueryEnvironment(QueryEnvironment.configBuilder()
.database(database)
.tableCache(_tableCache)
.workerManager(_workerManager)
.defaultInferPartitionHint(inferPartitionHint)
.defaultUseSpools(defaultUseSpool)
.build());
//@formatter:on
switch (sqlNodeAndOptions.getSqlNode().getKind()) {
case EXPLAIN:
boolean askServers = QueryOptionsUtils.isExplainAskingServers(queryOptions)
Expand Down
4 changes: 3 additions & 1 deletion pinot-common/src/main/proto/plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,15 @@ message MailboxReceiveNode {
}

message MailboxSendNode {
int32 receiverStageId = 1;
// kept for backward compatibility. Brokers populate it, but servers should prioritize receiverStageIds
int32 receiverStageId = 1 [deprecated = true];
ExchangeType exchangeType = 2;
DistributionType distributionType = 3;
repeated int32 keys = 4;
bool prePartitioned = 5;
repeated Collation collations = 6;
bool sort = 7;
repeated int32 receiverStageIds = 8;
}

message ProjectNode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ public QueryEnvironment(String database, TableCache tableCache, @Nullable Worker
private PlannerContext getPlannerContext(SqlNodeAndOptions sqlNodeAndOptions) {
WorkerManager workerManager = getWorkerManager(sqlNodeAndOptions);
HepProgram traitProgram = getTraitProgram(workerManager);
return new PlannerContext(_config, _catalogReader, _typeFactory, _optProgram, traitProgram);
return new PlannerContext(_config, _catalogReader, _typeFactory, _optProgram, traitProgram,
sqlNodeAndOptions.getOptions());
}

@Nullable
Expand All @@ -163,14 +164,6 @@ private WorkerManager getWorkerManager(SqlNodeAndOptions sqlNodeAndOptions) {
}
}

/**
* Returns the planner context that should be used only for parsing queries.
*/
private PlannerContext getParsingPlannerContext() {
HepProgram traitProgram = getTraitProgram(null);
return new PlannerContext(_config, _catalogReader, _typeFactory, _optProgram, traitProgram);
}

/**
* Plan a SQL query.
*
Expand All @@ -185,7 +178,6 @@ private PlannerContext getParsingPlannerContext() {
*/
public QueryPlannerResult planQuery(String sqlQuery, SqlNodeAndOptions sqlNodeAndOptions, long requestId) {
try (PlannerContext plannerContext = getPlannerContext(sqlNodeAndOptions)) {
plannerContext.setOptions(sqlNodeAndOptions.getOptions());
RelRoot relRoot = compileQuery(sqlNodeAndOptions.getSqlNode(), plannerContext);
// TODO: current code only assume one SubPlan per query, but we should support multiple SubPlans per query.
// Each SubPlan should be able to run independently from Broker then set the results into the dependent
Expand All @@ -209,8 +201,7 @@ public DispatchableSubPlan planQuery(String sqlQuery) {
*
* Similar to {@link QueryEnvironment#planQuery(String, SqlNodeAndOptions, long)}, this API runs the query
* compilation. But it doesn't run the distributed {@link DispatchableSubPlan} generation, instead it only
* returns the
* explained logical plan.
* returns the explained logical plan.
*
* @param sqlQuery SQL query string.
* @param sqlNodeAndOptions parsed SQL query.
Expand All @@ -221,7 +212,6 @@ public QueryPlannerResult explainQuery(String sqlQuery, SqlNodeAndOptions sqlNod
@Nullable AskingServerStageExplainer.OnServerExplainer onServerExplainer) {
try (PlannerContext plannerContext = getPlannerContext(sqlNodeAndOptions)) {
SqlExplain explain = (SqlExplain) sqlNodeAndOptions.getSqlNode();
plannerContext.setOptions(sqlNodeAndOptions.getOptions());
RelRoot relRoot = compileQuery(explain.getExplicandum(), plannerContext);
if (explain instanceof SqlPhysicalExplain) {
// get the physical plan for query.
Expand Down Expand Up @@ -271,8 +261,9 @@ public String explainQuery(String sqlQuery, long requestId) {
}

public List<String> getTableNamesForQuery(String sqlQuery) {
try (PlannerContext plannerContext = getParsingPlannerContext()) {
SqlNode sqlNode = CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery).getSqlNode();
SqlNodeAndOptions sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery);
try (PlannerContext plannerContext = getPlannerContext(sqlNodeAndOptions)) {
SqlNode sqlNode = sqlNodeAndOptions.getSqlNode();
if (sqlNode.getKind().equals(SqlKind.EXPLAIN)) {
sqlNode = ((SqlExplain) sqlNode).getExplicandum();
}
Expand All @@ -288,8 +279,9 @@ public List<String> getTableNamesForQuery(String sqlQuery) {
* Returns whether the query can be successfully compiled in this query environment
*/
public boolean canCompileQuery(String query) {
try (PlannerContext plannerContext = getParsingPlannerContext()) {
SqlNode sqlNode = CalciteSqlParser.compileToSqlNodeAndOptions(query).getSqlNode();
SqlNodeAndOptions sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query);
try (PlannerContext plannerContext = getPlannerContext(sqlNodeAndOptions)) {
SqlNode sqlNode = sqlNodeAndOptions.getSqlNode();
if (sqlNode.getKind().equals(SqlKind.EXPLAIN)) {
sqlNode = ((SqlExplain) sqlNode).getExplicandum();
}
Expand Down Expand Up @@ -400,7 +392,7 @@ private DispatchableSubPlan toDispatchableSubPlan(RelRoot relRoot, PlannerContex

private DispatchableSubPlan toDispatchableSubPlan(RelRoot relRoot, PlannerContext plannerContext, long requestId,
@Nullable TransformationTracker.Builder<PlanNode, RelNode> tracker) {
SubPlan plan = PinotLogicalQueryPlanner.makePlan(relRoot, tracker);
SubPlan plan = PinotLogicalQueryPlanner.makePlan(relRoot, tracker, useSpools(plannerContext.getOptions()));
PinotDispatchPlanner pinotDispatchPlanner =
new PinotDispatchPlanner(plannerContext, _envConfig.getWorkerManager(), requestId, _envConfig.getTableCache());
return pinotDispatchPlanner.createDispatchableSubPlan(plan);
Expand Down Expand Up @@ -465,6 +457,14 @@ public static ImmutableQueryEnvironment.Config.Builder configBuilder() {
return ImmutableQueryEnvironment.Config.builder();
}

public boolean useSpools(Map<String, String> options) {
String optionValue = options.get(CommonConstants.Broker.Request.QueryOptionKey.USE_SPOOLS);
if (optionValue == null) {
return _envConfig.defaultUseSpools();
}
return Boolean.parseBoolean(optionValue);
}

@Value.Immutable
public interface Config {
String getDatabase();
Expand All @@ -484,6 +484,18 @@ default boolean defaultInferPartitionHint() {
return CommonConstants.Broker.DEFAULT_INFER_PARTITION_HINT;
}

/**
* Whether to use spools or not.
*
* This is treated as the default value for the broker and it is expected to be obtained from a Pinot configuration.
* This default value can be always overridden at query level by the query option
* {@link CommonConstants.Broker.Request.QueryOptionKey#USE_SPOOLS}.
*/
@Value.Default
default boolean defaultUseSpools() {
return CommonConstants.Broker.DEFAULT_OF_SPOOLS;
}

/**
* Returns the worker manager.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,16 @@ public class PlannerContext implements AutoCloseable {
private final RelOptPlanner _relOptPlanner;
private final LogicalPlanner _relTraitPlanner;

private Map<String, String> _options;
private final Map<String, String> _options;

public PlannerContext(FrameworkConfig config, Prepare.CatalogReader catalogReader, RelDataTypeFactory typeFactory,
HepProgram optProgram, HepProgram traitProgram) {
HepProgram optProgram, HepProgram traitProgram, Map<String, String> options) {
_planner = new PlannerImpl(config);
_validator = new Validator(config.getOperatorTable(), catalogReader, typeFactory);
_relOptPlanner = new LogicalPlanner(optProgram, Contexts.EMPTY_CONTEXT, config.getTraitDefs());
_relTraitPlanner = new LogicalPlanner(traitProgram, Contexts.EMPTY_CONTEXT,
Collections.singletonList(RelDistributionTraitDef.INSTANCE));
_options = options;
}

public PlannerImpl getPlanner() {
Expand All @@ -74,10 +75,6 @@ public LogicalPlanner getRelTraitPlanner() {
return _relTraitPlanner;
}

public void setOptions(Map<String, String> options) {
_options = options;
}

public Map<String, String> getOptions() {
return _options;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
*/
package org.apache.pinot.query.planner.explain;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.apache.pinot.query.planner.plannode.AggregateNode;
Expand Down Expand Up @@ -212,14 +215,22 @@ public StringBuilder visitMailboxSend(MailboxSendNode node, Context context) {
private StringBuilder appendMailboxSend(MailboxSendNode node, Context context) {
appendInfo(node, context);

int receiverStageId = node.getReceiverStageId();
List<MailboxInfo> receiverMailboxInfos =
_dispatchableSubPlan.getQueryStageList().get(node.getStageId()).getWorkerMetadataList().get(context._workerId)
.getMailboxInfosMap().get(receiverStageId).getMailboxInfos();
List<Stream<String>> perStageDescriptions = new ArrayList<>();
// This iterator is guaranteed to be sorted by stageId
for (Integer receiverStageId : node.getReceiverStageIds()) {
List<MailboxInfo> receiverMailboxInfos =
_dispatchableSubPlan.getQueryStageList().get(node.getStageId()).getWorkerMetadataList().get(context._workerId)
.getMailboxInfosMap().get(receiverStageId).getMailboxInfos();
// Sort to ensure print order
Stream<String> stageDescriptions = receiverMailboxInfos.stream()
.sorted(Comparator.comparingInt(MailboxInfo::getPort))
.map(v -> "[" + receiverStageId + "]@" + v);
perStageDescriptions.add(stageDescriptions);
}
context._builder.append("->");
// Sort to ensure print order
String receivers = receiverMailboxInfos.stream().sorted(Comparator.comparingInt(MailboxInfo::getPort))
.map(v -> "[" + receiverStageId + "]@" + v).collect(Collectors.joining(",", "{", "}"));
String receivers = perStageDescriptions.stream()
.flatMap(Function.identity())
.collect(Collectors.joining(",", "{", "}"));
return context._builder.append(receivers);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class EquivalentStagesFinder {
private EquivalentStagesFinder() {
}

public static GroupedStages findEquivalentStages(MailboxSendNode root) {
public static GroupedStages findEquivalentStages(PlanNode root) {
Visitor visitor = new Visitor();
root.visit(visitor, null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,31 @@ public class EquivalentStagesReplacer {
private EquivalentStagesReplacer() {
}

public static void replaceEquivalentStages(PlanNode root, GroupedStages equivalentStages) {
replaceEquivalentStages(root, equivalentStages, OnSubstitution.NO_OP);
}

/**
* Replaces the equivalent stages in the query plan.
*
* @param root Root plan node
* @param equivalentStages Equivalent stages
*/
public static void replaceEquivalentStages(PlanNode root, GroupedStages equivalentStages) {
root.visit(Replacer.INSTANCE, equivalentStages);
public static void replaceEquivalentStages(PlanNode root, GroupedStages equivalentStages, OnSubstitution listener) {
root.visit(new Replacer(listener), equivalentStages);
}

public interface OnSubstitution {
OnSubstitution NO_OP = (receiver, oldSender, newSender) -> {
};
void onSubstitution(int receiver, int oldSender, int newSender);
}

private static class Replacer extends PlanNodeVisitor.DepthFirstVisitor<Void, GroupedStages> {
private static final Replacer INSTANCE = new Replacer();
private final OnSubstitution _listener;

private Replacer() {
public Replacer(OnSubstitution listener) {
_listener = listener;
}

@Override
Expand All @@ -62,6 +73,7 @@ public Void visitMailboxReceive(MailboxReceiveNode node, GroupedStages equivalen
// we don't want to visit the children of the node given it is going to be pruned
node.setSender(leader);
leader.addReceiver(node);
_listener.onSubstitution(node.getStageId(), sender.getStageId(), leader.getStageId());
} else {
visitMailboxSend(leader, equivalenceGroups);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ private PinotLogicalQueryPlanner() {
* Converts a Calcite {@link RelRoot} into a Pinot {@link SubPlan}.
*/
public static SubPlan makePlan(RelRoot relRoot,
@Nullable TransformationTracker.Builder<PlanNode, RelNode> tracker) {
@Nullable TransformationTracker.Builder<PlanNode, RelNode> tracker, boolean useSpools) {
PlanNode rootNode = new RelToPlanNodeConverter(tracker).toPlanNode(relRoot.rel);

PlanFragment rootFragment = planNodeToPlanFragment(rootNode, tracker);
PlanFragment rootFragment = planNodeToPlanFragment(rootNode, tracker, useSpools);
return new SubPlan(rootFragment,
new SubPlanMetadata(RelToPlanNodeConverter.getTableNamesFromRelRoot(relRoot.rel), relRoot.fields), List.of());

Expand Down Expand Up @@ -89,10 +89,16 @@ public static SubPlan makePlan(RelRoot relRoot,
}

private static PlanFragment planNodeToPlanFragment(
PlanNode node, @Nullable TransformationTracker.Builder<PlanNode, RelNode> tracker) {
PlanNode node, @Nullable TransformationTracker.Builder<PlanNode, RelNode> tracker, boolean useSpools) {
PlanFragmenter fragmenter = new PlanFragmenter();
PlanFragmenter.Context fragmenterContext = fragmenter.createContext();
node = node.visit(fragmenter, fragmenterContext);

if (useSpools) {
GroupedStages equivalentStages = EquivalentStagesFinder.findEquivalentStages(node);
EquivalentStagesReplacer.replaceEquivalentStages(node, equivalentStages, fragmenter);
}

Int2ObjectOpenHashMap<PlanFragment> planFragmentMap = fragmenter.getPlanFragmentMap();
Int2ObjectOpenHashMap<IntList> childPlanFragmentIdsMap = fragmenter.getChildPlanFragmentIdsMap();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@
* 3. Assign current PlanFragment ID to {@link MailboxReceiveNode};
* 4. Increment current PlanFragment ID by one and assign it to the {@link MailboxSendNode}.
*/
public class PlanFragmenter implements PlanNodeVisitor<PlanNode, PlanFragmenter.Context> {
public class PlanFragmenter implements PlanNodeVisitor<PlanNode, PlanFragmenter.Context>,
EquivalentStagesReplacer.OnSubstitution {
private final Int2ObjectOpenHashMap<PlanFragment> _planFragmentMap = new Int2ObjectOpenHashMap<>();
private final Int2ObjectOpenHashMap<IntList> _childPlanFragmentIdsMap = new Int2ObjectOpenHashMap<>();

Expand Down Expand Up @@ -86,6 +87,16 @@ private PlanNode process(PlanNode node, Context context) {
return node;
}

@Override
public void onSubstitution(int receiver, int oldSender, int newSender) {
IntList senders = _childPlanFragmentIdsMap.get(receiver);
senders.rem(oldSender);
if (!senders.contains(newSender)) {
senders.add(newSender);
}
_planFragmentMap.remove(oldSender);
}

@Override
public PlanNode visitAggregate(AggregateNode node, Context context) {
return process(node, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.apache.pinot.query.planner.physical;

import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.query.planner.plannode.AggregateNode;
import org.apache.pinot.query.planner.plannode.ExchangeNode;
Expand All @@ -37,10 +40,7 @@


public class DispatchablePlanVisitor implements PlanNodeVisitor<Void, DispatchablePlanContext> {
public static final DispatchablePlanVisitor INSTANCE = new DispatchablePlanVisitor();

private DispatchablePlanVisitor() {
}
private final Set<MailboxSendNode> _visited = Collections.newSetFromMap(new IdentityHashMap<>());

private static DispatchablePlanMetadata getOrCreateDispatchablePlanMetadata(PlanNode node,
DispatchablePlanContext context) {
Expand Down Expand Up @@ -104,10 +104,12 @@ public Void visitMailboxReceive(MailboxReceiveNode node, DispatchablePlanContext

@Override
public Void visitMailboxSend(MailboxSendNode node, DispatchablePlanContext context) {
node.getInputs().get(0).visit(this, context);
DispatchablePlanMetadata dispatchablePlanMetadata = getOrCreateDispatchablePlanMetadata(node, context);
dispatchablePlanMetadata.setPrePartitioned(node.isPrePartitioned());
context.getDispatchablePlanStageRootMap().put(node.getStageId(), node);
if (_visited.add(node)) {
node.getInputs().get(0).visit(this, context);
DispatchablePlanMetadata dispatchablePlanMetadata = getOrCreateDispatchablePlanMetadata(node, context);
dispatchablePlanMetadata.setPrePartitioned(node.isPrePartitioned());
context.getDispatchablePlanStageRootMap().put(node.getStageId(), node);
}
return null;
}

Expand Down
Loading
Loading