Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1229,13 +1229,20 @@
</target>

<target name="test-compression" depends="build-test" description="Execute unit tests with sstable compression enabled">
<testmacro suitename="unit" inputdir="${test.unit.src}" exclude="**/pig/*.java" timeout="${test.timeout}">
<property name="compressed_yaml" value="${build.test.dir}/cassandra.compressed.yaml"/>
<concat destfile="${compressed_yaml}">
<fileset file="${test.conf}/cassandra.yaml"/>
<fileset file="${test.conf}/commitlog_compression.yaml"/>
</concat>
<echo>Compressed config: ${compressed_yaml}</echo>
<testmacro suitename="unit" inputdir="${test.unit.src}" exclude="**/pig/*.java" timeout="${test.timeout}">
<jvmarg value="-Dlegacy-sstable-root=${test.data}/legacy-sstables"/>
<jvmarg value="-Dcorrupt-sstable-root=${test.data}/corrupt-sstables"/>
<jvmarg value="-Dmigration-sstable-root=${test.data}/migration-sstables"/>
<jvmarg value="-Dcassandra.test.compression=true"/>
<jvmarg value="-Dcassandra.ring_delay_ms=1000"/>
<jvmarg value="-Dcassandra.tolerate_sstable_size=true"/>
<jvmarg value="-Dcassandra.config=file:///${compressed_yaml}"/>
</testmacro>
<fileset dir="${test.unit.src}">
<exclude name="**/pig/*.java" />
Expand Down
7 changes: 7 additions & 0 deletions conf/cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,13 @@ commitlog_sync_period_in_ms: 10000
# is reasonable.
commitlog_segment_size_in_mb: 32

# Compression to apply to the commit log. If omitted, the commit log
# will be written uncompressed.
#commitlog_compression:
# - class_name: LZ4Compressor
# parameters:
# -

# any class that implements the SeedProvider interface and has a
# constructor that takes a Map<String, String> of parameters will do.
seed_provider:
Expand Down
6 changes: 4 additions & 2 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@
import java.io.IOException;
import java.io.StringReader;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.google.common.collect.Sets;

import org.supercsv.io.CsvListReader;
import org.supercsv.prefs.CsvPreference;

import org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
import org.apache.cassandra.exceptions.ConfigurationException;
Expand Down Expand Up @@ -53,7 +54,7 @@ public class Config
public Set<String> hinted_handoff_enabled_by_dc = Sets.newConcurrentHashSet();
public volatile Integer max_hint_window_in_ms = 3600 * 1000; // one hour

public SeedProviderDef seed_provider;
public ParametrizedClass seed_provider;
public DiskAccessMode disk_access_mode = DiskAccessMode.auto;

public DiskFailurePolicy disk_failure_policy = DiskFailurePolicy.ignore;
Expand Down Expand Up @@ -152,6 +153,7 @@ public class Config
public Integer commitlog_sync_period_in_ms;
public int commitlog_segment_size_in_mb = 32;
public int commitlog_periodic_queue_size = 1024 * FBUtilities.getAvailableProcessors();
public ParametrizedClass commitlog_compression;

public String endpoint_snitch;
public Boolean dynamic_snitch = true;
Expand Down
5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -1068,6 +1068,11 @@ public static String getCommitLogLocation()
return conf.commitlog_directory;
}

public static ParametrizedClass getCommitLogCompression()
{
return conf.commitlog_compression;
}

public static int getTombstoneWarnThreshold()
{
return conf.tombstone_warn_threshold;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,40 @@
import java.util.List;
import java.util.Map;

import com.google.common.base.Objects;

public class SeedProviderDef
public class ParametrizedClass
{
public String class_name;
public Map<String, String> parameters;

public SeedProviderDef(LinkedHashMap<String, ?> p)
public ParametrizedClass(String class_name, Map<String, String> parameters)
{
class_name = (String)p.get("class_name");
parameters = (Map<String, String>)((List)p.get("parameters")).get(0);
this.class_name = class_name;
this.parameters = parameters;
}

@SuppressWarnings("unchecked")
public ParametrizedClass(LinkedHashMap<String, ?> p)
{
this((String)p.get("class_name"),
p.containsKey("parameters") ? (Map<String, String>)((List<?>)p.get("parameters")).get(0) : null);
}

@Override
public boolean equals(Object that)
{
return that instanceof ParametrizedClass && equals((ParametrizedClass) that);
}

public boolean equals(ParametrizedClass that)
{
return Objects.equal(class_name, that.class_name) && Objects.equal(parameters, that.parameters);
}

@Override
public String toString()
{
return class_name + (parameters == null ? "" : parameters.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public Config loadConfig(URL url) throws ConfigurationException
logConfig(configBytes);

org.yaml.snakeyaml.constructor.Constructor constructor = new org.yaml.snakeyaml.constructor.Constructor(Config.class);
TypeDescription seedDesc = new TypeDescription(SeedProviderDef.class);
TypeDescription seedDesc = new TypeDescription(ParametrizedClass.class);
seedDesc.putMapPropertyType("parameters", String.class, String.class);
constructor.addTypeDescription(seedDesc);
MissingPropertiesChecker propertiesChecker = new MissingPropertiesChecker();
Expand Down
18 changes: 18 additions & 0 deletions src/java/org/apache/cassandra/db/commitlog/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,12 @@
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParametrizedClass;
import org.apache.cassandra.db.*;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.compress.CompressionParameters;
import org.apache.cassandra.io.compress.ICompressor;
import org.apache.cassandra.io.util.DataOutputByteBuffer;
import org.apache.cassandra.metrics.CommitLogMetrics;
import org.apache.cassandra.net.MessagingService;
Expand Down Expand Up @@ -59,6 +63,20 @@ public class CommitLog implements CommitLogMBean
final CommitLogMetrics metrics;
final AbstractCommitLogService executor;

static final ICompressor compressor;
static {
try
{
ParametrizedClass compressionClass = DatabaseDescriptor.getCommitLogCompression();
compressor = compressionClass != null ? CompressionParameters.createCompressor(compressionClass) : null;
}
catch (ConfigurationException e)
{
logger.error("Fatal configuration error", e);
throw new ExceptionInInitializerError(e);
}
}

private CommitLog()
{
DatabaseDescriptor.createAllDirectories();
Expand Down
18 changes: 15 additions & 3 deletions src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@

import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.compress.CompressionParameters;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -166,17 +167,28 @@ public void maybeRestoreArchive()
CommitLogDescriptor descriptor;
if (fromHeader == null && fromName == null)
throw new IllegalStateException("Cannot safely construct descriptor for segment, either from its name or its header: " + fromFile.getPath());
else if (fromHeader != null && fromName != null && !fromHeader.equals(fromName))
else if (fromHeader != null && fromName != null && !fromHeader.equalsIgnoringCompression(fromName))
throw new IllegalStateException(String.format("Cannot safely construct descriptor for segment, as name and header descriptors do not match (%s vs %s): %s", fromHeader, fromName, fromFile.getPath()));
else if (fromName != null && fromHeader == null && fromName.version >= CommitLogDescriptor.VERSION_21)
throw new IllegalStateException("Cannot safely construct descriptor for segment, as name descriptor implies a version that should contain a header descriptor, but that descriptor could not be read: " + fromFile.getPath());
else if (fromHeader != null)
descriptor = fromHeader;
else descriptor = fromName;

if (descriptor.version > CommitLogDescriptor.VERSION_21)
if (descriptor.version > CommitLogDescriptor.VERSION_22)
throw new IllegalStateException("Unsupported commit log version: " + descriptor.version);

if (descriptor.compression != null) {
try
{
CompressionParameters.createCompressor(descriptor.compression);
}
catch (ConfigurationException e)
{
throw new IllegalStateException("Unknown compression", e);
}
}

File toFile = new File(DatabaseDescriptor.getCommitLogLocation(), descriptor.fileName());
if (toFile.exists())
throw new IllegalStateException("Trying to restore archive " + fromFile.getPath() + ", but the same segment already exists in the restore location: " + toFile.getPath());
Expand Down
Loading