Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[fix][offload] Fix Offload readHandle cannot close multi times. (apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
dao-jun authored Feb 29, 2024
1 parent 6ec473e commit e25c7f0
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntries;
Expand All @@ -36,6 +37,7 @@
import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
Expand All @@ -53,6 +55,12 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle {
private final LedgerOffloaderStats offloaderStats;
private final String managedLedgerName;
private final String topicName;
enum State {
Opened,
Closed
}
private volatile State state;
private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();

private FileStoreBackedReadHandleImpl(ExecutorService executor, MapFile.Reader reader, long ledgerId,
LedgerOffloaderStats offloaderStats,
Expand All @@ -72,6 +80,7 @@ private FileStoreBackedReadHandleImpl(ExecutorService executor, MapFile.Reader r
offloaderStats.recordReadOffloadIndexLatency(topicName,
System.nanoTime() - startReadIndexTime, TimeUnit.NANOSECONDS);
this.ledgerMetadata = parseLedgerMetadata(ledgerId, value.copyBytes());
state = State.Opened;
} catch (IOException e) {
log.error("Fail to read LedgerMetadata for ledgerId {}",
ledgerId);
Expand All @@ -92,15 +101,20 @@ public LedgerMetadata getLedgerMetadata() {

@Override
public CompletableFuture<Void> closeAsync() {
CompletableFuture<Void> promise = new CompletableFuture<>();
if (closeFuture.get() != null || !closeFuture.compareAndSet(null, new CompletableFuture<>())) {
return closeFuture.get();
}

CompletableFuture<Void> promise = closeFuture.get();
executor.execute(() -> {
try {
reader.close();
promise.complete(null);
} catch (IOException t) {
promise.completeExceptionally(t);
}
});
try {
reader.close();
state = State.Closed;
promise.complete(null);
} catch (IOException t) {
promise.completeExceptionally(t);
}
});
return promise;
}

Expand All @@ -111,6 +125,12 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
}
CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
executor.execute(() -> {
if (state == State.Closed) {
log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}",
ledgerId, firstEntry, lastEntry);
promise.completeExceptionally(new ManagedLedgerException.OffloadReadHandleClosedException());
return;
}
if (firstEntry > lastEntry
|| firstEntry < 0
|| lastEntry > getLastAddConfirmed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger.offload.jcloud.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import io.netty.buffer.ByteBuf;
Expand All @@ -30,6 +31,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntries;
Expand Down Expand Up @@ -66,13 +68,14 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
.newBuilder()
.expireAfterAccess(CACHE_TTL_SECONDS, TimeUnit.SECONDS)
.build();
private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();

enum State {
Opened,
Closed
}

private State state = null;
private volatile State state = null;

private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index,
BackedInputStream inputStream, ExecutorService executor) {
Expand All @@ -96,18 +99,22 @@ public LedgerMetadata getLedgerMetadata() {

@Override
public CompletableFuture<Void> closeAsync() {
CompletableFuture<Void> promise = new CompletableFuture<>();
if (closeFuture.get() != null || !closeFuture.compareAndSet(null, new CompletableFuture<>())) {
return closeFuture.get();
}

CompletableFuture<Void> promise = closeFuture.get();
executor.execute(() -> {
try {
index.close();
inputStream.close();
entryOffsets.invalidateAll();
state = State.Closed;
promise.complete(null);
} catch (IOException t) {
promise.completeExceptionally(t);
}
});
try {
index.close();
inputStream.close();
entryOffsets.invalidateAll();
state = State.Closed;
promise.complete(null);
} catch (IOException t) {
promise.completeExceptionally(t);
}
});
return promise;
}

Expand Down Expand Up @@ -298,6 +305,7 @@ public static ReadHandle open(ScheduledExecutorService executor,
}

// for testing
@VisibleForTesting
State getState() {
return this.state;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import lombok.val;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
Expand Down Expand Up @@ -60,7 +61,8 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
private final List<BackedInputStream> inputStreams;
private final List<DataInputStream> dataStreams;
private final ExecutorService executor;
private State state = null;
private volatile State state = null;
private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();

enum State {
Opened,
Expand Down Expand Up @@ -123,7 +125,11 @@ public LedgerMetadata getLedgerMetadata() {

@Override
public CompletableFuture<Void> closeAsync() {
CompletableFuture<Void> promise = new CompletableFuture<>();
if (closeFuture.get() != null || !closeFuture.compareAndSet(null, new CompletableFuture<>())) {
return closeFuture.get();
}

CompletableFuture<Void> promise = closeFuture.get();
executor.execute(() -> {
try {
for (OffloadIndexBlockV2 indexBlock : indices) {
Expand All @@ -143,7 +149,9 @@ public CompletableFuture<Void> closeAsync() {

@Override
public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
if (log.isDebugEnabled()) {
log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
}
CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
executor.execute(() -> {
if (state == State.Closed) {
Expand Down

0 comments on commit e25c7f0

Please sign in to comment.