Skip to content

Commit

Permalink
Merge pull request #150 from rogerlucena/filter-latest-planner-memory
Browse files Browse the repository at this point in the history
FILTER keyword for latest anchor queries (planner/memory)
  • Loading branch information
thiagovas authored Oct 16, 2020
2 parents b515c3e + ea887b7 commit 85e2195
Show file tree
Hide file tree
Showing 12 changed files with 1,368 additions and 234 deletions.
7 changes: 4 additions & 3 deletions bql/planner/data_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ var _ error = (*skippableError)(nil)
// provided graph clause.
func updateTimeBounds(lo *storage.LookupOptions, cls *semantic.GraphClause) *storage.LookupOptions {
nlo := &storage.LookupOptions{
MaxElements: lo.MaxElements,
LowerAnchor: lo.LowerAnchor,
UpperAnchor: lo.UpperAnchor,
MaxElements: lo.MaxElements,
LowerAnchor: lo.LowerAnchor,
UpperAnchor: lo.UpperAnchor,
FilterOptions: lo.FilterOptions,
}
if cls.PLowerBound != nil {
if lo.LowerAnchor == nil || (lo.LowerAnchor != nil && cls.PLowerBound.After(*lo.LowerAnchor)) {
Expand Down
88 changes: 88 additions & 0 deletions bql/planner/filter/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2020 Google Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package filter isolates core FILTER related implementation.
package filter

import (
"fmt"
)

// Operation represents a filter operation supported in BadWolf.
type Operation int

// List of supported filter operations.
const (
Latest Operation = iota + 1
)

// Field represents the position of the semantic.GraphClause that will be operated by the filter at storage level.
type Field int

// List of filter fields.
const (
SubjectField Field = iota + 1
PredicateField
ObjectField
)

// SupportedOperations maps suported filter operation strings to their correspondant Operation.
// Note that the string keys here must be in lowercase letters only (for compatibility with the WhereFilterClauseHook).
var SupportedOperations = map[string]Operation{
"latest": Latest,
}

// StorageOptions represent the storage level specifications for the filtering to be executed.
// Operation below refers to the filter function being applied (eg: Latest), Field refers to the position of the graph clause it
// will be applied to (subject, predicate, or object) and Value, when specified, contains the second argument of the filter
// function (not applicable for all Operations - some like Latest do not use it while others like GreaterThan do, see Issue 129).
type StorageOptions struct {
Operation Operation
Field Field
Value string
}

// String returns the string representation of Operation.
func (op Operation) String() string {
switch op {
case Latest:
return "latest"
default:
return fmt.Sprintf(`not defined filter operation "%d"`, op)
}
}

// IsEmpty returns true if the Operation was not set yet.
func (op Operation) IsEmpty() bool {
return op == Operation(0)
}

// String returns the string representation of Field.
func (f Field) String() string {
switch f {
case SubjectField:
return "subject field"
case PredicateField:
return "predicate field"
case ObjectField:
return "object field"
default:
return fmt.Sprintf(`not defined filter field "%d"`, f)
}
}

// String returns the string representation of StorageOptions.
func (so *StorageOptions) String() string {
return fmt.Sprintf("%+v", *so)
}
121 changes: 111 additions & 10 deletions bql/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"sync"

"github.com/google/badwolf/bql/lexer"
"github.com/google/badwolf/bql/planner/filter"
"github.com/google/badwolf/bql/planner/tracer"
"github.com/google/badwolf/bql/semantic"
"github.com/google/badwolf/bql/table"
Expand Down Expand Up @@ -266,7 +267,8 @@ type queryPlan struct {
bndgs []string
grfsNames []string
grfs []storage.Graph
cls []*semantic.GraphClause
clauses []*semantic.GraphClause
filters []*semantic.FilterClause
tbl *table.Table
chanSize int
tracer io.Writer
Expand All @@ -292,7 +294,8 @@ func newQueryPlan(ctx context.Context, store storage.Store, stm *semantic.Statem
store: store,
bndgs: bs,
grfsNames: stm.InputGraphNames(),
cls: stm.GraphPatternClauses(),
clauses: stm.GraphPatternClauses(),
filters: stm.FilterClauses(),
tbl: t,
chanSize: chanSize,
tracer: w,
Expand Down Expand Up @@ -639,28 +642,119 @@ func (p *queryPlan) filterOnExistence(ctx context.Context, cls *semantic.GraphCl
return grp.Wait()
}

// organizeClausesByBinding takes the graph clauses received as input and organize them in a map
// on which the keys are the bindings of these clauses.
func organizeClausesByBinding(clauses []*semantic.GraphClause) map[string][]*semantic.GraphClause {
clausesByBinding := map[string][]*semantic.GraphClause{}
for _, cls := range clauses {
for b := range cls.BindingsMap() {
clausesByBinding[b] = append(clausesByBinding[b], cls)
}
}

return clausesByBinding
}

// compatibleBindingsInClauseForFilterOperation returns a function that, for each given clause, returns the bindings that are
// compatible with the specified filter operation.
func compatibleBindingsInClauseForFilterOperation(operation filter.Operation) (compatibleBindingsInClause func(cls *semantic.GraphClause) (bindingsByField map[filter.Field]map[string]bool), err error) {
switch operation {
case filter.Latest:
compatibleBindingsInClause = func(cls *semantic.GraphClause) (bindingsByField map[filter.Field]map[string]bool) {
bindingsByField = map[filter.Field]map[string]bool{
filter.PredicateField: {cls.PBinding: true, cls.PAlias: true},
filter.ObjectField: {cls.OBinding: true, cls.OAlias: true},
}
return bindingsByField
}
return compatibleBindingsInClause, nil
default:
return nil, fmt.Errorf("filter function %q has no bindings in clause specified for it (planner level)", operation)
}
}

// organizeFilterOptionsByClause processes all the given filters and organize them in a map that has as keys the
// clauses to which they must be applied.
func organizeFilterOptionsByClause(filters []*semantic.FilterClause, clauses []*semantic.GraphClause) (map[*semantic.GraphClause]*filter.StorageOptions, error) {
clausesByBinding := organizeClausesByBinding(clauses)
filterOptionsByClause := map[*semantic.GraphClause]*filter.StorageOptions{}

for _, f := range filters {
if _, ok := clausesByBinding[f.Binding]; !ok {
return nil, fmt.Errorf("binding %q referenced by filter clause %q does not exist in the graph pattern", f.Binding, f)
}

compatibleBindingsInClause, err := compatibleBindingsInClauseForFilterOperation(f.Operation)
if err != nil {
return nil, err
}
for _, cls := range clausesByBinding[f.Binding] {
if _, ok := filterOptionsByClause[cls]; ok {
return nil, fmt.Errorf("multiple filters for the same graph clause or same binding are not supported at the moment")
}

compatibleBindingsByField := compatibleBindingsInClause(cls)
filterBindingIsCompatible := false
for field, bndgs := range compatibleBindingsByField {
if bndgs[f.Binding] {
filterBindingIsCompatible = true
filterOptionsByClause[cls] = &filter.StorageOptions{
Operation: f.Operation,
Field: field,
Value: f.Value,
}
break
}
}
if !filterBindingIsCompatible {
return nil, fmt.Errorf("binding %q occupies a position in graph clause %q that is incompatible with filter function %q", f.Binding, cls, f.Operation)
}
}
}

return filterOptionsByClause, nil
}

// addFilterOptions adds FilterOptions to lookup options if the given clause has bindings for which
// filters were defined (organized in filterOptionsByClause).
func addFilterOptions(lo *storage.LookupOptions, cls *semantic.GraphClause, filterOptionsByClause map[*semantic.GraphClause]*filter.StorageOptions) {
if _, ok := filterOptionsByClause[cls]; ok {
lo.FilterOptions = filterOptionsByClause[cls]
}
}

// resetFilterOptions resets FilterOptions in lookup options to nil.
func resetFilterOptions(lo *storage.LookupOptions) {
lo.FilterOptions = (*filter.StorageOptions)(nil)
}

// processGraphPattern process the query graph pattern to retrieve the
// data from the specified graphs.
func (p *queryPlan) processGraphPattern(ctx context.Context, lo *storage.LookupOptions) error {
tracer.Trace(p.tracer, func() *tracer.Arguments {
var res []string
for i, cls := range p.cls {
for i, cls := range p.clauses {
res = append(res, fmt.Sprintf("Clause %d to process: %v", i, cls))
}
return &tracer.Arguments{
Msgs: res,
}
})
for i, c := range p.cls {
i, cls := i, *c

filterOptionsByClause, err := organizeFilterOptionsByClause(p.filters, p.clauses)
if err != nil {
return err
}
for i, cls := range p.clauses {
tracer.Trace(p.tracer, func() *tracer.Arguments {
return &tracer.Arguments{
Msgs: []string{fmt.Sprintf("Processing clause %d: %v", i, &cls)},
Msgs: []string{fmt.Sprintf("Processing clause %d: %v", i, cls)},
}
})
// The current planner is based on naively executing clauses by
// specificity.
unresolvable, err := p.processClause(ctx, &cls, lo)

addFilterOptions(lo, cls, filterOptionsByClause)
unresolvable, err := p.processClause(ctx, cls, lo)
resetFilterOptions(lo)
if err != nil {
return err
}
Expand All @@ -669,6 +763,7 @@ func (p *queryPlan) processGraphPattern(ctx context.Context, lo *storage.LookupO
return nil
}
}

return nil
}

Expand Down Expand Up @@ -876,11 +971,17 @@ func (p *queryPlan) String(ctx context.Context) string {
b.WriteString("using store(\"")
b.WriteString(p.store.Name(nil))
b.WriteString(fmt.Sprintf("\") graphs %v\nresolve\n", p.grfsNames))
for _, c := range p.cls {
for _, c := range p.clauses {
b.WriteString("\t")
b.WriteString(c.String())
b.WriteString("\n")
}
b.WriteString("with filters\n")
for _, f := range p.filters {
b.WriteString("\t")
b.WriteString(f.String())
b.WriteString("\n")
}
b.WriteString("project results using\n")
for _, p := range p.stm.Projection() {
b.WriteString("\t")
Expand Down
Loading

0 comments on commit 85e2195

Please sign in to comment.