Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
FlyteAdmin will always add base_exec_id unless it is already added
Browse files Browse the repository at this point in the history
Reasons:
1. Make it possible to retrieve all executions launched by the same base execution id (even recursively)
2. users could group executions using their own base exec id
3. flytectl get executions or remote list executions can use this label as a filter to retrieve high level progress of all subworkflows

Signed-off-by: Ketan Umare <[email protected]>
  • Loading branch information
kumare3 committed Sep 22, 2023
1 parent dc8cc9d commit 4ae657a
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 6 deletions.
21 changes: 21 additions & 0 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,10 @@ func (m *ExecutionManager) launchSingleTaskExecution(
if err != nil {
return nil, nil, err
}
labels, err = m.addBaseExecutionLabel(ctx, workflowExecutionID.Name, labels)
if err != nil {
return nil, nil, err
}

Check warning on line 559 in pkg/manager/impl/execution_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/manager/impl/execution_manager.go#L558-L559

Added lines #L558 - L559 were not covered by tests

var annotations map[string]string
if executionConfig.Annotations != nil {
Expand Down Expand Up @@ -810,6 +814,10 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
if err != nil {
return nil, nil, err
}
labels, err = m.addBaseExecutionLabel(ctx, workflowExecutionID.Name, labels)
if err != nil {
return nil, nil, err
}

Check warning on line 820 in pkg/manager/impl/execution_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/manager/impl/execution_manager.go#L819-L820

Added lines #L819 - L820 were not covered by tests
annotations, err := resolveStringMap(executionConfig.GetAnnotations(), launchPlan.Spec.Annotations, "annotations", m.config.RegistrationValidationConfiguration().GetMaxAnnotationEntries())
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -1687,6 +1695,19 @@ func (m *ExecutionManager) addProjectLabels(ctx context.Context, projectName str
return initialLabels, nil
}

// Adds base execution label to execution labels. Base execution label is ignored if a corresponding label is set on the execution already.
// An execution label will exist if Flytepropeller launches a child workflow execution, as it will copy the parent execution's labels.
// This label can later be used to retrieve all executions that were launched from a given execution, no matter how deep in the recursion tree.
func (m *ExecutionManager) addBaseExecutionLabel(_ context.Context, execID string, initialLabels map[string]string) (map[string]string, error) {

Check failure on line 1701 in pkg/manager/impl/execution_manager.go

View workflow job for this annotation

GitHub Actions / Lint / Run Lint

(*ExecutionManager).addBaseExecutionLabel - result 1 (error) is always nil (unparam)
if initialLabels == nil {
initialLabels = make(map[string]string)
}

Check warning on line 1704 in pkg/manager/impl/execution_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/manager/impl/execution_manager.go#L1703-L1704

Added lines #L1703 - L1704 were not covered by tests
if _, ok := initialLabels[shared.BaseExecutionIDLabelKey]; !ok {
initialLabels[shared.BaseExecutionIDLabelKey] = execID
}
return initialLabels, nil
}

func addStateFilter(filters []common.InlineFilter) ([]common.InlineFilter, error) {
var stateFilterExists bool
for _, inlineFilter := range filters {
Expand Down
10 changes: 6 additions & 4 deletions pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -886,8 +886,9 @@ func TestCreateExecutionDynamicLabelsAndAnnotations(t *testing.T) {
mockExecutor := workflowengineMocks.WorkflowExecutor{}
mockExecutor.OnExecuteMatch(mock.Anything, mock.MatchedBy(func(executionData workflowengineInterfaces.ExecutionData) bool {
assert.EqualValues(t, map[string]string{
"dynamiclabel1": "dynamic1",
"dynamiclabel2": "dynamic2",
"dynamiclabel1": "dynamic1",
"dynamiclabel2": "dynamic2",
shared.BaseExecutionIDLabelKey: "name",
}, executionData.ExecutionParameters.Labels)
assert.EqualValues(t, map[string]string{
"dynamicannotation3": "dynamic3",
Expand Down Expand Up @@ -3834,8 +3835,9 @@ func TestCreateExecution_LegacyClient(t *testing.T) {
mockExecutor := workflowengineMocks.WorkflowExecutor{}
mockExecutor.OnExecuteMatch(mock.Anything, mock.MatchedBy(func(execData workflowengineInterfaces.ExecutionData) bool {
assert.EqualValues(t, map[string]string{
"label1": "1",
"label2": "2",
"label1": "1",
"label2": "2",
shared.BaseExecutionIDLabelKey: "name",
}, execData.ExecutionParameters.Labels)
assert.EqualValues(t, map[string]string{
"annotation3": "3",
Expand Down
7 changes: 5 additions & 2 deletions pkg/manager/impl/shared/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ const (
Attributes = "attributes"
MatchingAttributes = "matching_attributes"
// Parent of a node execution in the node executions table
ParentID = "parent_id"
WorkflowClosure = "workflow_closure"
ParentID = "parent_id"
WorkflowClosure = "workflow_closure"
BaseExecutionIDLabelKey = "base_exec_id"
// BaseExecutionIDLabelKey is the label key for the base execution ID and is globally known. The UI, CLI and potentially
// other components use this label key to identify the base execution ID, so DO NOT CHANGE THIS VALUE.
)

0 comments on commit 4ae657a

Please sign in to comment.