Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ private Optional<QueryFailureInfo> createQueryFailureInfo(ExecutionFailureInfo f

private static Optional<TaskInfo> findFailedTask(StagesInfo stages)
{
for (StageInfo stageInfo : stages.getSubStagesDeepPostOrder(stages.getOutputStageId(), true)) {
for (StageInfo stageInfo : stages.getSubStagesDeep(stages.getOutputStageId(), true)) {
Optional<TaskInfo> failedTaskInfo = stageInfo.getTasks().stream()
.filter(taskInfo -> taskInfo.taskStatus().getState() == TaskState.FAILED)
.findFirst();
Expand Down
45 changes: 21 additions & 24 deletions core/trino-main/src/main/java/io/trino/execution/StagesInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -92,13 +93,13 @@ public List<StageInfo> getSubStages(StageId stageId)
}

@JsonIgnore
public List<StageInfo> getSubStagesDeepPreOrder(StageId stageId)
public List<StageInfo> getSubStagesDeep(StageId stageId)
{
return getSubStagesDeepPreOrder(stageId, false);
return getSubStagesDeep(stageId, false);
}

@JsonIgnore
public List<StageInfo> getSubStagesDeepPreOrder(StageId root, boolean includeRoot)
public List<StageInfo> getSubStagesDeep(StageId root, boolean includeRoot)
{
StageInfo stageInfo = stagesById.get(root);
checkArgument(stageInfo != null, "stage %s not found", root);
Expand All @@ -112,7 +113,7 @@ public List<StageInfo> getSubStagesDeepPreOrder(StageId root, boolean includeRoo
return subStagesIds.build().stream().map(stagesById::get).collect(toImmutableList());
}

private void collectSubStageIdsPreOrder(StageInfo stageInfo, ImmutableSet.Builder collector)
private void collectSubStageIdsPreOrder(StageInfo stageInfo, ImmutableSet.Builder<StageId> collector)
{
stageInfo.getSubStages().stream().forEach(subStageId -> {
collector.add(subStageId);
Expand All @@ -122,33 +123,29 @@ private void collectSubStageIdsPreOrder(StageInfo stageInfo, ImmutableSet.Builde
}

@JsonIgnore
public List<StageInfo> getSubStagesDeepPostOrder(StageId stageId)
public List<StageInfo> getSubStagesDeepTopological(StageId root, boolean includeRoot)
{
return getSubStagesDeepPostOrder(stageId, false);
ImmutableList.Builder<StageInfo> builder = ImmutableList.builder();
getSubStagesDeepTopologicalInner(root, builder, new HashSet<>(), includeRoot);

return builder.build().reverse();
}

@JsonIgnore
public List<StageInfo> getSubStagesDeepPostOrder(StageId root, boolean includeRoot)
private void getSubStagesDeepTopologicalInner(StageId stageId, ImmutableList.Builder<StageInfo> builder, Set<StageId> visitedFragments, boolean includeRoot)
{
StageInfo stageInfo = stagesById.get(root);
checkArgument(stageInfo != null, "stage %s not found", root);

ImmutableSet.Builder<StageId> subStagesIds = ImmutableSet.builder();
collectSubStageIdsPostOrder(stageInfo, subStagesIds);
if (includeRoot) {
subStagesIds.add(root);
if (visitedFragments.contains(stageId)) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Cycle detection is introduced but not documented.

Using 'visitedFragments' avoids infinite recursion, but if cycles are unexpected, this could hide data problems. Consider raising an error or warning when a cycle is detected.

return;
}

return subStagesIds.build().stream().map(stagesById::get).collect(toImmutableList());
}
StageInfo stageInfo = stagesById.get(stageId);

private void collectSubStageIdsPostOrder(StageInfo stageInfo, ImmutableSet.Builder collector)
{
stageInfo.getSubStages().stream().forEach(subStageId -> {
StageInfo subStage = stagesById.get(subStageId);
collectSubStageIdsPostOrder(subStage, collector);
collector.add(subStageId);
});
for (StageId childId : stageInfo.getSubStages().reversed()) {
getSubStagesDeepTopologicalInner(childId, builder, visitedFragments, true);
}
if (includeRoot) {
builder.add(stageInfo);
}
visitedFragments.add(stageId);
}

public static List<StageInfo> getAllStages(Optional<StagesInfo> stages)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static io.trino.sql.planner.planprinter.PlanPrinter.textDistributedPlan;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -153,15 +152,14 @@ public Page getOutput()

QueryInfo queryInfo = queryPerformanceFetcher.getQueryInfo(operatorContext.getDriverContext().getTaskId().queryId());
checkState(queryInfo.getStages().isPresent(), "Stages informations is missing");
checkState(queryInfo.getStages().get().getOutputStage().getSubStages().size() == 1, "Expected one sub stage of explain node");
StagesInfo stagesInfo = queryInfo.getStages().get();
checkState(stagesInfo.getOutputStage().getSubStages().size() == 1, "Expected one sub stage of explain node");

if (!hasFinalStageInfo(queryInfo.getStages().get())) {
if (!hasFinalStageInfo(stagesInfo)) {
return null;
}

List<StageInfo> stagesWithoutOutputStage = queryInfo.getStages().orElseThrow().getStages().stream()
.filter(stage -> !stage.getStageId().equals(queryInfo.getStages().orElseThrow().getOutputStageId()))
.collect(toImmutableList());
List<StageInfo> stagesWithoutOutputStage = stagesInfo.getSubStagesDeepTopological(stagesInfo.getOutputStageId(), false);

String plan = textDistributedPlan(
stagesWithoutOutputStage,
Expand Down Expand Up @@ -194,7 +192,7 @@ private boolean hasFinalStageInfo(StagesInfo stages)

private boolean isFinalStageInfo(StagesInfo stages)
{
List<StageInfo> subStages = stages.getSubStagesDeepPreOrder(operatorContext.getDriverContext().getTaskId().stageId());
List<StageInfo> subStages = stages.getSubStagesDeep(operatorContext.getDriverContext().getTaskId().stageId());
return subStages.stream().allMatch(StageInfo::isFinalStageInfo);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ public static String textDistributedPlan(
Anonymizer anonymizer,
NodeVersion version)
{
return textDistributedPlan(stages.getStages(), queryStats, valuePrinter, verbose, anonymizer, version);
return textDistributedPlan(stages.getSubStagesDeepTopological(stages.getOutputStageId(), true), queryStats, valuePrinter, verbose, anonymizer, version);
}

public static String textDistributedPlan(
Expand Down
Loading