Skip to content

Commit

Permalink
[exporter][batching] Serialized bytes based batching for logs (#12299)
Browse files Browse the repository at this point in the history
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description

This PR implements serialized bytes based batching.

<!-- Issue number if applicable -->
#### Link to tracking issue

#3262

<!--Describe what testing was performed and which tests were added.-->
#### Testing

```
BenchmarkSplittingBasedOnItemCountManySmallLogs-10                    	     409	   2822478 ns/op	 3148718 B/op	   71082 allocs/op
BenchmarkSplittingBasedOnByteSizeManySmallLogs-10                     	     412	   2890594 ns/op	 3156103 B/op	   71242 allocs/op
BenchmarkSplittingBasedOnItemCountManyLogsSlightlyAboveLimit-10       	      38	  27579702 ns/op	43531582 B/op	  700471 allocs/op
BenchmarkSplittingBasedOnByteSizeManyLogsSlightlyAboveLimit-10        	      24	  43067219 ns/op	43639889 B/op	  702927 allocs/op
BenchmarkSplittingBasedOnItemCountHugeLogs-10                         	      40	  29722710 ns/op	41922925 B/op	  690300 allocs/op
BenchmarkSplittingBasedOnByteSizeHugeLogs-10                          	      16	  64321198 ns/op	42107144 B/op	  694144 allocs/op
```

#### Caveat
We track both item count and byte size regardless of the configured batching
option, which adds overhead in both cases.

Co-authored-by: Dmitrii Anoshin <[email protected]>
  • Loading branch information
sfc-gh-sili and dmitryax authored Feb 26, 2025
1 parent b3b28ed commit 66d83f0
Show file tree
Hide file tree
Showing 7 changed files with 366 additions and 51 deletions.
7 changes: 6 additions & 1 deletion exporter/exporterbatcher/sizer_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,20 @@ type SizerType struct {

const (
sizerTypeItems = "items"
sizerTypeBytes = "bytes"
)

var SizerTypeItems = SizerType{val: sizerTypeItems}
var (
SizerTypeItems = SizerType{val: sizerTypeItems}
SizerTypeBytes = SizerType{val: sizerTypeBytes}
)

// UnmarshalText implements TextUnmarshaler interface.
func (s *SizerType) UnmarshalText(text []byte) error {
switch str := string(text); str {
case sizerTypeItems:
*s = SizerTypeItems
// TODO support setting sizer to SizerTypeBytes when all logs, traces, and metrics batching support it
default:
return fmt.Errorf("invalid sizer: %q", str)
}
Expand Down
68 changes: 68 additions & 0 deletions exporter/exporterhelper/internal/sizer/logs_sizer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package sizer // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"

import (
math_bits "math/bits"

"go.opentelemetry.io/collector/pdata/plog"
)

// LogsSizer measures the size of logs at each level of the plog hierarchy.
// The unit is defined by the implementation: log-record count or serialized
// proto bytes.
type LogsSizer interface {
	// LogsSize returns the size of the whole plog.Logs payload.
	LogsSize(ld plog.Logs) int
	// ResourceLogsSize returns the size of a single ResourceLogs entry.
	ResourceLogsSize(rl plog.ResourceLogs) int
	// ScopeLogsSize returns the size of a single ScopeLogs entry.
	ScopeLogsSize(sl plog.ScopeLogs) int
	// LogRecordSize returns the size of a single LogRecord.
	LogRecordSize(lr plog.LogRecord) int

	// DeltaSize returns the delta size when a ResourceLog, ScopeLog or LogRecord is added.
	DeltaSize(newItemSize int) int
}

// LogsBytesSizer returns the byte size of serialized protos.
// The *Size methods come from the embedded plog.ProtoMarshaler;
// only DeltaSize is defined on this type.
type LogsBytesSizer struct {
	plog.ProtoMarshaler
}

// DeltaSize returns the delta size of a proto slice when a new item is added.
// Example:
//
//	prevSize := proto1.Size()
//	proto1.RepeatedField().AppendEmpty() = proto2
//
// Then currSize of proto1 can be calculated as
//
//	currSize := (prevSize + sizer.DeltaSize(proto2.Size()))
//
// This is derived from opentelemetry-collector/pdata/internal/data/protogen/logs/v1/logs.pb.go
// which is generated with gogo/protobuf.
func (s *LogsBytesSizer) DeltaSize(newItemSize int) int {
	// delta = 1 tag byte + varint-encoded length prefix + payload bytes.
	// The varint size of x is (bits.Len64(x|1) + 6) / 7 (gogo's sov helper).
	// Note the parentheses: the previous form added 6 inside Len64's
	// argument, undercounting the prefix at varint byte boundaries
	// (e.g. newItemSize == 128 yielded 1 instead of 2).
	return 1 + newItemSize + (math_bits.Len64(uint64(newItemSize)|1)+6)/7 //nolint:gosec // disable G115
}

// LogsCountSizer returns the number of log records (item count).
type LogsCountSizer struct{}

// LogsSize returns the total number of log records in ld.
func (s *LogsCountSizer) LogsSize(ld plog.Logs) int {
	return ld.LogRecordCount()
}

// ResourceLogsSize returns the total number of log records under rl,
// summed across all of its scope logs.
func (s *LogsCountSizer) ResourceLogsSize(rl plog.ResourceLogs) int {
	total := 0
	sls := rl.ScopeLogs()
	for i := 0; i < sls.Len(); i++ {
		total += sls.At(i).LogRecords().Len()
	}
	return total
}

// ScopeLogsSize returns the number of log records in sl.
func (s *LogsCountSizer) ScopeLogsSize(sl plog.ScopeLogs) int {
	return sl.LogRecords().Len()
}

// LogRecordSize returns 1: every record counts as a single item.
func (s *LogsCountSizer) LogRecordSize(_ plog.LogRecord) int {
	return 1
}

// DeltaSize is the identity for item counts: adding an item grows the total
// by exactly that item's count, with no encoding overhead.
func (s *LogsCountSizer) DeltaSize(newItemSize int) int {
	return newItemSize
}
57 changes: 57 additions & 0 deletions exporter/exporterhelper/internal/sizer/logs_sizer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package sizer // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"

import (
"testing"

"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/pdata/testdata"
)

// TestLogsCountSizer verifies that count-based sizing reports record counts
// at every level and that DeltaSize is additive.
func TestLogsCountSizer(t *testing.T) {
	logs := testdata.GenerateLogs(5)
	counter := LogsCountSizer{}
	require.Equal(t, 5, counter.LogsSize(logs))

	resLogs := logs.ResourceLogs().At(0)
	require.Equal(t, 5, counter.ResourceLogsSize(resLogs))

	scopeLogs := resLogs.ScopeLogs().At(0)
	require.Equal(t, 5, counter.ScopeLogsSize(scopeLogs))

	// Every record counts as exactly one item.
	for i := 0; i < 5; i++ {
		require.Equal(t, 1, counter.LogRecordSize(scopeLogs.LogRecords().At(i)))
	}

	// Appending a copy of a record must grow the scope size by its delta size.
	before := counter.ScopeLogsSize(scopeLogs)
	rec := scopeLogs.LogRecords().At(2)
	rec.CopyTo(scopeLogs.LogRecords().AppendEmpty())
	require.Equal(t, counter.ScopeLogsSize(scopeLogs), before+counter.DeltaSize(counter.LogRecordSize(rec)))
}

// TestLogsBytesSizer verifies byte-based sizing against the known serialized
// sizes of the generated test logs, and that DeltaSize is additive.
func TestLogsBytesSizer(t *testing.T) {
	logs := testdata.GenerateLogs(5)
	byteSizer := LogsBytesSizer{}
	require.Equal(t, 545, byteSizer.LogsSize(logs))

	resLogs := logs.ResourceLogs().At(0)
	require.Equal(t, 542, byteSizer.ResourceLogsSize(resLogs))

	scopeLogs := resLogs.ScopeLogs().At(0)
	require.Equal(t, 497, byteSizer.ScopeLogsSize(scopeLogs))

	// Serialized sizes of the five generated records alternate by content.
	wantRecordSizes := []int{109, 79, 109, 79, 109}
	for i, want := range wantRecordSizes {
		require.Equal(t, want, byteSizer.LogRecordSize(scopeLogs.LogRecords().At(i)))
	}

	// Appending a copy of a record must grow the scope size by its delta size.
	before := byteSizer.ScopeLogsSize(scopeLogs)
	rec := scopeLogs.LogRecords().At(2)
	rec.CopyTo(scopeLogs.LogRecords().AppendEmpty())
	require.Equal(t, byteSizer.ScopeLogsSize(scopeLogs), before+byteSizer.DeltaSize(byteSizer.LogRecordSize(rec)))
}
26 changes: 17 additions & 9 deletions exporter/exporterhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pipeline"
Expand All @@ -25,16 +26,16 @@ var (
)

// logsRequest wraps a plog.Logs payload together with the pusher used to
// export it and a lazily computed, cached size. (The diff rendering
// interleaved the old cachedItemsCount fields with the new ones; this is the
// post-change form.)
type logsRequest struct {
	ld     plog.Logs
	pusher consumer.ConsumeLogsFunc
	// cachedSize caches the result of Size(); -1 means "not computed yet".
	cachedSize int
}

// newLogsRequest creates a Request for ld. The size is computed lazily on
// the first call to Size(), so cachedSize starts at the -1 sentinel.
func newLogsRequest(ld plog.Logs, pusher consumer.ConsumeLogsFunc) Request {
	return &logsRequest{
		ld:         ld,
		pusher:     pusher,
		cachedSize: -1,
	}
}

Expand Down Expand Up @@ -65,11 +66,18 @@ func (req *logsRequest) Export(ctx context.Context) error {
}

func (req *logsRequest) ItemsCount() int {
return req.cachedItemsCount
return req.ld.LogRecordCount()
}

func (req *logsRequest) setCachedItemsCount(count int) {
req.cachedItemsCount = count
func (req *logsRequest) Size(sizer sizer.LogsSizer) int {
if req.cachedSize == -1 {
req.cachedSize = sizer.LogsSize(req.ld)
}
return req.cachedSize
}

func (req *logsRequest) setCachedSize(size int) {
req.cachedSize = size
}

type logsExporter struct {
Expand Down
92 changes: 56 additions & 36 deletions exporter/exporterhelper/logs_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,104 +8,124 @@ import (
"errors"

"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
"go.opentelemetry.io/collector/pdata/plog"
)

// sizerFromConfig maps the configured sizer type to a LogsSizer
// implementation.
func sizerFromConfig(cfg exporterbatcher.SizeConfig) sizer.LogsSizer {
	switch cfg.Sizer {
	case exporterbatcher.SizerTypeItems:
		return &sizer.LogsCountSizer{}
	case exporterbatcher.SizerTypeBytes:
		return &sizer.LogsBytesSizer{}
	default:
		// NOTE(review): an unrecognized sizer silently degrades to item
		// counting; confirm this fallback is intended rather than an error.
		return &sizer.LogsCountSizer{}
	}
}

// MergeSplit splits and/or merges the provided logs request and the current request into one or more requests
// conforming with the MaxSizeConfig. Sizes are measured by the sizer selected
// from cfg (item count or serialized bytes).
func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.SizeConfig, r2 Request) ([]Request, error) {
	sizer := sizerFromConfig(cfg)
	if r2 != nil {
		req2, ok := r2.(*logsRequest)
		if !ok {
			return nil, errors.New("invalid input type")
		}
		req2.mergeTo(req, sizer)
	}

	// If no limit we can simply merge the new request into the current and return.
	if cfg.MaxSize == 0 {
		return []Request{req}, nil
	}

	return req.split(cfg.MaxSize, sizer), nil
}

func (req *logsRequest) mergeTo(dst *logsRequest) {
dst.setCachedItemsCount(dst.ItemsCount() + req.ItemsCount())
req.setCachedItemsCount(0)
func (req *logsRequest) mergeTo(dst *logsRequest, sizer sizer.LogsSizer) {
if sizer != nil {
dst.setCachedSize(dst.Size(sizer) + req.Size(sizer))
req.setCachedSize(0)
}
req.ld.ResourceLogs().MoveAndAppendTo(dst.ld.ResourceLogs())
}

func (req *logsRequest) split(cfg exporterbatcher.SizeConfig) ([]Request, error) {
func (req *logsRequest) split(maxSize int, sizer sizer.LogsSizer) []Request {
var res []Request
for req.ItemsCount() > cfg.MaxSize {
ld := extractLogs(req.ld, cfg.MaxSize)
size := ld.LogRecordCount()
req.setCachedItemsCount(req.ItemsCount() - size)
res = append(res, &logsRequest{ld: ld, pusher: req.pusher, cachedItemsCount: size})
for req.Size(sizer) > maxSize {
ld := extractLogs(req.ld, maxSize, sizer)
size := sizer.LogsSize(ld)
req.setCachedSize(req.Size(sizer) - size)
res = append(res, &logsRequest{ld: ld, pusher: req.pusher, cachedSize: size})
}
res = append(res, req)
return res, nil
return res
}

// extractLogs extracts logs from the input logs and returns a new logs with the specified number of log records.
func extractLogs(srcLogs plog.Logs, count int) plog.Logs {
func extractLogs(srcLogs plog.Logs, capacity int, sizer sizer.LogsSizer) plog.Logs {
capacityReached := false
destLogs := plog.NewLogs()
capacityLeft := capacity - sizer.LogsSize(destLogs)
srcLogs.ResourceLogs().RemoveIf(func(srcRL plog.ResourceLogs) bool {
if count == 0 {
if capacityReached {
return false
}
needToExtract := resourceLogsCount(srcRL) > count
needToExtract := sizer.ResourceLogsSize(srcRL) > capacityLeft
if needToExtract {
srcRL = extractResourceLogs(srcRL, count)
srcRL, capacityReached = extractResourceLogs(srcRL, capacityLeft, sizer)
if srcRL.ScopeLogs().Len() == 0 {
return false
}
}
count -= resourceLogsCount(srcRL)
capacityLeft -= sizer.DeltaSize(sizer.ResourceLogsSize(srcRL))
srcRL.MoveTo(destLogs.ResourceLogs().AppendEmpty())
return !needToExtract
})
return destLogs
}

// extractResourceLogs extracts resource logs and returns a new resource logs with the specified number of log records.
func extractResourceLogs(srcRL plog.ResourceLogs, count int) plog.ResourceLogs {
func extractResourceLogs(srcRL plog.ResourceLogs, capacity int, sizer sizer.LogsSizer) (plog.ResourceLogs, bool) {
capacityReached := false
destRL := plog.NewResourceLogs()
destRL.SetSchemaUrl(srcRL.SchemaUrl())
srcRL.Resource().CopyTo(destRL.Resource())
capacityLeft := capacity - sizer.ResourceLogsSize(destRL)
srcRL.ScopeLogs().RemoveIf(func(srcSL plog.ScopeLogs) bool {
if count == 0 {
if capacityReached {
return false
}
needToExtract := srcSL.LogRecords().Len() > count
needToExtract := sizer.ScopeLogsSize(srcSL) > capacityLeft
if needToExtract {
srcSL = extractScopeLogs(srcSL, count)
srcSL, capacityReached = extractScopeLogs(srcSL, capacityLeft, sizer)
if srcSL.LogRecords().Len() == 0 {
return false
}
}
count -= srcSL.LogRecords().Len()
capacityLeft -= sizer.DeltaSize(sizer.ScopeLogsSize(srcSL))
srcSL.MoveTo(destRL.ScopeLogs().AppendEmpty())
return !needToExtract
})
return destRL
return destRL, capacityReached
}

// extractScopeLogs extracts scope logs and returns a new scope logs with the specified number of log records.
func extractScopeLogs(srcSL plog.ScopeLogs, count int) plog.ScopeLogs {
func extractScopeLogs(srcSL plog.ScopeLogs, capacity int, sizer sizer.LogsSizer) (plog.ScopeLogs, bool) {
capacityReached := false
destSL := plog.NewScopeLogs()
destSL.SetSchemaUrl(srcSL.SchemaUrl())
srcSL.Scope().CopyTo(destSL.Scope())
capacityLeft := capacity - sizer.ScopeLogsSize(destSL)
srcSL.LogRecords().RemoveIf(func(srcLR plog.LogRecord) bool {
if count == 0 {
if capacityReached || sizer.LogRecordSize(srcLR) > capacityLeft {
capacityReached = true
return false
}
capacityLeft -= sizer.DeltaSize(sizer.LogRecordSize(srcLR))
srcLR.MoveTo(destSL.LogRecords().AppendEmpty())
count--
return true
})
return destSL
}

// resourceLogsCount calculates the total number of log records in the plog.ResourceLogs.
func resourceLogsCount(rl plog.ResourceLogs) int {
count := 0
for k := 0; k < rl.ScopeLogs().Len(); k++ {
count += rl.ScopeLogs().At(k).LogRecords().Len()
}
return count
return destSL, capacityReached
}
Loading

0 comments on commit 66d83f0

Please sign in to comment.