diff --git a/README.md b/README.md index 89048d12..7808ec54 100644 --- a/README.md +++ b/README.md @@ -402,6 +402,64 @@ the `backend` label under `matchLabels` for the metric. The ingress annotation where the backend weights can be obtained can be specified through the flag `--skipper-backends-annotation`. +## External RPS collector + +The External RPS collector, like the Skipper collector, is a simple wrapper around the Prometheus collector to +make it easy to define an HPA for scaling based on the RPS measured for a given hostname. When +[skipper](https://github.com/zalando/skipper) is used as the ingress +implementation in your cluster everything should work automatically. In case another reverse proxy is used as ingress, like [Nginx](https://github.com/kubernetes/ingress-nginx) for example, it is necessary to configure which Prometheus metric should be used through the `--external-rps-metric-name <metric-name>` flag. Assuming `skipper-ingress` is being used, or the appropriate metric name is passed using the flag mentioned previously, this collector provides the correct Prometheus queries out of the +box so users don't have to define those manually. + +### Supported metrics + +| Metric | Description | Type | Kind | K8s Versions | +| ------------ | -------------- | ------- | -- | -- | +| `requests-per-second` | Scale based on requests per second for a certain hostname. | External | | `>=1.12` | + +### Example: External Metric + +This is an example of an HPA that will scale based on `requests-per-second` for the RPS measured for the hostnames `www.example1.com` and `www.example2.com`, weighted by 42%. 
+ +```yaml +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: myapp-hpa + annotations: + metric-config.external.example-rps.requests-per-second/hostnames: www.example1.com,www.example2.com + metric-config.external.example-rps.requests-per-second/weight: "42" +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: custom-metrics-consumer + minReplicas: 1 + maxReplicas: 10 + metrics: + - type: External + external: + metric: + name: example-rps + selector: + matchLabels: + type: requests-per-second + target: + type: AverageValue + averageValue: "42" +``` +### Multiple hostnames per metric + +This metric supports a relation of n:1 between hostnames and metrics. The way it works is that the measured RPS is the sum of the RPS rate of each of the specified hostnames. This value is further modified by the weight parameter explained below. + +### Metric weighting based on backend + +There are ingress-controllers, like skipper-ingress, that support sending traffic to different backends based on some kind of configuration: in the case of skipper, annotations +present on the `Ingress` object, or weights on the RouteGroup backends. By +default the number of replicas will be calculated based on the full traffic +served by these components. If however only the traffic being routed to +a specific hostname should be used, then the weight for the configured hostname(s) can be specified via the `weight` annotation `metric-config.external.<metric-name>.requests-per-second/weight` for the metric being configured. + + ## InfluxDB collector The InfluxDB collector maps [Flux](https://github.com/influxdata/flux) queries to metrics that can be used for scaling. 
diff --git a/pkg/collector/external_rps_collector.go b/pkg/collector/external_rps_collector.go
new file mode 100644
index 00000000..4d2d17ed
--- /dev/null
+++ b/pkg/collector/external_rps_collector.go
@@ -0,0 +1,148 @@
+package collector
+
+import (
+	"fmt"
+	"regexp"
+	"strconv"
+	"strings"
+	"time"
+
+	autoscalingv2 "k8s.io/api/autoscaling/v2"
+)
+
+const (
+	// ExternalRPSMetricType is the value of the `type` selector label that
+	// routes an external metric to this collector.
+	ExternalRPSMetricType = "requests-per-second"
+	// ExternalRPSQuery is the PromQL template used to build the query. Its
+	// arguments are, in order: the metric name, a regular expression that
+	// matches the hostnames, and the weight as a fraction.
+	ExternalRPSQuery = `scalar(sum(rate(%s{host=~"%s"}[1m])) * %.4f)`
+)
+
+// ExternalRPSCollectorPlugin creates collectors that measure the requests
+// per second of a set of hostnames by handing a generated query to another
+// collector plugin.
+type ExternalRPSCollectorPlugin struct {
+	metricName string
+	promPlugin CollectorPlugin
+	pattern    *regexp.Regexp
+}
+
+// ExternalRPSCollector wraps the collector created by the underlying plugin
+// and only accepts results that hold exactly one metric value.
+type ExternalRPSCollector struct {
+	interval      time.Duration
+	promCollector Collector
+}
+
+// NewExternalRPSCollectorPlugin returns a plugin that builds its queries
+// around metricName and passes them to promPlugin. It returns an error when
+// metricName is empty.
+func NewExternalRPSCollectorPlugin(
+	promPlugin CollectorPlugin,
+	metricName string,
+) (*ExternalRPSCollectorPlugin, error) {
+	if metricName == "" {
+		return nil, fmt.Errorf("failed to initialize hostname collector plugin, metric name was not defined")
+	}
+
+	// Hostnames are interpolated into the host matcher of ExternalRPSQuery;
+	// NewCollector validates them against this pattern first.
+	p, err := regexp.Compile("^[a-zA-Z0-9.-]+$")
+	if err != nil {
+		return nil, fmt.Errorf("failed to create regular expression to match hostname format: %w", err)
+	}
+
+	return &ExternalRPSCollectorPlugin{
+		metricName: metricName,
+		promPlugin: promPlugin,
+		pattern:    p,
+	}, nil
+}
+
+// NewCollector initializes a new external RPS collector from the specified
+// HPA. The metric config must hold a comma separated list of hostnames under
+// the "hostnames" key and may hold a percentage under the "weight" key; both
+// are turned into a query that is handed to the wrapped plugin.
+func (p *ExternalRPSCollectorPlugin) NewCollector(
+	hpa *autoscalingv2.HorizontalPodAutoscaler,
+	config *MetricConfig,
+	interval time.Duration,
+) (Collector, error) {
+	if config == nil {
+		return nil, fmt.Errorf("metric config not present, it is not possible to initialize the collector")
+	}
+
+	rawHostnames, ok := config.Config["hostnames"]
+	if !ok {
+		return nil, fmt.Errorf("hostname is not specified, unable to create collector")
+	}
+	if p.pattern == nil {
+		return nil, fmt.Errorf("plugin did not specify hostname regex pattern, unable to create collector")
+	}
+
+	hostnames := strings.Split(rawHostnames, ",")
+	for i, h := range hostnames {
+		// Tolerate whitespace around the separators, e.g. "a.com, b.com".
+		h = strings.TrimSpace(h)
+		if !p.pattern.MatchString(h) {
+			return nil, fmt.Errorf("invalid hostname format, unable to create collector: %s", h)
+		}
+		hostnames[i] = h
+	}
+
+	// The weight is configured as a percentage and applied as a fraction.
+	weight := 1.0
+	if w, ok := config.Config["weight"]; ok {
+		num, err := strconv.ParseFloat(w, 64)
+		if err != nil {
+			return nil, fmt.Errorf("could not parse weight annotation, unable to create collector: %s", w)
+		}
+		weight = num / 100.0
+	}
+
+	// Work on a copy so the caller's config struct is not modified; the
+	// copy only carries the generated query.
+	confCopy := *config
+	confCopy.Config = map[string]string{
+		"query": fmt.Sprintf(
+			ExternalRPSQuery,
+			p.metricName,
+			// Hostnames are OR-ed together and their dots are replaced
+			// by underscores before being matched against the host label.
+			strings.ReplaceAll(strings.Join(hostnames, "|"), ".", "_"),
+			weight,
+		),
+	}
+
+	c, err := p.promPlugin.NewCollector(hpa, &confCopy, interval)
+	if err != nil {
+		return nil, err
+	}
+
+	return &ExternalRPSCollector{
+		interval:      interval,
+		promCollector: c,
+	}, nil
+}
+
+// GetMetrics returns the metric collected by the wrapped collector. It
+// fails unless exactly one metric value was collected.
+func (c *ExternalRPSCollector) GetMetrics() ([]CollectedMetric, error) {
+	v, err := c.promCollector.GetMetrics()
+	if err != nil {
+		return nil, err
+	}
+
+	if len(v) != 1 {
+		return nil, fmt.Errorf("expected to only get one metric value, got %d", len(v))
+	}
+	return v, nil
+}
+
+// Interval returns the interval at which the collector should run.
+func (c *ExternalRPSCollector) Interval() time.Duration { + return c.interval +} + diff --git a/pkg/collector/external_rps_collector_test.go b/pkg/collector/external_rps_collector_test.go new file mode 100644 index 00000000..afaf3540 --- /dev/null +++ b/pkg/collector/external_rps_collector_test.go @@ -0,0 +1,305 @@ +package collector + +import ( + "fmt" + "regexp" + "testing" + "time" + + "github.com/stretchr/testify/require" + autoscalingv2 "k8s.io/api/autoscaling/v2" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/metrics/pkg/apis/external_metrics" +) + +func TestExternalRPSCollectorPluginConstructor(tt *testing.T) { + for _, testcase := range []struct { + msg string + name string + isValid bool + }{ + {"No metric name", "", false}, + {"Valid metric name", "a_valid_metric_name", true}, + } { + tt.Run(testcase.msg, func(t *testing.T) { + + fakePlugin := &FakeCollectorPlugin{} + plugin, err := NewExternalRPSCollectorPlugin(fakePlugin, testcase.name) + + if testcase.isValid { + require.NoError(t, err) + require.NotNil(t, plugin) + require.Equal(t, testcase.name, plugin.metricName) + require.Equal(t, fakePlugin, plugin.promPlugin) + } else { + require.NotNil(t, err) + require.Nil(t, plugin) + } + }) + } +} + +func TestExternalRPSPluginNewCollector(tt *testing.T) { + fakePlugin := &FakeCollectorPlugin{} + + pattern, err := regexp.Compile("^[a-zA-Z0-9.-]+$") + require.Nil(tt, err, "Something is up, regex compiling failed.") + + plugin := &ExternalRPSCollectorPlugin{ + metricName: "a_valid_one", + promPlugin: fakePlugin, + pattern: pattern, + } + interval := time.Duration(42) + + for _, testcase := range []struct { + msg string + config *MetricConfig + expectedQuery string + shouldWork bool + }{ + { + "No hostname config", + &MetricConfig{Config: make(map[string]string)}, + "", + false, + }, + { + "Nil metric config", + nil, + "", + false, + }, + { + "Valid hostname no prom query config", + &MetricConfig{Config: 
map[string]string{"hostnames": "foo.bar.baz"}}, + `scalar(sum(rate(a_valid_one{host=~"foo_bar_baz"}[1m])) * 1.0000)`, + true, + }, + { + "Valid hostname no prom query config", + &MetricConfig{Config: map[string]string{"hostnames": "foo.bar.baz", "weight": "42"}}, + `scalar(sum(rate(a_valid_one{host=~"foo_bar_baz"}[1m])) * 0.4200)`, + true, + }, + { + "Multiple valid hostnames no prom query config", + &MetricConfig{Config: map[string]string{"hostnames": "foo.bar.baz,foz.bax.bas"}}, + `scalar(sum(rate(a_valid_one{host=~"foo_bar_baz|foz_bax_bas"}[1m])) * 1.0000)`, + true, + }, + { + "Valid hostname with prom query config", + &MetricConfig{ + Config: map[string]string{"hostnames": "foo.bar.baz", "query": "some_other_query"}, + }, + `scalar(sum(rate(a_valid_one{host=~"foo_bar_baz"}[1m])) * 1.0000)`, + true, + }, + } { + tt.Run(testcase.msg, func(t *testing.T) { + c, err := plugin.NewCollector( + &autoscalingv2.HorizontalPodAutoscaler{}, + testcase.config, + interval, + ) + + if testcase.shouldWork { + require.NotNil(t, c) + require.Nil(t, err) + require.Equal(t, testcase.expectedQuery, fakePlugin.config["query"]) + } else { + require.Nil(t, c) + require.NotNil(t, err) + } + }) + } +} + +func TestExternalRPSCollectorGetMetrics(tt *testing.T) { + genericErr := fmt.Errorf("This is an error") + expectedMetric := *resource.NewQuantity(int64(42), resource.DecimalSI) + + for _, testcase := range []struct { + msg string + stub func() ([]CollectedMetric, error) + shouldWork bool + }{ + { + "Internal collector error", + func() ([]CollectedMetric, error) { + return nil, genericErr + }, + false, + }, + { + "Invalid metric collection from internal collector", + func() ([]CollectedMetric, error) { + return []CollectedMetric{ + {External: external_metrics.ExternalMetricValue{Value: *resource.NewQuantity(int64(24), resource.DecimalSI)}}, + {External: external_metrics.ExternalMetricValue{Value: *resource.NewQuantity(int64(42), resource.DecimalSI)}}, + }, nil + }, + false, + }, + { + 
"Internal collector return single metric", + func() ([]CollectedMetric, error) { + return []CollectedMetric{ + {External: external_metrics.ExternalMetricValue{Value: *resource.NewQuantity(int64(42), resource.DecimalSI)}}, + }, nil + }, + true, + }, + } { + tt.Run(testcase.msg, func(t *testing.T) { + fake := makeCollectorWithStub(testcase.stub) + c := &ExternalRPSCollector{promCollector: fake} + m, err := c.GetMetrics() + + if testcase.shouldWork { + require.Nil(t, err) + require.NotNil(t, m) + require.Len(t, m, 1) + require.Equal(t, expectedMetric, m[0].External.Value) + } else { + require.NotNil(t, err) + require.Nil(t, m) + } + }) + } +} + +func TestExternalRPSCollectorInterval(t *testing.T) { + interval := time.Duration(42) + fakePlugin := &FakeCollectorPlugin{} + pattern, err := regexp.Compile("^[a-zA-Z0-9.-]+$") + require.Nil(t, err, "Something is up, regex compiling failed.") + plugin := &ExternalRPSCollectorPlugin{ + metricName: "a_valid_one", + promPlugin: fakePlugin, + pattern: pattern, + } + c, err := plugin.NewCollector( + &autoscalingv2.HorizontalPodAutoscaler{}, + &MetricConfig{Config: map[string]string{"hostnames": "foo.bar.baz"}}, + interval, + ) + + require.Nil(t, err) + require.NotNil(t, c) + require.Equal(t, interval, c.Interval()) +} + +func TestExternalRPSCollectorAndCollectorFabricInteraction(t *testing.T) { + expectedQuery := `scalar(sum(rate(a_metric{host=~"just_testing_com"}[1m])) * 0.4200)` + hpa := &autoscalingv2.HorizontalPodAutoscaler{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + "metric-config.external.foo.requests-per-second/hostnames": "just.testing.com", + "metric-config.external.foo.requests-per-second/weight": "42", + }, + }, + Spec: autoscalingv2.HorizontalPodAutoscalerSpec{ + Metrics: []autoscalingv2.MetricSpec{ + { + Type: autoscalingv2.ExternalMetricSourceType, + External: &autoscalingv2.ExternalMetricSource{ + Metric: autoscalingv2.MetricIdentifier{ + Name: "foo", + Selector: &metav1.LabelSelector{ + 
MatchLabels: map[string]string{"type": "requests-per-second"}, + }, + }, + }, + }, + }, + }, + } + + factory := NewCollectorFactory() + fakePlugin := makePlugin(42) + hostnamePlugin, err := NewExternalRPSCollectorPlugin(fakePlugin, "a_metric") + require.NoError(t, err) + factory.RegisterExternalCollector([]string{ExternalRPSMetricType}, hostnamePlugin) + conf, err := ParseHPAMetrics(hpa) + require.NoError(t, err) + require.Len(t, conf, 1) + + c, err := factory.NewCollector(hpa, conf[0], 0) + + require.NoError(t, err) + _, ok := c.(*ExternalRPSCollector) + require.True(t, ok) + require.Equal(t, expectedQuery, fakePlugin.config["query"]) + +} + +func TestExternalRPSPrometheusCollectorInteraction(t *testing.T) { + externalRPSQuery := `scalar(sum(rate(a_metric{host=~"just_testing_com"}[1m])) * 0.4200)` + promQuery := "sum(rate(rps[1m]))" + hpa := &autoscalingv2.HorizontalPodAutoscaler{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + "metric-config.external.foo.requests-per-second/hostnames": "just.testing.com", + "metric-config.external.foo.requests-per-second/weight": "42", + "metric-config.external.bar.prometheus/query": promQuery, + }, + }, + Spec: autoscalingv2.HorizontalPodAutoscalerSpec{ + Metrics: []autoscalingv2.MetricSpec{ + { + Type: autoscalingv2.ExternalMetricSourceType, + External: &autoscalingv2.ExternalMetricSource{ + Metric: autoscalingv2.MetricIdentifier{ + Name: "foo", + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"type": "requests-per-second"}, + }, + }, + }, + }, + { + Type: autoscalingv2.ExternalMetricSourceType, + External: &autoscalingv2.ExternalMetricSource{ + Metric: autoscalingv2.MetricIdentifier{ + Name: "bar", + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"type": "prometheus"}, + }, + }, + }, + }, + }, + }, + } + + factory := NewCollectorFactory() + promPlugin, err := NewPrometheusCollectorPlugin(nil, "http://prometheus") + require.NoError(t, err) + 
factory.RegisterExternalCollector([]string{PrometheusMetricType, PrometheusMetricNameLegacy}, promPlugin) + hostnamePlugin, err := NewExternalRPSCollectorPlugin(promPlugin, "a_metric") + require.NoError(t, err) + factory.RegisterExternalCollector([]string{ExternalRPSMetricType}, hostnamePlugin) + + conf, err := ParseHPAMetrics(hpa) + require.NoError(t, err) + require.Len(t, conf, 2) + + collectors := make(map[string]Collector) + collectors["hostname"], err = factory.NewCollector(hpa, conf[0], 0) + require.NoError(t, err) + collectors["prom"], err = factory.NewCollector(hpa, conf[1], 0) + require.NoError(t, err) + + prom, ok := collectors["prom"].(*PrometheusCollector) + require.True(t, ok) + hostname, ok := collectors["hostname"].(*ExternalRPSCollector) + require.True(t, ok) + hostnameProm, ok := hostname.promCollector.(*PrometheusCollector) + require.True(t, ok) + + require.Equal(t, promQuery, prom.query) + require.Equal(t, externalRPSQuery, hostnameProm.query) +} diff --git a/pkg/collector/fake_collector_test.go b/pkg/collector/fake_collector_test.go new file mode 100644 index 00000000..7b22719d --- /dev/null +++ b/pkg/collector/fake_collector_test.go @@ -0,0 +1,57 @@ +package collector + +import ( + "time" + + autoscalingv2 "k8s.io/api/autoscaling/v2" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/metrics/pkg/apis/custom_metrics" +) + +type FakeCollectorPlugin struct { + metrics []CollectedMetric + config map[string]string +} + +type FakeCollector struct { + metrics []CollectedMetric + interval time.Duration + stub func() ([]CollectedMetric, error) +} + +func (c *FakeCollector) GetMetrics() ([]CollectedMetric, error) { + if c.stub != nil { + v, err := c.stub() + return v, err + } + + return c.metrics, nil +} + +func (FakeCollector) Interval() time.Duration { + return time.Minute +} + +func (p *FakeCollectorPlugin) NewCollector( + hpa *autoscalingv2.HorizontalPodAutoscaler, + config *MetricConfig, + interval time.Duration, +) (Collector, error) { + + p.config 
= config.Config + return &FakeCollector{metrics: p.metrics, interval: interval}, nil +} + +func makePlugin(metric int) *FakeCollectorPlugin { + return &FakeCollectorPlugin{ + metrics: []CollectedMetric{ + { + Custom: custom_metrics.MetricValue{Value: *resource.NewQuantity(int64(metric), resource.DecimalSI)}, + }, + }, + } +} + +func makeCollectorWithStub(f func() ([]CollectedMetric, error)) *FakeCollector { + return &FakeCollector{stub: f} +} diff --git a/pkg/collector/skipper_collector_test.go b/pkg/collector/skipper_collector_test.go index 63bcbd15..62e14392 100644 --- a/pkg/collector/skipper_collector_test.go +++ b/pkg/collector/skipper_collector_test.go @@ -658,38 +658,3 @@ func makeConfig(resourceName, namespace, kind, backend string, fakedAverage bool } return config } - -type FakeCollectorPlugin struct { - metrics []CollectedMetric - config map[string]string -} - -type FakeCollector struct { - metrics []CollectedMetric -} - -func (c *FakeCollector) GetMetrics() ([]CollectedMetric, error) { - return c.metrics, nil -} - -func (FakeCollector) Interval() time.Duration { - return time.Minute -} - -func (p *FakeCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) { - if p.config != nil { - return nil, fmt.Errorf("config already assigned once: %v", p.config) - } - p.config = config.Config - return &FakeCollector{metrics: p.metrics}, nil -} - -func makePlugin(metric int) *FakeCollectorPlugin { - return &FakeCollectorPlugin{ - metrics: []CollectedMetric{ - { - Custom: custom_metrics.MetricValue{Value: *resource.NewQuantity(int64(metric), resource.DecimalSI)}, - }, - }, - } -} diff --git a/pkg/server/start.go b/pkg/server/start.go index 86388c51..f49bf384 100644 --- a/pkg/server/start.go +++ b/pkg/server/start.go @@ -65,6 +65,7 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command { MetricsAddress: ":7979", ZMONTokenName: "zmon", CredentialsDir: 
"/meta/credentials", + ExternalRPSMetricName: "skipper_serve_host_duration_seconds_count", } cmd := &cobra.Command{ @@ -132,6 +133,10 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command { flags.DurationVar(&o.DefaultScheduledScalingWindow, "scaling-schedule-default-scaling-window", 10*time.Minute, "Default rampup and rampdown window duration for ScalingSchedules") flags.IntVar(&o.RampSteps, "scaling-schedule-ramp-steps", 10, "Number of steps used to rampup and rampdown ScalingSchedules. It's used to guarantee won't avoid reaching the max scaling due to the 10% minimum change rule.") flags.StringVar(&o.DefaultTimeZone, "scaling-schedule-default-time-zone", "Europe/Berlin", "Default time zone to use for ScalingSchedules.") + flags.StringVar(&o.ExternalRPSMetricName, "external-rps-metric-name", o.ExternalRPSMetricName, ""+ + "The name of the metric that should be used to query prometheus for RPS per hostname.") + flags.BoolVar(&o.ExternalRPSMetrics, "external-rps-metrics", o.ExternalRPSMetrics, ""+ + "whether to enable external RPS metric collector or not") return cmd } @@ -218,6 +223,18 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct } } } + + // External RPS collector, like skipper's, depends on prometheus being enabled. + // Also, to enable hostname metric its necessary to pass the metric name that + // will be used. This was built this way so we can support hostname metrics to + // any ingress provider, e.g. Skipper, Nginx, envoy etc, in a simple way. 
+		if o.ExternalRPSMetrics && o.ExternalRPSMetricName != "" {
+			externalRPSPlugin, err := collector.NewExternalRPSCollectorPlugin(promPlugin, o.ExternalRPSMetricName)
+			if err != nil { // check first: a failed constructor returns a nil plugin that must not be registered
+				return fmt.Errorf("failed to register hostname collector plugin: %v", err)
+			}
+			collectorFactory.RegisterExternalCollector([]string{collector.ExternalRPSMetricType}, externalRPSPlugin)
+		}
 	}
 
 	if o.InfluxDBAddress != "" {
@@ -445,4 +462,8 @@ type AdapterServerOptions struct {
 	RampSteps int
 	// Default time zone to use for ScalingSchedules.
 	DefaultTimeZone string
+	// Feature flag to enable the external RPS metric collector; it only takes effect when ExternalRPSMetricName is set.
+	ExternalRPSMetrics bool
+	// Name of the Prometheus metric that stores RPS by hostname for external RPS metrics.
+	ExternalRPSMetricName string
 }