Skip to content

Commit abef45b

Browse files
committed
[WIP] Add TSDB statistics collector
Signed-off-by: Simon Pasquier <[email protected]>
1 parent ca46c79 commit abef45b

File tree

6 files changed

+195
-3
lines changed

6 files changed

+195
-3
lines changed

config/collector.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,3 +213,31 @@ func (c *GrafanaCollector) Verify() error {
213213
}
214214
return nil
215215
}
216+
217+
type TSDBCollector struct {
218+
Enable bool `yaml:"enable"`
219+
Period model.Duration `yaml:"period,omitempty"`
220+
Limit int `yaml:"limit,omitempty"`
221+
HTTPClient HTTPClient `yaml:"prometheus_client"`
222+
PublicURL *common.URL `yaml:"public_url,omitempty"`
223+
}
224+
225+
func (c *TSDBCollector) Verify() error {
226+
if !c.Enable {
227+
return nil
228+
}
229+
230+
if c.Period <= 0 {
231+
c.Period = model.Duration(defaultMetricCollectorPeriodDuration)
232+
}
233+
234+
if c.Limit <= 0 {
235+
c.Limit = 10
236+
}
237+
238+
if c.HTTPClient.URL == nil {
239+
return fmt.Errorf("missing Rest URL for the TSDB collector")
240+
}
241+
242+
return nil
243+
}

config/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ type Config struct {
6161
LabelsCollectors []*LabelsCollector `yaml:"labels_collectors,omitempty"`
6262
PersesCollector PersesCollector `yaml:"perses_collector,omitempty"`
6363
GrafanaCollector GrafanaCollector `yaml:"grafana_collector,omitempty"`
64+
TSDBCollectors []TSDBCollector `yaml:"tsdb_collectors,omitempty"`
6465
}
6566

6667
func Resolve(configFile string) (Config, error) {

database/database.go

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type Database interface {
4141
EnqueuePartialMetricsUsage(usages map[string]*v1.MetricUsage)
4242
EnqueueUsage(usages map[string]*v1.MetricUsage)
4343
EnqueueLabels(labels map[string][]string)
44+
EnqueueMetricStatistics(map[string]*v1.MetricStatistics)
4445
}
4546

4647
func New(cfg config.Database) Database {
@@ -54,13 +55,15 @@ func New(cfg config.Database) Database {
5455
partialMetricsUsageQueue: make(chan map[string]*v1.MetricUsage, 250),
5556
labelsQueue: make(chan map[string][]string, 250),
5657
metricsQueue: make(chan []string, 10),
58+
metricStatisticsQueue: make(chan map[string]*v1.MetricStatistics, 250),
5759
path: cfg.Path,
5860
}
5961

6062
go d.watchUsageQueue()
6163
go d.watchMetricsQueue()
6264
go d.watchPartialMetricsUsageQueue()
6365
go d.watchLabelsQueue()
66+
go d.watchMetricStatisticsQueue()
6467
if !*cfg.InMemory {
6568
if err := d.readMetricsInJSONFile(); err != nil {
6669
logrus.WithError(err).Warning("failed to read metrics file")
@@ -71,7 +74,6 @@ func New(cfg config.Database) Database {
7174
}
7275

7376
type db struct {
74-
Database
7577
// metrics is the list of metric name (as a key) associated with their usage based on the different collector activated.
7678
// This struct is our "database".
7779
metrics map[string]*v1.Metric
@@ -89,6 +91,9 @@ type db struct {
8991
// metricsQueue is the channel that should be used to send and receive the list of metric name to keep in memory.
9092
// Based on this list, we will then collect their usage.
9193
metricsQueue chan []string
94+
// metricStatisticsQueue is the channel that should be used to send and receive the list of metric statistics to keep in memory.
95+
// The statistics received through this channel are attached to the corresponding metric by watchMetricStatisticsQueue.
96+
metricStatisticsQueue chan map[string]*v1.MetricStatistics
9297
// labelsQueue is the way to send the labels per metric to write in the database.
9398
// There will be no other way to write in it.
9499
// Doing that allows us to accept more HTTP requests to write data and to delay the actual writing.
@@ -147,6 +152,10 @@ func (d *db) EnqueueMetricList(metrics []string) {
147152
d.metricsQueue <- metrics
148153
}
149154

155+
func (d *db) EnqueueMetricStatistics(stats map[string]*v1.MetricStatistics) {
156+
d.metricStatisticsQueue <- stats
157+
}
158+
150159
func (d *db) ListPendingUsage() map[string]*v1.MetricUsage {
151160
d.metricsMutex.Lock()
152161
defer d.metricsMutex.Unlock()
@@ -209,6 +218,25 @@ func (d *db) watchMetricsQueue() {
209218
}
210219
}
211220

221+
// watchMetricStatisticsQueue is the way to store the metric statistics in the database.
222+
func (d *db) watchMetricStatisticsQueue() {
223+
for stats := range d.metricStatisticsQueue {
224+
for metricName := range stats {
225+
d.metricsMutex.Lock()
226+
if _, ok := d.metrics[metricName]; !ok {
227+
logrus.Debugf("metric_name %q is used but it's not found by the metric collector", metricName)
228+
// Since the metric_name is not known yet, we need to buffer it.
229+
// In a later stage, if the metric is received/known,
230+
// we will then use this buffer to populate the usage of the metric.
231+
d.metrics[metricName] = &v1.Metric{}
232+
}
233+
234+
d.metrics[metricName].Statistics = stats[metricName]
235+
d.metricsMutex.Unlock()
236+
}
237+
}
238+
}
239+
212240
func (d *db) watchPartialMetricsUsageQueue() {
213241
for data := range d.partialMetricsUsageQueue {
214242
d.partialMetricsUsageMutex.Lock()

main.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/perses/metrics-usage/source/metric"
2626
"github.com/perses/metrics-usage/source/perses"
2727
"github.com/perses/metrics-usage/source/rules"
28+
"github.com/perses/metrics-usage/source/tsdb"
2829
"github.com/sirupsen/logrus"
2930
)
3031

@@ -71,6 +72,16 @@ func main() {
7172
}
7273
}
7374

75+
for i, tsdbCollectorConfig := range conf.TSDBCollectors {
76+
if tsdbCollectorConfig.Enable {
77+
tsdbCollector, collectorErr := tsdb.NewCollector(db, tsdbCollectorConfig)
78+
if collectorErr != nil {
79+
logrus.WithError(collectorErr).Fatalf("unable to create the TSDB collector number %d", i)
80+
}
81+
runner.WithTimerTasks(time.Duration(tsdbCollectorConfig.Period), tsdbCollector)
82+
}
83+
}
84+
7485
if conf.PersesCollector.Enable {
7586
persesCollectorConfig := conf.PersesCollector
7687
persesCollector, collectorErr := perses.NewCollector(db, persesCollectorConfig)

pkg/api/v1/metric_usage.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,17 @@ func buildKey(v reflect.Value) string {
142142
return key.String()
143143
}
144144

145+
type MetricStatistics struct {
146+
Period uint64 `json:"period"`
147+
SeriesCount uint64 `json:"series_count"`
148+
LabelValueCountByLabelName []LabelCount `json:"label_value_count_by_label_name,omitempty"`
149+
}
150+
151+
// LabelCount associates a label name with a counter.
type LabelCount struct {
	// Name is the label name.
	Name string `json:"name"`
	// Value is the count attached to the label name.
	Value uint64 `json:"value"`
}
155+
145156
type RuleUsage struct {
146157
PromLink string `json:"prom_link"`
147158
GroupName string `json:"group_name"`
@@ -176,8 +187,9 @@ func MergeUsage(old, new *MetricUsage) *MetricUsage {
176187
}
177188

178189
type Metric struct {
179-
Labels Set[string] `json:"labels,omitempty"`
180-
Usage *MetricUsage `json:"usage,omitempty"`
190+
Labels Set[string] `json:"labels,omitempty"`
191+
Statistics *MetricStatistics `json:"statistics,omitempty"`
192+
Usage *MetricUsage `json:"usage,omitempty"`
181193
}
182194

183195
type PartialMetric struct {

source/tsdb/tsdb.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
// Copyright 2025 The Perses Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package tsdb
15+
16+
import (
17+
"context"
18+
"fmt"
19+
"time"
20+
21+
"github.com/perses/common/async"
22+
"github.com/perses/metrics-usage/config"
23+
"github.com/perses/metrics-usage/database"
24+
modelAPIV1 "github.com/perses/metrics-usage/pkg/api/v1"
25+
promUtils "github.com/perses/metrics-usage/utils/prometheus"
26+
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
27+
"github.com/prometheus/common/model"
28+
"github.com/sirupsen/logrus"
29+
)
30+
31+
func NewCollector(db database.Database, cfg config.TSDBCollector) (async.SimpleTask, error) {
32+
promClient, err := promUtils.NewClient(cfg.HTTPClient)
33+
if err != nil {
34+
return nil, err
35+
}
36+
37+
logger := logrus.StandardLogger().WithField("collector", "tsdb")
38+
url := cfg.HTTPClient.URL.URL
39+
if cfg.PublicURL != nil {
40+
url = cfg.PublicURL.URL
41+
}
42+
43+
return &tsdbCollector{
44+
db: db,
45+
promURL: url.String(),
46+
promClient: promClient,
47+
logger: logger,
48+
period: cfg.Period,
49+
limit: cfg.Limit,
50+
}, nil
51+
}
52+
53+
type tsdbCollector struct {
54+
async.SimpleTask
55+
db database.Database
56+
promClient v1.API
57+
promURL string
58+
logger *logrus.Entry
59+
period model.Duration
60+
limit int
61+
}
62+
63+
func (c *tsdbCollector) Execute(ctx context.Context, _ context.CancelFunc) error {
64+
result, err := c.promClient.TSDB(ctx, v1.WithLimit(uint64(c.limit)))
65+
if err != nil {
66+
c.logger.WithError(err).Error("Failed to get TSDB statistics")
67+
return nil
68+
}
69+
70+
c.logger.Infof("TSDB statistics retrieved successfuly")
71+
72+
now := time.Now()
73+
start := now.Add(time.Duration(-c.period))
74+
stats := map[string]*modelAPIV1.MetricStatistics{}
75+
for _, v := range result.SeriesCountByMetricName {
76+
stats[v.Name] = &modelAPIV1.MetricStatistics{
77+
SeriesCount: v.Value,
78+
Period: uint64(time.Duration(c.period) / time.Second),
79+
}
80+
81+
metricMatcher := []string{fmt.Sprintf("%s", v.Name)}
82+
labels, _, err := c.promClient.LabelNames(ctx, metricMatcher, start, now)
83+
if err != nil {
84+
c.logger.WithError(err).Errorf("failed to query labels for metric %q", v.Name)
85+
return nil
86+
}
87+
88+
for _, label := range labels {
89+
if label == model.MetricNameLabel {
90+
continue
91+
}
92+
93+
values, _, err := c.promClient.LabelValues(ctx, label, metricMatcher, start, now)
94+
if err != nil {
95+
c.logger.WithError(err).Errorf("failed to query label values for label %q metric %q", label, v.Name)
96+
return nil
97+
}
98+
99+
stats[v.Name].LabelValueCountByLabelName = append(
100+
stats[v.Name].LabelValueCountByLabelName,
101+
modelAPIV1.LabelCount{Name: label, Value: uint64(len(values))},
102+
)
103+
}
104+
}
105+
106+
c.db.EnqueueMetricStatistics(stats)
107+
return nil
108+
}
109+
110+
func (c *tsdbCollector) String() string {
111+
return "TSDB statistics collector"
112+
}

0 commit comments

Comments
 (0)