Skip to content
Open
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
9368327
Add cursor based optimized compaction path (WIP)
Jun 22, 2025
759dc2c
Merge branch 'trunk' into compaction-work-pr-prep
nitsanw Sep 29, 2025
d75d59c
Fix MEGABYTE constant
Oct 9, 2025
35ea514
Fix 0x11/0x10 should be 0b11/0b10
Oct 14, 2025
33b9e47
Introduce ENABLE_CURSOR_COMPACTION controls via CassandraRelevantProp…
Oct 14, 2025
d46511d
Revert `matched` comments left to track stats tracking
Oct 14, 2025
b417274
Revert change to RandomAccessReader and match row skipping logic from…
Oct 14, 2025
85379f1
Improve CompactionCursor javadoc
Oct 14, 2025
097b995
Extract `isSupported` from constructor
Oct 14, 2025
1af0f0e
Fix Trasnactions/Transformations
Oct 15, 2025
7576864
Add comment and rename `sortForPartitionMerge` -> `prepareForPartitio…
Oct 15, 2025
2205e48
Typo: `preturbed` -> `perturbed`
Oct 15, 2025
dea15c9
Javadoc for bubbleInsertToPreSorted and minor refactor
Oct 15, 2025
5d200ef
Typo: passed -> past
Oct 15, 2025
1a1c761
Remove redundant TODOs
Oct 15, 2025
b9dc6db
Revert 'unused' params
Oct 20, 2025
5accdd5
Rename `ElementDescriptor` -> `UnfilteredDescriptor` (and fallout)
Oct 20, 2025
17be5f4
Remove unused parameter
Oct 20, 2025
3b0f0a9
Remove unused method
Oct 20, 2025
ae65b7e
Fix indentation
Oct 20, 2025
62837f9
Move SSTableCursorPipeUtil to benchmarks
Oct 20, 2025
4064d32
Rename `partitionLength` back to `finishResult` and clarify comment
Oct 20, 2025
797ace1
Remove unused methods
Oct 20, 2025
855a740
Improve bubbleInsertElementToPreSorted, delay element insert
Oct 21, 2025
beef9ad
Remove redundant cursor status check
Oct 21, 2025
b19b959
Simplify deletion merging loop, clarify partitionDeletion variable names
Oct 21, 2025
f5718b3
Neaten up SSTableCursorReader
Oct 22, 2025
87a1c1c
Dead code removal
Oct 22, 2025
ff5eff1
Revert making classes public
Oct 22, 2025
ed48fe5
Transform LivenessInfo an interface
Oct 22, 2025
a8bc083
Fix javadoc
Oct 22, 2025
be8b4e3
Fix intellij warnings
Oct 22, 2025
011213a
Explicitly split DeletionTime implementations
Oct 29, 2025
4d74fce
Rely on nextElementEquality in findMergeLimit
Oct 30, 2025
b9f5802
Refactor prepareAndSortForMerge code
Oct 30, 2025
308a3b3
Move merge limit == 0 out of mergeRows
Oct 31, 2025
562e437
Add TODO for clustering read/skip
Oct 31, 2025
37104f6
Simplify ClusteringComparator code and remove redundant code
Nov 4, 2025
d7abcb6
Refactor ClusteringDescriptor for clarity and fix MetadataCollector i…
Nov 4, 2025
8e24365
Add comment regarding boundary
Nov 4, 2025
bc27a3d
Split checkNextFlags by context and tidy-up
Nov 4, 2025
16f2eee
Use RuntimeException instead of ISE
Nov 5, 2025
80007eb
Improve CompactionCursor javadoc
Nov 5, 2025
cf77915
Remove "system" table check
Nov 5, 2025
ff8ab58
Check output format for `isSupported`, improve unsupported reason log…
Nov 5, 2025
647f3b6
Move partition start out of maybePurgedOutputDeletion
Nov 5, 2025
35af776
Improve prepareAndSortForPartitionMerge javadoc, avoid repeated reset…
Nov 5, 2025
2e8dff9
Change option name enable_cursor_compaction -> cursor_compaction_enabled
Nov 6, 2025
d2b23d6
Add option cursor_compaction_enabled to testlist-oa
Nov 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ public enum CassandraRelevantProperties
DTEST_IS_IN_JVM_DTEST("org.apache.cassandra.dtest.is_in_jvm_dtest"),
/** In_JVM dtest property indicating that the test should use "latest" configuration */
DTEST_JVM_DTESTS_USE_LATEST("jvm_dtests.latest"),
ENABLE_CURSOR_COMPACTION("cassandra.enable_cursor_compaction", "true"),
ENABLE_DC_LOCAL_COMMIT("cassandra.enable_dc_local_commit", "true"),
/**
* Whether {@link org.apache.cassandra.db.ConsistencyLevel#NODE_LOCAL} should be allowed.
Expand Down
3 changes: 3 additions & 0 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@

import static org.apache.cassandra.config.CassandraRelevantProperties.AUTOCOMPACTION_ON_STARTUP_ENABLED;
import static org.apache.cassandra.config.CassandraRelevantProperties.CASSANDRA_AVAILABLE_PROCESSORS;
import static org.apache.cassandra.config.CassandraRelevantProperties.ENABLE_CURSOR_COMPACTION;
import static org.apache.cassandra.config.CassandraRelevantProperties.FILE_CACHE_ENABLED;
import static org.apache.cassandra.config.CassandraRelevantProperties.SKIP_PAXOS_REPAIR_ON_TOPOLOGY_CHANGE;
import static org.apache.cassandra.config.CassandraRelevantProperties.SKIP_PAXOS_REPAIR_ON_TOPOLOGY_CHANGE_KEYSPACES;
Expand Down Expand Up @@ -644,6 +645,8 @@ public static class SSTableConfig
@Replaces(oldName = "enable_drop_compact_storage", converter = Converters.IDENTITY, deprecated = true)
public volatile boolean drop_compact_storage_enabled = false;

public boolean enable_cursor_compaction = ENABLE_CURSOR_COMPACTION.getBoolean();

public volatile boolean use_statements_enabled = true;

/**
Expand Down
11 changes: 11 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -4642,6 +4642,17 @@ public static void setTransientReplicationEnabledUnsafe(boolean enabled)
conf.transient_replication_enabled = enabled;
}

public static boolean enableCursorCompaction()
{
return conf.enable_cursor_compaction;
}

@VisibleForTesting
public static void setEnableCursorCompaction(boolean enable_cursor_compaction)
{
conf.enable_cursor_compaction = enable_cursor_compaction;
}

public static boolean enableDropCompactStorage()
{
return conf.drop_compact_storage_enabled;
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/Clustering.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public String toString(TableMetadata metadata)
/**
* Serializer for Clustering object.
* <p>
* Because every clustering in a given table must have the same size (ant that size cannot actually change once the table
* Because every clustering in a given table must have the same size (and that size cannot actually change once the table
* has been defined), we don't record that size.
*/
public static class Serializer
Expand Down
129 changes: 126 additions & 3 deletions src/java/org/apache/cassandra/db/ClusteringComparator.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,17 @@
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;

import org.apache.cassandra.io.sstable.ClusteringDescriptor;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.ByteBufferAccessor;
import org.apache.cassandra.db.marshal.ValueAccessor;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.serializers.MarshalException;

import org.apache.cassandra.io.sstable.IndexInfo;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
import org.apache.cassandra.utils.bytecomparable.ByteSource;
import org.apache.cassandra.utils.vint.VIntCoding;

import static org.apache.cassandra.utils.bytecomparable.ByteSource.EXCLUDED;
import static org.apache.cassandra.utils.bytecomparable.ByteSource.NEXT_COMPONENT;
Expand Down Expand Up @@ -156,6 +159,126 @@ public <V1, V2> int compare(ClusteringPrefix<V1> c1, ClusteringPrefix<V2> c2)
return s1 < s2 ? c1.kind().comparedToClustering : -c2.kind().comparedToClustering;
}

public static int compare(ClusteringDescriptor c1, ClusteringDescriptor c2)
{
final int c1Size = c1.clusteringColumnsBound();
final int c2Size = c2.clusteringColumnsBound();
final int minColumns = Math.min(c1Size, c2Size);

final int cmp = compare(c1.clusteringTypes(), c1.clusteringBuffer(), c2.clusteringBuffer(), minColumns);
if (cmp != 0)
return cmp;

final ClusteringPrefix.Kind c1Kind = c1.clusteringKind();
final ClusteringPrefix.Kind c2Kind = c2.clusteringKind();
if (c1Size == c2Size)
{
return ClusteringPrefix.Kind.compare(c1Kind, c2Kind);
}

return c1Size < c2Size ? c1Kind.comparedToClustering : -c2Kind.comparedToClustering;
}

public static int compare(AbstractType<?>[] types, ByteBuffer c1, ByteBuffer c2) {
return compare(types, c1, c2, types.length);
}

private static int compare(AbstractType<?>[] types, ByteBuffer c1, ByteBuffer c2, int size)
{
long clusteringBlock1 = 0;
long clusteringBlock2 = 0;
final int position1 = c1.position();
final int position2 = c2.position();
final int limit1 = c1.limit();
final int limit2 = c2.limit();
try
{
int ofst1 = position1;
int ofst2 = position2;
for (int clusteringIndex = 0; clusteringIndex < size; clusteringIndex++)
{
if (clusteringIndex % 32 == 0)
{
clusteringBlock1 = VIntCoding.getUnsignedVInt(c1, ofst1, limit1);
ofst1 += VIntCoding.computeUnsignedVIntSize(clusteringBlock1);
clusteringBlock2 = VIntCoding.getUnsignedVInt(c2, ofst2, limit2);
ofst2 += VIntCoding.computeUnsignedVIntSize(clusteringBlock2);
}

AbstractType<?> type = types[clusteringIndex];

boolean v1Present = (clusteringBlock1 & 0b11) == 0;
boolean v2Present = (clusteringBlock2 & 0b11) == 0;

if (v1Present && v2Present)
{
boolean isByteOrderComparable = type.isByteOrderComparable;
int vlen1,vlen2;
if (type.isValueLengthFixed())
{
vlen1 = vlen2 = type.valueLengthIfFixed();
}
else
{
vlen1 = VIntCoding.getUnsignedVInt32(c1, ofst1, limit1);
ofst1 += VIntCoding.computeUnsignedVIntSize(vlen1);
vlen2 = VIntCoding.getUnsignedVInt32(c2, ofst2, limit2);
ofst2 += VIntCoding.computeUnsignedVIntSize(vlen2);
}
int v1Limit = ofst1 + vlen1;
if (v1Limit > limit1)
throw new IllegalArgumentException("Value limit exceeds buffer limit.");
c1.position(ofst1).limit(v1Limit);
int v2Limit = ofst2 + vlen2;
if (v2Limit > limit2)
throw new IllegalArgumentException("Value limit exceeds buffer limit.");
c2.position(ofst2).limit(v2Limit);
int cmp = isByteOrderComparable ?
ByteBufferUtil.compareUnsigned(c1, c2) :
type.compareCustom(c1, ByteBufferAccessor.instance, c2, ByteBufferAccessor.instance);
if (cmp != 0)
return cmp;
c1.limit(limit1);
c2.limit(limit2);
ofst1 += vlen1;
ofst2 += vlen2;
}
// present > not present
else if (v1Present && !v2Present)
{
return 1;
}
else if (!v1Present && v2Present)
{
return -1;
}
else
{
boolean v1Null = (clusteringBlock1 & 0b10) == 0;
boolean v2Null = (clusteringBlock2 & 0b10) == 0;
// empty > null
if (!v1Null && v2Null)
{
return 1;
}
else if (v1Null && !v2Null)
{
return -1;
}
// empty == empty, continue...
}
clusteringBlock1 = clusteringBlock1 >>> 2;
clusteringBlock2 = clusteringBlock2 >>> 2;
}
}
finally
{
c1.position(position1).limit(limit1);
c2.position(position2).limit(limit2);
}
return 0;
}

public <V1, V2> int compare(Clustering<V1> c1, Clustering<V2> c2)
{
return compare(c1, c2, size());
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/ClusteringPrefix.java
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ <V> long valuesWithoutSizeSerializedSize(ClusteringPrefix<V> clustering, int ver
return result;
}

byte[][] deserializeValuesWithoutSize(DataInputPlus in, int size, int version, List<AbstractType<?>> types) throws IOException
public byte[][] deserializeValuesWithoutSize(DataInputPlus in, int size, int version, List<AbstractType<?>> types) throws IOException
{
// Callers of this method should handle the case where size = 0 (in all case we want to return a special value anyway).
assert size > 0;
Expand Down
5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -2323,6 +2323,11 @@ public boolean shouldIgnoreGcGraceForKey(DecoratedKey dk)
return partitionKeySetIgnoreGcGrace.contains(dk);
}

public boolean shouldIgnoreGcGraceForAnyKey()
{
return !partitionKeySetIgnoreGcGrace.isEmpty();
}

public static Iterable<ColumnFamilyStore> all()
{
List<Iterable<ColumnFamilyStore>> stores = new ArrayList<>(Schema.instance.getKeyspaces().size());
Expand Down
3 changes: 3 additions & 0 deletions src/java/org/apache/cassandra/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.rows.ColumnData;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.SerializationHelper;
import org.apache.cassandra.db.rows.UnfilteredSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.schema.ColumnMetadata;
Expand Down Expand Up @@ -529,6 +531,7 @@ public void serializeSubset(Collection<ColumnMetadata> columns, Columns superset
int supersetCount = superset.size();
if (columnCount == supersetCount)
{
/** This is prevented by caller for row serialization: {@link UnfilteredSerializer#serializeRowBody(Row, int, SerializationHelper, DataOutputPlus)}*/
out.writeUnsignedVInt32(0);
}
else if (supersetCount < 64)
Expand Down
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/db/DecoratedKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public ByteSource asComparableBytes(Version version)
// The OSS50 version avoids this by adding a terminator.
return ByteSource.withTerminatorMaybeLegacy(version,
ByteSource.END_OF_STREAM,
token.asComparableBytes(version),
getToken().asComparableBytes(version),
keyComparableBytes(version));
}

Expand All @@ -127,7 +127,7 @@ public ByteComparable asComparableBound(boolean before)

return ByteSource.withTerminator(
before ? ByteSource.LT_NEXT_COMPONENT : ByteSource.GT_NEXT_COMPONENT,
token.asComparableBytes(version),
getToken().asComparableBytes(version),
keyComparableBytes(version));
};
}
Expand Down
8 changes: 4 additions & 4 deletions src/java/org/apache/cassandra/db/DeletionPurger.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@

public interface DeletionPurger
{
public static final DeletionPurger PURGE_ALL = (ts, ldt) -> true;
DeletionPurger PURGE_ALL = (ts, ldt) -> true;

public boolean shouldPurge(long timestamp, long localDeletionTime);
boolean shouldPurge(long timestamp, long localDeletionTime);

public default boolean shouldPurge(DeletionTime dt)
default boolean shouldPurge(DeletionTime dt)
{
return !dt.isLive() && shouldPurge(dt.markedForDeleteAt(), dt.localDeletionTime());
}

public default boolean shouldPurge(LivenessInfo liveness, long nowInSec)
default boolean shouldPurge(LivenessInfo liveness, long nowInSec)
{
return !liveness.isLive(nowInSec) && shouldPurge(liveness.timestamp(), liveness.localExpirationTime());
}
Expand Down
Loading