From 30adb59e41cbe9c43eb430198b15a8bd16c4683c Mon Sep 17 00:00:00 2001 From: Kevin Yu Date: Wed, 22 May 2024 14:48:33 -0700 Subject: [PATCH 1/3] Fix: Add fields to frame if it does not already exist when grouping by multiple terms --- pkg/opensearch/response_parser.go | 9 ++- pkg/opensearch/response_parser_test.go | 103 +++++++++++++++++++++++++ 2 files changed, 110 insertions(+), 2 deletions(-) diff --git a/pkg/opensearch/response_parser.go b/pkg/opensearch/response_parser.go index 386b70f3..b9a28a64 100644 --- a/pkg/opensearch/response_parser.go +++ b/pkg/opensearch/response_parser.go @@ -999,7 +999,11 @@ func (rp *responseParser) processAggregationDocs(esAgg *simplejson.Json, aggDef frames := data.Frames{} var fields []*data.Field - if queryResult.Frames == nil { + if queryResult.Frames != nil && len(queryResult.Frames) != 0 { + for _, frame := range queryResult.Frames { + fields = append(fields, frame.Fields...) + } + } else { for _, propKey := range propKeys { fields = append(fields, data.NewField(propKey, nil, []*string{})) } @@ -1012,7 +1016,8 @@ func (rp *responseParser) processAggregationDocs(esAgg *simplejson.Json, aggDef for _, e := range fields { for _, propKey := range propKeys { if e.Name == propKey { - e.Append(props[propKey]) + value := props[propKey] + e.Append(&value) } } if e.Name == aggDef.Field { diff --git a/pkg/opensearch/response_parser_test.go b/pkg/opensearch/response_parser_test.go index 9afa4236..1ee02c85 100644 --- a/pkg/opensearch/response_parser_test.go +++ b/pkg/opensearch/response_parser_test.go @@ -407,6 +407,109 @@ func Test_ResponseParser_test(t *testing.T) { assert.EqualValues(t, 32, *seriesFour.Fields[1].At(1).(*float64)) }) + t.Run("Multiple group by query with two metrics", func(t *testing.T) { + targets := []tsdbQuery{{ + refId: "A", + body: `{ + "timeField": "@timestamp", + "metrics": [{"id": "1", "type": "count"}, {"id": "4", "type": "max", "field": "DistanceMiles"}], + "bucketAggs": [{"field": "DestCityName", "id": "3", "type": "terms"}, {"field": "FlightDelayType", "id": "2", "type": "terms"}] + }`, + }} + response := `{ + "responses": [ + { + "aggregations": { + "3": { + "buckets": [ + { + "2": { + "buckets": [ + { + "4": { + "value": 5640.1 + }, + "key": "Weather Delay", + "doc_count": 10 + }, + { + "4": { + "value": 5624.2 + }, + "key": "Security Delay", + "doc_count": 15 + } + ] + }, + "key": "Zurich", + "doc_count": 691 + }, + { + "2": { + "buckets": [ + { + "4": { + "value": 8245.1 + }, + "key": "Weather Delay", + "doc_count": 9 + }, + { + "4": { + "value": 8300.4 + }, + "key": "Security Delay", + "doc_count": 8 + } + ] + }, + "key": "Xi'an", + "doc_count": 526 + } + ] + } + } + } + ] + }` + rp, err := newResponseParserForTest(targets, response, nil, client.ConfiguredFields{TimeField: "@timestamp"}, nil) + assert.Nil(t, err) + result, err := rp.parseResponse() + assert.Nil(t, err) + require.Len(t, result.Responses, 1) + + queryRes := result.Responses["A"] + assert.NotNil(t, queryRes) + assert.Len(t, queryRes.Frames, 1) + frame := queryRes.Frames[0] + require.Len(t, frame.Fields, 4) + + assert.Equal(t, "DestCityName", frame.Fields[0].Name) + assert.Equal(t, "FlightDelayType", frame.Fields[1].Name) + assert.Equal(t, "Count", frame.Fields[2].Name) + assert.Equal(t, "Max", frame.Fields[3].Name) + + assert.Equal(t, "Zurich", *frame.Fields[0].At(0).(*string)) + assert.Equal(t, "Weather Delay", *frame.Fields[1].At(0).(*string)) + assert.Equal(t, float64(10), *frame.Fields[2].At(0).(*float64)) + assert.Equal(t, float64(5640.1), *frame.Fields[3].At(0).(*float64)) + + assert.Equal(t, "Zurich", *frame.Fields[0].At(1).(*string)) + assert.Equal(t, "Security Delay", *frame.Fields[1].At(1).(*string)) + assert.Equal(t, float64(15), *frame.Fields[2].At(1).(*float64)) + assert.Equal(t, float64(5624.2), *frame.Fields[3].At(1).(*float64)) + + assert.Equal(t, "Xi'an", *frame.Fields[0].At(2).(*string)) + assert.Equal(t, "Weather Delay", *frame.Fields[1].At(2).(*string)) + assert.Equal(t, float64(9), *frame.Fields[2].At(2).(*float64)) + assert.Equal(t, float64(8245.1), *frame.Fields[3].At(2).(*float64)) + + assert.Equal(t, "Xi'an", *frame.Fields[0].At(3).(*string)) + assert.Equal(t, "Security Delay", *frame.Fields[1].At(3).(*string)) + assert.Equal(t, float64(8), *frame.Fields[2].At(3).(*float64)) + assert.Equal(t, float64(8300.4), *frame.Fields[3].At(3).(*float64)) + }) + t.Run("With percentiles", func(t *testing.T) { targets := []tsdbQuery{{ refId: "A", From afa276625e5fa842dd4372e2c25fbf4d5535acf8 Mon Sep 17 00:00:00 2001 From: Kevin Yu Date: Thu, 23 May 2024 09:12:36 -0700 Subject: [PATCH 2/3] use utils.Pointer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Nathan VÄ“rzemnieks --- pkg/opensearch/response_parser.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/opensearch/response_parser.go b/pkg/opensearch/response_parser.go index b9a28a64..7405c728 100644 --- a/pkg/opensearch/response_parser.go +++ b/pkg/opensearch/response_parser.go @@ -1016,8 +1016,7 @@ func (rp *responseParser) processAggregationDocs(esAgg *simplejson.Json, aggDef for _, e := range fields { for _, propKey := range propKeys { if e.Name == propKey { - value := props[propKey] - e.Append(&value) + e.Append(utils.Pointer(props[propKey])) } } if e.Name == aggDef.Field { From 1cce630872018c2d2f704cabe014e313a8c2f3f2 Mon Sep 17 00:00:00 2001 From: Kevin Yu Date: Thu, 23 May 2024 09:14:49 -0700 Subject: [PATCH 3/3] pr comment --- pkg/opensearch/response_parser.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/opensearch/response_parser.go b/pkg/opensearch/response_parser.go index 7405c728..f23c57f3 100644 --- a/pkg/opensearch/response_parser.go +++ b/pkg/opensearch/response_parser.go @@ -999,7 +999,7 @@ func (rp *responseParser) processAggregationDocs(esAgg *simplejson.Json, aggDef frames := data.Frames{} var fields []*data.Field - if queryResult.Frames != nil && len(queryResult.Frames) != 0 { + if len(queryResult.Frames) != 0 { for _, frame := range queryResult.Frames { fields = append(fields, frame.Fields...) }