From 032b69da1e529366f0be77376259969963215d03 Mon Sep 17 00:00:00 2001
From: Israel Blancas
Date: Thu, 27 Feb 2025 19:45:48 +0100
Subject: [PATCH] Byte-based batching for traces

Signed-off-by: Israel Blancas
---
 .../internal/sizer/traces_sizer.go            |  67 +++++++++++
 .../internal/sizer/traces_sizer_test.go       |  56 +++++++++
 exporter/exporterhelper/traces.go             |  26 +++--
 exporter/exporterhelper/traces_batch.go       |  94 +++++++++-------
 exporter/exporterhelper/traces_batch_test.go  | 106 +++++++++++++++++-
 pdata/ptrace/pb.go                            |  12 ++
 6 files changed, 312 insertions(+), 49 deletions(-)
 create mode 100644 exporter/exporterhelper/internal/sizer/traces_sizer.go
 create mode 100644 exporter/exporterhelper/internal/sizer/traces_sizer_test.go

diff --git a/exporter/exporterhelper/internal/sizer/traces_sizer.go b/exporter/exporterhelper/internal/sizer/traces_sizer.go
new file mode 100644
index 00000000000..894a8196a56
--- /dev/null
+++ b/exporter/exporterhelper/internal/sizer/traces_sizer.go
@@ -0,0 +1,67 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package sizer // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
+
+import (
+	math_bits "math/bits"
+
+	"go.opentelemetry.io/collector/pdata/ptrace"
+)
+
+type TracesSizer interface {
+	TracesSize(td ptrace.Traces) int
+	ResourceSpansSize(rs ptrace.ResourceSpans) int
+	ScopeSpansSize(ss ptrace.ScopeSpans) int
+	SpanSize(span ptrace.Span) int
+	// DeltaSize returns the delta size when a span is added.
+	DeltaSize(newItemSize int) int
+}
+
+// TracesBytesSizer returns the byte size of serialized protos.
+type TracesBytesSizer struct {
+	ptrace.ProtoMarshaler
+}
+
+// DeltaSize returns the delta size of a proto slice when a new item is added.
+// Example:
+//
+//	prevSize := proto1.Size()
+//	proto2 := proto1.RepeatedField().AppendEmpty()
+//
+// Then currSize of proto1 can be calculated as
+//
+//	currSize := prevSize + sizer.DeltaSize(proto2.Size())
+//
+// This is derived from pdata/internal/data/protogen/trace/v1/trace.pb.go
+// which is generated with gogo/protobuf.
+func (s *TracesBytesSizer) DeltaSize(newItemSize int) int {
+	return 1 + newItemSize + (math_bits.Len64(uint64(newItemSize)|1)+6)/7 //nolint:gosec // disable G115
+}
+
+// TracesCountSizer returns the number of spans in the traces.
+type TracesCountSizer struct{} + +func (s *TracesCountSizer) TracesSize(td ptrace.Traces) int { + return td.SpanCount() +} + +func (s *TracesCountSizer) ResourceSpansSize(rs ptrace.ResourceSpans) int { + count := 0 + for k := 0; k < rs.ScopeSpans().Len(); k++ { + count += rs.ScopeSpans().At(k).Spans().Len() + } + return count +} + +func (s *TracesCountSizer) ScopeSpansSize(ss ptrace.ScopeSpans) int { + return ss.Spans().Len() +} + +func (s *TracesCountSizer) SpanSize(_ ptrace.Span) int { + return 1 +} + +func (s *TracesCountSizer) DeltaSize(newItemSize int) int { + return newItemSize +} diff --git a/exporter/exporterhelper/internal/sizer/traces_sizer_test.go b/exporter/exporterhelper/internal/sizer/traces_sizer_test.go new file mode 100644 index 00000000000..68be415b085 --- /dev/null +++ b/exporter/exporterhelper/internal/sizer/traces_sizer_test.go @@ -0,0 +1,56 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 +package sizer + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/pdata/testdata" +) + +func TestTracesCountSizer(t *testing.T) { + td := testdata.GenerateTraces(5) + sizer := TracesCountSizer{} + require.Equal(t, 5, sizer.TracesSize(td)) + + rs := td.ResourceSpans().At(0) + require.Equal(t, 5, sizer.ResourceSpansSize(rs)) + + ss := rs.ScopeSpans().At(0) + require.Equal(t, 5, sizer.ScopeSpansSize(ss)) + + require.Equal(t, 1, sizer.SpanSize(ss.Spans().At(0))) + require.Equal(t, 1, sizer.SpanSize(ss.Spans().At(1))) + require.Equal(t, 1, sizer.SpanSize(ss.Spans().At(2))) + require.Equal(t, 1, sizer.SpanSize(ss.Spans().At(3))) + require.Equal(t, 1, sizer.SpanSize(ss.Spans().At(4))) + + prevSize := sizer.ScopeSpansSize(ss) + span := ss.Spans().At(2) + span.CopyTo(ss.Spans().AppendEmpty()) + require.Equal(t, sizer.ScopeSpansSize(ss), prevSize+sizer.DeltaSize(sizer.SpanSize(span))) +} + +func TestTracesBytesSizer(t *testing.T) { + td := testdata.GenerateTraces(2) + sizer := TracesBytesSizer{} + require.Equal(t, 338, sizer.TracesSize(td)) + + rs := td.ResourceSpans().At(0) + require.Equal(t, 335, sizer.ResourceSpansSize(rs)) + + ss := rs.ScopeSpans().At(0) + require.Equal(t, 290, sizer.ScopeSpansSize(ss)) + + require.Equal(t, 187, sizer.SpanSize(ss.Spans().At(0))) + require.Equal(t, 96, sizer.SpanSize(ss.Spans().At(1))) + + prevSize := sizer.ScopeSpansSize(ss) + span := ss.Spans().At(1) + spanSize := sizer.SpanSize(span) + span.CopyTo(ss.Spans().AppendEmpty()) + ds := sizer.DeltaSize(spanSize) + require.Equal(t, prevSize+ds, sizer.ScopeSpansSize(ss)) +} diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 602917a7306..c4089af7f3c 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer" "go.opentelemetry.io/collector/exporter/exporterqueue" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pipeline" @@ -25,16 +26,16 @@ var ( ) type tracesRequest struct { - td ptrace.Traces - pusher consumer.ConsumeTracesFunc - cachedItemsCount int + td ptrace.Traces + pusher consumer.ConsumeTracesFunc + cachedSize int } func newTracesRequest(td ptrace.Traces, pusher consumer.ConsumeTracesFunc) Request { return &tracesRequest{ - td: td, - pusher: pusher, - cachedItemsCount: 
td.SpanCount(), + td: td, + pusher: pusher, + cachedSize: -1, } } @@ -65,11 +66,18 @@ func (req *tracesRequest) Export(ctx context.Context) error { } func (req *tracesRequest) ItemsCount() int { - return req.cachedItemsCount + return req.td.SpanCount() } -func (req *tracesRequest) setCachedItemsCount(count int) { - req.cachedItemsCount = count +func (req *tracesRequest) Size(sizer sizer.TracesSizer) int { + if req.cachedSize == -1 { + req.cachedSize = sizer.TracesSize(req.td) + } + return req.cachedSize +} + +func (req *tracesRequest) setCachedSize(size int) { + req.cachedSize = size } type tracesExporter struct { diff --git a/exporter/exporterhelper/traces_batch.go b/exporter/exporterhelper/traces_batch.go index c7cf43c6b1d..7981a84ad8b 100644 --- a/exporter/exporterhelper/traces_batch.go +++ b/exporter/exporterhelper/traces_batch.go @@ -8,57 +8,77 @@ import ( "errors" "go.opentelemetry.io/collector/exporter/exporterbatcher" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer" "go.opentelemetry.io/collector/pdata/ptrace" ) +func tracesSizerFromConfig(cfg exporterbatcher.SizeConfig) sizer.TracesSizer { + switch cfg.Sizer { + case exporterbatcher.SizerTypeItems: + return &sizer.TracesCountSizer{} + case exporterbatcher.SizerTypeBytes: + return &sizer.TracesBytesSizer{} + default: + return &sizer.TracesCountSizer{} + } +} + // MergeSplit splits and/or merges the provided traces request and the current request into one or more requests // conforming with the MaxSizeConfig. func (req *tracesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.SizeConfig, r2 Request) ([]Request, error) { + sizer := tracesSizerFromConfig(cfg) if r2 != nil { req2, ok := r2.(*tracesRequest) if !ok { return nil, errors.New("invalid input type") } - req2.mergeTo(req) + req2.mergeTo(req, sizer) } // If no limit we can simply merge the new request into the current and return. if cfg.MaxSize == 0 { return []Request{req}, nil } - return req.split(cfg) + return req.split(cfg.MaxSize, sizer), nil } -func (req *tracesRequest) mergeTo(dst *tracesRequest) { - dst.setCachedItemsCount(dst.ItemsCount() + req.ItemsCount()) - req.setCachedItemsCount(0) +func (req *tracesRequest) mergeTo(dst *tracesRequest, sizer sizer.TracesSizer) { + if sizer != nil { + dst.setCachedSize(dst.Size(sizer) + req.Size(sizer)) + req.setCachedSize(0) + } req.td.ResourceSpans().MoveAndAppendTo(dst.td.ResourceSpans()) } -func (req *tracesRequest) split(cfg exporterbatcher.SizeConfig) ([]Request, error) { +func (req *tracesRequest) split(maxSize int, sizer sizer.TracesSizer) []Request { var res []Request - for req.ItemsCount() > cfg.MaxSize { - td := extractTraces(req.td, cfg.MaxSize) - size := td.SpanCount() - req.setCachedItemsCount(req.ItemsCount() - size) - res = append(res, &tracesRequest{td: td, pusher: req.pusher, cachedItemsCount: size}) + for req.Size(sizer) > maxSize { + td := extractTraces(req.td, maxSize, sizer) + size := sizer.TracesSize(td) + req.setCachedSize(req.Size(sizer) - size) + res = append(res, &tracesRequest{td: td, pusher: req.pusher, cachedSize: size}) } res = append(res, req) - return res, nil + return res } // extractTraces extracts a new traces with a maximum number of spans. 
-func extractTraces(srcTraces ptrace.Traces, count int) ptrace.Traces { +func extractTraces(srcTraces ptrace.Traces, capacity int, sizer sizer.TracesSizer) ptrace.Traces { + capacityReached := false destTraces := ptrace.NewTraces() + capacityLeft := capacity - sizer.TracesSize(destTraces) srcTraces.ResourceSpans().RemoveIf(func(srcRS ptrace.ResourceSpans) bool { - if count == 0 { + if capacityReached { return false } - needToExtract := resourceTracesCount(srcRS) > count + needToExtract := sizer.ResourceSpansSize(srcRS) > capacityLeft if needToExtract { - srcRS = extractResourceSpans(srcRS, count) + srcRS, capacityReached = extractResourceSpans(srcRS, capacityLeft, sizer) + if srcRS.ScopeSpans().Len() == 0 { + return false + } } - count -= resourceTracesCount(srcRS) + capacityLeft -= sizer.DeltaSize(sizer.ResourceSpansSize(srcRS)) srcRS.MoveTo(destTraces.ResourceSpans().AppendEmpty()) return !needToExtract }) @@ -66,48 +86,46 @@ func extractTraces(srcTraces ptrace.Traces, count int) ptrace.Traces { } // extractResourceSpans extracts spans and returns a new resource spans with the specified number of spans. -func extractResourceSpans(srcRS ptrace.ResourceSpans, count int) ptrace.ResourceSpans { +func extractResourceSpans(srcRS ptrace.ResourceSpans, capacity int, sizer sizer.TracesSizer) (ptrace.ResourceSpans, bool) { + capacityReached := false destRS := ptrace.NewResourceSpans() destRS.SetSchemaUrl(srcRS.SchemaUrl()) srcRS.Resource().CopyTo(destRS.Resource()) + capacityLeft := capacity - sizer.ResourceSpansSize(destRS) srcRS.ScopeSpans().RemoveIf(func(srcSS ptrace.ScopeSpans) bool { - if count == 0 { + if capacityReached { return false } - needToExtract := srcSS.Spans().Len() > count + needToExtract := sizer.ScopeSpansSize(srcSS) > capacityLeft if needToExtract { - srcSS = extractScopeSpans(srcSS, count) + srcSS, capacityReached = extractScopeSpans(srcSS, capacityLeft, sizer) + if srcSS.Spans().Len() == 0 { + return false + } } - count -= srcSS.Spans().Len() + capacityLeft -= sizer.DeltaSize(sizer.ScopeSpansSize(srcSS)) srcSS.MoveTo(destRS.ScopeSpans().AppendEmpty()) return !needToExtract }) - srcRS.Resource().CopyTo(destRS.Resource()) - return destRS + return destRS, capacityReached } // extractScopeSpans extracts spans and returns a new scope spans with the specified number of spans. -func extractScopeSpans(srcSS ptrace.ScopeSpans, count int) ptrace.ScopeSpans { +func extractScopeSpans(srcSS ptrace.ScopeSpans, capacity int, sizer sizer.TracesSizer) (ptrace.ScopeSpans, bool) { + capacityReached := false destSS := ptrace.NewScopeSpans() destSS.SetSchemaUrl(srcSS.SchemaUrl()) srcSS.Scope().CopyTo(destSS.Scope()) + capacityLeft := capacity - sizer.ScopeSpansSize(destSS) srcSS.Spans().RemoveIf(func(srcSpan ptrace.Span) bool { - if count == 0 { + if capacityReached || sizer.SpanSize(srcSpan) > capacityLeft { + capacityReached = true return false } + capacityLeft -= sizer.DeltaSize(sizer.SpanSize(srcSpan)) + srcSpan.MoveTo(destSS.Spans().AppendEmpty()) - count-- return true }) - return destSS -} - -// resourceTracesCount calculates the total number of spans in the pdata.ResourceSpans. 
-func resourceTracesCount(rs ptrace.ResourceSpans) int { - count := 0 - rs.ScopeSpans().RemoveIf(func(ss ptrace.ScopeSpans) bool { - count += ss.Spans().Len() - return false - }) - return count + return destSS, capacityReached } diff --git a/exporter/exporterhelper/traces_batch_test.go b/exporter/exporterhelper/traces_batch_test.go index 2dc8f3b4f5f..92f387cb5eb 100644 --- a/exporter/exporterhelper/traces_batch_test.go +++ b/exporter/exporterhelper/traces_batch_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/exporter/exporterbatcher" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pdata/testdata" ) @@ -130,7 +131,108 @@ func TestMergeSplitTraces(t *testing.T) { require.NoError(t, err) assert.Equal(t, len(tt.expected), len(res)) for i := range res { - assert.Equal(t, tt.expected[i], res[i]) + assert.Equal(t, tt.expected[i].(*tracesRequest).td, res[i].(*tracesRequest).td) + } + }) + } +} + +func TestMergeSplitTracesBasedOnByteSize(t *testing.T) { + tests := []struct { + name string + cfg exporterbatcher.SizeConfig + lr1 Request + lr2 Request + expected []Request + }{ + { + name: "both_requests_empty", + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeBytes, MaxSize: tracesMarshaler.TracesSize(testdata.GenerateTraces(10))}, + lr1: newTracesRequest(ptrace.NewTraces(), nil), + lr2: newTracesRequest(ptrace.NewTraces(), nil), + expected: []Request{newTracesRequest(ptrace.NewTraces(), nil)}, + }, + { + name: "first_request_empty", + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeBytes, MaxSize: tracesMarshaler.TracesSize(testdata.GenerateTraces(10))}, + lr1: newTracesRequest(ptrace.NewTraces(), nil), + lr2: newTracesRequest(testdata.GenerateTraces(5), nil), + expected: []Request{newTracesRequest(testdata.GenerateTraces(5), nil)}, + }, + { + name: "first_empty_second_nil", + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeBytes, MaxSize: tracesMarshaler.TracesSize(testdata.GenerateTraces(10))}, + lr1: newTracesRequest(ptrace.NewTraces(), nil), + lr2: nil, + expected: []Request{newTracesRequest(ptrace.NewTraces(), nil)}, + }, + { + name: "merge_only", + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeBytes, MaxSize: tracesMarshaler.TracesSize(testdata.GenerateTraces(10))}, + lr1: newTracesRequest(testdata.GenerateTraces(1), nil), + lr2: newTracesRequest(testdata.GenerateTraces(6), nil), + expected: []Request{newTracesRequest(func() ptrace.Traces { + traces := testdata.GenerateTraces(1) + testdata.GenerateTraces(6).ResourceSpans().MoveAndAppendTo(traces.ResourceSpans()) + return traces + }(), nil)}, + }, + { + name: "split_only", + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeBytes, MaxSize: tracesMarshaler.TracesSize(testdata.GenerateTraces(4))}, + lr1: newTracesRequest(ptrace.NewTraces(), nil), + lr2: newTracesRequest(testdata.GenerateTraces(10), nil), + expected: []Request{ + newTracesRequest(testdata.GenerateTraces(4), nil), + newTracesRequest(testdata.GenerateTraces(4), nil), + newTracesRequest(testdata.GenerateTraces(2), nil), + }, + }, + { + name: "merge_and_split", + cfg: exporterbatcher.SizeConfig{ + Sizer: exporterbatcher.SizerTypeBytes, + MaxSize: tracesMarshaler.TracesSize(testdata.GenerateTraces(10))/2 + tracesMarshaler.TracesSize(testdata.GenerateTraces(11))/2, + }, + lr1: newTracesRequest(testdata.GenerateTraces(8), nil), + lr2: 
newTracesRequest(testdata.GenerateTraces(20), nil), + expected: []Request{ + newTracesRequest(func() ptrace.Traces { + traces := testdata.GenerateTraces(8) + testdata.GenerateTraces(2).ResourceSpans().MoveAndAppendTo(traces.ResourceSpans()) + return traces + }(), nil), + newTracesRequest(testdata.GenerateTraces(10), nil), + newTracesRequest(testdata.GenerateTraces(8), nil), + }, + }, + { + name: "scope_spans_split", + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeBytes, MaxSize: tracesMarshaler.TracesSize(testdata.GenerateTraces(4))}, + lr1: newTracesRequest(func() ptrace.Traces { + ld := testdata.GenerateTraces(4) + ld.ResourceSpans().At(0).ScopeSpans().AppendEmpty().Spans().AppendEmpty().Attributes().PutStr("attr", "attrvalue") + return ld + }(), nil), + lr2: newTracesRequest(testdata.GenerateTraces(2), nil), + expected: []Request{ + newTracesRequest(testdata.GenerateTraces(4), nil), + newTracesRequest(func() ptrace.Traces { + ld := testdata.GenerateTraces(0) + ld.ResourceSpans().At(0).ScopeSpans().At(0).Spans().AppendEmpty().Attributes().PutStr("attr", "attrvalue") + testdata.GenerateTraces(2).ResourceSpans().MoveAndAppendTo(ld.ResourceSpans()) + return ld + }(), nil), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + res, err := tt.lr1.MergeSplit(context.Background(), tt.cfg, tt.lr2) + require.NoError(t, err) + assert.Equal(t, len(tt.expected), len(res)) + for i := range res { + assert.Equal(t, tt.expected[i].(*tracesRequest).td, res[i].(*tracesRequest).td) } }) } @@ -154,7 +256,7 @@ func TestMergeSplitTracesInvalidInput(t *testing.T) { func TestExtractTraces(t *testing.T) { for i := 0; i < 10; i++ { td := testdata.GenerateTraces(10) - extractedTraces := extractTraces(td, i) + extractedTraces := extractTraces(td, i, &sizer.TracesCountSizer{}) assert.Equal(t, i, extractedTraces.SpanCount()) assert.Equal(t, 10-i, td.SpanCount()) } diff --git a/pdata/ptrace/pb.go b/pdata/ptrace/pb.go index e0b2168884a..a3c78be27c1 100644 --- a/pdata/ptrace/pb.go +++ b/pdata/ptrace/pb.go @@ -22,6 +22,18 @@ func (e *ProtoMarshaler) TracesSize(td Traces) int { return pb.Size() } +func (e *ProtoMarshaler) ResourceSpansSize(rs ResourceSpans) int { + return rs.orig.Size() +} + +func (e *ProtoMarshaler) ScopeSpansSize(ss ScopeSpans) int { + return ss.orig.Size() +} + +func (e *ProtoMarshaler) SpanSize(span Span) int { + return span.orig.Size() +} + type ProtoUnmarshaler struct{} func (d *ProtoUnmarshaler) UnmarshalTraces(buf []byte) (Traces, error) {
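
The arithmetic in TracesBytesSizer.DeltaSize comes directly from how gogo/protobuf encodes a repeated message field: appending an item whose serialized size is n bytes grows the parent message by one tag byte, a varint holding n, and the n payload bytes. The standalone Go sketch below is not part of the patch; names such as varintLen, deltaSize, and "demo-span" are illustrative only, and it assumes the patch is applied, since ScopeSpansSize and SpanSize are only added to ptrace.ProtoMarshaler here.

package main

import (
	"fmt"
	"math/bits"

	"go.opentelemetry.io/collector/pdata/ptrace"
)

// varintLen is the standard gogo/protobuf "sov" helper: the number of bytes
// needed to encode x as a varint.
func varintLen(x uint64) int {
	return (bits.Len64(x|1) + 6) / 7
}

// deltaSize mirrors TracesBytesSizer.DeltaSize: one tag byte, the length
// varint, and the payload itself.
func deltaSize(newItemSize int) int {
	return 1 + newItemSize + varintLen(uint64(newItemSize))
}

func main() {
	sizer := &ptrace.ProtoMarshaler{} // gains ScopeSpansSize/SpanSize in this patch

	td := ptrace.NewTraces()
	ss := td.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty()
	span := ss.Spans().AppendEmpty()
	span.SetName("demo-span")
	span.Attributes().PutStr("key", "value")

	prevSize := sizer.ScopeSpansSize(ss)
	spanSize := sizer.SpanSize(span)

	// Append a copy of the span and compare the actual growth of the parent
	// ScopeSpans against the predicted delta.
	span.CopyTo(ss.Spans().AppendEmpty())
	currSize := sizer.ScopeSpansSize(ss)

	fmt.Println(currSize == prevSize+deltaSize(spanSize)) // expected: true
}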