Skip to content

Commit ef0a4ba

Browse files
author
Nitsan Wakart
committed
Add cursor based optimized compaction path (WIP)
Adds a compaction implementation utilizing new fixed-allocation SSTable reader/writer implementations, and other purpose-built code, leading to improved efficiencies. patch by Nitsan Wakart; reviewed by Branimir Lambov, +TBD for CASSANDRA-20918
1 parent 9a534ba commit ef0a4ba

File tree

142 files changed

+13122
-410
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

142 files changed

+13122
-410
lines changed

build.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1414,6 +1414,7 @@
14141414
<jvmarg value="-Dcassandra.config=file:///${scm_none_yaml}"/>
14151415
<jvmarg value="-Dcassandra.test.storage_compatibility_mode=NONE"/>
14161416
<jvmarg value="-Dcassandra.skip_sync=true" />
1417+
<jvmarg value="-Dcassandra.cursor_compaction_enabled=false" />
14171418
</testmacrohelper>
14181419
</sequential>
14191420
</macrodef>

src/java/org/apache/cassandra/config/CassandraRelevantProperties.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ public enum CassandraRelevantProperties
193193
CONSISTENT_RANGE_MOVEMENT("cassandra.consistent.rangemovement", "true"),
194194
CONSISTENT_SIMULTANEOUS_MOVES_ALLOW("cassandra.consistent.simultaneousmoves.allow"),
195195
CRYPTO_PROVIDER_CLASS_NAME("cassandra.crypto_provider_class_name"),
196+
CURSOR_COMPACTION_ENABLED("cassandra.cursor_compaction_enabled", "true"),
196197
CUSTOM_DISK_ERROR_HANDLER("cassandra.custom_disk_error_handler"),
197198
CUSTOM_GUARDRAILS_CONFIG_PROVIDER_CLASS("cassandra.custom_guardrails_config_provider_class"),
198199
CUSTOM_QUERY_HANDLER_CLASS("cassandra.custom_query_handler_class"),

src/java/org/apache/cassandra/config/Config.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949

5050
import static org.apache.cassandra.config.CassandraRelevantProperties.AUTOCOMPACTION_ON_STARTUP_ENABLED;
5151
import static org.apache.cassandra.config.CassandraRelevantProperties.CASSANDRA_AVAILABLE_PROCESSORS;
52+
import static org.apache.cassandra.config.CassandraRelevantProperties.CURSOR_COMPACTION_ENABLED;
5253
import static org.apache.cassandra.config.CassandraRelevantProperties.FILE_CACHE_ENABLED;
5354
import static org.apache.cassandra.config.CassandraRelevantProperties.SKIP_PAXOS_REPAIR_ON_TOPOLOGY_CHANGE;
5455
import static org.apache.cassandra.config.CassandraRelevantProperties.SKIP_PAXOS_REPAIR_ON_TOPOLOGY_CHANGE_KEYSPACES;
@@ -655,6 +656,8 @@ public static class SSTableConfig
655656
@Replaces(oldName = "enable_drop_compact_storage", converter = Converters.IDENTITY, deprecated = true)
656657
public volatile boolean drop_compact_storage_enabled = false;
657658

659+
public boolean cursor_compaction_enabled = CURSOR_COMPACTION_ENABLED.getBoolean();
660+
658661
public volatile boolean use_statements_enabled = true;
659662

660663
/**

src/java/org/apache/cassandra/config/DatabaseDescriptor.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4683,6 +4683,17 @@ public static void setTransientReplicationEnabledUnsafe(boolean enabled)
46834683
conf.transient_replication_enabled = enabled;
46844684
}
46854685

4686+
/**
 * Returns whether the cursor-based compaction path is enabled.
 * Backed by {@code Config.cursor_compaction_enabled}, whose default comes from the
 * {@code cassandra.cursor_compaction_enabled} system property.
 */
public static boolean cursorCompactionEnabled()
{
    return conf.cursor_compaction_enabled;
}
4690+
4691+
@VisibleForTesting
4692+
public static void setCursorCompactionEnabled(boolean cursor_compaction_enabled)
4693+
{
4694+
conf.cursor_compaction_enabled = cursor_compaction_enabled;
4695+
}
4696+
46864697
public static boolean enableDropCompactStorage()
46874698
{
46884699
return conf.drop_compact_storage_enabled;

src/java/org/apache/cassandra/db/BufferDecoratedKey.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
public class BufferDecoratedKey extends DecoratedKey
2727
{
28-
private final ByteBuffer key;
28+
protected ByteBuffer key;
2929

3030
public BufferDecoratedKey(Token token, ByteBuffer key)
3131
{

src/java/org/apache/cassandra/db/Clustering.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ public String toString(TableMetadata metadata)
133133
/**
134134
* Serializer for Clustering object.
135135
* <p>
136-
* Because every clustering in a given table must have the same size (ant that size cannot actually change once the table
136+
* Because every clustering in a given table must have the same size (and that size cannot actually change once the table
137137
* has been defined), we don't record that size.
138138
*/
139139
public static class Serializer

src/java/org/apache/cassandra/db/ClusteringComparator.java

Lines changed: 107 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,17 @@
2626
import com.google.common.base.Joiner;
2727
import com.google.common.collect.ImmutableList;
2828

29+
import org.apache.cassandra.io.sstable.ClusteringDescriptor;
30+
import org.apache.cassandra.db.marshal.AbstractType;
31+
import org.apache.cassandra.db.marshal.ByteBufferAccessor;
2932
import org.apache.cassandra.db.marshal.ValueAccessor;
3033
import org.apache.cassandra.db.rows.Row;
31-
import org.apache.cassandra.db.marshal.AbstractType;
32-
import org.apache.cassandra.serializers.MarshalException;
33-
3434
import org.apache.cassandra.io.sstable.IndexInfo;
35+
import org.apache.cassandra.serializers.MarshalException;
36+
import org.apache.cassandra.utils.ByteBufferUtil;
3537
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
3638
import org.apache.cassandra.utils.bytecomparable.ByteSource;
39+
import org.apache.cassandra.utils.vint.VIntCoding;
3740

3841
import static org.apache.cassandra.utils.bytecomparable.ByteSource.EXCLUDED;
3942
import static org.apache.cassandra.utils.bytecomparable.ByteSource.NEXT_COMPONENT;
@@ -156,6 +159,107 @@ public <V1, V2> int compare(ClusteringPrefix<V1> c1, ClusteringPrefix<V2> c2)
156159
return s1 < s2 ? c1.kind().comparedToClustering : -c2.kind().comparedToClustering;
157160
}
158161

162+
public static int compare(ClusteringDescriptor c1, ClusteringDescriptor c2)
163+
{
164+
final int c1Size = c1.clusteringColumnsBound();
165+
final int c2Size = c2.clusteringColumnsBound();
166+
final int minColumns = Math.min(c1Size, c2Size);
167+
168+
final int cmp = compare(c1.clusteringTypes(), c1.clusteringBuffer(), c2.clusteringBuffer(), minColumns);
169+
if (cmp != 0)
170+
return cmp;
171+
172+
final ClusteringPrefix.Kind c1Kind = c1.clusteringKind();
173+
final ClusteringPrefix.Kind c2Kind = c2.clusteringKind();
174+
if (c1Size == c2Size)
175+
{
176+
return ClusteringPrefix.Kind.compare(c1Kind, c2Kind);
177+
}
178+
179+
return c1Size < c2Size ? c1Kind.comparedToClustering : -c2Kind.comparedToClustering;
180+
}
181+
182+
public static int compare(AbstractType<?>[] types, ByteBuffer c1, ByteBuffer c2) {
183+
return compare(types, c1, c2, types.length);
184+
}
185+
186+
/**
 * Compares the first {@code size} clustering values of two serialized clusterings.
 * <p>
 * The serialized form interleaves per-column presence flags and values: every 32 columns
 * a vint-encoded "flag block" is read, carrying 2 bits per column
 * (0b00 = value present, 0b01 = empty, 0b10 = null — see the flag comparison below).
 * Present values follow inline, either fixed-length (per the column type) or prefixed
 * with a vint length.
 * <p>
 * Both buffers are read destructively during the comparison but their position and
 * limit are restored in the {@code finally} block, so callers observe no change.
 *
 * @param types the clustering column types, in clustering order
 * @param c1    serialized clustering of the first operand
 * @param c2    serialized clustering of the second operand
 * @param size  number of leading clustering columns to compare
 * @return a negative, zero, or positive integer per the usual comparator contract
 * @throws IllegalArgumentException if an encoded value length runs past a buffer's limit
 */
private static int compare(AbstractType<?>[] types, ByteBuffer c1, ByteBuffer c2, int size)
{
    long clusteringBlock1 = 0;
    long clusteringBlock2 = 0;
    // Remember original cursors so both buffers can be restored on every exit path.
    final int position1 = c1.position();
    final int position2 = c2.position();
    final int limit1 = c1.limit();
    final int limit2 = c2.limit();
    try
    {
        for (int clusteringIndex = 0; clusteringIndex < size; clusteringIndex++)
        {
            // A flag block holds 2 bits for each of the next 32 columns; refill every 32.
            if (clusteringIndex % 32 == 0)
            {
                clusteringBlock1 = VIntCoding.readUnsignedVInt(c1);
                clusteringBlock2 = VIntCoding.readUnsignedVInt(c2);
            }

            AbstractType<?> type = types[clusteringIndex];

            // Low two bits of each block are the current column's presence flags.
            byte v1Flags = (byte) (clusteringBlock1 & 0b11);
            byte v2Flags = (byte) (clusteringBlock2 & 0b11);

            // both values are present
            if ((v1Flags|v2Flags) == 0)
            {
                boolean isByteOrderComparable = type.isByteOrderComparable;
                int vlen1,vlen2;
                if (type.isValueLengthFixed())
                {
                    vlen1 = vlen2 = type.valueLengthIfFixed();
                }
                else
                {
                    // Variable-length values carry a vint length prefix.
                    vlen1 = VIntCoding.readUnsignedVInt32(c1);
                    vlen2 = VIntCoding.readUnsignedVInt32(c2);
                }
                // Temporarily narrow each buffer's limit to the end of the current value
                // so the compare below sees exactly one value; guard against corrupt lengths.
                int v1Limit = c1.position() + vlen1;
                if (v1Limit > limit1)
                    throw new IllegalArgumentException("Value limit exceeds buffer limit.");
                c1.limit(v1Limit);
                int v2Limit = c2.position() + vlen2;
                if (v2Limit > limit2)
                    throw new IllegalArgumentException("Value limit exceeds buffer limit.");
                c2.limit(v2Limit);
                int cmp = isByteOrderComparable ?
                          ByteBufferUtil.compareUnsigned(c1, c2) :
                          type.compareCustom(c1, ByteBufferAccessor.instance, c2, ByteBufferAccessor.instance);
                if (cmp != 0)
                    return cmp; // finally-block restores both buffers
                // Advance past the compared value and restore the full limits
                // before decoding the next column.
                c1.position(v1Limit);
                c2.position(v2Limit);
                c1.limit(limit1);
                c2.limit(limit2);
            }
            // present > not present
            else
            {
                // null (0b10) is smaller than empty (0b01) which is smaller than valued (0b00);
                // compare swapped arguments to reverse the order
                int cmp = Long.compare(v2Flags, v1Flags);
                if (cmp != 0)
                    return cmp;
                // null/empty == null/empty, continue...
                // (equal non-zero flags mean neither buffer has value bytes to skip)
            }
            // Shift the consumed 2 flag bits out; next column's flags move into the low bits.
            clusteringBlock1 = clusteringBlock1 >>> 2;
            clusteringBlock2 = clusteringBlock2 >>> 2;
        }
    }
    finally
    {
        // Restore both buffers so this comparison is side-effect free for callers.
        c1.position(position1).limit(limit1);
        c2.position(position2).limit(limit2);
    }
    return 0;
}
262+
159263
public <V1, V2> int compare(Clustering<V1> c1, Clustering<V2> c2)
160264
{
161265
return compare(c1, c2, size());

src/java/org/apache/cassandra/db/ClusteringPrefix.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -519,7 +519,7 @@ <V> long valuesWithoutSizeSerializedSize(ClusteringPrefix<V> clustering, int ver
519519
return result;
520520
}
521521

522-
byte[][] deserializeValuesWithoutSize(DataInputPlus in, int size, int version, List<AbstractType<?>> types) throws IOException
522+
public byte[][] deserializeValuesWithoutSize(DataInputPlus in, int size, int version, List<AbstractType<?>> types) throws IOException
523523
{
524524
// Callers of this method should handle the case where size = 0 (in all case we want to return a special value anyway).
525525
assert size > 0;

src/java/org/apache/cassandra/db/ColumnFamilyStore.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2329,6 +2329,11 @@ public boolean shouldIgnoreGcGraceForKey(DecoratedKey dk)
23292329
return partitionKeySetIgnoreGcGrace.contains(dk);
23302330
}
23312331

2332+
/**
 * Returns true if at least one partition key in this table is configured to ignore
 * gc_grace (i.e. {@code partitionKeySetIgnoreGcGrace} is non-empty), allowing callers
 * to skip per-key {@code shouldIgnoreGcGraceForKey} checks entirely when no key is
 * configured.
 */
public boolean shouldIgnoreGcGraceForAnyKey()
{
    return !partitionKeySetIgnoreGcGrace.isEmpty();
}
2336+
23322337
public static Iterable<ColumnFamilyStore> all()
23332338
{
23342339
List<Iterable<ColumnFamilyStore>> stores = new ArrayList<>(Schema.instance.getKeyspaces().size());

src/java/org/apache/cassandra/db/Columns.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import org.apache.cassandra.db.marshal.UTF8Type;
3434
import org.apache.cassandra.db.rows.ColumnData;
3535
import org.apache.cassandra.db.rows.Row;
36+
import org.apache.cassandra.db.rows.SerializationHelper;
37+
import org.apache.cassandra.db.rows.UnfilteredSerializer;
3638
import org.apache.cassandra.io.util.DataInputPlus;
3739
import org.apache.cassandra.io.util.DataOutputPlus;
3840
import org.apache.cassandra.schema.ColumnMetadata;
@@ -529,6 +531,7 @@ public void serializeSubset(Collection<ColumnMetadata> columns, Columns superset
529531
int supersetCount = superset.size();
530532
if (columnCount == supersetCount)
531533
{
534+
/** This is prevented by caller for row serialization: {@link UnfilteredSerializer#serializeRowBody(Row, int, SerializationHelper, DataOutputPlus)}*/
532535
out.writeUnsignedVInt32(0);
533536
}
534537
else if (supersetCount < 64)

0 commit comments

Comments
 (0)