Skip to content

Commit

Permalink
Fix problem with partial predicate UUIDs in the volatile driver and m…
Browse files Browse the repository at this point in the history
…ake FILTER for latest return multiple triples if they share the same predicate and same latest anchor
  • Loading branch information
rogerlucena committed Sep 21, 2020
1 parent c9d5820 commit 229f88c
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 26 deletions.
46 changes: 29 additions & 17 deletions storage/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,14 +275,18 @@ func (c *checker) CheckAndUpdate(p *predicate.Predicate) bool {
}

// latestFilter executes the latest filter operation over memoryTriples following filterOptions.
func latestFilter(memoryTriples map[string]*triple.Triple, filterOptions *storage.FilteringOptions) (map[string]*triple.Triple, error) {
func latestFilter(memoryTriples map[string]*triple.Triple, pQuery *predicate.Predicate, filterOptions *storage.FilteringOptions) (map[string]*triple.Triple, error) {
if filterOptions.Field != "predicate" && filterOptions.Field != "object" {
return nil, fmt.Errorf(`invalid field %q for "latest" filter operation, can accept only "predicate" or "object"`, filterOptions.Field)
}

lastTA := make(map[string]*time.Time)
trps := make(map[string]*triple.Triple)
trps := make(map[string]map[string]*triple.Triple)
for _, t := range memoryTriples {
if pQuery != nil && pQuery.String() != t.Predicate().String() {
continue
}

var p *predicate.Predicate
if filterOptions.Field == "predicate" {
p = t.Predicate()
Expand All @@ -301,19 +305,27 @@ func latestFilter(memoryTriples map[string]*triple.Triple, filterOptions *storag
return nil, err
}
if lta := lastTA[ppUUID]; lta == nil || ta.Sub(*lta) > 0 {
trps[ppUUID] = t
trps[ppUUID] = map[string]*triple.Triple{t.UUID().String(): t}
lastTA[ppUUID] = ta
} else if ta.Sub(*lta) == 0 {
trps[ppUUID][t.UUID().String()] = t
}
}

return trps, nil
trpsByUUID := make(map[string]*triple.Triple)
for _, m := range trps {
for tUUID, t := range m {
trpsByUUID[tUUID] = t
}
}
return trpsByUUID, nil
}

// executeFilter executes the proper filter operation over memoryTriples following the specifications given in filterOptions.
func executeFilter(memoryTriples map[string]*triple.Triple, filterOptions *storage.FilteringOptions) (map[string]*triple.Triple, error) {
func executeFilter(memoryTriples map[string]*triple.Triple, pQuery *predicate.Predicate, filterOptions *storage.FilteringOptions) (map[string]*triple.Triple, error) {
switch filterOptions.Operation {
case "latest":
return latestFilter(memoryTriples, filterOptions)
return latestFilter(memoryTriples, pQuery, filterOptions)
default:
return nil, fmt.Errorf("filter operation %q not supported", filterOptions.Operation)
}
Expand Down Expand Up @@ -346,7 +358,7 @@ func (m *memory) Objects(ctx context.Context, s *node.Node, p *predicate.Predica
}()
}
if lo.FilterOptions != nil {
trps, err := executeFilter(m.idxSP[spIdx], lo.FilterOptions)
trps, err := executeFilter(m.idxSP[spIdx], p, lo.FilterOptions)
if err != nil {
return err
}
Expand Down Expand Up @@ -394,7 +406,7 @@ func (m *memory) Subjects(ctx context.Context, p *predicate.Predicate, o *triple
}()
}
if lo.FilterOptions != nil {
trps, err := executeFilter(m.idxPO[poIdx], lo.FilterOptions)
trps, err := executeFilter(m.idxPO[poIdx], p, lo.FilterOptions)
if err != nil {
return err
}
Expand Down Expand Up @@ -442,7 +454,7 @@ func (m *memory) PredicatesForSubjectAndObject(ctx context.Context, s *node.Node
}()
}
if lo.FilterOptions != nil {
trps, err := executeFilter(m.idxSO[soIdx], lo.FilterOptions)
trps, err := executeFilter(m.idxSO[soIdx], nil, lo.FilterOptions)
if err != nil {
return err
}
Expand Down Expand Up @@ -488,7 +500,7 @@ func (m *memory) PredicatesForSubject(ctx context.Context, s *node.Node, lo *sto
}()
}
if lo.FilterOptions != nil {
trps, err := executeFilter(m.idxS[sUUID], lo.FilterOptions)
trps, err := executeFilter(m.idxS[sUUID], nil, lo.FilterOptions)
if err != nil {
return err
}
Expand Down Expand Up @@ -534,7 +546,7 @@ func (m *memory) PredicatesForObject(ctx context.Context, o *triple.Object, lo *
}()
}
if lo.FilterOptions != nil {
trps, err := executeFilter(m.idxO[oUUID], lo.FilterOptions)
trps, err := executeFilter(m.idxO[oUUID], nil, lo.FilterOptions)
if err != nil {
return err
}
Expand Down Expand Up @@ -580,7 +592,7 @@ func (m *memory) TriplesForSubject(ctx context.Context, s *node.Node, lo *storag
}()
}
if lo.FilterOptions != nil {
trps, err := executeFilter(m.idxS[sUUID], lo.FilterOptions)
trps, err := executeFilter(m.idxS[sUUID], nil, lo.FilterOptions)
if err != nil {
return err
}
Expand Down Expand Up @@ -626,7 +638,7 @@ func (m *memory) TriplesForPredicate(ctx context.Context, p *predicate.Predicate
}()
}
if lo.FilterOptions != nil {
trps, err := executeFilter(m.idxP[pUUID], lo.FilterOptions)
trps, err := executeFilter(m.idxP[pUUID], p, lo.FilterOptions)
if err != nil {
return err
}
Expand Down Expand Up @@ -672,7 +684,7 @@ func (m *memory) TriplesForObject(ctx context.Context, o *triple.Object, lo *sto
}()
}
if lo.FilterOptions != nil {
trps, err := executeFilter(m.idxO[oUUID], lo.FilterOptions)
trps, err := executeFilter(m.idxO[oUUID], nil, lo.FilterOptions)
if err != nil {
return err
}
Expand Down Expand Up @@ -720,7 +732,7 @@ func (m *memory) TriplesForSubjectAndPredicate(ctx context.Context, s *node.Node
}()
}
if lo.FilterOptions != nil {
trps, err := executeFilter(m.idxSP[spIdx], lo.FilterOptions)
trps, err := executeFilter(m.idxSP[spIdx], p, lo.FilterOptions)
if err != nil {
return err
}
Expand Down Expand Up @@ -768,7 +780,7 @@ func (m *memory) TriplesForPredicateAndObject(ctx context.Context, p *predicate.
}()
}
if lo.FilterOptions != nil {
trps, err := executeFilter(m.idxPO[poIdx], lo.FilterOptions)
trps, err := executeFilter(m.idxPO[poIdx], p, lo.FilterOptions)
if err != nil {
return err
}
Expand Down Expand Up @@ -822,7 +834,7 @@ func (m *memory) Triples(ctx context.Context, lo *storage.LookupOptions, trpls c
}()
}
if lo.FilterOptions != nil {
trps, err := executeFilter(m.idx, lo.FilterOptions)
trps, err := executeFilter(m.idx, nil, lo.FilterOptions)
if err != nil {
return err
}
Expand Down
18 changes: 9 additions & 9 deletions storage/memory/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,12 +593,12 @@ func TestTriplesForPredicateLatestAnchor(t *testing.T) {
cnt := 0
for rts := range trpls {
cnt++
if !reflect.DeepEqual(rts.Predicate().UUID(), ts[len(ts)-1].Predicate().UUID()) {
t.Errorf("g.PredicatesForObject(%s) failed to return a valid predicate; returned %s instead", ts[0].Object(), rts.Predicate())
if !reflect.DeepEqual(rts.Predicate().UUID(), ts[0].Predicate().UUID()) {
t.Errorf("g.TriplesForPredicate(%s) = %s for LatestAnchor; want %s", ts[0].Predicate(), rts.Predicate(), ts[0].Predicate())
}
}
if cnt != 1 {
t.Errorf("g.triplesForPredicate(%s) failed to retrieve 3 predicates, got %d instead", ts[0].Predicate(), cnt)
t.Errorf("g.triplesForPredicate(%s) retrieved %d predicates; want 1", ts[0].Predicate(), cnt)
}
}

Expand Down Expand Up @@ -731,12 +731,12 @@ func TestTriplesForSubjectAndPredicateLatestAnchor(t *testing.T) {
cnt := 0
for rts := range trpls {
cnt++
if !reflect.DeepEqual(rts.Predicate().UUID(), ts[len(ts)-1].Predicate().UUID()) {
t.Errorf("g.PredicatesForObject(%s) failed to return a valid predicate; returned %s instead", ts[0].Object(), rts.Predicate())
if !reflect.DeepEqual(rts.Predicate().UUID(), ts[0].Predicate().UUID()) {
t.Errorf("g.TriplesForSubjectAndPredicate(%s, %s) = %s for LatestAnchor; want %s", ts[0].Subject(), ts[0].Predicate(), rts.Predicate(), ts[0].Predicate())
}
}
if cnt != 1 {
t.Errorf("g.TriplesForSubjectAndPredicate(%s, %s) failed to retrieve 3 predicates, got %d instead", ts[0].Subject(), ts[0].Predicate(), cnt)
t.Errorf("g.TriplesForSubjectAndPredicate(%s, %s) retrieved %d predicates; want 1", ts[0].Subject(), ts[0].Predicate(), cnt)
}
}

Expand Down Expand Up @@ -779,12 +779,12 @@ func TestTriplesForPredicateAndObjectLatestAnchor(t *testing.T) {
cnt := 0
for rts := range trpls {
cnt++
if !reflect.DeepEqual(rts.Predicate().UUID(), ts[len(ts)-1].Predicate().UUID()) {
t.Errorf("g.PredicatesForObject(%s) failed to return a valid predicate; returned %s instead", ts[0].Object(), rts.Predicate())
if !reflect.DeepEqual(rts.Predicate().UUID(), ts[0].Predicate().UUID()) {
t.Errorf("g.TriplesForPredicateAndObject(%s, %s) = %s for LatestAnchor; want %s", ts[0].Predicate(), ts[0].Object(), rts.Predicate(), ts[0].Predicate())
}
}
if cnt != 1 {
t.Errorf("g.TriplesForPredicateAndObject(%s, %s) failed to retrieve 1 predicates, got %d instead", ts[0].Predicate(), ts[0].Object(), cnt)
t.Errorf("g.TriplesForPredicateAndObject(%s, %s) retrieved %d predicates; want 1", ts[0].Predicate(), ts[0].Object(), cnt)
}
}

Expand Down

0 comments on commit 229f88c

Please sign in to comment.