Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions document/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"reflect"

"github.com/blevesearch/bleve/v2/search"
"github.com/blevesearch/bleve/v2/size"
index "github.com/blevesearch/bleve_index_api"
)
Expand All @@ -30,8 +31,9 @@ func init() {
}

type Document struct {
id string `json:"id"`
Fields []Field `json:"fields"`
id string
Fields []Field `json:"fields"`
NestedDocuments []*Document `json:"nested_documents"`
CompositeFields []*CompositeField
StoredFieldsSize uint64
indexed bool
Expand Down Expand Up @@ -157,3 +159,35 @@ func (d *Document) SetIndexed() {
func (d *Document) Indexed() bool {
return d.indexed
}

// AddNestedDocument appends doc to this document's list of nested
// (child) documents. It does not recurse or copy; the caller retains
// ownership of doc.
func (d *Document) AddNestedDocument(doc *Document) {
	d.NestedDocuments = append(d.NestedDocuments, doc)
}

// NestedFields returns the set of field names that appear in this
// document's nested documents, at any depth. The root document's own
// fields are deliberately excluded. Returns nil when there are no
// nested documents.
func (d *Document) NestedFields() search.FieldSet {
	if len(d.NestedDocuments) == 0 {
		return nil
	}

	rv := search.NewFieldSet()
	var walk func(index.Document)
	walk = func(doc index.Document) {
		// record every field name present on this document
		doc.VisitFields(func(f index.Field) {
			rv.Add(f.Name())
		})
		// descend into this document's own nested documents, if any
		if nested, ok := doc.(index.NestedDocument); ok {
			nested.VisitNestedDocuments(walk)
		}
	}
	// start from the children only — the root itself is not visited
	d.VisitNestedDocuments(walk)
	return rv
}

// VisitNestedDocuments invokes visitor once for each immediate nested
// (child) document of d. It does not recurse into deeper levels.
func (d *Document) VisitNestedDocuments(visitor func(doc index.Document)) {
	for i := range d.NestedDocuments {
		visitor(d.NestedDocuments[i])
	}
}
5 changes: 5 additions & 0 deletions index/scorch/introducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
newss.deleted = nil
}

// update the deleted bitmap to include any nested/sub-documents as well
// if the segment supports that
if ns, ok := newss.segment.(segment.NestedSegment); ok {
newss.deleted = ns.AddNestedDocuments(newss.deleted)
}
// check for live size before copying
if newss.LiveSize() > 0 {
newSnapshot.segment = append(newSnapshot.segment, newss)
Expand Down
6 changes: 6 additions & 0 deletions index/scorch/scorch.go
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,12 @@ func analyze(d index.Document, fn customAnalyzerPluginInitFunc) {
}
}
})
if nd, ok := d.(index.NestedDocument); ok {
nd.VisitNestedDocuments(func(doc index.Document) {
doc.AddIDField()
analyze(doc, fn)
})
}
}

func (s *Scorch) AddEligibleForRemoval(epoch uint64) {
Expand Down
69 changes: 39 additions & 30 deletions index/scorch/snapshot_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package scorch
import (
"container/heap"
"context"
"encoding/binary"
"fmt"
"os"
"path/filepath"
Expand All @@ -42,9 +41,8 @@ type asynchSegmentResult struct {
dict segment.TermDictionary
dictItr segment.DictionaryIterator

cardinality int
index int
docs *roaring.Bitmap
index int
docs *roaring.Bitmap

thesItr segment.ThesaurusIterator

Expand All @@ -59,11 +57,11 @@ func init() {
var err error
lb1, err = lev.NewLevenshteinAutomatonBuilder(1, true)
if err != nil {
panic(fmt.Errorf("Levenshtein automaton ed1 builder err: %v", err))
panic(fmt.Errorf("levenshtein automaton ed1 builder err: %v", err))
}
lb2, err = lev.NewLevenshteinAutomatonBuilder(2, true)
if err != nil {
panic(fmt.Errorf("Levenshtein automaton ed2 builder err: %v", err))
panic(fmt.Errorf("levenshtein automaton ed2 builder err: %v", err))
}
}

Expand Down Expand Up @@ -471,7 +469,7 @@ func (is *IndexSnapshot) GetInternal(key []byte) ([]byte, error) {
// DocCount returns the number of live root (top-level) documents across
// all segments in this snapshot. Nested child documents are not counted
// (see SegmentSnapshot.CountRoot). The error return is always nil but is
// kept to satisfy the index reader interface.
func (is *IndexSnapshot) DocCount() (uint64, error) {
	var rv uint64
	// NOTE: the loop variable is named seg (not segment) so it does not
	// shadow the imported segment package.
	for _, seg := range is.segment {
		rv += seg.CountRoot()
	}
	return rv, nil
}
Expand All @@ -498,7 +496,7 @@ func (is *IndexSnapshot) Document(id string) (rv index.Document, err error) {
return nil, nil
}

docNum, err := docInternalToNumber(next.ID)
docNum, err := next.ID.Value()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -568,7 +566,7 @@ func (is *IndexSnapshot) segmentIndexAndLocalDocNumFromGlobal(docNum uint64) (in
}

func (is *IndexSnapshot) ExternalID(id index.IndexInternalID) (string, error) {
docNum, err := docInternalToNumber(id)
docNum, err := id.Value()
if err != nil {
return "", err
}
Expand All @@ -586,7 +584,7 @@ func (is *IndexSnapshot) ExternalID(id index.IndexInternalID) (string, error) {
}

func (is *IndexSnapshot) segmentIndexAndLocalDocNum(id index.IndexInternalID) (int, uint64, error) {
docNum, err := docInternalToNumber(id)
docNum, err := id.Value()
if err != nil {
return 0, 0, err
}
Expand Down Expand Up @@ -773,25 +771,6 @@ func (is *IndexSnapshot) recycleTermFieldReader(tfr *IndexSnapshotTermFieldReade
is.m2.Unlock()
}

// docNumberToBytes encodes the document number in as 8 big-endian bytes.
// If buf already has length 8 it is written in place; if it has capacity
// for 8 bytes it is resliced and reused; otherwise a fresh slice is
// allocated. The (possibly new) slice is returned.
func docNumberToBytes(buf []byte, in uint64) []byte {
	switch {
	case len(buf) == 8:
		// exactly the right size — reuse as-is
	case cap(buf) >= 8:
		buf = buf[:8]
	default:
		buf = make([]byte, 8)
	}
	binary.BigEndian.PutUint64(buf, in)
	return buf
}

// docInternalToNumber decodes an 8-byte big-endian IndexInternalID back
// into a document number. It returns an error for any other length.
func docInternalToNumber(in index.IndexInternalID) (uint64, error) {
	if len(in) == 8 {
		return binary.BigEndian.Uint64(in), nil
	}
	return 0, fmt.Errorf("wrong len for IndexInternalID: %q", in)
}

func (is *IndexSnapshot) documentVisitFieldTermsOnSegment(
segmentIndex int, localDocNum uint64, fields []string, cFields []string,
visitor index.DocValueVisitor, dvs segment.DocVisitState) (
Expand Down Expand Up @@ -894,7 +873,7 @@ func (dvr *DocValueReader) BytesRead() uint64 {
func (dvr *DocValueReader) VisitDocValues(id index.IndexInternalID,
visitor index.DocValueVisitor,
) (err error) {
docNum, err := docInternalToNumber(id)
docNum, err := id.Value()
if err != nil {
return err
}
Expand Down Expand Up @@ -1234,3 +1213,33 @@ func (is *IndexSnapshot) MergeUpdateFieldsInfo(updatedFields map[string]*index.U
}
}
}

// Ancestors returns the ancestral chain for a given document ID in the index.
// For nested documents, this method retrieves all parent documents in the
// hierarchy leading up to the specified document ID. The returned slice
// contains:
//   - [0]: The document itself (with global ID)
//   - [1:]: All ancestor documents in the hierarchy (with global IDs)
//
// The method handles the translation from segment-local document numbers to
// global index internal IDs by applying the appropriate segment offset.
func (is *IndexSnapshot) Ancestors(ID index.IndexInternalID) ([]index.IndexInternalID, error) {
	seg, ldoc, err := is.segmentIndexAndLocalDocNum(ID)
	if err != nil {
		return nil, err
	}

	ancestors := is.segment[seg].Ancestors(ldoc)

	// allocate space: +1 for the doc itself
	rv := make([]index.IndexInternalID, len(ancestors)+1)
	globalOffset := is.offsets[seg]

	// first element is the doc itself, converted to a global ID
	rv[0] = index.NewIndexInternalID(nil, ldoc+globalOffset)

	// then all ancestors, shifted by +1 and converted to global IDs
	for j, a := range ancestors {
		rv[j+1] = index.NewIndexInternalID(nil, a+globalOffset)
	}
	return rv, nil
}
5 changes: 2 additions & 3 deletions index/scorch/snapshot_index_doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package scorch

import (
"bytes"
"reflect"

"github.com/RoaringBitmap/roaring/v2"
Expand Down Expand Up @@ -49,7 +48,7 @@ func (i *IndexSnapshotDocIDReader) Next() (index.IndexInternalID, error) {
next := i.iterators[i.segmentOffset].Next()
// make segment number into global number by adding offset
globalOffset := i.snapshot.offsets[i.segmentOffset]
return docNumberToBytes(nil, uint64(next)+globalOffset), nil
return index.NewIndexInternalID(nil, uint64(next)+globalOffset), nil
}
return nil, nil
}
Expand All @@ -63,7 +62,7 @@ func (i *IndexSnapshotDocIDReader) Advance(ID index.IndexInternalID) (index.Inde
if next == nil {
return nil, nil
}
for bytes.Compare(next, ID) < 0 {
for next.Compare(ID) < 0 {
next, err = i.Next()
if err != nil {
return nil, err
Expand Down
9 changes: 4 additions & 5 deletions index/scorch/snapshot_index_tfr.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package scorch

import (
"bytes"
"context"
"fmt"
"reflect"
Expand Down Expand Up @@ -94,7 +93,7 @@ func (i *IndexSnapshotTermFieldReader) Next(preAlloced *index.TermFieldDoc) (*in
// make segment number into global number by adding offset
globalOffset := i.snapshot.offsets[i.segmentOffset]
nnum := next.Number()
rv.ID = docNumberToBytes(rv.ID, nnum+globalOffset)
rv.ID = index.NewIndexInternalID(rv.ID, nnum+globalOffset)
i.postingToTermFieldDoc(next, rv)

i.currID = rv.ID
Expand Down Expand Up @@ -146,7 +145,7 @@ func (i *IndexSnapshotTermFieldReader) postingToTermFieldDoc(next segment.Postin
func (i *IndexSnapshotTermFieldReader) Advance(ID index.IndexInternalID, preAlloced *index.TermFieldDoc) (*index.TermFieldDoc, error) {
// FIXME do something better
// for now, if we need to seek backwards, then restart from the beginning
if i.currPosting != nil && bytes.Compare(i.currID, ID) >= 0 {
if i.currPosting != nil && i.currID.Compare(ID) >= 0 {
// Check if the TFR is a special unadorned composite optimization.
// Such a TFR will NOT have a valid `term` or `field` set, making it
// impossible for the TFR to replace itself with a new one.
Expand All @@ -171,7 +170,7 @@ func (i *IndexSnapshotTermFieldReader) Advance(ID index.IndexInternalID, preAllo
}
}
}
num, err := docInternalToNumber(ID)
num, err := ID.Value()
if err != nil {
return nil, fmt.Errorf("error converting to doc number % x - %v", ID, err)
}
Expand All @@ -196,7 +195,7 @@ func (i *IndexSnapshotTermFieldReader) Advance(ID index.IndexInternalID, preAllo
if preAlloced == nil {
preAlloced = &index.TermFieldDoc{}
}
preAlloced.ID = docNumberToBytes(preAlloced.ID, next.Number()+
preAlloced.ID = index.NewIndexInternalID(preAlloced.ID, next.Number()+
i.snapshot.offsets[segIndex])
i.postingToTermFieldDoc(next, preAlloced)
i.currID = preAlloced.ID
Expand Down
9 changes: 4 additions & 5 deletions index/scorch/snapshot_index_vr.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package scorch

import (
"bytes"
"context"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -95,7 +94,7 @@ func (i *IndexSnapshotVectorReader) Next(preAlloced *index.VectorDoc) (
// make segment number into global number by adding offset
globalOffset := i.snapshot.offsets[i.segmentOffset]
nnum := next.Number()
rv.ID = docNumberToBytes(rv.ID, nnum+globalOffset)
rv.ID = index.NewIndexInternalID(rv.ID, nnum+globalOffset)
rv.Score = float64(next.Score())

i.currID = rv.ID
Expand All @@ -112,7 +111,7 @@ func (i *IndexSnapshotVectorReader) Next(preAlloced *index.VectorDoc) (
func (i *IndexSnapshotVectorReader) Advance(ID index.IndexInternalID,
preAlloced *index.VectorDoc) (*index.VectorDoc, error) {

if i.currPosting != nil && bytes.Compare(i.currID, ID) >= 0 {
if i.currPosting != nil && i.currID.Compare(ID) >= 0 {
i2, err := i.snapshot.VectorReader(i.ctx, i.vector, i.field, i.k,
i.searchParams, i.eligibleSelector)
if err != nil {
Expand All @@ -123,7 +122,7 @@ func (i *IndexSnapshotVectorReader) Advance(ID index.IndexInternalID,
*i = *(i2.(*IndexSnapshotVectorReader))
}

num, err := docInternalToNumber(ID)
num, err := ID.Value()
if err != nil {
return nil, fmt.Errorf("error converting to doc number % x - %v", ID, err)
}
Expand All @@ -148,7 +147,7 @@ func (i *IndexSnapshotVectorReader) Advance(ID index.IndexInternalID,
if preAlloced == nil {
preAlloced = &index.VectorDoc{}
}
preAlloced.ID = docNumberToBytes(preAlloced.ID, next.Number()+
preAlloced.ID = index.NewIndexInternalID(preAlloced.ID, next.Number()+
i.snapshot.offsets[segIndex])
i.currID = preAlloced.ID
i.currPosting = next
Expand Down
21 changes: 21 additions & 0 deletions index/scorch/snapshot_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,19 @@ func (s *SegmentSnapshot) Count() uint64 {
return rv
}

// CountRoot returns the number of live root documents in this segment
// snapshot. It differs from Count(), which includes nested child
// documents; when the underlying segment does not support nesting it
// falls back to Count().
func (s *SegmentSnapshot) CountRoot() uint64 {
	if nsb, ok := s.segment.(segment.NestedSegment); ok {
		return nsb.CountRoot(s.deleted)
	}
	return s.Count()
}

func (s *SegmentSnapshot) DocNumbers(docIDs []string) (*roaring.Bitmap, error) {
rv, err := s.segment.DocNumbers(docIDs)
if err != nil {
Expand Down Expand Up @@ -361,3 +374,11 @@ func (c *cachedMeta) fetchMeta(field string) (rv interface{}) {
c.m.RUnlock()
return rv
}

// Ancestors returns the segment-local document numbers of docID's parent
// documents, or nil when the underlying segment does not support nested
// documents.
func (s *SegmentSnapshot) Ancestors(docID uint64) []uint64 {
	if nsb, ok := s.segment.(segment.NestedSegment); ok {
		return nsb.Ancestors(docID)
	}
	return nil
}
Loading
Loading