diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java index 49b2071f5db2c..91e7e902eab8a 100644 --- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java +++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java @@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; import org.apache.bookkeeper.client.api.LedgerEntries; @@ -36,6 +37,7 @@ import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; import org.apache.bookkeeper.client.impl.LedgerEntryImpl; import org.apache.bookkeeper.mledger.LedgerOffloaderStats; +import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.MapFile; @@ -53,6 +55,12 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle { private final LedgerOffloaderStats offloaderStats; private final String managedLedgerName; private final String topicName; + enum State { + Opened, + Closed + } + private volatile State state; + private final AtomicReference> closeFuture = new AtomicReference<>(); private FileStoreBackedReadHandleImpl(ExecutorService executor, MapFile.Reader reader, long ledgerId, LedgerOffloaderStats offloaderStats, @@ -72,6 +80,7 @@ private FileStoreBackedReadHandleImpl(ExecutorService executor, MapFile.Reader r offloaderStats.recordReadOffloadIndexLatency(topicName, System.nanoTime() - startReadIndexTime, TimeUnit.NANOSECONDS); this.ledgerMetadata = parseLedgerMetadata(ledgerId, value.copyBytes()); + state = State.Opened; } catch (IOException e) { log.error("Fail to read LedgerMetadata for ledgerId {}", ledgerId); @@ -92,15 +101,20 @@ public LedgerMetadata getLedgerMetadata() { @Override public CompletableFuture closeAsync() { - CompletableFuture promise = new CompletableFuture<>(); + if (closeFuture.get() != null || !closeFuture.compareAndSet(null, new CompletableFuture<>())) { + return closeFuture.get(); + } + + CompletableFuture promise = closeFuture.get(); executor.execute(() -> { - try { - reader.close(); - promise.complete(null); - } catch (IOException t) { - promise.completeExceptionally(t); - } - }); + try { + reader.close(); + state = State.Closed; + promise.complete(null); + } catch (IOException t) { + promise.completeExceptionally(t); + } + }); return promise; } @@ -111,6 +125,12 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr } CompletableFuture promise = new CompletableFuture<>(); executor.execute(() -> { + if (state == State.Closed) { + log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}", + ledgerId, firstEntry, lastEntry); + promise.completeExceptionally(new ManagedLedgerException.OffloadReadHandleClosedException()); + return; + } if (firstEntry > lastEntry || firstEntry < 0 || lastEntry > getLastAddConfirmed()) { diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java index 5a571bb208e34..5346be6a044c8 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java @@ -18,6 +18,7 @@ */ package org.apache.bookkeeper.mledger.offload.jcloud.impl; +import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import io.netty.buffer.ByteBuf; @@ -30,6 +31,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; import org.apache.bookkeeper.client.api.LedgerEntries; @@ -66,13 +68,14 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle { .newBuilder() .expireAfterAccess(CACHE_TTL_SECONDS, TimeUnit.SECONDS) .build(); + private final AtomicReference> closeFuture = new AtomicReference<>(); enum State { Opened, Closed } - private State state = null; + private volatile State state = null; private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index, BackedInputStream inputStream, ExecutorService executor) { @@ -96,18 +99,22 @@ public LedgerMetadata getLedgerMetadata() { @Override public CompletableFuture closeAsync() { - CompletableFuture promise = new CompletableFuture<>(); + if (closeFuture.get() != null || !closeFuture.compareAndSet(null, new CompletableFuture<>())) { + return closeFuture.get(); + } + + CompletableFuture promise = closeFuture.get(); executor.execute(() -> { - try { - index.close(); - inputStream.close(); - entryOffsets.invalidateAll(); - state = State.Closed; - promise.complete(null); - } catch (IOException t) { - promise.completeExceptionally(t); - } - }); + try { + index.close(); + inputStream.close(); + entryOffsets.invalidateAll(); + state = State.Closed; + promise.complete(null); + } catch (IOException t) { + promise.completeExceptionally(t); + } + }); return promise; } @@ -298,6 +305,7 @@ public static ReadHandle open(ScheduledExecutorService executor, } // for testing + @VisibleForTesting State getState() { return this.state; } diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java index e40a0a3834c85..53d96e08abf5e 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java @@ -30,6 +30,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import lombok.val; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; @@ -60,7 +61,8 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle { private final List inputStreams; private final List dataStreams; private final ExecutorService executor; - private State state = null; + private volatile State state = null; + private final AtomicReference> closeFuture = new AtomicReference<>(); enum State { Opened, @@ -123,7 +125,11 @@ public LedgerMetadata getLedgerMetadata() { @Override public CompletableFuture closeAsync() { - CompletableFuture promise = new CompletableFuture<>(); + if (closeFuture.get() != null || !closeFuture.compareAndSet(null, new CompletableFuture<>())) { + return closeFuture.get(); + } + + CompletableFuture promise = closeFuture.get(); executor.execute(() -> { try { for (OffloadIndexBlockV2 indexBlock : indices) { @@ -143,7 +149,9 @@ public CompletableFuture closeAsync() { @Override public CompletableFuture readAsync(long firstEntry, long lastEntry) { - log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry); + if (log.isDebugEnabled()) { + log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry); + } CompletableFuture promise = new CompletableFuture<>(); executor.execute(() -> { if (state == State.Closed) {