Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1229,13 +1229,20 @@
</target>

<target name="test-compression" depends="build-test" description="Execute unit tests with sstable compression enabled">
<testmacro suitename="unit" inputdir="${test.unit.src}" exclude="**/pig/*.java" timeout="${test.timeout}">
<property name="compressed_yaml" value="${build.test.dir}/cassandra.compressed.yaml"/>
<concat destfile="${compressed_yaml}">
<fileset file="${test.conf}/cassandra.yaml"/>
<fileset file="${test.conf}/commitlog_compression.yaml"/>
</concat>
<echo>Compressed config: ${compressed_yaml}</echo>
<testmacro suitename="unit" inputdir="${test.unit.src}" exclude="**/pig/*.java" timeout="${test.timeout}">
<jvmarg value="-Dlegacy-sstable-root=${test.data}/legacy-sstables"/>
<jvmarg value="-Dcorrupt-sstable-root=${test.data}/corrupt-sstables"/>
<jvmarg value="-Dmigration-sstable-root=${test.data}/migration-sstables"/>
<jvmarg value="-Dcassandra.test.compression=true"/>
<jvmarg value="-Dcassandra.ring_delay_ms=1000"/>
<jvmarg value="-Dcassandra.tolerate_sstable_size=true"/>
<jvmarg value="-Dcassandra.config=file:///${compressed_yaml}"/>
</testmacro>
<fileset dir="${test.unit.src}">
<exclude name="**/pig/*.java" />
Expand Down
11 changes: 11 additions & 0 deletions conf/cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,17 @@ commitlog_sync_period_in_ms: 10000
# is reasonable.
commitlog_segment_size_in_mb: 32

# Compression to apply to the commit log. If omitted, the commit log
# will be written uncompressed.
#commitlog_compression:
# - class_name: LZ4Compressor
# parameters:
# -

# Specifies the number of sync threads to use for the commit log.
# Only usable with compression.
#commitlog_sync_threads: 1

# any class that implements the SeedProvider interface and has a
# constructor that takes a Map<String, String> of parameters will do.
seed_provider:
Expand Down
6 changes: 4 additions & 2 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import java.util.Set;

import com.google.common.collect.Sets;

import org.supercsv.io.CsvListReader;
import org.supercsv.prefs.CsvPreference;

import org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
import org.apache.cassandra.exceptions.ConfigurationException;
Expand Down Expand Up @@ -53,7 +53,7 @@ public class Config
public Set<String> hinted_handoff_enabled_by_dc = Sets.newConcurrentHashSet();
public volatile Integer max_hint_window_in_ms = 3600 * 1000; // one hour

public SeedProviderDef seed_provider;
public ParametrizedClass seed_provider;
public DiskAccessMode disk_access_mode = DiskAccessMode.auto;

public DiskFailurePolicy disk_failure_policy = DiskFailurePolicy.ignore;
Expand Down Expand Up @@ -150,8 +150,10 @@ public class Config
public CommitLogSync commitlog_sync;
public Double commitlog_sync_batch_window_in_ms;
public Integer commitlog_sync_period_in_ms;
public int commitlog_sync_threads = 1;
public int commitlog_segment_size_in_mb = 32;
public int commitlog_periodic_queue_size = 1024 * FBUtilities.getAvailableProcessors();
public ParametrizedClass commitlog_compression;

public String endpoint_snitch;
public Boolean dynamic_snitch = true;
Expand Down
16 changes: 16 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,12 @@ else if (conf.commitlog_sync_batch_window_in_ms != null)
logger.debug("Syncing log with a period of {}", conf.commitlog_sync_period_in_ms);
}

if (conf.commitlog_sync_threads < 1)
throw new ConfigurationException("commitlog_sync_threads must be a positive integer.");

if (conf.commitlog_compression == null && conf.commitlog_sync_threads > 1)
throw new ConfigurationException("commitlog_sync_threads can only be used when compression is enabled.");

if (conf.commitlog_total_space_in_mb == null)
conf.commitlog_total_space_in_mb = hasLargeAddressSpace() ? 8192 : 32;

Expand Down Expand Up @@ -1068,6 +1074,16 @@ public static String getCommitLogLocation()
return conf.commitlog_directory;
}

/**
 * Returns the {@code commitlog_compression} setting of the loaded configuration.
 *
 * @return the configured compressor class and its parameters; the backing config
 *         field has no default, so this is {@code null} when the option is omitted
 */
public static ParametrizedClass getCommitLogCompression()
{
return conf.commitlog_compression;
}

/**
 * Returns the {@code commitlog_sync_threads} setting of the loaded configuration.
 *
 * @return the configured number of commit log sync threads (the config field defaults to 1)
 */
public static int getCommitLogSyncThreadCount()
{
return conf.commitlog_sync_threads;
}

public static int getTombstoneWarnThreshold()
{
return conf.tombstone_warn_threshold;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,40 @@
import java.util.List;
import java.util.Map;

import com.google.common.base.Objects;

public class SeedProviderDef
public class ParametrizedClass
{
public String class_name;
public Map<String, String> parameters;

public SeedProviderDef(LinkedHashMap<String, ?> p)
public ParametrizedClass(String class_name, Map<String, String> parameters)
{
class_name = (String)p.get("class_name");
parameters = (Map<String, String>)((List)p.get("parameters")).get(0);
this.class_name = class_name;
this.parameters = parameters;
}

@SuppressWarnings("unchecked")
public ParametrizedClass(LinkedHashMap<String, ?> p)
{
this((String)p.get("class_name"),
p.containsKey("parameters") ? (Map<String, String>)((List<?>)p.get("parameters")).get(0) : null);
}

@Override
public boolean equals(Object that)
{
return that instanceof ParametrizedClass && equals((ParametrizedClass) that);
}

public boolean equals(ParametrizedClass that)
{
return Objects.equal(class_name, that.class_name) && Objects.equal(parameters, that.parameters);
}

@Override
public String toString()
{
return class_name + (parameters == null ? "" : parameters.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public Config loadConfig(URL url) throws ConfigurationException
logConfig(configBytes);

org.yaml.snakeyaml.constructor.Constructor constructor = new org.yaml.snakeyaml.constructor.Constructor(Config.class);
TypeDescription seedDesc = new TypeDescription(SeedProviderDef.class);
TypeDescription seedDesc = new TypeDescription(ParametrizedClass.class);
seedDesc.putMapPropertyType("parameters", String.class, String.class);
constructor.addTypeDescription(seedDesc);
MissingPropertiesChecker propertiesChecker = new MissingPropertiesChecker();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@
*/
package org.apache.cassandra.db.commitlog;

import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.utils.concurrent.WaitQueue;

import org.slf4j.*;

import java.util.concurrent.Semaphore;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -31,21 +36,29 @@ public abstract class AbstractCommitLogService
// how often should we log syncs that lag behind our desired period
private static final long LAG_REPORT_INTERVAL = TimeUnit.MINUTES.toMillis(5);

private final Thread thread;
private volatile boolean shutdown = false;

// all Allocations written before this time will be synced
protected volatile long lastSyncedAt = System.currentTimeMillis();
// all Allocations written before this time are synced.
// Note: this number might jump back once in a while, but will eventually increase past any point in time.
// If a jump does occur, mutations started before both the higher and lower number are synced at that time.
protected volatile long approximateSyncedAt = System.currentTimeMillis();

// counts of total written, and pending, log messages
private final AtomicLong written = new AtomicLong(0);
protected final AtomicLong pending = new AtomicLong(0);

// signal that writers can wait on to be notified of a completed sync
protected final WaitQueue syncComplete = new WaitQueue();
private final Semaphore haveWork = new Semaphore(1);

private static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogService.class);
private final ScheduledExecutorService executor;
private final Runnable runnable;

long firstLagAt = 0;
long totalSyncDuration = 0; // total time spent syncing since firstLagAt
long syncExceededIntervalBy = 0; // time that syncs exceeded pollInterval since firstLagAt
int lagCount = 0;
int syncCount = 0;

volatile boolean shutdown = false;

/**
* CommitLogService provides a fsync service for Allocations, fulfilling either the
Expand All @@ -58,89 +71,61 @@ public abstract class AbstractCommitLogService
if (pollIntervalMillis < 1)
throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %dms", pollIntervalMillis));

Runnable runnable = new Runnable()
runnable = new Runnable()
{
public void run()
{
long firstLagAt = 0;
long totalSyncDuration = 0; // total time spent syncing since firstLagAt
long syncExceededIntervalBy = 0; // time that syncs exceeded pollInterval since firstLagAt
int lagCount = 0;
int syncCount = 0;

boolean run = true;
while (run)
try
{
try
// always run once after shutdown signalled
if (shutdown)
executor.shutdown();

// sync and signal
long syncStarted = System.currentTimeMillis();
commitLog.sync(shutdown);
// This might overwrite a higher number put by another thread. This is not a problem
// as to reach this point the other thread must have waited for our writes to complete.
approximateSyncedAt = syncStarted;
syncComplete.signalAll();


// sleep any time we have left before the next one is due
long now = System.currentTimeMillis();
long sleep = syncStarted + pollIntervalMillis - now;
if (sleep < 0)
{
// always run once after shutdown signalled
run = !shutdown;

// sync and signal
long syncStarted = System.currentTimeMillis();
commitLog.sync(shutdown);
lastSyncedAt = syncStarted;
syncComplete.signalAll();


// sleep any time we have left before the next one is due
long now = System.currentTimeMillis();
long sleep = syncStarted + pollIntervalMillis - now;
if (sleep < 0)
// if we have lagged noticeably, update our lag counter
if (firstLagAt == 0)
{
// if we have lagged noticeably, update our lag counter
if (firstLagAt == 0)
{
firstLagAt = now;
totalSyncDuration = syncExceededIntervalBy = syncCount = lagCount = 0;
}
syncExceededIntervalBy -= sleep;
lagCount++;
}
syncCount++;
totalSyncDuration += now - syncStarted;

if (firstLagAt > 0 && now - firstLagAt >= LAG_REPORT_INTERVAL)
{
logger.warn(String.format("Out of %d commit log syncs over the past %ds with average duration of %.2fms, %d have exceeded the configured commit interval by an average of %.2fms",
syncCount, (now - firstLagAt) / 1000, (double) totalSyncDuration / syncCount, lagCount, (double) syncExceededIntervalBy / lagCount));
firstLagAt = 0;
}

// if we have lagged this round, we probably have work to do already so we don't sleep
if (sleep < 0 || !run)
continue;

try
{
haveWork.tryAcquire(sleep, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e)
{
throw new AssertionError();
firstLagAt = now;
totalSyncDuration = syncExceededIntervalBy = syncCount = lagCount = 0;
}
syncExceededIntervalBy -= sleep;
lagCount++;
}
catch (Throwable t)
{
if (!CommitLog.handleCommitError("Failed to persist commits to disk", t))
break;
syncCount++;
totalSyncDuration += now - syncStarted;

// sleep for full poll-interval after an error, so we don't spam the log file
try
{
haveWork.tryAcquire(pollIntervalMillis, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e)
{
throw new AssertionError();
}
if (firstLagAt > 0 && now - firstLagAt >= LAG_REPORT_INTERVAL)
{
logger.warn(String.format("Out of %d commit log syncs over the past %ds with average duration of %.2fms, %d have exceeded the configured commit interval by an average of %.2fms",
syncCount, (now - firstLagAt) / 1000, (double) totalSyncDuration / syncCount, lagCount, (double) syncExceededIntervalBy / lagCount));
firstLagAt = 0;
}
}
catch (Throwable t)
{
if (!CommitLog.handleCommitError("Failed to persist commits to disk", t))
executor.shutdown();
}
}
};

thread = new Thread(runnable, name);
thread.start();
int threadCount = DatabaseDescriptor.getCommitLogSyncThreadCount();
executor = Executors.newScheduledThreadPool(threadCount, new NamedThreadFactory("commit-log-service"));
for (int i=0; i<threadCount; ++i)
executor.scheduleAtFixedRate(runnable, pollIntervalMillis * i, pollIntervalMillis * threadCount, TimeUnit.MILLISECONDS);
}

/**
Expand All @@ -157,22 +142,20 @@ public void finishWriteFor(Allocation alloc)
/**
* Sync immediately, but don't block for the sync to complete
*/
public WaitQueue.Signal requestExtraSync()
public Future<?> requestExtraSync()
{
WaitQueue.Signal signal = syncComplete.register();
haveWork.release(1);
return signal;
return executor.submit(runnable);
}

public void shutdown()
{
shutdown = true;
haveWork.release(1);
requestExtraSync();
}

public void awaitTermination() throws InterruptedException
{
thread.join();
executor.awaitTermination(3600, TimeUnit.SECONDS);
}

public long getCompletedTasks()
Expand Down
18 changes: 18 additions & 0 deletions src/java/org/apache/cassandra/db/commitlog/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,12 @@
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParametrizedClass;
import org.apache.cassandra.db.*;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.compress.CompressionParameters;
import org.apache.cassandra.io.compress.ICompressor;
import org.apache.cassandra.io.util.DataOutputByteBuffer;
import org.apache.cassandra.metrics.CommitLogMetrics;
import org.apache.cassandra.net.MessagingService;
Expand Down Expand Up @@ -59,6 +63,20 @@ public class CommitLog implements CommitLogMBean
final CommitLogMetrics metrics;
final AbstractCommitLogService executor;

// Compressor built from the commitlog_compression setting, or null when
// DatabaseDescriptor reports no compression class.
static final ICompressor compressor;
static {
try
{
ParametrizedClass compressionClass = DatabaseDescriptor.getCommitLogCompression();
compressor = compressionClass != null ? CompressionParameters.createCompressor(compressionClass) : null;
}
catch (ConfigurationException e)
{
// A ConfigurationException is logged and rethrown wrapped in
// ExceptionInInitializerError, so class initialization fails.
logger.error("Fatal configuration error", e);
throw new ExceptionInInitializerError(e);
}
}

private CommitLog()
{
DatabaseDescriptor.createAllDirectories();
Expand Down
Loading