diff --git a/CHANGELOG.md b/CHANGELOG.md index 63365106deb..b57ddadcde3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,9 +17,9 @@ ## Unreleased ### New +- Can use Pod Identity with Azure Event Hub scaler ([#994](https://github.com/kedacore/keda/issues/994)) ### Improvements - - Support add ScaledJob's label to its job ([#1311](https://github.com/kedacore/keda/issues/1311)) - Bug fix in aws_iam_authorization to utilize correct secret from env key name ([PR #1332](https://github.com/kedacore/keda/pull/1332)) diff --git a/go.mod b/go.mod index ce293d448d2..5b67228a069 100644 --- a/go.mod +++ b/go.mod @@ -4,12 +4,14 @@ go 1.15 require ( cloud.google.com/go v0.65.0 - github.com/Azure/azure-amqp-common-go/v3 v3.0.1 - github.com/Azure/azure-event-hubs-go v1.3.1 - github.com/Azure/azure-sdk-for-go v47.0.0+incompatible + github.com/Azure/azure-amqp-common-go/v3 v3.1.0 + github.com/Azure/azure-event-hubs-go v1.3.1 // indirect + github.com/Azure/azure-event-hubs-go/v3 v3.3.2 + github.com/Azure/azure-sdk-for-go v48.0.0+incompatible github.com/Azure/azure-service-bus-go v0.10.6 github.com/Azure/azure-storage-blob-go v0.10.0 github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd + github.com/Azure/go-autorest/autorest v0.11.10 github.com/Azure/go-autorest/autorest/azure/auth v0.5.3 github.com/Huawei/gophercloud v1.0.21 github.com/Shopify/sarama v1.27.1 @@ -34,6 +36,8 @@ require ( github.com/stretchr/testify v1.6.1 github.com/tidwall/gjson v1.6.1 github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c + golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 // indirect + golang.org/x/net v0.0.0-20201031054903-ff519b6c9102 // indirect google.golang.org/api v0.31.0 google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d google.golang.org/grpc v1.31.1 diff --git a/go.sum b/go.sum index 06b1dc4b172..849597f785b 100644 --- a/go.sum +++ b/go.sum @@ -50,8 +50,12 @@ github.com/Azure/azure-amqp-common-go/v2 v2.1.0 h1:+QbFgmWCnPzdaRMfsI0Yb6GrRdBj5 github.com/Azure/azure-amqp-common-go/v2 v2.1.0/go.mod h1:R8rea+gJRuJR6QxTir/XuEd+YuKoUiazDC/N96FiDEU= github.com/Azure/azure-amqp-common-go/v3 v3.0.1 h1:mXh+eyOxGLBfqDtfmbtby0l7XfG/6b2NkuZ3B7i6zHA= github.com/Azure/azure-amqp-common-go/v3 v3.0.1/go.mod h1:PBIGdzcO1teYoufTKMcGibdKaYZv4avS+O6LNIp8bq0= +github.com/Azure/azure-amqp-common-go/v3 v3.1.0 h1:1N4YSkWYWffOpQHromYdOucBSQXhNRKzqtgICy6To8Q= +github.com/Azure/azure-amqp-common-go/v3 v3.1.0/go.mod h1:PBIGdzcO1teYoufTKMcGibdKaYZv4avS+O6LNIp8bq0= github.com/Azure/azure-event-hubs-go v1.3.1 h1:vKw7tLOFJ8kVMkhNvOXZWz+3purRQ/WTe60+bavZ5qc= github.com/Azure/azure-event-hubs-go v1.3.1/go.mod h1:me2m3+0WC7G7JRBTWI5SQ81s2TYyOqgV3JIpYg86jZA= +github.com/Azure/azure-event-hubs-go/v3 v3.3.2 h1:R3HoM9QiZ2uBcxfMPROBJPSsCXUL31TiV5vQ3dRsRNg= +github.com/Azure/azure-event-hubs-go/v3 v3.3.2/go.mod h1:sszMsQpFy8Au2s2NColbnJY8lRVm1koW0XxBJ3rN5TY= github.com/Azure/azure-pipeline-go v0.1.8/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg= github.com/Azure/azure-pipeline-go v0.1.9/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg= github.com/Azure/azure-pipeline-go v0.2.1/go.mod h1:UGSo8XybXnIGZ3epmeBw7Jdz+HiUVpqIlpz/HKHylF4= @@ -73,12 +77,15 @@ github.com/Azure/azure-sdk-for-go v46.0.0+incompatible h1:4qlEOCDcDQZTGczYGzbGYC github.com/Azure/azure-sdk-for-go v46.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-sdk-for-go v47.0.0+incompatible h1:Hn9OhJUtoLjm27f17/JKw38KBQny0cjpnsBHn7kPpTI= github.com/Azure/azure-sdk-for-go v47.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= +github.com/Azure/azure-sdk-for-go v48.0.0+incompatible h1:adRBpSbkY3IAgqBA83nSDN8yXDsy48zJNPqSwZabDNQ= +github.com/Azure/azure-sdk-for-go v48.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-service-bus-go v0.9.1 h1:G1qBLQvHCFDv9pcpgwgFkspzvnGknJRR0PYJ9ytY/JA= github.com/Azure/azure-service-bus-go v0.9.1/go.mod h1:yzBx6/BUGfjfeqbRZny9AQIbIe3AcV9WZbAdpkoXOa0= github.com/Azure/azure-service-bus-go v0.10.6 h1:xjxJf6rnEoX5yCCoKiXe5VPJAxdXZU1e2zgSkHMBRR8= github.com/Azure/azure-service-bus-go v0.10.6/go.mod h1:1tX7Ap1oTgovUJ9iUtMOjM9I/xcvjyZ0VMWzNF0cBf0= github.com/Azure/azure-storage-blob-go v0.0.0-20181023070848-cf01652132cc/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y= github.com/Azure/azure-storage-blob-go v0.0.0-20190123011202-457680cc0804/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y= +github.com/Azure/azure-storage-blob-go v0.6.0/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y= github.com/Azure/azure-storage-blob-go v0.8.0 h1:53qhf0Oxa0nOjgbDeeYPUeyiNmafAFEY95rZLK0Tj6o= github.com/Azure/azure-storage-blob-go v0.8.0/go.mod h1:lPI3aLPpuLTeUwh1sViKXFxwl2B6teiRqI0deQUvsw0= github.com/Azure/azure-storage-blob-go v0.10.0 h1:evCwGreYo3XLeBV4vSxLbLiYb6e0SzsJiXQVRGsRXxs= @@ -112,6 +119,8 @@ github.com/Azure/go-autorest/autorest v0.11.3 h1:fyYnmYujkIXUgv88D9/Wo2ybE4Zwd/T github.com/Azure/go-autorest/autorest v0.11.3/go.mod h1:JFgpikqFJ/MleTTxwepExTKnFUKKszPS8UavbQYUMuw= github.com/Azure/go-autorest/autorest v0.11.9 h1:P0ZF0dEYoUPUVDQo3mA1CvH5b8mKev7DDcmTwauuNME= github.com/Azure/go-autorest/autorest v0.11.9/go.mod h1:eipySxLmqSyC5s5k1CLupqet0PSENBEDP93LQ9a8QYw= +github.com/Azure/go-autorest/autorest v0.11.10 h1:j5sGbX7uj1ieYYkQ3Mpvewd4DCsEQ+ZeJpqnSM9pjnM= +github.com/Azure/go-autorest/autorest v0.11.10/go.mod h1:eipySxLmqSyC5s5k1CLupqet0PSENBEDP93LQ9a8QYw= github.com/Azure/go-autorest/autorest/adal v0.1.0/go.mod h1:MeS4XhScH55IST095THyTxElntu7WqB7pNbZo8Q5G3E= github.com/Azure/go-autorest/autorest/adal v0.5.0/go.mod h1:8Z9fGy2MpX0PvDjB1pEgQTmVqjGhiHBW7RJJEciWzS0= github.com/Azure/go-autorest/autorest/adal v0.8.0/go.mod h1:Z6vX6WXXuyieHAXwMj0S6HY6e6wcHn37qQMBQlvY3lc= @@ -1499,6 +1508,8 @@ golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a h1:vclmkQCjlDX5OydZ9wv8rB golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 h1:hb9wdF1z5waM+dSIICn1l0DkLVDT3hqhhQsDNUmHPRE= golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 h1:pLI5jrR7OSLijeIDcmRxNmw2api+jEfxLoykJVice/E= +golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1586,6 +1597,8 @@ golang.org/x/net v0.0.0-20200904194848-62affa334b73 h1:MXfv8rhZWmFeqX3GNZRsd6vOL golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0 h1:wBouT66WTYFXdxfVdz9sVWARVd/2vfGcmI45D2gj45M= golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20201031054903-ff519b6c9102 h1:42cLlJJdEh+ySyeUUbEQ5bsTiq8voBeTuweGVkY6Puw= +golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/oauth2 v0.0.0-20180724155351-3d292e4d0cdc/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -1836,6 +1849,7 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogR gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b h1:QRR6H1YWRnHb4Y/HeNFCTJLFVxaq6wH4YuVdsUOr75U= gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= diff --git a/pkg/scalers/azure/azure_eventhub.go b/pkg/scalers/azure/azure_eventhub.go index 03edd66900f..872a0b1c53c 100644 --- a/pkg/scalers/azure/azure_eventhub.go +++ b/pkg/scalers/azure/azure_eventhub.go @@ -11,9 +11,10 @@ import ( "github.com/imdario/mergo" - eventhub "github.com/Azure/azure-event-hubs-go" + "github.com/Azure/azure-amqp-common-go/v3/aad" + eventhub "github.com/Azure/azure-event-hubs-go/v3" "github.com/Azure/azure-storage-blob-go/azblob" - + "github.com/Azure/go-autorest/autorest/azure" kedav1alpha1 "github.com/kedacore/keda/api/v1alpha1" ) @@ -46,16 +47,35 @@ type EventHubInfo struct { EventHubConsumerGroup string StorageConnection string BlobContainer string + Namespace string + EventHubName string } // GetEventHubClient returns eventhub client func GetEventHubClient(info EventHubInfo) (*eventhub.Hub, error) { - hub, err := eventhub.NewHubFromConnectionString(info.EventHubConnection) - if err != nil { - return nil, fmt.Errorf("failed to create hub client: %s", err) + // The user wants to use a connectionstring, not a pod identity + if info.EventHubConnection != "" { + hub, err := eventhub.NewHubFromConnectionString(info.EventHubConnection) + if err != nil { + return nil, fmt.Errorf("failed to create hub client: %s", err) + } + return hub, nil } - return hub, nil + // Since there is no connectionstring, then user wants to use pod identity + // Internally, the JWTProvider will use Managed Service Identity to authenticate if no Service Principal info supplied + provider, aadErr := aad.NewJWTProvider(func(config *aad.TokenProviderConfiguration) error { + if config.Env == nil { + config.Env = &azure.PublicCloud + } + return nil + }) + + if aadErr == nil { + return eventhub.NewHub(info.Namespace, info.EventHubName, provider) + } + + return nil, aadErr } // GetCheckpointFromBlobStorage accesses Blob storage and gets checkpoint information of a partition @@ -65,9 +85,16 @@ func GetCheckpointFromBlobStorage(ctx context.Context, info EventHubInfo, partit return Checkpoint{}, err } - eventHubNamespace, eventHubName, err := ParseAzureEventHubConnectionString(info.EventHubConnection) - if err != nil { - return Checkpoint{}, err + var eventHubNamespace string + var eventHubName string + if info.EventHubConnection != "" { + eventHubNamespace, eventHubName, err = ParseAzureEventHubConnectionString(info.EventHubConnection) + if err != nil { + return Checkpoint{}, err + } + } else { + eventHubNamespace = info.Namespace + eventHubName = info.EventHubName } // TODO: add more ways to read from different types of storage and read checkpoints/leases written in different JSON formats diff --git a/pkg/scalers/azure_eventhub_scaler.go b/pkg/scalers/azure_eventhub_scaler.go index 1dc50eed33f..aaabb6ec2a3 100644 --- a/pkg/scalers/azure_eventhub_scaler.go +++ b/pkg/scalers/azure_eventhub_scaler.go @@ -7,9 +7,10 @@ import ( "math" "strconv" + "github.com/kedacore/keda/api/v1alpha1" "github.com/kedacore/keda/pkg/scalers/azure" - eventhub "github.com/Azure/azure-event-hubs-go" + eventhub "github.com/Azure/azure-event-hubs-go/v3" "github.com/Azure/azure-storage-blob-go/azblob" "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" @@ -85,16 +86,6 @@ func parseAzureEventHubMetadata(config *ScalerConfig) (*eventHubMetadata, error) return nil, fmt.Errorf("no storage connection string given") } - if config.AuthParams["connection"] != "" { - meta.eventHubInfo.EventHubConnection = config.AuthParams["connection"] - } else if config.TriggerMetadata["connectionFromEnv"] != "" { - meta.eventHubInfo.EventHubConnection = config.ResolvedEnv[config.TriggerMetadata["connectionFromEnv"]] - } - - if len(meta.eventHubInfo.EventHubConnection) == 0 { - return nil, fmt.Errorf("no event hub connection string given") - } - meta.eventHubInfo.EventHubConsumerGroup = defaultEventHubConsumerGroup if val, ok := config.TriggerMetadata["consumerGroup"]; ok { meta.eventHubInfo.EventHubConsumerGroup = val @@ -105,6 +96,38 @@ func parseAzureEventHubMetadata(config *ScalerConfig) (*eventHubMetadata, error) meta.eventHubInfo.BlobContainer = val } + if config.PodIdentity == "" || config.PodIdentity == v1alpha1.PodIdentityProviderNone { + if config.AuthParams["connection"] != "" { + meta.eventHubInfo.EventHubConnection = config.AuthParams["connection"] + } else if config.TriggerMetadata["connectionFromEnv"] != "" { + meta.eventHubInfo.EventHubConnection = config.ResolvedEnv[config.TriggerMetadata["connectionFromEnv"]] + } + + if len(meta.eventHubInfo.EventHubConnection) == 0 { + return nil, fmt.Errorf("no event hub connection string given") + } + } else { + if config.TriggerMetadata["eventHubNamespace"] != "" { + meta.eventHubInfo.Namespace = config.TriggerMetadata["eventHubNamespace"] + } else if config.TriggerMetadata["eventHubNamespaceFromEnv"] != "" { + meta.eventHubInfo.Namespace = config.ResolvedEnv[config.TriggerMetadata["eventHubNamespaceFromEnv"]] + } + + if len(meta.eventHubInfo.Namespace) == 0 { + return nil, fmt.Errorf("no event hub namespace string given") + } + + if config.TriggerMetadata["eventHubName"] != "" { + meta.eventHubInfo.EventHubName = config.TriggerMetadata["eventHubName"] + } else if config.TriggerMetadata["eventHubNameFromEnv"] != "" { + meta.eventHubInfo.EventHubName = config.ResolvedEnv[config.TriggerMetadata["eventHubNameFromEnv"]] + } + + if len(meta.eventHubInfo.EventHubName) == 0 { + return nil, fmt.Errorf("no event hub name string given") + } + } + return &meta, nil } diff --git a/pkg/scalers/azure_eventhub_scaler_test.go b/pkg/scalers/azure_eventhub_scaler_test.go index f760aae04f0..cd2dfa3761e 100644 --- a/pkg/scalers/azure_eventhub_scaler_test.go +++ b/pkg/scalers/azure_eventhub_scaler_test.go @@ -9,7 +9,7 @@ import ( "github.com/kedacore/keda/pkg/scalers/azure" - eventhub "github.com/Azure/azure-event-hubs-go" + eventhub "github.com/Azure/azure-event-hubs-go/v3" "github.com/Azure/azure-storage-blob-go/azblob" ) @@ -51,6 +51,18 @@ var parseEventHubMetadataDataset = []parseEventHubMetadataTestData{ {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting, "blobContainer": testContainerName}, false}, } +var parseEventHubMetadataDatasetWithPodIdentity = []parseEventHubMetadataTestData{ + {map[string]string{}, true}, + // Even though connection string is provided, this should fail because the eventhub Namespace is not provided explicitly when using Pod Identity + {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting, "unprocessedEventThreshold": "15"}, true}, + // properly formed event hub metadata with Pod Identity + {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, false}, + // missing eventHubname + {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubNamespace": testEventHubNamespace}, true}, + // missing eventHubNamespace + {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName}, true}, +} + var eventHubMetricIdentifiers = []eventHubMetricIdentifier{ {&parseEventHubMetadataDataset[1], "azure-eventhub-none-testEventHubConsumerGroup"}, } @@ -76,6 +88,17 @@ func TestParseEventHubMetadata(t *testing.T) { t.Error("Expected error and got success") } } + + for _, testData := range parseEventHubMetadataDatasetWithPodIdentity { + _, err := parseAzureEventHubMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: sampleEventHubResolvedEnv, AuthParams: map[string]string{}, PodIdentity: "Azure"}) + + if err != nil && !testData.isError { + t.Errorf("Expected success but got error: %s", err) + } + if testData.isError && err == nil { + t.Error("Expected error and got success") + } + } } func TestGetUnprocessedEventCountInPartition(t *testing.T) {