Skip to content

Commit

Permalink
Add snapshot shard blobs with hashed prefix
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Aug 27, 2024
1 parent 2301adf commit 68d100b
Show file tree
Hide file tree
Showing 13 changed files with 864 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ private RemoteRestoreResult executeRestore(
.build();
}

IndexId indexId = new IndexId(indexName, updatedIndexMetadata.getIndexUUID());
// This instance of IndexId is not related to Snapshot Restore. Hence, we are using the ctor without pathType.
IndexId indexId = new IndexId(indexName, updatedIndexMetadata.getIndexUUID(), IndexId.DEFAULT_SHARD_PATH_TYPE);

if (metadataFromRemoteStore == false) {
Map<ShardId, IndexShardRoutingTable> indexShardRoutingTableMap = currentState.routingTable()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public class RemoteIndexPathUploader extends IndexMetadataUploadListener {

public static final String DELIMITER = "#";
public static final ConfigBlobStoreFormat<RemoteIndexPath> REMOTE_INDEX_PATH_FORMAT = new ConfigBlobStoreFormat<>(
RemoteIndexPath.FILE_NAME_FORMAT
RemoteIndexPath.FILE_NAME_FORMAT,
RemoteIndexPath::fromXContent
);

private static final String TIMEOUT_EXCEPTION_MSG = "Timed out waiting while uploading remote index path file for indexes=%s";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ public enum PathHashAlgorithm {
@Override
String hash(PathInput pathInput) {
StringBuilder input = new StringBuilder();
for (String path : pathInput.fixedSubPath().toArray()) {
for (String path : pathInput.hashPath().toArray()) {
input.append(path);
}
long hash = FNV1a.hash64(input.toString());
Expand All @@ -222,7 +222,7 @@ String hash(PathInput pathInput) {
@Override
String hash(PathInput pathInput) {
StringBuilder input = new StringBuilder();
for (String path : pathInput.fixedSubPath().toArray()) {
for (String path : pathInput.hashPath().toArray()) {
input.append(path);
}
long hash = FNV1a.hash64(input.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.index.remote.RemoteStoreEnums.DataType;
import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.util.Objects;

Expand Down Expand Up @@ -100,6 +101,10 @@ BlobPath fixedSubPath() {
return BlobPath.cleanPath().add(indexUUID);
}

BlobPath hashPath() {
return BlobPath.cleanPath().add(indexUUID);
}

/**
* Returns a new builder for {@link PathInput}.
*/
Expand Down Expand Up @@ -127,7 +132,7 @@ public T basePath(BlobPath basePath) {
return self();
}

public Builder indexUUID(String indexUUID) {
public T indexUUID(String indexUUID) {
this.indexUUID = indexUUID;
return self();
}
Expand All @@ -142,6 +147,61 @@ public PathInput build() {
}
}

/**
* A subclass of {@link PathInput} that represents the input required to generate a path
* for a shard in a snapshot. It includes the base path, index UUID, and shard ID.
*
* @opensearch.internal
*/
public static class SnapshotShardPathInput extends PathInput {
private final String shardId;

public SnapshotShardPathInput(SnapshotShardPathInput.Builder builder) {
super(builder);
this.shardId = Objects.requireNonNull(builder.shardId);
}

@Override
BlobPath fixedSubPath() {
return BlobPath.cleanPath().add(BlobStoreRepository.INDICES_DIR).add(super.fixedSubPath()).add(shardId);
}

@Override
BlobPath hashPath() {
return BlobPath.cleanPath().add(shardId).add(indexUUID());
}

/**
* Returns a new builder for {@link SnapshotShardPathInput}.
*/
public static SnapshotShardPathInput.Builder builder() {
return new SnapshotShardPathInput.Builder();
}

/**
* Builder for {@link SnapshotShardPathInput}.
*
* @opensearch.internal
*/
public static class Builder extends PathInput.Builder<SnapshotShardPathInput.Builder> {
private String shardId;

public SnapshotShardPathInput.Builder shardId(String shardId) {
this.shardId = shardId;
return this;
}

@Override
protected SnapshotShardPathInput.Builder self() {
return this;
}

public SnapshotShardPathInput build() {
return new SnapshotShardPathInput(this);
}
}
}

/**
* Wrapper class for the data aware path input required to generate path for remote store uploads. This input is
* composed of the parent inputs, shard id, data category and data type.
Expand Down Expand Up @@ -204,16 +264,6 @@ public static class Builder extends PathInput.Builder<Builder> {
private DataCategory dataCategory;
private DataType dataType;

public Builder basePath(BlobPath basePath) {
super.basePath = basePath;
return this;
}

public Builder indexUUID(String indexUUID) {
super.indexUUID = indexUUID;
return this;
}

public Builder shardId(String shardId) {
this.shardId = shardId;
return this;
Expand Down
38 changes: 32 additions & 6 deletions server/src/main/java/org/opensearch/repositories/IndexId.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.repositories;

import org.opensearch.Version;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
Expand All @@ -40,6 +41,7 @@
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.remote.RemoteStoreEnums;

import java.io.IOException;
import java.util.Objects;
Expand All @@ -51,23 +53,36 @@
*/
@PublicApi(since = "1.0.0")
public final class IndexId implements Writeable, ToXContentObject {
protected static final String NAME = "name";
protected static final String ID = "id";
static final String NAME = "name";
static final String ID = "id";
static final String SHARD_PATH_TYPE = "shard_path_type";
public static final int DEFAULT_SHARD_PATH_TYPE = RemoteStoreEnums.PathType.FIXED.getCode();

private final String name;
private final String id;
private final int shardPathType;
private final int hashCode;

// Used for testing only
public IndexId(final String name, final String id) {
this(name, id, DEFAULT_SHARD_PATH_TYPE);
}

public IndexId(String name, String id, int shardPathType) {
this.name = name;
this.id = id;
this.shardPathType = shardPathType;
this.hashCode = computeHashCode();

}

public IndexId(final StreamInput in) throws IOException {
this.name = in.readString();
this.id = in.readString();
if (in.getVersion().onOrAfter(Version.CURRENT)) {
this.shardPathType = in.readVInt();
} else {
this.shardPathType = DEFAULT_SHARD_PATH_TYPE;
}
this.hashCode = computeHashCode();
}

Expand All @@ -93,9 +108,16 @@ public String getId() {
return id;
}

/**
* The storage path type in remote store for the indexes having the underlying index ids.
*/
public int getShardPathType() {
return shardPathType;
}

@Override
public String toString() {
return "[" + name + "/" + id + "]";
return "[" + name + "/" + id + "/" + shardPathType + "]";
}

@Override
Expand All @@ -107,7 +129,7 @@ public boolean equals(Object o) {
return false;
}
IndexId that = (IndexId) o;
return Objects.equals(name, that.name) && Objects.equals(id, that.id);
return Objects.equals(name, that.name) && Objects.equals(id, that.id) && Objects.equals(this.shardPathType, that.shardPathType);
}

@Override
Expand All @@ -116,20 +138,24 @@ public int hashCode() {
}

private int computeHashCode() {
return Objects.hash(name, id);
return Objects.hash(name, id, shardPathType);
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
out.writeString(name);
out.writeString(id);
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeVInt(shardPathType);
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
builder.field(NAME, name);
builder.field(ID, id);
builder.field(SHARD_PATH_TYPE, shardPathType);
builder.endObject();
return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,15 +517,15 @@ public List<IndexId> resolveIndices(final List<String> indices) {
* @param indicesToResolve names of indices to resolve
* @param inFlightIds name to index mapping for currently in-flight snapshots not yet in the repository data to fall back to
*/
public List<IndexId> resolveNewIndices(List<String> indicesToResolve, Map<String, IndexId> inFlightIds) {
public List<IndexId> resolveNewIndices(List<String> indicesToResolve, Map<String, IndexId> inFlightIds, int pathType) {
List<IndexId> snapshotIndices = new ArrayList<>();
for (String index : indicesToResolve) {
IndexId indexId = indices.get(index);
if (indexId == null) {
indexId = inFlightIds.get(index);
}
if (indexId == null) {
indexId = new IndexId(index, UUIDs.randomBase64UUID());
indexId = new IndexId(index, UUIDs.randomBase64UUID(), pathType);
}
snapshotIndices.add(indexId);
}
Expand All @@ -544,10 +544,16 @@ public List<IndexId> resolveNewIndices(List<String> indicesToResolve, Map<String
private static final String VERSION = "version";
private static final String MIN_VERSION = "min_version";

// Visible for testing only
public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final Version repoMetaVersion) throws IOException {
return snapshotsToXContent(builder, repoMetaVersion, Version.CURRENT);
}

/**
* Writes the snapshots metadata and the related indices metadata to x-content.
*/
public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final Version repoMetaVersion) throws IOException {
public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final Version repoMetaVersion, final Version minNodeVersion)
throws IOException {
builder.startObject();
// write the snapshots list
builder.startArray(SNAPSHOTS);
Expand Down Expand Up @@ -578,6 +584,9 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final
for (final IndexId indexId : getIndices().values()) {
builder.startObject(indexId.getName());
builder.field(INDEX_ID, indexId.getId());
if (minNodeVersion.onOrAfter(Version.CURRENT)) {
builder.field(IndexId.SHARD_PATH_TYPE, indexId.getShardPathType());
}
builder.startArray(SNAPSHOTS);
List<SnapshotId> snapshotIds = indexSnapshots.get(indexId);
assert snapshotIds != null;
Expand Down Expand Up @@ -765,14 +774,20 @@ private static void parseIndices(
final List<SnapshotId> snapshotIds = new ArrayList<>();
final List<String> gens = new ArrayList<>();

String id = null;
int pathType = IndexId.DEFAULT_SHARD_PATH_TYPE;
IndexId indexId = null;

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
final String indexMetaFieldName = parser.currentName();
final XContentParser.Token currentToken = parser.nextToken();
switch (indexMetaFieldName) {
case INDEX_ID:
indexId = new IndexId(indexName, parser.text());
id = parser.text();
break;
case IndexId.SHARD_PATH_TYPE:
pathType = parser.intValue();
break;
case SNAPSHOTS:
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_ARRAY, currentToken, parser);
Expand All @@ -795,7 +810,7 @@ private static void parseIndices(
// different versions create or delete snapshot in the same repository.
throw new OpenSearchParseException(
"Detected a corrupted repository, index "
+ indexId
+ new IndexId(indexName, id, pathType)
+ " references an unknown snapshot uuid ["
+ uuid
+ "]"
Expand All @@ -812,9 +827,10 @@ private static void parseIndices(
break;
}
}
assert indexId != null;
assert id != null;
indexId = new IndexId(indexName, id, pathType);
indexSnapshots.put(indexId, Collections.unmodifiableList(snapshotIds));
indexLookup.put(indexId.getId(), indexId);
indexLookup.put(id, indexId);
for (int i = 0; i < gens.size(); i++) {
String parsedGen = gens.get(i);
if (parsedGen != null) {
Expand Down
Loading

0 comments on commit 68d100b

Please sign in to comment.