Skip to content

Commit

Permalink
Eventhub podidentity (#1305)
Browse files Browse the repository at this point in the history
Signed-off-by: Emad Alashi <[email protected]>
  • Loading branch information
eashi authored Nov 16, 2020
1 parent f7fab40 commit d217b93
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 25 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
## Unreleased

### New
- Can use Pod Identity with Azure Event Hub scaler ([#994](https://github.com/kedacore/keda/issues/994))

### Improvements

- Support add ScaledJob's label to its job ([#1311](https://github.com/kedacore/keda/issues/1311))
- Bug fix in aws_iam_authorization to utilize correct secret from env key name ([PR #1332](https://github.com/kedacore/keda/pull/1332))

Expand Down
10 changes: 7 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ go 1.15

require (
cloud.google.com/go v0.65.0
github.com/Azure/azure-amqp-common-go/v3 v3.0.1
github.com/Azure/azure-event-hubs-go v1.3.1
github.com/Azure/azure-sdk-for-go v47.0.0+incompatible
github.com/Azure/azure-amqp-common-go/v3 v3.1.0
github.com/Azure/azure-event-hubs-go v1.3.1 // indirect
github.com/Azure/azure-event-hubs-go/v3 v3.3.2
github.com/Azure/azure-sdk-for-go v48.0.0+incompatible
github.com/Azure/azure-service-bus-go v0.10.6
github.com/Azure/azure-storage-blob-go v0.10.0
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd
github.com/Azure/go-autorest/autorest v0.11.10
github.com/Azure/go-autorest/autorest/azure/auth v0.5.3
github.com/Huawei/gophercloud v1.0.21
github.com/Shopify/sarama v1.27.1
Expand All @@ -34,6 +36,8 @@ require (
github.com/stretchr/testify v1.6.1
github.com/tidwall/gjson v1.6.1
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 // indirect
golang.org/x/net v0.0.0-20201031054903-ff519b6c9102 // indirect
google.golang.org/api v0.31.0
google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d
google.golang.org/grpc v1.31.1
Expand Down
14 changes: 14 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,12 @@ github.com/Azure/azure-amqp-common-go/v2 v2.1.0 h1:+QbFgmWCnPzdaRMfsI0Yb6GrRdBj5
github.com/Azure/azure-amqp-common-go/v2 v2.1.0/go.mod h1:R8rea+gJRuJR6QxTir/XuEd+YuKoUiazDC/N96FiDEU=
github.com/Azure/azure-amqp-common-go/v3 v3.0.1 h1:mXh+eyOxGLBfqDtfmbtby0l7XfG/6b2NkuZ3B7i6zHA=
github.com/Azure/azure-amqp-common-go/v3 v3.0.1/go.mod h1:PBIGdzcO1teYoufTKMcGibdKaYZv4avS+O6LNIp8bq0=
github.com/Azure/azure-amqp-common-go/v3 v3.1.0 h1:1N4YSkWYWffOpQHromYdOucBSQXhNRKzqtgICy6To8Q=
github.com/Azure/azure-amqp-common-go/v3 v3.1.0/go.mod h1:PBIGdzcO1teYoufTKMcGibdKaYZv4avS+O6LNIp8bq0=
github.com/Azure/azure-event-hubs-go v1.3.1 h1:vKw7tLOFJ8kVMkhNvOXZWz+3purRQ/WTe60+bavZ5qc=
github.com/Azure/azure-event-hubs-go v1.3.1/go.mod h1:me2m3+0WC7G7JRBTWI5SQ81s2TYyOqgV3JIpYg86jZA=
github.com/Azure/azure-event-hubs-go/v3 v3.3.2 h1:R3HoM9QiZ2uBcxfMPROBJPSsCXUL31TiV5vQ3dRsRNg=
github.com/Azure/azure-event-hubs-go/v3 v3.3.2/go.mod h1:sszMsQpFy8Au2s2NColbnJY8lRVm1koW0XxBJ3rN5TY=
github.com/Azure/azure-pipeline-go v0.1.8/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg=
github.com/Azure/azure-pipeline-go v0.1.9/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg=
github.com/Azure/azure-pipeline-go v0.2.1/go.mod h1:UGSo8XybXnIGZ3epmeBw7Jdz+HiUVpqIlpz/HKHylF4=
Expand All @@ -73,12 +77,15 @@ github.com/Azure/azure-sdk-for-go v46.0.0+incompatible h1:4qlEOCDcDQZTGczYGzbGYC
github.com/Azure/azure-sdk-for-go v46.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go v47.0.0+incompatible h1:Hn9OhJUtoLjm27f17/JKw38KBQny0cjpnsBHn7kPpTI=
github.com/Azure/azure-sdk-for-go v47.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go v48.0.0+incompatible h1:adRBpSbkY3IAgqBA83nSDN8yXDsy48zJNPqSwZabDNQ=
github.com/Azure/azure-sdk-for-go v48.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-service-bus-go v0.9.1 h1:G1qBLQvHCFDv9pcpgwgFkspzvnGknJRR0PYJ9ytY/JA=
github.com/Azure/azure-service-bus-go v0.9.1/go.mod h1:yzBx6/BUGfjfeqbRZny9AQIbIe3AcV9WZbAdpkoXOa0=
github.com/Azure/azure-service-bus-go v0.10.6 h1:xjxJf6rnEoX5yCCoKiXe5VPJAxdXZU1e2zgSkHMBRR8=
github.com/Azure/azure-service-bus-go v0.10.6/go.mod h1:1tX7Ap1oTgovUJ9iUtMOjM9I/xcvjyZ0VMWzNF0cBf0=
github.com/Azure/azure-storage-blob-go v0.0.0-20181023070848-cf01652132cc/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y=
github.com/Azure/azure-storage-blob-go v0.0.0-20190123011202-457680cc0804/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y=
github.com/Azure/azure-storage-blob-go v0.6.0/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y=
github.com/Azure/azure-storage-blob-go v0.8.0 h1:53qhf0Oxa0nOjgbDeeYPUeyiNmafAFEY95rZLK0Tj6o=
github.com/Azure/azure-storage-blob-go v0.8.0/go.mod h1:lPI3aLPpuLTeUwh1sViKXFxwl2B6teiRqI0deQUvsw0=
github.com/Azure/azure-storage-blob-go v0.10.0 h1:evCwGreYo3XLeBV4vSxLbLiYb6e0SzsJiXQVRGsRXxs=
Expand Down Expand Up @@ -112,6 +119,8 @@ github.com/Azure/go-autorest/autorest v0.11.3 h1:fyYnmYujkIXUgv88D9/Wo2ybE4Zwd/T
github.com/Azure/go-autorest/autorest v0.11.3/go.mod h1:JFgpikqFJ/MleTTxwepExTKnFUKKszPS8UavbQYUMuw=
github.com/Azure/go-autorest/autorest v0.11.9 h1:P0ZF0dEYoUPUVDQo3mA1CvH5b8mKev7DDcmTwauuNME=
github.com/Azure/go-autorest/autorest v0.11.9/go.mod h1:eipySxLmqSyC5s5k1CLupqet0PSENBEDP93LQ9a8QYw=
github.com/Azure/go-autorest/autorest v0.11.10 h1:j5sGbX7uj1ieYYkQ3Mpvewd4DCsEQ+ZeJpqnSM9pjnM=
github.com/Azure/go-autorest/autorest v0.11.10/go.mod h1:eipySxLmqSyC5s5k1CLupqet0PSENBEDP93LQ9a8QYw=
github.com/Azure/go-autorest/autorest/adal v0.1.0/go.mod h1:MeS4XhScH55IST095THyTxElntu7WqB7pNbZo8Q5G3E=
github.com/Azure/go-autorest/autorest/adal v0.5.0/go.mod h1:8Z9fGy2MpX0PvDjB1pEgQTmVqjGhiHBW7RJJEciWzS0=
github.com/Azure/go-autorest/autorest/adal v0.8.0/go.mod h1:Z6vX6WXXuyieHAXwMj0S6HY6e6wcHn37qQMBQlvY3lc=
Expand Down Expand Up @@ -1499,6 +1508,8 @@ golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a h1:vclmkQCjlDX5OydZ9wv8rB
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 h1:hb9wdF1z5waM+dSIICn1l0DkLVDT3hqhhQsDNUmHPRE=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 h1:pLI5jrR7OSLijeIDcmRxNmw2api+jEfxLoykJVice/E=
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
Expand Down Expand Up @@ -1586,6 +1597,8 @@ golang.org/x/net v0.0.0-20200904194848-62affa334b73 h1:MXfv8rhZWmFeqX3GNZRsd6vOL
golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0 h1:wBouT66WTYFXdxfVdz9sVWARVd/2vfGcmI45D2gj45M=
golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201031054903-ff519b6c9102 h1:42cLlJJdEh+ySyeUUbEQ5bsTiq8voBeTuweGVkY6Puw=
golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/oauth2 v0.0.0-20180724155351-3d292e4d0cdc/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
Expand Down Expand Up @@ -1836,6 +1849,7 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogR
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b h1:QRR6H1YWRnHb4Y/HeNFCTJLFVxaq6wH4YuVdsUOr75U=
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
Expand Down
45 changes: 36 additions & 9 deletions pkg/scalers/azure/azure_eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ import (

"github.com/imdario/mergo"

eventhub "github.com/Azure/azure-event-hubs-go"
"github.com/Azure/azure-amqp-common-go/v3/aad"
eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/Azure/azure-storage-blob-go/azblob"

"github.com/Azure/go-autorest/autorest/azure"
kedav1alpha1 "github.com/kedacore/keda/api/v1alpha1"
)

Expand Down Expand Up @@ -46,16 +47,35 @@ type EventHubInfo struct {
EventHubConsumerGroup string
StorageConnection string
BlobContainer string
Namespace string
EventHubName string
}

// GetEventHubClient returns eventhub client
func GetEventHubClient(info EventHubInfo) (*eventhub.Hub, error) {
hub, err := eventhub.NewHubFromConnectionString(info.EventHubConnection)
if err != nil {
return nil, fmt.Errorf("failed to create hub client: %s", err)
// The user wants to use a connectionstring, not a pod identity
if info.EventHubConnection != "" {
hub, err := eventhub.NewHubFromConnectionString(info.EventHubConnection)
if err != nil {
return nil, fmt.Errorf("failed to create hub client: %s", err)
}
return hub, nil
}

return hub, nil
// Since there is no connectionstring, then user wants to use pod identity
// Internally, the JWTProvider will use Managed Service Identity to authenticate if no Service Principal info supplied
provider, aadErr := aad.NewJWTProvider(func(config *aad.TokenProviderConfiguration) error {
if config.Env == nil {
config.Env = &azure.PublicCloud
}
return nil
})

if aadErr == nil {
return eventhub.NewHub(info.Namespace, info.EventHubName, provider)
}

return nil, aadErr
}

// GetCheckpointFromBlobStorage accesses Blob storage and gets checkpoint information of a partition
Expand All @@ -65,9 +85,16 @@ func GetCheckpointFromBlobStorage(ctx context.Context, info EventHubInfo, partit
return Checkpoint{}, err
}

eventHubNamespace, eventHubName, err := ParseAzureEventHubConnectionString(info.EventHubConnection)
if err != nil {
return Checkpoint{}, err
var eventHubNamespace string
var eventHubName string
if info.EventHubConnection != "" {
eventHubNamespace, eventHubName, err = ParseAzureEventHubConnectionString(info.EventHubConnection)
if err != nil {
return Checkpoint{}, err
}
} else {
eventHubNamespace = info.Namespace
eventHubName = info.EventHubName
}

// TODO: add more ways to read from different types of storage and read checkpoints/leases written in different JSON formats
Expand Down
45 changes: 34 additions & 11 deletions pkg/scalers/azure_eventhub_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import (
"math"
"strconv"

"github.com/kedacore/keda/api/v1alpha1"
"github.com/kedacore/keda/pkg/scalers/azure"

eventhub "github.com/Azure/azure-event-hubs-go"
eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/Azure/azure-storage-blob-go/azblob"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -85,16 +86,6 @@ func parseAzureEventHubMetadata(config *ScalerConfig) (*eventHubMetadata, error)
return nil, fmt.Errorf("no storage connection string given")
}

if config.AuthParams["connection"] != "" {
meta.eventHubInfo.EventHubConnection = config.AuthParams["connection"]
} else if config.TriggerMetadata["connectionFromEnv"] != "" {
meta.eventHubInfo.EventHubConnection = config.ResolvedEnv[config.TriggerMetadata["connectionFromEnv"]]
}

if len(meta.eventHubInfo.EventHubConnection) == 0 {
return nil, fmt.Errorf("no event hub connection string given")
}

meta.eventHubInfo.EventHubConsumerGroup = defaultEventHubConsumerGroup
if val, ok := config.TriggerMetadata["consumerGroup"]; ok {
meta.eventHubInfo.EventHubConsumerGroup = val
Expand All @@ -105,6 +96,38 @@ func parseAzureEventHubMetadata(config *ScalerConfig) (*eventHubMetadata, error)
meta.eventHubInfo.BlobContainer = val
}

if config.PodIdentity == "" || config.PodIdentity == v1alpha1.PodIdentityProviderNone {
if config.AuthParams["connection"] != "" {
meta.eventHubInfo.EventHubConnection = config.AuthParams["connection"]
} else if config.TriggerMetadata["connectionFromEnv"] != "" {
meta.eventHubInfo.EventHubConnection = config.ResolvedEnv[config.TriggerMetadata["connectionFromEnv"]]
}

if len(meta.eventHubInfo.EventHubConnection) == 0 {
return nil, fmt.Errorf("no event hub connection string given")
}
} else {
if config.TriggerMetadata["eventHubNamespace"] != "" {
meta.eventHubInfo.Namespace = config.TriggerMetadata["eventHubNamespace"]
} else if config.TriggerMetadata["eventHubNamespaceFromEnv"] != "" {
meta.eventHubInfo.Namespace = config.ResolvedEnv[config.TriggerMetadata["eventHubNamespaceFromEnv"]]
}

if len(meta.eventHubInfo.Namespace) == 0 {
return nil, fmt.Errorf("no event hub namespace string given")
}

if config.TriggerMetadata["eventHubName"] != "" {
meta.eventHubInfo.EventHubName = config.TriggerMetadata["eventHubName"]
} else if config.TriggerMetadata["eventHubNameFromEnv"] != "" {
meta.eventHubInfo.EventHubName = config.ResolvedEnv[config.TriggerMetadata["eventHubNameFromEnv"]]
}

if len(meta.eventHubInfo.EventHubName) == 0 {
return nil, fmt.Errorf("no event hub name string given")
}
}

return &meta, nil
}

Expand Down
25 changes: 24 additions & 1 deletion pkg/scalers/azure_eventhub_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

"github.com/kedacore/keda/pkg/scalers/azure"

eventhub "github.com/Azure/azure-event-hubs-go"
eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/Azure/azure-storage-blob-go/azblob"
)

Expand Down Expand Up @@ -51,6 +51,18 @@ var parseEventHubMetadataDataset = []parseEventHubMetadataTestData{
{map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting, "blobContainer": testContainerName}, false},
}

var parseEventHubMetadataDatasetWithPodIdentity = []parseEventHubMetadataTestData{
{map[string]string{}, true},
// Even though connection string is provided, this should fail because the eventhub Namespace is not provided explicitly when using Pod Identity
{map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting, "unprocessedEventThreshold": "15"}, true},
// properly formed event hub metadata with Pod Identity
{map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, false},
// missing eventHubname
{map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubNamespace": testEventHubNamespace}, true},
// missing eventHubNamespace
{map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName}, true},
}

var eventHubMetricIdentifiers = []eventHubMetricIdentifier{
{&parseEventHubMetadataDataset[1], "azure-eventhub-none-testEventHubConsumerGroup"},
}
Expand All @@ -76,6 +88,17 @@ func TestParseEventHubMetadata(t *testing.T) {
t.Error("Expected error and got success")
}
}

for _, testData := range parseEventHubMetadataDatasetWithPodIdentity {
_, err := parseAzureEventHubMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: sampleEventHubResolvedEnv, AuthParams: map[string]string{}, PodIdentity: "Azure"})

if err != nil && !testData.isError {
t.Errorf("Expected success but got error: %s", err)
}
if testData.isError && err == nil {
t.Error("Expected error and got success")
}
}
}

func TestGetUnprocessedEventCountInPartition(t *testing.T) {
Expand Down

0 comments on commit d217b93

Please sign in to comment.