Merge branch 'main' into update-9.0.0
carsonip authored Jan 31, 2025
2 parents 79c1dd1 + 28068bd commit 80bbde8
Showing 18 changed files with 147 additions and 79 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/benchmarks.yml
@@ -234,8 +234,8 @@ jobs:
- name: Import GPG key
uses: crazy-max/ghaction-import-gpg@cb9bde2e2525e640591a934b1fd28eef1dcaf5e5 # v6.2.0
with:
gpg_private_key: ${{ secrets.APM_SERVER_RELEASE_GPG_PRIVATE_KEY }}
passphrase: ${{ secrets.APM_SERVER_RELEASE_PASSPHRASE }}
gpg_private_key: ${{ secrets.GPG_PRIVATE_KEY }}
passphrase: ${{ secrets.GPG_PASSPHRASE }}
git_user_signingkey: true
git_commit_gpgsign: true

4 changes: 2 additions & 2 deletions .github/workflows/run-major-release.yml
@@ -85,8 +85,8 @@ jobs:
- name: Import GPG key
uses: crazy-max/ghaction-import-gpg@cb9bde2e2525e640591a934b1fd28eef1dcaf5e5 # v6.2.0
with:
gpg_private_key: ${{ secrets.APM_SERVER_RELEASE_GPG_PRIVATE_KEY }}
passphrase: ${{ secrets.APM_SERVER_RELEASE_PASSPHRASE }}
gpg_private_key: ${{ secrets.GPG_PRIVATE_KEY }}
passphrase: ${{ secrets.GPG_PASSPHRASE }}
git_user_signingkey: true
git_commit_gpgsign: true

4 changes: 2 additions & 2 deletions .github/workflows/run-minor-release.yml
@@ -85,8 +85,8 @@ jobs:
- name: Import GPG key
uses: crazy-max/ghaction-import-gpg@cb9bde2e2525e640591a934b1fd28eef1dcaf5e5 # v6.2.0
with:
gpg_private_key: ${{ secrets.APM_SERVER_RELEASE_GPG_PRIVATE_KEY }}
passphrase: ${{ secrets.APM_SERVER_RELEASE_PASSPHRASE }}
gpg_private_key: ${{ secrets.GPG_PRIVATE_KEY }}
passphrase: ${{ secrets.GPG_PASSPHRASE }}
git_user_signingkey: true
git_commit_gpgsign: true

4 changes: 2 additions & 2 deletions .github/workflows/run-patch-release.yml
@@ -78,8 +78,8 @@ jobs:
- name: Import GPG key
uses: crazy-max/ghaction-import-gpg@cb9bde2e2525e640591a934b1fd28eef1dcaf5e5 # v6.2.0
with:
gpg_private_key: ${{ secrets.APM_SERVER_RELEASE_GPG_PRIVATE_KEY }}
passphrase: ${{ secrets.APM_SERVER_RELEASE_PASSPHRASE }}
gpg_private_key: ${{ secrets.GPG_PRIVATE_KEY }}
passphrase: ${{ secrets.GPG_PASSPHRASE }}
git_user_signingkey: true
git_commit_gpgsign: true

4 changes: 2 additions & 2 deletions NOTICE.txt
@@ -3289,11 +3289,11 @@ Contents of probable licence file $GOMODCACHE/github.com/spf13/cobra@v1.8.1/LICE

--------------------------------------------------------------------------------
Dependency : github.com/spf13/pflag
Version: v1.0.5
Version: v1.0.6
Licence type (autodetected): BSD-3-Clause
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/spf13/pflag@v1.0.5/LICENSE:
Contents of probable licence file $GOMODCACHE/github.com/spf13/pflag@v1.0.6/LICENSE:

Copyright (c) 2012 Alex Ogier. All rights reserved.
Copyright (c) 2012 The Go Authors. All rights reserved.
2 changes: 1 addition & 1 deletion go.mod
@@ -32,7 +32,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/ryanuber/go-glob v1.0.0
github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5
github.com/spf13/pflag v1.0.6
github.com/stretchr/testify v1.10.0
go.elastic.co/apm/module/apmelasticsearch/v2 v2.6.3
go.elastic.co/apm/module/apmgorilla/v2 v2.6.3
3 changes: 2 additions & 1 deletion go.sum
@@ -360,8 +360,9 @@ github.com/shirou/gopsutil/v4 v4.24.9/go.mod h1:3fkaHNeYsUFCGZ8+9vZVWtbyM1k2eRnl
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM=
github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o=
github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
3 changes: 1 addition & 2 deletions x-pack/apm-server/main.go
@@ -147,8 +147,7 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er
},
StorageConfig: sampling.StorageConfig{
DB: db,
Storage: db.NewReadWriter(),
StorageLimit: tailSamplingConfig.StorageLimitParsed,
Storage: db.NewReadWriter(tailSamplingConfig.StorageLimitParsed),
TTL: tailSamplingConfig.TTL,
DiscardOnWriteFailure: tailSamplingConfig.DiscardOnWriteFailure,
},
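
In short, the storage limit moves out of StorageConfig and into the read-writer constructor. A before/after sketch of just this call site, reconstructed from the hunk above (other fields and surrounding code elided):

// Before this commit: the limit was a separate StorageConfig field.
StorageConfig: sampling.StorageConfig{
	DB:           db,
	Storage:      db.NewReadWriter(),
	StorageLimit: tailSamplingConfig.StorageLimitParsed,
	TTL:          tailSamplingConfig.TTL,
},

// After this commit: the limit is passed when creating the read writer.
StorageConfig: sampling.StorageConfig{
	DB:      db,
	Storage: db.NewReadWriter(tailSamplingConfig.StorageLimitParsed),
	TTL:     tailSamplingConfig.TTL,
},
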
6 changes: 3 additions & 3 deletions x-pack/apm-server/sampling/config.go
@@ -108,9 +108,6 @@ type StorageConfig struct {
// Storage is the read writer to DB.
Storage eventstorage.RW

// StorageLimit for the TBS database, in bytes.
StorageLimit uint64

// TTL holds the amount of time before events and sampling decisions
// are expired from local storage.
TTL time.Duration
@@ -238,6 +235,9 @@ func (config StorageConfig) validate() error {
if config.DB == nil {
return errors.New("DB unspecified")
}
if config.Storage == nil {
return errors.New("Storage unspecified")
}
if config.TTL <= 0 {
return errors.New("TTL unspecified or negative")
}
3 changes: 3 additions & 0 deletions x-pack/apm-server/sampling/config_test.go
@@ -73,6 +73,9 @@ func TestNewProcessorConfigInvalid(t *testing.T) {
assertInvalidConfigError("invalid storage config: DB unspecified")
config.DB = &eventstorage.StorageManager{}

assertInvalidConfigError("invalid storage config: Storage unspecified")
config.Storage = &eventstorage.SplitReadWriter{}

assertInvalidConfigError("invalid storage config: TTL unspecified or negative")
config.TTL = 1
}
27 changes: 24 additions & 3 deletions x-pack/apm-server/sampling/eventstorage/rw.go
@@ -14,7 +14,7 @@ import (
var (
// ErrLimitReached is returned by RW methods when storage usage
// is greater than configured limit.
ErrLimitReached = errors.New("configured storage limit reached")
ErrLimitReached = errors.New("configured limit reached")
)

// RW is a read writer interface that has methods to read and write trace event and sampling decisions.
@@ -62,15 +62,36 @@ type storageLimitChecker interface {
StorageLimit() uint64
}

type storageLimitCheckerFunc struct {
diskUsage, storageLimit func() uint64
}

func NewStorageLimitCheckerFunc(diskUsage, storageLimit func() uint64) storageLimitCheckerFunc {
return storageLimitCheckerFunc{
diskUsage: diskUsage,
storageLimit: storageLimit,
}
}

func (f storageLimitCheckerFunc) DiskUsage() uint64 {
return f.diskUsage()
}

func (f storageLimitCheckerFunc) StorageLimit() uint64 {
return f.storageLimit()
}

// StorageLimitReadWriter is a RW that forbids Write* method calls based on disk usage and limit from storageLimitChecker.
// If there is no limit or limit is not reached, method calls are passed through to nextRW.
type StorageLimitReadWriter struct {
name string
checker storageLimitChecker
nextRW RW
}

func NewStorageLimitReadWriter(checker storageLimitChecker, nextRW RW) StorageLimitReadWriter {
func NewStorageLimitReadWriter(name string, checker storageLimitChecker, nextRW RW) StorageLimitReadWriter {
return StorageLimitReadWriter{
name: name,
checker: checker,
nextRW: nextRW,
}
@@ -81,7 +102,7 @@ func (s StorageLimitReadWriter) checkStorageLimit() error {
if limit != 0 { // unlimited storage
usage := s.checker.DiskUsage()
if usage >= limit {
return fmt.Errorf("%w (current: %d, limit %d)", ErrLimitReached, usage, limit)
return fmt.Errorf("%s: %w (current: %d, limit: %d)", s.name, ErrLimitReached, usage, limit)
}
}
return nil
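
Taken together, these additions let any RW be wrapped with a byte limit sourced from arbitrary functions. A hedged sketch of how the two constructors compose; usedBytes and innerRW are hypothetical placeholders, and the name and 1 GiB figure are illustrative only:

// usedBytes reports current disk usage in bytes; innerRW is any eventstorage.RW.
checker := eventstorage.NewStorageLimitCheckerFunc(
	usedBytes,                        // DiskUsage()
	func() uint64 { return 1 << 30 }, // StorageLimit(): 1 GiB; 0 means unlimited
)
limited := eventstorage.NewStorageLimitReadWriter("example storage limit", checker, innerRW)
// Once usedBytes() >= 1<<30, Write* calls on limited return an error wrapping
// ErrLimitReached, e.g. "example storage limit: configured limit reached (current: ..., limit: ...)".
// Below the limit (or with limit 0), calls pass through to innerRW.
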
2 changes: 1 addition & 1 deletion x-pack/apm-server/sampling/eventstorage/rw_test.go
@@ -79,7 +79,7 @@ func TestStorageLimitReadWriter(t *testing.T) {
t.Run(fmt.Sprintf("limit=%d,usage=%d", tt.limit, tt.usage), func(t *testing.T) {
checker := mockChecker{limit: tt.limit, usage: tt.usage}
var callCount int
rw := eventstorage.NewStorageLimitReadWriter(checker, mockRW{
rw := eventstorage.NewStorageLimitReadWriter("foo_storage_limiter", checker, mockRW{
callback: func() {
callCount++
},
10 changes: 5 additions & 5 deletions x-pack/apm-server/sampling/eventstorage/storage_bench_test.go
@@ -18,7 +18,7 @@ import (
func BenchmarkWriteTransaction(b *testing.B) {
test := func(b *testing.B, codec eventstorage.Codec, bigTX bool) {
sm := newStorageManager(b, eventstorage.WithCodec(codec))
readWriter := sm.NewReadWriter()
readWriter := sm.NewUnlimitedReadWriter()

traceID := hex.EncodeToString([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})
transactionID := hex.EncodeToString([]byte{1, 2, 3, 4, 5, 6, 7, 8})
@@ -79,7 +79,7 @@ func BenchmarkReadEvents(b *testing.B) {
for _, count := range counts {
b.Run(fmt.Sprintf("%d events", count), func(b *testing.B) {
sm := newStorageManager(b, eventstorage.WithCodec(codec))
readWriter := sm.NewReadWriter()
readWriter := sm.NewUnlimitedReadWriter()

for i := 0; i < count; i++ {
transactionID := uuid.Must(uuid.NewV4()).String()
@@ -154,7 +154,7 @@ func BenchmarkReadEventsHit(b *testing.B) {
for _, hit := range []bool{false, true} {
b.Run(fmt.Sprintf("hit=%v", hit), func(b *testing.B) {
sm := newStorageManager(b)
readWriter := sm.NewReadWriter()
readWriter := sm.NewUnlimitedReadWriter()

traceIDs := make([]string, b.N)

@@ -185,7 +185,7 @@ }
}
}

readWriter = sm.NewReadWriter()
readWriter = sm.NewUnlimitedReadWriter()

b.ResetTimer()
var batch modelpb.Batch
@@ -224,7 +224,7 @@ func BenchmarkIsTraceSampled(b *testing.B) {

// Test with varying numbers of events in the trace.
sm := newStorageManager(b)
readWriter := sm.NewReadWriter()
readWriter := sm.NewUnlimitedReadWriter()

if err := readWriter.WriteTraceSampled(sampledTraceUUID.String(), true); err != nil {
b.Fatal(err)
54 changes: 34 additions & 20 deletions x-pack/apm-server/sampling/eventstorage/storage_manager.go
@@ -75,15 +75,13 @@ type StorageManager struct {

partitioner *Partitioner

storageLimit atomic.Uint64

codec Codec

// subscriberPosMu protects the subscriber file from concurrent RW.
subscriberPosMu sync.Mutex

// cachedDiskUsage is a cached result of DiskUsage
cachedDiskUsage atomic.Uint64
// cachedDBSize is a cached result of db size.
cachedDBSize atomic.Uint64

// runCh acts as a mutex to ensure only 1 Run is actively running per StorageManager.
// as it is possible that 2 separate Run are created by 2 TBS processors during a hot reload.
@@ -202,17 +200,17 @@ func (sm *StorageManager) Size() (lsm, vlog int64) {
// Also remember to update
// - x-pack/apm-server/sampling/processor.go:CollectMonitoring
// - systemtest/benchtest/expvar/metrics.go
return int64(sm.DiskUsage()), 0
return int64(sm.dbSize()), 0
}

// DiskUsage returns the disk usage of databases in bytes.
func (sm *StorageManager) DiskUsage() uint64 {
// dbSize returns the disk usage of databases in bytes.
func (sm *StorageManager) dbSize() uint64 {
// pebble DiskSpaceUsage overhead is not high, but it adds up when performed per-event.
return sm.cachedDiskUsage.Load()
return sm.cachedDBSize.Load()
}

func (sm *StorageManager) updateDiskUsage() {
sm.cachedDiskUsage.Store(sm.eventDB.Metrics().DiskSpaceUsage() + sm.decisionDB.Metrics().DiskSpaceUsage())
sm.cachedDBSize.Store(sm.eventDB.Metrics().DiskSpaceUsage() + sm.decisionDB.Metrics().DiskSpaceUsage())
}

// runDiskUsageLoop runs a loop that updates cached disk usage regularly.
@@ -229,10 +227,6 @@ func (sm *StorageManager) runDiskUsageLoop(stopping <-chan struct{}) error {
}
}

func (sm *StorageManager) StorageLimit() uint64 {
return sm.storageLimit.Load()
}

func (sm *StorageManager) Flush() error {
return errors.Join(
wrapNonNilErr("event db flush error: %w", sm.eventDB.Flush()),
@@ -260,7 +254,7 @@ func (sm *StorageManager) close() error {

// Reload flushes out pending disk writes to disk by reloading the database.
// For testing only.
// Read writers created prior to Reload cannot be used and will need to be recreated via NewReadWriter.
// Read writers created prior to Reload cannot be used and will need to be recreated via NewUnlimitedReadWriter.
func (sm *StorageManager) Reload() error {
if err := sm.close(); err != nil {
return err
@@ -269,7 +263,7 @@ }
}

// Run has the same lifecycle as the TBS processor as opposed to StorageManager to facilitate EA hot reload.
func (sm *StorageManager) Run(stopping <-chan struct{}, ttl time.Duration, storageLimit uint64) error {
func (sm *StorageManager) Run(stopping <-chan struct{}, ttl time.Duration) error {
select {
case <-stopping:
return nil
@@ -279,8 +273,6 @@ func (sm *StorageManager) Run(stopping <-chan struct{}, ttl time.Duration, stora
<-sm.runCh
}()

sm.storageLimit.Store(storageLimit)

g := errgroup.Group{}
g.Go(func() error {
return sm.runTTLGCLoop(stopping, ttl)
@@ -346,11 +338,33 @@ func (sm *StorageManager) WriteSubscriberPosition(data []byte) error {
return os.WriteFile(filepath.Join(sm.storageDir, subscriberPositionFile), data, 0644)
}

func (sm *StorageManager) NewReadWriter() StorageLimitReadWriter {
return NewStorageLimitReadWriter(sm, SplitReadWriter{
// NewUnlimitedReadWriter returns a read writer with no storage limit.
// For testing only.
func (sm *StorageManager) NewUnlimitedReadWriter() StorageLimitReadWriter {
return sm.NewReadWriter(0)
}

// NewReadWriter returns a read writer with storage limit.
func (sm *StorageManager) NewReadWriter(storageLimit uint64) StorageLimitReadWriter {
splitRW := SplitReadWriter{
eventRW: sm.eventStorage.NewReadWriter(),
decisionRW: sm.decisionStorage.NewReadWriter(),
})
}

dbStorageLimit := func() uint64 {
return storageLimit
}
if storageLimit == 0 {
sm.logger.Infof("setting database storage limit to unlimited")
} else {
sm.logger.Infof("setting database storage limit to %.1fgb", float64(storageLimit))
}

// To limit db size to storage_limit
dbStorageLimitChecker := NewStorageLimitCheckerFunc(sm.dbSize, dbStorageLimit)
dbStorageLimitRW := NewStorageLimitReadWriter("database storage limit", dbStorageLimitChecker, splitRW)

return dbStorageLimitRW
}

// wrapNonNilErr only wraps an error with format if the error is not nil.
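
For reference, a sketch of the resulting StorageManager API as exercised elsewhere in this commit; sm, stopping, ttl, and tailSamplingConfig are placeholders drawn from the hunks above, not a complete program:

// Production path: pass the configured storage limit in bytes; 0 disables the limit
// (logged above as "setting database storage limit to unlimited").
rw := sm.NewReadWriter(tailSamplingConfig.StorageLimitParsed)

// Tests and benchmarks use the unlimited variant, equivalent to NewReadWriter(0).
testRW := sm.NewUnlimitedReadWriter()

// Run no longer takes the storage limit, only the TTL.
go sm.Run(stopping, ttl)
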
@@ -12,12 +12,12 @@ import (
"github.com/stretchr/testify/require"
)

func BenchmarkStorageManager_DiskUsage(b *testing.B) {
func BenchmarkStorageManager_Size(b *testing.B) {
stopping := make(chan struct{})
defer close(stopping)
sm := newStorageManager(b)
go sm.Run(stopping, time.Second, 0)
rw := sm.NewReadWriter()
go sm.Run(stopping, time.Second)
rw := sm.NewUnlimitedReadWriter()
for i := 0; i < 1000; i++ {
traceID := uuid.Must(uuid.NewV4()).String()
txnID := uuid.Must(uuid.NewV4()).String()
@@ -29,7 +29,7 @@ func BenchmarkStorageManager_DiskUsage(b *testing.B) {
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = sm.DiskUsage()
_, _ = sm.Size()
}
b.StopTimer()
}