diff --git a/pkg/opensearch/client/models.go b/pkg/opensearch/client/models.go index d61540a5..ad23f794 100644 --- a/pkg/opensearch/client/models.go +++ b/pkg/opensearch/client/models.go @@ -102,6 +102,7 @@ type Query struct { // BoolQuery represents a bool query type BoolQuery struct { Filters []Filter + Must []Must } // MarshalJSON returns the JSON encoding of the boolean query. @@ -115,12 +116,21 @@ func (q *BoolQuery) MarshalJSON() ([]byte, error) { root["filter"] = q.Filters } } + if len(q.Must) > 0 { + root["must"] = q.Must + } return json.Marshal(root) } // Filter represents a search filter type Filter interface{} +type Must interface{} + +type MustTerm struct { + Term map[string]string +} + // QueryStringFilter represents a query string search filter type QueryStringFilter struct { Filter @@ -128,6 +138,16 @@ type QueryStringFilter struct { AnalyzeWildcard bool } +func (m MustTerm) MarshalJSON() ([]byte, error) { + root := map[string]interface{}{} + if m.Term != nil && m.Term["TraceId"] != "" { + root["term"] = map[string]string{ + "traceId": m.Term["TraceId"], + } + } + return json.Marshal(root) +} + // MarshalJSON returns the JSON encoding of the query string filter. func (f *QueryStringFilter) MarshalJSON() ([]byte, error) { root := map[string]interface{}{ @@ -149,6 +169,13 @@ type RangeFilter struct { Format string } +type TraceRangeFilter struct { + StartTime struct { + Gte int64 + Lte int64 + } +} + // DateFormatEpochMS represents a date format of epoch milliseconds (epoch_millis) const DateFormatEpochMS = "epoch_millis" @@ -170,6 +197,19 @@ func (f *RangeFilter) MarshalJSON() ([]byte, error) { return json.Marshal(root) } +// MarshalJSON returns the JSON encoding of the trace start-time range filter. 
+func (f *TraceRangeFilter) MarshalJSON() ([]byte, error) { + root := map[string]map[string]map[string]interface{}{ + "range": { + "startTime": { + "lte": f.StartTime.Lte, + "gte": f.StartTime.Gte, + }, + }, + } + return json.Marshal(root) +} + // Aggregation represents an aggregation type Aggregation interface{} diff --git a/pkg/opensearch/client/search_request.go b/pkg/opensearch/client/search_request.go index 50f9f22d..99d183d4 100644 --- a/pkg/opensearch/client/search_request.go +++ b/pkg/opensearch/client/search_request.go @@ -203,6 +203,7 @@ func (b *QueryBuilder) Bool() *BoolQueryBuilder { // BoolQueryBuilder represents a bool query builder type BoolQueryBuilder struct { filterQueryBuilder *FilterQueryBuilder + mustQueryBuilder *MustQueryBuilder } // NewBoolQueryBuilder create a new bool query builder @@ -218,6 +219,14 @@ func (b *BoolQueryBuilder) Filter() *FilterQueryBuilder { return b.filterQueryBuilder } +// Must creates and returns a must query builder +func (b *BoolQueryBuilder) Must() *MustQueryBuilder { + if b.mustQueryBuilder == nil { + b.mustQueryBuilder = NewMustQueryBuilder() + } + return b.mustQueryBuilder +} + // Build builds and return a bool query builder func (b *BoolQueryBuilder) Build() (*BoolQuery, error) { boolQuery := BoolQuery{} @@ -230,6 +239,14 @@ func (b *BoolQueryBuilder) Build() (*BoolQuery, error) { boolQuery.Filters = filters } + if b.mustQueryBuilder != nil { + must, err := b.mustQueryBuilder.Build() + if err != nil { + return nil, err + } + boolQuery.Must = must + } + return &boolQuery, nil } @@ -237,19 +254,32 @@ func (b *BoolQueryBuilder) Build() (*BoolQuery, error) { type FilterQueryBuilder struct { filters []Filter } - +type MustQueryBuilder struct { + must []Must +} // NewFilterQueryBuilder creates a new filter query builder func NewFilterQueryBuilder() *FilterQueryBuilder { return &FilterQueryBuilder{ filters: make([]Filter, 0), } } +// NewMustQueryBuilder creates a new must bool query builder +func 
NewMustQueryBuilder() *MustQueryBuilder { + return &MustQueryBuilder{ + must: make([]Must, 0), + } +} // Build builds and return a filter query builder func (b *FilterQueryBuilder) Build() ([]Filter, error) { return b.filters, nil } +// Build builds and return a must query builder +func (b *MustQueryBuilder) Build() ([]Must, error) { + return b.must, nil +} + // AddDateRangeFilter adds a new time range filter func (b *FilterQueryBuilder) AddDateRangeFilter(timeField, format string, lte, gte int64) *FilterQueryBuilder { b.filters = append(b.filters, &RangeFilter{ @@ -261,6 +291,25 @@ func (b *FilterQueryBuilder) AddDateRangeFilter(timeField, format string, lte, g return b } +func (b *MustQueryBuilder) AddMustFilter(field string, matchTo string) *MustQueryBuilder{ + b.must = append(b.must, MustTerm{ + Term: map[string]string{ + field: matchTo, + }, + }) + return b +} + +func (b *MustQueryBuilder) AddStartTimeFilter(gte, lte int64) *MustQueryBuilder { + b.must = append(b.must, &TraceRangeFilter{ + StartTime: struct { + Gte int64 + Lte int64 + }{Lte: lte, Gte: gte}, + }) + return b +} + // AddQueryStringFilter adds a new query string filter func (b *FilterQueryBuilder) AddQueryStringFilter(querystring string, analyzeWildcard bool) *FilterQueryBuilder { if len(strings.TrimSpace(querystring)) == 0 { diff --git a/pkg/opensearch/lucene_handler.go b/pkg/opensearch/lucene_handler.go index 53d96293..ddda01b3 100644 --- a/pkg/opensearch/lucene_handler.go +++ b/pkg/opensearch/lucene_handler.go @@ -2,6 +2,7 @@ package opensearch import ( "fmt" + "regexp" "strconv" "time" @@ -44,31 +45,73 @@ func (h *luceneHandler) processQuery(q *Query) error { b := h.ms.Search(interval) b.Size(0) - filters := b.Query().Bool().Filter() - defaultTimeField := h.client.GetConfiguredFields().TimeField - filters.AddDateRangeFilter(defaultTimeField, es.DateFormatEpochMS, toMs, fromMs) - if q.RawQuery != "" { - filters.AddQueryStringFilter(q.RawQuery, true) - } + // TODO: merge into a switch condition 
and write processTracesQuery function + if q.LuceneQueryType == "Traces" { + traceId, err := getTraceId(q.RawQuery) + if err != nil { + return err + } + if traceId != "" { + // TODO: needs to be greater than 0 + b.Size(1000) + processTraceSpansQuery(b, traceId, fromMs, toMs) + } + processTraceListQuery(q, b, fromMs, toMs) + } else { + filters := b.Query().Bool().Filter() + defaultTimeField := h.client.GetConfiguredFields().TimeField + filters.AddDateRangeFilter(defaultTimeField, es.DateFormatEpochMS, toMs, fromMs) + + // I don't think we support any kind of additional filtering with traces apart from traceId? + if q.RawQuery != "" { + filters.AddQueryStringFilter(q.RawQuery, true) + } + + if len(q.BucketAggs) == 0 { + // If no aggregations, only document and logs queries are valid + if q.LuceneQueryType == "traces" && (len(q.Metrics) == 0 || !(q.Metrics[0].Type == rawDataType || q.Metrics[0].Type == rawDocumentType)) { + return fmt.Errorf("invalid query, missing metrics and aggregations") + } + } - if len(q.BucketAggs) == 0 { - // If no aggregations, only document and logs queries are valid - if len(q.Metrics) == 0 || !(q.Metrics[0].Type == rawDataType || q.Metrics[0].Type == rawDocumentType) { - return fmt.Errorf("invalid query, missing metrics and aggregations") + switch q.Metrics[0].Type { + case rawDocumentType, rawDataType: + processDocumentQuery(q, b, defaultTimeField) + case logsType: + processLogsQuery(q, b, fromMs, toMs, defaultTimeField) + default: + processTimeSeriesQuery(q, b, fromMs, toMs, defaultTimeField) } + } + return nil +} - switch q.Metrics[0].Type { - case rawDocumentType, rawDataType: - processDocumentQuery(q, b, defaultTimeField) - case logsType: - processLogsQuery(q, b, fromMs, toMs, defaultTimeField) - default: - processTimeSeriesQuery(q, b, fromMs, toMs, defaultTimeField) +func processTraceSpansQuery(b *es.SearchRequestBuilder, traceId string, fromMs int64, toMs int64) { + must := b.Query().Bool().Must() + must.AddMustFilter("TraceId", 
traceId) + must.AddStartTimeFilter(fromMs, toMs) +} + +func processTraceListQuery(q *Query, b *es.SearchRequestBuilder, from, to int64) { + // TODO: implement me +} + +// TODO: You can compile this one and chuck in a global variable, let's think about if we should check for the error and how/where +// TODO: support a space after the traceID +var traceIdRE = regexp.MustCompile(`traceId:(.+)`) + +func getTraceId(rawQuery string) (string, error) { + matches := traceIdRE.FindStringSubmatch(rawQuery) + + // TODO: what kind of flexibility can we have regarding typos? + // TODO: if we do indeed want an error, how do we make it appear to the user in a nice way? (wrapError) + if len(matches) != 2 { + return "", fmt.Errorf("trace ID not found in the input string") } - return nil + return matches[1], nil } func processLogsQuery(q *Query, b *es.SearchRequestBuilder, from, to int64, defaultTimeField string) { @@ -219,6 +262,10 @@ func processTimeSeriesQuery(q *Query, b *es.SearchRequestBuilder, fromMs int64, } } +func processTraces(q *Query, b *es.SearchRequestBuilder) { + +} + func getPipelineAggField(m *MetricAgg) string { // From https://github.com/grafana/grafana/pull/60337 // In frontend we are using Field as pipelineAggField diff --git a/pkg/opensearch/models.go b/pkg/opensearch/models.go index dbafc842..676caf46 100644 --- a/pkg/opensearch/models.go +++ b/pkg/opensearch/models.go @@ -14,6 +14,7 @@ type Query struct { Alias string `json:"alias"` Interval string RefID string + LuceneQueryType string `json:"luceneQueryType"` } // queryHandler is an interface for handling queries of the same type diff --git a/pkg/opensearch/response_parser.go b/pkg/opensearch/response_parser.go index 5c84b2df..9f33f7b2 100644 --- a/pkg/opensearch/response_parser.go +++ b/pkg/opensearch/response_parser.go @@ -3,6 +3,7 @@ package opensearch import ( "encoding/json" "errors" + "fmt" "regexp" "sort" "strconv" @@ -11,6 +12,7 @@ import ( simplejson "github.com/bitly/go-simplejson" 
"github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/backend/log" "github.com/grafana/grafana-plugin-sdk-go/data" es "github.com/grafana/opensearch-datasource/pkg/opensearch/client" "github.com/grafana/opensearch-datasource/pkg/utils" @@ -85,28 +87,254 @@ func (rp *responseParser) getTimeSeries(configuredFields es.ConfiguredFields) (* Frames: data.Frames{}, } - switch target.Metrics[0].Type { - case rawDataType: - queryRes = processRawDataResponse(res, configuredFields, queryRes) - case rawDocumentType: - queryRes = processRawDocumentResponse(res, target.RefID, queryRes) - case logsType: - queryRes = processLogsResponse(res, target.Metrics[0].Settings.Get("limit").MustString(), configuredFields, queryRes) - default: - props := make(map[string]string) - err := rp.processBuckets(res.Aggregations, target, &queryRes, props, 0) - if err != nil { - return nil, err + // trace span condition + if target.LuceneQueryType == "Traces" { + queryRes = processTraceSpansResponse(res, queryRes) + } else { + // TODO: modify switch condition to lucene query type without Metrics (to do Traces) + switch target.Metrics[0].Type { + case rawDataType: + queryRes = processRawDataResponse(res, configuredFields, queryRes) + case rawDocumentType: + queryRes = processRawDocumentResponse(res, target.RefID, queryRes) + case logsType: + queryRes = processLogsResponse(res, target.Metrics[0].Settings.Get("limit").MustString(), configuredFields, queryRes) + default: + props := make(map[string]string) + err := rp.processBuckets(res.Aggregations, target, &queryRes, props, 0) + if err != nil { + return nil, err + } + rp.nameFields(&queryRes.Frames, target) + rp.trimDatapoints(&queryRes.Frames, target) } - rp.nameFields(&queryRes.Frames, target) - rp.trimDatapoints(&queryRes.Frames, target) - } + } result.Responses[target.RefID] = queryRes } return result, nil } +func timeStringToMilliseconds(dateString string) (int64, error) { + layout := 
"2006-01-02T15:04:05.000000Z" + + // Parse the date string into a time.Time value + t, err := time.Parse(layout, dateString) + if err != nil { + return 0, err + } + + milliseconds := t.UnixNano() / 1e6 + return milliseconds, nil +} +func spanHasError(spanEvents []interface{}) bool { + for _, event := range spanEvents { + if eventMap, ok := event.(map[string]interface{}); ok { + attributes := eventMap["attributes"] + if attributes, ok := attributes.(map[string]interface{}); ok { + if attributes["error"] != nil { + return true + } + } + + } else { + log.DefaultLogger.Debug("span event is not a map") + } + } + return false +} + +func flattenNestedFieldsToObj(field map[string]interface{}) map[string]interface{} { + // from "span.attributes.sampler@type": "test" + // to map[span:map[attributes:map[sampler@type:test]]] + result := make(map[string]interface{}) + + for key, value := range field { + keys := strings.Split(key, ".") + current := result + + for i := 0; i < len(keys)-1; i++ { + if _, exists := current[keys[i]]; !exists { + current[keys[i]] = make(map[string]interface{}) + } + current = current[keys[i]].(map[string]interface{}) + } + + current[keys[len(keys)-1]] = value + } + + return result +} + +func processTraceSpansResponse(res *es.SearchResponse, queryRes backend.DataResponse) backend.DataResponse { + propNames := make(map[string]bool) + docs := make([]map[string]interface{}, len(res.Hits.Hits)) + + for hitIdx, hit := range res.Hits.Hits { + var withKeysToObj map[string]interface{} + if hit["_source"] != nil { + flattened := flatten(hit["_source"].(map[string]interface{}), maxFlattenDepth) + // some k:v pairs come from OpenSearch in dot notation: 'span.attributes.http@status_code': 200, + // namely TraceSpanRow.Attributes and TraceSpanRow.Resource + // this turns everything into maps we can index and access + withKeysToObj = flattenNestedFieldsToObj(flattened) + } + + doc := map[string]interface{}{ + "_id": hit["_id"], + "_type": hit["_type"], + "_index": 
hit["_index"], + "_source": withKeysToObj, + } + + // used in a few places below, to determine if we need to add error flags + spanHasError := withKeysToObj["events"] != nil && spanHasError(withKeysToObj["events"].([]interface{})) + for k, v := range withKeysToObj { + // some field names TraceView viz needs do not correspond to what we get from OpenSearch, this remaps them + if k == "startTime" { + startTime, err := timeStringToMilliseconds(v.(string)) + if err != nil { + continue + } + doc[k] = startTime + continue + } + if k == "durationInNanos" { + value, isNumeric := v.(float64) // Check for float64 + if isNumeric { + // grafana needs time in milliseconds + duration := value * 0.000001 + doc["duration"] = duration + continue + } + } + if k == "parentSpanId" { + doc["parentSpanID"] = v + continue + } + if k == "traceId" { + doc["traceID"] = v + continue + } + if k == "spanId" { + doc["spanID"] = v + continue + } + if k == "name" { + doc["operationName"] = v + continue + } + if k == "resource" { + resourceAttributes := v.(map[string]interface{})["attributes"].(map[string]interface{}) + if resourceAttributes != nil { + transformedResourceAttributes := []map[string]interface{}{} + for k, v := range resourceAttributes { + transformedResourceAttributes = append(transformedResourceAttributes, map[string]interface{}{"key": k, "value": v}) + } + if transformedResourceAttributes != nil { + doc["serviceTags"] = transformedResourceAttributes + } + } + + continue + } + if k == "span" { + spanAttributes := v.(map[string]interface{})["attributes"].(map[string]interface{}) + if spanAttributes != nil { + transformedSpanAttributes := []map[string]interface{}{} + for k, v := range spanAttributes { + transformedSpanAttributes = append(transformedSpanAttributes, map[string]interface{}{"key": k, "value": v}) + } + if spanHasError { + transformedSpanAttributes = append(transformedSpanAttributes, map[string]interface{}{"key": "error", "value": true}) + + } + if 
transformedSpanAttributes != nil { + doc["tags"] = transformedSpanAttributes + } + } + + continue + } + if k == "events" { + spanEvents := []map[string]interface{}{} + stackTraces := []string{} + if len(v.([]interface{})) > 0 { + for _, event := range v.([]interface{}) { + eventObj := event.(map[string]interface{}) + var timestamp int64 = 0 + switch timeField := eventObj["time"].(type) { + case string: + if timeField != "" { + nanoseconds, err := timeStringToMilliseconds(timeField) + if err != nil { + //throw error? + continue + } + timestamp = nanoseconds + } + + case int64: + timestamp = timeField + default: + timestamp = 0 + } + spanEvents = append(spanEvents, map[string]interface{}{"timestamp": timestamp, "fields": []map[string]interface{}{{"key": "name", "value": eventObj["name"]}}}) + + // get stack traces if error event + attributes, ok := eventObj["attributes"].(map[string]interface{}) + if ok { + errorValue := attributes["error"] + if errorValue != nil { + stackTraces = append(stackTraces, fmt.Sprintf("%s: %s", eventObj["name"], attributes["error"])) + } else { + continue + } + } + } + if spanHasError { + doc["stackTraces"] = stackTraces + } + + doc["logs"] = spanEvents + continue + } + } + doc[k] = v + } + + if hit["fields"] != nil { + source, ok := hit["fields"].(map[string]interface{}) + if ok { + for k, v := range source { + + doc[k] = v + } + } + } + for key := range doc { + propNames[key] = true + } + + docs[hitIdx] = doc + } + + sortedPropNames := sortPropNames(propNames, []string{}) + fields := processDocsToDataFrameFields(docs, sortedPropNames) + + frame := data.NewFrame("", fields...) 
+ if frame.Meta == nil { + frame.Meta = &data.FrameMeta{} + } + frame.Meta.PreferredVisualization = data.VisTypeTrace + + if frame.Meta.Custom == nil { + frame.Meta.Custom = map[string]interface{}{} + } + + queryRes.Frames = data.Frames{frame} + return queryRes +} func processLogsResponse(res *es.SearchResponse, limitString string, configuredFields es.ConfiguredFields, queryRes backend.DataResponse) backend.DataResponse { propNames := make(map[string]bool) docs := make([]map[string]interface{}, len(res.Hits.Hits)) @@ -353,6 +581,7 @@ func processDocsToDataFrameFields(docs []map[string]interface{}, propNames []str allFields := make([]*data.Field, 0, len(propNames)) for _, propName := range propNames { propNameValue := findTheFirstNonNilDocValueForPropName(docs, propName) + switch propNameValue.(type) { // We are checking for default data types values (time, float64, int, bool, string) // and default to json.RawMessage if we cannot find any of them diff --git a/pkg/opensearch/time_series_query.go b/pkg/opensearch/time_series_query.go index 10d8f5fd..b9a2f39f 100644 --- a/pkg/opensearch/time_series_query.go +++ b/pkg/opensearch/time_series_query.go @@ -80,6 +80,7 @@ func (p *timeSeriesQueryParser) parse(reqQueries []backend.DataQuery) ([]*Query, // please do not create a new field with that name, to avoid potential problems with old, persisted queries. 
rawQuery := model.Get("query").MustString() queryType := model.Get("queryType").MustString("lucene") + luceneQueryType := model.Get("luceneQueryType").MustString() if queryType != Lucene && queryType != PPL { return nil, fmt.Errorf("%w: %q", invalidQueryTypeError{refId: q.RefID}, queryType) @@ -96,13 +97,14 @@ func (p *timeSeriesQueryParser) parse(reqQueries []backend.DataQuery) ([]*Query, interval := strconv.FormatInt(q.Interval.Milliseconds(), 10) + "ms" queries = append(queries, &Query{ - RawQuery: rawQuery, - QueryType: queryType, - BucketAggs: bucketAggs, - Metrics: metrics, - Alias: alias, - Interval: interval, - RefID: q.RefID, + RawQuery: rawQuery, + QueryType: queryType, + BucketAggs: bucketAggs, + Metrics: metrics, + Alias: alias, + Interval: interval, + RefID: q.RefID, + LuceneQueryType: luceneQueryType, }) } diff --git a/src/datasource.test.ts b/src/datasource.test.ts index 69f34519..86c222b4 100644 --- a/src/datasource.test.ts +++ b/src/datasource.test.ts @@ -39,6 +39,9 @@ const OPENSEARCH_MOCK_URL = 'http://opensearch.local'; const backendSrv = { datasourceRequest: jest.fn(), }; +const mockedSuperQuery = jest + .spyOn(DataSourceWithBackend.prototype, 'query') + .mockImplementation((request: DataQueryRequest) => of()); jest.mock('./tracking.ts', () => ({ trackQuery: jest.fn(), @@ -1180,6 +1183,27 @@ describe('OpenSearchDatasource', function(this: any) { expect(mockedSuperQuery).toHaveBeenCalled(); }); + it('should send trace span queries', () => { + const rawDataQuery: OpenSearchQuery = { + refId: 'A', + luceneQueryType: LuceneQueryType.Traces, + query: 'traceId: test', + }; + const request: DataQueryRequest = { + requestId: '', + interval: '', + intervalMs: 1, + scopedVars: {}, + timezone: '', + app: CoreApp.Dashboard, + startTime: 0, + range: createTimeRange(toUtc([2015, 4, 30, 10]), toUtc([2015, 5, 1, 10])), + targets: [rawDataQuery], + }; + ctx.ds.query(request); + expect(mockedSuperQuery).toHaveBeenCalled(); + }); + it('should send logs queries in 
Explore', () => { const mockedSuperQuery = jest .spyOn(DataSourceWithBackend.prototype, 'query') diff --git a/src/datasource.ts b/src/datasource.ts index 2ee8aea6..fb6c825f 100644 --- a/src/datasource.ts +++ b/src/datasource.ts @@ -540,6 +540,7 @@ export class OpenSearchDatasource extends DataSourceWithBackend metric.type === 'raw_data' || metric.type === 'raw_document' || + target.luceneQueryType === LuceneQueryType.Traces || (metric.type === 'logs' && request.app === CoreApp.Explore) ) ) diff --git a/src/traces/formatTraces.ts b/src/traces/formatTraces.ts index 102026d7..88b372c1 100644 --- a/src/traces/formatTraces.ts +++ b/src/traces/formatTraces.ts @@ -188,8 +188,7 @@ function getStackTraces(events: OpenSearchSpanEvent[]): string[] | undefined { const stackTraces = events .filter(event => event.attributes.error) .map(event => `${event.name}: ${event.attributes.error}`); - // if we return an empty array, Trace panel plugin shows "0" - return stackTraces.length > 0 ? stackTraces : undefined; + return stackTraces; } function convertToKeyValue(tags: Record): TraceKeyValuePair[] {