diff --git a/config/collector.go b/config/collector.go index 043ed25..ed45b03 100644 --- a/config/collector.go +++ b/config/collector.go @@ -213,3 +213,31 @@ func (c *GrafanaCollector) Verify() error { } return nil } + +type TSDBCollector struct { + Enable bool `yaml:"enable"` + Period model.Duration `yaml:"period,omitempty"` + Limit int `yaml:"limit,omitempty"` + HTTPClient HTTPClient `yaml:"prometheus_client"` + PublicURL *common.URL `yaml:"public_url,omitempty"` +} + +func (c *TSDBCollector) Verify() error { + if !c.Enable { + return nil + } + + if c.Period <= 0 { + c.Period = model.Duration(defaultMetricCollectorPeriodDuration) + } + + if c.Limit <= 0 { + c.Limit = 10 + } + + if c.HTTPClient.URL == nil { + return fmt.Errorf("missing Rest URL for the TSDB collector") + } + + return nil +} diff --git a/config/config.go b/config/config.go index 57eebe4..5aa5a34 100644 --- a/config/config.go +++ b/config/config.go @@ -61,6 +61,7 @@ type Config struct { LabelsCollectors []*LabelsCollector `yaml:"labels_collectors,omitempty"` PersesCollector PersesCollector `yaml:"perses_collector,omitempty"` GrafanaCollector GrafanaCollector `yaml:"grafana_collector,omitempty"` + TSDBCollectors []TSDBCollector `yaml:"tsdb_collectors,omitempty"` } func Resolve(configFile string) (Config, error) { diff --git a/database/database.go b/database/database.go index bb73bc5..85fbbcf 100644 --- a/database/database.go +++ b/database/database.go @@ -41,6 +41,7 @@ type Database interface { EnqueuePartialMetricsUsage(usages map[string]*v1.MetricUsage) EnqueueUsage(usages map[string]*v1.MetricUsage) EnqueueLabels(labels map[string][]string) + EnqueueMetricStatistics(map[string]*v1.MetricStatistics) } func New(cfg config.Database) Database { @@ -54,6 +55,7 @@ func New(cfg config.Database) Database { partialMetricsUsageQueue: make(chan map[string]*v1.MetricUsage, 250), labelsQueue: make(chan map[string][]string, 250), metricsQueue: make(chan []string, 10), + metricStatisticsQueue: make(chan map[string]*v1.MetricStatistics, 250), path: cfg.Path, } @@ -61,6 +63,7 @@ func New(cfg config.Database) Database { go d.watchMetricsQueue() go d.watchPartialMetricsUsageQueue() go d.watchLabelsQueue() + go d.watchMetricStatisticsQueue() if !*cfg.InMemory { if err := d.readMetricsInJSONFile(); err != nil { logrus.WithError(err).Warning("failed to read metrics file") @@ -71,7 +74,6 @@ func New(cfg config.Database) Database { } type db struct { - Database // metrics is the list of metric name (as a key) associated with their usage based on the different collector activated. // This struct is our "database". metrics map[string]*v1.Metric @@ -89,6 +91,9 @@ type db struct { // metricsQueue is the channel that should be used to send and receive the list of metric name to keep in memory. // Based on this list, we will then collect their usage. metricsQueue chan []string + // metricStatisticsQueue is the channel that should be used to send and receive the list of metric statistics to keep in memory. + // Based on this list, we will then collect their usage. + metricStatisticsQueue chan map[string]*v1.MetricStatistics // labelsQueue is the way to send the labels per metric to write in the database. // There will be no other way to write in it. // Doing that allows us to accept more HTTP requests to write data and to delay the actual writing. @@ -147,6 +152,10 @@ func (d *db) EnqueueMetricList(metrics []string) { d.metricsQueue <- metrics } +func (d *db) EnqueueMetricStatistics(stats map[string]*v1.MetricStatistics) { + d.metricStatisticsQueue <- stats +} + func (d *db) ListPendingUsage() map[string]*v1.MetricUsage { d.metricsMutex.Lock() defer d.metricsMutex.Unlock() @@ -209,6 +218,25 @@ func (d *db) watchMetricsQueue() { } } +// watchMetricStatisticsQueue is the way to store the metric statistics in the database. +func (d *db) watchMetricStatisticsQueue() { + for stats := range d.metricStatisticsQueue { + for metricName := range stats { + d.metricsMutex.Lock() + if _, ok := d.metrics[metricName]; !ok { + logrus.Debugf("metric_name %q is used but it's not found by the metric collector", metricName) + // Since the metric_name is not known yet, we need to buffer it. + // In a later stage, if the metric is received/known, + // we will then use this buffer to populate the usage of the metric. + d.metrics[metricName] = &v1.Metric{} + } + + d.metrics[metricName].Statistics = stats[metricName] + d.metricsMutex.Unlock() + } + } +} + func (d *db) watchPartialMetricsUsageQueue() { for data := range d.partialMetricsUsageQueue { d.partialMetricsUsageMutex.Lock() diff --git a/main.go b/main.go index 47c3a3c..8a3ea28 100644 --- a/main.go +++ b/main.go @@ -25,6 +25,7 @@ import ( "github.com/perses/metrics-usage/source/metric" "github.com/perses/metrics-usage/source/perses" "github.com/perses/metrics-usage/source/rules" + "github.com/perses/metrics-usage/source/tsdb" "github.com/sirupsen/logrus" ) @@ -71,6 +72,16 @@ func main() { } } + for i, tsdbCollectorConfig := range conf.TSDBCollectors { + if tsdbCollectorConfig.Enable { + tsdbCollector, collectorErr := tsdb.NewCollector(db, tsdbCollectorConfig) + if collectorErr != nil { + logrus.WithError(collectorErr).Fatalf("unable to create the TSDB collector number %d", i) + } + runner.WithTimerTasks(time.Duration(tsdbCollectorConfig.Period), tsdbCollector) + } + } + if conf.PersesCollector.Enable { persesCollectorConfig := conf.PersesCollector persesCollector, collectorErr := perses.NewCollector(db, persesCollectorConfig) diff --git a/pkg/api/v1/metric_usage.go b/pkg/api/v1/metric_usage.go index bda6154..90658b1 100644 --- a/pkg/api/v1/metric_usage.go +++ b/pkg/api/v1/metric_usage.go @@ -142,6 +142,17 @@ func buildKey(v reflect.Value) string { return key.String() } +type MetricStatistics struct { + Period uint64 `json:"period"` + SeriesCount uint64 `json:"series_count"` + LabelValueCountByLabelName []LabelCount `json:"label_value_count_by_label_name,omitempty"` +} + +type LabelCount struct { + Name string `json:"name"` + Value uint64 `json:"value"` +} + type RuleUsage struct { PromLink string `json:"prom_link"` GroupName string `json:"group_name"` @@ -176,8 +187,9 @@ func MergeUsage(old, new *MetricUsage) *MetricUsage { } type Metric struct { - Labels Set[string] `json:"labels,omitempty"` - Usage *MetricUsage `json:"usage,omitempty"` + Labels Set[string] `json:"labels,omitempty"` + Statistics *MetricStatistics `json:"statistics,omitempty"` + Usage *MetricUsage `json:"usage,omitempty"` } type PartialMetric struct { diff --git a/source/tsdb/tsdb.go b/source/tsdb/tsdb.go new file mode 100644 index 0000000..8026517 --- /dev/null +++ b/source/tsdb/tsdb.go @@ -0,0 +1,112 @@ +// Copyright 2025 The Perses Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tsdb + +import ( + "context" + "fmt" + "time" + + "github.com/perses/common/async" + "github.com/perses/metrics-usage/config" + "github.com/perses/metrics-usage/database" + modelAPIV1 "github.com/perses/metrics-usage/pkg/api/v1" + promUtils "github.com/perses/metrics-usage/utils/prometheus" + v1 "github.com/prometheus/client_golang/api/prometheus/v1" + "github.com/prometheus/common/model" + "github.com/sirupsen/logrus" +) + +func NewCollector(db database.Database, cfg config.TSDBCollector) (async.SimpleTask, error) { + promClient, err := promUtils.NewClient(cfg.HTTPClient) + if err != nil { + return nil, err + } + + logger := logrus.StandardLogger().WithField("collector", "tsdb") + url := cfg.HTTPClient.URL.URL + if cfg.PublicURL != nil { + url = cfg.PublicURL.URL + } + + return &tsdbCollector{ + db: db, + promURL: url.String(), + promClient: promClient, + logger: logger, + period: cfg.Period, + limit: cfg.Limit, + }, nil +} + +type tsdbCollector struct { + async.SimpleTask + db database.Database + promClient v1.API + promURL string + logger *logrus.Entry + period model.Duration + limit int +} + +func (c *tsdbCollector) Execute(ctx context.Context, _ context.CancelFunc) error { + result, err := c.promClient.TSDB(ctx, v1.WithLimit(uint64(c.limit))) + if err != nil { + c.logger.WithError(err).Error("Failed to get TSDB statistics") + return nil + } + + c.logger.Infof("TSDB statistics retrieved successfuly") + + now := time.Now() + start := now.Add(time.Duration(-c.period)) + stats := map[string]*modelAPIV1.MetricStatistics{} + for _, v := range result.SeriesCountByMetricName { + stats[v.Name] = &modelAPIV1.MetricStatistics{ + SeriesCount: v.Value, + Period: uint64(time.Duration(c.period) / time.Second), + } + + metricMatcher := []string{fmt.Sprintf("%s", v.Name)} + labels, _, err := c.promClient.LabelNames(ctx, metricMatcher, start, now) + if err != nil { + c.logger.WithError(err).Errorf("failed to query labels for metric %q", v.Name) + return nil + } + + for _, label := range labels { + if label == model.MetricNameLabel { + continue + } + + values, _, err := c.promClient.LabelValues(ctx, label, metricMatcher, start, now) + if err != nil { + c.logger.WithError(err).Errorf("failed to query label values for label %q metric %q", label, v.Name) + return nil + } + + stats[v.Name].LabelValueCountByLabelName = append( + stats[v.Name].LabelValueCountByLabelName, + modelAPIV1.LabelCount{Name: label, Value: uint64(len(values))}, + ) + } + } + + c.db.EnqueueMetricStatistics(stats) + return nil +} + +func (c *tsdbCollector) String() string { + return "TSDB statistics collector" +}