From 18f101da0c8c5e370140e726e4085d0bbb1135a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan-Otto=20Kr=C3=B6pke?= Date: Wed, 3 Jul 2024 22:40:53 +0200 Subject: [PATCH] Added timespan MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jan-Otto Kröpke --- go.mod | 2 +- go.sum | 4 +- pkg/probe/request.go | 355 +++++++++++++++++++++++-------------------- pkg/probe/types.go | 4 +- 4 files changed, 198 insertions(+), 167 deletions(-) diff --git a/go.mod b/go.mod index ac93c37..f165162 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( ) require ( - github.com/Azure/azure-sdk-for-go/sdk/internal v1.9.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/internal v1.9.1 // indirect github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30 // indirect github.com/beorn7/perks v1.0.1 // indirect diff --git a/go.sum b/go.sum index 1e3c254..579cfd1 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azcore v1.12.0 h1:1nGuui+4POelzDwI7RG56yfQ github.com/Azure/azure-sdk-for-go/sdk/azcore v1.12.0/go.mod h1:99EvauvlcJ1U06amZiksfYz/3aFGyIhWGHVyiZXtBAI= github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 h1:tfLQ34V6F7tVSwoTf/4lH5sE0o6eCJuNDTmH09nDpbc= github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0/go.mod h1:9kIvujWAA58nmPmWB1m23fyWic1kYZMxD9CxaWn4Qpg= -github.com/Azure/azure-sdk-for-go/sdk/internal v1.9.0 h1:H+U3Gk9zY56G3u872L82bk4thcsy2Gghb9ExT4Zvm1o= -github.com/Azure/azure-sdk-for-go/sdk/internal v1.9.0/go.mod h1:mgrmMSgaLp9hmax62XQTd0N4aAqSE5E0DulSpVYK7vc= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.9.1 h1:Xy/qV1DyOhhqsU/z0PyFMJfYCxnzna+vBEUtFW0ksQo= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.9.1/go.mod h1:oib6iWdC+sILvNUoJbbBn3xv7TXow7mEp/WRcsYvmow= github.com/Azure/azure-sdk-for-go/sdk/monitor/query/azmetrics v1.1.0 h1:X/C/tY3dxwsuFnSNArmTWKr0O6P59SRY6VsUcIkefEw= github.com/Azure/azure-sdk-for-go/sdk/monitor/query/azmetrics v1.1.0/go.mod h1:wCAGp7Xm35A5laB8z8yK9p/kU8OEBFuTvUm4eKCzr/M= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resourcegraph/armresourcegraph v0.9.0 h1:zLzoX5+W2l95UJoVwiyNS4dX8vHyQ6x2xRLoBBL9wMk= diff --git a/pkg/probe/request.go b/pkg/probe/request.go index 1b0681a..778cc5f 100644 --- a/pkg/probe/request.go +++ b/pkg/probe/request.go @@ -18,6 +18,8 @@ import ( "golang.org/x/exp/maps" ) +const maxResourcesPerQuery = 50 + func (r *Request) Describe(_ chan<- *prometheus.Desc) { // Return no descriptors to turn the collector into an unchecked collector. } @@ -66,7 +68,12 @@ func (r *Request) Collect(ch chan<- prometheus.Metric) { // The function's behavior depends on the implementation of the queryResources method and the configuration of the cache. func (r *Request) getResources(ctx context.Context) (*Resources, error) { if r.config.QueryCacheCacheExpiration == 0 { - return r.queryResources(ctx) + resources, err := r.queryResources(ctx) + if err != nil { + return nil, fmt.Errorf("error querying resources: %w", err) + } + + return resources, nil } subscriptions := r.probe.subscriptions @@ -85,7 +92,7 @@ func (r *Request) getResources(ctx context.Context) (*Resources, error) { resources, err := r.queryResources(ctx) if err != nil { - return nil, err + return nil, fmt.Errorf("error querying resources: %w", err) } r.probe.queryCache.Set(cacheKey, resources, r.config.QueryCacheCacheExpiration) @@ -100,7 +107,13 @@ func (r *Request) queryResources(ctx context.Context) (*Resources, error) { var ( err error skipToken string - response armresourcegraph.ClientResourcesResponse + response []any + + resultRow map[string]any + subscriptionID string + location string + labelValue string + resourceID string ) resources := Resources{ @@ -113,82 +126,46 @@ func (r *Request) queryResources(ctx context.Context) (*Resources, error) { subscriptions = r.config.Subscriptions } - for { - query := fmt.Sprintf("%s\n| where type == '%s' \n| project-keep id, subscriptionId, location, label_*", - r.config.Query, strings.ToLower(r.config.ResourceType), - ) + query := fmt.Sprintf("%s\n| where type == '%s' \n| project-keep id, subscriptionId, location, label_*", + r.config.Query, strings.ToLower(r.config.ResourceType), + ) - response, err = r.probe.resourceGraphClient.Resources(ctx, armresourcegraph.QueryRequest{ - Options: &armresourcegraph.QueryRequestOptions{ - ResultFormat: to.Ptr(armresourcegraph.ResultFormatObjectArray), - SkipToken: to.Ptr(skipToken), - }, - Query: &query, - Subscriptions: to.SliceOfPtrs(subscriptions...), - }, nil) + for { + response, skipToken, err = r.resourceGraphQuery(ctx, query, subscriptions, skipToken) if err != nil { - return nil, fmt.Errorf("error querying resource graph '%q': %w", query, err) - } - - if response.ResultTruncated == nil || response.Data == nil || response.Count == nil { - return nil, errors.New("error querying resource graph: unexpected response") + return nil, err } - if *response.ResultTruncated == armresourcegraph.ResultTruncatedTrue { - _ = level.Warn(r).Log("msg", "Result truncated", "query", query) - } - - if *response.Count == 0 { - return nil, errors.New("error querying resource graph: no rows returned") - } - - rows, ok := response.Data.([]any) + firstRow, ok := response[0].(map[string]any) if !ok { - return nil, fmt.Errorf("error querying resource graph: unexpected type: %+v", response.Data) - } - - if len(rows) == 0 { - return nil, errors.New("error querying resource graph: no rows returned") - } - - row, ok := rows[0].(map[string]any) - if !ok { - return nil, fmt.Errorf("error querying resource graph: unexpected type: %+v", rows[0]) + return nil, fmt.Errorf("unexpected type: %+v", response[0]) } for _, field := range []string{"subscriptionId", "location", "id"} { - if _, ok = row[field]; !ok { - return nil, fmt.Errorf("error querying resource graph: missing field %s. Available fields: %v", field, maps.Keys(row)) + if _, ok = firstRow[field]; !ok { + return nil, fmt.Errorf("missing field %s. Available fields: %v", field, maps.Keys(firstRow)) } } - var ( - resultRow map[string]any - subscriptionID string - location string - labelValue string - resourceID string - ) - - for _, row := range rows { + for _, row := range response { resultRow, ok = row.(map[string]any) if !ok { - return nil, fmt.Errorf("error querying resource graph: unexpected row type: %+v", row) + return nil, fmt.Errorf("unexpected row type: %+v", row) } subscriptionID, ok = resultRow["subscriptionId"].(string) if !ok { - return nil, fmt.Errorf("error querying resource graph: unexpected subscriptionId type: %+v", rows[0]) + return nil, fmt.Errorf("unexpected subscriptionId type: %+v", resultRow["subscriptionId"]) } location, ok = resultRow["location"].(string) if !ok { - return nil, fmt.Errorf("error querying resource graph: unexpected location type: %+v", rows[0]) + return nil, fmt.Errorf("unexpected location type: %+v", resultRow["location"]) } resourceID, ok = resultRow["id"].(string) if !ok { - return nil, fmt.Errorf("error querying resource graph: unexpected id type: %+v", rows[0]) + return nil, fmt.Errorf("unexpected id type: %+v", resultRow["id"]) } if _, ok = resources.Resources[location]; !ok { @@ -196,7 +173,7 @@ func (r *Request) queryResources(ctx context.Context) (*Resources, error) { } if _, ok = resources.Resources[location][subscriptionID]; !ok { - resources.Resources[location][subscriptionID] = make([]string, 0, len(rows)) + resources.Resources[location][subscriptionID] = make([]string, 0, len(response)) } if len(resultRow)-3 > 0 { @@ -206,7 +183,7 @@ func (r *Request) queryResources(ctx context.Context) (*Resources, error) { if strings.HasPrefix(key, "label_") { labelValue, ok = value.(string) if !ok { - return nil, fmt.Errorf("error querying resource graph: unexpected id type: %+v", rows[0]) + return nil, fmt.Errorf("error querying resource graph: unexpected id type: %+v", value) } resources.AdditionalLabels[resourceID][key[6:]] = labelValue @@ -220,148 +197,200 @@ func (r *Request) queryResources(ctx context.Context) (*Resources, error) { ) } - if response.SkipToken == nil || *response.SkipToken == "" { + if skipToken == "" { break } + } + + return &resources, nil +} + +func (r *Request) resourceGraphQuery(ctx context.Context, query string, subscriptions []string, skipToken string) ([]any, string, error) { + response, err := r.probe.resourceGraphClient.Resources(ctx, armresourcegraph.QueryRequest{ + Options: &armresourcegraph.QueryRequestOptions{ + ResultFormat: to.Ptr(armresourcegraph.ResultFormatObjectArray), + SkipToken: to.Ptr(skipToken), + }, + Query: &query, + Subscriptions: to.SliceOfPtrs(subscriptions...), + }, nil) + if err != nil { + return nil, "", fmt.Errorf("error querying resource graph '%q': %w", query, err) + } + + if response.ResultTruncated == nil || response.Data == nil || response.Count == nil { + return nil, "", errors.New("error querying resource graph: unexpected response") + } + + if *response.ResultTruncated == armresourcegraph.ResultTruncatedTrue { + _ = level.Warn(r).Log("msg", "Result truncated", "query", query) + } + if *response.Count == 0 { + return nil, "", errors.New("error querying resource graph: no rows returned") + } + + rows, ok := response.Data.([]any) + if !ok { + return nil, "", fmt.Errorf("error querying resource graph: unexpected type: %+v", response.Data) + } + + if len(rows) == 0 { + return nil, "", errors.New("error querying resource graph: no rows returned") + } + + skipToken = "" + if response.SkipToken != nil { skipToken = *response.SkipToken } - return &resources, nil + return rows, skipToken, nil } // fetchMetrics fetches metrics for the resources. -// -//nolint:gocognit,cyclop func (r *Request) fetchMetrics(ctx context.Context, resources *Resources, ch chan<- prometheus.Metric) error { - var ( - client *azmetrics.Client - err error - resp azmetrics.QueryResourcesResponse - ) - if resources == nil { return errors.New("resources is nil") } for location, subscriptions := range resources.Resources { - client, err = r.probe.getMetricsClient(location) - if err != nil { - return fmt.Errorf("error get metrics client: %w", err) + for subscriptionID, resourceIDs := range subscriptions { + if err := r.fetchMetricsPerSubscription(ctx, location, subscriptionID, resourceIDs, resources.AdditionalLabels, ch); err != nil { + return err + } } + } - for subscriptionID, resourceIDs := range subscriptions { - for { - maxResourceIDs := 50 - if len(resourceIDs) < maxResourceIDs { - maxResourceIDs = len(resourceIDs) - } + return nil +} - requestResourceIDs := resourceIDs[:maxResourceIDs] - resourceIDs = resourceIDs[maxResourceIDs:] +//nolint:gocognit,cyclop +func (r *Request) fetchMetricsPerSubscription(ctx context.Context, location, subscriptionID string, resourceIDs []string, + additionalLabels AdditionalLabels, ch chan<- prometheus.Metric, +) error { + client, err := r.probe.getMetricsClient(location) + if err != nil { + return fmt.Errorf("error get metrics client: %w", err) + } - metricNamespace := r.config.ResourceType - if r.config.MetricNamespace != "" { - metricNamespace = r.config.MetricNamespace - } + for { + maxResourceIDs := maxResourcesPerQuery + if len(resourceIDs) < maxResourceIDs { + maxResourceIDs = len(resourceIDs) + } - resp, err = client.QueryResources( - ctx, - subscriptionID, - metricNamespace, - r.config.MetricNames, - azmetrics.ResourceIDList{ResourceIDs: requestResourceIDs}, - &r.config.QueryResourcesOptions, - ) - if err != nil { - var azErr *azcore.ResponseError - if errors.As(err, &azErr) { - return fmt.Errorf("error querying metrics: %w", azErr) - } + requestResourceIDs := resourceIDs[:maxResourceIDs] + resourceIDs = resourceIDs[maxResourceIDs:] - return fmt.Errorf("error querying metrics: %w", err) - } + metricNamespace := r.config.ResourceType + if r.config.MetricNamespace != "" { + metricNamespace = r.config.MetricNamespace + } - var ( - latestTimestamp time.Time - latestMetric map[string]*float64 - ) + resp, err := client.QueryResources( + ctx, + subscriptionID, + metricNamespace, + r.config.MetricNames, + azmetrics.ResourceIDList{ResourceIDs: requestResourceIDs}, + &r.config.QueryResourcesOptions, + ) + if err != nil { + var azErr *azcore.ResponseError + if errors.As(err, &azErr) { + return fmt.Errorf("error querying metrics: %w", azErr) + } - for _, metric := range resp.Values { - prometheusMetricNamespace := "azure_monitor_" + strings.ReplaceAll(strings.ReplaceAll(strings.ToLower(*metric.Namespace), ".", "_"), "/", "_") + return fmt.Errorf("error querying metrics: %w", err) + } - prometheusLabels := map[string]string{ - "subscription_id": subscriptionID, - "region": *metric.ResourceRegion, - "instance": *metric.ResourceID, - } + var ( + latestTimestamp time.Time + latestMetric map[string]*float64 + ) - for labelKey, labelValue := range resources.AdditionalLabels[*metric.ResourceID] { - prometheusLabels[labelKey] = labelValue - } + for _, metric := range resp.Values { + prometheusMetricNamespace := "azure_monitor_" + strings.ReplaceAll(strings.ReplaceAll(strings.ToLower(*metric.Namespace), ".", "_"), "/", "_") - latestTimestamp = time.Time{} - latestMetric = map[string]*float64{ - "total": nil, - "average": nil, - "count": nil, - "minimum": nil, - "maximum": nil, - } + prometheusLabels := map[string]string{ + "subscription_id": subscriptionID, + "region": *metric.ResourceRegion, + "instance": *metric.ResourceID, + } - for _, metricValue := range metric.Values { - for _, metricTimeSeries := range metricValue.TimeSeries { - if len(metricTimeSeries.Data) == 0 { - continue - } - - for _, label := range metricTimeSeries.MetadataValues { - prometheusLabels[*label.Name.Value] = *label.Value - } - - for _, data := range metricTimeSeries.Data { - if data.TimeStamp.After(latestTimestamp) { - latestTimestamp = *data.TimeStamp - latestMetric["total"] = data.Total - latestMetric["average"] = data.Average - latestMetric["count"] = data.Count - latestMetric["minimum"] = data.Minimum - latestMetric["maximum"] = data.Maximum - } - } - } + for labelKey, labelValue := range additionalLabels[*metric.ResourceID] { + prometheusLabels[labelKey] = labelValue + } - for metricType, value := range latestMetric { - if value == nil { - continue - } - - ch <- prometheus.MustNewConstMetric( - prometheus.NewDesc( - prometheus.BuildFQName( - prometheusMetricNamespace, - strings.ReplaceAll(strings.ToLower(*metricValue.Name.Value), " ", ""), - fmt.Sprintf("%s_%s", - metricType, - strings.ToLower(string(*metricValue.Unit)), - ), - ), - fmt.Sprintf("%s: %s", *metricValue.Name.LocalizedValue, *metricValue.DisplayDescription), - nil, - prometheusLabels, - ), - prometheus.GaugeValue, - *value, - ) + latestTimestamp = time.Time{} + latestMetric = map[string]*float64{ + "total": nil, + "average": nil, + "count": nil, + "minimum": nil, + "maximum": nil, + } + + for _, metricValue := range metric.Values { + if metricValue.ErrorCode != nil && *metricValue.ErrorCode != "Success" { + _ = level.Warn(r).Log( + "msg", "Error querying metric", + "err", fmt.Sprintf("%s: %s", *metricValue.ErrorCode, *metricValue.ErrorMessage), + "resource_id", *metric.ResourceID, + ) + continue + } + + if len(metricValue.TimeSeries) == 0 { + continue + } + + for _, label := range metricValue.TimeSeries[0].MetadataValues { + prometheusLabels[*label.Name.Value] = *label.Value + } + + for _, metricTimeSeries := range metricValue.TimeSeries { + for _, data := range metricTimeSeries.Data { + if data.TimeStamp.After(latestTimestamp) { + latestTimestamp = *data.TimeStamp + latestMetric["total"] = data.Total + latestMetric["average"] = data.Average + latestMetric["count"] = data.Count + latestMetric["minimum"] = data.Minimum + latestMetric["maximum"] = data.Maximum } } } - if len(resourceIDs) == 0 { - break + for metricType, value := range latestMetric { + if value == nil { + continue + } + + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName( + prometheusMetricNamespace, + strings.ReplaceAll(strings.ToLower(*metricValue.Name.Value), " ", ""), + fmt.Sprintf("%s_%s", + metricType, + strings.ToLower(string(*metricValue.Unit)), + ), + ), + fmt.Sprintf("%s: %s", *metricValue.Name.LocalizedValue, *metricValue.DisplayDescription), + nil, + prometheusLabels, + ), + prometheus.GaugeValue, + *value, + ) } } } + + if len(resourceIDs) == 0 { + break + } } return nil diff --git a/pkg/probe/types.go b/pkg/probe/types.go index 284018f..3058786 100644 --- a/pkg/probe/types.go +++ b/pkg/probe/types.go @@ -38,9 +38,11 @@ type Request struct { type Resources struct { Resources map[string]map[string][]string - AdditionalLabels map[string]map[string]string + AdditionalLabels AdditionalLabels } +type AdditionalLabels map[string]map[string]string + type Config struct { Subscriptions []string ResourceType string