diff --git a/CHANGELOG.md b/CHANGELOG.md index 9ebc9088c24..b02afc58c7a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ ### 🧰 Bug fixes 🧰 - Fixes the "service.version" label value for internal metrics, always was "latest" in core/contrib distros. (#5449). +- Send correct batch stats when SendBatchMaxSize is set (#5385) ## v0.52.0 Beta diff --git a/processor/batchprocessor/batch_processor.go b/processor/batchprocessor/batch_processor.go index abdaf532d2e..11be58beabf 100644 --- a/processor/batchprocessor/batch_processor.go +++ b/processor/batchprocessor/batch_processor.go @@ -59,7 +59,7 @@ type batchProcessor struct { type batch interface { // export the current batch - export(ctx context.Context, sendBatchMaxSize int) error + export(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (sentBatchSize int, sentBatchBytes int, err error) // itemCount returns the size of the current batch itemCount() int @@ -175,15 +175,16 @@ func (bp *batchProcessor) resetTimer() { } func (bp *batchProcessor) sendItems(triggerMeasure *stats.Int64Measure) { - // Add that it came form the trace pipeline? - stats.Record(bp.exportCtx, triggerMeasure.M(1), statBatchSendSize.M(int64(bp.batch.itemCount()))) - - if bp.telemetryLevel == configtelemetry.LevelDetailed { - stats.Record(bp.exportCtx, statBatchSendSizeBytes.M(int64(bp.batch.size()))) - } - - if err := bp.batch.export(bp.exportCtx, bp.sendBatchMaxSize); err != nil { + detailed := bp.telemetryLevel == configtelemetry.LevelDetailed + sent, bytes, err := bp.batch.export(bp.exportCtx, bp.sendBatchMaxSize, detailed) + if err != nil { bp.logger.Warn("Sender failed", zap.Error(err)) + } else { + // Add that it came from the trace pipeline? 
+ stats.Record(bp.exportCtx, triggerMeasure.M(1), statBatchSendSize.M(int64(sent))) + if detailed { + stats.Record(bp.exportCtx, statBatchSendSizeBytes.M(int64(bytes))) + } } } @@ -244,17 +245,24 @@ func (bt *batchTraces) add(item interface{}) { td.ResourceSpans().MoveAndAppendTo(bt.traceData.ResourceSpans()) } -func (bt *batchTraces) export(ctx context.Context, sendBatchMaxSize int) error { +func (bt *batchTraces) export(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, error) { var req ptrace.Traces + var sent int + var bytes int if sendBatchMaxSize > 0 && bt.itemCount() > sendBatchMaxSize { req = splitTraces(sendBatchMaxSize, bt.traceData) bt.spanCount -= sendBatchMaxSize + sent = sendBatchMaxSize } else { req = bt.traceData + sent = bt.spanCount bt.traceData = ptrace.NewTraces() bt.spanCount = 0 } - return bt.nextConsumer.ConsumeTraces(ctx, req) + if returnBytes { + bytes = bt.sizer.TracesSize(req) + } + return sent, bytes, bt.nextConsumer.ConsumeTraces(ctx, req) } func (bt *batchTraces) itemCount() int { @@ -276,17 +284,24 @@ func newBatchMetrics(nextConsumer consumer.Metrics) *batchMetrics { return &batchMetrics{nextConsumer: nextConsumer, metricData: pmetric.NewMetrics(), sizer: pmetric.NewProtoMarshaler().(pmetric.Sizer)} } -func (bm *batchMetrics) export(ctx context.Context, sendBatchMaxSize int) error { +func (bm *batchMetrics) export(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, error) { var req pmetric.Metrics + var sent int + var bytes int if sendBatchMaxSize > 0 && bm.dataPointCount > sendBatchMaxSize { req = splitMetrics(sendBatchMaxSize, bm.metricData) bm.dataPointCount -= sendBatchMaxSize + sent = sendBatchMaxSize } else { req = bm.metricData + sent = bm.dataPointCount bm.metricData = pmetric.NewMetrics() bm.dataPointCount = 0 } - return bm.nextConsumer.ConsumeMetrics(ctx, req) + if returnBytes { + bytes = bm.sizer.MetricsSize(req) + } + return sent, bytes, bm.nextConsumer.ConsumeMetrics(ctx, req) } 
func (bm *batchMetrics) itemCount() int { @@ -319,17 +334,24 @@ func newBatchLogs(nextConsumer consumer.Logs) *batchLogs { return &batchLogs{nextConsumer: nextConsumer, logData: plog.NewLogs(), sizer: plog.NewProtoMarshaler().(plog.Sizer)} } -func (bl *batchLogs) export(ctx context.Context, sendBatchMaxSize int) error { +func (bl *batchLogs) export(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, error) { var req plog.Logs + var sent int + var bytes int if sendBatchMaxSize > 0 && bl.logCount > sendBatchMaxSize { req = splitLogs(sendBatchMaxSize, bl.logData) bl.logCount -= sendBatchMaxSize + sent = sendBatchMaxSize } else { req = bl.logData + sent = bl.logCount bl.logData = plog.NewLogs() bl.logCount = 0 } - return bl.nextConsumer.ConsumeLogs(ctx, req) + if returnBytes { + bytes = bl.sizer.LogsSize(req) + } + return sent, bytes, bl.nextConsumer.ConsumeLogs(ctx, req) } func (bl *batchLogs) itemCount() int { diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go index df85d4cc13a..f0e45b83d23 100644 --- a/processor/batchprocessor/batch_processor_test.go +++ b/processor/batchprocessor/batch_processor_test.go @@ -17,6 +17,7 @@ package batchprocessor import ( "context" "fmt" + "math" "sync" "testing" "time" @@ -183,6 +184,55 @@ func TestBatchProcessorSentBySize(t *testing.T) { assert.Equal(t, sizeSum, int(distData.Sum())) } +func TestBatchProcessorSentBySize_withMaxSize(t *testing.T) { + views := MetricViews() + require.NoError(t, view.Register(views...)) + defer view.Unregister(views...) 
+ + sink := new(consumertest.TracesSink) + cfg := createDefaultConfig().(*Config) + sendBatchSize := 20 + sendBatchMaxSize := 37 + cfg.SendBatchSize = uint32(sendBatchSize) + cfg.SendBatchMaxSize = uint32(sendBatchMaxSize) + cfg.Timeout = 500 * time.Millisecond + creationSet := componenttest.NewNopProcessorCreateSettings() + batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, configtelemetry.LevelDetailed) + require.NoError(t, err) + require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + + requestCount := 1 + spansPerRequest := 500 + totalSpans := requestCount * spansPerRequest + + start := time.Now() + for requestNum := 0; requestNum < requestCount; requestNum++ { + td := testdata.GenerateTracesManySpansSameResource(spansPerRequest) + assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) + } + + require.NoError(t, batcher.Shutdown(context.Background())) + + elapsed := time.Since(start) + require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds()) + + // The max batch size is not a divisor of the total number of spans + expectedBatchesNum := int(math.Ceil(float64(totalSpans) / float64(sendBatchMaxSize))) + + require.Equal(t, totalSpans, sink.SpanCount()) + receivedTraces := sink.AllTraces() + require.EqualValues(t, expectedBatchesNum, len(receivedTraces)) + + viewData, err := view.RetrieveData("processor/batch/" + statBatchSendSize.Name()) + require.NoError(t, err) + assert.Equal(t, 1, len(viewData)) + distData := viewData[0].Data.(*view.DistributionData) + assert.Equal(t, int64(expectedBatchesNum), distData.Count) + assert.Equal(t, sink.SpanCount(), int(distData.Sum())) + assert.Equal(t, totalSpans%sendBatchMaxSize, int(distData.Min)) + assert.Equal(t, sendBatchMaxSize, int(distData.Max)) +} + func TestBatchProcessorSentByTimeout(t *testing.T) { sink := new(consumertest.TracesSink) cfg := createDefaultConfig().(*Config) @@ -387,7 +437,9 @@ func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) { 
batchMetrics.add(md) require.Equal(t, dataPointsPerMetric*metricsCount, batchMetrics.dataPointCount) - require.NoError(t, batchMetrics.export(ctx, sendBatchMaxSize)) + sent, _, sendErr := batchMetrics.export(ctx, sendBatchMaxSize, false) + require.NoError(t, sendErr) + require.Equal(t, sendBatchMaxSize, sent) remainingDataPointCount := metricsCount*dataPointsPerMetric - sendBatchMaxSize require.Equal(t, remainingDataPointCount, batchMetrics.dataPointCount) }