Skip to content

Commit

Permalink
Integrate index state sharing into mem management (opensearch-project…
Browse files Browse the repository at this point in the history
…#1545)

Adds the ability to share index state amongst indices during index load
operations into the plugin's memory management system. Introduces a
manager for the shared state that properly manages the lifecycle of
that state.

There was a bug in clear cache that had to be fixed to get this change
working as well. Previously, only one index file per clear cache would
be freed. This fixes that logic to clear everything.

Added unit tests and an integration test to confirm functionality. In
addition, modified recall integration tests to get more coverage on the
different algo configs. Along with this, had to fix a few things around
the computation of recall for non-l2 space types.

Signed-off-by: John Mazanec <[email protected]>
  • Loading branch information
jmazanec15 committed Mar 18, 2024
1 parent 8667bf5 commit 8996861
Show file tree
Hide file tree
Showing 17 changed files with 1,093 additions and 74 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
* Disable sdc table for HNSWPQ read-only indices [#1518](https://github.com/opensearch-project/k-NN/pull/1518)
* Switch SpaceType.INNERPRODUCT's vector similarity function to MAXIMUM_INNER_PRODUCT [#1532](https://github.com/opensearch-project/k-NN/pull/1532)
* Add patch to fix arm segfault in nmslib during ingestion [#1541](https://github.com/opensearch-project/k-NN/pull/1541)
* Share ivfpq-l2 table allocations across indices on load [#1558](https://github.com/opensearch-project/k-NN/pull/1558)
### Infrastructure
* Manually install zlib for win CI [#1513](https://github.com/opensearch-project/k-NN/pull/1513)
* Update k-NN build artifact script to enable SIMD on ARM for Faiss [#1543](https://github.com/opensearch-project/k-NN/pull/1543)
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/org/opensearch/knn/index/IndexUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.opensearch.knn.index.util.KNNEngine;
import org.opensearch.knn.indices.ModelDao;
import org.opensearch.knn.indices.ModelMetadata;
import org.opensearch.knn.jni.JNIService;

import java.io.File;
import java.util.Collections;
Expand Down Expand Up @@ -268,4 +269,19 @@ public static boolean isVersionOnOrAfterMinRequiredVersion(Version version, Stri
}
return version.onOrAfter(minimalRequiredVersion);
}

/**
 * Determines whether a loaded index requires shared state.
 *
 * An index with a null or empty model id never requires shared state, and the JNI layer is not consulted
 * for it. For every other index the decision is delegated to the JNI layer.
 *
 * @param knnEngine The knnEngine associated with the index
 * @param modelId The modelId associated with the index
 * @param indexAddr Address of the loaded index to check
 * @return true if the index requires shared state; false otherwise
 */
public static boolean isSharedIndexStateRequired(KNNEngine knnEngine, String modelId, long indexAddr) {
    final boolean hasModel = !StringUtils.isEmpty(modelId);
    // Short-circuit keeps the native call from running for indices without a model
    return hasModel && JNIService.isSharedIndexStateRequired(indexAddr, knnEngine);
}
}
62 changes: 43 additions & 19 deletions src/main/java/org/opensearch/knn/index/KNNIndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package org.opensearch.knn.index;

import com.google.common.annotations.VisibleForTesting;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.apache.lucene.index.FieldInfo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -24,12 +27,13 @@

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import static org.opensearch.knn.common.KNNConstants.MODEL_ID;
import static org.opensearch.knn.common.KNNConstants.SPACE_TYPE;
import static org.opensearch.knn.index.IndexUtil.getParametersAtLoading;
import static org.opensearch.knn.index.codec.util.KNNCodecUtil.buildEngineFilePrefix;
Expand Down Expand Up @@ -82,14 +86,19 @@ public String getIndexName() {
public void warmup() throws IOException {
logger.info("[KNN] Warming up index: " + getIndexName());
try (Engine.Searcher searcher = indexShard.acquireSearcher("knn-warmup")) {
getAllEnginePaths(searcher.getIndexReader()).forEach((key, value) -> {
getAllEngineFileContexts(searcher.getIndexReader()).forEach((engineFileContext) -> {
try {
nativeMemoryCacheManager.get(
new NativeMemoryEntryContext.IndexEntryContext(
key,
engineFileContext.getIndexPath(),
NativeMemoryLoadStrategy.IndexLoadStrategy.getInstance(),
getParametersAtLoading(value, KNNEngine.getEngineNameFromPath(key), getIndexName()),
getIndexName()
getParametersAtLoading(
engineFileContext.getSpaceType(),
KNNEngine.getEngineNameFromPath(engineFileContext.getIndexPath()),
getIndexName()
),
getIndexName(),
engineFileContext.getModelId()
),
true
);
Expand All @@ -103,20 +112,21 @@ public void warmup() throws IOException {
/**
* For the given shard, get the engine file contexts for all of its segments
*
* @param indexReader IndexReader to read the file paths for the shard
* @return List of engine file Paths
* @param indexReader IndexReader to read the information for each segment in the shard
* @return List of engine contexts
* @throws IOException Thrown when the SegmentReader is attempting to read the segments files
*/
public Map<String, SpaceType> getAllEnginePaths(IndexReader indexReader) throws IOException {
Map<String, SpaceType> engineFiles = new HashMap<>();
@VisibleForTesting
List<EngineFileContext> getAllEngineFileContexts(IndexReader indexReader) throws IOException {
List<EngineFileContext> engineFiles = new ArrayList<>();
for (KNNEngine knnEngine : KNNEngine.getEnginesThatCreateCustomSegmentFiles()) {
engineFiles.putAll(getEnginePaths(indexReader, knnEngine));
engineFiles.addAll(getEngineFileContexts(indexReader, knnEngine));
}
return engineFiles;
}

private Map<String, SpaceType> getEnginePaths(IndexReader indexReader, KNNEngine knnEngine) throws IOException {
Map<String, SpaceType> engineFiles = new HashMap<>();
List<EngineFileContext> getEngineFileContexts(IndexReader indexReader, KNNEngine knnEngine) throws IOException {
List<EngineFileContext> engineFiles = new ArrayList<>();

for (LeafReaderContext leafReaderContext : indexReader.leaves()) {
SegmentReader reader = (SegmentReader) FilterLeafReader.unwrap(leafReaderContext.reader());
Expand All @@ -131,15 +141,17 @@ private Map<String, SpaceType> getEnginePaths(IndexReader indexReader, KNNEngine
// was L2. So, if Space Type is not present, just fall back to L2
String spaceTypeName = fieldInfo.attributes().getOrDefault(SPACE_TYPE, SpaceType.L2.getValue());
SpaceType spaceType = SpaceType.getSpace(spaceTypeName);
String modelId = fieldInfo.attributes().getOrDefault(MODEL_ID, null);

engineFiles.putAll(
getEnginePaths(
engineFiles.addAll(
getEngineFileContexts(
reader.getSegmentInfo().files(),
reader.getSegmentInfo().info.name,
fieldInfo.name,
fileExtension,
shardPath,
spaceType
spaceType,
modelId
)
);
}
Expand All @@ -148,20 +160,32 @@ private Map<String, SpaceType> getEnginePaths(IndexReader indexReader, KNNEngine
return engineFiles;
}

protected Map<String, SpaceType> getEnginePaths(
@VisibleForTesting
List<EngineFileContext> getEngineFileContexts(
Collection<String> files,
String segmentName,
String fieldName,
String fileExtension,
Path shardPath,
SpaceType spaceType
SpaceType spaceType,
String modelId
) {
String prefix = buildEngineFilePrefix(segmentName);
String suffix = buildEngineFileSuffix(fieldName, fileExtension);
return files.stream()
.filter(fileName -> fileName.startsWith(prefix))
.filter(fileName -> fileName.endsWith(suffix))
.map(fileName -> shardPath.resolve(fileName).toString())
.collect(Collectors.toMap(fileName -> fileName, fileName -> spaceType));
.map(fileName -> new EngineFileContext(spaceType, modelId, fileName))
.collect(Collectors.toList());
}

/**
 * Immutable description of a single engine file belonging to a shard segment. Lombok generates the
 * all-args constructor (spaceType, modelId, indexPath) and a getter for each field.
 */
@AllArgsConstructor
@Getter
@VisibleForTesting
static class EngineFileContext {
// Space type taken from the field's attributes
private final SpaceType spaceType;
// Model id taken from the field's attributes; null when the field has no model id attribute
private final String modelId;
// Engine file name resolved against the shard path, as a string
private final String indexPath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class IndexAllocation implements NativeMemoryAllocation {
private final String openSearchIndexName;
private final ReadWriteLock readWriteLock;
private final WatcherHandle<FileWatcher> watcherHandle;
private final SharedIndexState sharedIndexState;

/**
* Constructor
Expand All @@ -111,6 +112,31 @@ class IndexAllocation implements NativeMemoryAllocation {
String indexPath,
String openSearchIndexName,
WatcherHandle<FileWatcher> watcherHandle
) {
this(executorService, memoryAddress, size, knnEngine, indexPath, openSearchIndexName, watcherHandle, null);
}

/**
* Constructor
*
* @param executorService Executor service used to close the allocation
* @param memoryAddress Pointer in memory to the index
* @param size Size this index consumes in kilobytes
* @param knnEngine KNNEngine associated with the index allocation
* @param indexPath File path to index
* @param openSearchIndexName Name of OpenSearch index this index is associated with
* @param watcherHandle Handle for watching index file
* @param sharedIndexState Shared index state. If no shared state is present, pass null.
*/
IndexAllocation(
ExecutorService executorService,
long memoryAddress,
int size,
KNNEngine knnEngine,
String indexPath,
String openSearchIndexName,
WatcherHandle<FileWatcher> watcherHandle,
SharedIndexState sharedIndexState
) {
this.executor = executorService;
this.closed = false;
Expand All @@ -121,6 +147,7 @@ class IndexAllocation implements NativeMemoryAllocation {
this.readWriteLock = new ReentrantReadWriteLock();
this.size = size;
this.watcherHandle = watcherHandle;
this.sharedIndexState = sharedIndexState;
}

@Override
Expand All @@ -145,6 +172,10 @@ private void cleanup() {
if (memoryAddress != 0) {
JNIService.free(memoryAddress, knnEngine);
}

if (sharedIndexState != null) {
SharedIndexStateManager.getInstance().release(sharedIndexState);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package org.opensearch.knn.index.memory;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.knn.index.IndexUtil;

import java.io.IOException;
Expand Down Expand Up @@ -63,6 +64,8 @@ public static class IndexEntryContext extends NativeMemoryEntryContext<NativeMem
private final NativeMemoryLoadStrategy.IndexLoadStrategy indexLoadStrategy;
private final String openSearchIndexName;
private final Map<String, Object> parameters;
@Nullable
private final String modelId;

/**
* Constructor
Expand All @@ -77,11 +80,31 @@ public IndexEntryContext(
NativeMemoryLoadStrategy.IndexLoadStrategy indexLoadStrategy,
Map<String, Object> parameters,
String openSearchIndexName
) {
this(indexPath, indexLoadStrategy, parameters, openSearchIndexName, null);
}

/**
 * Constructor
 *
 * @param indexPath path to index file. Also used as key in cache.
 * @param indexLoadStrategy strategy to load index into memory
 * @param parameters load time parameters
 * @param openSearchIndexName opensearch index associated with index
 * @param modelId ID of the model associated with the index, or null when the index has no model
 */
public IndexEntryContext(
    String indexPath,
    NativeMemoryLoadStrategy.IndexLoadStrategy indexLoadStrategy,
    Map<String, Object> parameters,
    String openSearchIndexName,
    @Nullable String modelId
) {
    super(indexPath);
    this.indexLoadStrategy = indexLoadStrategy;
    this.openSearchIndexName = openSearchIndexName;
    this.parameters = parameters;
    // May be null: the four-argument constructor delegates here with a null model id
    this.modelId = modelId;
}

@Override
Expand Down Expand Up @@ -112,6 +135,15 @@ public Map<String, Object> getParameters() {
return parameters;
}

/**
 * Returns the ID of the model associated with this index.
 *
 * @return model ID for the index, or null if no model is in use
 */
@Nullable
public String getModelId() {
    return modelId;
}

private static class IndexSizeCalculator implements Function<IndexEntryContext, Integer> {

static IndexSizeCalculator INSTANCE = new IndexSizeCalculator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@

package org.opensearch.knn.index.memory;

import lombok.extern.log4j.Log4j2;
import org.opensearch.core.action.ActionListener;
import org.opensearch.knn.index.IndexUtil;
import org.opensearch.knn.jni.JNIService;
import org.opensearch.knn.index.util.KNNEngine;
import org.opensearch.knn.training.TrainingDataConsumer;
Expand Down Expand Up @@ -41,6 +43,7 @@ public interface NativeMemoryLoadStrategy<T extends NativeMemoryAllocation, U ex
*/
T load(U nativeMemoryEntryContext) throws IOException;

@Log4j2
class IndexLoadStrategy
implements
NativeMemoryLoadStrategy<NativeMemoryAllocation.IndexAllocation, NativeMemoryEntryContext.IndexEntryContext>,
Expand Down Expand Up @@ -92,17 +95,25 @@ public NativeMemoryAllocation.IndexAllocation load(NativeMemoryEntryContext.Inde
fileWatcher.init();

KNNEngine knnEngine = KNNEngine.getEngineNameFromPath(indexPath.toString());
long memoryAddress = JNIService.loadIndex(indexPath.toString(), indexEntryContext.getParameters(), knnEngine);
final WatcherHandle<FileWatcher> watcherHandle = resourceWatcherService.add(fileWatcher);
long indexAddress = JNIService.loadIndex(indexPath.toString(), indexEntryContext.getParameters(), knnEngine);
SharedIndexState sharedIndexState = null;
String modelId = indexEntryContext.getModelId();
if (IndexUtil.isSharedIndexStateRequired(knnEngine, modelId, indexAddress)) {
log.info("Index with model: \"{}\" requires shared state. Retrieving shared state.", modelId);
sharedIndexState = SharedIndexStateManager.getInstance().get(indexAddress, modelId, knnEngine);
JNIService.setSharedIndexState(indexAddress, sharedIndexState.getSharedIndexStateAddress(), knnEngine);
}

final WatcherHandle<FileWatcher> watcherHandle = resourceWatcherService.add(fileWatcher);
return new NativeMemoryAllocation.IndexAllocation(
executor,
memoryAddress,
indexAddress,
indexEntryContext.calculateSizeInKB(),
knnEngine,
indexPath.toString(),
indexEntryContext.getOpenSearchIndexName(),
watcherHandle
watcherHandle,
sharedIndexState
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.index.memory;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.opensearch.knn.index.util.KNNEngine;

/**
 * Class stores information about the shared memory allocations between loaded native indices.
 * Lombok generates a constructor taking the three final fields in declaration order
 * (sharedIndexStateAddress, modelId, knnEngine) and a getter for each of them.
 */
@RequiredArgsConstructor
@Getter
public class SharedIndexState {
// Address of the shared state; this is the value passed to the JNI layer when the state is attached to a loaded index
private final long sharedIndexStateAddress;
// ID of the model this shared state is associated with
private final String modelId;
// Engine this shared state is associated with
private final KNNEngine knnEngine;
}
Loading

0 comments on commit 8996861

Please sign in to comment.