Feature/splunk observability scaler #6192

Open · wants to merge 3 commits into base: main

Changes from 2 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -56,6 +56,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

### New

- **General**: Add Splunk Observability Cloud Scaler ([#6190](https://github.com/kedacore/keda/issues/6190))
- **CloudEventSource**: Introduce ClusterCloudEventSource ([#3533](https://github.com/kedacore/keda/issues/3533))

#### Experimental
@@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  annotations:
-    controller-gen.kubebuilder.io/version: v0.14.0
+    controller-gen.kubebuilder.io/version: v0.15.0
  name: clustercloudeventsources.eventing.keda.sh
spec:
  group: eventing.keda.sh
3 changes: 3 additions & 0 deletions go.mod
@@ -82,6 +82,7 @@ require (
	github.com/robfig/cron/v3 v3.0.1
	github.com/segmentio/kafka-go v0.4.47
	github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2 v0.1.0
	github.com/signalfx/signalflow-client-go/v2 v2.3.0
	github.com/spf13/cast v1.6.0
	github.com/spf13/pflag v1.0.5
	github.com/stretchr/testify v1.9.0
@@ -119,6 +120,8 @@ require (
	sigs.k8s.io/kustomize/kustomize/v5 v5.4.3
)

require github.com/signalfx/signalfx-go v1.34.0 // indirect

// Remove this when they merge the PR and cut a release https://github.com/open-policy-agent/cert-controller/pull/202
replace github.com/open-policy-agent/cert-controller => github.com/jorturfer/cert-controller v0.0.0-20240427003941-363ba56751d7

4 changes: 4 additions & 0 deletions go.sum
@@ -1638,6 +1638,10 @@ github.com/shurcooL/go v0.0.0-20200502201357-93f07166e636/go.mod h1:TDJrrUr11Vxr
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/shurcooL/vfsgen v0.0.0-20200824052919-0d455de96546/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw=
github.com/signalfx/signalflow-client-go/v2 v2.3.0 h1:CMhvEfDDWbdPCfMNiQTAymRIRzVbgveGbTq5wr8OHuM=
github.com/signalfx/signalflow-client-go/v2 v2.3.0/go.mod h1:ir6CHksVkhh1vlslldjf6k5qD88QQxWW8WMG5PxSQco=
github.com/signalfx/signalfx-go v1.34.0 h1:OQ6tyMY4efWB57EPIQqrpWrAfcSdyfa+bLtmAe7GLfE=
github.com/signalfx/signalfx-go v1.34.0/go.mod h1:IpGZLPvCKNFyspAXoS480jB02mocTpo0KYd8jbl6/T8=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
2 changes: 1 addition & 1 deletion pkg/metricsservice/api/metrics.pb.go
2 changes: 1 addition & 1 deletion pkg/metricsservice/api/metrics_grpc.pb.go
2 changes: 1 addition & 1 deletion pkg/scalers/externalscaler/externalscaler.pb.go
2 changes: 1 addition & 1 deletion pkg/scalers/externalscaler/externalscaler_grpc.pb.go
2 changes: 1 addition & 1 deletion pkg/scalers/liiklus/LiiklusService.pb.go
2 changes: 1 addition & 1 deletion pkg/scalers/liiklus/LiiklusService_grpc.pb.go

(Generated files; diffs not rendered by default.)
182 changes: 182 additions & 0 deletions pkg/scalers/splunk_observability_scaler.go
@@ -0,0 +1,182 @@
package scalers

import (
	"context"
	"fmt"
	"math"
	"regexp"
	"time"

	"github.com/go-logr/logr"
	"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
	kedautil "github.com/kedacore/keda/v2/pkg/util"
	"github.com/signalfx/signalflow-client-go/v2/signalflow"
	v2 "k8s.io/api/autoscaling/v2"
	"k8s.io/metrics/pkg/apis/external_metrics"
)

type splunkObservabilityMetadata struct {
> **Contributor:** Should we support a parameter to override an endpoint in case that changes in the future?
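	// A possible answer to the review question above (hypothetical sketch, not
	// part of this diff; assumes the keda tag supports an `optional` marker):
	// Endpoint string `keda:"name=endpoint, order=triggerMetadata, optional"`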

	TriggerIndex int

	AccessToken           string  `keda:"name=accessToken, order=authParams"`
	Realm                 string  `keda:"name=realm, order=authParams"`
	Query                 string  `keda:"name=query, order=triggerMetadata"`
	Duration              int     `keda:"name=duration, order=triggerMetadata"`
	TargetValue           float64 `keda:"name=targetValue, order=triggerMetadata"`
	QueryAggregator       string  `keda:"name=queryAggregator, order=triggerMetadata"`
	ActivationTargetValue float64 `keda:"name=activationTargetValue, order=triggerMetadata"`
}

type splunkObservabilityScaler struct {
	metadata  *splunkObservabilityMetadata
	apiClient *signalflow.Client
	logger    logr.Logger
}

func parseSplunkObservabilityMetadata(config *scalersconfig.ScalerConfig) (*splunkObservabilityMetadata, error) {
	meta := &splunkObservabilityMetadata{}
	meta.TriggerIndex = config.TriggerIndex

	if err := config.TypedConfig(meta); err != nil {
		return nil, fmt.Errorf("error parsing splunk observability metadata: %w", err)
	}

	return meta, nil
}

func newSplunkO11yConnection(meta *splunkObservabilityMetadata, logger logr.Logger) (*signalflow.Client, error) {
	logger.Info(fmt.Sprintf("meta: %+v\n", meta))
> **Contributor:** I don't think we actually need to log this?


	if meta.Realm == "" || meta.AccessToken == "" {
		return nil, fmt.Errorf("error: Could not find splunk access token or realm")
	}

	apiClient, err := signalflow.NewClient(
		signalflow.StreamURLForRealm(meta.Realm),
		signalflow.AccessToken(meta.AccessToken),
		signalflow.OnError(func(err error) {
			errMsg := fmt.Sprintf("error in SignalFlow client: %v\n", err)
			logger.Info(errMsg)
		}))
	if err != nil {
		return nil, fmt.Errorf("error creating SignalFlow client: %w", err)
	}

	return apiClient, nil
}

func NewSplunkObservabilityScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
	logger := InitializeLogger(config, "splunk_observability_scaler")

	meta, err := parseSplunkObservabilityMetadata(config)
	if err != nil {
		return nil, fmt.Errorf("error parsing Splunk metadata: %w", err)
	}

	apiClient, err := newSplunkO11yConnection(meta, logger)
	if err != nil {
		return nil, fmt.Errorf("error establishing Splunk Observability Cloud connection: %w", err)
	}

	return &splunkObservabilityScaler{
		metadata:  meta,
		apiClient: apiClient,
		logger:    logger,
	}, nil
}

func (s *splunkObservabilityScaler) getQueryResult() (float64, error) {
	comp, err := s.apiClient.Execute(context.Background(), &signalflow.ExecuteRequest{
		Program: s.metadata.Query,
	})
	if err != nil {
		return -1, fmt.Errorf("error: could not execute signalflow query: %w", err)
	}

	s.logger.Info("Started MTS stream.")

	// Stream data points for the configured duration, then stop the computation.
	time.Sleep(time.Duration(s.metadata.Duration * int(time.Second)))
	if err := comp.Stop(context.Background()); err != nil {
		return -1, fmt.Errorf("error stopping SignalFlow computation: %w", err)
	}

	s.logger.Info("Closed MTS stream.")

	max := math.Inf(-1)
	min := math.Inf(1)
	valueSum := 0.0
	valueCount := 0
	s.logger.Info("Now iterating over results.")
	for msg := range comp.Data() {
		if len(msg.Payloads) == 0 {
			s.logger.Info("No data retrieved.")
			continue
		}
		for _, pl := range msg.Payloads {
			value, ok := pl.Value().(float64)
			if !ok {
				return -1, fmt.Errorf("error: could not convert Splunk Observability metric value to float64")
> **Contributor:** Since we're returning an error type, I don't think we need to include `error:` in the error message
			}
			s.logger.Info(fmt.Sprintf("Encountering value %.4f\n", value))
			max = math.Max(max, value)
			min = math.Min(min, value)
			valueSum += value
			valueCount++
		}
	}

	if valueCount > 1 && s.metadata.QueryAggregator == "" {
		return 0, fmt.Errorf("query returned more than 1 series; modify the query to return only 1 series or add a queryAggregator")
	}

	switch s.metadata.QueryAggregator {
	case "max":
		s.logger.Info(fmt.Sprintf("Returning max value: %.4f\n", max))
> **Contributor:** I don't think we need to log every value being returned. The function should just execute the logic. Otherwise, the logs will get pretty noisy.
>
> **Author:** It was just for me as a sort of poor-man's debugging. Let me get back to you on this.
		return max, nil
	case "min":
		s.logger.Info(fmt.Sprintf("Returning min value: %.4f\n", min))
		return min, nil
	case "avg":
		avg := valueSum / float64(valueCount)
		s.logger.Info(fmt.Sprintf("Returning avg value: %.4f\n", avg))
		return avg, nil
	default:
		return max, nil
	}
}

func (s *splunkObservabilityScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
	num, err := s.getQueryResult()

	if err != nil {
		s.logger.Error(err, "error getting metrics from Splunk Observability Cloud.")
		return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error getting metrics from Splunk Observability Cloud: %w", err)
	}
	metric := GenerateMetricInMili(metricName, num)

	return []external_metrics.ExternalMetricValue{metric}, num > s.metadata.ActivationTargetValue, nil
}

func (s *splunkObservabilityScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
	metricName := kedautil.NormalizeString("signalfx")
	re := regexp.MustCompile(`data\('([^']*)'`)
> **Contributor:** Recompiling this regex every time GetMetricSpecForScaling() is called is expensive in terms of CPU operations. To optimize this, we should move the regex to be compiled with the package and not every time the function is called. To do this, we can simply put it in a var at the top of the package like so:
>
>     var (
>         dataRegex = regexp.MustCompile(`data\('([^']*)'`)
>     )
>
> Is regex necessary though? Is there not a more predictable way to get the data returned?
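	// (One regex-free alternative to the question above, purely hypothetical:
	// accept an explicit metricName in triggerMetadata and fall back to
	// "signalfx" when unset, instead of deriving the name from the query.)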

	match := re.FindStringSubmatch(s.metadata.Query)
	if len(match) > 1 {
		metricName = kedautil.NormalizeString(match[1])
	}

	externalMetric := &v2.ExternalMetricSource{
		Metric: v2.MetricIdentifier{
			Name: metricName,
		},
		Target: GetMetricTargetMili(v2.ValueMetricType, s.metadata.TargetValue),
	}
	metricSpec := v2.MetricSpec{
		External: externalMetric, Type: externalMetricType,
	}
	return []v2.MetricSpec{metricSpec}
}

func (s *splunkObservabilityScaler) Close(context.Context) error {
	return nil
}
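For orientation, a minimal sketch of how the scaler could be exercised directly (illustrative values only; it assumes a reachable Splunk Observability realm and mirrors the test setup below):

	cfg := &scalersconfig.ScalerConfig{
		TriggerMetadata: map[string]string{
			"query":                 "data('demo.trans.latency').max().publish()",
			"duration":              "10",
			"targetValue":           "200.0",
			"queryAggregator":       "avg",
			"activationTargetValue": "1.1",
		},
		AuthParams: map[string]string{"accessToken": "<token>", "realm": "<realm>"}, // placeholders
	}
	scaler, err := NewSplunkObservabilityScaler(cfg) // opens the SignalFlow connection
	if err != nil {
		// handle parse/connection errors
	}
	// Streams the query for `duration` seconds, then returns the aggregated value.
	metrics, isActive, err := scaler.GetMetricsAndActivity(context.Background(), "demo-trans-latency")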
89 changes: 89 additions & 0 deletions pkg/scalers/splunk_observability_scaler_test.go
@@ -0,0 +1,89 @@
package scalers

import (
	"context"
	"testing"

	"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
)

type parseSplunkObservabilityMetadataTestData struct {
	metadata   map[string]string
	authParams map[string]string
	isError    bool
}

type SplunkObservabilityMetricIdentifier struct {
	metadataTestData *parseSplunkObservabilityMetadataTestData
	triggerIndex     int
	metricName       string
}

var validSplunkObservabilityAuthParams = map[string]string{
	"accessToken": "my-super-secret-access-token",
	"realm":       "my-realm",
}

var validSplunkObservabilityMetadata = map[string]string{
	"query":                 "data('demo.trans.latency').max().publish()",
	"duration":              "10",
	"targetValue":           "200.0",
	"queryAggregator":       "avg",
	"activationTargetValue": "1.1",
}

var testSplunkObservabilityMetadata = []parseSplunkObservabilityMetadataTestData{
	// Valid metadata and valid auth params, pass.
	{validSplunkObservabilityMetadata, validSplunkObservabilityAuthParams, false},
	// No params at all, fail.
	{map[string]string{}, map[string]string{}, true},
	// No metadata but valid auth params, fail.
	{map[string]string{}, validSplunkObservabilityAuthParams, true},
	// Valid metadata but no auth params, fail.
	{validSplunkObservabilityMetadata, map[string]string{}, true},
	// Missing 'query' field, fail.
	{map[string]string{"duration": "10", "targetValue": "200.0", "queryAggregator": "avg", "activationTargetValue": "1.1"}, validSplunkObservabilityAuthParams, true},
	// Missing 'duration' field, fail.
	{map[string]string{"query": "data('demo.trans.latency').max().publish()", "targetValue": "200.0", "queryAggregator": "avg", "activationTargetValue": "1.1"}, validSplunkObservabilityAuthParams, true},
	// Missing 'targetValue' field, fail.
	{map[string]string{"query": "data('demo.trans.latency').max().publish()", "duration": "10", "queryAggregator": "avg", "activationTargetValue": "1.1"}, validSplunkObservabilityAuthParams, true},
	// Missing 'queryAggregator' field, fail.
	{map[string]string{"query": "data('demo.trans.latency').max().publish()", "duration": "10", "targetValue": "200.0", "activationTargetValue": "1.1"}, validSplunkObservabilityAuthParams, true},
	// Missing 'activationTargetValue' field, fail.
	{map[string]string{"query": "data('demo.trans.latency').max().publish()", "duration": "10", "targetValue": "200.0", "queryAggregator": "avg"}, validSplunkObservabilityAuthParams, true},
}

var SplunkObservabilityMetricIdentifiers = []SplunkObservabilityMetricIdentifier{
	{&testSplunkObservabilityMetadata[0], 0, "demo-trans-latency"},
	{&testSplunkObservabilityMetadata[0], 1, "demo-trans-latency"},
}

func TestSplunkObservabilityParseMetadata(t *testing.T) {
	for _, testData := range testSplunkObservabilityMetadata {
		_, err := parseSplunkObservabilityMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams})
		if err != nil && !testData.isError {
			t.Error("Expected success but got error", err)
		} else if testData.isError && err == nil {
			t.Error("Expected error but got success")
		}
	}
}

func TestSplunkObservabilityGetMetricSpecForScaling(t *testing.T) {
	for _, testData := range SplunkObservabilityMetricIdentifiers {
		ctx := context.Background()
		meta, err := parseSplunkObservabilityMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, AuthParams: validSplunkObservabilityAuthParams, TriggerIndex: testData.triggerIndex})
		if err != nil {
			t.Fatal("Could not parse Splunk Observability metadata:", err)
		}
		mockSplunkObservabilityScaler := splunkObservabilityScaler{
			metadata: meta,
		}

		metricSpec := mockSplunkObservabilityScaler.GetMetricSpecForScaling(ctx)
		metricName := metricSpec[0].External.Metric.Name
		if metricName != testData.metricName {
			t.Error("Wrong External metric source name:", metricName)
		}
	}
}
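Locally, both tests can be run with the standard Go tooling, e.g. `go test ./pkg/scalers -run TestSplunkObservability` (a stock go test invocation, not part of this diff).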
2 changes: 2 additions & 0 deletions pkg/scaling/scalers_builder.go
@@ -249,6 +249,8 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string,
		return scalers.NewSolrScaler(config)
	case "splunk":
		return scalers.NewSplunkScaler(config)
	case "splunk-observability":
		return scalers.NewSplunkObservabilityScaler(config)
	case "stan":
		return scalers.NewStanScaler(config)
	default: