Skip to content

Commit

Permalink
Add Nats Streaming (aka stan) support in Keda (#377)
Browse files Browse the repository at this point in the history
* add nats streaming support in keda.

* add readme, log info while scaling the metrics.

* add stan scaler parser test.

* revert the changes made to scale_handler.

* add documentation on how to use nats streaming.

* use kedacore in values.yaml.

* dont panic when there is an error.  return the error back.

* reformat the trigger specs in line with other triggers.

* remove some new lines.
  • Loading branch information
balchua authored and ahmelsayed committed Oct 21, 2019
1 parent f969864 commit fed1348
Show file tree
Hide file tree
Showing 5 changed files with 273 additions and 0 deletions.
22 changes: 22 additions & 0 deletions examples/stan_scaledobject.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
apiVersion: keda.k8s.io/v1alpha1
kind: ScaledObject
metadata:
name: stan-scaledobject
namespace: gonuts
labels:
deploymentName: gonuts-sub
spec:
pollingInterval: 10 # Optional. Default: 30 seconds
cooldownPeriod: 30 # Optional. Default: 300 seconds
minReplicaCount: 0 # Optional. Default: 0
maxReplicaCount: 30 # Optional. Default: 100
scaleTargetRef:
deploymentName: gonuts-sub
triggers:
- type: stan
metadata:
natsServerMonitoringEndpoint: "stan-nats-ss.stan.svc.cluster.local:8222"
queueGroup: "grp1"
durableName: "ImDurable"
subject: "Test"
lagThreshold: "10"
2 changes: 2 additions & 0 deletions pkg/handler/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,8 @@ func (h *ScaleHandler) getScaler(name, namespace, triggerType string, resolvedEn
return scalers.NewExternalScaler(name, namespace, resolvedEnv, triggerMetadata)
case "liiklus":
return scalers.NewLiiklusScaler(resolvedEnv, triggerMetadata)
case "stan":
return scalers.NewStanScaler(resolvedEnv, triggerMetadata)
default:
return nil, fmt.Errorf("no scaler found for type: %s", triggerType)
}
Expand Down
189 changes: 189 additions & 0 deletions pkg/scalers/stan_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package scalers

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"strconv"

log "github.com/sirupsen/logrus"
v2beta1 "k8s.io/api/autoscaling/v2beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
)

type monitorChannelInfo struct {
Name string `json:"name"`
MsgCount int64 `json:"msgs"`
LastSequence int64 `json:"last_seq"`
Subscriber []monitorSubscriberInfo `json:"subscriptions"`
}

type monitorSubscriberInfo struct {
ClientID string `json:"client_id"`
QueueName string `json:"queue_name"`
Inbox string `json:"inbox"`
AckInbox string `json:"ack_inbox"`
IsDurable bool `json:"is_durable"`
IsOffline bool `json:"is_offline"`
MaxInflight int `json:"max_inflight"`
LastSent int64 `json:"last_sent"`
PendingCount int `json:"pending_count"`
IsStalled bool `json:"is_stalled"`
}

type stanScaler struct {
channelInfo *monitorChannelInfo
metadata stanMetadata
}

type stanMetadata struct {
natsServerMonitoringEndpoint string
queueGroup string
durableName string
subject string
lagThreshold int64
}

const (
stanLagThresholdMetricName = "lagThreshold"
stanMetricType = "External"
defaultStanLagThreshold = 10
)

// NewStanScaler creates a new stanScaler
func NewStanScaler(resolvedSecrets, metadata map[string]string) (Scaler, error) {
stanMetadata, err := parseStanMetadata(metadata)
if err != nil {
return nil, fmt.Errorf("error parsing kafka metadata: %s", err)
}

return &stanScaler{
channelInfo: &monitorChannelInfo{},
metadata: stanMetadata,
}, nil
}

func parseStanMetadata(metadata map[string]string) (stanMetadata, error) {
meta := stanMetadata{}

if metadata["natsServerMonitoringEndpoint"] == "" {
return meta, errors.New("no monitoring endpoint given")
}
meta.natsServerMonitoringEndpoint = metadata["natsServerMonitoringEndpoint"]

if metadata["queueGroup"] == "" {
return meta, errors.New("no queue group given")
}
meta.queueGroup = metadata["queueGroup"]

if metadata["durableName"] == "" {
return meta, errors.New("no durable name group given")
}
meta.durableName = metadata["durableName"]

if metadata["subject"] == "" {
return meta, errors.New("no subject given")
}
meta.subject = metadata["subject"]

meta.lagThreshold = defaultStanLagThreshold

if val, ok := metadata[lagThresholdMetricName]; ok {
t, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return meta, fmt.Errorf("error parsing %s: %s", lagThresholdMetricName, err)
}
meta.lagThreshold = t
}

return meta, nil
}

// IsActive determines if we need to scale from zero
func (s *stanScaler) IsActive(ctx context.Context) (bool, error) {
resp, err := http.Get(s.getMonitoringEndpoint())
if err != nil {
log.Errorf("Unable to access the nats streaming (%s) broker monitoring endpoint", s.metadata.natsServerMonitoringEndpoint)
return false, err
}
defer resp.Body.Close()
json.NewDecoder(resp.Body).Decode(&s.channelInfo)

return s.hasPendingMessage() || s.getMaxMsgLag() > 0, nil
}

func (s *stanScaler) getMonitoringEndpoint() string {
return "http://" + s.metadata.natsServerMonitoringEndpoint + "/streaming/channelsz?" + "channel=" + s.metadata.subject + "&subs=1"
}

func (s *stanScaler) getTotalMessages() int64 {
return s.channelInfo.MsgCount
}

func (s *stanScaler) getMaxMsgLag() int64 {
var maxValue int64
maxValue = 0
for _, subs := range s.channelInfo.Subscriber {
if subs.LastSent > maxValue && subs.QueueName == (s.metadata.durableName+":"+s.metadata.queueGroup) {
maxValue = subs.LastSent
}
}

return s.channelInfo.MsgCount - maxValue
}

func (s *stanScaler) hasPendingMessage() bool {
var hasPending bool
hasPending = false
for _, subs := range s.channelInfo.Subscriber {
if subs.PendingCount > 0 && subs.QueueName == (s.metadata.durableName+":"+s.metadata.queueGroup) {
hasPending = true
}
}

return hasPending
}

func (s *stanScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec {
return []v2beta1.MetricSpec{
{
External: &v2beta1.ExternalMetricSource{
MetricName: lagThresholdMetricName,
TargetAverageValue: resource.NewQuantity(s.metadata.lagThreshold, resource.DecimalSI),
},
Type: stanMetricType,
},
}
}

//GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *stanScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
resp, err := http.Get(s.getMonitoringEndpoint())

if err != nil {
log.Errorf("Unable to access the nats streaming (%s) broker monitoring endpoint", s.metadata.natsServerMonitoringEndpoint)
return []external_metrics.ExternalMetricValue{}, err
}

defer resp.Body.Close()
json.NewDecoder(resp.Body).Decode(&s.channelInfo)
totalLag := s.getMaxMsgLag()
log.Debugf("Stan scaler: Providing metrics based on totalLag %v, threshold %v", totalLag, s.metadata.lagThreshold)
metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(totalLag), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

// Nothing to close here.
func (s *stanScaler) Close() error {
return nil
}
35 changes: 35 additions & 0 deletions pkg/scalers/stan_scaler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package scalers

import (
"testing"
)

type parseStanMetadataTestData struct {
metadata map[string]string
isError bool
}

var testStanMetadata = []parseStanMetadataTestData{
// nothing passed
{map[string]string{}, true},
// Missing subject name, should fail
{map[string]string{"natsServerMonitoringEndpoint": "stan-nats-ss", "queueGroup": "grp1", "durableName": "ImDurable"}, true},
// Missing durable name, should fail
{map[string]string{"natsServerMonitoringEndpoint": "stan-nats-ss", "queueGroup": "grp1", "subject": "mySubject"}, true},
// Missing nats server monitoring endpoint, should fail
{map[string]string{"queueGroup": "grp1", "subject": "mySubject"}, true},
// All good.
{map[string]string{"natsServerMonitoringEndpoint": "stan-nats-ss", "queueGroup": "grp1", "durableName": "ImDurable", "subject": "mySubject"}, false},
}

func TestStanParseMetadata(t *testing.T) {
for _, testData := range testStanMetadata {
_, err := parseStanMetadata(testData.metadata)
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
}
}
25 changes: 25 additions & 0 deletions spec/triggers/stan.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Nats Streaming Trigger

The specification describes the `stan` trigger.

```yaml
triggers:
- type: stan
metadata:
natsServerMonitoringEndpoint: "stan-nats-ss.stan.svc.cluster.local:8222"
queueGroup: "grp1"
durableName: "ImDurable"
subject: "Test"
lagThreshold: "10"
```
Where:
* `natsServerMonitoringEndpoint` : Is the location of the Nats Streaming monitoring endpoint.
* `queuGroup` : The queue group name of the subscribers.

This comment has been minimized.

Copy link
@felipecruz91

felipecruz91 Oct 21, 2019

Typo

* `durableName` : Must identify the durability name used by the subscribers.
* `subject` : Sometimes called the channel name.
* `lagThreshold` : This value is used to tell the Horizontal Pod Autoscaler to use as TargetAverageValue.


Example [`examples/stan_scaledobject.yaml`](./../../examples/stan_scaledobject.yaml)

0 comments on commit fed1348

Please sign in to comment.