diff --git a/.gitignore b/.gitignore index f221dec7bb3..95936b34f3d 100644 --- a/.gitignore +++ b/.gitignore @@ -86,5 +86,8 @@ __pycache__ # kfp local execution default directory local_outputs/ +# Ignore the Kind cluster kubeconfig +kubeconfig_dev-pipelines-api + # Ignore debug Driver Dockerfile produced from `make -C backend image_driver_debug` backend/Dockerfile.driver-debug diff --git a/backend/README.md b/backend/README.md index a6c0f9b82c4..962d40ba653 100644 --- a/backend/README.md +++ b/backend/README.md @@ -159,7 +159,40 @@ You can also directly connect to the MariaDB database server with: mysql -h 127.0.0.1 -u root ``` -### Remote Debug the Driver +### Scheduled Workflow Development + +If you also want to run the Scheduled Workflow controller locally, stop the controller on the cluster with: + +```bash +kubectl -n kubeflow scale deployment ml-pipeline-scheduledworkflow --replicas=0 +``` + +Then you may leverage the following sample `.vscode/launch.json` file to run the Scheduled Workflow controller locally: + +```json +{ + "version": "0.2.0", + "configurations": [ + { + "name": "Launch Scheduled Workflow controller (Kind)", + "type": "go", + "request": "launch", + "mode": "debug", + "program": "${workspaceFolder}/backend/src/crd/controller/scheduledworkflow", + "env": { + "CRON_SCHEDULE_TIMEZONE": "UTC" + }, + "args": [ + "-namespace=kubeflow", + "-kubeconfig=${workspaceFolder}/kubeconfig_dev-pipelines-api", + "-mlPipelineAPIServerName=localhost" + ] + } + ] +} +``` + +## Remote Debug the Driver These instructions assume you are leveraging the Kind cluster in the [Run Locally With a Kind Cluster](#run-locally-with-a-kind-cluster) section. diff --git a/backend/api/Makefile b/backend/api/Makefile index 4bc5db994e9..9ebc9679ff0 100644 --- a/backend/api/Makefile +++ b/backend/api/Makefile @@ -17,6 +17,8 @@ IMAGE_TAG=kfp-api-generator # Contact chensun or zijianjoy if this remote image needs an update. 
 REMOTE_IMAGE=ghcr.io/kubeflow/kfp-api-generator
+# Assume the latest API version by default.
+API_VERSION ?= v2beta1
 
 # Keep in sync with the version used in test/release/Dockerfile.release
 PREBUILT_REMOTE_IMAGE=ghcr.io/kubeflow/kfp-api-generator:1.0
diff --git a/backend/api/README.md b/backend/api/README.md
index eb2da5bb600..a5ffb040d4d 100644
--- a/backend/api/README.md
+++ b/backend/api/README.md
@@ -12,7 +12,7 @@ Tools needed:
 Set the environment variable `API_VERSION` to the version that you want to generate. We use `v1beta1` as example here.
 
 ```bash
-export API_VERSION="v1beta1"
+export API_VERSION="v2beta1"
 ```
 
 ## Compiling `.proto` files to Go client and swagger definitions
diff --git a/backend/api/v1beta1/job.proto b/backend/api/v1beta1/job.proto
index a08a36b22eb..c76b64e9143 100644
--- a/backend/api/v1beta1/job.proto
+++ b/backend/api/v1beta1/job.proto
@@ -210,9 +210,9 @@ message Job {
   // Optional input field. Describing the purpose of the job
   string description = 3;
 
-  // Required input field.
+  // Optional input field.
   // Describing what the pipeline manifest and parameters to use
-  // for the scheduled job.
+  // for the scheduled job. If unset, fetch the pipeline_spec at runtime.
   PipelineSpec pipeline_spec = 4;
 
   // Optional input field. Specify which resource this job belongs to.
diff --git a/backend/api/v2beta1/go_client/recurring_run.pb.go b/backend/api/v2beta1/go_client/recurring_run.pb.go
index 29c1fef7dae..fcc42ce0005 100644
--- a/backend/api/v2beta1/go_client/recurring_run.pb.go
+++ b/backend/api/v2beta1/go_client/recurring_run.pb.go
@@ -159,7 +159,7 @@ type RecurringRun struct {
 	// Optional input field. Describes the purpose of the recurring run.
 	Description string `protobuf:"bytes,3,opt,name=description,proto3" json:"description,omitempty"`
 	// Required input field. Specifies the source of the pipeline spec for this
-	// recurring run. Can be either a pipeline version id, or a pipeline spec.
+	// recurring run.
Can be either a pipeline id, pipeline version id, or a pipeline spec. // // Types that are assignable to PipelineSource: // diff --git a/backend/api/v2beta1/go_client/run.pb.go b/backend/api/v2beta1/go_client/run.pb.go index 4713fee5ec9..ac0d74d088b 100644 --- a/backend/api/v2beta1/go_client/run.pb.go +++ b/backend/api/v2beta1/go_client/run.pb.go @@ -411,7 +411,7 @@ type Run_PipelineSpec struct { } type Run_PipelineVersionReference struct { - // Reference to a pipeline version containing pipeline_id and pipeline_version_id. + // Reference to a pipeline containing pipeline_id and optionally the pipeline_version_id. PipelineVersionReference *PipelineVersionReference `protobuf:"bytes,18,opt,name=pipeline_version_reference,json=pipelineVersionReference,proto3,oneof"` } @@ -429,7 +429,7 @@ type PipelineVersionReference struct { // Input. Required. Unique ID of the parent pipeline. PipelineId string `protobuf:"bytes,1,opt,name=pipeline_id,json=pipelineId,proto3" json:"pipeline_id,omitempty"` - // Input. Required. Unique ID of an existing pipeline version. + // Input. Optional. Unique ID of an existing pipeline version. If unset, the latest pipeline version is used. PipelineVersionId string `protobuf:"bytes,2,opt,name=pipeline_version_id,json=pipelineVersionId,proto3" json:"pipeline_version_id,omitempty"` } diff --git a/backend/api/v2beta1/go_http_client/recurring_run_model/v2beta1_pipeline_version_reference.go b/backend/api/v2beta1/go_http_client/recurring_run_model/v2beta1_pipeline_version_reference.go index 6c4adb9d62e..615a11c457e 100644 --- a/backend/api/v2beta1/go_http_client/recurring_run_model/v2beta1_pipeline_version_reference.go +++ b/backend/api/v2beta1/go_http_client/recurring_run_model/v2beta1_pipeline_version_reference.go @@ -18,7 +18,7 @@ type V2beta1PipelineVersionReference struct { // Input. Required. Unique ID of the parent pipeline. PipelineID string `json:"pipeline_id,omitempty"` - // Input. Required. Unique ID of an existing pipeline version. 
+ // Input. Optional. Unique ID of an existing pipeline version. If unset, the latest pipeline version is used. PipelineVersionID string `json:"pipeline_version_id,omitempty"` } diff --git a/backend/api/v2beta1/go_http_client/run_model/v2beta1_pipeline_version_reference.go b/backend/api/v2beta1/go_http_client/run_model/v2beta1_pipeline_version_reference.go index e817451c66c..48f17f81926 100644 --- a/backend/api/v2beta1/go_http_client/run_model/v2beta1_pipeline_version_reference.go +++ b/backend/api/v2beta1/go_http_client/run_model/v2beta1_pipeline_version_reference.go @@ -18,7 +18,7 @@ type V2beta1PipelineVersionReference struct { // Input. Required. Unique ID of the parent pipeline. PipelineID string `json:"pipeline_id,omitempty"` - // Input. Required. Unique ID of an existing pipeline version. + // Input. Optional. Unique ID of an existing pipeline version. If unset, the latest pipeline version is used. PipelineVersionID string `json:"pipeline_version_id,omitempty"` } diff --git a/backend/api/v2beta1/go_http_client/run_model/v2beta1_run.go b/backend/api/v2beta1/go_http_client/run_model/v2beta1_run.go index c7a012c57c6..d232fdd05ca 100644 --- a/backend/api/v2beta1/go_http_client/run_model/v2beta1_run.go +++ b/backend/api/v2beta1/go_http_client/run_model/v2beta1_run.go @@ -49,7 +49,7 @@ type V2beta1Run struct { // This field is Deprecated. The pipeline version id is under pipeline_version_reference for v2. PipelineVersionID string `json:"pipeline_version_id,omitempty"` - // Reference to a pipeline version containing pipeline_id and pipeline_version_id. + // Reference to a pipeline containing pipeline_id and optionally the pipeline_version_id. PipelineVersionReference *V2beta1PipelineVersionReference `json:"pipeline_version_reference,omitempty"` // ID of the recurring run that triggered this run. 
diff --git a/backend/api/v2beta1/recurring_run.proto b/backend/api/v2beta1/recurring_run.proto index 09cec8e200f..66b810901a2 100644 --- a/backend/api/v2beta1/recurring_run.proto +++ b/backend/api/v2beta1/recurring_run.proto @@ -89,7 +89,7 @@ message RecurringRun { string description = 3; // Required input field. Specifies the source of the pipeline spec for this - // recurring run. Can be either a pipeline version id, or a pipeline spec. + // recurring run. Can be either a pipeline id, pipeline version id, or a pipeline spec. oneof pipeline_source { // This field is Deprecated. The pipeline version id is under pipeline_version_reference for v2. string pipeline_version_id = 4 [deprecated=true]; diff --git a/backend/api/v2beta1/run.proto b/backend/api/v2beta1/run.proto index 040abb4a280..5c48ab19317 100644 --- a/backend/api/v2beta1/run.proto +++ b/backend/api/v2beta1/run.proto @@ -168,7 +168,7 @@ message Run { // Pipeline spec. google.protobuf.Struct pipeline_spec = 7; - // Reference to a pipeline version containing pipeline_id and pipeline_version_id. + // Reference to a pipeline containing pipeline_id and optionally the pipeline_version_id. PipelineVersionReference pipeline_version_reference = 18; } @@ -213,7 +213,7 @@ message PipelineVersionReference { // Input. Required. Unique ID of the parent pipeline. string pipeline_id = 1; - // Input. Required. Unique ID of an existing pipeline version. + // Input. Optional. Unique ID of an existing pipeline version. If unset, the latest pipeline version is used. string pipeline_version_id = 2; } diff --git a/backend/api/v2beta1/swagger/kfp_api_single_file.swagger.json b/backend/api/v2beta1/swagger/kfp_api_single_file.swagger.json index 37ac61bdb21..6fc88e10f4b 100644 --- a/backend/api/v2beta1/swagger/kfp_api_single_file.swagger.json +++ b/backend/api/v2beta1/swagger/kfp_api_single_file.swagger.json @@ -2020,7 +2020,7 @@ }, "pipeline_version_id": { "type": "string", - "description": "Input. Required. 
Unique ID of an existing pipeline version." + "description": "Input. Optional. Unique ID of an existing pipeline version. If unset, the latest pipeline version is used." } }, "description": "Reference to an existing pipeline version." @@ -2349,7 +2349,7 @@ }, "pipeline_version_reference": { "$ref": "#/definitions/v2beta1PipelineVersionReference", - "description": "Reference to a pipeline version containing pipeline_id and pipeline_version_id." + "description": "Reference to a pipeline containing pipeline_id and optionally the pipeline_version_id." }, "runtime_config": { "$ref": "#/definitions/v2beta1RuntimeConfig", diff --git a/backend/api/v2beta1/swagger/recurring_run.swagger.json b/backend/api/v2beta1/swagger/recurring_run.swagger.json index dfc0a80c19f..48e811bd1b6 100644 --- a/backend/api/v2beta1/swagger/recurring_run.swagger.json +++ b/backend/api/v2beta1/swagger/recurring_run.swagger.json @@ -390,7 +390,7 @@ }, "pipeline_version_id": { "type": "string", - "description": "Input. Required. Unique ID of an existing pipeline version." + "description": "Input. Optional. Unique ID of an existing pipeline version. If unset, the latest pipeline version is used." } }, "description": "Reference to an existing pipeline version." diff --git a/backend/api/v2beta1/swagger/run.swagger.json b/backend/api/v2beta1/swagger/run.swagger.json index b71fd939049..d04bda64213 100644 --- a/backend/api/v2beta1/swagger/run.swagger.json +++ b/backend/api/v2beta1/swagger/run.swagger.json @@ -619,7 +619,7 @@ }, "pipeline_version_id": { "type": "string", - "description": "Input. Required. Unique ID of an existing pipeline version." + "description": "Input. Optional. Unique ID of an existing pipeline version. If unset, the latest pipeline version is used." } }, "description": "Reference to an existing pipeline version." 
@@ -667,7 +667,7 @@ }, "pipeline_version_reference": { "$ref": "#/definitions/v2beta1PipelineVersionReference", - "description": "Reference to a pipeline version containing pipeline_id and pipeline_version_id." + "description": "Reference to a pipeline containing pipeline_id and optionally the pipeline_version_id." }, "runtime_config": { "$ref": "#/definitions/v2beta1RuntimeConfig", diff --git a/backend/src/apiserver/client/sql.go b/backend/src/apiserver/client/sql.go index 026ef056190..3a111eb3afc 100644 --- a/backend/src/apiserver/client/sql.go +++ b/backend/src/apiserver/client/sql.go @@ -22,8 +22,9 @@ import ( ) const ( - MYSQL_TEXT_FORMAT string = "longtext not null" - MYSQL_EXIST_ERROR string = "database exists" + MYSQL_TEXT_FORMAT string = "longtext not null" + MYSQL_TEXT_FORMAT_NULL string = "longtext" + MYSQL_EXIST_ERROR string = "database exists" PGX_TEXT_FORMAT string = "text" PGX_EXIST_ERROR string = "already exists" diff --git a/backend/src/apiserver/client_manager/client_manager.go b/backend/src/apiserver/client_manager/client_manager.go index 2368666e3ff..9857aac68d5 100644 --- a/backend/src/apiserver/client_manager/client_manager.go +++ b/backend/src/apiserver/client_manager/client_manager.go @@ -345,6 +345,12 @@ func InitDBClient(initConnectionTimeout time.Duration) *storage.DB { if ignoreAlreadyExistError(driverName, response.Error) != nil { glog.Fatalf("Failed to create a foreign key for RunUUID in task table. Error: %s", response.Error) } + + // This is a workaround because AutoMigration does not detect that the column went from not null to nullable. + response = db.Model(&model.Job{}).ModifyColumn("WorkflowSpecManifest", client.MYSQL_TEXT_FORMAT_NULL) + if response.Error != nil { + glog.Fatalf("Failed to make the WorkflowSpecManifest column nullable on jobs. 
Error: %s", response.Error) + } default: glog.Fatalf("Driver %v is not supported, use \"mysql\" for MySQL, or \"pgx\" for PostgreSQL", driverName) } diff --git a/backend/src/apiserver/model/pipeline_spec.go b/backend/src/apiserver/model/pipeline_spec.go index b0fb4184119..2f92c5c68bd 100644 --- a/backend/src/apiserver/model/pipeline_spec.go +++ b/backend/src/apiserver/model/pipeline_spec.go @@ -33,7 +33,8 @@ type PipelineSpec struct { PipelineSpecManifest string `gorm:"column:PipelineSpecManifest; size:33554432;"` // Argo workflow YAML definition. This is the Argo Spec converted from Pipeline YAML. - WorkflowSpecManifest string `gorm:"column:WorkflowSpecManifest; not null; size:33554432;"` + // This is deprecated. Use the pipeline ID, pipeline version ID, or pipeline spec manifest. + WorkflowSpecManifest string `gorm:"column:WorkflowSpecManifest; size:33554432;"` // Store parameters key-value pairs as serialized string. // This field is only used for V1 API. For V2, use the `Parameters` field in RuntimeConfig. diff --git a/backend/src/apiserver/resource/resource_manager.go b/backend/src/apiserver/resource/resource_manager.go index c183aa7799f..6523496e012 100644 --- a/backend/src/apiserver/resource/resource_manager.go +++ b/backend/src/apiserver/resource/resource_manager.go @@ -590,6 +590,12 @@ func (r *ResourceManager) ReconcileSwfCrs(ctx context.Context) error { default: } + // If the pipeline isn't pinned, skip it. The runs API is used directly by the ScheduledWorkflow controller + // in this case with just the pipeline ID and optionally the pipeline version ID. 
+ if jobs[i].PipelineSpec.PipelineSpecManifest == "" && jobs[i].PipelineSpec.WorkflowSpecManifest == "" { + continue + } + tmpl, _, err := r.fetchTemplateFromPipelineSpec(&jobs[i].PipelineSpec) if err != nil { return failedToReconcileSwfCrsError(err) @@ -1041,13 +1047,6 @@ func (r *ResourceManager) fetchPipelineVersionFromPipelineSpec(pipelineSpec mode // Manifest's namespace gets overwritten with the job.Namespace if the later is non-empty. // Otherwise, job.Namespace gets overwritten by the manifest. func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model.Job, error) { - // Create a template based on the manifest of an existing pipeline version or used-provided manifest. - // Update the job.PipelineSpec if an existing pipeline version is used. - tmpl, manifest, err := r.fetchTemplateFromPipelineSpec(&job.PipelineSpec) - if err != nil { - return nil, util.NewInternalServerError(err, "Failed to create a recurring run with an invalid pipeline spec manifest") - } - // Create a new ScheduledWorkflow at the ScheduledWorkflow client. k8sNamespace := job.Namespace if k8sNamespace == "" { @@ -1059,12 +1058,63 @@ func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model job.Namespace = k8sNamespace - // TODO(gkcalat): consider changing the flow. Other resource UUIDs are assigned by their respective stores (DB). - // Convert modelJob into scheduledWorkflow. - scheduledWorkflow, err := tmpl.ScheduledWorkflow(job) - if err != nil { - return nil, util.Wrap(err, "Failed to create a recurring run during scheduled workflow creation") + var manifest string + var scheduledWorkflow *scheduledworkflow.ScheduledWorkflow + var tmpl template.Template + + // If the pipeline version or pipeline spec is provided, this means the user wants to pin to a specific pipeline. + // Otherwise, always let the ScheduledWorkflow controller pick the latest. 
+	if job.PipelineVersionId != "" || job.PipelineSpecManifest != "" || job.WorkflowSpecManifest != "" {
+		var err error
+		// Create a template based on the manifest of an existing pipeline version or user-provided manifest.
+		// Update the job.PipelineSpec if an existing pipeline version is used.
+		tmpl, manifest, err = r.fetchTemplateFromPipelineSpec(&job.PipelineSpec)
+		if err != nil {
+			return nil, util.NewInternalServerError(err, "Failed to create a recurring run with an invalid pipeline spec manifest")
+		}
+
+		// TODO(gkcalat): consider changing the flow. Other resource UUIDs are assigned by their respective stores (DB).
+		// Convert modelJob into scheduledWorkflow.
+		scheduledWorkflow, err = tmpl.ScheduledWorkflow(job)
+		if err != nil {
+			return nil, util.Wrap(err, "Failed to create a recurring run during scheduled workflow creation")
+		}
+	} else if job.PipelineId == "" {
+		return nil, errors.New("Cannot create a job with an empty pipeline ID")
+	} else {
+		// Validate the input parameters on the latest pipeline version. The latest pipeline version is not stored
+		// in the ScheduledWorkflow. It's just to help the user with up front validation at recurring run creation
+		// time.
+ manifest, err := r.GetPipelineLatestTemplate(job.PipelineId) + if err != nil { + return nil, util.Wrap(err, "Failed to validate the input parameters on the latest pipeline version") + } + + tmpl, err := template.New(manifest) + if err != nil { + return nil, util.Wrap(err, "Failed to fetch a template with an invalid pipeline spec manifest") + } + + _, err = tmpl.ScheduledWorkflow(job) + if err != nil { + return nil, util.Wrap(err, "Failed to validate the input parameters on the latest pipeline version") + } + + scheduledWorkflow, err = template.NewGenericScheduledWorkflow(job) + if err != nil { + return nil, util.Wrap(err, "Failed to create a recurring run during scheduled workflow creation") + } + + parameters, err := template.StringMapToCRDParameters(job.RuntimeConfig.Parameters) + if err != nil { + return nil, util.Wrap(err, "Converting runtime config's parameters to CDR parameters failed") + } + + scheduledWorkflow.Spec.Workflow = &scheduledworkflow.WorkflowResource{ + Parameters: parameters, PipelineRoot: job.PipelineRoot, + } } + newScheduledWorkflow, err := r.getScheduledWorkflowClient(k8sNamespace).Create(ctx, scheduledWorkflow) if err != nil { if err, ok := err.(net.Error); ok && err.Timeout() { @@ -1080,6 +1130,11 @@ func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model for _, modelRef := range job.ResourceReferences { modelRef.ResourceUUID = string(swf.UID) } + + if tmpl == nil { + return r.jobStore.CreateJob(job) + } + if tmpl.GetTemplateType() == template.V1 { // Get the service account serviceAccount := "" diff --git a/backend/src/apiserver/resource/resource_manager_test.go b/backend/src/apiserver/resource/resource_manager_test.go index 3a29893fec5..4271e4f05fd 100644 --- a/backend/src/apiserver/resource/resource_manager_test.go +++ b/backend/src/apiserver/resource/resource_manager_test.go @@ -2366,37 +2366,25 @@ func TestCreateJob_ThroughPipelineID(t *testing.T) { pipelineStore, ok := 
store.pipelineStore.(*storage.PipelineStore) assert.True(t, ok) pipelineStore.SetUUIDGenerator(util.NewFakeUUIDGeneratorOrFatal(FakeUUIDOne, nil)) - pv := createPipelineVersion( - pipeline.UUID, - "version_for_job", - "", - "", - testWorkflow.ToStringForStore(), - "", - "", - ) - version, err := manager.CreatePipelineVersion(pv) - assert.Nil(t, err) // The pipeline specified via pipeline id will be converted to this // pipeline's default version, which will be used to create run. newJob, err := manager.CreateJob(context.Background(), job) expectedJob := &model.Job{ - UUID: "123e4567-e89b-12d3-a456-426655440000", - DisplayName: "j1", - K8SName: "job-", - Namespace: "ns1", - ServiceAccount: "pipeline-runner", + UUID: "123e4567-e89b-12d3-a456-426655440000", + DisplayName: "j1", + K8SName: "job-", + Namespace: "ns1", + // Since there is no pipeline version or service account specified, the API server will select the service + // account when compiling the run, not within the ScheduledWorkflow. 
+ ServiceAccount: "", Enabled: true, - CreatedAtInSec: 5, - UpdatedAtInSec: 5, + CreatedAtInSec: 4, + UpdatedAtInSec: 4, Conditions: "STATUS_UNSPECIFIED", PipelineSpec: model.PipelineSpec{ - PipelineId: pipeline.UUID, - PipelineName: version.Name, - PipelineVersionId: version.UUID, - WorkflowSpecManifest: testWorkflow.ToStringForStore(), - Parameters: "[{\"name\":\"param1\",\"value\":\"world\"}]", + PipelineId: pipeline.UUID, + Parameters: "[{\"name\":\"param1\",\"value\":\"world\"}]", }, ExperimentId: experiment.UUID, } @@ -2512,6 +2500,7 @@ func TestCreateJob_ThroughPipelineIdAndPipelineVersion(t *testing.T) { } func TestCreateJob_EmptyPipelineSpec(t *testing.T) { + initEnvVars() store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch()) defer store.Close() manager := NewResourceManager(store, &ResourceManagerOptions{CollectMetrics: false}) @@ -2526,7 +2515,11 @@ func TestCreateJob_EmptyPipelineSpec(t *testing.T) { } _, err := manager.CreateJob(context.Background(), job) assert.NotNil(t, err) - assert.Contains(t, err.Error(), "Failed to fetch a template with an empty pipeline spec manifest") + errMsg := "" + if err != nil { + errMsg = err.Error() + } + assert.Contains(t, errMsg, "Cannot create a job with an empty pipeline ID") } func TestCreateJob_InvalidWorkflowSpec(t *testing.T) { diff --git a/backend/src/apiserver/server/api_converter.go b/backend/src/apiserver/server/api_converter.go index bdf55811de3..e6351b1397a 100644 --- a/backend/src/apiserver/server/api_converter.go +++ b/backend/src/apiserver/server/api_converter.go @@ -1869,10 +1869,13 @@ func toModelJob(j interface{}) (*model.Job, error) { case *apiv2beta1.RecurringRun: pipelineId = apiJob.GetPipelineVersionReference().GetPipelineId() pipelineVersionId = apiJob.GetPipelineVersionReference().GetPipelineVersionId() - if spec, err := pipelineSpecStructToYamlString(apiJob.GetPipelineSpec()); err == nil { - pipelineSpec = spec - } else { - return nil, util.Wrap(err, "Failed to convert API recurring 
run to its internal representation due to pipeline spec conversion error") + + if apiJob.GetPipelineSpec() != nil { + if spec, err := pipelineSpecStructToYamlString(apiJob.GetPipelineSpec()); err == nil { + pipelineSpec = spec + } else { + return nil, util.Wrap(err, "Failed to convert API recurring run to its internal representation due to pipeline spec conversion error") + } } cfg, err := toModelRuntimeConfig(apiJob.GetRuntimeConfig()) @@ -1933,6 +1936,7 @@ func toModelJob(j interface{}) (*model.Job, error) { } else if pipelineVersionId != "" { pipelineName = fmt.Sprintf("pipelines/%v", pipelineVersionId) } + status := model.StatusStateUnspecified if isEnabled { status = model.StatusStateEnabled diff --git a/backend/src/apiserver/server/job_server.go b/backend/src/apiserver/server/job_server.go index d78db2aff76..2b1af2c74c8 100644 --- a/backend/src/apiserver/server/job_server.go +++ b/backend/src/apiserver/server/job_server.go @@ -115,6 +115,19 @@ func (s *JobServer) CreateJob(ctx context.Context, request *apiv1beta1.CreateJob if err != nil { return nil, util.Wrap(err, "Failed to create a recurring run due to conversion error") } + + // In the v2 API, the pipeline version being empty means always pick the latest at run submission time. In v1, + // it means to use the latest pipeline version at recurring run creation time. Handle this case here since + // modelJob does not have the concept of which API version it came from. 
+ if modelJob.WorkflowSpecManifest == "" && modelJob.PipelineSpecManifest == "" && modelJob.PipelineVersionId == "" { + pipelineVersion, err := s.resourceManager.GetLatestPipelineVersion(modelJob.PipelineId) + if err != nil { + return nil, util.Wrapf(err, "Failed to fetch a pipeline version from pipeline %v", modelJob.PipelineId) + } + + modelJob.PipelineVersionId = pipelineVersion.UUID + } + newJob, err := s.createJob(ctx, modelJob) if err != nil { return nil, util.Wrap(err, "Failed to create a recurring run") diff --git a/backend/src/apiserver/server/job_server_test.go b/backend/src/apiserver/server/job_server_test.go index f9dc4b80a77..b72c74515f8 100644 --- a/backend/src/apiserver/server/job_server_test.go +++ b/backend/src/apiserver/server/job_server_test.go @@ -148,10 +148,12 @@ func TestCreateJob_WrongInput(t *testing.T) { Trigger: &apiv1beta1.Trigger_CronSchedule{CronSchedule: &apiv1beta1.CronSchedule{ StartTime: ×tamp.Timestamp{Seconds: 1}, Cron: "1 * * * *", - }}}, + }}, + }, ResourceReferences: validReference, }, - "Failed to fetch a template with an empty pipeline spec manifest", + "Failed to fetch a pipeline version from pipeline : Failed to get the latest pipeline version as " + + "pipeline was not found: ResourceNotFoundError: Pipeline not found", }, { "invalid pipeline spec", @@ -172,7 +174,8 @@ func TestCreateJob_WrongInput(t *testing.T) { {Key: &apiv1beta1.ResourceKey{Type: apiv1beta1.ResourceType_EXPERIMENT, Id: experiment.UUID}, Relationship: apiv1beta1.Relationship_OWNER}, }, }, - "Failed to get the latest pipeline version as pipeline was not found: ResourceNotFoundError: Pipeline not_exist_pipeline not found", + "Failed to fetch a pipeline version from pipeline not_exist_pipeline: Failed to get the latest " + + "pipeline version as pipeline was not found: ResourceNotFoundError: Pipeline not_exist_pipeline not found", }, { "invalid cron", @@ -240,7 +243,11 @@ func TestCreateJob_WrongInput(t *testing.T) { for _, tt := range tests { got, err := 
server.CreateJob(context.Background(), &apiv1beta1.CreateJobRequest{Job: tt.arg}) assert.NotNil(t, err) - assert.Contains(t, err.Error(), tt.errMsg) + errMsg := "" + if err != nil { + errMsg = err.Error() + } + assert.Contains(t, errMsg, tt.errMsg) assert.Nil(t, got) } } diff --git a/backend/src/apiserver/template/argo_template.go b/backend/src/apiserver/template/argo_template.go index 90a04f6bd5d..721d9cd8fc3 100644 --- a/backend/src/apiserver/template/argo_template.go +++ b/backend/src/apiserver/template/argo_template.go @@ -23,7 +23,6 @@ import ( "github.com/kubeflow/pipelines/backend/src/apiserver/model" "github.com/kubeflow/pipelines/backend/src/common/util" scheduledworkflow "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/yaml" ) @@ -113,10 +112,6 @@ func (t *Argo) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Schedu setDefaultServiceAccount(workflow, modelJob.ServiceAccount) // Disable istio sidecar injection if not specified workflow.SetAnnotationsToAllTemplatesIfKeyNotExist(util.AnnotationKeyIstioSidecarInject, util.AnnotationValueIstioSidecarInjectDisabled) - swfGeneratedName, err := toSWFCRDResourceGeneratedName(modelJob.K8SName) - if err != nil { - return nil, util.Wrap(err, "Create job failed") - } // Marking auto-added artifacts as optional. Otherwise most older workflows will start failing after upgrade to Argo 2.3. // TODO: Fix the components to explicitly declare the artifacts they really output. 
@@ -127,28 +122,17 @@ func (t *Argo) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Schedu if err != nil { return nil, util.Wrap(err, "Failed to convert v1 parameters to CRD parameters") } - crdTrigger, err := modelToCRDTrigger(modelJob.Trigger) + + scheduledWorkflow, err := NewGenericScheduledWorkflow(modelJob) if err != nil { return nil, err } - scheduledWorkflow := &scheduledworkflow.ScheduledWorkflow{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "kubeflow.org/v1beta1", - Kind: "ScheduledWorkflow", - }, - ObjectMeta: metav1.ObjectMeta{GenerateName: swfGeneratedName}, - Spec: scheduledworkflow.ScheduledWorkflowSpec{ - Enabled: modelJob.Enabled, - MaxConcurrency: &modelJob.MaxConcurrency, - Trigger: crdTrigger, - Workflow: &scheduledworkflow.WorkflowResource{ - Parameters: swfParameters, - Spec: workflow.ToStringForSchedule(), - }, - NoCatchup: util.BoolPointer(modelJob.NoCatchup), - }, + scheduledWorkflow.Spec.Workflow = &scheduledworkflow.WorkflowResource{ + Parameters: swfParameters, + Spec: workflow.ToStringForSchedule(), } + return scheduledWorkflow, nil } diff --git a/backend/src/apiserver/template/template.go b/backend/src/apiserver/template/template.go index 738838c1651..f793dbc646f 100644 --- a/backend/src/apiserver/template/template.go +++ b/backend/src/apiserver/template/template.go @@ -169,7 +169,7 @@ func modelToPipelineJobRuntimeConfig(modelRuntimeConfig *model.RuntimeConfig) (* // Assumes that the serialized parameters will take a form of // map[string]*structpb.Value, which works for runtimeConfig.Parameters such as // {"param1":"value1","param2":"value2"}. 
-func stringMapToCRDParameters(modelParams string) ([]scheduledworkflow.Parameter, error) { +func StringMapToCRDParameters(modelParams string) ([]scheduledworkflow.Parameter, error) { var swParams []scheduledworkflow.Parameter var parameters map[string]*structpb.Value if modelParams == "" { diff --git a/backend/src/apiserver/template/v2_template.go b/backend/src/apiserver/template/v2_template.go index 1055bcdf8a9..df2ff9e3a13 100644 --- a/backend/src/apiserver/template/v2_template.go +++ b/backend/src/apiserver/template/v2_template.go @@ -41,9 +41,38 @@ type V2Spec struct { platformSpec *pipelinespec.PlatformSpec } -var ( - Launcher = "" -) +var Launcher = "" + +func NewGenericScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.ScheduledWorkflow, error) { + swfGeneratedName, err := toSWFCRDResourceGeneratedName(modelJob.K8SName) + if err != nil { + return nil, util.Wrap(err, "Create job failed") + } + + crdTrigger, err := modelToCRDTrigger(modelJob.Trigger) + if err != nil { + return nil, util.Wrap(err, "converting model trigger to crd trigger failed") + } + + return &scheduledworkflow.ScheduledWorkflow{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "kubeflow.org/v2beta1", + Kind: "ScheduledWorkflow", + }, + ObjectMeta: metav1.ObjectMeta{GenerateName: swfGeneratedName}, + Spec: scheduledworkflow.ScheduledWorkflowSpec{ + Enabled: modelJob.Enabled, + MaxConcurrency: &modelJob.MaxConcurrency, + Trigger: crdTrigger, + NoCatchup: util.BoolPointer(modelJob.NoCatchup), + ExperimentId: modelJob.ExperimentId, + PipelineId: modelJob.PipelineId, + PipelineName: modelJob.PipelineName, + PipelineVersionId: modelJob.PipelineVersionId, + ServiceAccount: modelJob.ServiceAccount, + }, + }, nil +} // Converts modelJob to ScheduledWorkflow. 
func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.ScheduledWorkflow, error) { @@ -102,41 +131,23 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Sche } // Disable istio sidecar injection if not specified executionSpec.SetAnnotationsToAllTemplatesIfKeyNotExist(util.AnnotationKeyIstioSidecarInject, util.AnnotationValueIstioSidecarInjectDisabled) - swfGeneratedName, err := toSWFCRDResourceGeneratedName(modelJob.K8SName) - if err != nil { - return nil, util.Wrap(err, "Create job failed") - } - parameters, err := stringMapToCRDParameters(modelJob.RuntimeConfig.Parameters) + parameters, err := StringMapToCRDParameters(modelJob.RuntimeConfig.Parameters) if err != nil { return nil, util.Wrap(err, "Converting runtime config's parameters to CDR parameters failed") } - crdTrigger, err := modelToCRDTrigger(modelJob.Trigger) + + scheduledWorkflow, err := NewGenericScheduledWorkflow(modelJob) if err != nil { - return nil, util.Wrap(err, "converting model trigger to crd trigger failed") + return nil, err } - scheduledWorkflow := &scheduledworkflow.ScheduledWorkflow{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "kubeflow.org/v2beta1", - Kind: "ScheduledWorkflow", - }, - ObjectMeta: metav1.ObjectMeta{GenerateName: swfGeneratedName}, - Spec: scheduledworkflow.ScheduledWorkflowSpec{ - Enabled: modelJob.Enabled, - MaxConcurrency: &modelJob.MaxConcurrency, - Trigger: crdTrigger, - Workflow: &scheduledworkflow.WorkflowResource{ - Parameters: parameters, - Spec: executionSpec.ToStringForSchedule(), - }, - NoCatchup: util.BoolPointer(modelJob.NoCatchup), - ExperimentId: modelJob.ExperimentId, - PipelineId: modelJob.PipelineId, - PipelineName: modelJob.PipelineName, - PipelineVersionId: modelJob.PipelineVersionId, - ServiceAccount: executionSpec.ServiceAccount(), - }, + scheduledWorkflow.Spec.Workflow = &scheduledworkflow.WorkflowResource{ + Parameters: parameters, + Spec: executionSpec.ToStringForSchedule(), } + + 
+	scheduledWorkflow.Spec.ServiceAccount = executionSpec.ServiceAccount()
+
 	return scheduledWorkflow, nil
 }
diff --git a/backend/src/common/util/service.go b/backend/src/common/util/service.go
index cf0f5379a38..c5624b64229 100644
--- a/backend/src/common/util/service.go
+++ b/backend/src/common/util/service.go
@@ -15,6 +15,7 @@ package util
 
 import (
+	"context"
 	"fmt"
 	"net/http"
 	"strings"
@@ -94,6 +95,17 @@ func GetKubernetesClientFromClientConfig(clientConfig clientcmd.ClientConfig) (
 	return clientSet, config, namespace, nil
 }
 
+func GetRpcConnectionWithTimeout(address string, timeout time.Time) (*grpc.ClientConn, error) {
+	// Always call cancel so the deadline's resources are released as soon as
+	// the dial attempt finishes (go vet lostcancel).
+	ctx, cancel := context.WithDeadline(context.Background(), timeout)
+	defer cancel()
+
+	conn, err := grpc.DialContext(ctx, address, grpc.WithInsecure(), grpc.WithBlock())
+	if err != nil {
+		return nil, errors.Wrapf(err, "Failed to create gRPC connection")
+	}
+	return conn, nil
+}
+
 func GetRpcConnection(address string) (*grpc.ClientConn, error) {
 	conn, err := grpc.Dial(address, grpc.WithInsecure())
 	if err != nil {
diff --git a/backend/src/crd/controller/scheduledworkflow/controller.go b/backend/src/crd/controller/scheduledworkflow/controller.go
index 51581c47e46..781d9518069 100644
--- a/backend/src/crd/controller/scheduledworkflow/controller.go
+++ b/backend/src/crd/controller/scheduledworkflow/controller.go
@@ -20,6 +20,7 @@ import (
 	"time"
 
 	workflowapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
+	api "github.com/kubeflow/pipelines/backend/api/v2beta1/go_client"
 	commonutil "github.com/kubeflow/pipelines/backend/src/common/util"
 	"github.com/kubeflow/pipelines/backend/src/crd/controller/scheduledworkflow/client"
 	"github.com/kubeflow/pipelines/backend/src/crd/controller/scheduledworkflow/util"
@@ -30,6 +31,8 @@ import (
 	swfinformers "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/informers/externalversions"
 	wraperror "github.com/pkg/errors"
 	log "github.com/sirupsen/logrus"
+	"google.golang.org/grpc/metadata"
+
"google.golang.org/protobuf/types/known/structpb" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/runtime" @@ -40,6 +43,7 @@ import ( _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/client-go/transport" "k8s.io/client-go/util/workqueue" ) @@ -60,6 +64,7 @@ type Controller struct { kubeClient *client.KubeClient swfClient *client.ScheduledWorkflowClient workflowClient *client.WorkflowClient + runClient api.RunServiceClient // workqueue is a rate limited work queue. This is used to queue work to be // processed instead of performing it as soon as a change happens. This @@ -73,6 +78,10 @@ type Controller struct { // the timezone loation which the scheduled will use location *time.Location + + // tokenSrc provides a way to get the latest refreshed token when authentication to the REST API server is enabled. + // This will be nil when authentication is not enabled (e.g. Kubeconfig does not have token based authentication). 
+ tokenSrc transport.ResettableTokenSource } // NewController returns a new sample controller @@ -80,10 +89,12 @@ func NewController( kubeClientSet kubernetes.Interface, swfClientSet swfclientset.Interface, workflowClientSet commonutil.ExecutionClient, + runClient api.RunServiceClient, swfInformerFactory swfinformers.SharedInformerFactory, executionInformer commonutil.ExecutionInformer, time commonutil.TimeInterface, location *time.Location, + tokenSrc transport.ResettableTokenSource, ) (*Controller, error) { // obtain references to shared informers swfInformer := swfInformerFactory.Scheduledworkflow().V1beta1().ScheduledWorkflows() @@ -102,11 +113,13 @@ func NewController( controller := &Controller{ kubeClient: client.NewKubeClient(kubeClientSet, recorder), swfClient: client.NewScheduledWorkflowClient(swfClientSet, swfInformer), + runClient: runClient, workflowClient: client.NewWorkflowClient(workflowClientSet, executionInformer), workqueue: workqueue.NewNamedRateLimitingQueue( workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), swfregister.Kind), time: time, location: location, + tokenSrc: tokenSrc, } log.Info("Setting up event handlers") @@ -507,12 +520,72 @@ func (c *Controller) submitNewWorkflowIfNotAlreadySubmitted( } // If the workflow is not found, we need to create it. 
- newWorkflow, err := swf.NewWorkflow(nextScheduledEpoch, nowEpoch) - createdWorkflow, err := c.workflowClient.Create(ctx, swf.Namespace, newWorkflow) + if swf.Spec.Workflow != nil && swf.Spec.Workflow.Spec != nil { + newWorkflow, err := swf.NewWorkflow(nextScheduledEpoch, nowEpoch) + if err != nil { + return false, "", err + } + + createdWorkflow, err := c.workflowClient.Create(ctx, swf.Namespace, newWorkflow) + if err != nil { + return false, "", err + } + return true, createdWorkflow.ExecutionName(), nil + } + + if c.tokenSrc != nil { + token, err := c.tokenSrc.Token() + if err != nil { + return false, "", fmt.Errorf("Failed to get a token to communicate with the REST API: %w", err) + } + + ctx = metadata.AppendToOutgoingContext(ctx, "Authorization", "Bearer "+token.AccessToken) + } + + var runtimeConfig *api.RuntimeConfig + + if swf.Spec.Workflow != nil { + runtimeConfig = &api.RuntimeConfig{ + Parameters: map[string]*structpb.Value{}, + PipelineRoot: swf.Spec.Workflow.PipelineRoot, + } + + for _, param := range swf.Spec.Workflow.Parameters { + val := &structpb.Value{} + + err := val.UnmarshalJSON([]byte(param.Value)) + if err != nil { + return false, "", err + } + + runtimeConfig.Parameters[param.Name] = val + } + } + + run, err := c.runClient.CreateRun(ctx, &api.CreateRunRequest{ + ExperimentId: swf.Spec.ExperimentId, + Run: &api.Run{ + ExperimentId: swf.Spec.ExperimentId, + DisplayName: swf.NextResourceName(), + RecurringRunId: string(swf.UID), + RuntimeConfig: runtimeConfig, + PipelineSource: &api.Run_PipelineVersionReference{ + PipelineVersionReference: &api.PipelineVersionReference{ + PipelineId: swf.Spec.PipelineId, + // This can be empty, which causes the latest pipeline version to be selected. 
+ PipelineVersionId: swf.Spec.PipelineVersionId, + }, + }, + ServiceAccount: swf.Spec.ServiceAccount, + }, + }) if err != nil { - return false, "", err + return false, "", fmt.Errorf( + "failed to create a run from the scheduled workflow (%s/%s): %w", swf.Namespace, swf.Name, err, + ) } - return true, createdWorkflow.ExecutionName(), nil + + return true, run.DisplayName, nil } func (c *Controller) updateStatus( diff --git a/backend/src/crd/controller/scheduledworkflow/main.go b/backend/src/crd/controller/scheduledworkflow/main.go index e4d176a2a1f..bde7e22d870 100644 --- a/backend/src/crd/controller/scheduledworkflow/main.go +++ b/backend/src/crd/controller/scheduledworkflow/main.go @@ -16,9 +16,11 @@ package main import ( "flag" + "fmt" "strings" "time" + api "github.com/kubeflow/pipelines/backend/api/v2beta1/go_client" commonutil "github.com/kubeflow/pipelines/backend/src/common/util" "github.com/kubeflow/pipelines/backend/src/crd/controller/scheduledworkflow/util" swfclientset "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/clientset/versioned" @@ -26,19 +28,30 @@ import ( "github.com/kubeflow/pipelines/backend/src/crd/pkg/signals" log "github.com/sirupsen/logrus" "github.com/spf13/viper" + "golang.org/x/oauth2" "k8s.io/client-go/kubernetes" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/transport" ) var ( - logLevel string - masterURL string - kubeconfig string - namespace string - location *time.Location - clientQPS float64 - clientBurst int + logLevel string + masterURL string + kubeconfig string + namespace string + location *time.Location + clientQPS float64 + clientBurst int + mlPipelineAPIServerName string + mlPipelineServiceGRPCPort string +) + +const ( + // These flags match the persistence agent + mlPipelineAPIServerBasePathFlagName = "mlPipelineAPIServerBasePath" + mlPipelineAPIServerNameFlagName = "mlPipelineAPIServerName" + mlPipelineAPIServerGRPCPortFlagName = 
"mlPipelineServiceGRPCPort" ) func main() { @@ -85,14 +98,37 @@ func main() { scheduleInformerFactory = swfinformers.NewFilteredSharedInformerFactory(scheduleClient, time.Second*30, namespace, nil) } + grpcAddress := fmt.Sprintf("%s:%s", mlPipelineAPIServerName, mlPipelineServiceGRPCPort) + + log.Infof("Connecting the API server over GRPC at: %s", grpcAddress) + apiConnection, err := commonutil.GetRpcConnectionWithTimeout(grpcAddress, time.Now().Add(time.Minute)) + if err != nil { + log.Fatalf("Error connecting to the API server after trying for one minute: %v", err) + } + + var tokenSrc transport.ResettableTokenSource + + if cfg.BearerTokenFile != "" { + tokenSrc = transport.NewCachedFileTokenSource(cfg.BearerTokenFile) + } else if cfg.BearerToken != "" { + tokenSrc = transport.NewCachedTokenSource(oauth2.StaticTokenSource(&oauth2.Token{AccessToken: cfg.BearerToken})) + } + + runClient := api.NewRunServiceClient(apiConnection) + + log.Info("Successfully connected to the API server") + controller, err := NewController( kubeClient, scheduleClient, execClient, + runClient, scheduleInformerFactory, execInformer, commonutil.NewRealTime(), - location) + location, + tokenSrc, + ) if err != nil { log.Fatalf("Failed to instantiate the controller: %v", err) } @@ -123,6 +159,8 @@ func init() { // Use default value of client QPS (5) & burst (10) defined in // k8s.io/client-go/rest/config.go#RESTClientFor flag.Float64Var(&clientQPS, "clientQPS", 5, "The maximum QPS to the master from this client.") + flag.StringVar(&mlPipelineAPIServerName, mlPipelineAPIServerNameFlagName, "ml-pipeline", "Name of the ML pipeline API server.") + flag.StringVar(&mlPipelineServiceGRPCPort, mlPipelineAPIServerGRPCPortFlagName, "8887", "GRPC Port of the ML pipeline API server.") flag.IntVar(&clientBurst, "clientBurst", 10, "Maximum burst for throttle from this client.") var err error location, err = util.GetLocation() diff --git 
a/backend/src/crd/controller/scheduledworkflow/util/scheduled_workflow.go b/backend/src/crd/controller/scheduledworkflow/util/scheduled_workflow.go index 3e1aac51211..ea4bfe2b8f1 100644 --- a/backend/src/crd/controller/scheduledworkflow/util/scheduled_workflow.go +++ b/backend/src/crd/controller/scheduledworkflow/util/scheduled_workflow.go @@ -17,12 +17,13 @@ package util import ( "fmt" "hash/fnv" - corev1 "k8s.io/api/core/v1" "math" "sort" "strconv" "time" + corev1 "k8s.io/api/core/v1" + commonutil "github.com/kubeflow/pipelines/backend/src/common/util" swfapi "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" diff --git a/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1/types.go b/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1/types.go index 66cc7692707..3de0d6dae15 100644 --- a/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1/types.go +++ b/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1/types.go @@ -108,10 +108,14 @@ type WorkflowResource struct { Parameters []Parameter `json:"parameters,omitempty"` + PipelineRoot string `json:"pipelineRoot,omitempty"` + // Specification of the workflow to start. // Use interface{} for backward compatibility // TODO: change it to string and avoid type casting // after several releases + // This is deprecated. In a future release, this will be ignored and this will be compiled by the API server + // at runtime. 
Spec interface{} `json:"spec,omitempty"` } diff --git a/backend/test/v2/integration/recurring_run_api_test.go b/backend/test/v2/integration/recurring_run_api_test.go index cfb0a5a245d..0cf92365515 100644 --- a/backend/test/v2/integration/recurring_run_api_test.go +++ b/backend/test/v2/integration/recurring_run_api_test.go @@ -373,6 +373,73 @@ func (s *RecurringRunApiTestSuite) TestRecurringRunApis() { } } +func (s *RecurringRunApiTestSuite) TestRecurringRunApisUseLatest() { + t := s.T() + + /* ---------- Upload pipelines YAML ---------- */ + helloWorldPipeline, err := s.pipelineUploadClient.UploadFile("../resources/hello-world.yaml", upload_params.NewUploadPipelineParams()) + assert.Nil(t, err) + + /* ---------- Upload pipeline version YAML ---------- */ + time.Sleep(1 * time.Second) + helloWorldPipelineVersion, err := s.pipelineUploadClient.UploadPipelineVersion( + "../resources/hello-world.yaml", &upload_params.UploadPipelineVersionParams{ + Name: util.StringPointer("hello-world-version"), + Pipelineid: util.StringPointer(helloWorldPipeline.PipelineID), + }) + assert.Nil(t, err) + + /* ---------- Create a new hello world experiment ---------- */ + experiment := test.MakeExperiment("hello world experiment", "", s.resourceNamespace) + helloWorldExperiment, err := s.experimentClient.Create(&experiment_params.ExperimentServiceCreateExperimentParams{Body: experiment}) + assert.Nil(t, err) + + /* ---------- Create a new hello world recurringRun by specifying pipeline ID without a version ---------- */ + createRecurringRunRequest := &recurring_run_params.RecurringRunServiceCreateRecurringRunParams{Body: &recurring_run_model.V2beta1RecurringRun{ + DisplayName: "hello world with latest pipeline version", + Description: "this is hello world", + ExperimentID: helloWorldExperiment.ExperimentID, + PipelineVersionReference: &recurring_run_model.V2beta1PipelineVersionReference{ + PipelineID: helloWorldPipelineVersion.PipelineID, + }, + MaxConcurrency: 10, + Mode: 
recurring_run_model.RecurringRunModeENABLE, + }} + helloWorldRecurringRun, err := s.recurringRunClient.Create(createRecurringRunRequest) + assert.Nil(t, err) + + // The scheduledWorkflow CRD would create the run and it synced to the DB by persistent agent. + // This could take a few seconds to finish. + + /* ---------- Check run for hello world recurringRun ---------- */ + var helloWorldRun *run_model.V2beta1Run + + if err := retrier.New(retrier.ConstantBackoff(8, 5*time.Second), nil).Run(func() error { + runs, totalSize, _, err := s.runClient.List(&run_params.RunServiceListRunsParams{ + ExperimentID: util.StringPointer(helloWorldExperiment.ExperimentID), + }) + if err != nil { + return err + } + if len(runs) != 1 { + return fmt.Errorf("expected runs to be length 1, got: %v", len(runs)) + } + if totalSize != 1 { + return fmt.Errorf("expected total size 1, got: %v", totalSize) + } + helloWorldRun = runs[0] + return s.checkHelloWorldRun(helloWorldRun, helloWorldExperiment.ExperimentID, helloWorldRecurringRun.RecurringRunID) + }); err != nil { + assert.Nil(t, err) + assert.FailNow(t, "Timed out waiting for the recurring run") + } + + // Verify the latest pipeline version was selected + assert.Equal( + t, helloWorldPipelineVersion.PipelineVersionID, helloWorldRun.PipelineVersionReference.PipelineVersionID, + ) +} + func (s *RecurringRunApiTestSuite) TestRecurringRunApis_noCatchupOption() { t := s.T() diff --git a/manifests/kustomize/base/installs/multi-user/scheduled-workflow/cluster-role.yaml b/manifests/kustomize/base/installs/multi-user/scheduled-workflow/cluster-role.yaml index fd868eaad07..45a10bb45ec 100644 --- a/manifests/kustomize/base/installs/multi-user/scheduled-workflow/cluster-role.yaml +++ b/manifests/kustomize/base/installs/multi-user/scheduled-workflow/cluster-role.yaml @@ -15,6 +15,12 @@ rules: - update - patch - delete +- apiGroups: + - pipelines.kubeflow.org + resources: + - runs + verbs: + - create - apiGroups: - kubeflow.org resources: diff --git 
a/manifests/kustomize/base/pipeline/ml-pipeline-scheduledworkflow-deployment.yaml b/manifests/kustomize/base/pipeline/ml-pipeline-scheduledworkflow-deployment.yaml index 045a0882302..6bec64a139a 100644 --- a/manifests/kustomize/base/pipeline/ml-pipeline-scheduledworkflow-deployment.yaml +++ b/manifests/kustomize/base/pipeline/ml-pipeline-scheduledworkflow-deployment.yaml @@ -41,4 +41,15 @@ spec: capabilities: drop: - ALL + volumeMounts: + - mountPath: /var/run/secrets/kubeflow/tokens + name: scheduledworkflow-sa-token serviceAccountName: ml-pipeline-scheduledworkflow + volumes: + - name: scheduledworkflow-sa-token + projected: + sources: + - serviceAccountToken: + path: scheduledworkflow-sa-token + expirationSeconds: 3600 + audience: pipelines.kubeflow.org diff --git a/manifests/kustomize/base/pipeline/ml-pipeline-scheduledworkflow-role.yaml b/manifests/kustomize/base/pipeline/ml-pipeline-scheduledworkflow-role.yaml index 36729d74ed3..c6f4918f1d8 100644 --- a/manifests/kustomize/base/pipeline/ml-pipeline-scheduledworkflow-role.yaml +++ b/manifests/kustomize/base/pipeline/ml-pipeline-scheduledworkflow-role.yaml @@ -30,6 +30,12 @@ rules: - update - patch - delete +- apiGroups: + - pipelines.kubeflow.org + resources: + - runs + verbs: + - create - apiGroups: - '' resources: