diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index f3e3a9dc144d9..1e5d67871d434 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -59,7 +59,6 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReference; @@ -4039,7 +4038,7 @@ public static ManagedLedgerException createManagedLedgerException(Throwable t) { */ protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig config, DigestType digestType, CreateCallback cb, Map metadata) { - AtomicBoolean ledgerCreated = new AtomicBoolean(false); + CompletableFuture ledgerFutureHook = new CompletableFuture<>(); Map finalMetadata = new HashMap<>(); finalMetadata.putAll(ledgerMetadata); finalMetadata.putAll(metadata); @@ -4052,33 +4051,39 @@ protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig conf )); } catch (EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException e) { log.error("[{}] Serialize the placement configuration failed", name, e); - cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated); + cb.createComplete(Code.UnexpectedConditionException, null, ledgerFutureHook); return; } } createdLedgerCustomMetadata = finalMetadata; - try { bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), - config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerCreated, finalMetadata); + config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerFutureHook, finalMetadata); } catch (Throwable cause) { log.error("[{}] Encountered unexpected error when creating ledger", name, cause); - cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated); + ledgerFutureHook.completeExceptionally(cause); + cb.createComplete(Code.UnexpectedConditionException, null, ledgerFutureHook); return; } - scheduledExecutor.schedule(() -> { - if (!ledgerCreated.get()) { + + ScheduledFuture timeoutChecker = scheduledExecutor.schedule(() -> { + if (!ledgerFutureHook.isDone() + && ledgerFutureHook.completeExceptionally(new TimeoutException(name + " Create ledger timeout"))) { if (log.isDebugEnabled()) { log.debug("[{}] Timeout creating ledger", name); } - cb.createComplete(BKException.Code.TimeoutException, null, ledgerCreated); + cb.createComplete(BKException.Code.TimeoutException, null, ledgerFutureHook); } else { if (log.isDebugEnabled()) { log.debug("[{}] Ledger already created when timeout task is triggered", name); } } }, config.getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS); + + ledgerFutureHook.whenComplete((ignore, ex) -> { + timeoutChecker.cancel(false); + }); } public Clock getClock() { @@ -4087,16 +4092,12 @@ public Clock getClock() { /** * check if ledger-op task is already completed by timeout-task. If completed then delete the created ledger - * - * @param rc - * @param lh - * @param ctx * @return */ protected boolean checkAndCompleteLedgerOpTask(int rc, LedgerHandle lh, Object ctx) { - if (ctx instanceof AtomicBoolean) { + if (ctx instanceof CompletableFuture) { // ledger-creation is already timed out and callback is already completed so, delete this ledger and return. - if (((AtomicBoolean) (ctx)).compareAndSet(false, true)) { + if (((CompletableFuture) ctx).complete(lh)) { return false; } else { if (rc == BKException.Code.OK) { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 6e04aafec0f30..df7da9bdc1370 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -61,11 +61,13 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -90,6 +92,8 @@ import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.common.util.BoundedScheduledExecutorService; +import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; @@ -136,6 +140,7 @@ import org.apache.pulsar.metadata.api.extended.SessionEvent; import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore; import org.awaitility.Awaitility; +import org.awaitility.reflect.WhiteboxImpl; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.DataProvider; @@ -3087,9 +3092,9 @@ public void testManagedLedgerWithCreateLedgerTimeOut() throws Exception { latch.await(config.getMetadataOperationsTimeoutSeconds() + 2, TimeUnit.SECONDS); assertEquals(response.get(), BKException.Code.TimeoutException); - assertTrue(ctxHolder.get() instanceof AtomicBoolean); - AtomicBoolean ledgerCreated = (AtomicBoolean) ctxHolder.get(); - assertFalse(ledgerCreated.get()); + assertTrue(ctxHolder.get() instanceof CompletableFuture); + CompletableFuture ledgerCreateHook = (CompletableFuture) ctxHolder.get(); + assertTrue(ledgerCreateHook.isCompletedExceptionally()); ledger.close(); } @@ -4100,4 +4105,52 @@ public void testNonDurableCursorCreateForInactiveLedger() throws Exception { Position Position = new PositionImpl(-1L, -1L); assertNotNull(ml.newNonDurableCursor(Position)); } + + /*** + * When a ML tries to create a ledger, it will create a delay task to check if the ledger create request is timeout. + * But we should guarantee that the delay task should be canceled after the ledger create request responded. + */ + @Test + public void testNoOrphanScheduledTasksAfterCloseML() throws Exception { + String mlName = UUID.randomUUID().toString(); + ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(metadataStore, bkc); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMetadataOperationsTimeoutSeconds(3600); + + // Calculate pending task count. + long pendingTaskCountBefore = calculatePendingTaskCount(factory.getScheduledExecutor()); + // Trigger create & close ML 1000 times. + for (int i = 0; i < 1000; i++) { + ManagedLedger ml = factory.open(mlName, config); + ml.close(); + } + // Verify there is no orphan scheduled task. + long pendingTaskCountAfter = calculatePendingTaskCount(factory.getScheduledExecutor()); + // Maybe there are other components also appended scheduled tasks, so leave 100 tasks to avoid flaky. + assertTrue(pendingTaskCountAfter - pendingTaskCountBefore < 100); + } + + /** + * Calculate how many pending tasks in {@link OrderedScheduler} + */ + private long calculatePendingTaskCount(OrderedScheduler orderedScheduler) { + ExecutorService[] threads = WhiteboxImpl.getInternalState(orderedScheduler, "threads"); + long taskCounter = 0; + for (ExecutorService thread : threads) { + BoundedScheduledExecutorService boundedScheduledExecutorService = + WhiteboxImpl.getInternalState(thread, "delegate"); + BlockingQueue queue = WhiteboxImpl.getInternalState(boundedScheduledExecutorService, "queue"); + for (Runnable r : queue) { + if (r instanceof FutureTask) { + FutureTask futureTask = (FutureTask) r; + if (!futureTask.isCancelled() && !futureTask.isDone()) { + taskCounter++; + } + } else { + taskCounter++; + } + } + } + return taskCounter; + } }