Skip to content

Commit

Permalink
remove inflight checks as they are not necessary
Browse files Browse the repository at this point in the history
Signed-off-by: Ruirui Zhang <[email protected]>
  • Loading branch information
ruai0511 committed Jul 18, 2024
1 parent b20a96c commit 6b15803
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
/**
* Transport action to create QueryGroup
*
* @opensearch.api
* @opensearch.experimental
*/
public class CreateQueryGroupAction extends ActionType<CreateQueryGroupResponse> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.XContentParseException;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.search.ResourceType;
import org.joda.time.Instant;
Expand All @@ -36,7 +37,7 @@
* }
* }
*
* @opensearch.internal
* @opensearch.experimental
*/
public class CreateQueryGroupRequest extends ActionRequest implements Writeable.Reader<CreateQueryGroupRequest> {
private String name;
Expand Down Expand Up @@ -113,7 +114,7 @@ public static CreateQueryGroupRequest fromXContent(XContentParser parser) throws
}

if (parser.currentToken() != XContentParser.Token.START_OBJECT) {
throw new IllegalArgumentException("expected start object but got a " + parser.currentToken());
throw new XContentParseException("expected start object but got a " + parser.currentToken());

Check warning on line 117 in plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/CreateQueryGroupRequest.java

View check run for this annotation

Codecov / codecov/patch

plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/CreateQueryGroupRequest.java#L117

Added line #L117 was not covered by tests
}

XContentParser.Token token;
Expand All @@ -132,13 +133,11 @@ public static CreateQueryGroupRequest fromXContent(XContentParser parser) throws
} else if (fieldName.equals("resiliency_mode")) {
mode = ResiliencyMode.fromName(parser.text());

Check warning on line 134 in plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/CreateQueryGroupRequest.java

View check run for this annotation

Codecov / codecov/patch

plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/CreateQueryGroupRequest.java#L134

Added line #L134 was not covered by tests
} else {
throw new IllegalArgumentException("unrecognised [field=" + fieldName + " in QueryGroup");
throw new XContentParseException("unrecognised [field=" + fieldName + " in QueryGroup");

Check warning on line 136 in plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/CreateQueryGroupRequest.java

View check run for this annotation

Codecov / codecov/patch

plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/CreateQueryGroupRequest.java#L136

Added line #L136 was not covered by tests
}
} else if (token == XContentParser.Token.START_OBJECT) {
if (!fieldName.equals("resourceLimits")) {
throw new IllegalArgumentException(
"QueryGroup.resourceLimits is an object and expected token was { " + " but found " + token
);
throw new XContentParseException("Invalid field passed. QueryGroup does not support " + fieldName + ".");

Check warning on line 140 in plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/CreateQueryGroupRequest.java

View check run for this annotation

Codecov / codecov/patch

plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/CreateQueryGroupRequest.java#L140

Added line #L140 was not covered by tests
}
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
/**
* Response for the create API for QueryGroup
*
* @opensearch.internal
* @opensearch.experimental
*/
public class CreateQueryGroupResponse extends ActionResponse implements ToXContent, ToXContentObject {
private final QueryGroup queryGroup;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
/**
* Transport action to create QueryGroup
*
* @opensearch.internal
* @opensearch.experimental
*/
public class TransportCreateQueryGroupAction extends HandledTransportAction<CreateQueryGroupRequest, CreateQueryGroupResponse> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
/**
* Rest action to create a QueryGroup
*
* @opensearch.api
* @opensearch.experimental
*/
public class RestCreateQueryGroupAction extends BaseRestHandler {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,8 @@
import org.opensearch.plugin.wlm.CreateQueryGroupResponse;
import org.opensearch.search.ResourceType;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.DoubleAdder;

import static org.opensearch.search.query_group.QueryGroupServiceSettings.MAX_QUERY_GROUP_COUNT;
import static org.opensearch.search.query_group.QueryGroupServiceSettings.QUERY_GROUP_COUNT_SETTING_NAME;
Expand All @@ -44,8 +41,6 @@ public class QueryGroupPersistenceService implements Persistable<QueryGroup> {
private static final String CREATE_QUERY_GROUP_THROTTLING_KEY = "create-query-group";
private static final String UPDATE_QUERY_GROUP_THROTTLING_KEY = "update-query-group";
private static final String DELETE_QUERY_GROUP_THROTTLING_KEY = "delete-query-group";
private final AtomicInteger inflightCreateQueryGroupRequestCount;
private final Map<String, DoubleAdder> inflightResourceLimitValues;
private volatile int maxQueryGroupCount;
final ThrottlingKey createQueryGroupThrottlingKey;
final ThrottlingKey updateQueryGroupThrottlingKey;
Expand All @@ -70,8 +65,6 @@ public QueryGroupPersistenceService(
this.updateQueryGroupThrottlingKey = clusterService.registerClusterManagerTask(UPDATE_QUERY_GROUP_THROTTLING_KEY, true);
maxQueryGroupCount = MAX_QUERY_GROUP_COUNT.get(settings);
clusterSettings.addSettingsUpdateConsumer(MAX_QUERY_GROUP_COUNT, this::setMaxQueryGroupCount);
inflightCreateQueryGroupRequestCount = new AtomicInteger();
inflightResourceLimitValues = new HashMap<>();
}

/**
Expand All @@ -95,7 +88,7 @@ public void persist(QueryGroup queryGroup, ActionListener<CreateQueryGroupRespon
* @param queryGroup {@link QueryGroup} - the QueryGroup we're currently creating
*/
void persistInClusterStateMetadata(QueryGroup queryGroup, ActionListener<CreateQueryGroupResponse> listener) {
clusterService.submitStateUpdateTask(SOURCE, new ClusterStateUpdateTask(Priority.URGENT) {
clusterService.submitStateUpdateTask(SOURCE, new ClusterStateUpdateTask(Priority.NORMAL) {

Check warning on line 91 in plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java

View check run for this annotation

Codecov / codecov/patch

plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java#L91

Added line #L91 was not covered by tests
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return saveQueryGroupInClusterState(queryGroup, currentState);

Check warning on line 94 in plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java

View check run for this annotation

Codecov / codecov/patch

plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java#L94

Added line #L94 was not covered by tests
Expand All @@ -108,36 +101,18 @@ public ThrottlingKey getClusterManagerThrottlingKey() {

@Override
public void onFailure(String source, Exception e) {
restoreInflightValues(queryGroup.getResourceLimits());
inflightCreateQueryGroupRequestCount.decrementAndGet();
logger.warn("failed to save QueryGroup object due to error: {}, for source: {}", e.getMessage(), source);
listener.onFailure(e);
}

Check warning on line 106 in plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java

View check run for this annotation

Codecov / codecov/patch

plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java#L104-L106

Added lines #L104 - L106 were not covered by tests

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
restoreInflightValues(queryGroup.getResourceLimits());
inflightCreateQueryGroupRequestCount.decrementAndGet();
CreateQueryGroupResponse response = new CreateQueryGroupResponse(queryGroup, RestStatus.OK);
listener.onResponse(response);
}

Check warning on line 112 in plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java

View check run for this annotation

Codecov / codecov/patch

plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java#L110-L112

Added lines #L110 - L112 were not covered by tests
});
}

Check warning on line 114 in plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java

View check run for this annotation

Codecov / codecov/patch

plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java#L114

Added line #L114 was not covered by tests

/**
* Get the allocation value for resourceName for the QueryGroup
* @param resourceName - the resourceName we want to get the usage for
* @param resourceLimits - the resource limit from which to get the allocation value for resourceName
*/
private double getResourceLimitValue(String resourceName, final Map<ResourceType, Object> resourceLimits) {
for (ResourceType resourceType : resourceLimits.keySet()) {
if (resourceType.getName().equals(resourceName)) {
return (double) resourceLimits.get(resourceType);
}
}
return 0.0;
}

/**
* This method will be executed before we submit the new cluster state
* @param queryGroup - the QueryGroup we're currently creating
Expand All @@ -148,57 +123,49 @@ ClusterState saveQueryGroupInClusterState(final QueryGroup queryGroup, final Clu
String groupName = queryGroup.getName();
final Map<String, QueryGroup> previousGroups = metadata.queryGroups();

// check if there's any resource allocation that exceed limit of 1.0
String resourceNameWithThresholdExceeded = "";
for (ResourceType resourceType : queryGroup.getResourceLimits().keySet()) {
String resourceName = resourceType.getName();
double existingUsage = calculateExistingUsage(resourceName, previousGroups, groupName);
double newGroupUsage = getResourceLimitValue(resourceName, queryGroup.getResourceLimits());
inflightResourceLimitValues.computeIfAbsent(resourceName, k -> new DoubleAdder()).add(newGroupUsage);
double totalUsage = existingUsage + inflightResourceLimitValues.get(resourceName).doubleValue();
if (totalUsage > 1) {
resourceNameWithThresholdExceeded = resourceName;
}
// check if maxQueryGroupCount will breach
if (previousGroups.size() + 1 > maxQueryGroupCount) {
logger.error("{} value exceeded its assigned limit of {}", QUERY_GROUP_COUNT_SETTING_NAME, maxQueryGroupCount);
throw new RuntimeException("Can't create more than " + maxQueryGroupCount + " QueryGroups in the system");
}
// check if group count exceed max
boolean groupCountExceeded = inflightCreateQueryGroupRequestCount.incrementAndGet() + previousGroups.size() > maxQueryGroupCount;

// check for duplicate name
Optional<QueryGroup> findExistingGroup = previousGroups.values()
.stream()
.filter(group -> group.getName().equals(groupName))
.findFirst();

if (findExistingGroup.isPresent()) {
logger.warn("QueryGroup with name {} already exists. Not creating a new one.", groupName);
throw new RuntimeException("QueryGroup with name " + groupName + " already exists. Not creating a new one.");
}
if (!resourceNameWithThresholdExceeded.isEmpty()) {
logger.error("Total resource allocation for {} will go above the max limit of 1.0", resourceNameWithThresholdExceeded);
throw new RuntimeException(
"Total resource allocation for " + resourceNameWithThresholdExceeded + " will go above the max limit of 1.0"
);
}
if (groupCountExceeded) {
logger.error("{} value exceeded its assigned limit of {}", QUERY_GROUP_COUNT_SETTING_NAME, maxQueryGroupCount);
throw new RuntimeException("Can't create more than " + maxQueryGroupCount + " QueryGroups in the system");

// check if there's any resource allocation that exceed limit of 1.0
for (ResourceType resourceType : queryGroup.getResourceLimits().keySet()) {
String resourceName = resourceType.getName();
double existingUsage = calculateExistingUsage(resourceName, previousGroups, groupName);
double newGroupUsage = getResourceLimitValue(resourceName, queryGroup.getResourceLimits());
if (existingUsage + newGroupUsage > 1) {
logger.error("Total resource allocation for {} will go above the max limit of 1.0", resourceName);
throw new RuntimeException("Total resource allocation for " + resourceName + " will go above the max limit of 1.0");
}
}
Metadata newData = Metadata.builder(metadata).put(queryGroup).build();

Metadata newData = Metadata.builder(metadata).put(queryGroup).build();
return ClusterState.builder(currentClusterState).metadata(newData).build();
}

/**
* This method restores the inflight values to be before the QueryGroup is processed
* @param resourceLimits - the resourceLimits we're currently restoring
* Get the allocation value for resourceName from the resourceLimits of a QueryGroup
* @param resourceName - the resourceName we want to get the usage for
* @param resourceLimits - the resource limit from which to get the allocation value for resourceName
*/
void restoreInflightValues(Map<ResourceType, Object> resourceLimits) {
if (resourceLimits == null || resourceLimits.isEmpty()) {
return;
}
private double getResourceLimitValue(String resourceName, final Map<ResourceType, Object> resourceLimits) {
for (ResourceType resourceType : resourceLimits.keySet()) {
String currResourceName = resourceType.getName();
inflightResourceLimitValues.get(currResourceName).add(-getResourceLimitValue(currResourceName, resourceLimits));
if (resourceType.getName().equals(resourceName)) {
return (double) resourceLimits.get(resourceType);
}
}
return 0.0;

Check warning on line 168 in plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java

View check run for this annotation

Codecov / codecov/patch

plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java#L167-L168

Added lines #L167 - L168 were not covered by tests
}

/**
Expand All @@ -218,20 +185,6 @@ private double calculateExistingUsage(String resourceName, Map<String, QueryGrou
return existingUsage;
}

/**
* inflightCreateQueryGroupRequestCount getter
*/
public AtomicInteger getInflightCreateQueryGroupRequestCount() {
return inflightCreateQueryGroupRequestCount;
}

/**
* inflightResourceLimitValues getter
*/
public Map<String, DoubleAdder> getInflightResourceLimitValues() {
return inflightResourceLimitValues;
}

/**
* clusterService getter
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.DoubleAdder;

import static org.opensearch.cluster.metadata.QueryGroup.builder;
import static org.opensearch.search.ResourceType.fromName;
Expand Down Expand Up @@ -133,14 +132,4 @@ public static void compareQueryGroups(List<QueryGroup> listOne, List<QueryGroup>
assertTrue(listOne.get(i).equals(listTwo.get(i)));
}
}

public static void assertInflightValuesAreZero(QueryGroupPersistenceService queryGroupPersistenceService) {
assertEquals(0, queryGroupPersistenceService.getInflightCreateQueryGroupRequestCount().get());
Map<String, DoubleAdder> inflightResourceMap = queryGroupPersistenceService.getInflightResourceLimitValues();
if (inflightResourceMap != null) {
for (String resourceName : inflightResourceMap.keySet()) {
assertEquals(0, inflightResourceMap.get(resourceName).intValue());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,10 @@
import static org.opensearch.plugin.wlm.QueryGroupTestUtils.NAME_ONE;
import static org.opensearch.plugin.wlm.QueryGroupTestUtils._ID_ONE;
import static org.opensearch.plugin.wlm.QueryGroupTestUtils._ID_TWO;
import static org.opensearch.plugin.wlm.QueryGroupTestUtils.assertInflightValuesAreZero;
import static org.opensearch.plugin.wlm.QueryGroupTestUtils.compareQueryGroups;
import static org.opensearch.plugin.wlm.QueryGroupTestUtils.preparePersistenceServiceSetup;
import static org.opensearch.plugin.wlm.QueryGroupTestUtils.queryGroupList;
import static org.opensearch.plugin.wlm.QueryGroupTestUtils.queryGroupOne;
import static org.opensearch.plugin.wlm.QueryGroupTestUtils.queryGroupPersistenceService;
import static org.opensearch.plugin.wlm.QueryGroupTestUtils.queryGroupTwo;
import static org.opensearch.search.query_group.QueryGroupServiceSettings.QUERY_GROUP_COUNT_SETTING_NAME;
import static org.mockito.Mockito.mock;
Expand All @@ -57,7 +55,6 @@ public void testCreateQueryGroup() {
listOne.add(queryGroupOne);
listTwo.add(updatedGroupsMap.get(_ID_ONE));
compareQueryGroups(listOne, listTwo);
assertInflightValuesAreZero(queryGroupPersistenceService());
}

public void testCreateAnotherQueryGroup() {
Expand All @@ -70,7 +67,6 @@ public void testCreateAnotherQueryGroup() {
assertTrue(updatedGroups.containsKey(_ID_TWO));
Collection<QueryGroup> values = updatedGroups.values();
compareQueryGroups(queryGroupList(), new ArrayList<>(values));
assertInflightValuesAreZero(queryGroupPersistenceService());
}

public void testCreateQueryGroupDuplicateName() {
Expand Down

0 comments on commit 6b15803

Please sign in to comment.