Skip to content

Commit ccd2fcf

Browse files
committed
sampling/eventstorage: introduce event storage (elastic#4108)
* sampling/eventstorage: introduce event storage Introduce a package which provides local event storage, tailored for tail-based sampling. Storage is backed by Badger (github.com/dgraph-io/badger). The Storage type provides two types for readiing/writing data: ReadWriter, and ShardedReadWriter. ReadWriter is not safe for concurrent access, while ShardedReadWriter is. ShardedReadWriter provides locked access to one of several ReadWriters, sharding on trace ID. Currently we encode transactions and spans as JSON. We should look into developing a more efficient codec later, using protobuf or similar.
1 parent d659a4d commit ccd2fcf

File tree

11 files changed

+930
-2
lines changed

11 files changed

+930
-2
lines changed

NOTICE.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ Contents of "LICENSE":
422422

423423
--------------------------------------------------------------------
424424
Dependency: github.com/dustin/go-humanize
425-
Revision: bb3d318650d4
425+
Version: v1.0.0
426426
License type (autodetected): MIT
427427
Contents of "LICENSE":
428428

go.mod

+6-1
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@ require (
99
github.com/cespare/xxhash/v2 v2.1.1
1010
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e
1111
github.com/client9/misspell v0.3.5-0.20180309020325-c0b55c823952 // indirect
12+
github.com/dgraph-io/badger/v2 v2.0.3
13+
github.com/dgraph-io/ristretto v0.0.2 // indirect
1214
github.com/dlclark/regexp2 v1.2.1 // indirect
1315
github.com/dop251/goja v0.0.0-20200824171909-536f9d946569 // indirect
1416
github.com/dop251/goja_nodejs v0.0.0-20200811150831-9bc458b4bbeb // indirect
15-
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4
17+
github.com/dustin/go-humanize v1.0.0
1618
github.com/elastic/apm-server/approvaltest v0.0.0-00010101000000-000000000000
1719
github.com/elastic/beats/v7 v7.0.0-alpha2.0.20200901193040-34159563fa1e
1820
github.com/elastic/go-elasticsearch/v7 v7.9.0
@@ -28,13 +30,16 @@ require (
2830
github.com/golang/protobuf v1.4.2
2931
github.com/google/addlicense v0.0.0-20190907113143-be125746c2c4 // indirect
3032
github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99
33+
github.com/hashicorp/go-multierror v1.1.0
3134
github.com/hashicorp/golang-lru v0.5.3
3235
github.com/ianlancetaylor/demangle v0.0.0-20200715173712-053cf528c12f // indirect
3336
github.com/jaegertracing/jaeger v1.16.0
3437
github.com/jcmturner/gofork v1.0.0 // indirect
3538
github.com/josephspurrier/goversioninfo v1.2.0 // indirect
39+
github.com/json-iterator/go v1.1.8
3640
github.com/jstemmer/go-junit-report v0.9.1
3741
github.com/klauspost/compress v1.9.3-0.20191122130757-c099ac9f21dd // indirect
42+
github.com/kr/pretty v0.2.0 // indirect
3843
github.com/magefile/mage v1.10.0
3944
github.com/mattn/go-colorable v0.1.7 // indirect
4045
github.com/mitchellh/hashstructure v1.0.0 // indirect

go.sum

+17
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,16 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
6161
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
6262
github.com/DATA-DOG/godog v0.7.13/go.mod h1:z2OZ6a3X0/YAKVqLfVzYBwFt3j6uSt3Xrqa7XTtcQE0=
6363
github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
64+
github.com/DataDog/zstd v1.4.1 h1:3oxKN3wbHibqx897utPC2LTQU4J+IHWWJO+glkAkpFM=
65+
github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
6466
github.com/Masterminds/semver v1.4.2 h1:WBLTQ37jOCzSLtXNdoo8bNM8876KhNqOKvrlGITgsTc=
6567
github.com/Masterminds/semver v1.4.2/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y=
6668
github.com/Microsoft/go-winio v0.4.15-0.20190919025122-fc70bd9a86b5 h1:ygIc8M6trr62pF5DucadTWGdEB4mEyvzi0e2nbcmcyA=
6769
github.com/Microsoft/go-winio v0.4.15-0.20190919025122-fc70bd9a86b5/go.mod h1:tTuCMEN+UleMWgg9dVx4Hu52b1bJo+59jBh3ajtinzw=
6870
github.com/Microsoft/hcsshim v0.8.7/go.mod h1:OHd7sQqRFrYd3RmSgbgji+ctCwkbq2wbEYNSzOYtcBQ=
6971
github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ=
7072
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
73+
github.com/OneOfOne/xxhash v1.2.5 h1:zl/OfRA6nftbBK9qTohYBJ5xvw6C/oNKizR7cZGl3cI=
7174
github.com/OneOfOne/xxhash v1.2.5/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
7275
github.com/OpenPeeDeeP/depguard v1.0.1 h1:VlW4R6jmBIv3/u1JNlawEvJMM4J+dPORPaZasQee8Us=
7376
github.com/OpenPeeDeeP/depguard v1.0.1/go.mod h1:xsIw86fROiiwelg+jB2uM9PiKihMMmUx/1V+TNhjQvM=
@@ -194,8 +197,15 @@ github.com/davecgh/go-xdr v0.0.0-20161123171359-e6a2ba005892/go.mod h1:CTDl0pzVz
194197
github.com/denisenkom/go-mssqldb v0.0.0-20200206145737-bbfc9a55622e/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU=
195198
github.com/devigned/tab v0.1.1/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY=
196199
github.com/devigned/tab v0.1.2-0.20190607222403-0c15cf42f9a2/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY=
200+
github.com/dgraph-io/badger/v2 v2.0.3 h1:inzdf6VF/NZ+tJ8RwwYMjJMvsOALTHYdozn0qSl6XJI=
201+
github.com/dgraph-io/badger/v2 v2.0.3/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM=
202+
github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
203+
github.com/dgraph-io/ristretto v0.0.2 h1:a5WaUrDa0qm0YrAAS1tUykT5El3kt62KNZZeMxQn3po=
204+
github.com/dgraph-io/ristretto v0.0.2/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
197205
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
198206
github.com/dgrijalva/jwt-go v3.2.1-0.20190620180102-5e25c22bd5d6+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
207+
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
208+
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
199209
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
200210
github.com/dgryski/go-sip13 v0.0.0-20190329191031-25c5027a8c7b/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
201211
github.com/digitalocean/go-libvirt v0.0.0-20180301200012-6075ea3c39a1/go.mod h1:PRcPVAAma6zcLpFd4GZrjR/MRpood3TamjKI2m/z/Uw=
@@ -218,6 +228,8 @@ github.com/dop251/goja_nodejs v0.0.0-20171011081505-adff31b136e6 h1:RrkoB0pT3gnj
218228
github.com/dop251/goja_nodejs v0.0.0-20171011081505-adff31b136e6/go.mod h1:hn7BA7c8pLvoGndExHudxTDKZ84Pyvv+90pbBjbTz0Y=
219229
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4 h1:qk/FSDDxo05wdJH28W+p5yivv7LuLYLRXPPD8KQCtZs=
220230
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
231+
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
232+
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
221233
github.com/eapache/go-resiliency v1.1.0 h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU=
222234
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
223235
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
@@ -548,6 +560,7 @@ github.com/hashicorp/go-immutable-radix v1.1.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjh
548560
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
549561
github.com/hashicorp/go-msgpack v0.5.5/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
550562
github.com/hashicorp/go-multierror v0.0.0-20161216184304-ed905158d874/go.mod h1:JMRHfdO9jKNzS/+BTlxCjKNQHg/jZAft8U7LloJvN7I=
563+
github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o=
551564
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
552565
github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI=
553566
github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA=
@@ -642,6 +655,8 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxv
642655
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
643656
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
644657
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
658+
github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
659+
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
645660
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
646661
github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
647662
github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw=
@@ -882,6 +897,7 @@ github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4k
882897
github.com/sourcegraph/go-diff v0.5.1 h1:gO6i5zugwzo1RVTvgvfwCOSVegNuvnNi6bAD1QCmkHs=
883898
github.com/sourcegraph/go-diff v0.5.1/go.mod h1:j2dHj3m8aZgQO8lMTcTnBcXkRRRqi34cd2MNlA9u1mE=
884899
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
900+
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
885901
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
886902
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
887903
github.com/spf13/afero v1.2.2 h1:5jhuqJyZCZf2JRofRvN/nIFgIWNzPa3/Vz8mYylgbWc=
@@ -1150,6 +1166,7 @@ golang.org/x/sys v0.0.0-20190529164535-6a60838ec259/go.mod h1:h1NjWce9XRLGQEsW7w
11501166
golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
11511167
golang.org/x/sys v0.0.0-20190616124812-15dcb6c0061f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
11521168
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
1169+
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
11531170
golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
11541171
golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
11551172
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
5+
package eventstorage
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
5+
package eventstorage
6+
7+
import (
8+
"encoding/json"
9+
10+
// NOTE(axw) encoding/json is faster for encoding,
11+
// json-iterator is faster for decoding.
12+
jsoniter "github.com/json-iterator/go"
13+
14+
"github.com/elastic/apm-server/model"
15+
)
16+
17+
// JSONCodec is an implementation of Codec, using JSON encoding.
18+
type JSONCodec struct{}
19+
20+
// DecodeSpan decodes data as JSON into span.
21+
func (JSONCodec) DecodeSpan(data []byte, span *model.Span) error {
22+
return jsoniter.ConfigFastest.Unmarshal(data, span)
23+
}
24+
25+
// DecodeTransaction decodes data as JSON into tx.
26+
func (JSONCodec) DecodeTransaction(data []byte, tx *model.Transaction) error {
27+
return jsoniter.ConfigFastest.Unmarshal(data, tx)
28+
}
29+
30+
// EncodeSpan encodes span as JSON.
31+
func (JSONCodec) EncodeSpan(span *model.Span) ([]byte, error) {
32+
return json.Marshal(span)
33+
}
34+
35+
// EncodeTransaction encodes tx as JSON.
36+
func (JSONCodec) EncodeTransaction(tx *model.Transaction) ([]byte, error) {
37+
return json.Marshal(tx)
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
5+
package eventstorage
6+
7+
import "github.com/elastic/beats/v7/libbeat/logp"
8+
9+
// LogpAdaptor adapts logp.Logger to the badger.Logger interface.
10+
type LogpAdaptor struct {
11+
*logp.Logger
12+
}
13+
14+
// Warningf adapts badger.Logger.Warningf to logp.Logger.Warngf.
15+
func (a LogpAdaptor) Warningf(format string, args ...interface{}) {
16+
a.Warnf(format, args...)
17+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
5+
package eventstorage
6+
7+
import (
8+
"runtime"
9+
"sync"
10+
11+
"github.com/cespare/xxhash/v2"
12+
"github.com/hashicorp/go-multierror"
13+
14+
"github.com/elastic/apm-server/model"
15+
)
16+
17+
// ShardedReadWriter provides sharded, locked, access to a Storage.
18+
//
19+
// ShardedReadWriter shards on trace ID.
20+
type ShardedReadWriter struct {
21+
readWriters []lockedReadWriter
22+
}
23+
24+
func newShardedReadWriter(storage *Storage) *ShardedReadWriter {
25+
s := &ShardedReadWriter{
26+
// Create as many ReadWriters as there are CPUs,
27+
// so we can ideally minimise lock contention.
28+
readWriters: make([]lockedReadWriter, runtime.NumCPU()),
29+
}
30+
for i := range s.readWriters {
31+
s.readWriters[i].rw = storage.NewReadWriter()
32+
}
33+
return s
34+
}
35+
36+
// Close closes all sharded storage readWriters.
37+
func (s *ShardedReadWriter) Close() {
38+
for i := range s.readWriters {
39+
s.readWriters[i].Close()
40+
}
41+
}
42+
43+
// Flush flushes all sharded storage readWriters.
44+
func (s *ShardedReadWriter) Flush() error {
45+
var result error
46+
for i := range s.readWriters {
47+
if err := s.readWriters[i].Flush(); err != nil {
48+
result = multierror.Append(result, err)
49+
}
50+
}
51+
return result
52+
}
53+
54+
// ReadEvents calls Writer.ReadEvents, using a sharded, locked, Writer.
55+
func (s *ShardedReadWriter) ReadEvents(traceID string, out *model.Batch) error {
56+
return s.getWriter(traceID).ReadEvents(traceID, out)
57+
}
58+
59+
// WriteTransaction calls Writer.WriteTransaction, using a sharded, locked, Writer.
60+
func (s *ShardedReadWriter) WriteTransaction(tx *model.Transaction) error {
61+
return s.getWriter(tx.TraceID).WriteTransaction(tx)
62+
}
63+
64+
// WriteSpan calls Writer.WriteSpan, using a sharded, locked, Writer.
65+
func (s *ShardedReadWriter) WriteSpan(span *model.Span) error {
66+
return s.getWriter(span.TraceID).WriteSpan(span)
67+
}
68+
69+
// WriteTraceSampled calls Writer.WriteTraceSampled, using a sharded, locked, Writer.
70+
func (s *ShardedReadWriter) WriteTraceSampled(traceID string, sampled bool) error {
71+
return s.getWriter(traceID).WriteTraceSampled(traceID, sampled)
72+
}
73+
74+
// IsTraceSampled calls Writer.IsTraceSampled, using a sharded, locked, Writer.
75+
func (s *ShardedReadWriter) IsTraceSampled(traceID string) (bool, error) {
76+
return s.getWriter(traceID).IsTraceSampled(traceID)
77+
}
78+
79+
// getWriter returns an event storage writer for the given trace ID.
80+
//
81+
// This method is idempotent, which is necessary to avoid transaction
82+
// conflicts and ensure all events are reported once a sampling decision
83+
// has been recorded.
84+
func (s *ShardedReadWriter) getWriter(traceID string) *lockedReadWriter {
85+
var h xxhash.Digest
86+
h.WriteString(traceID)
87+
return &s.readWriters[h.Sum64()%uint64(len(s.readWriters))]
88+
}
89+
90+
type lockedReadWriter struct {
91+
mu sync.Mutex
92+
rw *ReadWriter
93+
}
94+
95+
func (rw *lockedReadWriter) Close() {
96+
rw.mu.Lock()
97+
defer rw.mu.Unlock()
98+
rw.rw.Close()
99+
}
100+
101+
func (rw *lockedReadWriter) Flush() error {
102+
rw.mu.Lock()
103+
defer rw.mu.Unlock()
104+
return rw.rw.Flush()
105+
}
106+
107+
func (rw *lockedReadWriter) ReadEvents(traceID string, out *model.Batch) error {
108+
rw.mu.Lock()
109+
defer rw.mu.Unlock()
110+
return rw.rw.ReadEvents(traceID, out)
111+
}
112+
113+
func (rw *lockedReadWriter) WriteTransaction(tx *model.Transaction) error {
114+
rw.mu.Lock()
115+
defer rw.mu.Unlock()
116+
return rw.rw.WriteTransaction(tx)
117+
}
118+
119+
func (rw *lockedReadWriter) WriteSpan(s *model.Span) error {
120+
rw.mu.Lock()
121+
defer rw.mu.Unlock()
122+
return rw.rw.WriteSpan(s)
123+
}
124+
125+
func (rw *lockedReadWriter) WriteTraceSampled(traceID string, sampled bool) error {
126+
rw.mu.Lock()
127+
defer rw.mu.Unlock()
128+
return rw.rw.WriteTraceSampled(traceID, sampled)
129+
}
130+
131+
func (rw *lockedReadWriter) IsTraceSampled(traceID string) (bool, error) {
132+
rw.mu.Lock()
133+
defer rw.mu.Unlock()
134+
return rw.rw.IsTraceSampled(traceID)
135+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
5+
package eventstorage_test
6+
7+
import (
8+
"testing"
9+
"time"
10+
11+
"github.com/gofrs/uuid"
12+
13+
"github.com/elastic/apm-server/model"
14+
"github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage"
15+
)
16+
17+
func BenchmarkShardedWriteTransactionUncontended(b *testing.B) {
18+
db := newBadgerDB(b, badgerOptions)
19+
ttl := time.Minute
20+
store := eventstorage.New(db, eventstorage.JSONCodec{}, ttl)
21+
sharded := store.NewShardedReadWriter()
22+
defer sharded.Close()
23+
24+
b.RunParallel(func(pb *testing.PB) {
25+
traceUUID := uuid.Must(uuid.NewV4())
26+
transaction := &model.Transaction{TraceID: traceUUID.String(), ID: traceUUID.String()}
27+
for pb.Next() {
28+
if err := sharded.WriteTransaction(transaction); err != nil {
29+
b.Fatal(err)
30+
}
31+
}
32+
})
33+
}
34+
35+
func BenchmarkShardedWriteTransactionContended(b *testing.B) {
36+
db := newBadgerDB(b, badgerOptions)
37+
ttl := time.Minute
38+
store := eventstorage.New(db, eventstorage.JSONCodec{}, ttl)
39+
sharded := store.NewShardedReadWriter()
40+
defer sharded.Close()
41+
42+
// Use a single trace ID, causing all events to go through
43+
// the same sharded writer, contending for a single lock.
44+
traceUUID := uuid.Must(uuid.NewV4())
45+
46+
b.RunParallel(func(pb *testing.PB) {
47+
transactionUUID := uuid.Must(uuid.NewV4())
48+
transaction := &model.Transaction{TraceID: traceUUID.String(), ID: transactionUUID.String()}
49+
for pb.Next() {
50+
if err := sharded.WriteTransaction(transaction); err != nil {
51+
b.Fatal(err)
52+
}
53+
}
54+
})
55+
}

0 commit comments

Comments
 (0)