Skip to content

Commit

Permalink
remove persistable interface
Browse files Browse the repository at this point in the history
Signed-off-by: Ruirui Zhang <[email protected]>
  • Loading branch information
ruai0511 committed Jul 22, 2024
1 parent 6b15803 commit c5bc1a7
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.plugin.wlm.service.Persistable;
import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
Expand All @@ -28,7 +28,7 @@
public class TransportCreateQueryGroupAction extends HandledTransportAction<CreateQueryGroupRequest, CreateQueryGroupResponse> {

private final ThreadPool threadPool;
private final Persistable<QueryGroup> queryGroupPersistenceService;
private final QueryGroupPersistenceService queryGroupPersistenceService;

/**
* Constructor for TransportCreateQueryGroupAction
Expand All @@ -37,15 +37,15 @@ public class TransportCreateQueryGroupAction extends HandledTransportAction<Crea
* @param transportService - a {@link TransportService} object
* @param actionFilters - a {@link ActionFilters} object
* @param threadPool - a {@link ThreadPool} object
* @param queryGroupPersistenceService - a {@link Persistable} object
* @param queryGroupPersistenceService - a {@link QueryGroupPersistenceService} object
*/
@Inject
public TransportCreateQueryGroupAction(
String actionName,
TransportService transportService,
ActionFilters actionFilters,
ThreadPool threadPool,
Persistable<QueryGroup> queryGroupPersistenceService
QueryGroupPersistenceService queryGroupPersistenceService
) {
super(CreateQueryGroupAction.NAME, transportService, actionFilters, CreateQueryGroupRequest::new);
this.threadPool = threadPool;
Expand All @@ -60,6 +60,7 @@ protected void doExecute(Task task, CreateQueryGroupRequest request, ActionListe
.resourceLimits(request.getResourceLimits())
.updatedAt(request.getUpdatedAtInMillis())
.build();
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> queryGroupPersistenceService.persist(queryGroup, listener));
threadPool.executor(ThreadPool.Names.GENERIC)
.execute(() -> queryGroupPersistenceService.persistInClusterStateMetadata(queryGroup, listener));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,7 @@

package org.opensearch.plugin.wlm;

import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.common.inject.AbstractModule;
import org.opensearch.common.inject.TypeLiteral;
import org.opensearch.plugin.wlm.service.Persistable;
import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService;

/**
* Guice Module to manage WorkloadManagement related objects
Expand All @@ -25,8 +21,5 @@ public class WorkloadManagementPluginModule extends AbstractModule {
public WorkloadManagementPluginModule() {}

@Override
protected void configure() {
bind(new TypeLiteral<Persistable<QueryGroup>>() {
}).to(QueryGroupPersistenceService.class).asEagerSingleton();
}
protected void configure() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public String getName() {
*/
@Override
public List<Route> routes() {
return List.of(new Route(POST, "_wlm/_query_group/"), new Route(PUT, "_wlm/_query_group/"));
return List.of(new Route(POST, "_wlm/query_group/"), new Route(PUT, "_wlm/query_group/"));
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
/**
* This class defines the functions for QueryGroup persistence
*/
public class QueryGroupPersistenceService implements Persistable<QueryGroup> {
public class QueryGroupPersistenceService {
private static final Logger logger = LogManager.getLogger(QueryGroupPersistenceService.class);
private final ClusterService clusterService;
private static final String SOURCE = "query-group-persistence-service";
Expand Down Expand Up @@ -78,16 +78,12 @@ public void setMaxQueryGroupCount(int newMaxQueryGroupCount) {
this.maxQueryGroupCount = newMaxQueryGroupCount;
}

@Override
public void persist(QueryGroup queryGroup, ActionListener<CreateQueryGroupResponse> listener) {
persistInClusterStateMetadata(queryGroup, listener);
}

/**
* Update cluster state to include the new QueryGroup
* @param queryGroup {@link QueryGroup} - the QueryGroup we're currently creating
* @param listener - ActionListener for CreateQueryGroupResponse
*/
void persistInClusterStateMetadata(QueryGroup queryGroup, ActionListener<CreateQueryGroupResponse> listener) {
public void persistInClusterStateMetadata(QueryGroup queryGroup, ActionListener<CreateQueryGroupResponse> listener) {
clusterService.submitStateUpdateTask(SOURCE, new ClusterStateUpdateTask(Priority.NORMAL) {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
Expand Down Expand Up @@ -184,11 +180,4 @@ private double calculateExistingUsage(String resourceName, Map<String, QueryGrou
}
return existingUsage;
}

/**
* clusterService getter
*/
public ClusterService getClusterService() {
return clusterService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -765,8 +765,10 @@ public void apply(Settings value, Settings current, Settings previous) {

// QueryGroup settings
QueryGroupServiceSettings.MAX_QUERY_GROUP_COUNT,
QueryGroupServiceSettings.NODE_LEVEL_REJECTION_THRESHOLD,
QueryGroupServiceSettings.NODE_LEVEL_CANCELLATION_THRESHOLD
QueryGroupServiceSettings.NODE_LEVEL_CPU_REJECTION_THRESHOLD,
QueryGroupServiceSettings.NODE_LEVEL_CPU_CANCELLATION_THRESHOLD,
QueryGroupServiceSettings.NODE_LEVEL_MEMORY_REJECTION_THRESHOLD,
QueryGroupServiceSettings.NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD
)
)
);
Expand Down
Loading

0 comments on commit c5bc1a7

Please sign in to comment.