Skip to content

Commit

Permalink
Add azure blob scaler (#514)
Browse files Browse the repository at this point in the history
* added azure blob scaler

* rename "containerName" param to "blobContainerName"
  • Loading branch information
IsharaPannila authored and ahmelsayed committed Jan 16, 2020
1 parent 82b0803 commit 1aeb063
Show file tree
Hide file tree
Showing 5 changed files with 481 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pkg/handler/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,8 @@ func (h *ScaleHandler) getScaler(name, namespace, triggerType string, resolvedEn
return scalers.NewStanScaler(resolvedEnv, triggerMetadata)
case "huawei-cloudeye":
return scalers.NewHuaweiCloudeyeScaler(triggerMetadata, authParams)
case "azure-blob":
return scalers.NewAzureBlobScaler(resolvedEnv, triggerMetadata, authParams, podIdentity)
default:
return nil, fmt.Errorf("no scaler found for type: %s", triggerType)
}
Expand Down
60 changes: 60 additions & 0 deletions pkg/scalers/azure_blob.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package scalers

import (
"context"
"fmt"
"net/url"

"github.com/Azure/azure-storage-blob-go/azblob"
)

// GetAzureBlobListLength returns the count of the blobs in blob container in int
func GetAzureBlobListLength(ctx context.Context, podIdentity string, connectionString, blobContainerName string, accountName string, blobDelimiter string, blobPrefix string) (int, error) {

var credential azblob.Credential
var listBlobsSegmentOptions azblob.ListBlobsSegmentOptions
var err error

if podIdentity == "" || podIdentity == "none" {

var accountKey string

_, accountName, accountKey, _, err = ParseAzureStorageConnectionString(connectionString)

if err != nil {
return -1, err
}

credential, err = azblob.NewSharedKeyCredential(accountName, accountKey)
if err != nil {
return -1, err
}
} else if podIdentity == "azure" {
token, err := getAzureADPodIdentityToken("https://storage.azure.com/")
if err != nil {
azureBlobLog.Error(err, "Error fetching token cannot determine blob list count")
return -1, nil
}

credential = azblob.NewTokenCredential(token.AccessToken, nil)
} else {
return -1, fmt.Errorf("Azure blobs doesn't support %s pod identity type", podIdentity)

}

if blobPrefix != "" {
listBlobsSegmentOptions.Prefix = blobPrefix
}

p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
u, _ := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net", accountName))
serviceURL := azblob.NewServiceURL(*u, p)
containerURL := serviceURL.NewContainerURL(blobContainerName)

props, err := containerURL.ListBlobsHierarchySegment(ctx, azblob.Marker{} , blobDelimiter, listBlobsSegmentOptions)
if err != nil {
return -1, err
}

return len(props.Segment.BlobItems) , nil
}
184 changes: 184 additions & 0 deletions pkg/scalers/azure_blob_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package scalers

import (
"context"
"fmt"
"strconv"

v2beta1 "k8s.io/api/autoscaling/v2beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

const (
blobCountMetricName = "blobCount"
defaultTargetBlobCount = 5
defaultBlobDelimiter = "/"
defaultBlobPrefix = ""
defaultBlobConnectionSetting = "AzureWebJobsStorage"
)

type azureBlobScaler struct {
metadata *azureBlobMetadata
podIdentity string
}

type azureBlobMetadata struct {
targetBlobCount int
blobContainerName string
blobDelimiter string
blobPrefix string
connection string
useAAdPodIdentity bool
accountName string
}

var azureBlobLog = logf.Log.WithName("azure_blob_scaler")

// NewAzureBlobScaler creates a new azureBlobScaler
func NewAzureBlobScaler(resolvedEnv, metadata, authParams map[string]string, podIdentity string) (Scaler, error) {
meta, podIdentity, err := parseAzureBlobMetadata(metadata, resolvedEnv, authParams, podIdentity)
if err != nil {
return nil, fmt.Errorf("error parsing azure blob metadata: %s", err)
}

return &azureBlobScaler{
metadata: meta,
podIdentity: podIdentity,
}, nil
}

func parseAzureBlobMetadata(metadata, resolvedEnv, authParams map[string]string, podAuth string) (*azureBlobMetadata, string, error) {
meta := azureBlobMetadata{}
meta.targetBlobCount = defaultTargetBlobCount
meta.blobDelimiter = defaultBlobDelimiter
meta.blobPrefix = defaultBlobPrefix

if val, ok := metadata[blobCountMetricName]; ok {
blobCount, err := strconv.Atoi(val)
if err != nil {
azureBlobLog.Error(err, "Error parsing azure blob metadata", "blobCountMetricName", blobCountMetricName)
return nil, "", fmt.Errorf("Error parsing azure blob metadata %s: %s", blobCountMetricName, err.Error())
}

meta.targetBlobCount = blobCount
}

if val, ok := metadata["blobContainerName"]; ok && val != "" {
meta.blobContainerName = val
} else {
return nil, "", fmt.Errorf("no blobContainerName given")
}

if val, ok := metadata["blobDelimiter"]; ok {
if val != "" {
meta.blobDelimiter = val
}
}

if val, ok := metadata["blobPrefix"]; ok {
if val != "" {
meta.blobPrefix = val + meta.blobDelimiter
}
}
// before triggerAuthentication CRD, pod identity was configured using this property
if val, ok := metadata["useAAdPodIdentity"]; ok && podAuth == "" {
if val == "true" {
podAuth = "azure"
}
}

// If the Use AAD Pod Identity is not present, or set to "none"
// then check for connection string
if podAuth == "" || podAuth == "none" {
// Azure Blob Scaler expects a "connection" parameter in the metadata
// of the scaler or in a TriggerAuthentication object
connection := authParams["connection"]
if connection != "" {
// Found the connection in a parameter from TriggerAuthentication
meta.connection = connection
} else {
connectionSetting := defaultBlobConnectionSetting
if val, ok := metadata["connection"]; ok && val != "" {
connectionSetting = val
}

if val, ok := resolvedEnv[connectionSetting]; ok {
meta.connection = val
} else {
return nil, "", fmt.Errorf("no connection setting given")
}
}
} else if podAuth == "azure" {
// If the Use AAD Pod Identity is present then check account name
if val, ok := metadata["accountName"]; ok && val != "" {
meta.accountName = val
} else {
return nil, "", fmt.Errorf("no accountName given")
}
} else {
return nil, "", fmt.Errorf("pod identity %s not supported for azure storage blobs", podAuth)
}

return &meta, podAuth, nil
}

// GetScaleDecision is a func
func (s *azureBlobScaler) IsActive(ctx context.Context) (bool, error) {
length, err := GetAzureBlobListLength(
ctx,
s.podIdentity,
s.metadata.connection,
s.metadata.blobContainerName,
s.metadata.accountName,
s.metadata.blobDelimiter,
s.metadata.blobPrefix,
)

if err != nil {
azureBlobLog.Error(err, "error)")
return false, err
}

return length > 0, nil
}

func (s *azureBlobScaler) Close() error {
return nil
}

func (s *azureBlobScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec {
targetBlobCount := resource.NewQuantity(int64(s.metadata.targetBlobCount), resource.DecimalSI)
externalMetric := &v2beta1.ExternalMetricSource{MetricName: blobCountMetricName, TargetAverageValue: targetBlobCount}
metricSpec := v2beta1.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2beta1.MetricSpec{metricSpec}
}

//GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *azureBlobScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
bloblen, err := GetAzureBlobListLength(
ctx,
s.podIdentity,
s.metadata.connection,
s.metadata.blobContainerName,
s.metadata.accountName,
s.metadata.blobDelimiter,
s.metadata.blobPrefix,
)

if err != nil {
azureBlobLog.Error(err, "error getting blob list length")
return []external_metrics.ExternalMetricValue{}, err
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(bloblen), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}
84 changes: 84 additions & 0 deletions pkg/scalers/azure_blob_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package scalers

import (
"context"
"strings"
"testing"
)

func TestGetBlobLength(t *testing.T) {
length, err := GetAzureBlobListLength(context.TODO(), "", "", "blobContainerName", "", "","")
if length != -1 {
t.Error("Expected length to be -1, but got", length)
}

if err == nil {
t.Error("Expected error for empty connection string, but got nil")
}

if !strings.Contains(err.Error(), "parse storage connection string") {
t.Error("Expected error to contain parsing error message, but got", err.Error())
}

length, err = GetAzureBlobListLength(context.TODO(), "", "DefaultEndpointsProtocol=https;AccountName=name;AccountKey=key==;EndpointSuffix=core.windows.net", "blobContainerName", "", "","")

if length != -1 {
t.Error("Expected length to be -1, but got", length)
}

if err == nil {
t.Error("Expected error for empty connection string, but got nil")
}

if !strings.Contains(err.Error(), "illegal base64") {
t.Error("Expected error to contain base64 error message, but got", err.Error())
}
}

var testAzBlobResolvedEnv = map[string]string{
"CONNECTION": "SAMPLE",
}

type parseAzBlobMetadataTestData struct {
metadata map[string]string
isError bool
resolvedEnv map[string]string
authParams map[string]string
podIdentity string
}

var testAzBlobMetadata = []parseAzBlobMetadataTestData{
// nothing passed
{map[string]string{}, true, testAzBlobResolvedEnv, map[string]string{}, ""},
// properly formed
{map[string]string{"connection": "CONNECTION", "blobContainerName": "sample", "blobCount": "5", "blobDelimiter": "/", "blobPrefix": "blobsubpath"}, false, testAzBlobResolvedEnv, map[string]string{}, ""},
// Empty blobcontainerName
{map[string]string{"connection": "CONNECTION", "blobContainerName": ""}, true, testAzBlobResolvedEnv, map[string]string{}, ""},
// improperly formed blobCount
{map[string]string{"connection": "CONNECTION", "blobContainerName": "sample", "blobCount": "AA"}, true, testAzBlobResolvedEnv, map[string]string{}, ""},
// podIdentity = azure with account name
{map[string]string{"accountName": "sample_acc", "blobContainerName": "sample_container"}, false, testAzBlobResolvedEnv, map[string]string{}, "azure"},
// podIdentity = azure without account name
{map[string]string{"accountName": "", "blobContainerName": "sample_container"}, true, testAzBlobResolvedEnv, map[string]string{}, "azure"},
// podIdentity = azure without blob container name
{map[string]string{"accountName": "sample_acc", "blobContainerName": ""}, true, testAzBlobResolvedEnv, map[string]string{}, "azure"},
// connection from authParams
{map[string]string{"blobContainerName": "sample_container", "blobCount": "5"}, false, testAzBlobResolvedEnv, map[string]string{"connection": "value"}, "none"},

}

func TestAzBlobParseMetadata(t *testing.T) {
for _, testData := range testAzBlobMetadata {
_, podIdentity, err := parseAzureBlobMetadata(testData.metadata, testData.resolvedEnv, testData.authParams, testData.podIdentity)
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Errorf("Expected error but got success. testData: %v", testData)
}
if testData.podIdentity != "" && testData.podIdentity != podIdentity && err == nil {
t.Error("Expected success but got error: podIdentity value is not returned as expected")

}
}
}
Loading

0 comments on commit 1aeb063

Please sign in to comment.