[9.0] TBS: set default sampling.tail.storage_limit to 0 but limit disk usage to 90% (backport #15467) #15501

Merged 2 commits on Jan 31, 2025
18 changes: 18 additions & 0 deletions changelogs/all-breaking-changes.asciidoc
@@ -7,6 +7,24 @@
This section describes the breaking changes and deprecations introduced in this release
and previous minor versions.

// tag::90-bc[]
[float]
[[breaking-changes-9.0]]
=== 9.0

The following breaking changes are introduced in APM version 9.0.0:

- Change the `sampling.tail.storage_limit` default to `0`.
While `0` means an unlimited local tail-sampling database size,
it now enforces a maximum of 90% disk usage on the disk where the data directory is located.
Any tail-sampling write beyond this threshold is rejected,
similar to what happens when the tail-sampling database size exceeds a non-zero storage limit.
Setting `sampling.tail.storage_limit` to a non-zero value maintains the existing behavior,
which limits the tail-sampling database size to `sampling.tail.storage_limit`
and does not apply the new disk usage threshold check.
For more details, see https://github.com/elastic/apm-server/pull/15467[PR #15467].
// end::90-bc[]

// tag::811-bc[]
[float]
[[breaking-changes-8.11]]
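For operators who want to keep the pre-9.0 behavior, a minimal sketch of the opt-out, built the same way the config unit test further below constructs its configuration (using config.MustNewConfigFrom from elastic-agent-libs; this snippet is illustrative and not part of the PR — any non-zero size works, 3GB is simply the old default):

package main

import (
	"fmt"

	"github.com/elastic/elastic-agent-libs/config"
)

func main() {
	// A non-zero storage_limit caps the tail-sampling database at that size
	// and disables the new 90% disk usage threshold check entirely.
	cfg := config.MustNewConfigFrom(map[string]interface{}{
		"sampling.tail": map[string]interface{}{
			"storage_limit": "3GB", // the pre-9.0 default
		},
	})

	var out map[string]interface{}
	if err := cfg.Unpack(&out); err != nil {
		panic(err)
	}
	fmt.Println(out) // map[sampling:map[tail:map[storage_limit:3GB]]]
}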
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
@@ -9,6 +9,7 @@ https://github.com/elastic/apm-server/compare/8.16\...8.x[View commits]

[float]
==== Breaking Changes
- Change the `sampling.tail.storage_limit` default to `0`. While `0` means an unlimited local tail-sampling database size, it now enforces a maximum of 90% disk usage on the disk where the data directory is located. Any tail-sampling write beyond this threshold is rejected, similar to what happens when the tail-sampling database size exceeds a non-zero storage limit. Setting `sampling.tail.storage_limit` to a non-zero value maintains the existing behavior, which limits the tail-sampling database size to `sampling.tail.storage_limit` and does not apply the new disk usage threshold check. {pull}15467[15467]

[float]
==== Deprecations
17 changes: 10 additions & 7 deletions internal/beater/config/config_test.go
@@ -362,8 +362,9 @@ func TestUnpackConfig(t *testing.T) {
ESConfig: elasticsearch.DefaultConfig(),
Interval: 1 * time.Minute,
IngestRateDecayFactor: 0.25,
StorageLimit: "3GB",
StorageLimitParsed: 3000000000,
StorageLimit: "0",
StorageLimitParsed: 0,
DiskUsageThreshold: 0.9,
TTL: 30 * time.Minute,
},
},
@@ -410,11 +411,12 @@
},
},
"sampling.tail": map[string]interface{}{
"enabled": false,
"policies": []map[string]interface{}{{"sample_rate": 0.5}},
"interval": "2m",
"ingest_rate_decay": 1.0,
"storage_limit": "1GB",
"enabled": false,
"policies": []map[string]interface{}{{"sample_rate": 0.5}},
"interval": "2m",
"ingest_rate_decay": 1.0,
"storage_limit": "1GB",
"disk_usage_threshold": 0.8,
},
"data_streams": map[string]interface{}{
"namespace": "foo",
@@ -495,6 +497,7 @@ func TestUnpackConfig(t *testing.T) {
IngestRateDecayFactor: 1.0,
StorageLimit: "1GB",
StorageLimitParsed: 1000000000,
DiskUsageThreshold: 0.8,
TTL: 30 * time.Minute,
},
},
18 changes: 14 additions & 4 deletions internal/beater/config/sampling.go
@@ -49,8 +49,18 @@ type TailSamplingConfig struct {
Interval time.Duration `config:"interval" validate:"min=1s"`
IngestRateDecayFactor float64 `config:"ingest_rate_decay" validate:"min=0, max=1"`
TTL time.Duration `config:"ttl" validate:"min=1s"`
StorageLimit string `config:"storage_limit"`
StorageLimitParsed uint64

// StorageLimit is the user-configured tail-sampling database size limit.
// 0 means unlimited storage with DiskUsageThreshold check enabled.
StorageLimit string `config:"storage_limit"`
StorageLimitParsed uint64

// DiskUsageThreshold controls the maximum proportion of the disk that may be filled, irrespective of db size.
// e.g. 0.9 means the last 10% of the disk must not be written to.
//
// Any non-zero StorageLimit causes DiskUsageThreshold to be ignored.
DiskUsageThreshold float64 `config:"disk_usage_threshold" validate:"min=0, max=1"`

DiscardOnWriteFailure bool `config:"discard_on_write_failure"`

esConfigured bool
@@ -98,7 +108,6 @@ func (c *TailSamplingConfig) Unpack(in *config.C) error {
cfg.Enabled = in.Enabled()
*c = TailSamplingConfig(cfg)
c.esConfigured = in.HasField("elasticsearch")
c.StorageLimitParsed = limit
err = errors.Wrap(c.Validate(), "invalid config")
return nil
}
@@ -151,7 +160,8 @@ func defaultTailSamplingConfig() TailSamplingConfig {
Interval: 1 * time.Minute,
IngestRateDecayFactor: 0.25,
TTL: 30 * time.Minute,
StorageLimit: "3GB",
StorageLimit: "0",
DiskUsageThreshold: 0.9,
DiscardOnWriteFailure: false,
}
parsed, err := humanize.ParseBytes(cfg.StorageLimit)
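To make the new default concrete, a tiny standalone sketch of the threshold arithmetic (the 500GB disk size is an assumed figure for illustration; the multiplication matches the diskThreshold closure added to storage_manager.go below):

package main

import "fmt"

func main() {
	// Hypothetical 500GB volume; 0.9 is the new DiskUsageThreshold default.
	total := uint64(500 << 30)
	threshold := 0.9

	// The cutoff is a byte count, not a percentage: once used disk space
	// exceeds it, tail-sampling writes are rejected.
	cutoff := uint64(float64(total) * threshold)
	fmt.Printf("writes rejected beyond %d bytes (~%.0fGB of 500GB)\n",
		cutoff, float64(cutoff)/(1<<30))
}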
2 changes: 1 addition & 1 deletion x-pack/apm-server/main.go
@@ -147,7 +147,7 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, error) {
},
StorageConfig: sampling.StorageConfig{
DB: db,
Storage: db.NewReadWriter(tailSamplingConfig.StorageLimitParsed),
Storage: db.NewReadWriter(tailSamplingConfig.StorageLimitParsed, tailSamplingConfig.DiskUsageThreshold),
TTL: tailSamplingConfig.TTL,
DiscardOnWriteFailure: tailSamplingConfig.DiscardOnWriteFailure,
},
10 changes: 5 additions & 5 deletions x-pack/apm-server/sampling/eventstorage/storage_bench_test.go
@@ -18,7 +18,7 @@ import (
func BenchmarkWriteTransaction(b *testing.B) {
test := func(b *testing.B, codec eventstorage.Codec, bigTX bool) {
sm := newStorageManager(b, eventstorage.WithCodec(codec))
readWriter := sm.NewUnlimitedReadWriter()
readWriter := newUnlimitedReadWriter(sm)

traceID := hex.EncodeToString([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})
transactionID := hex.EncodeToString([]byte{1, 2, 3, 4, 5, 6, 7, 8})
@@ -79,7 +79,7 @@ func BenchmarkReadEvents(b *testing.B) {
for _, count := range counts {
b.Run(fmt.Sprintf("%d events", count), func(b *testing.B) {
sm := newStorageManager(b, eventstorage.WithCodec(codec))
readWriter := sm.NewUnlimitedReadWriter()
readWriter := newUnlimitedReadWriter(sm)

for i := 0; i < count; i++ {
transactionID := uuid.Must(uuid.NewV4()).String()
@@ -154,7 +154,7 @@ func BenchmarkReadEventsHit(b *testing.B) {
for _, hit := range []bool{false, true} {
b.Run(fmt.Sprintf("hit=%v", hit), func(b *testing.B) {
sm := newStorageManager(b)
readWriter := sm.NewUnlimitedReadWriter()
readWriter := newUnlimitedReadWriter(sm)

traceIDs := make([]string, b.N)

@@ -185,7 +185,7 @@ }
}
}

readWriter = sm.NewUnlimitedReadWriter()
readWriter = newUnlimitedReadWriter(sm)

b.ResetTimer()
var batch modelpb.Batch
@@ -224,7 +224,7 @@ func BenchmarkIsTraceSampled(b *testing.B) {

// Test with varying numbers of events in the trace.
sm := newStorageManager(b)
readWriter := sm.NewUnlimitedReadWriter()
readWriter := newUnlimitedReadWriter(sm)

if err := readWriter.WriteTraceSampled(sampledTraceUUID.String(), true); err != nil {
b.Fatal(err)
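The benchmarks above now call a newUnlimitedReadWriter test helper that is not shown in this diff. A plausible sketch of its shape, given the new NewReadWriter signature — both the threshold value of 1 and the helper's exact definition are assumptions, not code from this PR:

package eventstorage_test

import "github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage"

// A storage limit of 0 disables the db-size cap, and an assumed threshold of
// 1.0 places the disk cutoff at the full disk, approximating "unlimited"
// storage for benchmarking purposes.
func newUnlimitedReadWriter(sm *eventstorage.StorageManager) eventstorage.RW {
	return sm.NewReadWriter(0, 1)
}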
134 changes: 113 additions & 21 deletions x-pack/apm-server/sampling/eventstorage/storage_manager.go
@@ -16,6 +16,7 @@ import (
"time"

"github.com/cockroachdb/pebble/v2"
"github.com/cockroachdb/pebble/v2/vfs"
"go.opentelemetry.io/otel/metric"
"golang.org/x/sync/errgroup"

@@ -46,6 +47,12 @@

// diskUsageFetchInterval is how often disk usage is fetched, which is also how long a fetched value stays cached.
diskUsageFetchInterval = 1 * time.Second

// dbStorageLimitFallback is the default fallback storage limit in bytes
// that applies when disk usage threshold cannot be enforced due to an error.
dbStorageLimitFallback = 3 << 30

gb = float64(1 << 30)
)

type StorageManagerOptions func(*StorageManager)
@@ -62,6 +69,27 @@ func WithMeterProvider(mp metric.MeterProvider) StorageManagerOptions {
}
}

// WithGetDBSize configures getDBSize function used by StorageManager.
// For testing only.
func WithGetDBSize(getDBSize func() uint64) StorageManagerOptions {
return func(sm *StorageManager) {
sm.getDBSize = getDBSize
}
}

// WithGetDiskUsage configures getDiskUsage function used by StorageManager.
// For testing only.
func WithGetDiskUsage(getDiskUsage func() (DiskUsage, error)) StorageManagerOptions {
return func(sm *StorageManager) {
sm.getDiskUsage = getDiskUsage
}
}

// DiskUsage is the struct returned by getDiskUsage.
type DiskUsage struct {
UsedBytes, TotalBytes uint64
}

// StorageManager encapsulates pebble.DB.
// It assumes exclusive access to pebble DB at storageDir.
type StorageManager struct {
@@ -80,9 +108,20 @@
// subscriberPosMu protects the subscriber file from concurrent RW.
subscriberPosMu sync.Mutex

// getDBSize returns the total size of databases in bytes.
getDBSize func() uint64
// cachedDBSize is a cached result of db size.
cachedDBSize atomic.Uint64

// getDiskUsage returns the disk / filesystem usage statistics of storageDir.
getDiskUsage func() (DiskUsage, error)
// getDiskUsageFailed indicates if getDiskUsage calls ever failed.
getDiskUsageFailed atomic.Bool
// cachedDiskStat is disk usage statistics about the disk only, not related to the databases.
cachedDiskStat struct {
used, total atomic.Uint64
}

// runCh acts as a mutex to ensure only 1 Run is actively running per StorageManager.
// as it is possible that 2 separate Run are created by 2 TBS processors during a hot reload.
runCh chan struct{}
@@ -100,6 +139,16 @@ func NewStorageManager(storageDir string, opts ...StorageManagerOptions) (*StorageManager, error) {
runCh: make(chan struct{}, 1),
logger: logp.NewLogger(logs.Sampling),
codec: ProtobufCodec{},
getDiskUsage: func() (DiskUsage, error) {
usage, err := vfs.Default.GetDiskUsage(storageDir)
return DiskUsage{
UsedBytes: usage.UsedBytes,
TotalBytes: usage.TotalBytes,
}, err
},
}
sm.getDBSize = func() uint64 {
return sm.eventDB.Metrics().DiskSpaceUsage() + sm.decisionDB.Metrics().DiskSpaceUsage()
}
for _, opt := range opts {
opt(sm)
@@ -210,7 +259,30 @@ func (sm *StorageManager) dbSize() uint64 {
}

func (sm *StorageManager) updateDiskUsage() {
sm.cachedDBSize.Store(sm.eventDB.Metrics().DiskSpaceUsage() + sm.decisionDB.Metrics().DiskSpaceUsage())
sm.cachedDBSize.Store(sm.getDBSize())

if sm.getDiskUsageFailed.Load() {
// Skip further GetDiskUsage calls under the assumption that,
// once GetDiskUsage returns an error, it will keep failing the same way,
// so the same error is not logged over and over.
return
}
usage, err := sm.getDiskUsage()
if err != nil {
sm.logger.With(logp.Error(err)).Warn("failed to get disk usage")
sm.getDiskUsageFailed.Store(true)
sm.cachedDiskStat.used.Store(0)
sm.cachedDiskStat.total.Store(0) // setting total to 0 to disable any running disk usage threshold checks
return
}
sm.cachedDiskStat.used.Store(usage.UsedBytes)
sm.cachedDiskStat.total.Store(usage.TotalBytes)
}

// diskUsed returns the actual used disk space in bytes.
// Not to be confused with dbSize, which measures only the databases.
func (sm *StorageManager) diskUsed() uint64 {
return sm.cachedDiskStat.used.Load()
}

// runDiskUsageLoop runs a loop that updates cached disk usage regularly.
@@ -338,33 +410,53 @@ func (sm *StorageManager) WriteSubscriberPosition(data []byte) error {
return os.WriteFile(filepath.Join(sm.storageDir, subscriberPositionFile), data, 0644)
}

// NewUnlimitedReadWriter returns a read writer with no storage limit.
// For testing only.
func (sm *StorageManager) NewUnlimitedReadWriter() StorageLimitReadWriter {
return sm.NewReadWriter(0)
}

// NewReadWriter returns a read writer with storage limit.
func (sm *StorageManager) NewReadWriter(storageLimit uint64) StorageLimitReadWriter {
splitRW := SplitReadWriter{
// NewReadWriter returns a read writer configured with storage limit and disk usage threshold.
func (sm *StorageManager) NewReadWriter(storageLimit uint64, diskUsageThreshold float64) RW {
var rw RW = SplitReadWriter{
eventRW: sm.eventStorage.NewReadWriter(),
decisionRW: sm.decisionStorage.NewReadWriter(),
}

dbStorageLimit := func() uint64 {
return storageLimit
}
if storageLimit == 0 {
sm.logger.Infof("setting database storage limit to unlimited")
} else {
sm.logger.Infof("setting database storage limit to %.1fgb", float64(storageLimit))
// If db storage limit is set, only enforce db storage limit.
if storageLimit > 0 {
// dbStorageLimit returns max size of db in bytes.
// If size of db exceeds dbStorageLimit, writes should be rejected.
dbStorageLimit := func() uint64 {
return storageLimit
}
sm.logger.Infof("setting database storage limit to %0.1fgb", float64(storageLimit)/gb)
dbStorageLimitChecker := NewStorageLimitCheckerFunc(sm.dbSize, dbStorageLimit)
rw = NewStorageLimitReadWriter("database storage limit", dbStorageLimitChecker, rw)
return rw
}

// To limit db size to storage_limit
dbStorageLimitChecker := NewStorageLimitCheckerFunc(sm.dbSize, dbStorageLimit)
dbStorageLimitRW := NewStorageLimitReadWriter("database storage limit", dbStorageLimitChecker, splitRW)
// DB storage limit is unlimited, enforce disk usage threshold if possible.
// Load whether getDiskUsage failed, as it was called during StorageManager initialization.
if sm.getDiskUsageFailed.Load() {
// Limit db size to fallback storage limit as getDiskUsage returned an error
dbStorageLimit := func() uint64 {
return dbStorageLimitFallback
}
sm.logger.Warnf("overriding database storage limit to fallback default of %0.1fgb as get disk usage failed", float64(dbStorageLimitFallback)/gb)
dbStorageLimitChecker := NewStorageLimitCheckerFunc(sm.dbSize, dbStorageLimit)
rw = NewStorageLimitReadWriter("database storage limit", dbStorageLimitChecker, rw)
return rw
}

return dbStorageLimitRW
// diskThreshold returns max used disk space in bytes, not in percentage.
// If size of used disk space exceeds diskThreshold, writes should be rejected.
diskThreshold := func() uint64 {
return uint64(float64(sm.cachedDiskStat.total.Load()) * diskUsageThreshold)
}
// The total disk space could change at runtime, but it is still useful to log it.
sm.logger.Infof("setting disk usage threshold to %.2f of total disk space of %0.1fgb", diskUsageThreshold, float64(sm.cachedDiskStat.total.Load())/gb)
diskThresholdChecker := NewStorageLimitCheckerFunc(sm.diskUsed, diskThreshold)
rw = NewStorageLimitReadWriter(
fmt.Sprintf("disk usage threshold %.2f", diskUsageThreshold),
diskThresholdChecker,
rw,
)
return rw
}

// wrapNonNilErr only wraps an error with format if the error is not nil.
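Distilled from the new NewReadWriter above, a standalone sketch of its three-way decision with the fallback constant inlined (function and variable names here are illustrative, not from the PR):

package main

import "fmt"

// chooseCap mirrors the branching in NewReadWriter: a non-zero storage limit
// caps db size and ignores the disk threshold; with an unlimited storage
// limit the disk usage threshold applies, unless disk usage could not be
// read, in which case a fixed 3GB db cap (dbStorageLimitFallback) is used.
func chooseCap(storageLimit uint64, diskUsageFailed bool, diskTotal uint64, threshold float64) (capBytes uint64, appliesToDiskUsage bool) {
	switch {
	case storageLimit > 0:
		return storageLimit, false // cap applies to db size
	case diskUsageFailed:
		return 3 << 30, false // fallback cap applies to db size
	default:
		return uint64(float64(diskTotal) * threshold), true // cap applies to used disk bytes
	}
}

func main() {
	fmt.Println(chooseCap(0, false, 500<<30, 0.9))     // 483183820800 true: reject past 90% of a 500GB disk
	fmt.Println(chooseCap(1<<30, false, 500<<30, 0.9)) // 1073741824 false: 1GB db cap, threshold ignored
	fmt.Println(chooseCap(0, true, 0, 0.9))            // 3221225472 false: 3GB fallback db cap
}

In tests, the new WithGetDBSize and WithGetDiskUsage options presumably exist to inject these inputs and exercise each branch.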
@@ -17,7 +17,7 @@ func BenchmarkStorageManager_Size(b *testing.B) {
defer close(stopping)
sm := newStorageManager(b)
go sm.Run(stopping, time.Second)
rw := sm.NewUnlimitedReadWriter()
rw := newUnlimitedReadWriter(sm)
for i := 0; i < 1000; i++ {
traceID := uuid.Must(uuid.NewV4()).String()
txnID := uuid.Must(uuid.NewV4()).String()