Skip to content

Commit

Permalink
rebase
Browse files Browse the repository at this point in the history
Signed-off-by: Ruirui Zhang <[email protected]>
  • Loading branch information
ruai0511 committed Oct 8, 2024
1 parent 7f52c52 commit 62fbd5e
Show file tree
Hide file tree
Showing 11 changed files with 57 additions and 446 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,7 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.common.io.stream.StreamInput;
<<<<<<< HEAD
<<<<<<< HEAD
import org.opensearch.threadpool.ThreadPool;
=======
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportRequest;
>>>>>>> b5cbfa4de9e (changelog)
=======
import org.opensearch.threadpool.ThreadPool;
>>>>>>> 3a7ac33beb6 (modify based on comments)
import org.opensearch.transport.TransportService;
import org.opensearch.wlm.QueryGroupService;
import org.opensearch.wlm.stats.QueryGroupStats;
Expand All @@ -37,32 +27,9 @@ >>>>>>> b5cbfa4de9e (changelog)
*
* @opensearch.experimental
*/
<<<<<<< HEAD:server/src/main/java/org/opensearch/action/admin/cluster/wlm/TransportQueryGroupStatsAction.java
public class TransportQueryGroupStatsAction extends TransportNodesAction<
QueryGroupStatsRequest,
QueryGroupStatsResponse,
<<<<<<< HEAD
<<<<<<< HEAD
QueryGroupStatsRequest,
QueryGroupStats> {
=======
public class TransportWlmStatsAction extends TransportNodesAction<WlmStatsRequest, WlmStatsResponse, WlmStatsRequest, QueryGroupStats> {
>>>>>>> bb4288b3eba (modify based on comments):server/src/main/java/org/opensearch/action/admin/cluster/wlm/TransportWlmStatsAction.java

final QueryGroupService queryGroupService;
=======
TransportQueryGroupStatsAction.NodeQueryGroupStatsRequest,
=======
QueryGroupStatsRequest,
>>>>>>> 3a7ac33beb6 (modify based on comments)
QueryGroupStats> {

<<<<<<< HEAD
QueryGroupService queryGroupService;
>>>>>>> b5cbfa4de9e (changelog)
=======
final QueryGroupService queryGroupService;
>>>>>>> fb30e9af3d4 (revise)

@Inject
public TransportWlmStatsAction(
Expand All @@ -78,21 +45,8 @@ public TransportWlmStatsAction(
clusterService,
transportService,
actionFilters,
<<<<<<< HEAD:server/src/main/java/org/opensearch/action/admin/cluster/wlm/TransportQueryGroupStatsAction.java
QueryGroupStatsRequest::new,
<<<<<<< HEAD
<<<<<<< HEAD
QueryGroupStatsRequest::new,
=======
NodeQueryGroupStatsRequest::new,
>>>>>>> b5cbfa4de9e (changelog)
=======
QueryGroupStatsRequest::new,
>>>>>>> 3a7ac33beb6 (modify based on comments)
=======
WlmStatsRequest::new,
WlmStatsRequest::new,
>>>>>>> bb4288b3eba (modify based on comments):server/src/main/java/org/opensearch/action/admin/cluster/wlm/TransportWlmStatsAction.java
ThreadPool.Names.MANAGEMENT,
QueryGroupStats.class
);
Expand All @@ -109,22 +63,8 @@ protected WlmStatsResponse newResponse(
}

@Override
<<<<<<< HEAD:server/src/main/java/org/opensearch/action/admin/cluster/wlm/TransportQueryGroupStatsAction.java
<<<<<<< HEAD
<<<<<<< HEAD
protected QueryGroupStatsRequest newNodeRequest(QueryGroupStatsRequest request) {
=======
protected WlmStatsRequest newNodeRequest(WlmStatsRequest request) {
>>>>>>> bb4288b3eba (modify based on comments):server/src/main/java/org/opensearch/action/admin/cluster/wlm/TransportWlmStatsAction.java
return request;
=======
protected NodeQueryGroupStatsRequest newNodeRequest(QueryGroupStatsRequest request) {
return new NodeQueryGroupStatsRequest(request);
>>>>>>> b5cbfa4de9e (changelog)
=======
protected QueryGroupStatsRequest newNodeRequest(QueryGroupStatsRequest request) {
return request;
>>>>>>> 3a7ac33beb6 (modify based on comments)
}

@Override
Expand All @@ -133,55 +73,7 @@ protected QueryGroupStats newNodeResponse(StreamInput in) throws IOException {
}

@Override
<<<<<<< HEAD:server/src/main/java/org/opensearch/action/admin/cluster/wlm/TransportQueryGroupStatsAction.java
<<<<<<< HEAD
<<<<<<< HEAD
protected QueryGroupStats nodeOperation(QueryGroupStatsRequest queryGroupStatsRequest) {
return queryGroupService.nodeStats(queryGroupStatsRequest.getQueryGroupIds(), queryGroupStatsRequest.isBreach());
=======
protected QueryGroupStats nodeOperation(NodeQueryGroupStatsRequest nodeQueryGroupStatsRequest) {
QueryGroupStatsRequest request = nodeQueryGroupStatsRequest.request;
return queryGroupService.nodeStats(request.getQueryGroupIds(), request.isBreach());
}

/**
* Inner QueryGroupStatsRequest
*
* @opensearch.experimental
*/
public static class NodeQueryGroupStatsRequest extends TransportRequest {

protected QueryGroupStatsRequest request;

public NodeQueryGroupStatsRequest(StreamInput in) throws IOException {
super(in);
request = new QueryGroupStatsRequest(in);
}

NodeQueryGroupStatsRequest(QueryGroupStatsRequest request) {
this.request = request;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
request.writeTo(out);
}
<<<<<<< HEAD
>>>>>>> b5cbfa4de9e (changelog)
=======

public DiscoveryNode[] getDiscoveryNodes() {
return this.request.concreteNodes();
}
>>>>>>> ffe0d7fa2cd (address comments)
=======
protected QueryGroupStats nodeOperation(QueryGroupStatsRequest queryGroupStatsRequest) {
return queryGroupService.nodeStats(queryGroupStatsRequest.getQueryGroupIds(), queryGroupStatsRequest.isBreach());
>>>>>>> 3a7ac33beb6 (modify based on comments)
=======
protected QueryGroupStats nodeOperation(WlmStatsRequest wlmStatsRequest) {
return queryGroupService.nodeStats(wlmStatsRequest.getQueryGroupIds(), wlmStatsRequest.isBreach());
>>>>>>> bb4288b3eba (modify based on comments):server/src/main/java/org/opensearch/action/admin/cluster/wlm/TransportWlmStatsAction.java
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,9 @@
*
* @opensearch.experimental
*/
<<<<<<< HEAD:server/src/main/java/org/opensearch/action/admin/cluster/wlm/QueryGroupStatsAction.java
public class QueryGroupStatsAction extends ActionType<QueryGroupStatsResponse> {
public static final QueryGroupStatsAction INSTANCE = new QueryGroupStatsAction();
public static final String NAME = "cluster:monitor/wlm/stats";
=======
public class WlmStatsAction extends ActionType<WlmStatsResponse> {
public static final WlmStatsAction INSTANCE = new WlmStatsAction();
<<<<<<< HEAD
public static final String NAME = "cluster:monitor/query_group_stats";
>>>>>>> bb4288b3eba (modify based on comments):server/src/main/java/org/opensearch/action/admin/cluster/wlm/WlmStatsAction.java
=======
public static final String NAME = "cluster:monitor/wlm/stats";
>>>>>>> fb30e9af3d4 (revise)

private WlmStatsAction() {
super(NAME, WlmStatsResponse::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,7 @@ public class WlmStatsRequest extends BaseNodesRequest<WlmStatsRequest> {
private final Set<String> queryGroupIds;
private final Boolean breach;

<<<<<<< HEAD:server/src/main/java/org/opensearch/action/admin/cluster/wlm/QueryGroupStatsRequest.java
<<<<<<< HEAD
<<<<<<< HEAD
public QueryGroupStatsRequest(StreamInput in) throws IOException {
=======
protected QueryGroupStatsRequest(StreamInput in) throws IOException {
>>>>>>> ffe0d7fa2cd (address comments)
=======
public QueryGroupStatsRequest(StreamInput in) throws IOException {
>>>>>>> 3a7ac33beb6 (modify based on comments)
=======
public WlmStatsRequest(StreamInput in) throws IOException {
>>>>>>> bb4288b3eba (modify based on comments):server/src/main/java/org/opensearch/action/admin/cluster/wlm/WlmStatsRequest.java
super(in);
this.queryGroupIds = new HashSet<>(Set.of(in.readStringArray()));
this.breach = in.readOptionalBoolean();
Expand Down
1 change: 0 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -1501,7 +1501,6 @@ protected Node(
b.bind(SearchRequestStats.class).toInstance(searchRequestStats);
b.bind(SearchRequestSlowLog.class).toInstance(searchRequestSlowLog);
b.bind(MetricsRegistry.class).toInstance(metricsRegistry);
b.bind(QueryGroupService.class).toInstance(queryGroupService);
b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService);
b.bind(RemoteIndexPathUploader.class).toProvider(() -> remoteIndexPathUploader);
b.bind(RemoteStorePinnedTimestampService.class).toProvider(() -> remoteStorePinnedTimestampService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,10 @@ public class RestWlmStatsAction extends BaseRestHandler {
public List<Route> routes() {
return unmodifiableList(
asList(
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> ebf941fb3c8 (modify uri)
new Route(GET, "_wlm/stats"),
new Route(GET, "_wlm/{nodeId}/stats"),
new Route(GET, "_wlm/stats/{queryGroupId}"),
new Route(GET, "_wlm/{nodeId}/stats/{queryGroupId}")
<<<<<<< HEAD
=======
new Route(GET, "query_group/stats"),
new Route(GET, "query_group/stats/{queryGroupId}"),
new Route(GET, "query_group/stats/nodes/{nodeId}"),
new Route(GET, "query_group/stats/{queryGroupId}/nodes/{nodeId}")
>>>>>>> ffe0d7fa2cd (address comments)
=======
>>>>>>> ebf941fb3c8 (modify uri)
)
);
}
Expand Down
95 changes: 42 additions & 53 deletions server/src/main/java/org/opensearch/wlm/QueryGroupService.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.monitor.jvm.JvmStats;
Expand All @@ -26,15 +28,8 @@
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.wlm.cancellation.QueryGroupTaskCancellationService;
import org.opensearch.transport.TransportService;

import org.opensearch.ResourceNotFoundException;
import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.wlm.cancellation.QueryGroupTaskCancellationService;
import org.opensearch.wlm.stats.QueryGroupState;
import org.opensearch.wlm.stats.QueryGroupStats;
import org.opensearch.wlm.stats.QueryGroupStats.QueryGroupStatsHolder;
Expand All @@ -43,8 +38,6 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import java.util.Optional;
import java.util.Set;

Expand Down Expand Up @@ -220,32 +213,49 @@ public void incrementFailuresFor(final String queryGroupId) {
/**
* @return node level query group stats
*/
public QueryGroupStats nodeStats() {
public QueryGroupStats nodeStats(Set<String> queryGroupIds, Boolean requestedBreached) {
final Map<String, QueryGroupStatsHolder> statsHolderMap = new HashMap<>();
for (Map.Entry<String, QueryGroupState> queryGroupsState : queryGroupsStateAccessor.getQueryGroupStateMap().entrySet()) {
final String queryGroupId = queryGroupsState.getKey();
final QueryGroupState currentState = queryGroupsState.getValue();

statsHolderMap.put(queryGroupId, QueryGroupStatsHolder.from(currentState));
Map<String, QueryGroup> existingGroups = clusterService.state().metadata().queryGroups();
if (!queryGroupIds.contains("_all")) {
for (String id : queryGroupIds) {
if (!existingGroups.containsKey(id)) {
throw new ResourceNotFoundException("QueryGroup with id " + id + " does not exist");
}
}
}

return new QueryGroupStats(statsHolderMap);
queryGroupsStateAccessor.getQueryGroupStateMap().forEach((queryGroupId, currentState) -> {
boolean shouldInclude = (queryGroupIds.size() == 1 && queryGroupIds.contains("_all")) || queryGroupIds.contains(queryGroupId);
if (shouldInclude) {
boolean breached = resourceLimitBreached(existingGroups.get(queryGroupId), currentState).v1().length() != 0;
if (requestedBreached == null || requestedBreached == breached) {
statsHolderMap.put(queryGroupId, QueryGroupStatsHolder.from(currentState));
}
}
});
return new QueryGroupStats(transportService.getLocalNode(), statsHolderMap);
}

/**
* @return if the QueryGroup breaches any resource limit based on the LastRecordedUsage
*/
public boolean resourceLimitBreached(String id, QueryGroupState currentState) {
QueryGroup queryGroup = clusterService.state().metadata().queryGroups().get(id);

return currentState.getResourceState()
.entrySet()
.stream()
.anyMatch(
entry -> entry.getValue().getLastRecordedUsage() > queryGroup.getMutableQueryGroupFragment()
.getResourceLimits()
.getOrDefault(entry.getKey(), 100.0)
);
public Tuple<StringBuilder, ResourceType> resourceLimitBreached(QueryGroup queryGroup, QueryGroupState queryGroupState) {
StringBuilder reason = new StringBuilder();
for (ResourceType resourceType : TRACKED_RESOURCES) {
if (queryGroup.getResourceLimits().containsKey(resourceType)) {
final double threshold = getNormalisedRejectionThreshold(queryGroup.getResourceLimits().get(resourceType), resourceType);
final double lastRecordedUsage = queryGroupState.getResourceState().get(resourceType).getLastRecordedUsage();
if (threshold < lastRecordedUsage) {
reason.append(resourceType)
.append(" limit is breaching for ENFORCED type QueryGroup: (")
.append(threshold)
.append(" < ")
.append(lastRecordedUsage)
.append("). ");
return new Tuple<>(reason, resourceType);
}
}
}
return new Tuple<>(reason, null);
}

/**
Expand All @@ -272,30 +282,9 @@ public void rejectIfNeeded(String queryGroupId) {
return;

optionalQueryGroup.ifPresent(queryGroup -> {
boolean reject = false;
final StringBuilder reason = new StringBuilder();
for (ResourceType resourceType : TRACKED_RESOURCES) {
if (queryGroup.getResourceLimits().containsKey(resourceType)) {
final double threshold = getNormalisedRejectionThreshold(
queryGroup.getResourceLimits().get(resourceType),
resourceType
);
final double lastRecordedUsage = queryGroupState.getResourceState().get(resourceType).getLastRecordedUsage();
if (threshold < lastRecordedUsage) {
reject = true;
reason.append(resourceType)
.append(" limit is breaching for ENFORCED type QueryGroup: (")
.append(threshold)
.append(" < ")
.append(lastRecordedUsage)
.append("). ");
queryGroupState.getResourceState().get(resourceType).rejections.inc();
// should not double count even if both the resource limits are breaching
break;
}
}
}
if (reject) {
Tuple<StringBuilder, ResourceType> reason = resourceLimitBreached(queryGroup, queryGroupState);
if (reason.v1().length() != 0) {
queryGroupState.getResourceState().get(reason.v2()).rejections.inc();
queryGroupState.totalRejections.inc();
throw new OpenSearchRejectedExecutionException(
"QueryGroup " + queryGroupId + " is already contended. " + reason.toString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@
public class QueryGroupStats extends BaseNodeResponse implements ToXContentObject, Writeable {
private final Map<String, QueryGroupStatsHolder> stats;

public Map<String, QueryGroupStatsHolder> getStats() {
return stats;
}

public QueryGroupStats(DiscoveryNode node, Map<String, QueryGroupStatsHolder> stats) {
super(node);
this.stats = stats;
Expand Down Expand Up @@ -91,6 +87,10 @@ public int hashCode() {
return Objects.hash(stats);
}

public Map<String, QueryGroupStatsHolder> getStats() {
return stats;
}

/**
* This is a stats holder object which will hold the data for a query group at a point in time
* the instance will only be created on demand through stats api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class WlmStatsResponseTests extends OpenSearchTestCase {
0,
1,
0,
0,
Map.of(
ResourceType.CPU,
new QueryGroupStats.ResourceStats(0, 0, 0),
Expand Down
Loading

0 comments on commit 62fbd5e

Please sign in to comment.