Skip to content

Commit

Permalink
Collect metrics for a set of CG states (#7)
Browse files Browse the repository at this point in the history
* Filter empty consumer groups

* Collect metrics for a set of CG states

* Update minion/config_consumer_group.go

Co-authored-by: Adrian Muraru <[email protected]>

* Implement review

* Implement review

* Update the CG state values

---------

Co-authored-by: Adrian Muraru <[email protected]>
  • Loading branch information
dobrerazvan and amuraru authored Jul 24, 2023
1 parent 4f2c3b0 commit 0e82c7c
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 5 deletions.
15 changes: 15 additions & 0 deletions minion/config_consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ type ConsumerGroupConfig struct {
// IgnoredGroups are regex strings of group ids that shall be ignored/skipped when exporting metrics. Ignored groups
// take precedence over allowed groups.
IgnoredGroupIDs []string `koanf:"ignoredGroups"`

// Monitor consumer group states. Empty list means all consumer groups are monitoring regardless of its state
// Allowed values are: Dead, Empty, Stable, PreparingRebalance, CompletingRebalance
// Source: https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
AllowedConsumerGroupStates []string `koanf:"allowedConsumerGroupStates"`
}

func (c *ConsumerGroupConfig) SetDefaults() {
Expand Down Expand Up @@ -76,3 +81,13 @@ func (c *ConsumerGroupConfig) Validate() error {

return nil
}

// Returns a map of allowed group states for faster lookup
func (c *ConsumerGroupConfig) GetAllowedConsumerGroupStates() map[string]string {
// create a map for faster lookup
groupStatesMap := make(map[string]string, len(c.AllowedConsumerGroupStates))
for _, state := range c.AllowedConsumerGroupStates {
groupStatesMap[state] = state
}
return groupStatesMap
}
9 changes: 8 additions & 1 deletion minion/consumer_group_offsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,16 @@ func (s *Service) ListAllConsumerGroupOffsetsAdminAPI(ctx context.Context) (map[
return nil, fmt.Errorf("failed to list groupsRes: %w", err)
}
groupIDs := make([]string, len(groupsRes.AllowedGroups.Groups))
groupStatesMap := s.Cfg.ConsumerGroups.GetAllowedConsumerGroupStates()

for i, group := range groupsRes.AllowedGroups.Groups {
if group.GroupState != "Dead" {
if len(groupStatesMap) == 0 {
groupIDs[i] = group.Group
} else {
// only add group if it's state is allowed
if _, ok := groupStatesMap[group.GroupState]; ok {
groupIDs[i] = group.Group
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion prometheus/collect_broker_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package prometheus

import (
"context"
"strconv"

"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"strconv"
)

func (e *Exporter) collectBrokerInfo(ctx context.Context, ch chan<- prometheus.Metric) bool {
Expand Down
3 changes: 2 additions & 1 deletion prometheus/collect_cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package prometheus

import (
"context"
"strconv"

"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"strconv"
)

func (e *Exporter) collectClusterInfo(ctx context.Context, ch chan<- prometheus.Metric) bool {
Expand Down
1 change: 1 addition & 0 deletions prometheus/collect_exporter_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package prometheus

import (
"context"

"github.com/prometheus/client_golang/prometheus"
)

Expand Down
3 changes: 2 additions & 1 deletion prometheus/collect_log_dirs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package prometheus

import (
"context"
"strconv"

"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/zap"
"strconv"
)

func (e *Exporter) collectLogDirs(ctx context.Context, ch chan<- prometheus.Metric) bool {
Expand Down
3 changes: 2 additions & 1 deletion prometheus/collect_topic_partition_offsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package prometheus

import (
"context"
"strconv"

"github.com/cloudhut/kminion/v2/minion"
"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kerr"
"go.uber.org/zap"
"strconv"
)

func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- prometheus.Metric) bool {
Expand Down

0 comments on commit 0e82c7c

Please sign in to comment.