Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
Signed-off-by: Arpit Bandejiya <[email protected]>
  • Loading branch information
Arpit-Bandejiya committed Jul 8, 2024
1 parent c30e574 commit 64d94a4
Show file tree
Hide file tree
Showing 12 changed files with 386 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.index.Index;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteStateTransferException;
import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore;
Expand Down Expand Up @@ -56,10 +55,6 @@
*/
public class InternalRemoteRoutingTableService extends AbstractLifecycleComponent implements RemoteRoutingTableService {

public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing";
public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing";
public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--";

private static final Logger logger = LogManager.getLogger(InternalRemoteRoutingTableService.class);
private final Settings settings;
private final Supplier<RepositoriesService> repositoriesService;
Expand Down Expand Up @@ -126,14 +121,13 @@ public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRouting
@Override
public CheckedRunnable<IOException> getAsyncIndexRoutingWriteAction(
ClusterState clusterState,
String clusterUUID,
IndexRoutingTable indexRouting,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
) {

RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(
indexRouting,
clusterUUID,
clusterState.metadata().clusterUUID(),
compressor,
clusterState.term(),
clusterState.version()
Expand Down Expand Up @@ -177,7 +171,6 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndices
public CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
String clusterUUID,
String uploadedFilename,
Index index,
LatchedActionListener<IndexRoutingTable> latchedActionListener
) {

Expand All @@ -186,7 +179,7 @@ public CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
latchedActionListener::onFailure
);

RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, index, clusterUUID, compressor);
RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, clusterUUID, compressor);

return () -> remoteWritableEntityStore.readAsync(remoteIndexRoutingTable, actionListener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.core.index.Index;
import org.opensearch.gateway.remote.ClusterMetadataManifest;

import java.io.IOException;
Expand Down Expand Up @@ -43,7 +42,6 @@ public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRouting
@Override
public CheckedRunnable<IOException> getAsyncIndexRoutingWriteAction(
ClusterState clusterState,
String clusterUUID,
IndexRoutingTable indexRouting,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
) {
Expand All @@ -65,7 +63,6 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndices
public CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
String clusterUUID,
String uploadedFilename,
Index index,
LatchedActionListener<IndexRoutingTable> latchedActionListener
) {
// noop
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.opensearch.common.lifecycle.LifecycleComponent;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.index.Index;
import org.opensearch.gateway.remote.ClusterMetadataManifest;

import java.io.IOException;
Expand Down Expand Up @@ -48,7 +47,6 @@ public IndexRoutingTable read(StreamInput in, String key) throws IOException {
CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
String clusterUUID,
String uploadedFilename,
Index index,
LatchedActionListener<IndexRoutingTable> latchedActionListener
);

Expand All @@ -64,7 +62,6 @@ DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>>

CheckedRunnable<IOException> getAsyncIndexRoutingWriteAction(
ClusterState clusterState,
String clusterUUID,
IndexRoutingTable indexRouting,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.opensearch.cluster.node.DiscoveryNodes.Builder;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableServiceFactory;
import org.opensearch.cluster.service.ClusterService;
Expand All @@ -43,7 +42,6 @@
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.index.Index;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute;
Expand Down Expand Up @@ -106,6 +104,8 @@
import static org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata.SETTING_METADATA;
import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA;
import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
Expand Down Expand Up @@ -656,13 +656,8 @@ private UploadedMetadataResults writeMetadataInParallel(
});
indicesRoutingToUpload.forEach(indexRoutingTable -> {
uploadTasks.put(
InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + indexRoutingTable.getIndex().getName(),
remoteRoutingTableService.getAsyncIndexRoutingWriteAction(
clusterState,
clusterState.metadata().clusterUUID(),
indexRoutingTable,
listener
)
String.join(CUSTOM_DELIMITER, INDEX_ROUTING_TABLE, indexRoutingTable.getIndex().getName()),
remoteRoutingTableService.getAsyncIndexRoutingWriteAction(clusterState, indexRoutingTable, listener)
);
});

Expand Down Expand Up @@ -712,7 +707,7 @@ private UploadedMetadataResults writeMetadataInParallel(
UploadedMetadataResults response = new UploadedMetadataResults();
results.forEach((name, uploadedMetadata) -> {
if (uploadedMetadata.getClass().equals(UploadedIndexMetadata.class)
&& uploadedMetadata.getComponent().contains(InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX)) {
&& uploadedMetadata.getComponent().contains(INDEX_ROUTING_TABLE_PREFIX)) {
response.uploadedIndicesRoutingMetadata.add((UploadedIndexMetadata) uploadedMetadata);
} else if (name.startsWith(CUSTOM_METADATA)) {
// component name for custom metadata will look like custom--<metadata-attribute>
Expand Down Expand Up @@ -1037,7 +1032,6 @@ private ClusterState readClusterStateInParallel(
remoteRoutingTableService.getAsyncIndexRoutingReadAction(
clusterUUID,
indexRouting.getUploadedFilename(),
new Index(indexRouting.getIndexName(), indexRouting.getIndexUUID()),
routingTableLatchedActionListener
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_PATH_TOKEN;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE;

/**
* Extends the RemoteClusterStateBlobStore to support {@link RemoteIndexRoutingTable}
Expand Down Expand Up @@ -60,7 +56,6 @@ public class RemoteRoutingTableBlobStore<IndexRoutingTable, U extends AbstractRe
Setting.Property.Dynamic
);

public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing";
private RemoteStoreEnums.PathType pathType;
private RemoteStoreEnums.PathHashAlgorithm pathHashAlgo;

Expand All @@ -85,7 +80,7 @@ public BlobPath getBlobPathForUpload(final AbstractRemoteWritableBlobEntity<Inde
BlobPath indexRoutingPath = getBasePath().add(RemoteClusterStateUtils.encodeString(getClusterName()))
.add("cluster-state")
.add(obj.clusterUUID())
.add(INDEX_ROUTING_PATH_TOKEN);
.add(INDEX_ROUTING_TABLE);
BlobPath path = pathType.path(
RemoteStorePathStrategy.PathInput.builder()
.basePath(indexRoutingPath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,8 @@

package org.opensearch.gateway.remote.routingtable;


import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.common.io.Streams;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.store.IndexOutputOutputStream;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.BlobPathParameters;
import org.opensearch.core.compress.Compressor;
Expand All @@ -22,29 +18,25 @@
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.repositories.blobstore.ChecksumWritableBlobStoreFormat;


import java.io.IOException;
import java.io.InputStream;
import java.util.List;

import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
import static org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore.INDEX_ROUTING_FILE_PREFIX;

/**
* Remote store object for IndexRoutingTable
*/
public class RemoteIndexRoutingTable extends AbstractRemoteWritableBlobEntity<IndexRoutingTable> {

public static final String INDEX_ROUTING_TABLE = "index_routing_table";
public static final String INDEX_ROUTING_TABLE = "index-routing";
public static final String INDEX_ROUTING_TABLE_PREFIX = "index-routing--";
private IndexRoutingTable indexRoutingTable;
private final Index index;
private long term;
private long version;
public static final ChecksumWritableBlobStoreFormat<IndexRoutingTable> INDEX_ROUTING_TABLE_FORMAT = new ChecksumWritableBlobStoreFormat<>(
"index-routing-table",
IndexRoutingTable::readFrom
);
public static final ChecksumWritableBlobStoreFormat<IndexRoutingTable> INDEX_ROUTING_TABLE_FORMAT =
new ChecksumWritableBlobStoreFormat<>("index-routing-table", IndexRoutingTable::readFrom);

public RemoteIndexRoutingTable(
IndexRoutingTable indexRoutingTable,
Expand All @@ -58,17 +50,17 @@ public RemoteIndexRoutingTable(
this.indexRoutingTable = indexRoutingTable;
this.term = term;
this.version = version;
this.blobFileName = generateBlobFileName();
}

/**
* Reads data from inputStream and creates RemoteIndexRoutingTable object with the {@link IndexRoutingTable}
* @param blobName name of the blob, which contains the index routing data
* @param index index for the current routing data
* @param clusterUUID UUID of the cluster
* @param compressor Compressor object
*/
public RemoteIndexRoutingTable(String blobName, Index index, String clusterUUID, Compressor compressor) {
public RemoteIndexRoutingTable(String blobName, String clusterUUID, Compressor compressor) {
super(clusterUUID, compressor, null);
this.index = index;
this.index = null;
this.term = -1;
this.version = -1;
this.blobName = blobName;
Expand Down Expand Up @@ -96,7 +88,7 @@ public String getType() {
public String generateBlobFileName() {
return String.join(
DELIMITER,
INDEX_ROUTING_FILE_PREFIX,
INDEX_ROUTING_TABLE_PREFIX,
RemoteStoreUtils.invertLong(term),
RemoteStoreUtils.invertLong(version),
RemoteStoreUtils.invertLong(System.currentTimeMillis())
Expand All @@ -105,7 +97,9 @@ public String generateBlobFileName() {

@Override
public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() {
return new ClusterMetadataManifest.UploadedIndexMetadata(index.getName(), index.getUUID(), blobName, INDEX_ROUTING_METADATA_PREFIX);
assert blobName != null;
assert index != null;
return new ClusterMetadataManifest.UploadedIndexMetadata(index.getName(), index.getUUID(), blobName, INDEX_ROUTING_TABLE_PREFIX);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.compress.NoneCompressor;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
Expand All @@ -22,6 +26,7 @@

import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.mockito.Mockito.mock;

public class RemoteRoutingTableServiceFactoryTests extends OpenSearchTestCase {

Expand All @@ -36,11 +41,18 @@ public void teardown() throws Exception {

public void testGetServiceWhenRemoteRoutingDisabled() {
Settings settings = Settings.builder().build();
BlobStoreRepository blobStoreRepository = mock(BlobStoreRepository.class);
Compressor compressor = new NoneCompressor();
BlobStoreTransferService blobStoreTransferService = mock(BlobStoreTransferService.class);
RemoteRoutingTableService service = RemoteRoutingTableServiceFactory.getService(
repositoriesService,
settings,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool
threadPool,
compressor,
blobStoreTransferService,
blobStoreRepository,
"test-cluster"
);
assertTrue(service instanceof NoopRemoteRoutingTableService);
}
Expand All @@ -50,13 +62,20 @@ public void testGetServiceWhenRemoteRoutingEnabled() {
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository")
.put(FsRepository.REPOSITORIES_COMPRESS_SETTING.getKey(), false)
.build();
BlobStoreRepository blobStoreRepository = mock(BlobStoreRepository.class);
Compressor compressor = new NoneCompressor();
BlobStoreTransferService blobStoreTransferService = mock(BlobStoreTransferService.class);
Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build();
FeatureFlags.initializeFeatureFlags(nodeSettings);
RemoteRoutingTableService service = RemoteRoutingTableServiceFactory.getService(
repositoriesService,
settings,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool
threadPool,
compressor,
blobStoreTransferService,
blobStoreRepository,
"test-cluster"
);
assertTrue(service instanceof InternalRemoteRoutingTableService);
}
Expand Down
Loading

0 comments on commit 64d94a4

Please sign in to comment.