Skip to content

Commit

Permalink
TBS: Replace badger with pebble (#15235)
Browse files Browse the repository at this point in the history
This PR replaces badger with pebble as the database for tail-based sampling, yielding significant performance gains.

The database of choice is Pebble. Pebble does not have TTL handling built-in,
so we implement our own TTL handling on top of the database:
- TTL is divided up into N parts, where N is partitionsPerTTL.
- A database holds N + 1 + 1 partitions.
- Every TTL/N we will discard the oldest partition, so we keep a rolling window of N+1 partitions.
- Writes will go to the most recent partition, and we'll read across N+1 partitions
  • Loading branch information
carsonip authored Jan 29, 2025
1 parent 42e0cbf commit 0ca58b8
Show file tree
Hide file tree
Showing 35 changed files with 2,048 additions and 2,325 deletions.
673 changes: 383 additions & 290 deletions NOTICE.txt

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ https://github.com/elastic/apm-server/compare/8.16\...8.x[View commits]

[float]
==== Added
- Tail-based sampling: Storage layer is rewritten to use Pebble database instead of BadgerDB. The new implementation offers a substantial throughput increase while consuming significantly less memory. Disk usage is lower and more stable. See PR for benchmark details. {pull}15235[15235]
10 changes: 4 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.23.0
require (
github.com/KimMachineGun/automemlimit v0.7.0
github.com/cespare/xxhash/v2 v2.3.0
github.com/dgraph-io/badger/v2 v2.2007.4
github.com/cockroachdb/pebble/v2 v2.0.2
github.com/dustin/go-humanize v1.0.1
github.com/elastic/apm-aggregation v1.2.0
github.com/elastic/apm-data v1.16.0
Expand Down Expand Up @@ -60,20 +60,18 @@ require (
require (
github.com/DataDog/zstd v1.5.6 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/OneOfOne/xxhash v1.2.8 // indirect
github.com/armon/go-radix v1.0.0 // indirect
github.com/axiomhq/hyperloglog v0.2.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cockroachdb/crlib v0.0.0-20241015224233-894974b3ad94 // indirect
github.com/cockroachdb/errors v1.11.3 // indirect
github.com/cockroachdb/fifo v0.0.0-20240816210425-c5d0cb0b6fc0 // indirect
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
github.com/cockroachdb/pebble v1.1.2 // indirect
github.com/cockroachdb/redact v1.1.5 // indirect
github.com/cockroachdb/swiss v0.0.0-20240612210725-f4de07ae6964 // indirect
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgraph-io/ristretto v0.2.0 // indirect
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect
github.com/dgryski/go-metro v0.0.0-20211217172704-adc40b04c140 // indirect
github.com/dlclark/regexp2 v1.8.1 // indirect
github.com/docker/go-connections v0.5.0 // indirect
Expand All @@ -97,7 +95,7 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/golang/snappy v0.0.5-0.20231225225746-43d5d4cd4e0e // indirect
github.com/gomodule/redigo v1.8.9 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
Expand Down
65 changes: 16 additions & 49 deletions go.sum

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions internal/beater/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,6 @@ func TestUnpackConfig(t *testing.T) {
ESConfig: elasticsearch.DefaultConfig(),
Interval: 1 * time.Minute,
IngestRateDecayFactor: 0.25,
StorageGCInterval: 5 * time.Minute,
StorageLimit: "3GB",
StorageLimitParsed: 3000000000,
TTL: 30 * time.Minute,
Expand Down Expand Up @@ -494,7 +493,6 @@ func TestUnpackConfig(t *testing.T) {
ESConfig: elasticsearch.DefaultConfig(),
Interval: 2 * time.Minute,
IngestRateDecayFactor: 1.0,
StorageGCInterval: 5 * time.Minute,
StorageLimit: "1GB",
StorageLimitParsed: 1000000000,
TTL: 30 * time.Minute,
Expand Down
2 changes: 0 additions & 2 deletions internal/beater/config/sampling.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type TailSamplingConfig struct {
ESConfig *elasticsearch.Config `config:"elasticsearch"`
Interval time.Duration `config:"interval" validate:"min=1s"`
IngestRateDecayFactor float64 `config:"ingest_rate_decay" validate:"min=0, max=1"`
StorageGCInterval time.Duration `config:"storage_gc_interval" validate:"min=1s"`
TTL time.Duration `config:"ttl" validate:"min=1s"`
StorageLimit string `config:"storage_limit"`
StorageLimitParsed uint64
Expand Down Expand Up @@ -151,7 +150,6 @@ func defaultTailSamplingConfig() TailSamplingConfig {
ESConfig: elasticsearch.DefaultConfig(),
Interval: 1 * time.Minute,
IngestRateDecayFactor: 0.25,
StorageGCInterval: 5 * time.Minute,
TTL: 30 * time.Minute,
StorageLimit: "3GB",
DiscardOnWriteFailure: false,
Expand Down
32 changes: 24 additions & 8 deletions internal/beater/monitoringtest/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,29 +26,39 @@ import (
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func ExpectOtelMetrics(t *testing.T, reader sdkmetric.Reader, expectedMetrics map[string]interface{}) {
assertOtelMetrics(t, reader, expectedMetrics, true)
// ExpectOtelMetrics collects metrics from reader and asserts an exact match
// against expectedMetrics: the set of metric names must match exactly, and
// expected data point values are also checked (match=true, matchVal=true).
func ExpectOtelMetrics(t *testing.T, reader sdkmetric.Reader, expectedMetrics map[string]any) {
	assertOtelMetrics(t, reader, expectedMetrics, true, true)
}

func ExpectContainOtelMetrics(t *testing.T, reader sdkmetric.Reader, expectedMetrics map[string]interface{}) {
assertOtelMetrics(t, reader, expectedMetrics, false)
// ExpectContainOtelMetrics collects metrics from reader and asserts that all
// expectedMetrics are present with the expected data point values; metrics
// not listed in expectedMetrics are allowed (match=false, matchVal=true).
func ExpectContainOtelMetrics(t *testing.T, reader sdkmetric.Reader, expectedMetrics map[string]any) {
	assertOtelMetrics(t, reader, expectedMetrics, false, true)
}

func assertOtelMetrics(t *testing.T, reader sdkmetric.Reader, expectedMetrics map[string]interface{}, match bool) {
// ExpectContainOtelMetricsKeys collects metrics from reader and asserts that
// metrics with the given names are present. Data point values are not checked
// (matchVal=false), and metrics beyond the expected keys are allowed
// (match=false).
func ExpectContainOtelMetricsKeys(t *testing.T, reader sdkmetric.Reader, expectedMetricsKeys []string) {
	// Pre-size the map to avoid growth copies; values are nil because only
	// key presence is asserted.
	expectedMetrics := make(map[string]any, len(expectedMetricsKeys))
	for _, metricKey := range expectedMetricsKeys {
		expectedMetrics[metricKey] = nil
	}
	assertOtelMetrics(t, reader, expectedMetrics, false, false)
}

func assertOtelMetrics(t *testing.T, reader sdkmetric.Reader, expectedMetrics map[string]any, match, matchVal bool) {
t.Helper()

var rm metricdata.ResourceMetrics
assert.NoError(t, reader.Collect(context.Background(), &rm))

assert.NotEqual(t, 0, len(rm.ScopeMetrics))
foundMetrics := []string{}
var foundMetrics []string
for _, sm := range rm.ScopeMetrics {

for _, m := range sm.Metrics {
switch d := m.Data.(type) {
case metricdata.Gauge[int64]:
assert.Equal(t, 1, len(d.DataPoints))
foundMetrics = append(foundMetrics, m.Name)
if !matchVal {
continue
}

if v, ok := expectedMetrics[m.Name]; ok {
if dp, ok := v.(int); ok {
Expand All @@ -62,6 +72,9 @@ func assertOtelMetrics(t *testing.T, reader sdkmetric.Reader, expectedMetrics ma
case metricdata.Sum[int64]:
assert.Equal(t, 1, len(d.DataPoints))
foundMetrics = append(foundMetrics, m.Name)
if !matchVal {
continue
}

if v, ok := expectedMetrics[m.Name]; ok {
if dp, ok := v.(int); ok {
Expand All @@ -75,6 +88,9 @@ func assertOtelMetrics(t *testing.T, reader sdkmetric.Reader, expectedMetrics ma
case metricdata.Histogram[int64]:
assert.Equal(t, 1, len(d.DataPoints))
foundMetrics = append(foundMetrics, m.Name)
if !matchVal {
continue
}

if v, ok := expectedMetrics[m.Name]; ok {
if dp, ok := v.(int); ok {
Expand All @@ -89,7 +105,7 @@ func assertOtelMetrics(t *testing.T, reader sdkmetric.Reader, expectedMetrics ma
}
}

expectedMetricsKeys := []string{}
var expectedMetricsKeys []string
for k := range expectedMetrics {
expectedMetricsKeys = append(expectedMetricsKeys, k)
}
Expand Down
81 changes: 27 additions & 54 deletions x-pack/apm-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,9 @@ const (
)

var (
// badgerDB holds the badger database to use when tail-based sampling is configured.
badgerMu sync.Mutex
badgerDB *eventstorage.StorageManager
badgerDBMetricRegistration metric.Registration

storageMu sync.Mutex
storage *eventstorage.ManagedReadWriter
// db holds the database to use when tail-based sampling is configured.
dbMu sync.Mutex
db *eventstorage.StorageManager

// samplerUUID is a UUID used to identify sampled trace ID documents
// published by this process.
Expand Down Expand Up @@ -112,11 +108,10 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er
}

storageDir := paths.Resolve(paths.Data, tailSamplingStorageDir)
badgerDB, err = getBadgerDB(storageDir, args.MeterProvider)
db, err := getDB(storageDir, args.MeterProvider)
if err != nil {
return nil, fmt.Errorf("failed to get Badger database: %w", err)
return nil, fmt.Errorf("failed to get tail-sampling database: %w", err)
}
readWriter := getStorage(badgerDB)

policies := make([]sampling.Policy, len(tailSamplingConfig.Policies))
for i, in := range tailSamplingConfig.Policies {
Expand Down Expand Up @@ -151,48 +146,30 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er
UUID: samplerUUID.String(),
},
StorageConfig: sampling.StorageConfig{
DB: badgerDB,
Storage: readWriter,
StorageDir: storageDir,
StorageGCInterval: tailSamplingConfig.StorageGCInterval,
DB: db,
Storage: db.NewReadWriter(),
StorageLimit: tailSamplingConfig.StorageLimitParsed,
TTL: tailSamplingConfig.TTL,
DiscardOnWriteFailure: tailSamplingConfig.DiscardOnWriteFailure,
},
})
}

func getBadgerDB(storageDir string, mp metric.MeterProvider) (*eventstorage.StorageManager, error) {
badgerMu.Lock()
defer badgerMu.Unlock()
if badgerDB == nil {
sm, err := eventstorage.NewStorageManager(storageDir)
func getDB(storageDir string, mp metric.MeterProvider) (*eventstorage.StorageManager, error) {
dbMu.Lock()
defer dbMu.Unlock()
if db == nil {
var opts []eventstorage.StorageManagerOptions
if mp != nil {
opts = append(opts, eventstorage.WithMeterProvider(mp))
}
sm, err := eventstorage.NewStorageManager(storageDir, opts...)
if err != nil {
return nil, err
}
badgerDB = sm

meter := mp.Meter("github.com/elastic/apm-server/x-pack/apm-server")
lsmSizeGauge, _ := meter.Int64ObservableGauge("apm-server.sampling.tail.storage.lsm_size")
valueLogSizeGauge, _ := meter.Int64ObservableGauge("apm-server.sampling.tail.storage.value_log_size")

badgerDBMetricRegistration, _ = meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
lsmSize, valueLogSize := sm.Size()
o.ObserveInt64(lsmSizeGauge, lsmSize)
o.ObserveInt64(valueLogSizeGauge, valueLogSize)
return nil
}, lsmSizeGauge, valueLogSizeGauge)
db = sm
}
return badgerDB, nil
}

func getStorage(sm *eventstorage.StorageManager) *eventstorage.ManagedReadWriter {
storageMu.Lock()
defer storageMu.Unlock()
if storage == nil {
storage = sm.NewReadWriter()
}
return storage
return db, nil
}

// runServerWithProcessors runs the APM Server and the given list of processors.
Expand Down Expand Up @@ -256,25 +233,21 @@ func wrapServer(args beater.ServerParams, runServer beater.RunServerFunc) (beate
return args, wrappedRunServer, nil
}

// closeBadger is called at process exit time to close the badger.DB opened
// closeDB is called at process exit time to close the StorageManager opened
// by the tail-based sampling processor constructor, if any. This is never
// called concurrently with opening badger.DB/accessing the badgerDB global,
// so it does not need to hold badgerMu.
func closeBadger() error {
if badgerDBMetricRegistration != nil {
badgerDBMetricRegistration.Unregister()
badgerDBMetricRegistration = nil
}
if badgerDB != nil {
db := badgerDB
badgerDB = nil
return db.Close()
// called concurrently with opening DB/accessing the db global,
// so it does not need to hold dbMu.
func closeDB() error {
	// Nothing to do if tail-based sampling never opened the database.
	if db == nil {
		return nil
	}
	// Close first, then clear the global so a failed Close is still reported
	// exactly once.
	closeErr := db.Close()
	db = nil
	return closeErr
}

func cleanup() error {
return closeBadger()
return closeDB()
}

func Main() error {
Expand Down
Loading

0 comments on commit 0ca58b8

Please sign in to comment.