Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ public class ErrorCode
public static final int TRANS_INVALID_ARGUMENT = (ERROR_TRANS + 8);
public static final int TRANS_BATCH_PARTIAL_ID_NOT_EXIST = (ERROR_TRANS + 9);
public static final int TRANS_BATCH_PARTIAL_COMMIT_FAILED = (ERROR_TRANS + 10);
public static final int TRANS_EXTEND_LEASE_FAILED = (ERROR_TRANS + 11);
public static final int TRANS_BATCH_EXTEND_LEASE_FAILED = (ERROR_TRANS + 12);
public static final int TRANS_LEASE_EXPIRED = (ERROR_TRANS + 13);

// begin error code for query schedule
private static final int ERROR_QUERY_SCHEDULE = ERROR_BASE + 300;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ private SinglePointIndexFactory()
requireNonNull(value, "enabled.single.point.index.schemes is not configured");
String[] schemeNames = value.trim().split(",");
checkArgument(schemeNames.length > 0,
"at lease one single point index scheme must be enabled");
"at least one single point index scheme must be enabled");

ImmutableMap.Builder<SinglePointIndex.Scheme, SinglePointIndexProvider> providersBuilder = ImmutableMap.builder();
ServiceLoader<SinglePointIndexProvider> providerLoader = ServiceLoader.load(SinglePointIndexProvider.class);
Expand Down
122 changes: 122 additions & 0 deletions pixels-common/src/main/java/io/pixelsdb/pixels/common/lease/Lease.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright 2023 PixelsDB.
*
* This file is part of Pixels.
*
* Pixels is free software: you can redistribute it and/or modify
* it under the terms of the Affero GNU General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* Pixels is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Affero GNU General Public License for more details.
*
* You should have received a copy of the Affero GNU General Public
* License along with Pixels. If not, see
* <https://www.gnu.org/licenses/>.
*/
package io.pixelsdb.pixels.common.lease;

import io.pixelsdb.pixels.common.exception.InvalidArgumentException;

import java.util.concurrent.atomic.AtomicLong;

import static io.pixelsdb.pixels.common.utils.Constants.*;

/**
* @author hank
* @create 2023-07-29
* @update 2025-12-03 make {@link #startMs} atomic and add {@link Role}.
*/
public class Lease
{
private final long periodMs;
private final AtomicLong startMs;

public Lease(long startMs, long periodMs)
{
this.startMs = new AtomicLong(startMs);
this.periodMs = periodMs;
}

public long getPeriodMs()
{
return periodMs;
}

public long getStartMs()
{
return startMs.get();
}

/**
* Update the start times of the lease. The new start time must be larger than the current start time of the lease.
* @param newStartMs the new start time
* @return true if the new start time is set successfully, false if it is not larger than the current start time
*/
public boolean updateStartMs(long newStartMs)
{
long currStartMs;
do
{
currStartMs = this.startMs.get();
if (newStartMs <= currStartMs)
{
return false;
}
}
while (!this.startMs.compareAndSet(currStartMs, newStartMs));
return true;
}

/**
* This method can be called by lease assigner and holder.
* Note that for leaseholder, even if this method returns false, the lease may be expired. Leaseholder can use
* {@link #expiring(long, Role)} to check if the lease is likely to be expiring.
* @param currentTimeMs the current time in microseconds
* @param role the role of the caller
* @return for lease assigner, true if the lease has expired;
* for leaseholder, true if the lease is certain to be expired
* @throws InvalidArgumentException if the role is not {@link Role#Holder} or {@link Role#Assigner}
*/
public boolean hasExpired(long currentTimeMs, Role role)
{
if (role == Role.Assigner)
{
return currentTimeMs - this.startMs.get() > this.periodMs;
}
else if (role == Role.Holder)
{
return currentTimeMs - this.startMs.get() > this.periodMs + LEASE_TIME_SKEW_MS;
}
else
{
throw new InvalidArgumentException("invalid lease role " + role.name());
}
}

/**
* This method can be only called by leaseholder to check if the lease is expiring.
* @param currentTimeMs the current time in microseconds
* @param role the role of the caller
* @return true if the lease is expiring
* @throws InvalidArgumentException if the role is not {@link Role#Holder}
*/
public boolean expiring(long currentTimeMs, Role role)
{
if (role == Role.Holder)
{
return currentTimeMs - this.startMs.get() >
this.periodMs * LEASE_EXPIRING_THRESHOLD - LEASE_TIME_SKEW_MS - LEASE_NETWORK_LATENCY_MS;
}
throw new InvalidArgumentException("mayExpire should only be called by leaseholder");
}

public enum Role
{
Assigner, // the role that assigns the lease
Holder // the role that applies and holds the lease
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.alibaba.fastjson.JSON;
import io.pixelsdb.pixels.common.exception.WorkerCoordinateException;
import io.pixelsdb.pixels.common.lease.Lease;

import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -63,6 +64,7 @@ public WI getWorkerInfo()
}

/**
* This method should only be called on the coordinator.
* @return true if this worker has a valid lease
*/
public boolean isAlive()
Expand All @@ -74,7 +76,7 @@ public boolean isAlive()
return false;
}
long currentTimeMs = System.currentTimeMillis();
return !this.lease.hasExpired(currentTimeMs);
return !this.lease.hasExpired(currentTimeMs, Lease.Role.Assigner);
}
}

Expand All @@ -87,7 +89,7 @@ public void terminate()
}

/**
* Extent the lease of this worker if the worker is alive.
* Extent the lease of this worker if the worker is alive. This method should only be called on the coordinator.
* @return the new start time (milliseconds since the Unix epoch) of the extended lease
* @throws WorkerCoordinateException if the worker is terminated or the lease has already expired
*/
Expand All @@ -96,11 +98,14 @@ public long extendLease() throws WorkerCoordinateException
synchronized (this.lease)
{
long currentTimeMs = System.currentTimeMillis();
if (this.terminated || this.lease.hasExpired(currentTimeMs))
if (this.terminated || this.lease.hasExpired(currentTimeMs, Lease.Role.Assigner))
{
throw new WorkerCoordinateException("worker is not alive, can not extend the lease");
}
this.lease.setStartTimeMs(currentTimeMs);
if (!this.lease.updateStartMs(currentTimeMs))
{
throw new WorkerCoordinateException("the new start time must be later than the current lease start time");
}
return currentTimeMs;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package io.pixelsdb.pixels.common.transaction;

import io.pixelsdb.pixels.common.lease.Lease;
import io.pixelsdb.pixels.daemon.TransProto;

import java.util.Map;
Expand All @@ -39,13 +40,15 @@ public class TransContext implements Comparable<TransContext>
private final boolean readOnly;
private final AtomicReference<TransProto.TransStatus> status;
private final Properties properties;
private final Lease lease;
private final long startTime;
private volatile boolean isOffloaded;

public TransContext(long transId, long timestamp, boolean readOnly)
public TransContext(long transId, long timestamp, long leaseStartMs, long leasePeriodMs, boolean readOnly)
{
this.transId = transId;
this.timestamp = timestamp;
this.lease = new Lease(leaseStartMs, leasePeriodMs);
this.readOnly = readOnly;
this.status = new AtomicReference<>(TransProto.TransStatus.PENDING);
this.properties = new Properties();
Expand All @@ -61,6 +64,7 @@ public TransContext(TransProto.TransContext contextPb)
this.status = new AtomicReference<>(contextPb.getStatus());
this.properties = new Properties();
this.properties.putAll(contextPb.getPropertiesMap());
this.lease = new Lease(contextPb.getLeaseStartMs(), contextPb.getLeasePeriodMs());
this.startTime = System.currentTimeMillis();
this.isOffloaded = false;
}
Expand Down Expand Up @@ -110,6 +114,11 @@ public Properties getProperties()
return this.properties;
}

public Lease getLease()
{
return lease;
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package io.pixelsdb.pixels.common.transaction;

import io.pixelsdb.pixels.common.lease.Lease;
import io.pixelsdb.pixels.daemon.TransProto;

import java.util.Map;
Expand Down Expand Up @@ -92,9 +93,19 @@ protected void setTransRollback(long transId)

public boolean isTerminated(long transId)
{
TransContext info = this.transIdToContext.get(transId);
return info == null || info.getStatus() == TransProto.TransStatus.COMMIT ||
info.getStatus() == TransProto.TransStatus.ROLLBACK;
TransContext context = this.transIdToContext.get(transId);
return context == null || context.getStatus() == TransProto.TransStatus.COMMIT ||
context.getStatus() == TransProto.TransStatus.ROLLBACK;
}

/**
* @param transId the transaction id
* @return the lease of the transaction, or null if the transaction context does not exist in the cache
*/
public Lease getTransLease(long transId)
{
TransContext context = this.transIdToContext.get(transId);
return context == null ? null : context.getLease();
}

/**
Expand Down
Loading