Skip to content

Commit

Permalink
Merge pull request #146 from john-delivuk/em-isolation
Browse files Browse the repository at this point in the history
External Metrics contd
  • Loading branch information
s-urbaniak authored Apr 3, 2019
2 parents 5de0247 + fa27078 commit 680e404
Show file tree
Hide file tree
Showing 48 changed files with 9,885 additions and 238 deletions.
232 changes: 41 additions & 191 deletions Gopkg.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,7 @@ version="v1.4.7"
[prune]
go-tests = true
unused-packages = true

[[constraint]]
name = "github.com/stretchr/testify"
version = "1.3.0"
39 changes: 38 additions & 1 deletion cmd/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import (
mprom "github.com/directxman12/k8s-prometheus-adapter/pkg/client/metrics"
adaptercfg "github.com/directxman12/k8s-prometheus-adapter/pkg/config"
cmprov "github.com/directxman12/k8s-prometheus-adapter/pkg/custom-provider"
extprov "github.com/directxman12/k8s-prometheus-adapter/pkg/external-provider"
"github.com/directxman12/k8s-prometheus-adapter/pkg/naming"
resprov "github.com/directxman12/k8s-prometheus-adapter/pkg/resourceprovider"
)

Expand Down Expand Up @@ -159,7 +161,7 @@ func (cmd *PrometheusAdapter) makeProvider(promClient prom.Client, stopCh <-chan
}

// extract the namers
namers, err := cmprov.NamersFromConfig(cmd.metricsConfig, mapper)
namers, err := naming.NamersFromConfig(cmd.metricsConfig.Rules, mapper)
if err != nil {
return nil, fmt.Errorf("unable to construct naming scheme from metrics rules: %v", err)
}
Expand All @@ -171,6 +173,30 @@ func (cmd *PrometheusAdapter) makeProvider(promClient prom.Client, stopCh <-chan
return cmProvider, nil
}

func (cmd *PrometheusAdapter) makeExternalProvider(promClient prom.Client, stopCh <-chan struct{}) (provider.ExternalMetricsProvider, error) {
if len(cmd.metricsConfig.ExternalRules) == 0 {
return nil, nil
}

// grab the mapper
mapper, err := cmd.RESTMapper()
if err != nil {
return nil, fmt.Errorf("unable to construct RESTMapper: %v", err)
}

// extract the namers
namers, err := naming.NamersFromConfig(cmd.metricsConfig.ExternalRules, mapper)
if err != nil {
return nil, fmt.Errorf("unable to construct naming scheme from metrics rules: %v", err)
}

// construct the provider and start it
emProvider, runner := extprov.NewExternalPrometheusProvider(promClient, namers, cmd.MetricsRelistInterval)
runner.RunUntil(stopCh)

return emProvider, nil
}

func (cmd *PrometheusAdapter) addResourceMetricsAPI(promClient prom.Client) error {
if cmd.metricsConfig.ResourceRules == nil {
// bail if we don't have rules for setting things up
Expand Down Expand Up @@ -247,6 +273,17 @@ func main() {
cmd.WithCustomMetrics(cmProvider)
}

// construct the external provider
emProvider, err := cmd.makeExternalProvider(promClient, wait.NeverStop)
if err != nil {
glog.Fatalf("unable to construct external metrics provider: %v", err)
}

// attach the provider to the server, if it's needed
if emProvider != nil {
cmd.WithExternalMetrics(emProvider)
}

// attach resource metrics support, if it's needed
if err := cmd.addResourceMetricsAPI(promClient); err != nil {
glog.Fatalf("unable to install resource metrics API: %v", err)
Expand Down
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type MetricsDiscoveryConfig struct {
// will make only a single API call.
Rules []DiscoveryRule `yaml:"rules"`
ResourceRules *ResourceRules `yaml:"resourceRules,omitempty"`
ExternalRules []DiscoveryRule `yaml:"externalRules,omitempty"`
}

// DiscoveryRule describes a set of rules for transforming Prometheus metrics to/from
Expand Down
5 changes: 3 additions & 2 deletions pkg/custom-provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"k8s.io/metrics/pkg/apis/custom_metrics"

prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/directxman12/k8s-prometheus-adapter/pkg/naming"
)

// Runnable represents something that can be run until told to stop.
Expand All @@ -55,7 +56,7 @@ type prometheusProvider struct {
SeriesRegistry
}

func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interface, promClient prom.Client, namers []MetricNamer, updateInterval time.Duration, maxAge time.Duration) (provider.CustomMetricsProvider, Runnable) {
func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interface, promClient prom.Client, namers []naming.MetricNamer, updateInterval time.Duration, maxAge time.Duration) (provider.CustomMetricsProvider, Runnable) {
lister := &cachingMetricsLister{
updateInterval: updateInterval,
maxAge: maxAge,
Expand Down Expand Up @@ -193,7 +194,7 @@ type cachingMetricsLister struct {
promClient prom.Client
updateInterval time.Duration
maxAge time.Duration
namers []MetricNamer
namers []naming.MetricNamer
}

func (l *cachingMetricsLister) Run() {
Expand Down
3 changes: 2 additions & 1 deletion pkg/custom-provider/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
config "github.com/directxman12/k8s-prometheus-adapter/cmd/config-gen/utils"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
fakeprom "github.com/directxman12/k8s-prometheus-adapter/pkg/client/fake"
"github.com/directxman12/k8s-prometheus-adapter/pkg/naming"
pmodel "github.com/prometheus/common/model"
)

Expand All @@ -39,7 +40,7 @@ func setupPrometheusProvider() (provider.CustomMetricsProvider, *fakeprom.FakePr
fakeKubeClient := &fakedyn.FakeDynamicClient{}

cfg := config.DefaultConfig(1*time.Minute, "")
namers, err := NamersFromConfig(cfg, restMapper())
namers, err := naming.NamersFromConfig(cfg.Rules, restMapper())
Expect(err).NotTo(HaveOccurred())

prov, _ := NewPrometheusProvider(restMapper(), fakeKubeClient, fakeProm, namers, fakeProviderUpdateInterval, fakeProviderStartDuration)
Expand Down
11 changes: 6 additions & 5 deletions pkg/custom-provider/series_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
apimeta "k8s.io/apimachinery/pkg/api/meta"

prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/directxman12/k8s-prometheus-adapter/pkg/naming"
"github.com/golang/glog"
pmodel "github.com/prometheus/common/model"
)
Expand All @@ -45,7 +46,7 @@ const (
type SeriesRegistry interface {
// SetSeries replaces the known series in this registry.
// Each slice in series should correspond to a MetricNamer in namers.
SetSeries(series [][]prom.Series, namers []MetricNamer) error
SetSeries(series [][]prom.Series, namers []naming.MetricNamer) error
// ListAllMetrics lists all metrics known to this registry
ListAllMetrics() []provider.CustomMetricInfo
// SeriesForMetric looks up the minimum required series information to make a query for the given metric
Expand All @@ -60,7 +61,7 @@ type seriesInfo struct {
seriesName string

// namer is the MetricNamer used to name this series
namer MetricNamer
namer naming.MetricNamer
}

// overridableSeriesRegistry is a basic SeriesRegistry
Expand All @@ -75,7 +76,7 @@ type basicSeriesRegistry struct {
mapper apimeta.RESTMapper
}

func (r *basicSeriesRegistry) SetSeries(newSeriesSlices [][]prom.Series, namers []MetricNamer) error {
func (r *basicSeriesRegistry) SetSeries(newSeriesSlices [][]prom.Series, namers []naming.MetricNamer) error {
if len(newSeriesSlices) != len(namers) {
return fmt.Errorf("need one set of series per namer")
}
Expand All @@ -98,8 +99,8 @@ func (r *basicSeriesRegistry) SetSeries(newSeriesSlices [][]prom.Series, namers
Metric: name,
}

// namespace metrics aren't counted as namespaced
if resource == nsGroupResource || resource == nodeGroupResource || resource == pvGroupResource {
// some metrics aren't counted as namespaced
if resource == naming.NsGroupResource || resource == naming.NodeGroupResource || resource == naming.PVGroupResource {
info.Namespaced = false
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/custom-provider/series_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

config "github.com/directxman12/k8s-prometheus-adapter/cmd/config-gen/utils"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/directxman12/k8s-prometheus-adapter/pkg/naming"
)

// restMapper creates a RESTMapper with just the types we need for
Expand All @@ -50,9 +51,9 @@ func restMapper() apimeta.RESTMapper {
return mapper
}

func setupMetricNamer() []MetricNamer {
func setupMetricNamer() []naming.MetricNamer {
cfg := config.DefaultConfig(1*time.Minute, "kube_")
namers, err := NamersFromConfig(cfg, restMapper())
namers, err := naming.NamersFromConfig(cfg.Rules, restMapper())
Expect(err).NotTo(HaveOccurred())
return namers
}
Expand Down
166 changes: 166 additions & 0 deletions pkg/external-provider/basic_metric_lister.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package provider

import (
"context"
"fmt"
"time"

"github.com/golang/glog"
pmodel "github.com/prometheus/common/model"

prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/directxman12/k8s-prometheus-adapter/pkg/naming"
)

// Runnable represents something that can be run until told to stop.
type Runnable interface {
// Run runs the runnable forever.
Run()
// RunUntil runs the runnable until the given channel is closed.
RunUntil(stopChan <-chan struct{})
}

// A MetricLister provides a window into all of the metrics that are available within a given
// Prometheus instance, classified as either Custom or External metrics, but presented generically
// so that it can manage both types simultaneously.
type MetricLister interface {
ListAllMetrics() (MetricUpdateResult, error)
}

// A MetricListerWithNotification is a MetricLister that has the ability to notify listeners
// when new metric data is available.
type MetricListerWithNotification interface {
MetricLister
Runnable

// AddNotificationReceiver registers a callback to be invoked when new metric data is available.
AddNotificationReceiver(MetricUpdateCallback)
// UpdateNow forces an immediate refresh from the source data. Primarily for test purposes.
UpdateNow()
}

type basicMetricLister struct {
promClient prom.Client
namers []naming.MetricNamer
lookback time.Duration
}

// NewBasicMetricLister creates a MetricLister that is capable of interactly directly with Prometheus to list metrics.
func NewBasicMetricLister(promClient prom.Client, namers []naming.MetricNamer, lookback time.Duration) MetricLister {
lister := basicMetricLister{
promClient: promClient,
namers: namers,
lookback: lookback,
}

return &lister
}

type selectorSeries struct {
selector prom.Selector
series []prom.Series
}

func (l *basicMetricLister) ListAllMetrics() (MetricUpdateResult, error) {
result := MetricUpdateResult{
series: make([][]prom.Series, 0),
namers: make([]naming.MetricNamer, 0),
}

startTime := pmodel.Now().Add(-1 * l.lookback)

// these can take a while on large clusters, so launch in parallel
// and don't duplicate
selectors := make(map[prom.Selector]struct{})
selectorSeriesChan := make(chan selectorSeries, len(l.namers))
errs := make(chan error, len(l.namers))
for _, converter := range l.namers {
sel := converter.Selector()
if _, ok := selectors[sel]; ok {
errs <- nil
selectorSeriesChan <- selectorSeries{}
continue
}
selectors[sel] = struct{}{}
go func() {
series, err := l.promClient.Series(context.TODO(), pmodel.Interval{startTime, 0}, sel)
if err != nil {
errs <- fmt.Errorf("unable to fetch metrics for query %q: %v", sel, err)
return
}
errs <- nil
// Push into the channel: "this selector produced these series"
selectorSeriesChan <- selectorSeries{
selector: sel,
series: series,
}
}()
}

// don't do duplicate queries when it's just the matchers that change
seriesCacheByQuery := make(map[prom.Selector][]prom.Series)

// iterate through, blocking until we've got all results
// We know that, from above, we should have pushed one item into the channel
// for each converter. So here, we'll assume that we should receive one item per converter.
for range l.namers {
if err := <-errs; err != nil {
return result, fmt.Errorf("unable to update list of all metrics: %v", err)
}
// Receive from the channel: "this selector produced these series"
// We stuff that into this map so that we can collect the data as it arrives
// and then, once we've received it all, we can process it below.
if ss := <-selectorSeriesChan; ss.series != nil {
seriesCacheByQuery[ss.selector] = ss.series
}
}
close(errs)

// Now that we've collected all of the results into `seriesCacheByQuery`
// we can start processing them.
newSeries := make([][]prom.Series, len(l.namers))
for i, namer := range l.namers {
series, cached := seriesCacheByQuery[namer.Selector()]
if !cached {
return result, fmt.Errorf("unable to update list of all metrics: no metrics retrieved for query %q", namer.Selector())
}
// Because converters provide a "post-filtering" option, it's not enough to
// simply take all the series that were produced. We need to further filter them.
newSeries[i] = namer.FilterSeries(series)
}

glog.V(10).Infof("Set available metric list from Prometheus to: %v", newSeries)

result.series = newSeries
result.namers = l.namers
return result, nil
}

// MetricUpdateResult represents the output of a periodic inspection of metrics found to be
// available in Prometheus.
// It includes both the series data the Prometheus exposed, as well as the configurational
// object that led to their discovery.
type MetricUpdateResult struct {
series [][]prom.Series
namers []naming.MetricNamer
}

// MetricUpdateCallback is a function signature for receiving periodic updates about
// available metrics.
type MetricUpdateCallback func(MetricUpdateResult)
Loading

0 comments on commit 680e404

Please sign in to comment.