Skip to content

Commit

Permalink
Backend: Use int64 type instead of string for from/to date times (#191)
Browse files Browse the repository at this point in the history
  • Loading branch information
fridgepoet authored Jun 8, 2023
1 parent b854ea8 commit 1c2819a
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 26 deletions.
8 changes: 4 additions & 4 deletions pkg/opensearch/client/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ func (f *QueryStringFilter) MarshalJSON() ([]byte, error) {
type RangeFilter struct {
Filter
Key string
Gte string
Lte string
Gte int64
Lte int64
Format string
}

Expand Down Expand Up @@ -270,8 +270,8 @@ type TermsAggregation struct {

// ExtendedBounds represents extended bounds
type ExtendedBounds struct {
Min string `json:"min"`
Max string `json:"max"`
Min int64 `json:"min"`
Max int64 `json:"max"`
}

// GeoHashGridAggregation represents a geo hash grid aggregation
Expand Down
2 changes: 1 addition & 1 deletion pkg/opensearch/client/search_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (b *FilterQueryBuilder) Build() ([]Filter, error) {
}

// AddDateRangeFilter adds a new time range filter
func (b *FilterQueryBuilder) AddDateRangeFilter(timeField, lte, gte, format string) *FilterQueryBuilder {
func (b *FilterQueryBuilder) AddDateRangeFilter(timeField, format string, lte, gte int64) *FilterQueryBuilder {
b.filters = append(b.filters, &RangeFilter{
Key: timeField,
Lte: lte,
Expand Down
10 changes: 5 additions & 5 deletions pkg/opensearch/client/search_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestSearchRequest(t *testing.T) {
b.Size(200)
b.SortDesc(timeField, "boolean")
filters := b.Query().Bool().Filter()
filters.AddDateRangeFilter(timeField, "$timeTo", "$timeFrom", DateFormatEpochMS)
filters.AddDateRangeFilter(timeField, DateFormatEpochMS, 10, 5)
filters.AddQueryStringFilter("test", true)

Convey("When building search request", func() {
Expand All @@ -69,8 +69,8 @@ func TestSearchRequest(t *testing.T) {
Convey("Should have range filter", func() {
f, ok := sr.Query.Bool.Filters[0].(*RangeFilter)
So(ok, ShouldBeTrue)
So(f.Gte, ShouldEqual, "$timeFrom")
So(f.Lte, ShouldEqual, "$timeTo")
So(f.Gte, ShouldEqual, 5)
So(f.Lte, ShouldEqual, 10)
So(f.Format, ShouldEqual, "epoch_millis")
})

Expand All @@ -93,8 +93,8 @@ func TestSearchRequest(t *testing.T) {
So(sort.Get("unmapped_type").MustString(), ShouldEqual, "boolean")

timeRangeFilter := json.GetPath("query", "bool", "filter").GetIndex(0).Get("range").Get(timeField)
So(timeRangeFilter.Get("gte").MustString(""), ShouldEqual, "$timeFrom")
So(timeRangeFilter.Get("lte").MustString(""), ShouldEqual, "$timeTo")
So(timeRangeFilter.Get("gte").MustInt64(), ShouldEqual, 5)
So(timeRangeFilter.Get("lte").MustInt64(), ShouldEqual, 10)
So(timeRangeFilter.Get("format").MustString(""), ShouldEqual, DateFormatEpochMS)

queryStringFilter := json.GetPath("query", "bool", "filter").GetIndex(1).Get("query_string")
Expand Down
9 changes: 3 additions & 6 deletions pkg/opensearch/lucene_handler.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package opensearch

import (
"fmt"
"strconv"
"time"

Expand Down Expand Up @@ -32,8 +31,6 @@ var newLuceneHandler = func(client es.Client, req *backend.QueryDataRequest, int
func (h *luceneHandler) processQuery(q *Query) error {
fromMs := h.req.Queries[0].TimeRange.From.UnixNano() / int64(time.Millisecond)
toMs := h.req.Queries[0].TimeRange.To.UnixNano() / int64(time.Millisecond)
from := fmt.Sprintf("%d", fromMs)
to := fmt.Sprintf("%d", toMs)

minInterval, err := h.client.GetMinInterval(q.Interval)
if err != nil {
Expand All @@ -46,7 +43,7 @@ func (h *luceneHandler) processQuery(q *Query) error {
b := h.ms.Search(interval)
b.Size(0)
filters := b.Query().Bool().Filter()
filters.AddDateRangeFilter(h.client.GetTimeField(), to, from, es.DateFormatEpochMS)
filters.AddDateRangeFilter(h.client.GetTimeField(), es.DateFormatEpochMS, toMs, fromMs)

if q.RawQuery != "" {
filters.AddQueryStringFilter(q.RawQuery, true)
Expand All @@ -69,7 +66,7 @@ func (h *luceneHandler) processQuery(q *Query) error {
for _, bucketAgg := range q.BucketAggs {
switch bucketAgg.Type {
case dateHistType:
aggBuilder = addDateHistogramAgg(aggBuilder, bucketAgg, from, to)
aggBuilder = addDateHistogramAgg(aggBuilder, bucketAgg, fromMs, toMs)
case histogramType:
aggBuilder = addHistogramAgg(aggBuilder, bucketAgg)
case filtersType:
Expand Down Expand Up @@ -183,7 +180,7 @@ func (h *luceneHandler) executeQueries() (*backend.QueryDataResponse, error) {
return rp.getTimeSeries()
}

func addDateHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, timeFrom, timeTo string) es.AggBuilder {
func addDateHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, timeFrom, timeTo int64) es.AggBuilder {
aggBuilder.DateHistogram(bucketAgg.ID, bucketAgg.Field, func(a *es.DateHistogramAgg, b es.AggBuilder) {
a.Interval = bucketAgg.Settings.Get("interval").MustString("auto")
a.MinDocCount = bucketAgg.Settings.Get("min_doc_count").MustInt(0)
Expand Down
18 changes: 8 additions & 10 deletions pkg/opensearch/time_series_query_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package opensearch

import (
"fmt"
"testing"
"time"

Expand All @@ -16,9 +15,8 @@ import (
func TestExecuteTimeSeriesQuery(t *testing.T) {
from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC)
to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC)
fromStr := fmt.Sprintf("%d", from.UnixNano()/int64(time.Millisecond))
toStr := fmt.Sprintf("%d", to.UnixNano()/int64(time.Millisecond))

fromMs := from.UnixNano() / int64(time.Millisecond)
toMs := to.UnixNano() / int64(time.Millisecond)
Convey("Test execute time series query", t, func() {
Convey("With defaults on Elasticsearch 2.0.0", func() {
c := newFakeClient(es.Elasticsearch, "2.0.0")
Expand All @@ -31,14 +29,14 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
sr := c.multisearchRequests[0].Requests[0]
rangeFilter := sr.Query.Bool.Filters[0].(*es.RangeFilter)
So(rangeFilter.Key, ShouldEqual, c.timeField)
So(rangeFilter.Lte, ShouldEqual, toStr)
So(rangeFilter.Gte, ShouldEqual, fromStr)
So(rangeFilter.Lte, ShouldEqual, toMs)
So(rangeFilter.Gte, ShouldEqual, fromMs)
So(rangeFilter.Format, ShouldEqual, es.DateFormatEpochMS)
So(sr.Aggs[0].Key, ShouldEqual, "2")
dateHistogramAgg := sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg)
So(dateHistogramAgg.Field, ShouldEqual, "@timestamp")
So(dateHistogramAgg.ExtendedBounds.Min, ShouldEqual, fromStr)
So(dateHistogramAgg.ExtendedBounds.Max, ShouldEqual, toStr)
So(dateHistogramAgg.ExtendedBounds.Min, ShouldEqual, fromMs)
So(dateHistogramAgg.ExtendedBounds.Max, ShouldEqual, toMs)
})

Convey("With defaults on Elasticsearch 5.0.0", func() {
Expand All @@ -52,8 +50,8 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
sr := c.multisearchRequests[0].Requests[0]
So(sr.Query.Bool.Filters[0].(*es.RangeFilter).Key, ShouldEqual, c.timeField)
So(sr.Aggs[0].Key, ShouldEqual, "2")
So(sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg).ExtendedBounds.Min, ShouldEqual, fromStr)
So(sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg).ExtendedBounds.Max, ShouldEqual, toStr)
So(sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg).ExtendedBounds.Min, ShouldEqual, fromMs)
So(sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg).ExtendedBounds.Max, ShouldEqual, toMs)
})

Convey("With multiple bucket aggs", func() {
Expand Down

0 comments on commit 1c2819a

Please sign in to comment.