Skip to content

Commit

Permalink
remove persistable interface
Browse files Browse the repository at this point in the history
Signed-off-by: Ruirui Zhang <[email protected]>
  • Loading branch information
ruai0511 committed Jul 19, 2024
1 parent 6b15803 commit 71471ff
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.plugin.wlm.service.Persistable;
import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
Expand All @@ -28,7 +28,7 @@
public class TransportCreateQueryGroupAction extends HandledTransportAction<CreateQueryGroupRequest, CreateQueryGroupResponse> {

private final ThreadPool threadPool;
private final Persistable<QueryGroup> queryGroupPersistenceService;
private final QueryGroupPersistenceService queryGroupPersistenceService;

/**
* Constructor for TransportCreateQueryGroupAction
Expand All @@ -37,15 +37,15 @@ public class TransportCreateQueryGroupAction extends HandledTransportAction<Crea
* @param transportService - a {@link TransportService} object
* @param actionFilters - a {@link ActionFilters} object
* @param threadPool - a {@link ThreadPool} object
* @param queryGroupPersistenceService - a {@link Persistable} object
* @param queryGroupPersistenceService - a {@link QueryGroupPersistenceService} object
*/
@Inject
public TransportCreateQueryGroupAction(
String actionName,
TransportService transportService,
ActionFilters actionFilters,
ThreadPool threadPool,
Persistable<QueryGroup> queryGroupPersistenceService
QueryGroupPersistenceService queryGroupPersistenceService
) {
super(CreateQueryGroupAction.NAME, transportService, actionFilters, CreateQueryGroupRequest::new);
this.threadPool = threadPool;
Expand All @@ -60,6 +60,7 @@ protected void doExecute(Task task, CreateQueryGroupRequest request, ActionListe
.resourceLimits(request.getResourceLimits())
.updatedAt(request.getUpdatedAtInMillis())
.build();
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> queryGroupPersistenceService.persist(queryGroup, listener));
threadPool.executor(ThreadPool.Names.GENERIC)
.execute(() -> queryGroupPersistenceService.persistInClusterStateMetadata(queryGroup, listener));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,7 @@

package org.opensearch.plugin.wlm;

import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.common.inject.AbstractModule;
import org.opensearch.common.inject.TypeLiteral;
import org.opensearch.plugin.wlm.service.Persistable;
import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService;

/**
* Guice Module to manage WorkloadManagement related objects
Expand All @@ -26,7 +22,7 @@ public WorkloadManagementPluginModule() {}

@Override
protected void configure() {
bind(new TypeLiteral<Persistable<QueryGroup>>() {
}).to(QueryGroupPersistenceService.class).asEagerSingleton();
// bind(new TypeLiteral<Persistable<QueryGroup>>() {
// }).to(QueryGroupPersistenceService.class).asEagerSingleton();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public String getName() {
*/
@Override
public List<Route> routes() {
return List.of(new Route(POST, "_wlm/_query_group/"), new Route(PUT, "_wlm/_query_group/"));
return List.of(new Route(POST, "_wlm/query_group/"), new Route(PUT, "_wlm/query_group/"));
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateApplier;
import org.opensearch.cluster.ClusterStateUpdateTask;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.QueryGroup;
Expand All @@ -25,23 +27,28 @@
import org.opensearch.plugin.wlm.CreateQueryGroupResponse;
import org.opensearch.search.ResourceType;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

import static org.opensearch.search.query_group.QueryGroupServiceSettings.MAX_QUERY_GROUP_COUNT;
import static org.opensearch.search.query_group.QueryGroupServiceSettings.QUERY_GROUP_COUNT_SETTING_NAME;

/**
* This class defines the functions for QueryGroup persistence
*/
public class QueryGroupPersistenceService implements Persistable<QueryGroup> {
public class QueryGroupPersistenceService implements ClusterStateApplier {
private static final Logger logger = LogManager.getLogger(QueryGroupPersistenceService.class);
private final ClusterService clusterService;
private final List<Consumer<ClusterState>> searchPipelineClusterStateListeners = new CopyOnWriteArrayList<>();
private static final String SOURCE = "query-group-persistence-service";
private static final String CREATE_QUERY_GROUP_THROTTLING_KEY = "create-query-group";
private static final String UPDATE_QUERY_GROUP_THROTTLING_KEY = "update-query-group";
private static final String DELETE_QUERY_GROUP_THROTTLING_KEY = "delete-query-group";
private volatile int maxQueryGroupCount;
private volatile ClusterState state;
final ThrottlingKey createQueryGroupThrottlingKey;
final ThrottlingKey updateQueryGroupThrottlingKey;
final ThrottlingKey deleteQueryGroupThrottlingKey;
Expand Down Expand Up @@ -78,16 +85,11 @@ public void setMaxQueryGroupCount(int newMaxQueryGroupCount) {
this.maxQueryGroupCount = newMaxQueryGroupCount;
}

@Override
public void persist(QueryGroup queryGroup, ActionListener<CreateQueryGroupResponse> listener) {
persistInClusterStateMetadata(queryGroup, listener);
}

/**
* Update cluster state to include the new QueryGroup
* @param queryGroup {@link QueryGroup} - the QueryGroup we're currently creating
*/
void persistInClusterStateMetadata(QueryGroup queryGroup, ActionListener<CreateQueryGroupResponse> listener) {
public void persistInClusterStateMetadata(QueryGroup queryGroup, ActionListener<CreateQueryGroupResponse> listener) {
clusterService.submitStateUpdateTask(SOURCE, new ClusterStateUpdateTask(Priority.NORMAL) {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
Expand Down Expand Up @@ -185,10 +187,8 @@ private double calculateExistingUsage(String resourceName, Map<String, QueryGrou
return existingUsage;
}

/**
* clusterService getter
*/
public ClusterService getClusterService() {
return clusterService;
@Override
public void applyClusterState(ClusterChangedEvent event) {
state = event.state();
}
}

0 comments on commit 71471ff

Please sign in to comment.