Skip to content

Commit f1644b4

Browse files
bug fixes and new features.
1 parent 4e38ae4 commit f1644b4

File tree

10 files changed

+342
-4
lines changed

10 files changed

+342
-4
lines changed

build.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, numDocs uint64
172172
fieldFSTs: make(map[uint16]*vellum.FST),
173173
vecIndexCache: newVectorIndexCache(),
174174
synIndexCache: newSynonymIndexCache(),
175+
nstIndexCache: newNestedIndexCache(),
175176
// following fields gets populated by loadFieldsNew
176177
fieldsMap: make(map[string]uint16),
177178
dictLocs: make([]uint64, 0),

dict.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ func (d *Dictionary) postingsListInit(rv *PostingsList, except *roaring.Bitmap)
104104
rv.postings = postings
105105
}
106106
rv.sb = d.sb
107-
rv.except = except
107+
// add subdocs to except if needed
108+
rv.except = rv.sb.AddSubDocs(except)
108109
return rv
109110
}
110111

faiss_vector_posting.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,9 @@ func (sb *SegmentBase) InterpretVectorIndex(field string, requiresFiltering bool
547547
pos += n
548548
}
549549

550+
// add sub-docs to except if needed
551+
except = sb.AddSubDocs(except)
552+
550553
vecIndex, vecDocIDMap, docVecIDMap, vectorIDsToExclude, err =
551554
sb.vecIndexCache.loadOrCreate(fieldIDPlus1, sb.mem[pos:], requiresFiltering,
552555
except)

merge.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ func mergeSegmentBases(segmentBases []*SegmentBase, drops []*roaring.Bitmap, pat
6767
_ = os.Remove(path)
6868
}
6969

70+
drops = addSubDocuments(segmentBases, drops)
71+
7072
// buffer the output
7173
br := bufio.NewWriterSize(f, DefaultFileMergerBufferSize)
7274

@@ -531,6 +533,54 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
531533
}
532534
}
533535

536+
// calculate new edge list if applicable
537+
var newEdgeList map[uint64]uint64
538+
539+
for segI, segment := range segments {
540+
// check for the closure in meantime
541+
if isClosed(closeCh) {
542+
return 0, nil, seg.ErrClosed
543+
}
544+
// get the edgeList for this segment
545+
edgeList := segment.EdgeList()
546+
// if no edgeList, nothing to do
547+
if len(edgeList) == 0 {
548+
continue
549+
}
550+
newSegDocNums := rv[segI]
551+
for oldChild, oldParent := range edgeList {
552+
newParent := newSegDocNums[oldParent]
553+
newChild := newSegDocNums[oldChild]
554+
if newParent != docDropped &&
555+
newChild != docDropped {
556+
if newEdgeList == nil {
557+
newEdgeList = make(map[uint64]uint64)
558+
}
559+
newEdgeList[newChild] = newSegDocNums[oldParent]
560+
}
561+
}
562+
}
563+
564+
// write out the new edge list
565+
// first write out the number of entries
566+
// which is also the number of valid subDocs
567+
// in the merged segment
568+
err := binary.Write(w, binary.BigEndian, uint64(len(newEdgeList)))
569+
if err != nil {
570+
return 0, nil, err
571+
}
572+
// now the new edge list
573+
for child, parent := range newEdgeList {
574+
err := binary.Write(w, binary.BigEndian, child)
575+
if err != nil {
576+
return 0, nil, err
577+
}
578+
err = binary.Write(w, binary.BigEndian, parent)
579+
if err != nil {
580+
return 0, nil, err
581+
}
582+
}
583+
534584
return storedIndexOffset, rv, nil
535585
}
536586

@@ -614,3 +664,11 @@ func isClosed(closeCh chan struct{}) bool {
614664
return false
615665
}
616666
}
667+
668+
func addSubDocuments(sbs []*SegmentBase, drops []*roaring.Bitmap) []*roaring.Bitmap {
669+
rv := make([]*roaring.Bitmap, len(drops))
670+
for i, sb := range sbs {
671+
rv[i] = sb.AddSubDocs(drops[i])
672+
}
673+
return rv
674+
}

nested_cache.go

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
package zap
2+
3+
import (
4+
"encoding/binary"
5+
"sync"
6+
)
7+
8+
func newNestedIndexCache() *nestedIndexCache {
9+
return &nestedIndexCache{}
10+
}
11+
12+
// nestedIndexCache lazily decodes a segment's serialized child->parent edge
// list and caches the derived nested-document structures (ancestry and
// descendants maps).
type nestedIndexCache struct {
	// m guards cache for concurrent access.
	m sync.RWMutex

	// cache is nil until the first loadOrCreate call (and again after Clear);
	// once built it is shared by all readers.
	cache *nestedCacheEntry
}
17+
18+
// Clear drops the cached nested-document entry; it will be rebuilt from the
// serialized edge list on the next loadOrCreate call.
func (nc *nestedIndexCache) Clear() {
	nc.m.Lock()
	nc.cache = nil
	nc.m.Unlock()
}
24+
25+
// Returns edgeList, ancestry, descendants
26+
func (nc *nestedIndexCache) loadOrCreate(edgeListOffset uint64, mem []byte) *nestedCacheEntry {
27+
nc.m.RLock()
28+
if nc.cache != nil {
29+
nc.m.RUnlock()
30+
return nc.cache
31+
}
32+
nc.m.RUnlock()
33+
34+
nc.m.Lock()
35+
defer nc.m.Unlock()
36+
37+
if nc.cache != nil {
38+
return nc.cache
39+
}
40+
41+
return nc.createAndCacheLOCKED(edgeListOffset, mem)
42+
}
43+
44+
// createAndCacheLOCKED creates the synonym index cache for the specified fieldID and caches it.
45+
func (sc *nestedIndexCache) createAndCacheLOCKED(edgeListOffset uint64, mem []byte) *nestedCacheEntry {
46+
// pos stores the current read position
47+
pos := edgeListOffset
48+
// read number of subDocs which is also the number of edges
49+
numEdges := binary.BigEndian.Uint64(mem[pos : pos+8])
50+
pos += 8
51+
// if no edges or no subDocs, return empty cache
52+
if numEdges == 0 {
53+
sc.cache = &nestedCacheEntry{}
54+
return sc.cache
55+
}
56+
// edgeList as a map[node]parent
57+
edgeList := make(map[uint64]uint64, numEdges)
58+
for i := uint64(0); i < numEdges; i++ {
59+
child := binary.BigEndian.Uint64(mem[pos : pos+8])
60+
pos += 8
61+
parent := binary.BigEndian.Uint64(mem[pos : pos+8])
62+
pos += 8
63+
edgeList[child] = parent
64+
}
65+
// build ancestry using DFS + memoization
66+
ancestry := make(map[uint64][]uint64, numEdges)
67+
// memoized DFS
68+
var getAncestors func(uint64) []uint64
69+
getAncestors = func(node uint64) []uint64 {
70+
// if already computed, return
71+
if val, ok := ancestry[node]; ok {
72+
return val
73+
}
74+
if parent, ok := edgeList[node]; ok {
75+
// compute parent's ancestry + parent itself
76+
res := append([]uint64{parent}, getAncestors(parent)...)
77+
ancestry[node] = res
78+
return res
79+
}
80+
return nil
81+
}
82+
83+
for child := range edgeList {
84+
// only store if non-empty ancestry
85+
if v := getAncestors(child); len(v) > 0 {
86+
ancestry[child] = v
87+
}
88+
}
89+
90+
descendants := make(map[uint64][]uint64, numEdges)
91+
92+
// Build descendants using ancestry
93+
for node := uint64(0); node < numEdges; node++ {
94+
if ancestors, ok := ancestry[node]; ok {
95+
for _, ancestor := range ancestors {
96+
descendants[ancestor] = append(descendants[ancestor], node)
97+
}
98+
}
99+
}
100+
101+
sc.cache = &nestedCacheEntry{
102+
edgeList: edgeList,
103+
ancestry: ancestry,
104+
descendants: descendants,
105+
}
106+
107+
return sc.cache
108+
}
109+
110+
func (nc *nestedIndexCache) getAncestry(edgeListOffset uint64, mem []byte, docNum uint64) []uint64 {
111+
cache := nc.loadOrCreate(edgeListOffset, mem)
112+
if cache == nil || cache.ancestry == nil {
113+
return nil
114+
}
115+
return cache.ancestry[docNum]
116+
}
117+
118+
func (nc *nestedIndexCache) getDescendants(edgeListOffset uint64, mem []byte, docNum uint64) []uint64 {
119+
cache := nc.loadOrCreate(edgeListOffset, mem)
120+
if cache == nil || cache.ancestry == nil {
121+
return nil
122+
}
123+
return cache.descendants[docNum]
124+
}
125+
126+
func (nc *nestedIndexCache) getEdgeList(edgeListOffset uint64, mem []byte) map[uint64]uint64 {
127+
cache := nc.loadOrCreate(edgeListOffset, mem)
128+
if cache == nil || cache.ancestry == nil {
129+
return nil
130+
}
131+
return cache.edgeList
132+
}
133+
134+
func (nc *nestedIndexCache) getNumSubDocs(edgeListOffset uint64, mem []byte) uint64 {
135+
cache := nc.loadOrCreate(edgeListOffset, mem)
136+
if cache == nil {
137+
return 0
138+
}
139+
return uint64(len(cache.edgeList))
140+
}
141+
142+
// nestedCacheEntry holds the decoded nested-document relationships for one
// segment. All doc numbers are segment-local.
type nestedCacheEntry struct {
	// edgeList[node] = immediate parent of node
	edgeList map[uint64]uint64
	// ancestry[node] = parent, grandparent, ... (nearest first)
	ancestry map[uint64][]uint64
	// descendants[parent] = all transitive children of parent
	descendants map[uint64][]uint64
}

new.go

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func (*ZapPlugin) newWithChunkMode(results []index.Document,
6262
br.Grow(estimateAvgBytesPerDoc * estimateNumResults)
6363
}
6464

65-
s.results = results
65+
s.results, s.edgeList = flattenNestedDocuments(results)
6666
s.chunkMode = chunkMode
6767
s.w = NewCountHashWriter(&br)
6868

@@ -72,7 +72,7 @@ func (*ZapPlugin) newWithChunkMode(results []index.Document,
7272
}
7373

7474
sb, err := InitSegmentBase(br.Bytes(), s.w.Sum32(), chunkMode,
75-
uint64(len(results)), storedIndexOffset, sectionsIndexOffset)
75+
uint64(len(s.results)), storedIndexOffset, sectionsIndexOffset)
7676

7777
// get the bytes written before the interim's reset() call
7878
// write it to the newly formed segment base.
@@ -94,6 +94,9 @@ var interimPool = sync.Pool{New: func() interface{} { return &interim{} }}
9494
type interim struct {
9595
results []index.Document
9696

97+
// edge list for nested documents: child -> parent
98+
edgeList map[int]uint64
99+
97100
chunkMode uint32
98101

99102
w *CountHashWriter
@@ -122,6 +125,7 @@ type interim struct {
122125

123126
func (s *interim) reset() (err error) {
124127
s.results = nil
128+
s.edgeList = nil
125129
s.chunkMode = 0
126130
s.w = nil
127131
for k := range s.FieldsMap {
@@ -408,6 +412,27 @@ func (s *interim) writeStoredFields() (
408412
}
409413
}
410414

415+
// write the number of edges in the child -> parent edge list
416+
// this will be zero if there are no nested documents
417+
// and this number also reflects the number of sub-documents
418+
// in the segment
419+
err = binary.Write(s.w, binary.BigEndian, uint64(len(s.edgeList)))
420+
if err != nil {
421+
return 0, err
422+
}
423+
// write the child -> parent edge list
424+
// child and parent are both flattened doc ids
425+
for child, parent := range s.edgeList {
426+
err = binary.Write(s.w, binary.BigEndian, uint64(child))
427+
if err != nil {
428+
return 0, err
429+
}
430+
err = binary.Write(s.w, binary.BigEndian, parent)
431+
if err != nil {
432+
return 0, err
433+
}
434+
}
435+
411436
return storedIndexOffset, nil
412437
}
413438

@@ -437,3 +462,48 @@ func numUvarintBytes(x uint64) (n int) {
437462
}
438463
return n + 1
439464
}
465+
466+
// flattenNestedDocuments returns a preorder list of the given documents and all their nested documents,
467+
// along with a map mapping each flattened index to its parent index (excluding root docs entirely).
468+
func flattenNestedDocuments(docs []index.Document) ([]index.Document, map[int]uint64) {
469+
totalCount := 0
470+
for _, doc := range docs {
471+
totalCount += countNestedDocuments(doc)
472+
}
473+
474+
flattened := make([]index.Document, 0, totalCount)
475+
edgeMap := make(map[int]uint64, totalCount)
476+
477+
var traverse func(doc index.Document, hasParent bool, parentIdx int)
478+
traverse = func(d index.Document, hasParent bool, parentIdx int) {
479+
curIdx := len(flattened)
480+
flattened = append(flattened, d)
481+
482+
if hasParent {
483+
edgeMap[curIdx] = uint64(parentIdx)
484+
}
485+
486+
if nestedDoc, ok := d.(index.NestedDocument); ok {
487+
nestedDoc.VisitNestedDocuments(func(child index.Document) {
488+
traverse(child, true, curIdx)
489+
})
490+
}
491+
}
492+
// Top-level docs have no parent
493+
for _, doc := range docs {
494+
traverse(doc, false, 0)
495+
}
496+
return flattened, edgeMap
497+
}
498+
499+
// countNestedDocuments returns the total number of docs in preorder,
500+
// including the parent and all descendants.
501+
func countNestedDocuments(doc index.Document) int {
502+
count := 1 // include this doc
503+
if nd, ok := doc.(index.NestedDocument); ok {
504+
nd.VisitNestedDocuments(func(child index.Document) {
505+
count += countNestedDocuments(child)
506+
})
507+
}
508+
return count
509+
}

read.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,12 @@ func (sb *SegmentBase) getDocStoredOffsets(docNum uint64) (
4141

4242
return indexOffset, storedOffset, n, metaLen, dataLen
4343
}
44+
45+
func (sb *SegmentBase) getEdgeListOffset() uint64 {
46+
// Edge list comes right after document content and before doc stored offsets
47+
// The stored index offset points to where the doc offsets start
48+
// So edge list starts right after the last document content
49+
// which is at sb.storedIndexOffset + (8 * sb.numDocs)
50+
// since each doc offset is 8 bytes
51+
return sb.storedIndexOffset + (8 * sb.numDocs)
52+
}

section_inverted_text_index.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1046,6 +1046,7 @@ func (io *invertedIndexOpaque) Reset() (err error) {
10461046
atomic.StoreUint64(&io.bytesWritten, 0)
10471047
io.fieldsSame = false
10481048
io.numDocs = 0
1049+
io.fieldAddrs = map[int]int{}
10491050

10501051
return err
10511052
}

section_synonym_index.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ func (so *synonymIndexOpaque) Reset() (err error) {
137137
so.FieldIDtoThesaurusID = nil
138138
so.SynonymTermToID = so.SynonymTermToID[:0]
139139
so.SynonymIDtoTerm = so.SynonymIDtoTerm[:0]
140+
so.thesaurusAddrs = map[int]int{}
140141

141142
so.tmp0 = so.tmp0[:0]
142143
return err

0 commit comments

Comments
 (0)