diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index ae12c0e725f6..8f1003092d76 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -136,14 +136,15 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO database = DatabaseUtils.extractDatabaseFromQueryRequest(queryOptions, httpHeaders); boolean inferPartitionHint = _config.getProperty(CommonConstants.Broker.CONFIG_OF_INFER_PARTITION_HINT, CommonConstants.Broker.DEFAULT_INFER_PARTITION_HINT); - //@formatter:off + boolean defaultUseSpool = _config.getProperty(CommonConstants.Broker.CONFIG_OF_SPOOLS, + CommonConstants.Broker.DEFAULT_OF_SPOOLS); QueryEnvironment queryEnvironment = new QueryEnvironment(QueryEnvironment.configBuilder() .database(database) .tableCache(_tableCache) .workerManager(_workerManager) .defaultInferPartitionHint(inferPartitionHint) + .defaultUseSpools(defaultUseSpool) .build()); - //@formatter:on switch (sqlNodeAndOptions.getSqlNode().getKind()) { case EXPLAIN: boolean askServers = QueryOptionsUtils.isExplainAskingServers(queryOptions) diff --git a/pinot-common/src/main/proto/plan.proto b/pinot-common/src/main/proto/plan.proto index 49d357307648..90174c7c4c3b 100644 --- a/pinot-common/src/main/proto/plan.proto +++ b/pinot-common/src/main/proto/plan.proto @@ -144,13 +144,15 @@ message MailboxReceiveNode { } message MailboxSendNode { - int32 receiverStageId = 1; + // kept for backward compatibility. 
Brokers populate it, but servers should prioritize receiverStageIds + int32 receiverStageId = 1 [deprecated = true]; ExchangeType exchangeType = 2; DistributionType distributionType = 3; repeated int32 keys = 4; bool prePartitioned = 5; repeated Collation collations = 6; bool sort = 7; + repeated int32 receiverStageIds = 8; } message ProjectNode { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java index 629c7ae2c56f..63422f37e521 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java @@ -138,7 +138,8 @@ public QueryEnvironment(String database, TableCache tableCache, @Nullable Worker private PlannerContext getPlannerContext(SqlNodeAndOptions sqlNodeAndOptions) { WorkerManager workerManager = getWorkerManager(sqlNodeAndOptions); HepProgram traitProgram = getTraitProgram(workerManager); - return new PlannerContext(_config, _catalogReader, _typeFactory, _optProgram, traitProgram); + return new PlannerContext(_config, _catalogReader, _typeFactory, _optProgram, traitProgram, + sqlNodeAndOptions.getOptions()); } @Nullable @@ -163,14 +164,6 @@ private WorkerManager getWorkerManager(SqlNodeAndOptions sqlNodeAndOptions) { } } - /** - * Returns the planner context that should be used only for parsing queries. - */ - private PlannerContext getParsingPlannerContext() { - HepProgram traitProgram = getTraitProgram(null); - return new PlannerContext(_config, _catalogReader, _typeFactory, _optProgram, traitProgram); - } - /** * Plan a SQL query. 
* @@ -185,7 +178,6 @@ private PlannerContext getParsingPlannerContext() { */ public QueryPlannerResult planQuery(String sqlQuery, SqlNodeAndOptions sqlNodeAndOptions, long requestId) { try (PlannerContext plannerContext = getPlannerContext(sqlNodeAndOptions)) { - plannerContext.setOptions(sqlNodeAndOptions.getOptions()); RelRoot relRoot = compileQuery(sqlNodeAndOptions.getSqlNode(), plannerContext); // TODO: current code only assume one SubPlan per query, but we should support multiple SubPlans per query. // Each SubPlan should be able to run independently from Broker then set the results into the dependent @@ -209,8 +201,7 @@ public DispatchableSubPlan planQuery(String sqlQuery) { * * Similar to {@link QueryEnvironment#planQuery(String, SqlNodeAndOptions, long)}, this API runs the query * compilation. But it doesn't run the distributed {@link DispatchableSubPlan} generation, instead it only - * returns the - * explained logical plan. + * returns the explained logical plan. * * @param sqlQuery SQL query string. * @param sqlNodeAndOptions parsed SQL query. @@ -221,7 +212,6 @@ public QueryPlannerResult explainQuery(String sqlQuery, SqlNodeAndOptions sqlNod @Nullable AskingServerStageExplainer.OnServerExplainer onServerExplainer) { try (PlannerContext plannerContext = getPlannerContext(sqlNodeAndOptions)) { SqlExplain explain = (SqlExplain) sqlNodeAndOptions.getSqlNode(); - plannerContext.setOptions(sqlNodeAndOptions.getOptions()); RelRoot relRoot = compileQuery(explain.getExplicandum(), plannerContext); if (explain instanceof SqlPhysicalExplain) { // get the physical plan for query. 
@@ -271,8 +261,9 @@ public String explainQuery(String sqlQuery, long requestId) { } public List getTableNamesForQuery(String sqlQuery) { - try (PlannerContext plannerContext = getParsingPlannerContext()) { - SqlNode sqlNode = CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery).getSqlNode(); + SqlNodeAndOptions sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery); + try (PlannerContext plannerContext = getPlannerContext(sqlNodeAndOptions)) { + SqlNode sqlNode = sqlNodeAndOptions.getSqlNode(); if (sqlNode.getKind().equals(SqlKind.EXPLAIN)) { sqlNode = ((SqlExplain) sqlNode).getExplicandum(); } @@ -288,8 +279,9 @@ public List getTableNamesForQuery(String sqlQuery) { * Returns whether the query can be successfully compiled in this query environment */ public boolean canCompileQuery(String query) { - try (PlannerContext plannerContext = getParsingPlannerContext()) { - SqlNode sqlNode = CalciteSqlParser.compileToSqlNodeAndOptions(query).getSqlNode(); + SqlNodeAndOptions sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query); + try (PlannerContext plannerContext = getPlannerContext(sqlNodeAndOptions)) { + SqlNode sqlNode = sqlNodeAndOptions.getSqlNode(); if (sqlNode.getKind().equals(SqlKind.EXPLAIN)) { sqlNode = ((SqlExplain) sqlNode).getExplicandum(); } @@ -400,7 +392,7 @@ private DispatchableSubPlan toDispatchableSubPlan(RelRoot relRoot, PlannerContex private DispatchableSubPlan toDispatchableSubPlan(RelRoot relRoot, PlannerContext plannerContext, long requestId, @Nullable TransformationTracker.Builder tracker) { - SubPlan plan = PinotLogicalQueryPlanner.makePlan(relRoot, tracker); + SubPlan plan = PinotLogicalQueryPlanner.makePlan(relRoot, tracker, useSpools(plannerContext.getOptions())); PinotDispatchPlanner pinotDispatchPlanner = new PinotDispatchPlanner(plannerContext, _envConfig.getWorkerManager(), requestId, _envConfig.getTableCache()); return pinotDispatchPlanner.createDispatchableSubPlan(plan); @@ -465,6 +457,14 @@ public 
static ImmutableQueryEnvironment.Config.Builder configBuilder() { return ImmutableQueryEnvironment.Config.builder(); } + public boolean useSpools(Map options) { + String optionValue = options.get(CommonConstants.Broker.Request.QueryOptionKey.USE_SPOOLS); + if (optionValue == null) { + return _envConfig.defaultUseSpools(); + } + return Boolean.parseBoolean(optionValue); + } + @Value.Immutable public interface Config { String getDatabase(); @@ -484,6 +484,18 @@ default boolean defaultInferPartitionHint() { return CommonConstants.Broker.DEFAULT_INFER_PARTITION_HINT; } + /** + * Whether to use spools or not. + * + * This is treated as the default value for the broker and it is expected to be obtained from a Pinot configuration. + * This default value can be always overridden at query level by the query option + * {@link CommonConstants.Broker.Request.QueryOptionKey#USE_SPOOLS}. + */ + @Value.Default + default boolean defaultUseSpools() { + return CommonConstants.Broker.DEFAULT_OF_SPOOLS; + } + /** * Returns the worker manager. 
* diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java index 3164921c785e..4505e16da3d8 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java @@ -47,15 +47,16 @@ public class PlannerContext implements AutoCloseable { private final RelOptPlanner _relOptPlanner; private final LogicalPlanner _relTraitPlanner; - private Map _options; + private final Map _options; public PlannerContext(FrameworkConfig config, Prepare.CatalogReader catalogReader, RelDataTypeFactory typeFactory, - HepProgram optProgram, HepProgram traitProgram) { + HepProgram optProgram, HepProgram traitProgram, Map options) { _planner = new PlannerImpl(config); _validator = new Validator(config.getOperatorTable(), catalogReader, typeFactory); _relOptPlanner = new LogicalPlanner(optProgram, Contexts.EMPTY_CONTEXT, config.getTraitDefs()); _relTraitPlanner = new LogicalPlanner(traitProgram, Contexts.EMPTY_CONTEXT, Collections.singletonList(RelDistributionTraitDef.INSTANCE)); + _options = options; } public PlannerImpl getPlanner() { @@ -74,10 +75,6 @@ public LogicalPlanner getRelTraitPlanner() { return _relTraitPlanner; } - public void setOptions(Map options) { - _options = options; - } - public Map getOptions() { return _options; } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java index e7d1c04f50dc..b91783a18637 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java @@ -18,11 +18,14 @@ */ package 
org.apache.pinot.query.planner.explain; +import java.util.ArrayList; import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.pinot.query.planner.physical.DispatchablePlanFragment; import org.apache.pinot.query.planner.physical.DispatchableSubPlan; import org.apache.pinot.query.planner.plannode.AggregateNode; @@ -212,14 +215,22 @@ public StringBuilder visitMailboxSend(MailboxSendNode node, Context context) { private StringBuilder appendMailboxSend(MailboxSendNode node, Context context) { appendInfo(node, context); - int receiverStageId = node.getReceiverStageId(); - List receiverMailboxInfos = - _dispatchableSubPlan.getQueryStageList().get(node.getStageId()).getWorkerMetadataList().get(context._workerId) - .getMailboxInfosMap().get(receiverStageId).getMailboxInfos(); + List> perStageDescriptions = new ArrayList<>(); + // This iterator is guaranteed to be sorted by stageId + for (Integer receiverStageId : node.getReceiverStageIds()) { + List receiverMailboxInfos = + _dispatchableSubPlan.getQueryStageList().get(node.getStageId()).getWorkerMetadataList().get(context._workerId) + .getMailboxInfosMap().get(receiverStageId).getMailboxInfos(); + // Sort to ensure print order + Stream stageDescriptions = receiverMailboxInfos.stream() + .sorted(Comparator.comparingInt(MailboxInfo::getPort)) + .map(v -> "[" + receiverStageId + "]@" + v); + perStageDescriptions.add(stageDescriptions); + } context._builder.append("->"); - // Sort to ensure print order - String receivers = receiverMailboxInfos.stream().sorted(Comparator.comparingInt(MailboxInfo::getPort)) - .map(v -> "[" + receiverStageId + "]@" + v).collect(Collectors.joining(",", "{", "}")); + String receivers = perStageDescriptions.stream() + .flatMap(Function.identity()) + .collect(Collectors.joining(",", "{", "}")); return context._builder.append(receivers); 
} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/EquivalentStagesFinder.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/EquivalentStagesFinder.java index 55813264ffb0..4ec8f43fb39e 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/EquivalentStagesFinder.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/EquivalentStagesFinder.java @@ -52,7 +52,7 @@ public class EquivalentStagesFinder { private EquivalentStagesFinder() { } - public static GroupedStages findEquivalentStages(MailboxSendNode root) { + public static GroupedStages findEquivalentStages(PlanNode root) { Visitor visitor = new Visitor(); root.visit(visitor, null); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/EquivalentStagesReplacer.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/EquivalentStagesReplacer.java index 06a4cf16dac3..0ad7d9b4d86f 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/EquivalentStagesReplacer.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/EquivalentStagesReplacer.java @@ -38,20 +38,31 @@ public class EquivalentStagesReplacer { private EquivalentStagesReplacer() { } + public static void replaceEquivalentStages(PlanNode root, GroupedStages equivalentStages) { + replaceEquivalentStages(root, equivalentStages, OnSubstitution.NO_OP); + } + /** * Replaces the equivalent stages in the query plan. 
* * @param root Root plan node * @param equivalentStages Equivalent stages */ - public static void replaceEquivalentStages(PlanNode root, GroupedStages equivalentStages) { - root.visit(Replacer.INSTANCE, equivalentStages); + public static void replaceEquivalentStages(PlanNode root, GroupedStages equivalentStages, OnSubstitution listener) { + root.visit(new Replacer(listener), equivalentStages); + } + + public interface OnSubstitution { + OnSubstitution NO_OP = (receiver, oldSender, newSender) -> { + }; + void onSubstitution(int receiver, int oldSender, int newSender); } private static class Replacer extends PlanNodeVisitor.DepthFirstVisitor { - private static final Replacer INSTANCE = new Replacer(); + private final OnSubstitution _listener; - private Replacer() { + public Replacer(OnSubstitution listener) { + _listener = listener; } @Override @@ -62,6 +73,7 @@ public Void visitMailboxReceive(MailboxReceiveNode node, GroupedStages equivalen // we don't want to visit the children of the node given it is going to be pruned node.setSender(leader); leader.addReceiver(node); + _listener.onSubstitution(node.getStageId(), sender.getStageId(), leader.getStageId()); } else { visitMailboxSend(leader, equivalenceGroups); } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java index 8282ea787b31..e08ebd29bd92 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java @@ -55,10 +55,10 @@ private PinotLogicalQueryPlanner() { * Converts a Calcite {@link RelRoot} into a Pinot {@link SubPlan}. 
*/ public static SubPlan makePlan(RelRoot relRoot, - @Nullable TransformationTracker.Builder tracker) { + @Nullable TransformationTracker.Builder tracker, boolean useSpools) { PlanNode rootNode = new RelToPlanNodeConverter(tracker).toPlanNode(relRoot.rel); - PlanFragment rootFragment = planNodeToPlanFragment(rootNode, tracker); + PlanFragment rootFragment = planNodeToPlanFragment(rootNode, tracker, useSpools); return new SubPlan(rootFragment, new SubPlanMetadata(RelToPlanNodeConverter.getTableNamesFromRelRoot(relRoot.rel), relRoot.fields), List.of()); @@ -89,10 +89,16 @@ public static SubPlan makePlan(RelRoot relRoot, } private static PlanFragment planNodeToPlanFragment( - PlanNode node, @Nullable TransformationTracker.Builder tracker) { + PlanNode node, @Nullable TransformationTracker.Builder tracker, boolean useSpools) { PlanFragmenter fragmenter = new PlanFragmenter(); PlanFragmenter.Context fragmenterContext = fragmenter.createContext(); node = node.visit(fragmenter, fragmenterContext); + + if (useSpools) { + GroupedStages equivalentStages = EquivalentStagesFinder.findEquivalentStages(node); + EquivalentStagesReplacer.replaceEquivalentStages(node, equivalentStages, fragmenter); + } + Int2ObjectOpenHashMap planFragmentMap = fragmenter.getPlanFragmentMap(); Int2ObjectOpenHashMap childPlanFragmentIdsMap = fragmenter.getChildPlanFragmentIdsMap(); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java index 420b9d16150b..bbd9a50924a0 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java @@ -56,7 +56,8 @@ * 3. Assign current PlanFragment ID to {@link MailboxReceiveNode}; * 4. Increment current PlanFragment ID by one and assign it to the {@link MailboxSendNode}. 
*/ -public class PlanFragmenter implements PlanNodeVisitor { +public class PlanFragmenter implements PlanNodeVisitor, + EquivalentStagesReplacer.OnSubstitution { private final Int2ObjectOpenHashMap _planFragmentMap = new Int2ObjectOpenHashMap<>(); private final Int2ObjectOpenHashMap _childPlanFragmentIdsMap = new Int2ObjectOpenHashMap<>(); @@ -86,6 +87,16 @@ private PlanNode process(PlanNode node, Context context) { return node; } + @Override + public void onSubstitution(int receiver, int oldSender, int newSender) { + IntList senders = _childPlanFragmentIdsMap.get(receiver); + senders.rem(oldSender); + if (!senders.contains(newSender)) { + senders.add(newSender); + } + _planFragmentMap.remove(oldSender); + } + @Override public PlanNode visitAggregate(AggregateNode node, Context context) { return process(node, context); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java index a6a7040c4e0d..338161da9e7b 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java @@ -18,6 +18,9 @@ */ package org.apache.pinot.query.planner.physical; +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.Set; import org.apache.pinot.calcite.rel.hint.PinotHintOptions; import org.apache.pinot.query.planner.plannode.AggregateNode; import org.apache.pinot.query.planner.plannode.ExchangeNode; @@ -37,10 +40,7 @@ public class DispatchablePlanVisitor implements PlanNodeVisitor { - public static final DispatchablePlanVisitor INSTANCE = new DispatchablePlanVisitor(); - - private DispatchablePlanVisitor() { - } + private final Set _visited = Collections.newSetFromMap(new IdentityHashMap<>()); private static DispatchablePlanMetadata 
getOrCreateDispatchablePlanMetadata(PlanNode node, DispatchablePlanContext context) { @@ -104,10 +104,12 @@ public Void visitMailboxReceive(MailboxReceiveNode node, DispatchablePlanContext @Override public Void visitMailboxSend(MailboxSendNode node, DispatchablePlanContext context) { - node.getInputs().get(0).visit(this, context); - DispatchablePlanMetadata dispatchablePlanMetadata = getOrCreateDispatchablePlanMetadata(node, context); - dispatchablePlanMetadata.setPrePartitioned(node.isPrePartitioned()); - context.getDispatchablePlanStageRootMap().put(node.getStageId(), node); + if (_visited.add(node)) { + node.getInputs().get(0).visit(this, context); + DispatchablePlanMetadata dispatchablePlanMetadata = getOrCreateDispatchablePlanMetadata(node, context); + dispatchablePlanMetadata.setPrePartitioned(node.isPrePartitioned()); + context.getDispatchablePlanStageRootMap().put(node.getStageId(), node); + } return null; } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java index 75765d341f07..5a6734f23f6a 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java @@ -43,99 +43,102 @@ public Void process(PlanNode node, DispatchablePlanContext context) { if (node instanceof MailboxSendNode) { MailboxSendNode sendNode = (MailboxSendNode) node; int senderStageId = sendNode.getStageId(); - int receiverStageId = sendNode.getReceiverStageId(); - Map metadataMap = context.getDispatchablePlanMetadataMap(); - DispatchablePlanMetadata senderMetadata = metadataMap.get(senderStageId); - DispatchablePlanMetadata receiverMetadata = metadataMap.get(receiverStageId); - Map senderServerMap = senderMetadata.getWorkerIdToServerInstanceMap(); - Map 
receiverServerMap = receiverMetadata.getWorkerIdToServerInstanceMap(); - Map> senderMailboxesMap = senderMetadata.getWorkerIdToMailboxesMap(); - Map> receiverMailboxesMap = receiverMetadata.getWorkerIdToMailboxesMap(); + for (Integer receiverStageId : sendNode.getReceiverStageIds()) { + Map metadataMap = context.getDispatchablePlanMetadataMap(); + DispatchablePlanMetadata senderMetadata = metadataMap.get(senderStageId); + DispatchablePlanMetadata receiverMetadata = metadataMap.get(receiverStageId); + Map senderServerMap = senderMetadata.getWorkerIdToServerInstanceMap(); + Map receiverServerMap = receiverMetadata.getWorkerIdToServerInstanceMap(); + Map> senderMailboxesMap = senderMetadata.getWorkerIdToMailboxesMap(); + Map> receiverMailboxesMap = receiverMetadata.getWorkerIdToMailboxesMap(); - int numSenders = senderServerMap.size(); - int numReceivers = receiverServerMap.size(); - if (sendNode.getDistributionType() == RelDistribution.Type.SINGLETON) { - // For SINGLETON exchange type, send the data to the same instance (same worker id) - Preconditions.checkState(numSenders == numReceivers, - "Got different number of workers for SINGLETON distribution type, sender: %s, receiver: %s", numSenders, - numReceivers); - for (int workerId = 0; workerId < numSenders; workerId++) { - QueryServerInstance senderServer = senderServerMap.get(workerId); - QueryServerInstance receiverServer = receiverServerMap.get(workerId); - Preconditions.checkState(senderServer.equals(receiverServer), - "Got different server for SINGLETON distribution type for worker id: %s, sender: %s, receiver: %s", - workerId, senderServer, receiverServer); - MailboxInfos mailboxInfos = new SharedMailboxInfos( - new MailboxInfo(senderServer.getHostname(), senderServer.getQueryMailboxPort(), - ImmutableList.of(workerId))); - senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(receiverStageId, mailboxInfos); - receiverMailboxesMap.computeIfAbsent(workerId, k -> new 
HashMap<>()).put(senderStageId, mailboxInfos); - } - } else if (senderMetadata.isPrePartitioned() && isDirectExchangeCompatible(senderMetadata, receiverMetadata)) { - // - direct exchange possible: - // 1. send the data to the worker with the same worker id (not necessary the same instance), 1-to-1 mapping - // 2. When partition parallelism is configured, fanout based on partition parallelism from each sender - // workerID to sequentially increment receiver workerIDs - int partitionParallelism = numReceivers / numSenders; - if (partitionParallelism == 1) { - // 1-to-1 mapping + int numSenders = senderServerMap.size(); + int numReceivers = receiverServerMap.size(); + if (sendNode.getDistributionType() == RelDistribution.Type.SINGLETON) { + // For SINGLETON exchange type, send the data to the same instance (same worker id) + Preconditions.checkState(numSenders == numReceivers, + "Got different number of workers for SINGLETON distribution type, sender: %s, receiver: %s", numSenders, + numReceivers); for (int workerId = 0; workerId < numSenders; workerId++) { QueryServerInstance senderServer = senderServerMap.get(workerId); QueryServerInstance receiverServer = receiverServerMap.get(workerId); - List workerIds = ImmutableList.of(workerId); - MailboxInfos senderMailboxInfos; - MailboxInfos receiverMailboxInfos; - if (senderServer.equals(receiverServer)) { - senderMailboxInfos = new SharedMailboxInfos( - new MailboxInfo(senderServer.getHostname(), senderServer.getQueryMailboxPort(), workerIds)); - receiverMailboxInfos = senderMailboxInfos; - } else { - senderMailboxInfos = new MailboxInfos( - new MailboxInfo(senderServer.getHostname(), senderServer.getQueryMailboxPort(), workerIds)); - receiverMailboxInfos = new MailboxInfos( - new MailboxInfo(receiverServer.getHostname(), receiverServer.getQueryMailboxPort(), workerIds)); + Preconditions.checkState(senderServer.equals(receiverServer), + "Got different server for SINGLETON distribution type for worker id: %s, sender: %s, 
receiver: %s", + workerId, senderServer, receiverServer); + MailboxInfos mailboxInfos = new SharedMailboxInfos( + new MailboxInfo(senderServer.getHostname(), senderServer.getQueryMailboxPort(), + ImmutableList.of(workerId))); + senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(receiverStageId, mailboxInfos); + receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(senderStageId, mailboxInfos); + } + } else if (senderMetadata.isPrePartitioned() && isDirectExchangeCompatible(senderMetadata, receiverMetadata)) { + // - direct exchange possible: + // 1. send the data to the worker with the same worker id (not necessary the same instance), 1-to-1 mapping + // 2. When partition parallelism is configured, fanout based on partition parallelism from each sender + // workerID to sequentially increment receiver workerIDs + int partitionParallelism = numReceivers / numSenders; + if (partitionParallelism == 1) { + // 1-to-1 mapping + for (int workerId = 0; workerId < numSenders; workerId++) { + QueryServerInstance senderServer = senderServerMap.get(workerId); + QueryServerInstance receiverServer = receiverServerMap.get(workerId); + List workerIds = ImmutableList.of(workerId); + MailboxInfos senderMailboxInfos; + MailboxInfos receiverMailboxInfos; + if (senderServer.equals(receiverServer)) { + senderMailboxInfos = new SharedMailboxInfos( + new MailboxInfo(senderServer.getHostname(), senderServer.getQueryMailboxPort(), workerIds)); + receiverMailboxInfos = senderMailboxInfos; + } else { + senderMailboxInfos = new MailboxInfos( + new MailboxInfo(senderServer.getHostname(), senderServer.getQueryMailboxPort(), workerIds)); + receiverMailboxInfos = new MailboxInfos( + new MailboxInfo(receiverServer.getHostname(), receiverServer.getQueryMailboxPort(), workerIds)); + } + senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()) + .put(receiverStageId, receiverMailboxInfos); + receiverMailboxesMap.computeIfAbsent(workerId, k -> new 
HashMap<>()) + .put(senderStageId, senderMailboxInfos); + } + } else { + // 1-to- mapping + int receiverWorkerId = 0; + for (int senderWorkerId = 0; senderWorkerId < numSenders; senderWorkerId++) { + QueryServerInstance senderServer = senderServerMap.get(senderWorkerId); + QueryServerInstance receiverServer = receiverServerMap.get(receiverWorkerId); + List receiverWorkerIds = new ArrayList<>(partitionParallelism); + senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>()).put(receiverStageId, + new MailboxInfos(new MailboxInfo(receiverServer.getHostname(), receiverServer.getQueryMailboxPort(), + receiverWorkerIds))); + MailboxInfos senderMailboxInfos = new SharedMailboxInfos( + new MailboxInfo(senderServer.getHostname(), senderServer.getQueryMailboxPort(), + ImmutableList.of(senderWorkerId))); + for (int i = 0; i < partitionParallelism; i++) { + receiverWorkerIds.add(receiverWorkerId); + receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>()) + .put(senderStageId, senderMailboxInfos); + receiverWorkerId++; + } } - senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()) - .put(receiverStageId, receiverMailboxInfos); - receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(senderStageId, senderMailboxInfos); } } else { - // 1-to- mapping - int receiverWorkerId = 0; + // For other exchange types, send the data to all the instances in the receiver fragment + // TODO: Add support for more exchange types + List receiverMailboxInfoList = getMailboxInfos(receiverServerMap); + MailboxInfos receiverMailboxInfos = numSenders > 1 ? 
new SharedMailboxInfos(receiverMailboxInfoList) + : new MailboxInfos(receiverMailboxInfoList); for (int senderWorkerId = 0; senderWorkerId < numSenders; senderWorkerId++) { - QueryServerInstance senderServer = senderServerMap.get(senderWorkerId); - QueryServerInstance receiverServer = receiverServerMap.get(receiverWorkerId); - List receiverWorkerIds = new ArrayList<>(partitionParallelism); - senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>()).put(receiverStageId, - new MailboxInfos(new MailboxInfo(receiverServer.getHostname(), receiverServer.getQueryMailboxPort(), - receiverWorkerIds))); - MailboxInfos senderMailboxInfos = new SharedMailboxInfos( - new MailboxInfo(senderServer.getHostname(), senderServer.getQueryMailboxPort(), - ImmutableList.of(senderWorkerId))); - for (int i = 0; i < partitionParallelism; i++) { - receiverWorkerIds.add(receiverWorkerId); - receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>()) - .put(senderStageId, senderMailboxInfos); - receiverWorkerId++; - } + senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>()) + .put(receiverStageId, receiverMailboxInfos); + } + List senderMailboxInfoList = getMailboxInfos(senderServerMap); + MailboxInfos senderMailboxInfos = + numReceivers > 1 ? new SharedMailboxInfos(senderMailboxInfoList) + : new MailboxInfos(senderMailboxInfoList); + for (int receiverWorkerId = 0; receiverWorkerId < numReceivers; receiverWorkerId++) { + receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>()) + .put(senderStageId, senderMailboxInfos); } - } - } else { - // For other exchange types, send the data to all the instances in the receiver fragment - // TODO: Add support for more exchange types - List receiverMailboxInfoList = getMailboxInfos(receiverServerMap); - MailboxInfos receiverMailboxInfos = numSenders > 1 ? 
new SharedMailboxInfos(receiverMailboxInfoList) - : new MailboxInfos(receiverMailboxInfoList); - for (int senderWorkerId = 0; senderWorkerId < numSenders; senderWorkerId++) { - senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>()) - .put(receiverStageId, receiverMailboxInfos); - } - List senderMailboxInfoList = getMailboxInfos(senderServerMap); - MailboxInfos senderMailboxInfos = - numReceivers > 1 ? new SharedMailboxInfos(senderMailboxInfoList) : new MailboxInfos(senderMailboxInfoList); - for (int receiverWorkerId = 0; receiverWorkerId < numReceivers; receiverWorkerId++) { - receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>()) - .put(senderStageId, senderMailboxInfos); } } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java index 5c9dabb225be..0828aa49ffe5 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java @@ -59,7 +59,7 @@ public DispatchableSubPlan createDispatchableSubPlan(SubPlan subPlan) { PlanFragment rootFragment = subPlan.getSubPlanRoot(); PlanNode rootNode = rootFragment.getFragmentRoot(); // 1. start by visiting the sub plan fragment root. - rootNode.visit(DispatchablePlanVisitor.INSTANCE, context); + rootNode.visit(new DispatchablePlanVisitor(), context); // 2. add a special stage for the global mailbox receive, this runs on the dispatcher. context.getDispatchablePlanStageRootMap().put(0, rootNode); // 3. add worker assignment after the dispatchable plan context is fulfilled after the visit. 
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java index 71546d1fe822..a0e94561e3c0 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java @@ -209,22 +209,33 @@ public Set visitMailboxSend(MailboxSendNode node, GreedyShuffleRe boolean canSkipShuffleBasic = colocationKeyCondition(oldColocationKeys, distributionKeys); // If receiver is not a join-stage, then we can determine distribution type now. - if (!context.isJoinStage(node.getReceiverStageId())) { - Set colocationKeys; - if (canSkipShuffleBasic && areServersSuperset(node.getReceiverStageId(), node.getStageId())) { - // Servers are not re-assigned on sender-side. If needed, they are re-assigned on the receiver side. + boolean sendsToJoin = false; + boolean allAreSuperSet = true; + for (Integer receiverStageId : node.getReceiverStageIds()) { + if (context.isJoinStage(receiverStageId)) { + sendsToJoin = true; + break; + } + if (!(canSkipShuffleBasic && areServersSuperset(receiverStageId, node.getStageId()))) { + allAreSuperSet = false; + break; + } + } + if (!sendsToJoin) { + if (allAreSuperSet) { node.setDistributionType(RelDistribution.Type.SINGLETON); - colocationKeys = oldColocationKeys; + return oldColocationKeys; } else { - colocationKeys = new HashSet<>(); + Set colocationKeys = new HashSet<>(); + context.setColocationKeys(node.getStageId(), colocationKeys); + return colocationKeys; } - context.setColocationKeys(node.getStageId(), colocationKeys); - return colocationKeys; + } else { + // If receiver is a join-stage, remember partition-keys of the child node of MailboxSendNode. 
+ Set mailboxSendColocationKeys = canSkipShuffleBasic ? oldColocationKeys : new HashSet<>(); + context.setColocationKeys(node.getStageId(), mailboxSendColocationKeys); + return mailboxSendColocationKeys; } - // If receiver is a join-stage, remember partition-keys of the child node of MailboxSendNode. - Set mailboxSendColocationKeys = canSkipShuffleBasic ? oldColocationKeys : new HashSet<>(); - context.setColocationKeys(node.getStageId(), mailboxSendColocationKeys); - return mailboxSendColocationKeys; } @Override diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java index 9cc2c2e65792..c40fa50b0005 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java @@ -54,6 +54,14 @@ private MailboxSendNode(int stageId, DataSchema dataSchema, List input _sort = sort; } + public MailboxSendNode(int stageId, DataSchema dataSchema, List inputs, + @Nullable List receiverStages, PinotRelExchangeType exchangeType, + RelDistribution.Type distributionType, @Nullable List keys, boolean prePartitioned, + @Nullable List collations, boolean sort) { + this(stageId, dataSchema, inputs, toBitSet(receiverStages), exchangeType, + distributionType, keys, prePartitioned, collations, sort); + } + public MailboxSendNode(int stageId, DataSchema dataSchema, List inputs, int receiverStage, PinotRelExchangeType exchangeType, RelDistribution.Type distributionType, @Nullable List keys, boolean prePartitioned, @@ -111,6 +119,13 @@ public Integer next() { }; } + /** + * returns true if this node sends to multiple receivers + */ + public boolean isMultiSend() { + return _receiverStages.cardinality() > 1; + } + @Deprecated public int getReceiverStageId() { Preconditions.checkState(!_receiverStages.isEmpty(), 
"Receivers not set"); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java index abd474ebce3e..9cf9bff06a45 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java @@ -117,8 +117,18 @@ private static MailboxReceiveNode deserializeMailboxReceiveNode(Plan.PlanNode pr private static MailboxSendNode deserializeMailboxSendNode(Plan.PlanNode protoNode) { Plan.MailboxSendNode protoMailboxSendNode = protoNode.getMailboxSendNode(); + + List receiverIds; + List protoReceiverIds = protoMailboxSendNode.getReceiverStageIdsList(); + if (protoReceiverIds == null || protoReceiverIds.isEmpty()) { + // This should only happen if a not updated broker sends the request + receiverIds = List.of(protoMailboxSendNode.getReceiverStageId()); + } else { + receiverIds = protoReceiverIds; + } + return new MailboxSendNode(protoNode.getStageId(), extractDataSchema(protoNode), extractInputs(protoNode), - protoMailboxSendNode.getReceiverStageId(), convertExchangeType(protoMailboxSendNode.getExchangeType()), + receiverIds, convertExchangeType(protoMailboxSendNode.getExchangeType()), convertDistributionType(protoMailboxSendNode.getDistributionType()), protoMailboxSendNode.getKeysList(), protoMailboxSendNode.getPrePartitioned(), convertCollations(protoMailboxSendNode.getCollationsList()), protoMailboxSendNode.getSort()); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java index 65ccb13b2cae..acea4d065572 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java +++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java @@ -142,8 +142,16 @@ public Void visitMailboxReceive(MailboxReceiveNode node, Plan.PlanNode.Builder b @Override public Void visitMailboxSend(MailboxSendNode node, Plan.PlanNode.Builder builder) { - Plan.MailboxSendNode mailboxSendNode = Plan.MailboxSendNode.newBuilder() - .setReceiverStageId(node.getReceiverStageId()) + List receiverStageIds = new ArrayList<>(); + for (Integer receiverStageId : node.getReceiverStageIds()) { + receiverStageIds.add(receiverStageId); + } + assert !receiverStageIds.isEmpty() : "Receiver stage IDs should not be empty"; + + Plan.MailboxSendNode mailboxSendNode = + Plan.MailboxSendNode.newBuilder() + .setReceiverStageId(receiverStageIds.get(0)) // to keep backward compatibility + .addAllReceiverStageIds(receiverStageIds) .setExchangeType(convertExchangeType(node.getExchangeType())) .setDistributionType(convertDistributionType(node.getDistributionType())) .addAllKeys(node.getKeys()) diff --git a/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json b/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json index 31db5ee99b2b..db28d08439fa 100644 --- a/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json +++ b/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json @@ -501,6 +501,98 @@ " └── [3]@localhost:1|[1] PROJECT\n", " └── [3]@localhost:1|[1] TABLE SCAN (b) null\n" ] + }, + { + "description": "explain plan with simple spool", + "sql": "SET useSpools=true; EXPLAIN IMPLEMENTATION PLAN FOR SELECT 1 FROM a as a1 JOIN b ON a1.col1 = b.col1 JOIN a as a2 ON a2.col1 = b.col1", + "output": [ + "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n", + "├── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n", + "└── [1]@localhost:2|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n", + " └── [1]@localhost:2|[0] 
PROJECT\n", + " └── [1]@localhost:2|[0] JOIN\n", + " ├── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n", + " │ ├── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n", + " │ └── [2]@localhost:2|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n", + " │ └── [2]@localhost:2|[0] PROJECT\n", + " │ └── [2]@localhost:2|[0] JOIN\n", + " │ ├── [2]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n", + " │ │ ├── [3]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0],[2]@localhost:1|[1],[2]@localhost:2|[0]} (Subtree Omitted)\n", + " │ │ └── [3]@localhost:2|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0],[2]@localhost:1|[1],[2]@localhost:2|[0]}\n", + " │ │ └── [3]@localhost:2|[0] PROJECT\n", + " │ │ └── [3]@localhost:2|[0] TABLE SCAN (a) null\n", + " │ └── [2]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n", + " │ └── [4]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[2]@localhost:1|[1],[2]@localhost:2|[0]}\n", + " │ └── [4]@localhost:1|[0] PROJECT\n", + " │ └── [4]@localhost:1|[0] TABLE SCAN (b) null\n", + " └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n", + " ├── [3]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0],[2]@localhost:1|[1],[2]@localhost:2|[0]} (Subtree Omitted)\n", + " └── [3]@localhost:2|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0],[2]@localhost:1|[1],[2]@localhost:2|[0]}\n", + " └── [3]@localhost:2|[0] PROJECT\n", + " └── [3]@localhost:2|[0] TABLE SCAN (a) null\n" + ] + }, + { + "description": "explain plan with spool on CTE", + "sql": "SET useSpools=true; EXPLAIN IMPLEMENTATION PLAN FOR WITH mySpool AS (select * from a) SELECT 1 FROM mySpool as a1 JOIN b ON a1.col1 = b.col1 JOIN mySpool as a2 ON a2.col1 = b.col1", + "output": [ + "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n", + "├── [1]@localhost:1|[1] 
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n", + "└── [1]@localhost:2|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n", + " └── [1]@localhost:2|[0] PROJECT\n", + " └── [1]@localhost:2|[0] JOIN\n", + " ├── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n", + " │ ├── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n", + " │ └── [2]@localhost:2|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n", + " │ └── [2]@localhost:2|[0] PROJECT\n", + " │ └── [2]@localhost:2|[0] JOIN\n", + " │ ├── [2]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n", + " │ │ ├── [3]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0],[2]@localhost:1|[1],[2]@localhost:2|[0]} (Subtree Omitted)\n", + " │ │ └── [3]@localhost:2|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0],[2]@localhost:1|[1],[2]@localhost:2|[0]}\n", + " │ │ └── [3]@localhost:2|[0] PROJECT\n", + " │ │ └── [3]@localhost:2|[0] TABLE SCAN (a) null\n", + " │ └── [2]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n", + " │ └── [4]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[2]@localhost:1|[1],[2]@localhost:2|[0]}\n", + " │ └── [4]@localhost:1|[0] PROJECT\n", + " │ └── [4]@localhost:1|[0] TABLE SCAN (b) null\n", + " └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n", + " ├── [3]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0],[2]@localhost:1|[1],[2]@localhost:2|[0]} (Subtree Omitted)\n", + " └── [3]@localhost:2|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0],[2]@localhost:1|[1],[2]@localhost:2|[0]}\n", + " └── [3]@localhost:2|[0] PROJECT\n", + " └── [3]@localhost:2|[0] TABLE SCAN (a) null\n" + ] + }, + + { + "description": "explain plan with spool on CTE with extra filters", + "sql": "SET useSpools=true; EXPLAIN IMPLEMENTATION PLAN FOR WITH mySpool AS (select * from a) 
SELECT 1 FROM mySpool as a1 JOIN b ON a1.col1 = b.col1 JOIN mySpool as a2 ON a2.col1 = b.col1 where a2.col2 > 0", + "output": [ + "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n", + "├── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n", + "└── [1]@localhost:2|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n", + " └── [1]@localhost:2|[0] PROJECT\n", + " └── [1]@localhost:2|[0] JOIN\n", + " ├── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n", + " │ ├── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n", + " │ └── [2]@localhost:2|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n", + " │ └── [2]@localhost:2|[0] PROJECT\n", + " │ └── [2]@localhost:2|[0] JOIN\n", + " │ ├── [2]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n", + " │ │ ├── [3]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[2]@localhost:1|[1],[2]@localhost:2|[0]} (Subtree Omitted)\n", + " │ │ └── [3]@localhost:2|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[2]@localhost:1|[1],[2]@localhost:2|[0]}\n", + " │ │ └── [3]@localhost:2|[0] PROJECT\n", + " │ │ └── [3]@localhost:2|[0] TABLE SCAN (a) null\n", + " │ └── [2]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n", + " │ └── [4]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[2]@localhost:1|[1],[2]@localhost:2|[0]}\n", + " │ └── [4]@localhost:1|[0] PROJECT\n", + " │ └── [4]@localhost:1|[0] TABLE SCAN (b) null\n", + " └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n", + " ├── [5]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n", + " └── [5]@localhost:2|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n", + " └── [5]@localhost:2|[0] PROJECT\n", + " └── [5]@localhost:2|[0] FILTER\n", + " └── [5]@localhost:2|[0] TABLE SCAN (a) null\n" + ] } ] } diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java index b21d3a7f4a59..3926d7cdbbdf 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java @@ -66,6 +66,8 @@ public GrpcSendingMailbox(String id, ChannelManager channelManager, String hostn public void send(TransferableBlock block) throws IOException { if (isTerminated() || (isEarlyTerminated() && !block.isEndOfStreamBlock())) { + LOGGER.debug("==[GRPC SEND]== terminated or early terminated mailbox. Skipping sending message {} to: {}", + block, _id); return; } if (LOGGER.isDebugEnabled()) { @@ -124,7 +126,8 @@ public boolean isTerminated() { private StreamObserver getContentObserver() { return PinotMailboxGrpc.newStub(_channelManager.getChannel(_hostname, _port)) - .withDeadlineAfter(_deadlineMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS).open(_statusObserver); + .withDeadlineAfter(_deadlineMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS) + .open(_statusObserver); } private MailboxContent toMailboxContent(TransferableBlock block) @@ -147,4 +150,9 @@ private MailboxContent toMailboxContent(TransferableBlock block) _statMap.merge(MailboxSendOperator.StatKey.SERIALIZATION_TIME_MS, System.currentTimeMillis() - start); } } + + @Override + public String toString() { + return "g" + _id; + } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java index 8adf8db073b3..5fb21c96c4a0 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java @@ -106,4 +106,9 
@@ public boolean isEarlyTerminated() { public boolean isTerminated() { return _isTerminated; } + + @Override + public String toString() { + return "m" + _id; + } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index 0ca99b06ccd2..2af5b9d0baa5 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -216,12 +216,16 @@ public void processQuery(WorkerMetadata workerMetadata, StagePlan stagePlan, Map int stageId = stageMetadata.getStageId(); LOGGER.error("Error executing pipeline breaker for request: {}, stage: {}, sending error block: {}", requestId, stageId, errorBlock.getExceptions()); - int receiverStageId = ((MailboxSendNode) stagePlan.getRootNode()).getReceiverStageId(); - List receiverMailboxInfos = - workerMetadata.getMailboxInfosMap().get(receiverStageId).getMailboxInfos(); - List routingInfos = - MailboxIdUtils.toRoutingInfos(requestId, stageId, workerMetadata.getWorkerId(), receiverStageId, - receiverMailboxInfos); + MailboxSendNode rootNode = (MailboxSendNode) stagePlan.getRootNode(); + List routingInfos = new ArrayList<>(); + for (Integer receiverStageId : rootNode.getReceiverStageIds()) { + List receiverMailboxInfos = + workerMetadata.getMailboxInfosMap().get(receiverStageId).getMailboxInfos(); + List stageRoutingInfos = + MailboxIdUtils.toRoutingInfos(requestId, stageId, workerMetadata.getWorkerId(), receiverStageId, + receiverMailboxInfos); + routingInfos.addAll(stageRoutingInfos); + } for (RoutingInfo routingInfo : routingInfos) { try { StatMap statMap = new StatMap<>(MailboxSendOperator.StatKey.class); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/BlockSplitter.java 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/BlockSplitter.java index 92c0dfef54df..096003d444f8 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/BlockSplitter.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/BlockSplitter.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.query.runtime.blocks; +import com.google.common.collect.Iterators; import java.util.Iterator; import org.apache.pinot.common.datablock.BaseDataBlock; @@ -28,6 +29,7 @@ * underlying transport. */ public interface BlockSplitter { + BlockSplitter NO_OP = (block, type, maxBlockSize) -> Iterators.singletonIterator(block); /** * @return a list of blocks that was split from the original {@code block} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java index 28cebdbcd32a..1540cbfb0786 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java @@ -207,13 +207,17 @@ protected TransferableBlock getNextBlock() buildBroadcastHashTable(); } if (_upstreamErrorBlock != null) { + LOGGER.trace("Returning upstream error block for join operator"); return _upstreamErrorBlock; } - return buildJoinedDataBlock(); + TransferableBlock transferableBlock = buildJoinedDataBlock(); + LOGGER.trace("Returning {} for join operator", transferableBlock); + return transferableBlock; } private void buildBroadcastHashTable() throws ProcessingException { + LOGGER.trace("Building hash table for join operator"); long startTime = System.currentTimeMillis(); int numRowsInHashTable = 0; TransferableBlock rightBlock = _rightInput.nextBlock(); @@ -255,10 +259,12 @@ private void buildBroadcastHashTable() assert _rightSideStats != null; } 
_statMap.merge(StatKey.TIME_BUILDING_HASH_TABLE_MS, System.currentTimeMillis() - startTime); + LOGGER.trace("Finished building hash table for join operator"); } private TransferableBlock buildJoinedDataBlock() throws ProcessingException { + LOGGER.trace("Building joined data block for join operator"); // Keep reading the input blocks until we find a match row or all blocks are processed. // TODO: Consider batching the rows to improve performance. while (true) { @@ -269,7 +275,7 @@ private TransferableBlock buildJoinedDataBlock() assert _leftSideStats != null; return TransferableBlockUtils.getEndOfStreamTransferableBlock(_leftSideStats); } - + LOGGER.trace("Processing next block on left input"); TransferableBlock leftBlock = _leftInput.nextBlock(); if (leftBlock.isErrorBlock()) { return leftBlock; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java index 864f200fe6e5..5977e216ed9a 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; import java.util.List; @@ -35,9 +36,11 @@ import org.apache.pinot.query.planner.plannode.MailboxSendNode; import org.apache.pinot.query.routing.MailboxInfo; import org.apache.pinot.query.routing.RoutingInfo; +import org.apache.pinot.query.runtime.blocks.BlockSplitter; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.operator.exchange.BlockExchange; +import 
org.apache.pinot.query.runtime.operator.exchange.BufferedExchange; import org.apache.pinot.query.runtime.plan.MultiStageQueryStats; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.apache.pinot.spi.exception.QueryCancelledException; @@ -64,9 +67,7 @@ public class MailboxSendOperator extends MultiStageOperator { // TODO: Support sort on sender public MailboxSendOperator(OpChainExecutionContext context, MultiStageOperator input, MailboxSendNode node) { - this(context, input, - statMap -> getBlockExchange(context, node.getReceiverStageId(), node.getDistributionType(), node.getKeys(), - statMap)); + this(context, input, statMap -> getBlockExchange(context, node, statMap)); _statMap.merge(StatKey.STAGE, context.getStageId()); _statMap.merge(StatKey.PARALLELISM, 1); } @@ -79,8 +80,48 @@ public MailboxSendOperator(OpChainExecutionContext context, MultiStageOperator i _exchange = exchangeFactory.apply(_statMap); } + /** + * Creates a {@link BlockExchange} for the given {@link MailboxSendNode}. + * + * In normal cases, where the sender sends data to a single receiver stage, this method just delegates on + * {@link #getBlockExchange(OpChainExecutionContext, int, RelDistribution.Type, List, StatMap, BlockSplitter)}. + * + * In case of a multi-sender node, this method creates a two steps exchange: + *
<ol>
 + *   <li>One inner exchange is created for each receiver stage, using the method mentioned above and keeping the
 + *   distribution type specified in the {@link MailboxSendNode}.</li>
 + *   <li>Then, a single outer broadcast exchange is created to fan out the data to all the inner exchanges.</li>
 + * </ol>
+ * + * @see BlockExchange#asSendingMailbox(String) + */ + private static BlockExchange getBlockExchange(OpChainExecutionContext ctx, MailboxSendNode node, + StatMap statMap) { + BlockSplitter mainSplitter = TransferableBlockUtils::splitBlock; + if (!node.isMultiSend()) { + // it is guaranteed that there is exactly one receiver stage + int receiverStageId = node.getReceiverStageIds().iterator().next(); + return getBlockExchange(ctx, receiverStageId, node.getDistributionType(), node.getKeys(), statMap, mainSplitter); + } + List perStageSendingMailboxes = new ArrayList<>(); + // The inner splitter is a NO_OP because the outer splitter will take care of splitting the blocks + BlockSplitter innerSplitter = BlockSplitter.NO_OP; + for (int receiverStageId : node.getReceiverStageIds()) { + BlockExchange blockExchange = + getBlockExchange(ctx, receiverStageId, node.getDistributionType(), node.getKeys(), statMap, innerSplitter); + perStageSendingMailboxes.add(blockExchange.asSendingMailbox(Integer.toString(receiverStageId))); + } + return new BufferedExchange(perStageSendingMailboxes, mainSplitter, ctx.getRequestId(), ctx.getStageId(), + ctx.getWorkerId()); + } + + /** + * Creates a {@link BlockExchange} that sends data to the given receiver stage. + * + * In case of a multi-sender node, this method will be called for each receiver stage. 
+ */ private static BlockExchange getBlockExchange(OpChainExecutionContext context, int receiverStageId, - RelDistribution.Type distributionType, List keys, StatMap statMap) { + RelDistribution.Type distributionType, List keys, StatMap statMap, BlockSplitter splitter) { Preconditions.checkState(SUPPORTED_EXCHANGE_TYPES.contains(distributionType), "Unsupported distribution type: %s", distributionType); MailboxService mailboxService = context.getMailboxService(); @@ -90,13 +131,13 @@ private static BlockExchange getBlockExchange(OpChainExecutionContext context, i List mailboxInfos = context.getWorkerMetadata().getMailboxInfosMap().get(receiverStageId).getMailboxInfos(); List routingInfos = - MailboxIdUtils.toRoutingInfos(requestId, context.getStageId(), context.getWorkerId(), receiverStageId, - mailboxInfos); + MailboxIdUtils.toRoutingInfos(requestId, context.getStageId(), context.getWorkerId(), receiverStageId, + mailboxInfos); List sendingMailboxes = routingInfos.stream() .map(v -> mailboxService.getSendingMailbox(v.getHostname(), v.getPort(), v.getMailboxId(), deadlineMs, statMap)) .collect(Collectors.toList()); statMap.merge(StatKey.FAN_OUT, sendingMailboxes.size()); - return BlockExchange.getExchange(sendingMailboxes, distributionType, keys, TransferableBlockUtils::splitBlock); + return BlockExchange.getExchange(sendingMailboxes, distributionType, keys, splitter); } @Override diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java index 79c7aeeadd34..f10699e820c0 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java @@ -18,22 +18,29 @@ */ package org.apache.pinot.query.runtime.operator.exchange; +import java.io.IOException; import 
java.util.Iterator; import java.util.List; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeoutException; import org.apache.calcite.rel.RelDistribution; import org.apache.pinot.common.datablock.DataBlock; +import org.apache.pinot.query.mailbox.ReceivingMailbox; import org.apache.pinot.query.mailbox.SendingMailbox; import org.apache.pinot.query.planner.partitioning.KeySelectorFactory; +import org.apache.pinot.query.planner.plannode.MailboxSendNode; import org.apache.pinot.query.runtime.blocks.BlockSplitter; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class contains the shared logic across all different exchange types for exchanging data across servers. */ public abstract class BlockExchange { + private static final Logger LOGGER = LoggerFactory.getLogger(BlockExchange.class); // TODO: Deduct this value via grpc config maximum byte size; and make it configurable with override. // TODO: Max block size is a soft limit. only counts fixedSize datatable byte buffer private static final int MAX_MAILBOX_CONTENT_SIZE_BYTES = 4 * 1024 * 1024; @@ -69,10 +76,11 @@ protected BlockExchange(List sendingMailboxes, BlockSplitter spl * API to send a block to the destination mailboxes. * @param block the block to be transferred * @return true if all the mailboxes has been early terminated. - * @throws Exception when sending stream unexpectedly closed. + * @throws IOException when sending stream unexpectedly closed. + * @throws TimeoutException when sending stream timeout. 
*/ public boolean send(TransferableBlock block) - throws Exception { + throws IOException, TimeoutException { if (block.isErrorBlock()) { // Send error block to all mailboxes to propagate the error for (SendingMailbox sendingMailbox : _sendingMailboxes) { @@ -84,8 +92,19 @@ public boolean send(TransferableBlock block) if (block.isSuccessfulEndOfStreamBlock()) { // Send metadata to only one randomly picked mailbox, and empty EOS block to other mailboxes int numMailboxes = _sendingMailboxes.size(); - int mailboxIdToSendMetadata = ThreadLocalRandom.current().nextInt(numMailboxes); - assert block.getQueryStats() != null; + int mailboxIdToSendMetadata; + if (block.getQueryStats() != null) { + mailboxIdToSendMetadata = ThreadLocalRandom.current().nextInt(numMailboxes); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Sending EOS metadata. Only mailbox #{} will get stats", mailboxIdToSendMetadata); + } + } else { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Sending EOS metadata. No stat will be sent"); + } + // this may happen when the block exchange is itself used as a sending mailbox, like when using spools + mailboxIdToSendMetadata = -1; + } for (int i = 0; i < numMailboxes; i++) { SendingMailbox sendingMailbox = _sendingMailboxes.get(i); TransferableBlock blockToSend = @@ -110,10 +129,16 @@ public boolean send(TransferableBlock block) } protected void sendBlock(SendingMailbox sendingMailbox, TransferableBlock block) - throws Exception { + throws IOException, TimeoutException { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Sending block: {} {} to {}", block.getType(), System.identityHashCode(block), sendingMailbox); + } if (block.isEndOfStreamBlock()) { sendingMailbox.send(block); sendingMailbox.complete(); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Block sent: {} {} to {}", block.getType(), System.identityHashCode(block), sendingMailbox); + } return; } @@ -122,10 +147,13 @@ protected void sendBlock(SendingMailbox sendingMailbox, TransferableBlock block) 
while (splits.hasNext()) { sendingMailbox.send(splits.next()); } + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Block sent: {} {} to {}", block.getType(), System.identityHashCode(block), sendingMailbox); + } } protected abstract void route(List destinations, TransferableBlock block) - throws Exception; + throws IOException, TimeoutException; // Called when the OpChain gracefully returns. // TODO: This is a no-op right now. @@ -137,4 +165,66 @@ public void cancel(Throwable t) { sendingMailbox.cancel(t); } } + + public SendingMailbox asSendingMailbox(String id) { + return new BlockExchangeSendingMailbox(id); + } + + /** + * A mailbox that sends data blocks to a {@link org.apache.pinot.query.runtime.operator.exchange.BlockExchange}. + * + * BlockExchanges send data to a list of {@link SendingMailbox}es, which are responsible for sending the data + * to the corresponding {@link ReceivingMailbox}es. This class applies the decorator pattern to expose a BlockExchange + * as a SendingMailbox, open the possibility of having a BlockExchange as a destination for another BlockExchange. + * + * This is useful for example when a send operator has to send data to more than one stage. We need to broadcast the + * data to all the stages (the first BlockExchange). Then for each stage, we need to send the data to the + * corresponding workers (the inner BlockExchange). The inner BlockExchange may send data using a different + * distribution strategy. 
+ * + * @see MailboxSendNode#isMultiSend()} + */ + private class BlockExchangeSendingMailbox implements SendingMailbox { + private final String _id; + private boolean _earlyTerminated = false; + private boolean _completed = false; + + public BlockExchangeSendingMailbox(String id) { + _id = id; + } + + @Override + public void send(TransferableBlock block) + throws IOException, TimeoutException { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Exchange mailbox {} echoing {} {}", this, block.getType(), System.identityHashCode(block)); + } + _earlyTerminated = BlockExchange.this.send(block); + } + + @Override + public void complete() { + _completed = true; + } + + @Override + public void cancel(Throwable t) { + BlockExchange.this.cancel(t); + } + + @Override + public boolean isTerminated() { + return _completed; + } + + @Override + public boolean isEarlyTerminated() { + return _earlyTerminated; + } + + @Override + public String toString() { + return "e" + _id; + } + } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java index 4129606dabe4..e7b47be9170f 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java @@ -18,7 +18,9 @@ */ package org.apache.pinot.query.runtime.operator.exchange; +import java.io.IOException; import java.util.List; +import java.util.concurrent.TimeoutException; import org.apache.pinot.query.mailbox.SendingMailbox; import org.apache.pinot.query.runtime.blocks.BlockSplitter; import org.apache.pinot.query.runtime.blocks.TransferableBlock; @@ -35,7 +37,7 @@ protected BroadcastExchange(List sendingMailboxes, BlockSplitter @Override protected void route(List destinations, TransferableBlock block) - throws 
Exception { + throws IOException, TimeoutException { for (SendingMailbox mailbox : destinations) { sendBlock(mailbox, block); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BufferedExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BufferedExchange.java new file mode 100644 index 000000000000..9e798ddeaced --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BufferedExchange.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.query.runtime.operator.exchange; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeoutException; +import javax.annotation.Nullable; +import org.apache.pinot.common.utils.NamedThreadFactory; +import org.apache.pinot.query.mailbox.SendingMailbox; +import org.apache.pinot.query.runtime.blocks.BlockSplitter; +import org.apache.pinot.query.runtime.blocks.TransferableBlock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * An exchange that buffers the blocks in a queue before sending them to the destination. + * + * This is useful when one of the destinations is slow and we want to buffer the blocks for that destination without + * affecting the other destinations. + * + * The current implementation buffers the blocks on heap, but it may be changed in future versions if needed. 
+ */ +public class BufferedExchange extends BlockExchange { + private static final Logger LOGGER = LoggerFactory.getLogger(BufferedExchange.class); + + private static final Throwable CORRECT_MARKER = new Exception("Fake exception marking correct completion"); + private final BlockingQueue[] _queues; + @Nullable + private volatile Throwable _throwable = null; + + public BufferedExchange(List sendingMailboxes, BlockSplitter splitter, long requestId, int stageId, + int workerId) { + super(sendingMailboxes, splitter); + ThreadFactory threadFactory = new NamedThreadFactory( + "BufferedExchange-req-" + requestId + "-s" + stageId + "-w" + workerId); + _queues = new BlockingQueue[sendingMailboxes.size()]; + for (int i = 0; i < sendingMailboxes.size(); i++) { + final SendingMailbox sendingMailbox = sendingMailboxes.get(i); + final BlockingQueue buffer = new ArrayBlockingQueue<>(1000); + final int index = i; + _queues[i] = buffer; + Runnable runnable = () -> { + while (_throwable == null) { + try { + LOGGER.trace("Reading from {}", index); + TransferableBlock block = buffer.take(); + LOGGER.trace("Read from {}. Starting to send", index); + sendingMailbox.send(block); + LOGGER.trace("Send to {} finished", index); + } catch (InterruptedException | IOException | TimeoutException e) { + sendingMailbox.cancel(e); + return; + } + } + if (_throwable != CORRECT_MARKER) { + LOGGER.trace("Exceptional post completion for {}", index); + sendingMailbox.cancel(_throwable); + } else { + LOGGER.trace("Normal post completion for {}", index); + while (!buffer.isEmpty()) { + TransferableBlock poll = buffer.poll(); + LOGGER.trace("Post completion read from {}. 
Starting to send", index); + try { + sendingMailbox.send(poll); + LOGGER.trace("Post completion send to {} finished", index); + } catch (IOException | TimeoutException e) { + LOGGER.info("Failed to send {} block after successful completion", poll.getType(), e); + return; + } + } + } + }; + threadFactory.newThread(runnable).start(); + } + } + + @Override + protected void route(List destinations, TransferableBlock block) + throws IOException, TimeoutException { + LOGGER.trace("Routing block: {} {} to {}", block, System.identityHashCode(block), destinations); + for (int i = 0; i < _queues.length; i++) { + if (!_queues[i].offer(block)) { + cancel(new RuntimeException("Failed to offer block to queue " + i)); + } + } + } + + @Override + public void close() { + super.close(); + LOGGER.trace("Closing BufferedExchange"); + _throwable = CORRECT_MARKER; + } + + @Override + public void cancel(Throwable t) { + LOGGER.trace("Cancelling BufferedExchange", t); + _throwable = t; + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java index 3b3eeb1d03d4..722f188d01e4 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java @@ -18,8 +18,10 @@ */ package org.apache.pinot.query.runtime.operator.exchange; +import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeoutException; import org.apache.pinot.query.mailbox.SendingMailbox; import org.apache.pinot.query.planner.partitioning.EmptyKeySelector; import org.apache.pinot.query.planner.partitioning.KeySelector; @@ -42,7 +44,7 @@ class HashExchange extends BlockExchange { @Override protected void route(List destinations, TransferableBlock block) - throws 
Exception { + throws IOException, TimeoutException { int numMailboxes = destinations.size(); if (numMailboxes == 1 || _keySelector == EmptyKeySelector.INSTANCE) { sendBlock(destinations.get(0), block); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchange.java index 825095f3cb30..4e0dabf7e183 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchange.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchange.java @@ -19,8 +19,10 @@ package org.apache.pinot.query.runtime.operator.exchange; import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; import java.util.List; import java.util.Random; +import java.util.concurrent.TimeoutException; import java.util.function.IntFunction; import org.apache.pinot.query.mailbox.SendingMailbox; import org.apache.pinot.query.runtime.blocks.BlockSplitter; @@ -48,7 +50,7 @@ class RandomExchange extends BlockExchange { @Override protected void route(List destinations, TransferableBlock block) - throws Exception { + throws IOException, TimeoutException { int destinationIdx = _rand.apply(destinations.size()); sendBlock(destinations.get(destinationIdx), block); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java index 926cf2a9d883..96c0c0c62cfe 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java @@ -19,7 +19,9 @@ package org.apache.pinot.query.runtime.operator.exchange; import 
com.google.common.base.Preconditions; +import java.io.IOException; import java.util.List; +import java.util.concurrent.TimeoutException; import org.apache.pinot.query.mailbox.InMemorySendingMailbox; import org.apache.pinot.query.mailbox.SendingMailbox; import org.apache.pinot.query.runtime.blocks.BlockSplitter; @@ -41,7 +43,7 @@ class SingletonExchange extends BlockExchange { @Override protected void route(List sendingMailboxes, TransferableBlock block) - throws Exception { + throws IOException, TimeoutException { sendBlock(sendingMailboxes.get(0), block); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java index 145028fc7458..6fab218cffb4 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.query.runtime.blocks.TransferableBlock; @@ -119,7 +120,11 @@ public void onData() { */ public E readBlockBlocking() { if (LOGGER.isTraceEnabled()) { - LOGGER.trace("==[RECEIVE]== Enter getNextBlock from: " + _id + " mailboxSize: " + _mailboxes.size()); + String mailboxIds = _mailboxes.stream() + .map(AsyncStream::getId) + .map(Object::toString) + .collect(Collectors.joining(",")); + LOGGER.trace("==[RECEIVE]== Enter getNextBlock from: " + _id + ". Mailboxes: " + mailboxIds); } // Standard optimistic execution. First we try to read without acquiring the lock. 
E block = readDroppingSuccessEos(); @@ -156,11 +161,11 @@ public E readBlockBlocking() { } /** - * This is a utility method that reads tries to read from the different mailboxes in a circular manner. + * This is a utility method that tries to read from the different mailboxes in a circular manner. * * The method is a bit more complex than expected because ir order to simplify {@link #readBlockBlocking} we added - * some extra logic here. For example, this method checks for timeouts, add some logs, releases mailboxes that emitted - * EOS and in case an error block is found, stores it. + * some extra logic here. For example, this method checks for timeouts, adds some logs, releases mailboxes that + * emitted EOS and in case an error block is found, stores it. * * @return the new block to consume or null if none is found. EOS is only emitted when all mailboxes already emitted * EOS. @@ -180,8 +185,12 @@ private E readDroppingSuccessEos() { // this is done in order to keep the invariant. _lastRead--; if (LOGGER.isDebugEnabled()) { + String ids = _mailboxes.stream() + .map(AsyncStream::getId) + .map(Object::toString) + .collect(Collectors.joining(",")); LOGGER.debug("==[RECEIVE]== EOS received : " + _id + " in mailbox: " + removed.getId() - + " (" + _mailboxes.size() + " mailboxes alive)"); + + " (" + ids + " mailboxes alive)"); } onConsumerFinish(block); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java index 182b128798a8..df8854d18c12 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java @@ -20,7 +20,9 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; +import 
java.io.IOException; import java.util.List; +import java.util.concurrent.TimeoutException; import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; @@ -176,7 +178,7 @@ protected TestBlockExchange(List destinations, BlockSplitter spl @Override protected void route(List destinations, TransferableBlock block) - throws Exception { + throws IOException, TimeoutException { for (SendingMailbox mailbox : destinations) { sendBlock(mailbox, block); } diff --git a/pinot-query-runtime/src/test/resources/queries/Spool.json b/pinot-query-runtime/src/test/resources/queries/Spool.json new file mode 100644 index 000000000000..fdea8caa407d --- /dev/null +++ b/pinot-query-runtime/src/test/resources/queries/Spool.json @@ -0,0 +1,37 @@ +{ + "spools": { + "tables": { + "tbl1" : { + "schema": [ + {"name": "strCol1", "type": "STRING"}, + {"name": "intCol1", "type": "INT"}, + {"name": "strCol2", "type": "STRING"} + ], + "inputs": [ + ["foo", 1, "foo"], + ["bar", 2, "alice"] + ] + }, + "tbl2" : { + "schema": [ + {"name": "strCol1", "type": "STRING"}, + {"name": "strCol2", "type": "STRING"}, + {"name": "intCol1", "type": "INT"}, + {"name": "doubleCol1", "type": "DOUBLE"}, + {"name": "boolCol1", "type": "BOOLEAN"} + ], + "inputs": [ + ["foo", "bob", 3, 3.1416, true], + ["alice", "alice", 4, 2.7183, false] + ] + } + }, + "queries": [ + { + "description": "Simplest spool", + "sql": "SET timeoutMs=10000; SET useSpools=true; SELECT * FROM {tbl1} as a1 JOIN {tbl2} as b ON a1.strCol1 = b.strCol1 JOIN {tbl1} as a2 ON a2.strCol1 = b.strCol1", + "h2Sql": "SELECT * FROM {tbl1} as a1 JOIN {tbl2} as b ON a1.strCol1 = b.strCol1 JOIN {tbl1} as a2 ON a2.strCol1 = b.strCol1" + } + ] + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 641fa4ef899e..77c985a8d0ee 100644 --- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -363,6 +363,13 @@ public static class Broker { public static final String CONFIG_OF_INFER_PARTITION_HINT = "pinot.broker.multistage.infer.partition.hint"; public static final boolean DEFAULT_INFER_PARTITION_HINT = false; + /** + * Whether to use spools in the multistage query engine by default. + * This value can always be overridden by the {@link Request.QueryOptionKey#USE_SPOOLS} query option. + */ + public static final String CONFIG_OF_SPOOLS = "pinot.broker.multistage.spools"; + public static final boolean DEFAULT_OF_SPOOLS = false; + public static final String CONFIG_OF_USE_FIXED_REPLICA = "pinot.broker.use.fixed.replica"; public static final boolean DEFAULT_USE_FIXED_REPLICA = false; @@ -414,6 +421,7 @@ public static class QueryOptionKey { public static final String INFER_PARTITION_HINT = "inferPartitionHint"; public static final String ENABLE_NULL_HANDLING = "enableNullHandling"; public static final String APPLICATION_NAME = "applicationName"; + public static final String USE_SPOOLS = "useSpools"; /** * If set, changes the explain behavior in multi-stage engine. *