Skip to content

Commit

Permalink
Separate out changes to add primary store size in status
Browse files Browse the repository at this point in the history
Signed-off-by: Lakshya Taragi <[email protected]>
  • Loading branch information
ltaragi committed Aug 28, 2024
1 parent 1a5fa32 commit 513732d
Show file tree
Hide file tree
Showing 24 changed files with 41 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ private void assertShallowV2SnapshotStatus(SnapshotStatus snapshotStatus, boolea
if (hasIndexFilter) {
assertEquals(0, snapshotStatus.getStats().getTotalSize());
} else {
assertTrue(snapshotStatus.getStats().getTotalSize() > 0);
// TODO: after adding primary store size at the snapshot level, total size here should be > 0
}
// assert that total and incremental values of file count and size_in_bytes are 0 at index and shard levels
assertEquals(0, snapshotStatus.getStats().getTotalFileCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

package org.opensearch.action.admin.cluster.snapshots.status;

import org.opensearch.Version;
import org.opensearch.cluster.SnapshotsInProgress;
import org.opensearch.cluster.SnapshotsInProgress.State;
import org.opensearch.common.Nullable;
Expand Down Expand Up @@ -87,8 +86,6 @@ public class SnapshotStatus implements ToXContentObject, Writeable {

private SnapshotStats stats;

private final long initialTotalSizeInBytes;

@Nullable
private final Boolean includeGlobalState;

Expand All @@ -99,12 +96,7 @@ public class SnapshotStatus implements ToXContentObject, Writeable {
includeGlobalState = in.readOptionalBoolean();
final long startTime = in.readLong();
final long time = in.readLong();
if (in.getVersion().onOrAfter(Version.V_2_17_0)) {
initialTotalSizeInBytes = in.readOptionalLong();
} else {
initialTotalSizeInBytes = 0L;
}
updateShardStats(startTime, time, initialTotalSizeInBytes);
updateShardStats(startTime, time);
}

SnapshotStatus(
Expand All @@ -113,18 +105,15 @@ public class SnapshotStatus implements ToXContentObject, Writeable {
List<SnapshotIndexShardStatus> shards,
Boolean includeGlobalState,
long startTime,
long time,
long initialTotalSizeInBytes
long time
) {
this.snapshot = Objects.requireNonNull(snapshot);
this.state = Objects.requireNonNull(state);
this.shards = Objects.requireNonNull(shards);
this.includeGlobalState = includeGlobalState;
shardsStats = new SnapshotShardsStats(shards);
assert time >= 0 : "time must be >= 0 but received [" + time + "]";
this.initialTotalSizeInBytes = initialTotalSizeInBytes;
assert initialTotalSizeInBytes >= 0 : "initialTotalSizeInBytes must be >= 0 but received [" + initialTotalSizeInBytes + "]";
updateShardStats(startTime, time, initialTotalSizeInBytes);
updateShardStats(startTime, time);
}

private SnapshotStatus(
Expand All @@ -134,8 +123,7 @@ private SnapshotStatus(
Map<String, SnapshotIndexStatus> indicesStatus,
SnapshotShardsStats shardsStats,
SnapshotStats stats,
Boolean includeGlobalState,
long initialTotalSizeInBytes
Boolean includeGlobalState
) {
this.snapshot = snapshot;
this.state = state;
Expand All @@ -144,7 +132,6 @@ private SnapshotStatus(
this.shardsStats = shardsStats;
this.stats = stats;
this.includeGlobalState = includeGlobalState;
this.initialTotalSizeInBytes = initialTotalSizeInBytes;
}

/**
Expand Down Expand Up @@ -217,9 +204,6 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalBoolean(includeGlobalState);
out.writeLong(stats.getStartTime());
out.writeLong(stats.getTime());
if (out.getVersion().onOrAfter(Version.V_2_17_0)) {
out.writeOptionalLong(initialTotalSizeInBytes);
}
}

@Override
Expand All @@ -240,7 +224,6 @@ public SnapshotStats getStats() {
private static final String STATE = "state";
private static final String INDICES = "indices";
private static final String INCLUDE_GLOBAL_STATE = "include_global_state";
private static final String INITIAL_TOTAL_SIZE_IN_BYTES = "initial_total_size_in_bytes";

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
Expand All @@ -252,9 +235,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (includeGlobalState != null) {
builder.field(INCLUDE_GLOBAL_STATE, includeGlobalState);
}
if (initialTotalSizeInBytes != 0) {
builder.field(INITIAL_TOTAL_SIZE_IN_BYTES, initialTotalSizeInBytes);
}
builder.field(SnapshotShardsStats.Fields.SHARDS_STATS, shardsStats, params);
builder.field(SnapshotStats.Fields.STATS, stats, params);
builder.startObject(INDICES);
Expand All @@ -276,7 +256,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
String uuid = (String) parsedObjects[i++];
String rawState = (String) parsedObjects[i++];
Boolean includeGlobalState = (Boolean) parsedObjects[i++];
Long initialTotalSizeInBytes = (Long) parsedObjects[i++];
SnapshotStats stats = ((SnapshotStats) parsedObjects[i++]);
SnapshotShardsStats shardsStats = ((SnapshotShardsStats) parsedObjects[i++]);
@SuppressWarnings("unchecked")
Expand All @@ -297,16 +276,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
shards.addAll(index.getShards().values());
}
}
return new SnapshotStatus(
snapshot,
state,
shards,
indicesStatus,
shardsStats,
stats,
includeGlobalState,
initialTotalSizeInBytes
);
return new SnapshotStatus(snapshot, state, shards, indicesStatus, shardsStats, stats, includeGlobalState);
}
);
static {
Expand All @@ -315,7 +285,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
PARSER.declareString(constructorArg(), new ParseField(UUID));
PARSER.declareString(constructorArg(), new ParseField(STATE));
PARSER.declareBoolean(optionalConstructorArg(), new ParseField(INCLUDE_GLOBAL_STATE));
PARSER.declareLong(optionalConstructorArg(), new ParseField(INITIAL_TOTAL_SIZE_IN_BYTES));
PARSER.declareField(
constructorArg(),
SnapshotStats::fromXContent,
Expand All @@ -330,8 +299,8 @@ public static SnapshotStatus fromXContent(XContentParser parser) throws IOExcept
return PARSER.parse(parser, null);
}

private void updateShardStats(long startTime, long time, long initialTotalSizeInBytes) {
stats = new SnapshotStats(startTime, time, 0, 0, 0, 0, initialTotalSizeInBytes, 0);
private void updateShardStats(long startTime, long time) {
stats = new SnapshotStats(startTime, time, 0, 0, 0, 0, 0, 0);
shardsStats = new SnapshotShardsStats(shards);
for (SnapshotIndexShardStatus shard : shards) {
// BWC: only update timestamps when we did not get a start time from an old node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,7 @@ private void buildResponse(
Collections.unmodifiableList(shardStatusBuilder),
entry.includeGlobalState(),
entry.startTime(),
Math.max(threadPool.absoluteTimeInMillis() - entry.startTime(), 0L),
0L
Math.max(threadPool.absoluteTimeInMillis() - entry.startTime(), 0L)
)
);
}
Expand Down Expand Up @@ -345,7 +344,7 @@ private void loadRepositoryData(
boolean isShallowV2Snapshot = snapshotInfo.getPinnedTimestamp() > 0;
long initialSnapshotTotalSize = 0;
if (isShallowV2Snapshot && request.indices().length == 0) {
initialSnapshotTotalSize = snapshotInfo.getSnapshotSizeInBytes();
// TODO: add primary store size in bytes at the snapshot level
}

for (Map.Entry<ShardId, IndexShardSnapshotStatus> shardStatus : shardStatuses.entrySet()) {
Expand Down Expand Up @@ -378,8 +377,7 @@ private void loadRepositoryData(
snapshotInfo.includeGlobalState(),
startTime,
// Use current time to calculate overall runtime for in-progress snapshots that have endTime == 0
(endTime == 0 ? threadPool.absoluteTimeInMillis() : endTime) - startTime,
initialSnapshotTotalSize
(endTime == 0 ? threadPool.absoluteTimeInMillis() : endTime) - startTime
)
);
}
Expand Down
26 changes: 3 additions & 23 deletions server/src/main/java/org/opensearch/cluster/ClusterInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,11 @@ public class ClusterInfo implements ToXContentFragment, Writeable {
final Map<ShardRouting, String> routingToDataPath;
final Map<NodeAndPath, ReservedSpace> reservedSpace;
final Map<String, FileCacheStats> nodeFileCacheStats;
final long primaryStoreSize;

private long avgTotalBytes;
private long avgFreeByte;

protected ClusterInfo() {
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), 0L);
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
}

/**
Expand All @@ -86,7 +84,6 @@ protected ClusterInfo() {
* @param shardSizes a shardkey to size in bytes mapping per shard.
* @param routingToDataPath the shard routing to datapath mapping
* @param reservedSpace reserved space per shard broken down by node and data path
* @param primaryStoreSize total size in bytes for all the primary shards
* @see #shardIdentifierFromRouting
*/
public ClusterInfo(
Expand All @@ -95,16 +92,14 @@ public ClusterInfo(
final Map<String, Long> shardSizes,
final Map<ShardRouting, String> routingToDataPath,
final Map<NodeAndPath, ReservedSpace> reservedSpace,
final Map<String, FileCacheStats> nodeFileCacheStats,
final long primaryStoreSize
final Map<String, FileCacheStats> nodeFileCacheStats
) {
this.leastAvailableSpaceUsage = leastAvailableSpaceUsage;
this.shardSizes = shardSizes;
this.mostAvailableSpaceUsage = mostAvailableSpaceUsage;
this.routingToDataPath = routingToDataPath;
this.reservedSpace = reservedSpace;
this.nodeFileCacheStats = nodeFileCacheStats;
this.primaryStoreSize = primaryStoreSize;
calculateAvgFreeAndTotalBytes(mostAvailableSpaceUsage);
}

Expand All @@ -115,6 +110,7 @@ public ClusterInfo(StreamInput in) throws IOException {
Map<ShardRouting, String> routingMap = in.readMap(ShardRouting::new, StreamInput::readString);
Map<NodeAndPath, ReservedSpace> reservedSpaceMap;
reservedSpaceMap = in.readMap(NodeAndPath::new, ReservedSpace::new);

this.leastAvailableSpaceUsage = Collections.unmodifiableMap(leastMap);
this.mostAvailableSpaceUsage = Collections.unmodifiableMap(mostMap);
this.shardSizes = Collections.unmodifiableMap(sizeMap);
Expand All @@ -125,11 +121,6 @@ public ClusterInfo(StreamInput in) throws IOException {
} else {
this.nodeFileCacheStats = Map.of();
}
if (in.getVersion().onOrAfter(Version.V_2_17_0)) {
this.primaryStoreSize = in.readOptionalLong();
} else {
this.primaryStoreSize = 0L;
}

calculateAvgFreeAndTotalBytes(mostAvailableSpaceUsage);
}
Expand Down Expand Up @@ -175,9 +166,6 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_10_0)) {
out.writeMap(this.nodeFileCacheStats, StreamOutput::writeString, (o, v) -> v.writeTo(o));
}
if (out.getVersion().onOrAfter(Version.V_2_17_0)) {
out.writeOptionalLong(this.primaryStoreSize);
}
}

public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
Expand Down Expand Up @@ -232,7 +220,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
}
builder.endArray(); // end "reserved_sizes"
builder.field("primary_store_size", this.primaryStoreSize);
return builder;
}

Expand All @@ -259,13 +246,6 @@ public Map<String, FileCacheStats> getNodeFileCacheStats() {
return Collections.unmodifiableMap(this.nodeFileCacheStats);
}

/**
* Returns the total size in bytes for all the primary shards
*/
public long getPrimaryStoreSize() {
return primaryStoreSize;
}

/**
* Returns the shard size for the given shard routing or <code>null</code> it that metric is not available.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,7 @@ public ClusterInfo getClusterInfo() {
indicesStatsSummary.shardSizes,
indicesStatsSummary.shardRoutingToDataPath,
indicesStatsSummary.reservedSpace,
nodeFileCacheStats,
primaryStoreSize
nodeFileCacheStats
);
}

Expand Down
1 change: 0 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -1186,7 +1186,6 @@ protected Node(
clusterModule.getIndexNameExpressionResolver(),
repositoryService,
transportService,
clusterInfoService,
actionModule.getActionFilters(),
remoteStorePinnedTimestampService
);
Expand Down
Loading

0 comments on commit 513732d

Please sign in to comment.