From 24fcd47ea212f058ecf8227554c2d16a93f60a57 Mon Sep 17 00:00:00 2001
From: Shirley <4163034+fridgepoet@users.noreply.github.com>
Date: Wed, 5 Jul 2023 15:48:44 +0200
Subject: [PATCH] Add raw_data query support to backend (#203)

---
 pkg/opensearch/client/search_request.go  |   8 +
 pkg/opensearch/lucene_handler.go         |  36 +-
 pkg/opensearch/response_parser.go        | 215 +++++++++-
 pkg/opensearch/response_parser_test.go   | 507 ++++++++++++++++++++++-
 pkg/opensearch/time_series_query_test.go |  29 ++
 src/datasource.ts                        |   4 +-
 6 files changed, 764 insertions(+), 35 deletions(-)

diff --git a/pkg/opensearch/client/search_request.go b/pkg/opensearch/client/search_request.go
index b2a424c8..0f670da0 100644
--- a/pkg/opensearch/client/search_request.go
+++ b/pkg/opensearch/client/search_request.go
@@ -101,6 +101,14 @@ func (b *SearchRequestBuilder) AddDocValueField(field string) *SearchRequestBuil
 	return b
 }
 
+// AddTimeFieldWithStandardizedFormat requests timeField as a field with a standardized time format,
+// so we don't receive time formats that Elasticsearch/OpenSearch can parse but our frontend can't
+// (e.g. yyyy_MM_dd_HH_mm_ss)
+// https://opensearch.org/docs/latest/api-reference/search/#request-body
+// https://opensearch.org/docs/latest/field-types/supported-field-types/date/#full-date-formats
+func (b *SearchRequestBuilder) AddTimeFieldWithStandardizedFormat(timeField string) {
+	b.customProps["fields"] = []map[string]string{{"field": timeField, "format": "strict_date_optional_time_nanos"}}
+}
+
 // Query creates and return a query builder
 func (b *SearchRequestBuilder) Query() *QueryBuilder {
 	if b.queryBuilder == nil {
diff --git a/pkg/opensearch/lucene_handler.go b/pkg/opensearch/lucene_handler.go
index d8e88d71..9687cf9f 100644
--- a/pkg/opensearch/lucene_handler.go
+++ b/pkg/opensearch/lucene_handler.go
@@ -1,6 +1,7 @@
 package opensearch
 
 import (
+	"fmt"
 	"strconv"
 	"time"
 
@@ -50,16 +51,33 @@ func (h *luceneHandler) processQuery(q *Query) error {
 	}
 
 	if len(q.BucketAggs) == 0 {
-		if len(q.Metrics) == 0 || q.Metrics[0].Type != "raw_document" {
-			return nil
+		// Without aggregations, only raw data and raw document queries are valid
+		if len(q.Metrics) == 0 || !(q.Metrics[0].Type == rawDataType || q.Metrics[0].Type == rawDocumentType) {
+			return fmt.Errorf("invalid query, missing metrics and aggregations")
 		}
-		metric := q.Metrics[0]
-		b.Size(metric.Settings.Get("size").MustInt(500))
-		b.SortDesc("@timestamp", "boolean")
-		b.AddDocValueField("@timestamp")
-		return nil
 	}
 
+	switch {
+	case q.Metrics[0].Type == rawDataType:
+		processRawDataQuery(q, b, h.client.GetTimeField())
+	default:
+		processTimeSeriesQuery(q, b, fromMs, toMs)
+	}
+
+	return nil
+}
+
+func processRawDataQuery(q *Query, b *es.SearchRequestBuilder, defaultTimeField string) {
+	metric := q.Metrics[0]
+	b.SortDesc(defaultTimeField, "boolean")
+	b.SortDesc("_doc", "")
+	b.AddTimeFieldWithStandardizedFormat(defaultTimeField)
+	b.Size(metric.Settings.Get("size").MustInt(500))
+}
+
+func processTimeSeriesQuery(q *Query, b *es.SearchRequestBuilder, fromMs int64, toMs int64) {
+	metric := q.Metrics[0]
+	b.Size(metric.Settings.Get("size").MustInt(500))
 	aggBuilder := b.Agg()
 
 	// iterate backwards to create aggregations bottom-down
@@ -143,8 +161,6 @@ func (h *luceneHandler) processQuery(q *Query) error {
 			})
 		}
 	}
-
-	return nil
 }
 
 func getPipelineAggField(m *MetricAgg) string {
@@ -177,7 +193,7 @@ func (h *luceneHandler) executeQueries() (*backend.QueryDataResponse, error) {
 	}
 
 	rp := newResponseParser(res.Responses, h.queries, res.DebugInfo)
-	return rp.getTimeSeries()
+	return 
rp.getTimeSeries(h.client.GetTimeField())
 }
 
 func addDateHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, timeFrom, timeTo int64) es.AggBuilder {
diff --git a/pkg/opensearch/response_parser.go b/pkg/opensearch/response_parser.go
index bd09ea6e..cf866f5c 100644
--- a/pkg/opensearch/response_parser.go
+++ b/pkg/opensearch/response_parser.go
@@ -1,6 +1,7 @@
 package opensearch
 
 import (
+	"encoding/json"
 	"errors"
 	"regexp"
 	"sort"
@@ -26,6 +27,8 @@ const (
 	filtersType     = "filters"
 	termsType       = "terms"
 	geohashGridType = "geohash_grid"
+	rawDataType     = "raw_data"
+	rawDocumentType = "raw_document"
 )
 
 type responseParser struct {
@@ -42,7 +45,7 @@ var newResponseParser = func(responses []*es.SearchResponse, targets []*Query, d
 	}
 }
 
-func (rp *responseParser) getTimeSeries() (*backend.QueryDataResponse, error) {
+func (rp *responseParser) getTimeSeries(timeField string) (*backend.QueryDataResponse, error) {
 	result := backend.NewQueryDataResponse()
 
 	if rp.Responses == nil {
@@ -74,19 +77,217 @@ func (rp *responseParser) getTimeSeries() (*backend.QueryDataResponse, error) {
 		queryRes := backend.DataResponse{
 			Frames: data.Frames{},
 		}
-		props := make(map[string]string)
-		err := rp.processBuckets(res.Aggregations, target, &queryRes, props, 0)
-		if err != nil {
-			return nil, err
+
+		switch {
+		case target.Metrics[0].Type == rawDataType:
+			queryRes = processRawDataResponse(res, timeField, queryRes)
+		default:
+			props := make(map[string]string)
+			err := rp.processBuckets(res.Aggregations, target, &queryRes, props, 0)
+			if err != nil {
+				return nil, err
+			}
+			rp.nameFields(&queryRes.Frames, target)
+			rp.trimDatapoints(&queryRes.Frames, target)
 		}
-		rp.nameFields(&queryRes.Frames, target)
-		rp.trimDatapoints(&queryRes.Frames, target)
 
 		result.Responses[target.RefID] = queryRes
 	}
 	return result, nil
 }
 
+func processRawDataResponse(res *es.SearchResponse, timeField string, queryRes backend.DataResponse) backend.DataResponse {
+	propNames := make(map[string]bool)
+	docs := make([]map[string]interface{}, len(res.Hits.Hits))
+
+	for hitIdx, hit := range res.Hits.Hits {
+		// initialize the map here: assigning _id/_type/_index into a nil map below
+		// would panic for hits without _source
+		flattenedSource := make(map[string]interface{})
+		if hit["_source"] != nil {
+			// On the frontend maxDepth wasn't used, but as we are processing on the backend,
+			// let's put a limit to avoid an infinite loop. 10 was chosen arbitrarily.
+			flattenedSource = flatten(hit["_source"].(map[string]interface{}), 10)
+		}
+
+		flattenedSource["_id"] = hit["_id"]
+		flattenedSource["_type"] = hit["_type"]
+		flattenedSource["_index"] = hit["_index"]
+		if timestamp, ok := getTimestamp(hit, flattenedSource, timeField); ok {
+			flattenedSource[timeField] = timestamp
+		}
+
+		for key := range flattenedSource {
+			propNames[key] = true
+		}
+
+		docs[hitIdx] = flattenedSource
+	}
+	fields := processDocsToDataFrameFields(docs, propNames)
+
+	frames := data.Frames{}
+	frame := data.NewFrame("", fields...)
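+	// unlike aggregation queries, a raw data query always produces exactly one frame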
+	frames = append(frames, frame)
+
+	queryRes.Frames = frames
+	return queryRes
+}
+
+func getTimestamp(hit, source map[string]interface{}, timeField string) (*time.Time, bool) {
+	// "fields" is requested in the query with a specific format in AddTimeFieldWithStandardizedFormat
+	timeString, ok := lookForTimeFieldInFields(hit, timeField)
+	if !ok {
+		// When "fields" is absent, getTimestamp falls back to looking for a timestamp in _source
+		timeString, ok = lookForTimeFieldInSource(source, timeField)
+		if !ok {
+			// When neither "fields" nor "_source" holds a timestamp in the expected JSON structure, a nil time.Time is returned
+			return nil, false
+		}
+	}
+
+	timeValue, err := time.Parse(time.RFC3339Nano, timeString)
+	if err != nil {
+		// For an invalid format, a nil time.Time is returned
+		return nil, false
+	}
+
+	return &timeValue, true
+}
+
+func lookForTimeFieldInFields(hit map[string]interface{}, timeField string) (string, bool) {
+	// "fields" should be present with an array of timestamps
+	if hit["fields"] != nil {
+		if fieldsMap, ok := hit["fields"].(map[string]interface{}); ok {
+			timesArray, ok := fieldsMap[timeField].([]interface{})
+			if !ok {
+				return "", false
+			}
+			if len(timesArray) == 1 {
+				if timeString, ok := timesArray[0].(string); ok {
+					return timeString, true
+				}
+			}
+		}
+	}
+	return "", false
+}
+
+func lookForTimeFieldInSource(source map[string]interface{}, timeField string) (string, bool) {
+	if source[timeField] != nil {
+		if timeString, ok := source[timeField].(string); ok {
+			return timeString, true
+		}
+	}
+
+	return "", false
+}
+
+func flatten(target map[string]interface{}, maxDepth int) map[string]interface{} {
+	// On the frontend maxDepth wasn't used, but as we are processing on the backend,
+	// let's put a limit to avoid an infinite loop. 10 was chosen arbitrarily.
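+	// e.g. {"nested": {"field": {"double_nested": "value"}}}
+	// flattens to {"nested.field.double_nested": "value"}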
+	output := make(map[string]interface{})
+	step(0, maxDepth, target, "", output)
+	return output
+}
+
+func step(currentDepth, maxDepth int, target map[string]interface{}, prev string, output map[string]interface{}) {
+	nextDepth := currentDepth + 1
+	for key, value := range target {
+		newKey := strings.Trim(prev+"."+key, ".")
+
+		v, ok := value.(map[string]interface{})
+		if ok && len(v) > 0 && currentDepth < maxDepth {
+			step(nextDepth, maxDepth, v, newKey, output)
+		} else {
+			output[newKey] = value
+		}
+	}
+}
+
+func processDocsToDataFrameFields(docs []map[string]interface{}, propNames map[string]bool) []*data.Field {
+	allFields := make([]*data.Field, 0, len(propNames))
+	var timeDataField *data.Field
+	for propName := range propNames {
+		propNameValue := findTheFirstNonNilDocValueForPropName(docs, propName)
+		switch propNameValue.(type) {
+		// We check for the default data type values (float64, int, bool, string)
+		// and fall back to json.RawMessage if we cannot find any of them
+		case *time.Time:
+			timeDataField = createTimeField(docs, propName)
+		case float64:
+			allFields = append(allFields, createFieldOfType[float64](docs, propName))
+		case int:
+			allFields = append(allFields, createFieldOfType[int](docs, propName))
+		case string:
+			allFields = append(allFields, createFieldOfType[string](docs, propName))
+		case bool:
+			allFields = append(allFields, createFieldOfType[bool](docs, propName))
+		default:
+			fieldVector := make([]*json.RawMessage, len(docs))
+			for i, doc := range docs {
+				bytes, err := json.Marshal(doc[propName])
+				if err != nil {
+					// We skip values that cannot be marshalled
+					continue
+				}
+				value := json.RawMessage(bytes)
+				fieldVector[i] = &value
+			}
+			field := data.NewField(propName, nil, fieldVector)
+			isFilterable := true
+			field.Config = &data.FieldConfig{Filterable: &isFilterable}
+			allFields = append(allFields, field)
+		}
+	}
+
+	sort.Slice(allFields, func(i, j int) bool {
+		return allFields[i].Name < allFields[j].Name
+	})
+
+	// the time field, when present, is prepended so it renders as the first column
+	if timeDataField != nil {
+		allFields = append([]*data.Field{timeDataField}, allFields...)
+ } + + return allFields +} + +func findTheFirstNonNilDocValueForPropName(docs []map[string]interface{}, propName string) interface{} { + for _, doc := range docs { + if doc[propName] != nil { + return doc[propName] + } + } + return docs[0][propName] +} + +func createTimeField(docs []map[string]interface{}, timeField string) *data.Field { + isFilterable := true + fieldVector := make([]*time.Time, len(docs)) + for i, doc := range docs { + value, ok := doc[timeField].(*time.Time) // cannot use generic function below because the type is already a pointer + if !ok { + continue + } + fieldVector[i] = value + } + field := data.NewField(timeField, nil, fieldVector) + field.Config = &data.FieldConfig{Filterable: &isFilterable} + return field +} + +func createFieldOfType[T int | float64 | bool | string](docs []map[string]interface{}, propName string) *data.Field { + isFilterable := true + fieldVector := make([]*T, len(docs)) + for i, doc := range docs { + value, ok := doc[propName].(T) + if !ok { + continue + } + fieldVector[i] = &value + } + field := data.NewField(propName, nil, fieldVector) + field.Config = &data.FieldConfig{Filterable: &isFilterable} + return field +} + func (rp *responseParser) processBuckets(aggs map[string]interface{}, target *Query, queryResult *backend.DataResponse, props map[string]string, depth int) error { var err error maxDepth := len(target.BucketAggs) - 1 diff --git a/pkg/opensearch/response_parser_test.go b/pkg/opensearch/response_parser_test.go index cad83a9e..5b830f73 100644 --- a/pkg/opensearch/response_parser_test.go +++ b/pkg/opensearch/response_parser_test.go @@ -45,7 +45,7 @@ func Test_ResponseParser_test(t *testing.T) { }` rp, err := newResponseParserForTest(targets, response) assert.Nil(t, err) - result, err := rp.getTimeSeries() + result, err := rp.getTimeSeries("@timestamp") assert.Nil(t, err) require.Len(t, result.Responses, 1) @@ -96,7 +96,7 @@ func Test_ResponseParser_test(t *testing.T) { }` rp, err := newResponseParserForTest(targets, response) assert.Nil(t, err) - result, err := rp.getTimeSeries() + result, err := rp.getTimeSeries("@timestamp") assert.Nil(t, err) require.Len(t, result.Responses, 1) @@ -158,7 +158,7 @@ func Test_ResponseParser_test(t *testing.T) { }` rp, err := newResponseParserForTest(targets, response) assert.Nil(t, err) - result, err := rp.getTimeSeries() + result, err := rp.getTimeSeries("@timestamp") assert.Nil(t, err) responseForA, ok := result.Responses["A"] require.True(t, ok) @@ -228,7 +228,7 @@ func Test_ResponseParser_test(t *testing.T) { }` rp, err := newResponseParserForTest(targets, response) assert.Nil(t, err) - result, err := rp.getTimeSeries() + result, err := rp.getTimeSeries("@timestamp") assert.Nil(t, err) require.Len(t, result.Responses, 1) @@ -301,7 +301,7 @@ func Test_ResponseParser_test(t *testing.T) { }` rp, err := newResponseParserForTest(targets, response) assert.Nil(t, err) - result, err := rp.getTimeSeries() + result, err := rp.getTimeSeries("@timestamp") assert.Nil(t, err) require.Len(t, result.Responses, 1) @@ -381,7 +381,7 @@ func Test_ResponseParser_test(t *testing.T) { }` rp, err := newResponseParserForTest(targets, response) assert.Nil(t, err) - result, err := rp.getTimeSeries() + result, err := rp.getTimeSeries("@timestamp") assert.Nil(t, err) require.Len(t, result.Responses, 1) @@ -466,7 +466,7 @@ func Test_ResponseParser_test(t *testing.T) { }` rp, err := newResponseParserForTest(targets, response) assert.Nil(t, err) - result, err := rp.getTimeSeries() + result, err := 
rp.getTimeSeries("@timestamp") assert.Nil(t, err) require.Len(t, result.Responses, 1) @@ -570,7 +570,7 @@ func Test_ResponseParser_test(t *testing.T) { }` rp, err := newResponseParserForTest(targets, response) assert.Nil(t, err) - result, err := rp.getTimeSeries() + result, err := rp.getTimeSeries("@timestamp") assert.Nil(t, err) require.Len(t, result.Responses, 1) @@ -630,7 +630,7 @@ func Test_ResponseParser_test(t *testing.T) { }` rp, err := newResponseParserForTest(targets, response) assert.Nil(t, err) - result, err := rp.getTimeSeries() + result, err := rp.getTimeSeries("@timestamp") assert.Nil(t, err) require.Len(t, result.Responses, 1) @@ -691,7 +691,7 @@ func Test_ResponseParser_test(t *testing.T) { }` rp, err := newResponseParserForTest(targets, response) assert.Nil(t, err) - result, err := rp.getTimeSeries() + result, err := rp.getTimeSeries("@timestamp") assert.Nil(t, err) require.Len(t, result.Responses, 1) @@ -764,7 +764,7 @@ func Test_ResponseParser_test(t *testing.T) { }` rp, err := newResponseParserForTest(targets, response) assert.Nil(t, err) - result, err := rp.getTimeSeries() + result, err := rp.getTimeSeries("@timestamp") assert.Nil(t, err) require.Len(t, result.Responses, 1) @@ -821,7 +821,7 @@ func Test_ResponseParser_test(t *testing.T) { }` rp, err := newResponseParserForTest(targets, response) assert.Nil(t, err) - result, err := rp.getTimeSeries() + result, err := rp.getTimeSeries("@timestamp") assert.Nil(t, err) require.Len(t, result.Responses, 1) @@ -876,7 +876,7 @@ func Test_ResponseParser_test(t *testing.T) { }` rp, err := newResponseParserForTest(targets, response) assert.Nil(t, err) - result, err := rp.getTimeSeries() + result, err := rp.getTimeSeries("@timestamp") assert.Nil(t, err) require.Len(t, result.Responses, 1) @@ -945,7 +945,7 @@ func Test_ResponseParser_test(t *testing.T) { }` rp, err := newResponseParserForTest(targets, response) assert.Nil(t, err) - result, err := rp.getTimeSeries() + result, err := rp.getTimeSeries("@timestamp") assert.Nil(t, err) require.Len(t, result.Responses, 1) @@ -1038,7 +1038,7 @@ func Test_ResponseParser_test(t *testing.T) { }` rp, err := newResponseParserForTest(targets, response) assert.Nil(t, err) - result, err := rp.getTimeSeries() + result, err := rp.getTimeSeries("@timestamp") assert.Nil(t, err) require.Len(t, result.Responses, 1) queryRes := result.Responses["A"] @@ -1128,6 +1128,378 @@ func Test_ResponseParser_test(t *testing.T) { //}) } +func Test_getTimestamp(t *testing.T) { + /* + First look for time in fields: + "hits": [ + { + "fields": { + "timestamp": [ + "2022-12-30T15:42:54.000Z" + ] + } + } + ] + + If not present, look for time in _source: + "hits": [ + { + "_source": { + "timestamp": "2022-12-30T15:42:54.000Z" + } + } + ] + */ + t.Run("When fields is present with array of times and source's time field is also present, then getTimestamp prefers fields", func(t *testing.T) { + hit := map[string]interface{}{"fields": map[string]interface{}{"@timestamp": []interface{}{"2018-08-18T08:08:08.765Z"}}} + source := map[string]interface{}{"@timestamp": "2020-01-01T10:10:10.765Z"} + + actual, ok := getTimestamp(hit, source, "@timestamp") + + require.NotNil(t, actual) + assert.True(t, ok) + assert.Equal(t, time.Date(2018, time.August, 18, 8, 8, 8, 765000000, time.UTC), *actual) + }) + + t.Run("When fields is absent and source's time field is present, then getTimestamp falls back to _source", func(t *testing.T) { + source := map[string]interface{}{"@timestamp": "2020-01-01T10:10:10.765Z"} + + actual, ok := 
getTimestamp(nil, source, "@timestamp") + + require.NotNil(t, actual) + assert.True(t, ok) + assert.Equal(t, time.Date(2020, time.January, 01, 10, 10, 10, 765000000, time.UTC), *actual) + }) + + t.Run("When fields has an unexpected layout and _source's time field is also present, then getTimestamp falls back to _source", func(t *testing.T) { + hit := map[string]interface{}{"fields": map[string]interface{}{"@timestamp": "2018-08-18T08:08:08.765Z"}} + source := map[string]interface{}{"@timestamp": "2020-01-01T10:10:10.765Z"} + + actual, ok := getTimestamp(hit, source, "@timestamp") + + require.NotNil(t, actual) + assert.True(t, ok) + assert.Equal(t, time.Date(2020, time.January, 01, 10, 10, 10, 765000000, time.UTC), *actual) + }) + + t.Run("When fields is absent and _source's time field has an unexpected format, then getTimestamp returns nil and false", func(t *testing.T) { + source := map[string]interface{}{"@timestamp": "unexpected format"} + + actual, ok := getTimestamp(nil, source, "@timestamp") + + assert.Nil(t, actual) + assert.False(t, ok) + }) + + t.Run("When fields is absent and _source's time field is absent, then getTimestamp returns nil and false", func(t *testing.T) { + actual, ok := getTimestamp(nil, nil, "@timestamp") + + assert.Nil(t, actual) + assert.False(t, ok) + }) +} + +func Test_ProcessRawDataResponse(t *testing.T) { + t.Run("ProcessRawDataResponse populates standard fields and gets other fields from _source, in alphabetical order, with time at the beginning", func(t *testing.T) { + targets := map[string]string{ + "A": `{ + "timeField": "@timestamp", + "metrics": [{"type": "raw_data"}] + }`, + } + response := `{ + "responses": [ + { + "hits": { + "total": { + "value": 109, + "relation": "eq" + }, + "max_score": null, + "hits": [ + { + "_index": "logs-2023.02.08", + "_id": "some id", + "_score": null, + "_source": { + "some other field": 15 + }, + "fields": { + "@timestamp": [ + "2022-12-30T15:42:54.000Z" + ] + }, + "sort": [ + 1675869055830, + 4 + ] + } + ] + }, + "status": 200 + } + ] + }` + + rp, err := newResponseParserForTest(targets, response) + assert.Nil(t, err) + result, err := rp.getTimeSeries("@timestamp") + require.NoError(t, err) + require.Len(t, result.Responses, 1) + + queryRes := result.Responses["A"] + require.NotNil(t, queryRes) + dataframes := queryRes.Frames + require.Len(t, dataframes, 1) + + frame := dataframes[0] + + assert.Equal(t, 5, len(frame.Fields)) + require.Equal(t, 1, frame.Fields[0].Len()) + assert.Equal(t, time.Date(2022, time.December, 30, 15, 42, 54, 0, time.UTC), *frame.Fields[0].At(0).(*time.Time)) + require.Equal(t, 1, frame.Fields[1].Len()) + assert.Equal(t, "_id", frame.Fields[1].Name) + assert.Equal(t, "some id", *frame.Fields[1].At(0).(*string)) + require.Equal(t, 1, frame.Fields[2].Len()) + assert.Equal(t, "_index", frame.Fields[2].Name) + assert.Equal(t, "logs-2023.02.08", *frame.Fields[2].At(0).(*string)) + require.Equal(t, 1, frame.Fields[3].Len()) + assert.Equal(t, "_type", frame.Fields[3].Name) + assert.Equal(t, json.RawMessage("null"), *frame.Fields[3].At(0).(*json.RawMessage)) + require.Equal(t, 1, frame.Fields[4].Len()) + assert.Equal(t, "some other field", frame.Fields[4].Name) + assert.Equal(t, float64(15), *frame.Fields[4].At(0).(*float64)) + }) + + t.Run("no time in _source or in fields does not create data frame field at the beginning with a nil time", func(t *testing.T) { + targets := map[string]string{ + "A": `{ + "timeField": "@timestamp", + "metrics": [{"type": "raw_data"}] + }`, + } + + response := `{ + 
"responses": [ + { + "hits": { + "hits": [ + { + "_index": "logs-2023.02.08", + "_id": "some id", + "_score": null, + "_source": {}, + "sort": [ + 1675869055830, + 4 + ] + } + ] + } + } + ] + }` + + rp, err := newResponseParserForTest(targets, response) + assert.Nil(t, err) + result, err := rp.getTimeSeries("@timestamp") + require.NoError(t, err) + require.Len(t, result.Responses, 1) + + queryRes := result.Responses["A"] + require.NotNil(t, queryRes) + dataframes := queryRes.Frames + require.Len(t, dataframes, 1) + + frame := dataframes[0] + require.Equal(t, 3, len(frame.Fields)) + assert.Equal(t, "_id", frame.Fields[0].Name) + assert.Equal(t, "_index", frame.Fields[1].Name) + assert.Equal(t, "_type", frame.Fields[2].Name) + }) + + t.Run("Simple raw data query", func(t *testing.T) { + targets := map[string]string{ + "A": `{ + "timeField": "@timestamp", + "metrics": [{"type": "raw_data"}] + }`, + } + + // cSpell:disable + response := `{ + "responses":[ + { + "hits":{ + "total":{ + "value":109, + "relation":"eq" + }, + "max_score":null, + "hits":[ + { + "_index":"logs-2023.02.08", + "_id":"some id", + "_score":null, + "_source":{ + "@timestamp":"2023-02-08T15:10:55.830Z", + "line":"log text [479231733]", + "counter":"109", + "float":58.253758485091, + "label":"val1", + "level":"info", + "location":"17.089705232090438, 41.62861966340297", + "nested":{ + "field":{ + "double_nested":"value" + } + }, + "shapes":[ + { + "type":"triangle" + }, + { + "type":"square" + } + ], + "xyz":null + }, + "fields": { + "@timestamp": [ + "2023-02-08T15:10:55.830Z" + ] + }, + "sort":[ + 1675869055830, + 4 + ] + }, + { + "_index":"logs-2023.02.08", + "_id":"Fx2UMYYBfCQ-FCMjZyJ_", + "_score":null, + "_source":{ + "@timestamp":"2023-02-08T15:10:54.835Z", + "line":"log text with ANSI \u001b[31mpart of the text\u001b[0m [493139080]", + "counter":"108", + "float":54.5977098233944, + "label":"val1", + "level":"info", + "location":"19.766305918490463, 40.42639175509792", + "nested":{ + "field":{ + "double_nested":"value" + } + }, + "shapes":[ + { + "type":"triangle" + }, + { + "type":"square" + } + ], + "xyz":"def" + }, + "fields": { + "@timestamp": [ + "2023-02-08T15:10:54.835Z" + ] + }, + "sort":[ + 1675869054835, + 7 + ] + } + ] + }, + "status":200 + } + ] + }` + // cSpell:enable + + rp, err := newResponseParserForTest(targets, response) + assert.Nil(t, err) + result, err := rp.getTimeSeries("@timestamp") + require.NoError(t, err) + require.Len(t, result.Responses, 1) + + queryRes := result.Responses["A"] + require.NotNil(t, queryRes) + dataframes := queryRes.Frames + require.Len(t, dataframes, 1) + frame := dataframes[0] + + assert.Equal(t, 13, len(frame.Fields)) + // Fields have the correct length + assert.Equal(t, 2, frame.Fields[0].Len()) + // First field is timeField + assert.Equal(t, data.FieldTypeNullableTime, frame.Fields[0].Type()) + // Correctly uses string types + assert.Equal(t, data.FieldTypeNullableString, frame.Fields[1].Type()) + // Correctly detects float64 types + assert.Equal(t, data.FieldTypeNullableFloat64, frame.Fields[5].Type()) + // Correctly detects json types + assert.Equal(t, data.FieldTypeNullableJSON, frame.Fields[11].Type()) + assert.Equal(t, "nested.field.double_nested", frame.Fields[10].Name) + assert.Equal(t, data.FieldTypeNullableString, frame.Fields[10].Type()) + // Correctly detects type even if first value is null + assert.Equal(t, data.FieldTypeNullableString, frame.Fields[12].Type()) + }) + + t.Run("Raw data query filterable fields", func(t *testing.T) { + targets := 
map[string]string{ + "A": `{ + "timeField": "@timestamp", + "metrics": [{ "type": "raw_data", "id": "1" }], + "bucketAggs": [] + }`, + } + + response := ` + { + "responses": [ + { + "hits": { + "total": { "relation": "eq", "value": 1 }, + "hits": [ + { + "_id": "1", + "_type": "_doc", + "_index": "index", + "_source": { "sourceProp": "asd" }, + "fields": { + "@timestamp": [ + "2023-02-08T15:10:54.835Z" + ] + } + } + ] + } + } + ] + } + ` + + rp, err := newResponseParserForTest(targets, response) + assert.Nil(t, err) + result, err := rp.getTimeSeries("@timestamp") + require.NoError(t, err) + + require.NotNil(t, result.Responses["A"]) + require.Len(t, result.Responses["A"].Frames, 1) + + for _, field := range result.Responses["A"].Frames[0].Fields { + trueValue := true + filterableConfig := data.FieldConfig{Filterable: &trueValue} + + // we need to test that the only changed setting is `filterable` + require.Equal(t, filterableConfig, *field.Config) + } + }) +} + func newResponseParserForTest(tsdbQueries map[string]string, responseBody string) (*responseParser, error) { from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC) to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC) @@ -1186,7 +1558,7 @@ func TestHistogramSimple(t *testing.T) { }` rp, err := newResponseParserForTest(query, response) assert.NoError(t, err) - result, err := rp.getTimeSeries() + result, err := rp.getTimeSeries("@timestamp") assert.NoError(t, err) require.Len(t, result.Responses, 1) @@ -1219,3 +1591,106 @@ func TestHistogramSimple(t *testing.T) { // we need to test that the fieldConfig is "empty" assert.Nil(t, frames.Fields[1].Config) } + +func Test_flatten(t *testing.T) { + t.Run("does not affect any non-nested JSON", func(t *testing.T) { + target := map[string]interface{}{ + "fieldName": "", + } + + assert.Equal(t, map[string]interface{}{ + "fieldName": "", + }, flatten(target, 10)) + }) + + t.Run("flattens up to maxDepth", func(t *testing.T) { + target := map[string]interface{}{ + "fieldName2": map[string]interface{}{ + "innerFieldName2": map[string]interface{}{ + "innerFieldName3": "", + }, + }, + } + + assert.Equal(t, map[string]interface{}{ + "fieldName2.innerFieldName2": map[string]interface{}{"innerFieldName3": ""}}, flatten(target, 1)) + }) + + t.Run("flattens up to maxDepth with multiple keys in target", func(t *testing.T) { + target := map[string]interface{}{ + "fieldName": map[string]interface{}{ + "innerFieldName": "", + }, + "fieldName2": map[string]interface{}{ + "innerFieldName2": map[string]interface{}{ + "innerFieldName3": "", + }, + }, + } + + assert.Equal(t, map[string]interface{}{"fieldName.innerFieldName": "", "fieldName2.innerFieldName2": map[string]interface{}{"innerFieldName3": ""}}, flatten(target, 1)) + }) + + t.Run("flattens multiple objects of the same max depth", func(t *testing.T) { + target := map[string]interface{}{ + "fieldName": map[string]interface{}{ + "innerFieldName": "", + }, + "fieldName2": map[string]interface{}{ + "innerFieldName2": "", + }, + } + + assert.Equal(t, map[string]interface{}{ + "fieldName.innerFieldName": "", + "fieldName2.innerFieldName2": ""}, flatten(target, 1)) + }) + + t.Run("only flattens multiple entries in the same key", func(t *testing.T) { + target := map[string]interface{}{ + "fieldName": map[string]interface{}{ + "innerFieldName": "", + "innerFieldName1": "", + }, + "fieldName2": map[string]interface{}{ + "innerFieldName2": map[string]interface{}{ + "innerFieldName3": "", + }, + }, + } + + assert.Equal(t, map[string]interface{}{ + 
"fieldName.innerFieldName": "", + "fieldName.innerFieldName1": "", + "fieldName2.innerFieldName2": map[string]interface{}{"innerFieldName3": ""}}, flatten(target, 1)) + }) + + t.Run("combines nested field names", func(t *testing.T) { + target := map[string]interface{}{ + "fieldName": map[string]interface{}{ + "innerFieldName": "", + }, + "fieldName2": map[string]interface{}{ + "innerFieldName2": "", + }, + } + + assert.Equal(t, map[string]interface{}{"fieldName.innerFieldName": "", "fieldName2.innerFieldName2": ""}, flatten(target, 10)) + }) + + t.Run("will preserve only one key with the same name", func(t *testing.T) { + // This test documents that in the unlikely case of a collision of a flattened name and an existing key, only + // one entry's value will be preserved at random + target := map[string]interface{}{ + "fieldName": map[string]interface{}{ + "innerFieldName": "one of these values will be lost", + }, + "fieldName.innerFieldName": "this may be lost", + } + + result := flatten(target, 10) + assert.Len(t, result, 1) + _, ok := result["fieldName.innerFieldName"] + assert.True(t, ok) + }) +} diff --git a/pkg/opensearch/time_series_query_test.go b/pkg/opensearch/time_series_query_test.go index b1678221..4eaab7da 100644 --- a/pkg/opensearch/time_series_query_test.go +++ b/pkg/opensearch/time_series_query_test.go @@ -10,8 +10,37 @@ import ( "github.com/grafana/opensearch-datasource/pkg/tsdb" . "github.com/smartystreets/goconvey/convey" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) +func Test_raw_data(t *testing.T) { + t.Run("With raw data metric query (from frontend tests)", func(t *testing.T) { + from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC) + to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC) + fromMs := from.UnixNano() / int64(time.Millisecond) + toMs := to.UnixNano() / int64(time.Millisecond) + c := newFakeClient(es.OpenSearch, "2.3.0") + + _, err := executeTsdbQuery(c, `{ + "timeField": "@timestamp", + "bucketAggs": [], + "metrics": [{ "id": "1", "type": "raw_data", "settings": {"size": 1337 } }] + }`, from, to, 15*time.Second) + require.NoError(t, err) + + sr := c.multisearchRequests[0].Requests[0] + rangeFilter := sr.Query.Bool.Filters[0].(*es.RangeFilter) + assert.Equal(t, "@timestamp", rangeFilter.Key) + assert.Equal(t, toMs, rangeFilter.Lte) + assert.Equal(t, fromMs, rangeFilter.Gte) + assert.Equal(t, es.DateFormatEpochMS, rangeFilter.Format) + + assert.Equal(t, 1337, sr.Size) + assert.Equal(t, map[string]string{"order": "desc", "unmapped_type": "boolean"}, sr.Sort["@timestamp"]) + assert.Equal(t, map[string]string{"order": "desc"}, sr.Sort["_doc"]) + }) +} + func TestExecuteTimeSeriesQuery(t *testing.T) { from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC) to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC) diff --git a/src/datasource.ts b/src/datasource.ts index f6f22ac7..e09c7dff 100644 --- a/src/datasource.ts +++ b/src/datasource.ts @@ -462,8 +462,8 @@ export class OpenSearchDatasource extends DataSourceWithBackend target.metrics?.every(metric => metric.type === 'raw_data'))) { + return super.query(request).pipe( tap({ next: response => { trackQuery(response, targets, request.app);