diff --git a/exporter/exporterbatcher/sizer_type.go b/exporter/exporterbatcher/sizer_type.go
index 718a0597ecb..fb93398eeba 100644
--- a/exporter/exporterbatcher/sizer_type.go
+++ b/exporter/exporterbatcher/sizer_type.go
@@ -13,15 +13,20 @@ type SizerType struct {
 const (
 	sizerTypeItems = "items"
+	sizerTypeBytes = "bytes"
 )
 
-var SizerTypeItems = SizerType{val: sizerTypeItems}
+var (
+	SizerTypeItems = SizerType{val: sizerTypeItems}
+	SizerTypeBytes = SizerType{val: sizerTypeBytes}
+)
 
 // UnmarshalText implements TextUnmarshaler interface.
 func (s *SizerType) UnmarshalText(text []byte) error {
 	switch str := string(text); str {
 	case sizerTypeItems:
 		*s = SizerTypeItems
+	// TODO support setting sizer to SizerTypeBytes when all logs, traces, and metrics batching support it
 	default:
 		return fmt.Errorf("invalid sizer: %q", str)
 	}
diff --git a/exporter/exporterhelper/internal/sizer/logs_sizer.go b/exporter/exporterhelper/internal/sizer/logs_sizer.go
new file mode 100644
index 00000000000..7e7683ee34c
--- /dev/null
+++ b/exporter/exporterhelper/internal/sizer/logs_sizer.go
@@ -0,0 +1,68 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package sizer // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
+
+import (
+	math_bits "math/bits"
+
+	"go.opentelemetry.io/collector/pdata/plog"
+)
+
+type LogsSizer interface {
+	LogsSize(ld plog.Logs) int
+	ResourceLogsSize(rl plog.ResourceLogs) int
+	ScopeLogsSize(sl plog.ScopeLogs) int
+	LogRecordSize(lr plog.LogRecord) int
+
+	// DeltaSize returns the delta size when a ResourceLogs, ScopeLogs or LogRecord is added.
+	DeltaSize(newItemSize int) int
+}
+
+// LogsBytesSizer returns the byte size of serialized protos.
+type LogsBytesSizer struct {
+	plog.ProtoMarshaler
+}
+
+// DeltaSize returns the delta size of a proto slice when a new item is added.
+// Example:
+//
+//	prevSize := proto1.Size()
+//	proto2 := proto1.RepeatedField().AppendEmpty()
+//
+// Then currSize of proto1 can be calculated as
+//
+//	currSize := prevSize + sizer.DeltaSize(proto2.Size())
+//
+// This is derived from opentelemetry-collector/pdata/internal/data/protogen/logs/v1/logs.pb.go
+// which is generated with gogo/protobuf.
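+//
+// The delta is the item's serialized size plus its framing in the parent
+// message: one key byte plus the varint-encoded length. For example, appending
+// a LogRecord whose Size() is 109 adds 1 (key) + 1 (length varint, since
+// (Len64(109|1)+6)/7 == 1) + 109 (payload) = 111 bytes.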
+func (s *LogsBytesSizer) DeltaSize(newItemSize int) int {
+	return 1 + newItemSize + (math_bits.Len64(uint64(newItemSize)|1)+6)/7 //nolint:gosec // disable G115
+}
+
+// LogsCountSizer returns the number of log records.
+type LogsCountSizer struct{}
+
+func (s *LogsCountSizer) LogsSize(ld plog.Logs) int {
+	return ld.LogRecordCount()
+}
+
+func (s *LogsCountSizer) ResourceLogsSize(rl plog.ResourceLogs) int {
+	count := 0
+	for k := 0; k < rl.ScopeLogs().Len(); k++ {
+		count += rl.ScopeLogs().At(k).LogRecords().Len()
+	}
+	return count
+}
+
+func (s *LogsCountSizer) ScopeLogsSize(sl plog.ScopeLogs) int {
+	return sl.LogRecords().Len()
+}
+
+func (s *LogsCountSizer) LogRecordSize(_ plog.LogRecord) int {
+	return 1
+}
+
+func (s *LogsCountSizer) DeltaSize(newItemSize int) int {
+	return newItemSize
+}
diff --git a/exporter/exporterhelper/internal/sizer/logs_sizer_test.go b/exporter/exporterhelper/internal/sizer/logs_sizer_test.go
new file mode 100644
index 00000000000..196d1655ede
--- /dev/null
+++ b/exporter/exporterhelper/internal/sizer/logs_sizer_test.go
@@ -0,0 +1,57 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+package sizer // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"go.opentelemetry.io/collector/pdata/testdata"
+)
+
+func TestLogsCountSizer(t *testing.T) {
+	ld := testdata.GenerateLogs(5)
+	sizer := LogsCountSizer{}
+	require.Equal(t, 5, sizer.LogsSize(ld))
+
+	rl := ld.ResourceLogs().At(0)
+	require.Equal(t, 5, sizer.ResourceLogsSize(rl))
+
+	sl := rl.ScopeLogs().At(0)
+	require.Equal(t, 5, sizer.ScopeLogsSize(sl))
+
+	require.Equal(t, 1, sizer.LogRecordSize(sl.LogRecords().At(0)))
+	require.Equal(t, 1, sizer.LogRecordSize(sl.LogRecords().At(1)))
+	require.Equal(t, 1, sizer.LogRecordSize(sl.LogRecords().At(2)))
+	require.Equal(t, 1, sizer.LogRecordSize(sl.LogRecords().At(3)))
+	require.Equal(t, 1, sizer.LogRecordSize(sl.LogRecords().At(4)))
+
+	prevSize := sizer.ScopeLogsSize(sl)
+	lr := sl.LogRecords().At(2)
+	lr.CopyTo(sl.LogRecords().AppendEmpty())
+	require.Equal(t, sizer.ScopeLogsSize(sl), prevSize+sizer.DeltaSize(sizer.LogRecordSize(lr)))
+}
+
+func TestLogsBytesSizer(t *testing.T) {
+	ld := testdata.GenerateLogs(5)
+	sizer := LogsBytesSizer{}
+	require.Equal(t, 545, sizer.LogsSize(ld))
+
+	rl := ld.ResourceLogs().At(0)
+	require.Equal(t, 542, sizer.ResourceLogsSize(rl))
+
+	sl := rl.ScopeLogs().At(0)
+	require.Equal(t, 497, sizer.ScopeLogsSize(sl))
+
+	require.Equal(t, 109, sizer.LogRecordSize(sl.LogRecords().At(0)))
+	require.Equal(t, 79, sizer.LogRecordSize(sl.LogRecords().At(1)))
+	require.Equal(t, 109, sizer.LogRecordSize(sl.LogRecords().At(2)))
+	require.Equal(t, 79, sizer.LogRecordSize(sl.LogRecords().At(3)))
+	require.Equal(t, 109, sizer.LogRecordSize(sl.LogRecords().At(4)))
+
+	prevSize := sizer.ScopeLogsSize(sl)
+	lr := sl.LogRecords().At(2)
+	lr.CopyTo(sl.LogRecords().AppendEmpty())
+	require.Equal(t, sizer.ScopeLogsSize(sl), prevSize+sizer.DeltaSize(sizer.LogRecordSize(lr)))
+}
diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go
index 8ca1e55790e..30fbb4fa2bb 100644
--- a/exporter/exporterhelper/logs.go
+++ b/exporter/exporterhelper/logs.go
@@ -14,6 +14,7 @@ import (
 	"go.opentelemetry.io/collector/consumer/consumererror"
 	"go.opentelemetry.io/collector/exporter"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
+	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
 	"go.opentelemetry.io/collector/exporter/exporterqueue"
 	"go.opentelemetry.io/collector/pdata/plog"
 	"go.opentelemetry.io/collector/pipeline"
@@ -25,16 +26,16 @@ var (
 )
 
 type logsRequest struct {
-	ld               plog.Logs
-	pusher           consumer.ConsumeLogsFunc
-	cachedItemsCount int
+	ld         plog.Logs
+	pusher     consumer.ConsumeLogsFunc
+	cachedSize int
 }
 
 func newLogsRequest(ld plog.Logs, pusher consumer.ConsumeLogsFunc) Request {
 	return &logsRequest{
-		ld:               ld,
-		pusher:           pusher,
-		cachedItemsCount: ld.LogRecordCount(),
+		ld:         ld,
+		pusher:     pusher,
+		cachedSize: -1,
 	}
 }
@@ -65,11 +66,18 @@ func (req *logsRequest) Export(ctx context.Context) error {
 }
 
 func (req *logsRequest) ItemsCount() int {
-	return req.cachedItemsCount
+	return req.ld.LogRecordCount()
 }
 
-func (req *logsRequest) setCachedItemsCount(count int) {
-	req.cachedItemsCount = count
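+// Size returns the size of the request as measured by the given sizer,
+// computed lazily on first use and cached (cachedSize == -1 means not yet computed).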
+func (req *logsRequest) Size(sizer sizer.LogsSizer) int {
+	if req.cachedSize == -1 {
+		req.cachedSize = sizer.LogsSize(req.ld)
+	}
+	return req.cachedSize
+}
+
+func (req *logsRequest) setCachedSize(size int) {
+	req.cachedSize = size
 }
 
 type logsExporter struct {
diff --git a/exporter/exporterhelper/logs_batch.go b/exporter/exporterhelper/logs_batch.go
index 0d97239a31b..4849c8eba9c 100644
--- a/exporter/exporterhelper/logs_batch.go
+++ b/exporter/exporterhelper/logs_batch.go
@@ -8,57 +8,78 @@ import (
 	"errors"
 
 	"go.opentelemetry.io/collector/exporter/exporterbatcher"
+	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
 	"go.opentelemetry.io/collector/pdata/plog"
 )
 
+func sizerFromConfig(cfg exporterbatcher.SizeConfig) sizer.LogsSizer {
+	switch cfg.Sizer {
+	case exporterbatcher.SizerTypeItems:
+		return &sizer.LogsCountSizer{}
+	case exporterbatcher.SizerTypeBytes:
+		return &sizer.LogsBytesSizer{}
+	default:
+		return &sizer.LogsCountSizer{}
+	}
+}
+
 // MergeSplit splits and/or merges the provided logs request and the current request into one or more requests
-// conforming with the MaxSizeConfig.
+// conforming with the SizeConfig.
 func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.SizeConfig, r2 Request) ([]Request, error) {
+	sizer := sizerFromConfig(cfg)
 	if r2 != nil {
 		req2, ok := r2.(*logsRequest)
 		if !ok {
 			return nil, errors.New("invalid input type")
 		}
-		req2.mergeTo(req)
+		req2.mergeTo(req, sizer)
 	}
 
 	// If no limit we can simply merge the new request into the current and return.
 	if cfg.MaxSize == 0 {
 		return []Request{req}, nil
 	}
-	return req.split(cfg)
+
+	return req.split(cfg.MaxSize, sizer), nil
 }
 
-func (req *logsRequest) mergeTo(dst *logsRequest) {
-	dst.setCachedItemsCount(dst.ItemsCount() + req.ItemsCount())
-	req.setCachedItemsCount(0)
+func (req *logsRequest) mergeTo(dst *logsRequest, sizer sizer.LogsSizer) {
+	if sizer != nil {
+		dst.setCachedSize(dst.Size(sizer) + req.Size(sizer))
+		req.setCachedSize(0)
+	}
 	req.ld.ResourceLogs().MoveAndAppendTo(dst.ld.ResourceLogs())
 }
 
-func (req *logsRequest) split(cfg exporterbatcher.SizeConfig) ([]Request, error) {
+func (req *logsRequest) split(maxSize int, sizer sizer.LogsSizer) []Request {
 	var res []Request
-	for req.ItemsCount() > cfg.MaxSize {
-		ld := extractLogs(req.ld, cfg.MaxSize)
-		size := ld.LogRecordCount()
-		req.setCachedItemsCount(req.ItemsCount() - size)
-		res = append(res, &logsRequest{ld: ld, pusher: req.pusher, cachedItemsCount: size})
+	for req.Size(sizer) > maxSize {
+		ld := extractLogs(req.ld, maxSize, sizer)
+		size := sizer.LogsSize(ld)
+		req.setCachedSize(req.Size(sizer) - size)
+		res = append(res, &logsRequest{ld: ld, pusher: req.pusher, cachedSize: size})
 	}
 	res = append(res, req)
-	return res, nil
+	return res
 }
 
-// extractLogs extracts logs from the input logs and returns a new logs with the specified number of log records.
-func extractLogs(srcLogs plog.Logs, count int) plog.Logs {
+// extractLogs extracts logs from srcLogs until capacity, as measured by the sizer, is reached.
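+// Entries that would overflow the remaining capacity are split recursively, and
+// the remaining capacity is decremented with DeltaSize so that the per-entry
+// proto framing overhead is accounted for.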
+func extractLogs(srcLogs plog.Logs, capacity int, sizer sizer.LogsSizer) plog.Logs {
+	capacityReached := false
 	destLogs := plog.NewLogs()
+	capacityLeft := capacity - sizer.LogsSize(destLogs)
 	srcLogs.ResourceLogs().RemoveIf(func(srcRL plog.ResourceLogs) bool {
-		if count == 0 {
+		if capacityReached {
 			return false
 		}
-		needToExtract := resourceLogsCount(srcRL) > count
+		needToExtract := sizer.ResourceLogsSize(srcRL) > capacityLeft
 		if needToExtract {
-			srcRL = extractResourceLogs(srcRL, count)
+			srcRL, capacityReached = extractResourceLogs(srcRL, capacityLeft, sizer)
+			if srcRL.ScopeLogs().Len() == 0 {
+				return false
+			}
 		}
-		count -= resourceLogsCount(srcRL)
+		capacityLeft -= sizer.DeltaSize(sizer.ResourceLogsSize(srcRL))
 		srcRL.MoveTo(destLogs.ResourceLogs().AppendEmpty())
 		return !needToExtract
 	})
@@ -66,46 +87,45 @@
 }
 
-// extractResourceLogs extracts resource logs and returns a new resource logs with the specified number of log records.
-func extractResourceLogs(srcRL plog.ResourceLogs, count int) plog.ResourceLogs {
+// extractResourceLogs extracts scope logs from srcRL until capacity, as measured by the sizer, is reached.
+func extractResourceLogs(srcRL plog.ResourceLogs, capacity int, sizer sizer.LogsSizer) (plog.ResourceLogs, bool) {
+	capacityReached := false
 	destRL := plog.NewResourceLogs()
 	destRL.SetSchemaUrl(srcRL.SchemaUrl())
 	srcRL.Resource().CopyTo(destRL.Resource())
+	capacityLeft := capacity - sizer.ResourceLogsSize(destRL)
 	srcRL.ScopeLogs().RemoveIf(func(srcSL plog.ScopeLogs) bool {
-		if count == 0 {
+		if capacityReached {
 			return false
 		}
-		needToExtract := srcSL.LogRecords().Len() > count
+		needToExtract := sizer.ScopeLogsSize(srcSL) > capacityLeft
 		if needToExtract {
-			srcSL = extractScopeLogs(srcSL, count)
+			srcSL, capacityReached = extractScopeLogs(srcSL, capacityLeft, sizer)
+			if srcSL.LogRecords().Len() == 0 {
+				return false
+			}
 		}
-		count -= srcSL.LogRecords().Len()
+		capacityLeft -= sizer.DeltaSize(sizer.ScopeLogsSize(srcSL))
 		srcSL.MoveTo(destRL.ScopeLogs().AppendEmpty())
 		return !needToExtract
 	})
-	return destRL
+	return destRL, capacityReached
 }
 
-// extractScopeLogs extracts scope logs and returns a new scope logs with the specified number of log records.
-func extractScopeLogs(srcSL plog.ScopeLogs, count int) plog.ScopeLogs {
+// extractScopeLogs extracts log records from srcSL until capacity, as measured by the sizer, is reached.
+func extractScopeLogs(srcSL plog.ScopeLogs, capacity int, sizer sizer.LogsSizer) (plog.ScopeLogs, bool) {
+	capacityReached := false
 	destSL := plog.NewScopeLogs()
 	destSL.SetSchemaUrl(srcSL.SchemaUrl())
 	srcSL.Scope().CopyTo(destSL.Scope())
+	capacityLeft := capacity - sizer.ScopeLogsSize(destSL)
 	srcSL.LogRecords().RemoveIf(func(srcLR plog.LogRecord) bool {
-		if count == 0 {
+		if capacityReached || sizer.LogRecordSize(srcLR) > capacityLeft {
+			capacityReached = true
 			return false
 		}
+		capacityLeft -= sizer.DeltaSize(sizer.LogRecordSize(srcLR))
 		srcLR.MoveTo(destSL.LogRecords().AppendEmpty())
-		count--
 		return true
 	})
-	return destSL
-}
-
-// resourceLogsCount calculates the total number of log records in the plog.ResourceLogs.
-func resourceLogsCount(rl plog.ResourceLogs) int {
-	count := 0
-	for k := 0; k < rl.ScopeLogs().Len(); k++ {
-		count += rl.ScopeLogs().At(k).LogRecords().Len()
-	}
-	return count
+	return destSL, capacityReached
 }
diff --git a/exporter/exporterhelper/logs_batch_test.go b/exporter/exporterhelper/logs_batch_test.go
index d4ae3a68c45..635faa54209 100644
--- a/exporter/exporterhelper/logs_batch_test.go
+++ b/exporter/exporterhelper/logs_batch_test.go
@@ -11,7 +11,7 @@ import (
 	"github.com/stretchr/testify/require"
 
 	"go.opentelemetry.io/collector/exporter/exporterbatcher"
-	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
+	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
 	"go.opentelemetry.io/collector/pdata/plog"
 	"go.opentelemetry.io/collector/pdata/testdata"
 )
@@ -35,8 +35,8 @@ func TestMergeSplitLogs(t *testing.T) {
 	tests := []struct {
 		name     string
 		cfg      exporterbatcher.SizeConfig
-		lr1      request.Request
-		lr2      request.Request
+		lr1      Request
+		lr2      Request
 		expected []Request
 	}{
 		{
@@ -123,7 +123,108 @@ func TestMergeSplitLogs(t *testing.T) {
 			require.NoError(t, err)
 			assert.Equal(t, len(tt.expected), len(res))
 			for i := range res {
-				assert.Equal(t, tt.expected[i], res[i])
+				assert.Equal(t, tt.expected[i].(*logsRequest).ld, res[i].(*logsRequest).ld)
 			}
 		})
 	}
 }
+
+func TestMergeSplitLogsBasedOnByteSize(t *testing.T) {
+	tests := []struct {
+		name     string
+		cfg      exporterbatcher.SizeConfig
+		lr1      Request
+		lr2      Request
+		expected []Request
+	}{
+		{
+			name:     "both_requests_empty",
+			cfg:      exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeBytes, MaxSize: logsMarshaler.LogsSize(testdata.GenerateLogs(10))},
+			lr1:      newLogsRequest(plog.NewLogs(), nil),
+			lr2:      newLogsRequest(plog.NewLogs(), nil),
+			expected: []Request{newLogsRequest(plog.NewLogs(), nil)},
+		},
+		{
+			name:     "first_request_empty",
+			cfg:      exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeBytes, MaxSize: logsMarshaler.LogsSize(testdata.GenerateLogs(10))},
+			lr1:      newLogsRequest(plog.NewLogs(), nil),
+			lr2:      newLogsRequest(testdata.GenerateLogs(5), nil),
+			expected: []Request{newLogsRequest(testdata.GenerateLogs(5), nil)},
+		},
+		{
+			name:     "first_empty_second_nil",
+			cfg:      exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeBytes, MaxSize: logsMarshaler.LogsSize(testdata.GenerateLogs(10))},
+			lr1:      newLogsRequest(plog.NewLogs(), nil),
+			lr2:      nil,
+			expected: []Request{newLogsRequest(plog.NewLogs(), nil)},
+		},
+		{
+			name: "merge_only",
+			cfg:  exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeBytes, MaxSize: logsMarshaler.LogsSize(testdata.GenerateLogs(11))},
+			lr1:  newLogsRequest(testdata.GenerateLogs(4), nil),
+			lr2:  newLogsRequest(testdata.GenerateLogs(6), nil),
+			expected: []Request{newLogsRequest(func() plog.Logs {
+				logs := testdata.GenerateLogs(4)
+				testdata.GenerateLogs(6).ResourceLogs().MoveAndAppendTo(logs.ResourceLogs())
+				return logs
+			}(), nil)},
+		},
+		{
+			name: "split_only",
+			cfg:  exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeBytes, MaxSize: logsMarshaler.LogsSize(testdata.GenerateLogs(4))},
+			lr1:  newLogsRequest(plog.NewLogs(), nil),
+			lr2:  newLogsRequest(testdata.GenerateLogs(10), nil),
+			expected: []Request{
+				newLogsRequest(testdata.GenerateLogs(4), nil),
+				newLogsRequest(testdata.GenerateLogs(4), nil),
+				newLogsRequest(testdata.GenerateLogs(2), nil),
+			},
+		},
+		{
+			name: "merge_and_split",
+			cfg: exporterbatcher.SizeConfig{
+				Sizer:   exporterbatcher.SizerTypeBytes,
+				MaxSize: logsMarshaler.LogsSize(testdata.GenerateLogs(10))/2 + logsMarshaler.LogsSize(testdata.GenerateLogs(11))/2,
+			},
+			lr1: newLogsRequest(testdata.GenerateLogs(8), nil),
+			lr2: newLogsRequest(testdata.GenerateLogs(20), nil),
+			expected: []Request{
+				newLogsRequest(func() plog.Logs {
+					logs := testdata.GenerateLogs(8)
+					testdata.GenerateLogs(2).ResourceLogs().MoveAndAppendTo(logs.ResourceLogs())
+					return logs
+				}(), nil),
+				newLogsRequest(testdata.GenerateLogs(10), nil),
+				newLogsRequest(testdata.GenerateLogs(8), nil),
+			},
+		},
+		{
+			name: "scope_logs_split",
+			cfg:  exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeBytes, MaxSize: logsMarshaler.LogsSize(testdata.GenerateLogs(4))},
+			lr1: newLogsRequest(func() plog.Logs {
+				ld := testdata.GenerateLogs(4)
+				ld.ResourceLogs().At(0).ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("extra log")
+				return ld
+			}(), nil),
+			lr2: newLogsRequest(testdata.GenerateLogs(2), nil),
+			expected: []Request{
+				newLogsRequest(testdata.GenerateLogs(4), nil),
+				newLogsRequest(func() plog.Logs {
+					ld := testdata.GenerateLogs(0)
+					ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().AppendEmpty().Body().SetStr("extra log")
+					testdata.GenerateLogs(2).ResourceLogs().MoveAndAppendTo(ld.ResourceLogs())
+					return ld
+				}(), nil),
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			res, err := tt.lr1.MergeSplit(context.Background(), tt.cfg, tt.lr2)
+			require.NoError(t, err)
+			assert.Equal(t, len(tt.expected), len(res))
+			for i := range res {
+				assert.Equal(t, tt.expected[i].(*logsRequest).ld, res[i].(*logsRequest).ld)
+			}
+		})
+	}
+}
@@ -147,7 +248,7 @@ func TestMergeSplitLogsInvalidInput(t *testing.T) {
 func TestExtractLogs(t *testing.T) {
 	for i := 0; i < 10; i++ {
 		ld := testdata.GenerateLogs(10)
-		extractedLogs := extractLogs(ld, i)
+		extractedLogs := extractLogs(ld, i, &sizer.LogsCountSizer{})
 		assert.Equal(t, i, extractedLogs.LogRecordCount())
 		assert.Equal(t, 10-i, ld.LogRecordCount())
 	}
@@ -180,6 +281,21 @@ func BenchmarkSplittingBasedOnItemCountManySmallLogs(b *testing.B) {
 	}
 }
 
+func BenchmarkSplittingBasedOnByteSizeManySmallLogs(b *testing.B) {
+	// All requests merge into a single batch.
+	cfg := exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeBytes, MaxSize: logsMarshaler.LogsSize(testdata.GenerateLogs(11000))}
+	b.ReportAllocs()
+	for i := 0; i < b.N; i++ {
+		merged := []Request{newLogsRequest(testdata.GenerateLogs(10), nil)}
+		for j := 0; j < 1000; j++ {
+			lr2 := newLogsRequest(testdata.GenerateLogs(10), nil)
+			res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2)
+			merged = append(merged[0:len(merged)-1], res...)
+		}
+		assert.Len(b, merged, 1)
+	}
+}
+
 func BenchmarkSplittingBasedOnItemCountManyLogsSlightlyAboveLimit(b *testing.B) {
 	// Every incoming request results in a split.
 	cfg := exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10000}
@@ -195,6 +311,22 @@ func BenchmarkSplittingBasedOnItemCountManyLogsSlightlyAboveLimit(b *testing.B)
 	}
 }
 
+func BenchmarkSplittingBasedOnByteSizeManyLogsSlightlyAboveLimit(b *testing.B) {
+	// Every incoming request results in a split.
+	cfg := exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeBytes, MaxSize: logsMarshaler.LogsSize(testdata.GenerateLogs(10000))}
+	b.ReportAllocs()
+	for i := 0; i < b.N; i++ {
+		merged := []Request{newLogsRequest(testdata.GenerateLogs(0), nil)}
+		for j := 0; j < 10; j++ {
+			lr2 := newLogsRequest(testdata.GenerateLogs(10001), nil)
+			res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2)
+			assert.Len(b, res, 2)
+			merged = append(merged[0:len(merged)-1], res...)
+		}
+		assert.Len(b, merged, 11)
+	}
+}
+
 func BenchmarkSplittingBasedOnItemCountHugeLogs(b *testing.B) {
 	// One request splits into many batches.
 	cfg := exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeItems, MaxSize: 10000}
@@ -207,3 +339,16 @@ func BenchmarkSplittingBasedOnItemCountHugeLogs(b *testing.B) {
 		assert.Len(b, merged, 10)
 	}
 }
+
+func BenchmarkSplittingBasedOnByteSizeHugeLogs(b *testing.B) {
+	// One request splits into many batches.
+	cfg := exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeBytes, MaxSize: logsMarshaler.LogsSize(testdata.GenerateLogs(10010))}
+	b.ReportAllocs()
+	for i := 0; i < b.N; i++ {
+		merged := []Request{newLogsRequest(testdata.GenerateLogs(0), nil)}
+		lr2 := newLogsRequest(testdata.GenerateLogs(100000), nil)
+		res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2)
+		merged = append(merged[0:len(merged)-1], res...)
+		assert.Len(b, merged, 10)
+	}
+}
diff --git a/pdata/plog/pb.go b/pdata/plog/pb.go
index bb102591bf2..a4cb09eb6ea 100644
--- a/pdata/plog/pb.go
+++ b/pdata/plog/pb.go
@@ -22,6 +22,18 @@ func (e *ProtoMarshaler) LogsSize(ld Logs) int {
 	return pb.Size()
 }
 
+func (e *ProtoMarshaler) ResourceLogsSize(rl ResourceLogs) int {
+	return rl.orig.Size()
+}
+
+func (e *ProtoMarshaler) ScopeLogsSize(sl ScopeLogs) int {
+	return sl.orig.Size()
+}
+
+func (e *ProtoMarshaler) LogRecordSize(lr LogRecord) int {
+	return lr.orig.Size()
+}
+
 var _ Unmarshaler = (*ProtoUnmarshaler)(nil)
 
 type ProtoUnmarshaler struct{}
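
Note: a minimal sketch of how the pieces above compose (illustrative only, not part of the change; it mirrors the tests above, and since sizer is an internal package it only compiles inside the collector module):

	package exporterhelper

	import (
		"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
		"go.opentelemetry.io/collector/pdata/testdata"
	)

	func exampleByteSizer() int {
		sz := &sizer.LogsBytesSizer{} // embeds plog.ProtoMarshaler
		ld := testdata.GenerateLogs(2)
		total := sz.LogsSize(ld) // serialized proto size of the whole batch
		lr := ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
		// Projected batch size if a copy of lr were appended: its payload plus
		// proto framing, which is how the batcher accumulates toward MaxSize.
		return total + sz.DeltaSize(sz.LogRecordSize(lr))
	}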