diff --git a/NOTICE.txt b/NOTICE.txt index 3103abb5838..d10bc37cbc2 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -422,7 +422,7 @@ Contents of "LICENSE": -------------------------------------------------------------------- Dependency: github.com/dustin/go-humanize -Revision: bb3d318650d4 +Version: v1.0.0 License type (autodetected): MIT Contents of "LICENSE": diff --git a/go.mod b/go.mod index 40e4854d625..40c4faec44c 100644 --- a/go.mod +++ b/go.mod @@ -9,10 +9,12 @@ require ( github.com/cespare/xxhash/v2 v2.1.1 github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e github.com/client9/misspell v0.3.5-0.20180309020325-c0b55c823952 // indirect + github.com/dgraph-io/badger/v2 v2.0.3 + github.com/dgraph-io/ristretto v0.0.2 // indirect github.com/dlclark/regexp2 v1.2.1 // indirect github.com/dop251/goja v0.0.0-20200824171909-536f9d946569 // indirect github.com/dop251/goja_nodejs v0.0.0-20200811150831-9bc458b4bbeb // indirect - github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4 + github.com/dustin/go-humanize v1.0.0 github.com/elastic/apm-server/approvaltest v0.0.0-00010101000000-000000000000 github.com/elastic/beats/v7 v7.0.0-alpha2.0.20200824113715-49e8024953a4 github.com/elastic/go-elasticsearch/v7 v7.8.0 @@ -21,21 +23,23 @@ require ( github.com/elastic/go-licenser v0.3.1 github.com/elastic/go-sysinfo v1.4.0 // indirect github.com/elastic/go-ucfg v0.8.3 - github.com/fatih/color v1.9.0 + github.com/fatih/color v1.9.0 // indirect github.com/go-sourcemap/sourcemap v2.1.3+incompatible github.com/gofrs/uuid v3.3.0+incompatible github.com/gogo/googleapis v1.3.1-0.20190914144012-b8d18e97a9a1 // indirect github.com/golang/protobuf v1.4.2 github.com/google/addlicense v0.0.0-20190907113143-be125746c2c4 // indirect - github.com/google/go-cmp v0.5.2 github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99 + github.com/hashicorp/go-multierror v1.1.0 github.com/hashicorp/golang-lru v0.5.3 github.com/ianlancetaylor/demangle v0.0.0-20200715173712-053cf528c12f 
// indirect github.com/jaegertracing/jaeger v1.16.0 github.com/jcmturner/gofork v1.0.0 // indirect github.com/josephspurrier/goversioninfo v1.2.0 // indirect + github.com/json-iterator/go v1.1.8 github.com/jstemmer/go-junit-report v0.9.1 github.com/klauspost/compress v1.9.3-0.20191122130757-c099ac9f21dd // indirect + github.com/kr/pretty v0.2.0 // indirect github.com/magefile/mage v1.10.0 github.com/mattn/go-colorable v0.1.7 // indirect github.com/mitchellh/hashstructure v1.0.0 // indirect diff --git a/go.sum b/go.sum index 7a38581ca15..f2e559a2de0 100644 --- a/go.sum +++ b/go.sum @@ -63,6 +63,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DATA-DOG/godog v0.7.13/go.mod h1:z2OZ6a3X0/YAKVqLfVzYBwFt3j6uSt3Xrqa7XTtcQE0= github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= +github.com/DataDog/zstd v1.4.1 h1:3oxKN3wbHibqx897utPC2LTQU4J+IHWWJO+glkAkpFM= +github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/Masterminds/semver v1.4.2 h1:WBLTQ37jOCzSLtXNdoo8bNM8876KhNqOKvrlGITgsTc= github.com/Masterminds/semver v1.4.2/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y= github.com/Microsoft/go-winio v0.4.15-0.20190919025122-fc70bd9a86b5 h1:ygIc8M6trr62pF5DucadTWGdEB4mEyvzi0e2nbcmcyA= @@ -70,6 +72,7 @@ github.com/Microsoft/go-winio v0.4.15-0.20190919025122-fc70bd9a86b5/go.mod h1:tT github.com/Microsoft/hcsshim v0.8.7/go.mod h1:OHd7sQqRFrYd3RmSgbgji+ctCwkbq2wbEYNSzOYtcBQ= github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/OneOfOne/xxhash v1.2.5 h1:zl/OfRA6nftbBK9qTohYBJ5xvw6C/oNKizR7cZGl3cI= github.com/OneOfOne/xxhash v1.2.5/go.mod 
h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= github.com/OpenPeeDeeP/depguard v1.0.1 h1:VlW4R6jmBIv3/u1JNlawEvJMM4J+dPORPaZasQee8Us= github.com/OpenPeeDeeP/depguard v1.0.1/go.mod h1:xsIw86fROiiwelg+jB2uM9PiKihMMmUx/1V+TNhjQvM= @@ -200,9 +203,16 @@ github.com/davecgh/go-xdr v0.0.0-20161123171359-e6a2ba005892/go.mod h1:CTDl0pzVz github.com/denisenkom/go-mssqldb v0.0.0-20200206145737-bbfc9a55622e/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU= github.com/devigned/tab v0.1.1/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY= github.com/devigned/tab v0.1.2-0.20190607222403-0c15cf42f9a2/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY= +github.com/dgraph-io/badger/v2 v2.0.3 h1:inzdf6VF/NZ+tJ8RwwYMjJMvsOALTHYdozn0qSl6XJI= +github.com/dgraph-io/badger/v2 v2.0.3/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM= +github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= +github.com/dgraph-io/ristretto v0.0.2 h1:a5WaUrDa0qm0YrAAS1tUykT5El3kt62KNZZeMxQn3po= +github.com/dgraph-io/ristretto v0.0.2/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgrijalva/jwt-go v3.2.1-0.20190620180102-5e25c22bd5d6+incompatible h1:4jGdduO4ceTJFKf0IhgaB8NJapGqKHwC2b4xQ/cXujM= github.com/dgrijalva/jwt-go v3.2.1-0.20190620180102-5e25c22bd5d6+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= +github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/dgryski/go-sip13 v0.0.0-20190329191031-25c5027a8c7b/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= 
github.com/digitalocean/go-libvirt v0.0.0-20180301200012-6075ea3c39a1/go.mod h1:PRcPVAAma6zcLpFd4GZrjR/MRpood3TamjKI2m/z/Uw= @@ -225,6 +235,8 @@ github.com/dop251/goja_nodejs v0.0.0-20171011081505-adff31b136e6 h1:RrkoB0pT3gnj github.com/dop251/goja_nodejs v0.0.0-20171011081505-adff31b136e6/go.mod h1:hn7BA7c8pLvoGndExHudxTDKZ84Pyvv+90pbBjbTz0Y= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4 h1:qk/FSDDxo05wdJH28W+p5yivv7LuLYLRXPPD8KQCtZs= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= +github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/eapache/go-resiliency v1.1.0 h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU= github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= @@ -559,6 +571,7 @@ github.com/hashicorp/go-immutable-radix v1.1.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjh github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/hashicorp/go-msgpack v0.5.5/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/hashicorp/go-multierror v0.0.0-20161216184304-ed905158d874/go.mod h1:JMRHfdO9jKNzS/+BTlxCjKNQHg/jZAft8U7LloJvN7I= +github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o= github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI= github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA= @@ -654,6 +667,8 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxv github.com/kr/logfmt 
v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= +github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= @@ -895,6 +910,7 @@ github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4k github.com/sourcegraph/go-diff v0.5.1 h1:gO6i5zugwzo1RVTvgvfwCOSVegNuvnNi6bAD1QCmkHs= github.com/sourcegraph/go-diff v0.5.1/go.mod h1:j2dHj3m8aZgQO8lMTcTnBcXkRRRqi34cd2MNlA9u1mE= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= github.com/spf13/afero v1.2.2 h1:5jhuqJyZCZf2JRofRvN/nIFgIWNzPa3/Vz8mYylgbWc= @@ -1168,6 +1184,7 @@ golang.org/x/sys v0.0.0-20190529164535-6a60838ec259/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190616124812-15dcb6c0061f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/x-pack/apm-server/sampling/eventstorage/codec.go b/x-pack/apm-server/sampling/eventstorage/codec.go new file mode 100644 index 00000000000..64078d0743f --- /dev/null +++ b/x-pack/apm-server/sampling/eventstorage/codec.go @@ -0,0 +1,5 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package eventstorage diff --git a/x-pack/apm-server/sampling/eventstorage/jsoncodec.go b/x-pack/apm-server/sampling/eventstorage/jsoncodec.go new file mode 100644 index 00000000000..4e040db1d2d --- /dev/null +++ b/x-pack/apm-server/sampling/eventstorage/jsoncodec.go @@ -0,0 +1,38 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package eventstorage + +import ( + "encoding/json" + + // NOTE(axw) encoding/json is faster for encoding, + // json-iterator is faster for decoding. + jsoniter "github.com/json-iterator/go" + + "github.com/elastic/apm-server/model" +) + +// JSONCodec is an implementation of Codec, using JSON encoding. +type JSONCodec struct{} + +// DecodeSpan decodes data as JSON into span. +func (JSONCodec) DecodeSpan(data []byte, span *model.Span) error { + return jsoniter.ConfigFastest.Unmarshal(data, span) +} + +// DecodeTransaction decodes data as JSON into tx. 
+func (JSONCodec) DecodeTransaction(data []byte, tx *model.Transaction) error { + return jsoniter.ConfigFastest.Unmarshal(data, tx) +} + +// EncodeSpan encodes span as JSON. +func (JSONCodec) EncodeSpan(span *model.Span) ([]byte, error) { + return json.Marshal(span) +} + +// EncodeTransaction encodes tx as JSON. +func (JSONCodec) EncodeTransaction(tx *model.Transaction) ([]byte, error) { + return json.Marshal(tx) +} diff --git a/x-pack/apm-server/sampling/eventstorage/logger.go b/x-pack/apm-server/sampling/eventstorage/logger.go new file mode 100644 index 00000000000..bf60f4ebce8 --- /dev/null +++ b/x-pack/apm-server/sampling/eventstorage/logger.go @@ -0,0 +1,17 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package eventstorage + +import "github.com/elastic/beats/v7/libbeat/logp" + +// LogpAdaptor adapts logp.Logger to the badger.Logger interface. +type LogpAdaptor struct { + *logp.Logger +} + +// Warningf adapts badger.Logger.Warningf to logp.Logger.Warnf. +func (a LogpAdaptor) Warningf(format string, args ...interface{}) { + a.Warnf(format, args...) +} diff --git a/x-pack/apm-server/sampling/eventstorage/sharded.go b/x-pack/apm-server/sampling/eventstorage/sharded.go new file mode 100644 index 00000000000..c0cb1089a75 --- /dev/null +++ b/x-pack/apm-server/sampling/eventstorage/sharded.go @@ -0,0 +1,135 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package eventstorage + +import ( + "runtime" + "sync" + + "github.com/cespare/xxhash/v2" + "github.com/hashicorp/go-multierror" + + "github.com/elastic/apm-server/model" +) + +// ShardedReadWriter provides sharded, locked, access to a Storage. +// +// ShardedReadWriter shards on trace ID. +type ShardedReadWriter struct { + readWriters []lockedReadWriter +} + +func newShardedReadWriter(storage *Storage) *ShardedReadWriter { + s := &ShardedReadWriter{ + // Create as many ReadWriters as there are CPUs, + // so we can ideally minimise lock contention. + readWriters: make([]lockedReadWriter, runtime.NumCPU()), + } + for i := range s.readWriters { + s.readWriters[i].rw = storage.NewReadWriter() + } + return s +} + +// Close closes all sharded storage readWriters. +func (s *ShardedReadWriter) Close() { + for i := range s.readWriters { + s.readWriters[i].Close() + } +} + +// Flush flushes all sharded storage readWriters. +func (s *ShardedReadWriter) Flush() error { + var result error + for i := range s.readWriters { + if err := s.readWriters[i].Flush(); err != nil { + result = multierror.Append(result, err) + } + } + return result +} + +// ReadEvents calls Writer.ReadEvents, using a sharded, locked, Writer. +func (s *ShardedReadWriter) ReadEvents(traceID string, out *model.Batch) error { + return s.getWriter(traceID).ReadEvents(traceID, out) +} + +// WriteTransaction calls Writer.WriteTransaction, using a sharded, locked, Writer. +func (s *ShardedReadWriter) WriteTransaction(tx *model.Transaction) error { + return s.getWriter(tx.TraceID).WriteTransaction(tx) +} + +// WriteSpan calls Writer.WriteSpan, using a sharded, locked, Writer. +func (s *ShardedReadWriter) WriteSpan(span *model.Span) error { + return s.getWriter(span.TraceID).WriteSpan(span) +} + +// WriteTraceSampled calls Writer.WriteTraceSampled, using a sharded, locked, Writer. 
+func (s *ShardedReadWriter) WriteTraceSampled(traceID string, sampled bool) error { + return s.getWriter(traceID).WriteTraceSampled(traceID, sampled) +} + +// IsTraceSampled calls Writer.IsTraceSampled, using a sharded, locked, Writer. +func (s *ShardedReadWriter) IsTraceSampled(traceID string) (bool, error) { + return s.getWriter(traceID).IsTraceSampled(traceID) +} + +// getWriter returns an event storage writer for the given trace ID. +// +// This method is idempotent, which is necessary to avoid transaction +// conflicts and ensure all events are reported once a sampling decision +// has been recorded. +func (s *ShardedReadWriter) getWriter(traceID string) *lockedReadWriter { + var h xxhash.Digest + h.WriteString(traceID) + return &s.readWriters[h.Sum64()%uint64(len(s.readWriters))] +} + +type lockedReadWriter struct { + mu sync.Mutex + rw *ReadWriter +} + +func (rw *lockedReadWriter) Close() { + rw.mu.Lock() + defer rw.mu.Unlock() + rw.rw.Close() +} + +func (rw *lockedReadWriter) Flush() error { + rw.mu.Lock() + defer rw.mu.Unlock() + return rw.rw.Flush() +} + +func (rw *lockedReadWriter) ReadEvents(traceID string, out *model.Batch) error { + rw.mu.Lock() + defer rw.mu.Unlock() + return rw.rw.ReadEvents(traceID, out) +} + +func (rw *lockedReadWriter) WriteTransaction(tx *model.Transaction) error { + rw.mu.Lock() + defer rw.mu.Unlock() + return rw.rw.WriteTransaction(tx) +} + +func (rw *lockedReadWriter) WriteSpan(s *model.Span) error { + rw.mu.Lock() + defer rw.mu.Unlock() + return rw.rw.WriteSpan(s) +} + +func (rw *lockedReadWriter) WriteTraceSampled(traceID string, sampled bool) error { + rw.mu.Lock() + defer rw.mu.Unlock() + return rw.rw.WriteTraceSampled(traceID, sampled) +} + +func (rw *lockedReadWriter) IsTraceSampled(traceID string) (bool, error) { + rw.mu.Lock() + defer rw.mu.Unlock() + return rw.rw.IsTraceSampled(traceID) +} diff --git a/x-pack/apm-server/sampling/eventstorage/sharded_bench_test.go 
b/x-pack/apm-server/sampling/eventstorage/sharded_bench_test.go new file mode 100644 index 00000000000..fd39b0dde15 --- /dev/null +++ b/x-pack/apm-server/sampling/eventstorage/sharded_bench_test.go @@ -0,0 +1,55 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package eventstorage_test + +import ( + "testing" + "time" + + "github.com/gofrs/uuid" + + "github.com/elastic/apm-server/model" + "github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage" +) + +func BenchmarkShardedWriteTransactionUncontended(b *testing.B) { + db := newBadgerDB(b, badgerOptions) + ttl := time.Minute + store := eventstorage.New(db, eventstorage.JSONCodec{}, ttl) + sharded := store.NewShardedReadWriter() + defer sharded.Close() + + b.RunParallel(func(pb *testing.PB) { + traceUUID := uuid.Must(uuid.NewV4()) + transaction := &model.Transaction{TraceID: traceUUID.String(), ID: traceUUID.String()} + for pb.Next() { + if err := sharded.WriteTransaction(transaction); err != nil { + b.Fatal(err) + } + } + }) +} + +func BenchmarkShardedWriteTransactionContended(b *testing.B) { + db := newBadgerDB(b, badgerOptions) + ttl := time.Minute + store := eventstorage.New(db, eventstorage.JSONCodec{}, ttl) + sharded := store.NewShardedReadWriter() + defer sharded.Close() + + // Use a single trace ID, causing all events to go through + // the same sharded writer, contending for a single lock. 
+ traceUUID := uuid.Must(uuid.NewV4()) + + b.RunParallel(func(pb *testing.PB) { + transactionUUID := uuid.Must(uuid.NewV4()) + transaction := &model.Transaction{TraceID: traceUUID.String(), ID: transactionUUID.String()} + for pb.Next() { + if err := sharded.WriteTransaction(transaction); err != nil { + b.Fatal(err) + } + } + }) +} diff --git a/x-pack/apm-server/sampling/eventstorage/storage.go b/x-pack/apm-server/sampling/eventstorage/storage.go new file mode 100644 index 00000000000..1e6648f2d89 --- /dev/null +++ b/x-pack/apm-server/sampling/eventstorage/storage.go @@ -0,0 +1,228 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package eventstorage + +import ( + "errors" + "time" + + "github.com/dgraph-io/badger/v2" + + "github.com/elastic/apm-server/model" +) + +const ( + // NOTE(axw) these values (and their meanings) must remain stable + // over time, to avoid misinterpreting historical data. + entryMetaTraceSampled = 's' + entryMetaTraceUnsampled = 'u' + entryMetaTransaction = 'T' + entryMetaSpan = 'S' +) + +// ErrNotFound is returned by the Storage.IsTraceSampled method, +// for non-existing trace IDs. +var ErrNotFound = errors.New("key not found") + +// Storage provides storage for sampled transactions and spans, +// and for recording trace sampling decisions. +type Storage struct { + db *badger.DB + codec Codec + ttl time.Duration +} + +// Codec provides methods for encoding and decoding events. +type Codec interface { + DecodeSpan([]byte, *model.Span) error + DecodeTransaction([]byte, *model.Transaction) error + EncodeSpan(*model.Span) ([]byte, error) + EncodeTransaction(*model.Transaction) ([]byte, error) +} + +// New returns a new Storage using db and codec. +// +// Storage entries expire after ttl. 
+func New(db *badger.DB, codec Codec, ttl time.Duration) *Storage { + return &Storage{db: db, codec: codec, ttl: ttl} +} + +// NewShardedReadWriter returns a new ShardedReadWriter, for sharded +// reading and writing. +// +// The returned ShardedReadWriter must be closed when it is no longer +// needed. +func (s *Storage) NewShardedReadWriter() *ShardedReadWriter { + return newShardedReadWriter(s) +} + +// NewReadWriter returns a new ReadWriter for reading events from and +// writing events to storage. +// +// The returned ReadWriter must be closed when it is no longer needed. +func (s *Storage) NewReadWriter() *ReadWriter { + return &ReadWriter{ + s: s, + txn: s.db.NewTransaction(true), + } +} + +// ReadWriter provides a means of reading events from storage, and batched +// writing of events to storage. +// +// ReadWriter is not safe for concurrent access. All operations that involve +// a given trace ID should be performed with the same ReadWriter in order to +// avoid conflicts, e.g. by using consistent hashing to distribute to one of +// a set of ReadWriters, such as implemented by ShardedReadWriter. +type ReadWriter struct { + s *Storage + txn *badger.Txn + pendingWrites int + + // readKeyBuf is a reusable buffer for keys used in read operations. + // This must not be used in write operations, as keys are expected to + // be unmodified until the end of a transaction. + readKeyBuf []byte +} + +// Close closes the writer. Any writes that have not been flushed may be lost. +// +// This must be called when the writer is no longer needed, in order to reclaim +// resources. +func (rw *ReadWriter) Close() { + rw.txn.Discard() +} + +// Flush waits for preceding writes to be committed to storage. +// +// Flush must be called to ensure writes are committed to storage. +// If Flush is not called before the writer is closed, then writes +// may be lost. 
+func (rw *ReadWriter) Flush() error { + err := rw.txn.Commit() + rw.txn = rw.s.db.NewTransaction(true) + rw.pendingWrites = 0 + return err +} + +// WriteTraceSampled records the tail-sampling decision for the given trace ID. +func (rw *ReadWriter) WriteTraceSampled(traceID string, sampled bool) error { + key := []byte(traceID) + var meta uint8 = entryMetaTraceUnsampled + if sampled { + meta = entryMetaTraceSampled + } + entry := badger.NewEntry(key[:], nil).WithMeta(meta) + return rw.writeEntry(entry.WithTTL(rw.s.ttl)) +} + +// IsTraceSampled reports whether traceID belongs to a trace that is sampled +// or unsampled. If no sampling decision has been recorded, IsTraceSampled +// returns ErrNotFound. +func (rw *ReadWriter) IsTraceSampled(traceID string) (bool, error) { + rw.readKeyBuf = append(rw.readKeyBuf[:0], traceID...) + item, err := rw.txn.Get(rw.readKeyBuf) + if err != nil { + if err == badger.ErrKeyNotFound { + return false, ErrNotFound + } + return false, err + } + return item.UserMeta() == entryMetaTraceSampled, nil +} + +// WriteTransaction writes tx to storage. +// +// WriteTransaction may return before the write is committed to storage. +// Call Flush to ensure the write is committed. +func (rw *ReadWriter) WriteTransaction(tx *model.Transaction) error { + key := append(append([]byte(tx.TraceID), ':'), tx.ID...) + data, err := rw.s.codec.EncodeTransaction(tx) + if err != nil { + return err + } + return rw.writeEvent(key[:], data, entryMetaTransaction) +} + +// WriteSpan writes span to storage. +// +// WriteSpan may return before the write is committed to storage. +// Call Flush to ensure the write is committed. +func (rw *ReadWriter) WriteSpan(span *model.Span) error { + key := append(append([]byte(span.TraceID), ':'), span.ID...) 
+ data, err := rw.s.codec.EncodeSpan(span) + if err != nil { + return err + } + return rw.writeEvent(key[:], data, entryMetaSpan) +} + +func (rw *ReadWriter) writeEvent(key, value []byte, meta byte) error { + return rw.writeEntry(badger.NewEntry(key, value).WithMeta(meta).WithTTL(rw.s.ttl)) +} + +func (rw *ReadWriter) writeEntry(e *badger.Entry) error { + rw.pendingWrites++ + err := rw.txn.SetEntry(e) + if err != badger.ErrTxnTooBig { + return err + } + if err := rw.Flush(); err != nil { + return err + } + return rw.txn.SetEntry(e) +} + +// ReadEvents reads events with the given trace ID from storage into a batch. +// +// ReadEvents may implicitly commit the current transaction when the number +// of pending writes exceeds a threshold. This is due to how Badger internally +// iterates over uncommitted writes, where it will sort keys for each new +// iterator. +func (rw *ReadWriter) ReadEvents(traceID string, out *model.Batch) error { + opts := badger.DefaultIteratorOptions + rw.readKeyBuf = append(append(rw.readKeyBuf[:0], traceID...), ':') + opts.Prefix = rw.readKeyBuf + + // NewIterator slows down with uncommitted writes, as it must sort + // all keys lexicographically. If there are a significant number of + // writes pending, flush first. 
+ if rw.pendingWrites > 100 { + if err := rw.Flush(); err != nil { + return err + } + } + + iter := rw.txn.NewIterator(opts) + defer iter.Close() + for iter.Rewind(); iter.Valid(); iter.Next() { + item := iter.Item() + if item.IsDeletedOrExpired() { + continue + } + switch item.UserMeta() { + case entryMetaTransaction: + var event model.Transaction + if err := item.Value(func(data []byte) error { + return rw.s.codec.DecodeTransaction(data, &event) + }); err != nil { + return err + } + out.Transactions = append(out.Transactions, &event) + case entryMetaSpan: + var event model.Span + if err := item.Value(func(data []byte) error { + return rw.s.codec.DecodeSpan(data, &event) + }); err != nil { + return err + } + out.Spans = append(out.Spans, &event) + default: + // Unknown entry meta: ignore. + continue + } + } + return nil +} diff --git a/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go b/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go new file mode 100644 index 00000000000..90a9e60a527 --- /dev/null +++ b/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go @@ -0,0 +1,162 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package eventstorage_test + +import ( + "encoding/hex" + "fmt" + "testing" + "time" + + "github.com/gofrs/uuid" + "github.com/stretchr/testify/assert" + + "github.com/elastic/apm-server/model" + "github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage" +) + +func BenchmarkWriteTransaction(b *testing.B) { + test := func(b *testing.B, codec eventstorage.Codec) { + db := newBadgerDB(b, badgerOptions) + ttl := time.Minute + store := eventstorage.New(db, codec, ttl) + readWriter := store.NewReadWriter() + defer readWriter.Close() + + traceID := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} + transactionID := []byte{1, 2, 3, 4, 5, 6, 7, 8} + transaction := &model.Transaction{ + TraceID: hex.EncodeToString(traceID), + ID: hex.EncodeToString(transactionID), + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + if err := readWriter.WriteTransaction(transaction); err != nil { + b.Fatal(err) + } + } + assert.NoError(b, readWriter.Flush()) + } + b.Run("json_codec", func(b *testing.B) { + test(b, eventstorage.JSONCodec{}) + }) + b.Run("nop_codec", func(b *testing.B) { + // This tests the eventstorage performance without + // JSON encoding. This would be the theoretical + // upper limit of what we can achieve with a more + // efficient codec. + test(b, nopCodec{}) + }) +} + +func BenchmarkReadEvents(b *testing.B) { + traceUUID := uuid.Must(uuid.NewV4()) + + test := func(b *testing.B, codec eventstorage.Codec) { + // Test with varying numbers of events in the trace. 
+ counts := []int{0, 1, 10, 100, 1000} + for _, count := range counts { + b.Run(fmt.Sprintf("%d events", count), func(b *testing.B) { + db := newBadgerDB(b, badgerOptions) + ttl := time.Minute + store := eventstorage.New(db, codec, ttl) + readWriter := store.NewReadWriter() + defer readWriter.Close() + + for i := 0; i < count; i++ { + transactionUUID := uuid.Must(uuid.NewV4()) + transaction := &model.Transaction{ + TraceID: traceUUID.String(), + ID: transactionUUID.String(), + } + if err := readWriter.WriteTransaction(transaction); err != nil { + b.Fatal(err) + } + } + + // NOTE(axw) we don't explicitly flush, which is most representative of + // real workloads. For larger event counts, this ensures we exercise the + // code path which automatically flushes before reads. + + b.ResetTimer() + var batch model.Batch + for i := 0; i < b.N; i++ { + batch.Reset() + if err := readWriter.ReadEvents(traceUUID.String(), &batch); err != nil { + b.Fatal(err) + } + if batch.Len() != count { + panic(fmt.Errorf( + "event count mismatch: expected %d, got %d", + count, batch.Len(), + )) + } + } + }) + } + } + + b.Run("json_codec", func(b *testing.B) { + test(b, eventstorage.JSONCodec{}) + }) + b.Run("nop_codec", func(b *testing.B) { + // This tests the eventstorage performance without + // JSON decoding. This would be the theoretical + // upper limit of what we can achieve with a more + // efficient codec. + test(b, nopCodec{}) + }) +} + +func BenchmarkIsTraceSampled(b *testing.B) { + sampledTraceUUID := uuid.Must(uuid.NewV4()) + unsampledTraceUUID := uuid.Must(uuid.NewV4()) + unknownTraceUUID := uuid.Must(uuid.NewV4()) + + // Test with varying numbers of events in the trace. 
+ db := newBadgerDB(b, badgerOptions) + ttl := time.Minute + store := eventstorage.New(db, eventstorage.JSONCodec{}, ttl) + readWriter := store.NewReadWriter() + defer readWriter.Close() + + if err := readWriter.WriteTraceSampled(sampledTraceUUID.String(), true); err != nil { + b.Fatal(err) + } + if err := readWriter.WriteTraceSampled(unsampledTraceUUID.String(), false); err != nil { + b.Fatal(err) + } + + bench := func(name string, traceID string, expectError bool, expectSampled bool) { + b.Run(name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + sampled, err := readWriter.IsTraceSampled(traceID) + if expectError { + if err == nil { + b.Fatal("expected error") + } + } else { + if err != nil { + b.Fatal(err) + } + if sampled != expectSampled { + b.Fatalf("expected %v, got %v", expectSampled, sampled) + } + } + } + }) + } + bench("sampled", sampledTraceUUID.String(), false, true) + bench("unsampled", unsampledTraceUUID.String(), false, false) + bench("unknown", unknownTraceUUID.String(), true, false) +} + +type nopCodec struct{} + +func (nopCodec) DecodeSpan(data []byte, span *model.Span) error { return nil } +func (nopCodec) DecodeTransaction(data []byte, tx *model.Transaction) error { return nil } +func (nopCodec) EncodeSpan(*model.Span) ([]byte, error) { return nil, nil } +func (nopCodec) EncodeTransaction(*model.Transaction) ([]byte, error) { return nil, nil } diff --git a/x-pack/apm-server/sampling/eventstorage/storage_test.go b/x-pack/apm-server/sampling/eventstorage/storage_test.go new file mode 100644 index 00000000000..8febfffd7f1 --- /dev/null +++ b/x-pack/apm-server/sampling/eventstorage/storage_test.go @@ -0,0 +1,266 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package eventstorage_test + +import ( + "encoding/json" + "testing" + "time" + + "github.com/dgraph-io/badger/v2" + "github.com/gofrs/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/apm-server/model" + "github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage" +) + +func TestWriteEvents(t *testing.T) { + // Run two tests: + // - 1 transaction and 1 span + // - 1 transaction and 100 spans + // + // The latter test will cause ReadEvents to implicitly call flush. + t.Run("no_flush", func(t *testing.T) { + testWriteEvents(t, 1) + }) + t.Run("implicit_flush", func(t *testing.T) { + testWriteEvents(t, 100) + }) +} + +func testWriteEvents(t *testing.T, numSpans int) { + db := newBadgerDB(t, badgerOptions) + ttl := time.Minute + store := eventstorage.New(db, eventstorage.JSONCodec{}, ttl) + readWriter := store.NewShardedReadWriter() + defer readWriter.Close() + + before := time.Now() + + traceUUID := uuid.Must(uuid.NewV4()) + transactionUUID := uuid.Must(uuid.NewV4()) + transaction := &model.Transaction{ + TraceID: traceUUID.String(), + ID: transactionUUID.String(), + } + assert.NoError(t, readWriter.WriteTransaction(transaction)) + + var spans []*model.Span + for i := 0; i < numSpans; i++ { + spanUUID := uuid.Must(uuid.NewV4()) + span := &model.Span{ + TraceID: traceUUID.String(), + ID: spanUUID.String(), + } + assert.NoError(t, readWriter.WriteSpan(span)) + spans = append(spans, span) + } + + // We can read our writes without flushing. + var batch model.Batch + assert.NoError(t, readWriter.ReadEvents(traceUUID.String(), &batch)) + assert.ElementsMatch(t, []*model.Transaction{transaction}, batch.Transactions) + assert.ElementsMatch(t, spans, batch.Spans) + + // Flush in order for the writes to be visible to other readers. 
+ assert.NoError(t, readWriter.Flush()) + + var recorded []interface{} + assert.NoError(t, db.View(func(txn *badger.Txn) error { + iter := txn.NewIterator(badger.IteratorOptions{ + Prefix: []byte(traceUUID.String()), + }) + defer iter.Close() + for iter.Rewind(); iter.Valid(); iter.Next() { + item := iter.Item() + expiresAt := item.ExpiresAt() + expiryTime := time.Unix(int64(expiresAt), 0) + assert.Condition(t, func() bool { + return !before.After(expiryTime) && !expiryTime.After(before.Add(ttl)) + }) + + var value interface{} + switch meta := item.UserMeta(); meta { + case 'T': + value = &model.Transaction{} + case 'S': + value = &model.Span{} + default: + t.Fatalf("invalid meta %q", meta) + } + assert.NoError(t, item.Value(func(data []byte) error { + return json.Unmarshal(data, value) + })) + recorded = append(recorded, value) + } + return nil + })) + assert.ElementsMatch(t, batch.Transformables(), recorded) +} + +func TestWriteTraceSampled(t *testing.T) { + db := newBadgerDB(t, badgerOptions) + ttl := time.Minute + store := eventstorage.New(db, eventstorage.JSONCodec{}, ttl) + readWriter := store.NewShardedReadWriter() + defer readWriter.Close() + + before := time.Now() + assert.NoError(t, readWriter.WriteTraceSampled("sampled_trace_id", true)) + assert.NoError(t, readWriter.WriteTraceSampled("unsampled_trace_id", false)) + + // We can read our writes without flushing. + isSampled, err := readWriter.IsTraceSampled("sampled_trace_id") + assert.NoError(t, err) + assert.True(t, isSampled) + + // Flush in order for the writes to be visible to other readers. 
+ assert.NoError(t, readWriter.Flush()) + + sampled := make(map[string]bool) + assert.NoError(t, db.View(func(txn *badger.Txn) error { + iter := txn.NewIterator(badger.IteratorOptions{}) + defer iter.Close() + for iter.Rewind(); iter.Valid(); iter.Next() { + item := iter.Item() + expiresAt := item.ExpiresAt() + expiryTime := time.Unix(int64(expiresAt), 0) + assert.Condition(t, func() bool { + return !before.After(expiryTime) && !expiryTime.After(before.Add(ttl)) + }) + + key := string(item.Key()) + switch meta := item.UserMeta(); meta { + case 's': + sampled[key] = true + case 'u': + sampled[key] = false + default: + t.Fatalf("invalid meta %q", meta) + } + assert.Zero(t, item.ValueSize()) + } + return nil + })) + assert.Equal(t, map[string]bool{ + "sampled_trace_id": true, + "unsampled_trace_id": false, + }, sampled) +} + +func TestReadEvents(t *testing.T) { + db := newBadgerDB(t, badgerOptions) + ttl := time.Minute + store := eventstorage.New(db, eventstorage.JSONCodec{}, ttl) + + traceID := [...]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} + require.NoError(t, db.Update(func(txn *badger.Txn) error { + key := append(traceID[:], ":12345678"...) + value := []byte(`{"name":"transaction"}`) + if err := txn.SetEntry(badger.NewEntry(key, value).WithMeta('T')); err != nil { + return err + } + + key = append(traceID[:], ":87654321"...) + value = []byte(`{"name":"span"}`) + if err := txn.SetEntry(badger.NewEntry(key, value).WithMeta('S')); err != nil { + return err + } + + // Write an entry with the trace ID as a prefix, but with no + // proceeding colon, causing it to be ignored. + key = append(traceID[:], "nocolon"...) + value = []byte(`not-json`) + if err := txn.SetEntry(badger.NewEntry(key, value).WithMeta('S')); err != nil { + return err + } + + // Write an entry with an unknown meta value. It will be ignored. + key = append(traceID[:], ":11111111"...) 
+ value = []byte(`not-json`) + if err := txn.SetEntry(badger.NewEntry(key, value).WithMeta('?')); err != nil { + return err + } + return nil + })) + + reader := store.NewShardedReadWriter() + defer reader.Close() + + var events model.Batch + assert.NoError(t, reader.ReadEvents(string(traceID[:]), &events)) + assert.Equal(t, []*model.Transaction{{Name: "transaction"}}, events.Transactions) + assert.Equal(t, []*model.Span{{Name: "span"}}, events.Spans) +} + +func TestReadEventsDecodeError(t *testing.T) { + db := newBadgerDB(t, badgerOptions) + ttl := time.Minute + store := eventstorage.New(db, eventstorage.JSONCodec{}, ttl) + + traceID := [...]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} + require.NoError(t, db.Update(func(txn *badger.Txn) error { + key := append(traceID[:], ":12345678"...) + value := []byte(`wat`) + if err := txn.SetEntry(badger.NewEntry(key, value).WithMeta('T')); err != nil { + return err + } + return nil + })) + + reader := store.NewShardedReadWriter() + defer reader.Close() + + var events model.Batch + err := reader.ReadEvents(string(traceID[:]), &events) + assert.Error(t, err) +} + +func TestIsTraceSampled(t *testing.T) { + db := newBadgerDB(t, badgerOptions) + ttl := time.Minute + store := eventstorage.New(db, eventstorage.JSONCodec{}, ttl) + + require.NoError(t, db.Update(func(txn *badger.Txn) error { + if err := txn.SetEntry(badger.NewEntry([]byte("sampled_trace_id"), nil).WithMeta('s')); err != nil { + return err + } + if err := txn.SetEntry(badger.NewEntry([]byte("unsampled_trace_id"), nil).WithMeta('u')); err != nil { + return err + } + return nil + })) + + reader := store.NewShardedReadWriter() + defer reader.Close() + + sampled, err := reader.IsTraceSampled("sampled_trace_id") + assert.NoError(t, err) + assert.True(t, sampled) + + sampled, err = reader.IsTraceSampled("unsampled_trace_id") + assert.NoError(t, err) + assert.False(t, sampled) + + _, err = reader.IsTraceSampled("unknown_trace_id") + assert.Equal(t, err, 
eventstorage.ErrNotFound) +} + +func badgerOptions() badger.Options { + return badger.DefaultOptions("").WithInMemory(true).WithLogger(nil) +} + +type badgerOptionsFunc func() badger.Options + +func newBadgerDB(tb testing.TB, badgerOptions badgerOptionsFunc) *badger.DB { + db, err := badger.Open(badgerOptions()) + if err != nil { + panic(err) + } + tb.Cleanup(func() { db.Close() }) + return db +}