diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/retina/RetinaService.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/retina/RetinaService.java index 7328913417..4f04f0c0b8 100644 --- a/pixels-common/src/main/java/io/pixelsdb/pixels/common/retina/RetinaService.java +++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/retina/RetinaService.java @@ -407,4 +407,58 @@ public boolean addWriteBuffer(String schemaName, String tableName) throws Retina } return true; } + + /** + * Register a long-running query to be offloaded to disk checkpoint. + * + * @param timestamp the transaction timestamp + * @return true on success + * @throws RetinaException if the operation fails + */ + public boolean registerOffload(long timestamp) throws RetinaException + { + String token = UUID.randomUUID().toString(); + RetinaProto.RegisterOffloadRequest request = RetinaProto.RegisterOffloadRequest.newBuilder() + .setHeader(RetinaProto.RequestHeader.newBuilder().setToken(token).build()) + .setTimestamp(timestamp) + .build(); + RetinaProto.RegisterOffloadResponse response = this.stub.registerOffload(request); + if (response.getHeader().getErrorCode() != 0) + { + throw new RetinaException("Failed to register offload: " + response.getHeader().getErrorCode() + + " " + response.getHeader().getErrorMsg()); + } + if (!response.getHeader().getToken().equals(token)) + { + throw new RetinaException("Response token does not match"); + } + return true; + } + + /** + * Unregister a long-running query's offload checkpoint when the query completes. 
+ * + * @param timestamp the transaction timestamp + * @return true on success + * @throws RetinaException if the operation fails + */ + public boolean unregisterOffload(long timestamp) throws RetinaException + { + String token = UUID.randomUUID().toString(); + RetinaProto.UnregisterOffloadRequest request = RetinaProto.UnregisterOffloadRequest.newBuilder() + .setHeader(RetinaProto.RequestHeader.newBuilder().setToken(token).build()) + .setTimestamp(timestamp) + .build(); + RetinaProto.UnregisterOffloadResponse response = this.stub.unregisterOffload(request); + if (response.getHeader().getErrorCode() != 0) + { + throw new RetinaException("Failed to unregister offload: " + response.getHeader().getErrorCode() + + " " + response.getHeader().getErrorMsg()); + } + if (!response.getHeader().getToken().equals(token)) + { + throw new RetinaException("Response token does not match"); + } + return true; + } } diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/transaction/TransService.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/transaction/TransService.java index eeaa01f1f5..29df3d7f7b 100644 --- a/pixels-common/src/main/java/io/pixelsdb/pixels/common/transaction/TransService.java +++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/transaction/TransService.java @@ -432,4 +432,24 @@ public long getSafeGcTimestamp() throws TransException } return response.getTimestamp(); } + + /** + * Mark a transaction as offloaded. This allows the transaction to be skipped when + * calculating the minimum running transaction timestamp for garbage collection. 
+ * + * @param transId the id of the transaction to mark as offloaded + * @return true on success + * @throws TransException if the operation fails + */ + public boolean markTransOffloaded(long transId) throws TransException + { + TransProto.MarkTransOffloadedRequest request = TransProto.MarkTransOffloadedRequest.newBuilder() + .setTransId(transId).build(); + TransProto.MarkTransOffloadedResponse response = this.stub.markTransOffloaded(request); + if (response.getErrorCode() != ErrorCode.SUCCESS) + { + throw new TransException("failed to mark transaction as offloaded, error code=" + response.getErrorCode()); + } + return true; + } } diff --git a/pixels-common/src/main/resources/pixels.properties b/pixels-common/src/main/resources/pixels.properties index 88b2d715de..96378f7eb5 100644 --- a/pixels-common/src/main/resources/pixels.properties +++ b/pixels-common/src/main/resources/pixels.properties @@ -288,7 +288,7 @@ retina.gc.interval=300 # offloading threshold for long query in seconds pixels.transaction.offload.threshold=1800 # snapshot storage directory -pixels.retina.checkpoint.dir=/tmp/pixels-checkpoints +pixels.retina.checkpoint.dir=file:///tmp/pixels-checkpoints ### pixels-sink ### sink.server.enabled=false diff --git a/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/retina/RetinaServerImpl.java b/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/retina/RetinaServerImpl.java index 5719da8d08..8ec3d071c5 100644 --- a/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/retina/RetinaServerImpl.java +++ b/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/retina/RetinaServerImpl.java @@ -640,6 +640,56 @@ public void getWriteBuffer(RetinaProto.GetWriteBufferRequest request, } } + @Override + public void registerOffload(RetinaProto.RegisterOffloadRequest request, + StreamObserver responseObserver) + { + RetinaProto.ResponseHeader.Builder headerBuilder = RetinaProto.ResponseHeader.newBuilder() + .setToken(request.getHeader().getToken()); + + try + { + 
this.retinaResourceManager.registerOffload(request.getTimestamp()); + responseObserver.onNext(RetinaProto.RegisterOffloadResponse.newBuilder() + .setHeader(headerBuilder.build()).build()); + } catch (RetinaException e) + { + logger.error("registerOffload failed for timestamp={}", + request.getTimestamp(), e); + headerBuilder.setErrorCode(1).setErrorMsg(e.getMessage()); + responseObserver.onNext(RetinaProto.RegisterOffloadResponse.newBuilder() + .setHeader(headerBuilder.build()).build()); + } finally + { + responseObserver.onCompleted(); + } + } + + @Override + public void unregisterOffload(RetinaProto.UnregisterOffloadRequest request, + StreamObserver responseObserver) + { + RetinaProto.ResponseHeader.Builder headerBuilder = RetinaProto.ResponseHeader.newBuilder() + .setToken(request.getHeader().getToken()); + + try + { + this.retinaResourceManager.unregisterOffload(request.getTimestamp()); + responseObserver.onNext(RetinaProto.UnregisterOffloadResponse.newBuilder() + .setHeader(headerBuilder.build()).build()); + } catch (Exception e) + { + logger.error("unregisterOffload failed for timestamp={}", + request.getTimestamp(), e); + headerBuilder.setErrorCode(1).setErrorMsg(e.getMessage()); + responseObserver.onNext(RetinaProto.UnregisterOffloadResponse.newBuilder() + .setHeader(headerBuilder.build()).build()); + } finally + { + responseObserver.onCompleted(); + } + } + /** * Check if the order or compact paths from pixels metadata is valid. 
* diff --git a/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/transaction/TransContextManager.java b/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/transaction/TransContextManager.java index a22b312cd6..18e5c173e1 100644 --- a/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/transaction/TransContextManager.java +++ b/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/transaction/TransContextManager.java @@ -22,7 +22,6 @@ import io.pixelsdb.pixels.common.transaction.TransContext; import io.pixelsdb.pixels.common.utils.ConfigFactory; import io.pixelsdb.pixels.daemon.TransProto; -import io.pixelsdb.pixels.retina.RetinaResourceManager; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -75,8 +74,6 @@ protected static TransContextManager Instance() private final ReadWriteLock contextLock = new ReentrantReadWriteLock(); - private static final long OFFLOAD_THRESHOLD = Long.parseLong(ConfigFactory.Instance().getProperty("pixels.transaction.offload.threshold")); - private TransContextManager() { } /** @@ -218,20 +215,7 @@ private boolean terminateTrans(long transId, TransProto.TransStatus status) * Adding the same lock in {@link #offloadLongRunningQueries()} * constitutes a mutually exclusive critical section. */ - synchronized (context) - { - context.setStatus(status); - if (context.isOffloaded()) - { - try - { - RetinaResourceManager.Instance().unregisterOffload(context.getTransId(), context.getTimestamp()); - } catch (Exception e) - { - log.error("Unregister failed", e); - } - } - } + context.setStatus(status); if (context.isReadOnly()) { @@ -254,52 +238,6 @@ private boolean terminateTrans(long transId, TransProto.TransStatus status) return false; } - /** - * Offload long-running queries to disk. 
- */ - public void offloadLongRunningQueries() - { - long now = System.currentTimeMillis(); - boolean pushed = false; - - for (TransContext ctx : runningReadOnlyTrans) - { - if (ctx.isOffloaded()) - { - continue; - } - - if ((now - ctx.getStartTime()) > OFFLOAD_THRESHOLD) - { - try - { - // 1. Register and generate snapshot - RetinaResourceManager.Instance().registerOffload(ctx.getTransId(), ctx.getTimestamp()); - - // 2. Double-checked locking - synchronized (ctx) - { - if (ctx.getStatus() == TransProto.TransStatus.PENDING) - { - ctx.setOffloaded(true); - pushed = true; - } else - { - // Transaction has ended, rollback registration - RetinaResourceManager.Instance().unregisterOffload(ctx.getTransId(), ctx.getTimestamp()); - } - } - } catch (Exception e) - { - log.error("Failed to offload transaction {}", ctx.getTransId(), e); - } - } - } - if (pushed) - { - TransServiceImpl.pushWatermarks(true); - } - } /** * Dump the context of transactions in this manager to a history file and remove terminated transactions. This method @@ -451,4 +389,22 @@ public int getQueryConcurrency(boolean readOnly) this.contextLock.writeLock().unlock(); } } + + /** + * Mark a transaction as offloaded. This allows the transaction context manager to + * skip it when calculating the minimum running transaction timestamp. 
+ * + * @param transId the transaction id + * @return true if the transaction exists and was marked as offloaded, false otherwise + */ + public boolean markTransOffloaded(long transId) + { + TransContext context = this.transIdToContext.get(transId); + if (context != null) + { + context.setOffloaded(true); + return true; + } + return false; + } } diff --git a/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/transaction/TransServiceImpl.java b/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/transaction/TransServiceImpl.java index 71b317a999..5ee3035f60 100644 --- a/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/transaction/TransServiceImpl.java +++ b/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/transaction/TransServiceImpl.java @@ -109,19 +109,6 @@ public class TransServiceImpl extends TransServiceGrpc.TransServiceImplBase public TransServiceImpl() { - /* - * Initiate a background monitoring thread to periodically (every 5 minutes) - * trigger the detection and offloading process for long-running queries, - * thereby ensuring the persistent release of blockages on the Low Watermarks - * and guaranteeing the proper functioning of the garbage collection mechanism. 
- */ - ScheduledExecutorService offloadScheduler = Executors.newSingleThreadScheduledExecutor(r -> { - Thread t = new Thread(r, "trans-offload-monitor"); - t.setDaemon(true); - return t; - }); - offloadScheduler.scheduleAtFixedRate(() -> - TransContextManager.Instance().offloadLongRunningQueries(), 5, 5, TimeUnit.MINUTES); } @Override @@ -314,7 +301,7 @@ public void rollbackTrans(TransProto.RollbackTransRequest request, responseObserver.onCompleted(); } - static void pushWatermarks(boolean readOnly) + private void pushWatermarks(boolean readOnly) { long timestamp = TransContextManager.Instance().getMinRunningTransTimestamp(readOnly); if (readOnly) @@ -497,4 +484,25 @@ public void getSafeGcTimestamp(com.google.protobuf.Empty request, responseObserver.onNext(response); responseObserver.onCompleted(); } + + @Override + public void markTransOffloaded(TransProto.MarkTransOffloadedRequest request, + StreamObserver responseObserver) + { + int error = ErrorCode.SUCCESS; + boolean success = TransContextManager.Instance().markTransOffloaded(request.getTransId()); + if (!success) + { + logger.error("transaction id {} does not exist or failed to mark as offloaded", request.getTransId()); + error = ErrorCode.TRANS_ID_NOT_EXIST; + } + + // After marking, attempt to push low watermark. 
+ pushWatermarks(true); + + TransProto.MarkTransOffloadedResponse response = TransProto.MarkTransOffloadedResponse.newBuilder() + .setErrorCode(error).build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } } diff --git a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java index b7b8b9ddf0..fe8003ea94 100644 --- a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java +++ b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java @@ -42,10 +42,7 @@ import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.nio.file.Files; -import java.nio.file.Path; import java.nio.file.Paths; -import java.nio.file.StandardCopyOption; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; @@ -53,7 +50,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -import java.util.stream.Stream; /** * Use the singleton pattern to manage data resources in the retina service. 
@@ -70,11 +66,10 @@ public class RetinaResourceManager private final ScheduledExecutorService gcExecutor; // Checkpoint related fields - private final Map offloadedCheckpoints; - private final Path checkpointDir; + private final Map offloadedCheckpoints; + private final String checkpointDir; private long latestGcTimestamp = -1; - private final Set offloadedTransIds; // identity routing whitelist private final Map checkpointRefCounts; private static class RecoveredState @@ -102,9 +97,8 @@ private RetinaResourceManager() this.rgVisibilityMap = new ConcurrentHashMap<>(); this.pixelsWriteBufferMap = new ConcurrentHashMap<>(); this.offloadedCheckpoints = new ConcurrentHashMap<>(); - this.offloadedTransIds = ConcurrentHashMap.newKeySet(); this.checkpointRefCounts = new ConcurrentHashMap<>(); - this.checkpointDir = Paths.get(ConfigFactory.Instance().getProperty("pixels.retina.checkpoint.dir")); + this.checkpointDir = ConfigFactory.Instance().getProperty("pixels.retina.checkpoint.dir"); this.recoveryCache = new ConcurrentHashMap<>(); ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> { @@ -159,27 +153,28 @@ public void recoverCheckpoints() { try { - if (!Files.exists(checkpointDir)) + Storage storage = StorageFactory.Instance().getStorage(checkpointDir); + if (!storage.exists(checkpointDir)) { - Files.createDirectories(checkpointDir); + storage.mkdirs(checkpointDir); return; } - List allFiles; - try (Stream stream = Files.list(checkpointDir)) - { - allFiles = stream.filter(p -> p.toString().endsWith(".bin")).collect(Collectors.toList()); - } + + List allFiles = storage.listPaths(checkpointDir); + // filter only .bin files + allFiles = allFiles.stream().filter(p -> p.endsWith(".bin")).collect(Collectors.toList()); List gcTimestamps = new ArrayList<>(); - for (Path path : allFiles) + for (String path : allFiles) { - String filename = path.getFileName().toString(); + // use Paths.get().getFileName() to extract filename from path string + 
String filename = Paths.get(path).getFileName().toString(); if (filename.startsWith("vis_offload_")) { // delete offload checkpoint files when restarting try { - Files.deleteIfExists(path); + storage.delete(path, false); } catch (IOException e) { logger.error("Failed to delete checkpoint file {}", path, e); @@ -191,7 +186,7 @@ public void recoverCheckpoints() gcTimestamps.add(Long.parseLong(filename.replace("vis_gc_", "").replace(".bin", ""))); } catch (Exception e) { - logger.error("Failed to delete checkpoint file {}", path, e); + logger.error("Failed to parse checkpoint timestamp from file {}", path, e); } } } @@ -223,26 +218,32 @@ public void recoverCheckpoints() private void loadCheckpointToCache(long timestamp) { String fileName = "vis_gc_" + timestamp + ".bin"; - Path path = checkpointDir.resolve(fileName); - if (!Files.exists(path)) - { - return; - } + // construct path. Storage expects '/' separator usually, but let's be safe + String path = checkpointDir.endsWith("/") ? checkpointDir + fileName : checkpointDir + "/" + fileName; - try (DataInputStream in = new DataInputStream(new BufferedInputStream(Files.newInputStream(path)))) + try { - int rgCount = in.readInt(); - for (int i = 0; i < rgCount; i++) + Storage storage = StorageFactory.Instance().getStorage(path); + if (!storage.exists(path)) { - long fileId = in.readLong(); - int rgId = in.readInt(); - int len = in.readInt(); - long[] bitmap = new long[len]; - for (int j = 0; j < len; j++) + return; + } + + try (DataInputStream in = storage.open(path)) + { + int rgCount = in.readInt(); + for (int i = 0; i < rgCount; i++) { - bitmap[j] = in.readLong(); + long fileId = in.readLong(); + int rgId = in.readInt(); + int len = in.readInt(); + long[] bitmap = new long[len]; + for (int j = 0; j < len; j++) + { + bitmap[j] = in.readLong(); + } + recoveryCache.put(fileId + "_" + rgId, new RecoveredState(timestamp, bitmap)); } - recoveryCache.put(fileId + "_" + rgId, new RecoveredState(timestamp, bitmap)); } } catch 
(IOException e) { @@ -307,13 +308,13 @@ public void addVisibility(String filePath) throws RetinaException public long[] queryVisibility(long fileId, int rgId, long timestamp, long transId) throws RetinaException { // [Routing Logic] Only read from disk if the transaction is explicitly registered as Offload - if (transId != -1 && offloadedTransIds.contains(transId)) + if (transId != -1) { if (offloadedCheckpoints.containsKey(timestamp)) { return loadBitmapFromDisk(timestamp, fileId, rgId); } - logger.warn("Offloaded checkpoint missing for TransID {}, falling back to memory.", transId); + throw new RetinaException("Offloaded checkpoint missing for TransID" + transId); } // otherwise read from memory RGVisibility rgVisibility = checkRGVisibility(fileId, rgId); @@ -337,11 +338,10 @@ public long[] queryVisibility(long fileId, int rgId, long timestamp) throws Reti * Therefore, even if checkpoints are created under the same timestamp * and only one copy is retained, this has virtually no impact on queries. 
* - * @param transId * @param timestamp * @throws RetinaException */ - public void registerOffload(long transId, long timestamp) throws RetinaException + public void registerOffload(long timestamp) throws RetinaException { while (true) { @@ -361,7 +361,7 @@ public void registerOffload(long transId, long timestamp) throws RetinaException { if (!offloadedCheckpoints.containsKey(timestamp)) { - createDiskCheckpoint(timestamp, CheckpointType.OFFLOAD); + createCheckpoint(timestamp, CheckpointType.OFFLOAD); } } catch (Exception e) { @@ -370,84 +370,68 @@ public void registerOffload(long transId, long timestamp) throws RetinaException } } } - offloadedTransIds.add(transId); - logger.info("Registered offload for TransID: {}, Timestamp: {}", transId, timestamp); + logger.info("Registered offload for Timestamp: {}", timestamp); return; } } - public void unregisterOffload(long transId, long timestamp) + public void unregisterOffload(long timestamp) { - if (offloadedTransIds.remove(transId)) + AtomicInteger refCount = checkpointRefCounts.get(timestamp); + if (refCount != null) { - AtomicInteger refCount = checkpointRefCounts.get(timestamp); - if (refCount != null) + synchronized (refCount) { - synchronized (refCount) + int remaining = refCount.decrementAndGet(); + if (remaining <= 0) { - int remaining = refCount.decrementAndGet(); - if (remaining <= 0) + offloadedCheckpoints.remove(timestamp); + if (refCount.get() > 0) { - offloadedCheckpoints.remove(timestamp); - if (refCount.get() > 0) - { - logger.info("Checkpoint resurrection detected, skipping deletion. TS: {}", timestamp); - return; - } - removeCheckpointFile(timestamp, CheckpointType.OFFLOAD); - checkpointRefCounts.remove(timestamp); - logger.info("Offload checkpoint for timestamp {} removed.", timestamp); + logger.info("Checkpoint resurrection detected, skipping deletion. 
TS: {}", timestamp); + return; } + removeCheckpointFile(timestamp, CheckpointType.OFFLOAD); + checkpointRefCounts.remove(timestamp); + logger.info("Offload checkpoint for timestamp {} removed.", timestamp); } } } } - private void createDiskCheckpoint(long timestamp, CheckpointType type) throws RetinaException + private void createCheckpoint(long timestamp, CheckpointType type) throws RetinaException { String prefix = (type == CheckpointType.GC) ? "vis_gc_" : "vis_offload_"; String fileName = prefix + timestamp + ".bin"; - Path filePath = checkpointDir.resolve(fileName); - Path tempPath = checkpointDir.resolve(fileName + ".tmp"); + String filePath = checkpointDir.endsWith("/") ? checkpointDir + fileName : checkpointDir + "/" + fileName; - try (DataOutputStream out = new DataOutputStream(new BufferedOutputStream(Files.newOutputStream(tempPath)))) + try { - int rgCount = this.rgVisibilityMap.size(); - out.writeInt(rgCount); - for (Map.Entry entry : this.rgVisibilityMap.entrySet()) + Storage storage = StorageFactory.Instance().getStorage(filePath); + // Write directly to the final file as atomic move is not supported by all storages (e.g. S3). + // Object stores typically guarantee atomicity of the put operation. 
+ try (DataOutputStream out = storage.create(filePath, true, 4096)) { - String[] parts = entry.getKey().split("_"); - long fileId = Long.parseLong(parts[0]); - int rgId = Integer.parseInt(parts[1]); - long[] bitmap = entry.getValue().getVisibilityBitmap(timestamp); - - out.writeLong(fileId); - out.writeInt(rgId); - out.writeInt(bitmap.length); - for (long l : bitmap) + int rgCount = this.rgVisibilityMap.size(); + out.writeInt(rgCount); + for (Map.Entry entry : this.rgVisibilityMap.entrySet()) { - out.writeLong(l); + String[] parts = entry.getKey().split("_"); + long fileId = Long.parseLong(parts[0]); + int rgId = Integer.parseInt(parts[1]); + long[] bitmap = entry.getValue().getVisibilityBitmap(timestamp); + + out.writeLong(fileId); + out.writeInt(rgId); + out.writeInt(bitmap.length); + for (long l : bitmap) + { + out.writeLong(l); + } } + out.flush(); } - out.flush(); - } catch (IOException e) - { - try - { - Files.deleteIfExists(tempPath); - } catch (IOException ignored) - { - } - throw new RetinaException("Failed to write checkpoint file", e); - } - try - { - StandardCopyOption[] options = new StandardCopyOption[]{ - StandardCopyOption.ATOMIC_MOVE, - StandardCopyOption.REPLACE_EXISTING - }; - Files.move(tempPath, filePath, options); if (type == CheckpointType.OFFLOAD) { offloadedCheckpoints.put(timestamp, filePath); @@ -462,40 +446,56 @@ private void createDiskCheckpoint(long timestamp, CheckpointType type) throws Re } } catch (IOException e) { + // Try to cleanup the potentially corrupted or partial file + try + { + StorageFactory.Instance().getStorage(filePath).delete(filePath, false); + } catch (IOException ignored) + { + } throw new RetinaException("Failed to commit checkpoint file", e); } } private long[] loadBitmapFromDisk(long timestamp, long targetFileId, int targetRgId) throws RetinaException { - Path path = offloadedCheckpoints.get(timestamp); - if (path == null || !Files.exists(path)) + String path = offloadedCheckpoints.get(timestamp); + if (path == 
null) { throw new RetinaException("Checkpoint missing: " + timestamp); } - try (DataInputStream in = new DataInputStream(new BufferedInputStream(Files.newInputStream(path)))) + try { - int rgCount = in.readInt(); - for (int i = 0; i < rgCount; i++) + Storage storage = StorageFactory.Instance().getStorage(path); + if (!storage.exists(path)) { - long fileId = in.readLong(); - int rgId = in.readInt(); - int len = in.readInt(); - if (fileId == targetFileId && rgId == targetRgId) + throw new RetinaException("Checkpoint file missing: " + path); + } + + try (DataInputStream in = storage.open(path)) + { + int rgCount = in.readInt(); + for (int i = 0; i < rgCount; i++) { - long[] bitmap = new long[len]; - for (int j = 0; j < len; j++) + long fileId = in.readLong(); + int rgId = in.readInt(); + int len = in.readInt(); + if (fileId == targetFileId && rgId == targetRgId) { - bitmap[j] = in.readLong(); - } - return bitmap; - } else - { - int skipped = in.skipBytes(len * 8); - if (skipped != len * 8) + long[] bitmap = new long[len]; + for (int j = 0; j < len; j++) + { + bitmap[j] = in.readLong(); + } + return bitmap; + } else { - throw new IOException("Unexpected EOF"); + int skipped = in.skipBytes(len * 8); + if (skipped != len * 8) + { + throw new IOException("Unexpected EOF"); + } } } } @@ -509,9 +509,12 @@ private long[] loadBitmapFromDisk(long timestamp, long targetFileId, int targetR private void removeCheckpointFile(long timestamp, CheckpointType type) { String prefix = (type == CheckpointType.GC) ? "vis_gc_" : "vis_offload_"; + String fileName = prefix + timestamp + ".bin"; + String path = checkpointDir.endsWith("/") ? checkpointDir + fileName : checkpointDir + "/" + fileName; + try { - Files.deleteIfExists(checkpointDir.resolve(prefix + timestamp + ".bin")); + StorageFactory.Instance().getStorage(path).delete(path, false); } catch (IOException e) { logger.warn("Failed to delete checkpoint file", e); @@ -728,7 +731,7 @@ private void runGC() try { // 1. 
Persist first - createDiskCheckpoint(timestamp, CheckpointType.GC); + createCheckpoint(timestamp, CheckpointType.GC); // 2. Then clean memory for (Map.Entry entry: this.rgVisibilityMap.entrySet()) { diff --git a/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java b/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java index b4ad15f998..c41ab36ae9 100644 --- a/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java +++ b/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java @@ -20,16 +20,16 @@ package io.pixelsdb.pixels.retina; import io.pixelsdb.pixels.common.exception.RetinaException; +import io.pixelsdb.pixels.common.physical.Storage; +import io.pixelsdb.pixels.common.physical.StorageFactory; import io.pixelsdb.pixels.common.utils.ConfigFactory; import org.junit.Before; import org.junit.Test; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.lang.reflect.Field; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.StandardCopyOption; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -44,7 +44,8 @@ public class TestRetinaCheckpoint { private RetinaResourceManager retinaManager; - private Path testCheckpointDir; + private String testCheckpointDir; + private Storage storage; private final long fileId = 1L; private final int rgId = 0; private final int numRows = 1024; @@ -52,49 +53,56 @@ public class TestRetinaCheckpoint @Before public void setUp() throws IOException, RetinaException { - testCheckpointDir = Paths.get(ConfigFactory.Instance().getProperty("pixels.retina.checkpoint.dir")); - if (!Files.exists(testCheckpointDir)) + testCheckpointDir = ConfigFactory.Instance().getProperty("pixels.retina.checkpoint.dir"); + storage = StorageFactory.Instance().getStorage(testCheckpointDir); + + if 
(!storage.exists(testCheckpointDir)) { - Files.createDirectories(testCheckpointDir); + storage.mkdirs(testCheckpointDir); } else { - Files.list(testCheckpointDir).forEach(p -> { + for (String path : storage.listPaths(testCheckpointDir)) + { try { - Files.deleteIfExists(p); + storage.delete(path, false); } catch (IOException e) { e.printStackTrace(); } - }); + } } retinaManager = RetinaResourceManager.Instance(); retinaManager.addVisibility(fileId, rgId, numRows); } + private String resolve(String dir, String filename) { + return dir.endsWith("/") ? dir + filename : dir + "/" + filename; + } + @Test - public void testRegisterOffload() throws RetinaException + public void testRegisterOffload() throws RetinaException, IOException { long transId = 12345L; long timestamp = 100L; // Register offload - retinaManager.registerOffload(transId, timestamp); + retinaManager.registerOffload(timestamp); // Verify checkpoint file exists - Path expectedFile = testCheckpointDir.resolve("vis_offload_100.bin"); - assertTrue("Offload checkpoint file should exist", Files.exists(expectedFile)); + String expectedFile = resolve(testCheckpointDir, "vis_offload_100.bin"); + assertTrue("Offload checkpoint file should exist", storage.exists(expectedFile)); // Unregister - retinaManager.unregisterOffload(transId, timestamp); + retinaManager.unregisterOffload(timestamp); // File should be removed - assertFalse("Offload checkpoint file should be removed", Files.exists(expectedFile)); + assertFalse("Offload checkpoint file should be removed", storage.exists(expectedFile)); } @Test - public void testMultipleOffloads() throws RetinaException + public void testMultipleOffloads() throws RetinaException, IOException { long transId1 = 12345L; long timestamp1 = 100L; @@ -102,19 +110,19 @@ public void testMultipleOffloads() throws RetinaException long timestamp1_dup = 100L; // same timestamp // Both register the same timestamp - should share checkpoint - retinaManager.registerOffload(transId1, 
timestamp1); - retinaManager.registerOffload(transId2, timestamp1_dup); + retinaManager.registerOffload(timestamp1); + retinaManager.registerOffload(timestamp1_dup); - Path expectedFile = testCheckpointDir.resolve("vis_offload_100.bin"); - assertTrue("Offload checkpoint file should exist", Files.exists(expectedFile)); + String expectedFile = resolve(testCheckpointDir, "vis_offload_100.bin"); + assertTrue("Offload checkpoint file should exist", storage.exists(expectedFile)); // Unregister one - should not remove yet (ref count >1) - retinaManager.unregisterOffload(transId1, timestamp1); - assertTrue("Offload checkpoint should still exist (ref count >1)", Files.exists(expectedFile)); + retinaManager.unregisterOffload(timestamp1); + assertTrue("Offload checkpoint should still exist (ref count >1)", storage.exists(expectedFile)); // Unregister second - retinaManager.unregisterOffload(transId2, timestamp1); - assertFalse("Offload checkpoint should be removed", Files.exists(expectedFile)); + retinaManager.unregisterOffload(timestamp1); + assertFalse("Offload checkpoint should be removed", storage.exists(expectedFile)); } @Test @@ -132,13 +140,24 @@ public void testCheckpointRecovery() throws RetinaException, IOException assertTrue("Row 10 should be deleted in memory", isBitSet(memBitmap, rowToDelete)); // 2. Register Offload to generate checkpoint file - retinaManager.registerOffload(transId, timestamp); - Path offloadPath = testCheckpointDir.resolve("vis_offload_" + timestamp + ".bin"); - assertTrue("Checkpoint file should exist", Files.exists(offloadPath)); + retinaManager.registerOffload(timestamp); + String offloadPath = resolve(testCheckpointDir, "vis_offload_" + timestamp + ".bin"); + assertTrue("Checkpoint file should exist", storage.exists(offloadPath)); // 3. 
Rename offload file to GC file to simulate checkpoint generated by GC - Path gcPath = testCheckpointDir.resolve("vis_gc_" + timestamp + ".bin"); - Files.move(offloadPath, gcPath, StandardCopyOption.REPLACE_EXISTING); + String gcPath = resolve(testCheckpointDir, "vis_gc_" + timestamp + ".bin"); + // Storage interface doesn't have a rename operation, so copy and delete instead + try (DataInputStream in = storage.open(offloadPath); + DataOutputStream out = storage.create(gcPath, true, 4096)) + { + byte[] buffer = new byte[4096]; + int bytesRead; + while ((bytesRead = in.read(buffer)) != -1) + { + out.write(buffer, 0, bytesRead); + } + } + storage.delete(offloadPath, false); // 4. Reset singleton state (Simulate Crash/Restart) resetSingletonState(); @@ -166,7 +185,7 @@ public void testDiskBitmapQuery() throws RetinaException retinaManager.deleteRecord(fileId, rgId, 5, baseTimestamp); // 2. Register Offload for this transaction (save snapshot at this moment to disk) - retinaManager.registerOffload(transId, baseTimestamp); + retinaManager.registerOffload(baseTimestamp); // 3.
Delete row 6 at a later time baseTimestamp + 10 // This only affects the latest state in memory, should not affect the checkpoint on disk @@ -185,7 +204,7 @@ public void testDiskBitmapQuery() throws RetinaException assertTrue("Memory: Row 6 should be deleted", isBitSet(memBitmap, 6)); // Cleanup - retinaManager.unregisterOffload(transId, baseTimestamp); + retinaManager.unregisterOffload(baseTimestamp); } @Test @@ -217,10 +236,10 @@ public void testConcurrency() throws InterruptedException, RetinaException if (j % 3 == 0) { // Register Offload - retinaManager.registerOffload(transId, timestamp); + retinaManager.registerOffload(timestamp); // Verify file exists - Path p = testCheckpointDir.resolve("vis_offload_" + timestamp + ".bin"); - if (!Files.exists(p)) { + String p = resolve(testCheckpointDir, "vis_offload_" + timestamp + ".bin"); + if (!storage.exists(p)) { throw new RuntimeException("Checkpoint file missing after register: " + p); } } @@ -235,7 +254,7 @@ else if (j % 3 == 1) else { // Unregister Offload - retinaManager.unregisterOffload(transId, timestamp); + retinaManager.unregisterOffload(timestamp); } } } catch (Exception e) @@ -276,9 +295,13 @@ private void resetSingletonState() offloadedField.setAccessible(true); ((Map) offloadedField.get(retinaManager)).clear(); - Field transField = RetinaResourceManager.class.getDeclaredField("offloadedTransIds"); - transField.setAccessible(true); - ((Set) transField.get(retinaManager)).clear(); + Field refCountsField = RetinaResourceManager.class.getDeclaredField("checkpointRefCounts"); + refCountsField.setAccessible(true); + ((Map) refCountsField.get(retinaManager)).clear(); + + Field gcTimestampField = RetinaResourceManager.class.getDeclaredField("latestGcTimestamp"); + gcTimestampField.setAccessible(true); + gcTimestampField.setLong(retinaManager, -1L); Field recoveryCacheField = RetinaResourceManager.class.getDeclaredField("recoveryCache"); recoveryCacheField.setAccessible(true); @@ -304,14 +327,6 @@ private 
boolean assertFalse(String message, boolean condition) return assertTrue(message, !condition); } - private void assertEquals(String message, int expected, int actual) - { - if (expected != actual) - { - throw new AssertionError(message + " expected: " + expected + " actual: " + actual); - } - } - private boolean isBitSet(long[] bitmap, int rowIndex) { if (bitmap == null || bitmap.length == 0) return false; diff --git a/proto/retina.proto b/proto/retina.proto index f2fa45869b..c9c5e47067 100644 --- a/proto/retina.proto +++ b/proto/retina.proto @@ -42,6 +42,10 @@ service RetinaWorkerService { rpc AddWriteBuffer (AddWriteBufferRequest) returns (AddWriteBufferResponse); // Get a unified view of the data in the writer buffer rpc GetWriteBuffer (GetWriteBufferRequest) returns (GetWriteBufferResponse); + // Register offload for long-running queries + rpc RegisterOffload (RegisterOffloadRequest) returns (RegisterOffloadResponse); + // Unregister offload when query completes + rpc UnregisterOffload (UnregisterOffloadRequest) returns (UnregisterOffloadResponse); } // header @@ -168,3 +172,23 @@ message GetWriteBufferResponse { repeated int64 ids = 3; repeated VisibilityBitmap bitmaps = 4; } + +// register offload +message RegisterOffloadRequest { + RequestHeader header = 1; + uint64 timestamp = 2; +} + +message RegisterOffloadResponse { + ResponseHeader header = 1; +} + +// unregister offload +message UnregisterOffloadRequest { + RequestHeader header = 1; + uint64 timestamp = 2; +} + +message UnregisterOffloadResponse { + ResponseHeader header = 1; +} diff --git a/proto/transaction.proto b/proto/transaction.proto index bc663eabc4..10f2a24971 100644 --- a/proto/transaction.proto +++ b/proto/transaction.proto @@ -44,6 +44,7 @@ service TransService { rpc BindExternalTraceId (BindExternalTraceIdRequest) returns (BindExternalTraceIdResponse); rpc DumpTrans (DumpTransRequest) returns (DumpTransResponse); rpc GetSafeGcTimestamp(google.protobuf.Empty) returns 
(GetSafeGcTimestampResponse); + rpc MarkTransOffloaded (MarkTransOffloadedRequest) returns (MarkTransOffloadedResponse); } // begin request/response definition for rpc services @@ -192,4 +193,12 @@ message GetSafeGcTimestampResponse { uint64 timestamp = 2; } -// end request/response definition for rpc services \ No newline at end of file +message MarkTransOffloadedRequest { + uint64 transId = 1; +} + +message MarkTransOffloadedResponse { + int32 errorCode = 1; +} + +// end request/response definition for rpc services