trying msg counter
Martijn Berkvens committed Oct 18, 2023
1 parent 2effcdf commit 615e660
Showing 2 changed files with 46 additions and 14 deletions.
22 changes: 21 additions & 1 deletion receiver/kafkareceiver/kafka_receiver.go
@@ -8,6 +8,7 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"

"github.com/IBM/sarama"
"go.opencensus.io/stats"
@@ -148,6 +149,7 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) error {
messageMarking: c.messageMarking,
headerExtractor: &nopHeaderExtractor{},
limiter: c.limiter,
//counter: &limiterMetrics{},
}
if c.headerExtraction {
consumerGroup.headerExtractor = &headerExtractor{
@@ -167,7 +169,7 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) error {
func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error {

c.limiter.logger = c.settings.Logger
go c.limiter.run(c.consumerGroup)
go c.limiter.run(ctx, c.consumerGroup)

for {
if !c.limiter.rateLimited {
@@ -438,8 +440,23 @@ type tracesConsumerGroupHandler struct {
messageMarking MessageMarking
headerExtractor HeaderExtractor
limiter Limiter
//counter *limiterMetrics
}

type limiterMetrics struct {
msgCounter int64
}

func (l *limiterMetrics) increaseMsgCounter() {
atomic.AddInt64(&l.msgCounter, 1)
}

func (l *limiterMetrics) getMsgCounter() float64 {
return float64(atomic.LoadInt64(&l.msgCounter))
}

var metricsCounter = &limiterMetrics{}

type metricsConsumerGroupHandler struct {
id component.ID
unmarshaler MetricsUnmarshaler
@@ -493,6 +510,7 @@ func (c *tracesConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {

func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
c.logger.Info("Starting consumer group", zap.Int32("partition", claim.Partition()))

if !c.autocommitEnabled {
defer session.Commit()
}
@@ -517,6 +535,8 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
statMessageOffset.M(message.Offset),
statMessageOffsetLag.M(claim.HighWaterMarkOffset()-message.Offset-1))

metricsCounter.increaseMsgCounter()

traces, err := c.unmarshaler.Unmarshal(message.Value)
if err != nil {
c.logger.Error("failed to unmarshal message", zap.Error(err))
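For reference, the limiterMetrics type added in this file is the standard Go pattern for a counter shared across goroutines: the consumer handler increments it once per Kafka message via sync/atomic, and the limiter loop polls the current value. Below is a minimal, runnable sketch of that pattern; the names (msgCounter, increase, get) are illustrative and not taken from the repository.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// msgCounter mirrors the limiterMetrics pattern above: writers call
// increase() once per consumed message, a reader periodically calls get().
type msgCounter struct {
	n int64
}

func (c *msgCounter) increase()    { atomic.AddInt64(&c.n, 1) }
func (c *msgCounter) get() float64 { return float64(atomic.LoadInt64(&c.n)) }

func main() {
	var counter msgCounter
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				counter.increase() // one increment per "message"
			}
		}()
	}
	wg.Wait()
	fmt.Println(counter.get()) // 4000
}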
38 changes: 25 additions & 13 deletions receiver/kafkareceiver/limiter.go
@@ -1,7 +1,11 @@
package kafkareceiver

import (
"context"
"fmt"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opentelemetry.io/collector/component"
"time"

"github.com/IBM/sarama"
@@ -22,6 +26,7 @@ type Limiter struct {
Enabled bool `mapstructure:"enabled"`
logger *zap.Logger
rateLimited bool
id component.ID
}

type limiterValues struct {
@@ -33,14 +38,14 @@ type limiterValues struct {
metricType pmetric.MetricType
}

func (c Limiter) run(handler sarama.ConsumerGroup) {
func (c Limiter) run(ctx context.Context, handler sarama.ConsumerGroup) {

metricMap := make(map[string]limiterValues)

metricMap["otelcol_receiver_accepted_spans"] = limiterValues{currTs: currTs(), limit: c.ReceiverAcceptedSpansRate, metricType: pmetric.MetricTypeSum}
metricMap["otelcol_processor_accepted_spans"] = limiterValues{currTs: currTs(), limit: c.ProcessorAcceptedSpansRate, metricType: pmetric.MetricTypeSum}
metricMap["otelcol_exporter_sent_spans"] = limiterValues{currTs: currTs(), limit: c.ExporterSentSpansRate, metricType: pmetric.MetricTypeSum}
metricMap["otelcol_exporter_queue_size"] = limiterValues{currTs: currTs(), limit: c.ExporterQueueSizeDiff, metricType: pmetric.MetricTypeGauge}
//metricMap["otelcol_receiver_accepted_spans"] = limiterValues{currTs: currTs(), limit: c.ReceiverAcceptedSpansRate, metricType: pmetric.MetricTypeSum}
//metricMap["otelcol_processor_accepted_spans"] = limiterValues{currTs: currTs(), limit: c.ProcessorAcceptedSpansRate, metricType: pmetric.MetricTypeSum}
//metricMap["otelcol_exporter_sent_spans"] = limiterValues{currTs: currTs(), limit: c.ExporterSentSpansRate, metricType: pmetric.MetricTypeSum}
//metricMap["otelcol_exporter_queue_size"] = limiterValues{currTs: currTs(), limit: c.ExporterQueueSizeDiff, metricType: pmetric.MetricTypeGauge}
metricMap["otelcol_kafka_receiver_messages"] = limiterValues{currTs: currTs(), limit: c.KafkaReceiverMessagesRate, metricType: pmetric.MetricTypeSum}

for {
@@ -49,11 +54,13 @@ func (c Limiter) run(handler sarama.ConsumerGroup) {
var pauseRequired bool

for metricName, values := range metricMap {
mTs, mVal := c.getMetricValue(metricName, values.metricType)
//mTs, mVal := c.getMetricValue(metricName, values.metricType)
values.prevTs = values.currTs
values.prevValue = values.currValue
values.currTs = mTs
values.currValue = mVal
//values.currTs = mTs
values.currTs = currTs()
//values.currValue = mVal
values.currValue = metricsCounter.getMsgCounter()
metricMap[metricName] = values

pause, rate := c.shouldPause(values)
@@ -63,17 +70,22 @@ func (c Limiter) run(handler sarama.ConsumerGroup) {
if pauseRequired {
c.logger.Warn(fmt.Sprintf("rate for %s exceeded: %f (limit: %f). Was already paused, will continue to pause", metricName, rate, values.limit))
} else {
c.logger.Warn(fmt.Sprintf("rate for %s exceeded: %f (limit: %f). Going to start pause", metricName, rate, values.limit))
c.logger.Warn(fmt.Sprintf("rate for %s exceeded: %f (limit: %f). Going to pause", metricName, rate, values.limit))
pauseRequired = true
}
}

}

//something := metricsCounter.getMsgCounter()
//fmt.Println("%f", something)

statsTags := []tag.Mutator{tag.Upsert(tagInstanceName, c.id.String())}

if pauseRequired {
if c.Enabled {
statLimiterPause.M(1)
handler.PauseAll()
_ = stats.RecordWithTags(ctx, statsTags, statLimiterPause.M(1))
c.rateLimited = true
} else {
c.logger.Debug("rate would be limited but limiter is not enabled")
@@ -84,8 +96,8 @@ func (c Limiter) run(handler sarama.ConsumerGroup) {
c.logger.Warn("lifting rate limit, resuming operations")
}
if c.Enabled {
statLimiterPause.M(0)
handler.ResumeAll()
_ = stats.RecordWithTags(ctx, statsTags, statLimiterPause.M(0))
}
}

@@ -99,8 +111,8 @@ func (c Limiter) shouldPause(values limiterValues) (bool, float64) {
duration := currentTs.Sub(previousTs)
seconds := duration.Seconds()

// if any of the values are 0, do not do anything. This is to avoid unintended pausing at startup.
if values.prevValue != 0 || values.currValue != 0 {
// if both of the values are 0, do not do anything. This is to avoid unintended pausing at startup.
if values.prevValue != 0 && values.currValue != 0 {

if seconds > 0 {
diff := values.currValue - values.prevValue
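The shouldPause check above (only partially shown in this hunk) takes two samples of a counter, derives a per-second rate from the delta and the elapsed time, and the new guard skips the check whenever either sample is still zero so the limiter does not trigger at startup. A rough sketch of that calculation follows, under the assumption (the tail of the function is not visible in this diff) that the rate is simply the counter delta divided by the elapsed seconds, compared against the configured limit.

package main

import (
	"fmt"
	"time"
)

// shouldPause approximates the limiter check: given two samples of a
// monotonically increasing counter, pause when the observed rate
// (delta / elapsed seconds) exceeds the configured limit.
func shouldPause(prevTs, currTs time.Time, prevVal, currVal, limit float64) (bool, float64) {
	seconds := currTs.Sub(prevTs).Seconds()
	// Mirror the guards in the diff: do nothing when either sample is zero
	// (avoids pausing right after startup) or when no time has elapsed.
	if prevVal == 0 || currVal == 0 || seconds <= 0 {
		return false, 0
	}
	rate := (currVal - prevVal) / seconds
	return rate > limit, rate
}

func main() {
	now := time.Now()
	pause, rate := shouldPause(now.Add(-10*time.Second), now, 1000, 1600, 50)
	fmt.Printf("pause=%v rate=%.1f msgs/s\n", pause, rate) // pause=true rate=60.0
}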
