From 5eafce8ea0834309190f4f81dd33d637b36ebf51 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Tue, 19 May 2020 20:51:10 +0800 Subject: [PATCH] sampling/eventstorage: introduce event storage Introduce a package which provides local event storage, tailored for tail-based sampling. Storage is backed by Badger (github.com/dgraph-io/badger). The Storage type provides two types for reading/writing data: ReadWriter, and ShardedReadWriter. ReadWriter is not safe for concurrent access, while ShardedReadWriter is. ShardedReadWriter provides locked access to one of several ReadWriters, sharding on trace ID. Currently we encode transactions and spans as JSON. We should look into developing a more efficient codec later, using protobuf or similar. --- go.mod | 9 +- go.sum | 17 ++ .../apm-server/sampling/eventstorage/codec.go | 1 + .../sampling/eventstorage/jsoncodec.go | 34 +++ .../sampling/eventstorage/logger.go | 17 ++ .../sampling/eventstorage/sharded.go | 135 +++++++++ .../eventstorage/sharded_bench_test.go | 51 ++++ .../sampling/eventstorage/storage.go | 228 +++++++++++++++ .../eventstorage/storage_bench_test.go | 162 +++++++++++ .../sampling/eventstorage/storage_test.go | 266 ++++++++++++++++++ 10 files changed, 918 insertions(+), 2 deletions(-) create mode 100644 x-pack/apm-server/sampling/eventstorage/codec.go create mode 100644 x-pack/apm-server/sampling/eventstorage/jsoncodec.go create mode 100644 x-pack/apm-server/sampling/eventstorage/logger.go create mode 100644 x-pack/apm-server/sampling/eventstorage/sharded.go create mode 100644 x-pack/apm-server/sampling/eventstorage/sharded_bench_test.go create mode 100644 x-pack/apm-server/sampling/eventstorage/storage.go create mode 100644 x-pack/apm-server/sampling/eventstorage/storage_bench_test.go create mode 100644 x-pack/apm-server/sampling/eventstorage/storage_test.go diff --git a/go.mod b/go.mod index 831fb78f614..5aba41b0211 100644 --- a/go.mod +++ b/go.mod @@ -9,10 +9,12 @@ require ( github.com/cespare/xxhash/v2 
v2.1.1 github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e github.com/client9/misspell v0.3.5-0.20180309020325-c0b55c823952 // indirect + github.com/dgraph-io/badger/v2 v2.0.3 + github.com/dgraph-io/ristretto v0.0.2 // indirect github.com/dlclark/regexp2 v1.2.1 // indirect github.com/dop251/goja v0.0.0-20200824171909-536f9d946569 // indirect github.com/dop251/goja_nodejs v0.0.0-20200811150831-9bc458b4bbeb // indirect - github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4 + github.com/dustin/go-humanize v1.0.0 github.com/elastic/beats/v7 v7.0.0-alpha2.0.20200824113715-49e8024953a4 github.com/elastic/go-elasticsearch/v7 v7.8.0 github.com/elastic/go-elasticsearch/v8 v8.0.0-20200819071622-59b6a186f8dd @@ -28,13 +30,16 @@ require ( github.com/google/addlicense v0.0.0-20190907113143-be125746c2c4 // indirect github.com/google/go-cmp v0.4.0 github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99 + github.com/hashicorp/go-multierror v1.1.0 github.com/hashicorp/golang-lru v0.5.3 github.com/ianlancetaylor/demangle v0.0.0-20200715173712-053cf528c12f // indirect github.com/jaegertracing/jaeger v1.16.0 github.com/jcmturner/gofork v1.0.0 // indirect github.com/josephspurrier/goversioninfo v1.2.0 // indirect + github.com/json-iterator/go v1.1.8 github.com/jstemmer/go-junit-report v0.9.1 github.com/klauspost/compress v1.9.3-0.20191122130757-c099ac9f21dd // indirect + github.com/kr/pretty v0.2.0 // indirect github.com/magefile/mage v1.10.0 github.com/mattn/go-colorable v0.1.7 // indirect github.com/mitchellh/hashstructure v1.0.0 // indirect @@ -60,7 +65,7 @@ require ( go.elastic.co/apm/module/apmgrpc v1.7.0 go.elastic.co/apm/module/apmhttp v1.7.2 go.elastic.co/ecszap v0.2.0 // indirect - go.elastic.co/fastjson v1.1.0 // indirect + go.elastic.co/fastjson v1.1.0 go.uber.org/atomic v1.6.0 go.uber.org/zap v1.15.0 golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a // indirect diff --git a/go.sum b/go.sum index d570968e8a4..8e50fd13d8e 100644 --- a/go.sum +++ 
b/go.sum @@ -63,6 +63,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DATA-DOG/godog v0.7.13/go.mod h1:z2OZ6a3X0/YAKVqLfVzYBwFt3j6uSt3Xrqa7XTtcQE0= github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= +github.com/DataDog/zstd v1.4.1 h1:3oxKN3wbHibqx897utPC2LTQU4J+IHWWJO+glkAkpFM= +github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/Masterminds/semver v1.4.2 h1:WBLTQ37jOCzSLtXNdoo8bNM8876KhNqOKvrlGITgsTc= github.com/Masterminds/semver v1.4.2/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y= github.com/Microsoft/go-winio v0.4.15-0.20190919025122-fc70bd9a86b5 h1:ygIc8M6trr62pF5DucadTWGdEB4mEyvzi0e2nbcmcyA= @@ -70,6 +72,7 @@ github.com/Microsoft/go-winio v0.4.15-0.20190919025122-fc70bd9a86b5/go.mod h1:tT github.com/Microsoft/hcsshim v0.8.7/go.mod h1:OHd7sQqRFrYd3RmSgbgji+ctCwkbq2wbEYNSzOYtcBQ= github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/OneOfOne/xxhash v1.2.5 h1:zl/OfRA6nftbBK9qTohYBJ5xvw6C/oNKizR7cZGl3cI= github.com/OneOfOne/xxhash v1.2.5/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= github.com/OpenPeeDeeP/depguard v1.0.1 h1:VlW4R6jmBIv3/u1JNlawEvJMM4J+dPORPaZasQee8Us= github.com/OpenPeeDeeP/depguard v1.0.1/go.mod h1:xsIw86fROiiwelg+jB2uM9PiKihMMmUx/1V+TNhjQvM= @@ -200,9 +203,16 @@ github.com/davecgh/go-xdr v0.0.0-20161123171359-e6a2ba005892/go.mod h1:CTDl0pzVz github.com/denisenkom/go-mssqldb v0.0.0-20200206145737-bbfc9a55622e/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU= github.com/devigned/tab v0.1.1/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY= github.com/devigned/tab 
v0.1.2-0.20190607222403-0c15cf42f9a2/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY= +github.com/dgraph-io/badger/v2 v2.0.3 h1:inzdf6VF/NZ+tJ8RwwYMjJMvsOALTHYdozn0qSl6XJI= +github.com/dgraph-io/badger/v2 v2.0.3/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM= +github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= +github.com/dgraph-io/ristretto v0.0.2 h1:a5WaUrDa0qm0YrAAS1tUykT5El3kt62KNZZeMxQn3po= +github.com/dgraph-io/ristretto v0.0.2/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgrijalva/jwt-go v3.2.1-0.20190620180102-5e25c22bd5d6+incompatible h1:4jGdduO4ceTJFKf0IhgaB8NJapGqKHwC2b4xQ/cXujM= github.com/dgrijalva/jwt-go v3.2.1-0.20190620180102-5e25c22bd5d6+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= +github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/dgryski/go-sip13 v0.0.0-20190329191031-25c5027a8c7b/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/digitalocean/go-libvirt v0.0.0-20180301200012-6075ea3c39a1/go.mod h1:PRcPVAAma6zcLpFd4GZrjR/MRpood3TamjKI2m/z/Uw= @@ -225,6 +235,8 @@ github.com/dop251/goja_nodejs v0.0.0-20171011081505-adff31b136e6 h1:RrkoB0pT3gnj github.com/dop251/goja_nodejs v0.0.0-20171011081505-adff31b136e6/go.mod h1:hn7BA7c8pLvoGndExHudxTDKZ84Pyvv+90pbBjbTz0Y= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4 h1:qk/FSDDxo05wdJH28W+p5yivv7LuLYLRXPPD8KQCtZs= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= 
+github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= +github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/eapache/go-resiliency v1.1.0 h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU= github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= @@ -557,6 +569,7 @@ github.com/hashicorp/go-immutable-radix v1.1.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjh github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/hashicorp/go-msgpack v0.5.5/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/hashicorp/go-multierror v0.0.0-20161216184304-ed905158d874/go.mod h1:JMRHfdO9jKNzS/+BTlxCjKNQHg/jZAft8U7LloJvN7I= +github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o= github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI= github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA= @@ -652,6 +665,8 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxv github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= +github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= github.com/kr/pty v1.1.8/go.mod 
h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= @@ -893,6 +908,7 @@ github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4k github.com/sourcegraph/go-diff v0.5.1 h1:gO6i5zugwzo1RVTvgvfwCOSVegNuvnNi6bAD1QCmkHs= github.com/sourcegraph/go-diff v0.5.1/go.mod h1:j2dHj3m8aZgQO8lMTcTnBcXkRRRqi34cd2MNlA9u1mE= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= github.com/spf13/afero v1.2.2 h1:5jhuqJyZCZf2JRofRvN/nIFgIWNzPa3/Vz8mYylgbWc= @@ -1158,6 +1174,7 @@ golang.org/x/sys v0.0.0-20190529164535-6a60838ec259/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190616124812-15dcb6c0061f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/x-pack/apm-server/sampling/eventstorage/codec.go b/x-pack/apm-server/sampling/eventstorage/codec.go new file mode 100644 index 00000000000..fb541aeeac5 --- /dev/null +++ b/x-pack/apm-server/sampling/eventstorage/codec.go @@ -0,0 +1 @@ +package eventstorage diff --git a/x-pack/apm-server/sampling/eventstorage/jsoncodec.go 
b/x-pack/apm-server/sampling/eventstorage/jsoncodec.go new file mode 100644 index 00000000000..84fb383e0b1 --- /dev/null +++ b/x-pack/apm-server/sampling/eventstorage/jsoncodec.go @@ -0,0 +1,34 @@ +package eventstorage + +import ( + "encoding/json" + + // NOTE(axw) encoding/json is faster for encoding, + // json-iterator is faster for decoding. + jsoniter "github.com/json-iterator/go" + + "github.com/elastic/apm-server/model" +) + +// JSONCodec is an implementation of Codec, using JSON encoding. +type JSONCodec struct{} + +// DecodeSpan decodes data as JSON into span. +func (JSONCodec) DecodeSpan(data []byte, span *model.Span) error { + return jsoniter.ConfigFastest.Unmarshal(data, span) +} + +// DecodeTransaction decodes data as JSON into tx. +func (JSONCodec) DecodeTransaction(data []byte, tx *model.Transaction) error { + return jsoniter.ConfigFastest.Unmarshal(data, tx) +} + +// EncodeSpan encodes span as JSON. +func (JSONCodec) EncodeSpan(span *model.Span) ([]byte, error) { + return json.Marshal(span) +} + +// EncodeTransaction encodes tx as JSON. +func (JSONCodec) EncodeTransaction(tx *model.Transaction) ([]byte, error) { + return json.Marshal(tx) +} diff --git a/x-pack/apm-server/sampling/eventstorage/logger.go b/x-pack/apm-server/sampling/eventstorage/logger.go new file mode 100644 index 00000000000..bf60f4ebce8 --- /dev/null +++ b/x-pack/apm-server/sampling/eventstorage/logger.go @@ -0,0 +1,17 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package eventstorage + +import "github.com/elastic/beats/v7/libbeat/logp" + +// LogpAdaptor adapts logp.Logger to the badger.Logger interface. +type LogpAdaptor struct { + *logp.Logger +} + +// Warningf adapts badger.Logger.Warningf to logp.Logger.Warnf. 
+func (a LogpAdaptor) Warningf(format string, args ...interface{}) { + a.Warnf(format, args...) +} diff --git a/x-pack/apm-server/sampling/eventstorage/sharded.go b/x-pack/apm-server/sampling/eventstorage/sharded.go new file mode 100644 index 00000000000..c0cb1089a75 --- /dev/null +++ b/x-pack/apm-server/sampling/eventstorage/sharded.go @@ -0,0 +1,135 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package eventstorage + +import ( + "runtime" + "sync" + + "github.com/cespare/xxhash/v2" + "github.com/hashicorp/go-multierror" + + "github.com/elastic/apm-server/model" +) + +// ShardedReadWriter provides sharded, locked, access to a Storage. +// +// ShardedReadWriter shards on trace ID. +type ShardedReadWriter struct { + readWriters []lockedReadWriter +} + +func newShardedReadWriter(storage *Storage) *ShardedReadWriter { + s := &ShardedReadWriter{ + // Create as many ReadWriters as there are CPUs, + // so we can ideally minimise lock contention. + readWriters: make([]lockedReadWriter, runtime.NumCPU()), + } + for i := range s.readWriters { + s.readWriters[i].rw = storage.NewReadWriter() + } + return s +} + +// Close closes all sharded storage readWriters. +func (s *ShardedReadWriter) Close() { + for i := range s.readWriters { + s.readWriters[i].Close() + } +} + +// Flush flushes all sharded storage readWriters. +func (s *ShardedReadWriter) Flush() error { + var result error + for i := range s.readWriters { + if err := s.readWriters[i].Flush(); err != nil { + result = multierror.Append(result, err) + } + } + return result +} + +// ReadEvents calls Writer.ReadEvents, using a sharded, locked, Writer. 
+func (s *ShardedReadWriter) ReadEvents(traceID string, out *model.Batch) error { + return s.getWriter(traceID).ReadEvents(traceID, out) +} + +// WriteTransaction calls Writer.WriteTransaction, using a sharded, locked, Writer. +func (s *ShardedReadWriter) WriteTransaction(tx *model.Transaction) error { + return s.getWriter(tx.TraceID).WriteTransaction(tx) +} + +// WriteSpan calls Writer.WriteSpan, using a sharded, locked, Writer. +func (s *ShardedReadWriter) WriteSpan(span *model.Span) error { + return s.getWriter(span.TraceID).WriteSpan(span) +} + +// WriteTraceSampled calls Writer.WriteTraceSampled, using a sharded, locked, Writer. +func (s *ShardedReadWriter) WriteTraceSampled(traceID string, sampled bool) error { + return s.getWriter(traceID).WriteTraceSampled(traceID, sampled) +} + +// IsTraceSampled calls Writer.IsTraceSampled, using a sharded, locked, Writer. +func (s *ShardedReadWriter) IsTraceSampled(traceID string) (bool, error) { + return s.getWriter(traceID).IsTraceSampled(traceID) +} + +// getWriter returns an event storage writer for the given trace ID. +// +// This method is idempotent, which is necessary to avoid transaction +// conflicts and ensure all events are reported once a sampling decision +// has been recorded. 
+func (s *ShardedReadWriter) getWriter(traceID string) *lockedReadWriter { + var h xxhash.Digest + h.WriteString(traceID) + return &s.readWriters[h.Sum64()%uint64(len(s.readWriters))] +} + +type lockedReadWriter struct { + mu sync.Mutex + rw *ReadWriter +} + +func (rw *lockedReadWriter) Close() { + rw.mu.Lock() + defer rw.mu.Unlock() + rw.rw.Close() +} + +func (rw *lockedReadWriter) Flush() error { + rw.mu.Lock() + defer rw.mu.Unlock() + return rw.rw.Flush() +} + +func (rw *lockedReadWriter) ReadEvents(traceID string, out *model.Batch) error { + rw.mu.Lock() + defer rw.mu.Unlock() + return rw.rw.ReadEvents(traceID, out) +} + +func (rw *lockedReadWriter) WriteTransaction(tx *model.Transaction) error { + rw.mu.Lock() + defer rw.mu.Unlock() + return rw.rw.WriteTransaction(tx) +} + +func (rw *lockedReadWriter) WriteSpan(s *model.Span) error { + rw.mu.Lock() + defer rw.mu.Unlock() + return rw.rw.WriteSpan(s) +} + +func (rw *lockedReadWriter) WriteTraceSampled(traceID string, sampled bool) error { + rw.mu.Lock() + defer rw.mu.Unlock() + return rw.rw.WriteTraceSampled(traceID, sampled) +} + +func (rw *lockedReadWriter) IsTraceSampled(traceID string) (bool, error) { + rw.mu.Lock() + defer rw.mu.Unlock() + return rw.rw.IsTraceSampled(traceID) +} diff --git a/x-pack/apm-server/sampling/eventstorage/sharded_bench_test.go b/x-pack/apm-server/sampling/eventstorage/sharded_bench_test.go new file mode 100644 index 00000000000..94426341532 --- /dev/null +++ b/x-pack/apm-server/sampling/eventstorage/sharded_bench_test.go @@ -0,0 +1,51 @@ +package eventstorage_test + +import ( + "testing" + "time" + + "github.com/gofrs/uuid" + + "github.com/elastic/apm-server/model" + "github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage" +) + +func BenchmarkShardedWriteTransactionUncontended(b *testing.B) { + db := newBadgerDB(b, badgerOptions) + ttl := time.Minute + store := eventstorage.New(db, eventstorage.JSONCodec{}, ttl) + sharded := store.NewShardedReadWriter() + defer 
sharded.Close() + + b.RunParallel(func(pb *testing.PB) { + traceUUID := uuid.Must(uuid.NewV4()) + transaction := &model.Transaction{TraceID: traceUUID.String(), ID: traceUUID.String()} + for pb.Next() { + if err := sharded.WriteTransaction(transaction); err != nil { + b.Fatal(err) + } + } + }) +} + +func BenchmarkShardedWriteTransactionContended(b *testing.B) { + db := newBadgerDB(b, badgerOptions) + ttl := time.Minute + store := eventstorage.New(db, eventstorage.JSONCodec{}, ttl) + sharded := store.NewShardedReadWriter() + defer sharded.Close() + + // Use a single trace ID, causing all events to go through + // the same sharded writer, contending for a single lock. + traceUUID := uuid.Must(uuid.NewV4()) + + b.RunParallel(func(pb *testing.PB) { + transactionUUID := uuid.Must(uuid.NewV4()) + transaction := &model.Transaction{TraceID: traceUUID.String(), ID: transactionUUID.String()} + for pb.Next() { + if err := sharded.WriteTransaction(transaction); err != nil { + b.Fatal(err) + } + } + }) +} diff --git a/x-pack/apm-server/sampling/eventstorage/storage.go b/x-pack/apm-server/sampling/eventstorage/storage.go new file mode 100644 index 00000000000..cb633e4bf45 --- /dev/null +++ b/x-pack/apm-server/sampling/eventstorage/storage.go @@ -0,0 +1,228 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package eventstorage + +import ( + "errors" + "time" + + "github.com/dgraph-io/badger/v2" + + "github.com/elastic/apm-server/model" +) + +// TODO(axw) develop a more efficient codec (protobuf?) for persisting model objects. + +const ( + // NOTE(axw) these values (and their meanings) must remain stable + // over time, to avoid misinterpreting historical data. 
+ entryMetaTraceSampled = 'S' + entryMetaTraceUnsampled = 'U' + entryMetaTransaction = 't' + entryMetaSpan = 's' +) + +// ErrNotFound is returned by the Storage.IsTraceSampled method, +// for non-existing trace IDs. +var ErrNotFound = errors.New("key not found") + +// Storage provides storage for sampled transactions and spans, +// and for recording trace sampling decisions. +type Storage struct { + db *badger.DB + codec Codec + ttl time.Duration +} + +// Codec provides methods for encoding and decoding events. +type Codec interface { + DecodeSpan([]byte, *model.Span) error + DecodeTransaction([]byte, *model.Transaction) error + EncodeSpan(*model.Span) ([]byte, error) + EncodeTransaction(*model.Transaction) ([]byte, error) +} + +// New returns a new Storage using db and codec. +// +// Storage entries expire after ttl. +func New(db *badger.DB, codec Codec, ttl time.Duration) *Storage { + return &Storage{db: db, codec: codec, ttl: ttl} +} + +// NewShardedReadWriter returns a new ShardedReadWriter, for sharded +// reading and writing. +// +// The returned ShardedReadWriter must be closed when it is no longer +// needed. +func (s *Storage) NewShardedReadWriter() *ShardedReadWriter { + return newShardedReadWriter(s) +} + +// NewReadWriter returns a new ReadWriter for reading events from and +// writing events to storage. +// +// The returned ReadWriter must be closed when it is no longer needed. +func (s *Storage) NewReadWriter() *ReadWriter { + return &ReadWriter{ + s: s, + txn: s.db.NewTransaction(true), + } +} + +// ReadWriter provides a means of reading events from storage, and batched +// writing of events to storage. +// +// ReadWriter is not safe for concurrent access. All operations that involve +// a given trace ID should be performed with the same ReadWriter in order to +// avoid conflicts, e.g. by using consistent hashing to distribute to one of +// a set of ReadWriters, such as implemented by ShardedReadWriter. 
+type ReadWriter struct { + s *Storage + txn *badger.Txn + pendingWrites int + + // readKeyBuf is a reusable buffer for keys used in read operations. + // This must not be used in write operations, as keys are expected to + // be unmodified until the end of a transaction. + readKeyBuf []byte +} + +// Close closes the writer. Any writes that have not been flushed may be lost. +// +// This must be called when the writer is no longer needed, in order to reclaim +// resources. +func (rw *ReadWriter) Close() { + rw.txn.Discard() +} + +// Flush waits for preceding writes to be committed to storage. +// +// Flush must be called to ensure writes are committed to storage. +// If Flush is not called before the writer is closed, then writes +// may be lost. +func (rw *ReadWriter) Flush() error { + err := rw.txn.Commit() + rw.txn = rw.s.db.NewTransaction(true) + rw.pendingWrites = 0 + return err +} + +// WriteTraceSampled records the tail-sampling decision for the given trace ID. +func (rw *ReadWriter) WriteTraceSampled(traceID string, sampled bool) error { + key := []byte(traceID) + var meta uint8 = entryMetaTraceUnsampled + if sampled { + meta = entryMetaTraceSampled + } + entry := badger.NewEntry(key[:], nil).WithMeta(meta) + return rw.writeEntry(entry.WithTTL(rw.s.ttl)) +} + +// IsTraceSampled reports whether traceID belongs to a trace that is sampled +// or unsampled. If no sampling decision has been recorded, IsTraceSampled +// returns ErrNotFound. +func (rw *ReadWriter) IsTraceSampled(traceID string) (bool, error) { + rw.readKeyBuf = append(rw.readKeyBuf[:0], traceID...) + item, err := rw.txn.Get(rw.readKeyBuf) + if err != nil { + if err == badger.ErrKeyNotFound { + return false, ErrNotFound + } + return false, err + } + return item.UserMeta() == entryMetaTraceSampled, nil +} + +// WriteTransaction writes tx to storage. +// +// WriteTransaction may return before the write is committed to storage. +// Call Flush to ensure the write is committed. 
+func (rw *ReadWriter) WriteTransaction(tx *model.Transaction) error { + if tx.Sampled != nil && !*tx.Sampled { + return errors.New("transaction is not sampled") + } + key := append(append([]byte(tx.TraceID), ':'), tx.ID...) + data, err := rw.s.codec.EncodeTransaction(tx) + if err != nil { + return err + } + return rw.writeEvent(key[:], data, entryMetaTransaction) +} + +// WriteSpan writes span to storage. +// +// WriteSpan may return before the write is committed to storage. +// Call Flush to ensure the write is committed. +func (rw *ReadWriter) WriteSpan(span *model.Span) error { + key := append(append([]byte(span.TraceID), ':'), span.ID...) + data, err := rw.s.codec.EncodeSpan(span) + if err != nil { + return err + } + return rw.writeEvent(key[:], data, entryMetaSpan) +} + +func (rw *ReadWriter) writeEvent(key, value []byte, meta byte) error { + return rw.writeEntry(badger.NewEntry(key, value).WithMeta(meta).WithTTL(rw.s.ttl)) +} + +func (rw *ReadWriter) writeEntry(e *badger.Entry) error { + rw.pendingWrites++ + err := rw.txn.SetEntry(e) + if err != badger.ErrTxnTooBig { + return err + } + if err := rw.Flush(); err != nil { + return err + } + return rw.txn.SetEntry(e) +} + +// ReadEvents reads events with the given trace ID from storage into a batch. +func (rw *ReadWriter) ReadEvents(traceID string, out *model.Batch) error { + opts := badger.DefaultIteratorOptions + rw.readKeyBuf = append(append(rw.readKeyBuf[:0], traceID...), ':') + opts.Prefix = rw.readKeyBuf + + // NewIterator slows down with uncommitted writes, as it must sort + // all keys lexicographically. If there are a significant number of + // writes pending, flush first. 
+ if rw.pendingWrites > 100 { + if err := rw.Flush(); err != nil { + return err + } + } + + iter := rw.txn.NewIterator(opts) + defer iter.Close() + for iter.Rewind(); iter.Valid(); iter.Next() { + item := iter.Item() + if item.IsDeletedOrExpired() { + continue + } + switch item.UserMeta() { + case entryMetaTransaction: + var event model.Transaction + if err := item.Value(func(data []byte) error { + return rw.s.codec.DecodeTransaction(data, &event) + }); err != nil { + return err + } + out.Transactions = append(out.Transactions, &event) + case entryMetaSpan: + var event model.Span + if err := item.Value(func(data []byte) error { + return rw.s.codec.DecodeSpan(data, &event) + }); err != nil { + return err + } + out.Spans = append(out.Spans, &event) + default: + // Unknown entry meta: ignore. + continue + } + } + return nil +} diff --git a/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go b/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go new file mode 100644 index 00000000000..90a9e60a527 --- /dev/null +++ b/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go @@ -0,0 +1,162 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package eventstorage_test + +import ( + "encoding/hex" + "fmt" + "testing" + "time" + + "github.com/gofrs/uuid" + "github.com/stretchr/testify/assert" + + "github.com/elastic/apm-server/model" + "github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage" +) + +func BenchmarkWriteTransaction(b *testing.B) { + test := func(b *testing.B, codec eventstorage.Codec) { + db := newBadgerDB(b, badgerOptions) + ttl := time.Minute + store := eventstorage.New(db, codec, ttl) + readWriter := store.NewReadWriter() + defer readWriter.Close() + + traceID := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} + transactionID := []byte{1, 2, 3, 4, 5, 6, 7, 8} + transaction := &model.Transaction{ + TraceID: hex.EncodeToString(traceID), + ID: hex.EncodeToString(transactionID), + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + if err := readWriter.WriteTransaction(transaction); err != nil { + b.Fatal(err) + } + } + assert.NoError(b, readWriter.Flush()) + } + b.Run("json_codec", func(b *testing.B) { + test(b, eventstorage.JSONCodec{}) + }) + b.Run("nop_codec", func(b *testing.B) { + // This tests the eventstorage performance without + // JSON encoding. This would be the theoretical + // upper limit of what we can achieve with a more + // efficient codec. + test(b, nopCodec{}) + }) +} + +func BenchmarkReadEvents(b *testing.B) { + traceUUID := uuid.Must(uuid.NewV4()) + + test := func(b *testing.B, codec eventstorage.Codec) { + // Test with varying numbers of events in the trace. 
+ counts := []int{0, 1, 10, 100, 1000} + for _, count := range counts { + b.Run(fmt.Sprintf("%d events", count), func(b *testing.B) { + db := newBadgerDB(b, badgerOptions) + ttl := time.Minute + store := eventstorage.New(db, codec, ttl) + readWriter := store.NewReadWriter() + defer readWriter.Close() + + for i := 0; i < count; i++ { + transactionUUID := uuid.Must(uuid.NewV4()) + transaction := &model.Transaction{ + TraceID: traceUUID.String(), + ID: transactionUUID.String(), + } + if err := readWriter.WriteTransaction(transaction); err != nil { + b.Fatal(err) + } + } + + // NOTE(axw) we don't explicitly flush, which is most representative of + // real workloads. For larger event counts, this ensures we exercise the + // code path which automatically flushes before reads. + + b.ResetTimer() + var batch model.Batch + for i := 0; i < b.N; i++ { + batch.Reset() + if err := readWriter.ReadEvents(traceUUID.String(), &batch); err != nil { + b.Fatal(err) + } + if batch.Len() != count { + panic(fmt.Errorf( + "event count mismatch: expected %d, got %d", + count, batch.Len(), + )) + } + } + }) + } + } + + b.Run("json_codec", func(b *testing.B) { + test(b, eventstorage.JSONCodec{}) + }) + b.Run("nop_codec", func(b *testing.B) { + // This tests the eventstorage performance without + // JSON decoding. This would be the theoretical + // upper limit of what we can achieve with a more + // efficient codec. + test(b, nopCodec{}) + }) +} + +func BenchmarkIsTraceSampled(b *testing.B) { + sampledTraceUUID := uuid.Must(uuid.NewV4()) + unsampledTraceUUID := uuid.Must(uuid.NewV4()) + unknownTraceUUID := uuid.Must(uuid.NewV4()) + + // Test with varying numbers of events in the trace. 
+ db := newBadgerDB(b, badgerOptions) + ttl := time.Minute + store := eventstorage.New(db, eventstorage.JSONCodec{}, ttl) + readWriter := store.NewReadWriter() + defer readWriter.Close() + + if err := readWriter.WriteTraceSampled(sampledTraceUUID.String(), true); err != nil { + b.Fatal(err) + } + if err := readWriter.WriteTraceSampled(unsampledTraceUUID.String(), false); err != nil { + b.Fatal(err) + } + + bench := func(name string, traceID string, expectError bool, expectSampled bool) { + b.Run(name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + sampled, err := readWriter.IsTraceSampled(traceID) + if expectError { + if err == nil { + b.Fatal("expected error") + } + } else { + if err != nil { + b.Fatal(err) + } + if sampled != expectSampled { + b.Fatalf("expected %v, got %v", expectSampled, sampled) + } + } + } + }) + } + bench("sampled", sampledTraceUUID.String(), false, true) + bench("unsampled", unsampledTraceUUID.String(), false, false) + bench("unknown", unknownTraceUUID.String(), true, false) +} + +type nopCodec struct{} + +func (nopCodec) DecodeSpan(data []byte, span *model.Span) error { return nil } +func (nopCodec) DecodeTransaction(data []byte, tx *model.Transaction) error { return nil } +func (nopCodec) EncodeSpan(*model.Span) ([]byte, error) { return nil, nil } +func (nopCodec) EncodeTransaction(*model.Transaction) ([]byte, error) { return nil, nil } diff --git a/x-pack/apm-server/sampling/eventstorage/storage_test.go b/x-pack/apm-server/sampling/eventstorage/storage_test.go new file mode 100644 index 00000000000..947e7ee03e6 --- /dev/null +++ b/x-pack/apm-server/sampling/eventstorage/storage_test.go @@ -0,0 +1,266 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+
+package eventstorage_test
+
+import (
+	"encoding/json"
+	"testing"
+	"time"
+
+	"github.com/dgraph-io/badger/v2"
+	"github.com/gofrs/uuid"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/elastic/apm-server/model"
+	"github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage"
+)
+
+func TestWriteEvents(t *testing.T) {
+	// Run two tests:
+	//  - 1 transaction and 1 span
+	//  - 1 transaction and 100 spans
+	//
+	// The latter test will cause ReadEvents to implicitly call flush.
+	t.Run("no_flush", func(t *testing.T) {
+		testWriteEvents(t, 1)
+	})
+	t.Run("implicit_flush", func(t *testing.T) {
+		testWriteEvents(t, 100)
+	})
+}
+
+func testWriteEvents(t *testing.T, numSpans int) {
+	db := newBadgerDB(t, badgerOptions)
+	ttl := time.Minute
+	store := eventstorage.New(db, eventstorage.JSONCodec{}, ttl)
+	readWriter := store.NewShardedReadWriter()
+	defer readWriter.Close()
+
+	before := time.Now()
+
+	traceUUID := uuid.Must(uuid.NewV4())
+	transactionUUID := uuid.Must(uuid.NewV4())
+	transaction := &model.Transaction{
+		TraceID: traceUUID.String(),
+		ID:      transactionUUID.String(),
+	}
+	assert.NoError(t, readWriter.WriteTransaction(transaction))
+
+	var spans []*model.Span
+	for i := 0; i < numSpans; i++ {
+		spanUUID := uuid.Must(uuid.NewV4())
+		span := &model.Span{
+			TraceID: traceUUID.String(),
+			ID:      spanUUID.String(),
+		}
+		assert.NoError(t, readWriter.WriteSpan(span))
+		spans = append(spans, span)
+	}
+
+	// We can read our writes without flushing.
+	var batch model.Batch
+	assert.NoError(t, readWriter.ReadEvents(traceUUID.String(), &batch))
+	assert.ElementsMatch(t, []*model.Transaction{transaction}, batch.Transactions)
+	assert.ElementsMatch(t, spans, batch.Spans)
+
+	// Flush in order for the writes to be visible to other readers.
+	assert.NoError(t, readWriter.Flush())
+
+	var recorded []interface{}
+	assert.NoError(t, db.View(func(txn *badger.Txn) error {
+		iter := txn.NewIterator(badger.IteratorOptions{
+			Prefix: []byte(traceUUID.String()),
+		})
+		defer iter.Close()
+		for iter.Rewind(); iter.Valid(); iter.Next() {
+			item := iter.Item()
+			expiresAt := item.ExpiresAt()
+			expiryTime := time.Unix(int64(expiresAt), 0)
+			assert.Condition(t, func() bool {
+				return !before.After(expiryTime) && !expiryTime.After(before.Add(ttl))
+			})
+
+			var value interface{}
+			switch meta := item.UserMeta(); meta {
+			case 't':
+				value = &model.Transaction{}
+			case 's':
+				value = &model.Span{}
+			default:
+				t.Fatalf("invalid meta %q", meta)
+			}
+			assert.NoError(t, item.Value(func(data []byte) error {
+				return json.Unmarshal(data, value)
+			}))
+			recorded = append(recorded, value)
+		}
+		return nil
+	}))
+	assert.ElementsMatch(t, batch.Transformables(), recorded)
+}
+
+func TestWriteTraceSampled(t *testing.T) {
+	db := newBadgerDB(t, badgerOptions)
+	ttl := time.Minute
+	store := eventstorage.New(db, eventstorage.JSONCodec{}, ttl)
+	readWriter := store.NewShardedReadWriter()
+	defer readWriter.Close()
+
+	before := time.Now()
+	assert.NoError(t, readWriter.WriteTraceSampled("sampled_trace_id", true))
+	assert.NoError(t, readWriter.WriteTraceSampled("unsampled_trace_id", false))
+
+	// We can read our writes without flushing.
+	isSampled, err := readWriter.IsTraceSampled("sampled_trace_id")
+	assert.NoError(t, err)
+	assert.True(t, isSampled)
+
+	// Flush in order for the writes to be visible to other readers.
+	assert.NoError(t, readWriter.Flush())
+
+	sampled := make(map[string]bool)
+	assert.NoError(t, db.View(func(txn *badger.Txn) error {
+		iter := txn.NewIterator(badger.IteratorOptions{})
+		defer iter.Close()
+		for iter.Rewind(); iter.Valid(); iter.Next() {
+			item := iter.Item()
+			expiresAt := item.ExpiresAt()
+			expiryTime := time.Unix(int64(expiresAt), 0)
+			assert.Condition(t, func() bool {
+				return !before.After(expiryTime) && !expiryTime.After(before.Add(ttl))
+			})
+
+			key := string(item.Key())
+			switch meta := item.UserMeta(); meta {
+			case 'S':
+				sampled[key] = true
+			case 'U':
+				sampled[key] = false
+			default:
+				t.Fatalf("invalid meta %q", meta)
+			}
+			assert.Zero(t, item.ValueSize())
+		}
+		return nil
+	}))
+	assert.Equal(t, map[string]bool{
+		"sampled_trace_id":   true,
+		"unsampled_trace_id": false,
+	}, sampled)
+}
+
+func TestReadEvents(t *testing.T) {
+	db := newBadgerDB(t, badgerOptions)
+	ttl := time.Minute
+	store := eventstorage.New(db, eventstorage.JSONCodec{}, ttl)
+
+	traceID := [...]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
+	require.NoError(t, db.Update(func(txn *badger.Txn) error {
+		key := append(traceID[:], ":12345678"...)
+		value := []byte(`{"name":"transaction"}`)
+		if err := txn.SetEntry(badger.NewEntry(key, value).WithMeta('t')); err != nil {
+			return err
+		}
+
+		key = append(traceID[:], ":87654321"...)
+		value = []byte(`{"name":"span"}`)
+		if err := txn.SetEntry(badger.NewEntry(key, value).WithMeta('s')); err != nil {
+			return err
+		}
+
+		// Write an entry with the trace ID as a prefix, but with no
+		// following colon, causing it to be ignored.
+		key = append(traceID[:], "nocolon"...)
+		value = []byte(`not-json`)
+		if err := txn.SetEntry(badger.NewEntry(key, value).WithMeta('s')); err != nil {
+			return err
+		}
+
+		// Write an entry with an unknown meta value. It will be ignored.
+		key = append(traceID[:], ":11111111"...)
+		value = []byte(`not-json`)
+		if err := txn.SetEntry(badger.NewEntry(key, value).WithMeta('?')); err != nil {
+			return err
+		}
+		return nil
+	}))
+
+	reader := store.NewShardedReadWriter()
+	defer reader.Close()
+
+	var events model.Batch
+	assert.NoError(t, reader.ReadEvents(string(traceID[:]), &events))
+	assert.Equal(t, []*model.Transaction{{Name: "transaction"}}, events.Transactions)
+	assert.Equal(t, []*model.Span{{Name: "span"}}, events.Spans)
+}
+
+func TestReadEventsDecodeError(t *testing.T) {
+	db := newBadgerDB(t, badgerOptions)
+	ttl := time.Minute
+	store := eventstorage.New(db, eventstorage.JSONCodec{}, ttl)
+
+	traceID := [...]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
+	require.NoError(t, db.Update(func(txn *badger.Txn) error {
+		key := append(traceID[:], ":12345678"...)
+		value := []byte(`wat`)
+		if err := txn.SetEntry(badger.NewEntry(key, value).WithMeta('t')); err != nil {
+			return err
+		}
+		return nil
+	}))
+
+	reader := store.NewShardedReadWriter()
+	defer reader.Close()
+
+	var events model.Batch
+	err := reader.ReadEvents(string(traceID[:]), &events)
+	assert.Error(t, err)
+}
+
+func TestIsTraceSampled(t *testing.T) {
+	db := newBadgerDB(t, badgerOptions)
+	ttl := time.Minute
+	store := eventstorage.New(db, eventstorage.JSONCodec{}, ttl)
+
+	require.NoError(t, db.Update(func(txn *badger.Txn) error {
+		if err := txn.SetEntry(badger.NewEntry([]byte("sampled_trace_id"), nil).WithMeta('S')); err != nil {
+			return err
+		}
+		if err := txn.SetEntry(badger.NewEntry([]byte("unsampled_trace_id"), nil).WithMeta('U')); err != nil {
+			return err
+		}
+		return nil
+	}))
+
+	reader := store.NewShardedReadWriter()
+	defer reader.Close()
+
+	sampled, err := reader.IsTraceSampled("sampled_trace_id")
+	assert.NoError(t, err)
+	assert.True(t, sampled)
+
+	sampled, err = reader.IsTraceSampled("unsampled_trace_id")
+	assert.NoError(t, err)
+	assert.False(t, sampled)
+
+	_, err = reader.IsTraceSampled("unknown_trace_id")
+	assert.Equal(t, eventstorage.ErrNotFound, err)
+}
+
+func badgerOptions() badger.Options {
+	return badger.DefaultOptions("").WithInMemory(true).WithLogger(nil)
+}
+
+type badgerOptionsFunc func() badger.Options
+
+// newBadgerDB opens an in-memory Badger database, closed via tb.Cleanup.
+func newBadgerDB(tb testing.TB, badgerOptions badgerOptionsFunc) *badger.DB {
+	db, err := badger.Open(badgerOptions())
+	if err != nil {
+		tb.Fatal(err)
+	}
+	tb.Cleanup(func() { db.Close() })
+	return db
+}