diff --git a/build.go b/build.go
index 7843653..e8bb7cf 100644
--- a/build.go
+++ b/build.go
@@ -174,6 +174,7 @@ func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, numDocs uint64
 		fieldFSTs:     make(map[uint16]*vellum.FST),
 		vecIndexCache: newVectorIndexCache(),
 		synIndexCache: newSynonymIndexCache(),
+		nstIndexCache: newNestedIndexCache(),
 		// following fields gets populated by loadFieldsNew
 		fieldsMap: make(map[string]uint16),
 		dictLocs:  make([]uint64, 0),
diff --git a/go.mod b/go.mod
index dcf08a6..31960a5 100644
--- a/go.mod
+++ b/go.mod
@@ -5,10 +5,10 @@ go 1.23
 require (
 	github.com/RoaringBitmap/roaring/v2 v2.4.5
 	github.com/bits-and-blooms/bitset v1.22.0
-	github.com/blevesearch/bleve_index_api v1.2.9
+	github.com/blevesearch/bleve_index_api v1.2.11-0.20251014092825-c63531cff62e
 	github.com/blevesearch/go-faiss v1.0.25
 	github.com/blevesearch/mmap-go v1.0.4
-	github.com/blevesearch/scorch_segment_api/v2 v2.3.11
+	github.com/blevesearch/scorch_segment_api/v2 v2.3.13-0.20251014151641-79a52e6ea80f
 	github.com/blevesearch/vellum v1.1.0
 	github.com/golang/snappy v0.0.4
 	github.com/spf13/cobra v1.7.0
diff --git a/go.sum b/go.sum
index 0c80fd6..b14df0e 100644
--- a/go.sum
+++ b/go.sum
@@ -5,12 +5,16 @@ github.com/bits-and-blooms/bitset v1.22.0 h1:Tquv9S8+SGaS3EhyA+up3FXzmkhxPGjQQCk
 github.com/bits-and-blooms/bitset v1.22.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
 github.com/blevesearch/bleve_index_api v1.2.9 h1:WqD3kvYwnlYLv8sTdH+AF7n/L4v969Cek68+wZnYj4Q=
 github.com/blevesearch/bleve_index_api v1.2.9/go.mod h1:rKQDl4u51uwafZxFrPD1R7xFOwKnzZW7s/LSeK4lgo0=
+github.com/blevesearch/bleve_index_api v1.2.11-0.20251014092825-c63531cff62e h1:TJYWXKiQflZlgTtG1c11QUMXm50XSBdj2Ssdvznuxbs=
+github.com/blevesearch/bleve_index_api v1.2.11-0.20251014092825-c63531cff62e/go.mod h1:rKQDl4u51uwafZxFrPD1R7xFOwKnzZW7s/LSeK4lgo0=
 github.com/blevesearch/go-faiss v1.0.25 h1:lel1rkOUGbT1CJ0YgzKwC7k+XH0XVBHnCVWahdCXk4U=
 github.com/blevesearch/go-faiss v1.0.25/go.mod h1:OMGQwOaRRYxrmeNdMrXJPvVx8gBnvE5RYrr0BahNnkk=
 github.com/blevesearch/mmap-go v1.0.4 h1:OVhDhT5B/M1HNPpYPBKIEJaD0F3Si+CrEKULGCDPWmc=
 github.com/blevesearch/mmap-go v1.0.4/go.mod h1:EWmEAOmdAS9z/pi/+Toxu99DnsbhG1TIxUoRmJw/pSs=
 github.com/blevesearch/scorch_segment_api/v2 v2.3.11 h1:bYuEgsyGqgU/gy0/Vk6g1eCUqGBs2r+3bRCv+Cnq2kc=
 github.com/blevesearch/scorch_segment_api/v2 v2.3.11/go.mod h1:aAWoeQ3DdoZ3Z5138jXVSd1T/klGwvg11z0pSxrJSEk=
+github.com/blevesearch/scorch_segment_api/v2 v2.3.13-0.20251014151641-79a52e6ea80f h1:3BOY/h5E/Wjv3Uu1zql8zdiVQtSkwN7rPUB+46Bo1fE=
+github.com/blevesearch/scorch_segment_api/v2 v2.3.13-0.20251014151641-79a52e6ea80f/go.mod h1:JBRGAneqgLSI2+jCNjtwMqp2B7EBF3/VUzgDPIU33MM=
 github.com/blevesearch/vellum v1.1.0 h1:CinkGyIsgVlYf8Y2LUQHvdelgXr6PYuvoDIajq6yR9w=
 github.com/blevesearch/vellum v1.1.0/go.mod h1:QgwWryE8ThtNPxtgWJof5ndPfx0/YMBh+W2weHKPw8Y=
 github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
diff --git a/merge.go b/merge.go
index 6197af1..aa04bf0 100644
--- a/merge.go
+++ b/merge.go
@@ -557,6 +557,54 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
 		}
 	}
 
+	// calculate the new edge list, if applicable
+	var newEdgeList map[uint64]uint64
+
+	for segI, segment := range segments {
+		// check whether the merge was aborted in the meantime
+		if isClosed(closeCh) {
+			return 0, nil, seg.ErrClosed
+		}
+		// get the edge list for this segment
+		edgeList := segment.EdgeList()
+		// if there is no edge list, there is nothing to do
+		if len(edgeList) == 0 {
+			continue
+		}
+		newSegDocNums := rv[segI]
+		for oldChild, oldParent := range edgeList {
+			newParent := newSegDocNums[oldParent]
+			newChild := newSegDocNums[oldChild]
+			if newParent != docDropped &&
+				newChild != docDropped {
+				if newEdgeList == nil {
+					newEdgeList = make(map[uint64]uint64)
+				}
+				newEdgeList[newChild] = newParent
+			}
+		}
+	}
+
+	// write out the new edge list:
+	// first the number of entries, which is also the number
+	// of valid sub-documents in the merged segment
+	err := binary.Write(w, binary.BigEndian, uint64(len(newEdgeList)))
+	if err != nil {
+		return 0, nil, err
+	}
+	// then the child -> parent pairs themselves
+	for child, parent := range newEdgeList {
+		err := binary.Write(w, binary.BigEndian, child)
+		if err != nil {
+			return 0, nil, err
+		}
+		err = binary.Write(w, binary.BigEndian, parent)
+		if err != nil {
+			return 0, nil, err
+		}
+	}
+
 	return storedIndexOffset, rv, nil
 }
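Note on the merge.go hunk above: `rv` maps each segment's old document numbers to their numbers in the merged segment, with dropped documents marked by the `docDropped` sentinel, so an edge survives the merge only if both its child and its parent survive. A minimal standalone sketch of that remap (the name `remapEdges` is illustrative, and `docDropped` is assumed to be the max-uint64 sentinel zap uses for dropped docs):

```go
package main

import "fmt"

const docDropped = ^uint64(0) // assumed sentinel for a dropped document

// remapEdges translates an old child->parent edge list through the
// old-to-new doc number mapping, dropping edges whose endpoints died.
func remapEdges(edges map[uint64]uint64, oldToNew []uint64) map[uint64]uint64 {
	out := make(map[uint64]uint64)
	for child, parent := range edges {
		nc, np := oldToNew[child], oldToNew[parent]
		// keep the edge only if both endpoints survived the merge
		if nc != docDropped && np != docDropped {
			out[nc] = np
		}
	}
	return out
}

func main() {
	// doc 1 is a child of doc 0; doc 2 is a child of doc 1
	edges := map[uint64]uint64{1: 0, 2: 1}
	// doc 2 was deleted; docs 0 and 1 renumber to 5 and 6
	oldToNew := []uint64{5, 6, docDropped}
	fmt.Println(remapEdges(edges, oldToNew)) // map[6:5]
}
```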
diff --git a/nested_cache.go b/nested_cache.go
new file mode 100644
index 0000000..0963c3e
--- /dev/null
+++ b/nested_cache.go
@@ -0,0 +1,146 @@
+package zap
+
+import (
+	"encoding/binary"
+	"sync"
+
+	"github.com/RoaringBitmap/roaring/v2"
+)
+
+func newNestedIndexCache() *nestedIndexCache {
+	return &nestedIndexCache{}
+}
+
+type nestedIndexCache struct {
+	m sync.RWMutex
+
+	cache *nestedCacheEntry
+}
+
+// Clear clears the nested index cache, removing the cached edge list.
+func (nc *nestedIndexCache) Clear() {
+	nc.m.Lock()
+	nc.cache = nil
+	nc.m.Unlock()
+}
+
+// loadOrCreate returns the cached nestedCacheEntry, lazily parsing it
+// from mem at edgeListOffset on first use.
+func (nc *nestedIndexCache) loadOrCreate(edgeListOffset uint64, mem []byte) *nestedCacheEntry {
+	nc.m.RLock()
+	if nc.cache != nil {
+		nc.m.RUnlock()
+		return nc.cache
+	}
+	nc.m.RUnlock()
+
+	nc.m.Lock()
+	defer nc.m.Unlock()
+
+	if nc.cache != nil {
+		return nc.cache
+	}
+
+	return nc.createAndCacheLOCKED(edgeListOffset, mem)
+}
+
+// createAndCacheLOCKED parses the serialized edge list at edgeListOffset
+// and caches the resulting entry; the caller must hold the write lock.
+func (nc *nestedIndexCache) createAndCacheLOCKED(edgeListOffset uint64, mem []byte) *nestedCacheEntry {
+	// pos stores the current read position
+	pos := edgeListOffset
+	// read the number of edges, which is also the number of sub-documents
+	numEdges := binary.BigEndian.Uint64(mem[pos : pos+8])
+	pos += 8
+	// if there are no edges, there are no sub-documents: cache an empty entry
+	if numEdges == 0 {
+		nc.cache = &nestedCacheEntry{}
+		return nc.cache
+	}
+	// edgeList as a map[child]parent
+	edgeList := make(map[uint64]uint64, numEdges)
+	for i := uint64(0); i < numEdges; i++ {
+		child := binary.BigEndian.Uint64(mem[pos : pos+8])
+		pos += 8
+		parent := binary.BigEndian.Uint64(mem[pos : pos+8])
+		pos += 8
+		edgeList[child] = parent
+	}
+
+	nc.cache = &nestedCacheEntry{
+		edgeList: edgeList,
+	}
+
+	return nc.cache
+}
+
+// getDocAncestors walks child -> parent edges upward from docNum,
+// returning the chain of ancestor doc numbers, nearest ancestor first.
+func getDocAncestors(edgeList map[uint64]uint64, docNum uint64) []uint64 {
+	var ancestors []uint64
+	current := docNum
+	for {
+		parent, ok := edgeList[current]
+		if !ok {
+			break
+		}
+		ancestors = append(ancestors, parent)
+		current = parent
+	}
+	return ancestors
+}
+
+func (nc *nestedIndexCache) getAncestry(edgeListOffset uint64, mem []byte, docNum uint64) []uint64 {
+	cache := nc.loadOrCreate(edgeListOffset, mem)
+	if cache == nil || cache.edgeList == nil {
+		return nil
+	}
+	return getDocAncestors(cache.edgeList, docNum)
+}
+
+func (nc *nestedIndexCache) getEdgeList(edgeListOffset uint64, mem []byte) map[uint64]uint64 {
+	cache := nc.loadOrCreate(edgeListOffset, mem)
+	if cache == nil || cache.edgeList == nil {
+		return nil
+	}
+	return cache.edgeList
+}
+
+func (nc *nestedIndexCache) getNumSubDocs(edgeListOffset uint64, mem []byte) uint64 {
+	cache := nc.loadOrCreate(edgeListOffset, mem)
+	if cache == nil {
+		return 0
+	}
+	return uint64(len(cache.edgeList))
+}
+
+// countRoot returns the number of root documents in the given bitmap
+func (nc *nestedIndexCache) countRoot(edgeListOffset uint64, mem []byte, bm *roaring.Bitmap) uint64 {
+	var totalDocs uint64
+	if bm == nil {
+		// no bitmap, so there is nothing to count
+		return totalDocs
+	}
+	totalDocs = bm.GetCardinality()
+	cache := nc.loadOrCreate(edgeListOffset, mem)
+	if cache == nil {
+		// no cache means no nested docs, so every doc in the bitmap
+		// is a root doc: just return the cardinality of the bitmap
+		return totalDocs
+	}
+	// count the sub docs in the bitmap; a sub doc is one that has a
+	// parent in the edge list
+	var subDocCount uint64
+	bm.Iterate(func(docNum uint32) bool {
+		if _, ok := cache.edgeList[uint64(docNum)]; ok {
+			subDocCount++
+		}
+		return true
+	})
+	// root docs = total docs - sub docs
+	if totalDocs < subDocCount {
+		// should not happen, but guard against underflow
+		return 0
+	}
+	return totalDocs - subDocCount
+}
+
+type nestedCacheEntry struct {
+	// edgeList[child] = parent
+	edgeList map[uint64]uint64
+}
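The serialized form parsed by createAndCacheLOCKED is simply a big-endian uint64 edge count followed by that many (child, parent) uint64 pairs, matching what writeStoredFields and mergeStoredAndRemap emit. A round-trip sketch of that encoding (helper names are illustrative, not part of the patch):

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// encodeEdgeList writes the count, then each (child, parent) pair.
// binary.Write errors are ignored: writes to a bytes.Buffer cannot fail.
func encodeEdgeList(edges map[uint64]uint64) []byte {
	var buf bytes.Buffer
	binary.Write(&buf, binary.BigEndian, uint64(len(edges)))
	for child, parent := range edges {
		binary.Write(&buf, binary.BigEndian, child)
		binary.Write(&buf, binary.BigEndian, parent)
	}
	return buf.Bytes()
}

// decodeEdgeList is the reader-side mirror, as in createAndCacheLOCKED.
func decodeEdgeList(mem []byte, pos uint64) map[uint64]uint64 {
	numEdges := binary.BigEndian.Uint64(mem[pos : pos+8])
	pos += 8
	edges := make(map[uint64]uint64, numEdges)
	for i := uint64(0); i < numEdges; i++ {
		child := binary.BigEndian.Uint64(mem[pos : pos+8])
		parent := binary.BigEndian.Uint64(mem[pos+8 : pos+16])
		pos += 16
		edges[child] = parent
	}
	return edges
}

func main() {
	in := map[uint64]uint64{1: 0, 2: 1, 4: 3}
	fmt.Println(decodeEdgeList(encodeEdgeList(in), 0)) // map[1:0 2:1 4:3]
}
```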
diff --git a/new.go b/new.go
index c99b933..a823fdc 100644
--- a/new.go
+++ b/new.go
@@ -62,7 +62,7 @@ func (*ZapPlugin) newWithChunkMode(results []index.Document,
 		br.Grow(estimateAvgBytesPerDoc * estimateNumResults)
 	}
 
-	s.results = results
+	s.results, s.edgeList = flattenNestedDocuments(results)
 	s.chunkMode = chunkMode
 	s.w = NewCountHashWriter(&br)
 
@@ -72,7 +72,7 @@
 	}
 
 	sb, err := InitSegmentBase(br.Bytes(), s.w.Sum32(), chunkMode,
-		uint64(len(results)), storedIndexOffset, sectionsIndexOffset)
+		uint64(len(s.results)), storedIndexOffset, sectionsIndexOffset)
 
 	// get the bytes written before the interim's reset() call
 	// write it to the newly formed segment base.
@@ -94,6 +94,9 @@ var interimPool = sync.Pool{New: func() interface{} { return &interim{} }}
 
 type interim struct {
 	results []index.Document
 
+	// edge list for nested documents: child -> parent
+	edgeList map[uint64]uint64
+
 	chunkMode uint32
 	w         *CountHashWriter
@@ -122,6 +125,7 @@ type interim struct {
 
 func (s *interim) reset() (err error) {
 	s.results = nil
+	s.edgeList = nil
 	s.chunkMode = 0
 	s.w = nil
 	for k := range s.FieldsMap {
@@ -408,6 +412,27 @@ func (s *interim) writeStoredFields() (
 		}
 	}
 
+	// write the number of edges in the child -> parent edge list;
+	// this is zero when there are no nested documents, and it also
+	// reflects the number of sub-documents in the segment
+	err = binary.Write(s.w, binary.BigEndian, uint64(len(s.edgeList)))
+	if err != nil {
+		return 0, err
+	}
+	// write the child -> parent edge list;
+	// child and parent are both flattened doc numbers
+	for child, parent := range s.edgeList {
+		err = binary.Write(s.w, binary.BigEndian, child)
+		if err != nil {
+			return 0, err
+		}
+		err = binary.Write(s.w, binary.BigEndian, parent)
+		if err != nil {
+			return 0, err
+		}
+	}
+
 	return storedIndexOffset, nil
 }
@@ -437,3 +462,48 @@ func numUvarintBytes(x uint64) (n int) {
 	}
 	return n + 1
 }
+
+// flattenNestedDocuments returns a preorder list of the given documents and
+// all of their nested documents, along with a map from each flattened child
+// index to its parent index (root documents do not appear as keys).
+func flattenNestedDocuments(docs []index.Document) ([]index.Document, map[uint64]uint64) {
+	totalCount := 0
+	for _, doc := range docs {
+		totalCount += countNestedDocuments(doc)
+	}
+
+	flattened := make([]index.Document, 0, totalCount)
+	edgeMap := make(map[uint64]uint64, totalCount)
+
+	var traverse func(doc index.Document, hasParent bool, parentIdx uint64)
+	traverse = func(doc index.Document, hasParent bool, parentIdx uint64) {
+		curIdx := uint64(len(flattened))
+		flattened = append(flattened, doc)
+
+		if hasParent {
+			edgeMap[curIdx] = parentIdx
+		}
+
+		if nestedDoc, ok := doc.(index.NestedDocument); ok {
+			nestedDoc.VisitNestedDocuments(func(child index.Document) {
+				traverse(child, true, curIdx)
+			})
+		}
+	}
+	// top-level docs have no parent
+	for _, doc := range docs {
+		traverse(doc, false, 0)
+	}
+	return flattened, edgeMap
+}
+
+// countNestedDocuments returns the total number of docs in the tree rooted
+// at doc, including doc itself and all of its descendants.
+func countNestedDocuments(doc index.Document) int {
+	count := 1 // include this doc
+	if nd, ok := doc.(index.NestedDocument); ok {
+		nd.VisitNestedDocuments(func(child index.Document) {
+			count += countNestedDocuments(child)
+		})
+	}
+	return count
+}
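For intuition, flattenNestedDocuments assigns document numbers in preorder: a parent always receives a smaller number than its descendants, and the edge map records child index -> parent index, with root documents absent as keys. A toy illustration with a local tree type standing in for the real index.Document interface:

```go
package main

import "fmt"

// node stands in for index.Document in this sketch only
type node struct {
	name     string
	children []*node
}

// flatten mirrors the preorder numbering of flattenNestedDocuments
func flatten(roots []*node) ([]string, map[uint64]uint64) {
	var order []string
	edges := map[uint64]uint64{}
	var walk func(n *node, hasParent bool, parent uint64)
	walk = func(n *node, hasParent bool, parent uint64) {
		cur := uint64(len(order)) // next preorder index
		order = append(order, n.name)
		if hasParent {
			edges[cur] = parent
		}
		for _, c := range n.children {
			walk(c, true, cur)
		}
	}
	for _, r := range roots {
		walk(r, false, 0)
	}
	return order, edges
}

func main() {
	// book=0, chapter=1, section=2, appendix=3 in preorder
	doc := &node{name: "book", children: []*node{
		{name: "chapter", children: []*node{{name: "section"}}},
		{name: "appendix"},
	}}
	order, edges := flatten([]*node{doc})
	fmt.Println(order) // [book chapter section appendix]
	fmt.Println(edges) // map[1:0 2:1 3:0]
}
```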
diff --git a/read.go b/read.go
index d4a1002..a5afd57 100644
--- a/read.go
+++ b/read.go
@@ -41,3 +41,12 @@ func (sb *SegmentBase) getDocStoredOffsets(docNum uint64) (
 
 	return indexOffset, storedOffset, n, metaLen, dataLen
 }
+
+func (sb *SegmentBase) getEdgeListOffset() uint64 {
+	// the edge list is written immediately after the stored-field
+	// doc offsets: that offsets index begins at sb.storedIndexOffset
+	// and holds one 8-byte entry per document, so the edge list
+	// starts at sb.storedIndexOffset + (8 * sb.numDocs)
+	return sb.storedIndexOffset + (8 * sb.numDocs)
+}
diff --git a/section_inverted_text_index.go b/section_inverted_text_index.go
index 0862b2e..9542803 100644
--- a/section_inverted_text_index.go
+++ b/section_inverted_text_index.go
@@ -1058,6 +1058,7 @@ func (io *invertedIndexOpaque) Reset() (err error) {
 	atomic.StoreUint64(&io.bytesWritten, 0)
 	io.fieldsSame = false
 	io.numDocs = 0
+	io.fieldAddrs = map[int]int{}
 
 	return err
 }
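The getEdgeListOffset comment above describes the layout this patch relies on: the stored-fields region ends with an index of one 8-byte offset per document starting at storedIndexOffset, and the edge list is written directly after that index. A small sketch of the arithmetic, assuming that layout:

```go
// Assumed stored-fields layout:
//
//   [doc 0 data] ... [doc n-1 data]    compressed stored documents
//   [off 0] ... [off n-1]              8-byte offsets, start at storedIndexOffset
//   [edge count][child][parent] ...    nested-document edge list
package main

import "fmt"

// edgeListOffset mirrors SegmentBase.getEdgeListOffset: skip one
// 8-byte stored-field offset per document to land on the edge list.
func edgeListOffset(storedIndexOffset, numDocs uint64) uint64 {
	return storedIndexOffset + 8*numDocs
}

func main() {
	fmt.Println(edgeListOffset(1024, 3)) // 1048
}
```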
diff --git a/section_synonym_index.go b/section_synonym_index.go
index 3894d1a..ae1627e 100644
--- a/section_synonym_index.go
+++ b/section_synonym_index.go
@@ -137,6 +137,7 @@ func (so *synonymIndexOpaque) Reset() (err error) {
 	so.FieldIDtoThesaurusID = nil
 	so.SynonymTermToID = so.SynonymTermToID[:0]
 	so.SynonymIDtoTerm = so.SynonymIDtoTerm[:0]
+	so.thesaurusAddrs = map[int]int{}
 	so.tmp0 = so.tmp0[:0]
 
 	return err
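The two one-line Reset fixes above (io.fieldAddrs and so.thesaurusAddrs) matter because these opaque structs are reused across segment builds, much like the pooled interim in new.go: a map that Reset does not clear leaks addresses from the previous segment. A minimal illustration of the hazard with hypothetical types (note that sync.Pool reuse is not guaranteed, which is exactly what makes such bugs intermittent):

```go
package main

import (
	"fmt"
	"sync"
)

type opaque struct{ addrs map[int]int }

func (o *opaque) reset(clearAddrs bool) {
	if clearAddrs {
		o.addrs = map[int]int{}
	}
	// ...other fields cleared here...
}

var pool = sync.Pool{New: func() interface{} { return &opaque{addrs: map[int]int{}} }}

func main() {
	o := pool.Get().(*opaque)
	o.addrs[7] = 4096  // state from building segment A
	o.reset(false)     // buggy reset: the map survives
	pool.Put(o)

	o2 := pool.Get().(*opaque) // may hand back the same object
	fmt.Println(len(o2.addrs)) // 1: stale entry visible while building segment B
}
```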
diff --git a/segment.go b/segment.go
index 461fdf5..ffce68e 100644
--- a/segment.go
+++ b/segment.go
@@ -58,6 +58,7 @@ func (*ZapPlugin) Open(path string) (segment.Segment, error) {
 			fieldFSTs:      make(map[uint16]*vellum.FST),
 			vecIndexCache:  newVectorIndexCache(),
 			synIndexCache:  newSynonymIndexCache(),
+			nstIndexCache:  newNestedIndexCache(),
 			fieldDvReaders: make([]map[uint16]*docValueReader, len(segmentSections)),
 		},
 		f: f,
@@ -118,6 +119,7 @@ type SegmentBase struct {
 	// this cache comes into play when vectors are supported in builds.
 	vecIndexCache *vectorIndexCache
 	synIndexCache *synonymIndexCache
+	nstIndexCache *nestedIndexCache
 }
 
 func (sb *SegmentBase) Size() int {
@@ -157,6 +159,7 @@ func (sb *SegmentBase) DecRef() (err error) { return nil }
 func (sb *SegmentBase) Close() (err error) {
 	sb.vecIndexCache.Clear()
 	sb.synIndexCache.Clear()
+	sb.nstIndexCache.Clear()
 	return nil
 }
@@ -702,6 +705,7 @@ func (s *Segment) closeActual() (err error) {
 	// clear contents from the vector and synonym index cache before un-mmapping
 	s.vecIndexCache.Clear()
 	s.synIndexCache.Clear()
+	s.nstIndexCache.Clear()
 
 	if s.mm != nil {
 		err = s.mm.Unmap()
@@ -965,3 +969,69 @@ func (s *SegmentBase) GetUpdatedFields() map[string]*index.UpdateFieldInfo {
 func (s *SegmentBase) SetUpdatedFields(updatedFields map[string]*index.UpdateFieldInfo) {
 	s.updatedFields = updatedFields
}
+
+// Ancestors returns a slice of document numbers representing the ancestors of
+// the specified document (docNum) within the segment, nearest ancestor first.
+// If the document has no ancestors, nil is returned.
+func (sb *SegmentBase) Ancestors(docNum uint64) []uint64 {
+	return sb.nstIndexCache.getAncestry(sb.getEdgeListOffset(), sb.mem, docNum)
+}
+
+// CountRoot returns the number of root documents in the segment, excluding any
+// documents that are marked as deleted in the provided bitmap. The deleted
+// bitmap may contain both root and sub-document numbers, and the method
+// ensures that only root documents are counted.
+func (sb *SegmentBase) CountRoot(deleted *roaring.Bitmap) uint64 {
+	// the formula is as follows:
+	// Total Docs (T) = Root Docs (R) + Sub Docs (S)
+	// R = T - S
+	// Now if we have D deleted docs, some of which may be sub-docs, we need
+	// to exclude those from the root doc count. Let D = dR + dS, where dR is
+	// the number of deleted root docs and dS is the number of deleted sub
+	// docs. countRoot(deleted) computes dR = D - dS directly from the
+	// deleted bitmap. Therefore, the count of live root docs is:
+	// R - dR = (T - S) - (D - dS)
+	return (sb.Count() - sb.CountNested()) - (sb.nstIndexCache.countRoot(sb.getEdgeListOffset(), sb.mem, deleted))
+}
+
+// CountNested returns the number of sub-documents present in the segment.
+// This is determined using the segment's edge list.
+func (sb *SegmentBase) CountNested() uint64 {
+	return sb.nstIndexCache.getNumSubDocs(sb.getEdgeListOffset(), sb.mem)
+}
+
+// EdgeList returns a map representing the parent-child relationships between
+// documents in the segment. The map keys are child document numbers (uint64),
+// and the values are their corresponding parent document numbers (uint64).
+// Root documents do not appear as keys. This is useful for navigating
+// document hierarchies.
+func (sb *SegmentBase) EdgeList() map[uint64]uint64 {
+	return sb.nstIndexCache.getEdgeList(sb.getEdgeListOffset(), sb.mem)
+}
+
+// AddNestedDocuments returns a bitmap containing the original document
+// numbers in drops, plus any descendant document numbers for each dropped
+// document. The drops parameter represents a set of document numbers to be
+// dropped, and the returned bitmap includes both the original drops and all
+// their descendants (if any).
+func (sb *SegmentBase) AddNestedDocuments(drops *roaring.Bitmap) *roaring.Bitmap {
+	// if there are no drops or no sub-documents, there is nothing to do
+	if drops == nil || drops.GetCardinality() == 0 || sb.CountNested() == 0 {
+		return drops
+	}
+	// get the edge list for this segment
+	el := sb.EdgeList()
+	// algorithm: iterate through each child -> parent mapping in the edge
+	// list, and for each pair, check whether the parent is in the drops
+	// bitmap. If it is, and the child is not already in the drops bitmap,
+	// add the child to the drops. Repeat until an iteration makes no new
+	// additions.
+	changed := true
+	for changed {
+		changed = false
+		for child, parent := range el {
+			if drops.Contains(uint32(parent)) && !drops.Contains(uint32(child)) {
+				drops.Add(uint32(child))
+				changed = true
+			}
+		}
+	}
+	return drops
+}
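Finally, AddNestedDocuments iterates to a fixpoint because Go map iteration order is unspecified: a child's edge may be visited before its parent was added to the bitmap, so a single pass could miss grandchildren. A worked example using the roaring API the patch already imports (addDescendants is an illustrative stand-in for the method above):

```go
package main

import (
	"fmt"

	"github.com/RoaringBitmap/roaring/v2"
)

// addDescendants repeats the edge scan until a pass adds nothing,
// pulling every descendant of a dropped doc into the drops bitmap.
func addDescendants(drops *roaring.Bitmap, edges map[uint64]uint64) {
	changed := true
	for changed {
		changed = false
		for child, parent := range edges {
			if drops.Contains(uint32(parent)) && !drops.Contains(uint32(child)) {
				drops.Add(uint32(child))
				changed = true
			}
		}
	}
}

func main() {
	// doc 0 is a root; doc 1 is its child; doc 2 is a grandchild via 1
	edges := map[uint64]uint64{1: 0, 2: 1}
	drops := roaring.BitmapOf(0) // deleting the root...
	addDescendants(drops, edges)
	fmt.Println(drops.ToArray()) // [0 1 2]: descendants pulled in

	// CountRoot arithmetic for this segment: T=3 docs, S=2 sub-docs,
	// D=3 deleted of which dS=2, so live roots = (3-2) - (3-2) = 0
}
```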