Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Make list execution transformer configurable #555

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1442,7 +1442,12 @@ func (m *ExecutionManager) ListExecutions(
logger.Debugf(ctx, "Failed to list executions using input [%+v] with err %v", listExecutionsInput, err)
return nil, err
}
executionList, err := transformers.FromExecutionModels(output.Executions, transformers.ListExecutionTransformerOptions)

listExecutionTransformer := &transformers.ExecutionTransformerOptions{
TrimErrorMessage: m.config.ApplicationConfiguration().GetTopLevelConfig().ListExecutionTransformersConfig.TrimErrorMessages,
MaxErrorMessageLength: m.config.ApplicationConfiguration().GetTopLevelConfig().ListExecutionTransformersConfig.MaxErrorMessageLength,
}
executionList, err := transformers.FromExecutionModels(output.Executions, listExecutionTransformer)
if err != nil {
logger.Errorf(ctx,
"Failed to transform execution models [%+v] with err: %v", output.Executions, err)
Expand Down
7 changes: 6 additions & 1 deletion pkg/manager/impl/node_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,11 @@ func (m *NodeExecutionManager) transformNodeExecutionModel(ctx context.Context,

func (m *NodeExecutionManager) transformNodeExecutionModelList(ctx context.Context, nodeExecutionModels []models.NodeExecution) ([]*admin.NodeExecution, error) {
nodeExecutions := make([]*admin.NodeExecution, len(nodeExecutionModels))
listExecutionTransformer := &transformers.ExecutionTransformerOptions{
TrimErrorMessage: m.config.ApplicationConfiguration().GetTopLevelConfig().ListExecutionTransformersConfig.TrimErrorMessages,
MaxErrorMessageLength: m.config.ApplicationConfiguration().GetTopLevelConfig().ListExecutionTransformersConfig.MaxErrorMessageLength,
}

for idx, nodeExecutionModel := range nodeExecutionModels {
nodeExecution, err := m.transformNodeExecutionModel(ctx, nodeExecutionModel, &core.NodeExecutionIdentifier{
ExecutionId: &core.WorkflowExecutionIdentifier{
Expand All @@ -341,7 +346,7 @@ func (m *NodeExecutionManager) transformNodeExecutionModelList(ctx context.Conte
Name: nodeExecutionModel.Name,
},
NodeId: nodeExecutionModel.NodeID,
}, transformers.ListExecutionTransformerOptions)
}, listExecutionTransformer)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/manager/impl/node_execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,8 @@ func TestTransformNodeExecutionModelList(t *testing.T) {
})

manager := NodeExecutionManager{
db: repository,
db: repository,
config: getMockExecutionsConfigProvider(),
}
nodeExecutions, err := manager.transformNodeExecutionModelList(ctx, []models.NodeExecution{
{
Expand Down
3 changes: 3 additions & 0 deletions pkg/manager/impl/testutils/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,8 @@ func GetApplicationConfigWithDefaultDomains() runtimeInterfaces.ApplicationConfi
Scheme: common.Local, SignedURL: runtimeInterfaces.SignedURL{
Enabled: true,
}})

config.GetTopLevelConfig().ListExecutionTransformersConfig.TrimErrorMessages = true
config.GetTopLevelConfig().ListExecutionTransformersConfig.MaxErrorMessageLength = 10240
return &config
}
12 changes: 4 additions & 8 deletions pkg/repositories/transformers/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
)

const trimmedErrMessageLen = 100

var clusterReassignablePhases = sets.NewString(core.WorkflowExecution_UNDEFINED.String(), core.WorkflowExecution_QUEUED.String())

// CreateExecutionModelInput encapsulates request parameters for calls to CreateExecutionModel.
Expand All @@ -47,13 +45,11 @@ type CreateExecutionModelInput struct {
}

type ExecutionTransformerOptions struct {
TrimErrorMessage bool
TrimErrorMessage bool
MaxErrorMessageLength int
}

var DefaultExecutionTransformerOptions = &ExecutionTransformerOptions{}
var ListExecutionTransformerOptions = &ExecutionTransformerOptions{
TrimErrorMessage: true,
}

// CreateExecutionModel transforms a ExecutionCreateRequest to a Execution model
func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, error) {
Expand Down Expand Up @@ -328,8 +324,8 @@ func FromExecutionModel(executionModel models.Execution, opts *ExecutionTransfor
}
if closure.GetError() != nil && opts != nil && opts.TrimErrorMessage && len(closure.GetError().Message) > 0 {
trimmedErrOutputResult := closure.GetError()
if len(trimmedErrOutputResult.Message) > trimmedErrMessageLen {
trimmedErrOutputResult.Message = trimmedErrOutputResult.Message[0:trimmedErrMessageLen]
if len(trimmedErrOutputResult.Message) > opts.MaxErrorMessageLength {
trimmedErrOutputResult.Message = trimmedErrOutputResult.Message[0:opts.MaxErrorMessageLength]
}
closure.OutputResult = &admin.ExecutionClosure_Error{
Error: trimmedErrOutputResult,
Expand Down
4 changes: 3 additions & 1 deletion pkg/repositories/transformers/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,7 @@ func TestFromExecutionModel_Aborted(t *testing.T) {
}

func TestFromExecutionModel_Error(t *testing.T) {
trimmedErrMessageLen := 10240
extraLongErrMsg := string(make([]byte, 2*trimmedErrMessageLen))
execErr := &core.ExecutionError{
Code: "CODE",
Expand All @@ -590,7 +591,8 @@ func TestFromExecutionModel_Error(t *testing.T) {
Closure: executionClosureBytes,
}
execution, err := FromExecutionModel(executionModel, &ExecutionTransformerOptions{
TrimErrorMessage: true,
TrimErrorMessage: true,
MaxErrorMessageLength: trimmedErrMessageLen,
})
expectedExecErr := execErr
expectedExecErr.Message = string(make([]byte, trimmedErrMessageLen))
Expand Down
4 changes: 2 additions & 2 deletions pkg/repositories/transformers/node_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,8 @@ func FromNodeExecutionModel(nodeExecutionModel models.NodeExecution, opts *Execu
}
if closure.GetError() != nil && opts != nil && opts.TrimErrorMessage && len(closure.GetError().Message) > 0 {
trimmedErrOutputResult := closure.GetError()
if len(trimmedErrOutputResult.Message) > trimmedErrMessageLen {
trimmedErrOutputResult.Message = trimmedErrOutputResult.Message[0:trimmedErrMessageLen]
if len(trimmedErrOutputResult.Message) > opts.MaxErrorMessageLength {
trimmedErrOutputResult.Message = trimmedErrOutputResult.Message[0:opts.MaxErrorMessageLength]
}
closure.OutputResult = &admin.NodeExecutionClosure_Error{
Error: trimmedErrOutputResult,
Expand Down
6 changes: 5 additions & 1 deletion pkg/repositories/transformers/node_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,7 @@ func TestFromNodeExecutionModel(t *testing.T) {
}

func TestFromNodeExecutionModel_Error(t *testing.T) {
trimmedErrMessageLen := 10240
extraLongErrMsg := string(make([]byte, 2*trimmedErrMessageLen))
execErr := &core.ExecutionError{
Code: "CODE",
Expand All @@ -551,7 +552,10 @@ func TestFromNodeExecutionModel_Error(t *testing.T) {
NodeExecutionMetadata: nodeExecutionMetadataBytes,
InputURI: "input uri",
Duration: duration,
}, &ExecutionTransformerOptions{TrimErrorMessage: true})
}, &ExecutionTransformerOptions{
TrimErrorMessage: true,
MaxErrorMessageLength: trimmedErrMessageLen,
})
assert.Nil(t, err)

expectedExecErr := execErr
Expand Down
4 changes: 2 additions & 2 deletions pkg/repositories/transformers/task_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,8 +441,8 @@ func FromTaskExecutionModel(taskExecutionModel models.TaskExecution, opts *Execu
}
if closure.GetError() != nil && opts != nil && opts.TrimErrorMessage && len(closure.GetError().Message) > 0 {
trimmedErrOutputResult := closure.GetError()
if len(trimmedErrOutputResult.Message) > trimmedErrMessageLen {
trimmedErrOutputResult.Message = trimmedErrOutputResult.Message[0:trimmedErrMessageLen]
if len(trimmedErrOutputResult.Message) > opts.MaxErrorMessageLength {
trimmedErrOutputResult.Message = trimmedErrOutputResult.Message[0:opts.MaxErrorMessageLength]
}
closure.OutputResult = &admin.TaskExecutionClosure_Error{
Error: trimmedErrOutputResult,
Expand Down
7 changes: 5 additions & 2 deletions pkg/repositories/transformers/task_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ func TestFromTaskExecutionModel(t *testing.T) {
}

func TestFromTaskExecutionModel_Error(t *testing.T) {
trimmedErrMessageLen := 10240
extraLongErrMsg := string(make([]byte, 2*trimmedErrMessageLen))
execErr := &core.ExecutionError{
Code: "CODE",
Expand Down Expand Up @@ -633,7 +634,8 @@ func TestFromTaskExecutionModel_Error(t *testing.T) {
Closure: closureBytes,
}
taskExecution, err := FromTaskExecutionModel(taskExecutionModel, &ExecutionTransformerOptions{
TrimErrorMessage: true,
TrimErrorMessage: true,
MaxErrorMessageLength: trimmedErrMessageLen,
})

expectedExecErr := execErr
Expand All @@ -653,7 +655,8 @@ func TestFromTaskExecutionModel_Error(t *testing.T) {
})
taskExecutionModel.Closure = closureBytes
taskExecution, err = FromTaskExecutionModel(taskExecutionModel, &ExecutionTransformerOptions{
TrimErrorMessage: true,
TrimErrorMessage: true,
MaxErrorMessageLength: trimmedErrMessageLen,
})
expectedExecErr = execErr
expectedExecErr.Message = string(make([]byte, 10))
Expand Down
5 changes: 5 additions & 0 deletions pkg/runtime/application_config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ var flyteAdminConfig = config.MustRegisterSection(flyteAdmin, &interfaces.Applic
MaxParallelism: 25,
K8SServiceAccount: "",
UseOffloadedWorkflowClosure: false,

ListExecutionTransformersConfig: interfaces.ExecutionTransformersConfig{
TrimErrorMessages: true,
MaxErrorMessageLength: 10240,
},
})

var schedulerConfig = config.MustRegisterSection(scheduler, &interfaces.SchedulerConfig{
Expand Down
10 changes: 10 additions & 0 deletions pkg/runtime/interfaces/application_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ type PostgresConfig struct {
Debug bool `json:"debug" pflag:" Whether or not to start the database connection with debug mode enabled."`
}

type ExecutionTransformersConfig struct {
// TrimErrorMessages indicates whether error messages returned by the list workflow execution API should be trimmed.
TrimErrorMessages bool `json:"trimErrorMessages"`
// The maximum length of an error message returned by the list workflow execution API.
MaxErrorMessageLength int `json:"maxErrorMessageLength"`
}

// ApplicationConfig is the base configuration to start admin
type ApplicationConfig struct {
// The RoleName key inserted as an annotation (https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/)
Expand Down Expand Up @@ -94,6 +101,9 @@ type ApplicationConfig struct {

// Enabling will use Storage (s3/gcs/etc) to offload static parts of CRDs.
UseOffloadedWorkflowClosure bool `json:"useOffloadedWorkflowClosure"`

// Configures the execution transformers
ListExecutionTransformersConfig ExecutionTransformersConfig `json:"listExecutionTransformersConfig"`
}

func (a *ApplicationConfig) GetRoleNameKey() string {
Expand Down