Skip to content

Commit

Permalink
[WIP] Add hostname RPS metric collector
Browse files Browse the repository at this point in the history
Signed-off-by: Lucas Thiesen <[email protected]>
  • Loading branch information
lucastt committed Apr 26, 2023
1 parent 88b7d74 commit 0e9ade2
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 0 deletions.
93 changes: 93 additions & 0 deletions pkg/collector/hostname_collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package collector

import (
"fmt"
"time"

autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
)

const (
HostnameMetricType = "hostname"
HostnameRPSQuery = `scalar(sum(rate(%s{host=~"%s"}[1m])))`
)

type HostnameCollectorPlugin struct {
metricName string
promPlugin CollectorPlugin
}

type HostnameCollector struct {
interval time.Duration
promCollector Collector
}

func NewHostnameCollectorPlugin(
promPlugin CollectorPlugin,
metricName string,
) (*HostnameCollectorPlugin, error) {
if metricName == "" {
return nil, fmt.Errorf("failed to initialize hostname collector plugin, metric name was not defined")
}

return &HostnameCollectorPlugin{
metricName: metricName,
promPlugin: promPlugin,
}, nil
}

func (p *HostnameCollectorPlugin) NewCollector(
hpa *autoscalingv2.HorizontalPodAutoscaler,
config *MetricConfig,
interval time.Duration,
) (Collector, error) {
// Need to copy config and add a promQL query in order to get
// RPS data from a specific hostname from prometheus. The idea
// of the copy is to not modify the original config struct.
confCopy := *config
hostname := config.Config["hostname"]

if hostname == "" {
return nil, fmt.Errorf("hostname not specified, unable to create collector")
}

confCopy.Config = map[string]string{
"query": fmt.Sprintf(HostnameRPSQuery, p.metricName, hostname),
}

c, err := p.promPlugin.NewCollector(hpa, &confCopy, interval)
if err != nil {
return nil, err
}

return &HostnameCollector{
interval: interval,
promCollector: c,
}, nil
}

// GetMetrics gets hostname metrics from Prometheus
func (c *HostnameCollector) GetMetrics() ([]CollectedMetric, error) {
v, err := c.promCollector.GetMetrics()
if err != nil {
return nil, err
}

if len(v) != 1 {
return nil, fmt.Errorf("expected to only get one metric value, got %d", len(v))
}

// TBD(Lucas):
// The explanation bellow is only true if we want to implement object metrics.
// I believe external metrics would suffice.
// Apparently in case of k8s <v1.14 the value is not average for replica
// In this case we need to calculate RPS per replica manually. Check skipper
// collector. Anyway I need to check wether in the hostname metric I want
// average RPS or total. I probably want average but still need to check...
return v, nil
}

// Interval returns the interval at which the collector should run.
func (c *HostnameCollector) Interval() time.Duration {
return c.interval
}
3 changes: 3 additions & 0 deletions pkg/collector/hostname_collector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package collector

// TODO(Lucas)
17 changes: 17 additions & 0 deletions pkg/server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
MetricsAddress: ":7979",
ZMONTokenName: "zmon",
CredentialsDir: "/meta/credentials",
HostnameRPSMetricName: "skipper_serve_host_duration_seconds_count",
}

cmd := &cobra.Command{
Expand Down Expand Up @@ -132,6 +133,8 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
flags.DurationVar(&o.DefaultScheduledScalingWindow, "scaling-schedule-default-scaling-window", 10*time.Minute, "Default rampup and rampdown window duration for ScalingSchedules")
flags.IntVar(&o.RampSteps, "scaling-schedule-ramp-steps", 10, "Number of steps used to rampup and rampdown ScalingSchedules. It's used to guarantee won't avoid reaching the max scaling due to the 10% minimum change rule.")
flags.StringVar(&o.DefaultTimeZone, "scaling-schedule-default-time-zone", "Europe/Berlin", "Default time zone to use for ScalingSchedules.")
flags.StringVar(&o.HostnameRPSMetricName, "hostname-rps-metric-name", o.HostnameRPSMetricName, ""+
"The name of the metric that should be used to query prometheus for RPS per hostname.")
return cmd
}

Expand Down Expand Up @@ -218,6 +221,18 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
}
}
}

// Hostname collector, like skipper's, depends on prometheus being enabled.
// Also, to enable hostname metric its necessary to pass the metric name that
// will be used. This was built this way so we can support hostname metrics to
// any ingress provider, e.g. Skipper, Nginx, envoy etc, in a simple way.
if o.HostnameRPSMetricName != "" {
hostnamePlugin, err := collector.NewHostnameCollectorPlugin(promPlugin, o.HostnameRPSMetricName)
collectorFactory.RegisterExternalCollector([]string{collector.HostnameMetricType}, hostnamePlugin)
if err != nil {
return fmt.Errorf("failed to register hostname collector plugin: %v", err)
}
}
}

if o.InfluxDBAddress != "" {
Expand Down Expand Up @@ -445,4 +460,6 @@ type AdapterServerOptions struct {
RampSteps int
// Default time zone to use for ScalingSchedules.
DefaultTimeZone string
// Name of the Prometheus metric that stores RPS by hostname for Hostname RPS metrics.
HostnameRPSMetricName string
}

0 comments on commit 0e9ade2

Please sign in to comment.