Skip to content

Commit

Permalink
add stats operations for health metrics (#38)
Browse files Browse the repository at this point in the history
  • Loading branch information
zakhar1i authored Jan 20, 2025
1 parent 0ce15a5 commit 75a6a82
Show file tree
Hide file tree
Showing 15 changed files with 240 additions and 108 deletions.
2 changes: 1 addition & 1 deletion example/health/demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func main() {
// define monitored components
busComponent, err := component.New(
mercedesComponent, "", busRouteTarget, true,
[]string{speedMetricID},
map[string]component.Aggregator{speedMetricID: component.DefaultAggregator},
[]string{stationsCounterID},
[]string{passengersCounterID},
)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/twmb/murmur3 v1.1.8
github.com/zenoss/zenoss-protobufs v1.5.1
go.opencensus.io v0.24.0
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1
google.golang.org/api v0.180.0
google.golang.org/grpc v1.63.2
google.golang.org/protobuf v1.34.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 23 additions & 9 deletions health/component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
package component

import (
"slices"

"github.com/zenoss/zenoss-go-sdk/health/utils"
sdk_utils "github.com/zenoss/zenoss-go-sdk/utils"
)

// New creates a new Component object
Expand All @@ -13,10 +14,10 @@ import (
//
//revive:disable:argument-limit
func New(
id, cType, targetID string, enableHeartbeat bool, metricIDs, counterIDs, totalCounterIDs []string,
id, cType, targetID string, enableHeartbeat bool, metrics map[string]Aggregator, counterIDs, totalCounterIDs []string,
) (*Component, error) {
set := make(map[string]struct{})
for _, val := range metricIDs {
for val := range metrics {
set[val] = struct{}{}
}
for _, val := range counterIDs {
Expand All @@ -25,7 +26,7 @@ func New(
for _, val := range totalCounterIDs {
set[val] = struct{}{}
}
if len(set) < len(metricIDs)+len(counterIDs)+len(totalCounterIDs) {
if len(set) < len(metrics)+len(counterIDs)+len(totalCounterIDs) {
return nil, utils.ErrMeasureIDTaken
}
if cType == "" {
Expand All @@ -36,7 +37,7 @@ func New(
Type: cType,
TargetID: targetID,
EnableHeartbeat: enableHeartbeat,
MetricIDs: metricIDs,
Metrics: metrics,
CounterIDs: counterIDs,
TotalCounterIDs: totalCounterIDs,
}, nil
Expand All @@ -48,7 +49,7 @@ func New(
// configuration (such as whether to enable heartbeat)
type Component struct {
ID string
MetricIDs []string
Metrics map[string]Aggregator
CounterIDs []string
TotalCounterIDs []string

Expand All @@ -62,10 +63,23 @@ type Component struct {
EnableHeartbeat bool
}

type Aggregator int

const (
AggregatorMean Aggregator = iota
AggregatorMin
AggregatorMax
AggregatorSum
AggregatorCount

DefaultAggregator = AggregatorMean
)

// IsMeasureIDUnique searches through all metric and counter IDs and
// returns false if such ID is already taken
func (t *Component) IsMeasureIDUnique(id string) bool {
return !(sdk_utils.ListContainsString(t.MetricIDs, id) ||
sdk_utils.ListContainsString(t.CounterIDs, id) ||
sdk_utils.ListContainsString(t.TotalCounterIDs, id))
_, metricOk := t.Metrics[id]
return !(metricOk ||
slices.Contains(t.CounterIDs, id) ||
slices.Contains(t.TotalCounterIDs, id))
}
15 changes: 8 additions & 7 deletions health/component/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,22 @@ var _ = Describe("Component Tests", func() {
)

Context("constructor", func() {
var metricIDs, counterIDs, tCounterIDs []string
var metrics map[string]component.Aggregator
var counterIDs, tCounterIDs []string
BeforeEach(func() {
metricIDs = []string{"someMetric"}
metrics = map[string]component.Aggregator{"someMetric": component.DefaultAggregator}
counterIDs = []string{"someCounter"}
tCounterIDs = []string{"totalCounter", mockStr}
})
It("should return an error if measure is not unique", func() {
metricIDs = append(metricIDs, mockStr)
component, err := component.New(id, "", "", true, metricIDs, counterIDs, tCounterIDs)
metrics[mockStr] = component.DefaultAggregator
component, err := component.New(id, "", "", true, metrics, counterIDs, tCounterIDs)
Ω(component).Should(BeNil())
Ω(err).Should(Equal(utils.ErrMeasureIDTaken))
})

It("should return a new Component", func() {
component, err := component.New(id, "", "", true, metricIDs, counterIDs, tCounterIDs)
component, err := component.New(id, "", "", true, metrics, counterIDs, tCounterIDs)
Ω(err).Should(BeNil())
Ω(component).ShouldNot(BeNil())
})
Expand All @@ -45,11 +46,11 @@ var _ = Describe("Component Tests", func() {
var testComponent *component.Component

BeforeEach(func() {
metricIDs := []string{"someMetric"}
metrics := map[string]component.Aggregator{"someMetric": component.DefaultAggregator}
counterIDs := []string{"someCounter"}
tCounterIDs := []string{"totalCounter"}

testComponent, _ = component.New(id, "", "", true, metricIDs, counterIDs, tCounterIDs)
testComponent, _ = component.New(id, "", "", true, metrics, counterIDs, tCounterIDs)
})

It("should return false if measure is not unique", func() {
Expand Down
48 changes: 28 additions & 20 deletions health/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package health
import (
"context"
"fmt"
"slices"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -359,29 +360,29 @@ func (hm *healthManager) updateComponentHealthData(measure *ComponentMeasurement
}

func (hm *healthManager) updateComponentsMetric(cHealth *rawHealth, measure *ComponentMeasurement) error {
if !sdk_utils.ListContainsString(cHealth.component.MetricIDs, measure.MeasureID) {
if _, metricOk := cHealth.component.Metrics[measure.MeasureID]; !metricOk {
if !hm.registrationOnCollect() {
return utils.ErrMetricNotRegistered
}
if !cHealth.component.IsMeasureIDUnique(measure.MeasureID) {
return utils.ErrMeasureIDTaken
}
cHealth.component.MetricIDs = append(cHealth.component.MetricIDs, measure.MeasureID)
cHealth.rawMetrics[measure.MeasureID] = []float64{measure.MetricValue}
cHealth.component.Metrics[measure.MeasureID] = component.DefaultAggregator
cHealth.metrics[measure.MeasureID] = &metric{values: []float64{measure.MetricValue}}
} else {
cHealth.rawMetrics[measure.MeasureID] = append(
cHealth.rawMetrics[measure.MeasureID],
cHealth.metrics[measure.MeasureID].values = append(
cHealth.metrics[measure.MeasureID].values,
measure.MetricValue,
)
}
return nil
}

func (hm *healthManager) updateComponentsCounter(cHealth *rawHealth, measure *ComponentMeasurement) error {
if sdk_utils.ListContainsString(cHealth.component.TotalCounterIDs, measure.MeasureID) {
if slices.Contains(cHealth.component.TotalCounterIDs, measure.MeasureID) {
cHealth.totalCounters[measure.MeasureID] += measure.CounterChange
} else {
if !sdk_utils.ListContainsString(cHealth.component.CounterIDs, measure.MeasureID) {
if !slices.Contains(cHealth.component.CounterIDs, measure.MeasureID) {
if !hm.registrationOnCollect() {
return utils.ErrCounterNotRegistered
}
Expand All @@ -397,20 +398,20 @@ func (hm *healthManager) updateComponentsCounter(cHealth *rawHealth, measure *Co

func (*healthManager) buildComponentFromMeasure(measure *ComponentMeasurement) (*rawHealth, error) {
var enableHeartbeat bool
metricIDs := make([]string, 0)
metrics := make(map[string]component.Aggregator)
counterIDs := make([]string, 0)
totalCounterIDs := make([]string, 0)
switch measure.MeasureType {
case Heartbeat:
enableHeartbeat = true
case Metric:
metricIDs = []string{measure.MeasureID}
metrics[measure.MeasureID] = component.DefaultAggregator
case CounterChange:
counterIDs = []string{measure.MeasureID}
}
component, err := component.New(
measure.ComponentID, utils.DefaultComponentType, "", enableHeartbeat,
metricIDs, counterIDs, totalCounterIDs,
metrics, counterIDs, totalCounterIDs,
)
if err != nil { // shouldn't ever happen here
return nil, err
Expand Down Expand Up @@ -509,13 +510,20 @@ func (*healthManager) calculateComponentHealth(rawHealth *rawHealth) *component.
for counterID, counter := range rawHealth.totalCounters {
health.Counters[counterID] = counter
}
for metricID, mValues := range rawHealth.rawMetrics {
if len(mValues) > 0 {
var sum float64
for _, value := range mValues {
sum += value
for metricID, m := range rawHealth.metrics {
if len(m.values) > 0 {
switch m.aggregator {
case component.AggregatorMean:
health.Metrics[metricID] = sdk_utils.Sum(m.values) / float64(len(m.values))
case component.AggregatorMin:
health.Metrics[metricID] = slices.Min(m.values)
case component.AggregatorMax:
health.Metrics[metricID] = slices.Max(m.values)
case component.AggregatorSum:
health.Metrics[metricID] = sdk_utils.Sum(m.values)
case component.AggregatorCount:
health.Metrics[metricID] = float64(len(m.values))
}
health.Metrics[metricID] = sum / float64(len(mValues))
}
}
return health
Expand All @@ -527,9 +535,9 @@ func (*healthManager) cleanHealthValues(rawHealth *rawHealth) {
for counterID := range rawHealth.counters {
rawHealth.counters[counterID] = 0
}
for metricID, mValues := range rawHealth.rawMetrics {
if len(mValues) > 0 {
rawHealth.rawMetrics[metricID] = []float64{}
for _, m := range rawHealth.metrics {
if len(m.values) > 0 {
m.values = []float64{}
}
}
}
Expand Down Expand Up @@ -646,6 +654,6 @@ func debugRawHealthStats(rawHealth *rawHealth) {
log.Debug().Msgf(
"ComponentID: %s, Status=%v, Heartbeat=%t, Counters=%v, Metrics=%v, Messages=%v",
rawHealth.component.ID, rawHealth.status, rawHealth.heartBeat, rawHealth.counters,
rawHealth.rawMetrics, messageSums,
rawHealth.metrics, messageSums,
)
}
105 changes: 103 additions & 2 deletions health/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var _ = Describe("Health Manager", Ordered, func() {
addTestComponent := func(target string) {
comp, _ := component.New(
testComponentID, utils.DefaultComponentType, target, true,
[]string{testMetric},
map[string]component.Aggregator{testMetric: component.DefaultAggregator},
[]string{testCounter},
[]string{testTotalCounter},
)
Expand Down Expand Up @@ -803,7 +803,7 @@ var _ = Describe("Health Manager", Ordered, func() {
)
low2, _ := component.New(
"low2", "low", mid1.ID, false,
[]string{testMetric}, nil, nil,
map[string]component.Aggregator{testMetric: component.DefaultAggregator}, nil, nil,
)
low3, _ := component.New(
"low3", "low", mid1.ID, false,
Expand Down Expand Up @@ -1004,4 +1004,105 @@ var _ = Describe("Health Manager", Ordered, func() {
Ω(len(targetComponentsHealth["target.high"].Messages)).Should(Equal(2))
})
})

Context("metrics aggregation", func() {
var (
metricMean = "test.metric.mean"
metricMin = "test.metric.min"
metricMax = "test.metric.max"
metricSum = "test.metric.sum"
metricCount = "test.metric.count"
)

BeforeEach(func() {
ctx = context.Background()

config := health.NewConfig()
config.CollectionCycle = 200 * time.Millisecond
config.LogLevel = "fatal"
manager = health.NewManager(ctx, config)

mesuresCh = make(chan *health.ComponentMeasurement)
healthCh = make(chan *component.Health)
componentCh = make(chan *component.Component)

controller := make(chan struct{})
go func() {
testComponent := <-componentCh
Ω(testComponent.ID).Should(Equal(testComponentID))
close(controller)
}()

manager.Start(ctx, mesuresCh, healthCh, componentCh)
comp, _ := component.New(
testComponentID, utils.DefaultComponentType, "", true,
map[string]component.Aggregator{
metricMean: component.AggregatorMean,
metricMin: component.AggregatorMin,
metricMax: component.AggregatorMax,
metricSum: component.AggregatorSum,
metricCount: component.AggregatorCount,
},
[]string{testCounter},
[]string{testTotalCounter},
)
manager.AddComponents([]*component.Component{comp})
<-controller
})

AfterEach(func() {
controller := make(chan struct{})
go func() {
<-healthCh
close(controller)
}()
close(mesuresCh)
manager.Shutdown()
<-controller
})

It("should calculate correct metric values for aggregators", func() {
for i := 1; i <= 10; i++ {
mesuresCh <- &health.ComponentMeasurement{
ComponentID: testComponentID,
MeasureID: metricMean,
MeasureType: health.Metric,
MetricValue: float64(i),
}
mesuresCh <- &health.ComponentMeasurement{
ComponentID: testComponentID,
MeasureID: metricMin,
MeasureType: health.Metric,
MetricValue: float64(i),
}
mesuresCh <- &health.ComponentMeasurement{
ComponentID: testComponentID,
MeasureID: metricMax,
MeasureType: health.Metric,
MetricValue: float64(i),
}
mesuresCh <- &health.ComponentMeasurement{
ComponentID: testComponentID,
MeasureID: metricSum,
MeasureType: health.Metric,
MetricValue: float64(i),
}
mesuresCh <- &health.ComponentMeasurement{
ComponentID: testComponentID,
MeasureID: metricCount,
MeasureType: health.Metric,
MetricValue: float64(i),
}
}

actualHealth := <-healthCh
Ω(actualHealth).ShouldNot(BeNil())
Ω(actualHealth.ComponentID).Should(Equal(testComponentID))
Ω(actualHealth.Metrics[metricMean]).Should(Equal(5.5))
Ω(actualHealth.Metrics[metricMin]).Should(Equal(float64(1)))
Ω(actualHealth.Metrics[metricMax]).Should(Equal(float64(10)))
Ω(actualHealth.Metrics[metricSum]).Should(Equal(float64(55)))
Ω(actualHealth.Metrics[metricCount]).Should(Equal(float64(10)))
})
})
})
Loading

0 comments on commit 75a6a82

Please sign in to comment.