CASSANDRA-17021: Support ZSTD dictionary compression #4399
base: trunk
9279a19 to 7bf7e30
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;

Checkstyle (run via ant realclean && ant artifacts) flags this as an illegal import; grep for CompletableFuture in checkstyle.xml to see the rule. I am not completely sure yet, but I think it should be replaced by something from the org.apache.cassandra.utils.concurrent package.

Addressed
logger.info("Starting manual dictionary training for {}.{} with max sampling duration: {} seconds",
            keyspaceName, tableName, maxSamplingDurationSeconds);

long deadlineMillis = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(maxSamplingDurationSeconds);

Avoid System for time; use org.apache.cassandra.utils.Clock.Global or the org.apache.cassandra.utils.Clock interface [blockSystemClock].

Addressed
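One way to read this comment is that the deadline should come from an injectable time source rather than from System directly. The sketch below is only an illustration under assumptions: SamplingDeadline is a hypothetical class, not part of the PR, and a plain LongSupplier stands in for Cassandra's Clock abstraction so that the example compiles without the Cassandra code base.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical helper, not the PR's scheduler class.
final class SamplingDeadline
{
    // Stand-in for a clock abstraction; in Cassandra this would presumably be backed by Clock.Global.
    private final LongSupplier clockMillis;

    SamplingDeadline(LongSupplier clockMillis)
    {
        this.clockMillis = clockMillis;
    }

    // The deadline is derived from the injected clock, never from System directly.
    long deadlineMillis(int maxSamplingDurationSeconds)
    {
        return clockMillis.getAsLong() + TimeUnit.SECONDS.toMillis(maxSamplingDurationSeconds);
    }

    // True once the injected clock has reached or passed the deadline.
    boolean isExpired(long deadlineMillis)
    {
        return clockMillis.getAsLong() >= deadlineMillis;
    }
}
```

Because the time source is injected, a test can advance time by hand instead of sleeping, which is likely part of the motivation for the blockSystemClock rule.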
    return;
}

long now = System.currentTimeMillis();

Same as above: avoid System for time.
package org.apache.cassandra.db.compression;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

Illegal import.
             currentSampleCount, totalSampleSize.get(), isReady);
byte[] dictBytes = zstdTrainer.trainSamples();
long zstdDictId = Zstd.getDictIdFromDict(dictBytes);
DictId dictId = new DictId(Kind.ZSTD, makeDictionaryId(System.currentTimeMillis(), zstdDictId));

Avoid System for time; use org.apache.cassandra.utils.Clock.Global or the org.apache.cassandra.utils.Clock interface [blockSystemClock].
public ZstdDictCompress dictionaryForCompression(int compressionLevel)
{
    if (closed.get())
        throw new IllegalStateException("Dictionary has been closed");

Consider making this message (and all other identical messages) more descriptive by including the dictionary's kind and id.
@Override
public String name()
{
    return "ZstdCompressionDictionary";

Use ZstdCompressionDictionary.class.getSimpleName().
    })
    .build();

// dictioanry and its ref are null, when they are absent.

Typo in dictioanry.
public ManualTrainingOptions(int maxSamplingDurationSeconds)
{
    if (maxSamplingDurationSeconds <= 0)

Consider Preconditions.checkArgument here.
// 1. The current compressor is not a dictionary compressor, but there is dictionary attached
// 2. The current dictionary compressor is a different type, e.g. table schema is changed
// In those cases, we should get the compatible dictionary compressor based on the dictionary
if (dictionary.kind() == ZSTD)

We are again too "ZSTD-centric" here. I would expect this code to contain nothing ZSTD-specific, so that we never need to touch it again. The fact that the dictionary is not empty pretty much guarantees that we can create a compressor from it, no? Why would we have a dictionary of CompressionDictionary type if we were not able to create a compressor from it? That would be a pretty useless dictionary to me.
So maybe doing something like
return dictionary.kind().getCompressor(dictionary);
and calling ZstdDictionaryCompressor.create(zstdDict) in there would be much simpler and keep the logic in one place.
You would need to return ICompressor from getCompressor, as resolveCompressor returns it too. If IDictionaryCompressor extended ICompressor, as already suggested elsewhere (since you need to instantiate ZstdDictionaryCompressor there), it would be a piece of cake.

Or even better:
return dictionary.getCompressor();
which would be like
default IDictionaryCompressor getCompressor()
{
    return kind().getCompressor(this);
}
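The suggestion in this thread could be sketched roughly as follows. This is only an illustration under assumptions: the interfaces are cut down to the bare minimum, ZstdLikeCompressor, rawDictionary() and describe() are hypothetical names that do not appear in the PR, and no real zstd call is made.

```java
// Simplified, hypothetical stand-ins for the PR's interfaces.
interface ICompressor
{
    String describe();
}

// Extending ICompressor lets callers such as resolveCompressor return it directly.
interface IDictionaryCompressor extends ICompressor
{
}

interface CompressionDictionary
{
    enum Kind
    {
        ZSTD
        {
            @Override
            IDictionaryCompressor getCompressor(CompressionDictionary dictionary)
            {
                // The only place that knows which compressor belongs to ZSTD.
                return new ZstdLikeCompressor(dictionary);
            }
        };

        // Each kind knows how to build its own compressor.
        abstract IDictionaryCompressor getCompressor(CompressionDictionary dictionary);
    }

    Kind kind();

    byte[] rawDictionary();

    // Callers never branch on the kind; they simply ask the dictionary.
    default IDictionaryCompressor getCompressor()
    {
        return kind().getCompressor(this);
    }
}

// Hypothetical compressor; a real one would wrap the zstd dictionary.
final class ZstdLikeCompressor implements IDictionaryCompressor
{
    private final CompressionDictionary dictionary;

    ZstdLikeCompressor(CompressionDictionary dictionary)
    {
        this.dictionary = dictionary;
    }

    @Override
    public String describe()
    {
        return "zstd-like compressor, dictionary bytes=" + dictionary.rawDictionary().length;
    }
}
```

With this shape, adding another dictionary kind means adding one enum constant, and the resolving code needs no further changes.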
public void addSample(ByteBuffer sample)
{
    ICompressionDictionaryTrainer dictionaryTrainer = trainer;
    if (dictionaryTrainer != null && dictionaryTrainer.shouldSample())

Here, a no-op trainer would always return false from shouldSample.
public synchronized void train(Map<String, String> options)
{
    // Validate table supports dictionary compression
    if (!isEnabled)

Preconditions.checkArgument?

Changed to throw UnsupportedOperationException, as suggested in another comment.
ManualTrainingOptions trainingOptions = ManualTrainingOptions.fromStringMap(options);

trainer.start(true);
scheduler.scheduleManualTraining(trainingOptions, trainer);

We would not actually schedule anything in that method if the trainer is a no-op instance.
    {
        return TrainingStatus.NOT_STARTED.toString();
    }
    return dictionaryTrainer.getTrainingStatus().toString();

getTrainingStatus on a no-op trainer would always return NOT_STARTED.
public synchronized void close()
{
    unregisterMbean();
    if (trainer != null)

Closing a no-op trainer would do nothing, and when using a concrete trainer we would just set trainer = NO_OP instead of trainer = null.
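The no-op trainer idea running through the last few comments is essentially the null object pattern. A minimal sketch, assuming a reduced trainer interface: DictionaryManagerSketch is a hypothetical class, the SAMPLING status is an assumed value, and the method set is trimmed compared to whatever the PR's ICompressionDictionaryTrainer actually declares.

```java
import java.nio.ByteBuffer;

// NOT_STARTED appears in the PR; SAMPLING is assumed here for illustration.
enum TrainingStatus { NOT_STARTED, SAMPLING }

// Reduced, hypothetical version of the trainer interface.
interface ICompressionDictionaryTrainer
{
    boolean shouldSample();

    void addSample(ByteBuffer sample);

    TrainingStatus getTrainingStatus();

    void close();

    // Null object: never samples, never leaves NOT_STARTED, closing does nothing.
    ICompressionDictionaryTrainer NO_OP = new ICompressionDictionaryTrainer()
    {
        public boolean shouldSample() { return false; }
        public void addSample(ByteBuffer sample) { }
        public TrainingStatus getTrainingStatus() { return TrainingStatus.NOT_STARTED; }
        public void close() { }
    };
}

// Hypothetical manager showing that the null checks disappear.
final class DictionaryManagerSketch
{
    private volatile ICompressionDictionaryTrainer trainer = ICompressionDictionaryTrainer.NO_OP;

    void setTrainer(ICompressionDictionaryTrainer newTrainer)
    {
        trainer = newTrainer;
    }

    void addSample(ByteBuffer sample)
    {
        ICompressionDictionaryTrainer current = trainer;
        if (current.shouldSample())
            current.addSample(sample);
    }

    String getTrainingStatus()
    {
        return trainer.getTrainingStatus().toString();
    }

    synchronized void close()
    {
        trainer.close();
        trainer = ICompressionDictionaryTrainer.NO_OP; // instead of trainer = null
    }
}
```

The trade-off is one extra constant in exchange for removing every trainer != null branch in the manager.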
}

IDictionaryCompressor dictionaryCompressor = (IDictionaryCompressor) compressor;
if (dictionaryCompressor.acceptableDictionaryKind() == CompressionDictionary.Kind.ZSTD)

We are ZSTD-specific here; I am not sure that is really needed.
 * @param zstdDictId Zstd dictionary ID (unsigned 32-bit value represented as long)
 * @return combined dictionary ID that is monotonically increasing over time
 */
static long makeDictionaryId(long currentTimeMillis, long zstdDictId)

Is this really ZSTD-specific? Moving it to DictId or similar seems better, so it can be used for other compressors too. I just don't see why this would be ZSTD-only.
private CompressionDictionaryManagerMBean getDictionaryManagerProxy(String keyspace, String table) throws IOException
{
    // Construct table-specific MBean name
    String mbeanName = "org.apache.cassandra.db.compression:type=CompressionDictionaryManager,keyspace=" + keyspace + ",table=" + table;

Replace with a constant once introduced.
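One possible shape for such a constant, as a sketch only: DictionaryMBeanNames, MBEAN_NAME_FORMAT and mbeanName are hypothetical names, and the PR may well place the constant elsewhere (for example on the MBean interface itself).

```java
// Hypothetical holder for the shared MBean name pattern.
final class DictionaryMBeanNames
{
    // Same name as the string built inline in the snippet above, with placeholders.
    static final String MBEAN_NAME_FORMAT =
        "org.apache.cassandra.db.compression:type=CompressionDictionaryManager,keyspace=%s,table=%s";

    private DictionaryMBeanNames()
    {
    }

    // Builds the table-specific MBean name from the shared pattern.
    static String mbeanName(String keyspace, String table)
    {
        return String.format(MBEAN_NAME_FORMAT, keyspace, table);
    }
}
```

Both the code that registers the MBean and the nodetool code that looks it up could then share the same pattern.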
 *
 * @return dictionary id
 */
DictId identifier();

I would rename this to dictId().
This reverts commit db95727.
Get rid of java.util.concurrent.CompletableFuture usage
Use clock
Relocate dictionary configurations and use IntKibibytesBound
Change compression_dictionary_training_sampling_rate data type from int to float
Address other various comments
db95727 to a06d9b3
I think all comments are addressed. Please tag me if I missed anything. (GH is not super friendly with many comments.)
Add ZstdDictionaryCompressor and wire to the read and the write path
Resolve a correct dictionary compressor on the read path
lazily init ZstdDictCompress and ZstdDictDecompress
checksumming
Added compression dictionary table and accessors
Wire to System table access to CompressionDictionaryManager
Integrate dictionary trainer
Add nodetool to train dictionary
Cleanup
break CompressionDictionaryManager up
properly release dictionary with reference counting
Add ZstdDictionaryCompressorTest
Simplify trainer interface
Bump SSTable version
Add CompressionDictionaryCacheTest
Add ZstdDictionaryTrainerTest
update to use assertJ
Add CompressionDictionaryTrainingConfigTest
Update comments in CompressionDictionaryEventHandler and update exception handling in MessagingService
Add CompressionDictionaryEventHandlerTest
Add CompressionDictionarySchedulerTest
Minor update
Add CompressionDictionaryManagerTest
Add TrainCompressionDictionaryTest (nodetool)
Add CompressionDictionaryIntegrationTest
Tidy up
Add ZstdCompressionDictionaryTest
Add SystemDistributedKeyspaceCompressionDictionaryTest
Add --sampling-rate for the nodetool
Update consistency level from QUORUM to ONE for best effort
Add the missing setCompressionDictionaryManager to sstable writer builders
Add benchmark