Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Writable Warm] Composite Directory implementation and integrating it with FileCache #12782

Merged
merged 22 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
3c6ad60
Composite Directory POC
rayshrey Mar 13, 2024
30ddaea
Refactor TransferManager interface to RemoteStoreFileTrackerAdapter
rayshrey Mar 19, 2024
a7361d2
Implement block level fetch for Composite Directory
rayshrey Mar 20, 2024
54beed2
Removed CACHE state from FileTracker
rayshrey Apr 1, 2024
87772f0
Fixes after latest pull
rayshrey Apr 1, 2024
8fb76d1
Add new setting for warm, remove store type setting, FileTracker and …
rayshrey Apr 4, 2024
932ad92
Modify TransferManager - replace BlobContainer with Functional Interf…
rayshrey Apr 30, 2024
3e74318
Reuse OnDemandBlockSnapshotIndexInput instead of OnDemandBlockComposi…
rayshrey May 2, 2024
dca1a07
Modify constructors to avoid breaking public api contract and code re…
rayshrey May 8, 2024
723aba0
Add experimental annotations for newly created classes and review com…
rayshrey May 9, 2024
959f90c
Use ref count as a temporary measure to prevent file from eviction un…
rayshrey May 9, 2024
2632738
Remove method level locks
rayshrey May 10, 2024
ba34798
Handle tmp file deletion
rayshrey May 10, 2024
85003ee
Nit fixes
rayshrey May 13, 2024
de1895e
Handle delete and close in Composite Directory, log current state of …
rayshrey May 29, 2024
f9d880b
Refactor usages of WRITEABLE_REMOTE_INDEX_SETTING to TIERED_REMOTE_IN…
rayshrey May 30, 2024
e1b18ad
Add tests for FileCachedIndexInput and review comment fixes
rayshrey May 30, 2024
5e008db
Add additional IT for feature flag disabled
rayshrey May 30, 2024
c1e6b18
Move setting for Partial Locality type behind Feature Flag, fix bug f…
rayshrey Jun 12, 2024
74f9c29
Minor test and nit fixes
rayshrey Jun 14, 2024
7fce3d3
Add javadocs for FullFileCachedIndexInput
rayshrey Jun 17, 2024
901849b
Minor precommit fixes
rayshrey Jun 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add fingerprint ingest processor ([#13724](https://github.com/opensearch-project/OpenSearch/pull/13724))
- [Remote Store] Rate limiter for remote store low priority uploads ([#14374](https://github.com/opensearch-project/OpenSearch/pull/14374/))
- Apply the date histogram rewrite optimization to range aggregation ([#13865](https://github.com/opensearch-project/OpenSearch/pull/13865))
- [Writable Warm] Add composite directory implementation and integrate it with FileCache ([12782](https://github.com/opensearch-project/OpenSearch/pull/12782))

### Dependencies
- Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsException;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.CompositeDirectory;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.utils.FileTypeUtils;
import org.opensearch.indices.IndicesService;
import org.opensearch.node.Node;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, supportsDedicatedMasters = false)
// Uncomment the below line to enable trace level logs for this test for better debugging
// @TestLogging(reason = "Getting trace logs from composite directory package", value = "org.opensearch.index.store:TRACE")
public class WritableWarmIT extends RemoteStoreBaseIntegTestCase {

protected static final String INDEX_NAME = "test-idx-1";
protected static final int NUM_DOCS_IN_BULK = 1000;

/*
Disabling MockFSIndexStore plugin as the MockFSDirectoryFactory wraps the FSDirectory over a OpenSearchMockDirectoryWrapper which extends FilterDirectory (whereas FSDirectory extends BaseDirectory)
As a result of this wrapping the local directory of Composite Directory does not satisfy the assertion that local directory must be of type FSDirectory
*/
@Override
protected boolean addMockIndexStorePlugin() {
return false;
}

@Override
protected Settings featureFlagSettings() {
Settings.Builder featureSettings = Settings.builder();
featureSettings.put(FeatureFlags.TIERED_REMOTE_INDEX, true);
return featureSettings.build();
}

public void testWritableWarmFeatureFlagDisabled() {
Settings clusterSettings = Settings.builder().put(super.nodeSettings(0)).put(FeatureFlags.TIERED_REMOTE_INDEX, false).build();
InternalTestCluster internalTestCluster = internalCluster();
internalTestCluster.startClusterManagerOnlyNode(clusterSettings);
internalTestCluster.startDataOnlyNode(clusterSettings);

Settings indexSettings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name())
.build();

try {
prepareCreate(INDEX_NAME).setSettings(indexSettings).get();
fail("Should have thrown Exception as setting should not be registered if Feature Flag is Disabled");
} catch (SettingsException ex) {
assertEquals(
"unknown setting ["
+ IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey()
+ "] please check that any required plugins are installed, or check the "
+ "breaking changes documentation for removed settings",
ex.getMessage()
);
}
}

public void testWritableWarmBasic() throws Exception {
InternalTestCluster internalTestCluster = internalCluster();
internalTestCluster.startClusterManagerOnlyNode();
internalTestCluster.startDataOnlyNode();
Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name())
.build();
assertAcked(client().admin().indices().prepareCreate(INDEX_NAME).setSettings(settings).get());

// Verify from the cluster settings if the data locality is partial
GetIndexResponse getIndexResponse = client().admin()
.indices()
.getIndex(new GetIndexRequest().indices(INDEX_NAME).includeDefaults(true))
.get();
Settings indexSettings = getIndexResponse.settings().get(INDEX_NAME);
assertEquals(IndexModule.DataLocalityType.PARTIAL.name(), indexSettings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey()));

// Ingesting some docs
indexBulk(INDEX_NAME, NUM_DOCS_IN_BULK);
flushAndRefresh(INDEX_NAME);

// ensuring cluster is green after performing force-merge
ensureGreen();

SearchResponse searchResponse = client().prepareSearch(INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get();
// Asserting that search returns same number of docs as ingested
assertHitCount(searchResponse, NUM_DOCS_IN_BULK);

// Ingesting docs again before force merge
indexBulk(INDEX_NAME, NUM_DOCS_IN_BULK);
flushAndRefresh(INDEX_NAME);

FileCache fileCache = internalTestCluster.getDataNodeInstance(Node.class).fileCache();
IndexShard shard = internalTestCluster.getDataNodeInstance(IndicesService.class)
.indexService(resolveIndex(INDEX_NAME))
.getShardOrNull(0);
Directory directory = (((FilterDirectory) (((FilterDirectory) (shard.store().directory())).getDelegate())).getDelegate());

// Force merging the index
Set<String> filesBeforeMerge = new HashSet<>(Arrays.asList(directory.listAll()));
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).get();
flushAndRefresh(INDEX_NAME);
Set<String> filesAfterMerge = new HashSet<>(Arrays.asList(directory.listAll()));

Set<String> filesFromPreviousGenStillPresent = filesBeforeMerge.stream()
.filter(filesAfterMerge::contains)
.filter(file -> !FileTypeUtils.isLockFile(file))
.filter(file -> !FileTypeUtils.isSegmentsFile(file))
.collect(Collectors.toUnmodifiableSet());

// Asserting that after merge all the files from previous gen are no more part of the directory
assertTrue(filesFromPreviousGenStillPresent.isEmpty());
rayshrey marked this conversation as resolved.
Show resolved Hide resolved

// Asserting that files from previous gen are not present in File Cache as well
filesBeforeMerge.stream()
.filter(file -> !FileTypeUtils.isLockFile(file))
.filter(file -> !FileTypeUtils.isSegmentsFile(file))
.forEach(file -> assertNull(fileCache.get(((CompositeDirectory) directory).getFilePath(file))));

// Deleting the index (so that ref count drops to zero for all the files) and then pruning the cache to clear it to avoid any file
// leaks
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get());
fileCache.prune();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.IndexSortConfig;
Expand Down Expand Up @@ -260,7 +261,10 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
* is ready for production release, the feature flag can be removed, and the
* setting should be moved to {@link #BUILT_IN_INDEX_SETTINGS}.
*/
public static final Map<String, List<Setting>> FEATURE_FLAGGED_INDEX_SETTINGS = Map.of();
public static final Map<String, List<Setting>> FEATURE_FLAGGED_INDEX_SETTINGS = Map.of(
FeatureFlags.TIERED_REMOTE_INDEX,
List.of(IndexModule.INDEX_STORE_LOCALITY_SETTING)
);

public static final IndexScopedSettings DEFAULT_SCOPED_SETTINGS = new IndexScopedSettings(Settings.EMPTY, BUILT_IN_INDEX_SETTINGS);

Expand Down
78 changes: 76 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@
import java.util.function.Function;
import java.util.function.Supplier;

import static org.apache.logging.log4j.util.Strings.toRootUpperCase;

/**
* IndexModule represents the central extension point for index level custom implementations like:
* <ul>
Expand Down Expand Up @@ -141,6 +143,17 @@
Property.NodeScope
);

/**
* Index setting which used to determine how the data is cached locally fully or partially
*/
public static final Setting<DataLocalityType> INDEX_STORE_LOCALITY_SETTING = new Setting<>(
"index.store.data_locality",
DataLocalityType.FULL.name(),
DataLocalityType::getValueOf,
Property.IndexScope,
rayshrey marked this conversation as resolved.
Show resolved Hide resolved
Property.NodeScope
rayshrey marked this conversation as resolved.
Show resolved Hide resolved
);

public static final Setting<String> INDEX_RECOVERY_TYPE_SETTING = new Setting<>(
"index.recovery.type",
"",
Expand Down Expand Up @@ -297,6 +310,7 @@
private final AtomicBoolean frozen = new AtomicBoolean(false);
private final BooleanSupplier allowExpensiveQueries;
private final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories;
private final FileCache fileCache;

/**
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
Expand All @@ -315,7 +329,8 @@
final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories,
final BooleanSupplier allowExpensiveQueries,
final IndexNameExpressionResolver expressionResolver,
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
final FileCache fileCache
) {
this.indexSettings = indexSettings;
this.analysisRegistry = analysisRegistry;
Expand All @@ -327,6 +342,30 @@
this.allowExpensiveQueries = allowExpensiveQueries;
this.expressionResolver = expressionResolver;
this.recoveryStateFactories = recoveryStateFactories;
this.fileCache = fileCache;
}

public IndexModule(
final IndexSettings indexSettings,
final AnalysisRegistry analysisRegistry,
final EngineFactory engineFactory,
final EngineConfigFactory engineConfigFactory,
final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories,
final BooleanSupplier allowExpensiveQueries,
final IndexNameExpressionResolver expressionResolver,
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories
) {
this(
indexSettings,
analysisRegistry,
engineFactory,
engineConfigFactory,
directoryFactories,
allowExpensiveQueries,
expressionResolver,
recoveryStateFactories,
null
);
}

/**
Expand Down Expand Up @@ -577,6 +616,40 @@
}
}

/**
* Indicates the locality of the data - whether it will be cached fully or partially
*/
public enum DataLocalityType {
/**
* Indicates that all the data will be cached locally
*/
FULL,
/**
* Indicates that only a subset of the data will be cached locally
*/
PARTIAL;

private static final Map<String, DataLocalityType> LOCALITY_TYPES;

static {
final Map<String, DataLocalityType> localityTypes = new HashMap<>(values().length);
for (final DataLocalityType dataLocalityType : values()) {
localityTypes.put(dataLocalityType.name(), dataLocalityType);
}
LOCALITY_TYPES = Collections.unmodifiableMap(localityTypes);
}

public static DataLocalityType getValueOf(final String localityType) {
Objects.requireNonNull(localityType, "No locality type given.");
final String localityTypeName = toRootUpperCase(localityType.trim());
final DataLocalityType type = LOCALITY_TYPES.get(localityTypeName);
if (type != null) {
return type;
}
throw new IllegalArgumentException("Unknown locality type constant [" + localityType + "].");

Check warning on line 649 in server/src/main/java/org/opensearch/index/IndexModule.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/IndexModule.java#L649

Added line #L649 was not covered by tests
}
}

public static Type defaultStoreType(final boolean allowMmap) {
if (allowMmap && Constants.JRE_IS_64BIT && MMapDirectory.UNMAP_SUPPORTED) {
return Type.HYBRIDFS;
Expand Down Expand Up @@ -665,7 +738,8 @@
translogFactorySupplier,
clusterDefaultRefreshIntervalSupplier,
recoverySettings,
remoteStoreSettings
remoteStoreSettings,
fileCache
);
success = true;
return indexService;
Expand Down
Loading
Loading