Skip to content

Commit

Permalink
[Remote State] fix lock release before deletion is completed (opensea…
Browse files Browse the repository at this point in the history
…rch-project#10611)

* fix lock release before deletion is completed

Signed-off-by: bansvaru <[email protected]>
  • Loading branch information
linuxpi authored and austintlee committed Oct 23, 2023
1 parent e4911df commit f5b3316
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1072,7 +1072,8 @@ public void onFailure(Exception e) {
* @param clusterUUID uuid of cluster state to refer to in remote
* @param manifestsToRetain no of latest manifest files to keep in remote
*/
private void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int manifestsToRetain) {
// package private for testing
void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int manifestsToRetain) {
if (deleteStaleMetadataRunning.compareAndSet(false, true) == false) {
logger.info("Delete stale cluster metadata task is already in progress.");
return;
Expand Down Expand Up @@ -1109,8 +1110,9 @@ public void onFailure(Exception e) {
}
}
);
} finally {
} catch (Exception e) {
deleteStaleMetadataRunning.set(false);
throw e;
}
}

Expand Down Expand Up @@ -1190,7 +1192,7 @@ private void deleteStalePaths(String clusterName, String clusterUUID, List<Strin
public void deleteStaleClusterUUIDs(ClusterState clusterState, ClusterMetadataManifest committedManifest) {
threadpool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
String clusterName = clusterState.getClusterName().value();
logger.info("Deleting stale cluster UUIDs data from remote [{}]", clusterName);
logger.debug("Deleting stale cluster UUIDs data from remote [{}]", clusterName);
Set<String> allClustersUUIDsInRemote;
try {
allClustersUUIDsInRemote = new HashSet<>(getAllClusterUUIDs(clusterState.getClusterName().value()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import org.mockito.ArgumentCaptor;
Expand All @@ -73,6 +76,7 @@
import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_CURRENT_CODEC_VERSION;
import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX;
import static org.opensearch.gateway.remote.RemoteClusterStateService.METADATA_FILE_PREFIX;
import static org.opensearch.gateway.remote.RemoteClusterStateService.RETAINED_MANIFESTS;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
Expand Down Expand Up @@ -1004,6 +1008,36 @@ public void testFileNames() {
assertThat(splittedName[3], is("P"));
}

public void testSingleConcurrentExecutionOfStaleManifestCleanup() throws Exception {
BlobContainer blobContainer = mock(BlobContainer.class);
BlobPath blobPath = new BlobPath().add("random-path");
when((blobStoreRepository.basePath())).thenReturn(blobPath);
when(blobStore.blobContainer(any())).thenReturn(blobContainer);

CountDownLatch latch = new CountDownLatch(1);
AtomicInteger callCount = new AtomicInteger(0);
doAnswer(invocation -> {
callCount.incrementAndGet();
if (latch.await(5000, TimeUnit.SECONDS) == false) {
throw new Exception("Timed out waiting for delete task queuing to complete");
}
return null;
}).when(blobContainer)
.listBlobsByPrefixInSortedOrder(
any(String.class),
any(int.class),
any(BlobContainer.BlobNameSortOrder.class),
any(ActionListener.class)
);

remoteClusterStateService.start();
remoteClusterStateService.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS);
remoteClusterStateService.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS);

latch.countDown();
assertBusy(() -> assertEquals(1, callCount.get()));
}

private void mockObjectsForGettingPreviousClusterUUID(Map<String, String> clusterUUIDsPointers) throws IOException {
final BlobPath blobPath = mock(BlobPath.class);
when((blobStoreRepository.basePath())).thenReturn(blobPath);
Expand Down

0 comments on commit f5b3316

Please sign in to comment.