Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

feat: Dask add pod template support #374

Merged
merged 9 commits into from
Sep 18, 2023

Conversation

bstadlbauer
Copy link
Member

@bstadlbauer bstadlbauer commented Jul 14, 2023

TL;DR

This PR change the dask plugin to use ToK8sPodSpec to create the basis of the pod template to be used for all dask components. This is coherent with other plugins (such as the ray plugin) and enables the use of common features such as pod templates.

Type

  • Bug Fix
  • Feature
  • Plugin

Are all requirements met?

  • Code completed
  • Smoke tested
  • Unit tests added
  • Code documentation added
  • Any pending items have an associated Issue

Complete description

This has been pretty straight forward. The only downside was that I had to remove the interruptible config from the pod spec after it has been added, as I couldn't see a way of not adding it in ToK8sPodSpec.

Signed-off-by: Bernhard Stadlbauer <[email protected]>
Signed-off-by: Bernhard Stadlbauer <[email protected]>
Signed-off-by: Bernhard Stadlbauer <[email protected]>
Signed-off-by: Bernhard Stadlbauer <[email protected]>
Signed-off-by: Bernhard Stadlbauer <[email protected]>
Comment on lines 58 to 95
func removeInterruptibleConfig(spec *v1.PodSpec, taskCtx pluginsCore.TaskExecutionContext) {
if !taskCtx.TaskExecutionMetadata().IsInterruptible() {
return
}

jobRunnerContainer := v1.Container{
Name: "job-runner",
Image: defaultImage,
Args: defaultContainerSpec.GetArgs(),
Env: defaultEnvVars,
Resources: *containerResources,
// Tolerations
interruptlibleTolerations := config.GetK8sPluginConfig().InterruptibleTolerations
newTolerations := []v1.Toleration{}
for _, toleration := range spec.Tolerations {
if !slices.Contains(interruptlibleTolerations, toleration) {
newTolerations = append(newTolerations, toleration)
}
}
spec.Tolerations = newTolerations

templateParameters := template.Parameters{
TaskExecMetadata: taskCtx.TaskExecutionMetadata(),
Inputs: taskCtx.InputReader(),
OutputPath: taskCtx.OutputWriter(),
Task: taskCtx.TaskReader(),
// Node selectors
interruptibleNodeSelector := config.GetK8sPluginConfig().InterruptibleNodeSelector
for key := range spec.NodeSelector {
if _, ok := interruptibleNodeSelector[key]; ok {
delete(spec.NodeSelector, key)
}
}
if err = flytek8s.AddFlyteCustomizationsToContainer(ctx, templateParameters,
flytek8s.ResourceCustomizationModeMergeExistingResources, &jobRunnerContainer); err != nil {

return nil, err
// Node selector requirements
interruptibleNodeSelectorRequirements := config.GetK8sPluginConfig().InterruptibleNodeSelectorRequirement
nodeSelectorTerms := spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
for i := range nodeSelectorTerms {
nst := &nodeSelectorTerms[i]
matchExpressions := nst.MatchExpressions
newMatchExpressions := []v1.NodeSelectorRequirement{}
for _, matchExpression := range matchExpressions {
if !nodeSelectorRequirementsAreEqual(matchExpression, *interruptibleNodeSelectorRequirements) {
newMatchExpressions = append(newMatchExpressions, matchExpression)
}
}
nst.MatchExpressions = newMatchExpressions
}
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hamersaw I'm not too keen on this removal function, happy to change ToK8sPodSpec to enable not adding the config in the first place.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An alternative solution would be to create a wrapper over the TaskExecutionContext / TaskExecutionMetadata which forcesinterruptible to be false. This would stop them from ever being injected without adding more parameters. Something like:

struct daskTaskExecutionContext {
    taskExecutionContext
    metadata TaskExecutionMetadata
}

func (d *daskTaskExecutionContext) GetMetadata() TaskExecutionMetadata{
    return metadata
}

type daskTaskExecutionMetadata {
    taskExecutionMetadata
}

func (d *daskTaskExecutionMetadata) IsInterruptible() bool {
    return falsee
}

the function and type names are all wrong (don't know off the top of my head), but basically just forcing interruptible to always be false for dask tasks.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds good to me - I'll try to implement that!

@bstadlbauer bstadlbauer marked this pull request as ready for review July 17, 2023 10:03
@bstadlbauer
Copy link
Member Author

@hamersaw I'm done with the work on this, but haven't smoke tested this in a local cluster yet, should hopefully get to that tomorrow.

The test and lint failures are a bit puzzling to me as both pass locally, and it seems like they flag something that's not even in the code (e.g. dask_test:543 does not contain the function mentioned)

@hamersaw
Copy link
Contributor

should hopefully get to that tomorrow.

@bstadlbauer have you had a chance to test this locally yet? Everything looks good to me, but always nice to double-check.

As far as the CI checks, I'm not quite sure what's going on. Will plan on addressing as we iterate on this.

@codecov
Copy link

codecov bot commented Sep 17, 2023

Codecov Report

Patch coverage: 78.72% and project coverage change: +1.19% 🎉

Comparison is base (9bf0fb9) 62.91% compared to head (26410b2) 64.10%.

❗ Current head 26410b2 differs from pull request most recent head fba2d0b. Consider uploading reports for the commit fba2d0b to get more accurate results

Additional details and impacted files
@@            Coverage Diff             @@
##           master     #374      +/-   ##
==========================================
+ Coverage   62.91%   64.10%   +1.19%     
==========================================
  Files         156      156              
  Lines       13154    10654    -2500     
==========================================
- Hits         8276     6830    -1446     
+ Misses       4257     3200    -1057     
- Partials      621      624       +3     
Flag Coverage Δ
unittests ?

Flags with carried forward coverage won't be shown. Click here to find out more.

Files Changed Coverage Δ
go/tasks/plugins/k8s/dask/dask.go 81.33% <78.49%> (-1.72%) ⬇️
go/tasks/pluginmachinery/flytek8s/pod_helper.go 71.07% <100.00%> (-3.03%) ⬇️

... and 134 files with indirect coverage changes

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@bstadlbauer
Copy link
Member Author

@hamersaw This would be ready from my end now - again sorry for the incredibly long delay here.

I've swapped the removeInterruptibleConfig function in favor of a nonInterruptibleTaskExecutionMetadata wrapper as you've suggested above.

In addition, I've written a small end-to-end test suite in https://github.com/bstadlbauer/flyte-dev-setup which can automatically end-to-end test the plugin which should hopefully decrease the iteration time on future changes as I was mostly blocked by testing this. I've also found a small bug (an incorrect restart policy due to a dask kubernetes update) which I've fixed along the way.
In addition I've noticed that Labels are somehow not propagated (i.e. this task will fail), but I'm not 100% sure yet what the root cause is - I've done some debugging and it somehow seems like

mergeMapInto(taskCtx.TaskExecutionMetadata().GetLabels(), objectMeta.Labels)

the output GetLabels() here does not contain the label, which would mean it's outside of the plugin.

@hamersaw hamersaw merged commit 6048689 into flyteorg:master Sep 18, 2023
6 of 7 checks passed
eapolinario pushed a commit that referenced this pull request Sep 28, 2023
* Add failing test

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* WIP

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Improve test

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Refactor to use `ToK8sPodSpec`

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Fix linting

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Use `Always` restart policy for workers

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Add test which checks whether labels are propagated

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Replace `removeInterruptibleConfig` with `TaskExectuionMetadata` wrapper

Signed-off-by: Bernhard Stadlbauer <[email protected]>

---------

Signed-off-by: Bernhard Stadlbauer <[email protected]>
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants