Skip to content

Commit 2f9e802

Browse files
Elfo404ifrost
andauthored
Reintroduce Elasticsearch compatibility from OpenDistro datasource (#5)
* Allow flavor selection in datasource config * Add support for Elasticsearch to QueryBuilder * Show only supported metric aggregations bases on configured flavor * Restore maxConcurrentShardRequests settings * ES compatibility for getFields * ES compatibility for getTerms * fix watch issue * default to OpenSearch when flavor is not specified * reintroduce BE compatibility with ES * Restore compatibility for Elasticsearch in BE tests * Merge version & flavor in a single select * Fix configeditor tests * Fix provisioning example * Fix config editor test * Apply suggestions from code review Co-authored-by: Piotr Jamróz <[email protected]> Co-authored-by: Piotr Jamróz <[email protected]>
1 parent c6dd229 commit 2f9e802

32 files changed

+589
-214
lines changed

README.md

+7-4
Original file line numberDiff line numberDiff line change
@@ -208,14 +208,16 @@ Here are some provisioning examples for this data source.
208208
apiVersion: 1
209209

210210
datasources:
211-
- name: Elastic
212-
type: grafana-es-open-distro-datasource
211+
- name: OpenSearch
212+
type: grafana-opensearch-datasource
213213
access: proxy
214214
database: '[metrics-]YYYY.MM.DD'
215215
url: http://localhost:9200
216216
jsonData:
217217
interval: Daily
218218
timeField: '@timestamp'
219+
version: '1.0.0'
220+
flavor: 'opensearch'
219221
```
220222
221223
or, for logs:
@@ -225,14 +227,15 @@ apiVersion: 1
225227

226228
datasources:
227229
- name: elasticsearch-v7-filebeat
228-
type: grafana-es-open-distro-datasource
230+
type: grafana-opensearch-datasource
229231
access: proxy
230232
database: '[filebeat-]YYYY.MM.DD'
231233
url: http://localhost:9200
232234
jsonData:
233235
interval: Daily
234236
timeField: '@timestamp'
235-
version: '1.0.0'
237+
version: '7.0.0'
238+
flavor: 'elasticsearch'
236239
logMessageField: message
237240
logLevelField: fields.level
238241
```

package.json

-3
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,6 @@
1414
"author": "Grafana Labs",
1515
"license": "Apache-2.0",
1616
"dependencies": {
17-
"@emotion/core": "10.0.27",
18-
"@emotion/css": "11.1.3",
19-
"@emotion/react": "11.1.5",
2017
"@grafana/aws-sdk": "0.0.3",
2118
"@grafana/data": "7.5.7",
2219
"@grafana/runtime": "7.5.7",

pkg/opensearch/client/client.go

+37-5
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ func GetSigV4Config(ds *backend.DataSourceInstanceSettings) (*sigv4.Config, erro
9696
// Client represents a client which can interact with OpenSearch api
9797
type Client interface {
9898
GetVersion() *semver.Version
99+
GetFlavor() Flavor
99100
GetTimeField() string
100101
GetMinInterval(queryInterval string) (time.Duration, error)
101102
GetIndex() string
@@ -126,12 +127,14 @@ var NewClient = func(ctx context.Context, ds *backend.DataSourceInstanceSettings
126127

127128
version, err := extractVersion(jsonData.Get("version"))
128129
if err != nil {
129-
return nil, fmt.Errorf("opensearch version is required, err=%v", err)
130+
return nil, fmt.Errorf("version is required, err=%v", err)
130131
}
131132

133+
flavor := jsonData.Get("flavor").MustString(string(OpenSearch))
134+
132135
timeField, err := jsonData.Get("timeField").String()
133136
if err != nil {
134-
return nil, fmt.Errorf("opensearch time field name is required, err=%v", err)
137+
return nil, fmt.Errorf("time field name is required, err=%v", err)
135138
}
136139

137140
indexInterval := jsonData.Get("interval").MustString()
@@ -156,6 +159,7 @@ var NewClient = func(ctx context.Context, ds *backend.DataSourceInstanceSettings
156159
ctx: ctx,
157160
ds: ds,
158161
version: version,
162+
flavor: Flavor(flavor),
159163
timeField: timeField,
160164
indices: indices,
161165
index: index,
@@ -166,6 +170,7 @@ var NewClient = func(ctx context.Context, ds *backend.DataSourceInstanceSettings
166170
type baseClientImpl struct {
167171
ctx context.Context
168172
ds *backend.DataSourceInstanceSettings
173+
flavor Flavor
169174
version *semver.Version
170175
timeField string
171176
indices []string
@@ -174,6 +179,10 @@ type baseClientImpl struct {
174179
debugEnabled bool
175180
}
176181

182+
func (c *baseClientImpl) GetFlavor() Flavor {
183+
return c.flavor
184+
}
185+
177186
func (c *baseClientImpl) GetVersion() *semver.Version {
178187
return c.version
179188
}
@@ -384,19 +393,42 @@ func (c *baseClientImpl) createMultiSearchRequests(searchRequests []*SearchReque
384393
interval: searchReq.Interval,
385394
}
386395

396+
if c.flavor == Elasticsearch {
397+
if c.version.Major() < 5 {
398+
mr.header["search_type"] = "count"
399+
} else {
400+
allowedVersionRange, _ := semver.NewConstraint(">=5.6.0, <7.0.0")
401+
402+
if allowedVersionRange.Check(c.version) {
403+
maxConcurrentShardRequests := c.getSettings().Get("maxConcurrentShardRequests").MustInt(256)
404+
if maxConcurrentShardRequests == 0 {
405+
maxConcurrentShardRequests = 256
406+
}
407+
mr.header["max_concurrent_shard_requests"] = maxConcurrentShardRequests
408+
}
409+
}
410+
}
411+
387412
multiRequests = append(multiRequests, &mr)
388413
}
389414

390415
return multiRequests
391416
}
392417

393418
func (c *baseClientImpl) getMultiSearchQueryParameters() string {
394-
maxConcurrentShardRequests := c.getSettings().Get("maxConcurrentShardRequests").MustInt(5)
395-
return fmt.Sprintf("max_concurrent_shard_requests=%d", maxConcurrentShardRequests)
419+
if c.version.Major() >= 7 || c.flavor == OpenSearch {
420+
maxConcurrentShardRequests := c.getSettings().Get("maxConcurrentShardRequests").MustInt(5)
421+
if maxConcurrentShardRequests == 0 {
422+
maxConcurrentShardRequests = 5
423+
}
424+
return fmt.Sprintf("max_concurrent_shard_requests=%d", maxConcurrentShardRequests)
425+
}
426+
427+
return ""
396428
}
397429

398430
func (c *baseClientImpl) MultiSearch() *MultiSearchRequestBuilder {
399-
return NewMultiSearchRequestBuilder(c.GetVersion())
431+
return NewMultiSearchRequestBuilder(c.GetFlavor(), c.GetVersion())
400432
}
401433

402434
func (c *baseClientImpl) EnableDebug() {

pkg/opensearch/client/models.go

+7
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,13 @@ import (
99
"github.com/grafana/opensearch-datasource/pkg/tsdb"
1010
)
1111

12+
type Flavor string
13+
14+
const (
15+
Elasticsearch Flavor = "elasticsearch"
16+
OpenSearch Flavor = "opensearch"
17+
)
18+
1219
type response struct {
1320
httpResponse *http.Response
1421
reqInfo *SearchRequestInfo

pkg/opensearch/client/search_request.go

+16-5
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
// SearchRequestBuilder represents a builder which can build a search request
1111
type SearchRequestBuilder struct {
12+
flavor Flavor
1213
version *semver.Version
1314
interval tsdb.Interval
1415
index string
@@ -20,8 +21,9 @@ type SearchRequestBuilder struct {
2021
}
2122

2223
// NewSearchRequestBuilder create a new search request builder
23-
func NewSearchRequestBuilder(version *semver.Version, interval tsdb.Interval) *SearchRequestBuilder {
24+
func NewSearchRequestBuilder(flavor Flavor, version *semver.Version, interval tsdb.Interval) *SearchRequestBuilder {
2425
builder := &SearchRequestBuilder{
26+
flavor: flavor,
2527
version: version,
2628
interval: interval,
2729
sort: make(map[string]interface{}),
@@ -88,7 +90,13 @@ func (b *SearchRequestBuilder) SortDesc(field, unmappedType string) *SearchReque
8890
// AddDocValueField adds a doc value field to the search request
8991
func (b *SearchRequestBuilder) AddDocValueField(field string) *SearchRequestBuilder {
9092
b.customProps["script_fields"] = make(map[string]interface{})
91-
b.customProps["docvalue_fields"] = []string{field}
93+
94+
if b.version.Major() < 5 && b.flavor == Elasticsearch {
95+
b.customProps["fielddata_fields"] = []string{field}
96+
b.customProps["fields"] = []string{"*", "_source"}
97+
} else {
98+
b.customProps["docvalue_fields"] = []string{field}
99+
}
92100

93101
return b
94102
}
@@ -110,20 +118,22 @@ func (b *SearchRequestBuilder) Agg() AggBuilder {
110118

111119
// MultiSearchRequestBuilder represents a builder which can build a multi search request
112120
type MultiSearchRequestBuilder struct {
121+
flavor Flavor
113122
version *semver.Version
114123
requestBuilders []*SearchRequestBuilder
115124
}
116125

117126
// NewMultiSearchRequestBuilder creates a new multi search request builder
118-
func NewMultiSearchRequestBuilder(version *semver.Version) *MultiSearchRequestBuilder {
127+
func NewMultiSearchRequestBuilder(flavor Flavor, version *semver.Version) *MultiSearchRequestBuilder {
119128
return &MultiSearchRequestBuilder{
129+
flavor: flavor,
120130
version: version,
121131
}
122132
}
123133

124134
// Search initiates and returns a new search request builder
125135
func (m *MultiSearchRequestBuilder) Search(interval tsdb.Interval) *SearchRequestBuilder {
126-
b := NewSearchRequestBuilder(m.version, interval)
136+
b := NewSearchRequestBuilder(m.flavor, m.version, interval)
127137
m.requestBuilders = append(m.requestBuilders, b)
128138
return b
129139
}
@@ -266,6 +276,7 @@ type AggBuilder interface {
266276
type aggBuilderImpl struct {
267277
AggBuilder
268278
aggDefs []*aggDef
279+
flavor Flavor
269280
version *semver.Version
270281
}
271282

@@ -358,7 +369,7 @@ func (b *aggBuilderImpl) Terms(key, field string, fn func(a *TermsAggregation, b
358369
fn(innerAgg, builder)
359370
}
360371

361-
if len(innerAgg.Order) > 0 {
372+
if (b.version.Major() >= 6 || b.flavor == OpenSearch) && len(innerAgg.Order) > 0 {
362373
if orderBy, exists := innerAgg.Order[termsOrderTerm]; exists {
363374
innerAgg.Order["_key"] = orderBy
364375
delete(innerAgg.Order, termsOrderTerm)

pkg/opensearch/client/search_request_test.go

+58-3
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ func TestSearchRequest(t *testing.T) {
1616
Convey("Test OpenSearch search request", t, func() {
1717
timeField := "@timestamp"
1818
// TODO: Check this
19-
Convey("Given new search request builder for es version 5", func() {
19+
Convey("Given new search request builder for es OpenSearch 1.0.0", func() {
2020
version, _ := semver.NewVersion("1.0.0")
21-
b := NewSearchRequestBuilder(version, tsdb.Interval{Value: 15 * time.Second, Text: "15s"})
21+
b := NewSearchRequestBuilder(OpenSearch, version, tsdb.Interval{Value: 15 * time.Second, Text: "15s"})
2222

2323
Convey("When building search request", func() {
2424
sr, err := b.Build()
@@ -391,14 +391,69 @@ func TestSearchRequest(t *testing.T) {
391391
})
392392
})
393393
})
394+
395+
Convey("Given new search request builder for Elasticsearch 2.0.0", func() {
396+
version, _ := semver.NewVersion("2.0.0")
397+
398+
b := NewSearchRequestBuilder(Elasticsearch, version, tsdb.Interval{Value: 15 * time.Second, Text: "15s"})
399+
400+
Convey("When adding doc value field", func() {
401+
b.AddDocValueField(timeField)
402+
403+
Convey("should set correct props", func() {
404+
fields, ok := b.customProps["fields"].([]string)
405+
So(ok, ShouldBeTrue)
406+
So(fields, ShouldHaveLength, 2)
407+
So(fields[0], ShouldEqual, "*")
408+
So(fields[1], ShouldEqual, "_source")
409+
410+
scriptFields, ok := b.customProps["script_fields"].(map[string]interface{})
411+
So(ok, ShouldBeTrue)
412+
So(scriptFields, ShouldHaveLength, 0)
413+
414+
fieldDataFields, ok := b.customProps["fielddata_fields"].([]string)
415+
So(ok, ShouldBeTrue)
416+
So(fieldDataFields, ShouldHaveLength, 1)
417+
So(fieldDataFields[0], ShouldEqual, timeField)
418+
})
419+
420+
Convey("When building search request", func() {
421+
sr, err := b.Build()
422+
So(err, ShouldBeNil)
423+
424+
Convey("When marshal to JSON should generate correct json", func() {
425+
body, err := json.Marshal(sr)
426+
So(err, ShouldBeNil)
427+
json, err := simplejson.NewJson(body)
428+
So(err, ShouldBeNil)
429+
430+
scriptFields, err := json.Get("script_fields").Map()
431+
So(err, ShouldBeNil)
432+
So(scriptFields, ShouldHaveLength, 0)
433+
434+
fields, err := json.Get("fields").StringArray()
435+
So(err, ShouldBeNil)
436+
So(fields, ShouldHaveLength, 2)
437+
So(fields[0], ShouldEqual, "*")
438+
So(fields[1], ShouldEqual, "_source")
439+
440+
fieldDataFields, err := json.Get("fielddata_fields").StringArray()
441+
So(err, ShouldBeNil)
442+
So(fieldDataFields, ShouldHaveLength, 1)
443+
So(fieldDataFields[0], ShouldEqual, timeField)
444+
})
445+
})
446+
})
447+
})
448+
394449
})
395450
}
396451

397452
func TestMultiSearchRequest(t *testing.T) {
398453
Convey("Test OpenSearch multi search request", t, func() {
399454
Convey("Given new multi search request builder", func() {
400455
version, _ := semver.NewVersion("1.0.0")
401-
b := NewMultiSearchRequestBuilder(version)
456+
b := NewMultiSearchRequestBuilder(OpenSearch, version)
402457

403458
Convey("When adding one search request", func() {
404459
b.Search(tsdb.Interval{Value: 15 * time.Second, Text: "15s"})

0 commit comments

Comments
 (0)