Skip to content
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, numDocs uint64
fieldFSTs: make(map[uint16]*vellum.FST),
vecIndexCache: newVectorIndexCache(),
synIndexCache: newSynonymIndexCache(),
nstIndexCache: newNestedIndexCache(),
// following fields gets populated by loadFieldsNew
fieldsMap: make(map[string]uint16),
dictLocs: make([]uint64, 0),
Expand Down
3 changes: 2 additions & 1 deletion dict.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ func (d *Dictionary) postingsListInit(rv *PostingsList, except *roaring.Bitmap)
rv.postings = postings
}
rv.sb = d.sb
rv.except = except
// add subdocs to except if needed
rv.except = rv.sb.AddSubDocs(except)
return rv
}

Expand Down
3 changes: 3 additions & 0 deletions faiss_vector_posting.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,9 @@ func (sb *SegmentBase) InterpretVectorIndex(field string, requiresFiltering bool
pos += n
}

// add sub-docs to except if needed
except = sb.AddSubDocs(except)

vecIndex, vecDocIDMap, docVecIDMap, vectorIDsToExclude, err =
sb.vecIndexCache.loadOrCreate(fieldIDPlus1, sb.mem[pos:], requiresFiltering,
except)
Expand Down
58 changes: 58 additions & 0 deletions merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ func mergeSegmentBases(segmentBases []*SegmentBase, drops []*roaring.Bitmap, pat
_ = os.Remove(path)
}

drops = addSubDocuments(segmentBases, drops)

// buffer the output
br := bufio.NewWriterSize(f, DefaultFileMergerBufferSize)

Expand Down Expand Up @@ -557,6 +559,54 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
}
}

// calculate new edge list if applicable
var newEdgeList map[uint64]uint64

for segI, segment := range segments {
// check for the closure in meantime
if isClosed(closeCh) {
return 0, nil, seg.ErrClosed
}
// get the edgeList for this segment
edgeList := segment.EdgeList()
// if no edgeList, nothing to do
if len(edgeList) == 0 {
continue
}
newSegDocNums := rv[segI]
for oldChild, oldParent := range edgeList {
newParent := newSegDocNums[oldParent]
newChild := newSegDocNums[oldChild]
if newParent != docDropped &&
newChild != docDropped {
if newEdgeList == nil {
newEdgeList = make(map[uint64]uint64)
}
newEdgeList[newChild] = newParent
}
}
}

// write out the new edge list
// first write out the number of entries
// which is also the number of valid subDocs
// in the merged segment
err := binary.Write(w, binary.BigEndian, uint64(len(newEdgeList)))
if err != nil {
return 0, nil, err
}
// now the new edge list
for child, parent := range newEdgeList {
err := binary.Write(w, binary.BigEndian, child)
if err != nil {
return 0, nil, err
}
err = binary.Write(w, binary.BigEndian, parent)
if err != nil {
return 0, nil, err
}
}

return storedIndexOffset, rv, nil
}

Expand Down Expand Up @@ -668,3 +718,11 @@ func isClosed(closeCh chan struct{}) bool {
return false
}
}

// addSubDocuments expands each segment's drop bitmap so that it also covers
// the sub-documents of every dropped document, returning the new bitmaps.
func addSubDocuments(sbs []*SegmentBase, drops []*roaring.Bitmap) []*roaring.Bitmap {
	expanded := make([]*roaring.Bitmap, len(drops))
	for i := range sbs {
		expanded[i] = sbs[i].AddSubDocs(drops[i])
	}
	return expanded
}
147 changes: 147 additions & 0 deletions nested_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package zap

import (
"encoding/binary"
"sync"
)

// newNestedIndexCache returns an empty nestedIndexCache; the underlying
// entry is built lazily on the first access via loadOrCreate.
func newNestedIndexCache() *nestedIndexCache {
	return &nestedIndexCache{}
}

// nestedIndexCache lazily parses and caches the nested-document edge list
// (child -> parent) serialized in a segment, along with the derived
// ancestry and descendants maps. Safe for concurrent use.
type nestedIndexCache struct {
	m sync.RWMutex

	// cache is nil until the first loadOrCreate call populates it.
	cache *nestedCacheEntry
}

// Clear clears the nested index cache, removing cached edge lists, ancestry, and descendants.
func (nc *nestedIndexCache) Clear() {
	nc.m.Lock()
	nc.cache = nil
	nc.m.Unlock()
}

// loadOrCreate returns the cached nestedCacheEntry, building it from the
// serialized edge list at edgeListOffset in mem on first use.
func (nc *nestedIndexCache) loadOrCreate(edgeListOffset uint64, mem []byte) *nestedCacheEntry {
	// fast path: entry already built
	nc.m.RLock()
	if nc.cache != nil {
		nc.m.RUnlock()
		return nc.cache
	}
	nc.m.RUnlock()

	nc.m.Lock()
	defer nc.m.Unlock()

	// double-check: another goroutine may have built it while we waited
	if nc.cache != nil {
		return nc.cache
	}

	return nc.createAndCacheLOCKED(edgeListOffset, mem)
}

// createAndCacheLOCKED creates and caches a nested cache entry (edge list,
// ancestry, descendants) for the given edge list offset and memory slice.
// The caller must hold nc.m for writing.
func (nc *nestedIndexCache) createAndCacheLOCKED(edgeListOffset uint64, mem []byte) *nestedCacheEntry {
	// pos stores the current read position
	pos := edgeListOffset
	// read the number of subDocs, which is also the number of edges
	numEdges := binary.BigEndian.Uint64(mem[pos : pos+8])
	pos += 8
	// if no edges (no subDocs), cache and return an empty entry
	if numEdges == 0 {
		nc.cache = &nestedCacheEntry{}
		return nc.cache
	}
	// decode the edge list as map[child]parent
	edgeList := make(map[uint64]uint64, numEdges)
	for i := uint64(0); i < numEdges; i++ {
		child := binary.BigEndian.Uint64(mem[pos : pos+8])
		pos += 8
		parent := binary.BigEndian.Uint64(mem[pos : pos+8])
		pos += 8
		edgeList[child] = parent
	}
	// build ancestry using DFS + memoization.
	// NOTE(review): assumes the edge list is acyclic (a forest), as produced
	// by the writers; a cycle would recurse without bound — confirm upstream.
	ancestry := make(map[uint64][]uint64, numEdges)
	var getAncestors func(uint64) []uint64
	getAncestors = func(node uint64) []uint64 {
		// memoized result, if already computed
		if val, ok := ancestry[node]; ok {
			return val
		}
		if parent, ok := edgeList[node]; ok {
			// node's ancestry is its parent followed by the parent's ancestry
			res := append([]uint64{parent}, getAncestors(parent)...)
			ancestry[node] = res
			return res
		}
		// root documents get no ancestry entry
		return nil
	}

	// every child in the edge list has a parent, so getAncestors memoizes
	// a non-empty chain for each of them — no need to re-store the result
	for child := range edgeList {
		getAncestors(child)
	}

	// invert ancestry to collect every descendant of each node
	descendants := make(map[uint64][]uint64, numEdges)
	for node, ancestors := range ancestry {
		for _, anc := range ancestors {
			descendants[anc] = append(descendants[anc], node)
		}
	}

	nc.cache = &nestedCacheEntry{
		edgeList:    edgeList,
		ancestry:    ancestry,
		descendants: descendants,
	}

	return nc.cache
}

// getAncestry returns docNum's ancestors, nearest (parent) first, or nil
// if docNum is not a nested document.
func (nc *nestedIndexCache) getAncestry(edgeListOffset uint64, mem []byte, docNum uint64) []uint64 {
	cache := nc.loadOrCreate(edgeListOffset, mem)
	if cache == nil || cache.ancestry == nil {
		return nil
	}
	return cache.ancestry[docNum]
}

// getDescendants returns every descendant (not only direct children) of
// docNum, or nil if docNum has none.
func (nc *nestedIndexCache) getDescendants(edgeListOffset uint64, mem []byte, docNum uint64) []uint64 {
	cache := nc.loadOrCreate(edgeListOffset, mem)
	if cache == nil || cache.descendants == nil {
		return nil
	}
	return cache.descendants[docNum]
}

// getEdgeList returns the child -> parent edge list of the segment, or nil
// if the segment has no nested documents.
func (nc *nestedIndexCache) getEdgeList(edgeListOffset uint64, mem []byte) map[uint64]uint64 {
	cache := nc.loadOrCreate(edgeListOffset, mem)
	if cache == nil || cache.edgeList == nil {
		return nil
	}
	return cache.edgeList
}

// getNumSubDocs returns the number of sub-documents in the segment, which
// equals the number of edges (each child appears exactly once).
func (nc *nestedIndexCache) getNumSubDocs(edgeListOffset uint64, mem []byte) uint64 {
	cache := nc.loadOrCreate(edgeListOffset, mem)
	if cache == nil {
		return 0
	}
	return uint64(len(cache.edgeList))
}

// nestedCacheEntry holds the parsed nested-document relationships of a segment.
type nestedCacheEntry struct {
	// edgeList[child] = parent
	edgeList map[uint64]uint64
	// ancestry[node] = ancestors of node, nearest first
	ancestry map[uint64][]uint64
	// descendants[node] = all descendants of node (not just direct children)
	descendants map[uint64][]uint64
}
74 changes: 72 additions & 2 deletions new.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
br.Grow(estimateAvgBytesPerDoc * estimateNumResults)
}

s.results = results
s.results, s.edgeList = flattenNestedDocuments(results)
s.chunkMode = chunkMode
s.w = NewCountHashWriter(&br)

Expand All @@ -72,7 +72,7 @@
}

sb, err := InitSegmentBase(br.Bytes(), s.w.Sum32(), chunkMode,
uint64(len(results)), storedIndexOffset, sectionsIndexOffset)
uint64(len(s.results)), storedIndexOffset, sectionsIndexOffset)

// get the bytes written before the interim's reset() call
// write it to the newly formed segment base.
Expand All @@ -94,6 +94,9 @@
type interim struct {
results []index.Document

// edge list for nested documents: child -> parent
edgeList map[int]uint64

chunkMode uint32

w *CountHashWriter
Expand Down Expand Up @@ -122,6 +125,7 @@

func (s *interim) reset() (err error) {
s.results = nil
s.edgeList = nil
s.chunkMode = 0
s.w = nil
for k := range s.FieldsMap {
Expand Down Expand Up @@ -408,6 +412,27 @@
}
}

// write the number of edges in the child -> parent edge list
// this will be zero if there are no nested documents
// and this number also reflects the number of sub-documents
// in the segment
err = binary.Write(s.w, binary.BigEndian, uint64(len(s.edgeList)))
if err != nil {
return 0, err
}
// write the child -> parent edge list
// child and parent are both flattened doc ids
for child, parent := range s.edgeList {
err = binary.Write(s.w, binary.BigEndian, uint64(child))
if err != nil {
return 0, err
}
err = binary.Write(s.w, binary.BigEndian, parent)
if err != nil {
return 0, err
}
}

return storedIndexOffset, nil
}

Expand Down Expand Up @@ -437,3 +462,48 @@
}
return n + 1
}

// flattenNestedDocuments returns a preorder list of the given documents and all their nested documents,
// along with a map mapping each flattened index to its parent index (excluding root docs entirely).
func flattenNestedDocuments(docs []index.Document) ([]index.Document, map[int]uint64) {
totalCount := 0
for _, doc := range docs {
totalCount += countNestedDocuments(doc)
}

flattened := make([]index.Document, 0, totalCount)
edgeMap := make(map[int]uint64, totalCount)

var traverse func(doc index.Document, hasParent bool, parentIdx int)
traverse = func(d index.Document, hasParent bool, parentIdx int) {
curIdx := len(flattened)
flattened = append(flattened, d)

if hasParent {
edgeMap[curIdx] = uint64(parentIdx)
}

if nestedDoc, ok := d.(index.NestedDocument); ok {

Check failure on line 486 in new.go

View workflow job for this annotation

GitHub Actions / test (1.23.x, ubuntu-latest)

undefined: index.NestedDocument

Check failure on line 486 in new.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, ubuntu-latest)

undefined: index.NestedDocument

Check failure on line 486 in new.go

View workflow job for this annotation

GitHub Actions / test (1.24.x, macos-latest)

undefined: index.NestedDocument

Check failure on line 486 in new.go

View workflow job for this annotation

GitHub Actions / test (1.24.x, ubuntu-latest)

undefined: index.NestedDocument

Check failure on line 486 in new.go

View workflow job for this annotation

GitHub Actions / test (1.23.x, macos-latest)

undefined: index.NestedDocument
nestedDoc.VisitNestedDocuments(func(child index.Document) {
traverse(child, true, curIdx)
})
}
}
// Top-level docs have no parent
for _, doc := range docs {
traverse(doc, false, 0)
}
return flattened, edgeMap
}

// countNestedDocuments returns the total number of docs in preorder,
// including the parent and all descendants.
func countNestedDocuments(doc index.Document) int {
count := 1 // include this doc
if nd, ok := doc.(index.NestedDocument); ok {

Check failure on line 503 in new.go

View workflow job for this annotation

GitHub Actions / test (1.23.x, ubuntu-latest)

undefined: index.NestedDocument

Check failure on line 503 in new.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, ubuntu-latest)

undefined: index.NestedDocument

Check failure on line 503 in new.go

View workflow job for this annotation

GitHub Actions / test (1.24.x, macos-latest)

undefined: index.NestedDocument

Check failure on line 503 in new.go

View workflow job for this annotation

GitHub Actions / test (1.24.x, ubuntu-latest)

undefined: index.NestedDocument

Check failure on line 503 in new.go

View workflow job for this annotation

GitHub Actions / test (1.23.x, macos-latest)

undefined: index.NestedDocument
nd.VisitNestedDocuments(func(child index.Document) {
count += countNestedDocuments(child)
})
}
return count
}
9 changes: 9 additions & 0 deletions read.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,12 @@ func (sb *SegmentBase) getDocStoredOffsets(docNum uint64) (

return indexOffset, storedOffset, n, metaLen, dataLen
}

// getEdgeListOffset returns the position of the serialized nested-document
// edge list within the segment's memory: the stored-doc offset table begins
// at sb.storedIndexOffset with one 8-byte entry per document, and the edge
// list follows that table.
// NOTE(review): the formula places the edge list AFTER the numDocs*8-byte
// offset table (not before it, and not right after the document content) —
// confirm against the writers in new.go/merge.go.
func (sb *SegmentBase) getEdgeListOffset() uint64 {
	return sb.storedIndexOffset + (8 * sb.numDocs)
}
1 change: 1 addition & 0 deletions section_inverted_text_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1058,6 +1058,7 @@ func (io *invertedIndexOpaque) Reset() (err error) {
atomic.StoreUint64(&io.bytesWritten, 0)
io.fieldsSame = false
io.numDocs = 0
io.fieldAddrs = map[int]int{}

return err
}
Expand Down
1 change: 1 addition & 0 deletions section_synonym_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func (so *synonymIndexOpaque) Reset() (err error) {
so.FieldIDtoThesaurusID = nil
so.SynonymTermToID = so.SynonymTermToID[:0]
so.SynonymIDtoTerm = so.SynonymIDtoTerm[:0]
so.thesaurusAddrs = map[int]int{}

so.tmp0 = so.tmp0[:0]
return err
Expand Down
Loading
Loading