TBS: set default sampling.tail.storage_limit to 0 but limit disk usage to 90% #15467

Merged Jan 31, 2025 (51 commits)
Changes shown from 31 commits

Commits
0e138ff
PoC on disk threshold
carsonip Jan 30, 2025
34406b7
Remove default storage limit
carsonip Jan 30, 2025
a614778
Cache results
carsonip Jan 30, 2025
27d2e21
Fix default storage limit test
carsonip Jan 30, 2025
a047c91
Use rate limited logger
carsonip Jan 30, 2025
6bc1078
Update comment
carsonip Jan 30, 2025
b409d03
Better error message
carsonip Jan 30, 2025
28e440f
Refactor
carsonip Jan 30, 2025
e51d329
Refactor
carsonip Jan 30, 2025
e550b11
Rename
carsonip Jan 30, 2025
954eca1
Fix compile error
carsonip Jan 30, 2025
38489a9
Add comment
carsonip Jan 30, 2025
cf9e2da
Add fallback to GetDiskUsage error
carsonip Jan 30, 2025
7c89ced
Disable disk_threshold on first error
carsonip Jan 30, 2025
454ab84
Add FIXME
carsonip Jan 30, 2025
d6c84b8
Ensure that fallback is correct
carsonip Jan 30, 2025
40ed22a
Merge
carsonip Jan 30, 2025
b602c0a
Move fallback logic to NewReadWriter
carsonip Jan 30, 2025
fa9f12c
Remove rateLimitedLogger
carsonip Jan 30, 2025
f333764
Add configurable disk threshold
carsonip Jan 30, 2025
d23c40d
Fix tests
carsonip Jan 30, 2025
06cea30
Use atomics
carsonip Jan 30, 2025
fe5fa61
Allow for unlimited even in case of error
carsonip Jan 30, 2025
5c0a7d9
Update comment
carsonip Jan 30, 2025
5474d43
Rename
carsonip Jan 30, 2025
a22f2a3
Log disk threshold configuration
carsonip Jan 30, 2025
6342883
Add disk threshold test
carsonip Jan 30, 2025
2e4206e
Improve comment
carsonip Jan 30, 2025
0144ddb
Use 0,0
carsonip Jan 30, 2025
6988450
Add special case handling
carsonip Jan 30, 2025
af3fe53
More descriptive name
carsonip Jan 30, 2025
494a540
Print disk space
carsonip Jan 30, 2025
c6403d5
Improve comments
carsonip Jan 30, 2025
e66e070
Fix logs
carsonip Jan 30, 2025
e2034b5
Logs
carsonip Jan 30, 2025
5bc7004
Revert fallback to 3GB
carsonip Jan 31, 2025
1904ae0
diskStat->cachedDiskStat, diskStatFailed->getDiskUsageFailed
carsonip Jan 31, 2025
839efb3
Change default to an explicit 0
carsonip Jan 31, 2025
682f7d3
Move unlimitedReadWriter
carsonip Jan 31, 2025
18fe30b
disk threshold ratio -> disk usage threshold
carsonip Jan 31, 2025
ca62f9f
Enforce only 1 of storage limit or disk usage threshold
carsonip Jan 31, 2025
370c7ea
Rename
carsonip Jan 31, 2025
0678a5f
Update tests
carsonip Jan 31, 2025
1e1b4cf
Update StorageLimitRW name
carsonip Jan 31, 2025
bdff329
Improve comment
carsonip Jan 31, 2025
0d74181
Reduce diff
carsonip Jan 31, 2025
856a5bc
Better logs
carsonip Jan 31, 2025
615e94d
Add comment
carsonip Jan 31, 2025
7bcfc60
Set used to 0 as well
carsonip Jan 31, 2025
4677241
Merge branch 'main' into disk-threshold
carsonip Jan 31, 2025
235d1ed
Add changelog
carsonip Jan 31, 2025
17 changes: 10 additions & 7 deletions internal/beater/config/config_test.go
@@ -362,8 +362,9 @@ func TestUnpackConfig(t *testing.T) {
ESConfig: elasticsearch.DefaultConfig(),
Interval: 1 * time.Minute,
IngestRateDecayFactor: 0.25,
StorageLimit: "3GB",
StorageLimitParsed: 3000000000,
StorageLimit: "",
StorageLimitParsed: 0,
DiskThresholdRatio: 0.9,
TTL: 30 * time.Minute,
},
},
@@ -410,11 +411,12 @@
},
},
"sampling.tail": map[string]interface{}{
"enabled": false,
"policies": []map[string]interface{}{{"sample_rate": 0.5}},
"interval": "2m",
"ingest_rate_decay": 1.0,
"storage_limit": "1GB",
"enabled": false,
"policies": []map[string]interface{}{{"sample_rate": 0.5}},
"interval": "2m",
"ingest_rate_decay": 1.0,
"storage_limit": "1GB",
"disk_threshold_ratio": 0.8,
},
"data_streams": map[string]interface{}{
"namespace": "foo",
@@ -495,6 +497,7 @@ func TestUnpackConfig(t *testing.T) {
IngestRateDecayFactor: 1.0,
StorageLimit: "1GB",
StorageLimitParsed: 1000000000,
DiskThresholdRatio: 0.8,
TTL: 30 * time.Minute,
},
},
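The parsed values asserted in the test above come from the same github.com/dustin/go-humanize package the config code uses: "GB" is a decimal SI unit, which is why "1GB" is expected to parse to 1000000000 rather than 2^30, and with this PR an empty storage_limit is simply left unparsed at 0. A standalone illustration (not part of apm-server):

```go
package main

import (
	"fmt"

	"github.com/dustin/go-humanize"
)

func main() {
	// "1GB" parses as a decimal SI unit (1000000000), matching the test expectation;
	// "1GiB" would be the binary 1073741824.
	for _, s := range []string{"1GB", "1GiB", "3GB"} {
		n, err := humanize.ParseBytes(s)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s -> %d bytes\n", s, n)
	}
	// With this PR, an unset storage_limit ("") is not parsed at all;
	// StorageLimitParsed stays 0, meaning no absolute database size cap.
}
```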
38 changes: 25 additions & 13 deletions internal/beater/config/sampling.go
@@ -49,8 +49,23 @@ type TailSamplingConfig struct {
Interval time.Duration `config:"interval" validate:"min=1s"`
IngestRateDecayFactor float64 `config:"ingest_rate_decay" validate:"min=0, max=1"`
TTL time.Duration `config:"ttl" validate:"min=1s"`
StorageLimit string `config:"storage_limit"`
StorageLimitParsed uint64

// StorageLimit is the user-configured tail-sampling database size limit.
// 0 means unlimited storage.
StorageLimit string `config:"storage_limit"`
StorageLimitParsed uint64

// DiskThresholdRatio controls the proportion of the disk to be filled at max, irrespective of db size.
@carsonip (Member, Author), Jan 30, 2025:

[to reviewer] Q: @lahsivjar suggested that it doesn't need to be configurable. I've made it configurable but undocumented atm. As the StorageLimit default is changed, we'll have to update the observability docs anyway. I'm thinking we still have to either document this config, or describe this new default behavior as if it is not configurable. I'm inclined to document it, but that also means we'll have to support it.

Contributor:

Having this as a config option is okay for now. If we want to expose this (either as a config or in documentation) we should discuss a bit how we want the two options to interact and the user experience it entails. To start with, I would recommend documenting this (using up to 90% of the available disk) as default TBS behaviour, with a mention of how to tweak the max using the storage_limit option.

Contributor:

> Having this as a config option is okay for now.

Once we expose it, we cannot simply revert this decision.

With the current implementation, one needs to be aware of both config options and understand how they play together. While also checking that a certain threshold is not exceeded is a guardrail, it adds complexity for the user, and I am not a huge fan of this complexity. If the DiskThresholdRatio were only checked when no StorageLimit is set, I think it would be easier to grasp: either the APM Server always tries to write up to X GB, or it writes up to x% of the overall disk size, but only if available.

Contributor:

> Once we expose it, we cannot simply revert this decision.

Hmm, I was thinking that if it is exposed as undocumented configuration (i.e. only documenting the behaviour) we could revert the decision, but if that is not true then I would be reluctant to release this config.

> If the DiskThresholdRatio would only be checked if no StorageLimit is set, I think it would be easier to grasp. Either the APM Server always tries to write up to X GB or it writes up to x% of the overall disk size, but only if available.

This sounds good to me; however, I would advocate for starting with the x% of overall disk size as a fixed default behaviour for now, because I am not sure this config is the best way forward. So the behavior would be: if storage_limit is set, then that is obeyed; otherwise, 90% of the overall disk size would be used for TBS. Does this sound reasonable?

@carsonip (Member, Author):

I am fine with keeping it an undocumented config and disabling the disk usage threshold check whenever StorageLimit is set. The edge case we'll have to handle is when the user sets it to 0:

  • What if the user really wants an unlimited storage limit, without the disk usage threshold check?
  • For the above case, what if GetDiskUsage returns an error?

i.e. are we happy with:

  • no more truly unlimited storage limit: either set storage_limit to a non-0 value and TBS will use that absolute value, or set it to 0 and disk usage will be limited to 90%

Alternatives:

  • make StorageLimit not a uint64 but an int64, where <0 means truly unlimited and disables the disk usage threshold?

I'm happy with no more truly unlimited storage limit, but just checking whether we want an escape hatch for it. I think it is ok to tell users who want truly unlimited storage to set StorageLimit to maxUint64.

@carsonip (Member, Author):

Updated PR to implement what @simitt suggested. Any non-0 storage limit disables disk usage threshold checks. With a 0 storage limit, disk usage threshold checks apply using the undocumented default disk_usage_threshold=0.9, unless GetDiskUsage errored, in which case it falls back to a 3GB db storage limit.

// e.g. 0.9 means the last 10% of disk should not be written to.
//
// This is additional to StorageLimit which controls db size,
// i.e. if both StorageLimit and DiskThresholdRatio are set, both are enforced.
// If StorageLimit is 0 and any error occurs when getting filesystem stats,
// a fallback StorageLimit will be applied.
// To disable the fallback to allow unlimited db size and disk threshold under all circumstances,
// set both StorageLimit and DiskThresholdRatio to 0.
DiskThresholdRatio float64 `config:"disk_threshold_ratio" validate:"min=0, max=1"`

DiscardOnWriteFailure bool `config:"discard_on_write_failure"`

esConfigured bool
@@ -90,15 +105,16 @@ func (c *TailSamplingConfig) Unpack(in *config.C) error {
err = errors.Wrap(err, "error unpacking config")
return nil
}
limit, err := humanize.ParseBytes(cfg.StorageLimit)
if err != nil {
return err
if cfg.StorageLimit != "" {
limit, err := humanize.ParseBytes(cfg.StorageLimit)
if err != nil {
return err
}
cfg.StorageLimitParsed = limit
}
cfg.StorageLimitParsed = limit
cfg.Enabled = in.Enabled()
*c = TailSamplingConfig(cfg)
c.esConfigured = in.HasField("elasticsearch")
c.StorageLimitParsed = limit
err = errors.Wrap(c.Validate(), "invalid config")
return nil
}
@@ -151,13 +167,9 @@ func defaultTailSamplingConfig() TailSamplingConfig {
Interval: 1 * time.Minute,
IngestRateDecayFactor: 0.25,
TTL: 30 * time.Minute,
StorageLimit: "3GB",
StorageLimit: "",
DiskThresholdRatio: 0.9,
DiscardOnWriteFailure: false,
}
parsed, err := humanize.ParseBytes(cfg.StorageLimit)
if err != nil {
panic(err)
}
cfg.StorageLimitParsed = parsed
return cfg
}
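To summarize how the two settings interact at this revision of the PR (per the doc comment on DiskThresholdRatio above and the NewReadWriter change further down), here is a simplified standalone sketch. It is not the apm-server implementation, the fallback constant's value changed during the PR (see the "Revert fallback to 3GB" commit), and a later commit ("Enforce only 1 of storage limit or disk usage threshold") changes the behaviour so that a non-zero storage_limit disables the disk usage threshold entirely:

```go
package main

import "fmt"

// fallbackDBLimit mirrors the idea of dbStorageLimitFallback; treat the value as illustrative.
const fallbackDBLimit = 3 << 30

// resolveLimits sketches, in simplified form, the limits NewReadWriter enforces
// at this revision. A returned 0 disables the corresponding check.
func resolveLimits(storageLimit uint64, diskThresholdRatio float64, diskTotal uint64, diskStatsFailed bool) (dbLimit, maxDiskUsed uint64) {
	dbLimit = storageLimit
	if diskStatsFailed && storageLimit == 0 && diskThresholdRatio > 0 {
		// The ratio cannot be enforced without filesystem stats,
		// so fall back to an absolute database size cap.
		dbLimit = fallbackDBLimit
	}
	if !diskStatsFailed {
		// 0.9 means writes stop once 90% of the disk is used.
		maxDiskUsed = uint64(float64(diskTotal) * diskThresholdRatio)
	}
	return dbLimit, maxDiskUsed
}

func main() {
	// New defaults: storage_limit unset (0) and a 0.9 disk threshold ratio.
	dbLimit, maxDiskUsed := resolveLimits(0, 0.9, 100<<30, false)
	fmt.Println(dbLimit, maxDiskUsed) // 0 (no db cap) and 90% of the 100GiB disk

	// Setting both to 0 keeps everything unlimited, even if disk stats fail.
	fmt.Println(resolveLimits(0, 0, 100<<30, true)) // 0 0
}
```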
2 changes: 1 addition & 1 deletion x-pack/apm-server/main.go
@@ -147,7 +147,7 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er
},
StorageConfig: sampling.StorageConfig{
DB: db,
Storage: db.NewReadWriter(tailSamplingConfig.StorageLimitParsed),
Storage: db.NewReadWriter(tailSamplingConfig.StorageLimitParsed, tailSamplingConfig.DiskThresholdRatio),
TTL: tailSamplingConfig.TTL,
DiscardOnWriteFailure: tailSamplingConfig.DiscardOnWriteFailure,
},
105 changes: 95 additions & 10 deletions x-pack/apm-server/sampling/eventstorage/storage_manager.go
@@ -16,6 +16,7 @@ import (
"time"

"github.com/cockroachdb/pebble/v2"
"github.com/cockroachdb/pebble/v2/vfs"
"go.opentelemetry.io/otel/metric"
"golang.org/x/sync/errgroup"

@@ -46,6 +47,10 @@

// diskUsageFetchInterval is how often disk usage is fetched which is equivalent to how long disk usage is cached.
diskUsageFetchInterval = 1 * time.Second

// dbStorageLimitFallback is the default fallback storage limit in bytes
// that applies when disk threshold cannot be enforced due to an error.
dbStorageLimitFallback = 5 << 30
)

type StorageManagerOptions func(*StorageManager)
@@ -62,6 +67,19 @@ func WithMeterProvider(mp metric.MeterProvider) StorageManagerOptions {
}
}

// WithGetDiskUsage configures getDiskUsage function used by StorageManager.
// For testing only.
func WithGetDiskUsage(getDiskUsage func() (DiskUsage, error)) StorageManagerOptions {
return func(sm *StorageManager) {
sm.getDiskUsage = getDiskUsage
}
}

// DiskUsage is the struct returned by getDiskUsage.
type DiskUsage struct {
UsedBytes, TotalBytes uint64
}

// StorageManager encapsulates pebble.DB.
// It assumes exclusive access to pebble DB at storageDir.
type StorageManager struct {
@@ -83,6 +101,13 @@ type StorageManager struct {
// cachedDBSize is a cached result of db size.
cachedDBSize atomic.Uint64

getDiskUsage func() (DiskUsage, error)
Contributor:

Can we call it fetchDiskUsage, to support the distinction between why getDiskUsage() vs. diskStat is needed?

@carsonip (Member, Author):

Should have called it cachedDiskStat and renamed diskStatFailed to getDiskUsageFailed. Keeping the call getDiskUsage, as the pebble function is called GetDiskUsage.

// diskStat is disk usage statistics about the disk only, not related to the databases.
diskStat struct {
used, total atomic.Uint64
}
diskStatFailed atomic.Bool

// runCh acts as a mutex to ensure only 1 Run is actively running per StorageManager.
// as it is possible that 2 separate Run are created by 2 TBS processors during a hot reload.
runCh chan struct{}
@@ -100,6 +125,13 @@ func NewStorageManager(storageDir string, opts ...StorageManagerOptions) (*Stora
runCh: make(chan struct{}, 1),
logger: logp.NewLogger(logs.Sampling),
codec: ProtobufCodec{},
getDiskUsage: func() (DiskUsage, error) {
usage, err := vfs.Default.GetDiskUsage(storageDir)
return DiskUsage{
UsedBytes: usage.UsedBytes,
TotalBytes: usage.TotalBytes,
}, err
},
}
for _, opt := range opts {
opt(sm)
@@ -211,6 +243,28 @@ func (sm *StorageManager) dbSize() uint64 {

func (sm *StorageManager) updateDiskUsage() {
sm.cachedDBSize.Store(sm.eventDB.Metrics().DiskSpaceUsage() + sm.decisionDB.Metrics().DiskSpaceUsage())

if sm.diskStatFailed.Load() {
// Skip GetDiskUsage under the assumption that
// it will always get the same error if GetDiskUsage ever returns one,
// such that it does not keep logging GetDiskUsage errors.
return
}
usage, err := sm.getDiskUsage()
if err != nil {
sm.logger.With(logp.Error(err)).Warn("failed to get disk usage")
sm.diskStatFailed.Store(true)
sm.diskStat.total.Store(0) // setting total to 0 to disable any running disk threshold checks
return
}
sm.diskStat.used.Store(usage.UsedBytes)
sm.diskStat.total.Store(usage.TotalBytes)
}

// diskUsed returns the actual used disk space in bytes.
// Not to be confused with dbSize which is specific to database.
func (sm *StorageManager) diskUsed() uint64 {
return sm.diskStat.used.Load()
}

// runDiskUsageLoop runs a loop that updates cached disk usage regularly.
@@ -341,30 +395,61 @@ func (sm *StorageManager) WriteSubscriberPosition(data []byte) error {
// NewUnlimitedReadWriter returns a read writer with no storage limit.
// For testing only.
func (sm *StorageManager) NewUnlimitedReadWriter() StorageLimitReadWriter {
return sm.NewReadWriter(0)
return sm.NewReadWriter(0, 0)
}

// NewReadWriter returns a read writer with storage limit.
func (sm *StorageManager) NewReadWriter(storageLimit uint64) StorageLimitReadWriter {
// NewReadWriter returns a read writer configured with storage limit and disk threshold ratio.
func (sm *StorageManager) NewReadWriter(storageLimit uint64, diskThresholdRatio float64) StorageLimitReadWriter {
splitRW := SplitReadWriter{
eventRW: sm.eventStorage.NewReadWriter(),
decisionRW: sm.decisionStorage.NewReadWriter(),
}

dbStorageLimit := func() uint64 {
return storageLimit
}
if storageLimit == 0 {
sm.logger.Infof("setting database storage limit to unlimited")
var dbStorageLimit func() uint64
if sm.diskStatFailed.Load() && storageLimit == 0 && diskThresholdRatio > 0 {
Contributor:

This section feels a bit confusing: what if the disk usage check hasn't run yet and fails after NewReadWriter is called?

How about adding the disk usage fallback in getDiskUsage itself and removing the error return? That way the method has to apply the fallback by definition.

@carsonip (Member, Author):

getDiskUsage is called on StorageManager initialization, the first time a TBS processor is created, and before any processor is Run. I can add a comment to make it clear.

Contributor:

Hmm, I see. It is still a bit complicated to maintain; what do you think about refactoring that bit to ensure the fallback is encapsulated within the getDiskUsage method? Can be a follow-up PR.

@carsonip (Member, Author):

++ on refactoring in a follow-up PR when my head is clearer.

getDiskUsage is designed to mock pebble GetDiskUsage without modifying internal state. cachedDiskStat, getDiskUsageFailed, and cachedDBSize are all refreshed regularly throughout the lifetime of StorageManager and should be seen as a (cached) source of truth. Therefore, the use of getDiskUsageFailed here is almost like calling getDiskUsage within NewReadWriter and handling its error.

dbStorageLimit = func() uint64 {
return dbStorageLimitFallback
}
sm.logger.Warnf("failed to get disk usage; overriding storage_limit to fallback default %.1fgb", float64(dbStorageLimitFallback))
} else {
sm.logger.Infof("setting database storage limit to %.1fgb", float64(storageLimit))
dbStorageLimit = func() uint64 {
return storageLimit
}
if storageLimit == 0 {
sm.logger.Infof("setting database storage limit to unlimited")
} else {
sm.logger.Infof("setting database storage limit to %.1fgb", float64(storageLimit))
}
}

// To limit db size to storage_limit
dbStorageLimitChecker := NewStorageLimitCheckerFunc(sm.dbSize, dbStorageLimit)
dbStorageLimitRW := NewStorageLimitReadWriter("database storage limit", dbStorageLimitChecker, splitRW)

return dbStorageLimitRW
// diskThreshold returns max used disk space in bytes.
// After which, writes should be rejected.
var diskThreshold func() uint64
if sm.diskStatFailed.Load() {
diskThreshold = func() uint64 {
return 0 // unlimited
}
sm.logger.Warnf("failed to get disk usage; disabling disk threshold check")
} else {
diskThreshold = func() uint64 {
return uint64(float64(sm.diskStat.total.Load()) * diskThresholdRatio)
}
sm.logger.Infof("setting disk threshold ratio to %.2f", diskThresholdRatio)
}

// To limit actual disk usage percentage to diskThresholdRatio
diskThresholdChecker := NewStorageLimitCheckerFunc(sm.diskUsed, diskThreshold)
diskThresholdRW := NewStorageLimitReadWriter(
fmt.Sprintf("disk usage ratio exceeding threshold of %.2f", diskThresholdRatio),
@lahsivjar (Contributor), Jan 31, 2025:

How is this used? The method definition for NewStorageLimitReadWriter says that this is a name parameter (ref).

@carsonip (Member, Author):

It first started as the name of the StorageLimitReadWriter, but now I'm using it more like a description. It is prepended to the error. While changing it to "disk usage threshold" would make it a name again, including the value seems to aid understanding, because the current: X, limit: Y values in the error message are absolute and not very easy to interpret: disk usage ratio exceeding threshold of 0.01: configured storage limit reached (current: 38973440, limit 21474836)

Contributor:

Hmm, tbh, it feels a bit of a smell. Maybe we can refactor it later to abstract the error outside StorageLimitReadWriter and remove the name.

Contributor:

No need to do this now; we can do this in a future PR.

@carsonip (Member, Author):

Updated it slightly to make the passed-in argument more like a name. Abstracting the error outside StorageLimitReadWriter to create a DiskThresholdReadWriter is fine, but they would look too similar apart from the error / name.

diskThresholdChecker,
dbStorageLimitRW,
)

return diskThresholdRW
}

// wrapNonNilErr only wraps an error with format if the error is not nil.
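The read writer returned by NewReadWriter above layers two checks of the same shape: the cached database size against storage_limit, and the cached used disk bytes against total * disk_threshold_ratio. The sketch below is a self-contained illustration of that layering using hypothetical types; the real NewStorageLimitCheckerFunc and NewStorageLimitReadWriter live in this package and are not shown in the diff, so only the overall idea is represented here:

```go
package main

import "fmt"

// limitCheck reports an error once current() reaches limit(); a limit of 0
// disables the check, matching the convention used in the diff above.
type limitCheck struct {
	name    string
	current func() uint64 // e.g. cached db size, or cached used disk bytes
	limit   func() uint64
}

func (c limitCheck) check() error {
	limit := c.limit()
	if limit == 0 {
		return nil
	}
	if cur := c.current(); cur >= limit {
		// Roughly the shape of the error quoted in the review thread:
		// "<name>: configured storage limit reached (current: X, limit Y)".
		return fmt.Errorf("%s: limit reached (current: %d, limit %d)", c.name, cur, limit)
	}
	return nil
}

func main() {
	var dbSize uint64 = 2 << 30      // cached database size
	var diskUsed uint64 = 95 << 30   // cached used bytes on the volume
	var diskTotal uint64 = 100 << 30 // cached volume size
	const diskThresholdRatio = 0.9

	checks := []limitCheck{
		// storage_limit defaults to 0 after this PR, so this check is a no-op.
		{"database storage limit", func() uint64 { return dbSize }, func() uint64 { return 0 }},
		// The disk usage check rejects writes once 90% of the disk is used.
		{"disk usage threshold", func() uint64 { return diskUsed }, func() uint64 {
			return uint64(float64(diskTotal) * diskThresholdRatio)
		}},
	}
	for _, c := range checks {
		if err := c.check(); err != nil {
			fmt.Println("write rejected:", err)
		}
	}
}
```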