Skip to content

Commit

Permalink
removed Kafka lag and added pause metric
Browse files Browse the repository at this point in the history
  • Loading branch information
Martijn Berkvens committed Oct 18, 2023
1 parent f5af3c7 commit 2effcdf
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 6 deletions.
2 changes: 0 additions & 2 deletions receiver/kafkareceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ func TestLoadConfig(t *testing.T) {
ExporterSentSpansRate: 20,
ExporterQueueSizeDiff: 2000,
KafkaReceiverMessagesRate: 20,
KafkaReceiverLagDiff: 2000,
Enabled: true,
},
},
Expand Down Expand Up @@ -108,7 +107,6 @@ func TestLoadConfig(t *testing.T) {
ExporterSentSpansRate: 100,
ExporterQueueSizeDiff: 1000,
KafkaReceiverMessagesRate: 100,
KafkaReceiverLagDiff: 1000,
Enabled: false,
},
},
Expand Down
1 change: 0 additions & 1 deletion receiver/kafkareceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ func createDefaultConfig() component.Config {
ExporterSentSpansRate: 100,
ExporterQueueSizeDiff: 1000,
KafkaReceiverMessagesRate: 100,
KafkaReceiverLagDiff: 1000,
Enabled: false,
},
}
Expand Down
4 changes: 2 additions & 2 deletions receiver/kafkareceiver/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ type Limiter struct {
ExporterSentSpansRate float64 `mapstructure:"exporter_sent_spans_rate"`
ExporterQueueSizeDiff float64 `mapstructure:"exporter_queue_size_diff"`
KafkaReceiverMessagesRate float64 `mapstructure:"kafka_receiver_messages_rate"`
KafkaReceiverLagDiff float64 `mapstructure:"kafka_receiver_lag_diff"`
Enabled bool `mapstructure:"enabled"`
logger *zap.Logger
rateLimited bool
Expand All @@ -43,7 +42,6 @@ func (c Limiter) run(handler sarama.ConsumerGroup) {
metricMap["otelcol_exporter_sent_spans"] = limiterValues{currTs: currTs(), limit: c.ExporterSentSpansRate, metricType: pmetric.MetricTypeSum}
metricMap["otelcol_exporter_queue_size"] = limiterValues{currTs: currTs(), limit: c.ExporterQueueSizeDiff, metricType: pmetric.MetricTypeGauge}
metricMap["otelcol_kafka_receiver_messages"] = limiterValues{currTs: currTs(), limit: c.KafkaReceiverMessagesRate, metricType: pmetric.MetricTypeSum}
metricMap["otelcol_kafka_receiver_offset_lag"] = limiterValues{currTs: currTs(), limit: c.KafkaReceiverLagDiff, metricType: pmetric.MetricTypeGauge}

for {
c.logger.Debug("running limiter")
Expand Down Expand Up @@ -74,6 +72,7 @@ func (c Limiter) run(handler sarama.ConsumerGroup) {

if pauseRequired {
if c.Enabled {
statLimiterPause.M(1)
handler.PauseAll()
c.rateLimited = true
} else {
Expand All @@ -85,6 +84,7 @@ func (c Limiter) run(handler sarama.ConsumerGroup) {
c.logger.Warn("lifting rate limit, resuming operations")
}
if c.Enabled {
statLimiterPause.M(0)
handler.ResumeAll()
}
}
Expand Down
11 changes: 11 additions & 0 deletions receiver/kafkareceiver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ var (

statPartitionStart = stats.Int64("kafka_receiver_partition_start", "Number of started partitions", stats.UnitDimensionless)
statPartitionClose = stats.Int64("kafka_receiver_partition_close", "Number of finished partitions", stats.UnitDimensionless)

statLimiterPause = stats.Int64("kafka_receiver_limiter_pause", "Current limiter pause status", stats.UnitDimensionless)
)

// MetricViews return metric views for Kafka receiver.
Expand Down Expand Up @@ -64,11 +66,20 @@ func MetricViews() []*view.View {
Aggregation: view.Sum(),
}

lastValueLimiterPause := &view.View{
Name: statLimiterPause.Name(),
Measure: statLimiterPause,
Description: statLimiterPause.Description(),
TagKeys: tagKeys,
Aggregation: view.LastValue(),
}

return []*view.View{
countMessages,
lastValueOffset,
lastValueOffsetLag,
countPartitionStart,
countPartitionClose,
lastValueLimiterPause,
}
}
1 change: 1 addition & 0 deletions receiver/kafkareceiver/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func TestMetrics(t *testing.T) {
"kafka_receiver_offset_lag",
"kafka_receiver_partition_start",
"kafka_receiver_partition_close",
"kafka_receiver_limiter_pause",
}
for i, viewName := range viewNames {
assert.Equal(t, viewName, metricViews[i].Name)
Expand Down
1 change: 0 additions & 1 deletion receiver/kafkareceiver/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ kafka:
exporter_sent_spans_rate: 20
exporter_queue_size_diff: 2000
kafka_receiver_messages_rate: 20
kafka_receiver_lag_diff: 2000
enabled: true
kafka/logs:
topic: logs
Expand Down

0 comments on commit 2effcdf

Please sign in to comment.