From e62096c16b384334d5499de21ae71310a586be16 Mon Sep 17 00:00:00 2001 From: Likith B Date: Wed, 9 Jul 2025 14:11:43 +0530 Subject: [PATCH 1/5] MB-65860: Added new file writers and readers --- build.go | 12 ++-- contentcoder.go | 8 +++ docvalues.go | 6 +- faiss_vector_cache.go | 32 +++++++--- faiss_vector_posting.go | 2 +- go.mod | 2 + intDecoder.go | 11 +++- intcoder.go | 24 +++++++- merge.go | 63 +++++++++++++------- new.go | 23 +++++--- posting.go | 11 ++-- read.go | 13 +++- section.go | 4 +- section_faiss_vector_index.go | 105 ++++++++++++++++++++++----------- section_inverted_text_index.go | 30 +++++++--- section_synonym_index.go | 73 +++++++++++++++-------- segment.go | 38 +++++++++--- synonym_cache.go | 35 +++++++---- synonym_posting.go | 7 ++- write.go | 28 ++++++++- writer.go | 78 ++++++++++++++++++++++++ 21 files changed, 463 insertions(+), 142 deletions(-) create mode 100644 writer.go diff --git a/build.go b/build.go index 7843653a..29a7e1c7 100644 --- a/build.go +++ b/build.go @@ -100,7 +100,7 @@ func persistSegmentBaseToWriter(sb *SegmentBase, w io.Writer) (int, error) { } err = persistFooter(sb.numDocs, sb.storedIndexOffset, sb.fieldsIndexOffset, sb.sectionsIndexOffset, - sb.docValueOffset, sb.chunkMode, sb.memCRC, br) + sb.docValueOffset, sb.chunkMode, sb.memCRC, br, sb.writerId) if err != nil { return 0, err } @@ -159,7 +159,7 @@ func persistStoredFieldValues(fieldID int, } func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, numDocs uint64, - storedIndexOffset uint64, sectionsIndexOffset uint64) (*SegmentBase, error) { + storedIndexOffset uint64, sectionsIndexOffset uint64, writerId string) (*SegmentBase, error) { sb := &SegmentBase{ mem: mem, memCRC: memCRC, @@ -175,9 +175,11 @@ func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, numDocs uint64 vecIndexCache: newVectorIndexCache(), synIndexCache: newSynonymIndexCache(), // following fields gets populated by loadFieldsNew - fieldsMap: make(map[string]uint16), - dictLocs: 
make([]uint64, 0), - fieldsInv: make([]string, 0), + fieldsMap: make(map[string]uint16), + dictLocs: make([]uint64, 0), + fieldsInv: make([]string, 0), + writerId: writerId, + fileReader: NewFileReader(writerId), } sb.updateSize() diff --git a/contentcoder.go b/contentcoder.go index 3343d317..56ced4d2 100644 --- a/contentcoder.go +++ b/contentcoder.go @@ -140,6 +140,14 @@ func (c *chunkedContentCoder) flushContents() error { c.final = append(c.final, c.chunkMetaBuf.Bytes()...) // write the compressed data to the final data c.compressed = snappy.Encode(c.compressed[:cap(c.compressed)], c.chunkBuf.Bytes()) + if fw, ok := c.w.(*fileWriter); ok && fw != nil { + // process the compressed data using the callback + var err error + c.compressed, err = fw.process(c.compressed) + if err != nil { + return err + } + } c.incrementBytesWritten(uint64(len(c.compressed))) c.final = append(c.final, c.compressed...) diff --git a/docvalues.go b/docvalues.go index 21124c4e..629cd037 100644 --- a/docvalues.go +++ b/docvalues.go @@ -197,10 +197,14 @@ func (di *docValueReader) loadDvChunk(chunkNumber uint64, s *SegmentBase) error offset += uint64(read) } + var err error compressedDataLoc := chunkMetaLoc + offset dataLength := curChunkEnd - compressedDataLoc di.incrementBytesRead(uint64(dataLength + offset)) - di.curChunkData = s.mem[compressedDataLoc : compressedDataLoc+dataLength] + di.curChunkData, err = s.fileReader.process(s.mem[compressedDataLoc : compressedDataLoc+dataLength]) + if err != nil { + return err + } di.curChunkNum = chunkNumber di.uncompressed = di.uncompressed[:0] return nil diff --git a/faiss_vector_cache.go b/faiss_vector_cache.go index ce8e1bff..eea82e78 100644 --- a/faiss_vector_cache.go +++ b/faiss_vector_cache.go @@ -56,7 +56,7 @@ func (vc *vectorIndexCache) Clear() { // present. It also returns the batch executor for the field if it's present in the // cache. 
func (vc *vectorIndexCache) loadOrCreate(fieldID uint16, mem []byte, - loadDocVecIDMap bool, except *roaring.Bitmap) ( + loadDocVecIDMap bool, except *roaring.Bitmap, r *fileReader) ( index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, docVecIDMap map[uint32][]int64, vecIDsToExclude []int64, err error) { vc.m.RLock() @@ -83,7 +83,7 @@ func (vc *vectorIndexCache) loadOrCreate(fieldID uint16, mem []byte, // acquiring a lock since this is modifying the cache. vc.m.Lock() defer vc.m.Unlock() - return vc.createAndCacheLOCKED(fieldID, mem, loadDocVecIDMap, except) + return vc.createAndCacheLOCKED(fieldID, mem, loadDocVecIDMap, except, r) } func (vc *vectorIndexCache) addDocVecIDMapToCacheLOCKED(ce *cacheEntry) map[uint32][]int64 { @@ -104,7 +104,7 @@ func (vc *vectorIndexCache) addDocVecIDMapToCacheLOCKED(ce *cacheEntry) map[uint // Rebuilding the cache on a miss. func (vc *vectorIndexCache) createAndCacheLOCKED(fieldID uint16, mem []byte, - loadDocVecIDMap bool, except *roaring.Bitmap) ( + loadDocVecIDMap bool, except *roaring.Bitmap, r *fileReader) ( index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, docVecIDMap map[uint32][]int64, vecIDsToExclude []int64, err error) { @@ -127,16 +127,27 @@ func (vc *vectorIndexCache) createAndCacheLOCKED(fieldID uint16, mem []byte, numVecs, n := binary.Uvarint(mem[pos : pos+binary.MaxVarintLen64]) pos += n + mapLen, n := binary.Uvarint(mem[pos : pos+binary.MaxVarintLen64]) + pos += n + pos += int(mapLen) + + buf, err := r.process(mem[pos : pos+int(mapLen)]) + if err != nil { + return nil, nil, nil, nil, err + } + bufPos := 0 + bufLen := len(buf) + vecDocIDMap = make(map[int64]uint32, numVecs) if loadDocVecIDMap { docVecIDMap = make(map[uint32][]int64, numVecs) } isExceptNotEmpty := except != nil && !except.IsEmpty() for i := 0; i < int(numVecs); i++ { - vecID, n := binary.Varint(mem[pos : pos+binary.MaxVarintLen64]) - pos += n - docID, n := binary.Uvarint(mem[pos : pos+binary.MaxVarintLen64]) - pos += n + vecID, n := 
binary.Varint(buf[bufPos:min(bufPos+binary.MaxVarintLen64, bufLen)]) + bufPos += n + docID, n := binary.Uvarint(buf[bufPos:min(bufPos+binary.MaxVarintLen64, bufLen)]) + bufPos += n docIDUint32 := uint32(docID) if isExceptNotEmpty && except.Contains(docIDUint32) { @@ -152,7 +163,12 @@ func (vc *vectorIndexCache) createAndCacheLOCKED(fieldID uint16, mem []byte, indexSize, n := binary.Uvarint(mem[pos : pos+binary.MaxVarintLen64]) pos += n - index, err = faiss.ReadIndexFromBuffer(mem[pos:pos+int(indexSize)], faissIOFlags) + buf, err = r.process(mem[pos : pos+int(indexSize)]) + if err != nil { + return nil, nil, nil, nil, err + } + + index, err = faiss.ReadIndexFromBuffer(buf, faissIOFlags) if err != nil { return nil, nil, nil, nil, err } diff --git a/faiss_vector_posting.go b/faiss_vector_posting.go index 2a77199c..cec70fb6 100644 --- a/faiss_vector_posting.go +++ b/faiss_vector_posting.go @@ -549,7 +549,7 @@ func (sb *SegmentBase) InterpretVectorIndex(field string, requiresFiltering bool vecIndex, vecDocIDMap, docVecIDMap, vectorIDsToExclude, err = sb.vecIndexCache.loadOrCreate(fieldIDPlus1, sb.mem[pos:], requiresFiltering, - except) + except, sb.fileReader) if vecIndex != nil { vecIndexSize = vecIndex.Size() diff --git a/go.mod b/go.mod index 80e1a712..7f73c743 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,8 @@ module github.com/blevesearch/zapx/v16 go 1.23 +toolchain go1.23.9 + require ( github.com/RoaringBitmap/roaring/v2 v2.4.5 github.com/bits-and-blooms/bitset v1.22.0 diff --git a/intDecoder.go b/intDecoder.go index 1a69e614..b56e88b6 100644 --- a/intDecoder.go +++ b/intDecoder.go @@ -26,12 +26,13 @@ type chunkedIntDecoder struct { curChunkBytes []byte data []byte r *memUvarintReader + fr *fileReader bytesRead uint64 } // newChunkedIntDecoder expects an optional or reset chunkedIntDecoder for better reuse. 
-func newChunkedIntDecoder(buf []byte, offset uint64, rv *chunkedIntDecoder) *chunkedIntDecoder { +func newChunkedIntDecoder(buf []byte, offset uint64, rv *chunkedIntDecoder, fr *fileReader) *chunkedIntDecoder { if rv == nil { rv = &chunkedIntDecoder{startOffset: offset, data: buf} } else { @@ -59,6 +60,7 @@ func newChunkedIntDecoder(buf []byte, offset uint64, rv *chunkedIntDecoder) *chu } rv.bytesRead += n rv.dataStartOffset = offset + n + rv.fr = fr return rv } @@ -88,6 +90,13 @@ func (d *chunkedIntDecoder) loadChunk(chunk int) error { end += e d.curChunkBytes = d.data[start:end] d.bytesRead += uint64(len(d.curChunkBytes)) + + var err error + d.curChunkBytes, err = d.fr.process(d.data[start:end]) + if err != nil { + return fmt.Errorf("error processing chunk %d: %w", chunk, err) + } + d.bytesRead += end - start if d.r == nil { d.r = newMemUvarintReader(d.curChunkBytes) } else { diff --git a/intcoder.go b/intcoder.go index e1586edc..d0803338 100644 --- a/intcoder.go +++ b/intcoder.go @@ -144,6 +144,26 @@ func (c *chunkedIntCoder) Write(w io.Writer) (int, error) { // convert the chunk lengths into chunk offsets chunkOffsets := modifyLengthsToEndOffsets(c.chunkLens) + // process each chunk if needed and update the chunk offsets + if fw, ok := w.(*fileWriter); ok && fw != nil { + var prevOffset int + processedBuf := make([]byte, 0, 64) + for i := 0; i < len(chunkOffsets); i++ { + if chunkOffsets[i] == uint64(prevOffset) { + continue + } + buf, err := fw.process(c.final[prevOffset:chunkOffsets[i]]) + if err != nil { + return 0, err + } + processedBuf = append(processedBuf, buf...) 
+ c.chunkLens[i] = uint64(len(buf)) + prevOffset = int(chunkOffsets[i]) + } + c.final = processedBuf + chunkOffsets = modifyLengthsToEndOffsets(c.chunkLens) + } + // write out the number of chunks & each chunk offsets n := binary.PutUvarint(buf, uint64(len(chunkOffsets))) for _, chunkOffset := range chunkOffsets { @@ -172,8 +192,8 @@ func (c *chunkedIntCoder) writeAt(w io.Writer) (uint64, int, error) { return startOffset, 0, nil } - if chw := w.(*CountHashWriter); chw != nil { - startOffset = uint64(chw.Count()) + if fw, ok := w.(*fileWriter); ok && fw != nil { + startOffset = uint64(fw.Count()) } tw, err := c.Write(w) diff --git a/merge.go b/merge.go index 6197af11..c195a0ab 100644 --- a/merge.go +++ b/merge.go @@ -73,9 +73,10 @@ func mergeSegmentBases(segmentBases []*SegmentBase, drops []*roaring.Bitmap, pat // wrap it for counting (tracking offsets) cr := NewCountHashWriterWithStatsReporter(br, s) + w := NewFileWriter(cr) newDocNums, numDocs, storedIndexOffset, _, _, sectionsIndexOffset, err := - mergeToWriter(segmentBases, drops, chunkMode, cr, closeCh) + mergeToWriter(segmentBases, drops, chunkMode, w, closeCh) if err != nil { cleanup() return nil, 0, err @@ -83,7 +84,7 @@ func mergeSegmentBases(segmentBases []*SegmentBase, drops []*roaring.Bitmap, pat // passing the sectionsIndexOffset as fieldsIndexOffset and the docValueOffset as 0 for the footer err = persistFooter(numDocs, storedIndexOffset, sectionsIndexOffset, sectionsIndexOffset, - 0, chunkMode, cr.Sum32(), cr) + 0, chunkMode, cr.Sum32(), w, w.id) if err != nil { cleanup() return nil, 0, err @@ -124,7 +125,7 @@ func filterFields(fieldsInv []string, fieldInfo map[string]*index.UpdateFieldInf } func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap, - chunkMode uint32, cr *CountHashWriter, closeCh chan struct{}) ( + chunkMode uint32, w *fileWriter, closeCh chan struct{}) ( newDocNums [][]uint64, numDocs, storedIndexOffset uint64, fieldsInv []string, fieldsMap map[string]uint16, 
sectionsIndexOffset uint64, err error) { @@ -155,7 +156,7 @@ func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap, if numDocs > 0 { storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops, - fieldsMap, fieldsInv, fieldsSame, numDocs, cr, closeCh, updatedFields) + fieldsMap, fieldsInv, fieldsSame, numDocs, w, closeCh, updatedFields) if err != nil { return nil, 0, 0, nil, nil, 0, err } @@ -163,8 +164,7 @@ func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap, // at this point, ask each section implementation to merge itself for i, x := range segmentSections { mergeOpaque[int(i)] = x.InitOpaque(args) - - err = x.Merge(mergeOpaque, segments, drops, fieldsInv, newDocNums, cr, closeCh) + err = x.Merge(mergeOpaque, segments, drops, fieldsInv, newDocNums, w, closeCh) if err != nil { return nil, 0, 0, nil, nil, 0, err } @@ -173,7 +173,7 @@ func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap, // we can persist the fields section index now, this will point // to the various indexes (each in different section) available for a field. 
- sectionsIndexOffset, err = persistFieldsSection(fieldsInv, cr, mergeOpaque) + sectionsIndexOffset, err = persistFieldsSection(fieldsInv, w, mergeOpaque) if err != nil { return nil, 0, 0, nil, nil, 0, err } @@ -319,7 +319,7 @@ func mergeTermFreqNormLocs(fieldsMap map[string]uint16, term []byte, postItr *Po func writePostings(postings *roaring.Bitmap, tfEncoder, locEncoder *chunkedIntCoder, use1HitEncoding func(uint64) (bool, uint64, uint64), - w *CountHashWriter, bufMaxVarintLen64 []byte) ( + w *fileWriter, bufMaxVarintLen64 []byte) ( offset uint64, err error) { if postings == nil { return 0, nil @@ -375,7 +375,7 @@ type varintEncoder func(uint64) (int, error) func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap, fieldsMap map[string]uint16, fieldsInv []string, fieldsSame bool, newSegDocCount uint64, - w *CountHashWriter, closeCh chan struct{}, updatedFields map[string]*index.UpdateFieldInfo) (uint64, [][]uint64, error) { + w *fileWriter, closeCh chan struct{}, updatedFields map[string]*index.UpdateFieldInfo) (uint64, [][]uint64, error) { var rv [][]uint64 // The remapped or newDocNums for each segment. 
var newDocNum uint64 @@ -393,6 +393,16 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap, typs := make([][]byte, len(fieldsInv)) poss := make([][][]uint64, len(fieldsInv)) + // copying data from segments is only possible when + // there are no writer callbacks across all segments + copyFlag := true + for _, segment := range segments { + if segment.writerId != "" { + copyFlag = true + break + } + } + var posBuf []uint64 docNumOffsets := make([]uint64, newSegDocCount) @@ -415,7 +425,7 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap, // segments and there are no deletions, via byte-copying // of stored docs bytes directly to the writer // cannot copy directly if fields might have been deleted - if fieldsSame && (dropsI == nil || dropsI.GetCardinality() == 0) && len(updatedFields) == 0 { + if fieldsSame && (dropsI == nil || dropsI.GetCardinality() == 0) && copyFlag && w.id == "" && len(updatedFields) == 0 { err := segment.copyStoredDocs(newDocNum, docNumOffsets, w) if err != nil { return 0, nil, err @@ -517,25 +527,38 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap, // record where we're about to start writing docNumOffsets[newDocNum] = uint64(w.Count()) - // write out the meta len and compressed data len - _, err = writeUvarints(w, - uint64(len(metaBytes)), - uint64(len(idFieldVal)+len(compressed))) + bufMeta, err := w.process(metaBytes) if err != nil { return 0, nil, err } - // now write the meta - _, err = w.Write(metaBytes) + + // CHECK + // Making a copy because TestMergeWithEmptySegmentsFirst fails + // during an append to idFieldVal + buf := make([]byte, 0, len(idFieldVal)+len(compressed)) + buf = append(buf, idFieldVal...) + buf = append(buf, compressed...) 
+ + // bufCompressed, err := w.process(append(idFieldVal, compressed...)) + bufCompressed, err := w.process(buf) + if err != nil { + return 0, nil, err + } + + // write out the meta len and compressed data len + _, err = writeUvarints(w, + uint64(len(bufMeta)), + uint64(len(bufCompressed))) if err != nil { return 0, nil, err } - // now write the _id field val (counted as part of the 'compressed' data) - _, err = w.Write(idFieldVal) + // now write the meta + _, err = w.Write(bufMeta) if err != nil { return 0, nil, err } // now write the compressed data - _, err = w.Write(compressed) + _, err = w.Write(bufCompressed) if err != nil { return 0, nil, err } @@ -564,7 +587,7 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap, // using a single Write() call for the entire set of bytes. The // newDocNumOffsets is filled with the new offsets for each doc. func (sb *SegmentBase) copyStoredDocs(newDocNum uint64, newDocNumOffsets []uint64, - w *CountHashWriter) error { + w *fileWriter) error { if sb.numDocs <= 0 { return nil } diff --git a/new.go b/new.go index c99b933d..ef4202db 100644 --- a/new.go +++ b/new.go @@ -64,7 +64,7 @@ func (*ZapPlugin) newWithChunkMode(results []index.Document, s.results = results s.chunkMode = chunkMode - s.w = NewCountHashWriter(&br) + s.w = NewFileWriter(NewCountHashWriter(&br)) storedIndexOffset, sectionsIndexOffset, err := s.convert() if err != nil { @@ -72,7 +72,7 @@ func (*ZapPlugin) newWithChunkMode(results []index.Document, } sb, err := InitSegmentBase(br.Bytes(), s.w.Sum32(), chunkMode, - uint64(len(results)), storedIndexOffset, sectionsIndexOffset) + uint64(len(results)), storedIndexOffset, sectionsIndexOffset, s.w.id) // get the bytes written before the interim's reset() call // write it to the newly formed segment base. 
@@ -96,7 +96,7 @@ type interim struct { chunkMode uint32 - w *CountHashWriter + w *fileWriter // FieldsMap adds 1 to field id to avoid zero value issues // name -> field id + 1 @@ -376,24 +376,29 @@ func (s *interim) writeStoredFields() ( s.incrementBytesWritten(uint64(len(compressed))) docStoredOffsets[docNum] = uint64(s.w.Count()) - _, err := writeUvarints(s.w, - uint64(len(metaBytes)), - uint64(len(idFieldVal)+len(compressed))) + bufMeta, err := s.w.process(metaBytes) if err != nil { return 0, err } - _, err = s.w.Write(metaBytes) + bufCompressed, err := s.w.process(append(idFieldVal, compressed...)) if err != nil { return 0, err } - _, err = s.w.Write(idFieldVal) + _, err = writeUvarints(s.w, + uint64(len(bufMeta)), + uint64(len(bufCompressed))) if err != nil { return 0, err } - _, err = s.w.Write(compressed) + _, err = s.w.Write(bufMeta) + if err != nil { + return 0, err + } + + _, err = s.w.Write(bufCompressed) if err != nil { return 0, err } diff --git a/posting.go b/posting.go index 893f717a..0e63bb1c 100644 --- a/posting.go +++ b/posting.go @@ -209,13 +209,13 @@ func (p *PostingsList) iterator(includeFreq, includeNorm, includeLocs bool, // initialize freq chunk reader if rv.includeFreqNorm { - rv.freqNormReader = newChunkedIntDecoder(p.sb.mem, p.freqOffset, rv.freqNormReader) + rv.freqNormReader = newChunkedIntDecoder(p.sb.mem, p.freqOffset, rv.freqNormReader, p.sb.fileReader) rv.incrementBytesRead(rv.freqNormReader.getBytesRead()) } // initialize the loc chunk reader if rv.includeLocs { - rv.locReader = newChunkedIntDecoder(p.sb.mem, p.locOffset, rv.locReader) + rv.locReader = newChunkedIntDecoder(p.sb.mem, p.locOffset, rv.locReader, p.sb.fileReader) rv.incrementBytesRead(rv.locReader.getBytesRead()) } @@ -290,14 +290,17 @@ func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error { postingsLen, read = binary.Uvarint(d.sb.mem[postingsOffset+n : postingsOffset+n+binary.MaxVarintLen64]) n += uint64(read) - roaringBytes := 
d.sb.mem[postingsOffset+n : postingsOffset+n+postingsLen] + roaringBytes, err := d.sb.fileReader.process(d.sb.mem[postingsOffset+n : postingsOffset+n+postingsLen]) + if err != nil { + return err + } rv.incrementBytesRead(n + postingsLen) if rv.postings == nil { rv.postings = roaring.NewBitmap() } - _, err := rv.postings.FromBuffer(roaringBytes) + _, err = rv.postings.FromBuffer(roaringBytes) if err != nil { return fmt.Errorf("error loading roaring bitmap: %v", err) } diff --git a/read.go b/read.go index d4a10024..fda38a3a 100644 --- a/read.go +++ b/read.go @@ -16,13 +16,22 @@ package zap import "encoding/binary" -func (sb *SegmentBase) getDocStoredMetaAndCompressed(docNum uint64) ([]byte, []byte) { +func (sb *SegmentBase) getDocStoredMetaAndCompressed(docNum uint64) ([]byte, []byte, error) { _, storedOffset, n, metaLen, dataLen := sb.getDocStoredOffsets(docNum) meta := sb.mem[storedOffset+n : storedOffset+n+metaLen] data := sb.mem[storedOffset+n+metaLen : storedOffset+n+metaLen+dataLen] - return meta, data + meta, err := sb.fileReader.process(meta) + if err != nil { + return nil, nil, err + } + data, err = sb.fileReader.process(data) + if err != nil { + return nil, nil, err + } + + return meta, data, nil } func (sb *SegmentBase) getDocStoredOffsets(docNum uint64) ( diff --git a/section.go b/section.go index c6e8e113..fd42f7af 100644 --- a/section.go +++ b/section.go @@ -28,7 +28,7 @@ type section interface { Process(opaque map[int]resetable, docNum uint32, f index.Field, fieldID uint16) // flush the processed data in the opaque to the writer. - Persist(opaque map[int]resetable, w *CountHashWriter) (n int64, err error) + Persist(opaque map[int]resetable, w *fileWriter) (n int64, err error) // this API is used to fetch the file offset of the field for this section. 
// this is used during search time to parse the section, and fetch results @@ -40,7 +40,7 @@ type section interface { // as part of the merge API, write the merged data to the writer and also track // the starting offset of this newly merged section data. Merge(opaque map[int]resetable, segments []*SegmentBase, drops []*roaring.Bitmap, fieldsInv []string, - newDocNumsIn [][]uint64, w *CountHashWriter, closeCh chan struct{}) error + newDocNumsIn [][]uint64, w *fileWriter, closeCh chan struct{}) error // opaque is used to track the data specific to this section. its not visible // to the other sections and is only visible and freely modifiable by this specifc diff --git a/section_faiss_vector_index.go b/section_faiss_vector_index.go index ce16756c..cb203eee 100644 --- a/section_faiss_vector_index.go +++ b/section_faiss_vector_index.go @@ -57,7 +57,7 @@ func (v *faissVectorIndexSection) Process(opaque map[int]resetable, docNum uint3 } } -func (v *faissVectorIndexSection) Persist(opaque map[int]resetable, w *CountHashWriter) (n int64, err error) { +func (v *faissVectorIndexSection) Persist(opaque map[int]resetable, w *fileWriter) (n int64, err error) { vo := v.getvectorIndexOpaque(opaque) vo.writeVectorIndexes(w) return 0, nil @@ -81,7 +81,7 @@ type vecIndexInfo struct { // keep in mind with respect to update and delete operations with respect to vectors func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*SegmentBase, drops []*roaring.Bitmap, fieldsInv []string, - newDocNumsIn [][]uint64, w *CountHashWriter, closeCh chan struct{}) error { + newDocNumsIn [][]uint64, w *fileWriter, closeCh chan struct{}) error { vo := v.getvectorIndexOpaque(opaque) // the segments with valid vector sections in them @@ -139,13 +139,23 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se indexOptimizedFor: index.VectorIndexOptimizationsReverseLookup[int(indexOptimizationTypeInt)], }) + idMapLen, n := binary.Uvarint(sb.mem[pos : 
pos+binary.MaxVarintLen64]) + pos += n + pos += int(idMapLen) + + buf, err := sb.fileReader.process(sb.mem[pos : pos+int(idMapLen)]) + if err != nil { + return err + } + bufPos := 0 + curIdx := len(indexes) - 1 for i := 0; i < int(numVecs); i++ { - vecID, n := binary.Varint(sb.mem[pos : pos+binary.MaxVarintLen64]) - pos += n + vecID, n := binary.Varint(buf[bufPos : bufPos+binary.MaxVarintLen64]) + bufPos += n - docID, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) - pos += n + docID, n := binary.Uvarint(buf[bufPos : bufPos+binary.MaxVarintLen64]) + bufPos += n // remap the docID from the old segment to the new document nos. // provided. furthermore, also drop the now-invalid doc nums @@ -183,7 +193,7 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se return nil } -func (v *vectorIndexOpaque) flushSectionMetadata(fieldID int, w *CountHashWriter, +func (v *vectorIndexOpaque) flushSectionMetadata(fieldID int, w *fileWriter, vecToDocID map[int64]uint64, indexes []*vecIndexInfo) error { tempBuf := v.grabBuf(binary.MaxVarintLen64) @@ -219,31 +229,44 @@ func (v *vectorIndexOpaque) flushSectionMetadata(fieldID int, w *CountHashWriter return err } + idBuf := make([]byte, 0, binary.MaxVarintLen64*2*len(vecToDocID)) + pos := 0 + for vecID, docID := range vecToDocID { // write the vecID - n = binary.PutVarint(tempBuf, vecID) - _, err = w.Write(tempBuf[:n]) - if err != nil { - return err - } + pos += binary.PutVarint(idBuf, vecID) + pos += binary.PutUvarint(idBuf, docID) + } + idBuf, err = w.process(idBuf[:pos]) + if err != nil { + return err + } - // write the docID - n = binary.PutUvarint(tempBuf, docID) - _, err = w.Write(tempBuf[:n]) - if err != nil { - return err - } + n = binary.PutUvarint(tempBuf, uint64(len(idBuf))) + _, err = w.Write(tempBuf[:n]) + if err != nil { + return err + } + + _, err = w.Write(idBuf) + if err != nil { + return err } v.fieldAddrs[uint16(fieldID)] = fieldStart return nil } -func (v 
*vectorIndexOpaque) flushVectorIndex(indexBytes []byte, w *CountHashWriter) error { +func (v *vectorIndexOpaque) flushVectorIndex(indexBytes []byte, w *fileWriter) error { tempBuf := v.grabBuf(binary.MaxVarintLen64) + indexBytes, err := w.process(indexBytes) + if err != nil { + return err + } + n := binary.PutUvarint(tempBuf, uint64(len(indexBytes))) - _, err := w.Write(tempBuf[:n]) + _, err = w.Write(tempBuf[:n]) if err != nil { return err } @@ -273,7 +296,7 @@ func calculateNprobe(nlist int, indexOptimizedFor string) int32 { // todo: naive implementation. need to keep in mind the perf implications and improve on this. // perhaps, parallelized merging can help speed things up over here. func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase, - vecIndexes []*vecIndexInfo, w *CountHashWriter, closeCh chan struct{}) error { + vecIndexes []*vecIndexInfo, w *fileWriter, closeCh chan struct{}) error { // safe to assume that all the indexes are of the same config values, given // that they are extracted from the field mapping info. @@ -296,7 +319,10 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase, } // read the index bytes. todo: parallelize this - indexBytes := segBase.mem[vecIndexes[segI].startOffset : vecIndexes[segI].startOffset+int(vecIndexes[segI].indexSize)] + indexBytes, err := w.process(segBase.mem[vecIndexes[segI].startOffset : vecIndexes[segI].startOffset+int(vecIndexes[segI].indexSize)]) + if err != nil { + return err + } index, err := faiss.ReadIndexFromBuffer(indexBytes, faissIOFlags) if err != nil { freeReconstructedIndexes(vecIndexes) @@ -480,7 +506,7 @@ func determineIndexToUse(nvecs, nlist int, indexOptimizedFor string) (string, in } } -func (vo *vectorIndexOpaque) writeVectorIndexes(w *CountHashWriter) (offset uint64, err error) { +func (vo *vectorIndexOpaque) writeVectorIndexes(w *fileWriter) (offset uint64, err error) { // for every fieldID, contents to store over here are: // 1. 
the serialized representation of the dense vector index. // 2. its constituent vectorID -> {docID} mapping. @@ -566,20 +592,28 @@ func (vo *vectorIndexOpaque) writeVectorIndexes(w *CountHashWriter) (offset uint // section would be help avoiding in paging in this data as part of a page // (which is to load a non-cacheable info like index). this could help the // paging costs + idBuf := make([]byte, 0, binary.MaxVarintLen64*2*len(content.vecs)) + pos := 0 for vecID := range content.vecs { docID := vo.vecIDMap[vecID].docID // write the vecID - n = binary.PutVarint(tempBuf, vecID) - _, err = w.Write(tempBuf[:n]) - if err != nil { - return 0, err - } + pos += binary.PutVarint(idBuf[pos:], vecID) + pos += binary.PutUvarint(idBuf[pos:], uint64(docID)) + } + idBuf, err = w.process(idBuf[:pos]) + if err != nil { + return 0, err + } - n = binary.PutUvarint(tempBuf, uint64(docID)) - _, err = w.Write(tempBuf[:n]) - if err != nil { - return 0, err - } + n = binary.PutUvarint(tempBuf, uint64(len(idBuf))) + _, err = w.Write(tempBuf[:n]) + if err != nil { + return 0, err + } + + _, err = w.Write(idBuf) + if err != nil { + return 0, err } // serialize the built index into a byte slice @@ -588,6 +622,11 @@ func (vo *vectorIndexOpaque) writeVectorIndexes(w *CountHashWriter) (offset uint return 0, err } + buf, err = w.process(buf) + if err != nil { + return 0, err + } + // record the fieldStart value for this section. 
// write the vecID -> docID mapping // write the index bytes and its length diff --git a/section_inverted_text_index.go b/section_inverted_text_index.go index 0862b2e2..c21744e9 100644 --- a/section_inverted_text_index.go +++ b/section_inverted_text_index.go @@ -68,7 +68,7 @@ func (i *invertedTextIndexSection) Process(opaque map[int]resetable, docNum uint } } -func (i *invertedTextIndexSection) Persist(opaque map[int]resetable, w *CountHashWriter) (n int64, err error) { +func (i *invertedTextIndexSection) Persist(opaque map[int]resetable, w *fileWriter) (n int64, err error) { invIndexOpaque := i.getInvertedIndexOpaque(opaque) _, err = invIndexOpaque.writeDicts(w) return 0, err @@ -82,7 +82,7 @@ func (i *invertedTextIndexSection) AddrForField(opaque map[int]resetable, fieldI func mergeAndPersistInvertedSection(segments []*SegmentBase, dropsIn []*roaring.Bitmap, fieldsInv []string, fieldsMap map[string]uint16, fieldsSame bool, newDocNumsIn [][]uint64, newSegDocCount uint64, chunkMode uint32, - updatedFields map[string]*index.UpdateFieldInfo, w *CountHashWriter, + updatedFields map[string]*index.UpdateFieldInfo, w *fileWriter, closeCh chan struct{}) (map[int]int, uint64, error) { var bufMaxVarintLen64 []byte = make([]byte, binary.MaxVarintLen64) var bufLoc []uint64 @@ -95,6 +95,16 @@ func mergeAndPersistInvertedSection(segments []*SegmentBase, dropsIn []*roaring. fieldDvLocsStart := make([]uint64, len(fieldsInv)) fieldDvLocsEnd := make([]uint64, len(fieldsInv)) + // copying data from segments is only possible when + // there are no writer callbacks across all segments + copyFlag := true + for _, segment := range segments { + if segment.writerId != "" { + copyFlag = false + break + } + } + // these int coders are initialized with chunk size 1024 // however this will be reset to the correct chunk size // while processing each individual field-term section @@ -250,7 +260,7 @@ func mergeAndPersistInvertedSection(segments []*SegmentBase, dropsIn []*roaring. 
postItr = postings.iterator(true, true, true, postItr) // can only safely copy data if no field data has been deleted - if fieldsSame && len(updatedFields) == 0 { + if fieldsSame && len(updatedFields) == 0 && copyFlag && w.id == "" { // can optimize by copying freq/norm/loc bytes directly lastDocNum, lastFreq, lastNorm, err = mergeTermFreqNormLocsByCopying( term, postItr, newDocNums[itrI], newRoaring, @@ -289,7 +299,10 @@ func mergeAndPersistInvertedSection(segments []*SegmentBase, dropsIn []*roaring. if err != nil { return nil, 0, err } - vellumData := vellumBuf.Bytes() + vellumData, err := w.process(vellumBuf.Bytes()) + if err != nil { + return nil, 0, err + } // write out the length of the vellum data n := binary.PutUvarint(bufMaxVarintLen64, uint64(len(vellumData))) @@ -404,7 +417,7 @@ func mergeAndPersistInvertedSection(segments []*SegmentBase, dropsIn []*roaring. func (i *invertedTextIndexSection) Merge(opaque map[int]resetable, segments []*SegmentBase, drops []*roaring.Bitmap, fieldsInv []string, newDocNumsIn [][]uint64, - w *CountHashWriter, closeCh chan struct{}) error { + w *fileWriter, closeCh chan struct{}) error { io := i.getInvertedIndexOpaque(opaque) fieldAddrs, _, err := mergeAndPersistInvertedSection(segments, drops, fieldsInv, io.FieldsMap, io.fieldsSame, newDocNumsIn, io.numDocs, io.chunkMode, io.updatedFields, w, closeCh) @@ -439,7 +452,7 @@ func (i *invertedIndexOpaque) BytesRead() uint64 { func (i *invertedIndexOpaque) ResetBytesRead(uint64) {} -func (io *invertedIndexOpaque) writeDicts(w *CountHashWriter) (dictOffsets []uint64, err error) { +func (io *invertedIndexOpaque) writeDicts(w *fileWriter) (dictOffsets []uint64, err error) { if io.results == nil || len(io.results) == 0 { return nil, nil } @@ -585,7 +598,10 @@ func (io *invertedIndexOpaque) writeDicts(w *CountHashWriter) (dictOffsets []uin // record where this dictionary starts dictOffsets[fieldID] = uint64(w.Count()) - vellumData := io.builderBuf.Bytes() + vellumData, err := 
w.process(io.builderBuf.Bytes()) + if err != nil { + return nil, err + } // write out the length of the vellum data n := binary.PutUvarint(buf, uint64(len(vellumData))) diff --git a/section_synonym_index.go b/section_synonym_index.go index 3894d1ae..ec1687cc 100644 --- a/section_synonym_index.go +++ b/section_synonym_index.go @@ -286,7 +286,7 @@ func (so *synonymIndexOpaque) grabBuf(size int) []byte { return buf[:size] } -func (so *synonymIndexOpaque) writeThesauri(w *CountHashWriter) (thesOffsets []uint64, err error) { +func (so *synonymIndexOpaque) writeThesauri(w *fileWriter) (thesOffsets []uint64, err error) { if so.results == nil || len(so.results) == 0 { return nil, nil @@ -328,7 +328,10 @@ func (so *synonymIndexOpaque) writeThesauri(w *CountHashWriter) (thesOffsets []u thesOffsets[thesaurusID] = uint64(w.Count()) - vellumData := so.builderBuf.Bytes() + vellumData, err := w.process(so.builderBuf.Bytes()) + if err != nil { + return nil, err + } // write out the length of the vellum data n := binary.PutUvarint(buf, uint64(len(vellumData))) @@ -352,7 +355,7 @@ func (so *synonymIndexOpaque) writeThesauri(w *CountHashWriter) (thesOffsets []u } // write out the synTermMap for this thesaurus - err := writeSynTermMap(so.SynonymIDtoTerm[thesaurusID], w, buf) + err = writeSynTermMap(so.SynonymIDtoTerm[thesaurusID], w, buf) if err != nil { return nil, err } @@ -422,7 +425,7 @@ func (s *synonymIndexSection) Process(opaque map[int]resetable, docNum uint32, f // Persist serializes and writes the thesauri processed to the writer, along // with the synonym postings lists, and the synonym term map. Implements the // Persist API for the synonym index section. 
-func (s *synonymIndexSection) Persist(opaque map[int]resetable, w *CountHashWriter) (n int64, err error) { +func (s *synonymIndexSection) Persist(opaque map[int]resetable, w *fileWriter) (n int64, err error) { synIndexOpaque := s.getSynonymIndexOpaque(opaque) _, err = synIndexOpaque.writeThesauri(w) return 0, err @@ -450,7 +453,7 @@ func (s *synonymIndexSection) AddrForField(opaque map[int]resetable, fieldID int // synonym index section. func (s *synonymIndexSection) Merge(opaque map[int]resetable, segments []*SegmentBase, drops []*roaring.Bitmap, fieldsInv []string, newDocNumsIn [][]uint64, - w *CountHashWriter, closeCh chan struct{}) error { + w *fileWriter, closeCh chan struct{}) error { so := s.getSynonymIndexOpaque(opaque) thesaurusAddrs, fieldIDtoThesaurusID, err := mergeAndPersistSynonymSection(segments, drops, fieldsInv, newDocNumsIn, w, closeCh) if err != nil { @@ -489,7 +492,7 @@ func encodeSynonym(synonymID uint32, docID uint32) uint64 { // writeSynonyms serilizes and writes the synonym postings list to the writer, by first // serializing the postings list to a byte slice and then writing the length // of the byte slice followed by the byte slice itself. 
-func writeSynonyms(postings *roaring64.Bitmap, w *CountHashWriter, bufMaxVarintLen64 []byte) ( +func writeSynonyms(postings *roaring64.Bitmap, w *fileWriter, bufMaxVarintLen64 []byte) ( offset uint64, err error) { termCardinality := postings.GetCardinality() if termCardinality <= 0 { @@ -502,6 +505,11 @@ func writeSynonyms(postings *roaring64.Bitmap, w *CountHashWriter, bufMaxVarintL if err != nil { return 0, err } + buf, err = w.process(buf) + if err != nil { + return 0, err + } + // write out the length n := binary.PutUvarint(bufMaxVarintLen64, uint64(len(buf))) _, err = w.Write(bufMaxVarintLen64[:n]) @@ -520,40 +528,52 @@ func writeSynonyms(postings *roaring64.Bitmap, w *CountHashWriter, bufMaxVarintL // writeSynTermMap serializes and writes the synonym term map to the writer, by first // writing the length of the map followed by the map entries, where each entry // consists of the synonym ID, the length of the term, and the term itself. -func writeSynTermMap(synTermMap map[uint32]string, w *CountHashWriter, bufMaxVarintLen64 []byte) error { +func writeSynTermMap(synTermMap map[uint32]string, w *fileWriter, bufMaxVarintLen64 []byte) error { if len(synTermMap) == 0 { return nil } n := binary.PutUvarint(bufMaxVarintLen64, uint64(len(synTermMap))) _, err := w.Write(bufMaxVarintLen64[:n]) - if err != nil { - return err + + lenTerms := 0 + for _, term := range synTermMap { + lenTerms += len(term) } + buf := make([]byte, lenTerms+binary.MaxVarintLen64*(2*len(synTermMap))) + bufPos := 0 + for sid, term := range synTermMap { - n = binary.PutUvarint(bufMaxVarintLen64, uint64(sid)) - _, err = w.Write(bufMaxVarintLen64[:n]) - if err != nil { - return err - } + bufPos += binary.PutUvarint(buf[bufPos:], uint64(sid)) - n = binary.PutUvarint(bufMaxVarintLen64, uint64(len(term))) - _, err = w.Write(bufMaxVarintLen64[:n]) - if err != nil { - return err - } + bufPos += binary.PutUvarint(buf[bufPos:], uint64(len(term))) - _, err = w.Write([]byte(term)) - if err != nil { - return 
err - } + copy(buf[bufPos:], term) + bufPos += len(term) + } + + buf, err = w.process(buf[:bufPos]) + if err != nil { + return err + } + + // write out the length of the map + n = binary.PutUvarint(bufMaxVarintLen64, uint64(len(buf))) + _, err = w.Write(bufMaxVarintLen64[:n]) + if err != nil { + return err + } + + _, err = w.Write(buf) + if err != nil { + return err } return nil } func mergeAndPersistSynonymSection(segments []*SegmentBase, dropsIn []*roaring.Bitmap, - fieldsInv []string, newDocNumsIn [][]uint64, w *CountHashWriter, + fieldsInv []string, newDocNumsIn [][]uint64, w *fileWriter, closeCh chan struct{}) (map[int]int, map[uint16]int, error) { var bufMaxVarintLen64 []byte = make([]byte, binary.MaxVarintLen64) @@ -722,7 +742,10 @@ func mergeAndPersistSynonymSection(segments []*SegmentBase, dropsIn []*roaring.B if err != nil { return nil, nil, err } - vellumData := vellumBuf.Bytes() + vellumData, err := w.process(vellumBuf.Bytes()) + if err != nil { + return nil, nil, err + } thesOffset := uint64(w.Count()) diff --git a/segment.go b/segment.go index 461fdf5a..bb004b1a 100644 --- a/segment.go +++ b/segment.go @@ -73,6 +73,8 @@ func (*ZapPlugin) Open(path string) (segment.Segment, error) { return nil, err } + rv.fileReader = NewFileReader(rv.writerId) + err = rv.loadFieldsNew() if err != nil { _ = rv.Close() @@ -109,6 +111,8 @@ type SegmentBase struct { fieldDvReaders []map[uint16]*docValueReader // naive chunk cache per field; section->field->reader fieldDvNames []string // field names cached in fieldDvReaders size uint64 + writerId string + fileReader *fileReader updatedFields map[string]*index.UpdateFieldInfo @@ -210,7 +214,13 @@ func (s *Segment) loadConfig() error { crcOffset := len(s.mm) - 4 s.crc = binary.BigEndian.Uint32(s.mm[crcOffset : crcOffset+4]) - verOffset := crcOffset - 4 + idLenOffset := crcOffset - 4 + idLen := binary.BigEndian.Uint32(s.mm[idLenOffset : idLenOffset+4]) + idOffset := idLenOffset - int(idLen) + id := string(s.mm[idOffset : 
idOffset+int(idLen)]) + s.writerId = id + + verOffset := idOffset - 4 s.version = binary.BigEndian.Uint32(s.mm[verOffset : verOffset+4]) if Version < IndexSectionsVersion && s.version != Version { return fmt.Errorf("unsupported version %d != %d", s.version, Version) @@ -385,11 +395,14 @@ func (sb *SegmentBase) loadFieldNew(fieldID uint16, pos uint64, fieldNameLen, sz := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) pos += uint64(sz) - fieldName := string(sb.mem[pos : pos+fieldNameLen]) + fieldName, err := sb.fileReader.process(sb.mem[pos : pos+fieldNameLen]) + if err != nil { + return err + } pos += fieldNameLen - sb.fieldsInv = append(sb.fieldsInv, fieldName) - sb.fieldsMap[fieldName] = uint16(fieldID + 1) + sb.fieldsInv = append(sb.fieldsInv, string(fieldName)) + sb.fieldsMap[string(fieldName)] = uint16(fieldID + 1) fieldNumSections, sz := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) pos += uint64(sz) @@ -461,6 +474,11 @@ func (sb *SegmentBase) dictionary(field string) (rv *Dictionary, err error) { } fstBytes := sb.mem[dictStart+uint64(read) : dictStart+uint64(read)+vellumLen] rv.incrementBytesRead(uint64(read) + vellumLen) + fstBytes, err := sb.fileReader.process(fstBytes) + if err != nil { + sb.m.Unlock() + return nil, err + } rv.fst, err = vellum.Load(fstBytes) if err != nil { sb.m.Unlock() @@ -509,7 +527,7 @@ func (sb *SegmentBase) thesaurus(name string) (rv *Thesaurus, err error) { } thesLoc, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) pos += uint64(n) - fst, synTermMap, err := sb.synIndexCache.loadOrCreate(rv.fieldID, sb.mem[thesLoc:]) + fst, synTermMap, err := sb.synIndexCache.loadOrCreate(rv.fieldID, sb.mem[thesLoc:], sb.fileReader) if err != nil { return nil, fmt.Errorf("thesaurus name %s err: %v", name, err) } @@ -550,7 +568,10 @@ func (sb *SegmentBase) visitStoredFields(vdc *visitDocumentCtx, num uint64, visitor segment.StoredFieldValueVisitor) error { // first make sure this is a valid number in this segment if 
num < sb.numDocs { - meta, compressed := sb.getDocStoredMetaAndCompressed(num) + meta, compressed, err := sb.getDocStoredMetaAndCompressed(num) + if err != nil { + return err + } vdc.reader.Reset(meta) @@ -630,7 +651,10 @@ func (sb *SegmentBase) DocID(num uint64) ([]byte, error) { vdc := visitDocumentCtxPool.Get().(*visitDocumentCtx) - meta, compressed := sb.getDocStoredMetaAndCompressed(num) + meta, compressed, err := sb.getDocStoredMetaAndCompressed(num) + if err != nil { + return nil, err + } vdc.reader.Reset(meta) diff --git a/synonym_cache.go b/synonym_cache.go index 0b8d56c2..012937bb 100644 --- a/synonym_cache.go +++ b/synonym_cache.go @@ -46,7 +46,7 @@ func (sc *synonymIndexCache) Clear() { // - A Vellum FST (Finite State Transducer) representing the thesaurus. // - A map associating synonym IDs to their corresponding terms. // This function returns the loaded or newly created tuple (FST and map). -func (sc *synonymIndexCache) loadOrCreate(fieldID uint16, mem []byte) (*vellum.FST, map[uint32][]byte, error) { +func (sc *synonymIndexCache) loadOrCreate(fieldID uint16, mem []byte, r *fileReader) (*vellum.FST, map[uint32][]byte, error) { sc.m.RLock() entry, ok := sc.cache[fieldID] if ok { @@ -64,18 +64,21 @@ func (sc *synonymIndexCache) loadOrCreate(fieldID uint16, mem []byte) (*vellum.F return entry.load() } - return sc.createAndCacheLOCKED(fieldID, mem) + return sc.createAndCacheLOCKED(fieldID, mem, r) } // createAndCacheLOCKED creates the synonym index cache for the specified fieldID and caches it. 
-func (sc *synonymIndexCache) createAndCacheLOCKED(fieldID uint16, mem []byte) (*vellum.FST, map[uint32][]byte, error) { +func (sc *synonymIndexCache) createAndCacheLOCKED(fieldID uint16, mem []byte, r *fileReader) (*vellum.FST, map[uint32][]byte, error) { var pos uint64 vellumLen, read := binary.Uvarint(mem[pos : pos+binary.MaxVarintLen64]) if vellumLen == 0 || read <= 0 { return nil, nil, fmt.Errorf("vellum length is 0") } pos += uint64(read) - fstBytes := mem[pos : pos+vellumLen] + fstBytes, err := r.process(mem[pos : pos+vellumLen]) + if err != nil { + return nil, nil, err + } fst, err := vellum.Load(fstBytes) if err != nil { return nil, nil, fmt.Errorf("vellum err: %v", err) @@ -86,17 +89,29 @@ func (sc *synonymIndexCache) createAndCacheLOCKED(fieldID uint16, mem []byte) (* if numSyns == 0 { return nil, nil, fmt.Errorf("no synonyms found") } + mapLen, n := binary.Uvarint(mem[pos : pos+binary.MaxVarintLen64]) + pos += uint64(n) + if mapLen == 0 { + return nil, nil, fmt.Errorf("synonym term map length is 0") + } + buf, err := r.process(mem[pos : pos+mapLen]) + if err != nil { + return nil, nil, err + } + pos += mapLen + bufLen := uint64(len(buf)) + var bufPos uint64 synTermMap := make(map[uint32][]byte, numSyns) for i := 0; i < int(numSyns); i++ { - synID, n := binary.Uvarint(mem[pos : pos+binary.MaxVarintLen64]) - pos += uint64(n) - termLen, n := binary.Uvarint(mem[pos : pos+binary.MaxVarintLen64]) - pos += uint64(n) + synID, n := binary.Uvarint(buf[bufPos:min(bufPos+binary.MaxVarintLen64, bufLen)]) + bufPos += uint64(n) + termLen, n := binary.Uvarint(buf[bufPos:min(bufPos+binary.MaxVarintLen64, bufLen)]) + bufPos += uint64(n) if termLen == 0 { return nil, nil, fmt.Errorf("term length is 0") } - term := mem[pos : pos+uint64(termLen)] - pos += uint64(termLen) + term := buf[bufPos : bufPos+uint64(termLen)] + bufPos += uint64(termLen) synTermMap[uint32(synID)] = term } sc.insertLOCKED(fieldID, fst, synTermMap) diff --git a/synonym_posting.go b/synonym_posting.go 
index c411a69b..9424dd56 100644 --- a/synonym_posting.go +++ b/synonym_posting.go @@ -110,7 +110,10 @@ func (rv *SynonymsList) read(synonymsOffset uint64, t *Thesaurus) error { synonymsLen, read = binary.Uvarint(t.sb.mem[synonymsOffset+n : synonymsOffset+n+binary.MaxVarintLen64]) n += uint64(read) - roaringBytes := t.sb.mem[synonymsOffset+n : synonymsOffset+n+synonymsLen] + roaringBytes, err := t.sb.fileReader.process(t.sb.mem[synonymsOffset+n : synonymsOffset+n+synonymsLen]) + if err != nil { + return err + } if rv.synonyms == nil { rv.synonyms = roaring64.NewBitmap() @@ -118,7 +121,7 @@ func (rv *SynonymsList) read(synonymsOffset uint64, t *Thesaurus) error { rv.buffer.Reset(roaringBytes) - _, err := rv.synonyms.ReadFrom(rv.buffer) + _, err = rv.synonyms.ReadFrom(rv.buffer) if err != nil { return fmt.Errorf("error loading roaring bitmap: %v", err) } diff --git a/write.go b/write.go index 4e7f5523..4a1d262f 100644 --- a/write.go +++ b/write.go @@ -32,6 +32,13 @@ func writeRoaringWithLen(r *roaring.Bitmap, w io.Writer, var tw int + if fw, ok := w.(*fileWriter); ok && fw != nil { + buf, err = fw.process(buf) + if err != nil { + return tw, err + } + } + // write out the length n := binary.PutUvarint(reuseBufVarint, uint64(len(buf))) nw, err := w.Write(reuseBufVarint[:n]) @@ -50,16 +57,20 @@ func writeRoaringWithLen(r *roaring.Bitmap, w io.Writer, return tw, nil } -func persistFieldsSection(fieldsInv []string, w *CountHashWriter, opaque map[int]resetable) (uint64, error) { +func persistFieldsSection(fieldsInv []string, w *fileWriter, opaque map[int]resetable) (uint64, error) { var rv uint64 fieldsOffsets := make([]uint64, 0, len(fieldsInv)) for fieldID, fieldName := range fieldsInv { // record start of this field fieldsOffsets = append(fieldsOffsets, uint64(w.Count())) + fieldName, err := w.process([]byte(fieldName)) + if err != nil { + return 0, err + } // write field name length - _, err := writeUvarints(w, uint64(len(fieldName))) + _, err = writeUvarints(w, 
uint64(len(fieldName))) if err != nil { return 0, err } @@ -109,7 +120,7 @@ const FooterSize = 4 + 4 + 4 + 8 + 8 + 8 + 8 + 8 // in the index sections format, the fieldsIndexOffset points to the sectionsIndexOffset func persistFooter(numDocs, storedIndexOffset, fieldsIndexOffset, sectionsIndexOffset, docValueOffset uint64, - chunkMode uint32, crcBeforeFooter uint32, writerIn io.Writer) error { + chunkMode uint32, crcBeforeFooter uint32, writerIn io.Writer, writerId string) error { w := NewCountHashWriter(writerIn) w.crc = crcBeforeFooter @@ -150,6 +161,17 @@ func persistFooter(numDocs, storedIndexOffset, fieldsIndexOffset, sectionsIndexO if err != nil { return err } + + // write out the length of the writer id and the writer id + _, err = w.Write([]byte(writerId)) + if err != nil { + return err + } + err = binary.Write(w, binary.BigEndian, uint32(len(writerId))) + if err != nil { + return err + } + // write out CRC-32 of everything upto but not including this CRC err = binary.Write(w, binary.BigEndian, w.crc) if err != nil { diff --git a/writer.go b/writer.go new file mode 100644 index 00000000..1cd6126a --- /dev/null +++ b/writer.go @@ -0,0 +1,78 @@ +// Copyright (c) 2025 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. 
+ +package zap + +type fileWriter struct { + writerCB func(data []byte) ([]byte, error) + id string + c *CountHashWriter +} + +var WriterCallbackGetter = func() (string, func(data []byte) ([]byte, error)) { + return "", func(data []byte) ([]byte, error) { + return data, nil + } +} + +var ReaderCallbackGetter = func(string) func(data []byte) ([]byte, error) { + return func(data []byte) ([]byte, error) { + return data, nil + } +} + +func NewFileWriter(c *CountHashWriter) *fileWriter { + rv := &fileWriter{c: c} + rv.id, rv.writerCB = WriterCallbackGetter() + + return rv +} + +func (w *fileWriter) Write(data []byte) (int, error) { + return w.c.Write(data) +} + +func (w *fileWriter) process(data []byte) ([]byte, error) { + if w.writerCB != nil { + return w.writerCB(data) + } + return data, nil +} + +func (w *fileWriter) Count() int { + return w.c.Count() +} + +func (w *fileWriter) Sum32() uint32 { + return w.c.Sum32() +} + +type fileReader struct { + callback func(data []byte) ([]byte, error) + id string +} + +func NewFileReader(id string) *fileReader { + rv := &fileReader{id: id} + rv.callback = ReaderCallbackGetter(id) + + return rv +} + +func (r *fileReader) process(data []byte) ([]byte, error) { + if r.callback != nil { + return r.callback(data) + } + return data, nil +} From b03a818dad8c1472176082d0be6e48b0342a803c Mon Sep 17 00:00:00 2001 From: Likith B Date: Mon, 21 Jul 2025 12:07:30 +0530 Subject: [PATCH 2/5] MB-65860: Minor changes to callbacks --- build.go | 17 +++++++----- writer.go => callbacks.go | 56 ++++++++++++++++++++++++++++++--------- merge.go | 6 ++++- new.go | 6 ++++- segment.go | 10 ++++++- 5 files changed, 73 insertions(+), 22 deletions(-) rename writer.go => callbacks.go (55%) diff --git a/build.go b/build.go index 29a7e1c7..87bf2502 100644 --- a/build.go +++ b/build.go @@ -175,17 +175,22 @@ func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, numDocs uint64 vecIndexCache: newVectorIndexCache(), synIndexCache: newSynonymIndexCache(), 
// following fields gets populated by loadFieldsNew - fieldsMap: make(map[string]uint16), - dictLocs: make([]uint64, 0), - fieldsInv: make([]string, 0), - writerId: writerId, - fileReader: NewFileReader(writerId), + fieldsMap: make(map[string]uint16), + dictLocs: make([]uint64, 0), + fieldsInv: make([]string, 0), + writerId: writerId, } sb.updateSize() + fileReader, err := NewFileReader(sb.writerId) + if err != nil { + return nil, err + } + sb.fileReader = fileReader + // load the data/section starting offsets for each field // by via the sectionsIndexOffset as starting point. - err := sb.loadFieldsNew() + err = sb.loadFieldsNew() if err != nil { return nil, err } diff --git a/writer.go b/callbacks.go similarity index 55% rename from writer.go rename to callbacks.go index 1cd6126a..b3833d27 100644 --- a/writer.go +++ b/callbacks.go @@ -15,28 +15,41 @@ package zap type fileWriter struct { - writerCB func(data []byte) ([]byte, error) + writerCB func(data []byte, counter []byte) ([]byte, error) + counter []byte id string c *CountHashWriter } -var WriterCallbackGetter = func() (string, func(data []byte) ([]byte, error)) { - return "", func(data []byte) ([]byte, error) { +var WriterCallbackGetter = func() (string, func(data []byte, _ []byte) ([]byte, error), error) { + return "", func(data []byte, _ []byte) ([]byte, error) { return data, nil - } + }, nil } -var ReaderCallbackGetter = func(string) func(data []byte) ([]byte, error) { +var ReaderCallbackGetter = func(string) (func(data []byte) ([]byte, error), error) { return func(data []byte) ([]byte, error) { return data, nil - } + }, nil } -func NewFileWriter(c *CountHashWriter) *fileWriter { +var CounterGetter = func() ([]byte, error) { + return nil, nil +} + +func NewFileWriter(c *CountHashWriter) (*fileWriter, error) { + var err error rv := &fileWriter{c: c} - rv.id, rv.writerCB = WriterCallbackGetter() + rv.id, rv.writerCB, err = WriterCallbackGetter() + if err != nil { + return nil, err + } + rv.counter, err = 
CounterGetter() + if err != nil { + return nil, err + } - return rv + return rv, nil } func (w *fileWriter) Write(data []byte) (int, error) { @@ -45,11 +58,24 @@ func (w *fileWriter) Write(data []byte) (int, error) { func (w *fileWriter) process(data []byte) ([]byte, error) { if w.writerCB != nil { - return w.writerCB(data) + w.incrementCounter() + return w.writerCB(data, w.counter) } return data, nil } +func (w *fileWriter) incrementCounter() { + if w.counter != nil { + for i := len(w.counter) - 1; i >= 0; i-- { + if w.counter[i] < 255 { + w.counter[i]++ + return + } + w.counter[i] = 0 + } + } +} + func (w *fileWriter) Count() int { return w.c.Count() } @@ -63,11 +89,15 @@ type fileReader struct { id string } -func NewFileReader(id string) *fileReader { +func NewFileReader(id string) (*fileReader, error) { + var err error rv := &fileReader{id: id} - rv.callback = ReaderCallbackGetter(id) + rv.callback, err = ReaderCallbackGetter(id) + if err != nil { + return nil, err + } - return rv + return rv, nil } func (r *fileReader) process(data []byte) ([]byte, error) { diff --git a/merge.go b/merge.go index c195a0ab..16408ca8 100644 --- a/merge.go +++ b/merge.go @@ -73,7 +73,11 @@ func mergeSegmentBases(segmentBases []*SegmentBase, drops []*roaring.Bitmap, pat // wrap it for counting (tracking offsets) cr := NewCountHashWriterWithStatsReporter(br, s) - w := NewFileWriter(cr) + w, err := NewFileWriter(cr) + if err != nil { + cleanup() + return nil, 0, err + } newDocNums, numDocs, storedIndexOffset, _, _, sectionsIndexOffset, err := mergeToWriter(segmentBases, drops, chunkMode, w, closeCh) diff --git a/new.go b/new.go index ef4202db..79cabf66 100644 --- a/new.go +++ b/new.go @@ -62,9 +62,13 @@ func (*ZapPlugin) newWithChunkMode(results []index.Document, br.Grow(estimateAvgBytesPerDoc * estimateNumResults) } + var err error s.results = results s.chunkMode = chunkMode - s.w = NewFileWriter(NewCountHashWriter(&br)) + s.w, err = NewFileWriter(NewCountHashWriter(&br)) + if err 
!= nil { + return nil, uint64(0), err + } storedIndexOffset, sectionsIndexOffset, err := s.convert() if err != nil { diff --git a/segment.go b/segment.go index bb004b1a..8058e918 100644 --- a/segment.go +++ b/segment.go @@ -73,7 +73,11 @@ func (*ZapPlugin) Open(path string) (segment.Segment, error) { return nil, err } - rv.fileReader = NewFileReader(rv.writerId) + rv.fileReader, err = NewFileReader(rv.writerId) + if err != nil { + _ = rv.Close() + return nil, err + } err = rv.loadFieldsNew() if err != nil { @@ -989,3 +993,7 @@ func (s *SegmentBase) GetUpdatedFields() map[string]*index.UpdateFieldInfo { func (s *SegmentBase) SetUpdatedFields(updatedFields map[string]*index.UpdateFieldInfo) { s.updatedFields = updatedFields } + +func (sb *SegmentBase) CallbackId() string { + return sb.writerId +} From b603f5e14ad116035e0293dfed6cd4f694310a95 Mon Sep 17 00:00:00 2001 From: Likith B Date: Wed, 23 Jul 2025 11:03:21 +0530 Subject: [PATCH 3/5] MB-65860: Fixed double counting of bytes read --- intDecoder.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/intDecoder.go b/intDecoder.go index b56e88b6..253da467 100644 --- a/intDecoder.go +++ b/intDecoder.go @@ -88,8 +88,6 @@ func (d *chunkedIntDecoder) loadChunk(chunk int) error { s, e := readChunkBoundary(chunk, d.chunkOffsets) start += s end += e - d.curChunkBytes = d.data[start:end] - d.bytesRead += uint64(len(d.curChunkBytes)) var err error d.curChunkBytes, err = d.fr.process(d.data[start:end]) From 08b133aff6804f17f2b315ddab9de97c6c84ac92 Mon Sep 17 00:00:00 2001 From: Likith B Date: Wed, 23 Jul 2025 11:07:22 +0530 Subject: [PATCH 4/5] MB-65860: Fixed comments around idFieldVal append --- merge.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/merge.go b/merge.go index 16408ca8..36523b25 100644 --- a/merge.go +++ b/merge.go @@ -536,14 +536,12 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap, return 0, nil, err } - // CHECK - // Making a copy because 
TestMergeWithEmptySegmentsFirst fails - // during an append to idFieldVal + // idFieldVal is a pointer to a mem mapped byte slice, so we copy + // before merging it with the compressed data buf := make([]byte, 0, len(idFieldVal)+len(compressed)) buf = append(buf, idFieldVal...) buf = append(buf, compressed...) - // bufCompressed, err := w.process(append(idFieldVal, compressed...)) bufCompressed, err := w.process(buf) if err != nil { return 0, nil, err From 2594f5b126caaa54055c59b48573e2e4aa7b184a Mon Sep 17 00:00:00 2001 From: Likith B Date: Mon, 25 Aug 2025 12:14:03 +0530 Subject: [PATCH 5/5] MB-65860: Added testcases, commentary and minor fixes --- callbacks.go | 49 +++++++++++-- callbacks_test.go | 131 ++++++++++++++++++++++++++++++++++ faiss_vector_cache.go | 2 +- faiss_vector_test.go | 23 ++++-- intcoder.go | 2 +- merge.go | 2 +- section_faiss_vector_index.go | 12 ++-- 7 files changed, 200 insertions(+), 21 deletions(-) create mode 100644 callbacks_test.go diff --git a/callbacks.go b/callbacks.go index b3833d27..21e54270 100644 --- a/callbacks.go +++ b/callbacks.go @@ -14,29 +14,60 @@ package zap -type fileWriter struct { - writerCB func(data []byte, counter []byte) ([]byte, error) - counter []byte - id string - c *CountHashWriter -} - +// This file provides a mechanism for users of zap to provide callbacks +// that can process data before it is written to disk, and after it is read +// from disk. This can be used for things like encryption, compression, etc. + +// The user is responsible for ensuring that the writer and reader callbacks +// are compatible with each other, and that any state needed by the callbacks +// is managed appropriately. For example, if the writer callback uses a +// unique key or nonce per write, the reader callback must be able to +// determine the correct key or nonce to use for each read. + +// The callbacks are identified by an id string, which is returned by the +// WriterCallbackGetter. 
The same id string is passed to the ReaderCallbackGetter +when creating a reader. This allows the reader to determine which +callback to use for a given file. + +// Additionally, if the writer callback needs a unique counter or nonce +// per write, the CounterGetter can be used to provide that. The counter +// is passed to the writer callback along with the data to be written. +// The counter is not passed to the reader callback, as it is assumed that +// the reader callback can determine the correct counter to use based +// on the data being read. + +// An example implementation using AES-GCM is provided in callbacks_test.go +// within initFileCallbacks. + +// Default no-op implementation. Is called before writing any user data to a file. var WriterCallbackGetter = func() (string, func(data []byte, _ []byte) ([]byte, error), error) { return "", func(data []byte, _ []byte) ([]byte, error) { return data, nil }, nil } +// Default no-op implementation. Is called after reading any user data from a file. var ReaderCallbackGetter = func(string) (func(data []byte) ([]byte, error), error) { return func(data []byte) ([]byte, error) { return data, nil }, nil } +// Default no-op implementation. Is called once per write call if a unique counter is +// needed by the writer callback. var CounterGetter = func() ([]byte, error) { return nil, nil } +// fileWriter wraps a CountHashWriter and applies a user provided +// writer callback to the data being written. +type fileWriter struct { + writerCB func(data []byte, counter []byte) ([]byte, error) + counter []byte + id string + c *CountHashWriter +} + func NewFileWriter(c *CountHashWriter) (*fileWriter, error) { var err error rv := &fileWriter{c: c} @@ -56,6 +87,8 @@ func (w *fileWriter) Write(data []byte) (int, error) { return w.c.Write(data) } +// process applies the writer callback to the data, if one is set +// and increments the counter if one is set. 
func (w *fileWriter) process(data []byte) ([]byte, error) { if w.writerCB != nil { w.incrementCounter() @@ -84,6 +117,7 @@ func (w *fileWriter) Sum32() uint32 { return w.c.Sum32() } +// fileReader wraps a reader callback to be applied to data read from a file. type fileReader struct { callback func(data []byte) ([]byte, error) id string @@ -100,6 +134,7 @@ func NewFileReader(id string) (*fileReader, error) { return rv, nil } +// process applies the reader callback to the data, if one is set func (r *fileReader) process(data []byte) ([]byte, error) { if r.callback != nil { return r.callback(data) diff --git a/callbacks_test.go b/callbacks_test.go new file mode 100644 index 00000000..39d6a924 --- /dev/null +++ b/callbacks_test.go @@ -0,0 +1,131 @@ +// Copyright (c) 2025 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. 
+ +//go:build vectors +// +build vectors + +package zap + +import ( + "crypto/aes" + "crypto/cipher" + "crypto/rand" + "fmt" + "testing" +) + +func initFileCallbacks(t *testing.T) { + key := make([]byte, 32) + keyId := "test-key-id" + + if _, err := rand.Read(key); err != nil { + t.Fatalf("Failed to generate random key: %v", err) + } + + block, err := aes.NewCipher(key) + if err != nil { + t.Fatalf("Failed to create AES cipher: %v", err) + } + + aesgcm, err := cipher.NewGCM(block) + if err != nil { + t.Fatalf("Failed to create AES GCM: %v", err) + } + + CounterGetter = func() ([]byte, error) { + counter := make([]byte, 12) + if _, err := rand.Read(counter); err != nil { + return nil, err + } + return counter, nil + } + + writerCallback := func(data, counter []byte) ([]byte, error) { + ciphertext := aesgcm.Seal(nil, counter, data, nil) + result := append(ciphertext, counter...) + return result, nil + } + + readerCallback := func(data []byte) ([]byte, error) { + if len(data) < 12 { + return nil, fmt.Errorf("ciphertext too short") + } + + counter := data[len(data)-12:] + ciphertext := data[:len(data)-12] + plaintext, err := aesgcm.Open(nil, counter, ciphertext, nil) + if err != nil { + return nil, err + } + return plaintext, nil + } + + WriterCallbackGetter = func() (string, func(data []byte, counter []byte) ([]byte, error), error) { + return keyId, writerCallback, nil + } + + ReaderCallbackGetter = func(id string) (func(data []byte) ([]byte, error), error) { + if id != keyId { + return nil, fmt.Errorf("unknown callback ID: %s", id) + } + return readerCallback, nil + } +} + +func TestFileCallbacks(t *testing.T) { + initFileCallbacks(t) + + TestOpen(t) + TestOpenMulti(t) + TestOpenMultiWithTwoChunks(t) + TestSegmentVisitableDocValueFieldsList(t) + TestSegmentDocsWithNonOverlappingFields(t) + TestMergedSegmentDocsWithNonOverlappingFields(t) + + TestChunkedContentCoder(t) + TestChunkedContentCoders(t) + + TestDictionary(t) + TestDictionaryError(t) + 
TestDictionaryBug1156(t) + + TestEnumerator(t) + + TestVecPostingsIterator(t) + TestVectorSegment(t) + TestHashCode(t) + TestPersistedVectorSegment(t) + TestValidVectorMerge(t) + + TestChunkIntCoder(t) + TestChunkLengthToOffsets(t) + TestChunkReadBoundaryFromOffsets(t) + + TestMerge(t) + TestMergeWithEmptySegment(t) + TestMergeWithEmptySegments(t) + TestMergeWithEmptySegmentFirst(t) + TestMergeWithEmptySegmentsFirst(t) + TestMergeAndDrop(t) + TestMergeAndDropAllFromOneSegment(t) + TestMergeWithUpdates(t) + TestMergeWithUpdatesOnManySegments(t) + TestMergeWithUpdatesOnOneDoc(t) + TestMergeBytesWritten(t) + TestUnder32Bits(t) + + TestSynonymSegment(t) + + TestRoaringSizes(t) +} diff --git a/faiss_vector_cache.go b/faiss_vector_cache.go index eea82e78..744ce94f 100644 --- a/faiss_vector_cache.go +++ b/faiss_vector_cache.go @@ -129,12 +129,12 @@ func (vc *vectorIndexCache) createAndCacheLOCKED(fieldID uint16, mem []byte, mapLen, n := binary.Uvarint(mem[pos : pos+binary.MaxVarintLen64]) pos += n - pos += int(mapLen) buf, err := r.process(mem[pos : pos+int(mapLen)]) if err != nil { return nil, nil, nil, nil, err } + pos += int(mapLen) bufPos := 0 bufLen := len(buf) diff --git a/faiss_vector_test.go b/faiss_vector_test.go index ac50a471..2060e78d 100644 --- a/faiss_vector_test.go +++ b/faiss_vector_test.go @@ -332,21 +332,32 @@ func getSectionContentOffsets(sb *SegmentBase, offset uint64) ( numVecs, n = binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) pos += uint64(n) + idMapLen, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) + pos += uint64(n) vecDocIDsMappingOffset = pos + + buf, err := sb.fileReader.process(sb.mem[pos:int(pos+idMapLen)]) + if err != nil { + return 0, 0, 0, 0, 0, 0 + } + pos += idMapLen + + bufPos := 0 + bufLen := len(buf) for i := 0; i < int(numVecs); i++ { - _, n := binary.Varint(sb.mem[pos : pos+binary.MaxVarintLen64]) - pos += uint64(n) - _, n = binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) - pos += uint64(n) + _, n := 
binary.Varint(buf[bufPos:min(bufPos+binary.MaxVarintLen64, bufLen)]) + bufPos += n + _, n = binary.Uvarint(buf[bufPos:min(bufPos+binary.MaxVarintLen64, bufLen)]) + bufPos += n } indexBytesLen, n = binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) pos += uint64(n) - indexBytesOffset = pos + indexBytes, err := sb.fileReader.process(sb.mem[pos:int(pos+indexBytesLen)]) pos += indexBytesLen - return docValueStart, docValueEnd, indexBytesLen, indexBytesOffset, numVecs, vecDocIDsMappingOffset + return docValueStart, docValueEnd, uint64(len(indexBytes)), indexBytesOffset, numVecs, vecDocIDsMappingOffset } func serializeVecs(dataset [][]float32) []float32 { diff --git a/intcoder.go b/intcoder.go index d0803338..15d71744 100644 --- a/intcoder.go +++ b/intcoder.go @@ -157,8 +157,8 @@ func (c *chunkedIntCoder) Write(w io.Writer) (int, error) { return 0, err } processedBuf = append(processedBuf, buf...) - c.chunkLens[i] = uint64(len(buf)) prevOffset = int(chunkOffsets[i]) + c.chunkLens[i] = uint64(len(buf)) } c.final = processedBuf chunkOffsets = modifyLengthsToEndOffsets(c.chunkLens) diff --git a/merge.go b/merge.go index 36523b25..7cd6fc0b 100644 --- a/merge.go +++ b/merge.go @@ -399,7 +399,7 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap, // copying data from segments is only possible when // there are no writer callbacks across all segments - copyFlag := true + copyFlag := false for _, segment := range segments { if segment.writerId != "" { copyFlag = true diff --git a/section_faiss_vector_index.go b/section_faiss_vector_index.go index cb203eee..0cc17294 100644 --- a/section_faiss_vector_index.go +++ b/section_faiss_vector_index.go @@ -141,20 +141,22 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se idMapLen, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) pos += n - pos += int(idMapLen) buf, err := sb.fileReader.process(sb.mem[pos : pos+int(idMapLen)]) if err != nil { return err } + pos += 
int(idMapLen) + bufPos := 0 + bufLen := len(buf) curIdx := len(indexes) - 1 for i := 0; i < int(numVecs); i++ { - vecID, n := binary.Varint(buf[bufPos : bufPos+binary.MaxVarintLen64]) + vecID, n := binary.Varint(buf[bufPos:min(bufPos+binary.MaxVarintLen64, bufLen)]) bufPos += n - docID, n := binary.Uvarint(buf[bufPos : bufPos+binary.MaxVarintLen64]) + docID, n := binary.Uvarint(buf[bufPos:min(bufPos+binary.MaxVarintLen64, bufLen)]) bufPos += n // remap the docID from the old segment to the new document nos. @@ -229,7 +231,7 @@ func (v *vectorIndexOpaque) flushSectionMetadata(fieldID int, w *fileWriter, return err } - idBuf := make([]byte, 0, binary.MaxVarintLen64*2*len(vecToDocID)) + idBuf := make([]byte, binary.MaxVarintLen64*2*len(vecToDocID)) pos := 0 for vecID, docID := range vecToDocID { @@ -592,7 +594,7 @@ func (vo *vectorIndexOpaque) writeVectorIndexes(w *fileWriter) (offset uint64, e // section would be help avoiding in paging in this data as part of a page // (which is to load a non-cacheable info like index). this could help the // paging costs - idBuf := make([]byte, 0, binary.MaxVarintLen64*2*len(content.vecs)) + idBuf := make([]byte, binary.MaxVarintLen64*2*len(content.vecs)) pos := 0 for vecID := range content.vecs { docID := vo.vecIDMap[vecID].docID