Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cmd/zap/cmd/vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,11 @@ func decodeSection(data []byte, start uint64) (int, int, map[int64]uint64, *fais
return 0, 0, nil, nil, err
}

return int(numVecs), int(indexSize), vecDocIDMap, vecIndex, nil
idxImpl, ok := vecIndex.(*faiss.IndexImpl)
if !ok {
return 0, 0, nil, nil, fmt.Errorf("failed to convert FloatIndex to *IndexImpl")
}
return int(numVecs), int(indexSize), vecDocIDMap, idxImpl, nil
}

func init() {
Expand Down
120 changes: 91 additions & 29 deletions faiss_vector_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package zap

import (
"encoding/binary"
"fmt"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -56,17 +57,29 @@ func (vc *vectorIndexCache) Clear() {
// present. It also returns the batch executor for the field if it's present in the
// cache.
func (vc *vectorIndexCache) loadOrCreate(fieldID uint16, mem []byte,
loadDocVecIDMap bool, except *roaring.Bitmap) (
index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, docVecIDMap map[uint32][]int64,
loadDocVecIDMap bool, loadBinaryIndex bool, except *roaring.Bitmap) (
index *faiss.IndexImpl, binaryIndex *faiss.BinaryIndexImpl, vecDocIDMap map[int64]uint32,
docVecIDMap map[uint32][]int64, vecIDsToExclude []int64, err error) {
index, binaryIndex, vecDocIDMap, docVecIDMap, vecIDsToExclude, err = vc.loadFromCache(
fieldID, loadDocVecIDMap, loadBinaryIndex, mem, except)
return index, binaryIndex, vecDocIDMap, docVecIDMap, vecIDsToExclude, err
}

// loadFromCache loads the index, the vecDocIDMap and, if required, the
// docVecIDMap from the cache. If there is no cached entry for the field,
// it creates them and adds them to the cache.
func (vc *vectorIndexCache) loadFromCache(fieldID uint16, loadDocVecIDMap,
loadBinaryIndex bool, mem []byte, except *roaring.Bitmap) (index *faiss.IndexImpl,
binaryIndex *faiss.BinaryIndexImpl, vecDocIDMap map[int64]uint32, docVecIDMap map[uint32][]int64,
vecIDsToExclude []int64, err error) {

vc.m.RLock()
entry, ok := vc.cache[fieldID]
if ok {
index, vecDocIDMap, docVecIDMap = entry.load()
index, binaryIndex, vecDocIDMap, docVecIDMap = entry.load()
vecIDsToExclude = getVecIDsToExclude(vecDocIDMap, except)
if !loadDocVecIDMap || len(entry.docVecIDMap) > 0 {
vc.m.RUnlock()
return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
return index, binaryIndex, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
}

vc.m.RUnlock()
Expand All @@ -76,14 +89,15 @@ func (vc *vectorIndexCache) loadOrCreate(fieldID uint16, mem []byte,
// typically seen for the first filtered query.
docVecIDMap = vc.addDocVecIDMapToCacheLOCKED(entry)
vc.m.Unlock()
return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
return index, binaryIndex, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
}

vc.m.RUnlock()
// acquiring a lock since this is modifying the cache.
vc.m.Lock()
defer vc.m.Unlock()
return vc.createAndCacheLOCKED(fieldID, mem, loadDocVecIDMap, except)
return vc.createAndCacheLOCKED(fieldID, mem, loadDocVecIDMap, loadBinaryIndex,
except)
}

func (vc *vectorIndexCache) addDocVecIDMapToCacheLOCKED(ce *cacheEntry) map[uint32][]int64 {
Expand All @@ -104,21 +118,21 @@ func (vc *vectorIndexCache) addDocVecIDMapToCacheLOCKED(ce *cacheEntry) map[uint

// Rebuilding the cache on a miss.
func (vc *vectorIndexCache) createAndCacheLOCKED(fieldID uint16, mem []byte,
loadDocVecIDMap bool, except *roaring.Bitmap) (
index *faiss.IndexImpl, vecDocIDMap map[int64]uint32,
loadDocVecIDMap, loadBinaryIndex bool, except *roaring.Bitmap) (
index *faiss.IndexImpl, binaryIndex *faiss.BinaryIndexImpl, vecDocIDMap map[int64]uint32,
docVecIDMap map[uint32][]int64, vecIDsToExclude []int64, err error) {

// Handle concurrent accesses (to avoid unnecessary work) by adding a
// check within the write lock here.
entry := vc.cache[fieldID]
if entry != nil {
index, vecDocIDMap, docVecIDMap = entry.load()
index, binaryIndex, vecDocIDMap, docVecIDMap = entry.load()
vecIDsToExclude = getVecIDsToExclude(vecDocIDMap, except)
if !loadDocVecIDMap || len(entry.docVecIDMap) > 0 {
return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
return index, binaryIndex, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
}
docVecIDMap = vc.addDocVecIDMapToCacheLOCKED(entry)
return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
return index, binaryIndex, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
}

// if the cache doesn't have the entry, construct the vector to doc id map and
Expand Down Expand Up @@ -149,21 +163,45 @@ func (vc *vectorIndexCache) createAndCacheLOCKED(fieldID uint16, mem []byte,
}
}

cacheEntryStub := cacheEntryReqs{
vecDocIDMap: vecDocIDMap,
}

if loadBinaryIndex {
binaryIndexSize, n := binary.Uvarint(mem[pos : pos+binary.MaxVarintLen64])
pos += n

// Read binary index with proper flags
binaryIndex, err := faiss.ReadBinaryIndexFromBuffer(mem[pos:pos+int(binaryIndexSize)], faissIOFlags)
if err != nil {
return nil, nil, nil, nil, nil, err
}
binaryIndexImpl, ok := binaryIndex.(*faiss.BinaryIndexImpl)
if !ok {
return nil, nil, nil, nil, nil, fmt.Errorf("binary index is not a *faiss.BinaryIndexImpl")
}
cacheEntryStub.binaryIndex = binaryIndexImpl
pos += int(binaryIndexSize)
}

indexSize, n := binary.Uvarint(mem[pos : pos+binary.MaxVarintLen64])
pos += n

index, err = faiss.ReadIndexFromBuffer(mem[pos:pos+int(indexSize)], faissIOFlags)
idx, err := faiss.ReadIndexFromBuffer(mem[pos:pos+int(indexSize)], faissIOFlags)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
indexImpl, ok := idx.(*faiss.IndexImpl)
if !ok {
return nil, nil, nil, nil, nil, fmt.Errorf("index is not a *faiss.IndexImpl")
}
cacheEntryStub.index = indexImpl

vc.insertLOCKED(fieldID, index, vecDocIDMap, loadDocVecIDMap, docVecIDMap)
return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
vc.insertLOCKED(fieldID, cacheEntryStub)
return indexImpl, binaryIndex, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
}

func (vc *vectorIndexCache) insertLOCKED(fieldIDPlus1 uint16,
index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, loadDocVecIDMap bool,
docVecIDMap map[uint32][]int64) {
func (vc *vectorIndexCache) insertLOCKED(fieldIDPlus1 uint16, ce cacheEntryReqs) {
// the first time we've hit the cache, try to spawn a monitoring routine
// which will reconcile the moving averages for all the fields being hit
if len(vc.cache) == 0 {
Expand All @@ -177,8 +215,7 @@ func (vc *vectorIndexCache) insertLOCKED(fieldIDPlus1 uint16,
// this makes the average to be kept above the threshold value for a
// longer time and thereby the index to be resident in the cache
// for longer time.
vc.cache[fieldIDPlus1] = createCacheEntry(index, vecDocIDMap,
loadDocVecIDMap, docVecIDMap, 0.4)
vc.cache[fieldIDPlus1] = createCacheEntry(&ce, 0.4, ce.loadDocVecIDMap)
}
}

Expand Down Expand Up @@ -271,19 +308,33 @@ func (e *ewma) add(val uint64) {

// -----------------------------------------------------------------------------

func createCacheEntry(index *faiss.IndexImpl, vecDocIDMap map[int64]uint32,
loadDocVecIDMap bool, docVecIDMap map[uint32][]int64, alpha float64) *cacheEntry {
// cacheEntryReqs bundles the information required to create a cache entry.
// insertLOCKED receives one by value and passes a pointer to it on to
// createCacheEntry, which copies the relevant fields into a new cacheEntry.
type cacheEntryReqs struct {
// NOTE(review): not read in the code visible here - createCacheEntry takes
// alpha as a separate argument. Confirm whether this field is still needed.
alpha float64
// index is copied into the cacheEntry's index field.
index *faiss.IndexImpl
// binaryIndex is copied into the cacheEntry's binaryIndex field; it stays
// nil when no binary index was loaded.
binaryIndex *faiss.BinaryIndexImpl
// vecDocIDMap is copied into the cacheEntry's vecDocIDMap field.
vecDocIDMap map[int64]uint32
// Used to indicate if the below fields are populated - will only be
// used for pre-filtered queries.
// insertLOCKED passes this flag to createCacheEntry, which copies
// docVecIDMap into the cacheEntry only when the flag is true.
// NOTE(review): the visible part of createAndCacheLOCKED never sets
// loadDocVecIDMap or docVecIDMap on the value it inserts - confirm that
// this is intended.
loadDocVecIDMap bool
docVecIDMap map[uint32][]int64
// NOTE(review): not referenced anywhere in the code visible here.
clusterAssignment map[int64]*roaring.Bitmap
}

func createCacheEntry(stub *cacheEntryReqs, alpha float64,
loadDocVecIDMap bool) *cacheEntry {
ce := &cacheEntry{
index: index,
vecDocIDMap: vecDocIDMap,
index: stub.index,
binaryIndex: stub.binaryIndex,
vecDocIDMap: stub.vecDocIDMap,
tracker: &ewma{
alpha: alpha,
sample: 1,
},
refs: 1,
}
if loadDocVecIDMap {
ce.docVecIDMap = docVecIDMap
ce.docVecIDMap = stub.docVecIDMap
}
return ce
}
Expand All @@ -297,6 +348,7 @@ type cacheEntry struct {
refs int64

index *faiss.IndexImpl
binaryIndex *faiss.BinaryIndexImpl
vecDocIDMap map[int64]uint32
docVecIDMap map[uint32][]int64
}
Expand All @@ -313,16 +365,26 @@ func (ce *cacheEntry) decRef() {
atomic.AddInt64(&ce.refs, -1)
}

func (ce *cacheEntry) load() (*faiss.IndexImpl, map[int64]uint32, map[uint32][]int64) {
func (ce *cacheEntry) load() (*faiss.IndexImpl, *faiss.BinaryIndexImpl,
map[int64]uint32, map[uint32][]int64) {
ce.incHit()
ce.addRef()
return ce.index, ce.vecDocIDMap, ce.docVecIDMap
if ce.binaryIndex != nil {
return ce.index, ce.binaryIndex, ce.vecDocIDMap, ce.docVecIDMap
}
return ce.index, nil, ce.vecDocIDMap, ce.docVecIDMap
}

func (ce *cacheEntry) close() {
go func() {
ce.index.Close()
ce.index = nil
if ce.index != nil {
(*ce.index).Close()
ce.index = nil
}
if ce.binaryIndex != nil {
(*ce.binaryIndex).Close()
ce.binaryIndex = nil
}
ce.vecDocIDMap = nil
ce.docVecIDMap = nil
}()
Expand Down
Loading
Loading