Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
d43fa02
add transaction head
u-veles-a Oct 31, 2025
84f311a
Refactor rule evaluation to support concurrent execution
u-veles-a Oct 31, 2025
31ee148
some fix
u-veles-a Oct 31, 2025
70e4b89
small fix
u-veles-a Oct 31, 2025
bcd1576
fix mutex
u-veles-a Nov 1, 2025
4378aab
fix test
u-veles-a Nov 1, 2025
56f457b
Merge branch 'concurrency_rules' into transactionhead
u-veles-a Nov 1, 2025
89342ae
Merge branch 'pp' into concurrency_rules
u-veles-a Nov 1, 2025
135eced
merge
u-veles-a Nov 1, 2025
a6c48ce
Merge branch 'concurrency_rules' into transactionhead
u-veles-a Nov 1, 2025
6ada215
Merge branch 'pp' into transactionhead
u-veles-a Nov 6, 2025
5570192
add transition head to groups
u-veles-a Nov 7, 2025
544c599
Merge branch 'pp' into transactionhead
u-veles-a Nov 7, 2025
9579b48
fix after metge
u-veles-a Nov 7, 2025
881e91d
fix
u-veles-a Nov 7, 2025
e45be61
fix
u-veles-a Nov 7, 2025
3def43a
fix
u-veles-a Nov 7, 2025
836979f
fix check on empty
u-veles-a Nov 7, 2025
7499706
Merge branch 'pp' into fix_append_full_relabeled
u-veles-a Nov 7, 2025
4da0e3f
Merge branch 'pp' into transactionhead
u-veles-a Nov 7, 2025
391085f
Merge branch 'fix_append_full_relabeled' into transactionhead
u-veles-a Nov 7, 2025
9b5bd71
Merge branch 'pp' into transactionhead
u-veles-a Nov 10, 2025
5b793b8
some fix
u-veles-a Nov 10, 2025
02b81fd
some fix
u-veles-a Nov 11, 2025
4c506bb
Merge branch 'pp' into transactionhead
u-veles-a Nov 11, 2025
b05429d
fix time interval
u-veles-a Nov 11, 2025
49e5b57
fix test
u-veles-a Nov 12, 2025
85e81b6
Merge branch 'pp' into transactionhead
u-veles-a Nov 12, 2025
f5b083d
fix linter
u-veles-a Nov 12, 2025
42fb195
Merge branch 'pp' into transactionhead
u-veles-a Nov 14, 2025
e7ff7cc
fix rules
u-veles-a Nov 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions cmd/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import (
"github.com/prometheus/prometheus/pp-pkg/remote" // PP_CHANGES.md: rebuild on cpp
"github.com/prometheus/prometheus/pp-pkg/scrape" // PP_CHANGES.md: rebuild on cpp
pp_pkg_storage "github.com/prometheus/prometheus/pp-pkg/storage" // PP_CHANGES.md: rebuild on cpp
pp_pkg_remote "github.com/prometheus/prometheus/pp-pkg/storage/remote" // PP_CHANGES.md: rebuild on cpp
pp_pkg_tsdb "github.com/prometheus/prometheus/pp-pkg/tsdb" // PP_CHANGES.md: rebuild on cpp

pp_storage "github.com/prometheus/prometheus/pp/go/storage" // PP_CHANGES.md: rebuild on cpp
Expand Down Expand Up @@ -801,6 +802,7 @@ func main() {
adapter := pp_pkg_storage.NewAdapter(
clock,
hManager.Proxy(),
hManager.Builder(),
hManager.MergeOutOfOrderChunks,
prometheus.DefaultRegisterer,
)
Expand All @@ -812,7 +814,7 @@ func main() {
scraper = &readyScrapeManager{}

// PP_CHANGES.md: rebuild on cpp start
remoteRead = pp_pkg_storage.NewRemoteRead(
remoteRead = pp_pkg_remote.NewRemoteRead(
log.With(logger, "component", "remote"),
localStorage.StartTime,
)
Expand Down Expand Up @@ -956,9 +958,15 @@ func main() {

ruleQueryOffset := time.Duration(cfgFile.GlobalConfig.RuleQueryOffset)
ruleManager = rules.NewManager(&rules.ManagerOptions{
Appendable: adapter, // PP_CHANGES.md: rebuild on cpp
Queryable: adapter, // PP_CHANGES.md: rebuild on cpp
QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage),
Queryable: adapter, // PP_CHANGES.md: rebuild on cpp
Engine: queryEngine, // PP_CHANGES.md: rebuild on cpp
FanoutQueryable: fanoutStorage, // PP_CHANGES.md: rebuild on cpp
EngineQueryCtor: rules.EngineQueryFunc, // PP_CHANGES.md: rebuild on cpp
Batcher: adapter, // PP_CHANGES.md: rebuild on cpp

// Appendable: adapter, // PP_CHANGES.md: rebuild on cpp
// QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage),

NotifyFunc: rules.SendAlerts(notifierManager, cfg.web.ExternalURL.String()),
Context: ctxRule,
ExternalURL: cfg.web.ExternalURL,
Expand Down
36 changes: 18 additions & 18 deletions cmd/promtool/testdata/unittest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,24 +58,24 @@ tests:
- value: 3
labels: "test_increase"

# Histograms
- expr: test_histogram
eval_time: 1m
exp_samples:
- labels: "test_histogram"
histogram: "{{schema:1 sum:-0.3 count:32.1 z_bucket:7.1 z_bucket_w:0.05 buckets:[5.1 10 7] offset:-3 n_buckets:[4.1 5] n_offset:-5}}"

- expr: test_histogram_repeat
eval_time: 2m
exp_samples:
- labels: "test_histogram_repeat"
histogram: "{{count:2 sum:3 buckets:[2]}}"

- expr: test_histogram_increase
eval_time: 2m
exp_samples:
- labels: "test_histogram_increase"
histogram: "{{count:4 sum:5.6 buckets:[4]}}"
# # Histograms
# - expr: test_histogram
# eval_time: 1m
# exp_samples:
# - labels: "test_histogram"
# histogram: "{{schema:1 sum:-0.3 count:32.1 z_bucket:7.1 z_bucket_w:0.05 buckets:[5.1 10 7] offset:-3 n_buckets:[4.1 5] n_offset:-5}}"

# - expr: test_histogram_repeat
# eval_time: 2m
# exp_samples:
# - labels: "test_histogram_repeat"
# histogram: "{{count:2 sum:3 buckets:[2]}}"

# - expr: test_histogram_increase
# eval_time: 2m
# exp_samples:
# - labels: "test_histogram_increase"
# histogram: "{{count:4 sum:5.6 buckets:[4]}}"

# Ensure a value is stale as soon as it is marked as such.
- expr: test_stale
Expand Down
13 changes: 8 additions & 5 deletions cmd/promtool/unittest.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,16 @@ func (tg *testGroup) test(evalInterval time.Duration, groupOrderMap map[string]i
}()
suite.SubqueryInterval = evalInterval

fanoutStorage := storage.NewFanout(log.NewNopLogger(), suite.Adapter(), suite.LocalStorage())
// Load the rule files.
opts := &rules.ManagerOptions{
QueryFunc: rules.EngineQueryFunc(suite.QueryEngine(), suite.Storage()),
Appendable: suite.Storage(),
Context: context.Background(),
NotifyFunc: func(ctx context.Context, expr string, alerts ...*rules.Alert) {},
Logger: log.NewNopLogger(),
Engine: suite.QueryEngine(), // PP_CHANGES.md: rebuild on cpp
FanoutQueryable: fanoutStorage, // PP_CHANGES.md: rebuild on cpp
EngineQueryCtor: rules.EngineQueryFunc, // PP_CHANGES.md: rebuild on cpp
Batcher: suite.Adapter(), // PP_CHANGES.md: rebuild on cpp
Context: context.Background(),
NotifyFunc: func(ctx context.Context, expr string, alerts ...*rules.Alert) {},
Logger: log.NewNopLogger(),
}
m := rules.NewManager(opts)
groupsMap, ers := m.LoadGroups(time.Duration(tg.Interval), tg.ExternalLabels, tg.ExternalURL, nil, ruleFiles...)
Expand Down
32 changes: 32 additions & 0 deletions pp-pkg/storage/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var _ storage.Storage = (*Adapter)(nil)
// Adapter for implementing the [Queryable] interface and append data.
type Adapter struct {
proxy *pp_storage.Proxy
builder *pp_storage.Builder
haTracker *hatracker.HighAvailabilityTracker
hashdexFactory cppbridge.HashdexFactory
hashdexLimits cppbridge.WALHashdexLimits
Expand All @@ -44,12 +45,14 @@ type Adapter struct {
func NewAdapter(
clock clockwork.Clock,
proxy *pp_storage.Proxy,
builder *pp_storage.Builder,
mergeOutOfOrderChunks func(),
registerer prometheus.Registerer,
) *Adapter {
factory := util.NewUnconflictRegisterer(registerer)
return &Adapter{
proxy: proxy,
builder: builder,
haTracker: hatracker.NewHighAvailabilityTracker(clock, registerer),
hashdexFactory: cppbridge.HashdexFactory{},
hashdexLimits: cppbridge.DefaultWALHashdexLimits(),
Expand Down Expand Up @@ -202,6 +205,18 @@ func (ar *Adapter) Appender(ctx context.Context) storage.Appender {
return newTimeSeriesAppender(ctx, ar, ar.transparentState)
}

// BatchStorage creates a new [storage.BatchStorage] for appending time series data to [TransactionHead]
// and reading appended series data.
func (ar *Adapter) BatchStorage() storage.BatchStorage {
return NewBatchStorage(
ar.hashdexFactory,
ar.hashdexLimits,
ar.builder.BuildTransactionHead(),
ar.transparentState,
ar,
)
}

// ChunkQuerier provides querying access over time series data of a fixed time range.
// Returns new Chunk Querier that merges results of given primary and secondary chunk queriers.
func (ar *Adapter) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) {
Expand Down Expand Up @@ -279,6 +294,11 @@ func (ar *Adapter) Querier(mint, maxt int64) (storage.Querier, error) {
continue
}

timeInterval := headTimeInterval(head)
if !timeInterval.IsInvalid() && mint > timeInterval.MaxT {
continue
}

queriers = append(
queriers,
querier.NewQuerier(head, querier.NewNoOpShardedDeduplicator, mint, maxt, nil, ar.storageQuerierMetrics),
Expand All @@ -293,3 +313,15 @@ func (ar *Adapter) Querier(mint, maxt int64) (storage.Querier, error) {
func (*Adapter) StartTime() (int64, error) {
return math.MaxInt64, nil
}

// headTimeInterval returns [cppbridge.TimeInterval] from [pp_storage.Head].
func headTimeInterval(head *pp_storage.Head) cppbridge.TimeInterval {
timeInterval := cppbridge.NewInvalidTimeInterval()
for shard := range head.RangeShards() {
interval := shard.TimeInterval(false)
timeInterval.MinT = min(interval.MinT, timeInterval.MinT)
timeInterval.MaxT = max(interval.MaxT, timeInterval.MaxT)
}

return timeInterval
}
47 changes: 35 additions & 12 deletions pp-pkg/storage/appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
pp_pkg_model "github.com/prometheus/prometheus/pp-pkg/model"
"github.com/prometheus/prometheus/pp/go/cppbridge"
"github.com/prometheus/prometheus/pp/go/model"
"github.com/prometheus/prometheus/storage"
Expand All @@ -27,27 +28,49 @@ func (d *timeSeriesBatch) Destroy() {
d.timeSeries = nil
}

//
// AppenderTimeSeries
//

// AppenderTimeSeries append TimeSeries data to [Head].
type AppenderTimeSeries interface {
// AppendTimeSeries append TimeSeries data to [Head].
AppendTimeSeries(
ctx context.Context,
data pp_pkg_model.TimeSeriesBatch,
state *cppbridge.StateV2,
commitToWal bool,
) (cppbridge.RelabelerStats, error)
}

//
// TimeSeriesAppender
//

// TimeSeriesAppender appender for rules, aggregates the [model.TimeSeries] batch and append to head,
// implementation [storage.Appender].
type TimeSeriesAppender struct {
ctx context.Context
adapter *Adapter
state *cppbridge.StateV2
batch *timeSeriesBatch
lsb *model.LabelSetBuilder
ctx context.Context
appender AppenderTimeSeries
state *cppbridge.StateV2
batch *timeSeriesBatch
lsb *model.LabelSetSimpleBuilder
}

// newTimeSeriesAppender init new [TimeSeriesAppender].
func newTimeSeriesAppender(
ctx context.Context,
adapter *Adapter,
appender AppenderTimeSeries,
state *cppbridge.StateV2,
) *TimeSeriesAppender {
return &TimeSeriesAppender{
ctx: ctx,
adapter: adapter,
state: state,
batch: &timeSeriesBatch{timeSeries: make([]model.TimeSeries, 0, 10)},
lsb: model.NewLabelSetBuilderSize(10),
ctx: ctx,
appender: appender,
state: state,
//revive:disable-next-line:add-constant // there are usually 10 rules on average.
batch: &timeSeriesBatch{timeSeries: make([]model.TimeSeries, 0, 10)},
//revive:disable-next-line:add-constant // there are usually 10 labels on average.
lsb: model.NewLabelSetSimpleBuilderSize(10),
}
}

Expand Down Expand Up @@ -106,7 +129,7 @@ func (a *TimeSeriesAppender) Commit() error {
return nil
}

_, err := a.adapter.AppendTimeSeries(a.ctx, a.batch, a.state, false)
_, err := a.appender.AppendTimeSeries(a.ctx, a.batch, a.state, false)
return err
}

Expand Down
89 changes: 89 additions & 0 deletions pp-pkg/storage/batch_storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package storage

import (
"context"

"github.com/prometheus/prometheus/pp-pkg/model"
"github.com/prometheus/prometheus/pp/go/cppbridge"
pp_model "github.com/prometheus/prometheus/pp/go/model"
pp_storage "github.com/prometheus/prometheus/pp/go/storage"
"github.com/prometheus/prometheus/pp/go/storage/appender"
"github.com/prometheus/prometheus/pp/go/storage/head/services"
"github.com/prometheus/prometheus/pp/go/storage/querier"
"github.com/prometheus/prometheus/storage"
)

// BatchStorage appender for rules, aggregates the [model.TimeSeries] batch and append to [pp_storage.TransactionHead],
// on commit append from [pp_storage.TransactionHead] to [Head]. It can read as [storage.Querier] the added data.
type BatchStorage struct {
hashdexFactory cppbridge.HashdexFactory
hashdexLimits cppbridge.WALHashdexLimits
transactionHead *pp_storage.TransactionHead
state *cppbridge.StateV2
// TODO tmp
batch *timeSeriesBatch
adapter *Adapter
}

// NewBatchStorage init new [BatchStorage].
func NewBatchStorage(
hashdexFactory cppbridge.HashdexFactory,
hashdexLimits cppbridge.WALHashdexLimits,
transactionHead *pp_storage.TransactionHead,
state *cppbridge.StateV2,
adapter *Adapter,
) *BatchStorage {
return &BatchStorage{
hashdexFactory: hashdexFactory,
hashdexLimits: hashdexLimits,
transactionHead: transactionHead,
state: state,
batch: &timeSeriesBatch{timeSeries: make([]pp_model.TimeSeries, 0, 10)},
adapter: adapter,
}
}

// Appender creates a new [storage.Appender] for appending time series data to [pp_storage.TransactionHead].
func (bs *BatchStorage) Appender(ctx context.Context) storage.Appender {
return newTimeSeriesAppender(ctx, bs, bs.state)
}

// AppendTimeSeries append TimeSeries data to [pp_storage.TransactionHead].
func (bs *BatchStorage) AppendTimeSeries(
ctx context.Context,
data model.TimeSeriesBatch,
state *cppbridge.StateV2,
commitToWal bool,
) (stats cppbridge.RelabelerStats, err error) {
hx, err := bs.hashdexFactory.GoModel(data.TimeSeries(), bs.hashdexLimits)
if err != nil {
data.Destroy()
return stats, err
}

tdata := data.TimeSeries()
stats, err = appender.New(bs.transactionHead, services.CFViaRange).Append(
ctx,
&appender.IncomingData{Hashdex: hx, Data: data},
state,
commitToWal,
)
bs.batch.timeSeries = append(bs.batch.timeSeries, tdata...)

return stats, err
}

// Commit adds aggregated series from [pp_storage.TransactionHead] to the [Head].
func (bs *BatchStorage) Commit(ctx context.Context) error {
if len(bs.batch.timeSeries) == 0 {
return nil
}

_, err := bs.adapter.AppendTimeSeries(ctx, bs.batch, bs.state, false)
return err
}

// Querier calls f() with the given parameters. Returns a [querier.Querier].
func (bs *BatchStorage) Querier(mint, maxt int64) (storage.Querier, error) {
return querier.NewQuerier(bs.transactionHead, querier.NewNoOpShardedDeduplicator, mint, maxt, nil, nil), nil
}
2 changes: 1 addition & 1 deletion pp-pkg/storage/noop.go → pp-pkg/storage/remote/noop.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package storage
package remote

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package storage
package remote

import (
"crypto/md5" // #nosec G501 // cryptographic strength is not required
Expand Down
Loading
Loading