Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions config/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,31 @@ func (c *GrafanaCollector) Verify() error {
}
return nil
}

// TSDBCollector is the configuration of one collector that gathers TSDB
// statistics from a Prometheus-compatible server.
type TSDBCollector struct {
	// Enable turns the collector on. Verify skips every other check when it is false.
	Enable bool `yaml:"enable"`
	// Period is replaced by the default metric collector period when it is not strictly positive.
	Period model.Duration `yaml:"period,omitempty"`
	// Limit is replaced by 10 when it is not strictly positive.
	Limit int `yaml:"limit,omitempty"`
	// HTTPClient describes how to reach the server; its URL is mandatory when the collector is enabled.
	HTTPClient HTTPClient `yaml:"prometheus_client"`
	// PublicURL is an optional URL for the same server.
	PublicURL *common.URL `yaml:"public_url,omitempty"`
}

// Verify applies the default values of an enabled collector and validates it.
// A disabled collector is left untouched. An error is returned when the
// HTTP client has no URL; the defaults are applied before that check.
func (c *TSDBCollector) Verify() error {
	if !c.Enable {
		return nil
	}

	// fallbackLimit is used when the configured limit is missing or not positive.
	const fallbackLimit = 10

	if c.Period <= 0 {
		c.Period = model.Duration(defaultMetricCollectorPeriodDuration)
	}
	if c.Limit < 1 {
		c.Limit = fallbackLimit
	}

	if c.HTTPClient.URL != nil {
		return nil
	}
	return fmt.Errorf("missing Rest URL for the TSDB collector")
}
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type Config struct {
LabelsCollectors []*LabelsCollector `yaml:"labels_collectors,omitempty"`
PersesCollector PersesCollector `yaml:"perses_collector,omitempty"`
GrafanaCollector GrafanaCollector `yaml:"grafana_collector,omitempty"`
TSDBCollectors []TSDBCollector `yaml:"tsdb_collectors,omitempty"`
}

func Resolve(configFile string) (Config, error) {
Expand Down
30 changes: 29 additions & 1 deletion database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Database interface {
EnqueuePartialMetricsUsage(usages map[string]*v1.MetricUsage)
EnqueueUsage(usages map[string]*v1.MetricUsage)
EnqueueLabels(labels map[string][]string)
EnqueueMetricStatistics(map[string]*v1.MetricStatistics)
}

func New(cfg config.Database) Database {
Expand All @@ -54,13 +55,15 @@ func New(cfg config.Database) Database {
partialMetricsUsageQueue: make(chan map[string]*v1.MetricUsage, 250),
labelsQueue: make(chan map[string][]string, 250),
metricsQueue: make(chan []string, 10),
metricStatisticsQueue: make(chan map[string]*v1.MetricStatistics, 250),
path: cfg.Path,
}

go d.watchUsageQueue()
go d.watchMetricsQueue()
go d.watchPartialMetricsUsageQueue()
go d.watchLabelsQueue()
go d.watchMetricStatisticsQueue()
if !*cfg.InMemory {
if err := d.readMetricsInJSONFile(); err != nil {
logrus.WithError(err).Warning("failed to read metrics file")
Expand All @@ -71,7 +74,6 @@ func New(cfg config.Database) Database {
}

type db struct {
Database
// metrics is the list of metric name (as a key) associated with their usage based on the different collector activated.
// This struct is our "database".
metrics map[string]*v1.Metric
Expand All @@ -89,6 +91,9 @@ type db struct {
// metricsQueue is the channel that should be used to send and receive the list of metric name to keep in memory.
// Based on this list, we will then collect their usage.
metricsQueue chan []string
// metricStatisticsQueue is the channel that should be used to send and receive the statistics per metric to write in the database.
// Each batch received is attached to the corresponding metrics by watchMetricStatisticsQueue.
metricStatisticsQueue chan map[string]*v1.MetricStatistics
// labelsQueue is the way to send the labels per metric to write in the database.
// There will be no other way to write in it.
// Doing that allows us to accept more HTTP requests to write data and to delay the actual writing.
Expand Down Expand Up @@ -147,6 +152,10 @@ func (d *db) EnqueueMetricList(metrics []string) {
d.metricsQueue <- metrics
}

// EnqueueMetricStatistics pushes a batch of statistics, keyed by metric name,
// onto metricStatisticsQueue. The send blocks while the channel is full.
func (d *db) EnqueueMetricStatistics(stats map[string]*v1.MetricStatistics) {
	queue := d.metricStatisticsQueue
	queue <- stats
}

func (d *db) ListPendingUsage() map[string]*v1.MetricUsage {
d.metricsMutex.Lock()
defer d.metricsMutex.Unlock()
Expand Down Expand Up @@ -209,6 +218,25 @@ func (d *db) watchMetricsQueue() {
}
}

// watchMetricStatisticsQueue drains metricStatisticsQueue and attaches every
// received statistics entry to the metric of the same name in the database.
// It returns only once the channel is closed.
func (d *db) watchMetricStatisticsQueue() {
	for batch := range d.metricStatisticsQueue {
		for name, statistics := range batch {
			// The mutex is taken and released for each metric, not once per batch.
			d.metricsMutex.Lock()
			metric, known := d.metrics[name]
			if !known {
				logrus.Debugf("metric_name %q is used but it's not found by the metric collector", name)
				// The metric name is not in the database yet: register an empty
				// entry so that its statistics are not dropped.
				metric = &v1.Metric{}
				d.metrics[name] = metric
			}
			metric.Statistics = statistics
			d.metricsMutex.Unlock()
		}
	}
}

func (d *db) watchPartialMetricsUsageQueue() {
for data := range d.partialMetricsUsageQueue {
d.partialMetricsUsageMutex.Lock()
Expand Down
11 changes: 11 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/perses/metrics-usage/source/metric"
"github.com/perses/metrics-usage/source/perses"
"github.com/perses/metrics-usage/source/rules"
"github.com/perses/metrics-usage/source/tsdb"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -71,6 +72,16 @@ func main() {
}
}

for i, tsdbCollectorConfig := range conf.TSDBCollectors {
if tsdbCollectorConfig.Enable {
tsdbCollector, collectorErr := tsdb.NewCollector(db, tsdbCollectorConfig)
if collectorErr != nil {
logrus.WithError(collectorErr).Fatalf("unable to create the TSDB collector number %d", i)
}
runner.WithTimerTasks(time.Duration(tsdbCollectorConfig.Period), tsdbCollector)
}
}

if conf.PersesCollector.Enable {
persesCollectorConfig := conf.PersesCollector
persesCollector, collectorErr := perses.NewCollector(db, persesCollectorConfig)
Expand Down
16 changes: 14 additions & 2 deletions pkg/api/v1/metric_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,17 @@ func buildKey(v reflect.Value) string {
return key.String()
}

// MetricStatistics holds the cardinality figures known for a single metric.
type MetricStatistics struct {
	// Period is serialized as "period".
	// NOTE(review): the TSDB collector fills it with a number of seconds — confirm for other producers.
	Period uint64 `json:"period"`
	// SeriesCount is the number of series counted for the metric.
	SeriesCount uint64 `json:"series_count"`
	// LabelValueCountByLabelName lists, per label name, how many values were counted.
	// It is omitted from the JSON output when empty.
	LabelValueCountByLabelName []LabelCount `json:"label_value_count_by_label_name,omitempty"`
}

// LabelCount associates a label name with a count.
type LabelCount struct {
	// Name is the label name.
	Name string `json:"name"`
	// Value is the count associated with Name.
	Value uint64 `json:"value"`
}

type RuleUsage struct {
PromLink string `json:"prom_link"`
GroupName string `json:"group_name"`
Expand Down Expand Up @@ -176,8 +187,9 @@ func MergeUsage(old, new *MetricUsage) *MetricUsage {
}

// Metric gathers everything stored in the database for a single metric name.
// Every field is omitted from the JSON output when empty.
type Metric struct {
	// Labels holds the labels recorded for the metric.
	Labels Set[string] `json:"labels,omitempty"`
	// Statistics holds the statistics recorded for the metric; nil when none were recorded.
	Statistics *MetricStatistics `json:"statistics,omitempty"`
	// Usage holds the usage recorded for the metric; nil when none was recorded.
	Usage *MetricUsage `json:"usage,omitempty"`
}

type PartialMetric struct {
Expand Down
112 changes: 112 additions & 0 deletions source/tsdb/tsdb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright 2025 The Perses Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tsdb

import (
"context"
"fmt"
"time"

"github.com/perses/common/async"
"github.com/perses/metrics-usage/config"
"github.com/perses/metrics-usage/database"
modelAPIV1 "github.com/perses/metrics-usage/pkg/api/v1"
promUtils "github.com/perses/metrics-usage/utils/prometheus"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"github.com/sirupsen/logrus"
)

// NewCollector builds the task that collects TSDB statistics and enqueues them in db.
// It returns the error of the Prometheus client creation, if any.
// cfg.HTTPClient.URL is dereferenced without a nil check.
func NewCollector(db database.Database, cfg config.TSDBCollector) (async.SimpleTask, error) {
	client, err := promUtils.NewClient(cfg.HTTPClient)
	if err != nil {
		return nil, err
	}

	// The public URL, when configured, takes precedence over the client URL.
	serverURL := cfg.HTTPClient.URL.URL
	if public := cfg.PublicURL; public != nil {
		serverURL = public.URL
	}

	collector := &tsdbCollector{
		db:         db,
		promURL:    serverURL.String(),
		promClient: client,
		logger:     logrus.StandardLogger().WithField("collector", "tsdb"),
		period:     cfg.Period,
		limit:      cfg.Limit,
	}
	return collector, nil
}

// tsdbCollector is the task that collects TSDB statistics from a Prometheus
// API and enqueues them in the database.
type tsdbCollector struct {
	async.SimpleTask
	// db receives the collected statistics.
	db database.Database
	// promClient is the Prometheus API client used for every query.
	promClient v1.API
	// promURL is the URL of the server as a string.
	// NOTE(review): no method visible here reads it — confirm it is still needed.
	promURL string
	// logger is the logger used by the collector.
	logger *logrus.Entry
	// period is the length of the time window, ending now, used for the label queries.
	period model.Duration
	// limit is passed as the limit option of the TSDB statistics request.
	limit int
}

func (c *tsdbCollector) Execute(ctx context.Context, _ context.CancelFunc) error {
result, err := c.promClient.TSDB(ctx, v1.WithLimit(uint64(c.limit)))
if err != nil {
c.logger.WithError(err).Error("Failed to get TSDB statistics")
return nil
}

c.logger.Infof("TSDB statistics retrieved successfuly")

now := time.Now()
start := now.Add(time.Duration(-c.period))
stats := map[string]*modelAPIV1.MetricStatistics{}
for _, v := range result.SeriesCountByMetricName {
stats[v.Name] = &modelAPIV1.MetricStatistics{
SeriesCount: v.Value,
Period: uint64(time.Duration(c.period) / time.Second),
}

metricMatcher := []string{fmt.Sprintf("%s", v.Name)}

Check failure on line 81 in source/tsdb/tsdb.go

View workflow job for this annotation

GitHub Actions / lint

S1025: the argument is already a string, there's no need to use fmt.Sprintf (staticcheck)
labels, _, err := c.promClient.LabelNames(ctx, metricMatcher, start, now)
if err != nil {
c.logger.WithError(err).Errorf("failed to query labels for metric %q", v.Name)
return nil
}

for _, label := range labels {
if label == model.MetricNameLabel {
continue
}

values, _, err := c.promClient.LabelValues(ctx, label, metricMatcher, start, now)
if err != nil {
c.logger.WithError(err).Errorf("failed to query label values for label %q metric %q", label, v.Name)
return nil
}

stats[v.Name].LabelValueCountByLabelName = append(
stats[v.Name].LabelValueCountByLabelName,
modelAPIV1.LabelCount{Name: label, Value: uint64(len(values))},
)
}
}

c.db.EnqueueMetricStatistics(stats)
return nil
}

// String returns the human-readable name of the collector.
func (c *tsdbCollector) String() string {
	const name = "TSDB statistics collector"
	return name
}
Loading