Skip to content

Commit

Permalink
TBS: Add sampling.tail.discard_on_write_failure config (#15159) (#15172)
Browse files Browse the repository at this point in the history

Add config sampling.tail.discard_on_write_failure (default=false) for users to opt in to data loss when TBS storage limit is reached, as an escape hatch when increased ES indexing load is unacceptable.

(cherry picked from commit 9b40574)

Co-authored-by: Carson Ip <[email protected]>
  • Loading branch information
mergify[bot] and carsonip authored Jan 7, 2025
1 parent 23af97b commit 4177fba
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 18 deletions.
2 changes: 2 additions & 0 deletions internal/beater/config/sampling.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type TailSamplingConfig struct {
TTL time.Duration `config:"ttl" validate:"min=1s"`
StorageLimit string `config:"storage_limit"`
StorageLimitParsed uint64
DiscardOnWriteFailure bool `config:"discard_on_write_failure"`

esConfigured bool
}
Expand Down Expand Up @@ -153,6 +154,7 @@ func defaultTailSamplingConfig() TailSamplingConfig {
StorageGCInterval: 5 * time.Minute,
TTL: 30 * time.Minute,
StorageLimit: "3GB",
DiscardOnWriteFailure: false,
}
parsed, err := humanize.ParseBytes(cfg.StorageLimit)
if err != nil {
Expand Down
13 changes: 7 additions & 6 deletions x-pack/apm-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,13 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er
UUID: samplerUUID.String(),
},
StorageConfig: sampling.StorageConfig{
DB: badgerDB,
Storage: readWriter,
StorageDir: storageDir,
StorageGCInterval: tailSamplingConfig.StorageGCInterval,
StorageLimit: tailSamplingConfig.StorageLimitParsed,
TTL: tailSamplingConfig.TTL,
DB: badgerDB,
Storage: readWriter,
StorageDir: storageDir,
StorageGCInterval: tailSamplingConfig.StorageGCInterval,
StorageLimit: tailSamplingConfig.StorageLimitParsed,
TTL: tailSamplingConfig.TTL,
DiscardOnWriteFailure: tailSamplingConfig.DiscardOnWriteFailure,
},
})
}
Expand Down
7 changes: 6 additions & 1 deletion x-pack/apm-server/sampling/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ type StorageConfig struct {
//
// Storage lives outside processor lifecycle and will not be closed when processor
// is closed
Storage *eventstorage.ManagedReadWriter
Storage rw

// StorageDir holds the directory in which event storage will be maintained.
StorageDir string
Expand All @@ -118,6 +118,11 @@ type StorageConfig struct {
// TTL holds the amount of time before events and sampling decisions
// are expired from local storage.
TTL time.Duration

// DiscardOnWriteFailure defines indexing behavior when event storage write fails, e.g. when storage limit is reached.
// When set to false, TBS indexes all traces, and may significantly increase indexing load.
// When set to true, there will be data loss, resulting in broken traces.
DiscardOnWriteFailure bool
}

// Policy holds a tail-sampling policy: criteria for matching root transactions,
Expand Down
15 changes: 4 additions & 11 deletions x-pack/apm-server/sampling/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ type Processor struct {
stopMu sync.Mutex
stopping chan struct{}
stopped chan struct{}

indexOnWriteFailure bool
}

type eventMetrics struct {
Expand Down Expand Up @@ -76,11 +74,6 @@ func NewProcessor(config Config) (*Processor, error) {
eventMetrics: &eventMetrics{},
stopping: make(chan struct{}),
stopped: make(chan struct{}),
// NOTE(marclop) This behavior should be configurable so users who
// rely on tail sampling for cost cutting, can discard events once
// the disk is full.
// Index all traces when the storage limit is reached.
indexOnWriteFailure: true,
}
return p, nil
}
Expand Down Expand Up @@ -154,12 +147,12 @@ func (p *Processor) ProcessBatch(ctx context.Context, batch *modelpb.Batch) erro
if err != nil {
failed = true
stored = false
if p.indexOnWriteFailure {
report = true
p.rateLimitedLogger.Info("processing trace failed, indexing by default")
} else {
if p.config.DiscardOnWriteFailure {
report = false
p.rateLimitedLogger.Info("processing trace failed, discarding by default")
} else {
report = true
p.rateLimitedLogger.Info("processing trace failed, indexing by default")
}
}

Expand Down
63 changes: 63 additions & 0 deletions x-pack/apm-server/sampling/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,69 @@ func TestProcessRemoteTailSampling(t *testing.T) {
assert.Empty(t, batch)
}

// errorRW is a test stub for the tail-sampling storage read/writer: every
// read, write, delete and flush call fails with the configured err, while
// IsTraceSampled reports eventstorage.ErrNotFound (no decision recorded).
// It is used to exercise the DiscardOnWriteFailure behavior of the processor.
type errorRW struct {
	err error
}

// ReadTraceEvents always fails with the stub's configured error.
func (rw errorRW) ReadTraceEvents(traceID string, out *modelpb.Batch) error {
	return rw.err
}

// WriteTraceEvent always fails with the stub's configured error, simulating
// e.g. a storage-limit-reached write failure.
func (rw errorRW) WriteTraceEvent(traceID, id string, event *modelpb.APMEvent, opts eventstorage.WriterOpts) error {
	return rw.err
}

// WriteTraceSampled always fails with the stub's configured error.
func (rw errorRW) WriteTraceSampled(traceID string, sampled bool, opts eventstorage.WriterOpts) error {
	return rw.err
}

// IsTraceSampled reports that no sampling decision is stored yet
// (eventstorage.ErrNotFound), so the processor goes on to attempt the
// (failing) writes rather than short-circuiting.
func (rw errorRW) IsTraceSampled(traceID string) (bool, error) {
	return false, eventstorage.ErrNotFound
}

// DeleteTraceEvent always fails with the stub's configured error.
func (rw errorRW) DeleteTraceEvent(traceID, id string) error {
	return rw.err
}

// Flush always fails with the stub's configured error.
func (rw errorRW) Flush() error {
	return rw.err
}

// TestProcessDiscardOnWriteFailure checks ProcessBatch behavior when event
// storage writes fail (the errorRW stub fails every write): with
// DiscardOnWriteFailure=true the event is dropped from the batch, with
// false it is kept in the batch so it gets indexed.
func TestProcessDiscardOnWriteFailure(t *testing.T) {
	for _, discard := range []bool{true, false} {
		t.Run(fmt.Sprintf("discard=%v", discard), func(t *testing.T) {
			cfg := newTempdirConfig(t)
			cfg.DiscardOnWriteFailure = discard
			cfg.Storage = errorRW{err: errors.New("boom")}
			p, err := sampling.NewProcessor(cfg)
			require.NoError(t, err)
			go p.Run()
			defer p.Stop(context.Background())

			batch := modelpb.Batch{{
				Trace: &modelpb.Trace{
					Id: "0102030405060708090a0b0c0d0e0f10",
				},
				Span: &modelpb.Span{
					Type: "type",
					Id:   "0102030405060708",
				},
			}}
			// Keep a second slice header over the same backing array so the
			// original contents can be compared after ProcessBatch mutates
			// the batch in place.
			original := batch[:]
			err = p.ProcessBatch(context.Background(), &batch)
			require.NoError(t, err)

			if !discard {
				// Write failure falls back to indexing: event remains.
				assert.Equal(t, original, batch)
				return
			}
			// Write failure discards: event removed from the batch.
			assert.Empty(t, batch)
		})
	}
}

func TestGroupsMonitoring(t *testing.T) {
config := newTempdirConfig(t)
config.MaxDynamicServices = 5
Expand Down

0 comments on commit 4177fba

Please sign in to comment.