Skip to content

Commit

Permalink
Add comments and variable refactoring
Browse files Browse the repository at this point in the history
Signed-off-by: Pranshu Shukla <[email protected]>
  • Loading branch information
Pranshu-S committed Jul 22, 2024
1 parent b657072 commit 67749bd
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ public void testNodeIndicesStatsWithAggregationOnNodes() {
testLevels.forEach(testLevel -> {
NodesStatsResponse response;
CommonStatsFlags commonStatsFlags = new CommonStatsFlags();
commonStatsFlags.aggregateNodeResponsesOnLevel(true);
commonStatsFlags.setAggregateNodeIndicesStatsResponsesOnLevel(true);
if (!testLevel.equals(MockFields.NULL)) {
ArrayList<String> level_arg = new ArrayList<>();
level_arg.add(testLevel.getRestName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class CommonStatsFlags implements Writeable, Cloneable {
// Used for metric CACHE_STATS, to determine which caches to report stats for
private EnumSet<CacheType> includeCaches = EnumSet.noneOf(CacheType.class);
private String[] levels = new String[0];
private boolean aggregateNodeResponsesOnLevel = false;
private boolean aggregateNodeIndicesStatsResponsesOnLevel = false;

/**
* @param flags flags to set. If no flags are supplied, default flags will be set.
Expand Down Expand Up @@ -102,7 +102,7 @@ public CommonStatsFlags(StreamInput in) throws IOException {
levels = in.readStringArray();
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
aggregateNodeResponsesOnLevel = in.readBoolean();
aggregateNodeIndicesStatsResponsesOnLevel = in.readBoolean();
}
}

Expand All @@ -129,7 +129,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeStringArrayNullable(levels);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeBoolean(aggregateNodeResponsesOnLevel);
out.writeBoolean(aggregateNodeIndicesStatsResponsesOnLevel);
}
}

Expand Down Expand Up @@ -269,12 +269,12 @@ public boolean includeSegmentFileSizes() {
return this.includeSegmentFileSizes;
}

public void aggregateNodeResponsesOnLevel(boolean aggregateNodeResponsesOnLevel) {
this.aggregateNodeResponsesOnLevel = aggregateNodeResponsesOnLevel;
public void setAggregateNodeIndicesStatsResponsesOnLevel(boolean aggregateNodeIndicesStatsResponsesOnLevel) {
this.aggregateNodeIndicesStatsResponsesOnLevel = aggregateNodeIndicesStatsResponsesOnLevel;
}

public boolean aggregateNodeResponsesOnLevel() {
return this.aggregateNodeResponsesOnLevel;
public boolean getAggregateNodeIndicesStatsResponsesOnLevel() {
return this.aggregateNodeIndicesStatsResponsesOnLevel;
}

public boolean isSet(Flag flag) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ public NodeIndicesStats stats(CommonStatsFlags flags) {
break;
}
}
if (flags.aggregateNodeResponsesOnLevel()) {
if (flags.getAggregateNodeIndicesStatsResponsesOnLevel()) {
return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats, flags.getLevels());
}
return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ public NodeIndicesStats(StreamInput in) throws IOException {
}
}

/**
* Without passing the information of the levels to the constructor, we return the Node-level aggregated stats as
* {@link CommonStats} along with a hash-map containing Index to List of Shard Stats.
*/
public NodeIndicesStats(CommonStats oldStats, Map<Index, List<IndexShardStats>> statsByShard, SearchRequestStats searchRequestStats) {
// this.stats = stats;
this.statsByShard = statsByShard;
Expand All @@ -115,6 +119,12 @@ public NodeIndicesStats(CommonStats oldStats, Map<Index, List<IndexShardStats>>
}
}

/**
* Passing the level information to the nodes allows us to aggregate the stats based on the level passed. This
* allows us to aggregate based on NodeLevel (default - if no level is passed) or Index level if `indices` level is
* passed and finally return the statsByShards map if `shards` level is passed. This allows us to reduce ser/de of
* stats and return only the information that is required while returning to the client.
*/
public NodeIndicesStats(
CommonStats oldStats,
Map<Index, List<IndexShardStats>> statsByShard,
Expand Down Expand Up @@ -148,7 +158,13 @@ public NodeIndicesStats(
}
}

public static Fields getFirstAcceptedLevel(String[] levels) {
/**
* By default, the levels passed from the transport action will be a list of strings, since NodeIndicesStats can
* only aggregate on one level, we pick the first accepted level else we ignore if no known level is passed.
* @param levels - levels sent in the request.
* @return Corresponding identified enum {@link Fields}
*/
private static Fields getFirstAcceptedLevel(String[] levels) {
if (levels != null) {
Optional<Fields> level = Arrays.stream(levels)
.flatMap(passedLevel -> Arrays.stream(Fields.values()).filter(field -> field.getRestName().equals(passedLevel)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
// If no levels are passed in this results in an empty array.
String[] levels = Strings.splitStringByCommaToArray(request.param("level"));
nodesStatsRequest.indices().setLevels(levels);
nodesStatsRequest.indices().aggregateNodeResponsesOnLevel(true);
nodesStatsRequest.indices().setAggregateNodeIndicesStatsResponsesOnLevel(true);

return channel -> client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public void processResponse(final NodesInfoResponse nodesInfoResponse) {
NodesStatsRequest.Metric.PROCESS.metricName(),
NodesStatsRequest.Metric.SCRIPT.metricName()
);
nodesStatsRequest.indices().aggregateNodeResponsesOnLevel(true);
nodesStatsRequest.indices().setAggregateNodeIndicesStatsResponsesOnLevel(true);
client.admin().cluster().nodesStats(nodesStatsRequest, new RestResponseListener<NodesStatsResponse>(channel) {
@Override
public RestResponse buildResponse(NodesStatsResponse nodesStatsResponse) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1180,7 +1180,7 @@ public void testNodeIndicesStatsSerialization() throws IOException {
commonStatsFlags.set(CommonStatsFlags.Flag.Docs, true);
commonStatsFlags.set(CommonStatsFlags.Flag.Store, true);
commonStatsFlags.set(CommonStatsFlags.Flag.Indexing, true);
commonStatsFlags.aggregateNodeResponsesOnLevel(true);
commonStatsFlags.setAggregateNodeIndicesStatsResponsesOnLevel(true);

levelParams.forEach(levelParam -> {
ArrayList<String> level_arg = new ArrayList<>();
Expand Down Expand Up @@ -1236,7 +1236,7 @@ public void testNodeIndicesStatsToXContent() {
commonStatsFlags.set(CommonStatsFlags.Flag.Docs, true);
commonStatsFlags.set(CommonStatsFlags.Flag.Store, true);
commonStatsFlags.set(CommonStatsFlags.Flag.Indexing, true);
commonStatsFlags.aggregateNodeResponsesOnLevel(true);
commonStatsFlags.setAggregateNodeIndicesStatsResponsesOnLevel(true);

levelParams.forEach(levelParam -> {
ArrayList<String> level_arg = new ArrayList<>();
Expand Down Expand Up @@ -1325,7 +1325,7 @@ public MockNodeIndicesStats generateMockNodeIndicesStats(CommonStats commonStats

ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);

if (commonStatsFlags.aggregateNodeResponsesOnLevel()) {
if (commonStatsFlags.getAggregateNodeIndicesStatsResponsesOnLevel()) {
return new MockNodeIndicesStats(
new CommonStats(commonStatsFlags),
statsByShard,
Expand Down

0 comments on commit 67749bd

Please sign in to comment.