From e77a36e0fce25a63bfcaa8543d7adab83f5b31ef Mon Sep 17 00:00:00 2001 From: justinsb Date: Fri, 6 Sep 2024 14:57:34 -0400 Subject: [PATCH 1/3] feat: allow users to opt-in to direct reconciliation Add support for a new annotation, 'alpha.cnrm.cloud.google.com/reconciler' If set to `direct`, the direct reconciler will be used. --- pkg/controller/dcl/controller.go | 12 ++- pkg/controller/direct/registry/registry.go | 9 +- .../direct/sql/sqlinstance_controller.go | 9 +- pkg/controller/predicate/optin.go | 33 +++++++ .../registration/registration_controller.go | 86 +++++++++++-------- pkg/controller/tf/controller.go | 6 +- 6 files changed, 106 insertions(+), 49 deletions(-) create mode 100644 pkg/controller/predicate/optin.go diff --git a/pkg/controller/dcl/controller.go b/pkg/controller/dcl/controller.go index cd3447704c..682e23bdab 100644 --- a/pkg/controller/dcl/controller.go +++ b/pkg/controller/dcl/controller.go @@ -27,7 +27,7 @@ import ( "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/jitter" "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/lifecyclehandler" "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/metrics" - "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/predicate" + kccpredicate "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/predicate" "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/ratelimiter" "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/resourceactuation" "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/resourcewatcher" @@ -64,6 +64,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" ) @@ -98,10 +99,15 @@ type Reconciler struct { } func Add(mgr manager.Manager, crd *apiextensions.CustomResourceDefinition, converter *conversion.Converter, - dclConfig *mmdcl.Config, serviceMappingLoader *servicemappingloader.ServiceMappingLoader, defaulters []k8s.Defaulter, jitterGenerator jitter.Generator) (k8s.SchemaReferenceUpdater, error) { + dclConfig *mmdcl.Config, serviceMappingLoader *servicemappingloader.ServiceMappingLoader, defaulters []k8s.Defaulter, jitterGenerator jitter.Generator, + additionalPredicate predicate.Predicate) (k8s.SchemaReferenceUpdater, error) { if jitterGenerator == nil { return nil, fmt.Errorf("jitter generator not initialized") } + predicates := []predicate.Predicate{kccpredicate.UnderlyingResourceOutOfSyncPredicate{}} + if additionalPredicate != nil { + predicates = append(predicates, additionalPredicate) + } kind := crd.Spec.Names.Kind apiVersion := k8s.GetAPIVersionFromCRD(crd) controllerName := fmt.Sprintf("%v-controller", strings.ToLower(kind)) @@ -122,7 +128,7 @@ func Add(mgr manager.Manager, crd *apiextensions.CustomResourceDefinition, conve Named(controllerName). WithOptions(controller.Options{MaxConcurrentReconciles: k8s.ControllerMaxConcurrentReconciles, RateLimiter: ratelimiter.NewRateLimiter()}). WatchesRawSource(&source.Channel{Source: immediateReconcileRequests}, &handler.EnqueueRequestForObject{}). - For(obj, builder.OnlyMetadata, builder.WithPredicates(predicate.UnderlyingResourceOutOfSyncPredicate{})). + For(obj, builder.OnlyMetadata, builder.WithPredicates(predicates...)). Build(r) if err != nil { return nil, fmt.Errorf("error creating new controller: %w", err) diff --git a/pkg/controller/direct/registry/registry.go b/pkg/controller/direct/registry/registry.go index c4437b0867..cb32532cd0 100644 --- a/pkg/controller/direct/registry/registry.go +++ b/pkg/controller/direct/registry/registry.go @@ -92,13 +92,8 @@ func Init(ctx context.Context, config *config.ControllerConfig) error { } func RegisterModel(gvk schema.GroupVersionKind, modelFn ModelFactoryFunc) { - if singleton.registrations == nil { - singleton.registrations = make(map[schema.GroupKind]*registration) - } - singleton.registrations[gvk.GroupKind()] = ®istration{ - gvk: gvk, - factory: modelFn, - } + rg := &predicate.OptInToDirectReconciliation{} + RegisterModelWithReconcileGate(gvk, modelFn, rg) } func RegisterModelWithReconcileGate(gvk schema.GroupVersionKind, modelFn ModelFactoryFunc, rg predicate.ReconcileGate) { diff --git a/pkg/controller/direct/sql/sqlinstance_controller.go b/pkg/controller/direct/sql/sqlinstance_controller.go index 8333582e97..673fa55f3d 100644 --- a/pkg/controller/direct/sql/sqlinstance_controller.go +++ b/pkg/controller/direct/sql/sqlinstance_controller.go @@ -43,11 +43,16 @@ func init() { registry.RegisterModelWithReconcileGate(krm.SQLInstanceGVK, newSQLInstanceModel, rg) } -type SQLInstanceReconcileGate struct{} +type SQLInstanceReconcileGate struct { + optIn kccpredicate.OptInToDirectReconciliation +} var _ kccpredicate.ReconcileGate = &SQLInstanceReconcileGate{} -func (*SQLInstanceReconcileGate) ShouldReconcile(o *unstructured.Unstructured) bool { +func (r *SQLInstanceReconcileGate) ShouldReconcile(o *unstructured.Unstructured) bool { + if r.optIn.ShouldReconcile(o) { + return true + } obj := &krm.SQLInstance{} if err := runtime.DefaultUnstructuredConverter.FromUnstructured(o.Object, &obj); err != nil { return false diff --git a/pkg/controller/predicate/optin.go b/pkg/controller/predicate/optin.go new file mode 100644 index 0000000000..984b1a1bf3 --- /dev/null +++ b/pkg/controller/predicate/optin.go @@ -0,0 +1,33 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package predicate + +import "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + +// AnnotationKeyAlphaReconciler allows customers to opt-in to using the direct reconciler. +const AnnotationKeyAlphaReconciler = "alpha.cnrm.cloud.google.com/reconciler" + +// OptInToDirectReconciliation allows users to opt in to direct reconciliation +// by specifying an AnnotationKeyAlphaReconciler annotation. +type OptInToDirectReconciliation struct { +} + +var _ ReconcileGate = &OptInToDirectReconciliation{} + +// ShouldReconcile returns true if the reconciler should be used to for the resource. +func (r *OptInToDirectReconciliation) ShouldReconcile(o *unstructured.Unstructured) bool { + v := o.GetAnnotations()[AnnotationKeyAlphaReconciler] + return v == "direct" +} diff --git a/pkg/controller/registration/registration_controller.go b/pkg/controller/registration/registration_controller.go index 496f34100e..9433f40380 100644 --- a/pkg/controller/registration/registration_controller.go +++ b/pkg/controller/registration/registration_controller.go @@ -52,6 +52,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/handler" crlog "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" ) @@ -238,55 +239,72 @@ func registerDefaultController(r *ReconcileRegistration, config *config.Controll } } - // register controllers for dcl-based CRDs - if val, ok := crd.Labels[k8s.DCL2CRDLabel]; ok && val == "true" { - su, err := dclcontroller.Add(r.mgr, crd, r.dclConverter, r.dclConfig, r.smLoader, r.defaulters, r.jitterGenerator) - if err != nil { - return nil, fmt.Errorf("error adding dcl controller for %v to a manager: %w", crd.Spec.Names.Kind, err) + hasDirectController := registry.IsDirectByGK(gvk.GroupKind()) + hasTerraformController := crd.Labels[crdgeneration.TF2CRDLabel] == "true" + hasDCLController := crd.Labels[k8s.DCL2CRDLabel] == "true" + + var useDirectReconcilerPredicate predicate.Predicate + var useLegacyPredicate predicate.Predicate + + // If we have a choice of controllers, construct predicates to choose between them + if hasDirectController && (hasTerraformController || hasDCLController) { + reconcileGate := registry.GetReconcileGate(gvk.GroupKind()) + if reconcileGate != nil { + // If reconcile gate is enabled for this gvk, generate a controller-runtime predicate that will + // run the direct reconciler only when the reconcile gate returns true. + useDirectReconcilerPredicate = kccpredicate.NewReconcilePredicate(r.mgr.GetClient(), gvk, reconcileGate) + useLegacyPredicate = kccpredicate.NewInverseReconcilePredicate(r.mgr.GetClient(), gvk, reconcileGate) } - return su, nil - } - // register controllers for tf-based CRDs - if val, ok := crd.Labels[crdgeneration.TF2CRDLabel]; ok && val == "true" { - su, err := tf.Add(r.mgr, crd, r.provider, r.smLoader, r.defaulters, r.jitterGenerator, nil) - if err != nil { - return nil, fmt.Errorf("error adding terraform controller for %v to a manager: %w", crd.Spec.Names.Kind, err) + + if !hasTerraformController && !hasDCLController { + // We're always going to use the direct reconciler + useDirectReconcilerPredicate = nil + useLegacyPredicate = nil + } + + if (hasTerraformController || hasDCLController) && useDirectReconcilerPredicate == nil { + logger.Error(fmt.Errorf("no predicate where we have multiple controllers"), "skipping direct controller registration", "group", gvk.Group, "version", gvk.Version, "kind", gvk.Kind) + hasDirectController = false } - return su, nil } + // register controllers for direct CRDs - if registry.IsDirectByGK(gvk.GroupKind()) { + if hasDirectController { model, err := registry.GetModel(gvk.GroupKind()) if err != nil { return nil, err } deps := directbase.Deps{ - JitterGenerator: r.jitterGenerator, - } - rg := registry.GetReconcileGate(gvk.GroupKind()) - if rg != nil { - // If reconcile gate is enabled for this gvk, generate a controller-runtime predicate that will - // run the direct reconciler only when the reconcile gate returns true. - rp := kccpredicate.NewReconcilePredicate(r.mgr.GetClient(), gvk, rg) - deps.ReconcilePredicate = rp + JitterGenerator: r.jitterGenerator, + ReconcilePredicate: useDirectReconcilerPredicate, } if err := directbase.AddController(r.mgr, gvk, model, deps); err != nil { return nil, fmt.Errorf("error adding direct controller for %v to a manager: %w", crd.Spec.Names.Kind, err) } - if rg != nil { - // If reconcile gate is enabled for this gvk, generate a controller-runtime predicate that will - // run the terraform-based reconciler when the reconcile gate returns false. - irp := kccpredicate.NewInverseReconcilePredicate(r.mgr.GetClient(), gvk, rg) - su, err := tf.Add(r.mgr, crd, r.provider, r.smLoader, r.defaulters, r.jitterGenerator, irp) - if err != nil { - return nil, fmt.Errorf("error adding terraform controller for %v to a manager: %w", crd.Spec.Names.Kind, err) - } - return su, nil + } + + // register controllers for dcl-based CRDs + if hasDCLController { + su, err := dclcontroller.Add(r.mgr, crd, r.dclConverter, r.dclConfig, r.smLoader, r.defaulters, r.jitterGenerator, useLegacyPredicate) + if err != nil { + return nil, fmt.Errorf("error adding dcl controller for %v to a manager: %w", crd.Spec.Names.Kind, err) } - return schemaUpdater, nil + return su, nil } - logger.Error(fmt.Errorf("unrecognized CRD: %v", crd.Spec.Names.Kind), "skipping controller registration", "group", gvk.Group, "version", gvk.Version, "kind", gvk.Kind) - return nil, nil + // register controllers for tf-based CRDs + if hasTerraformController { + su, err := tf.Add(r.mgr, crd, r.provider, r.smLoader, r.defaulters, r.jitterGenerator, useLegacyPredicate) + if err != nil { + return nil, fmt.Errorf("error adding terraform controller for %v to a manager: %w", crd.Spec.Names.Kind, err) + } + return su, nil + } + + if !hasDCLController && !hasTerraformController && !hasDirectController { + logger.Error(fmt.Errorf("unrecognized CRD: %v", crd.Spec.Names.Kind), "skipping controller registration", "group", gvk.Group, "version", gvk.Version, "kind", gvk.Kind) + return nil, nil + } + } return schemaUpdater, nil } diff --git a/pkg/controller/tf/controller.go b/pkg/controller/tf/controller.go index 6e0f36e207..e6985f70e6 100644 --- a/pkg/controller/tf/controller.go +++ b/pkg/controller/tf/controller.go @@ -80,7 +80,7 @@ type Reconciler struct { resourceWatcherRoutines *semaphore.Weighted // Used to cap number of goroutines watching unready dependencies } -func Add(mgr manager.Manager, crd *apiextensions.CustomResourceDefinition, provider *tfschema.Provider, smLoader *servicemappingloader.ServiceMappingLoader, defaulters []k8s.Defaulter, jitterGenerator jitter.Generator, irp predicate.Predicate) (k8s.SchemaReferenceUpdater, error) { +func Add(mgr manager.Manager, crd *apiextensions.CustomResourceDefinition, provider *tfschema.Provider, smLoader *servicemappingloader.ServiceMappingLoader, defaulters []k8s.Defaulter, jitterGenerator jitter.Generator, additionalPredicate predicate.Predicate) (k8s.SchemaReferenceUpdater, error) { kind := crd.Spec.Names.Kind apiVersion := k8s.GetAPIVersionFromCRD(crd) controllerName := fmt.Sprintf("%v-controller", strings.ToLower(kind)) @@ -97,8 +97,8 @@ func Add(mgr manager.Manager, crd *apiextensions.CustomResourceDefinition, provi }, } predicateList := []predicate.Predicate{kccpredicate.UnderlyingResourceOutOfSyncPredicate{}} - if irp != nil { - predicateList = append(predicateList, irp) + if additionalPredicate != nil { + predicateList = append(predicateList, additionalPredicate) } _, err = builder. ControllerManagedBy(mgr). From 9b0baba2adfcbfa07150a2a8bf1145c495363aa1 Mon Sep 17 00:00:00 2001 From: justinsb Date: Fri, 6 Sep 2024 19:22:58 -0400 Subject: [PATCH 2/3] tests: add opt-in version of test for DataflowFlexTemplateJob We just create a parallel version of an existing test. --- ...dataflowflextemplatejob-direct.golden.yaml | 34 + .../_http.log | 1041 +++++++++++++++++ .../create.yaml | 33 + .../dependencies.yaml | 21 + tests/e2e/httplog.go | 6 + 5 files changed, 1135 insertions(+) create mode 100644 pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/batchdataflowflextemplatejob-direct/_generated_object_batchdataflowflextemplatejob-direct.golden.yaml create mode 100644 pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/batchdataflowflextemplatejob-direct/_http.log create mode 100644 pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/batchdataflowflextemplatejob-direct/create.yaml create mode 100644 pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/batchdataflowflextemplatejob-direct/dependencies.yaml diff --git a/pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/batchdataflowflextemplatejob-direct/_generated_object_batchdataflowflextemplatejob-direct.golden.yaml b/pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/batchdataflowflextemplatejob-direct/_generated_object_batchdataflowflextemplatejob-direct.golden.yaml new file mode 100644 index 0000000000..4a0768448c --- /dev/null +++ b/pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/batchdataflowflextemplatejob-direct/_generated_object_batchdataflowflextemplatejob-direct.golden.yaml @@ -0,0 +1,34 @@ +apiVersion: dataflow.cnrm.cloud.google.com/v1beta1 +kind: DataflowFlexTemplateJob +metadata: + annotations: + alpha.cnrm.cloud.google.com/reconciler: direct + cnrm.cloud.google.com/management-conflict-prevention-policy: none + cnrm.cloud.google.com/on-delete: cancel + cnrm.cloud.google.com/project-id: ${projectId} + finalizers: + - cnrm.cloud.google.com/finalizer + - cnrm.cloud.google.com/deletion-defender + generation: 1 + labels: + cnrm-test: "true" + name: dataflowflextemplatejob-${uniqueId} + namespace: ${uniqueId} +spec: + containerSpecGcsPath: gs://dataflow-templates/2022-10-03-00_RC00/flex/File_Format_Conversion + parameters: + inputFileFormat: csv + inputFileSpec: gs://config-connector-samples/dataflowflextemplate/numbertest.csv + outputBucket: gs://storagebucket-${uniqueId} + outputFileFormat: avro + schema: gs://config-connector-samples/dataflowflextemplate/numbers.avsc + region: us-central1 +status: + conditions: + - lastTransitionTime: "1970-01-01T00:00:00Z" + message: The resource is up to date + reason: UpToDate + status: "True" + type: Ready + jobId: ${jobID} + observedGeneration: 1 diff --git a/pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/batchdataflowflextemplatejob-direct/_http.log b/pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/batchdataflowflextemplatejob-direct/_http.log new file mode 100644 index 0000000000..337de31165 --- /dev/null +++ b/pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/batchdataflowflextemplatejob-direct/_http.log @@ -0,0 +1,1041 @@ +GET https://storage.googleapis.com/storage/v1/b/storagebucket-${uniqueId}?alt=json&prettyPrint=false +User-Agent: google-api-go-client/0.5 Terraform/ (+https://www.terraform.io) Terraform-Plugin-SDK/2.10.1 terraform-provider-google-beta/kcc/controller-manager + +404 Not Found +Cache-Control: no-cache, no-store, max-age=0, must-revalidate +Content-Type: application/json; charset=UTF-8 +Expires: Mon, 01 Jan 1990 00:00:00 GMT +Pragma: no-cache +Server: UploadServer +Vary: Origin +Vary: X-Origin + +{ + "error": { + "code": 404, + "errors": [ + { + "domain": "global", + "message": "The specified bucket does not exist.", + "reason": "notFound" + } + ], + "message": "The specified bucket does not exist." + } +} + +--- + +POST https://storage.googleapis.com/storage/v1/b?alt=json&prettyPrint=false&project=${projectId} +Content-Type: application/json +User-Agent: google-api-go-client/0.5 Terraform/ (+https://www.terraform.io) Terraform-Plugin-SDK/2.10.1 terraform-provider-google-beta/kcc/controller-manager + +{ + "iamConfiguration": { + "uniformBucketLevelAccess": { + "enabled": false + } + }, + "labels": { + "cnrm-test": "true", + "managed-by-cnrm": "true" + }, + "lifecycle": { + "rule": [] + }, + "name": "storagebucket-${uniqueId}", + "storageClass": "STANDARD" +} + +200 OK +Cache-Control: no-cache, no-store, max-age=0, must-revalidate +Content-Type: application/json; charset=UTF-8 +Expires: Mon, 01 Jan 1990 00:00:00 GMT +Pragma: no-cache +Server: UploadServer +Vary: Origin +Vary: X-Origin + +{ + "etag": "abcdef0123A=", + "iamConfiguration": { + "bucketPolicyOnly": { + "enabled": false + }, + "publicAccessPrevention": "inherited", + "uniformBucketLevelAccess": { + "enabled": false + } + }, + "id": "000000000000000000000", + "kind": "storage#bucket", + "labels": { + "cnrm-test": "true", + "managed-by-cnrm": "true" + }, + "location": "US", + "locationType": "multi-region", + "metageneration": "1", + "name": "storagebucket-${uniqueId}", + "projectNumber": "${projectNumber}", + "rpo": "DEFAULT", + "selfLink": "https://www.googleapis.com/storage/v1/b/storagebucket-${uniqueId}", + "softDeletePolicy": { + "effectiveTime": "2024-04-01T12:34:56.123456Z", + "retentionDurationSeconds": "604800" + }, + "storageClass": "STANDARD", + "timeCreated": "2024-04-01T12:34:56.123456Z", + "updated": "2024-04-01T12:34:56.123456Z" +} + +--- + +GET https://storage.googleapis.com/storage/v1/b/storagebucket-${uniqueId}?alt=json&prettyPrint=false +User-Agent: google-api-go-client/0.5 Terraform/ (+https://www.terraform.io) Terraform-Plugin-SDK/2.10.1 terraform-provider-google-beta/kcc/controller-manager + +200 OK +Cache-Control: private, max-age=0, must-revalidate, no-transform +Content-Type: application/json; charset=UTF-8 +Expires: {now+0m} +Server: UploadServer +Vary: Origin +Vary: X-Origin + +{ + "etag": "abcdef0123A=", + "iamConfiguration": { + "bucketPolicyOnly": { + "enabled": false + }, + "publicAccessPrevention": "inherited", + "uniformBucketLevelAccess": { + "enabled": false + } + }, + "id": "000000000000000000000", + "kind": "storage#bucket", + "labels": { + "cnrm-test": "true", + "managed-by-cnrm": "true" + }, + "location": "US", + "locationType": "multi-region", + "metageneration": "1", + "name": "storagebucket-${uniqueId}", + "projectNumber": "${projectNumber}", + "rpo": "DEFAULT", + "selfLink": "https://www.googleapis.com/storage/v1/b/storagebucket-${uniqueId}", + "softDeletePolicy": { + "effectiveTime": "2024-04-01T12:34:56.123456Z", + "retentionDurationSeconds": "604800" + }, + "storageClass": "STANDARD", + "timeCreated": "2024-04-01T12:34:56.123456Z", + "updated": "2024-04-01T12:34:56.123456Z" +} + +--- + +POST https://dataflow.googleapis.com/v1b3/projects/${projectId}/locations/us-central1/flexTemplates:launch?%24alt=json%3Benum-encoding%3Dint +Content-Type: application/json +User-Agent: kcc/controller-manager +x-goog-request-params: project_id=${projectId}&location=us-central1 + +{ + "launchParameter": { + "containerSpecGcsPath": "gs://dataflow-templates/2022-10-03-00_RC00/flex/File_Format_Conversion", + "environment": {}, + "jobName": "dataflowflextemplatejob-${uniqueId}", + "parameters": { + "inputFileFormat": "csv", + "inputFileSpec": "gs://config-connector-samples/dataflowflextemplate/numbertest.csv", + "outputBucket": "gs://storagebucket-${uniqueId}", + "outputFileFormat": "avro", + "schema": "gs://config-connector-samples/dataflowflextemplate/numbers.avsc" + } + }, + "location": "us-central1", + "projectId": "${projectId}" +} + +200 OK +Cache-Control: private +Content-Type: application/json; charset=UTF-8 +Server: ESF +Vary: Origin +Vary: X-Origin +Vary: Referer +X-Content-Type-Options: nosniff +X-Frame-Options: SAMEORIGIN +X-Xss-Protection: 0 + +{ + "job": { + "createTime": "2024-04-01T12:34:56.123456Z", + "currentStateTime": "1970-01-01T00:00:00Z", + "id": "${jobID}", + "location": "us-central1", + "name": "dataflowflextemplatejob-${uniqueId}", + "projectId": "${projectId}", + "startTime": "2024-04-01T12:34:56.123456Z" + } +} + +--- + +GET https://dataflow.googleapis.com/v1b3/projects/${projectId}/locations/us-central1/jobs/${jobID}?%24alt=json%3Benum-encoding%3Dint&view=JOB_VIEW_SUMMARY +Content-Type: application/json +User-Agent: kcc/controller-manager +x-goog-request-params: project_id=${projectId}&location=us-central1&job_id=${jobID} + +200 OK +Cache-Control: private +Content-Type: application/json; charset=UTF-8 +Server: ESF +Vary: Origin +Vary: X-Origin +Vary: Referer +X-Content-Type-Options: nosniff +X-Frame-Options: SAMEORIGIN +X-Xss-Protection: 0 + +{ + "createTime": "2024-04-01T12:34:56.123456Z", + "currentState": 11, + "currentStateTime": "2024-04-01T12:34:56.123456Z", + "environment": { + "dataset": "bigquery.googleapis.com/cloud_dataflow" + }, + "id": "000000000000000000000", + "labels": { + "goog-dataflow-provided-template-name": "file_format_conversion", + "goog-dataflow-provided-template-type": "flex" + }, + "location": "us-central1", + "name": "dataflowflextemplatejob-${uniqueId}", + "pipelineDescription": { + "removed": "simplicity" + }, + "projectId": "${projectId}", + "startTime": "2024-04-01T12:34:56.123456Z" +} + +--- + +GET https://dataflow.googleapis.com/v1b3/projects/${projectId}/locations/us-central1/jobs/${jobID}?%24alt=json%3Benum-encoding%3Dint&view=JOB_VIEW_SUMMARY +Content-Type: application/json +User-Agent: kcc/controller-manager +x-goog-request-params: project_id=${projectId}&location=us-central1&job_id=${jobID} + +200 OK +Cache-Control: private +Content-Type: application/json; charset=UTF-8 +Server: ESF +Vary: Origin +Vary: X-Origin +Vary: Referer +X-Content-Type-Options: nosniff +X-Frame-Options: SAMEORIGIN +X-Xss-Protection: 0 + +{ + "createTime": "2024-04-01T12:34:56.123456Z", + "currentState": 9, + "currentStateTime": "2024-04-01T12:34:56.123456Z", + "environment": { + "dataset": "bigquery.googleapis.com/cloud_dataflow", + "experiments": [ + "auto_google_template_runner_v2", + "auto_high_core_runner_v2", + "auto_runner_v2_min_sdk=2.54.0", + "configure_shuffle_service_addresses_in_control_plane", + "delayed_launch", + "disable_baggins_exp", + "disable_primeflex", + "disable_runner_v2_reason=java_job_google_template", + "ek_regions=", + "enable_always_on_exception_sampling", + "enable_async_job_creation", + "enable_billing_v_1_5", + "enable_cloud_permissions_checking", + "enable_cmek_org_policy_check", + "enable_compute_default_service_account_org_policy", + "enable_data_sampling_telemetry", + "enable_dataprep_new_billing", + "enable_fnapi_multimap_side_input_bulk_read", + "enable_memory_sampler", + "enable_oom_sampler", + "enable_recommendations", + "enable_remote_image_ping", + "enable_secure_boot", + "enable_throttled_based_rescaling", + "enable_worker_cloud_monitoring_exporter", + "enable_worker_disk_cloud_monitoring", + "enable_worker_memory_cloud_monitoring", + "enable_zonal_outage_aware_routing", + "limit_preemptible_worker_pct", + "limit_resizing_by_cpu_util", + "min_sdk_version_to_reject_worker_in_different_region_than_service=2.44.0", + "override_controller_service_account", + "primeflex_slow_start_pct=5", + "primeflex_slow_start_seconds=3600", + "regional_physical_zone_separation_enabled", + "shuffle_mode=auto", + "shuffle_service_address_type=DIRECTPATH_WITH_CFE_FALLBACK", + "sideinput_io_metrics", + "use_dataflow_service_account_in_igm", + "use_e2_for_default_machine_type_worker_regions=africa-south1,europe-north2,europe-southwest1,europe-west10,europe-west12,europe-west8,europe-west9,me-central1,me-central2,me-west1,northamerica-south1,southamerica-west1,us-east10,us-east5,us-east7,us-south1,us-west8", + "use_job_admission_controller", + "use_multi_hop_delegation", + "use_templates_regional_bucket", + "use_worker_zone_chooser_by_default" + ], + "sdkPipelineOptions": { + "display_data": [], + "options": { + "apiRootUrl": "https://dataflow.googleapis.com/", + "appName": "FileFormatConversion", + "autoscalingAlgorithm": null, + "containsHeaders": false, + "credentialFactoryClass": "org.apache.beam.sdk.extensions.gcp.auth.GcpCredentialFactory", + "csvFileEncoding": "UTF-8", + "csvFormat": "Default", + "dataflowEndpoint": "", + "dataflowKmsKey": null, + "dataflowServiceOptions": null, + "dataflowWorkerJar": null, + "defaultEnvironmentConfig": null, + "defaultEnvironmentType": null, + "delimiter": ",", + "diskSizeGb": 0, + "enableCloudDebugger": false, + "enableStreamingEngine": false, + "environmentOptions": null, + "experiments": [ + "disable_runner_v2_reason=java_job_google_template", + "enable_always_on_exception_sampling" + ], + "filesToStage": [ + "/template/file-format-conversion/file-format-conversion.jar" + ], + "gcpTempLocation": "gs://dataflow-staging-us-central1-${projectNumber}/tmp", + "gcsPerformanceMetrics": false, + "gcsUploadBufferSizeBytes": null, + "googleApiTrace": null, + "hotKeyLoggingEnabled": false, + "inputFileFormat": "csv", + "inputFileSpec": "gs://config-connector-samples/dataflowflextemplate/numbertest.csv", + "jobName": "dataflowflextemplatejob-${uniqueId}", + "labels": { + "goog-dataflow-provided-template-name": "file_format_conversion", + "goog-dataflow-provided-template-type": "flex" + }, + "logDetailedCsvConversionErrors": false, + "maxNumWorkers": 0, + "network": null, + "numShards": 0, + "numWorkers": 0, + "numberOfWorkerHarnessThreads": 0, + "optionsId": 0, + "outputBucket": "gs://storagebucket-${uniqueId}", + "outputFileFormat": "avro", + "outputFilePrefix": "output", + "overrideWindmillBinary": null, + "pathValidatorClass": "org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator", + "pipelineUrl": "${pipelineUrl}", + "project": "${projectId}", + "recordJfrOnGcThrashing": false, + "region": "us-central1", + "resourceHints": [], + "runner": "org.apache.beam.runners.dataflow.DataflowRunner", + "saveProfilesToGcs": null, + "schema": "gs://config-connector-samples/dataflowflextemplate/numbers.avsc", + "sdkContainerImage": null, + "sdkHarnessContainerImageOverrides": null, + "serviceAccount": "${projectNumber}-compute@developer.gserviceaccount.com", + "stableUniqueNames": "WARNING", + "stagerClass": "org.apache.beam.runners.dataflow.util.GcsStager", + "stagingLocation": "gs://dataflow-staging-us-central1-${projectNumber}/staging", + "streaming": false, + "subnetwork": null, + "tempLocation": "gs://dataflow-staging-us-central1-${projectNumber}/tmp", + "templateLocation": "gs://dataflow-staging-us-central1-${projectNumber}/staging/template_launches/${jobID}/job_object", + "userAgent": "Apache_Beam_SDK_for_Java/2.41.0(JRE_11_environment)", + "workerDiskType": null, + "workerHarnessContainerImage": null, + "workerMachineType": null, + "workerRegion": null, + "zone": null + } + }, + "serviceAccountEmail": "${projectNumber}-compute@developer.gserviceaccount.com", + "shuffleMode": 2, + "tempStoragePrefix": "storage.googleapis.com/dataflow-staging-us-central1-${projectNumber}/tmp", + "userAgent": { + "removed": "simplicity" + }, + "version": { + "removed": "simplicity" + }, + "workerPools": [] + }, + "id": "000000000000000000000", + "jobMetadata": { + "removed": "simplicity" + }, + "labels": { + "goog-dataflow-provided-template-name": "file_format_conversion", + "goog-dataflow-provided-template-type": "flex" + }, + "location": "us-central1", + "name": "dataflowflextemplatejob-${uniqueId}", + "pipelineDescription": { + "removed": "simplicity" + }, + "projectId": "${projectId}", + "startTime": "2024-04-01T12:34:56.123456Z", + "steps": [], + "type": 1 +} + +--- + +GET https://dataflow.googleapis.com/v1b3/projects/${projectId}/locations/us-central1/jobs/${jobID}?%24alt=json%3Benum-encoding%3Dint&view=JOB_VIEW_SUMMARY +Content-Type: application/json +User-Agent: kcc/controller-manager +x-goog-request-params: project_id=${projectId}&location=us-central1&job_id=${jobID} + +200 OK +Cache-Control: private +Content-Type: application/json; charset=UTF-8 +Server: ESF +Vary: Origin +Vary: X-Origin +Vary: Referer +X-Content-Type-Options: nosniff +X-Frame-Options: SAMEORIGIN +X-Xss-Protection: 0 + +{ + "createTime": "2024-04-01T12:34:56.123456Z", + "currentState": 2, + "currentStateTime": "2024-04-01T12:34:56.123456Z", + "environment": { + "dataset": "bigquery.googleapis.com/cloud_dataflow", + "experiments": [ + "auto_google_template_runner_v2", + "auto_high_core_runner_v2", + "auto_runner_v2_min_sdk=2.54.0", + "configure_shuffle_service_addresses_in_control_plane", + "delayed_launch", + "disable_baggins_exp", + "disable_primeflex", + "disable_runner_v2_reason=java_job_google_template", + "ek_regions=", + "enable_always_on_exception_sampling", + "enable_async_job_creation", + "enable_billing_v_1_5", + "enable_cloud_permissions_checking", + "enable_cmek_org_policy_check", + "enable_compute_default_service_account_org_policy", + "enable_data_sampling_telemetry", + "enable_dataprep_new_billing", + "enable_fnapi_multimap_side_input_bulk_read", + "enable_memory_sampler", + "enable_oom_sampler", + "enable_recommendations", + "enable_remote_image_ping", + "enable_secure_boot", + "enable_throttled_based_rescaling", + "enable_worker_cloud_monitoring_exporter", + "enable_worker_disk_cloud_monitoring", + "enable_worker_memory_cloud_monitoring", + "enable_zonal_outage_aware_routing", + "limit_preemptible_worker_pct", + "limit_resizing_by_cpu_util", + "min_sdk_version_to_reject_worker_in_different_region_than_service=2.44.0", + "override_controller_service_account", + "primeflex_slow_start_pct=5", + "primeflex_slow_start_seconds=3600", + "regional_physical_zone_separation_enabled", + "shuffle_mode=auto", + "shuffle_service_address_type=DIRECTPATH_WITH_CFE_FALLBACK", + "sideinput_io_metrics", + "use_dataflow_service_account_in_igm", + "use_e2_for_default_machine_type_worker_regions=africa-south1,europe-north2,europe-southwest1,europe-west10,europe-west12,europe-west8,europe-west9,me-central1,me-central2,me-west1,northamerica-south1,southamerica-west1,us-east10,us-east5,us-east7,us-south1,us-west8", + "use_job_admission_controller", + "use_multi_hop_delegation", + "use_templates_regional_bucket", + "use_worker_zone_chooser_by_default" + ], + "sdkPipelineOptions": { + "display_data": [], + "options": { + "apiRootUrl": "https://dataflow.googleapis.com/", + "appName": "FileFormatConversion", + "autoscalingAlgorithm": null, + "containsHeaders": false, + "credentialFactoryClass": "org.apache.beam.sdk.extensions.gcp.auth.GcpCredentialFactory", + "csvFileEncoding": "UTF-8", + "csvFormat": "Default", + "dataflowEndpoint": "", + "dataflowKmsKey": null, + "dataflowServiceOptions": null, + "dataflowWorkerJar": null, + "defaultEnvironmentConfig": null, + "defaultEnvironmentType": null, + "delimiter": ",", + "diskSizeGb": 0, + "enableCloudDebugger": false, + "enableStreamingEngine": false, + "environmentOptions": null, + "experiments": [ + "disable_runner_v2_reason=java_job_google_template", + "enable_always_on_exception_sampling" + ], + "filesToStage": [ + "/template/file-format-conversion/file-format-conversion.jar" + ], + "gcpTempLocation": "gs://dataflow-staging-us-central1-${projectNumber}/tmp", + "gcsPerformanceMetrics": false, + "gcsUploadBufferSizeBytes": null, + "googleApiTrace": null, + "hotKeyLoggingEnabled": false, + "inputFileFormat": "csv", + "inputFileSpec": "gs://config-connector-samples/dataflowflextemplate/numbertest.csv", + "jobName": "dataflowflextemplatejob-${uniqueId}", + "labels": { + "goog-dataflow-provided-template-name": "file_format_conversion", + "goog-dataflow-provided-template-type": "flex" + }, + "logDetailedCsvConversionErrors": false, + "maxNumWorkers": 0, + "network": null, + "numShards": 0, + "numWorkers": 0, + "numberOfWorkerHarnessThreads": 0, + "optionsId": 0, + "outputBucket": "gs://storagebucket-${uniqueId}", + "outputFileFormat": "avro", + "outputFilePrefix": "output", + "overrideWindmillBinary": null, + "pathValidatorClass": "org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator", + "pipelineUrl": "${pipelineUrl}", + "project": "${projectId}", + "recordJfrOnGcThrashing": false, + "region": "us-central1", + "resourceHints": [], + "runner": "org.apache.beam.runners.dataflow.DataflowRunner", + "saveProfilesToGcs": null, + "schema": "gs://config-connector-samples/dataflowflextemplate/numbers.avsc", + "sdkContainerImage": null, + "sdkHarnessContainerImageOverrides": null, + "serviceAccount": "${projectNumber}-compute@developer.gserviceaccount.com", + "stableUniqueNames": "WARNING", + "stagerClass": "org.apache.beam.runners.dataflow.util.GcsStager", + "stagingLocation": "gs://dataflow-staging-us-central1-${projectNumber}/staging", + "streaming": false, + "subnetwork": null, + "tempLocation": "gs://dataflow-staging-us-central1-${projectNumber}/tmp", + "templateLocation": "gs://dataflow-staging-us-central1-${projectNumber}/staging/template_launches/${jobID}/job_object", + "userAgent": "Apache_Beam_SDK_for_Java/2.41.0(JRE_11_environment)", + "workerDiskType": null, + "workerHarnessContainerImage": null, + "workerMachineType": null, + "workerRegion": null, + "zone": null + } + }, + "serviceAccountEmail": "${projectNumber}-compute@developer.gserviceaccount.com", + "shuffleMode": 2, + "tempStoragePrefix": "storage.googleapis.com/dataflow-staging-us-central1-${projectNumber}/tmp", + "userAgent": { + "removed": "simplicity" + }, + "version": { + "removed": "simplicity" + }, + "workerPools": [] + }, + "id": "000000000000000000000", + "jobMetadata": { + "removed": "simplicity" + }, + "labels": { + "goog-dataflow-provided-template-name": "file_format_conversion", + "goog-dataflow-provided-template-type": "flex" + }, + "location": "us-central1", + "name": "dataflowflextemplatejob-${uniqueId}", + "pipelineDescription": { + "removed": "simplicity" + }, + "projectId": "${projectId}", + "stageStates": [], + "startTime": "2024-04-01T12:34:56.123456Z", + "steps": [], + "type": 1 +} + +--- + +PUT https://dataflow.googleapis.com/v1b3/projects/${projectId}/locations/us-central1/jobs/${jobID}?%24alt=json%3Benum-encoding%3Dint +Content-Type: application/json +User-Agent: kcc/controller-manager +x-goog-request-params: project_id=${projectId}&location=us-central1&job_id=${jobID} + +{ + "requestedState": 5 +} + +200 OK +Cache-Control: private +Content-Type: application/json; charset=UTF-8 +Server: ESF +Vary: Origin +Vary: X-Origin +Vary: Referer +X-Content-Type-Options: nosniff +X-Frame-Options: SAMEORIGIN +X-Xss-Protection: 0 + +{ + "type": 1 +} + +--- + +GET https://dataflow.googleapis.com/v1b3/projects/${projectId}/locations/us-central1/jobs/${jobID}?%24alt=json%3Benum-encoding%3Dint&view=JOB_VIEW_SUMMARY +Content-Type: application/json +User-Agent: kcc/controller-manager +x-goog-request-params: project_id=${projectId}&location=us-central1&job_id=${jobID} + +200 OK +Cache-Control: private +Content-Type: application/json; charset=UTF-8 +Server: ESF +Vary: Origin +Vary: X-Origin +Vary: Referer +X-Content-Type-Options: nosniff +X-Frame-Options: SAMEORIGIN +X-Xss-Protection: 0 + +{ + "createTime": "2024-04-01T12:34:56.123456Z", + "currentState": 10, + "currentStateTime": "2024-04-01T12:34:56.123456Z", + "environment": { + "dataset": "bigquery.googleapis.com/cloud_dataflow", + "experiments": [ + "auto_google_template_runner_v2", + "auto_high_core_runner_v2", + "auto_runner_v2_min_sdk=2.54.0", + "configure_shuffle_service_addresses_in_control_plane", + "delayed_launch", + "disable_baggins_exp", + "disable_primeflex", + "disable_runner_v2_reason=java_job_google_template", + "ek_regions=", + "enable_always_on_exception_sampling", + "enable_async_job_creation", + "enable_billing_v_1_5", + "enable_cloud_permissions_checking", + "enable_cmek_org_policy_check", + "enable_compute_default_service_account_org_policy", + "enable_data_sampling_telemetry", + "enable_dataprep_new_billing", + "enable_fnapi_multimap_side_input_bulk_read", + "enable_memory_sampler", + "enable_oom_sampler", + "enable_recommendations", + "enable_remote_image_ping", + "enable_secure_boot", + "enable_throttled_based_rescaling", + "enable_worker_cloud_monitoring_exporter", + "enable_worker_disk_cloud_monitoring", + "enable_worker_memory_cloud_monitoring", + "enable_zonal_outage_aware_routing", + "limit_preemptible_worker_pct", + "limit_resizing_by_cpu_util", + "min_sdk_version_to_reject_worker_in_different_region_than_service=2.44.0", + "override_controller_service_account", + "primeflex_slow_start_pct=5", + "primeflex_slow_start_seconds=3600", + "regional_physical_zone_separation_enabled", + "shuffle_mode=auto", + "shuffle_service_address_type=DIRECTPATH_WITH_CFE_FALLBACK", + "sideinput_io_metrics", + "use_dataflow_service_account_in_igm", + "use_e2_for_default_machine_type_worker_regions=africa-south1,europe-north2,europe-southwest1,europe-west10,europe-west12,europe-west8,europe-west9,me-central1,me-central2,me-west1,northamerica-south1,southamerica-west1,us-east10,us-east5,us-east7,us-south1,us-west8", + "use_job_admission_controller", + "use_multi_hop_delegation", + "use_templates_regional_bucket", + "use_worker_zone_chooser_by_default" + ], + "sdkPipelineOptions": { + "display_data": [], + "options": { + "apiRootUrl": "https://dataflow.googleapis.com/", + "appName": "FileFormatConversion", + "autoscalingAlgorithm": null, + "containsHeaders": false, + "credentialFactoryClass": "org.apache.beam.sdk.extensions.gcp.auth.GcpCredentialFactory", + "csvFileEncoding": "UTF-8", + "csvFormat": "Default", + "dataflowEndpoint": "", + "dataflowKmsKey": null, + "dataflowServiceOptions": null, + "dataflowWorkerJar": null, + "defaultEnvironmentConfig": null, + "defaultEnvironmentType": null, + "delimiter": ",", + "diskSizeGb": 0, + "enableCloudDebugger": false, + "enableStreamingEngine": false, + "environmentOptions": null, + "experiments": [ + "disable_runner_v2_reason=java_job_google_template", + "enable_always_on_exception_sampling" + ], + "filesToStage": [ + "/template/file-format-conversion/file-format-conversion.jar" + ], + "gcpTempLocation": "gs://dataflow-staging-us-central1-${projectNumber}/tmp", + "gcsPerformanceMetrics": false, + "gcsUploadBufferSizeBytes": null, + "googleApiTrace": null, + "hotKeyLoggingEnabled": false, + "inputFileFormat": "csv", + "inputFileSpec": "gs://config-connector-samples/dataflowflextemplate/numbertest.csv", + "jobName": "dataflowflextemplatejob-${uniqueId}", + "labels": { + "goog-dataflow-provided-template-name": "file_format_conversion", + "goog-dataflow-provided-template-type": "flex" + }, + "logDetailedCsvConversionErrors": false, + "maxNumWorkers": 0, + "network": null, + "numShards": 0, + "numWorkers": 0, + "numberOfWorkerHarnessThreads": 0, + "optionsId": 0, + "outputBucket": "gs://storagebucket-${uniqueId}", + "outputFileFormat": "avro", + "outputFilePrefix": "output", + "overrideWindmillBinary": null, + "pathValidatorClass": "org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator", + "pipelineUrl": "${pipelineUrl}", + "project": "${projectId}", + "recordJfrOnGcThrashing": false, + "region": "us-central1", + "resourceHints": [], + "runner": "org.apache.beam.runners.dataflow.DataflowRunner", + "saveProfilesToGcs": null, + "schema": "gs://config-connector-samples/dataflowflextemplate/numbers.avsc", + "sdkContainerImage": null, + "sdkHarnessContainerImageOverrides": null, + "serviceAccount": "${projectNumber}-compute@developer.gserviceaccount.com", + "stableUniqueNames": "WARNING", + "stagerClass": "org.apache.beam.runners.dataflow.util.GcsStager", + "stagingLocation": "gs://dataflow-staging-us-central1-${projectNumber}/staging", + "streaming": false, + "subnetwork": null, + "tempLocation": "gs://dataflow-staging-us-central1-${projectNumber}/tmp", + "templateLocation": "gs://dataflow-staging-us-central1-${projectNumber}/staging/template_launches/${jobID}/job_object", + "userAgent": "Apache_Beam_SDK_for_Java/2.41.0(JRE_11_environment)", + "workerDiskType": null, + "workerHarnessContainerImage": null, + "workerMachineType": null, + "workerRegion": null, + "zone": null + } + }, + "serviceAccountEmail": "${projectNumber}-compute@developer.gserviceaccount.com", + "shuffleMode": 2, + "tempStoragePrefix": "storage.googleapis.com/dataflow-staging-us-central1-${projectNumber}/tmp", + "userAgent": { + "removed": "simplicity" + }, + "version": { + "removed": "simplicity" + }, + "workerPools": [] + }, + "id": "000000000000000000000", + "jobMetadata": { + "removed": "simplicity" + }, + "labels": { + "goog-dataflow-provided-template-name": "file_format_conversion", + "goog-dataflow-provided-template-type": "flex" + }, + "location": "us-central1", + "name": "dataflowflextemplatejob-${uniqueId}", + "pipelineDescription": { + "removed": "simplicity" + }, + "projectId": "${projectId}", + "requestedState": 5, + "stageStates": [], + "startTime": "2024-04-01T12:34:56.123456Z", + "steps": [], + "type": 1 +} + +--- + +GET https://dataflow.googleapis.com/v1b3/projects/${projectId}/locations/us-central1/jobs/${jobID}?%24alt=json%3Benum-encoding%3Dint&view=JOB_VIEW_SUMMARY +Content-Type: application/json +User-Agent: kcc/controller-manager +x-goog-request-params: project_id=${projectId}&location=us-central1&job_id=${jobID} + +200 OK +Cache-Control: private +Content-Type: application/json; charset=UTF-8 +Server: ESF +Vary: Origin +Vary: X-Origin +Vary: Referer +X-Content-Type-Options: nosniff +X-Frame-Options: SAMEORIGIN +X-Xss-Protection: 0 + +{ + "createTime": "2024-04-01T12:34:56.123456Z", + "currentState": 5, + "currentStateTime": "2024-04-01T12:34:56.123456Z", + "environment": { + "dataset": "bigquery.googleapis.com/cloud_dataflow", + "experiments": [ + "auto_google_template_runner_v2", + "auto_high_core_runner_v2", + "auto_runner_v2_min_sdk=2.54.0", + "configure_shuffle_service_addresses_in_control_plane", + "delayed_launch", + "disable_baggins_exp", + "disable_primeflex", + "disable_runner_v2_reason=java_job_google_template", + "ek_regions=", + "enable_always_on_exception_sampling", + "enable_async_job_creation", + "enable_billing_v_1_5", + "enable_cloud_permissions_checking", + "enable_cmek_org_policy_check", + "enable_compute_default_service_account_org_policy", + "enable_data_sampling_telemetry", + "enable_dataprep_new_billing", + "enable_fnapi_multimap_side_input_bulk_read", + "enable_memory_sampler", + "enable_oom_sampler", + "enable_recommendations", + "enable_remote_image_ping", + "enable_secure_boot", + "enable_throttled_based_rescaling", + "enable_worker_cloud_monitoring_exporter", + "enable_worker_disk_cloud_monitoring", + "enable_worker_memory_cloud_monitoring", + "enable_zonal_outage_aware_routing", + "limit_preemptible_worker_pct", + "limit_resizing_by_cpu_util", + "min_sdk_version_to_reject_worker_in_different_region_than_service=2.44.0", + "override_controller_service_account", + "primeflex_slow_start_pct=5", + "primeflex_slow_start_seconds=3600", + "regional_physical_zone_separation_enabled", + "shuffle_mode=auto", + "shuffle_service_address_type=DIRECTPATH_WITH_CFE_FALLBACK", + "sideinput_io_metrics", + "use_dataflow_service_account_in_igm", + "use_e2_for_default_machine_type_worker_regions=africa-south1,europe-north2,europe-southwest1,europe-west10,europe-west12,europe-west8,europe-west9,me-central1,me-central2,me-west1,northamerica-south1,southamerica-west1,us-east10,us-east5,us-east7,us-south1,us-west8", + "use_job_admission_controller", + "use_multi_hop_delegation", + "use_templates_regional_bucket", + "use_worker_zone_chooser_by_default" + ], + "sdkPipelineOptions": { + "display_data": [], + "options": { + "apiRootUrl": "https://dataflow.googleapis.com/", + "appName": "FileFormatConversion", + "autoscalingAlgorithm": null, + "containsHeaders": false, + "credentialFactoryClass": "org.apache.beam.sdk.extensions.gcp.auth.GcpCredentialFactory", + "csvFileEncoding": "UTF-8", + "csvFormat": "Default", + "dataflowEndpoint": "", + "dataflowKmsKey": null, + "dataflowServiceOptions": null, + "dataflowWorkerJar": null, + "defaultEnvironmentConfig": null, + "defaultEnvironmentType": null, + "delimiter": ",", + "diskSizeGb": 0, + "enableCloudDebugger": false, + "enableStreamingEngine": false, + "environmentOptions": null, + "experiments": [ + "disable_runner_v2_reason=java_job_google_template", + "enable_always_on_exception_sampling" + ], + "filesToStage": [ + "/template/file-format-conversion/file-format-conversion.jar" + ], + "gcpTempLocation": "gs://dataflow-staging-us-central1-${projectNumber}/tmp", + "gcsPerformanceMetrics": false, + "gcsUploadBufferSizeBytes": null, + "googleApiTrace": null, + "hotKeyLoggingEnabled": false, + "inputFileFormat": "csv", + "inputFileSpec": "gs://config-connector-samples/dataflowflextemplate/numbertest.csv", + "jobName": "dataflowflextemplatejob-${uniqueId}", + "labels": { + "goog-dataflow-provided-template-name": "file_format_conversion", + "goog-dataflow-provided-template-type": "flex" + }, + "logDetailedCsvConversionErrors": false, + "maxNumWorkers": 0, + "network": null, + "numShards": 0, + "numWorkers": 0, + "numberOfWorkerHarnessThreads": 0, + "optionsId": 0, + "outputBucket": "gs://storagebucket-${uniqueId}", + "outputFileFormat": "avro", + "outputFilePrefix": "output", + "overrideWindmillBinary": null, + "pathValidatorClass": "org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator", + "pipelineUrl": "${pipelineUrl}", + "project": "${projectId}", + "recordJfrOnGcThrashing": false, + "region": "us-central1", + "resourceHints": [], + "runner": "org.apache.beam.runners.dataflow.DataflowRunner", + "saveProfilesToGcs": null, + "schema": "gs://config-connector-samples/dataflowflextemplate/numbers.avsc", + "sdkContainerImage": null, + "sdkHarnessContainerImageOverrides": null, + "serviceAccount": "${projectNumber}-compute@developer.gserviceaccount.com", + "stableUniqueNames": "WARNING", + "stagerClass": "org.apache.beam.runners.dataflow.util.GcsStager", + "stagingLocation": "gs://dataflow-staging-us-central1-${projectNumber}/staging", + "streaming": false, + "subnetwork": null, + "tempLocation": "gs://dataflow-staging-us-central1-${projectNumber}/tmp", + "templateLocation": "gs://dataflow-staging-us-central1-${projectNumber}/staging/template_launches/${jobID}/job_object", + "userAgent": "Apache_Beam_SDK_for_Java/2.41.0(JRE_11_environment)", + "workerDiskType": null, + "workerHarnessContainerImage": null, + "workerMachineType": null, + "workerRegion": null, + "zone": null + } + }, + "serviceAccountEmail": "${projectNumber}-compute@developer.gserviceaccount.com", + "shuffleMode": 2, + "tempStoragePrefix": "storage.googleapis.com/dataflow-staging-us-central1-${projectNumber}/tmp", + "userAgent": { + "removed": "simplicity" + }, + "version": { + "removed": "simplicity" + }, + "workerPools": [] + }, + "id": "000000000000000000000", + "jobMetadata": { + "removed": "simplicity" + }, + "labels": { + "goog-dataflow-provided-template-name": "file_format_conversion", + "goog-dataflow-provided-template-type": "flex" + }, + "location": "us-central1", + "name": "dataflowflextemplatejob-${uniqueId}", + "pipelineDescription": { + "removed": "simplicity" + }, + "projectId": "${projectId}", + "stageStates": [], + "startTime": "2024-04-01T12:34:56.123456Z", + "steps": [], + "type": 1 +} + +--- + +GET https://storage.googleapis.com/storage/v1/b/storagebucket-${uniqueId}?alt=json&prettyPrint=false +User-Agent: google-api-go-client/0.5 Terraform/ (+https://www.terraform.io) Terraform-Plugin-SDK/2.10.1 terraform-provider-google-beta/kcc/controller-manager + +200 OK +Cache-Control: private, max-age=0, must-revalidate, no-transform +Content-Type: application/json; charset=UTF-8 +Expires: {now+0m} +Server: UploadServer +Vary: Origin +Vary: X-Origin + +{ + "etag": "abcdef0123A=", + "iamConfiguration": { + "bucketPolicyOnly": { + "enabled": false + }, + "publicAccessPrevention": "inherited", + "uniformBucketLevelAccess": { + "enabled": false + } + }, + "id": "000000000000000000000", + "kind": "storage#bucket", + "labels": { + "cnrm-test": "true", + "managed-by-cnrm": "true" + }, + "location": "US", + "locationType": "multi-region", + "metageneration": "1", + "name": "storagebucket-${uniqueId}", + "projectNumber": "${projectNumber}", + "rpo": "DEFAULT", + "selfLink": "https://www.googleapis.com/storage/v1/b/storagebucket-${uniqueId}", + "softDeletePolicy": { + "effectiveTime": "2024-04-01T12:34:56.123456Z", + "retentionDurationSeconds": "604800" + }, + "storageClass": "STANDARD", + "timeCreated": "2024-04-01T12:34:56.123456Z", + "updated": "2024-04-01T12:34:56.123456Z" +} + +--- + +GET https://storage.googleapis.com/storage/v1/b/storagebucket-${uniqueId}/o?alt=json&prettyPrint=false&versions=true +User-Agent: google-api-go-client/0.5 Terraform/ (+https://www.terraform.io) Terraform-Plugin-SDK/2.10.1 terraform-provider-google-beta/kcc/controller-manager + +200 OK +Cache-Control: private, max-age=0, must-revalidate, no-transform +Content-Type: application/json; charset=UTF-8 +Expires: {now+0m} +Server: UploadServer +Vary: Origin +Vary: X-Origin + +{ + "kind": "storage#objects" +} + +--- + +DELETE https://storage.googleapis.com/storage/v1/b/storagebucket-${uniqueId}?alt=json&prettyPrint=false +User-Agent: google-api-go-client/0.5 Terraform/ (+https://www.terraform.io) Terraform-Plugin-SDK/2.10.1 terraform-provider-google-beta/kcc/controller-manager + +204 No Content +Cache-Control: no-cache, no-store, max-age=0, must-revalidate +Content-Type: application/json +Expires: Mon, 01 Jan 1990 00:00:00 GMT +Pragma: no-cache +Server: UploadServer +Vary: Origin +Vary: X-Origin \ No newline at end of file diff --git a/pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/batchdataflowflextemplatejob-direct/create.yaml b/pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/batchdataflowflextemplatejob-direct/create.yaml new file mode 100644 index 0000000000..0954dd4c82 --- /dev/null +++ b/pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/batchdataflowflextemplatejob-direct/create.yaml @@ -0,0 +1,33 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: dataflow.cnrm.cloud.google.com/v1beta1 +kind: DataflowFlexTemplateJob +metadata: + annotations: + cnrm.cloud.google.com/on-delete: "cancel" + alpha.cnrm.cloud.google.com/reconciler: "direct" + name: dataflowflextemplatejob-${uniqueId} +spec: + region: us-central1 + # This is a public, Google-maintained Dataflow Job flex template of a batch job + containerSpecGcsPath: gs://dataflow-templates/2022-10-03-00_RC00/flex/File_Format_Conversion + parameters: + inputFileFormat: csv + outputFileFormat: avro + # This is maintained by us. + inputFileSpec: gs://config-connector-samples/dataflowflextemplate/numbertest.csv + outputBucket: gs://storagebucket-${uniqueId} + # This is maintained by us. + schema: gs://config-connector-samples/dataflowflextemplate/numbers.avsc diff --git a/pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/batchdataflowflextemplatejob-direct/dependencies.yaml b/pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/batchdataflowflextemplatejob-direct/dependencies.yaml new file mode 100644 index 0000000000..376de8329f --- /dev/null +++ b/pkg/test/resourcefixture/testdata/basic/dataflow/v1beta1/dataflowflextemplatejob/batchdataflowflextemplatejob-direct/dependencies.yaml @@ -0,0 +1,21 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: storage.cnrm.cloud.google.com/v1beta1 +kind: StorageBucket +metadata: + annotations: + cnrm.cloud.google.com/force-destroy: "true" + cnrm.cloud.google.com/reconcile-interval-in-seconds : "0" # Avoid time-dependencies + name: storagebucket-${uniqueId} \ No newline at end of file diff --git a/tests/e2e/httplog.go b/tests/e2e/httplog.go index fb30ca723b..7b55946223 100644 --- a/tests/e2e/httplog.go +++ b/tests/e2e/httplog.go @@ -76,6 +76,12 @@ func RemoveExtraEvents(events test.LogEntries) test.LogEntries { case "JOB_STATE_PENDING", "JOB_STATE_QUEUED": return false } + // Also handle when we're encoding enums as integers + currentStateEnum, _, _ := unstructured.NestedInt64(responseBody, "currentState") + switch currentStateEnum { + case 9 /* JOB_STATE_PENDING */, 11 /* JOB_STATE_QUEUED */ : + return false + } return true }) From 2db88e03ed892950850474b1d798bcbfd8987bc7 Mon Sep 17 00:00:00 2001 From: justinsb Date: Fri, 6 Sep 2024 19:23:49 -0400 Subject: [PATCH 3/3] fix: support visiting interface{} in reference normalization We can treat this the same as Ptr --- pkg/controller/direct/monitoring/utils.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/direct/monitoring/utils.go b/pkg/controller/direct/monitoring/utils.go index 70e73a6c50..43f4447aa5 100644 --- a/pkg/controller/direct/monitoring/utils.go +++ b/pkg/controller/direct/monitoring/utils.go @@ -108,7 +108,7 @@ func (w *visitorWalker) visitAny(path string, v reflect.Value) { } switch v.Kind() { - case reflect.Ptr: + case reflect.Ptr, reflect.Interface: if v.IsNil() { return }