Skip to content

Commit

Permalink
Support Restrict Secret Access (#3677)
Browse files Browse the repository at this point in the history

Signed-off-by: kevin <[email protected]>
  • Loading branch information
kevinteng525 authored Dec 8, 2022
1 parent 98d40ab commit f21a7db
Show file tree
Hide file tree
Showing 11 changed files with 167 additions and 62 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **General:** Add support to use pod identities for authentication in Azure Key Vault ([#3813](https://github.com/kedacore/keda/issues/3813)
- **General:** Support disable keep http connection alive([#3874](https://github.com/kedacore/keda/issues/3874)
- **General:** Improve the function used to normalize metric names ([#3789](https://github.com/kedacore/keda/issues/3789)
- **General:** Support Restrict Secret Access to mitigate the security risk ([#3668](https://github.com/kedacore/keda/issues/3668)
- **Apache Kafka Scaler:** SASL/OAuthbearer Implementation ([#3681](https://github.com/kedacore/keda/issues/3681))
- **Apache Kafka Scaler:** Limit Kafka Partitions KEDA operates on ([#3830](https://github.com/kedacore/keda/issues/3830))
- **Apache Kafka Scaler:** Implementation for Excluding Persistent Lag ([#3904](https://github.com/kedacore/keda/issues/3904))
Expand Down
30 changes: 27 additions & 3 deletions adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ import (
"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/klog/v2/klogr"
Expand Down Expand Up @@ -131,7 +134,25 @@ func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Durat

broadcaster := record.NewBroadcaster()
recorder := broadcaster.NewRecorder(scheme, corev1.EventSource{Component: "keda-metrics-adapter"})
handler := scaling.NewScaleHandler(mgr.GetClient(), nil, scheme, globalHTTPTimeout, recorder)

kubeClientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
logger.Error(err, "Unable to create kube clientset")
return nil, nil, err
}
objectNamespace, err := kedautil.GetClusterObjectNamespace()
if err != nil {
logger.Error(err, "Unable to get cluster object namespace")
return nil, nil, err
}
// the namespaced kubeInformerFactory is used to restrict secret informer to only list/watch secrets in KEDA cluster object namespace,
// refer to https://github.com/kedacore/keda/issues/3668
kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClientset, 1*time.Hour, kubeinformers.WithNamespace(objectNamespace))
secretInformer := kubeInformerFactory.Core().V1().Secrets()

handler := scaling.NewScaleHandler(mgr.GetClient(), nil, scheme, globalHTTPTimeout, recorder, secretInformer.Lister())
kubeInformerFactory.Start(ctx.Done())

externalMetricsInfo := &[]provider.ExternalMetricInfo{}
externalMetricsInfoLock := &sync.RWMutex{}

Expand All @@ -146,13 +167,13 @@ func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Durat
}

stopCh := make(chan struct{})
if err := runScaledObjectController(ctx, mgr, handler, logger, externalMetricsInfo, externalMetricsInfoLock, maxConcurrentReconciles, stopCh); err != nil {
if err := runScaledObjectController(ctx, mgr, handler, logger, externalMetricsInfo, externalMetricsInfoLock, maxConcurrentReconciles, stopCh, secretInformer.Informer().HasSynced); err != nil {
return nil, nil, err
}
return kedaprovider.NewProvider(ctx, logger, handler, mgr.GetClient(), *grpcClient, useMetricsServiceGrpc, namespace, externalMetricsInfo, externalMetricsInfoLock), stopCh, nil
}

func runScaledObjectController(ctx context.Context, mgr manager.Manager, scaleHandler scaling.ScaleHandler, logger logr.Logger, externalMetricsInfo *[]provider.ExternalMetricInfo, externalMetricsInfoLock *sync.RWMutex, maxConcurrentReconciles int, stopCh chan<- struct{}) error {
func runScaledObjectController(ctx context.Context, mgr manager.Manager, scaleHandler scaling.ScaleHandler, logger logr.Logger, externalMetricsInfo *[]provider.ExternalMetricInfo, externalMetricsInfoLock *sync.RWMutex, maxConcurrentReconciles int, stopCh chan<- struct{}, secretSynced cache.InformerSynced) error {
if err := (&kedacontrollers.MetricsScaledObjectReconciler{
Client: mgr.GetClient(),
ScaleHandler: scaleHandler,
Expand All @@ -170,6 +191,9 @@ func runScaledObjectController(ctx context.Context, mgr manager.Manager, scaleHa
}
}()

if ok := cache.WaitForCacheSync(ctx.Done(), secretSynced); !ok {
return fmt.Errorf("failed to wait Secrets cache synced")
}
return nil
}

Expand Down
6 changes: 4 additions & 2 deletions controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -56,6 +57,8 @@ type ScaledJobReconciler struct {

scaledJobGenerations *sync.Map
scaleHandler scaling.ScaleHandler
SecretsLister corev1listers.SecretLister
SecretsSynced cache.InformerSynced
}

type scaledJobMetricsData struct {
Expand All @@ -75,9 +78,8 @@ func init() {

// SetupWithManager initializes the ScaledJobReconciler instance and starts a new controller managed by the passed Manager instance.
func (r *ScaledJobReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.GlobalHTTPTimeout, mgr.GetEventRecorderFor("scale-handler"))
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.GlobalHTTPTimeout, mgr.GetEventRecorderFor("scale-handler"), r.SecretsLister)
r.scaledJobGenerations = &sync.Map{}

return ctrl.NewControllerManagedBy(mgr).
WithOptions(options).
// Ignore updates to ScaledJob Status (in this case metadata.Generation does not change)
Expand Down
1 change: 0 additions & 1 deletion controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager, options cont
if r.Recorder == nil {
return fmt.Errorf("ScaledObjectReconciler.Recorder is not initialized")
}

// Start controller
return ctrl.NewControllerManagedBy(mgr).
WithOptions(options).
Expand Down
2 changes: 1 addition & 1 deletion controllers/keda/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ var _ = BeforeSuite(func(done Done) {
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
Recorder: k8sManager.GetEventRecorderFor("keda-operator"),
ScaleHandler: scaling.NewScaleHandler(k8sManager.GetClient(), scaleClient, k8sManager.GetScheme(), time.Duration(10), k8sManager.GetEventRecorderFor("keda-operator")),
ScaleHandler: scaling.NewScaleHandler(k8sManager.GetClient(), scaleClient, k8sManager.GetScheme(), time.Duration(10), k8sManager.GetEventRecorderFor("keda-operator"), nil),
ScaleClient: scaleClient,
}).SetupWithManager(k8sManager, controller.Options{})
Expect(err).ToNot(HaveOccurred())
Expand Down
32 changes: 30 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ import (
"github.com/spf13/pflag"
apimachineryruntime "k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/tools/cache"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/healthz"
Expand Down Expand Up @@ -158,13 +161,28 @@ func main() {
globalHTTPTimeout := time.Duration(globalHTTPTimeoutMS) * time.Millisecond
eventRecorder := mgr.GetEventRecorderFor("keda-operator")

kubeClientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
setupLog.Error(err, "Unable to create kube clientset")
os.Exit(1)
}
objectNamespace, err := kedautil.GetClusterObjectNamespace()
if err != nil {
setupLog.Error(err, "Unable to get cluster object namespace")
os.Exit(1)
}
// the namespaced kubeInformerFactory is used to restrict secret informer to only list/watch secrets in KEDA cluster object namespace,
// refer to https://github.com/kedacore/keda/issues/3668
kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClientset, 1*time.Hour, kubeinformers.WithNamespace(objectNamespace))
secretInformer := kubeInformerFactory.Core().V1().Secrets()

scaleClient, kubeVersion, err := k8s.InitScaleClient(mgr)
if err != nil {
setupLog.Error(err, "unable to init scale client")
os.Exit(1)
}

scaledHandler := scaling.NewScaleHandler(mgr.GetClient(), scaleClient, mgr.GetScheme(), globalHTTPTimeout, eventRecorder)
scaledHandler := scaling.NewScaleHandler(mgr.GetClient(), scaleClient, mgr.GetScheme(), globalHTTPTimeout, eventRecorder, secretInformer.Lister())

if err = (&kedacontrollers.ScaledObjectReconciler{
Client: mgr.GetClient(),
Expand All @@ -181,6 +199,8 @@ func main() {
Scheme: mgr.GetScheme(),
GlobalHTTPTimeout: globalHTTPTimeout,
Recorder: eventRecorder,
SecretsLister: secretInformer.Lister(),
SecretsSynced: secretInformer.Informer().HasSynced,
}).SetupWithManager(mgr, controller.Options{MaxConcurrentReconciles: scaledJobMaxReconciles}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ScaledJob")
os.Exit(1)
Expand Down Expand Up @@ -223,7 +243,15 @@ func main() {
setupLog.Info(fmt.Sprintf("Go OS/Arch: %s/%s", runtime.GOOS, runtime.GOARCH))
setupLog.Info(fmt.Sprintf("Running on Kubernetes %s", kubeVersion.PrettyVersion), "version", kubeVersion.Version)

if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
ctx := ctrl.SetupSignalHandler()
kubeInformerFactory.Start(ctx.Done())

if ok := cache.WaitForCacheSync(ctx.Done(), secretInformer.Informer().HasSynced); !ok {
setupLog.Error(nil, "failed to wait Secrets cache synced")
os.Exit(1)
}

if err := mgr.Start(ctx); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/scaling/resolver/azure_keyvault_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
az "github.com/Azure/go-autorest/autorest/azure"
"github.com/Azure/go-autorest/autorest/azure/auth"
"github.com/go-logr/logr"
corev1listers "k8s.io/client-go/listers/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
Expand All @@ -42,13 +43,13 @@ func NewAzureKeyVaultHandler(v *kedav1alpha1.AzureKeyVault) *AzureKeyVaultHandle
}
}

func (vh *AzureKeyVaultHandler) Initialize(ctx context.Context, client client.Client, logger logr.Logger, triggerNamespace string) error {
func (vh *AzureKeyVaultHandler) Initialize(ctx context.Context, client client.Client, logger logr.Logger, triggerNamespace string, secretsLister corev1listers.SecretLister) error {
keyvaultResourceURL, activeDirectoryEndpoint, err := vh.getPropertiesForCloud()
if err != nil {
return err
}

authConfig, err := vh.getAuthConfig(ctx, client, logger, triggerNamespace, keyvaultResourceURL, activeDirectoryEndpoint)
authConfig, err := vh.getAuthConfig(ctx, client, logger, triggerNamespace, keyvaultResourceURL, activeDirectoryEndpoint, secretsLister)
if err != nil {
return err
}
Expand Down Expand Up @@ -101,20 +102,19 @@ func (vh *AzureKeyVaultHandler) getPropertiesForCloud() (string, string, error)
}

func (vh *AzureKeyVaultHandler) getAuthConfig(ctx context.Context, client client.Client, logger logr.Logger,
triggerNamespace, keyVaultResourceURL, activeDirectoryEndpoint string) (auth.AuthorizerConfig, error) {
triggerNamespace, keyVaultResourceURL, activeDirectoryEndpoint string, secretsLister corev1listers.SecretLister) (auth.AuthorizerConfig, error) {
podIdentity := vh.vault.PodIdentity
if podIdentity == nil {
podIdentity = &kedav1alpha1.AuthPodIdentity{}
}

switch podIdentity.Provider {
case "", kedav1alpha1.PodIdentityProviderNone:
clientID := vh.vault.Credentials.ClientID
tenantID := vh.vault.Credentials.TenantID

clientSecretName := vh.vault.Credentials.ClientSecret.ValueFrom.SecretKeyRef.Name
clientSecretKey := vh.vault.Credentials.ClientSecret.ValueFrom.SecretKeyRef.Key
clientSecret := resolveAuthSecret(ctx, client, logger, clientSecretName, triggerNamespace, clientSecretKey)
clientSecret := resolveAuthSecret(ctx, client, logger, clientSecretName, triggerNamespace, clientSecretKey, secretsLister)

if clientID == "" || tenantID == "" || clientSecret == "" {
return nil, fmt.Errorf("clientID, tenantID and clientSecret are expected when not using a pod identity provider")
Expand Down
Loading

0 comments on commit f21a7db

Please sign in to comment.