diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go
index 58cda3b38726..87b1c6761153 100644
--- a/receiver/kafkareceiver/kafka_receiver.go
+++ b/receiver/kafkareceiver/kafka_receiver.go
@@ -8,6 +8,7 @@ import (
 	"fmt"
 	"strings"
 	"sync"
+	"sync/atomic"
 
 	"github.com/IBM/sarama"
 	"go.opencensus.io/stats"
@@ -148,6 +149,7 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) erro
 		messageMarking:    c.messageMarking,
 		headerExtractor:   &nopHeaderExtractor{},
 		limiter:           c.limiter,
+		//counter: &limiterMetrics{},
 	}
 	if c.headerExtraction {
 		consumerGroup.headerExtractor = &headerExtractor{
@@ -167,7 +169,7 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) erro
 
 func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error {
 	c.limiter.logger = c.settings.Logger
-	go c.limiter.run(c.consumerGroup)
+	go c.limiter.run(ctx, c.consumerGroup)
 
 	for {
 		if !c.limiter.rateLimited {
@@ -438,8 +440,23 @@ type tracesConsumerGroupHandler struct {
 	messageMarking    MessageMarking
 	headerExtractor   HeaderExtractor
 	limiter           Limiter
+	//counter *limiterMetrics
 }
 
+type limiterMetrics struct {
+	msgCounter int64
+}
+
+func (l *limiterMetrics) increaseMsgCounter() {
+	atomic.AddInt64(&l.msgCounter, 1)
+}
+
+func (l *limiterMetrics) getMsgCounter() float64 {
+	return float64(atomic.LoadInt64(&l.msgCounter))
+}
+
+var metricsCounter = &limiterMetrics{}
+
 type metricsConsumerGroupHandler struct {
 	id          component.ID
 	unmarshaler MetricsUnmarshaler
@@ -493,6 +510,7 @@ func (c *tracesConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession
 
 func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
 	c.logger.Info("Starting consumer group", zap.Int32("partition", claim.Partition()))
+
 	if !c.autocommitEnabled {
 		defer session.Commit()
 	}
@@ -517,6 +535,8 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
 				statMessageOffset.M(message.Offset),
 				statMessageOffsetLag.M(claim.HighWaterMarkOffset()-message.Offset-1))
 
+			metricsCounter.increaseMsgCounter()
+
 			traces, err := c.unmarshaler.Unmarshal(message.Value)
 			if err != nil {
 				c.logger.Error("failed to unmarshal message", zap.Error(err))
diff --git a/receiver/kafkareceiver/limiter.go b/receiver/kafkareceiver/limiter.go
index 19219e4a3e6a..7382099e3aca 100644
--- a/receiver/kafkareceiver/limiter.go
+++ b/receiver/kafkareceiver/limiter.go
@@ -1,7 +1,11 @@
 package kafkareceiver
 
 import (
+	"context"
 	"fmt"
+	"go.opencensus.io/stats"
+	"go.opencensus.io/tag"
+	"go.opentelemetry.io/collector/component"
 	"time"
 
 	"github.com/IBM/sarama"
@@ -22,6 +26,7 @@ type Limiter struct {
 	Enabled     bool `mapstructure:"enabled"`
 	logger      *zap.Logger
 	rateLimited bool
+	id          component.ID
 }
 
 type limiterValues struct {
@@ -33,14 +38,14 @@ type limiterValues struct {
 	metricType pmetric.MetricType
 }
 
-func (c Limiter) run(handler sarama.ConsumerGroup) {
+func (c Limiter) run(ctx context.Context, handler sarama.ConsumerGroup) {
 	metricMap := make(map[string]limiterValues)
 
-	metricMap["otelcol_receiver_accepted_spans"] = limiterValues{currTs: currTs(), limit: c.ReceiverAcceptedSpansRate, metricType: pmetric.MetricTypeSum}
-	metricMap["otelcol_processor_accepted_spans"] = limiterValues{currTs: currTs(), limit: c.ProcessorAcceptedSpansRate, metricType: pmetric.MetricTypeSum}
-	metricMap["otelcol_exporter_sent_spans"] = limiterValues{currTs: currTs(), limit: c.ExporterSentSpansRate, metricType: pmetric.MetricTypeSum}
-	metricMap["otelcol_exporter_queue_size"] = limiterValues{currTs: currTs(), limit: c.ExporterQueueSizeDiff, metricType: pmetric.MetricTypeGauge}
+	//metricMap["otelcol_receiver_accepted_spans"] = limiterValues{currTs: currTs(), limit: c.ReceiverAcceptedSpansRate, metricType: pmetric.MetricTypeSum}
+	//metricMap["otelcol_processor_accepted_spans"] = limiterValues{currTs: currTs(), limit: c.ProcessorAcceptedSpansRate, metricType: pmetric.MetricTypeSum}
+	//metricMap["otelcol_exporter_sent_spans"] = limiterValues{currTs: currTs(), limit: c.ExporterSentSpansRate, metricType: pmetric.MetricTypeSum}
+	//metricMap["otelcol_exporter_queue_size"] = limiterValues{currTs: currTs(), limit: c.ExporterQueueSizeDiff, metricType: pmetric.MetricTypeGauge}
 	metricMap["otelcol_kafka_receiver_messages"] = limiterValues{currTs: currTs(), limit: c.KafkaReceiverMessagesRate, metricType: pmetric.MetricTypeSum}
 
 	for {
@@ -49,11 +54,13 @@ func (c Limiter) run(handler sarama.ConsumerGroup) {
 		var pauseRequired bool
 		for metricName, values := range metricMap {
-			mTs, mVal := c.getMetricValue(metricName, values.metricType)
+			//mTs, mVal := c.getMetricValue(metricName, values.metricType)
 			values.prevTs = values.currTs
 			values.prevValue = values.currValue
-			values.currTs = mTs
-			values.currValue = mVal
+			//values.currTs = mTs
+			values.currTs = currTs()
+			//values.currValue = mVal
+			values.currValue = metricsCounter.getMsgCounter()
 			metricMap[metricName] = values
 
 			pause, rate := c.shouldPause(values)
@@ -63,17 +70,22 @@ func (c Limiter) run(handler sarama.ConsumerGroup) {
 				if pauseRequired {
 					c.logger.Warn(fmt.Sprintf("rate for %s exceeded: %f (limit: %f). Was already paused, will continue to pause", metricName, rate, values.limit))
 				} else {
-					c.logger.Warn(fmt.Sprintf("rate for %s exceeded: %f (limit: %f). Going to start pause", metricName, rate, values.limit))
+					c.logger.Warn(fmt.Sprintf("rate for %s exceeded: %f (limit: %f). Going to pause", metricName, rate, values.limit))
 					pauseRequired = true
 				}
 			}
 		}
 
+		//something := metricsCounter.getMsgCounter()
+		//fmt.Println("%f", something)
+
+		statsTags := []tag.Mutator{tag.Upsert(tagInstanceName, c.id.String())}
+
 		if pauseRequired {
 			if c.Enabled {
-				statLimiterPause.M(1)
 				handler.PauseAll()
+				_ = stats.RecordWithTags(ctx, statsTags, statLimiterPause.M(1))
 				c.rateLimited = true
 			} else {
 				c.logger.Debug("rate would be limited but limiter is not enabled")
@@ -84,8 +96,8 @@ func (c Limiter) run(handler sarama.ConsumerGroup) {
 				c.logger.Warn("lifting rate limit, resuming operations")
 			}
 			if c.Enabled {
-				statLimiterPause.M(0)
 				handler.ResumeAll()
+				_ = stats.RecordWithTags(ctx, statsTags, statLimiterPause.M(0))
 			}
 		}
 
@@ -99,8 +111,8 @@ func (c Limiter) shouldPause(values limiterValues) (bool, float64) {
 	duration := currentTs.Sub(previousTs)
 	seconds := duration.Seconds()
 
-	// if any of the values are 0, do not do anything. This is to avoid unintended pausing at startup.
-	if values.prevValue != 0 || values.currValue != 0 {
+	// if both of the values are 0, do not do anything. This is to avoid unintended pausing at startup.
+	if values.prevValue != 0 && values.currValue != 0 {
 		if seconds > 0 {
 			diff := values.currValue - values.prevValue
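The patch swaps the internal-metric lookups for a process-local atomic counter: the traces handler bumps it once per consumed message, and the limiter goroutine samples it on each pass to derive a rate. Below is a minimal, self-contained sketch (not part of the patch) of that sampling idea, assuming the rate is the counter delta divided by the elapsed seconds and is compared against a configured limit; `sampleRate`, `limit`, and the local `msgCounter` here are illustrative names, not receiver APIs.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// msgCounter stands in for limiterMetrics.msgCounter from the diff:
// incremented atomically on the hot consume path, read by the limiter loop.
var msgCounter int64

// sampleRate returns the per-second rate observed between two counter samples.
func sampleRate(prev, curr float64, elapsed time.Duration) float64 {
	if elapsed.Seconds() <= 0 {
		return 0
	}
	return (curr - prev) / elapsed.Seconds()
}

func main() {
	// Simulate the consumer incrementing the counter per message.
	go func() {
		for {
			atomic.AddInt64(&msgCounter, 1)
			time.Sleep(time.Millisecond)
		}
	}()

	const limit = 500.0 // hypothetical messages-per-second limit

	prev := float64(atomic.LoadInt64(&msgCounter))
	prevTs := time.Now()
	time.Sleep(time.Second)
	curr := float64(atomic.LoadInt64(&msgCounter))

	rate := sampleRate(prev, curr, time.Since(prevTs))
	if rate > limit {
		fmt.Printf("rate %.2f exceeds limit %.2f: would pause consumption\n", rate, limit)
	} else {
		fmt.Printf("rate %.2f within limit %.2f: no pause\n", rate, limit)
	}
}
```

The atomic add keeps the per-message cost on the consume path to a single lock-free increment; only the limiter goroutine turns the counter into a rate and decides whether to pause or resume the consumer group.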