diff --git a/example/health/demo.go b/example/health/demo.go index 1137b3b..77b4417 100644 --- a/example/health/demo.go +++ b/example/health/demo.go @@ -33,7 +33,7 @@ func main() { // define monitored components busComponent, err := component.New( mercedesComponent, "", busRouteTarget, true, - []string{speedMetricID}, + map[string]component.Aggregator{speedMetricID: component.DefaultAggregator}, []string{stationsCounterID}, []string{passengersCounterID}, ) diff --git a/go.mod b/go.mod index e448148..ddd2880 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/twmb/murmur3 v1.1.8 github.com/zenoss/zenoss-protobufs v1.5.1 go.opencensus.io v0.24.0 + golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 google.golang.org/api v0.180.0 google.golang.org/grpc v1.63.2 google.golang.org/protobuf v1.34.1 diff --git a/go.sum b/go.sum index e1dd916..099fbf1 100644 --- a/go.sum +++ b/go.sum @@ -103,6 +103,8 @@ go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 h1:k/i9J1pBpvlfR+9QsetwPyERsqu1GIbi967PQMq3Ivc= +golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= diff --git a/health/component/component.go b/health/component/component.go index cf8d2f1..a53c8bc 100644 --- a/health/component/component.go +++ b/health/component/component.go @@ -3,8 +3,9 @@ package component import ( + "slices" + "github.com/zenoss/zenoss-go-sdk/health/utils" - sdk_utils "github.com/zenoss/zenoss-go-sdk/utils" ) // New creates a new Component object @@ -13,10 +14,10 @@ import ( // //revive:disable:argument-limit func New( - id, cType, targetID string, enableHeartbeat bool, metricIDs, counterIDs, totalCounterIDs []string, + id, cType, targetID string, enableHeartbeat bool, metrics map[string]Aggregator, counterIDs, totalCounterIDs []string, ) (*Component, error) { set := make(map[string]struct{}) - for _, val := range metricIDs { + for val := range metrics { set[val] = struct{}{} } for _, val := range counterIDs { @@ -25,7 +26,7 @@ func New( for _, val := range totalCounterIDs { set[val] = struct{}{} } - if len(set) < len(metricIDs)+len(counterIDs)+len(totalCounterIDs) { + if len(set) < len(metrics)+len(counterIDs)+len(totalCounterIDs) { return nil, utils.ErrMeasureIDTaken } if cType == "" { @@ -36,7 +37,7 @@ func New( Type: cType, TargetID: targetID, EnableHeartbeat: enableHeartbeat, - MetricIDs: metricIDs, + Metrics: metrics, CounterIDs: counterIDs, TotalCounterIDs: totalCounterIDs, }, nil @@ -48,7 +49,7 @@ func New( // configuration (such as whether to enable heartbeat) type Component struct { ID string - MetricIDs []string + Metrics map[string]Aggregator CounterIDs []string TotalCounterIDs []string @@ -62,10 +63,23 @@ type Component struct { EnableHeartbeat bool } +type Aggregator int + +const ( + AggregatorMean Aggregator = iota + AggregatorMin + AggregatorMax + AggregatorSum + AggregatorCount + + DefaultAggregator = AggregatorMean +) + // IsMeasureIDUnique searches through all metric and counter IDs and // returns false if such ID is already taken func (t *Component) IsMeasureIDUnique(id string) bool { - return !(sdk_utils.ListContainsString(t.MetricIDs, id) || - sdk_utils.ListContainsString(t.CounterIDs, id) || - sdk_utils.ListContainsString(t.TotalCounterIDs, id)) + _, metricOk := t.Metrics[id] + return !(metricOk || + slices.Contains(t.CounterIDs, id) || + slices.Contains(t.TotalCounterIDs, id)) } diff --git a/health/component/component_test.go b/health/component/component_test.go index 47dc659..485258b 100644 --- a/health/component/component_test.go +++ b/health/component/component_test.go @@ -21,21 +21,22 @@ var _ = Describe("Component Tests", func() { ) Context("constructor", func() { - var metricIDs, counterIDs, tCounterIDs []string + var metrics map[string]component.Aggregator + var counterIDs, tCounterIDs []string BeforeEach(func() { - metricIDs = []string{"someMetric"} + metrics = map[string]component.Aggregator{"someMetric": component.DefaultAggregator} counterIDs = []string{"someCounter"} tCounterIDs = []string{"totalCounter", mockStr} }) It("should return an error if measure is not unique", func() { - metricIDs = append(metricIDs, mockStr) - component, err := component.New(id, "", "", true, metricIDs, counterIDs, tCounterIDs) + metrics[mockStr] = component.DefaultAggregator + component, err := component.New(id, "", "", true, metrics, counterIDs, tCounterIDs) Ω(component).Should(BeNil()) Ω(err).Should(Equal(utils.ErrMeasureIDTaken)) }) It("should return a new Component", func() { - component, err := component.New(id, "", "", true, metricIDs, counterIDs, tCounterIDs) + component, err := component.New(id, "", "", true, metrics, counterIDs, tCounterIDs) Ω(err).Should(BeNil()) Ω(component).ShouldNot(BeNil()) }) @@ -45,11 +46,11 @@ var _ = Describe("Component Tests", func() { var testComponent *component.Component BeforeEach(func() { - metricIDs := []string{"someMetric"} + metrics := map[string]component.Aggregator{"someMetric": component.DefaultAggregator} counterIDs := []string{"someCounter"} tCounterIDs := []string{"totalCounter"} - testComponent, _ = component.New(id, "", "", true, metricIDs, counterIDs, tCounterIDs) + testComponent, _ = component.New(id, "", "", true, metrics, counterIDs, tCounterIDs) }) It("should return false if measure is not unique", func() { diff --git a/health/manager.go b/health/manager.go index 335fd28..d510d49 100644 --- a/health/manager.go +++ b/health/manager.go @@ -15,6 +15,7 @@ package health import ( "context" "fmt" + "slices" "sync" "sync/atomic" "time" @@ -359,18 +360,18 @@ func (hm *healthManager) updateComponentHealthData(measure *ComponentMeasurement } func (hm *healthManager) updateComponentsMetric(cHealth *rawHealth, measure *ComponentMeasurement) error { - if !sdk_utils.ListContainsString(cHealth.component.MetricIDs, measure.MeasureID) { + if _, metricOk := cHealth.component.Metrics[measure.MeasureID]; !metricOk { if !hm.registrationOnCollect() { return utils.ErrMetricNotRegistered } if !cHealth.component.IsMeasureIDUnique(measure.MeasureID) { return utils.ErrMeasureIDTaken } - cHealth.component.MetricIDs = append(cHealth.component.MetricIDs, measure.MeasureID) - cHealth.rawMetrics[measure.MeasureID] = []float64{measure.MetricValue} + cHealth.component.Metrics[measure.MeasureID] = component.DefaultAggregator + cHealth.metrics[measure.MeasureID] = &metric{values: []float64{measure.MetricValue}} } else { - cHealth.rawMetrics[measure.MeasureID] = append( - cHealth.rawMetrics[measure.MeasureID], + cHealth.metrics[measure.MeasureID].values = append( + cHealth.metrics[measure.MeasureID].values, measure.MetricValue, ) } @@ -378,10 +379,10 @@ func (hm *healthManager) updateComponentsMetric(cHealth *rawHealth, measure *Com } func (hm *healthManager) updateComponentsCounter(cHealth *rawHealth, measure *ComponentMeasurement) error { - if sdk_utils.ListContainsString(cHealth.component.TotalCounterIDs, measure.MeasureID) { + if slices.Contains(cHealth.component.TotalCounterIDs, measure.MeasureID) { cHealth.totalCounters[measure.MeasureID] += measure.CounterChange } else { - if !sdk_utils.ListContainsString(cHealth.component.CounterIDs, measure.MeasureID) { + if !slices.Contains(cHealth.component.CounterIDs, measure.MeasureID) { if !hm.registrationOnCollect() { return utils.ErrCounterNotRegistered } @@ -397,20 +398,20 @@ func (hm *healthManager) updateComponentsCounter(cHealth *rawHealth, measure *Co func (*healthManager) buildComponentFromMeasure(measure *ComponentMeasurement) (*rawHealth, error) { var enableHeartbeat bool - metricIDs := make([]string, 0) + metrics := make(map[string]component.Aggregator) counterIDs := make([]string, 0) totalCounterIDs := make([]string, 0) switch measure.MeasureType { case Heartbeat: enableHeartbeat = true case Metric: - metricIDs = []string{measure.MeasureID} + metrics[measure.MeasureID] = component.DefaultAggregator case CounterChange: counterIDs = []string{measure.MeasureID} } component, err := component.New( measure.ComponentID, utils.DefaultComponentType, "", enableHeartbeat, - metricIDs, counterIDs, totalCounterIDs, + metrics, counterIDs, totalCounterIDs, ) if err != nil { // shouldn't ever happen here return nil, err @@ -509,13 +510,20 @@ func (*healthManager) calculateComponentHealth(rawHealth *rawHealth) *component. for counterID, counter := range rawHealth.totalCounters { health.Counters[counterID] = counter } - for metricID, mValues := range rawHealth.rawMetrics { - if len(mValues) > 0 { - var sum float64 - for _, value := range mValues { - sum += value + for metricID, m := range rawHealth.metrics { + if len(m.values) > 0 { + switch m.aggregator { + case component.AggregatorMean: + health.Metrics[metricID] = sdk_utils.Sum(m.values) / float64(len(m.values)) + case component.AggregatorMin: + health.Metrics[metricID] = slices.Min(m.values) + case component.AggregatorMax: + health.Metrics[metricID] = slices.Max(m.values) + case component.AggregatorSum: + health.Metrics[metricID] = sdk_utils.Sum(m.values) + case component.AggregatorCount: + health.Metrics[metricID] = float64(len(m.values)) } - health.Metrics[metricID] = sum / float64(len(mValues)) } } return health @@ -527,9 +535,9 @@ func (*healthManager) cleanHealthValues(rawHealth *rawHealth) { for counterID := range rawHealth.counters { rawHealth.counters[counterID] = 0 } - for metricID, mValues := range rawHealth.rawMetrics { - if len(mValues) > 0 { - rawHealth.rawMetrics[metricID] = []float64{} + for _, m := range rawHealth.metrics { + if len(m.values) > 0 { + m.values = []float64{} } } } @@ -646,6 +654,6 @@ func debugRawHealthStats(rawHealth *rawHealth) { log.Debug().Msgf( "ComponentID: %s, Status=%v, Heartbeat=%t, Counters=%v, Metrics=%v, Messages=%v", rawHealth.component.ID, rawHealth.status, rawHealth.heartBeat, rawHealth.counters, - rawHealth.rawMetrics, messageSums, + rawHealth.metrics, messageSums, ) } diff --git a/health/manager_test.go b/health/manager_test.go index 572b2ba..541514e 100644 --- a/health/manager_test.go +++ b/health/manager_test.go @@ -31,7 +31,7 @@ var _ = Describe("Health Manager", Ordered, func() { addTestComponent := func(target string) { comp, _ := component.New( testComponentID, utils.DefaultComponentType, target, true, - []string{testMetric}, + map[string]component.Aggregator{testMetric: component.DefaultAggregator}, []string{testCounter}, []string{testTotalCounter}, ) @@ -803,7 +803,7 @@ var _ = Describe("Health Manager", Ordered, func() { ) low2, _ := component.New( "low2", "low", mid1.ID, false, - []string{testMetric}, nil, nil, + map[string]component.Aggregator{testMetric: component.DefaultAggregator}, nil, nil, ) low3, _ := component.New( "low3", "low", mid1.ID, false, @@ -1004,4 +1004,105 @@ var _ = Describe("Health Manager", Ordered, func() { Ω(len(targetComponentsHealth["target.high"].Messages)).Should(Equal(2)) }) }) + + Context("metrics aggregation", func() { + var ( + metricMean = "test.metric.mean" + metricMin = "test.metric.min" + metricMax = "test.metric.max" + metricSum = "test.metric.sum" + metricCount = "test.metric.count" + ) + + BeforeEach(func() { + ctx = context.Background() + + config := health.NewConfig() + config.CollectionCycle = 200 * time.Millisecond + config.LogLevel = "fatal" + manager = health.NewManager(ctx, config) + + mesuresCh = make(chan *health.ComponentMeasurement) + healthCh = make(chan *component.Health) + componentCh = make(chan *component.Component) + + controller := make(chan struct{}) + go func() { + testComponent := <-componentCh + Ω(testComponent.ID).Should(Equal(testComponentID)) + close(controller) + }() + + manager.Start(ctx, mesuresCh, healthCh, componentCh) + comp, _ := component.New( + testComponentID, utils.DefaultComponentType, "", true, + map[string]component.Aggregator{ + metricMean: component.AggregatorMean, + metricMin: component.AggregatorMin, + metricMax: component.AggregatorMax, + metricSum: component.AggregatorSum, + metricCount: component.AggregatorCount, + }, + []string{testCounter}, + []string{testTotalCounter}, + ) + manager.AddComponents([]*component.Component{comp}) + <-controller + }) + + AfterEach(func() { + controller := make(chan struct{}) + go func() { + <-healthCh + close(controller) + }() + close(mesuresCh) + manager.Shutdown() + <-controller + }) + + It("should calculate correct metric values for aggregators", func() { + for i := 1; i <= 10; i++ { + mesuresCh <- &health.ComponentMeasurement{ + ComponentID: testComponentID, + MeasureID: metricMean, + MeasureType: health.Metric, + MetricValue: float64(i), + } + mesuresCh <- &health.ComponentMeasurement{ + ComponentID: testComponentID, + MeasureID: metricMin, + MeasureType: health.Metric, + MetricValue: float64(i), + } + mesuresCh <- &health.ComponentMeasurement{ + ComponentID: testComponentID, + MeasureID: metricMax, + MeasureType: health.Metric, + MetricValue: float64(i), + } + mesuresCh <- &health.ComponentMeasurement{ + ComponentID: testComponentID, + MeasureID: metricSum, + MeasureType: health.Metric, + MetricValue: float64(i), + } + mesuresCh <- &health.ComponentMeasurement{ + ComponentID: testComponentID, + MeasureID: metricCount, + MeasureType: health.Metric, + MetricValue: float64(i), + } + } + + actualHealth := <-healthCh + Ω(actualHealth).ShouldNot(BeNil()) + Ω(actualHealth.ComponentID).Should(Equal(testComponentID)) + Ω(actualHealth.Metrics[metricMean]).Should(Equal(5.5)) + Ω(actualHealth.Metrics[metricMin]).Should(Equal(float64(1))) + Ω(actualHealth.Metrics[metricMax]).Should(Equal(float64(10))) + Ω(actualHealth.Metrics[metricSum]).Should(Equal(float64(55))) + Ω(actualHealth.Metrics[metricCount]).Should(Equal(float64(10))) + }) + }) }) diff --git a/health/registry.go b/health/registry.go index 6741ff0..74a7989 100644 --- a/health/registry.go +++ b/health/registry.go @@ -6,32 +6,42 @@ import ( "github.com/zenoss/zenoss-go-sdk/health/component" ) -func newRawHealth(t *component.Component) *rawHealth { - tHealth := &rawHealth{ - component: t, +func newRawHealth(c *component.Component) *rawHealth { + cHealth := &rawHealth{ + component: c, status: component.Healthy, heartBeat: false, counters: make(map[string]int32), totalCounters: make(map[string]int32), - rawMetrics: make(map[string][]float64), + metrics: make(map[string]*metric), messages: make([]*component.Message, 0), } - for _, metricID := range t.MetricIDs { - tHealth.rawMetrics[metricID] = make([]float64, 0) + for metricID, ag := range c.Metrics { + cHealth.metrics[metricID] = &metric{ + values: make([]float64, 0), + aggregator: ag, + } } - return tHealth + return cHealth } // rawHealth is a struct that keeps component information together with raw component health -type rawHealth struct { - component *component.Component - status component.HealthStatus - heartBeat bool - counters map[string]int32 - totalCounters map[string]int32 - rawMetrics map[string][]float64 - messages []*component.Message -} +type ( + rawHealth struct { + component *component.Component + status component.HealthStatus + heartBeat bool + counters map[string]int32 + totalCounters map[string]int32 + metrics map[string]*metric + messages []*component.Message + } + + metric struct { + values []float64 + aggregator component.Aggregator + } +) // healthRegistry keeps raw health data and provides an interface to get and update it // It is a caller responsibility to lock and unlock registry before usage diff --git a/health/writer/destination.go b/health/writer/destination.go index af0eec7..c331776 100644 --- a/health/writer/destination.go +++ b/health/writer/destination.go @@ -41,9 +41,9 @@ func (l *LogDestination) Register(_ context.Context, component *component.Compon func (l *LogDestination) logComponentInfo(component *component.Component) { l.log.Info().Msgf("Got component update "+ - "ComponentID: %s, TargetID: %s, Heartbeat Enabled: %t, CounterIDs=%v, TotalCounterIDs=%v MetricIDs=%v", + "ComponentID: %s, TargetID: %s, Heartbeat Enabled: %t, CounterIDs=%v, TotalCounterIDs=%v, MetricIDs=%v", component.ID, component.TargetID, component.EnableHeartbeat, - component.CounterIDs, component.TotalCounterIDs, component.MetricIDs, + component.CounterIDs, component.TotalCounterIDs, component.Metrics, ) } diff --git a/health/writer/writer_test.go b/health/writer/writer_test.go index 0e3c44a..eda5347 100644 --- a/health/writer/writer_test.go +++ b/health/writer/writer_test.go @@ -80,7 +80,7 @@ var _ = Describe("Writer", func() { empty := []string{} hComponent, _ := component.New( componentID, utils.DefaultComponentType, utils.DefaultHealthTarget, false, - empty, empty, empty, + map[string]component.Aggregator{}, empty, empty, ) h := component.NewHealth(componentID, utils.DefaultComponentType, utils.DefaultHealthTarget) diff --git a/health/writer/zcDestination_test.go b/health/writer/zcDestination_test.go index b934c86..b9d8e19 100644 --- a/health/writer/zcDestination_test.go +++ b/health/writer/zcDestination_test.go @@ -114,7 +114,7 @@ var _ = Describe("Destination", func() { empty := []string{} component, err := component.New( componentID, utils.DefaultComponentType, utils.DefaultHealthTarget, false, - empty, empty, empty, + map[string]component.Aggregator{}, empty, empty, ) Ω(err).Should(BeNil()) @@ -136,7 +136,7 @@ var _ = Describe("Destination", func() { empty := []string{} component, err := component.New( componentID, utils.DefaultComponentType, utils.DefaultHealthTarget, false, - empty, empty, empty, + map[string]component.Aggregator{}, empty, empty, ) Ω(err).Should(BeNil()) diff --git a/utils/string.go b/utils/string.go deleted file mode 100644 index a38d5a0..0000000 --- a/utils/string.go +++ /dev/null @@ -1,18 +0,0 @@ -package utils - -import "google.golang.org/protobuf/types/known/structpb" - -// ListContainsString just searches if the val is in string list -func ListContainsString(list []string, val string) bool { - for _, el := range list { - if el == val { - return true - } - } - return false -} - -// StrToStructValue wraps string with proto struct Value wrapper -func StrToStructValue(str string) *structpb.Value { - return &structpb.Value{Kind: &structpb.Value_StringValue{StringValue: str}} -} diff --git a/utils/string_test.go b/utils/string_test.go deleted file mode 100644 index b47cab5..0000000 --- a/utils/string_test.go +++ /dev/null @@ -1,30 +0,0 @@ -package utils_test - -import ( - "testing" - - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - "github.com/zenoss/zenoss-go-sdk/utils" -) - -func TestStringUtils(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "String Utils Suite") -} - -var _ = Describe("ListContainsString", func() { - It("string in list", func() { - val := "string" - strSlice := []string{"cool", "staff", val} - exist := utils.ListContainsString(strSlice, val) - Ω(exist).Should(BeTrue()) - }) - - It("string not in list", func() { - val := "string" - strSlice := []string{"cool", "staff"} - exist := utils.ListContainsString(strSlice, val) - Ω(exist).Should(BeFalse()) - }) -}) diff --git a/utils/utils.go b/utils/utils.go new file mode 100644 index 0000000..dc58bf8 --- /dev/null +++ b/utils/utils.go @@ -0,0 +1,19 @@ +package utils + +import ( + "golang.org/x/exp/constraints" + "google.golang.org/protobuf/types/known/structpb" +) + +// Sum returns the sum of an array of numbers +func Sum[S ~[]E, E constraints.Integer | constraints.Float](values S) (sum E) { + for _, v := range values { + sum += v + } + return sum +} + +// StrToStructValue wraps string with proto struct Value wrapper +func StrToStructValue(str string) *structpb.Value { + return &structpb.Value{Kind: &structpb.Value_StringValue{StringValue: str}} +} diff --git a/utils/utils_test.go b/utils/utils_test.go new file mode 100644 index 0000000..ccbb9e7 --- /dev/null +++ b/utils/utils_test.go @@ -0,0 +1,24 @@ +package utils_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/zenoss/zenoss-go-sdk/utils" +) + +func TestStringUtils(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Utils Suite") +} + +var _ = Describe("Sum", func() { + It("should return sum of integers", func() { + Ω(utils.Sum([]int{1, 2, 3, 4, 5})).Should(Equal(15)) + }) + + It("should return sum of floats", func() { + Ω(utils.Sum([]float64{1.1, 1.9, 2.8, 4.2, 5.0})).Should(Equal(float64(15))) + }) +})