Skip to content

Commit

Permalink
Merge branch 'main' into split-response-processor
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis authored Jul 22, 2024
2 parents f20efd4 + 0040f4b commit 65127cf
Show file tree
Hide file tree
Showing 30 changed files with 1,297 additions and 685 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Refactor remote-routing-table service inline with remote state interfaces([#14668](https://github.com/opensearch-project/OpenSearch/pull/14668))
- Add prefix mode verification setting for repository verification (([#14790](https://github.com/opensearch-project/OpenSearch/pull/14790)))
- Add SplitResponseProcessor to Search Pipelines (([#14800](https://github.com/opensearch-project/OpenSearch/issues/14800)))
- Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call ([14749](https://github.com/opensearch-project/OpenSearch/pull/14749))

### Dependencies
- Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ protected NodeInfo nodeOperation(NodeInfoRequest nodeRequest) {
*/
public static class NodeInfoRequest extends TransportRequest {

NodesInfoRequest request;
protected NodesInfoRequest request;

public NodeInfoRequest(StreamInput in) throws IOException {
super(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
*/
public static class NodeStatsRequest extends TransportRequest {

NodesStatsRequest request;
protected NodesStatsRequest request;

public NodeStatsRequest(StreamInput in) throws IOException {
super(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
*/
public static class ClusterStatsNodeRequest extends TransportRequest {

ClusterStatsRequest request;
protected ClusterStatsRequest request;

public ClusterStatsNodeRequest(StreamInput in) throws IOException {
super(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ public abstract class BaseNodesRequest<Request extends BaseNodesRequest<Request>
* will be ignored and this will be used.
* */
private DiscoveryNode[] concreteNodes;

/**
* Since do not use the discovery nodes coming from the request in all code paths following a request extended off from
* BaseNodeRequest, we do not require it to sent around across all nodes.
*
* Setting default behavior as `true` but can be explicitly changed in requests that do not require.
*/
private boolean includeDiscoveryNodes = true;
private final TimeValue DEFAULT_TIMEOUT_SECS = TimeValue.timeValueSeconds(30);

private TimeValue timeout;
Expand Down Expand Up @@ -119,6 +127,14 @@ public void setConcreteNodes(DiscoveryNode[] concreteNodes) {
this.concreteNodes = concreteNodes;
}

public void setIncludeDiscoveryNodes(boolean value) {
includeDiscoveryNodes = value;
}

public boolean getIncludeDiscoveryNodes() {
return includeDiscoveryNodes;
}

@Override
public ActionRequestValidationException validate() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ class AsyncAction {
private final NodesRequest request;
private final ActionListener<NodesResponse> listener;
private final AtomicReferenceArray<Object> responses;
private final DiscoveryNode[] concreteNodes;
private final AtomicInteger counter = new AtomicInteger();
private final Task task;

Expand All @@ -238,10 +239,18 @@ class AsyncAction {
assert request.concreteNodes() != null;
}
this.responses = new AtomicReferenceArray<>(request.concreteNodes().length);
this.concreteNodes = request.concreteNodes();

if (request.getIncludeDiscoveryNodes() == false) {
// As we transfer the ownership of discovery nodes to route the request to into the AsyncAction class, we
// remove the list of DiscoveryNodes from the request. This reduces the payload of the request and improves
// the number of concrete nodes in the memory.
request.setConcreteNodes(null);
}
}

void start() {
final DiscoveryNode[] nodes = request.concreteNodes();
final DiscoveryNode[] nodes = this.concreteNodes;
if (nodes.length == 0) {
// nothing to notify
threadPool.generic().execute(() -> listener.onResponse(newResponse(request, responses)));
Expand All @@ -260,7 +269,6 @@ void start() {
if (task != null) {
nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());
}

transportService.sendRequest(
node,
getTransportNodeAction(node),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.remote.RemoteWritableEntityStore;
Expand Down Expand Up @@ -102,16 +101,16 @@ public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRouting
}

/**
* Create async action for writing one {@code IndexRoutingTable} to remote store
* Async action for writing one {@code IndexRoutingTable} to remote store
*
* @param term current term
* @param version current version
* @param clusterUUID current cluster UUID
* @param indexRouting indexRoutingTable to write to remote store
* @param latchedActionListener listener for handling async action response
* @return returns runnable async action
*/
@Override
public CheckedRunnable<IOException> getAsyncIndexRoutingWriteAction(
public void getAsyncIndexRoutingWriteAction(
String clusterUUID,
long term,
long version,
Expand All @@ -128,7 +127,7 @@ public CheckedRunnable<IOException> getAsyncIndexRoutingWriteAction(
)
);

return () -> remoteIndexRoutingTableStore.writeAsync(remoteIndexRoutingTable, completionListener);
remoteIndexRoutingTableStore.writeAsync(remoteIndexRoutingTable, completionListener);
}

/**
Expand Down Expand Up @@ -156,7 +155,7 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndices
}

@Override
public CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
public void getAsyncIndexRoutingReadAction(
String clusterUUID,
String uploadedFilename,
LatchedActionListener<IndexRoutingTable> latchedActionListener
Expand All @@ -169,7 +168,7 @@ public CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(

RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, clusterUUID, compressor);

return () -> remoteIndexRoutingTableStore.readAsync(remoteIndexRoutingTable, actionListener);
remoteIndexRoutingTableStore.readAsync(remoteIndexRoutingTable, actionListener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.gateway.remote.ClusterMetadataManifest;

Expand All @@ -39,15 +38,14 @@ public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRouting
}

@Override
public CheckedRunnable<IOException> getAsyncIndexRoutingWriteAction(
public void getAsyncIndexRoutingWriteAction(
String clusterUUID,
long term,
long version,
IndexRoutingTable indexRouting,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
) {
// noop
return () -> {};
}

@Override
Expand All @@ -61,13 +59,12 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndices
}

@Override
public CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
public void getAsyncIndexRoutingReadAction(
String clusterUUID,
String uploadedFilename,
LatchedActionListener<IndexRoutingTable> latchedActionListener
) {
// noop
return () -> {};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.lifecycle.LifecycleComponent;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -43,7 +42,7 @@ public IndexRoutingTable read(StreamInput in, String key) throws IOException {

List<IndexRoutingTable> getIndicesRouting(RoutingTable routingTable);

CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
void getAsyncIndexRoutingReadAction(
String clusterUUID,
String uploadedFilename,
LatchedActionListener<IndexRoutingTable> latchedActionListener
Expand All @@ -59,7 +58,7 @@ DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>>
RoutingTable after
);

CheckedRunnable<IOException> getAsyncIndexRoutingWriteAction(
void getAsyncIndexRoutingWriteAction(
String clusterUUID,
long term,
long version,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.remote;

import org.opensearch.core.action.ActionListener;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.model.RemoteReadResult;

import java.util.HashMap;
import java.util.Map;

/**
* An abstract class that provides a base implementation for managing remote entities in the remote store.
*/
public abstract class AbstractRemoteWritableEntityManager implements RemoteWritableEntityManager {
/**
* A map that stores the remote writable entity stores, keyed by the entity type.
*/
protected final Map<String, RemoteWritableEntityStore> remoteWritableEntityStores = new HashMap<>();

/**
* Retrieves the remote writable entity store for the given entity.
*
* @param entity the entity for which the store is requested
* @return the remote writable entity store for the given entity
* @throws IllegalArgumentException if the entity type is unknown
*/
protected RemoteWritableEntityStore getStore(AbstractRemoteWritableBlobEntity entity) {
RemoteWritableEntityStore remoteStore = remoteWritableEntityStores.get(entity.getType());
if (remoteStore == null) {
throw new IllegalArgumentException("Unknown entity type [" + entity.getType() + "]");
}
return remoteStore;
}

/**
* Returns an ActionListener for handling the write operation for the specified component, remote object, and latched action listener.
*
* @param component the component for which the write operation is performed
* @param remoteEntity the remote object to be written
* @param listener the listener to be notified when the write operation completes
* @return an ActionListener for handling the write operation
*/
protected abstract ActionListener<Void> getWrappedWriteListener(
String component,
AbstractRemoteWritableBlobEntity remoteEntity,
ActionListener<ClusterMetadataManifest.UploadedMetadata> listener
);

/**
* Returns an ActionListener for handling the read operation for the specified component,
* remote object, and latched action listener.
*
* @param component the component for which the read operation is performed
* @param remoteEntity the remote object to be read
* @param listener the listener to be notified when the read operation completes
* @return an ActionListener for handling the read operation
*/
protected abstract ActionListener<Object> getWrappedReadListener(
String component,
AbstractRemoteWritableBlobEntity remoteEntity,
ActionListener<RemoteReadResult> listener
);

@Override
public void writeAsync(
String component,
AbstractRemoteWritableBlobEntity entity,
ActionListener<ClusterMetadataManifest.UploadedMetadata> listener
) {
getStore(entity).writeAsync(entity, getWrappedWriteListener(component, entity, listener));
}

@Override
public void readAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener<RemoteReadResult> listener) {
getStore(entity).readAsync(entity, getWrappedReadListener(component, entity, listener));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.remote;

import org.opensearch.core.action.ActionListener;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
import org.opensearch.gateway.remote.model.RemoteReadResult;

/**
* The RemoteWritableEntityManager interface provides async read and write methods for managing remote entities in the remote store
*/
public interface RemoteWritableEntityManager {

/**
* Performs an asynchronous read operation for the specified component and entity.
*
* @param component the component for which the read operation is performed
* @param entity the entity to be read
* @param listener the listener to be notified when the read operation completes.
* The listener's {@link ActionListener#onResponse(Object)} method
* is called with a {@link RemoteReadResult} object containing the
* read data on successful read. The
* {@link ActionListener#onFailure(Exception)} method is called with
* an exception if the read operation fails.
*/
void readAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener<RemoteReadResult> listener);

/**
* Performs an asynchronous write operation for the specified component and entity.
*
* @param component the component for which the write operation is performed
* @param entity the entity to be written
* @param listener the listener to be notified when the write operation completes.
* The listener's {@link ActionListener#onResponse(Object)} method
* is called with a {@link UploadedMetadata} object containing the
* uploaded metadata on successful write. The
* {@link ActionListener#onFailure(Exception)} method is called with
* an exception if the write operation fails.
*/
void writeAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener<UploadedMetadata> listener);
}
Loading

0 comments on commit 65127cf

Please sign in to comment.