Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZING-17808: add stats operations for health metrics #38

Merged
merged 1 commit into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion example/health/demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func main() {
// define monitored components
busComponent, err := component.New(
mercedesComponent, "", busRouteTarget, true,
[]string{speedMetricID},
map[string]component.Aggregator{speedMetricID: component.DefaultAggregator},
[]string{stationsCounterID},
[]string{passengersCounterID},
)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/twmb/murmur3 v1.1.8
github.com/zenoss/zenoss-protobufs v1.5.1
go.opencensus.io v0.24.0
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1
google.golang.org/api v0.180.0
google.golang.org/grpc v1.63.2
google.golang.org/protobuf v1.34.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 23 additions & 9 deletions health/component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
package component

import (
"slices"

"github.com/zenoss/zenoss-go-sdk/health/utils"
sdk_utils "github.com/zenoss/zenoss-go-sdk/utils"
)

// New creates a new Component object
Expand All @@ -13,10 +14,10 @@ import (
//
//revive:disable:argument-limit
func New(
id, cType, targetID string, enableHeartbeat bool, metricIDs, counterIDs, totalCounterIDs []string,
id, cType, targetID string, enableHeartbeat bool, metrics map[string]Aggregator, counterIDs, totalCounterIDs []string,
) (*Component, error) {
set := make(map[string]struct{})
for _, val := range metricIDs {
for val := range metrics {
set[val] = struct{}{}
}
for _, val := range counterIDs {
Expand All @@ -25,7 +26,7 @@ func New(
for _, val := range totalCounterIDs {
set[val] = struct{}{}
}
if len(set) < len(metricIDs)+len(counterIDs)+len(totalCounterIDs) {
if len(set) < len(metrics)+len(counterIDs)+len(totalCounterIDs) {
return nil, utils.ErrMeasureIDTaken
}
if cType == "" {
Expand All @@ -36,7 +37,7 @@ func New(
Type: cType,
TargetID: targetID,
EnableHeartbeat: enableHeartbeat,
MetricIDs: metricIDs,
Metrics: metrics,
CounterIDs: counterIDs,
TotalCounterIDs: totalCounterIDs,
}, nil
Expand All @@ -48,7 +49,7 @@ func New(
// configuration (such as whether to enable heartbeat)
type Component struct {
ID string
MetricIDs []string
Metrics map[string]Aggregator
CounterIDs []string
TotalCounterIDs []string

Expand All @@ -62,10 +63,23 @@ type Component struct {
EnableHeartbeat bool
}

type Aggregator int

const (
AggregatorMean Aggregator = iota
AggregatorMin
AggregatorMax
AggregatorSum
AggregatorCount

DefaultAggregator = AggregatorMean
)

// IsMeasureIDUnique searches through all metric and counter IDs and
// returns false if such ID is already taken
func (t *Component) IsMeasureIDUnique(id string) bool {
return !(sdk_utils.ListContainsString(t.MetricIDs, id) ||
sdk_utils.ListContainsString(t.CounterIDs, id) ||
sdk_utils.ListContainsString(t.TotalCounterIDs, id))
_, metricOk := t.Metrics[id]
return !(metricOk ||
slices.Contains(t.CounterIDs, id) ||
slices.Contains(t.TotalCounterIDs, id))
}
15 changes: 8 additions & 7 deletions health/component/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,22 @@ var _ = Describe("Component Tests", func() {
)

Context("constructor", func() {
var metricIDs, counterIDs, tCounterIDs []string
var metrics map[string]component.Aggregator
var counterIDs, tCounterIDs []string
BeforeEach(func() {
metricIDs = []string{"someMetric"}
metrics = map[string]component.Aggregator{"someMetric": component.DefaultAggregator}
counterIDs = []string{"someCounter"}
tCounterIDs = []string{"totalCounter", mockStr}
})
It("should return an error if measure is not unique", func() {
metricIDs = append(metricIDs, mockStr)
component, err := component.New(id, "", "", true, metricIDs, counterIDs, tCounterIDs)
metrics[mockStr] = component.DefaultAggregator
component, err := component.New(id, "", "", true, metrics, counterIDs, tCounterIDs)
Ω(component).Should(BeNil())
Ω(err).Should(Equal(utils.ErrMeasureIDTaken))
})

It("should return a new Component", func() {
component, err := component.New(id, "", "", true, metricIDs, counterIDs, tCounterIDs)
component, err := component.New(id, "", "", true, metrics, counterIDs, tCounterIDs)
Ω(err).Should(BeNil())
Ω(component).ShouldNot(BeNil())
})
Expand All @@ -45,11 +46,11 @@ var _ = Describe("Component Tests", func() {
var testComponent *component.Component

BeforeEach(func() {
metricIDs := []string{"someMetric"}
metrics := map[string]component.Aggregator{"someMetric": component.DefaultAggregator}
counterIDs := []string{"someCounter"}
tCounterIDs := []string{"totalCounter"}

testComponent, _ = component.New(id, "", "", true, metricIDs, counterIDs, tCounterIDs)
testComponent, _ = component.New(id, "", "", true, metrics, counterIDs, tCounterIDs)
})

It("should return false if measure is not unique", func() {
Expand Down
48 changes: 28 additions & 20 deletions health/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package health
import (
"context"
"fmt"
"slices"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -359,29 +360,29 @@ func (hm *healthManager) updateComponentHealthData(measure *ComponentMeasurement
}

func (hm *healthManager) updateComponentsMetric(cHealth *rawHealth, measure *ComponentMeasurement) error {
if !sdk_utils.ListContainsString(cHealth.component.MetricIDs, measure.MeasureID) {
if _, metricOk := cHealth.component.Metrics[measure.MeasureID]; !metricOk {
if !hm.registrationOnCollect() {
return utils.ErrMetricNotRegistered
}
if !cHealth.component.IsMeasureIDUnique(measure.MeasureID) {
return utils.ErrMeasureIDTaken
}
cHealth.component.MetricIDs = append(cHealth.component.MetricIDs, measure.MeasureID)
cHealth.rawMetrics[measure.MeasureID] = []float64{measure.MetricValue}
cHealth.component.Metrics[measure.MeasureID] = component.DefaultAggregator
cHealth.metrics[measure.MeasureID] = &metric{values: []float64{measure.MetricValue}}
} else {
cHealth.rawMetrics[measure.MeasureID] = append(
cHealth.rawMetrics[measure.MeasureID],
cHealth.metrics[measure.MeasureID].values = append(
cHealth.metrics[measure.MeasureID].values,
measure.MetricValue,
)
}
return nil
}

func (hm *healthManager) updateComponentsCounter(cHealth *rawHealth, measure *ComponentMeasurement) error {
if sdk_utils.ListContainsString(cHealth.component.TotalCounterIDs, measure.MeasureID) {
if slices.Contains(cHealth.component.TotalCounterIDs, measure.MeasureID) {
cHealth.totalCounters[measure.MeasureID] += measure.CounterChange
} else {
if !sdk_utils.ListContainsString(cHealth.component.CounterIDs, measure.MeasureID) {
if !slices.Contains(cHealth.component.CounterIDs, measure.MeasureID) {
if !hm.registrationOnCollect() {
return utils.ErrCounterNotRegistered
}
Expand All @@ -397,20 +398,20 @@ func (hm *healthManager) updateComponentsCounter(cHealth *rawHealth, measure *Co

func (*healthManager) buildComponentFromMeasure(measure *ComponentMeasurement) (*rawHealth, error) {
var enableHeartbeat bool
metricIDs := make([]string, 0)
metrics := make(map[string]component.Aggregator)
counterIDs := make([]string, 0)
totalCounterIDs := make([]string, 0)
switch measure.MeasureType {
case Heartbeat:
enableHeartbeat = true
case Metric:
metricIDs = []string{measure.MeasureID}
metrics[measure.MeasureID] = component.DefaultAggregator
case CounterChange:
counterIDs = []string{measure.MeasureID}
}
component, err := component.New(
measure.ComponentID, utils.DefaultComponentType, "", enableHeartbeat,
metricIDs, counterIDs, totalCounterIDs,
metrics, counterIDs, totalCounterIDs,
)
if err != nil { // shouldn't ever happen here
return nil, err
Expand Down Expand Up @@ -509,13 +510,20 @@ func (*healthManager) calculateComponentHealth(rawHealth *rawHealth) *component.
for counterID, counter := range rawHealth.totalCounters {
health.Counters[counterID] = counter
}
for metricID, mValues := range rawHealth.rawMetrics {
if len(mValues) > 0 {
var sum float64
for _, value := range mValues {
sum += value
for metricID, m := range rawHealth.metrics {
if len(m.values) > 0 {
switch m.aggregator {
case component.AggregatorMean:
health.Metrics[metricID] = sdk_utils.Sum(m.values) / float64(len(m.values))
case component.AggregatorMin:
health.Metrics[metricID] = slices.Min(m.values)
case component.AggregatorMax:
health.Metrics[metricID] = slices.Max(m.values)
case component.AggregatorSum:
health.Metrics[metricID] = sdk_utils.Sum(m.values)
case component.AggregatorCount:
health.Metrics[metricID] = float64(len(m.values))
}
health.Metrics[metricID] = sum / float64(len(mValues))
}
}
return health
Expand All @@ -527,9 +535,9 @@ func (*healthManager) cleanHealthValues(rawHealth *rawHealth) {
for counterID := range rawHealth.counters {
rawHealth.counters[counterID] = 0
}
for metricID, mValues := range rawHealth.rawMetrics {
if len(mValues) > 0 {
rawHealth.rawMetrics[metricID] = []float64{}
for _, m := range rawHealth.metrics {
if len(m.values) > 0 {
m.values = []float64{}
}
}
}
Expand Down Expand Up @@ -646,6 +654,6 @@ func debugRawHealthStats(rawHealth *rawHealth) {
log.Debug().Msgf(
"ComponentID: %s, Status=%v, Heartbeat=%t, Counters=%v, Metrics=%v, Messages=%v",
rawHealth.component.ID, rawHealth.status, rawHealth.heartBeat, rawHealth.counters,
rawHealth.rawMetrics, messageSums,
rawHealth.metrics, messageSums,
)
}
105 changes: 103 additions & 2 deletions health/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var _ = Describe("Health Manager", Ordered, func() {
addTestComponent := func(target string) {
comp, _ := component.New(
testComponentID, utils.DefaultComponentType, target, true,
[]string{testMetric},
map[string]component.Aggregator{testMetric: component.DefaultAggregator},
[]string{testCounter},
[]string{testTotalCounter},
)
Expand Down Expand Up @@ -803,7 +803,7 @@ var _ = Describe("Health Manager", Ordered, func() {
)
low2, _ := component.New(
"low2", "low", mid1.ID, false,
[]string{testMetric}, nil, nil,
map[string]component.Aggregator{testMetric: component.DefaultAggregator}, nil, nil,
)
low3, _ := component.New(
"low3", "low", mid1.ID, false,
Expand Down Expand Up @@ -1004,4 +1004,105 @@ var _ = Describe("Health Manager", Ordered, func() {
Ω(len(targetComponentsHealth["target.high"].Messages)).Should(Equal(2))
})
})

Context("metrics aggregation", func() {
var (
metricMean = "test.metric.mean"
metricMin = "test.metric.min"
metricMax = "test.metric.max"
metricSum = "test.metric.sum"
metricCount = "test.metric.count"
)

BeforeEach(func() {
ctx = context.Background()

config := health.NewConfig()
config.CollectionCycle = 200 * time.Millisecond
config.LogLevel = "fatal"
manager = health.NewManager(ctx, config)

mesuresCh = make(chan *health.ComponentMeasurement)
healthCh = make(chan *component.Health)
componentCh = make(chan *component.Component)

controller := make(chan struct{})
go func() {
testComponent := <-componentCh
Ω(testComponent.ID).Should(Equal(testComponentID))
close(controller)
}()

manager.Start(ctx, mesuresCh, healthCh, componentCh)
comp, _ := component.New(
testComponentID, utils.DefaultComponentType, "", true,
map[string]component.Aggregator{
metricMean: component.AggregatorMean,
metricMin: component.AggregatorMin,
metricMax: component.AggregatorMax,
metricSum: component.AggregatorSum,
metricCount: component.AggregatorCount,
},
[]string{testCounter},
[]string{testTotalCounter},
)
manager.AddComponents([]*component.Component{comp})
<-controller
})

AfterEach(func() {
controller := make(chan struct{})
go func() {
<-healthCh
close(controller)
}()
close(mesuresCh)
manager.Shutdown()
<-controller
})

It("should calculate correct metric values for aggregators", func() {
for i := 1; i <= 10; i++ {
mesuresCh <- &health.ComponentMeasurement{
ComponentID: testComponentID,
MeasureID: metricMean,
MeasureType: health.Metric,
MetricValue: float64(i),
}
mesuresCh <- &health.ComponentMeasurement{
ComponentID: testComponentID,
MeasureID: metricMin,
MeasureType: health.Metric,
MetricValue: float64(i),
}
mesuresCh <- &health.ComponentMeasurement{
ComponentID: testComponentID,
MeasureID: metricMax,
MeasureType: health.Metric,
MetricValue: float64(i),
}
mesuresCh <- &health.ComponentMeasurement{
ComponentID: testComponentID,
MeasureID: metricSum,
MeasureType: health.Metric,
MetricValue: float64(i),
}
mesuresCh <- &health.ComponentMeasurement{
ComponentID: testComponentID,
MeasureID: metricCount,
MeasureType: health.Metric,
MetricValue: float64(i),
}
}

actualHealth := <-healthCh
Ω(actualHealth).ShouldNot(BeNil())
Ω(actualHealth.ComponentID).Should(Equal(testComponentID))
Ω(actualHealth.Metrics[metricMean]).Should(Equal(5.5))
Ω(actualHealth.Metrics[metricMin]).Should(Equal(float64(1)))
Ω(actualHealth.Metrics[metricMax]).Should(Equal(float64(10)))
Ω(actualHealth.Metrics[metricSum]).Should(Equal(float64(55)))
Ω(actualHealth.Metrics[metricCount]).Should(Equal(float64(10)))
})
})
})
Loading
Loading