Skip to content

Commit

Permalink
Service Map: Display prefetch error to users (#387)
Browse files Browse the repository at this point in the history
  • Loading branch information
iwysiu authored May 14, 2024
1 parent 6e58218 commit 7b7ba33
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 18 deletions.
30 changes: 21 additions & 9 deletions pkg/opensearch/opensearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ func (ds *OpenSearchDatasource) QueryData(ctx context.Context, req *backend.Quer
return nil, err
}

err = handleServiceMapPrefetch(ctx, osClient, req)
errRefID, err := handleServiceMapPrefetch(ctx, osClient, req)
if err != nil {
return nil, err
return wrapServiceMapPrefetchError(errRefID, err)
}

query := newQueryRequest(osClient, req.Queries, req.PluginContext.DataSourceInstanceSettings, intervalCalculator)
Expand All @@ -79,21 +79,21 @@ func (ds *OpenSearchDatasource) QueryData(ctx context.Context, req *backend.Quer
// calls the Prefetch query to get the services and operations lists that are required for
// the associated Stats query. It then adds these parameters to the originating query so
// the Stats query can be created later.
func handleServiceMapPrefetch(ctx context.Context, osClient client.Client, req *backend.QueryDataRequest) error {
func handleServiceMapPrefetch(ctx context.Context, osClient client.Client, req *backend.QueryDataRequest) (string, error) {
for i, query := range req.Queries {
model, err := simplejson.NewJson(query.JSON)
if err != nil {
return err
return "", err
}
queryType := model.Get("queryType").MustString()
luceneQueryType := model.Get("luceneQueryType").MustString()
serviceMapRequested := model.Get("serviceMap").MustBool(false)
if queryType == Lucene && luceneQueryType == "Traces" && serviceMapRequested {
prefetchQuery := createServiceMapPrefetchQuery(query)
q := newQueryRequest(osClient, []backend.DataQuery{prefetchQuery}, req.PluginContext.DataSourceInstanceSettings, intervalCalculator)
response, err := wrapError(q.execute(ctx))
response, err := q.execute(ctx)
if err != nil {
return err
return query.RefID, err
}
services, operations := extractParametersFromServiceMapFrames(response)

Expand All @@ -104,13 +104,25 @@ func handleServiceMapPrefetch(ctx context.Context, osClient client.Client, req *
// An error here _should_ be impossible but since services and operations are coming from outside,
// handle it just in case
if err != nil {
return err
return query.RefID, err
}
req.Queries[i].JSON = newJson
return nil
return "", nil
}
}
return nil
return "", nil
}

func wrapServiceMapPrefetchError(refId string, err error) (*backend.QueryDataResponse, error) {
if refId != "" {
return &backend.QueryDataResponse{
Responses: map[string]backend.DataResponse{
refId: {
Error: fmt.Errorf(`Error fetching service map info: %w`, err),
}},
}, nil
}
return nil, err
}

func wrapError(response *backend.QueryDataResponse, err error) (*backend.QueryDataResponse, error) {
Expand Down
55 changes: 54 additions & 1 deletion pkg/opensearch/opensearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,28 @@ func Test_wrapError(t *testing.T) {
})
}

func Test_wrapServiceMapPrefetchError(t *testing.T) {
t.Run("wrapServiceMapPrefetchError returns a response if a refId is passed", func(t *testing.T) {
prefetchError := fmt.Errorf("Some prefetch error")
actualResponse, err := wrapServiceMapPrefetchError("some ref id", prefetchError)

assert.NoError(t, err)
assert.Equal(t, &backend.QueryDataResponse{
Responses: map[string]backend.DataResponse{
"some ref id": {
Error: fmt.Errorf(`Error fetching service map info: %w`, prefetchError)}},
}, actualResponse)
})

t.Run("wrapServiceMapPrefetchError passes the error if there is no refId", func(t *testing.T) {
prefetchError := fmt.Errorf("Some prefetch error")
_, err := wrapServiceMapPrefetchError("", prefetchError)

assert.Error(t, err)
assert.Equal(t, "Some prefetch error", err.Error())
})
}

func TestServiceMapPreFetch(t *testing.T) {
buckets := `{
"buckets": [
Expand All @@ -60,10 +82,17 @@ func TestServiceMapPreFetch(t *testing.T) {
"service_name": unmarshaledBuckets}},
}

errResponse := []*client.SearchResponse{
{Error: map[string]interface{}{
"reason": "foo",
}},
}

testCases := []struct {
name string
queries []tsdbQuery
response *client.MultiSearchResponse
expectedError error
shouldEditQuery bool
expectedQueryJson string
}{
Expand Down Expand Up @@ -100,6 +129,26 @@ func TestServiceMapPreFetch(t *testing.T) {
expectedQueryJson: `{"bucketAggs":[{"field":"@timestamp","id":"2","settings":{"interval":"auto"},"type":"date_histogram"}],"luceneQueryType":"Traces","metrics":[{"id":"1","type":"count"}],"operations":["op1","op2","op3"],"query":"traceId:000000000000000011faa8ff95fa3eb8","queryType":"lucene","serviceMap":true,"services":["service1","service2"],"timeField":"@timestamp"}`,
shouldEditQuery: true,
},
{
name: "Correctly fetch error",
queries: []tsdbQuery{{
refId: "A",
body: `{
"bucketAggs":[{ "field":"@timestamp", "id":"2", "settings":{"interval": "auto"}, "type": "date_histogram" }],
"luceneQueryType": "Traces",
"metrics": [{"id": "1", "type": "count" }],
"query": "traceId:000000000000000011faa8ff95fa3eb8",
"queryType": "lucene",
"timeField": "@timestamp",
"serviceMap": true
}`,
},
},
response: &client.MultiSearchResponse{
Responses: errResponse,
},
expectedError: fmt.Errorf("foo"),
},
}

for _, tc := range testCases {
Expand All @@ -109,7 +158,11 @@ func TestServiceMapPreFetch(t *testing.T) {
req := backend.QueryDataRequest{
Queries: createDataQueriesForTests(tc.queries),
}
err := handleServiceMapPrefetch(context.Background(), c, &req)
_, err := handleServiceMapPrefetch(context.Background(), c, &req)
if tc.expectedError != nil {
require.Equal(t, tc.expectedError, err)
return
}
require.NoError(t, err)

// handleServiceMapPrefetch may not put the operation in the same order every time
Expand Down
21 changes: 13 additions & 8 deletions pkg/opensearch/response_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ func (rp *responseParser) parseResponse() (*backend.QueryDataResponse, error) {
// grab the associated query
target := rp.Targets[i]

var queryType string
if target.luceneQueryType == luceneQueryTypeTraces {
queryType = luceneQueryTypeTraces
} else {
queryType = target.Metrics[0].Type
}

// if one of the responses is an error add debug info and error
// and keep trying to process other responses
if res.Error != nil {
Expand All @@ -88,8 +95,9 @@ func (rp *responseParser) parseResponse() (*backend.QueryDataResponse, error) {
debugInfo = utils.NewJsonFromAny(rp.DebugInfo)
}

err := getErrorFromOpenSearchResponse(res)
result.Responses[target.RefID] = backend.DataResponse{
Error: getErrorFromOpenSearchResponse(res),
Error: err,
Frames: []*data.Frame{
{
Meta: &data.FrameMeta{
Expand All @@ -98,16 +106,13 @@ func (rp *responseParser) parseResponse() (*backend.QueryDataResponse, error) {
},
},
}
// we want to return the error if we're prefetching service map
if queryType == luceneQueryTypeTraces && target.serviceMapInfo.Type == Prefetch {
return result, err
}
continue
}

var queryType string
if target.luceneQueryType == luceneQueryTypeTraces {
queryType = luceneQueryTypeTraces
} else {
queryType = target.Metrics[0].Type
}

switch queryType {
case rawDataType:
queryRes = processRawDataResponse(res, rp.ConfiguredFields, queryRes)
Expand Down

0 comments on commit 7b7ba33

Please sign in to comment.