diff --git a/pp/entrypoint/head/lss.h b/pp/entrypoint/head/lss.h index fc28126f8..4114ab24b 100644 --- a/pp/entrypoint/head/lss.h +++ b/pp/entrypoint/head/lss.h @@ -1,14 +1,19 @@ #pragma once +#include #include #include "bare_bones/exception.h" +#include "primitives/primitives.h" #include "primitives/snug_composites.h" #include "series_index/queryable_encoding_bimap.h" #include "series_index/trie/cedarpp_tree.h" namespace entrypoint::head { +using LsIdsSlice = BareBones::Vector; +using LsIdsSlicePtr = std::unique_ptr; + enum class LssType : uint32_t { kEncodingBimap = 0, kOrderedEncodingBimap, diff --git a/pp/entrypoint/primitives_lss.cpp b/pp/entrypoint/primitives_lss.cpp index f6e62d22b..3c2bc6c28 100644 --- a/pp/entrypoint/primitives_lss.cpp +++ b/pp/entrypoint/primitives_lss.cpp @@ -9,6 +9,8 @@ using GoLabelMatchers = PromPP::Primitives::Go::SliceView>; using GoSliceOfString = PromPP::Primitives::Go::Slice; +using entrypoint::head::LsIdsSlice; +using entrypoint::head::LsIdsSlicePtr; using entrypoint::head::LssType; using entrypoint::head::LssVariantPtr; using entrypoint::head::QueryableEncodingBimap; @@ -255,7 +257,8 @@ extern "C" void prompp_primitives_lss_copy_added_series(uint64_t source_lss, uin auto& dst = std::get(*std::bit_cast(destination_lss)); src.build_deferred_indexes(); - series_index::QueryableEncodingBimapCopier copier(src, src.sorting_index(), src.added_series(), dst); + BareBones::Vector dst_src_ids_mapping; + series_index::QueryableEncodingBimapCopier copier(src, src.sorting_index(), src.added_series(), dst, dst_src_ids_mapping); copier.copy_added_series_and_build_indexes(); } @@ -281,11 +284,21 @@ extern "C" void prompp_primitives_lss_bitset_dtor(void* args) { static_cast(args)->~Arguments(); } -extern "C" void prompp_primitives_readonly_lss_copy_added_series(uint64_t source_lss, uint64_t source_bitset, uint64_t destination_lss) { +extern "C" void prompp_primitives_readonly_lss_copy_added_series(uint64_t source_lss, uint64_t source_bitset, uint64_t destination_lss, uint64_t ids_mapping) { const auto& src = std::get(*std::bit_cast(source_lss)); const auto& src_bitset = *std::bit_cast(source_bitset); auto& dst = std::get(*std::bit_cast(destination_lss)); + const auto dst_src_ids_mapping = std::bit_cast(ids_mapping); + *dst_src_ids_mapping = std::make_unique(); - series_index::QueryableEncodingBimapCopier copier(src, src.sorting_index(), src_bitset, dst); + series_index::QueryableEncodingBimapCopier copier(src, src.sorting_index(), src_bitset, dst, **dst_src_ids_mapping); copier.copy_added_series_and_build_indexes(); } + +void prompp_primitives_free_ls_ids_mapping(void* args) { + struct Arguments { + LsIdsSlicePtr ls_ids_mapping; + }; + + static_cast(args)->~Arguments(); +} diff --git a/pp/entrypoint/primitives_lss.h b/pp/entrypoint/primitives_lss.h index 17d51954f..3778f5abe 100644 --- a/pp/entrypoint/primitives_lss.h +++ b/pp/entrypoint/primitives_lss.h @@ -223,11 +223,22 @@ void prompp_primitives_lss_bitset_dtor(void* args); * @param source_lss pointer to source label sets; * @param source_bitset pointer to source bitset; * @param destination_lss pointer to destination label sets; + * @param ids_mapping pointer to uintptr * * @attention This binding used as a CGO call!!! * */ -void prompp_primitives_readonly_lss_copy_added_series(uint64_t source_lss, uint64_t source_bitset, uint64_t destination_lss); +void prompp_primitives_readonly_lss_copy_added_series(uint64_t source_lss, uint64_t source_bitset, uint64_t destination_lss, uint64_t ids_mapping); + +/** + * @brief destroy ls ids mapping + * + * @param args { + * ls_ids_mapping uintptr + * } + * + */ +void prompp_primitives_free_ls_ids_mapping(void* args); #ifdef __cplusplus } // extern "C" diff --git a/pp/entrypoint/prometheus_relabeler.cpp b/pp/entrypoint/prometheus_relabeler.cpp index 859356f0c..c8f38cf8a 100644 --- a/pp/entrypoint/prometheus_relabeler.cpp +++ b/pp/entrypoint/prometheus_relabeler.cpp @@ -869,7 +869,7 @@ extern "C" void prompp_prometheus_per_goroutine_relabeler_append_relabeler_serie } } -void prompp_prometheus_per_goroutine_relabeler_track_stale_nans(void* args) { +extern "C" void prompp_prometheus_per_goroutine_relabeler_track_stale_nans(void* args) { struct Arguments { PromPP::Primitives::Go::SliceView inner_series; StaleNaNsStatePtr stale_nans_state; @@ -879,3 +879,13 @@ void prompp_prometheus_per_goroutine_relabeler_track_stale_nans(void* args) { const auto in = static_cast(args); PromPP::Prometheus::Relabel::PerGoroutineRelabeler::track_stale_nans(in->inner_series, *in->stale_nans_state, in->default_timestamp); } + +extern "C" void prompp_remap_stale_nans_state(void* args) { + struct Arguments { + StaleNaNsStatePtr stale_nans_state; + entrypoint::head::LsIdsSlicePtr dst_src_ls_ids_mapping; + }; + + const auto in = static_cast(args); + in->stale_nans_state->remap(*in->dst_src_ls_ids_mapping); +} diff --git a/pp/entrypoint/prometheus_relabeler.h b/pp/entrypoint/prometheus_relabeler.h index aa9733e64..3494692ff 100644 --- a/pp/entrypoint/prometheus_relabeler.h +++ b/pp/entrypoint/prometheus_relabeler.h @@ -616,6 +616,16 @@ void prompp_prometheus_per_goroutine_relabeler_append_relabeler_series(void* arg */ void prompp_prometheus_per_goroutine_relabeler_track_stale_nans(void* args); +/** + * @brief add stale nans to inner series if needed + * + * @param args { + * stale_nan_state uintptr // pointer to source state + * ls_ids_mapping uintptr // pointer to dst_src_ls_ids_mapping + * } + */ +void prompp_remap_stale_nans_state(void* args); + #ifdef __cplusplus } // extern "C" #endif diff --git a/pp/go/cppbridge/entrypoint.go b/pp/go/cppbridge/entrypoint.go index 0a478df9b..3679cbf86 100644 --- a/pp/go/cppbridge/entrypoint.go +++ b/pp/go/cppbridge/entrypoint.go @@ -457,7 +457,7 @@ var ( ) ) -func freeBytes(b []byte) { +func freeBytes[T any](b []T) { testGC() fastcgo.UnsafeCall1( C.prompp_free_bytes, @@ -1561,18 +1561,35 @@ func primitivesLSSBitsetDtor(bitset uintptr) { // primitivesReadonlyLSSCopyAddedSeries copy the label sets from the source lss to the destination lss // that were added source lss. -func primitivesReadonlyLSSCopyAddedSeries(source, sourceBitset, destination uintptr) { +func primitivesReadonlyLSSCopyAddedSeries(source, sourceBitset, destination uintptr) uintptr { + var dstSrcLsIdsMapping uintptr + C.prompp_primitives_readonly_lss_copy_added_series( C.uint64_t(source), C.uint64_t(sourceBitset), C.uint64_t(destination), + C.uint64_t(uintptr(unsafe.Pointer(&dstSrcLsIdsMapping))), ) + + return dstSrcLsIdsMapping } func primitivesLSSCopyAddedSeries(source, destination uintptr) { C.prompp_primitives_lss_copy_added_series(C.uint64_t(source), C.uint64_t(destination)) } +func primitivesFreeLsIdsMapping(lsIdsMapping uintptr) { + args := struct { + lsIdsMapping uintptr + }{lsIdsMapping} + + testGC() + fastcgo.UnsafeCall1( + C.prompp_primitives_free_ls_ids_mapping, + uintptr(unsafe.Pointer(&args)), + ) +} + // // StatelessRelabeler // @@ -3762,3 +3779,16 @@ func prometheusPerGoroutineRelabelerTrackStaleNans( uintptr(unsafe.Pointer(&args)), ) } + +func prometheusRemapStaleNansState(staleNansState, lsIdsMapping uintptr) { + args := struct { + staleNansState uintptr + lsIdsMapping uintptr + }{staleNansState, lsIdsMapping} + + testGC() + fastcgo.UnsafeCall1( + C.prompp_remap_stale_nans_state, + uintptr(unsafe.Pointer(&args)), + ) +} diff --git a/pp/go/cppbridge/entrypoint.h b/pp/go/cppbridge/entrypoint.h index 736b0cfc8..7d2460232 100755 --- a/pp/go/cppbridge/entrypoint.h +++ b/pp/go/cppbridge/entrypoint.h @@ -694,11 +694,22 @@ void prompp_primitives_lss_bitset_dtor(void* args); * @param source_lss pointer to source label sets; * @param source_bitset pointer to source bitset; * @param destination_lss pointer to destination label sets; + * @param ids_mapping pointer to []uint32; * * @attention This binding used as a CGO call!!! * */ -void prompp_primitives_readonly_lss_copy_added_series(uint64_t source_lss, uint64_t source_bitset, uint64_t destination_lss); +void prompp_primitives_readonly_lss_copy_added_series(uint64_t source_lss, uint64_t source_bitset, uint64_t destination_lss, uint64_t ids_mapping); + +/** + * @brief destroy ls ids mapping + * + * @param args { + * ls_ids_mapping uintptr + * } + * + */ +void prompp_primitives_free_ls_ids_mapping(void* args); #ifdef __cplusplus } // extern "C" @@ -1321,6 +1332,16 @@ void prompp_prometheus_per_goroutine_relabeler_append_relabeler_series(void* arg */ void prompp_prometheus_per_goroutine_relabeler_track_stale_nans(void* args); +/** + * @brief add stale nans to inner series if needed + * + * @param args { + * stale_nan_state uintptr // pointer to source state + * ls_ids_mapping uintptr // pointer to dst_src_ls_ids_mapping + * } + */ +void prompp_remap_stale_nans_state(void* args); + #ifdef __cplusplus } // extern "C" #endif diff --git a/pp/go/cppbridge/lss_snapshot.go b/pp/go/cppbridge/lss_snapshot.go index 8b95d1aca..1fb8f38b3 100644 --- a/pp/go/cppbridge/lss_snapshot.go +++ b/pp/go/cppbridge/lss_snapshot.go @@ -77,11 +77,27 @@ func (lss *LabelSetSnapshot) Query(selector uintptr) *LSSQueryResult { return result } +type IdsMapping struct { + pointer uintptr +} + +func (m *IdsMapping) IsEmpty() bool { + return m.pointer == uintptr(0) +} + // CopyAddedSeries copy the label sets from the source lss to the destination lss // that were added source lss. -func (lss *LabelSetSnapshot) CopyAddedSeries(bitsetSeries *BitsetSeries, destination *LabelSetStorage) { - primitivesReadonlyLSSCopyAddedSeries(lss.pointer, bitsetSeries.pointer, destination.pointer) +func (lss *LabelSetSnapshot) CopyAddedSeries(bitsetSeries *BitsetSeries, destination *LabelSetStorage) *IdsMapping { + idsMapping := &IdsMapping{ + pointer: primitivesReadonlyLSSCopyAddedSeries(lss.pointer, bitsetSeries.pointer, destination.pointer), + } + runtime.SetFinalizer(idsMapping, func(idsMapping *IdsMapping) { + primitivesFreeLsIdsMapping(idsMapping.pointer) + }) + runtime.KeepAlive(lss) runtime.KeepAlive(bitsetSeries) runtime.KeepAlive(destination) + + return idsMapping } diff --git a/pp/go/cppbridge/prometheus_relabeler.go b/pp/go/cppbridge/prometheus_relabeler.go index 39677a1c9..1e4e0e0c7 100644 --- a/pp/go/cppbridge/prometheus_relabeler.go +++ b/pp/go/cppbridge/prometheus_relabeler.go @@ -7,6 +7,7 @@ import ( "math" "regexp" "runtime" + "slices" "strconv" "strings" "sync" @@ -556,6 +557,12 @@ func NewStaleNansState() *StaleNansState { return s } +func (s *StaleNansState) Remap(mapping *IdsMapping) { + prometheusRemapStaleNansState(s.state, mapping.pointer) + runtime.KeepAlive(s) + runtime.KeepAlive(mapping) +} + // RelabelerStats statistics return from relabeler. type RelabelerStats struct { SamplesAdded uint32 @@ -1614,11 +1621,14 @@ func (s *StateV2) EnableTrackStaleness() { func (s *StateV2) Reconfigure( generationHead uint64, numberOfShards uint16, + staleNansIdsMappings []*IdsMapping, ) { if s.status&inited == inited && generationHead == s.generationHead { return } + remapStaleNansState := (generationHead-s.generationHead == 1) && (int(numberOfShards) == len(s.staleNansStates)) + // long way s.locker.Lock() @@ -1637,7 +1647,7 @@ func (s *StateV2) Reconfigure( } s.resetCaches(numberOfShards) - s.resetStaleNansStates(numberOfShards) + s.resetStaleNansStates(numberOfShards, remapStaleNansState, staleNansIdsMappings) s.status |= inited s.generationHead = generationHead @@ -1721,7 +1731,7 @@ func (s *StateV2) resetCaches(numberOfShards uint16) { } // resetStaleNansStates recreate StaleNansStates. -func (s *StateV2) resetStaleNansStates(numberOfShards uint16) { +func (s *StateV2) resetStaleNansStates(numberOfShards uint16, remapStaleNansState bool, staleNansIdsMappings []*IdsMapping) { if !s.trackStaleness { return } @@ -1736,10 +1746,14 @@ func (s *StateV2) resetStaleNansStates(numberOfShards uint16) { s.staleNansStates = s.staleNansStates[:numberOfShards] case len(s.staleNansStates) < int(numberOfShards): // grow - s.staleNansStates = make([]*StaleNansState, numberOfShards) + s.staleNansStates = slices.Grow(s.staleNansStates, int(numberOfShards)-len(s.staleNansStates))[:numberOfShards] } for shardID := range s.staleNansStates { - s.staleNansStates[shardID] = NewStaleNansState() + if remapStaleNansState && staleNansIdsMappings[shardID] != nil && !staleNansIdsMappings[shardID].IsEmpty() { + s.staleNansStates[shardID].Remap(staleNansIdsMappings[shardID]) + } else { + s.staleNansStates[shardID] = NewStaleNansState() + } } } diff --git a/pp/go/cppbridge/prometheus_relabeler_test.go b/pp/go/cppbridge/prometheus_relabeler_test.go index 1d90246ea..3e3040b47 100644 --- a/pp/go/cppbridge/prometheus_relabeler_test.go +++ b/pp/go/cppbridge/prometheus_relabeler_test.go @@ -10,6 +10,7 @@ import ( "github.com/cespare/xxhash/v2" "github.com/golang/snappy" "github.com/prometheus/prometheus/pp/go/cppbridge" + "github.com/prometheus/prometheus/pp/go/model" "github.com/prometheus/prometheus/prompb" "github.com/stretchr/testify/suite" "gopkg.in/yaml.v3" @@ -674,7 +675,7 @@ func (s *PerGoroutineRelabelerSuite) TestRelabeling() { state := cppbridge.NewStateV2WithoutLock() state.SetRelabelerOptions(&s.options) state.SetStatelessRelabeler(statelessRelabeler) - state.Reconfigure(0, s.numberOfShards) + state.Reconfigure(0, s.numberOfShards, make([]*cppbridge.IdsMapping, s.numberOfShards)) pgr := cppbridge.NewPerGoroutineRelabeler(s.numberOfShards, 0) stats, hasReallocations, err := pgr.Relabeling( @@ -725,7 +726,7 @@ func (s *PerGoroutineRelabelerSuite) TestRelabelingDrop() { state := cppbridge.NewStateV2WithoutLock() state.SetRelabelerOptions(&s.options) state.SetStatelessRelabeler(statelessRelabeler) - state.Reconfigure(0, s.numberOfShards) + state.Reconfigure(0, s.numberOfShards, make([]*cppbridge.IdsMapping, s.numberOfShards)) pgr := cppbridge.NewPerGoroutineRelabeler(s.numberOfShards, 0) stats, hasReallocations, err := pgr.Relabeling( @@ -768,7 +769,7 @@ func (s *PerGoroutineRelabelerSuite) TestRelabelingFromCacheTrue() { state := cppbridge.NewStateV2WithoutLock() state.SetStatelessRelabeler(statelessRelabeler) state.SetRelabelerOptions(&s.options) - state.Reconfigure(0, s.numberOfShards) + state.Reconfigure(0, s.numberOfShards, make([]*cppbridge.IdsMapping, s.numberOfShards)) pgr := cppbridge.NewPerGoroutineRelabeler(s.numberOfShards, 0) stats, hasReallocations, err := pgr.Relabeling( @@ -826,7 +827,7 @@ func (s *PerGoroutineRelabelerSuite) TestRelabelingFromCacheFalse() { shardsInnerSeries := cppbridge.NewShardsInnerSeries(s.numberOfShards) state := cppbridge.NewStateV2WithoutLock() state.SetRelabelerOptions(&s.options) - state.Reconfigure(0, s.numberOfShards) + state.Reconfigure(0, s.numberOfShards, make([]*cppbridge.IdsMapping, s.numberOfShards)) pgr := cppbridge.NewPerGoroutineRelabeler(s.numberOfShards, 0) stats, ok, err := pgr.RelabelingFromCache( @@ -992,7 +993,7 @@ func (s *PerGoroutineRelabelerSuite) TestRelabelingFromCachePartially() { state := cppbridge.NewStateV2WithoutLock() state.SetRelabelerOptions(&s.options) state.SetStatelessRelabeler(statelessRelabeler) - state.Reconfigure(0, s.numberOfShards) + state.Reconfigure(0, s.numberOfShards, make([]*cppbridge.IdsMapping, s.numberOfShards)) pgr := cppbridge.NewPerGoroutineRelabeler(s.numberOfShards, 0) stats, hasReallocations, err := pgr.Relabeling( @@ -1066,7 +1067,7 @@ func (s *PerGoroutineRelabelerSuite) TestRelabelingTransition() { shardsRelabeledSeries := cppbridge.NewShardsRelabeledSeries(s.numberOfShards) state := cppbridge.NewTransitionStateV2() - state.Reconfigure(0, s.numberOfShards) + state.Reconfigure(0, s.numberOfShards, make([]*cppbridge.IdsMapping, s.numberOfShards)) pgr := cppbridge.NewPerGoroutineRelabeler(s.numberOfShards, 0) stats, hasReallocations, err := pgr.Relabeling( @@ -1104,7 +1105,7 @@ func (s *PerGoroutineRelabelerSuite) TestRelabelingFromCacheTrueTransition() { shardsRelabeledSeries := cppbridge.NewShardsRelabeledSeries(s.numberOfShards) state := cppbridge.NewTransitionStateV2() - state.Reconfigure(0, s.numberOfShards) + state.Reconfigure(0, s.numberOfShards, make([]*cppbridge.IdsMapping, s.numberOfShards)) pgr := cppbridge.NewPerGoroutineRelabeler(s.numberOfShards, 0) stats, hasReallocations, err := pgr.Relabeling( @@ -1161,7 +1162,7 @@ func (s *PerGoroutineRelabelerSuite) TestRelabelingFromCacheFalseTransition() { shardsInnerSeries := cppbridge.NewShardsInnerSeries(s.numberOfShards) state := cppbridge.NewTransitionStateV2() - state.Reconfigure(0, s.numberOfShards) + state.Reconfigure(0, s.numberOfShards, make([]*cppbridge.IdsMapping, s.numberOfShards)) pgr := cppbridge.NewPerGoroutineRelabeler(s.numberOfShards, 0) stats, ok, err := pgr.RelabelingFromCache( @@ -1322,7 +1323,7 @@ func (s *PerGoroutineRelabelerSuite) TestRelabelingFromCachePartiallyTransition( shardsRelabeledSeries := cppbridge.NewShardsRelabeledSeries(s.numberOfShards) state := cppbridge.NewTransitionStateV2() - state.Reconfigure(0, s.numberOfShards) + state.Reconfigure(0, s.numberOfShards, make([]*cppbridge.IdsMapping, s.numberOfShards)) pgr := cppbridge.NewPerGoroutineRelabeler(s.numberOfShards, 0) stats, hasReallocations, err := pgr.Relabeling( @@ -1402,7 +1403,7 @@ func (s *PerGoroutineRelabelerSuite) TestRelabelingWithStalenans() { state.SetRelabelerOptions(&s.options) state.SetStatelessRelabeler(statelessRelabeler) state.EnableTrackStaleness() - state.Reconfigure(0, s.numberOfShards) + state.Reconfigure(0, s.numberOfShards, make([]*cppbridge.IdsMapping, s.numberOfShards)) pgr := cppbridge.NewPerGoroutineRelabeler(s.numberOfShards, 0) stats, hasReallocations, err := pgr.Relabeling( @@ -1469,7 +1470,7 @@ func (s *PerGoroutineRelabelerSuite) TestRelabelingWithStalenansFromCacheTrue() state.SetStatelessRelabeler(statelessRelabeler) state.SetRelabelerOptions(&s.options) state.EnableTrackStaleness() - state.Reconfigure(0, s.numberOfShards) + state.Reconfigure(0, s.numberOfShards, make([]*cppbridge.IdsMapping, s.numberOfShards)) pgr := cppbridge.NewPerGoroutineRelabeler(s.numberOfShards, 0) stats, hasReallocations, err := pgr.Relabeling( @@ -1549,7 +1550,7 @@ func (s *PerGoroutineRelabelerSuite) TestRelabelingWithStalenansFromCacheFalse() state := cppbridge.NewStateV2WithoutLock() state.SetRelabelerOptions(&s.options) state.EnableTrackStaleness() - state.Reconfigure(0, s.numberOfShards) + state.Reconfigure(0, s.numberOfShards, make([]*cppbridge.IdsMapping, s.numberOfShards)) pgr := cppbridge.NewPerGoroutineRelabeler(s.numberOfShards, 0) stats, ok, err := pgr.RelabelingFromCache( @@ -1717,7 +1718,7 @@ func (s *PerGoroutineRelabelerSuite) TestRelabelingWithStalenansFromCachePartial state.SetRelabelerOptions(&s.options) state.SetStatelessRelabeler(statelessRelabeler) state.EnableTrackStaleness() - state.Reconfigure(0, s.numberOfShards) + state.Reconfigure(0, s.numberOfShards, make([]*cppbridge.IdsMapping, s.numberOfShards)) pgr := cppbridge.NewPerGoroutineRelabeler(s.numberOfShards, 0) stats, hasReallocations, err := pgr.Relabeling( @@ -1831,7 +1832,7 @@ func (s *StateV2Suite) TestStateReconfigure() { } func (s *StateV2Suite) stateReconfigure(state *cppbridge.StateV2) { - state.Reconfigure(0, 1) + state.Reconfigure(0, 1, nil) s.NotNil(state.CacheByShard(0)) s.False(state.TrackStaleness()) @@ -1844,12 +1845,12 @@ func (s *StateV2Suite) TestStateReconfigureWithoutReconfigure() { } func (s *StateV2Suite) stateReconfigureWithoutReconfigure(state *cppbridge.StateV2) { - state.Reconfigure(0, 1) + state.Reconfigure(0, 1, nil) cache1 := state.CacheByShard(0) s.NotNil(cache1) - state.Reconfigure(0, 1) + state.Reconfigure(0, 1, nil) cache2 := state.CacheByShard(0) s.NotNil(cache2) s.Equal(cache1, cache2) @@ -1862,14 +1863,14 @@ func (s *StateV2Suite) TestStateReconfigureNumberOfShards() { func (s *StateV2Suite) stateReconfigureNumberOfShards(state *cppbridge.StateV2) { state.EnableTrackStaleness() - state.Reconfigure(0, 2) + state.Reconfigure(0, 2, make([]*cppbridge.IdsMapping, 2)) cache0 := state.CacheByShard(0) s.NotNil(cache0) cache1 := state.CacheByShard(1) s.NotNil(cache1) - state.Reconfigure(1, 1) + state.Reconfigure(1, 1, make([]*cppbridge.IdsMapping, 1)) newCache0 := state.CacheByShard(0) s.NotNil(newCache0) s.NotEqual(cache0, newCache0) @@ -1883,7 +1884,7 @@ func (s *StateV2Suite) TestStateReconfigureTrackStaleness() { func (s *StateV2Suite) stateReconfigureTrackStaleness(state *cppbridge.StateV2) { state.EnableTrackStaleness() - state.Reconfigure(0, 1) + state.Reconfigure(0, 1, make([]*cppbridge.IdsMapping, 1)) s.NotNil(state.CacheByShard(0)) s.True(state.TrackStaleness()) @@ -1924,7 +1925,7 @@ func (s *StateV2Suite) TestStateTransitionReconfigure() { } func (s *StateV2Suite) stateTransitionReconfigure(state *cppbridge.StateV2) { - state.Reconfigure(0, 1) + state.Reconfigure(0, 1, make([]*cppbridge.IdsMapping, 1)) s.False(state.TrackStaleness()) s.Panics(func() { state.CacheByShard(0) }) @@ -1938,7 +1939,7 @@ func (s *StateV2Suite) TestStateTransitionReconfigureTrackStaleness() { func (s *StateV2Suite) stateTransitionReconfigureTrackStaleness(state *cppbridge.StateV2) { s.Panics(func() { state.EnableTrackStaleness() }) - state.Reconfigure(0, 1) + state.Reconfigure(0, 1, make([]*cppbridge.IdsMapping, 1)) s.False(state.TrackStaleness()) s.Panics(func() { state.CacheByShard(0) }) @@ -1959,3 +1960,85 @@ func (s *StateV2Suite) statelessRelabelerTransition(state *cppbridge.StateV2) { s.Panics(func() { state.SetStatelessRelabeler(statelessRelabeler) }) s.Panics(func() { state.StatelessRelabeler() }) } + +func (s *StateV2Suite) createIdsMapping() *cppbridge.IdsMapping { + lss1 := cppbridge.NewQueryableLssStorage() + lss1.FindOrEmplace(model.NewLabelSetBuilder().Set("job", "1").Build()) + snapshot := lss1.CreateLabelSetSnapshot() + + return snapshot.CopyAddedSeries(lss1.BitsetSeries(), cppbridge.NewQueryableLssStorage()) +} + +func (s *StateV2Suite) TestReconfigureWithInvalidOrderOfGenerationHead() { + // Arrange + state := cppbridge.NewStateV2() + state.EnableTrackStaleness() + state.Reconfigure(0, 1, nil) + staleNanState := state.StaleNansStateByShard(0) + + // Act + state.Reconfigure(2, 1, []*cppbridge.IdsMapping{s.createIdsMapping()}) + + // Assert + s.NotEqual(unsafe.Pointer(staleNanState), unsafe.Pointer(state.StaleNansStateByShard(0))) +} + +func (s *StateV2Suite) TestReconfigureWithShardNumberChange() { + // Arrange + state := cppbridge.NewStateV2() + state.EnableTrackStaleness() + state.Reconfigure(0, 1, nil) + staleNanState := state.StaleNansStateByShard(0) + + // Act + state.Reconfigure(1, 2, []*cppbridge.IdsMapping{s.createIdsMapping()}) + + // Assert + s.NotEqual(unsafe.Pointer(staleNanState), unsafe.Pointer(state.StaleNansStateByShard(0))) +} + +func (s *StateV2Suite) TestReconfigureWithNilIdsMapping() { + // Arrange + state := cppbridge.NewStateV2() + state.EnableTrackStaleness() + state.Reconfigure(0, 1, nil) + staleNanState := state.StaleNansStateByShard(0) + + // Act + state.Reconfigure(1, 2, []*cppbridge.IdsMapping{nil, nil}) + + // Assert + s.NotEqual(unsafe.Pointer(staleNanState), unsafe.Pointer(state.StaleNansStateByShard(0))) +} + +func (s *StateV2Suite) TestReconfigureWithEmptyIdsMapping() { + // Arrange + state := cppbridge.NewStateV2() + state.EnableTrackStaleness() + state.Reconfigure(0, 1, nil) + staleNanState := state.StaleNansStateByShard(0) + + // Act + state.Reconfigure(1, 1, []*cppbridge.IdsMapping{{}, {}}) + + // Assert + s.NotEqual(unsafe.Pointer(staleNanState), unsafe.Pointer(state.StaleNansStateByShard(0))) +} + +func (s *StateV2Suite) TestReconfigureWithRemap() { + // Arrange + state := cppbridge.NewStateV2() + state.EnableTrackStaleness() + state.Reconfigure(0, 2, nil) + staleNanState1 := state.StaleNansStateByShard(0) + staleNanState2 := state.StaleNansStateByShard(1) + + mapping := s.createIdsMapping() + + // Act + state.Reconfigure(1, 2, []*cppbridge.IdsMapping{mapping, mapping}) + + // Assert + s.Equal(unsafe.Pointer(staleNanState1), unsafe.Pointer(state.StaleNansStateByShard(0))) + s.Equal(unsafe.Pointer(staleNanState2), unsafe.Pointer(state.StaleNansStateByShard(1))) +} diff --git a/pp/go/storage/appender/appender.go b/pp/go/storage/appender/appender.go index af2609b50..8b9f11679 100644 --- a/pp/go/storage/appender/appender.go +++ b/pp/go/storage/appender/appender.go @@ -60,6 +60,9 @@ type Shard interface { // WalWrite append the incoming inner series to wal encoder. WalWrite(innerSeriesSlice []*cppbridge.InnerSeries) (bool, error) + + // DstSrcLsIdsMapping return ids mapping after lss copying + DstSrcLsIdsMapping() *cppbridge.IdsMapping } // @@ -404,7 +407,12 @@ func (a *Appender[TTask, TShard, TGoroutineShard, THead]) resolveState(state *cp return errNilState } - state.Reconfigure(a.head.Generation(), a.head.NumberOfShards()) + staleNansIdsMappings := make([]*cppbridge.IdsMapping, 0, a.head.NumberOfShards()) + for shard := range a.head.RangeShards() { + staleNansIdsMappings = append(staleNansIdsMappings, shard.DstSrcLsIdsMapping()) + } + + state.Reconfigure(a.head.Generation(), a.head.NumberOfShards(), staleNansIdsMappings) return nil } diff --git a/pp/go/storage/head/shard/lss.go b/pp/go/storage/head/shard/lss.go index 6a30a0d16..6cef309e9 100644 --- a/pp/go/storage/head/shard/lss.go +++ b/pp/go/storage/head/shard/lss.go @@ -11,11 +11,12 @@ import ( // LSS labelset storage for [shard]. type LSS struct { - input *cppbridge.LabelSetStorage - target *cppbridge.LabelSetStorage - snapshot *cppbridge.LabelSetSnapshot - locker sync.RWMutex - once sync.Once + input *cppbridge.LabelSetStorage + target *cppbridge.LabelSetStorage + snapshot *cppbridge.LabelSetSnapshot + dstSrcLsIdsMapping *cppbridge.IdsMapping + locker sync.RWMutex + once sync.Once } // NewLSS init new [LSS]. @@ -42,7 +43,7 @@ func (l *LSS) CopyAddedSeriesTo(destination *LSS) { bitsetSeries := l.target.BitsetSeries() l.locker.RUnlock() - snapshot.CopyAddedSeries(bitsetSeries, destination.target) + destination.dstSrcLsIdsMapping = snapshot.CopyAddedSeries(bitsetSeries, destination.target) } // Input returns input lss. diff --git a/pp/go/storage/head/shard/shard.go b/pp/go/storage/head/shard/shard.go index 851b21de2..ce1de1ef7 100644 --- a/pp/go/storage/head/shard/shard.go +++ b/pp/go/storage/head/shard/shard.go @@ -230,6 +230,11 @@ func (s *Shard) LoadAndQuerySeriesData() (err error) { return nil } +// DstSrcLsIdsMapping return ids mapping after lss copying +func (s *Shard) DstSrcLsIdsMapping() *cppbridge.IdsMapping { + return s.lss.dstSrcLsIdsMapping +} + // // PerGoroutineShard // diff --git a/pp/performance_tests/benchmarks/queryable_encoding_bimap_copy_benchmark.cpp b/pp/performance_tests/benchmarks/queryable_encoding_bimap_copy_benchmark.cpp index ddd962f36..48eb766a1 100644 --- a/pp/performance_tests/benchmarks/queryable_encoding_bimap_copy_benchmark.cpp +++ b/pp/performance_tests/benchmarks/queryable_encoding_bimap_copy_benchmark.cpp @@ -17,8 +17,8 @@ namespace { using Lss = series_index::QueryableEncodingBimap; -template -using LssCopier = series_index::QueryableEncodingBimapCopier; +template +using LssCopier = series_index::QueryableEncodingBimapCopier; std::string GetWalFileName() { if (auto& context = benchmark::internal::GetGlobalContext(); context != nullptr) { @@ -34,7 +34,7 @@ void mark_all_series_as_added(const std::shared_ptr& lss) { } std::shared_ptr LoadLssFromFile() { - auto file_name = GetWalFileName(); + const auto file_name = GetWalFileName(); auto lss = std::make_shared(); std::ifstream istrm(file_name, std::ios::binary); @@ -58,7 +58,8 @@ void BM_CopyAllStepsWithTiming(benchmark::State& state) { for ([[maybe_unused]] auto _ : state) { Lss lss_copy; - LssCopier copier(*lss, lss->sorting_index(), lss->added_series(), lss_copy); + BareBones::Vector dst_src_ids_mapping; + LssCopier copier(*lss, lss->sorting_index(), lss->added_series(), lss_copy, dst_src_ids_mapping); { auto start = steady_clock::now(); diff --git a/pp/prometheus/relabeler.h b/pp/prometheus/relabeler.h index 367752384..0139f42e3 100644 --- a/pp/prometheus/relabeler.h +++ b/pp/prometheus/relabeler.h @@ -278,6 +278,24 @@ class StaleNaNsState { previous_ = std::move(current); } + template + void remap(const Container& dst_src_ls_ids_mapping) { + roaring::Roaring new_state; + + uint32_t dst_ls_id = 0; + for (auto src_ls_id : dst_src_ls_ids_mapping) { + if (previous_.contains(src_ls_id)) { + new_state.add(dst_ls_id); + } + + ++dst_ls_id; + } + + previous_ = std::move(new_state); + } + + [[nodiscard]] PROMPP_ALWAYS_INLINE const roaring::Roaring& state() const noexcept { return previous_; } + private: roaring::Roaring previous_; }; diff --git a/pp/prometheus/tests/relabeler_tests.cpp b/pp/prometheus/tests/relabeler_tests.cpp index 8c3dea8a6..1e2c7a145 100644 --- a/pp/prometheus/tests/relabeler_tests.cpp +++ b/pp/prometheus/tests/relabeler_tests.cpp @@ -1,10 +1,10 @@ -#include -#include #include #include #include #include +#include + #include "primitives/label_set.h" #include "primitives/snug_composites.h" #include "prometheus/relabeler.h" @@ -841,4 +841,41 @@ TEST_F(StaleNaNsStateDeprecatedFixture, Swap) { EXPECT_EQ(std::vector{}, target_ls_ids); } +class StaleNaNsStateFixture : public testing::Test { + protected: + StaleNaNsState state_; +}; + +TEST_F(StaleNaNsStateFixture, TestRemapOnEmptyState) { + // Arrange + + // Act + state_.remap(std::initializer_list{3U, 1U, 0U, 2U}); + + // Assert + EXPECT_EQ((std::initializer_list{}), state_.state()); +} + +TEST_F(StaleNaNsStateFixture, TestRemapWithFullFilledState) { + // Arrange + state_.swap(roaring::Roaring{0, 1, 2, 3}, [](auto&) {}); + + // Act + state_.remap(std::initializer_list{3U, 1U, 0U, 2U}); + + // Assert + EXPECT_EQ((roaring::Roaring{0, 1, 2, 3}), state_.state()); +} + +TEST_F(StaleNaNsStateFixture, TestRemapWithPartialFilledState) { + // Arrange + state_.swap(roaring::Roaring{0, 3}, [](auto&) {}); + + // Act + state_.remap(std::initializer_list{3U, 1U, 0U, 2U}); + + // Assert + EXPECT_EQ((roaring::Roaring{0, 2}), state_.state()); +} + } // namespace diff --git a/pp/series_index/queryable_encoding_bimap.h b/pp/series_index/queryable_encoding_bimap.h index 06c06a273..96b83afd2 100644 --- a/pp/series_index/queryable_encoding_bimap.h +++ b/pp/series_index/queryable_encoding_bimap.h @@ -85,7 +85,7 @@ class QueryableEncodingBimap final : public BareBones::SnugComposite::GenericDec private: using LabelSet = typename Base::value_type; - template + template friend class QueryableEncodingBimapCopier; TrieIndex trie_index_; @@ -140,7 +140,7 @@ class QueryableEncodingBimap final : public BareBones::SnugComposite::GenericDec PROMPP_ALWAYS_INLINE static size_t phmap_hash(size_t hash) noexcept { return phmap::phmap_mix()(hash); } }; -template +template class QueryableEncodingBimapCopier { public: static constexpr auto kInvalidIdFillByte = static_cast(DecodingTable::kInvalidId); @@ -180,24 +180,25 @@ class QueryableEncodingBimapCopier { QueryableEncodingBimapCopier(const DecodingTable& source, const SortingIndex& sorting_index, const SeriesIds& ls_id_range, - QueryableEncodingBimap& destination) - : source_(source), sorting_index_(sorting_index), destination_(destination), ls_id_range_(ls_id_range) { + QueryableEncodingBimap& destination, + LsIdVector& dst_src_ids_mapping) + : source_(source), sorting_index_(sorting_index), ls_id_range_(ls_id_range), destination_(destination), dst_src_ids_mapping_(dst_src_ids_mapping) { assert(destination.size() == 0); } void copy_added_series() { - BareBones::Vector ids; - ids.reserve(source_.size()); - std::ranges::copy(ls_id_range_.begin(), ls_id_range_.end(), std::back_inserter(ids)); + dst_src_ids_mapping_.clear(); + dst_src_ids_mapping_.reserve(source_.size()); + std::ranges::copy(ls_id_range_.begin(), ls_id_range_.end(), std::back_inserter(dst_src_ids_mapping_)); - sorting_index_.sort(ids); + sorting_index_.sort(dst_src_ids_mapping_); Cache cache; cache.reserve(source_.data().label_name_sets_table.size(), source_.data().label_name_sets_table.data().symbols_table.size(), source_.data().symbols_tables); destination_.reserve(source_); - for (const auto ls_id : ids) { + for (const auto ls_id : dst_src_ids_mapping_) { destination_.items_.emplace_back(destination_.data_, source_[ls_id], cache); } } @@ -252,8 +253,9 @@ class QueryableEncodingBimapCopier { private: const DecodingTable& source_; const SortingIndex& sorting_index_; - QueryableEncodingBimap& destination_; const SeriesIds& ls_id_range_; + QueryableEncodingBimap& destination_; + LsIdVector& dst_src_ids_mapping_; }; } // namespace series_index diff --git a/pp/series_index/tests/queryable_encoding_bimap_tests.cpp b/pp/series_index/tests/queryable_encoding_bimap_tests.cpp index 2b723427f..fba032ea4 100644 --- a/pp/series_index/tests/queryable_encoding_bimap_tests.cpp +++ b/pp/series_index/tests/queryable_encoding_bimap_tests.cpp @@ -15,8 +15,8 @@ using series_index::SeriesReverseIndex; using series_index::trie::CedarMatchesList; using series_index::trie::CedarTrie; -template -using Copier = QueryableEncodingBimapCopier; +template +using Copier = QueryableEncodingBimapCopier; class QueryableEncodingBimapFixture : public testing::Test { protected: @@ -119,18 +119,22 @@ TEST_F(QueryableEncodingBimapFixture, EmplaceDuplicatedLabelSet) { EXPECT_NE(ls_id1, ls_id2); } -class QueryableEncodingBimapCopierFixture : public QueryableEncodingBimapFixture {}; +class QueryableEncodingBimapCopierFixture : public QueryableEncodingBimapFixture { + protected: + BareBones::Vector dst_src_ids_mapping_; +}; TEST_F(QueryableEncodingBimapCopierFixture, EmptyLss) { // Arrange Lss lss_copy; - Copier copier(lss_, lss_.sorting_index(), lss_.added_series(), lss_copy); + Copier copier(lss_, lss_.sorting_index(), lss_.added_series(), lss_copy, dst_src_ids_mapping_); // Act copier.copy_added_series_and_build_indexes(); // Assert EXPECT_EQ(0U, lss_copy.size()); + EXPECT_EQ(0U, dst_src_ids_mapping_.size()); } TEST_F(QueryableEncodingBimapCopierFixture, NonEmptyLss) { @@ -144,7 +148,7 @@ TEST_F(QueryableEncodingBimapCopierFixture, NonEmptyLss) { lss_.build_deferred_indexes(); Lss lss_copy; - Copier copier(lss_, lss_.sorting_index(), lss_.added_series(), lss_copy); + Copier copier(lss_, lss_.sorting_index(), lss_.added_series(), lss_copy, dst_src_ids_mapping_); // Act copier.copy_added_series_and_build_indexes(); @@ -180,7 +184,7 @@ TEST_F(QueryableEncodingBimapCopierFixture, NonEmptyLssKeepOrder) { lss_.build_deferred_indexes(); Lss lss_copy; - Copier copier(lss_, lss_.sorting_index(), lss_.added_series(), lss_copy); + Copier copier(lss_, lss_.sorting_index(), lss_.added_series(), lss_copy, dst_src_ids_mapping_); // Act copier.copy_added_series_and_build_indexes(); @@ -194,8 +198,8 @@ TEST_F(QueryableEncodingBimapCopierFixture, CopyOfCopy) { // Arrange Lss lss_copy; Lss lss_copy_of_copy; - Copier copier(lss_, lss_.sorting_index(), lss_.added_series(), lss_copy); - Copier copier2(lss_copy, lss_copy.sorting_index(), lss_copy.added_series(), lss_copy_of_copy); + Copier copier(lss_, lss_.sorting_index(), lss_.added_series(), lss_copy, dst_src_ids_mapping_); + Copier copier2(lss_copy, lss_copy.sorting_index(), lss_copy.added_series(), lss_copy_of_copy, dst_src_ids_mapping_); const auto label_set = LabelViewSet{{"job", "cron"}, {"key", ""}, {"process", "php"}}; const auto label_set2 = LabelViewSet{{"job", "cron"}, {"key", ""}, {"process", "php1"}};