From f21a7db9e698cb7acf562e0392c7adf4fb871372 Mon Sep 17 00:00:00 2001 From: kevinteng525 Date: Thu, 8 Dec 2022 22:50:11 +0800 Subject: [PATCH] Support Restrict Secret Access (#3677) Signed-off-by: kevin --- CHANGELOG.md | 1 + adapter/main.go | 30 +++++- controllers/keda/scaledjob_controller.go | 6 +- controllers/keda/scaledobject_controller.go | 1 - controllers/keda/suite_test.go | 2 +- main.go | 32 ++++++- .../resolver/azure_keyvault_handler.go | 10 +- pkg/scaling/resolver/scale_resolvers.go | 95 +++++++++++-------- pkg/scaling/resolver/scale_resolvers_test.go | 14 ++- pkg/scaling/scale_handler.go | 9 +- pkg/util/env_resolver.go | 29 ++++++ 11 files changed, 167 insertions(+), 62 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 418211e8f5c..13439488131 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -63,6 +63,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio - **General:** Add support to use pod identities for authentication in Azure Key Vault ([#3813](https://github.com/kedacore/keda/issues/3813) - **General:** Support disable keep http connection alive([#3874](https://github.com/kedacore/keda/issues/3874) - **General:** Improve the function used to normalize metric names ([#3789](https://github.com/kedacore/keda/issues/3789) +- **General:** Support Restrict Secret Access to mitigate the security risk ([#3668](https://github.com/kedacore/keda/issues/3668) - **Apache Kafka Scaler:** SASL/OAuthbearer Implementation ([#3681](https://github.com/kedacore/keda/issues/3681)) - **Apache Kafka Scaler:** Limit Kafka Partitions KEDA operates on ([#3830](https://github.com/kedacore/keda/issues/3830)) - **Apache Kafka Scaler:** Implementation for Excluding Persistent Lag ([#3904](https://github.com/kedacore/keda/issues/3904)) diff --git a/adapter/main.go b/adapter/main.go index b4e0ca786d7..cd7e428a769 100644 --- a/adapter/main.go +++ b/adapter/main.go @@ -28,7 +28,10 @@ import ( "github.com/go-logr/logr" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + kubeinformers "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" "k8s.io/klog/v2/klogr" @@ -131,7 +134,25 @@ func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Durat broadcaster := record.NewBroadcaster() recorder := broadcaster.NewRecorder(scheme, corev1.EventSource{Component: "keda-metrics-adapter"}) - handler := scaling.NewScaleHandler(mgr.GetClient(), nil, scheme, globalHTTPTimeout, recorder) + + kubeClientset, err := kubernetes.NewForConfig(cfg) + if err != nil { + logger.Error(err, "Unable to create kube clientset") + return nil, nil, err + } + objectNamespace, err := kedautil.GetClusterObjectNamespace() + if err != nil { + logger.Error(err, "Unable to get cluster object namespace") + return nil, nil, err + } + // the namespaced kubeInformerFactory is used to restrict secret informer to only list/watch secrets in KEDA cluster object namespace, + // refer to https://github.com/kedacore/keda/issues/3668 + kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClientset, 1*time.Hour, kubeinformers.WithNamespace(objectNamespace)) + secretInformer := kubeInformerFactory.Core().V1().Secrets() + + handler := scaling.NewScaleHandler(mgr.GetClient(), nil, scheme, globalHTTPTimeout, recorder, secretInformer.Lister()) + kubeInformerFactory.Start(ctx.Done()) + externalMetricsInfo := &[]provider.ExternalMetricInfo{} externalMetricsInfoLock := &sync.RWMutex{} @@ -146,13 +167,13 @@ func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Durat } stopCh := make(chan struct{}) - if err := runScaledObjectController(ctx, mgr, handler, logger, externalMetricsInfo, externalMetricsInfoLock, maxConcurrentReconciles, stopCh); err != nil { + if err := runScaledObjectController(ctx, mgr, handler, logger, externalMetricsInfo, externalMetricsInfoLock, maxConcurrentReconciles, stopCh, secretInformer.Informer().HasSynced); err != nil { return nil, nil, err } return kedaprovider.NewProvider(ctx, logger, handler, mgr.GetClient(), *grpcClient, useMetricsServiceGrpc, namespace, externalMetricsInfo, externalMetricsInfoLock), stopCh, nil } -func runScaledObjectController(ctx context.Context, mgr manager.Manager, scaleHandler scaling.ScaleHandler, logger logr.Logger, externalMetricsInfo *[]provider.ExternalMetricInfo, externalMetricsInfoLock *sync.RWMutex, maxConcurrentReconciles int, stopCh chan<- struct{}) error { +func runScaledObjectController(ctx context.Context, mgr manager.Manager, scaleHandler scaling.ScaleHandler, logger logr.Logger, externalMetricsInfo *[]provider.ExternalMetricInfo, externalMetricsInfoLock *sync.RWMutex, maxConcurrentReconciles int, stopCh chan<- struct{}, secretSynced cache.InformerSynced) error { if err := (&kedacontrollers.MetricsScaledObjectReconciler{ Client: mgr.GetClient(), ScaleHandler: scaleHandler, @@ -170,6 +191,9 @@ func runScaledObjectController(ctx context.Context, mgr manager.Manager, scaleHa } }() + if ok := cache.WaitForCacheSync(ctx.Done(), secretSynced); !ok { + return fmt.Errorf("failed to wait Secrets cache synced") + } return nil } diff --git a/controllers/keda/scaledjob_controller.go b/controllers/keda/scaledjob_controller.go index 0a5e49b1ac0..e7e11cea143 100644 --- a/controllers/keda/scaledjob_controller.go +++ b/controllers/keda/scaledjob_controller.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" @@ -56,6 +57,8 @@ type ScaledJobReconciler struct { scaledJobGenerations *sync.Map scaleHandler scaling.ScaleHandler + SecretsLister corev1listers.SecretLister + SecretsSynced cache.InformerSynced } type scaledJobMetricsData struct { @@ -75,9 +78,8 @@ func init() { // SetupWithManager initializes the ScaledJobReconciler instance and starts a new controller managed by the passed Manager instance. func (r *ScaledJobReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error { - r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.GlobalHTTPTimeout, mgr.GetEventRecorderFor("scale-handler")) + r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.GlobalHTTPTimeout, mgr.GetEventRecorderFor("scale-handler"), r.SecretsLister) r.scaledJobGenerations = &sync.Map{} - return ctrl.NewControllerManagedBy(mgr). WithOptions(options). // Ignore updates to ScaledJob Status (in this case metadata.Generation does not change) diff --git a/controllers/keda/scaledobject_controller.go b/controllers/keda/scaledobject_controller.go index 24a4b4390f6..a81dba5ce9f 100644 --- a/controllers/keda/scaledobject_controller.go +++ b/controllers/keda/scaledobject_controller.go @@ -114,7 +114,6 @@ func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager, options cont if r.Recorder == nil { return fmt.Errorf("ScaledObjectReconciler.Recorder is not initialized") } - // Start controller return ctrl.NewControllerManagedBy(mgr). WithOptions(options). diff --git a/controllers/keda/suite_test.go b/controllers/keda/suite_test.go index be88c4d53d3..bd73322a868 100644 --- a/controllers/keda/suite_test.go +++ b/controllers/keda/suite_test.go @@ -91,7 +91,7 @@ var _ = BeforeSuite(func(done Done) { Client: k8sManager.GetClient(), Scheme: k8sManager.GetScheme(), Recorder: k8sManager.GetEventRecorderFor("keda-operator"), - ScaleHandler: scaling.NewScaleHandler(k8sManager.GetClient(), scaleClient, k8sManager.GetScheme(), time.Duration(10), k8sManager.GetEventRecorderFor("keda-operator")), + ScaleHandler: scaling.NewScaleHandler(k8sManager.GetClient(), scaleClient, k8sManager.GetScheme(), time.Duration(10), k8sManager.GetEventRecorderFor("keda-operator"), nil), ScaleClient: scaleClient, }).SetupWithManager(k8sManager, controller.Options{}) Expect(err).ToNot(HaveOccurred()) diff --git a/main.go b/main.go index 93d719885ad..d4e31ee389f 100644 --- a/main.go +++ b/main.go @@ -26,8 +26,11 @@ import ( "github.com/spf13/pflag" apimachineryruntime "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + kubeinformers "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" clientgoscheme "k8s.io/client-go/kubernetes/scheme" _ "k8s.io/client-go/plugin/pkg/client/auth" + "k8s.io/client-go/tools/cache" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/healthz" @@ -158,13 +161,28 @@ func main() { globalHTTPTimeout := time.Duration(globalHTTPTimeoutMS) * time.Millisecond eventRecorder := mgr.GetEventRecorderFor("keda-operator") + kubeClientset, err := kubernetes.NewForConfig(cfg) + if err != nil { + setupLog.Error(err, "Unable to create kube clientset") + os.Exit(1) + } + objectNamespace, err := kedautil.GetClusterObjectNamespace() + if err != nil { + setupLog.Error(err, "Unable to get cluster object namespace") + os.Exit(1) + } + // the namespaced kubeInformerFactory is used to restrict secret informer to only list/watch secrets in KEDA cluster object namespace, + // refer to https://github.com/kedacore/keda/issues/3668 + kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClientset, 1*time.Hour, kubeinformers.WithNamespace(objectNamespace)) + secretInformer := kubeInformerFactory.Core().V1().Secrets() + scaleClient, kubeVersion, err := k8s.InitScaleClient(mgr) if err != nil { setupLog.Error(err, "unable to init scale client") os.Exit(1) } - scaledHandler := scaling.NewScaleHandler(mgr.GetClient(), scaleClient, mgr.GetScheme(), globalHTTPTimeout, eventRecorder) + scaledHandler := scaling.NewScaleHandler(mgr.GetClient(), scaleClient, mgr.GetScheme(), globalHTTPTimeout, eventRecorder, secretInformer.Lister()) if err = (&kedacontrollers.ScaledObjectReconciler{ Client: mgr.GetClient(), @@ -181,6 +199,8 @@ func main() { Scheme: mgr.GetScheme(), GlobalHTTPTimeout: globalHTTPTimeout, Recorder: eventRecorder, + SecretsLister: secretInformer.Lister(), + SecretsSynced: secretInformer.Informer().HasSynced, }).SetupWithManager(mgr, controller.Options{MaxConcurrentReconciles: scaledJobMaxReconciles}); err != nil { setupLog.Error(err, "unable to create controller", "controller", "ScaledJob") os.Exit(1) @@ -223,7 +243,15 @@ func main() { setupLog.Info(fmt.Sprintf("Go OS/Arch: %s/%s", runtime.GOOS, runtime.GOARCH)) setupLog.Info(fmt.Sprintf("Running on Kubernetes %s", kubeVersion.PrettyVersion), "version", kubeVersion.Version) - if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { + ctx := ctrl.SetupSignalHandler() + kubeInformerFactory.Start(ctx.Done()) + + if ok := cache.WaitForCacheSync(ctx.Done(), secretInformer.Informer().HasSynced); !ok { + setupLog.Error(nil, "failed to wait Secrets cache synced") + os.Exit(1) + } + + if err := mgr.Start(ctx); err != nil { setupLog.Error(err, "problem running manager") os.Exit(1) } diff --git a/pkg/scaling/resolver/azure_keyvault_handler.go b/pkg/scaling/resolver/azure_keyvault_handler.go index a76d387b572..3baf2e53db1 100644 --- a/pkg/scaling/resolver/azure_keyvault_handler.go +++ b/pkg/scaling/resolver/azure_keyvault_handler.go @@ -25,6 +25,7 @@ import ( az "github.com/Azure/go-autorest/autorest/azure" "github.com/Azure/go-autorest/autorest/azure/auth" "github.com/go-logr/logr" + corev1listers "k8s.io/client-go/listers/core/v1" "sigs.k8s.io/controller-runtime/pkg/client" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" @@ -42,13 +43,13 @@ func NewAzureKeyVaultHandler(v *kedav1alpha1.AzureKeyVault) *AzureKeyVaultHandle } } -func (vh *AzureKeyVaultHandler) Initialize(ctx context.Context, client client.Client, logger logr.Logger, triggerNamespace string) error { +func (vh *AzureKeyVaultHandler) Initialize(ctx context.Context, client client.Client, logger logr.Logger, triggerNamespace string, secretsLister corev1listers.SecretLister) error { keyvaultResourceURL, activeDirectoryEndpoint, err := vh.getPropertiesForCloud() if err != nil { return err } - authConfig, err := vh.getAuthConfig(ctx, client, logger, triggerNamespace, keyvaultResourceURL, activeDirectoryEndpoint) + authConfig, err := vh.getAuthConfig(ctx, client, logger, triggerNamespace, keyvaultResourceURL, activeDirectoryEndpoint, secretsLister) if err != nil { return err } @@ -101,12 +102,11 @@ func (vh *AzureKeyVaultHandler) getPropertiesForCloud() (string, string, error) } func (vh *AzureKeyVaultHandler) getAuthConfig(ctx context.Context, client client.Client, logger logr.Logger, - triggerNamespace, keyVaultResourceURL, activeDirectoryEndpoint string) (auth.AuthorizerConfig, error) { + triggerNamespace, keyVaultResourceURL, activeDirectoryEndpoint string, secretsLister corev1listers.SecretLister) (auth.AuthorizerConfig, error) { podIdentity := vh.vault.PodIdentity if podIdentity == nil { podIdentity = &kedav1alpha1.AuthPodIdentity{} } - switch podIdentity.Provider { case "", kedav1alpha1.PodIdentityProviderNone: clientID := vh.vault.Credentials.ClientID @@ -114,7 +114,7 @@ func (vh *AzureKeyVaultHandler) getAuthConfig(ctx context.Context, client client clientSecretName := vh.vault.Credentials.ClientSecret.ValueFrom.SecretKeyRef.Name clientSecretKey := vh.vault.Credentials.ClientSecret.ValueFrom.SecretKeyRef.Key - clientSecret := resolveAuthSecret(ctx, client, logger, clientSecretName, triggerNamespace, clientSecretKey) + clientSecret := resolveAuthSecret(ctx, client, logger, clientSecretName, triggerNamespace, clientSecretKey, secretsLister) if clientID == "" || tenantID == "" || clientSecret == "" { return nil, fmt.Errorf("clientID, tenantID and clientSecret are expected when not using a pod identity provider") diff --git a/pkg/scaling/resolver/scale_resolvers.go b/pkg/scaling/resolver/scale_resolvers.go index 41fe4bef4f5..35fa6d1a078 100644 --- a/pkg/scaling/resolver/scale_resolvers.go +++ b/pkg/scaling/resolver/scale_resolvers.go @@ -20,18 +20,20 @@ import ( "bytes" "context" "fmt" - "os" + "strings" "github.com/go-logr/logr" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" + corev1listers "k8s.io/client-go/listers/core/v1" "knative.dev/pkg/apis/duck" duckv1 "knative.dev/pkg/apis/duck/v1" "sigs.k8s.io/controller-runtime/pkg/client" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" + "github.com/kedacore/keda/v2/pkg/util" ) const ( @@ -40,6 +42,23 @@ const ( referenceCloser = ')' ) +var ( + kedaNamespace, _ = util.GetClusterObjectNamespace() + restrictSecretAccess = util.GetRestrictSecretAccess() +) + +// isSecretAccessRestricted returns whether secret access need to be restricted in KEDA namespace +func isSecretAccessRestricted(logger logr.Logger) bool { + if restrictSecretAccess == "" { + return false + } + if strings.ToLower(restrictSecretAccess) == "true" { + logger.V(1).Info("Secret Access is restricted to be in Cluster Object Namespace, please use ClusterTriggerAuthentication instead of TriggerAuthentication", "Cluster Object Namespace", kedaNamespace, "Env Var", util.RestrictSecretAccessEnvVar, "Env Value", strings.ToLower(restrictSecretAccess)) + return true + } + return false +} + // ResolveScaleTargetPodSpec for given scalableObject inspects the scale target workload, // which could be almost any k8s resource (Deployment, StatefulSet, CustomResource...) // and for the given resource returns *corev1.PodTemplateSpec and a name of the container @@ -102,7 +121,7 @@ func ResolveScaleTargetPodSpec(ctx context.Context, kubeClient client.Client, lo // ResolveContainerEnv resolves all environment variables in a container. // It returns either map of env variable key and value or error if there is any. -func ResolveContainerEnv(ctx context.Context, client client.Client, logger logr.Logger, podSpec *corev1.PodSpec, containerName, namespace string) (map[string]string, error) { +func ResolveContainerEnv(ctx context.Context, client client.Client, logger logr.Logger, podSpec *corev1.PodSpec, containerName, namespace string, secretsLister corev1listers.SecretLister) (map[string]string, error) { if len(podSpec.Containers) < 1 { return nil, fmt.Errorf("target object doesn't have containers") } @@ -125,15 +144,15 @@ func ResolveContainerEnv(ctx context.Context, client client.Client, logger logr. container = podSpec.Containers[0] } - return resolveEnv(ctx, client, logger, &container, namespace) + return resolveEnv(ctx, client, logger, &container, namespace, secretsLister) } // ResolveAuthRefAndPodIdentity provides authentication parameters and pod identity needed authenticate scaler with the environment. func ResolveAuthRefAndPodIdentity(ctx context.Context, client client.Client, logger logr.Logger, triggerAuthRef *kedav1alpha1.ScaledObjectAuthRef, podTemplateSpec *corev1.PodTemplateSpec, - namespace string) (map[string]string, kedav1alpha1.AuthPodIdentity, error) { + namespace string, secretsLister corev1listers.SecretLister) (map[string]string, kedav1alpha1.AuthPodIdentity, error) { if podTemplateSpec != nil { - authParams, podIdentity := resolveAuthRef(ctx, client, logger, triggerAuthRef, &podTemplateSpec.Spec, namespace) + authParams, podIdentity := resolveAuthRef(ctx, client, logger, triggerAuthRef, &podTemplateSpec.Spec, namespace, secretsLister) if podIdentity.Provider == kedav1alpha1.PodIdentityProviderAwsEKS { serviceAccountName := podTemplateSpec.Spec.ServiceAccountName @@ -150,7 +169,7 @@ func ResolveAuthRefAndPodIdentity(ctx context.Context, client client.Client, log return authParams, podIdentity, nil } - authParams, _ := resolveAuthRef(ctx, client, logger, triggerAuthRef, nil, namespace) + authParams, _ := resolveAuthRef(ctx, client, logger, triggerAuthRef, nil, namespace, secretsLister) return authParams, kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderNone}, nil } @@ -158,7 +177,7 @@ func ResolveAuthRefAndPodIdentity(ctx context.Context, client client.Client, log // based on authentication method defined in TriggerAuthentication, authParams and podIdentity is returned func resolveAuthRef(ctx context.Context, client client.Client, logger logr.Logger, triggerAuthRef *kedav1alpha1.ScaledObjectAuthRef, podSpec *corev1.PodSpec, - namespace string) (map[string]string, kedav1alpha1.AuthPodIdentity) { + namespace string, secretsLister corev1listers.SecretLister) (map[string]string, kedav1alpha1.AuthPodIdentity) { result := make(map[string]string) var podIdentity kedav1alpha1.AuthPodIdentity @@ -176,7 +195,7 @@ func resolveAuthRef(ctx context.Context, client client.Client, logger logr.Logge result[e.Parameter] = "" continue } - env, err := ResolveContainerEnv(ctx, client, logger, podSpec, e.ContainerName, namespace) + env, err := ResolveContainerEnv(ctx, client, logger, podSpec, e.ContainerName, namespace, secretsLister) if err != nil { result[e.Parameter] = "" } else { @@ -186,7 +205,7 @@ func resolveAuthRef(ctx context.Context, client client.Client, logger logr.Logge } if triggerAuthSpec.SecretTargetRef != nil { for _, e := range triggerAuthSpec.SecretTargetRef { - result[e.Parameter] = resolveAuthSecret(ctx, client, logger, e.Name, triggerNamespace, e.Key) + result[e.Parameter] = resolveAuthSecret(ctx, client, logger, e.Name, triggerNamespace, e.Key, secretsLister) } } if triggerAuthSpec.HashiCorpVault != nil && len(triggerAuthSpec.HashiCorpVault.Secrets) > 0 { @@ -216,7 +235,7 @@ func resolveAuthRef(ctx context.Context, client client.Client, logger logr.Logge } if triggerAuthSpec.AzureKeyVault != nil && len(triggerAuthSpec.AzureKeyVault.Secrets) > 0 { vaultHandler := NewAzureKeyVaultHandler(triggerAuthSpec.AzureKeyVault) - err := vaultHandler.Initialize(ctx, client, logger, triggerNamespace) + err := vaultHandler.Initialize(ctx, client, logger, triggerNamespace, secretsLister) if err != nil { logger.Error(err, "Error authenticating to Azure Key Vault", "triggerAuthRef.Name", triggerAuthRef.Name) } else { @@ -237,27 +256,6 @@ func resolveAuthRef(ctx context.Context, client client.Client, logger logr.Logge return result, podIdentity } -var clusterObjectNamespaceCache *string - -func getClusterObjectNamespace() (string, error) { - // Check if a cached value is available. - if clusterObjectNamespaceCache != nil { - return *clusterObjectNamespaceCache, nil - } - env := os.Getenv("KEDA_CLUSTER_OBJECT_NAMESPACE") - if env != "" { - clusterObjectNamespaceCache = &env - return env, nil - } - data, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace") - if err != nil { - return "", err - } - strData := string(data) - clusterObjectNamespaceCache = &strData - return strData, nil -} - func getTriggerAuthSpec(ctx context.Context, client client.Client, triggerAuthRef *kedav1alpha1.ScaledObjectAuthRef, namespace string) (*kedav1alpha1.TriggerAuthenticationSpec, string, error) { if triggerAuthRef.Kind == "" || triggerAuthRef.Kind == "TriggerAuthentication" { triggerAuth := &kedav1alpha1.TriggerAuthentication{} @@ -267,7 +265,7 @@ func getTriggerAuthSpec(ctx context.Context, client client.Client, triggerAuthRe } return &triggerAuth.Spec, namespace, nil } else if triggerAuthRef.Kind == "ClusterTriggerAuthentication" { - clusterNamespace, err := getClusterObjectNamespace() + clusterNamespace, err := util.GetClusterObjectNamespace() if err != nil { return nil, "", err } @@ -281,7 +279,7 @@ func getTriggerAuthSpec(ctx context.Context, client client.Client, triggerAuthRe return nil, "", fmt.Errorf("unknown trigger auth kind %s", triggerAuthRef.Kind) } -func resolveEnv(ctx context.Context, client client.Client, logger logr.Logger, container *corev1.Container, namespace string) (map[string]string, error) { +func resolveEnv(ctx context.Context, client client.Client, logger logr.Logger, container *corev1.Container, namespace string, secretsLister corev1listers.SecretLister) (map[string]string, error) { resolved := make(map[string]string) if container.EnvFrom != nil { @@ -300,7 +298,7 @@ func resolveEnv(ctx context.Context, client client.Client, logger logr.Logger, c return nil, fmt.Errorf("error reading config ref %s on namespace %s/: %s", source.ConfigMapRef, namespace, err) } } else if source.SecretRef != nil { - secretsMap, err := resolveSecretMap(ctx, client, source.SecretRef, namespace) + secretsMap, err := resolveSecretMap(ctx, client, logger, source.SecretRef, namespace, secretsLister) switch { case err == nil: for k, v := range secretsMap { @@ -330,7 +328,7 @@ func resolveEnv(ctx context.Context, client client.Client, logger logr.Logger, c switch { case envVar.ValueFrom.SecretKeyRef != nil: // env is a secret selector - value, err = resolveSecretValue(ctx, client, envVar.ValueFrom.SecretKeyRef, envVar.ValueFrom.SecretKeyRef.Key, namespace) + value, err = resolveSecretValue(ctx, client, logger, envVar.ValueFrom.SecretKeyRef, envVar.ValueFrom.SecretKeyRef.Key, namespace, secretsLister) if err != nil { if envVar.ValueFrom.SecretKeyRef.Optional != nil && *envVar.ValueFrom.SecretKeyRef.Optional { continue @@ -420,9 +418,14 @@ func resolveConfigMap(ctx context.Context, client client.Client, configMapRef *c return configMap.Data, nil } -func resolveSecretMap(ctx context.Context, client client.Client, secretMapRef *corev1.SecretEnvSource, namespace string) (map[string]string, error) { +func resolveSecretMap(ctx context.Context, client client.Client, logger logr.Logger, secretMapRef *corev1.SecretEnvSource, namespace string, secretsLister corev1listers.SecretLister) (map[string]string, error) { secret := &corev1.Secret{} - err := client.Get(ctx, types.NamespacedName{Name: secretMapRef.Name, Namespace: namespace}, secret) + var err error + if isSecretAccessRestricted(logger) { + secret, err = secretsLister.Secrets(kedaNamespace).Get(secretMapRef.Name) + } else { + err = client.Get(ctx, types.NamespacedName{Name: secretMapRef.Name, Namespace: namespace}, secret) + } if err != nil { return nil, err } @@ -434,9 +437,14 @@ func resolveSecretMap(ctx context.Context, client client.Client, secretMapRef *c return secretsStr, nil } -func resolveSecretValue(ctx context.Context, client client.Client, secretKeyRef *corev1.SecretKeySelector, keyName, namespace string) (string, error) { +func resolveSecretValue(ctx context.Context, client client.Client, logger logr.Logger, secretKeyRef *corev1.SecretKeySelector, keyName, namespace string, secretsLister corev1listers.SecretLister) (string, error) { secret := &corev1.Secret{} - err := client.Get(ctx, types.NamespacedName{Name: secretKeyRef.Name, Namespace: namespace}, secret) + var err error + if isSecretAccessRestricted(logger) { + secret, err = secretsLister.Secrets(kedaNamespace).Get(secretKeyRef.Name) + } else { + err = client.Get(ctx, types.NamespacedName{Name: secretKeyRef.Name, Namespace: namespace}, secret) + } if err != nil { return "", err } @@ -452,14 +460,19 @@ func resolveConfigValue(ctx context.Context, client client.Client, configKeyRef return configMap.Data[keyName], nil } -func resolveAuthSecret(ctx context.Context, client client.Client, logger logr.Logger, name, namespace, key string) string { +func resolveAuthSecret(ctx context.Context, client client.Client, logger logr.Logger, name, namespace, key string, secretsLister corev1listers.SecretLister) string { if name == "" || namespace == "" || key == "" { logger.Error(fmt.Errorf("error trying to get secret"), "name, namespace and key are required", "Secret.Namespace", namespace, "Secret.Name", name, "key", key) return "" } secret := &corev1.Secret{} - err := client.Get(ctx, types.NamespacedName{Name: name, Namespace: namespace}, secret) + var err error + if isSecretAccessRestricted(logger) { + secret, err = secretsLister.Secrets(kedaNamespace).Get(name) + } else { + err = client.Get(ctx, types.NamespacedName{Name: name, Namespace: namespace}, secret) + } if err != nil { logger.Error(err, "Error trying to get secret from namespace", "Secret.Namespace", namespace, "Secret.Name", name) return "" diff --git a/pkg/scaling/resolver/scale_resolvers_test.go b/pkg/scaling/resolver/scale_resolvers_test.go index 6f5f9d53a77..fdcda134781 100644 --- a/pkg/scaling/resolver/scale_resolvers_test.go +++ b/pkg/scaling/resolver/scale_resolvers_test.go @@ -18,6 +18,7 @@ package resolver import ( "context" + "os" "testing" "github.com/google/go-cmp/cmp" @@ -25,6 +26,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/scheme" + corev1listers "k8s.io/client-go/listers/core/v1" "sigs.k8s.io/controller-runtime/pkg/client/fake" logf "sigs.k8s.io/controller-runtime/pkg/log" @@ -218,9 +220,10 @@ var testMetadatas = []testMetadata{ } func TestResolveNonExistingConfigMapsOrSecretsEnv(t *testing.T) { + var secretsLister corev1listers.SecretLister for _, testData := range testMetadatas { ctx := context.Background() - _, err := resolveEnv(ctx, fake.NewClientBuilder().Build(), logf.Log.WithName("test"), testData.container, namespace) + _, err := resolveEnv(ctx, fake.NewClientBuilder().Build(), logf.Log.WithName("test"), testData.container, namespace, secretsLister) if err != nil && !testData.isError { t.Errorf("Expected success because %s got error, %s", testData.comment, err) @@ -399,18 +402,20 @@ func TestResolveAuthRef(t *testing.T) { expectedPodIdentity: kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderNone}, }, } + var secretsLister corev1listers.SecretLister for _, test := range tests { test := test t.Run(test.name, func(t *testing.T) { ctx := context.Background() - clusterObjectNamespaceCache = &clusterNamespace // Inject test cluster namespace. + os.Setenv("KEDA_CLUSTER_OBJECT_NAMESPACE", clusterNamespace) // Inject test cluster namespace. gotMap, gotPodIdentity := resolveAuthRef( ctx, fake.NewClientBuilder().WithScheme(scheme.Scheme).WithRuntimeObjects(test.existing...).Build(), logf.Log.WithName("test"), test.soar, test.podSpec, - namespace) + namespace, + secretsLister) if diff := cmp.Diff(gotMap, test.expected); diff != "" { t.Errorf("Returned authParams are different: %s", diff) } @@ -524,11 +529,12 @@ func TestResolveDependentEnv(t *testing.T) { }, }, } + var secretsLister corev1listers.SecretLister for _, test := range tests { test := test t.Run(test.name, func(t *testing.T) { ctx := context.Background() - envMap, _ := resolveEnv(ctx, fake.NewClientBuilder().Build(), logf.Log.WithName("test"), test.container, namespace) + envMap, _ := resolveEnv(ctx, fake.NewClientBuilder().Build(), logf.Log.WithName("test"), test.container, namespace, secretsLister) if diff := cmp.Diff(envMap, test.expected); diff != "" { t.Errorf("Returned authParams are different: %s", diff) } diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 568064ea881..1a8cbfae10c 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -27,6 +27,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/scale" "k8s.io/client-go/tools/record" "k8s.io/metrics/pkg/apis/external_metrics" @@ -64,10 +65,11 @@ type scaleHandler struct { recorder record.EventRecorder scalerCaches map[string]*cache.ScalersCache lock *sync.RWMutex + secretsLister corev1listers.SecretLister } // NewScaleHandler creates a ScaleHandler object -func NewScaleHandler(client client.Client, scaleClient scale.ScalesGetter, reconcilerScheme *runtime.Scheme, globalHTTPTimeout time.Duration, recorder record.EventRecorder) ScaleHandler { +func NewScaleHandler(client client.Client, scaleClient scale.ScalesGetter, reconcilerScheme *runtime.Scheme, globalHTTPTimeout time.Duration, recorder record.EventRecorder, secretsLister corev1listers.SecretLister) ScaleHandler { return &scaleHandler{ client: client, logger: logf.Log.WithName("scalehandler"), @@ -77,6 +79,7 @@ func NewScaleHandler(client client.Client, scaleClient scale.ScalesGetter, recon recorder: recorder, scalerCaches: map[string]*cache.ScalersCache{}, lock: &sync.RWMutex{}, + secretsLister: secretsLister, } } @@ -465,7 +468,7 @@ func (h *scaleHandler) buildScalers(ctx context.Context, withTriggers *kedav1alp factory := func() (scalers.Scaler, *scalers.ScalerConfig, error) { if podTemplateSpec != nil { - resolvedEnv, err = resolver.ResolveContainerEnv(ctx, h.client, logger, &podTemplateSpec.Spec, containerName, withTriggers.Namespace) + resolvedEnv, err = resolver.ResolveContainerEnv(ctx, h.client, logger, &podTemplateSpec.Spec, containerName, withTriggers.Namespace, h.secretsLister) if err != nil { return nil, nil, fmt.Errorf("error resolving secrets for ScaleTarget: %s", err) } @@ -483,7 +486,7 @@ func (h *scaleHandler) buildScalers(ctx context.Context, withTriggers *kedav1alp MetricType: trigger.MetricType, } - config.AuthParams, config.PodIdentity, err = resolver.ResolveAuthRefAndPodIdentity(ctx, h.client, logger, trigger.AuthenticationRef, podTemplateSpec, withTriggers.Namespace) + config.AuthParams, config.PodIdentity, err = resolver.ResolveAuthRefAndPodIdentity(ctx, h.client, logger, trigger.AuthenticationRef, podTemplateSpec, withTriggers.Namespace, h.secretsLister) if err != nil { return nil, nil, err } diff --git a/pkg/util/env_resolver.go b/pkg/util/env_resolver.go index 21e6647ac2e..99b828dfe79 100644 --- a/pkg/util/env_resolver.go +++ b/pkg/util/env_resolver.go @@ -22,6 +22,10 @@ import ( "time" ) +const RestrictSecretAccessEnvVar = "KEDA_RESTRICT_SECRET_ACCESS" + +var clusterObjectNamespaceCache *string + func ResolveOsEnvBool(envName string, defaultValue bool) (bool, error) { valueStr, found := os.LookupEnv(envName) @@ -52,3 +56,28 @@ func ResolveOsEnvDuration(envName string) (*time.Duration, error) { return nil, nil } + +// GetClusterObjectNamespace retrieves the cluster object namespace of KEDA, default is the namespace of KEDA Operator & Metrics Server +func GetClusterObjectNamespace() (string, error) { + // Check if a cached value is available. + if clusterObjectNamespaceCache != nil { + return *clusterObjectNamespaceCache, nil + } + env := os.Getenv("KEDA_CLUSTER_OBJECT_NAMESPACE") + if env != "" { + clusterObjectNamespaceCache = &env + return env, nil + } + data, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace") + if err != nil { + return "", err + } + strData := string(data) + clusterObjectNamespaceCache = &strData + return strData, nil +} + +// GetRestrictSecretAccess retrieves the value of the environment variable of KEDA_RESTRICT_SECRET_ACCESS +func GetRestrictSecretAccess() string { + return os.Getenv(RestrictSecretAccessEnvVar) +}