Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Create interface RemoteEntitiesManager #14854

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
Expand Down Expand Up @@ -150,14 +149,14 @@ public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRouting
}

/**
* Create async action for writing one {@code IndexRoutingTable} to remote store
* Async action for writing one {@code IndexRoutingTable} to remote store
* @param clusterState current cluster state
* @param indexRouting indexRoutingTable to write to remote store
* @param latchedActionListener listener for handling async action response
* @param clusterBasePath base path for remote file
* @return returns runnable async action
*/
public CheckedRunnable<IOException> getIndexRoutingAsyncAction(
@Override
public void getIndexRoutingAsyncAction(
ClusterState clusterState,
IndexRoutingTable indexRouting,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener,
Expand Down Expand Up @@ -187,7 +186,7 @@ public CheckedRunnable<IOException> getIndexRoutingAsyncAction(
)
);

return () -> uploadIndex(indexRouting, fileName, blobContainer, completionListener);
uploadIndex(indexRouting, fileName, blobContainer, completionListener);
}

/**
Expand Down Expand Up @@ -274,7 +273,7 @@ private void uploadIndex(
}

@Override
public CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
public void getAsyncIndexRoutingReadAction(
String uploadedFilename,
Index index,
LatchedActionListener<IndexRoutingTable> latchedActionListener
Expand All @@ -284,7 +283,7 @@ public CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
BlobContainer blobContainer = blobStoreRepository.blobStore()
.blobContainer(BlobPath.cleanPath().add(uploadedFilename.substring(0, idx)));

return () -> readAsync(
readAsync(
blobContainer,
blobFileName,
index,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.core.index.Index;
Expand Down Expand Up @@ -42,14 +41,13 @@ public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRouting
}

@Override
public CheckedRunnable<IOException> getIndexRoutingAsyncAction(
public void getIndexRoutingAsyncAction(
ClusterState clusterState,
IndexRoutingTable indexRouting,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener,
BlobPath clusterBasePath
) {
// noop
return () -> {};
}

@Override
Expand All @@ -63,13 +61,12 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndices
}

@Override
public CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
public void getAsyncIndexRoutingReadAction(
String uploadedFilename,
Index index,
LatchedActionListener<IndexRoutingTable> latchedActionListener
) {
// noop
return () -> {};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.lifecycle.LifecycleComponent;
import org.opensearch.core.common.io.stream.StreamInput;
Expand Down Expand Up @@ -46,7 +45,7 @@ public IndexRoutingTable read(StreamInput in, String key) throws IOException {

List<IndexRoutingTable> getIndicesRouting(RoutingTable routingTable);

CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
void getAsyncIndexRoutingReadAction(
String uploadedFilename,
Index index,
LatchedActionListener<IndexRoutingTable> latchedActionListener
Expand All @@ -62,7 +61,7 @@ DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>>
RoutingTable after
);

CheckedRunnable<IOException> getIndexRoutingAsyncAction(
void getIndexRoutingAsyncAction(
ClusterState clusterState,
IndexRoutingTable indexRouting,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.remote;

import org.opensearch.core.action.ActionListener;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.model.RemoteReadResult;

import java.util.HashMap;
import java.util.Map;

/**
* An abstract class that provides a base implementation for managing remote entities in the remote store.
*/
public abstract class AbstractRemoteWritableEntityManager implements RemoteWritableEntityManager {
/**
* A map that stores the remote writable entity stores, keyed by the entity type.
*/
protected final Map<String, RemoteWritableEntityStore> remoteWritableEntityStores = new HashMap<>();

/**
* Retrieves the remote writable entity store for the given entity.
*
* @param entity the entity for which the store is requested
* @return the remote writable entity store for the given entity
* @throws IllegalArgumentException if the entity type is unknown
*/
protected RemoteWritableEntityStore getStore(AbstractRemoteWritableBlobEntity entity) {
RemoteWritableEntityStore remoteStore = remoteWritableEntityStores.get(entity.getType());
if (remoteStore == null) {
throw new IllegalArgumentException("Unknown entity type [" + entity.getType() + "]");
}
return remoteStore;
}

/**
* Returns an ActionListener for handling the write operation for the specified component, remote object, and latched action listener.
*
* @param component the component for which the write operation is performed
* @param remoteEntity the remote object to be written
* @param listener the listener to be notified when the write operation completes
* @return an ActionListener for handling the write operation
*/
protected abstract ActionListener<Void> getWrappedWriteListener(
String component,
AbstractRemoteWritableBlobEntity remoteEntity,
ActionListener<ClusterMetadataManifest.UploadedMetadata> listener
);

/**
* Returns an ActionListener for handling the read operation for the specified component,
* remote object, and latched action listener.
*
* @param component the component for which the read operation is performed
* @param remoteEntity the remote object to be read
* @param listener the listener to be notified when the read operation completes
* @return an ActionListener for handling the read operation
*/
protected abstract ActionListener<Object> getWrappedReadListener(
String component,
AbstractRemoteWritableBlobEntity remoteEntity,
ActionListener<RemoteReadResult> listener
);

@Override
public void writeAsync(
String component,
AbstractRemoteWritableBlobEntity entity,
ActionListener<ClusterMetadataManifest.UploadedMetadata> listener
) {
getStore(entity).writeAsync(entity, getWrappedWriteListener(component, entity, listener));
}

@Override
public void readAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener<RemoteReadResult> listener) {
getStore(entity).readAsync(entity, getWrappedReadListener(component, entity, listener));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.remote;

import org.opensearch.core.action.ActionListener;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
import org.opensearch.gateway.remote.model.RemoteReadResult;

/**
* The RemoteWritableEntityManager interface provides async read and write methods for managing remote entities in the remote store
*/
public interface RemoteWritableEntityManager {

/**
* Performs an asynchronous read operation for the specified component and entity.
*
* @param component the component for which the read operation is performed
* @param entity the entity to be read
* @param listener the listener to be notified when the read operation completes.
* The listener's {@link ActionListener#onResponse(Object)} method
* is called with a {@link RemoteReadResult} object containing the
* read data on successful read. The
* {@link ActionListener#onFailure(Exception)} method is called with
* an exception if the read operation fails.
*/
void readAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener<RemoteReadResult> listener);

/**
* Performs an asynchronous write operation for the specified component and entity.
*
* @param component the component for which the write operation is performed
* @param entity the entity to be written
* @param listener the listener to be notified when the write operation completes.
* The listener's {@link ActionListener#onResponse(Object)} method
* is called with a {@link UploadedMetadata} object containing the
* uploaded metadata on successful write. The
* {@link ActionListener#onFailure(Exception)} method is called with
* an exception if the write operation fails.
*/
void writeAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener<UploadedMetadata> listener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@

package org.opensearch.gateway.remote;

import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.DiffableUtils.NonDiffableValueSerializer;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.RemoteWritableEntityStore;
import org.opensearch.common.remote.AbstractRemoteWritableEntityManager;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.gateway.remote.model.RemoteClusterBlocks;
Expand All @@ -26,23 +24,19 @@
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
* A Manager which provides APIs to upload and download attributes of ClusterState to the {@link RemoteClusterStateBlobStore}
*
* @opensearch.internal
*/
public class RemoteClusterStateAttributesManager {
public class RemoteClusterStateAttributesManager extends AbstractRemoteWritableEntityManager {
public static final String CLUSTER_STATE_ATTRIBUTE = "cluster_state_attribute";
public static final String DISCOVERY_NODES = "nodes";
public static final String CLUSTER_BLOCKS = "blocks";
public static final int CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION = 1;
private final Map<String, RemoteWritableEntityStore> remoteWritableEntityStores;
private final NamedWriteableRegistry namedWriteableRegistry;

RemoteClusterStateAttributesManager(
String clusterName,
Expand All @@ -51,8 +45,6 @@ public class RemoteClusterStateAttributesManager {
NamedWriteableRegistry namedWriteableRegistry,
ThreadPool threadpool
) {
this.namedWriteableRegistry = namedWriteableRegistry;
this.remoteWritableEntityStores = new HashMap<>();
this.remoteWritableEntityStores.put(
RemoteDiscoveryNodes.DISCOVERY_NODES,
new RemoteClusterStateBlobStore<>(
Expand Down Expand Up @@ -85,46 +77,28 @@ public class RemoteClusterStateAttributesManager {
);
}

/**
* Allows async upload of Cluster State Attribute components to remote
*/
CheckedRunnable<IOException> getAsyncMetadataWriteAction(
@Override
protected ActionListener<Void> getWrappedWriteListener(
String component,
AbstractRemoteWritableBlobEntity blobEntity,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
) {
return () -> getStore(blobEntity).writeAsync(blobEntity, getActionListener(component, blobEntity, latchedActionListener));
}

private ActionListener<Void> getActionListener(
String component,
AbstractRemoteWritableBlobEntity remoteObject,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
AbstractRemoteWritableBlobEntity remoteEntity,
ActionListener<ClusterMetadataManifest.UploadedMetadata> listener
) {
return ActionListener.wrap(
resp -> latchedActionListener.onResponse(remoteObject.getUploadedMetadata()),
ex -> latchedActionListener.onFailure(new RemoteStateTransferException(component, remoteObject, ex))
resp -> listener.onResponse(remoteEntity.getUploadedMetadata()),
ex -> listener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteEntity, ex))
);
}

private RemoteWritableEntityStore getStore(AbstractRemoteWritableBlobEntity entity) {
RemoteWritableEntityStore remoteStore = remoteWritableEntityStores.get(entity.getType());
if (remoteStore == null) {
throw new IllegalArgumentException("Unknown entity type [" + entity.getType() + "]");
}
return remoteStore;
}

public CheckedRunnable<IOException> getAsyncMetadataReadAction(
@Override
protected ActionListener<Object> getWrappedReadListener(
String component,
AbstractRemoteWritableBlobEntity blobEntity,
LatchedActionListener<RemoteReadResult> listener
AbstractRemoteWritableBlobEntity remoteEntity,
ActionListener<RemoteReadResult> listener
) {
final ActionListener actionListener = ActionListener.wrap(
return ActionListener.wrap(
response -> listener.onResponse(new RemoteReadResult(response, CLUSTER_STATE_ATTRIBUTE, component)),
listener::onFailure
ex -> listener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteEntity, ex))
);
return () -> getStore(blobEntity).readAsync(blobEntity, actionListener);
}

public DiffableUtils.MapDiff<String, ClusterState.Custom, Map<String, ClusterState.Custom>> getUpdatedCustoms(
Expand Down Expand Up @@ -158,4 +132,5 @@ public DiffableUtils.MapDiff<String, ClusterState.Custom, Map<String, ClusterSta
NonDiffableValueSerializer.getAbstractInstance()
);
}

}
Loading
Loading