diff --git a/server/src/main/java/org/opensearch/index/shard/CloseableRetryableRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/CloseableRetryableRefreshListener.java index 3ee74e5267718..6b9c619e3141a 100644 --- a/server/src/main/java/org/opensearch/index/shard/CloseableRetryableRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/CloseableRetryableRefreshListener.java @@ -10,10 +10,11 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.search.ReferenceManager; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.lease.Releasables; import org.opensearch.common.unit.TimeValue; import org.opensearch.threadpool.ThreadPool; -import java.io.Closeable; import java.io.IOException; import java.util.Objects; import java.util.concurrent.Semaphore; @@ -26,7 +27,7 @@ * is closed, all the permits are acquired and there are no available permits to afterRefresh. This abstract class provides * necessary abstract methods to schedule retry. */ -public abstract class CloseableRetryableRefreshListener implements ReferenceManager.RefreshListener, Closeable { +public abstract class CloseableRetryableRefreshListener implements ReferenceManager.RefreshListener { /** * Total permits = 1 ensures that there is only single instance of runAfterRefreshWithPermit that is running at a time. @@ -184,18 +185,24 @@ private void scheduleRetry(boolean afterRefreshSuccessful, boolean didRefresh) { */ protected abstract boolean performAfterRefreshWithPermit(boolean didRefresh); - @Override - public final void close() throws IOException { + public final Releasable drainRefreshes() { try { if (semaphore.tryAcquire(TOTAL_PERMITS, 10, TimeUnit.MINUTES)) { boolean result = closed.compareAndSet(false, true); assert result && semaphore.availablePermits() == 0; getLogger().info("All permits are acquired and refresh listener is closed"); + return Releasables.releaseOnce(() -> { + semaphore.release(TOTAL_PERMITS); + boolean wasClosed = closed.getAndSet(false); + assert semaphore.availablePermits() == TOTAL_PERMITS : "Available permits is " + semaphore.availablePermits(); + assert wasClosed : "RefreshListener is not closed before reopening it"; + getLogger().info("All permits are released and refresh listener is open"); + }); } else { - throw new TimeoutException("timeout while closing gated refresh listener"); + throw new TimeoutException("Timeout while acquiring all permits"); } } catch (InterruptedException | TimeoutException e) { - throw new RuntimeException("Failed to close the closeable retryable listener", e); + throw new RuntimeException("Failed to acquire all permits", e); } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index f1873ac659400..252aa6592eae3 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -845,6 +845,9 @@ public void relocated( final Runnable performSegRep ) throws IllegalIndexShardStateException, IllegalStateException, InterruptedException { assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting; + // The below list of releasable ensures that if the relocation does not happen, we undo the activity of close and + // acquire all permits. This will ensure that the remote store uploads can still be done by the existing primary shard. + List releasablesOnNoHandoff = new ArrayList<>(); try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) { indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { forceRefreshes.close(); @@ -857,11 +860,15 @@ public void relocated( maybeSync(); } - // Ensures all in-flight remote store operations drain, before we perform the handoff. - internalRefreshListener.stream() - .filter(refreshListener -> refreshListener instanceof Closeable) - .map(refreshListener -> (Closeable) refreshListener) - .close(); + // Ensures all in-flight remote store refreshes drain, before we perform the performSegRep. + for (ReferenceManager.RefreshListener refreshListener : internalRefreshListener) { + if (refreshListener instanceof CloseableRetryableRefreshListener) { + releasablesOnNoHandoff.add(((CloseableRetryableRefreshListener) refreshListener).drainRefreshes()); + } + } + + // Ensure all in-flight remote store translog upload drains, before we perform the performSegRep. + releasablesOnNoHandoff.add(getEngineOrNull().translogManager().drainSyncToStore()); // no shard operation permits are being held here, move state from started to relocated assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED @@ -896,6 +903,15 @@ public void relocated( // Fail primary relocation source and target shards. failShard("timed out waiting for relocation hand-off to complete", null); throw new IndexShardClosedException(shardId(), "timed out waiting for relocation hand-off to complete"); + } finally { + // If the primary mode is still true after the end of handoff attempt, it basically means that the relocation + // failed. The existing primary will continue to be the primary, so we need to allow the segments and translog + // upload to resume. + if (replicationTracker.isPrimaryMode()) { + for (Releasable releasable : releasablesOnNoHandoff) { + releasable.close(); + } + } } } diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index 4d0fc13d433c6..eabf08d85fa7c 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; +import org.opensearch.common.lease.Releasable; import org.opensearch.common.logging.Loggers; import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.common.util.io.IOUtils; @@ -301,10 +302,16 @@ public void setMinSeqNoToKeep(long seqNo) { translog.setMinSeqNoToKeep(seqNo); } + @Override public void onDelete() { translog.onDelete(); } + @Override + public Releasable drainSyncToStore() { + return translog.drainSyncToStore(); + } + @Override public Translog.TranslogGeneration getTranslogGeneration() { return translog.getGeneration(); diff --git a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java index 3e6a8e69edfbb..33b64a20e934b 100644 --- a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java @@ -8,6 +8,7 @@ package org.opensearch.index.translog; +import org.opensearch.common.lease.Releasable; import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.core.index.shard.ShardId; @@ -121,8 +122,14 @@ public Translog.Snapshot newChangesSnapshot(long fromSeqNo, long toSeqNo, boolea throw new UnsupportedOperationException("Translog snapshot unsupported with no-op translogs"); } + @Override public void onDelete() {} + @Override + public Releasable drainSyncToStore() { + return () -> {}; + } + @Override public Translog.TranslogGeneration getTranslogGeneration() { return null; diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 8fb420e8fa1da..0a4c4e5801cad 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.Logger; import org.opensearch.common.SetOnce; import org.opensearch.common.lease.Releasable; +import org.opensearch.common.lease.Releasables; import org.opensearch.common.logging.Loggers; import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.common.util.io.IOUtils; @@ -38,6 +39,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.BooleanSupplier; import java.util.function.LongConsumer; import java.util.function.LongSupplier; @@ -53,7 +56,6 @@ public class RemoteFsTranslog extends Translog { private final Logger logger; - private final BlobStoreRepository blobStoreRepository; private final TranslogTransferManager translogTransferManager; private final FileTransferTracker fileTransferTracker; private final BooleanSupplier primaryModeSupplier; @@ -75,6 +77,10 @@ public class RemoteFsTranslog extends Translog { // Semaphore used to allow only single remote generation to happen at a time private final Semaphore remoteGenerationDeletionPermits = new Semaphore(REMOTE_DELETION_PERMITS); + // These permits exist to allow any inflight background triggered upload. + private static final int UPLOAD_PERMITS = 1; + private final Semaphore uploadPermits = new Semaphore(UPLOAD_PERMITS); + public RemoteFsTranslog( TranslogConfig config, String translogUUID, @@ -89,7 +95,6 @@ public RemoteFsTranslog( ) throws IOException { super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer); logger = Loggers.getLogger(getClass(), shardId); - this.blobStoreRepository = blobStoreRepository; this.primaryModeSupplier = primaryModeSupplier; this.remoteTranslogTransferTracker = remoteTranslogTransferTracker; fileTransferTracker = new FileTransferTracker(shardId, remoteTranslogTransferTracker); @@ -321,8 +326,13 @@ private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws // below ensures that the real primary only is uploading. Before the primary mode is set as true for the new // primary, the engine is reset to InternalEngine which also initialises the RemoteFsTranslog which in turns // downloads all the translogs from remote store and does a flush before the relocation finishes. - if (primaryModeSupplier.getAsBoolean() == false) { - logger.debug("skipped uploading translog for {} {}", primaryTerm, generation); + if (primaryModeSupplier.getAsBoolean() == false || uploadPermits.tryAcquire(1) == false) { + logger.debug( + "skipped uploading translog for {} {} uploadPermits={}", + primaryTerm, + generation, + uploadPermits.availablePermits() + ); // NO-OP return true; } @@ -341,6 +351,8 @@ private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws transferSnapshotProvider, new RemoteFsTranslogTransferListener(generation, primaryTerm, maxSeqNo) ); + } finally { + uploadPermits.release(1); } } @@ -423,6 +435,24 @@ protected void setMinSeqNoToKeep(long seqNo) { this.minSeqNoToKeep = seqNo; } + @Override + protected Releasable drainSyncToStore() { + try { + if (uploadPermits.tryAcquire(UPLOAD_PERMITS, 1, TimeUnit.MINUTES)) { + logger.info("All permits acquired"); + return Releasables.releaseOnce(() -> { + uploadPermits.release(UPLOAD_PERMITS); + assert uploadPermits.availablePermits() == UPLOAD_PERMITS : "Available permits is " + uploadPermits.availablePermits(); + logger.info("All permits released"); + }); + } else { + throw new TimeoutException("Timeout while acquiring all permits"); + } + } catch (TimeoutException | InterruptedException e) { + throw new RuntimeException("Failed to acquire all permits", e); + } + } + @Override public void trimUnreferencedReaders() throws IOException { // clean up local translog files and updates readers diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java index 8b4662238ed25..6e08d7c4d7bdb 100644 --- a/server/src/main/java/org/opensearch/index/translog/Translog.java +++ b/server/src/main/java/org/opensearch/index/translog/Translog.java @@ -1817,6 +1817,13 @@ protected void setMinSeqNoToKeep(long seqNo) {} protected void onDelete() {} + /** + * Drains ongoing syncs to the underlying store. It returns a releasable which can be closed to resume the syncs back. + */ + protected Releasable drainSyncToStore() { + return () -> {}; + } + /** * deletes all files associated with a reader. package-private to be able to simulate node failures at this point */ diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java index 4279e0289c1dc..d66f12dc3da81 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java @@ -9,6 +9,7 @@ package org.opensearch.index.translog; import org.opensearch.common.annotation.PublicApi; +import org.opensearch.common.lease.Releasable; import java.io.IOException; import java.util.stream.Stream; @@ -135,5 +136,10 @@ public interface TranslogManager { */ void onDelete(); + /** + * Drains ongoing syncs to the underlying store. It returns a releasable which can be closed to resume the syncs back. + */ + Releasable drainSyncToStore(); + Translog.TranslogGeneration getTranslogGeneration(); } diff --git a/server/src/test/java/org/opensearch/index/shard/CloseableRetryableRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/CloseableRetryableRefreshListenerTests.java index 01242063caa77..cc192e4eefa0a 100644 --- a/server/src/test/java/org/opensearch/index/shard/CloseableRetryableRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/CloseableRetryableRefreshListenerTests.java @@ -66,7 +66,7 @@ protected Logger getLogger() { // Second invocation of afterRefresh method testRefreshListener.afterRefresh(true); assertEquals(0, countDownLatch.getCount()); - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); } /** @@ -98,7 +98,7 @@ protected Logger getLogger() { assertEquals(initialCount - refreshCount, countDownLatch.getCount()); // Closing the refresh listener so that no further afterRefreshes are executed going forward - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); for (int i = 0; i < initialCount - refreshCount; i++) { testRefreshListener.afterRefresh(true); @@ -129,7 +129,7 @@ protected Logger getLogger() { }; testRefreshListener.afterRefresh(true); assertEquals(initialCount - 1, countDownLatch.getCount()); - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); testRefreshListener = new CloseableRetryableRefreshListener(threadPool) { @Override @@ -148,7 +148,7 @@ protected Logger getLogger() { }; testRefreshListener.afterRefresh(true); assertEquals(initialCount - 2, countDownLatch.getCount()); - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); testRefreshListener = new CloseableRetryableRefreshListener(threadPool) { @Override @@ -172,7 +172,7 @@ protected Logger getLogger() { }; testRefreshListener.afterRefresh(true); assertEquals(initialCount - 3, countDownLatch.getCount()); - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); testRefreshListener = new CloseableRetryableRefreshListener(threadPool) { @Override @@ -196,7 +196,7 @@ protected Logger getLogger() { }; testRefreshListener.afterRefresh(true); assertEquals(initialCount - 4, countDownLatch.getCount()); - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); } /** @@ -237,7 +237,7 @@ protected boolean isRetryEnabled() { }; testRefreshListener.afterRefresh(true); assertBusy(() -> assertEquals(0, countDownLatch.getCount())); - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); } /** @@ -272,7 +272,7 @@ protected Logger getLogger() { } }; testRefreshListener.afterRefresh(randomBoolean()); - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); assertNotEquals(0, countDownLatch.getCount()); } @@ -307,7 +307,7 @@ protected Logger getLogger() { }); thread.start(); assertBusy(() -> assertEquals(0, countDownLatch.getCount())); - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); } public void testScheduleRetryAfterClose() throws Exception { @@ -358,8 +358,8 @@ protected TimeValue getNextRetryInterval() { Thread thread2 = new Thread(() -> { try { Thread.sleep(500); - testRefreshListener.close(); - } catch (IOException | InterruptedException e) { + testRefreshListener.drainRefreshes(); + } catch (InterruptedException e) { throw new AssertionError(e); } }); @@ -408,7 +408,7 @@ protected boolean isRetryEnabled() { testRefreshListener.afterRefresh(true); testRefreshListener.afterRefresh(true); assertBusy(() -> assertEquals(3, runCount.get())); - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); } public void testExceptionDuringThreadPoolSchedule() throws Exception { @@ -450,7 +450,7 @@ protected boolean isRetryEnabled() { assertThrows(RuntimeException.class, () -> testRefreshListener.afterRefresh(true)); assertBusy(() -> assertFalse(testRefreshListener.getRetryScheduledStatus())); assertEquals(1, runCount.get()); - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); } @After