Skip to content

Commit

Permalink
review processing query
Browse files Browse the repository at this point in the history
  • Loading branch information
fridgepoet committed Sep 13, 2023
1 parent 1821e94 commit eae3f00
Show file tree
Hide file tree
Showing 9 changed files with 435 additions and 43 deletions.
40 changes: 40 additions & 0 deletions pkg/opensearch/client/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type Query struct {
// BoolQuery represents a bool query
type BoolQuery struct {
Filters []Filter
Must []Must
}

// MarshalJSON returns the JSON encoding of the boolean query.
Expand All @@ -115,19 +116,38 @@ func (q *BoolQuery) MarshalJSON() ([]byte, error) {
root["filter"] = q.Filters
}
}
if len(q.Must) > 0 {
root["must"] = q.Must
}
return json.Marshal(root)
}

// Filter represents a search filter
type Filter interface{}

type Must interface{}

type MustTerm struct {
Term map[string]string
}

// QueryStringFilter represents a query string search filter
type QueryStringFilter struct {
Filter
Query string
AnalyzeWildcard bool
}

func (m MustTerm) MarshalJSON() ([]byte, error) {
root := map[string]interface{}{}
if m.Term != nil && m.Term["TraceId"] != "" {
root["term"] = map[string]string{
"traceId": m.Term["TraceId"],
}
}
return json.Marshal(root)
}

// MarshalJSON returns the JSON encoding of the query string filter.
func (f *QueryStringFilter) MarshalJSON() ([]byte, error) {
root := map[string]interface{}{
Expand All @@ -149,6 +169,13 @@ type RangeFilter struct {
Format string
}

type TraceRangeFilter struct {
StartTime struct {
Gte int64
Lte int64
}
}

// DateFormatEpochMS represents a date format of epoch milliseconds (epoch_millis)
const DateFormatEpochMS = "epoch_millis"

Expand All @@ -170,6 +197,19 @@ func (f *RangeFilter) MarshalJSON() ([]byte, error) {
return json.Marshal(root)
}

// MarshalJSON returns the JSON encoding of the query string filter.
func (f *TraceRangeFilter) MarshalJSON() ([]byte, error) {
root := map[string]map[string]map[string]interface{}{
"range": {
"startTime": {
"lte": f.StartTime.Lte,
"gte": f.StartTime.Gte,
},
},
}
return json.Marshal(root)
}

// Aggregation represents an aggregation
type Aggregation interface{}

Expand Down
51 changes: 50 additions & 1 deletion pkg/opensearch/client/search_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func (b *QueryBuilder) Bool() *BoolQueryBuilder {
// BoolQueryBuilder represents a bool query builder
type BoolQueryBuilder struct {
filterQueryBuilder *FilterQueryBuilder
mustQueryBuilder *MustQueryBuilder
}

// NewBoolQueryBuilder create a new bool query builder
Expand All @@ -218,6 +219,14 @@ func (b *BoolQueryBuilder) Filter() *FilterQueryBuilder {
return b.filterQueryBuilder
}

// Filter creates and return a must query builder
func (b *BoolQueryBuilder) Must() *MustQueryBuilder {
if b.mustQueryBuilder == nil {
b.mustQueryBuilder = NewMustQueryBuilder()
}
return b.mustQueryBuilder
}

// Build builds and return a bool query builder
func (b *BoolQueryBuilder) Build() (*BoolQuery, error) {
boolQuery := BoolQuery{}
Expand All @@ -230,26 +239,47 @@ func (b *BoolQueryBuilder) Build() (*BoolQuery, error) {
boolQuery.Filters = filters
}

if b.mustQueryBuilder != nil {
must, err := b.mustQueryBuilder.Build()
if err != nil {
return nil, err
}
boolQuery.Must = must
}

return &boolQuery, nil
}

// FilterQueryBuilder represents a filter query builder
type FilterQueryBuilder struct {
filters []Filter
}

type MustQueryBuilder struct {
must []Must
}
// NewFilterQueryBuilder creates a new filter query builder
func NewFilterQueryBuilder() *FilterQueryBuilder {
return &FilterQueryBuilder{
filters: make([]Filter, 0),
}
}
// NewMustQueryBuilder creates a new must bool query builder
func NewMustQueryBuilder() *MustQueryBuilder {
return &MustQueryBuilder{
must: make([]Must, 0),
}
}

// Build builds and return a filter query builder
func (b *FilterQueryBuilder) Build() ([]Filter, error) {
return b.filters, nil
}

// Build builds and return a must query builder
func (b *MustQueryBuilder) Build() ([]Must, error) {
return b.must, nil
}

// AddDateRangeFilter adds a new time range filter
func (b *FilterQueryBuilder) AddDateRangeFilter(timeField, format string, lte, gte int64) *FilterQueryBuilder {
b.filters = append(b.filters, &RangeFilter{
Expand All @@ -261,6 +291,25 @@ func (b *FilterQueryBuilder) AddDateRangeFilter(timeField, format string, lte, g
return b
}

func (b *MustQueryBuilder) AddMustFilter(field string, matchTo string) *MustQueryBuilder{
b.must = append(b.must, MustTerm{
Term: map[string]string{
field: matchTo,
},
})
return b
}

func (b *MustQueryBuilder) AddStartTimeFilter(gte, lte int64) *MustQueryBuilder {
b.must = append(b.must, &TraceRangeFilter{
StartTime: struct {
Gte int64
Lte int64
}{Lte: lte, Gte: gte},
})
return b
}

// AddQueryStringFilter adds a new query string filter
func (b *FilterQueryBuilder) AddQueryStringFilter(querystring string, analyzeWildcard bool) *FilterQueryBuilder {
if len(strings.TrimSpace(querystring)) == 0 {
Expand Down
83 changes: 65 additions & 18 deletions pkg/opensearch/lucene_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package opensearch

import (
"fmt"
"regexp"
"strconv"
"time"

Expand Down Expand Up @@ -44,31 +45,73 @@ func (h *luceneHandler) processQuery(q *Query) error {

b := h.ms.Search(interval)
b.Size(0)
filters := b.Query().Bool().Filter()
defaultTimeField := h.client.GetConfiguredFields().TimeField
filters.AddDateRangeFilter(defaultTimeField, es.DateFormatEpochMS, toMs, fromMs)

if q.RawQuery != "" {
filters.AddQueryStringFilter(q.RawQuery, true)
}
// TODO: merge into a switch condition and write processTracesQuery function
if q.LuceneQueryType == "Traces" {
traceId, err := getTraceId(q.RawQuery)
if err != nil {
return err
}
if traceId != "" {
// TODO: needs to be greater than 0
b.Size(1000)
processTraceSpansQuery(b, traceId, fromMs, toMs)
}
processTraceListQuery(q, b, fromMs, toMs)
} else {
filters := b.Query().Bool().Filter()
defaultTimeField := h.client.GetConfiguredFields().TimeField
filters.AddDateRangeFilter(defaultTimeField, es.DateFormatEpochMS, toMs, fromMs)

// I don't think we support any kind of additional filtering with traces apart from traceId?
if q.RawQuery != "" {
filters.AddQueryStringFilter(q.RawQuery, true)
}

if len(q.BucketAggs) == 0 {
// If no aggregations, only document and logs queries are valid
if q.LuceneQueryType == "traces" && (len(q.Metrics) == 0 || !(q.Metrics[0].Type == rawDataType || q.Metrics[0].Type == rawDocumentType)) {
return fmt.Errorf("invalid query, missing metrics and aggregations")
}
}

if len(q.BucketAggs) == 0 {
// If no aggregations, only document and logs queries are valid
if len(q.Metrics) == 0 || !(q.Metrics[0].Type == rawDataType || q.Metrics[0].Type == rawDocumentType) {
return fmt.Errorf("invalid query, missing metrics and aggregations")
switch q.Metrics[0].Type {
case rawDocumentType, rawDataType:
processDocumentQuery(q, b, defaultTimeField)
case logsType:
processLogsQuery(q, b, fromMs, toMs, defaultTimeField)
default:
processTimeSeriesQuery(q, b, fromMs, toMs, defaultTimeField)
}

}
return nil
}

switch q.Metrics[0].Type {
case rawDocumentType, rawDataType:
processDocumentQuery(q, b, defaultTimeField)
case logsType:
processLogsQuery(q, b, fromMs, toMs, defaultTimeField)
default:
processTimeSeriesQuery(q, b, fromMs, toMs, defaultTimeField)
func processTraceSpansQuery(b *es.SearchRequestBuilder, traceId string, fromMs int64, toMs int64) {
must := b.Query().Bool().Must()
must.AddMustFilter("TraceId", traceId)
must.AddStartTimeFilter(fromMs, toMs)
}

func processTraceListQuery(q *Query, b *es.SearchRequestBuilder, from, to int64) {
// TODO: implement me
}

// TODO: You can compile this one and chuck in a global variable, let's think about if we should check for the error and how/where
// TODO: support a space after the traceID
var traceIdRE = regexp.MustCompile(`traceId:(.+)`)

func getTraceId(rawQuery string) (string, error) {
matches := traceIdRE.FindStringSubmatch(rawQuery)

// TODO: what kind of flexibility can we have regarding typos?
// TODO: if we do indeed want an error, how do we make it appear to the user in a nice way? (wrapError)
if len(matches) != 2 {
return "", fmt.Errorf("trace ID not found in the input string")
}

return nil
return matches[1], nil
}

func processLogsQuery(q *Query, b *es.SearchRequestBuilder, from, to int64, defaultTimeField string) {
Expand Down Expand Up @@ -219,6 +262,10 @@ func processTimeSeriesQuery(q *Query, b *es.SearchRequestBuilder, fromMs int64,
}
}

func processTraces(q *Query, b *es.SearchRequestBuilder) {

}

func getPipelineAggField(m *MetricAgg) string {
// From https://github.com/grafana/grafana/pull/60337
// In frontend we are using Field as pipelineAggField
Expand Down
1 change: 1 addition & 0 deletions pkg/opensearch/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Query struct {
Alias string `json:"alias"`
Interval string
RefID string
LuceneQueryType string `json:"luceneQueryType"`
}

// queryHandler is an interface for handling queries of the same type
Expand Down
Loading

0 comments on commit eae3f00

Please sign in to comment.