Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -407,4 +407,62 @@ public boolean addWriteBuffer(String schemaName, String tableName) throws Retina
}
return true;
}

/**
* Register a long-running query to be offloaded to disk checkpoint.
*
* @param transId the transaction id
* @param timestamp the transaction timestamp
* @return true on success
* @throws RetinaException if the operation fails
*/
public boolean registerOffload(long transId, long timestamp) throws RetinaException
{
String token = UUID.randomUUID().toString();
RetinaProto.RegisterOffloadRequest request = RetinaProto.RegisterOffloadRequest.newBuilder()
.setHeader(RetinaProto.RequestHeader.newBuilder().setToken(token).build())
.setTransId(transId)
.setTimestamp(timestamp)
.build();
RetinaProto.RegisterOffloadResponse response = this.stub.registerOffload(request);
if (response.getHeader().getErrorCode() != 0)
{
throw new RetinaException("Failed to register offload: " + response.getHeader().getErrorCode()
+ " " + response.getHeader().getErrorMsg());
}
if (!response.getHeader().getToken().equals(token))
{
throw new RetinaException("Response token does not match");
}
return true;
}

/**
* Unregister a long-running query's offload checkpoint when the query completes.
*
* @param transId the transaction id
* @param timestamp the transaction timestamp
* @return true on success
* @throws RetinaException if the operation fails
*/
public boolean unregisterOffload(long transId, long timestamp) throws RetinaException
{
String token = UUID.randomUUID().toString();
RetinaProto.UnregisterOffloadRequest request = RetinaProto.UnregisterOffloadRequest.newBuilder()
.setHeader(RetinaProto.RequestHeader.newBuilder().setToken(token).build())
.setTransId(transId)
.setTimestamp(timestamp)
.build();
RetinaProto.UnregisterOffloadResponse response = this.stub.unregisterOffload(request);
if (response.getHeader().getErrorCode() != 0)
{
throw new RetinaException("Failed to unregister offload: " + response.getHeader().getErrorCode()
+ " " + response.getHeader().getErrorMsg());
}
if (!response.getHeader().getToken().equals(token))
{
throw new RetinaException("Response token does not match");
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ public class TransContext implements Comparable<TransContext>
private final boolean readOnly;
private final AtomicReference<TransProto.TransStatus> status;
private final Properties properties;
private final long startTime;
private volatile boolean isOffloaded;

public TransContext(long transId, long timestamp, boolean readOnly)
Expand All @@ -49,7 +48,6 @@ public TransContext(long transId, long timestamp, boolean readOnly)
this.readOnly = readOnly;
this.status = new AtomicReference<>(TransProto.TransStatus.PENDING);
this.properties = new Properties();
this.startTime = System.currentTimeMillis();
this.isOffloaded = false;
}

Expand All @@ -61,15 +59,9 @@ public TransContext(TransProto.TransContext contextPb)
this.status = new AtomicReference<>(contextPb.getStatus());
this.properties = new Properties();
this.properties.putAll(contextPb.getPropertiesMap());
this.startTime = System.currentTimeMillis();
this.isOffloaded = false;
}

public long getStartTime()
{
return startTime;
}

public boolean isOffloaded()
{
return isOffloaded;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,4 +432,44 @@ public long getSafeGcTimestamp() throws TransException
}
return response.getTimestamp();
}

/**
* Push the watermark for garbage collection. This is called after a long-running query
* is offloaded to allow the garbage collection to proceed.
*
* @param readOnly true for read-only transactions (low watermark), false for write transactions (high watermark)
* @return true on success
* @throws TransException if the operation fails
*/
public boolean pushWatermark(boolean readOnly) throws TransException
{
TransProto.PushWatermarkRequest request = TransProto.PushWatermarkRequest.newBuilder()
.setReadOnly(readOnly).build();
TransProto.PushWatermarkResponse response = this.stub.pushWatermark(request);
if (response.getErrorCode() != ErrorCode.SUCCESS)
{
throw new TransException("failed to push watermark, error code=" + response.getErrorCode());
}
return true;
}

/**
* Mark a transaction as offloaded. This allows the transaction to be skipped when
* calculating the minimum running transaction timestamp for garbage collection.
*
* @param transId the id of the transaction to mark as offloaded
* @return true on success
* @throws TransException if the operation fails
*/
public boolean markTransOffloaded(long transId) throws TransException
{
TransProto.MarkTransOffloadedRequest request = TransProto.MarkTransOffloadedRequest.newBuilder()
.setTransId(transId).build();
TransProto.MarkTransOffloadedResponse response = this.stub.markTransOffloaded(request);
if (response.getErrorCode() != ErrorCode.SUCCESS)
{
throw new TransException("failed to mark transaction as offloaded, error code=" + response.getErrorCode());
}
return true;
}
}
2 changes: 1 addition & 1 deletion pixels-common/src/main/resources/pixels.properties
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ retina.gc.interval=300
# offloading threshold for long query in seconds
pixels.transaction.offload.threshold=1800
# snapshot storage directory
pixels.retina.checkpoint.dir=/tmp/pixels-checkpoints
pixels.retina.checkpoint.dir=file:///tmp/pixels-checkpoints

### pixels-sink ###
sink.server.enabled=false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,56 @@ public void getWriteBuffer(RetinaProto.GetWriteBufferRequest request,
}
}

@Override
public void registerOffload(RetinaProto.RegisterOffloadRequest request,
StreamObserver<RetinaProto.RegisterOffloadResponse> responseObserver)
{
RetinaProto.ResponseHeader.Builder headerBuilder = RetinaProto.ResponseHeader.newBuilder()
.setToken(request.getHeader().getToken());

try
{
this.retinaResourceManager.registerOffload(request.getTransId(), request.getTimestamp());
responseObserver.onNext(RetinaProto.RegisterOffloadResponse.newBuilder()
.setHeader(headerBuilder.build()).build());
} catch (RetinaException e)
{
logger.error("registerOffload failed for transId={}, timestamp={}",
request.getTransId(), request.getTimestamp(), e);
headerBuilder.setErrorCode(1).setErrorMsg(e.getMessage());
responseObserver.onNext(RetinaProto.RegisterOffloadResponse.newBuilder()
.setHeader(headerBuilder.build()).build());
} finally
{
responseObserver.onCompleted();
}
}

@Override
public void unregisterOffload(RetinaProto.UnregisterOffloadRequest request,
StreamObserver<RetinaProto.UnregisterOffloadResponse> responseObserver)
{
RetinaProto.ResponseHeader.Builder headerBuilder = RetinaProto.ResponseHeader.newBuilder()
.setToken(request.getHeader().getToken());

try
{
this.retinaResourceManager.unregisterOffload(request.getTransId(), request.getTimestamp());
responseObserver.onNext(RetinaProto.UnregisterOffloadResponse.newBuilder()
.setHeader(headerBuilder.build()).build());
} catch (Exception e)
{
logger.error("unregisterOffload failed for transId={}, timestamp={}",
request.getTransId(), request.getTimestamp(), e);
headerBuilder.setErrorCode(1).setErrorMsg(e.getMessage());
responseObserver.onNext(RetinaProto.UnregisterOffloadResponse.newBuilder()
.setHeader(headerBuilder.build()).build());
} finally
{
responseObserver.onCompleted();
}
}

/**
* Check if the order or compact paths from pixels metadata is valid.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.pixelsdb.pixels.common.transaction.TransContext;
import io.pixelsdb.pixels.common.utils.ConfigFactory;
import io.pixelsdb.pixels.daemon.TransProto;
import io.pixelsdb.pixels.retina.RetinaResourceManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -75,8 +74,6 @@ protected static TransContextManager Instance()

private final ReadWriteLock contextLock = new ReentrantReadWriteLock();

private static final long OFFLOAD_THRESHOLD = Long.parseLong(ConfigFactory.Instance().getProperty("pixels.transaction.offload.threshold"));

private TransContextManager() { }

/**
Expand Down Expand Up @@ -218,20 +215,7 @@ private boolean terminateTrans(long transId, TransProto.TransStatus status)
* Adding the same lock in {@link #offloadLongRunningQueries()}
* constitutes a mutually exclusive critical section.
*/
synchronized (context)
{
context.setStatus(status);
if (context.isOffloaded())
{
try
{
RetinaResourceManager.Instance().unregisterOffload(context.getTransId(), context.getTimestamp());
} catch (Exception e)
{
log.error("Unregister failed", e);
}
}
}
context.setStatus(status);

if (context.isReadOnly())
{
Expand All @@ -254,52 +238,6 @@ private boolean terminateTrans(long transId, TransProto.TransStatus status)
return false;
}

/**
* Offload long-running queries to disk.
*/
public void offloadLongRunningQueries()
{
long now = System.currentTimeMillis();
boolean pushed = false;

for (TransContext ctx : runningReadOnlyTrans)
{
if (ctx.isOffloaded())
{
continue;
}

if ((now - ctx.getStartTime()) > OFFLOAD_THRESHOLD)
{
try
{
// 1. Register and generate snapshot
RetinaResourceManager.Instance().registerOffload(ctx.getTransId(), ctx.getTimestamp());

// 2. Double-checked locking
synchronized (ctx)
{
if (ctx.getStatus() == TransProto.TransStatus.PENDING)
{
ctx.setOffloaded(true);
pushed = true;
} else
{
// Transaction has ended, rollback registration
RetinaResourceManager.Instance().unregisterOffload(ctx.getTransId(), ctx.getTimestamp());
}
}
} catch (Exception e)
{
log.error("Failed to offload transaction {}", ctx.getTransId(), e);
}
}
}
if (pushed)
{
TransServiceImpl.pushWatermarks(true);
}
}

/**
* Dump the context of transactions in this manager to a history file and remove terminated transactions. This method
Expand Down Expand Up @@ -451,4 +389,22 @@ public int getQueryConcurrency(boolean readOnly)
this.contextLock.writeLock().unlock();
}
}

/**
* Mark a transaction as offloaded. This allows the transaction context manager to
* skip it when calculating the minimum running transaction timestamp.
*
* @param transId the transaction id
* @return true if the transaction exists and was marked as offloaded, false otherwise
*/
public boolean markTransOffloaded(long transId)
{
TransContext context = this.transIdToContext.get(transId);
if (context != null)
{
context.setOffloaded(true);
return true;
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,19 +109,6 @@ public class TransServiceImpl extends TransServiceGrpc.TransServiceImplBase

public TransServiceImpl()
{
/*
* Initiate a background monitoring thread to periodically (every 5 minutes)
* trigger the detection and offloading process for long-running queries,
* thereby ensuring the persistent release of blockages on the Low Watermarks
* and guaranteeing the proper functioning of the garbage collection mechanism.
*/
ScheduledExecutorService offloadScheduler = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "trans-offload-monitor");
t.setDaemon(true);
return t;
});
offloadScheduler.scheduleAtFixedRate(() ->
TransContextManager.Instance().offloadLongRunningQueries(), 5, 5, TimeUnit.MINUTES);
}

@Override
Expand Down Expand Up @@ -497,4 +484,33 @@ public void getSafeGcTimestamp(com.google.protobuf.Empty request,
responseObserver.onNext(response);
responseObserver.onCompleted();
}

@Override
public void pushWatermark(TransProto.PushWatermarkRequest request,
StreamObserver<TransProto.PushWatermarkResponse> responseObserver)
{
pushWatermarks(request.getReadOnly());
TransProto.PushWatermarkResponse response = TransProto.PushWatermarkResponse.newBuilder()
.setErrorCode(ErrorCode.SUCCESS).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}

@Override
public void markTransOffloaded(TransProto.MarkTransOffloadedRequest request,
StreamObserver<TransProto.MarkTransOffloadedResponse> responseObserver)
{
int error = ErrorCode.SUCCESS;
boolean success = TransContextManager.Instance().markTransOffloaded(request.getTransId());
if (!success)
{
logger.error("transaction id {} does not exist or failed to mark as offloaded", request.getTransId());
error = ErrorCode.TRANS_ID_NOT_EXIST;
}

TransProto.MarkTransOffloadedResponse response = TransProto.MarkTransOffloadedResponse.newBuilder()
.setErrorCode(error).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
}
Loading