Commit a06d9b3

Address Stefan's review comments
Get rid of java.util.concurrent.CompletableFuture usage
Use clock
Relocate dictionary configurations and use IntKibibytesBound
Change compression_dictionary_training_sampling_rate data type from int to float
Address other various comments
1 parent 19eed8a commit a06d9b3

28 files changed: +230 -176 lines changed

conf/cassandra.yaml

Lines changed: 46 additions & 48 deletions
@@ -617,54 +617,6 @@ counter_cache_save_period: 7200s
 # Disabled by default, meaning all keys are going to be saved
 # counter_cache_keys_to_save: 100
 
-# Dictionary compression settings for ZSTD dictionary-based compression
-# These settings control the automatic training and caching of compression dictionaries
-# for tables that use ZSTD dictionary compression.
-
-# How often to refresh compression dictionaries across the cluster.
-# During refresh, nodes will check for newer dictionary versions and update their caches.
-# Min unit: s
-compression_dictionary_refresh_interval: 3600s
-
-# Initial delay before starting the first dictionary refresh cycle after node startup.
-# This prevents all nodes from refreshing simultaneously when the cluster starts.
-# Min unit: s
-compression_dictionary_refresh_initial_delay: 10s
-
-# Maximum number of compression dictionaries to cache per table.
-# Each table using dictionary compression can have multiple dictionaries cached
-# (current version plus recently used versions for reading older SSTables).
-compression_dictionary_cache_size: 10
-
-# How long to keep compression dictionaries in the cache before they expire.
-# Expired dictionaries will be removed from memory but can be reloaded if needed.
-# Min unit: s
-compression_dictionary_cache_expire: 3600s
-
-# Dictionary training configuration (advanced settings)
-# These settings control how compression dictionaries are trained from sample data.
-
-# Maximum size of a trained compression dictionary in bytes.
-# Larger dictionaries may provide better compression but use more memory.
-# Min unit: B
-compression_dictionary_training_max_dictionary_size: 65536
-
-# Maximum total size of sample data to collect for dictionary training.
-# More sample data generally produces better dictionaries but takes longer to train.
-# The recommended sample size is 100x the dictionary size.
-# Min unit: B
-compression_dictionary_training_max_total_sample_size: 10485760
-
-# Enable automatic dictionary training based on sampling of write operations.
-# When enabled, the system will automatically collect samples and train new dictionaries.
-# Manual training via nodetool is always available regardless of this setting.
-compression_dictionary_training_auto_train_enabled: false
-
-# Sampling rate for automatic dictionary training (1-10000).
-# Value of 100 means 1% of writes are sampled. Lower values reduce overhead but may
-# result in less representative sample data for dictionary training.
-compression_dictionary_training_sampling_rate: 100
-
 # saved caches
 # If not set, the default directory is $CASSANDRA_HOME/data/saved_caches.
 # saved_caches_directory: /var/lib/cassandra/saved_caches
@@ -2897,3 +2849,49 @@ storage_compatibility_mode: NONE
 # # especially in keyspaces with many tables. The splitter avoids batching tables together if they
 # # exceed other configuration parameters like bytes_per_assignment or partitions_per_assignment.
 # max_tables_per_assignment: 64
+
+# Dictionary compression settings for ZSTD dictionary-based compression
+# These settings control the automatic training and caching of compression dictionaries
+# for tables that use ZSTD dictionary compression.
+
+# How often to refresh compression dictionaries across the cluster.
+# During refresh, nodes will check for newer dictionary versions and update their caches.
+# Min unit: s
+compression_dictionary_refresh_interval: 3600s
+
+# Initial delay before starting the first dictionary refresh cycle after node startup.
+# This prevents all nodes from refreshing simultaneously when the cluster starts.
+# Min unit: s
+compression_dictionary_refresh_initial_delay: 10s
+
+# Maximum number of compression dictionaries to cache per table.
+# Each table using dictionary compression can have multiple dictionaries cached
+# (current version plus recently used versions for reading older SSTables).
+compression_dictionary_cache_size: 10
+
+# How long to keep compression dictionaries in the cache before they expire.
+# Expired dictionaries will be removed from memory but can be reloaded if needed.
+# Min unit: s
+compression_dictionary_cache_expire: 24h
+
+# Dictionary training configuration (advanced settings)
+# These settings control how compression dictionaries are trained from sample data.
+
+# Maximum size of a trained compression dictionary.
+# Larger dictionaries may provide better compression but use more memory.
+compression_dictionary_training_max_dictionary_size: 64KiB
+
+# Maximum total size of sample data to collect for dictionary training.
+# More sample data generally produces better dictionaries but takes longer to train.
+# The recommended sample size is 100x the dictionary size.
+compression_dictionary_training_max_total_sample_size: 10MiB
+
+# Enable automatic dictionary training based on sampling of write operations.
+# When enabled, the system will automatically collect samples and train new dictionaries.
+# Manual training via nodetool is always available regardless of this setting.
+compression_dictionary_training_auto_train_enabled: false
+
+# Sampling rate for automatic dictionary training (0.0 - 1.0).
+# Value of 0.01 means 1% of writes are sampled. Lower values reduce overhead but may
+# result in less representative sample data for dictionary training.
+compression_dictionary_training_sampling_rate: 0.01
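
With the rate expressed as a float, the value reads directly as a probability: 0.01 means roughly one write in a hundred contributes a sample. A minimal sketch of how such a per-write check could be applied (illustrative only; shouldSample and SAMPLING_RATE are placeholder names, not Cassandra's actual sampling path):

import java.util.concurrent.ThreadLocalRandom;

public final class SamplingRateExample
{
    // Placeholder for the configured compression_dictionary_training_sampling_rate value
    private static final float SAMPLING_RATE = 0.01f;

    // Returns true for roughly SAMPLING_RATE of all calls
    static boolean shouldSample()
    {
        return ThreadLocalRandom.current().nextFloat() < SAMPLING_RATE;
    }

    public static void main(String[] args)
    {
        int total = 1_000_000;
        int sampled = 0;
        for (int i = 0; i < total; i++)
            if (shouldSample())
                sampled++;
        // Expect roughly 10,000 of 1,000,000 simulated writes to be sampled (about 1%)
        System.out.println("Sampled " + sampled + " of " + total);
    }
}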

conf/cassandra_latest.yaml

Lines changed: 46 additions & 0 deletions
@@ -2529,3 +2529,49 @@ storage_compatibility_mode: NONE
 # # especially in keyspaces with many tables. The splitter avoids batching tables together if they
 # # exceed other configuration parameters like bytes_per_assignment or partitions_per_assignment.
 # max_tables_per_assignment: 64
+
+# Dictionary compression settings for ZSTD dictionary-based compression
+# These settings control the automatic training and caching of compression dictionaries
+# for tables that use ZSTD dictionary compression.
+
+# How often to refresh compression dictionaries across the cluster.
+# During refresh, nodes will check for newer dictionary versions and update their caches.
+# Min unit: s
+compression_dictionary_refresh_interval: 3600s
+
+# Initial delay before starting the first dictionary refresh cycle after node startup.
+# This prevents all nodes from refreshing simultaneously when the cluster starts.
+# Min unit: s
+compression_dictionary_refresh_initial_delay: 10s
+
+# Maximum number of compression dictionaries to cache per table.
+# Each table using dictionary compression can have multiple dictionaries cached
+# (current version plus recently used versions for reading older SSTables).
+compression_dictionary_cache_size: 10
+
+# How long to keep compression dictionaries in the cache before they expire.
+# Expired dictionaries will be removed from memory but can be reloaded if needed.
+# Min unit: s
+compression_dictionary_cache_expire: 24h
+
+# Dictionary training configuration (advanced settings)
+# These settings control how compression dictionaries are trained from sample data.
+
+# Maximum size of a trained compression dictionary.
+# Larger dictionaries may provide better compression but use more memory.
+compression_dictionary_training_max_dictionary_size: 64KiB
+
+# Maximum total size of sample data to collect for dictionary training.
+# More sample data generally produces better dictionaries but takes longer to train.
+# The recommended sample size is 100x the dictionary size.
+compression_dictionary_training_max_total_sample_size: 10MiB
+
+# Enable automatic dictionary training based on sampling of write operations.
+# When enabled, the system will automatically collect samples and train new dictionaries.
+# Manual training via nodetool is always available regardless of this setting.
+compression_dictionary_training_auto_train_enabled: false
+
+# Sampling rate for automatic dictionary training (0.0 - 1.0).
+# Value of 0.01 means 1% of writes are sampled. Lower values reduce overhead but may
+# result in less representative sample data for dictionary training.
+compression_dictionary_training_sampling_rate: 0.01

src/java/org/apache/cassandra/config/Config.java

Lines changed: 4 additions & 4 deletions
@@ -517,13 +517,13 @@ public static class SSTableConfig
     public volatile DurationSpec.IntSecondsBound compression_dictionary_refresh_interval = new DurationSpec.IntSecondsBound("3600s"); // 1 hour - TODO: re-assess whether daily (86400s) is more appropriate
     public volatile DurationSpec.IntSecondsBound compression_dictionary_refresh_initial_delay = new DurationSpec.IntSecondsBound("10s"); // 10 seconds default
     public volatile int compression_dictionary_cache_size = 10; // max dictionaries per table
-    public volatile DurationSpec.IntSecondsBound compression_dictionary_cache_expire = new DurationSpec.IntSecondsBound("3600s"); // 1 hour default
+    public volatile DurationSpec.IntSecondsBound compression_dictionary_cache_expire = new DurationSpec.IntSecondsBound("24h");
 
     // Dictionary training settings
-    public volatile int compression_dictionary_training_max_dictionary_size = 65536; // 64KB
-    public volatile int compression_dictionary_training_max_total_sample_size = 10485760; // 10MB total
+    public volatile DataStorageSpec.IntKibibytesBound compression_dictionary_training_max_dictionary_size = new DataStorageSpec.IntKibibytesBound("64KiB");
+    public volatile DataStorageSpec.IntKibibytesBound compression_dictionary_training_max_total_sample_size = new DataStorageSpec.IntKibibytesBound("10MiB");
     public volatile boolean compression_dictionary_training_auto_train_enabled = false;
-    public volatile int compression_dictionary_training_sampling_rate = 100; // samples 1%; using int since random.nextInt is generally faster than random.nextDouble
+    public volatile float compression_dictionary_training_sampling_rate = 0.01f; // samples 1%
 
     public DataStorageSpec.LongMebibytesBound paxos_cache_size = null;
 
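
Switching these fields to DataStorageSpec.IntKibibytesBound lets the defaults be written as human-readable sizes ("64KiB", "10MiB") while callers keep working in bytes through toBytes(), as the DatabaseDescriptor getters in this commit do. A small sketch of that equivalence, assuming only the constructor and toBytes() call used here:

import org.apache.cassandra.config.DataStorageSpec;

public class DictionarySizeBoundsExample
{
    public static void main(String[] args)
    {
        // "64KiB" parses to 64 kibibytes; toBytes() yields 65536, matching the old int default
        DataStorageSpec.IntKibibytesBound maxDictionarySize = new DataStorageSpec.IntKibibytesBound("64KiB");
        // "10MiB" replaces the old 10485760-byte literal
        DataStorageSpec.IntKibibytesBound maxTotalSampleSize = new DataStorageSpec.IntKibibytesBound("10MiB");

        System.out.println(maxDictionarySize.toBytes());  // expected: 65536
        System.out.println(maxTotalSampleSize.toBytes()); // expected: 10485760
    }
}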

src/java/org/apache/cassandra/config/DatabaseDescriptor.java

Lines changed: 3 additions & 3 deletions
@@ -4383,12 +4383,12 @@ public static int getCompressionDictionaryCacheExpireSeconds()
 
     public static int getCompressionDictionaryTrainingMaxDictionarySize()
     {
-        return conf.compression_dictionary_training_max_dictionary_size;
+        return conf.compression_dictionary_training_max_dictionary_size.toBytes();
     }
 
     public static int getCompressionDictionaryTrainingMaxTotalSampleSize()
     {
-        return conf.compression_dictionary_training_max_total_sample_size;
+        return conf.compression_dictionary_training_max_total_sample_size.toBytes();
     }
 
     public static boolean getCompressionDictionaryTrainingAutoTrainEnabled()
@@ -4397,7 +4397,7 @@ public static boolean getCompressionDictionaryTrainingAutoTrainEnabled()
     }
 
 
-    public static int getCompressionDictionaryTrainingSamplingRate()
+    public static float getCompressionDictionaryTrainingSamplingRate()
     {
         return conf.compression_dictionary_training_sampling_rate;
     }

src/java/org/apache/cassandra/db/compression/CompressionDictionary.java

Lines changed: 5 additions & 4 deletions
@@ -40,7 +40,7 @@ public interface CompressionDictionary extends AutoCloseable
      *
      * @return dictionary id
      */
-    DictId identifier();
+    DictId dictId();
 
     /**
      * Get the raw bytes of the compression dictionary
@@ -56,7 +56,7 @@ public interface CompressionDictionary extends AutoCloseable
      */
     default Kind kind()
     {
-        return identifier().kind;
+        return dictId().kind;
     }
 
     /**
@@ -67,7 +67,7 @@ default Kind kind()
      */
     default void serialize(DataOutput out) throws IOException
     {
-        DictId dictId = identifier();
+        DictId dictId = dictId();
         int ordinal = dictId.kind.ordinal();
         out.writeByte(ordinal);
         out.writeLong(dictId.id);
@@ -123,7 +123,8 @@ static CompressionDictionary deserialize(DataInput input, @Nullable CompressionD
         int checksum = input.readInt();
         int calculatedChecksum = calculateChecksum((byte) kindOrdinal, id, dict);
         if (checksum != calculatedChecksum)
-            throw new IOException("Compression dictionary checksum does not match");
+            throw new IOException("Compression dictionary checksum does not match. " +
+                                  "Expected: " + checksum + "; actual: " + calculatedChecksum);
 
         CompressionDictionary dictionary = kind.createDictionary(dictId, dict);
 

src/java/org/apache/cassandra/db/compression/CompressionDictionaryCache.java

Lines changed: 3 additions & 4 deletions
@@ -47,10 +47,9 @@ public class CompressionDictionaryCache implements ICompressionDictionaryCache
 
     public CompressionDictionaryCache()
     {
-        Duration expiryTime = Duration.ofSeconds(DatabaseDescriptor.getCompressionDictionaryCacheExpireSeconds());
         this.cache = Caffeine.newBuilder()
                              .maximumSize(DatabaseDescriptor.getCompressionDictionaryCacheSize())
-                             .expireAfterAccess(expiryTime)
+                             .expireAfterAccess(Duration.ofSeconds(DatabaseDescriptor.getCompressionDictionaryCacheExpireSeconds()))
                              .removalListener((CompressionDictionary.DictId dictId,
                                                CompressionDictionary dictionary,
                                                RemovalCause cause) -> {
@@ -88,7 +87,7 @@ public CompressionDictionary get(CompressionDictionary.DictId dictId)
     @Override
     public void add(CompressionDictionary compressionDictionary)
     {
-        cache.put(compressionDictionary.identifier(), compressionDictionary);
+        cache.put(compressionDictionary.dictId(), compressionDictionary);
     }
 
     @Override
@@ -100,7 +99,7 @@ public void setCurrentIfNewer(@Nullable CompressionDictionary dictionary)
         add(dictionary);
         // Only update the current dictionary if we don't have one or the new one has a higher ID (newer)
         CompressionDictionary current = currentDictionary.get();
-        while ((current == null || dictionary.identifier().id > current.identifier().id)
+        while ((current == null || dictionary.dictId().id > current.dictId().id)
                && !currentDictionary.compareAndSet(current, dictionary))
         {
            current = currentDictionary.get();
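
setCurrentIfNewer relies on a compare-and-set retry loop so concurrent callers can only move the current dictionary forward to a higher id. The same pattern in isolation, with a plain AtomicReference and a stand-in Dict class instead of the real CompressionDictionary/DictId types:

import java.util.concurrent.atomic.AtomicReference;

public class NewestWinsExample
{
    // Stand-in for a dictionary; only the monotonically increasing id matters here
    static final class Dict
    {
        final long id;
        Dict(long id) { this.id = id; }
    }

    private final AtomicReference<Dict> current = new AtomicReference<>();

    // Install the candidate only if it is newer than whatever is currently installed
    void setCurrentIfNewer(Dict candidate)
    {
        Dict observed = current.get();
        while ((observed == null || candidate.id > observed.id)
               && !current.compareAndSet(observed, candidate))
        {
            // Lost the race: re-read the winner and re-check whether the candidate is still newer
            observed = current.get();
        }
    }
}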

src/java/org/apache/cassandra/db/compression/CompressionDictionaryEventHandler.java

Lines changed: 2 additions & 3 deletions
@@ -31,7 +31,6 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
 
 /**
  * Handles compression dictionary events including training completion and cluster notifications.
@@ -79,7 +78,7 @@ public void onNewDictionaryTrained(CompressionDictionary.DictId dictionaryId)
     public void onNewDictionaryAvailable(CompressionDictionary.DictId dictionaryId)
     {
         // Best effort to retrieve the dictionary; otherwise, the periodic task should retrieve the dictionary later
-        CompletableFuture.runAsync(() -> {
+        ScheduledExecutors.nonPeriodicTasks.submit(() -> {
             try
             {
                 if (!cfs.metadata().params.compression.isDictionaryCompressionEnabled())
@@ -95,7 +94,7 @@ public void onNewDictionaryAvailable(CompressionDictionary.DictId dictionaryId)
                 logger.warn("Failed to retrieve compression dictionary for {}.{}. {}",
                             keyspaceName, tableName, dictionaryId, e);
             }
-        }, ScheduledExecutors.nonPeriodicTasks);
+        });
     }
 
     // Best effort to notify the peer regarding the new dictionary being available to pull.
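
Both versions run the same lambda on ScheduledExecutors.nonPeriodicTasks; the change mainly drops the CompletableFuture wrapper whose result was never consumed. A side-by-side sketch of the two fire-and-forget forms with a plain ExecutorService (the executor and task here are illustrative, not Cassandra's):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FireAndForgetExample
{
    public static void main(String[] args)
    {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Runnable task = () -> System.out.println("retrieving dictionary (best effort)");

        // Before: wraps the task in a CompletableFuture whose result is never used
        CompletableFuture.runAsync(task, executor);

        // After: submits directly to the executor, same fire-and-forget behaviour
        executor.submit(task);

        executor.shutdown();
    }
}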

src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java

Lines changed: 3 additions & 4 deletions
@@ -83,8 +83,7 @@ public CompressionDictionaryManager(ColumnFamilyStore columnFamilyStore, boolean
 
     static String mbeanName(String keyspaceName, String tableName)
     {
-        return "org.apache.cassandra.db.compression:type=CompressionDictionaryManager" +
-               ",keyspace=" + keyspaceName + ",table=" + tableName;
+        return MBEAN_NAME + ",keyspace=" + keyspaceName + ",table=" + tableName;
     }
 
     public boolean isEnabled()
@@ -209,7 +208,7 @@ public synchronized void train(Map<String, String> options)
         // Validate table supports dictionary compression
         if (!isEnabled)
         {
-            throw new IllegalArgumentException("Table " + keyspaceName + '.' + tableName + " does not support dictionary compression");
+            throw new UnsupportedOperationException("Table " + keyspaceName + '.' + tableName + " does not support dictionary compression");
         }
 
         if (trainer == null)
@@ -266,7 +265,7 @@ private void handleNewDictionary(CompressionDictionary dictionary)
     {
         // sequence meatters; persist the new dictionary before broadcasting to others.
         storeDictionary(dictionary);
-        onNewDictionaryTrained(dictionary.identifier());
+        onNewDictionaryTrained(dictionary.dictId());
     }
 
     private CompressionDictionaryTrainingConfig createTrainingConfig()

src/java/org/apache/cassandra/db/compression/CompressionDictionaryManagerMBean.java

Lines changed: 3 additions & 1 deletion
@@ -22,11 +22,13 @@
 
 public interface CompressionDictionaryManagerMBean
 {
+    String MBEAN_NAME = "org.apache.cassandra.db.compression:type=CompressionDictionaryManager";
+
     /**
      * Starts sampling and training for this table.
      *
      * @param options options for the training process (currently unused, reserved for future extensions)
-     * @throws IllegalArgumentException if table doesn't support dictionary compression
+     * @throws UnsupportedOperationException if table doesn't support dictionary compression
      * @throws IllegalStateException if training is already in progress for this table
      */
     void train(Map<String, String> options);
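
With the name exposed as MBEAN_NAME, a JMX client can rebuild the per-table ObjectName the same way mbeanName() does and drive training remotely. A hedged sketch (the keyspace/table values and the in-process connection are assumptions):

import java.lang.management.ManagementFactory;
import java.util.Collections;
import javax.management.JMX;
import javax.management.MBeanServer;
import javax.management.ObjectName;

import org.apache.cassandra.db.compression.CompressionDictionaryManagerMBean;

public class TrainViaJmxExample
{
    public static void main(String[] args) throws Exception
    {
        // Same pattern as CompressionDictionaryManager.mbeanName(keyspace, table)
        ObjectName name = new ObjectName(CompressionDictionaryManagerMBean.MBEAN_NAME
                                         + ",keyspace=ks1,table=events");

        // Assumes we are in-process; a remote client would go through a JMXConnector instead
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        CompressionDictionaryManagerMBean proxy =
            JMX.newMBeanProxy(server, name, CompressionDictionaryManagerMBean.class);

        // Throws UnsupportedOperationException if the table does not use dictionary compression,
        // or IllegalStateException if training is already in progress (per the updated javadoc)
        proxy.train(Collections.emptyMap());
    }
}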

src/java/org/apache/cassandra/db/compression/CompressionDictionaryScheduler.java

Lines changed: 4 additions & 3 deletions
@@ -29,6 +29,7 @@
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.compression.ICompressionDictionaryTrainer.TrainingStatus;
 import org.apache.cassandra.schema.SystemDistributedKeyspace;
+import org.apache.cassandra.utils.Clock;
 
 /**
  * Manages scheduled tasks for compression dictionary operations.
@@ -90,7 +91,7 @@ public void scheduleManualTraining(ManualTrainingOptions options, ICompressionDi
         logger.info("Starting manual dictionary training for {}.{} with max sampling duration: {} seconds",
                     keyspaceName, tableName, maxSamplingDurationSeconds);
 
-        long deadlineMillis = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(maxSamplingDurationSeconds);
+        long deadlineMillis = Clock.Global.currentTimeMillis() + TimeUnit.SECONDS.toMillis(maxSamplingDurationSeconds);
 
         ManualTrainingTask task = new ManualTrainingTask(deadlineMillis, trainer);
 
@@ -182,15 +183,15 @@ public void run()
             return;
         }
 
-        long now = System.currentTimeMillis();
+        long now = Clock.Global.currentTimeMillis();
         // Force training if there are not enough samples, but we have hit the max sampling duration
         boolean reachedDeadline = now >= deadlineMillis;
         if (!isTraining && (trainer.isReady() || reachedDeadline))
        {
             // Set isTraining to only enter the branch once
             isTraining = true;
             trainer.trainDictionaryAsync(reachedDeadline)
-                   .whenComplete((dictionary, throwable) -> {
+                   .addCallback((dictionary, throwable) -> {
                        cancelManualTraining();
                        if (throwable != null)
                        {
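
Going through Clock.Global instead of System.currentTimeMillis keeps the deadline arithmetic on Cassandra's pluggable clock, which tests can substitute. The check itself reduces to a pure comparison once "now" is passed in explicitly; a trivial sketch of that logic (method names are placeholders, not the scheduler's API):

import java.util.concurrent.TimeUnit;

public class DeadlineExample
{
    // Compute an absolute deadline from "now" and a max sampling duration in seconds
    static long deadlineMillis(long nowMillis, long maxSamplingDurationSeconds)
    {
        return nowMillis + TimeUnit.SECONDS.toMillis(maxSamplingDurationSeconds);
    }

    // Passing "now" explicitly makes the check trivial to exercise in a unit test
    static boolean reachedDeadline(long nowMillis, long deadlineMillis)
    {
        return nowMillis >= deadlineMillis;
    }

    public static void main(String[] args)
    {
        long start = 1_000_000L;
        long deadline = deadlineMillis(start, 60); // 60-second sampling window
        System.out.println(reachedDeadline(start + 59_000, deadline)); // false
        System.out.println(reachedDeadline(start + 60_000, deadline)); // true
    }
}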
