-
Notifications
You must be signed in to change notification settings - Fork 15
MB-62985 - Add support for binary quantised vectors. #329
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
26093ec
3af2383
daa8fc6
fd22b0d
7eb9574
401a144
581dbc9
127a63d
815537a
5c210e3
e652071
60f83f1
b20ae9d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -56,29 +56,30 @@ func (vc *vectorIndexCache) Clear() { | |
| // It is true when a filtered kNN query accesses the cache since it requires the | ||
| // map. It's false otherwise. | ||
| func (vc *vectorIndexCache) loadOrCreate(fieldID uint16, mem []byte, | ||
| loadDocVecIDMap bool, except *roaring.Bitmap) ( | ||
| index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, docVecIDMap map[uint32][]int64, | ||
| loadDocVecIDMap bool, loadBinaryIndex bool, except *roaring.Bitmap) ( | ||
| indexes []*faiss.IndexImpl, vecDocIDMap map[int64]uint32, docVecIDMap map[uint32][]int64, | ||
| vecIDsToExclude []int64, err error) { | ||
| index, vecDocIDMap, docVecIDMap, vecIDsToExclude, err = vc.loadFromCache( | ||
| fieldID, loadDocVecIDMap, mem, except) | ||
| return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, err | ||
| indexes, vecDocIDMap, docVecIDMap, vecIDsToExclude, err = vc.loadFromCache( | ||
| fieldID, loadDocVecIDMap, loadBinaryIndex, mem, except) | ||
| return indexes, vecDocIDMap, docVecIDMap, vecIDsToExclude, err | ||
| } | ||
|
|
||
| // function to load the vectorDocIDMap and if required, docVecIDMap from cache | ||
| // If not, it will create these and add them to the cache. | ||
| func (vc *vectorIndexCache) loadFromCache(fieldID uint16, loadDocVecIDMap bool, | ||
| mem []byte, except *roaring.Bitmap) (index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, | ||
| docVecIDMap map[uint32][]int64, vecIDsToExclude []int64, err error) { | ||
| func (vc *vectorIndexCache) loadFromCache(fieldID uint16, loadDocVecIDMap, | ||
| loadBinaryIndex bool, mem []byte, except *roaring.Bitmap) (indexes []*faiss.IndexImpl, | ||
| vecDocIDMap map[int64]uint32, docVecIDMap map[uint32][]int64, | ||
| vecIDsToExclude []int64, err error) { | ||
|
|
||
| vc.m.RLock() | ||
|
|
||
| entry, ok := vc.cache[fieldID] | ||
| if ok { | ||
| index, vecDocIDMap, docVecIDMap = entry.load() | ||
| indexes, vecDocIDMap, docVecIDMap = entry.load() | ||
| vecIDsToExclude = getVecIDsToExclude(vecDocIDMap, except) | ||
| if !loadDocVecIDMap || (loadDocVecIDMap && len(entry.docVecIDMap) > 0) { | ||
| vc.m.RUnlock() | ||
| return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil | ||
| return indexes, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil | ||
| } | ||
|
|
||
| vc.m.RUnlock() | ||
|
|
@@ -88,14 +89,15 @@ func (vc *vectorIndexCache) loadFromCache(fieldID uint16, loadDocVecIDMap bool, | |
| // typically seen for the first filtered query. | ||
| docVecIDMap = vc.addDocVecIDMapToCacheLOCKED(entry) | ||
| vc.m.Unlock() | ||
| return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil | ||
| return indexes, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil | ||
| } | ||
|
|
||
| vc.m.RUnlock() | ||
| // acquiring a lock since this is modifying the cache. | ||
| vc.m.Lock() | ||
| defer vc.m.Unlock() | ||
| return vc.createAndCacheLOCKED(fieldID, mem, loadDocVecIDMap, except) | ||
| return vc.createAndCacheLOCKED(fieldID, mem, loadDocVecIDMap, loadBinaryIndex, | ||
| except) | ||
| } | ||
|
|
||
| func (vc *vectorIndexCache) addDocVecIDMapToCacheLOCKED(ce *cacheEntry) map[uint32][]int64 { | ||
|
|
@@ -116,21 +118,21 @@ func (vc *vectorIndexCache) addDocVecIDMapToCacheLOCKED(ce *cacheEntry) map[uint | |
|
|
||
| // Rebuilding the cache on a miss. | ||
| func (vc *vectorIndexCache) createAndCacheLOCKED(fieldID uint16, mem []byte, | ||
| loadDocVecIDMap bool, except *roaring.Bitmap) ( | ||
| index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, | ||
| loadDocVecIDMap, loadBinaryIndex bool, except *roaring.Bitmap) ( | ||
| indexes []*faiss.IndexImpl, vecDocIDMap map[int64]uint32, | ||
| docVecIDMap map[uint32][]int64, vecIDsToExclude []int64, err error) { | ||
|
|
||
| // Handle concurrent accesses (to avoid unnecessary work) by adding a | ||
| // check within the write lock here. | ||
| entry := vc.cache[fieldID] | ||
| if entry != nil { | ||
| index, vecDocIDMap, docVecIDMap = entry.load() | ||
| indexes, vecDocIDMap, docVecIDMap = entry.load() | ||
| vecIDsToExclude = getVecIDsToExclude(vecDocIDMap, except) | ||
| if !loadDocVecIDMap || (loadDocVecIDMap && len(entry.docVecIDMap) > 0) { | ||
| return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil | ||
| return indexes, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil | ||
| } | ||
| docVecIDMap = vc.addDocVecIDMapToCacheLOCKED(entry) | ||
| return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil | ||
| return indexes, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil | ||
| } | ||
|
|
||
| // if the cache doesn't have the entry, construct the vector to doc id map and | ||
|
|
@@ -161,21 +163,42 @@ func (vc *vectorIndexCache) createAndCacheLOCKED(fieldID uint16, mem []byte, | |
| } | ||
| } | ||
|
|
||
| cacheEntryStub := cacheEntryReqs{ | ||
| vecDocIDMap: vecDocIDMap, | ||
| } | ||
|
|
||
| indexes = make([]*faiss.IndexImpl, 0) | ||
|
|
||
| if loadBinaryIndex { | ||
| binaryIndexSize, n := binary.Uvarint(mem[pos : pos+binary.MaxVarintLen64]) | ||
| pos += n | ||
|
|
||
| // Read binary index with proper flags | ||
| binaryIndex, err := faiss.ReadBinaryIndexFromBuffer(mem[pos:pos+int(binaryIndexSize)], faissIOFlags) | ||
| if err != nil { | ||
| return nil, nil, nil, nil, err | ||
| } | ||
| cacheEntryStub.binaryIndex = binaryIndex | ||
| indexes = append(indexes, binaryIndex) | ||
| pos += int(binaryIndexSize) | ||
| } | ||
|
|
||
| indexSize, n := binary.Uvarint(mem[pos : pos+binary.MaxVarintLen64]) | ||
| pos += n | ||
|
|
||
| index, err = faiss.ReadIndexFromBuffer(mem[pos:pos+int(indexSize)], faissIOFlags) | ||
| index, err := faiss.ReadIndexFromBuffer(mem[pos:pos+int(indexSize)], faissIOFlags) | ||
| if err != nil { | ||
| return nil, nil, nil, nil, err | ||
| } | ||
| indexes = append(indexes, index) | ||
|
|
||
| cacheEntryStub.index = index | ||
|
|
||
| vc.insertLOCKED(fieldID, index, vecDocIDMap, loadDocVecIDMap, docVecIDMap) | ||
| return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil | ||
| vc.insertLOCKED(fieldID, cacheEntryStub) | ||
| return indexes, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil | ||
| } | ||
|
|
||
| func (vc *vectorIndexCache) insertLOCKED(fieldIDPlus1 uint16, | ||
| index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, loadDocVecIDMap bool, | ||
| docVecIDMap map[uint32][]int64) { | ||
| func (vc *vectorIndexCache) insertLOCKED(fieldIDPlus1 uint16, ce cacheEntryReqs) { | ||
| // the first time we've hit the cache, try to spawn a monitoring routine | ||
| // which will reconcile the moving averages for all the fields being hit | ||
| if len(vc.cache) == 0 { | ||
|
|
@@ -189,8 +212,7 @@ func (vc *vectorIndexCache) insertLOCKED(fieldIDPlus1 uint16, | |
| // this makes the average to be kept above the threshold value for a | ||
| // longer time and thereby the index to be resident in the cache | ||
| // for longer time. | ||
| vc.cache[fieldIDPlus1] = createCacheEntry(index, vecDocIDMap, | ||
| loadDocVecIDMap, docVecIDMap, 0.4) | ||
| vc.cache[fieldIDPlus1] = createCacheEntry(&ce, 0.4, ce.loadDocVecIDMap) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -283,19 +305,33 @@ func (e *ewma) add(val uint64) { | |
|
|
||
| // ----------------------------------------------------------------------------- | ||
|
|
||
| func createCacheEntry(index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, | ||
| loadDocVecIDMap bool, docVecIDMap map[uint32][]int64, alpha float64) *cacheEntry { | ||
| // required info to create a cache entry. | ||
| type cacheEntryReqs struct { | ||
| alpha float64 | ||
| index *faiss.IndexImpl | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe just have the indexes array itself here |
||
| binaryIndex *faiss.IndexImpl | ||
| vecDocIDMap map[int64]uint32 | ||
| // Used to indicate if the below fields are populated - will only be | ||
| // used for pre-filtered queries. | ||
| loadDocVecIDMap bool | ||
| docVecIDMap map[uint32][]int64 | ||
| clusterAssignment map[int64]*roaring.Bitmap | ||
| } | ||
|
|
||
| func createCacheEntry(stub *cacheEntryReqs, alpha float64, | ||
| loadDocVecIDMap bool) *cacheEntry { | ||
| ce := &cacheEntry{ | ||
| index: index, | ||
| vecDocIDMap: vecDocIDMap, | ||
| index: stub.index, | ||
| binaryIndex: stub.binaryIndex, | ||
| vecDocIDMap: stub.vecDocIDMap, | ||
| tracker: &ewma{ | ||
| alpha: alpha, | ||
| sample: 1, | ||
| }, | ||
| refs: 1, | ||
| } | ||
| if loadDocVecIDMap { | ||
| ce.docVecIDMap = docVecIDMap | ||
| ce.docVecIDMap = stub.docVecIDMap | ||
| } | ||
| return ce | ||
| } | ||
|
|
@@ -309,6 +345,7 @@ type cacheEntry struct { | |
| refs int64 | ||
|
|
||
| index *faiss.IndexImpl | ||
| binaryIndex *faiss.IndexImpl | ||
| vecDocIDMap map[int64]uint32 | ||
| docVecIDMap map[uint32][]int64 | ||
| } | ||
|
|
@@ -325,16 +362,24 @@ func (ce *cacheEntry) decRef() { | |
| atomic.AddInt64(&ce.refs, -1) | ||
| } | ||
|
|
||
| func (ce *cacheEntry) load() (*faiss.IndexImpl, map[int64]uint32, map[uint32][]int64) { | ||
| func (ce *cacheEntry) load() ([]*faiss.IndexImpl, | ||
| map[int64]uint32, map[uint32][]int64) { | ||
| ce.incHit() | ||
| ce.addRef() | ||
| return ce.index, ce.vecDocIDMap, ce.docVecIDMap | ||
| if ce.binaryIndex != nil { | ||
| return []*faiss.IndexImpl{ce.binaryIndex, ce.index}, ce.vecDocIDMap, ce.docVecIDMap | ||
| } | ||
| return []*faiss.IndexImpl{ce.index}, ce.vecDocIDMap, ce.docVecIDMap | ||
| } | ||
|
|
||
| func (ce *cacheEntry) close() { | ||
| go func() { | ||
| ce.index.Close() | ||
| ce.index = nil | ||
| if ce.binaryIndex != nil { | ||
| ce.binaryIndex.Close() | ||
| ce.binaryIndex = nil | ||
| } | ||
| ce.vecDocIDMap = nil | ||
| ce.docVecIDMap = nil | ||
| }() | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,6 +23,7 @@ import ( | |
| "math" | ||
| "reflect" | ||
|
|
||
| "container/heap" | ||
| "github.com/RoaringBitmap/roaring/v2" | ||
| "github.com/RoaringBitmap/roaring/v2/roaring64" | ||
| "github.com/bits-and-blooms/bitset" | ||
|
|
@@ -301,6 +302,31 @@ func (i *vectorIndexWrapper) Size() uint64 { | |
| return i.size() | ||
| } | ||
|
|
||
| // distanceID represents a distance-ID pair for heap operations | ||
| type distanceID struct { | ||
| distance float32 | ||
| id int64 | ||
| } | ||
|
|
||
| // maxHeap implements heap.Interface for distanceID | ||
| type maxHeap []*distanceID | ||
|
|
||
| func (h maxHeap) Len() int { return len(h) } | ||
| func (h maxHeap) Less(i, j int) bool { return h[i].distance > h[j].distance } | ||
| func (h maxHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } | ||
|
|
||
| func (h *maxHeap) Push(x interface{}) { | ||
| *h = append(*h, x.(*distanceID)) | ||
| } | ||
|
|
||
| func (h *maxHeap) Pop() interface{} { | ||
| old := *h | ||
| n := len(old) | ||
| x := old[n-1] | ||
| *h = old[0 : n-1] | ||
| return x | ||
| } | ||
|
|
||
metonymic-smokey marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // InterpretVectorIndex returns a construct of closures (vectorIndexWrapper) | ||
| // that will allow the caller to - | ||
| // (1) search within an attached vector index | ||
|
|
@@ -312,6 +338,7 @@ func (sb *SegmentBase) InterpretVectorIndex(field string, requiresFiltering bool | |
| segment.VectorIndex, error) { | ||
| // Params needed for the closures | ||
| var vecIndex *faiss.IndexImpl | ||
| var binaryIndex *faiss.IndexImpl | ||
| var vecDocIDMap map[int64]uint32 | ||
| var docVecIDMap map[uint32][]int64 | ||
| var vectorIDsToExclude []int64 | ||
|
|
@@ -354,14 +381,49 @@ func (sb *SegmentBase) InterpretVectorIndex(field string, requiresFiltering bool | |
| return rv, nil | ||
| } | ||
|
|
||
| scores, ids, err := vecIndex.SearchWithoutIDs(qVector, k, | ||
| vectorIDsToExclude, params) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| if binaryIndex != nil { | ||
| binaryQueryVector := convertToBinary(qVector) | ||
| _, binIDs, err := binaryIndex.SearchBinaryWithoutIDs(binaryQueryVector, k*4, | ||
| vectorIDsToExclude, params) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| addIDsToPostingsList(rv, ids, scores) | ||
| distances := make([]float32, k*4) | ||
| err = vecIndex.DistCompute(qVector, binIDs, int(k*4), distances) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| // Need to map distances to the original IDs to get the top K. | ||
| // Use a heap to keep track of the top K. | ||
| h := &maxHeap{} | ||
| heap.Init(h) | ||
| for i := 0; i < len(binIDs); i++ { | ||
| heap.Push(h, &distanceID{distance: distances[i], id: binIDs[i]}) | ||
| if h.Len() > int(k) { | ||
| heap.Pop(h) | ||
| } | ||
| } | ||
|
|
||
| // Pop the top K in reverse order to get them in ascending order | ||
| ids := make([]int64, k) | ||
| scores := make([]float32, k) | ||
| for i := int(k) - 1; i >= 0; i-- { | ||
| distanceID := heap.Pop(h).(*distanceID) | ||
| scores[i] = distanceID.distance | ||
| ids[i] = distanceID.id | ||
| } | ||
|
Comment on lines
+400
to
+416
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. At this stage, we already have the complete set of ids and their corresponding distances available for Top-K selection. This scenario differs from the K-sized heap approach used in In our case, since the entire dataset is available upfront, using a heap becomes unnecessarily expensive—particularly in terms of space complexity O(K). This overhead can compound significantly with the number of segments in a highly distributed index, as the heap becomes temporary query-specific state for each segment. Given that, it would be more efficient to use a |
||
|
|
||
| addIDsToPostingsList(rv, ids, scores) | ||
| } else { | ||
| scores, ids, err := vecIndex.SearchWithoutIDs(qVector, k, | ||
| vectorIDsToExclude, params) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| addIDsToPostingsList(rv, ids, scores) | ||
| } | ||
| return rv, nil | ||
| }, | ||
| searchWithFilter: func(qVector []float32, k int64, | ||
|
|
@@ -542,14 +604,27 @@ func (sb *SegmentBase) InterpretVectorIndex(field string, requiresFiltering bool | |
| // 1. doc values(first 2 iterations) - adhering to the sections format. never | ||
| // valid values for vector section | ||
| // 2. index optimization type. | ||
| for i := 0; i < 3; i++ { | ||
| for i := 0; i < 2; i++ { | ||
| _, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) | ||
| pos += n | ||
| } | ||
|
|
||
| vecIndex, vecDocIDMap, docVecIDMap, vectorIDsToExclude, err = | ||
| // Determining if a binary index is needed. | ||
| indexOptimisationType, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) | ||
| pos += n | ||
|
|
||
| loadBinaryIndex := isBinaryIndex(indexOptimisationType) | ||
|
|
||
| vecIndexes, vecDocIDMap, docVecIDMap, vectorIDsToExclude, err := | ||
| sb.vecIndexCache.loadOrCreate(fieldIDPlus1, sb.mem[pos:], requiresFiltering, | ||
| except) | ||
| loadBinaryIndex, except) | ||
|
|
||
| if loadBinaryIndex { | ||
| vecIndex = vecIndexes[1] | ||
| binaryIndex = vecIndexes[0] | ||
| } else { | ||
| vecIndex = vecIndexes[0] | ||
| } | ||
|
|
||
| if vecIndex != nil { | ||
| vecIndexSize = vecIndex.Size() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
take pointer to the cacheEntryReqs struct, don't copy the struct