Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TBS: set default sampling.tail.storage_limit to 0 but limit disk usage to 90% #15467

Merged
merged 51 commits into main from disk-threshold
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from 48 commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
0e138ff
PoC on disk threshold
carsonip Jan 30, 2025
34406b7
Remove default storage limit
carsonip Jan 30, 2025
a614778
Cache results
carsonip Jan 30, 2025
27d2e21
Fix default storage limit test
carsonip Jan 30, 2025
a047c91
Use rate limited logger
carsonip Jan 30, 2025
6bc1078
Update comment
carsonip Jan 30, 2025
b409d03
Better error message
carsonip Jan 30, 2025
28e440f
Refactor
carsonip Jan 30, 2025
e51d329
Refactor
carsonip Jan 30, 2025
e550b11
Rename
carsonip Jan 30, 2025
954eca1
Fix compile error
carsonip Jan 30, 2025
38489a9
Add comment
carsonip Jan 30, 2025
cf9e2da
Add fallback to GetDiskUsage error
carsonip Jan 30, 2025
7c89ced
Disable disk_threshold on first error
carsonip Jan 30, 2025
454ab84
Add FIXME
carsonip Jan 30, 2025
d6c84b8
Ensure that fallback is correct
carsonip Jan 30, 2025
40ed22a
Merge
carsonip Jan 30, 2025
b602c0a
Move fallback logic to NewReadWriter
carsonip Jan 30, 2025
fa9f12c
Remove rateLimitedLogger
carsonip Jan 30, 2025
f333764
Add configurable disk threshold
carsonip Jan 30, 2025
d23c40d
Fix tests
carsonip Jan 30, 2025
06cea30
Use atomics
carsonip Jan 30, 2025
fe5fa61
Allow for unlimited even in case of error
carsonip Jan 30, 2025
5c0a7d9
Update comment
carsonip Jan 30, 2025
5474d43
Rename
carsonip Jan 30, 2025
a22f2a3
Log disk threshold configuration
carsonip Jan 30, 2025
6342883
Add disk threshold test
carsonip Jan 30, 2025
2e4206e
Improve comment
carsonip Jan 30, 2025
0144ddb
Use 0,0
carsonip Jan 30, 2025
6988450
Add special case handling
carsonip Jan 30, 2025
af3fe53
More descriptive name
carsonip Jan 30, 2025
494a540
Print disk space
carsonip Jan 30, 2025
c6403d5
Improve comments
carsonip Jan 30, 2025
e66e070
Fix logs
carsonip Jan 30, 2025
e2034b5
Logs
carsonip Jan 30, 2025
5bc7004
Revert fallback to 3GB
carsonip Jan 31, 2025
1904ae0
diskStat->cachedDiskStat, diskStatFailed->getDiskUsageFailed
carsonip Jan 31, 2025
839efb3
Change default to an explicit 0
carsonip Jan 31, 2025
682f7d3
Move unlimitedReadWriter
carsonip Jan 31, 2025
18fe30b
disk threshold ratio -> disk usage threshold
carsonip Jan 31, 2025
ca62f9f
Enforce only 1 of storage limit or disk usage threshold
carsonip Jan 31, 2025
370c7ea
Rename
carsonip Jan 31, 2025
0678a5f
Update tests
carsonip Jan 31, 2025
1e1b4cf
Update StorageLimitRW name
carsonip Jan 31, 2025
bdff329
Improve comment
carsonip Jan 31, 2025
0d74181
Reduce diff
carsonip Jan 31, 2025
856a5bc
Better logs
carsonip Jan 31, 2025
615e94d
Add comment
carsonip Jan 31, 2025
7bcfc60
Set used to 0 as well
carsonip Jan 31, 2025
4677241
Merge branch 'main' into disk-threshold
carsonip Jan 31, 2025
235d1ed
Add changelog
carsonip Jan 31, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions internal/beater/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,8 +362,9 @@ func TestUnpackConfig(t *testing.T) {
ESConfig: elasticsearch.DefaultConfig(),
Interval: 1 * time.Minute,
IngestRateDecayFactor: 0.25,
StorageLimit: "3GB",
StorageLimitParsed: 3000000000,
StorageLimit: "0",
StorageLimitParsed: 0,
DiskUsageThreshold: 0.9,
TTL: 30 * time.Minute,
},
},
Expand Down Expand Up @@ -410,11 +411,12 @@ func TestUnpackConfig(t *testing.T) {
},
},
"sampling.tail": map[string]interface{}{
"enabled": false,
"policies": []map[string]interface{}{{"sample_rate": 0.5}},
"interval": "2m",
"ingest_rate_decay": 1.0,
"storage_limit": "1GB",
"enabled": false,
"policies": []map[string]interface{}{{"sample_rate": 0.5}},
"interval": "2m",
"ingest_rate_decay": 1.0,
"storage_limit": "1GB",
"disk_usage_threshold": 0.8,
},
"data_streams": map[string]interface{}{
"namespace": "foo",
Expand Down Expand Up @@ -495,6 +497,7 @@ func TestUnpackConfig(t *testing.T) {
IngestRateDecayFactor: 1.0,
StorageLimit: "1GB",
StorageLimitParsed: 1000000000,
DiskUsageThreshold: 0.8,
TTL: 30 * time.Minute,
},
},
Expand Down
18 changes: 14 additions & 4 deletions internal/beater/config/sampling.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,18 @@ type TailSamplingConfig struct {
Interval time.Duration `config:"interval" validate:"min=1s"`
IngestRateDecayFactor float64 `config:"ingest_rate_decay" validate:"min=0, max=1"`
TTL time.Duration `config:"ttl" validate:"min=1s"`
StorageLimit string `config:"storage_limit"`
StorageLimitParsed uint64

// StorageLimit is the user-configured tail-sampling database size limit.
// 0 means unlimited storage with DiskUsageThreshold check enabled.
StorageLimit string `config:"storage_limit"`
StorageLimitParsed uint64

// DiskUsageThreshold controls the proportion of the disk to be filled at max, irrespective of db size.
// e.g. 0.9 means the last 10% of disk should not be written to.
//
// Any non-0 StorageLimit causes DiskUsageThreshold to be ignored.
DiskUsageThreshold float64 `config:"disk_usage_threshold" validate:"min=0, max=1"`

DiscardOnWriteFailure bool `config:"discard_on_write_failure"`

esConfigured bool
Expand Down Expand Up @@ -98,7 +108,6 @@ func (c *TailSamplingConfig) Unpack(in *config.C) error {
cfg.Enabled = in.Enabled()
*c = TailSamplingConfig(cfg)
c.esConfigured = in.HasField("elasticsearch")
c.StorageLimitParsed = limit
err = errors.Wrap(c.Validate(), "invalid config")
return nil
}
Expand Down Expand Up @@ -151,7 +160,8 @@ func defaultTailSamplingConfig() TailSamplingConfig {
Interval: 1 * time.Minute,
IngestRateDecayFactor: 0.25,
TTL: 30 * time.Minute,
StorageLimit: "3GB",
StorageLimit: "0",
DiskUsageThreshold: 0.9,
DiscardOnWriteFailure: false,
}
parsed, err := humanize.ParseBytes(cfg.StorageLimit)
Expand Down
2 changes: 1 addition & 1 deletion x-pack/apm-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er
},
StorageConfig: sampling.StorageConfig{
DB: db,
Storage: db.NewReadWriter(tailSamplingConfig.StorageLimitParsed),
Storage: db.NewReadWriter(tailSamplingConfig.StorageLimitParsed, tailSamplingConfig.DiskUsageThreshold),
TTL: tailSamplingConfig.TTL,
DiscardOnWriteFailure: tailSamplingConfig.DiscardOnWriteFailure,
},
Expand Down
10 changes: 5 additions & 5 deletions x-pack/apm-server/sampling/eventstorage/storage_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
func BenchmarkWriteTransaction(b *testing.B) {
test := func(b *testing.B, codec eventstorage.Codec, bigTX bool) {
sm := newStorageManager(b, eventstorage.WithCodec(codec))
readWriter := sm.NewUnlimitedReadWriter()
readWriter := newUnlimitedReadWriter(sm)

traceID := hex.EncodeToString([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})
transactionID := hex.EncodeToString([]byte{1, 2, 3, 4, 5, 6, 7, 8})
Expand Down Expand Up @@ -79,7 +79,7 @@ func BenchmarkReadEvents(b *testing.B) {
for _, count := range counts {
b.Run(fmt.Sprintf("%d events", count), func(b *testing.B) {
sm := newStorageManager(b, eventstorage.WithCodec(codec))
readWriter := sm.NewUnlimitedReadWriter()
readWriter := newUnlimitedReadWriter(sm)

for i := 0; i < count; i++ {
transactionID := uuid.Must(uuid.NewV4()).String()
Expand Down Expand Up @@ -154,7 +154,7 @@ func BenchmarkReadEventsHit(b *testing.B) {
for _, hit := range []bool{false, true} {
b.Run(fmt.Sprintf("hit=%v", hit), func(b *testing.B) {
sm := newStorageManager(b)
readWriter := sm.NewUnlimitedReadWriter()
readWriter := newUnlimitedReadWriter(sm)

traceIDs := make([]string, b.N)

Expand Down Expand Up @@ -185,7 +185,7 @@ func BenchmarkReadEventsHit(b *testing.B) {
}
}

readWriter = sm.NewUnlimitedReadWriter()
readWriter = newUnlimitedReadWriter(sm)

b.ResetTimer()
var batch modelpb.Batch
Expand Down Expand Up @@ -224,7 +224,7 @@ func BenchmarkIsTraceSampled(b *testing.B) {

// Test with varying numbers of events in the trace.
sm := newStorageManager(b)
readWriter := sm.NewUnlimitedReadWriter()
readWriter := newUnlimitedReadWriter(sm)

if err := readWriter.WriteTraceSampled(sampledTraceUUID.String(), true); err != nil {
b.Fatal(err)
Expand Down
133 changes: 112 additions & 21 deletions x-pack/apm-server/sampling/eventstorage/storage_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"time"

"github.com/cockroachdb/pebble/v2"
"github.com/cockroachdb/pebble/v2/vfs"
"go.opentelemetry.io/otel/metric"
"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -46,6 +47,12 @@ const (

// diskUsageFetchInterval is how often disk usage is fetched which is equivalent to how long disk usage is cached.
diskUsageFetchInterval = 1 * time.Second

// dbStorageLimitFallback is the default fallback storage limit in bytes
// that applies when disk usage threshold cannot be enforced due to an error.
dbStorageLimitFallback = 3 << 30

gb = float64(1 << 30)
)

type StorageManagerOptions func(*StorageManager)
Expand All @@ -62,6 +69,27 @@ func WithMeterProvider(mp metric.MeterProvider) StorageManagerOptions {
}
}

// WithGetDBSize overrides the function the StorageManager uses to compute
// the combined on-disk size of its databases, in bytes.
// For testing only.
func WithGetDBSize(sizeFn func() uint64) StorageManagerOptions {
	return func(m *StorageManager) {
		m.getDBSize = sizeFn
	}
}

// WithGetDiskUsage configures getDiskUsage function used by StorageManager.
// For testing only.
func WithGetDiskUsage(getDiskUsage func() (DiskUsage, error)) StorageManagerOptions {
1pkg marked this conversation as resolved.
Show resolved Hide resolved
return func(sm *StorageManager) {
sm.getDiskUsage = getDiskUsage
}
}

// DiskUsage is the struct returned by getDiskUsage.
// It carries filesystem-level usage statistics, independent of database size.
type DiskUsage struct {
	// UsedBytes is the number of bytes currently in use on the filesystem.
	UsedBytes uint64
	// TotalBytes is the total capacity of the filesystem in bytes.
	TotalBytes uint64
}

// StorageManager encapsulates pebble.DB.
// It assumes exclusive access to pebble DB at storageDir.
type StorageManager struct {
Expand All @@ -80,9 +108,20 @@ type StorageManager struct {
// subscriberPosMu protects the subscriber file from concurrent RW.
subscriberPosMu sync.Mutex

// getDBSize returns the total size of databases in bytes.
getDBSize func() uint64
// cachedDBSize is a cached result of db size.
cachedDBSize atomic.Uint64

// getDiskUsage returns the disk / filesystem usage statistics of storageDir.
getDiskUsage func() (DiskUsage, error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we call it fetchDiskUsage to support the distinction between why getDiskUsage() vs. diskStat is needed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should have called it cachedDiskStat and changing diskStatFailed to getDiskUsageFailed. Keeping the call as getDiskUsage as the pebble function is called GetDiskUsage.

// getDiskUsageFailed indicates if getDiskUsage calls ever failed.
getDiskUsageFailed atomic.Bool
// cachedDiskStat is disk usage statistics about the disk only, not related to the databases.
cachedDiskStat struct {
used, total atomic.Uint64
}

// runCh acts as a mutex to ensure only 1 Run is actively running per StorageManager.
// as it is possible that 2 separate Run are created by 2 TBS processors during a hot reload.
runCh chan struct{}
Expand All @@ -100,6 +139,16 @@ func NewStorageManager(storageDir string, opts ...StorageManagerOptions) (*Stora
runCh: make(chan struct{}, 1),
logger: logp.NewLogger(logs.Sampling),
codec: ProtobufCodec{},
getDiskUsage: func() (DiskUsage, error) {
usage, err := vfs.Default.GetDiskUsage(storageDir)
return DiskUsage{
UsedBytes: usage.UsedBytes,
TotalBytes: usage.TotalBytes,
}, err
},
}
sm.getDBSize = func() uint64 {
return sm.eventDB.Metrics().DiskSpaceUsage() + sm.decisionDB.Metrics().DiskSpaceUsage()
}
for _, opt := range opts {
opt(sm)
Expand Down Expand Up @@ -210,7 +259,29 @@ func (sm *StorageManager) dbSize() uint64 {
}

func (sm *StorageManager) updateDiskUsage() {
sm.cachedDBSize.Store(sm.eventDB.Metrics().DiskSpaceUsage() + sm.decisionDB.Metrics().DiskSpaceUsage())
sm.cachedDBSize.Store(sm.getDBSize())

if sm.getDiskUsageFailed.Load() {
// Skip GetDiskUsage under the assumption that
// it will always get the same error if GetDiskUsage ever returns one,
// such that it does not keep logging GetDiskUsage errors.
return
}
usage, err := sm.getDiskUsage()
if err != nil {
sm.logger.With(logp.Error(err)).Warn("failed to get disk usage")
1pkg marked this conversation as resolved.
Show resolved Hide resolved
sm.getDiskUsageFailed.Store(true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: While I can't think of what could lead to transient failures, I am not sure about always failing on error bit - something to think about in a future PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This gives me a headache as well. What should existing disk usage threshold checks perform when getDiskUsage has transient failures, should it use a stale number, or should it become unlimited? With this assumption of always failing, it simplifies the implementation.

sm.cachedDiskStat.total.Store(0) // setting total to 0 to disable any running disk usage threshold checks
carsonip marked this conversation as resolved.
Show resolved Hide resolved
return
}
sm.cachedDiskStat.used.Store(usage.UsedBytes)
sm.cachedDiskStat.total.Store(usage.TotalBytes)
}

// diskUsed returns the actual used disk space in bytes.
// Not to be confused with dbSize which is specific to database.
// The value is served from cachedDiskStat, which is refreshed
// periodically by updateDiskUsage rather than on every call.
func (sm *StorageManager) diskUsed() uint64 {
	return sm.cachedDiskStat.used.Load()
}

// runDiskUsageLoop runs a loop that updates cached disk usage regularly.
Expand Down Expand Up @@ -338,33 +409,53 @@ func (sm *StorageManager) WriteSubscriberPosition(data []byte) error {
return os.WriteFile(filepath.Join(sm.storageDir, subscriberPositionFile), data, 0644)
}

// NewUnlimitedReadWriter returns a read writer with no storage limit.
// For testing only.
func (sm *StorageManager) NewUnlimitedReadWriter() StorageLimitReadWriter {
return sm.NewReadWriter(0)
}

// NewReadWriter returns a read writer with storage limit.
func (sm *StorageManager) NewReadWriter(storageLimit uint64) StorageLimitReadWriter {
splitRW := SplitReadWriter{
// NewReadWriter returns a read writer configured with storage limit and disk usage threshold.
func (sm *StorageManager) NewReadWriter(storageLimit uint64, diskUsageThreshold float64) RW {
var rw RW = SplitReadWriter{
eventRW: sm.eventStorage.NewReadWriter(),
decisionRW: sm.decisionStorage.NewReadWriter(),
}

dbStorageLimit := func() uint64 {
return storageLimit
}
if storageLimit == 0 {
sm.logger.Infof("setting database storage limit to unlimited")
} else {
sm.logger.Infof("setting database storage limit to %.1fgb", float64(storageLimit))
// If db storage limit is set, only enforce db storage limit.
if storageLimit > 0 {
// dbStorageLimit returns max size of db in bytes.
// If size of db exceeds dbStorageLimit, writes should be rejected.
dbStorageLimit := func() uint64 {
return storageLimit
}
sm.logger.Infof("setting database storage limit to %0.1fgb", float64(storageLimit)/gb)
dbStorageLimitChecker := NewStorageLimitCheckerFunc(sm.dbSize, dbStorageLimit)
rw = NewStorageLimitReadWriter("database storage limit", dbStorageLimitChecker, rw)
return rw
}

// To limit db size to storage_limit
dbStorageLimitChecker := NewStorageLimitCheckerFunc(sm.dbSize, dbStorageLimit)
dbStorageLimitRW := NewStorageLimitReadWriter("database storage limit", dbStorageLimitChecker, splitRW)
// DB storage limit is unlimited, enforce disk usage threshold if possible.
// Load whether getDiskUsage failed, as it was called during StorageManager initialization.
if sm.getDiskUsageFailed.Load() {
// Limit db size to fallback storage limit as getDiskUsage returned an error
dbStorageLimit := func() uint64 {
return dbStorageLimitFallback
}
sm.logger.Warnf("overriding database storage limit to fallback default of %0.1fgb as get disk usage failed", float64(dbStorageLimitFallback)/gb)
dbStorageLimitChecker := NewStorageLimitCheckerFunc(sm.dbSize, dbStorageLimit)
rw = NewStorageLimitReadWriter("database storage limit", dbStorageLimitChecker, rw)
return rw
}

return dbStorageLimitRW
// diskThreshold returns max used disk space in bytes, not in percentage.
// If size of used disk space exceeds diskThreshold, writes should be rejected.
diskThreshold := func() uint64 {
return uint64(float64(sm.cachedDiskStat.total.Load()) * diskUsageThreshold)
}
// the total disk space could change in runtime, but it is still useful to print it out in logs.
sm.logger.Infof("setting disk usage threshold to %.2f of total disk space of %0.1fgb", diskUsageThreshold, float64(sm.cachedDiskStat.total.Load())/gb)
diskThresholdChecker := NewStorageLimitCheckerFunc(sm.diskUsed, diskThreshold)
rw = NewStorageLimitReadWriter(
fmt.Sprintf("disk usage threshold %.2f", diskUsageThreshold),
diskThresholdChecker,
rw,
)
carsonip marked this conversation as resolved.
Show resolved Hide resolved
return rw
}

// wrapNonNilErr only wraps an error with format if the error is not nil.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func BenchmarkStorageManager_Size(b *testing.B) {
defer close(stopping)
sm := newStorageManager(b)
go sm.Run(stopping, time.Second)
rw := sm.NewUnlimitedReadWriter()
rw := newUnlimitedReadWriter(sm)
for i := 0; i < 1000; i++ {
traceID := uuid.Must(uuid.NewV4()).String()
txnID := uuid.Must(uuid.NewV4()).String()
Expand Down
Loading
Loading