Skip to content

Commit

Permalink
Fix for adding all artifacts, putting spacer nodes in groups
Browse files Browse the repository at this point in the history
  • Loading branch information
jeff-phillips-18 committed May 2, 2024
1 parent 3e456d3 commit 4b99136
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 60 deletions.
142 changes: 94 additions & 48 deletions frontend/src/concepts/pipelines/topology/usePipelineTaskTopology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,85 @@ import { PipelineNodeModelExpanded } from '~/concepts/topology/types';
import { createArtifactNode, createGroupNode } from '~/concepts/topology/utils';
import { Execution } from '~/third_party/mlmd';
import {
ComponentArtifactMap,
composeArtifactType,
parseComponentsForArtifactRelationship,
parseInputOutput,
parseRuntimeInfoFromExecutions,
parseRuntimeInfoFromRunDetails,
parseTasksForArtifactRelationship,
parseVolumeMounts,
TaskArtifactMap,
translateStatusForNode,
} from './parseUtils';
import { PipelineTask } from './pipelineTaskTypes';
import { PipelineTask, PipelineTaskRunStatus } from './pipelineTaskTypes';

const EMPTY_STATE: PipelineNodeModelExpanded[] = [];

const getNodeArtifacts = (
taskId: string,
status: PipelineTaskRunStatus | undefined,
componentRef: string,
componentArtifactMap: ComponentArtifactMap,
taskArtifactMap: TaskArtifactMap,
): PipelineNodeModelExpanded[] => {
const artifactsInComponent = componentArtifactMap[componentRef];
const artifactNodes: PipelineNodeModelExpanded[] = [];
if (artifactsInComponent) {
const artifactNodeData = taskArtifactMap[taskId];

Object.entries(artifactsInComponent).forEach(([artifactKey, data]) => {
const label = artifactKey;
const { artifactId } =
artifactNodeData?.find((a) => artifactKey === a.outputArtifactKey) ?? {};

// if no node needs it as an input, we don't really need a well known id
const id = artifactId ?? artifactKey;

const artifactPipelineTask: PipelineTask = {
type: 'artifact',
name: label,
inputs: {
artifacts: [{ label: id, type: composeArtifactType(data) }],
},
};

artifactNodes.push(
createArtifactNode(
id,
label,
artifactPipelineTask,
[taskId],
translateStatusForNode(status?.state),
data.schemaTitle,
),
);
});
}
return artifactNodes;
};

const getNestedNodes = (
spec: PipelineSpecVariable,
items: Record<string, TaskKF>,
components: PipelineComponentsKF,
executors: PipelineExecutorsKF,
componentArtifactMap: ComponentArtifactMap,
taskArtifactMap: TaskArtifactMap,
runDetails?: RunDetailsKF,
executions?: Execution[] | null,
): [nestedNodes: PipelineNodeModelExpanded[], children: string[]] => {
const nodes: PipelineNodeModelExpanded[] = [];
const children: string[] = [];

Object.entries(items).forEach(([name, details]) => {
Object.entries(items).forEach(([taskId, details]) => {
const componentRef = details.componentRef.name;
const component = components[componentRef];
const taskName = details.taskInfo.name;

const status = executions
? parseRuntimeInfoFromExecutions(name, executions)
: parseRuntimeInfoFromRunDetails(name, runDetails);
const status =
parseRuntimeInfoFromExecutions(taskId, executions) ||
parseRuntimeInfoFromRunDetails(taskId, runDetails);

const runAfter: string[] = details.dependentTasks ?? [];
const hasSubTask =
Expand All @@ -52,39 +100,55 @@ const getNestedNodes = (

const pipelineTask: PipelineTask = {
type: 'groupTask',
name,
name: taskName,
steps: executor ? [executor.container] : undefined,
inputs: parseInputOutput(component?.inputDefinitions),
outputs: parseInputOutput(component?.outputDefinitions),
status,
volumeMounts: parseVolumeMounts(spec.platform_spec, executorLabel),
};

const artifactNodes = getNodeArtifacts(
taskId,
status,
componentRef,
componentArtifactMap,
taskArtifactMap,
);
if (artifactNodes.length) {
nodes.push(...artifactNodes);
children.push(...artifactNodes.map((n) => n.id));
}

if (hasSubTask && subTasks) {
const [nestedNodes, nestedChildren] = getNestedNodes(
spec,
subTasks,
components,
executors,
componentArtifactMap,
taskArtifactMap,
runDetails,
executions,
);
const newChildren = nestedChildren.filter((child) => !nodes.find((n) => n.id === child));
const newNodes = nestedNodes.filter((node) => !nodes.find((n) => n.id === node.id));

const itemNode = createGroupNode(
name,
name,
taskId,
taskName,
pipelineTask,
runAfter,
translateStatusForNode(status?.state),
nestedChildren,
newChildren,
);
nodes.push(itemNode, ...nestedNodes);
nodes.push(itemNode, ...newNodes);
} else {
nodes.push(
createNode(name, name, pipelineTask, runAfter, translateStatusForNode(status?.state)),
createNode(taskId, taskName, pipelineTask, runAfter, translateStatusForNode(status?.state)),
);
}
children.push(name);
children.push(taskId);
});

return [nodes, children];
Expand Down Expand Up @@ -116,50 +180,28 @@ export const usePipelineTaskTopology = (

const componentRef = taskValue.componentRef.name;
const component = components[componentRef];
const artifactsInComponent = componentArtifactMap[componentRef];
const isGroupNode = !!component?.dag;
const groupTasks = component?.dag?.tasks;

const executorLabel = component?.executorLabel;
const executor = executorLabel ? executors[executorLabel] : undefined;

const status = executions
? parseRuntimeInfoFromExecutions(taskId, executions)
: parseRuntimeInfoFromRunDetails(taskId, runDetails);
const status =
parseRuntimeInfoFromExecutions(taskId, executions) ||
parseRuntimeInfoFromRunDetails(taskId, runDetails);

const nodes: PipelineNodeModelExpanded[] = [];
const runAfter: string[] = taskValue.dependentTasks ?? [];

if (artifactsInComponent) {
const artifactNodeData = taskArtifactMap[taskId];

Object.entries(artifactsInComponent).forEach(([artifactKey, data]) => {
const label = artifactKey;
const { artifactId } =
artifactNodeData?.find((a) => artifactKey === a.outputArtifactKey) ?? {};

// if no node needs it as an input, we don't really need a well known id
const id = artifactId ?? artifactKey;

const pipelineTask: PipelineTask = {
type: 'artifact',
name: label,
inputs: {
artifacts: [{ label: id, type: composeArtifactType(data) }],
},
};

nodes.push(
createArtifactNode(
id,
label,
pipelineTask,
[taskId],
translateStatusForNode(status?.state),
data.schemaTitle,
),
);
});
const artifactNodes = getNodeArtifacts(
taskId,
status,
componentRef,
componentArtifactMap,
taskArtifactMap,
);
if (artifactNodes.length) {
nodes.push(...artifactNodes);
}

if (taskValue.dependentTasks) {
Expand Down Expand Up @@ -192,18 +234,22 @@ export const usePipelineTaskTopology = (
groupTasks,
components,
executors,
componentArtifactMap,
taskArtifactMap,
runDetails,
executions,
);
const newChildren = children.filter((child) => !nodes.find((n) => n.id === child));
const newNodes = nestedNodes.filter((node) => !nodes.find((n) => n.id === node.id));
const itemNode = createGroupNode(
taskId,
taskName,
pipelineTask,
runAfter,
translateStatusForNode(status?.state),
children,
newChildren,
);
nodes.push(itemNode, ...nestedNodes);
nodes.push(itemNode, ...newNodes);
} else {
nodes.push(
createNode(taskId, taskName, pipelineTask, runAfter, translateStatusForNode(status?.state)),
Expand Down
19 changes: 7 additions & 12 deletions frontend/src/concepts/topology/PipelineDefaultTaskGroup.tsx
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import * as React from 'react';

import {
LabelPosition,
WithSelectionProps,
isNode,
DefaultTaskGroup,
Expand All @@ -22,6 +21,8 @@ import { PipelineNodeModelExpanded, StandardTaskNodeData } from '~/concepts/topo
import NodeStatusIcon from '~/concepts/topology/NodeStatusIcon';
import { NODE_HEIGHT, NODE_WIDTH } from './const';

const MAX_TIP_ITEMS = 6;

type PipelinesDefaultGroupProps = {
element: GraphElement<PipelineNodeModelExpanded>;
} & WithSelectionProps;
Expand All @@ -36,20 +37,15 @@ const DefaultTaskGroupInner: React.FunctionComponent<PipelinesDefaultGroupInnerP
const popoverRef = React.useRef<SVGGElement>(null);
const detailsLevel = element.getGraph().getDetailsLevel();

const MAX_TIP_ITEMS = 6;

const getPopoverTasksList = (items: Node<NodeModel>[]) => (
<Stack hasGutter>
{items.slice(0, MAX_TIP_ITEMS).map((item: Node) => (
<StackItem key={item.getId()}>
<Flex gap={{ default: 'gapXs' }}>
{item.getData()?.runStatus && (
<FlexItem style={{ flex: '0' }}>
<NodeStatusIcon runStatus={item.getData()?.runStatus} />
</FlexItem>
)}
{!item.getData()?.runStatus && <div style={{ width: '20px' }}>&nbsp;</div>}
<FlexItem style={{ flex: '1' }}>{item.getLabel()}</FlexItem>
<Flex gap={{ default: 'gapXs' }} alignItems={{ default: 'alignItemsCenter' }}>
<FlexItem style={{ flex: '0', width: 26 }}>
<NodeStatusIcon runStatus={item.getData()?.runStatus} />
</FlexItem>
<FlexItem style={{ flex: '1', marginLeft: 4 }}>{item.getLabel()}</FlexItem>
</Flex>
</StackItem>
))}
Expand All @@ -61,7 +57,6 @@ const DefaultTaskGroupInner: React.FunctionComponent<PipelinesDefaultGroupInnerP

const groupNode = (
<DefaultTaskGroup
labelPosition={LabelPosition.top}
element={element}
collapsible
recreateLayoutOnCollapseChange
Expand Down
11 changes: 11 additions & 0 deletions frontend/src/concepts/topology/PipelineVisualizationSurface.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@ const PipelineVisualizationSurface: React.FC<PipelineVisualizationSurfaceProps>

const spacerNodes = getSpacerNodes(updateNodes);

// find the parent of each spacer node
spacerNodes.forEach((spacerNode) => {
const nodeIds = spacerNode.id.split('|');
if (nodeIds[0]) {
const parent = updateNodes.find((n) => n.children?.includes(nodeIds[0]));
if (parent) {
parent.children?.push(spacerNode.id);
}
}
});

// Dagre likes the root nodes to be first in the order
const renderNodes = [...spacerNodes, ...updateNodes].sort(
(a, b) => (a.runAfterTasks?.length ?? 0) - (b.runAfterTasks?.length ?? 0),
Expand Down

0 comments on commit 4b99136

Please sign in to comment.