From bd9bc9547238bebb0c144f9b566462170183fe69 Mon Sep 17 00:00:00 2001 From: Juntao Wang Date: Tue, 9 Apr 2024 17:38:21 -0400 Subject: [PATCH] Read pipeline run node details from mlmd context --- .../concepts/pipelines/apiHooks/mlmd/types.ts | 7 + .../mlmd/useExecutionsFromMlmdContext.ts | 31 +++ .../pipelines/apiHooks/mlmd/useMlmdContext.ts | 45 ++++ .../mlmd/usePipelineRunMlmdContext.ts | 8 + .../pipelines/apiHooks/usePipelineRunById.ts | 20 +- .../pipelineRun/PipelineRunDetails.tsx | 34 +-- .../pipelineRun/runLogs/LogsTab.tsx | 4 +- .../useExecutionsForPipelineRun.ts | 21 ++ frontend/src/concepts/pipelines/kfTypes.ts | 16 ++ .../topology/__tests__/parseUtils.spec.ts | 209 +++++++++++++++++- .../concepts/pipelines/topology/parseUtils.ts | 123 ++++++++++- .../pipelines/topology/pipelineTaskTypes.ts | 9 +- .../topology/usePipelineTaskTopology.ts | 14 +- 13 files changed, 495 insertions(+), 46 deletions(-) create mode 100644 frontend/src/concepts/pipelines/apiHooks/mlmd/types.ts create mode 100644 frontend/src/concepts/pipelines/apiHooks/mlmd/useExecutionsFromMlmdContext.ts create mode 100644 frontend/src/concepts/pipelines/apiHooks/mlmd/useMlmdContext.ts create mode 100644 frontend/src/concepts/pipelines/apiHooks/mlmd/usePipelineRunMlmdContext.ts create mode 100644 frontend/src/concepts/pipelines/content/pipelinesDetails/pipelineRun/useExecutionsForPipelineRun.ts diff --git a/frontend/src/concepts/pipelines/apiHooks/mlmd/types.ts b/frontend/src/concepts/pipelines/apiHooks/mlmd/types.ts new file mode 100644 index 0000000000..928e839713 --- /dev/null +++ b/frontend/src/concepts/pipelines/apiHooks/mlmd/types.ts @@ -0,0 +1,7 @@ +import { Context } from '~/third_party/mlmd'; + +export type MlmdContext = Context; + +export enum MlmdContextTypes { + RUN = 'system.PipelineRun', +} diff --git a/frontend/src/concepts/pipelines/apiHooks/mlmd/useExecutionsFromMlmdContext.ts b/frontend/src/concepts/pipelines/apiHooks/mlmd/useExecutionsFromMlmdContext.ts new file mode 100644 index 0000000000..454fd19dd3 --- /dev/null +++ b/frontend/src/concepts/pipelines/apiHooks/mlmd/useExecutionsFromMlmdContext.ts @@ -0,0 +1,31 @@ +import React from 'react'; +import { MlmdContext } from '~/concepts/pipelines/apiHooks/mlmd/types'; +import { usePipelinesAPI } from '~/concepts/pipelines/context'; +import { Execution, GetExecutionsByContextRequest } from '~/third_party/mlmd'; +import useFetchState, { + FetchState, + FetchStateCallbackPromise, + NotReadyError, +} from '~/utilities/useFetchState'; + +export const useExecutionsFromMlmdContext = ( + context: MlmdContext | null, + refreshRate?: number, +): FetchState => { + const { metadataStoreServiceClient } = usePipelinesAPI(); + + const call = React.useCallback>(async () => { + if (!context) { + return Promise.reject(new NotReadyError('No context')); + } + + const request = new GetExecutionsByContextRequest(); + request.setContextId(context.getId()); + const res = await metadataStoreServiceClient.getExecutionsByContext(request); + return res.getExecutionsList(); + }, [metadataStoreServiceClient, context]); + + return useFetchState(call, null, { + refreshRate, + }); +}; diff --git a/frontend/src/concepts/pipelines/apiHooks/mlmd/useMlmdContext.ts b/frontend/src/concepts/pipelines/apiHooks/mlmd/useMlmdContext.ts new file mode 100644 index 0000000000..68daf18600 --- /dev/null +++ b/frontend/src/concepts/pipelines/apiHooks/mlmd/useMlmdContext.ts @@ -0,0 +1,45 @@ +import React from 'react'; +import { MlmdContext, MlmdContextTypes } from '~/concepts/pipelines/apiHooks/mlmd/types'; +import { usePipelinesAPI } from '~/concepts/pipelines/context'; +import { GetContextByTypeAndNameRequest } from '~/third_party/mlmd'; +import useFetchState, { + FetchState, + FetchStateCallbackPromise, + NotReadyError, +} from '~/utilities/useFetchState'; + +/** + * A hook used to use the MLMD service and fetch the MLMD context + * If being used without name/type, this hook will throw an error + * @param name The identifier to query a specific type of MLMD context. e.g. The runID for a pipeline run + */ +export const useMlmdContext = ( + name?: string, + type?: MlmdContextTypes, + refreshRate?: number, +): FetchState => { + const { metadataStoreServiceClient } = usePipelinesAPI(); + + const call = React.useCallback>(async () => { + if (!type) { + return Promise.reject(new NotReadyError('No context type')); + } + if (!name) { + return Promise.reject(new NotReadyError('No context name')); + } + + const request = new GetContextByTypeAndNameRequest(); + request.setTypeName(type); + request.setContextName(name); + const res = await metadataStoreServiceClient.getContextByTypeAndName(request); + const context = res.getContext(); + if (!context) { + return Promise.reject(new Error('Cannot find specified context')); + } + return context; + }, [metadataStoreServiceClient, type, name]); + + return useFetchState(call, null, { + refreshRate, + }); +}; diff --git a/frontend/src/concepts/pipelines/apiHooks/mlmd/usePipelineRunMlmdContext.ts b/frontend/src/concepts/pipelines/apiHooks/mlmd/usePipelineRunMlmdContext.ts new file mode 100644 index 0000000000..848eef59b2 --- /dev/null +++ b/frontend/src/concepts/pipelines/apiHooks/mlmd/usePipelineRunMlmdContext.ts @@ -0,0 +1,8 @@ +import { FetchState } from '~/utilities/useFetchState'; +import { MlmdContext, MlmdContextTypes } from '~/concepts/pipelines/apiHooks/mlmd/types'; +import { useMlmdContext } from '~/concepts/pipelines/apiHooks/mlmd/useMlmdContext'; + +export const usePipelineRunMlmdContext = ( + runID?: string, + refreshRate?: number, +): FetchState => useMlmdContext(runID, MlmdContextTypes.RUN, refreshRate); diff --git a/frontend/src/concepts/pipelines/apiHooks/usePipelineRunById.ts b/frontend/src/concepts/pipelines/apiHooks/usePipelineRunById.ts index efdfd74901..c112b91db5 100644 --- a/frontend/src/concepts/pipelines/apiHooks/usePipelineRunById.ts +++ b/frontend/src/concepts/pipelines/apiHooks/usePipelineRunById.ts @@ -9,6 +9,15 @@ import { PipelineRunKFv2, RuntimeStateKF, runtimeStateLabels } from '~/concepts/ import { FAST_POLL_INTERVAL } from '~/utilities/const'; import { computeRunStatus } from '~/concepts/pipelines/content/utils'; +export const isPipelineRunFinished = (run?: PipelineRunKFv2 | null): boolean => { + const { label } = computeRunStatus(run); + return [ + runtimeStateLabels[RuntimeStateKF.SUCCEEDED], + runtimeStateLabels[RuntimeStateKF.FAILED], + runtimeStateLabels[RuntimeStateKF.CANCELED], + ].includes(label); +}; + const usePipelineRunById = ( pipelineRunId?: string, refreshForDetails?: boolean, @@ -32,18 +41,13 @@ const usePipelineRunById = ( }); const [run] = runData; - const { label } = computeRunStatus(run); - const isComplete = [ - runtimeStateLabels[RuntimeStateKF.SUCCEEDED], - runtimeStateLabels[RuntimeStateKF.FAILED], - runtimeStateLabels[RuntimeStateKF.CANCELED], - ].includes(label); + const isFinished = isPipelineRunFinished(run); React.useEffect(() => { - if (isComplete) { + if (isFinished) { setPipelineFinished(true); } - }, [isComplete]); + }, [isFinished]); return runData; }; diff --git a/frontend/src/concepts/pipelines/content/pipelinesDetails/pipelineRun/PipelineRunDetails.tsx b/frontend/src/concepts/pipelines/content/pipelinesDetails/pipelineRun/PipelineRunDetails.tsx index 2c743db6be..006828eed3 100644 --- a/frontend/src/concepts/pipelines/content/pipelinesDetails/pipelineRun/PipelineRunDetails.tsx +++ b/frontend/src/concepts/pipelines/content/pipelinesDetails/pipelineRun/PipelineRunDetails.tsx @@ -36,6 +36,7 @@ import { usePipelineTaskTopology } from '~/concepts/pipelines/topology'; import { PipelineRunType } from '~/pages/pipelines/global/runs/types'; import { routePipelineRunsNamespace } from '~/routes'; import PipelineJobReferenceName from '~/concepts/pipelines/content/PipelineJobReferenceName'; +import useExecutionsForPipelineRun from '~/concepts/pipelines/content/pipelinesDetails/pipelineRun/useExecutionsForPipelineRun'; const PipelineRunDetails: PipelineCoreDetailsPageComponent = ({ breadcrumbPath, contextPath }) => { const { runId } = useParams(); @@ -52,17 +53,16 @@ const PipelineRunDetails: PipelineCoreDetailsPageComponent = ({ breadcrumbPath, RunDetailsTabs.DETAILS, ); const [selectedId, setSelectedId] = React.useState(null); - const { taskMap, nodes } = usePipelineTaskTopology(pipelineSpec, runResource ?? undefined); + + const [executions, executionsLoaded, executionsError] = useExecutionsForPipelineRun(runResource); + const { taskMap, nodes } = usePipelineTaskTopology( + pipelineSpec, + runResource?.run_details, + executions, + ); const loaded = runLoaded && (versionLoaded || !!runResource?.pipeline_spec); const error = versionError || runError; - if (!loaded && !error) { - return ( - - - - ); - } if (error) { return ( @@ -77,6 +77,14 @@ const PipelineRunDetails: PipelineCoreDetailsPageComponent = ({ breadcrumbPath, ); } + if (!loaded || (!executionsLoaded && !executionsError)) { + return ( + + + + ); + } + return ( <> @@ -136,12 +144,10 @@ const PipelineRunDetails: PipelineCoreDetailsPageComponent = ({ breadcrumbPath, } headerAction={ - loaded && ( - setDeleting(true)} - /> - ) + setDeleting(true)} + /> } empty={false} > diff --git a/frontend/src/concepts/pipelines/content/pipelinesDetails/pipelineRun/runLogs/LogsTab.tsx b/frontend/src/concepts/pipelines/content/pipelinesDetails/pipelineRun/runLogs/LogsTab.tsx index bd4bc9aa4f..13bd5267f5 100644 --- a/frontend/src/concepts/pipelines/content/pipelinesDetails/pipelineRun/runLogs/LogsTab.tsx +++ b/frontend/src/concepts/pipelines/content/pipelinesDetails/pipelineRun/runLogs/LogsTab.tsx @@ -40,12 +40,12 @@ import DownloadDropdown from '~/concepts/pipelines/content/pipelinesDetails/pipe import { PodStepStateType } from '~/types'; import useDebounceCallback from '~/utilities/useDebounceCallback'; import { PipelineTask } from '~/concepts/pipelines/topology'; -import { RuntimeStateKF } from '~/concepts/pipelines/kfTypes'; +import { ExecutionStateKF } from '~/concepts/pipelines/kfTypes'; // TODO: If this gets large enough we should look to make this its own component file const LogsTab: React.FC<{ task: PipelineTask }> = ({ task }) => { const podName = task.status?.podName; - const isFailedPod = task.status?.state === RuntimeStateKF.FAILED; + const isFailedPod = task.status?.state === ExecutionStateKF.FAILED; if (!podName) { return <>No content; diff --git a/frontend/src/concepts/pipelines/content/pipelinesDetails/pipelineRun/useExecutionsForPipelineRun.ts b/frontend/src/concepts/pipelines/content/pipelinesDetails/pipelineRun/useExecutionsForPipelineRun.ts new file mode 100644 index 0000000000..f8e6bad3ea --- /dev/null +++ b/frontend/src/concepts/pipelines/content/pipelinesDetails/pipelineRun/useExecutionsForPipelineRun.ts @@ -0,0 +1,21 @@ +import { useExecutionsFromMlmdContext } from '~/concepts/pipelines/apiHooks/mlmd/useExecutionsFromMlmdContext'; +import { usePipelineRunMlmdContext } from '~/concepts/pipelines/apiHooks/mlmd/usePipelineRunMlmdContext'; +import { isPipelineRunFinished } from '~/concepts/pipelines/apiHooks/usePipelineRunById'; +import { PipelineRunKFv2 } from '~/concepts/pipelines/kfTypes'; +import { Execution } from '~/third_party/mlmd'; +import { FAST_POLL_INTERVAL } from '~/utilities/const'; + +const useExecutionsForPipelineRun = ( + run: PipelineRunKFv2 | null, +): [executions: Execution[] | null, loaded: boolean, error?: Error] => { + const isFinished = isPipelineRunFinished(run); + const refreshRate = isFinished ? 0 : FAST_POLL_INTERVAL; + // contextError means mlmd service is not available, no need to check executions + const [context, , contextError] = usePipelineRunMlmdContext(run?.run_id, refreshRate); + // executionsLoaded is the flag to show the spinner or not + const [executions, executionsLoaded] = useExecutionsFromMlmdContext(context, refreshRate); + + return [executions, executionsLoaded, contextError]; +}; + +export default useExecutionsForPipelineRun; diff --git a/frontend/src/concepts/pipelines/kfTypes.ts b/frontend/src/concepts/pipelines/kfTypes.ts index 73cd7e6987..50a967abab 100644 --- a/frontend/src/concepts/pipelines/kfTypes.ts +++ b/frontend/src/concepts/pipelines/kfTypes.ts @@ -466,6 +466,22 @@ export enum RuntimeStateKF { PAUSED = 'PAUSED', } +export enum ExecutionStateKF { + NEW = 'New', + RUNNING = 'Running', + COMPLETE = 'Complete', + CANCELED = 'Canceled', + FAILED = 'Failed', + CACHED = 'Cached', +} + +export enum ArtifactStateKF { + PENDING = 'Pending', + LIVE = 'Live', + MARKED_FOR_DELETION = 'Marked for deletion', + DELETED = 'Deleted', +} + export const runtimeStateLabels = { [RuntimeStateKF.RUNTIME_STATE_UNSPECIFIED]: 'Unspecified', [RuntimeStateKF.PENDING]: 'Pending', diff --git a/frontend/src/concepts/pipelines/topology/__tests__/parseUtils.spec.ts b/frontend/src/concepts/pipelines/topology/__tests__/parseUtils.spec.ts index adb4db1e61..416b33aa7e 100644 --- a/frontend/src/concepts/pipelines/topology/__tests__/parseUtils.spec.ts +++ b/frontend/src/concepts/pipelines/topology/__tests__/parseUtils.spec.ts @@ -2,22 +2,30 @@ import { RunStatus } from '@patternfly/react-topology'; import { parseInputOutput, - parseRuntimeInfo, translateStatusForNode, lowestProgress, parseComponentsForArtifactRelationship, parseTasksForArtifactRelationship, + parseRuntimeInfoFromRunDetails, + getResourceStateText, + ResourceType, + parseRuntimeInfoFromExecutions, + parseVolumeMounts, } from '~/concepts/pipelines/topology/parseUtils'; import { + ArtifactStateKF, ArtifactType, + ExecutionStateKF, InputDefinitionParameterType, PipelineComponentsKF, + PlatformSpec, RunDetailsKF, RuntimeStateKF, TaskDetailKF, TaskKF, TriggerStrategy, } from '~/concepts/pipelines/kfTypes'; +import { Artifact, Execution, Value } from '~/third_party/mlmd'; describe('pipeline topology parseUtils', () => { describe('parseInputOutput', () => { @@ -58,7 +66,7 @@ describe('pipeline topology parseUtils', () => { const testTaskId = 'test-task-id'; it('returns undefined when runDetails are not provided', () => { - const result = parseRuntimeInfo(testTaskId); + const result = parseRuntimeInfoFromRunDetails(testTaskId); expect(result).toBeUndefined(); }); @@ -69,7 +77,7 @@ describe('pipeline topology parseUtils', () => { task_details: [], }; - const result = parseRuntimeInfo(testTaskId, testRunDetails); + const result = parseRuntimeInfoFromRunDetails(testTaskId, testRunDetails); expect(result).toBeUndefined(); }); @@ -89,7 +97,7 @@ describe('pipeline topology parseUtils', () => { ], }; - const result = parseRuntimeInfo(testTaskId, testRunDetails); + const result = parseRuntimeInfoFromRunDetails(testTaskId, testRunDetails); expect(result).toBeUndefined(); }); @@ -116,13 +124,13 @@ describe('pipeline topology parseUtils', () => { ], }; - const result = parseRuntimeInfo(testTaskId, testRunDetails); + const result = parseRuntimeInfoFromRunDetails(testTaskId, testRunDetails); expect(result).toEqual({ completeTime: '2024-01-03T00:00:00Z', podName: 'Some pod name', startTime: '2024-01-02T00:00:00Z', state: 'RUNNING', - taskId: 'test-task-id', + taskId: 'task.test-task-id', }); }); @@ -143,13 +151,112 @@ describe('pipeline topology parseUtils', () => { ], }; - const result = parseRuntimeInfo(testTaskId, testRunDetails); + const result = parseRuntimeInfoFromRunDetails(testTaskId, testRunDetails); expect(result).toEqual({ completeTime: '2024-01-03T00:00:00Z', podName: undefined, startTime: '2024-01-02T00:00:00Z', state: 'RUNNING', - taskId: 'test-task-id', + taskId: 'task.test-task-id', + }); + }); + }); + + describe('parseRuntimeInfoFromExecutions', () => { + const testTaskId = 'test-task-id'; + + it('returns undefined when executions are not provided', () => { + const result = parseRuntimeInfoFromExecutions(testTaskId); + expect(result).toBeUndefined(); + }); + + it('returns undefined when executions is null', () => { + const result = parseRuntimeInfoFromExecutions(testTaskId, null); + expect(result).toBeUndefined(); + }); + + it('returns undefined when executions are empty', () => { + const result = parseRuntimeInfoFromExecutions(testTaskId, []); + expect(result).toBeUndefined(); + }); + + it('returns undefined when there are no match executions', () => { + const mockExecution = new Execution(); + const result = parseRuntimeInfoFromExecutions(testTaskId, [mockExecution]); + expect(result).toBeUndefined(); + }); + + it('returns runtime info when execution id matches', () => { + const mockExecution = new Execution(); + const value = new Value(); + mockExecution.getCustomPropertiesMap().set('task_name', value.setStringValue(testTaskId)); + mockExecution.setCreateTimeSinceEpoch(1713285296322); + mockExecution.setLastUpdateTimeSinceEpoch(1713285296524); + mockExecution.setLastKnownState(Execution.State.COMPLETE); + const result = parseRuntimeInfoFromExecutions(testTaskId, [mockExecution]); + expect(result).toStrictEqual({ + completeTime: '2024-04-16T16:34:56.524Z', + podName: undefined, + startTime: '2024-04-16T16:34:56.322Z', + state: 'Complete', + taskId: 'task.test-task-id', + }); + }); + }); + + describe('getResourceStateText', () => { + it('returns undefined when state is not provided', () => { + const mockExecution = new Execution(); + const result = getResourceStateText({ + resourceType: ResourceType.EXECUTION, + resource: mockExecution, + }); + expect(result).toBeUndefined(); + }); + + it('returns undefined when state is "UNKNOWN"', () => { + const mockExecution = new Execution(); + mockExecution.setLastKnownState(Execution.State.UNKNOWN); + const result = getResourceStateText({ + resourceType: ResourceType.EXECUTION, + resource: mockExecution, + }); + expect(result).toBeUndefined(); + }); + + [ + { state: Execution.State.CACHED, status: ExecutionStateKF.CACHED }, + { state: Execution.State.CANCELED, status: ExecutionStateKF.CANCELED }, + { state: Execution.State.COMPLETE, status: ExecutionStateKF.COMPLETE }, + { state: Execution.State.FAILED, status: ExecutionStateKF.FAILED }, + { state: Execution.State.NEW, status: ExecutionStateKF.NEW }, + { state: Execution.State.RUNNING, status: ExecutionStateKF.RUNNING }, + ].forEach(({ state, status }) => { + it(`returns "${status}" with a provided "${state}" MLMD execution state`, () => { + const mockExecution = new Execution(); + mockExecution.setLastKnownState(state); + const result = getResourceStateText({ + resourceType: ResourceType.EXECUTION, + resource: mockExecution, + }); + expect(result).toBe(status); + }); + }); + + [ + { state: Artifact.State.LIVE, status: ArtifactStateKF.LIVE }, + { state: Artifact.State.DELETED, status: ArtifactStateKF.DELETED }, + { state: Artifact.State.MARKED_FOR_DELETION, status: ArtifactStateKF.MARKED_FOR_DELETION }, + { state: Artifact.State.PENDING, status: ArtifactStateKF.PENDING }, + ].forEach(({ state, status }) => { + it(`returns "${status}" with a provided "${state}" MLMD artifact state`, () => { + const mockArtifact = new Artifact(); + mockArtifact.setState(state); + const result = getResourceStateText({ + resourceType: ResourceType.ARTIFACT, + resource: mockArtifact, + }); + expect(result).toBe(status); }); }); }); @@ -179,6 +286,11 @@ describe('pipeline topology parseUtils', () => { { state: RuntimeStateKF.RUNNING, status: RunStatus.InProgress }, { state: RuntimeStateKF.SKIPPED, status: RunStatus.Skipped }, { state: RuntimeStateKF.SUCCEEDED, status: RunStatus.Succeeded }, + { state: ExecutionStateKF.CANCELED, status: RunStatus.Cancelled }, + { state: ExecutionStateKF.CACHED, status: RunStatus.Skipped }, + { state: ExecutionStateKF.COMPLETE, status: RunStatus.Succeeded }, + { state: ExecutionStateKF.FAILED, status: RunStatus.Failed }, + { state: ExecutionStateKF.RUNNING, status: RunStatus.Running }, ].forEach(({ state, status }) => { it(`returns "${status}" with a provided "${state}" state`, () => { const result = translateStatusForNode(state); @@ -409,4 +521,85 @@ describe('pipeline topology parseUtils', () => { }); }); }); + + describe('parseVolumeMounts', () => { + it('returns empty when no params passed', () => { + const result = parseVolumeMounts(); + expect(result).toEqual([]); + }); + + it('returns empty when no platform.kubernetes', () => { + const testPlatformSpec: PlatformSpec = { platforms: {} }; + const result = parseVolumeMounts(testPlatformSpec, 'test-executor-label'); + expect(result).toEqual([]); + }); + + it('returns empty when no executor label', () => { + const testExecutorLabel = 'test-executor-label'; + const testPlatformSpec: PlatformSpec = { + platforms: { + kubernetes: { + deploymentSpec: { + executors: { + [testExecutorLabel]: { + container: { image: 'test-image' }, + pvcMount: [{ mountPath: 'path-1' }], + }, + }, + }, + }, + }, + }; + const result = parseVolumeMounts(testPlatformSpec); + expect(result).toEqual([]); + }); + + it('returns empty when executor label does not match the platform spec', () => { + const testExecutorLabel = 'test-executor-label'; + const testPlatformSpec: PlatformSpec = { + platforms: { + kubernetes: { + deploymentSpec: { + executors: { + [`${testExecutorLabel}-not-match`]: { + container: { image: 'test-image' }, + pvcMount: [{ mountPath: 'path-1' }], + }, + }, + }, + }, + }, + }; + const result = parseVolumeMounts(testPlatformSpec, testExecutorLabel); + expect(result).toEqual([]); + }); + + it('returns the correct result when executor label matches the platform spec', () => { + const testExecutorLabel = 'test-executor-label'; + const testPlatformSpec: PlatformSpec = { + platforms: { + kubernetes: { + deploymentSpec: { + executors: { + [testExecutorLabel]: { + container: { image: 'test-image' }, + pvcMount: [ + { + mountPath: 'path-1', + taskOutputParameter: { + outputParameterKey: 'test-key', + producerTask: 'test-task-1', + }, + }, + ], + }, + }, + }, + }, + }, + }; + const result = parseVolumeMounts(testPlatformSpec, testExecutorLabel); + expect(result).toEqual([{ mountPath: 'path-1', name: 'test-task-1' }]); + }); + }); }); diff --git a/frontend/src/concepts/pipelines/topology/parseUtils.ts b/frontend/src/concepts/pipelines/topology/parseUtils.ts index baa3320e5d..79870ead5b 100644 --- a/frontend/src/concepts/pipelines/topology/parseUtils.ts +++ b/frontend/src/concepts/pipelines/topology/parseUtils.ts @@ -1,6 +1,8 @@ import { RunStatus } from '@patternfly/react-topology'; import { + ArtifactStateKF, DAG, + ExecutionStateKF, InputOutputArtifactType, InputOutputDefinition, PipelineComponentsKF, @@ -10,6 +12,7 @@ import { TaskDetailKF, } from '~/concepts/pipelines/kfTypes'; import { VolumeMount } from '~/types'; +import { Artifact, Execution } from '~/third_party/mlmd'; import { PipelineTaskInputOutput, PipelineTaskRunStatus } from './pipelineTaskTypes'; export const composeArtifactType = (data: InputOutputArtifactType): string => @@ -135,14 +138,13 @@ export const lowestProgress = (details: TaskDetailKF[]): PipelineTaskRunStatus[' )[0].state; }; -export const parseRuntimeInfo = ( +export const parseRuntimeInfoFromRunDetails = ( taskId: string, runDetails?: RunDetailsKF, ): PipelineTaskRunStatus | undefined => { if (!runDetails) { return undefined; } - const { task_details: taskDetails } = runDetails; // taskId should always be first, as it's the most direct item, but it may not drive the entire details @@ -159,26 +161,133 @@ export const parseRuntimeInfo = ( startTime: thisTaskDetail[0].start_time, completeTime: thisTaskDetail[0].end_time, state: lowestProgress(thisTaskDetail), - taskId: thisTaskDetail[0].task_id, + taskId: `task.${taskId}`, podName: thisTaskDetail[0].child_tasks?.find((o) => o.pod_name)?.pod_name, }; }; -export const translateStatusForNode = (stateKF?: RuntimeStateKF): RunStatus | undefined => { - switch (stateKF) { +export const parseRuntimeInfoFromExecutions = ( + taskId: string, + executions?: Execution[] | null, +): PipelineTaskRunStatus | undefined => { + if (!executions) { + return undefined; + } + + const execution = executions.find( + (e) => e.getCustomPropertiesMap().get('task_name')?.getStringValue() === taskId, + ); + + if (!execution) { + return undefined; + } + + const lastUpdatedTime = execution.getLastUpdateTimeSinceEpoch(); + let completeTime; + const lastKnownState = execution.getLastKnownState(); + // Logic comes from https://github.com/opendatahub-io/data-science-pipelines/blob/master/frontend/src/components/tabs/RuntimeNodeDetailsV2.tsx#L245-L253 + if ( + lastUpdatedTime && + (lastKnownState === Execution.State.COMPLETE || + lastKnownState === Execution.State.FAILED || + lastKnownState === Execution.State.CACHED || + lastKnownState === Execution.State.CANCELED) + ) { + completeTime = new Date(lastUpdatedTime).toISOString(); + } + return { + startTime: new Date(execution.getCreateTimeSinceEpoch()).toISOString(), + completeTime, + state: getResourceStateText({ + resourceType: ResourceType.EXECUTION, + resource: execution, + }), + taskId: `task.${taskId}`, + podName: execution.getCustomPropertiesMap().get('pod_name')?.getStringValue(), + }; +}; + +export enum ResourceType { + ARTIFACT = 'ARTIFACT', + EXECUTION = 'EXECUTION', +} + +export interface ArtifactProps { + resourceType: ResourceType.ARTIFACT; + resource: Artifact; +} + +export interface ExecutionProps { + resourceType: ResourceType.EXECUTION; + resource: Execution; +} + +export type ResourceInfoProps = ArtifactProps | ExecutionProps; + +// Get text representation of resource state. +// Works for both artifact and execution. +export const getResourceStateText = ( + props: ResourceInfoProps, +): ArtifactStateKF | ExecutionStateKF | undefined => { + if (props.resourceType === ResourceType.ARTIFACT) { + const state = props.resource.getState(); + switch (state) { + case Artifact.State.PENDING: + return ArtifactStateKF.PENDING; + case Artifact.State.LIVE: + return ArtifactStateKF.LIVE; + case Artifact.State.MARKED_FOR_DELETION: + return ArtifactStateKF.MARKED_FOR_DELETION; + case Artifact.State.DELETED: + return ArtifactStateKF.DELETED; + default: + return undefined; + } + } else { + // type == EXECUTION + const state = props.resource.getLastKnownState(); + switch (state) { + case Execution.State.NEW: + return ExecutionStateKF.NEW; + case Execution.State.RUNNING: + return ExecutionStateKF.RUNNING; + case Execution.State.COMPLETE: + return ExecutionStateKF.COMPLETE; + case Execution.State.CANCELED: + return ExecutionStateKF.CANCELED; + case Execution.State.FAILED: + return ExecutionStateKF.FAILED; + case Execution.State.CACHED: + return ExecutionStateKF.CACHED; + default: + return undefined; + } + } +}; + +export const translateStatusForNode = ( + state?: RuntimeStateKF | ExecutionStateKF | ArtifactStateKF, +): RunStatus | undefined => { + switch (state) { + case ExecutionStateKF.CANCELED: case RuntimeStateKF.CANCELED: case RuntimeStateKF.CANCELING: return RunStatus.Cancelled; - case RuntimeStateKF.PAUSED: - return RunStatus.Pending; + case ExecutionStateKF.RUNNING: + return RunStatus.Running; + case ExecutionStateKF.FAILED: case RuntimeStateKF.FAILED: return RunStatus.Failed; + case ArtifactStateKF.PENDING: + case RuntimeStateKF.PAUSED: case RuntimeStateKF.PENDING: return RunStatus.Pending; case RuntimeStateKF.RUNNING: return RunStatus.InProgress; + case ExecutionStateKF.COMPLETE: case RuntimeStateKF.SUCCEEDED: return RunStatus.Succeeded; + case ExecutionStateKF.CACHED: case RuntimeStateKF.SKIPPED: return RunStatus.Skipped; case RuntimeStateKF.RUNTIME_STATE_UNSPECIFIED: diff --git a/frontend/src/concepts/pipelines/topology/pipelineTaskTypes.ts b/frontend/src/concepts/pipelines/topology/pipelineTaskTypes.ts index 1d524e8306..db8f50610e 100644 --- a/frontend/src/concepts/pipelines/topology/pipelineTaskTypes.ts +++ b/frontend/src/concepts/pipelines/topology/pipelineTaskTypes.ts @@ -1,4 +1,9 @@ -import { InputDefinitionParameterType, RuntimeStateKF } from '~/concepts/pipelines/kfTypes'; +import { + ArtifactStateKF, + ExecutionStateKF, + InputDefinitionParameterType, + RuntimeStateKF, +} from '~/concepts/pipelines/kfTypes'; import { createNode } from '~/concepts/topology'; import { VolumeMount } from '~/types'; @@ -31,7 +36,7 @@ export type PipelineTaskRunStatus = { startTime: string; completeTime?: string; podName?: string; - state?: RuntimeStateKF; + state?: RuntimeStateKF | ExecutionStateKF | ArtifactStateKF; taskId?: string; }; diff --git a/frontend/src/concepts/pipelines/topology/usePipelineTaskTopology.ts b/frontend/src/concepts/pipelines/topology/usePipelineTaskTopology.ts index 11b8506480..a44cea3e36 100644 --- a/frontend/src/concepts/pipelines/topology/usePipelineTaskTopology.ts +++ b/frontend/src/concepts/pipelines/topology/usePipelineTaskTopology.ts @@ -1,12 +1,14 @@ -import { PipelineRunKFv2, PipelineSpecVariable, TaskKF } from '~/concepts/pipelines/kfTypes'; +import { PipelineSpecVariable, RunDetailsKF, TaskKF } from '~/concepts/pipelines/kfTypes'; import { createNode } from '~/concepts/topology'; import { PipelineNodeModelExpanded } from '~/concepts/topology/types'; import { createArtifactNode } from '~/concepts/topology/utils'; +import { Execution } from '~/third_party/mlmd'; import { composeArtifactType, parseComponentsForArtifactRelationship, parseInputOutput, - parseRuntimeInfo, + parseRuntimeInfoFromExecutions, + parseRuntimeInfoFromRunDetails, parseTasksForArtifactRelationship, parseVolumeMounts, translateStatusForNode, @@ -15,7 +17,8 @@ import { KubeFlowTaskTopology } from './pipelineTaskTypes'; export const usePipelineTaskTopology = ( spec?: PipelineSpecVariable, - run?: PipelineRunKFv2, + runDetails?: RunDetailsKF, + executions?: Execution[] | null, ): KubeFlowTaskTopology => { if (!spec) { return { taskMap: {}, nodes: [] }; @@ -29,7 +32,6 @@ export const usePipelineTaskTopology = ( dag: { tasks: rootTasks }, }, } = pipelineSpec; - const { run_details: runDetails } = run || {}; const componentArtifactMap = parseComponentsForArtifactRelationship(components); const nodes: PipelineNodeModelExpanded[] = []; @@ -48,7 +50,9 @@ export const usePipelineTaskTopology = ( const executorLabel = component?.executorLabel; const executor = executorLabel ? executors[executorLabel] : undefined; - const status = parseRuntimeInfo(taskId, runDetails); + const status = executions + ? parseRuntimeInfoFromExecutions(taskId, executions) + : parseRuntimeInfoFromRunDetails(taskId, runDetails); const runAfter: string[] = taskValue.dependentTasks ?? [];