
Allow recurring runs to always use the latest pipeline version
This makes the pipeline_version_id optional and makes the
Scheduled Workflow controller leverage the REST API to launch the run
rather than rely on compiled Argo Workflows stored in the
ScheduledWorkflow object.

The previous behavior is preserved if the user is using the v1 API or
specifies a pipeline version ID or pipeline spec manifest.

Resolves:
kubeflow#11542

Signed-off-by: mprahl <[email protected]>
mprahl committed Jan 30, 2025
1 parent ac9b257 commit f6ac254
Showing 35 changed files with 460 additions and 132 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -86,5 +86,8 @@ __pycache__
# kfp local execution default directory
local_outputs/

# Ignore the Kind cluster kubeconfig
kubeconfig_dev-pipelines-api

# Ignore debug Driver Dockerfile produced from `make -C backend image_driver_debug`
backend/Dockerfile.driver-debug
35 changes: 34 additions & 1 deletion backend/README.md
@@ -159,7 +159,40 @@ You can also directly connect to the MariaDB database server with:
mysql -h 127.0.0.1 -u root
```

### Remote Debug the Driver
### Scheduled Workflow Development

If you also want to run the Scheduled Workflow controller locally, stop the controller on the cluster with:

```bash
kubectl -n kubeflow scale deployment ml-pipeline-scheduledworkflow --replicas=0
```

Then you may leverage the following sample `.vscode/launch.json` file to run the Scheduled Workflow controller locally:

```json
{
"version": "0.2.0",
"configurations": [
{
"name": "Launch Scheduled Workflow controller (Kind)",
"type": "go",
"request": "launch",
"mode": "debug",
"program": "${workspaceFolder}/backend/src/crd/controller/scheduledworkflow",
"env": {
"CRON_SCHEDULE_TIMEZONE": "UTC"
},
"args": [
"-namespace=kubeflow",
"-kubeconfig=${workspaceFolder}/kubeconfig_dev-pipelines-api",
"-mlPipelineAPIServerName=localhost"
]
}
]
}
```
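
If you prefer to run the controller outside of VS Code, the same flags and environment variable translate to a plain `go run` invocation. This is only a sketch based on the launch configuration above; adjust the kubeconfig path and namespace to match your setup:

```bash
# Run the Scheduled Workflow controller locally against the Kind cluster,
# using the kubeconfig generated by the local API server setup.
CRON_SCHEDULE_TIMEZONE=UTC go run ./backend/src/crd/controller/scheduledworkflow \
  -namespace=kubeflow \
  -kubeconfig=./kubeconfig_dev-pipelines-api \
  -mlPipelineAPIServerName=localhost
```

When you are done, scale the in-cluster controller back up with
`kubectl -n kubeflow scale deployment ml-pipeline-scheduledworkflow --replicas=1`.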

## Remote Debug the Driver

These instructions assume you are leveraging the Kind cluster in the
[Run Locally With a Kind Cluster](#run-locally-with-a-kind-cluster) section.
2 changes: 2 additions & 0 deletions backend/api/Makefile
@@ -17,6 +17,8 @@
IMAGE_TAG=kfp-api-generator
# Contact chensun or zijianjoy if this remote image needs an update.
REMOTE_IMAGE=ghcr.io/kubeflow/kfp-api-generator
# Assume the latest API version by default.
API_VERSION ?= v2beta1

# Keep in sync with the version used in test/release/Dockerfile.release
PREBUILT_REMOTE_IMAGE=ghcr.io/kubeflow/kfp-api-generator:1.0
2 changes: 1 addition & 1 deletion backend/api/README.md
@@ -12,7 +12,7 @@ Tools needed:
Set the environment variable `API_VERSION` to the version that you want to generate. We use `v1beta1` as example here.

```bash
export API_VERSION="v1beta1"
export API_VERSION="v2beta1"
```

## Compiling `.proto` files to Go client and swagger definitions
4 changes: 2 additions & 2 deletions backend/api/v1beta1/job.proto
@@ -210,9 +210,9 @@ message Job {
// Optional input field. Describing the purpose of the job
string description = 3;

// Required input field.
// Optional input field.
// Describing what the pipeline manifest and parameters to use
// for the scheduled job.
// for the scheduled job. If unset, fetch the pipeline_spec at runtime.
PipelineSpec pipeline_spec = 4;

// Optional input field. Specify which resource this job belongs to.
2 changes: 1 addition & 1 deletion backend/api/v2beta1/go_client/recurring_run.pb.go


4 changes: 2 additions & 2 deletions backend/api/v2beta1/go_client/run.pb.go


2 changes: 1 addition & 1 deletion backend/api/v2beta1/recurring_run.proto
@@ -89,7 +89,7 @@ message RecurringRun {
string description = 3;

// Required input field. Specifies the source of the pipeline spec for this
// recurring run. Can be either a pipeline version id, or a pipeline spec.
// recurring run. Can be either a pipeline id, pipeline version id, or a pipeline spec.
oneof pipeline_source {
// This field is Deprecated. The pipeline version id is under pipeline_version_reference for v2.
string pipeline_version_id = 4 [deprecated=true];
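
As an example of the new behavior, a recurring run that should always track the latest pipeline version can be created with only a pipeline ID in `pipeline_version_reference`. The sketch below is a hypothetical `curl` request against the v2beta1 REST API; the host, port, and field values are assumptions, not part of this change:

```bash
# Create a recurring run that is not pinned to a pipeline version; the
# Scheduled Workflow controller resolves the latest version at each trigger.
curl -X POST "http://localhost:8888/apis/v2beta1/recurringruns" \
  -H "Content-Type: application/json" \
  -d '{
        "display_name": "nightly-train",
        "pipeline_version_reference": {"pipeline_id": "<pipeline-id>"},
        "trigger": {"cron_schedule": {"cron": "0 0 2 * * *"}},
        "max_concurrency": "1",
        "mode": "ENABLE"
      }'
```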
4 changes: 2 additions & 2 deletions backend/api/v2beta1/run.proto
@@ -168,7 +168,7 @@ message Run {
// Pipeline spec.
google.protobuf.Struct pipeline_spec = 7;

// Reference to a pipeline version containing pipeline_id and pipeline_version_id.
// Reference to a pipeline containing pipeline_id and optionally the pipeline_version_id.
PipelineVersionReference pipeline_version_reference = 18;
}

@@ -213,7 +213,7 @@ message PipelineVersionReference {
// Input. Required. Unique ID of the parent pipeline.
string pipeline_id = 1;

// Input. Required. Unique ID of an existing pipeline version.
// Input. Optional. Unique ID of an existing pipeline version. If unset, the latest pipeline version is used.
string pipeline_version_id = 2;
}

4 changes: 2 additions & 2 deletions backend/api/v2beta1/swagger/kfp_api_single_file.swagger.json
@@ -2020,7 +2020,7 @@
},
"pipeline_version_id": {
"type": "string",
"description": "Input. Required. Unique ID of an existing pipeline version."
"description": "Input. Optional. Unique ID of an existing pipeline version. If unset, the latest pipeline version is used."
}
},
"description": "Reference to an existing pipeline version."
@@ -2349,7 +2349,7 @@
},
"pipeline_version_reference": {
"$ref": "#/definitions/v2beta1PipelineVersionReference",
"description": "Reference to a pipeline version containing pipeline_id and pipeline_version_id."
"description": "Reference to a pipeline containing pipeline_id and optionally the pipeline_version_id."
},
"runtime_config": {
"$ref": "#/definitions/v2beta1RuntimeConfig",
2 changes: 1 addition & 1 deletion backend/api/v2beta1/swagger/recurring_run.swagger.json
@@ -390,7 +390,7 @@
},
"pipeline_version_id": {
"type": "string",
"description": "Input. Required. Unique ID of an existing pipeline version."
"description": "Input. Optional. Unique ID of an existing pipeline version. If unset, the latest pipeline version is used."
}
},
"description": "Reference to an existing pipeline version."
4 changes: 2 additions & 2 deletions backend/api/v2beta1/swagger/run.swagger.json
@@ -619,7 +619,7 @@
},
"pipeline_version_id": {
"type": "string",
"description": "Input. Required. Unique ID of an existing pipeline version."
"description": "Input. Optional. Unique ID of an existing pipeline version. If unset, the latest pipeline version is used."
}
},
"description": "Reference to an existing pipeline version."
@@ -667,7 +667,7 @@
},
"pipeline_version_reference": {
"$ref": "#/definitions/v2beta1PipelineVersionReference",
"description": "Reference to a pipeline version containing pipeline_id and pipeline_version_id."
"description": "Reference to a pipeline containing pipeline_id and optionally the pipeline_version_id."
},
"runtime_config": {
"$ref": "#/definitions/v2beta1RuntimeConfig",
5 changes: 3 additions & 2 deletions backend/src/apiserver/client/sql.go
@@ -22,8 +22,9 @@ import (
)

const (
MYSQL_TEXT_FORMAT string = "longtext not null"
MYSQL_EXIST_ERROR string = "database exists"
MYSQL_TEXT_FORMAT string = "longtext not null"
MYSQL_TEXT_FORMAT_NULL string = "longtext"
MYSQL_EXIST_ERROR string = "database exists"

PGX_TEXT_FORMAT string = "text"
PGX_EXIST_ERROR string = "already exists"
6 changes: 6 additions & 0 deletions backend/src/apiserver/client_manager/client_manager.go
@@ -345,6 +345,12 @@ func InitDBClient(initConnectionTimeout time.Duration) *storage.DB {
if ignoreAlreadyExistError(driverName, response.Error) != nil {
glog.Fatalf("Failed to create a foreign key for RunUUID in task table. Error: %s", response.Error)
}

// This is a workaround because AutoMigration does not detect that the column went from not null to nullable.
response = db.Model(&model.Job{}).ModifyColumn("WorkflowSpecManifest", client.MYSQL_TEXT_FORMAT_NULL)
if response.Error != nil {
glog.Fatalf("Failed to make the WorkflowSpecManifest column nullable on jobs. Error: %s", response.Error)
}
default:
glog.Fatalf("Driver %v is not supported, use \"mysql\" for MySQL, or \"pgx\" for PostgreSQL", driverName)
}
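
Since GORM's `AutoMigrate` does not pick up the NOT NULL removal, the `ModifyColumn` workaround above is roughly equivalent to the following manual statement (assuming the default `mlpipeline` database and the MariaDB instance from the local setup):

```bash
# Approximate SQL issued by the ModifyColumn workaround: make the
# WorkflowSpecManifest column of the jobs table nullable.
mysql -h 127.0.0.1 -u root mlpipeline -e \
  "ALTER TABLE jobs MODIFY COLUMN WorkflowSpecManifest longtext;"
```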
3 changes: 2 additions & 1 deletion backend/src/apiserver/model/pipeline_spec.go
@@ -33,7 +33,8 @@ type PipelineSpec struct {
PipelineSpecManifest string `gorm:"column:PipelineSpecManifest; size:33554432;"`

// Argo workflow YAML definition. This is the Argo Spec converted from Pipeline YAML.
WorkflowSpecManifest string `gorm:"column:WorkflowSpecManifest; not null; size:33554432;"`
// This is deprecated. Use the pipeline ID, pipeline version ID, or pipeline spec manifest.
WorkflowSpecManifest string `gorm:"column:WorkflowSpecManifest; size:33554432;"`

// Store parameters key-value pairs as serialized string.
// This field is only used for V1 API. For V2, use the `Parameters` field in RuntimeConfig.
79 changes: 67 additions & 12 deletions backend/src/apiserver/resource/resource_manager.go
@@ -590,6 +590,12 @@ func (r *ResourceManager) ReconcileSwfCrs(ctx context.Context) error {
default:
}

// If the pipeline isn't pinned, skip it. The runs API is used directly by the ScheduledWorkflow controller
// in this case with just the pipeline ID and optionally the pipeline version ID.
if jobs[i].PipelineSpec.PipelineSpecManifest == "" && jobs[i].PipelineSpec.WorkflowSpecManifest == "" {
continue
}

tmpl, _, err := r.fetchTemplateFromPipelineSpec(&jobs[i].PipelineSpec)
if err != nil {
return failedToReconcileSwfCrsError(err)
@@ -1041,13 +1047,6 @@ func (r *ResourceManager) fetchPipelineVersionFromPipelineSpec(pipelineSpec mode
// Manifest's namespace gets overwritten with the job.Namespace if the later is non-empty.
// Otherwise, job.Namespace gets overwritten by the manifest.
func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model.Job, error) {
// Create a template based on the manifest of an existing pipeline version or user-provided manifest.
// Update the job.PipelineSpec if an existing pipeline version is used.
tmpl, manifest, err := r.fetchTemplateFromPipelineSpec(&job.PipelineSpec)
if err != nil {
return nil, util.NewInternalServerError(err, "Failed to create a recurring run with an invalid pipeline spec manifest")
}

// Create a new ScheduledWorkflow at the ScheduledWorkflow client.
k8sNamespace := job.Namespace
if k8sNamespace == "" {
@@ -1059,12 +1058,63 @@ func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model

job.Namespace = k8sNamespace

// TODO(gkcalat): consider changing the flow. Other resource UUIDs are assigned by their respective stores (DB).
// Convert modelJob into scheduledWorkflow.
scheduledWorkflow, err := tmpl.ScheduledWorkflow(job)
if err != nil {
return nil, util.Wrap(err, "Failed to create a recurring run during scheduled workflow creation")
var manifest string
var scheduledWorkflow *scheduledworkflow.ScheduledWorkflow
var tmpl template.Template

// If the pipeline version or pipeline spec is provided, this means the user wants to pin to a specific pipeline.
// Otherwise, always let the ScheduledWorkflow controller pick the latest.
if job.PipelineVersionId != "" || job.PipelineSpecManifest != "" || job.WorkflowSpecManifest != "" {
var err error
// Create a template based on the manifest of an existing pipeline version or user-provided manifest.
// Update the job.PipelineSpec if an existing pipeline version is used.
tmpl, manifest, err = r.fetchTemplateFromPipelineSpec(&job.PipelineSpec)
if err != nil {
return nil, util.NewInternalServerError(err, "Failed to create a recurring run with an invalid pipeline spec manifest")
}

// TODO(gkcalat): consider changing the flow. Other resource UUIDs are assigned by their respective stores (DB).
// Convert modelJob into scheduledWorkflow.
scheduledWorkflow, err = tmpl.ScheduledWorkflow(job)
if err != nil {
return nil, util.Wrap(err, "Failed to create a recurring run during scheduled workflow creation")
}
} else if job.PipelineId == "" {
return nil, errors.New("Cannot create a job with an empty pipeline ID")
} else {
// Validate the input parameters on the latest pipeline version. The latest pipeline version is not stored
// in the ScheduledWorkflow. It's just to help the user with up front validation at recurring run creation
// time.
manifest, err := r.GetPipelineLatestTemplate(job.PipelineId)
if err != nil {
return nil, util.Wrap(err, "Failed to validate the input parameters on the latest pipeline version")
}

tmpl, err := template.New(manifest)
if err != nil {
return nil, util.Wrap(err, "Failed to fetch a template with an invalid pipeline spec manifest")
}

_, err = tmpl.ScheduledWorkflow(job)
if err != nil {
return nil, util.Wrap(err, "Failed to validate the input parameters on the latest pipeline version")
}

scheduledWorkflow, err = template.NewGenericScheduledWorkflow(job)
if err != nil {
return nil, util.Wrap(err, "Failed to create a recurring run during scheduled workflow creation")
}

parameters, err := template.StringMapToCRDParameters(job.RuntimeConfig.Parameters)
if err != nil {
return nil, util.Wrap(err, "Converting runtime config's parameters to CDR parameters failed")
}

scheduledWorkflow.Spec.Workflow = &scheduledworkflow.WorkflowResource{
Parameters: parameters, PipelineRoot: job.PipelineRoot,
}
}

newScheduledWorkflow, err := r.getScheduledWorkflowClient(k8sNamespace).Create(ctx, scheduledWorkflow)
if err != nil {
if err, ok := err.(net.Error); ok && err.Timeout() {
@@ -1080,6 +1130,11 @@ func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model
for _, modelRef := range job.ResourceReferences {
modelRef.ResourceUUID = string(swf.UID)
}

if tmpl == nil {
return r.jobStore.CreateJob(job)
}

if tmpl.GetTemplateType() == template.V1 {
// Get the service account
serviceAccount := ""