Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func persistStoredFieldValues(fieldID int,
}

func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, numDocs uint64,
storedIndexOffset uint64, sectionsIndexOffset uint64) (*SegmentBase, error) {
storedIndexOffset uint64, sectionsIndexOffset uint64, config map[string]interface{}) (*SegmentBase, error) {
sb := &SegmentBase{
mem: mem,
memCRC: memCRC,
Expand All @@ -176,6 +176,7 @@ func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, numDocs uint64
fieldsMap: make(map[string]uint16),
dictLocs: make([]uint64, 0),
fieldsInv: make([]string, 0),
config: config,
}
sb.updateSize()

Expand Down
10 changes: 5 additions & 5 deletions build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,28 +48,28 @@ func buildTestSegment() (*SegmentBase, uint64, error) {
doc,
}

seg, size, err := zapPlugin.newWithChunkMode(results, DefaultChunkMode)
seg, size, err := zapPlugin.newWithChunkMode(results, DefaultChunkMode, nil)
return seg.(*SegmentBase), size, err
}

func buildTestSegmentMulti() (*SegmentBase, uint64, error) {
results := buildTestAnalysisResultsMulti()

seg, size, err := zapPlugin.newWithChunkMode(results, DefaultChunkMode)
seg, size, err := zapPlugin.newWithChunkMode(results, DefaultChunkMode, nil)
return seg.(*SegmentBase), size, err
}

func buildTestSegmentMultiWithChunkFactor(chunkFactor uint32) (*SegmentBase, uint64, error) {
results := buildTestAnalysisResultsMulti()

seg, size, err := zapPlugin.newWithChunkMode(results, chunkFactor)
seg, size, err := zapPlugin.newWithChunkMode(results, chunkFactor, nil)
return seg.(*SegmentBase), size, err
}

func buildTestSegmentMultiWithDifferentFields(includeDocA, includeDocB bool) (*SegmentBase, uint64, error) {
results := buildTestAnalysisResultsMultiWithDifferentFields(includeDocA, includeDocB)

seg, size, err := zapPlugin.newWithChunkMode(results, DefaultChunkMode)
seg, size, err := zapPlugin.newWithChunkMode(results, DefaultChunkMode, nil)
return seg.(*SegmentBase), size, err
}

Expand Down Expand Up @@ -152,7 +152,7 @@ func buildTestSegmentWithDefaultFieldMapping(chunkFactor uint32) (
doc,
}

sb, _, err := zapPlugin.newWithChunkMode(results, chunkFactor)
sb, _, err := zapPlugin.newWithChunkMode(results, chunkFactor, nil)

return sb.(*SegmentBase), fields, err
}
2 changes: 1 addition & 1 deletion dict_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func buildTestSegmentForDict() (*SegmentBase, uint64, error) {
doc,
}

seg, size, err := zapPlugin.newWithChunkMode(results, 1024)
seg, size, err := zapPlugin.newWithChunkMode(results, 1024, nil)
return seg.(*SegmentBase), size, err
}

Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@ require (
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/sys v0.13.0 // indirect
)

replace github.com/blevesearch/go-faiss => /Users/thejas.orkombu/fts/blevesearch/go-faiss
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ github.com/bits-and-blooms/bitset v1.22.0 h1:Tquv9S8+SGaS3EhyA+up3FXzmkhxPGjQQCk
github.com/bits-and-blooms/bitset v1.22.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/blevesearch/bleve_index_api v1.2.8 h1:Y98Pu5/MdlkRyLM0qDHostYo7i+Vv1cDNhqTeR4Sy6Y=
github.com/blevesearch/bleve_index_api v1.2.8/go.mod h1:rKQDl4u51uwafZxFrPD1R7xFOwKnzZW7s/LSeK4lgo0=
github.com/blevesearch/go-faiss v1.0.25 h1:lel1rkOUGbT1CJ0YgzKwC7k+XH0XVBHnCVWahdCXk4U=
github.com/blevesearch/go-faiss v1.0.25/go.mod h1:OMGQwOaRRYxrmeNdMrXJPvVx8gBnvE5RYrr0BahNnkk=
github.com/blevesearch/mmap-go v1.0.4 h1:OVhDhT5B/M1HNPpYPBKIEJaD0F3Si+CrEKULGCDPWmc=
github.com/blevesearch/mmap-go v1.0.4/go.mod h1:EWmEAOmdAS9z/pi/+Toxu99DnsbhG1TIxUoRmJw/pSs=
github.com/blevesearch/scorch_segment_api/v2 v2.3.10 h1:Yqk0XD1mE0fDZAJXTjawJ8If/85JxnLd8v5vG/jWE/s=
Expand Down
24 changes: 18 additions & 6 deletions merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,21 @@ const docDropped = math.MaxUint64 // sentinel docNum to represent a deleted doc
// Merge takes a slice of segments and bit masks describing which
// documents may be dropped, and creates a new segment containing the
// remaining data. This new segment is built at the specified path.
func (*ZapPlugin) Merge(segments []seg.Segment, drops []*roaring.Bitmap, path string,
func (z *ZapPlugin) Merge(segments []seg.Segment, drops []*roaring.Bitmap, path string,
closeCh chan struct{}, s seg.StatsReporter) (
[][]uint64, uint64, error) {
return z.merge(segments, drops, path, closeCh, s, nil)
}

func (z *ZapPlugin) MergeEx(segments []seg.Segment, drops []*roaring.Bitmap, path string,
closeCh chan struct{}, s seg.StatsReporter, config map[string]interface{}) (
[][]uint64, uint64, error) {
return z.merge(segments, drops, path, closeCh, s, config)
}

func (*ZapPlugin) merge(segments []seg.Segment, drops []*roaring.Bitmap, path string,
closeCh chan struct{}, s seg.StatsReporter, config map[string]interface{}) (
[][]uint64, uint64, error) {
segmentBases := make([]*SegmentBase, len(segments))
for segmenti, segment := range segments {
switch segmentx := segment.(type) {
Expand All @@ -49,11 +61,11 @@ func (*ZapPlugin) Merge(segments []seg.Segment, drops []*roaring.Bitmap, path st
panic(fmt.Sprintf("oops, unexpected segment type: %T", segment))
}
}
return mergeSegmentBases(segmentBases, drops, path, DefaultChunkMode, closeCh, s)
return mergeSegmentBases(segmentBases, drops, path, DefaultChunkMode, closeCh, s, config)
}

func mergeSegmentBases(segmentBases []*SegmentBase, drops []*roaring.Bitmap, path string,
chunkMode uint32, closeCh chan struct{}, s seg.StatsReporter) (
chunkMode uint32, closeCh chan struct{}, s seg.StatsReporter, config map[string]interface{}) (
[][]uint64, uint64, error) {
flag := os.O_RDWR | os.O_CREATE

Expand All @@ -74,7 +86,7 @@ func mergeSegmentBases(segmentBases []*SegmentBase, drops []*roaring.Bitmap, pat
cr := NewCountHashWriterWithStatsReporter(br, s)

newDocNums, numDocs, storedIndexOffset, _, _, sectionsIndexOffset, err :=
mergeToWriter(segmentBases, drops, chunkMode, cr, closeCh)
mergeToWriter(segmentBases, drops, chunkMode, cr, closeCh, config)
if err != nil {
cleanup()
return nil, 0, err
Expand Down Expand Up @@ -110,7 +122,7 @@ func mergeSegmentBases(segmentBases []*SegmentBase, drops []*roaring.Bitmap, pat
}

func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
chunkMode uint32, cr *CountHashWriter, closeCh chan struct{}) (
chunkMode uint32, cr *CountHashWriter, closeCh chan struct{}, config map[string]interface{}) (
newDocNums [][]uint64, numDocs, storedIndexOffset uint64,
fieldsInv []string, fieldsMap map[string]uint16, sectionsIndexOffset uint64,
err error) {
Expand Down Expand Up @@ -147,7 +159,7 @@ func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
for i, x := range segmentSections {
mergeOpaque[int(i)] = x.InitOpaque(args)

err = x.Merge(mergeOpaque, segments, drops, fieldsInv, newDocNums, cr, closeCh)
err = x.Merge(mergeOpaque, segments, drops, fieldsInv, newDocNums, cr, closeCh, config)
if err != nil {
return nil, 0, 0, nil, nil, 0, err
}
Expand Down
4 changes: 2 additions & 2 deletions merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func testMergeWithEmptySegments(t *testing.T, before bool, numEmptySegments int)

_ = os.RemoveAll("/tmp/" + fname)

emptySegment, _, err := zapPlugin.newWithChunkMode([]index.Document{}, 1024)
emptySegment, _, err := zapPlugin.newWithChunkMode([]index.Document{}, 1024, nil)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -644,7 +644,7 @@ func buildTestSegmentMultiHelper(docIds []string) (*SegmentBase, uint64, error)
doc2,
}

seg, size, err := zapPlugin.newWithChunkMode(results, 1024)
seg, size, err := zapPlugin.newWithChunkMode(results, 1024, nil)
return seg.(*SegmentBase), size, err
}

Expand Down
11 changes: 8 additions & 3 deletions new.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,16 @@ var ValidateDocFields = func(field index.Field) error {
// New creates an in-memory zap-encoded SegmentBase from a set of Documents
func (z *ZapPlugin) New(results []index.Document) (
segment.Segment, uint64, error) {
return z.newWithChunkMode(results, DefaultChunkMode)
return z.newWithChunkMode(results, DefaultChunkMode, nil)
}

func (z *ZapPlugin) NewEx(results []index.Document, config map[string]interface{}) (
segment.Segment, uint64, error) {
return z.newWithChunkMode(results, DefaultChunkMode, config)
}

func (*ZapPlugin) newWithChunkMode(results []index.Document,
chunkMode uint32) (segment.Segment, uint64, error) {
chunkMode uint32, config map[string]interface{}) (segment.Segment, uint64, error) {
s := interimPool.Get().(*interim)

var br bytes.Buffer
Expand All @@ -72,7 +77,7 @@ func (*ZapPlugin) newWithChunkMode(results []index.Document,
}

sb, err := InitSegmentBase(br.Bytes(), s.w.Sum32(), chunkMode,
uint64(len(results)), storedIndexOffset, sectionsIndexOffset)
uint64(len(results)), storedIndexOffset, sectionsIndexOffset, config)

// get the bytes written before the interim's reset() call
// write it to the newly formed segment base.
Expand Down
2 changes: 1 addition & 1 deletion section.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type section interface {
// as part of the merge API, write the merged data to the writer and also track
// the starting offset of this newly merged section data.
Merge(opaque map[int]resetable, segments []*SegmentBase, drops []*roaring.Bitmap, fieldsInv []string,
newDocNumsIn [][]uint64, w *CountHashWriter, closeCh chan struct{}) error
newDocNumsIn [][]uint64, w *CountHashWriter, closeCh chan struct{}, config map[string]interface{}) error

// opaque is used to track the data specific to this section. it's not visible
// to the other sections and is only visible and freely modifiable by this specific
Expand Down
126 changes: 121 additions & 5 deletions section_faiss_vector_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ type vecIndexInfo struct {

// keep in mind with respect to update and delete operations with respect to vectors
func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*SegmentBase,
drops []*roaring.Bitmap, fieldsInv []string,
newDocNumsIn [][]uint64, w *CountHashWriter, closeCh chan struct{}) error {
drops []*roaring.Bitmap, fieldsInv []string, newDocNumsIn [][]uint64, w *CountHashWriter,
closeCh chan struct{}, config map[string]interface{}) error {
vo := v.getvectorIndexOpaque(opaque)

// the segments with valid vector sections in them
Expand Down Expand Up @@ -170,7 +170,7 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se
if err != nil {
return err
}
err = vo.mergeAndWriteVectorIndexes(vecSegs, indexes, w, closeCh)
err = vo.mergeAndWriteVectorIndexes(vecSegs, indexes, w, closeCh, config)
if err != nil {
return err
}
Expand Down Expand Up @@ -244,7 +244,6 @@ func (v *vectorIndexOpaque) flushVectorIndex(indexBytes []byte, w *CountHashWrit
return err
}

// write the vector index data
_, err = w.Write(indexBytes)
return err
}
Expand All @@ -266,10 +265,114 @@ func calculateNprobe(nlist int, indexOptimizedFor string) int32 {
return nprobe
}

// fastMergeIndexes merges the given per-segment vector indexes into a single
// new faiss index, reusing the quantizers of an already-trained large IVF
// index (when one qualifies) instead of retraining on trainData from scratch.
// The merged index is serialized and flushed to w. The in-memory source
// indexes are freed before returning, on both success and error paths.
func (v *vectorIndexOpaque) fastMergeIndexes(vecIndexes []*vecIndexInfo, trainData []float32,
	w *CountHashWriter, closeCh chan struct{}) error {
	// Free the reconstructed in-memory faiss indexes once merging is done.
	defer freeReconstructedIndexes(vecIndexes)

	var mergeCandidates []int
	var indexOptimizedFor string
	var dims, metric, nvecs, reconsCap int
	for i := 0; i < len(vecIndexes); i++ {
		if isClosed(closeCh) {
			return seg.ErrClosed
		}
		if len(vecIndexes[i].vecIds) == 0 {
			continue
		}
		// NOTE(review): this threshold hard-codes a 1M vector count
		// (4*sqrt(1000000)); confirm it shouldn't be derived from the
		// actual vector count instead.
		if vecIndexes[i].index.Nlist() >= int(4*math.Sqrt(float64(1000000))) {
			// merging only IVFSQ8 indexes that are large enough
			if len(vecIndexes[i].vecIds) >= 10000 {
				mergeCandidates = append(mergeCandidates, i)
			}
		}

		// Track the largest reconstruction buffer any one segment needs.
		indexReconsLen := len(vecIndexes[i].vecIds) * vecIndexes[i].index.D()
		if indexReconsLen > reconsCap {
			reconsCap = indexReconsLen
		}
		nvecs += len(vecIndexes[i].vecIds)
		// Indexes are assumed to share config; the last non-empty one wins.
		indexOptimizedFor = vecIndexes[i].indexOptimizedFor
		dims = vecIndexes[i].index.D()
		metric = int(vecIndexes[i].index.MetricType())
	}

	// index type to be created after merge based on the number of vectors
	// in indexData added into the index.
	nlist := determineCentroids(nvecs)
	if len(trainData) > 0 {
		// NOTE(review): overrides the centroid count whenever train data is
		// supplied, using a 50-vectors-per-centroid heuristic — confirm.
		nlist = len(trainData) / (50 * dims)
	}
	indexDescription, indexClass := determineIndexToUse(nvecs, nlist, indexOptimizedFor)

	faissIndex, err := faiss.IndexFactory(dims, indexDescription, metric)
	if err != nil {
		return err
	}
	defer faissIndex.Close()

	reconsVecs := make([]float32, 0, reconsCap)
	if indexClass == IndexTypeIVF {
		err = faissIndex.SetDirectMap(2)
		if err != nil {
			return err
		}

		nprobe := calculateNprobe(nlist, indexOptimizedFor)
		faissIndex.SetNProbe(nprobe)

		// Prefer copying the quantizers of an existing large IVF index over
		// retraining on the raw train data.
		if len(mergeCandidates) > 0 {
			err = faissIndex.SetQuantizers(vecIndexes[mergeCandidates[0]].index)
		} else {
			err = faissIndex.Train(trainData)
		}
		if err != nil {
			return err
		}

		// NOTE(review): vectors are only added to the merged index inside
		// this IVF branch — if indexClass were ever not IVF here, the merged
		// index would be written out empty. Confirm this is intentional.
		var j int // next unconsumed entry in mergeCandidates (ascending order)
		for i := 0; i < len(vecIndexes); i++ {
			if isClosed(closeCh) {
				return seg.ErrClosed
			}
			if j < len(mergeCandidates) && i == mergeCandidates[j] {
				// Large IVF index: merge its inverted lists directly.
				err = faissIndex.MergeFrom(vecIndexes[i].index, 0)
				if err != nil {
					return err
				}
				j++
			} else {
				// Small index: reconstruct its raw vectors into the shared
				// scratch buffer and re-add them with their original IDs.
				neededReconsLen := len(vecIndexes[i].vecIds) * vecIndexes[i].index.D()
				reconsChunk := reconsVecs[:neededReconsLen]
				reconsChunk, err = vecIndexes[i].index.ReconstructBatch(vecIndexes[i].vecIds, reconsChunk)
				if err != nil {
					return err
				}
				err = faissIndex.AddWithIDs(reconsChunk, vecIndexes[i].vecIds)
				if err != nil {
					return err
				}
			}
		}
	}

	mergedIndexBytes, err := faiss.WriteIndexIntoBuffer(faissIndex)
	if err != nil {
		return err
	}

	return v.flushVectorIndex(mergedIndexBytes, w)
}

// todo: naive implementation. need to keep in mind the perf implications and improve on this.
// perhaps, parallelized merging can help speed things up over here.
func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase,
vecIndexes []*vecIndexInfo, w *CountHashWriter, closeCh chan struct{}) error {
vecIndexes []*vecIndexInfo, w *CountHashWriter, closeCh chan struct{}, config map[string]interface{}) error {

// safe to assume that all the indexes share the same config values, given
// that they are extracted from the field mapping info.
Expand Down Expand Up @@ -327,6 +430,12 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase,
indexData := make([]float32, 0, indexDataCap)
// reusable buffer for reconstruction
recons := make([]float32, 0, reconsCap)

if trainData, ok := config["trainData"]; ok {
trainData := trainData.([]float32)
return v.fastMergeIndexes(vecIndexes, trainData, w, closeCh)
}

var err error
for i := 0; i < len(vecIndexes); i++ {
if isClosed(closeCh) {
Expand All @@ -345,6 +454,13 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase,
freeReconstructedIndexes(vecIndexes)
return err
}

// collect training data for future reuse
if v, ok := config["collectTrainDataCallback"]; ok {
collectTrainDataCallback := v.(func([]float32))
collectTrainDataCallback(recons)
}

indexData = append(indexData, recons...)
// Adding vector IDs in the same order as the vectors
finalVecIDs = append(finalVecIDs, vecIndexes[i].vecIds...)
Expand Down
2 changes: 1 addition & 1 deletion section_inverted_text_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ func mergeAndPersistInvertedSection(segments []*SegmentBase, dropsIn []*roaring.

func (i *invertedTextIndexSection) Merge(opaque map[int]resetable, segments []*SegmentBase,
drops []*roaring.Bitmap, fieldsInv []string, newDocNumsIn [][]uint64,
w *CountHashWriter, closeCh chan struct{}) error {
w *CountHashWriter, closeCh chan struct{}, config map[string]interface{}) error {
io := i.getInvertedIndexOpaque(opaque)
fieldAddrs, _, err := mergeAndPersistInvertedSection(segments, drops, fieldsInv,
io.FieldsMap, io.fieldsSame, newDocNumsIn, io.numDocs, io.chunkMode, w, closeCh)
Expand Down
2 changes: 1 addition & 1 deletion section_synonym_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ func (s *synonymIndexSection) AddrForField(opaque map[int]resetable, fieldID int
// synonym index section.
func (s *synonymIndexSection) Merge(opaque map[int]resetable, segments []*SegmentBase,
drops []*roaring.Bitmap, fieldsInv []string, newDocNumsIn [][]uint64,
w *CountHashWriter, closeCh chan struct{}) error {
w *CountHashWriter, closeCh chan struct{}, config map[string]interface{}) error {
so := s.getSynonymIndexOpaque(opaque)
thesaurusAddrs, fieldIDtoThesaurusID, err := mergeAndPersistSynonymSection(segments, drops, fieldsInv, newDocNumsIn, w, closeCh)
if err != nil {
Expand Down
Loading
Loading