From a7107ce4d42287beef3c49b44c8b556a51cee91d Mon Sep 17 00:00:00 2001 From: Victor Martinez Date: Thu, 30 Jan 2025 20:36:46 +0100 Subject: [PATCH 1/5] github-actions: rename github secrets (#15486) --- .github/workflows/benchmarks.yml | 4 ++-- .github/workflows/run-major-release.yml | 4 ++-- .github/workflows/run-minor-release.yml | 4 ++-- .github/workflows/run-patch-release.yml | 4 ++-- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/.github/workflows/benchmarks.yml b/.github/workflows/benchmarks.yml index c0d7214a36c..fc10ebd02af 100644 --- a/.github/workflows/benchmarks.yml +++ b/.github/workflows/benchmarks.yml @@ -234,8 +234,8 @@ jobs: - name: Import GPG key uses: crazy-max/ghaction-import-gpg@cb9bde2e2525e640591a934b1fd28eef1dcaf5e5 # v6.2.0 with: - gpg_private_key: ${{ secrets.APM_SERVER_RELEASE_GPG_PRIVATE_KEY }} - passphrase: ${{ secrets.APM_SERVER_RELEASE_PASSPHRASE }} + gpg_private_key: ${{ secrets.GPG_PRIVATE_KEY }} + passphrase: ${{ secrets.GPG_PASSPHRASE }} git_user_signingkey: true git_commit_gpgsign: true diff --git a/.github/workflows/run-major-release.yml b/.github/workflows/run-major-release.yml index 016e870b08a..94ec2505598 100644 --- a/.github/workflows/run-major-release.yml +++ b/.github/workflows/run-major-release.yml @@ -85,8 +85,8 @@ jobs: - name: Import GPG key uses: crazy-max/ghaction-import-gpg@cb9bde2e2525e640591a934b1fd28eef1dcaf5e5 # v6.2.0 with: - gpg_private_key: ${{ secrets.APM_SERVER_RELEASE_GPG_PRIVATE_KEY }} - passphrase: ${{ secrets.APM_SERVER_RELEASE_PASSPHRASE }} + gpg_private_key: ${{ secrets.GPG_PRIVATE_KEY }} + passphrase: ${{ secrets.GPG_PASSPHRASE }} git_user_signingkey: true git_commit_gpgsign: true diff --git a/.github/workflows/run-minor-release.yml b/.github/workflows/run-minor-release.yml index 9776fea8185..8783fefc018 100644 --- a/.github/workflows/run-minor-release.yml +++ b/.github/workflows/run-minor-release.yml @@ -85,8 +85,8 @@ jobs: - name: Import GPG key uses: crazy-max/ghaction-import-gpg@cb9bde2e2525e640591a934b1fd28eef1dcaf5e5 # v6.2.0 with: - gpg_private_key: ${{ secrets.APM_SERVER_RELEASE_GPG_PRIVATE_KEY }} - passphrase: ${{ secrets.APM_SERVER_RELEASE_PASSPHRASE }} + gpg_private_key: ${{ secrets.GPG_PRIVATE_KEY }} + passphrase: ${{ secrets.GPG_PASSPHRASE }} git_user_signingkey: true git_commit_gpgsign: true diff --git a/.github/workflows/run-patch-release.yml b/.github/workflows/run-patch-release.yml index 4afe520578a..5ede896b06c 100644 --- a/.github/workflows/run-patch-release.yml +++ b/.github/workflows/run-patch-release.yml @@ -78,8 +78,8 @@ jobs: - name: Import GPG key uses: crazy-max/ghaction-import-gpg@cb9bde2e2525e640591a934b1fd28eef1dcaf5e5 # v6.2.0 with: - gpg_private_key: ${{ secrets.APM_SERVER_RELEASE_GPG_PRIVATE_KEY }} - passphrase: ${{ secrets.APM_SERVER_RELEASE_PASSPHRASE }} + gpg_private_key: ${{ secrets.GPG_PRIVATE_KEY }} + passphrase: ${{ secrets.GPG_PASSPHRASE }} git_user_signingkey: true git_commit_gpgsign: true From f281484ebdaf50b91a4646a7c887363c399331ef Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 30 Jan 2025 19:49:26 +0000 Subject: [PATCH 2/5] build(deps): bump github.com/spf13/pflag from 1.0.5 to 1.0.6 in the spf13 group (#15484) * build(deps): bump github.com/spf13/pflag in the spf13 group Bumps the spf13 group with 1 update: [github.com/spf13/pflag](https://github.com/spf13/pflag). 
Updates `github.com/spf13/pflag` from 1.0.5 to 1.0.6 - [Release notes](https://github.com/spf13/pflag/releases) - [Commits](https://github.com/spf13/pflag/compare/v1.0.5...v1.0.6) --- updated-dependencies: - dependency-name: github.com/spf13/pflag dependency-type: direct:production update-type: version-update:semver-patch dependency-group: spf13 ... Signed-off-by: dependabot[bot] * Update NOTICE.txt --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> --- NOTICE.txt | 4 ++-- go.mod | 2 +- go.sum | 3 ++- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/NOTICE.txt b/NOTICE.txt index 30ab989e0ec..c027c4777bc 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -3289,11 +3289,11 @@ Contents of probable licence file $GOMODCACHE/github.com/spf13/cobra@v1.8.1/LICE -------------------------------------------------------------------------------- Dependency : github.com/spf13/pflag -Version: v1.0.5 +Version: v1.0.6 Licence type (autodetected): BSD-3-Clause -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/spf13/pflag@v1.0.5/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/spf13/pflag@v1.0.6/LICENSE: Copyright (c) 2012 Alex Ogier. All rights reserved. Copyright (c) 2012 The Go Authors. All rights reserved. diff --git a/go.mod b/go.mod index 7c5ce74a349..c511c941d92 100644 --- a/go.mod +++ b/go.mod @@ -32,7 +32,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/ryanuber/go-glob v1.0.0 github.com/spf13/cobra v1.8.1 - github.com/spf13/pflag v1.0.5 + github.com/spf13/pflag v1.0.6 github.com/stretchr/testify v1.10.0 go.elastic.co/apm/module/apmelasticsearch/v2 v2.6.3 go.elastic.co/apm/module/apmgorilla/v2 v2.6.3 diff --git a/go.sum b/go.sum index b53c9a1de8b..db142ee94b5 100644 --- a/go.sum +++ b/go.sum @@ -360,8 +360,9 @@ github.com/shirou/gopsutil/v4 v4.24.9/go.mod h1:3fkaHNeYsUFCGZ8+9vZVWtbyM1k2eRnl github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y= -github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o= +github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= From dcb08ac9a356743407ab32c6a44882598b54dfe9 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Thu, 30 Jan 2025 19:59:17 +0000 Subject: [PATCH 3/5] TBS: make storage_limit follow processor lifecycle; update TBS processor config (#15488) Fix a regression from #15235 where storage_limit does not follow processor lifecycle. Remove storage limit from processor config. Add storage to processor config validation. 
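
For reference, the limit is now bound to the read writer at construction
time rather than passed to StorageManager.Run. A minimal sketch of the new
wiring, reusing the StorageLimitParsed and TTL plumbing from the diff below:

    // The storage limit lives in the read writer, not in Run().
    storage := db.NewReadWriter(tailSamplingConfig.StorageLimitParsed) // 0 means unlimited
    storageCfg := sampling.StorageConfig{
        DB:      db,
        Storage: storage,
        TTL:     tailSamplingConfig.TTL,
    }
    // Run is now driven only by the stop channel and the TTL:
    //   err := db.Run(stopping, tailSamplingConfig.TTL)
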
--- x-pack/apm-server/main.go | 3 +- x-pack/apm-server/sampling/config.go | 6 +- x-pack/apm-server/sampling/config_test.go | 3 + x-pack/apm-server/sampling/eventstorage/rw.go | 25 +++++++- .../sampling/eventstorage/rw_test.go | 2 +- .../eventstorage/storage_bench_test.go | 10 ++-- .../sampling/eventstorage/storage_manager.go | 54 +++++++++++------- .../storage_manager_bench_test.go | 8 +-- .../eventstorage/storage_manager_test.go | 57 +++++++++++++++---- x-pack/apm-server/sampling/processor.go | 2 +- x-pack/apm-server/sampling/processor_test.go | 25 ++++---- 11 files changed, 131 insertions(+), 64 deletions(-) diff --git a/x-pack/apm-server/main.go b/x-pack/apm-server/main.go index 3ffb9dc6757..a3ed05e2819 100644 --- a/x-pack/apm-server/main.go +++ b/x-pack/apm-server/main.go @@ -147,8 +147,7 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er }, StorageConfig: sampling.StorageConfig{ DB: db, - Storage: db.NewReadWriter(), - StorageLimit: tailSamplingConfig.StorageLimitParsed, + Storage: db.NewReadWriter(tailSamplingConfig.StorageLimitParsed), TTL: tailSamplingConfig.TTL, DiscardOnWriteFailure: tailSamplingConfig.DiscardOnWriteFailure, }, diff --git a/x-pack/apm-server/sampling/config.go b/x-pack/apm-server/sampling/config.go index 41d85265690..4c1fe54fdaa 100644 --- a/x-pack/apm-server/sampling/config.go +++ b/x-pack/apm-server/sampling/config.go @@ -108,9 +108,6 @@ type StorageConfig struct { // Storage is the read writer to DB. Storage eventstorage.RW - // StorageLimit for the TBS database, in bytes. - StorageLimit uint64 - // TTL holds the amount of time before events and sampling decisions // are expired from local storage. TTL time.Duration @@ -238,6 +235,9 @@ func (config StorageConfig) validate() error { if config.DB == nil { return errors.New("DB unspecified") } + if config.Storage == nil { + return errors.New("Storage unspecified") + } if config.TTL <= 0 { return errors.New("TTL unspecified or negative") } diff --git a/x-pack/apm-server/sampling/config_test.go b/x-pack/apm-server/sampling/config_test.go index b5575a939b3..a62d7913fdf 100644 --- a/x-pack/apm-server/sampling/config_test.go +++ b/x-pack/apm-server/sampling/config_test.go @@ -73,6 +73,9 @@ func TestNewProcessorConfigInvalid(t *testing.T) { assertInvalidConfigError("invalid storage config: DB unspecified") config.DB = &eventstorage.StorageManager{} + assertInvalidConfigError("invalid storage config: Storage unspecified") + config.Storage = &eventstorage.SplitReadWriter{} + assertInvalidConfigError("invalid storage config: TTL unspecified or negative") config.TTL = 1 } diff --git a/x-pack/apm-server/sampling/eventstorage/rw.go b/x-pack/apm-server/sampling/eventstorage/rw.go index be208cb3f0a..df8f9629527 100644 --- a/x-pack/apm-server/sampling/eventstorage/rw.go +++ b/x-pack/apm-server/sampling/eventstorage/rw.go @@ -62,15 +62,36 @@ type storageLimitChecker interface { StorageLimit() uint64 } +type storageLimitCheckerFunc struct { + diskUsage, storageLimit func() uint64 +} + +func NewStorageLimitCheckerFunc(diskUsage, storageLimit func() uint64) storageLimitCheckerFunc { + return storageLimitCheckerFunc{ + diskUsage: diskUsage, + storageLimit: storageLimit, + } +} + +func (f storageLimitCheckerFunc) DiskUsage() uint64 { + return f.diskUsage() +} + +func (f storageLimitCheckerFunc) StorageLimit() uint64 { + return f.storageLimit() +} + // StorageLimitReadWriter is a RW that forbids Write* method calls based on disk usage and limit from storageLimitChecker. 
// If there is no limit or limit is not reached, method calls are passed through to nextRW. type StorageLimitReadWriter struct { + name string checker storageLimitChecker nextRW RW } -func NewStorageLimitReadWriter(checker storageLimitChecker, nextRW RW) StorageLimitReadWriter { +func NewStorageLimitReadWriter(name string, checker storageLimitChecker, nextRW RW) StorageLimitReadWriter { return StorageLimitReadWriter{ + name: name, checker: checker, nextRW: nextRW, } @@ -81,7 +102,7 @@ func (s StorageLimitReadWriter) checkStorageLimit() error { if limit != 0 { // unlimited storage usage := s.checker.DiskUsage() if usage >= limit { - return fmt.Errorf("%w (current: %d, limit %d)", ErrLimitReached, usage, limit) + return fmt.Errorf("%s: %w (current: %d, limit %d)", s.name, ErrLimitReached, usage, limit) } } return nil diff --git a/x-pack/apm-server/sampling/eventstorage/rw_test.go b/x-pack/apm-server/sampling/eventstorage/rw_test.go index bcf6490c16d..f43fbd697ad 100644 --- a/x-pack/apm-server/sampling/eventstorage/rw_test.go +++ b/x-pack/apm-server/sampling/eventstorage/rw_test.go @@ -79,7 +79,7 @@ func TestStorageLimitReadWriter(t *testing.T) { t.Run(fmt.Sprintf("limit=%d,usage=%d", tt.limit, tt.usage), func(t *testing.T) { checker := mockChecker{limit: tt.limit, usage: tt.usage} var callCount int - rw := eventstorage.NewStorageLimitReadWriter(checker, mockRW{ + rw := eventstorage.NewStorageLimitReadWriter("foo_storage_limiter", checker, mockRW{ callback: func() { callCount++ }, diff --git a/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go b/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go index fa464df323b..3848efab1b8 100644 --- a/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go +++ b/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go @@ -18,7 +18,7 @@ import ( func BenchmarkWriteTransaction(b *testing.B) { test := func(b *testing.B, codec eventstorage.Codec, bigTX bool) { sm := newStorageManager(b, eventstorage.WithCodec(codec)) - readWriter := sm.NewReadWriter() + readWriter := sm.NewUnlimitedReadWriter() traceID := hex.EncodeToString([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) transactionID := hex.EncodeToString([]byte{1, 2, 3, 4, 5, 6, 7, 8}) @@ -79,7 +79,7 @@ func BenchmarkReadEvents(b *testing.B) { for _, count := range counts { b.Run(fmt.Sprintf("%d events", count), func(b *testing.B) { sm := newStorageManager(b, eventstorage.WithCodec(codec)) - readWriter := sm.NewReadWriter() + readWriter := sm.NewUnlimitedReadWriter() for i := 0; i < count; i++ { transactionID := uuid.Must(uuid.NewV4()).String() @@ -154,7 +154,7 @@ func BenchmarkReadEventsHit(b *testing.B) { for _, hit := range []bool{false, true} { b.Run(fmt.Sprintf("hit=%v", hit), func(b *testing.B) { sm := newStorageManager(b) - readWriter := sm.NewReadWriter() + readWriter := sm.NewUnlimitedReadWriter() traceIDs := make([]string, b.N) @@ -185,7 +185,7 @@ func BenchmarkReadEventsHit(b *testing.B) { } } - readWriter = sm.NewReadWriter() + readWriter = sm.NewUnlimitedReadWriter() b.ResetTimer() var batch modelpb.Batch @@ -224,7 +224,7 @@ func BenchmarkIsTraceSampled(b *testing.B) { // Test with varying numbers of events in the trace. 
sm := newStorageManager(b) - readWriter := sm.NewReadWriter() + readWriter := sm.NewUnlimitedReadWriter() if err := readWriter.WriteTraceSampled(sampledTraceUUID.String(), true); err != nil { b.Fatal(err) diff --git a/x-pack/apm-server/sampling/eventstorage/storage_manager.go b/x-pack/apm-server/sampling/eventstorage/storage_manager.go index 9abec2117f8..1951be8b46d 100644 --- a/x-pack/apm-server/sampling/eventstorage/storage_manager.go +++ b/x-pack/apm-server/sampling/eventstorage/storage_manager.go @@ -75,15 +75,13 @@ type StorageManager struct { partitioner *Partitioner - storageLimit atomic.Uint64 - codec Codec // subscriberPosMu protects the subscriber file from concurrent RW. subscriberPosMu sync.Mutex - // cachedDiskUsage is a cached result of DiskUsage - cachedDiskUsage atomic.Uint64 + // cachedDBSize is a cached result of db size. + cachedDBSize atomic.Uint64 // runCh acts as a mutex to ensure only 1 Run is actively running per StorageManager. // as it is possible that 2 separate Run are created by 2 TBS processors during a hot reload. @@ -202,17 +200,17 @@ func (sm *StorageManager) Size() (lsm, vlog int64) { // Also remember to update // - x-pack/apm-server/sampling/processor.go:CollectMonitoring // - systemtest/benchtest/expvar/metrics.go - return int64(sm.DiskUsage()), 0 + return int64(sm.dbSize()), 0 } -// DiskUsage returns the disk usage of databases in bytes. -func (sm *StorageManager) DiskUsage() uint64 { +// dbSize returns the disk usage of databases in bytes. +func (sm *StorageManager) dbSize() uint64 { // pebble DiskSpaceUsage overhead is not high, but it adds up when performed per-event. - return sm.cachedDiskUsage.Load() + return sm.cachedDBSize.Load() } func (sm *StorageManager) updateDiskUsage() { - sm.cachedDiskUsage.Store(sm.eventDB.Metrics().DiskSpaceUsage() + sm.decisionDB.Metrics().DiskSpaceUsage()) + sm.cachedDBSize.Store(sm.eventDB.Metrics().DiskSpaceUsage() + sm.decisionDB.Metrics().DiskSpaceUsage()) } // runDiskUsageLoop runs a loop that updates cached disk usage regularly. @@ -229,10 +227,6 @@ func (sm *StorageManager) runDiskUsageLoop(stopping <-chan struct{}) error { } } -func (sm *StorageManager) StorageLimit() uint64 { - return sm.storageLimit.Load() -} - func (sm *StorageManager) Flush() error { return errors.Join( wrapNonNilErr("event db flush error: %w", sm.eventDB.Flush()), @@ -260,7 +254,7 @@ func (sm *StorageManager) close() error { // Reload flushes out pending disk writes to disk by reloading the database. // For testing only. -// Read writers created prior to Reload cannot be used and will need to be recreated via NewReadWriter. +// Read writers created prior to Reload cannot be used and will need to be recreated via NewUnlimitedReadWriter. func (sm *StorageManager) Reload() error { if err := sm.close(); err != nil { return err @@ -269,7 +263,7 @@ func (sm *StorageManager) Reload() error { } // Run has the same lifecycle as the TBS processor as opposed to StorageManager to facilitate EA hot reload. 
-func (sm *StorageManager) Run(stopping <-chan struct{}, ttl time.Duration, storageLimit uint64) error { +func (sm *StorageManager) Run(stopping <-chan struct{}, ttl time.Duration) error { select { case <-stopping: return nil @@ -279,8 +273,6 @@ func (sm *StorageManager) Run(stopping <-chan struct{}, ttl time.Duration, stora <-sm.runCh }() - sm.storageLimit.Store(storageLimit) - g := errgroup.Group{} g.Go(func() error { return sm.runTTLGCLoop(stopping, ttl) @@ -346,11 +338,33 @@ func (sm *StorageManager) WriteSubscriberPosition(data []byte) error { return os.WriteFile(filepath.Join(sm.storageDir, subscriberPositionFile), data, 0644) } -func (sm *StorageManager) NewReadWriter() StorageLimitReadWriter { - return NewStorageLimitReadWriter(sm, SplitReadWriter{ +// NewUnlimitedReadWriter returns a read writer with no storage limit. +// For testing only. +func (sm *StorageManager) NewUnlimitedReadWriter() StorageLimitReadWriter { + return sm.NewReadWriter(0) +} + +// NewReadWriter returns a read writer with storage limit. +func (sm *StorageManager) NewReadWriter(storageLimit uint64) StorageLimitReadWriter { + splitRW := SplitReadWriter{ eventRW: sm.eventStorage.NewReadWriter(), decisionRW: sm.decisionStorage.NewReadWriter(), - }) + } + + dbStorageLimit := func() uint64 { + return storageLimit + } + if storageLimit == 0 { + sm.logger.Infof("setting database storage limit to unlimited") + } else { + sm.logger.Infof("setting database storage limit to %.1fgb", float64(storageLimit)) + } + + // To limit db size to storage_limit + dbStorageLimitChecker := NewStorageLimitCheckerFunc(sm.dbSize, dbStorageLimit) + dbStorageLimitRW := NewStorageLimitReadWriter("database storage limit", dbStorageLimitChecker, splitRW) + + return dbStorageLimitRW } // wrapNonNilErr only wraps an error with format if the error is not nil. 
diff --git a/x-pack/apm-server/sampling/eventstorage/storage_manager_bench_test.go b/x-pack/apm-server/sampling/eventstorage/storage_manager_bench_test.go index 86ebc1d5430..131a0bdbb8e 100644 --- a/x-pack/apm-server/sampling/eventstorage/storage_manager_bench_test.go +++ b/x-pack/apm-server/sampling/eventstorage/storage_manager_bench_test.go @@ -12,12 +12,12 @@ import ( "github.com/stretchr/testify/require" ) -func BenchmarkStorageManager_DiskUsage(b *testing.B) { +func BenchmarkStorageManager_Size(b *testing.B) { stopping := make(chan struct{}) defer close(stopping) sm := newStorageManager(b) - go sm.Run(stopping, time.Second, 0) - rw := sm.NewReadWriter() + go sm.Run(stopping, time.Second) + rw := sm.NewUnlimitedReadWriter() for i := 0; i < 1000; i++ { traceID := uuid.Must(uuid.NewV4()).String() txnID := uuid.Must(uuid.NewV4()).String() @@ -29,7 +29,7 @@ func BenchmarkStorageManager_DiskUsage(b *testing.B) { } b.ResetTimer() for i := 0; i < b.N; i++ { - _ = sm.DiskUsage() + _, _ = sm.Size() } b.StopTimer() } diff --git a/x-pack/apm-server/sampling/eventstorage/storage_manager_test.go b/x-pack/apm-server/sampling/eventstorage/storage_manager_test.go index 1145a9d8fe4..fb96ebef14a 100644 --- a/x-pack/apm-server/sampling/eventstorage/storage_manager_test.go +++ b/x-pack/apm-server/sampling/eventstorage/storage_manager_test.go @@ -32,7 +32,7 @@ func newStorageManagerNoCleanup(tb testing.TB, path string, opts ...eventstorage func TestStorageManager_samplingDecisionTTL(t *testing.T) { sm := newStorageManager(t) - rw := sm.NewReadWriter() + rw := sm.NewUnlimitedReadWriter() traceID := uuid.Must(uuid.NewV4()).String() err := rw.WriteTraceSampled(traceID, true) assert.NoError(t, err) @@ -65,7 +65,7 @@ func TestStorageManager_samplingDecisionTTL(t *testing.T) { func TestStorageManager_eventTTL(t *testing.T) { sm := newStorageManager(t) - rw := sm.NewReadWriter() + rw := sm.NewUnlimitedReadWriter() traceID := uuid.Must(uuid.NewV4()).String() txnID1 := uuid.Must(uuid.NewV4()).String() txn1 := makeTransaction(txnID1, traceID) @@ -119,7 +119,7 @@ func TestStorageManager_partitionID(t *testing.T) { assert.NoError(t, sm.RotatePartitions()) // write to partition 1 - err := sm.NewReadWriter().WriteTraceSampled(traceID, true) + err := sm.NewUnlimitedReadWriter().WriteTraceSampled(traceID, true) assert.NoError(t, err) assert.NoError(t, sm.Close()) @@ -127,7 +127,7 @@ func TestStorageManager_partitionID(t *testing.T) { // it should read directly from partition 1 on startup instead of 0 sm = newStorageManagerNoCleanup(t, tmpDir) defer sm.Close() - sampled, err := sm.NewReadWriter().IsTraceSampled(traceID) + sampled, err := sm.NewUnlimitedReadWriter().IsTraceSampled(traceID) assert.NoError(t, err) assert.True(t, sampled) } @@ -136,29 +136,36 @@ func TestStorageManager_DiskUsage(t *testing.T) { stopping := make(chan struct{}) defer close(stopping) sm := newStorageManager(t) - go sm.Run(stopping, time.Second, 0) - old := sm.DiskUsage() + go sm.Run(stopping, time.Second) - err := sm.NewReadWriter().WriteTraceSampled("foo", true) + lsm, vlog := sm.Size() + oldSize := lsm + vlog + + err := sm.NewUnlimitedReadWriter().WriteTraceSampled("foo", true) require.NoError(t, err) err = sm.Flush() require.NoError(t, err) assert.Eventually(t, func() bool { - return sm.DiskUsage() > old + lsm, vlog := sm.Size() + newSize := lsm + vlog + return newSize > oldSize }, 10*time.Second, 100*time.Millisecond) - old = sm.DiskUsage() + lsm, vlog = sm.Size() + oldSize = lsm + vlog - err = sm.NewReadWriter().WriteTraceEvent("foo", "bar", 
makeTransaction("bar", "foo")) + err = sm.NewUnlimitedReadWriter().WriteTraceEvent("foo", "bar", makeTransaction("bar", "foo")) require.NoError(t, err) err = sm.Flush() require.NoError(t, err) assert.Eventually(t, func() bool { - return sm.DiskUsage() > old + lsm, vlog := sm.Size() + newSize := lsm + vlog + return newSize > oldSize }, 10*time.Second, 100*time.Millisecond) } @@ -167,9 +174,35 @@ func TestStorageManager_Run(t *testing.T) { stopping := make(chan struct{}) sm := newStorageManager(t) go func() { - assert.NoError(t, sm.Run(stopping, time.Second, 0)) + assert.NoError(t, sm.Run(stopping, time.Second)) + close(done) + }() + close(stopping) + <-done +} + +func TestStorageManager_StorageLimit(t *testing.T) { + done := make(chan struct{}) + stopping := make(chan struct{}) + sm := newStorageManager(t) + go func() { + assert.NoError(t, sm.Run(stopping, time.Second)) close(done) }() + require.NoError(t, sm.Flush()) + lsm, _ := sm.Size() + assert.Greater(t, lsm, int64(1)) + + traceID := uuid.Must(uuid.NewV4()).String() + txnID := uuid.Must(uuid.NewV4()).String() + txn := makeTransaction(txnID, traceID) + + small := sm.NewReadWriter(1) + assert.ErrorIs(t, small.WriteTraceEvent(traceID, txnID, txn), eventstorage.ErrLimitReached) + + big := sm.NewReadWriter(10 << 10) + assert.NoError(t, big.WriteTraceEvent(traceID, txnID, txn)) + close(stopping) <-done } diff --git a/x-pack/apm-server/sampling/processor.go b/x-pack/apm-server/sampling/processor.go index 7a149ef2cea..7ffff1ec409 100644 --- a/x-pack/apm-server/sampling/processor.go +++ b/x-pack/apm-server/sampling/processor.go @@ -345,7 +345,7 @@ func (p *Processor) Run() error { } }) g.Go(func() error { - return p.config.DB.Run(p.stopping, p.config.TTL, p.config.StorageLimit) + return p.config.DB.Run(p.stopping, p.config.TTL) }) g.Go(func() error { // Subscribe to remotely sampled trace IDs. This is cancelled immediately when diff --git a/x-pack/apm-server/sampling/processor_test.go b/x-pack/apm-server/sampling/processor_test.go index fdc1e0d578b..88272d9bb0f 100644 --- a/x-pack/apm-server/sampling/processor_test.go +++ b/x-pack/apm-server/sampling/processor_test.go @@ -61,14 +61,14 @@ func TestProcessAlreadyTailSampled(t *testing.T) { // subsequent events in the trace will be reported immediately. trace1 := modelpb.Trace{Id: "0102030405060708090a0b0c0d0e0f10"} trace2 := modelpb.Trace{Id: "0102030405060708090a0b0c0d0e0f11"} - writer := config.DB.NewReadWriter() + writer := config.DB.NewUnlimitedReadWriter() assert.NoError(t, writer.WriteTraceSampled(trace2.Id, true)) // simulate 2 TTL assert.NoError(t, config.DB.RotatePartitions()) assert.NoError(t, config.DB.RotatePartitions()) - writer = config.DB.NewReadWriter() + writer = config.DB.NewUnlimitedReadWriter() assert.NoError(t, writer.WriteTraceSampled(trace1.Id, true)) require.NoError(t, config.DB.Flush()) @@ -127,7 +127,7 @@ func TestProcessAlreadyTailSampled(t *testing.T) { // Stop the processor and flush global storage so we can access the database. assert.NoError(t, processor.Stop(context.Background())) assert.NoError(t, config.DB.Flush()) - reader := config.DB.NewReadWriter() + reader := config.DB.NewUnlimitedReadWriter() batch = nil err = reader.ReadTraceEvents(trace1.Id, &batch) @@ -243,7 +243,7 @@ func TestProcessLocalTailSampling(t *testing.T) { // Stop the processor and flush global storage so we can access the database. 
assert.NoError(t, processor.Stop(context.Background())) assert.NoError(t, config.DB.Flush()) - reader := config.DB.NewReadWriter() + reader := config.DB.NewUnlimitedReadWriter() sampled, err := reader.IsTraceSampled(sampledTraceID) assert.NoError(t, err) @@ -307,7 +307,7 @@ func TestProcessLocalTailSamplingUnsampled(t *testing.T) { // Stop the processor so we can access the database. assert.NoError(t, processor.Stop(context.Background())) assert.NoError(t, config.DB.Flush()) - reader := config.DB.NewReadWriter() + reader := config.DB.NewUnlimitedReadWriter() var anyUnsampled bool for _, traceID := range traceIDs { @@ -470,7 +470,7 @@ func TestProcessRemoteTailSampling(t *testing.T) { assert.Empty(t, cmp.Diff(trace1Events, events, protocmp.Transform())) - reader := config.DB.NewReadWriter() + reader := config.DB.NewUnlimitedReadWriter() sampled, err := reader.IsTraceSampled(traceID1) assert.NoError(t, err) @@ -682,12 +682,10 @@ func TestStorageLimit(t *testing.T) { err := config.DB.Reload() assert.NoError(t, err) - config.Storage = config.DB.NewReadWriter() - lsm, vlog := config.DB.Size() assert.Greater(t, lsm+vlog, int64(10<<10)) - config.StorageLimit = 10 << 10 // Set the storage limit to smaller than existing storage + config.Storage = config.DB.NewReadWriter(10 << 10) // Set the storage limit to smaller than existing storage writeBatch(1000, config, func(b modelpb.Batch) { assert.Len(t, b, 1000) @@ -758,7 +756,7 @@ func TestGracefulShutdown(t *testing.T) { assert.Empty(t, batch) assert.NoError(t, processor.Stop(context.Background())) - reader := config.DB.NewReadWriter() + reader := config.DB.NewUnlimitedReadWriter() var count int for i := 0; i < totalTraces; i++ { @@ -815,10 +813,9 @@ func newTempdirConfig(tb testing.TB) testConfig { UUID: "local-apm-server", }, StorageConfig: sampling.StorageConfig{ - DB: db, - Storage: db.NewReadWriter(), - TTL: 30 * time.Minute, - StorageLimit: 0, // No storage limit. 
+ DB: db, + Storage: db.NewUnlimitedReadWriter(), + TTL: 30 * time.Minute, }, }, } From 00a250d3a459382ef36356ced6e1c4db352015d9 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Thu, 30 Jan 2025 23:52:30 +0000 Subject: [PATCH 4/5] TBS: log a warning with error field on processing trace failed (#15492) --- x-pack/apm-server/sampling/processor.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/apm-server/sampling/processor.go b/x-pack/apm-server/sampling/processor.go index 7ffff1ec409..0416d451893 100644 --- a/x-pack/apm-server/sampling/processor.go +++ b/x-pack/apm-server/sampling/processor.go @@ -121,10 +121,10 @@ func (p *Processor) ProcessBatch(ctx context.Context, batch *modelpb.Batch) erro stored = false if p.config.DiscardOnWriteFailure { report = false - p.rateLimitedLogger.Info("processing trace failed, discarding by default") + p.rateLimitedLogger.With(logp.Error(err)).Warn("processing trace failed, discarding by default") } else { report = true - p.rateLimitedLogger.Info("processing trace failed, indexing by default") + p.rateLimitedLogger.With(logp.Error(err)).Warn("processing trace failed, indexing by default") } } From 28068bd28e2af8c38ab7d9d30040c448c1002b95 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Fri, 31 Jan 2025 11:14:13 +0000 Subject: [PATCH 5/5] TBS: fix log formatting (#15496) Fix a missing colon in logs (typo from #15235 ), and remove "storage" in "configured storage limit reached" message to make way for #15467 to avoid confusion --- x-pack/apm-server/sampling/eventstorage/rw.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/apm-server/sampling/eventstorage/rw.go b/x-pack/apm-server/sampling/eventstorage/rw.go index df8f9629527..a12b60f52d9 100644 --- a/x-pack/apm-server/sampling/eventstorage/rw.go +++ b/x-pack/apm-server/sampling/eventstorage/rw.go @@ -14,7 +14,7 @@ import ( var ( // ErrLimitReached is returned by RW methods when storage usage // is greater than configured limit. - ErrLimitReached = errors.New("configured storage limit reached") + ErrLimitReached = errors.New("configured limit reached") ) // RW is a read writer interface that has methods to read and write trace event and sampling decisions. @@ -102,7 +102,7 @@ func (s StorageLimitReadWriter) checkStorageLimit() error { if limit != 0 { // unlimited storage usage := s.checker.DiskUsage() if usage >= limit { - return fmt.Errorf("%s: %w (current: %d, limit %d)", s.name, ErrLimitReached, usage, limit) + return fmt.Errorf("%s: %w (current: %d, limit: %d)", s.name, ErrLimitReached, usage, limit) } } return nil
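
With the "database storage limit" name introduced in the storage manager, a
rejected write now carries an error of the form "database storage limit:
configured limit reached (current: <usage>, limit: <limit>)", and the
sentinel remains matchable with errors.Is. A minimal sketch of a caller-side
check (variable names are illustrative):

    if err := rw.WriteTraceEvent(traceID, txnID, event); errors.Is(err, eventstorage.ErrLimitReached) {
        // Storage limit hit: the TBS processor either discards or indexes the
        // trace depending on DiscardOnWriteFailure, and logs the warning with
        // the error attached via logp.Error(err).
    }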