Add raw_data query support to backend (#203)
fridgepoet authored Jul 5, 2023
1 parent 34a4fdd commit 24fcd47
Showing 6 changed files with 764 additions and 35 deletions.
8 changes: 8 additions & 0 deletions pkg/opensearch/client/search_request.go
@@ -101,6 +101,14 @@ func (b *SearchRequestBuilder) AddDocValueField(field string) *SearchRequestBuilder {
return b
}

// AddTimeFieldWithStandardizedFormat adds timeField to the request's fields with a standardized time format, so that we
// don't receive formats that Elasticsearch/OpenSearch can parse but our frontend can't (e.g. yyyy_MM_dd_HH_mm_ss)
// https://opensearch.org/docs/latest/api-reference/search/#request-body
// https://opensearch.org/docs/latest/field-types/supported-field-types/date/#full-date-formats
func (b *SearchRequestBuilder) AddTimeFieldWithStandardizedFormat(timeField string) {
b.customProps["fields"] = []map[string]string{{"field": timeField, "format": "strict_date_optional_time_nanos"}}
}

// Query creates and returns a query builder
func (b *SearchRequestBuilder) Query() *QueryBuilder {
if b.queryBuilder == nil {
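A minimal usage sketch of the new method (illustrative, not part of the commit; the "@timestamp" field name and the builder setup are assumed):

// Assumes b is a *SearchRequestBuilder created elsewhere by the client.
b.AddTimeFieldWithStandardizedFormat("@timestamp")
// Judging from the customProps assignment above, the serialized request body
// should then carry:
//   "fields": [{"field": "@timestamp", "format": "strict_date_optional_time_nanos"}]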
36 changes: 26 additions & 10 deletions pkg/opensearch/lucene_handler.go
@@ -1,6 +1,7 @@
package opensearch

import (
"fmt"
"strconv"
"time"

@@ -50,16 +51,33 @@ func (h *luceneHandler) processQuery(q *Query) error {
}

if len(q.BucketAggs) == 0 {
if len(q.Metrics) == 0 || q.Metrics[0].Type != "raw_document" {
return nil
// If no aggregations, only document and logs queries are valid
if len(q.Metrics) == 0 || !(q.Metrics[0].Type == rawDataType || q.Metrics[0].Type == rawDocumentType) {
return fmt.Errorf("invalid query, missing metrics and aggregations")
}
metric := q.Metrics[0]
b.Size(metric.Settings.Get("size").MustInt(500))
b.SortDesc("@timestamp", "boolean")
b.AddDocValueField("@timestamp")
return nil
}

switch {
case q.Metrics[0].Type == rawDataType:
processRawDataQuery(q, b, h.client.GetTimeField())
default:
processTimeSeriesQuery(q, b, fromMs, toMs)
}

return nil
}

func processRawDataQuery(q *Query, b *es.SearchRequestBuilder, defaultTimeField string) {
metric := q.Metrics[0]
b.SortDesc(defaultTimeField, "boolean")
b.SortDesc("_doc", "")
b.AddTimeFieldWithStandardizedFormat(defaultTimeField)
b.Size(metric.Settings.Get("size").MustInt(500))
}
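
For orientation, an inferred request-body sketch (not authoritative: the Size/SortDesc serialization lives in the builder code outside this diff, and the "boolean" argument presumably maps to sort's unmapped_type) for a raw_data query on time field "@timestamp" with the default size:

{
  "size": 500,
  "sort": [
    {"@timestamp": {"order": "desc", "unmapped_type": "boolean"}},
    {"_doc": {"order": "desc"}}
  ],
  "fields": [{"field": "@timestamp", "format": "strict_date_optional_time_nanos"}]
}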

func processTimeSeriesQuery(q *Query, b *es.SearchRequestBuilder, fromMs int64, toMs int64) {
metric := q.Metrics[0]
b.Size(metric.Settings.Get("size").MustInt(500))
aggBuilder := b.Agg()

// iterate backwards to create aggregations bottom-down
@@ -143,8 +161,6 @@ func (h *luceneHandler) processQuery(q *Query) error {
})
}
}

return nil
}

func getPipelineAggField(m *MetricAgg) string {
@@ -177,7 +193,7 @@ func (h *luceneHandler) executeQueries() (*backend.QueryDataResponse, error) {
}

rp := newResponseParser(res.Responses, h.queries, res.DebugInfo)
return rp.getTimeSeries()
return rp.getTimeSeries(h.client.GetTimeField())
}

func addDateHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, timeFrom, timeTo int64) es.AggBuilder {
215 changes: 208 additions & 7 deletions pkg/opensearch/response_parser.go
@@ -1,6 +1,7 @@
package opensearch

import (
"encoding/json"
"errors"
"regexp"
"sort"
@@ -26,6 +27,8 @@ const (
filtersType = "filters"
termsType = "terms"
geohashGridType = "geohash_grid"
rawDataType = "raw_data"
rawDocumentType = "raw_document"
)

type responseParser struct {
@@ -42,7 +45,7 @@ var newResponseParser = func(responses []*es.SearchResponse, targets []*Query, d
}
}

func (rp *responseParser) getTimeSeries() (*backend.QueryDataResponse, error) {
func (rp *responseParser) getTimeSeries(timeField string) (*backend.QueryDataResponse, error) {
result := backend.NewQueryDataResponse()

if rp.Responses == nil {
@@ -74,19 +77,217 @@ func (rp *responseParser) getTimeSeries() (*backend.QueryDataResponse, error) {
queryRes := backend.DataResponse{
Frames: data.Frames{},
}
props := make(map[string]string)
err := rp.processBuckets(res.Aggregations, target, &queryRes, props, 0)
if err != nil {
return nil, err

switch {
case target.Metrics[0].Type == rawDataType:
queryRes = processRawDataResponse(res, timeField, queryRes)
default:
props := make(map[string]string)
err := rp.processBuckets(res.Aggregations, target, &queryRes, props, 0)
if err != nil {
return nil, err
}
rp.nameFields(&queryRes.Frames, target)
rp.trimDatapoints(&queryRes.Frames, target)
}
rp.nameFields(&queryRes.Frames, target)
rp.trimDatapoints(&queryRes.Frames, target)

result.Responses[target.RefID] = queryRes
}
return result, nil
}

func processRawDataResponse(res *es.SearchResponse, timeField string, queryRes backend.DataResponse) backend.DataResponse {
propNames := make(map[string]bool)
docs := make([]map[string]interface{}, len(res.Hits.Hits))

for hitIdx, hit := range res.Hits.Hits {
var flattenedSource map[string]interface{}
if hit["_source"] != nil {
// On the frontend maxDepth wasn't used, but as we are processing on the backend
// let's put a limit to avoid an infinite loop. 10 was chosen arbitrarily.
flattenedSource = flatten(hit["_source"].(map[string]interface{}), 10)
}

flattenedSource["_id"] = hit["_id"]
flattenedSource["_type"] = hit["_type"]
flattenedSource["_index"] = hit["_index"]
if timestamp, ok := getTimestamp(hit, flattenedSource, timeField); ok {
flattenedSource[timeField] = timestamp
}

for key := range flattenedSource {
propNames[key] = true
}

docs[hitIdx] = flattenedSource
}
fields := processDocsToDataFrameFields(docs, propNames)

frames := data.Frames{}
frame := data.NewFrame("", fields...)
frames = append(frames, frame)

queryRes.Frames = frames
return queryRes
}
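
To make the flow concrete, a hypothetical hit (names and values invented) and what processRawDataResponse derives from it:

// Input hit:
//   {"_id": "1", "_index": "logs", "_type": "_doc",
//    "fields": {"@timestamp": ["2023-07-05T12:00:00.000000000Z"]},
//    "_source": {"level": "error", "kubernetes": {"pod": "p-1"}}}
// After flattening, metadata copying and timestamp extraction, the doc becomes:
//   {"_id": "1", "_index": "logs", "_type": "_doc",
//    "@timestamp": *time.Time, "level": "error", "kubernetes.pod": "p-1"}
// Every distinct key across all docs then becomes one typed frame column; see
// processDocsToDataFrameFields below.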

func getTimestamp(hit, source map[string]interface{}, timeField string) (*time.Time, bool) {
// "fields" is requested in the query with a specific format in AddTimeFieldWithStandardizedFormat
timeString, ok := lookForTimeFieldInFields(hit, timeField)
if !ok {
// When "fields" is absent, then getTimestamp tries to find a timestamp in _source
timeString, ok = lookForTimeFieldInSource(source, timeField)
if !ok {
// When neither the "fields" nor the "_source" timestamp is present in the expected JSON structure, a nil *time.Time is returned
return nil, false
}
}

timeValue, err := time.Parse(time.RFC3339Nano, timeString)
if err != nil {
// For an invalid format, a nil *time.Time is returned
return nil, false
}

return &timeValue, true
}
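
A precedence sketch (values invented): the "fields" entry wins when both representations are present, and "_source" is consulted only as a fallback.

// hit    = {"fields": {"@timestamp": ["2023-07-05T12:00:00Z"]}, ...}
// source = {"@timestamp": "2023-07-05T11:59:59Z"}
// getTimestamp(hit, source, "@timestamp") -> 2023-07-05T12:00:00Z, true
// Either way, the string must parse as time.RFC3339Nano, or (nil, false) is returned.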

func lookForTimeFieldInFields(hit map[string]interface{}, timeField string) (string, bool) {
// "fields" should be present with an array of timestamps
if hit["fields"] != nil {
if fieldsMap, ok := hit["fields"].(map[string]interface{}); ok {
timesArray, ok := fieldsMap[timeField].([]interface{})
if !ok {
return "", false
}
if len(timesArray) == 1 {
if timeString, ok := timesArray[0].(string); ok {
return timeString, true
}
}
}
}
return "", false
}

func lookForTimeFieldInSource(source map[string]interface{}, timeField string) (string, bool) {
if source[timeField] != nil {
if timeString, ok := source[timeField].(string); ok {
return timeString, true
}
}

return "", false
}

func flatten(target map[string]interface{}, maxDepth int) map[string]interface{} {
// On the frontend maxDepth wasn't used, but as we are processing on the backend
// let's put a limit to avoid an infinite loop. 10 was chosen arbitrarily.
output := make(map[string]interface{})
step(0, maxDepth, target, "", output)
return output
}

func step(currentDepth, maxDepth int, target map[string]interface{}, prev string, output map[string]interface{}) {
nextDepth := currentDepth + 1
for key, value := range target {
newKey := strings.Trim(prev+"."+key, ".")

v, ok := value.(map[string]interface{})
if ok && len(v) > 0 && currentDepth < maxDepth {
step(nextDepth, maxDepth, v, newKey, output)
} else {
output[newKey] = value
}
}
}
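
A concrete example of the flattening (input invented), matching the dot-joining in step:

in := map[string]interface{}{
	"level":      "error",
	"kubernetes": map[string]interface{}{"pod": map[string]interface{}{"name": "p-1"}},
}
out := flatten(in, 10)
// out == map[string]interface{}{"level": "error", "kubernetes.pod.name": "p-1"}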

func processDocsToDataFrameFields(docs []map[string]interface{}, propNames map[string]bool) []*data.Field {
allFields := make([]*data.Field, 0, len(propNames))
var timeDataField *data.Field
for propName := range propNames {
propNameValue := findTheFirstNonNilDocValueForPropName(docs, propName)
switch propNameValue.(type) {
// We check for the default data type values (*time.Time, float64, int, bool, string)
// and fall back to json.RawMessage if the value is none of them
case *time.Time:
timeDataField = createTimeField(docs, propName)
case float64:
allFields = append(allFields, createFieldOfType[float64](docs, propName))
case int:
allFields = append(allFields, createFieldOfType[int](docs, propName))
case string:
allFields = append(allFields, createFieldOfType[string](docs, propName))
case bool:
allFields = append(allFields, createFieldOfType[bool](docs, propName))
default:
fieldVector := make([]*json.RawMessage, len(docs))
for i, doc := range docs {
bytes, err := json.Marshal(doc[propName])
if err != nil {
// We skip values that cannot be marshalled
continue
}
value := json.RawMessage(bytes)
fieldVector[i] = &value
}
field := data.NewField(propName, nil, fieldVector)
isFilterable := true
field.Config = &data.FieldConfig{Filterable: &isFilterable}
allFields = append(allFields, field)
}
}

sort.Slice(allFields, func(i, j int) bool {
return allFields[i].Name < allFields[j].Name
})

if timeDataField != nil {
allFields = append([]*data.Field{timeDataField}, allFields...)
}

return allFields
}

func findTheFirstNonNilDocValueForPropName(docs []map[string]interface{}, propName string) interface{} {
for _, doc := range docs {
if doc[propName] != nil {
return doc[propName]
}
}
return docs[0][propName]
}

func createTimeField(docs []map[string]interface{}, timeField string) *data.Field {
isFilterable := true
fieldVector := make([]*time.Time, len(docs))
for i, doc := range docs {
value, ok := doc[timeField].(*time.Time) // cannot use generic function below because the type is already a pointer
if !ok {
continue
}
fieldVector[i] = value
}
field := data.NewField(timeField, nil, fieldVector)
field.Config = &data.FieldConfig{Filterable: &isFilterable}
return field
}

func createFieldOfType[T int | float64 | bool | string](docs []map[string]interface{}, propName string) *data.Field {
isFilterable := true
fieldVector := make([]*T, len(docs))
for i, doc := range docs {
value, ok := doc[propName].(T)
if !ok {
continue
}
fieldVector[i] = &value
}
field := data.NewField(propName, nil, fieldVector)
field.Config = &data.FieldConfig{Filterable: &isFilterable}
return field
}
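
A small usage sketch (docs invented): type mismatches and missing keys leave nil entries, which the frame renders as null values.

docs := []map[string]interface{}{
	{"level": "error"},
	{"level": 42}, // not a string: entry stays nil
	{},            // key missing: entry stays nil
}
field := createFieldOfType[string](docs, "level")
// field is a nullable string column: "error", nil, nil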

func (rp *responseParser) processBuckets(aggs map[string]interface{}, target *Query, queryResult *backend.DataResponse, props map[string]string, depth int) error {
var err error
maxDepth := len(target.BucketAggs) - 1