-
Notifications
You must be signed in to change notification settings - Fork 529
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
sampling/eventstorage: introduce event storage
Introduce a package which provides local event storage, tailored for tail-based sampling. Storage is backed by Badger (github.com/dgraph-io/badger). The Storage type provides two types for reading/writing data: ReadWriter, and ShardedReadWriter. ReadWriter is not safe for concurrent access, while ShardedReadWriter is. ShardedReadWriter provides locked access to one of several ReadWriters, sharding on trace ID. Currently we encode transactions and spans as JSON. We should look into developing a more efficient codec later, using protobuf or similar.
- Loading branch information
Showing
10 changed files
with
918 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
package eventstorage |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
package eventstorage | ||
|
||
import ( | ||
"encoding/json" | ||
|
||
// NOTE(axw) encoding/json is faster for encoding, | ||
// json-iterator is faster for decoding. | ||
jsoniter "github.com/json-iterator/go" | ||
|
||
"github.com/elastic/apm-server/model" | ||
) | ||
|
||
// JSONCodec is an implementation of Codec, using JSON encoding.
//
// Encoding uses encoding/json while decoding uses json-iterator, per the
// note on the imports: encoding/json is faster for encoding, and
// json-iterator is faster for decoding.
type JSONCodec struct{}

// DecodeSpan decodes data as JSON into span.
func (JSONCodec) DecodeSpan(data []byte, span *model.Span) error {
	return jsoniter.ConfigFastest.Unmarshal(data, span)
}

// DecodeTransaction decodes data as JSON into tx.
func (JSONCodec) DecodeTransaction(data []byte, tx *model.Transaction) error {
	return jsoniter.ConfigFastest.Unmarshal(data, tx)
}

// EncodeSpan encodes span as JSON.
func (JSONCodec) EncodeSpan(span *model.Span) ([]byte, error) {
	return json.Marshal(span)
}

// EncodeTransaction encodes tx as JSON.
func (JSONCodec) EncodeTransaction(tx *model.Transaction) ([]byte, error) {
	return json.Marshal(tx)
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
// or more contributor license agreements. Licensed under the Elastic License; | ||
// you may not use this file except in compliance with the Elastic License. | ||
|
||
package eventstorage | ||
|
||
import "github.com/elastic/beats/v7/libbeat/logp" | ||
|
||
// LogpAdaptor adapts logp.Logger to the badger.Logger interface.
//
// The embedded *logp.Logger supplies the remaining methods required by
// badger.Logger directly; only Warningf needs adapting, as logp names
// that method Warnf.
type LogpAdaptor struct {
	*logp.Logger
}

// Warningf adapts badger.Logger.Warningf to logp.Logger.Warnf.
func (a LogpAdaptor) Warningf(format string, args ...interface{}) {
	a.Warnf(format, args...)
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
// or more contributor license agreements. Licensed under the Elastic License; | ||
// you may not use this file except in compliance with the Elastic License. | ||
|
||
package eventstorage | ||
|
||
import ( | ||
"runtime" | ||
"sync" | ||
|
||
"github.com/cespare/xxhash/v2" | ||
"github.com/hashicorp/go-multierror" | ||
|
||
"github.com/elastic/apm-server/model" | ||
) | ||
|
||
// ShardedReadWriter provides sharded, locked, access to a Storage. | ||
// | ||
// ShardedReadWriter shards on trace ID. | ||
type ShardedReadWriter struct { | ||
readWriters []lockedReadWriter | ||
} | ||
|
||
func newShardedReadWriter(storage *Storage) *ShardedReadWriter { | ||
s := &ShardedReadWriter{ | ||
// Create as many ReadWriters as there are CPUs, | ||
// so we can ideally minimise lock contention. | ||
readWriters: make([]lockedReadWriter, runtime.NumCPU()), | ||
} | ||
for i := range s.readWriters { | ||
s.readWriters[i].rw = storage.NewReadWriter() | ||
} | ||
return s | ||
} | ||
|
||
// Close closes all sharded storage readWriters. | ||
func (s *ShardedReadWriter) Close() { | ||
for i := range s.readWriters { | ||
s.readWriters[i].Close() | ||
} | ||
} | ||
|
||
// Flush flushes all sharded storage readWriters. | ||
func (s *ShardedReadWriter) Flush() error { | ||
var result error | ||
for i := range s.readWriters { | ||
if err := s.readWriters[i].Flush(); err != nil { | ||
result = multierror.Append(result, err) | ||
} | ||
} | ||
return result | ||
} | ||
|
||
// ReadEvents calls Writer.ReadEvents, using a sharded, locked, Writer. | ||
func (s *ShardedReadWriter) ReadEvents(traceID string, out *model.Batch) error { | ||
return s.getWriter(traceID).ReadEvents(traceID, out) | ||
} | ||
|
||
// WriteTransaction calls Writer.WriteTransaction, using a sharded, locked, Writer. | ||
func (s *ShardedReadWriter) WriteTransaction(tx *model.Transaction) error { | ||
return s.getWriter(tx.TraceID).WriteTransaction(tx) | ||
} | ||
|
||
// WriteSpan calls Writer.WriteSpan, using a sharded, locked, Writer. | ||
func (s *ShardedReadWriter) WriteSpan(span *model.Span) error { | ||
return s.getWriter(span.TraceID).WriteSpan(span) | ||
} | ||
|
||
// WriteTraceSampled calls Writer.WriteTraceSampled, using a sharded, locked, Writer. | ||
func (s *ShardedReadWriter) WriteTraceSampled(traceID string, sampled bool) error { | ||
return s.getWriter(traceID).WriteTraceSampled(traceID, sampled) | ||
} | ||
|
||
// IsTraceSampled calls Writer.IsTraceSampled, using a sharded, locked, Writer. | ||
func (s *ShardedReadWriter) IsTraceSampled(traceID string) (bool, error) { | ||
return s.getWriter(traceID).IsTraceSampled(traceID) | ||
} | ||
|
||
// getWriter returns an event storage writer for the given trace ID. | ||
// | ||
// This method is idempotent, which is necessary to avoid transaction | ||
// conflicts and ensure all events are reported once a sampling decision | ||
// has been recorded. | ||
func (s *ShardedReadWriter) getWriter(traceID string) *lockedReadWriter { | ||
var h xxhash.Digest | ||
h.WriteString(traceID) | ||
return &s.readWriters[h.Sum64()%uint64(len(s.readWriters))] | ||
} | ||
|
||
// lockedReadWriter wraps a ReadWriter with a mutex, making it safe for
// concurrent use: every method acquires the lock for the duration of
// the delegated call.
type lockedReadWriter struct {
	mu sync.Mutex
	rw *ReadWriter
}

// Close closes the wrapped ReadWriter while holding the lock.
func (rw *lockedReadWriter) Close() {
	rw.mu.Lock()
	defer rw.mu.Unlock()
	rw.rw.Close()
}

// Flush flushes the wrapped ReadWriter while holding the lock.
func (rw *lockedReadWriter) Flush() error {
	rw.mu.Lock()
	defer rw.mu.Unlock()
	return rw.rw.Flush()
}

// ReadEvents delegates to ReadWriter.ReadEvents while holding the lock.
func (rw *lockedReadWriter) ReadEvents(traceID string, out *model.Batch) error {
	rw.mu.Lock()
	defer rw.mu.Unlock()
	return rw.rw.ReadEvents(traceID, out)
}

// WriteTransaction delegates to ReadWriter.WriteTransaction while holding the lock.
func (rw *lockedReadWriter) WriteTransaction(tx *model.Transaction) error {
	rw.mu.Lock()
	defer rw.mu.Unlock()
	return rw.rw.WriteTransaction(tx)
}

// WriteSpan delegates to ReadWriter.WriteSpan while holding the lock.
func (rw *lockedReadWriter) WriteSpan(s *model.Span) error {
	rw.mu.Lock()
	defer rw.mu.Unlock()
	return rw.rw.WriteSpan(s)
}

// WriteTraceSampled delegates to ReadWriter.WriteTraceSampled while holding the lock.
func (rw *lockedReadWriter) WriteTraceSampled(traceID string, sampled bool) error {
	rw.mu.Lock()
	defer rw.mu.Unlock()
	return rw.rw.WriteTraceSampled(traceID, sampled)
}

// IsTraceSampled delegates to ReadWriter.IsTraceSampled while holding the lock.
func (rw *lockedReadWriter) IsTraceSampled(traceID string) (bool, error) {
	rw.mu.Lock()
	defer rw.mu.Unlock()
	return rw.rw.IsTraceSampled(traceID)
}
51 changes: 51 additions & 0 deletions
51
x-pack/apm-server/sampling/eventstorage/sharded_bench_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
package eventstorage_test | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/gofrs/uuid" | ||
|
||
"github.com/elastic/apm-server/model" | ||
"github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage" | ||
) | ||
|
||
func BenchmarkShardedWriteTransactionUncontended(b *testing.B) { | ||
db := newBadgerDB(b, badgerOptions) | ||
ttl := time.Minute | ||
store := eventstorage.New(db, eventstorage.JSONCodec{}, ttl) | ||
sharded := store.NewShardedReadWriter() | ||
defer sharded.Close() | ||
|
||
b.RunParallel(func(pb *testing.PB) { | ||
traceUUID := uuid.Must(uuid.NewV4()) | ||
transaction := &model.Transaction{TraceID: traceUUID.String(), ID: traceUUID.String()} | ||
for pb.Next() { | ||
if err := sharded.WriteTransaction(transaction); err != nil { | ||
b.Fatal(err) | ||
} | ||
} | ||
}) | ||
} | ||
|
||
func BenchmarkShardedWriteTransactionContended(b *testing.B) { | ||
db := newBadgerDB(b, badgerOptions) | ||
ttl := time.Minute | ||
store := eventstorage.New(db, eventstorage.JSONCodec{}, ttl) | ||
sharded := store.NewShardedReadWriter() | ||
defer sharded.Close() | ||
|
||
// Use a single trace ID, causing all events to go through | ||
// the same sharded writer, contending for a single lock. | ||
traceUUID := uuid.Must(uuid.NewV4()) | ||
|
||
b.RunParallel(func(pb *testing.PB) { | ||
transactionUUID := uuid.Must(uuid.NewV4()) | ||
transaction := &model.Transaction{TraceID: traceUUID.String(), ID: transactionUUID.String()} | ||
for pb.Next() { | ||
if err := sharded.WriteTransaction(transaction); err != nil { | ||
b.Fatal(err) | ||
} | ||
} | ||
}) | ||
} |
Oops, something went wrong.