Skip to content

Commit

Permalink
Rough implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
ltaragi committed Aug 8, 2024
1 parent c6189a9 commit 13d099a
Show file tree
Hide file tree
Showing 8 changed files with 317 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ public static void createIndex(String indexName, int replicaCount) {
.prepareCreate(indexName)
.setSettings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, replicaCount)
.build()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
Expand All @@ -20,12 +23,14 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.snapshots.SnapshotState;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.hamcrest.OpenSearchAssertions;

import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
Expand Down Expand Up @@ -98,7 +103,7 @@ public void testMigrationDirections() {
assertThrows(IllegalArgumentException.class, () -> client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
}

public void testNoShallowSnapshotInMixedMode() throws Exception {
public void test_remote_cluster() throws Exception {
logger.info("Initialize remote cluster");
addRemote = true;
internalCluster().setBootstrapClusterManagerNodeIndex(0);
Expand All @@ -110,8 +115,18 @@ public void testNoShallowSnapshotInMixedMode() throws Exception {
internalCluster().validateClusterFormed();

logger.info("Create remote backed index");
RemoteStoreMigrationShardAllocationBaseTestCase.createIndex("test", 0);
RemoteStoreMigrationShardAllocationBaseTestCase.assertRemoteStoreBackedIndex("test");
String index1 = "index1";
String index2 = "index2";
String index3 = "index3";
RemoteStoreMigrationShardAllocationBaseTestCase.createIndex(index1, 0);
RemoteStoreMigrationShardAllocationBaseTestCase.createIndex(index2, 0);
// RemoteStoreMigrationShardAllocationBaseTestCase.createIndex(index3, 0);

logger.info("--> indexing some data");
for (int i = 0; i < 10; i++) {
index(index1, "_doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();

logger.info("Create shallow snapshot setting enabled repo");
String shallowSnapshotRepoName = "shallow-snapshot-repo-name";
Expand All @@ -131,19 +146,23 @@ public void testNoShallowSnapshotInMixedMode() throws Exception {
SnapshotInfo snapshotInfo1 = RemoteStoreMigrationShardAllocationBaseTestCase.createSnapshot(
shallowSnapshotRepoName,
snapshot1,
"test"
index1
);
assertEquals(snapshotInfo1.isRemoteStoreIndexShallowCopyEnabled(), true);
// SnapshotsStatusRequest snapshotsStatusRequest = client.admin().cluster().prepareSnapshotStatus().setSnapshots(snapshot1).setIndices("tria-a", "trial-b").request();
// SnapshotStatus snapshotStatus1 = client.admin().cluster().execute(snapshotsStatusRequest);
// logger.info("*** snapshotStatus1 = {}", snapshotStatus1.toString());

// assertBusy(() -> {
// SnapshotsStatusResponse snapshotsStatusResponse = client().admin().cluster().prepareSnapshotStatus(shallowSnapshotRepoName)
// .setSnapshots(snapshot1, "snapshot2", "taragilk-snapshot")
//// .setIndices(index1, index2, index3)
// .setIgnoreUnavailable(false)
// .get();
// SnapshotStatus snapshotStatus1 = snapshotsStatusResponse.getSnapshots().get(0);
// logger.info("*** current snapshot status - totalSize [{}]", snapshotStatus1.getStats().getTotalSize());
// assertNotEquals(0L, snapshotStatus1.getStats().getTotalSize());
// }, 1, TimeUnit.MINUTES);

logger.info("Set MIXED compatibility mode");
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

logger.info("Verify that new snapshot is not shallow");
final String snapshot2 = "snapshot2";
SnapshotInfo snapshotInfo2 = RemoteStoreMigrationShardAllocationBaseTestCase.createSnapshot(shallowSnapshotRepoName, snapshot2);
assertEquals(snapshotInfo2.isRemoteStoreIndexShallowCopyEnabled(), false);
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public class SnapshotStatus implements ToXContentObject, Writeable {
includeGlobalState = in.readOptionalBoolean();
final long startTime = in.readLong();
final long time = in.readLong();
updateShardStats(startTime, time);
updateShardStats(startTime, time, 0);
}

SnapshotStatus(
Expand All @@ -105,15 +105,16 @@ public class SnapshotStatus implements ToXContentObject, Writeable {
List<SnapshotIndexShardStatus> shards,
Boolean includeGlobalState,
long startTime,
long time
long time,
long initialTotalSize
) {
this.snapshot = Objects.requireNonNull(snapshot);
this.state = Objects.requireNonNull(state);
this.shards = Objects.requireNonNull(shards);
this.includeGlobalState = includeGlobalState;
shardsStats = new SnapshotShardsStats(shards);
assert time >= 0 : "time must be >= 0 but received [" + time + "]";
updateShardStats(startTime, time);
updateShardStats(startTime, time, initialTotalSize);
}

private SnapshotStatus(
Expand Down Expand Up @@ -299,8 +300,8 @@ public static SnapshotStatus fromXContent(XContentParser parser) throws IOExcept
return PARSER.parse(parser, null);
}

private void updateShardStats(long startTime, long time) {
stats = new SnapshotStats(startTime, time, 0, 0, 0, 0, 0, 0);
private void updateShardStats(long startTime, long time, long initialTotalSize) {
stats = new SnapshotStats(startTime, time, 0, 0, 0, 0, initialTotalSize, 0);
shardsStats = new SnapshotShardsStats(shards);
for (SnapshotIndexShardStatus shard : shards) {
// BWC: only update timestamps when we did not get a start time from an old node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class SnapshotsStatusRequest extends ClusterManagerNodeRequest<SnapshotsS
private String repository = "_all";

private String[] snapshots = Strings.EMPTY_ARRAY;
private String[] indices = Strings.EMPTY_ARRAY;

private boolean ignoreUnavailable;

Expand All @@ -65,15 +66,17 @@ public SnapshotsStatusRequest() {}
* @param repository repository name
* @param snapshots list of snapshots
*/
public SnapshotsStatusRequest(String repository, String[] snapshots) {
public SnapshotsStatusRequest(String repository, String[] snapshots, String[] indices) {
this.repository = repository;
this.snapshots = snapshots;
this.indices = indices;
}

public SnapshotsStatusRequest(StreamInput in) throws IOException {
super(in);
repository = in.readString();
snapshots = in.readStringArray();
indices = in.readStringArray();
ignoreUnavailable = in.readBoolean();
}

Expand All @@ -82,6 +85,7 @@ public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(repository);
out.writeStringArray(snapshots);
out.writeStringArray(indices);
out.writeBoolean(ignoreUnavailable);
}

Expand All @@ -103,6 +107,9 @@ public ActionRequestValidationException validate() {
if (snapshots == null) {
validationException = addValidationError("snapshots is null", validationException);
}
if (indices.length != 0 && snapshots.length != 1) {
validationException = addValidationError("index list filter is supported only for a single snapshot", validationException);
}
return validationException;
}

Expand Down Expand Up @@ -146,10 +153,29 @@ public SnapshotsStatusRequest snapshots(String[] snapshots) {
}

/**
* Set to <code>true</code> to ignore unavailable snapshots, instead of throwing an exception.
* Defaults to <code>false</code>, which means unavailable snapshots cause an exception to be thrown.
* Returns the names of the indices.
*
* @return the names of indices
*/
public String[] indices() {
return this.indices;
}

/**
* Sets the list of indices to be returned
*
* @return this request
*/
public SnapshotsStatusRequest indices(String[] indices) {
this.indices = indices;
return this;
}

/**
* Set to <code>true</code> to ignore unavailable snapshots and indices, instead of throwing an exception.
* Defaults to <code>false</code>, which means unavailable snapshots and indices cause an exception to be thrown.
*
* @param ignoreUnavailable whether to ignore unavailable snapshots
* @param ignoreUnavailable whether to ignore unavailable snapshots and indices
* @return this request
*/
public SnapshotsStatusRequest ignoreUnavailable(boolean ignoreUnavailable) {
Expand All @@ -158,9 +184,9 @@ public SnapshotsStatusRequest ignoreUnavailable(boolean ignoreUnavailable) {
}

/**
* Returns whether the request permits unavailable snapshots to be ignored.
* Returns whether the request permits unavailable snapshots and indices to be ignored.
*
* @return true if the request will ignore unavailable snapshots, false if it will throw an exception on unavailable snapshots
* @return true if the request will ignore unavailable snapshots and indices, false if it will throw an exception on unavailable snapshots and indices
*/
public boolean ignoreUnavailable() {
return ignoreUnavailable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,32 @@ public SnapshotsStatusRequestBuilder addSnapshots(String... snapshots) {
}

/**
* Set to <code>true</code> to ignore unavailable snapshots, instead of throwing an exception.
* Defaults to <code>false</code>, which means unavailable snapshots cause an exception to be thrown.
* Sets list of indices to return
*
* @param ignoreUnavailable whether to ignore unavailable snapshots.
* @param indices list of indices
* @return this builder
*/
public SnapshotsStatusRequestBuilder setIndices(String... indices) {
request.indices(indices);
return this;
}

/**
* Adds additional indices to the list of indices to return
*
* @param indices additional indices
* @return this builder
*/
public SnapshotsStatusRequestBuilder addIndices(String... indices) {
request.indices(ArrayUtils.concat(request.indices(), indices));
return this;
}

/**
* Set to <code>true</code> to ignore unavailable snapshots and indices, instead of throwing an exception.
* Defaults to <code>false</code>, which means unavailable snapshots and indices cause an exception to be thrown.
*
* @param ignoreUnavailable whether to ignore unavailable snapshots and indices.
* @return this builder
*/
public SnapshotsStatusRequestBuilder setIgnoreUnavailable(boolean ignoreUnavailable) {
Expand Down
Loading

0 comments on commit 13d099a

Please sign in to comment.