Skip to content

Commit

Permalink
Basic pattern for decoupled views in metadata vs transport requests
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Nied <[email protected]>
  • Loading branch information
peternied committed Feb 1, 2024
1 parent 8e41fbe commit cf22a8d
Show file tree
Hide file tree
Showing 7 changed files with 273 additions and 11 deletions.
4 changes: 4 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@
import org.opensearch.action.admin.indices.upgrade.post.UpgradeSettingsAction;
import org.opensearch.action.admin.indices.validate.query.TransportValidateQueryAction;
import org.opensearch.action.admin.indices.validate.query.ValidateQueryAction;
import org.opensearch.action.admin.indices.view.CreateViewAction;
import org.opensearch.action.bulk.BulkAction;
import org.opensearch.action.bulk.TransportBulkAction;
import org.opensearch.action.bulk.TransportShardBulkAction;
Expand Down Expand Up @@ -721,6 +722,9 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(ResolveIndexAction.INSTANCE, ResolveIndexAction.TransportAction.class);
actions.register(DataStreamsStatsAction.INSTANCE, DataStreamsStatsAction.TransportAction.class);

// Views:
actions.register(CreateViewAction.INSTANCE, CreateViewAction.TransportAction.class);

// Persistent tasks:
actions.register(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class);
actions.register(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package org.opensearch.action.admin.indices.view;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.ActionType;
import org.opensearch.action.ValidateActions;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest;
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.ViewService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

/** Action to create a view */
public class CreateViewAction extends ActionType<CreateViewAction.Response> {

public static final CreateViewAction INSTANCE = new CreateViewAction();
public static final String NAME = "cluster:views:create";

private CreateViewAction() {
super(NAME, CreateViewAction.Response::new);
}


/** View target representation for create requests */
public static class ViewTarget implements Writeable {
public final String indexPattern;

public ViewTarget(final String indexPattern) {
this.indexPattern = indexPattern;
}

public ViewTarget(final StreamInput in) throws IOException {
this.indexPattern = in.readString();
}

public String getIndexPattern() {
return indexPattern;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(indexPattern);
}

public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;

if (Strings.isNullOrEmpty(indexPattern)) {
validationException = ValidateActions.addValidationError("index pattern cannot be empty or null", validationException);
}

return validationException;
}

}

/**
* Request for Creating View
*/
public static class Request extends ClusterManagerNodeRequest<Request> {
private final String name;
private final String description;
private final List<ViewTarget> targets;

public Request(final String name, final String description, final List<ViewTarget> targets) {
this.name = name;
this.description = description;
this.targets = targets;
}

public String getName() {
return name;
}

public String getDescription() {
return description;
}

public List<ViewTarget> getTargets() {
return new ArrayList<>(targets);
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (Strings.isNullOrEmpty(name)) {
validationException = ValidateActions.addValidationError("Name is cannot be empty or null", validationException);
}
if (targets.isEmpty()) {
validationException = ValidateActions.addValidationError("targets cannot be empty", validationException);
}

for (final ViewTarget target : targets) {
validationException = target.validate();
}

return validationException;
}

public Request(final StreamInput in) throws IOException {
super(in);
this.name = in.readString();
this.description = in.readString();
this.targets = in.readList(ViewTarget::new);
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(name);
out.writeString(description);
out.writeList(targets);
}
}

/** Response after view is created */
public static class Response extends ActionResponse {

private final org.opensearch.cluster.metadata.View createdView;

public Response(final org.opensearch.cluster.metadata.View createdView) {
this.createdView = createdView;
}

public Response(final StreamInput in) throws IOException {
super(in);
this.createdView = new org.opensearch.cluster.metadata.View(in);
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
this.createdView.writeTo(out);
}
}

/**
* Transport Action for creating a View
*/
public static class TransportAction extends TransportClusterManagerNodeAction<Request, Response> {

private final ViewService viewService;

@Inject
public TransportAction(
final TransportService transportService,
final ClusterService clusterService,
final ThreadPool threadPool,
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver,
final ViewService viewService
) {
super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver);
this.viewService = viewService;
}

@Override
protected String executor() {
return ThreadPool.Names.SAME;
}

@Override
protected Response read(StreamInput in) throws IOException {
return new Response(in);
}

@Override
protected void clusterManagerOperation(Request request, ClusterState state, ActionListener<Response> listener)
throws Exception {
viewService.createView(request, listener);
}

@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/** View transport handlers. */
package org.opensearch.action.admin.indices.view;
11 changes: 6 additions & 5 deletions server/src/main/java/org/opensearch/cluster/metadata/View.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Objects;

/** TODO */
@ExperimentalApi
Expand All @@ -34,11 +35,11 @@ public class View extends AbstractDiffable<View> implements ToXContentObject {
public final List<Target> targets;

public View(final String name, final String description, final Long createdAt, final Long modifiedAt, final List<Target> targets) {
this.name = name;
this.name = Objects.requireNonNull(name, "Name must be provided");
this.description = description;
this.createdAt = createdAt != null ? createdAt : -1;
this.modifiedAt = modifiedAt != null ? modifiedAt : -1;
this.targets = targets;
this.targets = Objects.requireNonNull(targets, "Targets are required on a view");
}

public View(final StreamInput in) throws IOException {
Expand All @@ -55,11 +56,11 @@ public static class Target implements Writeable, ToXContentObject {

public final String indexPattern;

Target(final String indexPattern) {
this.indexPattern = indexPattern;
public Target(final String indexPattern) {
this.indexPattern = Objects.requireNonNull(indexPattern, "IndexPattern is required");
}

private Target(final StreamInput in) throws IOException {
public Target(final StreamInput in) throws IOException {
this(in.readString());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package org.opensearch.cluster.metadata;

import java.util.List;
import java.util.stream.Collectors;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.view.CreateViewAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateUpdateTask;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.core.action.ActionListener;

/** Service to interact with views, create, retrieve, update, and delete */
public class ViewService {

private final static Logger LOG = LogManager.getLogger(ViewService.class);
private final ClusterService clusterService;

public ViewService(final ClusterService clusterService) {
this.clusterService = clusterService;
}

public void createView(final CreateViewAction.Request request, final ActionListener<CreateViewAction.Response> listener) {
final long currentTime = System.currentTimeMillis();

final List<View.Target> targets = request.getTargets()
.stream()
.map(target -> new View.Target(target.getIndexPattern()))
.collect(Collectors.toList());
final View view = new View(request.getName(), request.getDescription(), currentTime, currentTime, targets);

clusterService.submitStateUpdateTask("create_view_task", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(final ClusterState currentState) throws Exception {
return new ClusterState.Builder(clusterService.state()).metadata(Metadata.builder(currentState.metadata()).put(view))
.build();
}

@Override
public void onFailure(final String source, final Exception e) {
LOG.error("Unable to create view, due to {}", source, e);
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) {
final View createdView = newState.getMetadata().views().get(request.getName());
final CreateViewAction.Response response = new CreateViewAction.Response(createdView);
listener.onResponse(response);
}
});
}
}
9 changes: 6 additions & 3 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.opensearch.cluster.metadata.MetadataIndexUpgradeService;
import org.opensearch.cluster.metadata.SystemIndexMetadataUpgradeService;
import org.opensearch.cluster.metadata.TemplateUpgradeService;
import org.opensearch.cluster.metadata.ViewService;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.routing.BatchedRerouteService;
Expand Down Expand Up @@ -862,6 +863,10 @@ protected Node(
metadataCreateIndexService
);

final ViewService viewService = new ViewService(
clusterService
);

Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class)
.stream()
.flatMap(
Expand Down Expand Up @@ -898,9 +903,6 @@ protected Node(
);
modules.add(actionModule);

actionModule.getRestController().registerHandler(new RestViewAction(clusterService));
actionModule.getRestController().registerHandler(new RestViewSearchAction(clusterService));

final RestController restController = actionModule.getRestController();

final NodeResourceUsageTracker nodeResourceUsageTracker = new NodeResourceUsageTracker(
Expand Down Expand Up @@ -1222,6 +1224,7 @@ protected Node(
b.bind(MetadataCreateIndexService.class).toInstance(metadataCreateIndexService);
b.bind(AwarenessReplicaBalance.class).toInstance(awarenessReplicaBalance);
b.bind(MetadataCreateDataStreamService.class).toInstance(metadataCreateDataStreamService);
b.bind(ViewService.class).toInstance(viewService);
b.bind(SearchService.class).toInstance(searchService);
b.bind(SearchTransportService.class).toInstance(searchTransportService);
b.bind(SearchPhaseController.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ public void testNullTargetIndexPattern() {
assertThat(npe.getMessage(), equalTo("IndexPattern is required"));
}


public void testDefaultValues() {
final View view = new View("myName", null, null, null, List.of());

Expand All @@ -92,6 +91,4 @@ public void testDefaultValues() {
assertThat(view.modifiedAt, equalTo(-1L));
assertThat(view.targets, empty());
}


}

0 comments on commit cf22a8d

Please sign in to comment.