Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Resource manager overhaul #552

Draft
wants to merge 19 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 19 additions & 12 deletions flyteadmin_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ externalEvents:
eventTypes: all
Logger:
show-source: true
level: 6
level: 5
storage:
type: stow
stow:
Expand All @@ -129,7 +129,7 @@ storage:
secret_key: miniostorage
signedUrl:
stowConfigOverride:
endpoint: http://localhost:30084
endpoint: http://localhost:30002
cache:
max_size_mbs: 10
target_gc_percent: 100
Expand Down Expand Up @@ -162,16 +162,23 @@ queues:
- critical
- tags:
- default
task_resources:
defaults:
cpu: 100m
memory: 200Mi
storage: 100M
limits:
cpu: 500m
gpu: 1
memory: 300Mi
storage: 10G
#task_resources:
# defaults:
# cpu: 100m
# memory: 200Mi
# ephemeralStorage: 100M
# limits:
# cpu: 500m
# memory: 300Mi
# ephemeralStorage: 10G
#task_resources:
# defaults:
# cpu: 100m
#task_resources:
# defaults:
# ephemeralStorage: 500M
# limits:
# ephemeralStorage: 10G
task_type_whitelist:
sparkonk8s:
- project: my_queue_1
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/cloudevents/sdk-go/v2 v2.8.0
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/evanphx/json-patch v4.12.0+incompatible
github.com/flyteorg/flyteidl v1.3.14
github.com/flyteorg/flyteidl v1.5.0
github.com/flyteorg/flyteplugins v1.0.40
github.com/flyteorg/flytepropeller v1.1.70
github.com/flyteorg/flytestdlib v1.0.15
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,8 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.3.14 h1:o5M0g/r6pXTPu5PEurbYxbQmuOu3hqqsaI2M6uvK0N8=
github.com/flyteorg/flyteidl v1.3.14/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM=
github.com/flyteorg/flyteidl v1.5.0 h1:vdaA5Cg9eqi5NMuASSod/AE7RXlHvzdWjSL9abDyd/M=
github.com/flyteorg/flyteidl v1.5.0/go.mod h1:ckLjB51moX4L0oQml+WTCrPK50zrJf6IZJ6LPC0RB4I=
github.com/flyteorg/flyteplugins v1.0.40 h1:RTsYingqmqr13qBbi4CB2ArXDHNHUOkAF+HTLJQiQ/s=
github.com/flyteorg/flyteplugins v1.0.40/go.mod h1:qyUPqVspLcLGJpKxVwHDWf+kBpOGuItOxCaF6zAmDio=
github.com/flyteorg/flytepropeller v1.1.70 h1:/d1qqz13rdVADM85ST70eerAdBstJJz9UUB/mNSZi0w=
Expand Down
2 changes: 1 addition & 1 deletion pkg/clusterresource/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ func NewClusterResourceControllerFromConfig(ctx context.Context, scope promutils
repo := repositories.NewGormRepo(
db, errors2.NewPostgresErrorTransformer(dbScope.NewSubScope("errors")), dbScope)

adminDataProvider = impl2.NewDatabaseAdminDataProvider(repo, configuration, resources.NewResourceManager(repo, configuration.ApplicationConfiguration()))
adminDataProvider = impl2.NewDatabaseAdminDataProvider(repo, configuration, resources.NewResourceManager(repo, configuration))
}

return NewClusterResourceController(adminDataProvider, listTargetsProvider, scope), nil
Expand Down
9 changes: 8 additions & 1 deletion pkg/common/testutils/common.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package testutils

import "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
import (
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"k8s.io/apimachinery/pkg/api/resource"
)

// Convenience method to wrap verbose boilerplate for initializing a PluginOverrides MatchingAttributes.
func GetPluginOverridesAttributes(vals map[string][]string) *admin.MatchingAttributes {
Expand All @@ -19,3 +22,7 @@ func GetPluginOverridesAttributes(vals map[string][]string) *admin.MatchingAttri
},
}
}

func GetPtr(quantity resource.Quantity) *resource.Quantity {
return &quantity
}
2 changes: 1 addition & 1 deletion pkg/executioncluster/impl/random_cluster_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func NewRandomClusterSelector(listTargets interfaces.ListTargetsInterface, confi
}
return &RandomClusterSelector{
labelWeightedRandomMap: labelWeightedRandomMap,
resourceManager: resources.NewResourceManager(db, config.ApplicationConfiguration()),
resourceManager: resources.NewResourceManager(db, config),
equalWeightedAllClusters: equalWeightedAllClusters,
ListTargetsInterface: listTargets,
defaultExecutionLabel: defaultExecutionLabel,
Expand Down
145 changes: 44 additions & 101 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (

"github.com/flyteorg/flyteadmin/plugins"

"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s"

"github.com/flyteorg/flyteadmin/auth"

"github.com/flyteorg/flyteadmin/pkg/manager/impl/resources"
Expand Down Expand Up @@ -184,12 +182,49 @@ func (m *ExecutionManager) addPluginOverrides(ctx context.Context, executionID *
return nil, nil
}

// TODO: Delete this code usage after the flyte v0.17.0 release
// defaults should be a coalesce of task defaults, and platform defaults.
// task limits should be the limits from the task, coalesced with the defaults from step one
// then both should be limited by any platform limits.
// anything 0 or empty is not set.
// if both requests and limits end up empty, return nil. if one is empty, return nil for it
func (m *ExecutionManager) getResources(ctx context.Context, taskResources *core.Resources, platformResources workflowengineInterfaces.TaskResources) *core.Resources {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic seems sound to me, but would love to see this simplified. For example, can we convert to a similar type and use a function like:

func UnionResources(resources...) (Resources) {
    ...
}


// requests: coalesce(task request, platform default)
// limits: coalesce(task limits, task requests)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO this only makes sense for non-compressible resources like memory and storage.
Compressible resources like cpu should not be "forced" to inherit limits from requests if not explicitly specified. At the very least the limit should be explicitly removable/nullable.

// check that defaults and limits are both below platform limit
var requestSet runtimeInterfaces.TaskResourceSet
var limitSet runtimeInterfaces.TaskResourceSet
if taskResources != nil && taskResources.GetRequests() != nil {
requestSet = util.GetTaskResourcesAndCoalesce(ctx, taskResources.GetRequests(), platformResources.Defaults)
} else {
requestSet = platformResources.Defaults
}
if taskResources != nil && taskResources.GetLimits() != nil {
limitSet = util.GetTaskResourcesAndCoalesce(ctx, taskResources.GetLimits(), requestSet)
} else {
limitSet = requestSet
}
adjustedRequestSet := util.ConstrainTaskResourceSet(ctx, requestSet, platformResources.Limits)
adjustedLimitSet := util.ConstrainTaskResourceSet(ctx, limitSet, platformResources.Limits)

// convert the sets back to core.Resources
requestEntries := util.ConvertTaskResourceSetToCoreResources(adjustedRequestSet)
limitEntries := util.ConvertTaskResourceSetToCoreResources(adjustedLimitSet)
if len(requestEntries) == 0 && len(limitEntries) == 0 {
return nil
}
res := core.Resources{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the code this replaces (current lines 210-213) we init the Request and Limits to empty lists (ie. []*core.Resources_ResourceEntry{}). I don't believe there is some chance for a nil pointer exception downstream if we do not do this right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we solve this with more testing. looked through the code and i don't think it's used. Also ran locally with no admin config, no matchable resources, and no task resources, and things seem to be running almost correctly. The pod is still being created with

    resources:
      limits:
        cpu: "0"
        memory: "0"
      requests:
        cpu: "0"
        memory: "0"

but the fly crd has node resources

resources: {}

and has execution config task resources:

executionConfig:
  Interruptible: null
  MaxParallelism: 25
  OverwriteCache: false
  RecoveryExecution: {}
  TaskPluginImpls: {}
  TaskResources:
    Limits:
      CPU: "0"
      EphemeralStorage: "0"
      GPU: "0"
      Memory: "0"
      Storage: "0"
    Requests:
      CPU: "0"
      EphemeralStorage: "0"
      GPU: "0"
      Memory: "0"
      Storage: "0"

which is correct because these are non-nullable. and resources doesn't show up at all in the task template part of the task definition.

if downstream code can't handle a nil for some reason it should handle it there i feel.

if len(requestEntries) > 0 {
res.Requests = requestEntries
}
if len(limitEntries) > 0 {
res.Limits = limitEntries
}

return &res
}

// Assumes input contains a compiled task with a valid container resource execConfig.
//
// Note: The system will assign a system-default value for request but for limit it will deduce it from the request
// itself => Limit := Min([Some-Multiplier X Request], System-Max). For now we are using a multiplier of 1. In
// general we recommend the users to set limits close to requests for more predictability in the system.
func (m *ExecutionManager) setCompiledTaskDefaults(ctx context.Context, task *core.CompiledTask,
platformTaskResources workflowengineInterfaces.TaskResources) {

Expand All @@ -204,99 +239,7 @@ func (m *ExecutionManager) setCompiledTaskDefaults(ctx context.Context, task *co
return
}

if task.Template.GetContainer().Resources == nil {
// In case of no resources on the container, create empty requests and limits
// so the container will still have resources configure properly
task.Template.GetContainer().Resources = &core.Resources{
Requests: []*core.Resources_ResourceEntry{},
Limits: []*core.Resources_ResourceEntry{},
}
}

var finalizedResourceRequests = make([]*core.Resources_ResourceEntry, 0)
var finalizedResourceLimits = make([]*core.Resources_ResourceEntry, 0)

// The IDL representation for container-type tasks represents resources as a list with string quantities.
// In order to easily reason about them we convert them to a set where we can O(1) fetch specific resources (e.g. CPU)
// and represent them as comparable quantities rather than strings.
taskResourceRequirements := util.GetCompleteTaskResourceRequirements(ctx, task.Template.Id, task)

cpu := flytek8s.AdjustOrDefaultResource(taskResourceRequirements.Defaults.CPU, taskResourceRequirements.Limits.CPU,
platformTaskResources.Defaults.CPU, platformTaskResources.Limits.CPU)
finalizedResourceRequests = append(finalizedResourceRequests, &core.Resources_ResourceEntry{
Name: core.Resources_CPU,
Value: cpu.Request.String(),
})
finalizedResourceLimits = append(finalizedResourceLimits, &core.Resources_ResourceEntry{
Name: core.Resources_CPU,
Value: cpu.Limit.String(),
})

memory := flytek8s.AdjustOrDefaultResource(taskResourceRequirements.Defaults.Memory, taskResourceRequirements.Limits.Memory,
platformTaskResources.Defaults.Memory, platformTaskResources.Limits.Memory)
finalizedResourceRequests = append(finalizedResourceRequests, &core.Resources_ResourceEntry{
Name: core.Resources_MEMORY,
Value: memory.Request.String(),
})
finalizedResourceLimits = append(finalizedResourceLimits, &core.Resources_ResourceEntry{
Name: core.Resources_MEMORY,
Value: memory.Limit.String(),
})

// Only assign ephemeral storage when it is either requested or limited in the task definition, or a platform
// default exists.
if !taskResourceRequirements.Defaults.EphemeralStorage.IsZero() ||
!taskResourceRequirements.Limits.EphemeralStorage.IsZero() ||
!platformTaskResources.Defaults.EphemeralStorage.IsZero() {
Comment on lines -248 to -250
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have to check for cases where a user manually sets a resource limit to 0 to override the default limits? For example - flyteadmin configurations default limit to 5G Memory, if a user wants a task with unbounded Memory can they set limits(mem="0') to override this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah not sure. So as of this PR, "0", is not treated specially until the very end. It's like any other number. It's at prepare_execution.go::addExecutionOverrides that we check or zero and then don't apply them. Added a unit test to see this directly. Easy enough add logic to execution_manager.go to not add resources if they're 0. What do you think?

ephemeralStorage := flytek8s.AdjustOrDefaultResource(taskResourceRequirements.Defaults.EphemeralStorage, taskResourceRequirements.Limits.EphemeralStorage,
platformTaskResources.Defaults.EphemeralStorage, platformTaskResources.Limits.EphemeralStorage)
finalizedResourceRequests = append(finalizedResourceRequests, &core.Resources_ResourceEntry{
Name: core.Resources_EPHEMERAL_STORAGE,
Value: ephemeralStorage.Request.String(),
})
finalizedResourceLimits = append(finalizedResourceLimits, &core.Resources_ResourceEntry{
Name: core.Resources_EPHEMERAL_STORAGE,
Value: ephemeralStorage.Limit.String(),
})
}

// Only assign storage when it is either requested or limited in the task definition, or a platform
// default exists.
if !taskResourceRequirements.Defaults.Storage.IsZero() ||
!taskResourceRequirements.Limits.Storage.IsZero() ||
!platformTaskResources.Defaults.Storage.IsZero() {
storageResource := flytek8s.AdjustOrDefaultResource(taskResourceRequirements.Defaults.Storage, taskResourceRequirements.Limits.Storage,
platformTaskResources.Defaults.Storage, platformTaskResources.Limits.Storage)
finalizedResourceRequests = append(finalizedResourceRequests, &core.Resources_ResourceEntry{
Name: core.Resources_STORAGE,
Value: storageResource.Request.String(),
})
finalizedResourceLimits = append(finalizedResourceLimits, &core.Resources_ResourceEntry{
Name: core.Resources_STORAGE,
Value: storageResource.Limit.String(),
})
}

// Only assign gpu when it is either requested or limited in the task definition, or a platform default exists.
if !taskResourceRequirements.Defaults.GPU.IsZero() ||
!taskResourceRequirements.Limits.GPU.IsZero() ||
!platformTaskResources.Defaults.GPU.IsZero() {
gpu := flytek8s.AdjustOrDefaultResource(taskResourceRequirements.Defaults.GPU, taskResourceRequirements.Limits.GPU,
platformTaskResources.Defaults.GPU, platformTaskResources.Limits.GPU)
finalizedResourceRequests = append(finalizedResourceRequests, &core.Resources_ResourceEntry{
Name: core.Resources_GPU,
Value: gpu.Request.String(),
})
finalizedResourceLimits = append(finalizedResourceLimits, &core.Resources_ResourceEntry{
Name: core.Resources_GPU,
Value: gpu.Limit.String(),
})
}

task.Template.GetContainer().Resources = &core.Resources{
Requests: finalizedResourceRequests,
Limits: finalizedResourceLimits,
}
task.Template.GetContainer().Resources = m.getResources(ctx, task.Template.GetContainer().Resources, platformTaskResources)
}

// Fetches inherited execution metadata including the parent node execution db model id and the source execution model id
Expand Down Expand Up @@ -1623,7 +1566,7 @@ func NewExecutionManager(db repositoryInterfaces.Repository, pluginRegistry *plu
"size in bytes of serialized execution outputs"),
}

resourceManager := resources.NewResourceManager(db, config.ApplicationConfiguration())
resourceManager := resources.NewResourceManager(db, config)
return &ExecutionManager{
db: db,
config: config,
Expand Down
Loading