Skip to content

Commit

Permalink
Read pipeline run node details from mlmd context
Browse files Browse the repository at this point in the history
  • Loading branch information
DaoDaoNoCode committed Apr 9, 2024
1 parent d2117a5 commit 0c0974f
Show file tree
Hide file tree
Showing 9 changed files with 241 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import React from 'react';
import { usePipelinesAPI } from '~/concepts/pipelines/context';
import { Context, Execution, GetExecutionsByContextRequest } from '~/third_party/mlmd';
import { FAST_POLL_INTERVAL } from '~/utilities/const';
import useFetchState, {
FetchState,
FetchStateCallbackPromise,
NotReadyError,
} from '~/utilities/useFetchState';

export const useExecutionsFromContext = (
context: Context | null,
pipelineFinished?: boolean,
): FetchState<Execution[] | null> => {
const { metadataStoreServiceClient } = usePipelinesAPI();

const call = React.useCallback<FetchStateCallbackPromise<Execution[] | null>>(async () => {
if (!context) {
return Promise.reject(new NotReadyError('No context'));
}

const request = new GetExecutionsByContextRequest();
request.setContextId(context.getId());
const res = await metadataStoreServiceClient.getExecutionsByContext(request);
return res.getExecutionsList();
}, [metadataStoreServiceClient, context]);

return useFetchState(call, null, {
refreshRate: !pipelineFinished ? FAST_POLL_INTERVAL : undefined,
});
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import React from 'react';
import { Context } from '~/third_party/mlmd';
import { usePipelinesAPI } from '~/concepts/pipelines/context';
import useFetchState, {
FetchState,
FetchStateCallbackPromise,
NotReadyError,
} from '~/utilities/useFetchState';
import { GetContextByTypeAndNameRequest } from '~/third_party/mlmd/generated/ml_metadata/proto/metadata_store_service_pb';
import { FAST_POLL_INTERVAL } from '~/utilities/const';

const KFP_V2_RUN_CONTEXT_TYPE = 'system.PipelineRun';

const useMlmdContext = (
name?: string,
type?: string,
pipelineFinished?: boolean,
): FetchState<Context | null> => {
const { metadataStoreServiceClient } = usePipelinesAPI();

const call = React.useCallback<FetchStateCallbackPromise<Context | null>>(async () => {
if (!type) {
return Promise.reject(new NotReadyError('No context type'));
}
if (!name) {
return Promise.reject(new NotReadyError('No context name'));
}

const request = new GetContextByTypeAndNameRequest();
request.setTypeName(type);
request.setContextName(name);
const res = await metadataStoreServiceClient.getContextByTypeAndName(request);
const context = res.getContext();
if (!context) {
return Promise.reject(new Error('Cannot find specified context'));
}
return context;
}, [metadataStoreServiceClient, type, name]);

return useFetchState(call, null, {
refreshRate: !pipelineFinished ? FAST_POLL_INTERVAL : undefined,
});
};

export const usePipelineRunMlmdContext = (
runID?: string,
pipelineFinished?: boolean,
): FetchState<Context | null> => useMlmdContext(runID, KFP_V2_RUN_CONTEXT_TYPE, pipelineFinished);
16 changes: 10 additions & 6 deletions frontend/src/concepts/pipelines/apiHooks/usePipelineRunById.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@ import { PipelineRunKFv2, RuntimeStateKF, runtimeStateLabels } from '~/concepts/
import { FAST_POLL_INTERVAL } from '~/utilities/const';
import { computeRunStatus } from '~/concepts/pipelines/content/utils';

export const isPipelineRunComplete = (run?: PipelineRunKFv2 | null): boolean => {
const { label } = computeRunStatus(run);
return [
runtimeStateLabels[RuntimeStateKF.SUCCEEDED],
runtimeStateLabels[RuntimeStateKF.FAILED],
runtimeStateLabels[RuntimeStateKF.CANCELED],
].includes(label);
};

const usePipelineRunById = (
pipelineRunId?: string,
refreshForDetails?: boolean,
Expand All @@ -32,12 +41,7 @@ const usePipelineRunById = (
});

const [run] = runData;
const { label } = computeRunStatus(run);
const isComplete = [
runtimeStateLabels[RuntimeStateKF.SUCCEEDED],
runtimeStateLabels[RuntimeStateKF.FAILED],
runtimeStateLabels[RuntimeStateKF.CANCELED],
].includes(label);
const isComplete = isPipelineRunComplete(run);

React.useEffect(() => {
if (isComplete) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,13 @@ const PipelineRunDetails: PipelineCoreDetailsPageComponent = ({ breadcrumbPath,
RunDetailsTabs.DETAILS,
);
const [selectedId, setSelectedId] = React.useState<string | null>(null);
const { taskMap, nodes } = usePipelineTaskTopology(
version?.pipeline_spec,
runResource ?? undefined,
);
const {
taskMap,
nodes,
loaded: topologyLoaded,
} = usePipelineTaskTopology(version?.pipeline_spec, runResource ?? undefined);

const loaded = versionLoaded && runLoaded;
const loaded = versionLoaded && runLoaded && topologyLoaded;
const error = versionError || runError;
if (!loaded && !error) {
return (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,11 @@ import DownloadDropdown from '~/concepts/pipelines/content/pipelinesDetails/pipe
import { PodStepStateType } from '~/types';
import useDebounceCallback from '~/utilities/useDebounceCallback';
import { PipelineTask } from '~/concepts/pipelines/topology';
import { RuntimeStateKF } from '~/concepts/pipelines/kfTypes';

// TODO: If this gets large enough we should look to make this its own component file
const LogsTab: React.FC<{ task: PipelineTask }> = ({ task }) => {
const podName = task.status?.podName;
const isFailedPod = task.status?.state === RuntimeStateKF.FAILED;
const isFailedPod = task.status?.state === 'Failed';

if (!podName) {
return <>No content</>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
import { RunStatus } from '@patternfly/react-topology';
import {
parseInputOutput,
parseRuntimeInfo,
translateStatusForNode,
lowestProgress,
parseComponentsForArtifactRelationship,
parseTasksForArtifactRelationship,
parseRuntimeInfoFromRunDetails,
} from '~/concepts/pipelines/topology/parseUtils';
import {
ArtifactType,
Expand Down Expand Up @@ -58,7 +58,7 @@ describe('pipeline topology parseUtils', () => {
const testTaskId = 'test-task-id';

it('returns undefined when runDetails are not provided', () => {
const result = parseRuntimeInfo(testTaskId);
const result = parseRuntimeInfoFromRunDetails(testTaskId);
expect(result).toBeUndefined();
});

Expand All @@ -69,7 +69,7 @@ describe('pipeline topology parseUtils', () => {
task_details: [],
};

const result = parseRuntimeInfo(testTaskId, testRunDetails);
const result = parseRuntimeInfoFromRunDetails(testTaskId, testRunDetails);
expect(result).toBeUndefined();
});

Expand All @@ -89,7 +89,7 @@ describe('pipeline topology parseUtils', () => {
],
};

const result = parseRuntimeInfo(testTaskId, testRunDetails);
const result = parseRuntimeInfoFromRunDetails(testTaskId, testRunDetails);
expect(result).toBeUndefined();
});

Expand All @@ -116,13 +116,13 @@ describe('pipeline topology parseUtils', () => {
],
};

const result = parseRuntimeInfo(testTaskId, testRunDetails);
const result = parseRuntimeInfoFromRunDetails(testTaskId, testRunDetails);
expect(result).toEqual({
completeTime: '2024-01-03T00:00:00Z',
podName: 'Some pod name',
startTime: '2024-01-02T00:00:00Z',
state: 'RUNNING',
taskId: 'test-task-id',
taskId: 'task.test-task-id',
});
});

Expand All @@ -143,13 +143,13 @@ describe('pipeline topology parseUtils', () => {
],
};

const result = parseRuntimeInfo(testTaskId, testRunDetails);
const result = parseRuntimeInfoFromRunDetails(testTaskId, testRunDetails);
expect(result).toEqual({
completeTime: '2024-01-03T00:00:00Z',
podName: undefined,
startTime: '2024-01-02T00:00:00Z',
state: 'RUNNING',
taskId: 'test-task-id',
taskId: 'task.test-task-id',
});
});
});
Expand Down
121 changes: 114 additions & 7 deletions frontend/src/concepts/pipelines/topology/parseUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
TaskDetailKF,
} from '~/concepts/pipelines/kfTypes';
import { VolumeMount } from '~/types';
import { Artifact, Execution } from '~/third_party/mlmd';
import { PipelineTaskInputOutput, PipelineTaskRunStatus } from './pipelineTaskTypes';

export const composeArtifactType = (data: InputOutputArtifactType): string =>
Expand Down Expand Up @@ -135,14 +136,13 @@ export const lowestProgress = (details: TaskDetailKF[]): PipelineTaskRunStatus['
)[0].state;
};

export const parseRuntimeInfo = (
export const parseRuntimeInfoFromRunDetails = (
taskId: string,
runDetails?: RunDetailsKF,
): PipelineTaskRunStatus | undefined => {
if (!runDetails) {
return undefined;
}

const { task_details: taskDetails } = runDetails;

// taskId should always be first, as it's the most direct item, but it may not drive the entire details
Expand All @@ -159,24 +159,131 @@ export const parseRuntimeInfo = (
startTime: thisTaskDetail[0].start_time,
completeTime: thisTaskDetail[0].end_time,
state: lowestProgress(thisTaskDetail),
taskId: thisTaskDetail[0].task_id,
taskId: `task.${taskId}`,
podName: thisTaskDetail[0].child_tasks?.find((o) => o.pod_name)?.pod_name,
};
};

export const translateStatusForNode = (stateKF?: RuntimeStateKF): RunStatus | undefined => {
switch (stateKF) {
export const parseRuntimeInfoFromExecutions = (
taskId: string,
executions: Execution[] | null,
): PipelineTaskRunStatus | undefined => {
if (!executions) {
return undefined;
}

const execution = executions.find(
(e) => e.getCustomPropertiesMap().get('task_name')?.getStringValue() === taskId,
);

if (!execution) {
return undefined;
}

const lastUpdatedTime = execution.getLastUpdateTimeSinceEpoch();
let completeTime;
if (
lastUpdatedTime &&
(execution.getLastKnownState() === Execution.State.COMPLETE ||
execution.getLastKnownState() === Execution.State.FAILED ||
execution.getLastKnownState() === Execution.State.CACHED ||
execution.getLastKnownState() === Execution.State.CANCELED)
) {
completeTime = new Date(lastUpdatedTime).toISOString();
}
return {
startTime: new Date(execution.getCreateTimeSinceEpoch()).toISOString(),
completeTime,
state: getResourceStateText({
resourceType: ResourceType.EXECUTION,
resource: execution,
typeName: 'Execution',
}),
taskId: `task.${taskId}`,
podName: execution.getCustomPropertiesMap().get('pod_name')?.getStringValue(),
};
};

export enum ResourceType {
ARTIFACT = 'ARTIFACT',
EXECUTION = 'EXECUTION',
}

export interface ArtifactProps {
resourceType: ResourceType.ARTIFACT;
resource: Artifact;
typeName: string;
}

export interface ExecutionProps {
resourceType: ResourceType.EXECUTION;
resource: Execution;
typeName: string;
}

export type ResourceInfoProps = ArtifactProps | ExecutionProps;

// Get text representation of resource state.
// Works for both artifact and execution.
export const getResourceStateText = (props: ResourceInfoProps): string | undefined => {
if (props.resourceType === ResourceType.ARTIFACT) {
const state = props.resource.getState();
switch (state) {
case Artifact.State.UNKNOWN:
return undefined; // when state is not set, it defaults to UNKNOWN
case Artifact.State.PENDING:
return 'Pending';
case Artifact.State.LIVE:
return 'Live';
case Artifact.State.MARKED_FOR_DELETION:
return 'Marked for deletion';
case Artifact.State.DELETED:
return 'Deleted';
default:
return undefined;
}
} else {
// type == EXECUTION
const state = props.resource.getLastKnownState();
switch (state) {
case Execution.State.UNKNOWN:
return undefined;
case Execution.State.NEW:
return 'New';
case Execution.State.RUNNING:
return 'Running';
case Execution.State.COMPLETE:
return 'Complete';
case Execution.State.CANCELED:
return 'Canceled';
case Execution.State.FAILED:
return 'Failed';
case Execution.State.CACHED:
return 'Cached';
default:
return undefined;
}
}
};

export const translateStatusForNode = (state?: string): RunStatus | undefined => {
switch (state) {
case 'Canceled':
case RuntimeStateKF.CANCELED:
case RuntimeStateKF.CANCELING:
return RunStatus.Cancelled;
case RuntimeStateKF.PAUSED:
return RunStatus.Pending;
case 'Running':
return RunStatus.Running;
case 'Failed':
case RuntimeStateKF.FAILED:
return RunStatus.Failed;
case 'Pending':
case RuntimeStateKF.PAUSED:
case RuntimeStateKF.PENDING:
return RunStatus.Pending;
case RuntimeStateKF.RUNNING:
return RunStatus.InProgress;
case 'Complete':
case RuntimeStateKF.SUCCEEDED:
return RunStatus.Succeeded;
case RuntimeStateKF.SKIPPED:
Expand Down
5 changes: 3 additions & 2 deletions frontend/src/concepts/pipelines/topology/pipelineTaskTypes.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { InputDefinitionParameterType, RuntimeStateKF } from '~/concepts/pipelines/kfTypes';
import { InputDefinitionParameterType } from '~/concepts/pipelines/kfTypes';
import { createNode } from '~/concepts/topology';
import { VolumeMount } from '~/types';

Expand Down Expand Up @@ -31,7 +31,7 @@ export type PipelineTaskRunStatus = {
startTime: string;
completeTime?: string;
podName?: string;
state?: RuntimeStateKF;
state?: string;
taskId?: string;
};

Expand All @@ -57,4 +57,5 @@ export type KubeFlowTaskTopology = {
* Nodes to render in topology.
*/
nodes: ReturnType<typeof createNode>[];
loaded: boolean;
};
Loading

0 comments on commit 0c0974f

Please sign in to comment.