2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -905,7 +905,7 @@ public List<String> importNewSSTables(Set<String> srcPaths, boolean resetLevel,
.build());
}

Descriptor getUniqueDescriptorFor(Descriptor descriptor, File targetDirectory)
public Descriptor getUniqueDescriptorFor(Descriptor descriptor, File targetDirectory)
{
Descriptor newDescriptor;
do
16 changes: 16 additions & 0 deletions src/java/org/apache/cassandra/db/Directories.java
@@ -73,6 +73,7 @@
import org.apache.cassandra.service.snapshot.SnapshotManifest;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.TimeUUID;

import static org.apache.cassandra.utils.LocalizeString.toLowerCaseLocalized;

@@ -115,6 +116,7 @@ public class Directories

public static final String BACKUPS_SUBDIR = "backups";
public static final String SNAPSHOT_SUBDIR = "snapshots";
public static final String PENDING_SUBDIR = "pending";
public static final String TMP_SUBDIR = "tmp";
public static final String SECONDARY_INDEX_NAME_SEPARATOR = ".";

@@ -727,6 +729,20 @@ public static File getSnapshotSchemaFile(File snapshotDir)
return new File(snapshotDir, "schema.cql");
}

public File getPendingLocationForDisk(DataDirectory dataDirectory, TimeUUID planId)
{
for (File dir : dataPaths)
{
// Note that we must compare absolute paths (not canonical) here since keyspace directories might be symlinks
Path dirPath = dir.toAbsolute().toPath();
Path locationPath = dataDirectory.location.toAbsolute().toPath();
if (!dirPath.startsWith(locationPath))
continue;
return getOrCreate(dir, PENDING_SUBDIR, planId.toString());
}
throw new RuntimeException("Could not find pending location");
}
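
The pending layout keeps streamed-but-not-yet-activated sstables under <table-dir>/pending/<planId>/ on the disk the stream targeted. As a minimal standalone sketch of the prefix test above (toy java.nio paths, not Cassandra's File/Directories types):

import java.nio.file.Path;

public class PendingLocationSketch
{
    // Mirrors the loop above: a table data path belongs to a disk iff its absolute
    // path starts with the disk's absolute location; absolute (not canonical) paths
    // are compared so symlinked keyspace directories still match.
    static Path pendingLocationFor(Path[] dataPaths, Path diskLocation, String planId)
    {
        for (Path dir : dataPaths)
            if (dir.toAbsolutePath().startsWith(diskLocation.toAbsolutePath()))
                return dir.resolve("pending").resolve(planId);
        throw new RuntimeException("Could not find pending location");
    }

    public static void main(String[] args)
    {
        Path[] dataPaths = { Path.of("/var/lib/cassandra/data/ks1/tbl-1234") };
        Path disk = Path.of("/var/lib/cassandra/data");
        // -> /var/lib/cassandra/data/ks1/tbl-1234/pending/a1b2c3
        System.out.println(pendingLocationFor(dataPaths, disk, "a1b2c3"));
    }
}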

public static File getBackupsDirectory(Descriptor desc)
{
return getBackupsDirectory(desc.directory);
src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -408,6 +408,8 @@ protected void recordLatency(TableMetrics metric, long latencyNanos)
public UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadExecutionController controller)
{
ColumnFamilyStore.ViewFragment view = cfs.select(View.selectLive(dataRange().keyRange()));
if (cfs.metadata().replicationType().isTracked())
controller.addActivationIds(view);
Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), dataRange().keyRange().getString(metadata().partitionKeyType));

// fetch data from current memtable, historical memtables, and SSTables in the correct order.
34 changes: 34 additions & 0 deletions src/java/org/apache/cassandra/db/ReadExecutionController.java
@@ -18,13 +18,22 @@
package org.apache.cassandra.db;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.replication.ShortMutationId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.MonotonicClock;
import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -33,6 +42,8 @@

public class ReadExecutionController implements AutoCloseable
{
private static final Logger logger = LoggerFactory.getLogger(ReadExecutionController.class);

private static final long NO_SAMPLING = Long.MIN_VALUE;

// For every read
Expand All @@ -50,6 +61,13 @@ public class ReadExecutionController implements AutoCloseable
private final RepairedDataInfo repairedDataInfo;
private long oldestUnrepairedTombstone = Long.MAX_VALUE;

/*
 * Tracks the bulk transfers involved in the read, so that read reconciliation can account for them.
 * These IDs are collected from the ViewFragment rather than the SSTable read path, so even when bloom
 * filters or short-circuited SSTable scans skip individual sstables, the full set of relevant bulk
 * transfers is still recorded.
 */
private Set<ShortMutationId> activationIds = null;

ReadExecutionController(ReadCommand command,
OpOrder.Group baseOp,
TableMetadata baseMetadata,
@@ -243,4 +261,20 @@ private void addSample()
if (cfs != null)
cfs.metric.topLocalReadQueryTime.addSample(cql, timeMicros);
}

public void addActivationIds(ColumnFamilyStore.ViewFragment view)
{
activationIds = new HashSet<>();
for (SSTableReader sstable : view.sstables)
{
Collection<? extends ShortMutationId> ids = sstable.getCoordinatorLogOffsets().transfers();
logger.trace("Adding transfer IDs from SSTable {} {}", sstable, ids);
activationIds.addAll(ids);
}
}

public Iterator<ShortMutationId> getActivationIds()
{
return activationIds == null ? null : activationIds.iterator();
}
}
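
A standalone model of addActivationIds, with toy records standing in for SSTableReader and ShortMutationId: the controller unions the transfer IDs of every sstable in the view up front, so sstables later skipped by bloom filters or short-circuited scans still contribute their transfers to reconciliation.

import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ActivationIdsSketch
{
    record Id(long logId, int offset) {}             // stands in for ShortMutationId
    record Table(String name, List<Id> transfers) {} // stands in for SSTableReader

    static Set<Id> addActivationIds(List<Table> view)
    {
        Set<Id> ids = new HashSet<>();
        for (Table t : view)
            ids.addAll(t.transfers); // union across the whole view; duplicates collapse
        return ids;
    }

    public static void main(String[] args)
    {
        var s1 = new Table("nb-1-big", List.of(new Id(1, 0)));
        var s2 = new Table("nb-2-big", List.of(new Id(1, 0), new Id(2, 7)));
        System.out.println(addActivationIds(List.of(s1, s2)).size()); // 2
    }
}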
34 changes: 28 additions & 6 deletions src/java/org/apache/cassandra/db/SSTableImporter.java
@@ -44,6 +44,7 @@
import org.apache.cassandra.io.sstable.format.SSTableFormat.Components;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.replication.MutationTrackingService;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
@@ -80,11 +81,8 @@ synchronized List<String> importNewSSTables(Options options)
UUID importID = UUID.randomUUID();
logger.info("[{}] Loading new SSTables for {}/{}: {}", importID, cfs.getKeyspaceName(), cfs.getTableName(), options);

// This will be supported in the future
TableMetadata metadata = cfs.metadata();
if (metadata.replicationType() != null && metadata.replicationType().isTracked())
throw new IllegalStateException("Can't import into tables with mutation tracking enabled");

boolean isTracked = metadata.replicationType().isTracked();
List<Pair<Directories.SSTableLister, String>> listers = getSSTableListers(options.srcPaths);

Set<Descriptor> currentDescriptors = new HashSet<>();
@@ -183,7 +181,14 @@ synchronized List<String> importNewSSTables(Options options)
Descriptor newDescriptor = cfs.getUniqueDescriptorFor(entry.getKey(), targetDir);
maybeMutateMetadata(entry.getKey(), options);
movedSSTables.add(new MovedSSTable(newDescriptor, entry.getKey(), entry.getValue()));
SSTableReader sstable = SSTableReader.moveAndOpenSSTable(cfs, entry.getKey(), newDescriptor, entry.getValue(), options.copyData);
SSTableReader sstable;
// Don't move tracked SSTables: moving them would promote them to the live set on bounce
if (isTracked)
sstable = SSTableReader.open(cfs, oldDescriptor, metadata.ref);
else
sstable = SSTableReader.moveAndOpenSSTable(cfs, oldDescriptor, newDescriptor, entry.getValue(), options.copyData);
newSSTablesPerDirectory.add(sstable);
}
catch (Throwable t)
@@ -233,7 +238,13 @@ synchronized List<String> importNewSSTables(Options options)
if (!cfs.indexManager.validateSSTableAttachedIndexes(newSSTables, false, options.validateIndexChecksum))
cfs.indexManager.buildSSTableAttachedIndexesBlocking(newSSTables);

cfs.getTracker().addSSTables(newSSTables);
if (isTracked)
{
TrackedBulkTransfer.execute(cfs.keyspace.getName(), newSSTables);
}
else
cfs.getTracker().addSSTables(newSSTables);

for (SSTableReader reader : newSSTables)
{
if (options.invalidateCaches && cfs.isRowCacheEnabled())
@@ -250,6 +261,17 @@ synchronized List<String> importNewSSTables(Options options)
return failedDirectories;
}

/**
 * TODO: Support a user-defined consistency level for import, so that imports can proceed with replicas down
 */
private static class TrackedBulkTransfer
{
private static void execute(String keyspace, Set<SSTableReader> sstables)
{
MutationTrackingService.instance.executeTransfers(keyspace, sstables, ConsistencyLevel.ALL);
}
}
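
With ConsistencyLevel.ALL, activation can only succeed once every replica has acknowledged its copy of the transfer. A toy sketch of that rule; the real logic lives in MutationTrackingService.executeTransfers, whose internals are outside this diff:

class TransferActivationSketch
{
    enum State { PENDING, LIVE }

    // ConsistencyLevel.ALL: a transfer activates only when every replica has acked it.
    static State afterAcks(int acks, int replicas)
    {
        return acks >= replicas ? State.LIVE : State.PENDING;
    }

    public static void main(String[] args)
    {
        System.out.println(afterAcks(2, 3)); // PENDING -- one replica still missing
        System.out.println(afterAcks(3, 3)); // LIVE
    }
}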

/**
* Checks the state of this node and throws an {@link InterruptedException} if it is currently draining
*
src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -781,6 +781,8 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs

Tracing.trace("Acquiring sstable references");
ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
if (cfs.metadata().replicationType().isTracked())
controller.addActivationIds(view);
view.sstables.sort(SSTableReader.maxTimestampDescending);
ClusteringIndexFilter filter = clusteringIndexFilter();
long minTimestamp = Long.MAX_VALUE;
@@ -1022,6 +1024,8 @@ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFam
{
Tracing.trace("Acquiring sstable references");
ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
if (cfs.metadata().replicationType().isTracked())
controller.addActivationIds(view);

ImmutableBTreePartition result = null;
SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector();
@@ -1044,6 +1048,8 @@ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFam

/* add the SSTables on disk */
view.sstables.sort(SSTableReader.maxTimestampDescending);
if (cfs.metadata().replicationType().isTracked())
logger.trace("Executing read against SSTables {}", view.sstables);
// read sorted sstables
for (SSTableReader sstable : view.sstables)
{
16 changes: 16 additions & 0 deletions src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -27,6 +27,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
@@ -61,6 +62,7 @@
import org.apache.cassandra.notifications.TableDroppedNotification;
import org.apache.cassandra.notifications.TablePreScrubNotification;
import org.apache.cassandra.notifications.TruncationNotification;
import org.apache.cassandra.replication.ImmutableCoordinatorLogOffsets;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.TimeUUID;
@@ -270,6 +272,20 @@ public void updateInitialSSTableSize(Iterable<SSTableReader> sstables)

public void addSSTables(Collection<SSTableReader> sstables)
{
Preconditions.checkState(!cfstore.metadata().replicationType().isTracked());
addSSTablesInternal(sstables, false, true, true);
}

public void addSSTablesTracked(Collection<SSTableReader> sstables)
{
Preconditions.checkState(cfstore.metadata().replicationType().isTracked());
for (SSTableReader sstable : sstables)
{
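// Assumption based on the checks below: transferred sstables carry transfer ids but no reconciled coordinator log offsets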
ImmutableCoordinatorLogOffsets logOffsets = sstable.getCoordinatorLogOffsets();
Preconditions.checkState(logOffsets.isEmpty());
Preconditions.checkState(!logOffsets.transfers().isEmpty());
}

addSSTablesInternal(sstables, false, true, true);
}

2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/lifecycle/View.java
@@ -402,4 +402,4 @@ public boolean apply(T t)
}
};
}
}
}
src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
@@ -48,6 +48,7 @@
import org.apache.cassandra.streaming.StreamReceiver;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.messages.StreamMessageHeader;
import org.apache.cassandra.utils.FBUtilities;

import static java.lang.String.format;
import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
@@ -159,9 +160,13 @@ public SSTableMultiWriter read(DataInputPlus in) throws IOException

private File getDataDir(ColumnFamilyStore cfs, long totalSize) throws IOException
{
boolean isTracked = cfs.metadata().replicationType().isTracked();

Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
if (localDir == null)
throw new IOException(format("Insufficient disk space to store %s", prettyPrintMemory(totalSize)));
throw new IOException(String.format("Insufficient disk space to store %s", FBUtilities.prettyPrintMemory(totalSize)));
if (isTracked)
return cfs.getDirectories().getPendingLocationForDisk(localDir, session.planId());

File dir = cfs.getDirectories().getLocationForDisk(cfs.getDiskBoundaries().getCorrectDiskForKey(header.firstKey));

src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
@@ -46,19 +46,23 @@
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.UnknownColumnException;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.RangeAwareSSTableWriter;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
import org.apache.cassandra.io.sstable.SSTableTxnSingleStreamWriter;
import org.apache.cassandra.io.sstable.SimpleSSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.TrackedDataInputPlus;
import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.replication.ImmutableCoordinatorLogOffsets;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.StreamReceivedOutOfTokenRangeException;
@@ -180,15 +184,29 @@ protected SerializationHeader getHeader(TableMetadata metadata) throws UnknownCo
}
protected SSTableTxnSingleStreamWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, TimeUUID pendingRepair, ImmutableCoordinatorLogOffsets coordinatorLogOffsets, SSTableFormat<?, ?> format) throws IOException
{
boolean isTracked = cfs.metadata().replicationType().isTracked();

Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
if (localDir == null)
throw new IOException(String.format("Insufficient disk space to store %s", FBUtilities.prettyPrintMemory(totalSize)));

StreamReceiver streamReceiver = session.getAggregator(tableId);
Preconditions.checkState(streamReceiver instanceof CassandraStreamReceiver);
ILifecycleTransaction txn = createTxn();
RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, pendingRepair, false, coordinatorLogOffsets, format, sstableLevel, totalSize, txn, getHeader(cfs.metadata()));
return new SSTableTxnSingleStreamWriter(txn, writer);
if (isTracked)
{
File location = cfs.getDirectories().getPendingLocationForDisk(localDir, session.planId());
Descriptor desc = cfs.newSSTableDescriptor(location, format);
SSTableMultiWriter writer = SimpleSSTableMultiWriter.create(desc, estimatedKeys, ActiveRepairService.UNREPAIRED_SSTABLE, ActiveRepairService.NO_PENDING_REPAIR, false,
coordinatorLogOffsets, cfs.metadata, null, sstableLevel, getHeader(cfs.metadata()),
cfs.indexManager.listIndexGroups(), txn, cfs);
return new SSTableTxnSingleStreamWriter(txn, writer);
}
else
{
RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, pendingRepair, false, coordinatorLogOffsets, format, sstableLevel, totalSize, txn, getHeader(cfs.metadata()));
return new SSTableTxnSingleStreamWriter(txn, writer);
}
}
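
A plausible reading of this branch (not spelled out in the diff): RangeAwareSSTableWriter splits output across live, per-token-range disk locations, while a tracked stream keeps all of a plan's output under a single pending directory so the transfer can later be activated or discarded as a unit. A toy sketch of that location choice:

import java.nio.file.Path;

class WriteLocationSketch
{
    // Toy decision mirroring createWriter: tracked output is quarantined per stream plan,
    // untracked output goes straight to the table's live location on the chosen disk.
    static Path writeLocation(boolean isTracked, Path tableDirOnDisk, String planId)
    {
        return isTracked ? tableDirOnDisk.resolve("pending").resolve(planId)
                         : tableDirOnDisk;
    }
}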

private ILifecycleTransaction createTxn()
src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java
@@ -52,6 +52,8 @@
import org.apache.cassandra.service.accord.AccordTopology;
import org.apache.cassandra.service.accord.IAccordService;
import org.apache.cassandra.service.accord.TimeOnlyRequestBookkeeping.LatencyRequestBookkeeping;
import org.apache.cassandra.replication.MutationTrackingService;
import org.apache.cassandra.replication.PendingLocalTransfer;
import org.apache.cassandra.streaming.IncomingStream;
import org.apache.cassandra.streaming.StreamReceiver;
import org.apache.cassandra.streaming.StreamSession;
@@ -256,6 +258,15 @@ public void finished()

// add sstables (this will build non-SSTable-attached secondary indexes too, see CASSANDRA-10130)
logger.debug("[Stream #{}] Received {} sstables from {} ({})", session.planId(), readers.size(), session.peer, readers);

if (cfs.metadata().replicationType().isTracked())
{
// Don't mark as live until activated by the stream coordinator
PendingLocalTransfer transfer = new PendingLocalTransfer(cfs.metadata().id, session.planId(), sstables);
MutationTrackingService.instance.received(transfer);
return;
}

cfs.addSSTables(readers);

//invalidate row and counter cache
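
The tracked branch above parks finished sstables with MutationTrackingService as a pending transfer keyed by table and stream plan, bypassing cfs.addSSTables until activation. A toy model of that lifecycle, assuming simplified types in place of PendingLocalTransfer and the tracking service:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PendingTransferSketch
{
    record Key(String tableId, String planId) {}

    private final Map<Key, List<String>> pending = new HashMap<>();
    private final List<String> live = new ArrayList<>();

    // Streamed sstables are recorded but stay invisible to reads.
    void received(String tableId, String planId, List<String> sstables)
    {
        pending.put(new Key(tableId, planId), List.copyOf(sstables));
    }

    // Activation (driven by the stream coordinator) promotes them to the live set.
    void activate(String tableId, String planId)
    {
        List<String> sstables = pending.remove(new Key(tableId, planId));
        if (sstables != null)
            live.addAll(sstables);
    }
}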
13 changes: 13 additions & 0 deletions src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -95,6 +95,7 @@
import org.apache.cassandra.io.util.FileUtils.DuplicateHardlinkException;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.metrics.RestorableMeter;
import org.apache.cassandra.replication.CoordinatorLogOffsets;
import org.apache.cassandra.replication.ImmutableCoordinatorLogOffsets;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableMetadataRef;
@@ -1382,6 +1383,18 @@ public void mutateRepairedAndReload(long newRepairedAt, TimeUUID newPendingRepai
}
}

/**
 * Mutate the sstable's coordinator log offsets with a lock to avoid racing with entire-sstable-streaming, then reload sstable metadata
 */
public void mutateCoordinatorLogOffsetsAndReload(ImmutableCoordinatorLogOffsets logOffsets) throws IOException
{
synchronized (tidy.global)
{
descriptor.getMetadataSerializer().mutateCoordinatorLogOffsets(descriptor, logOffsets);
reloadSSTableMetadata();
}
}
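
This mirrors the lock-then-reload pattern of mutateRepairedAndReload above: the on-disk metadata mutation and the in-memory reload both happen under tidy.global so concurrent operations never observe stale offsets. A compressed sketch of the pattern with toy state standing in for sstable metadata:

class MutateAndReloadSketch
{
    private final Object lock = new Object(); // stands in for tidy.global
    private volatile String diskMetadata = "offsets-v1";
    private volatile String inMemoryView = "offsets-v1";

    void mutateAndReload(String newMetadata)
    {
        synchronized (lock)
        {
            diskMetadata = newMetadata;  // 1. rewrite the on-disk component
            inMemoryView = diskMetadata; // 2. reload while still holding the lock
        }
    }
}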

/**
* Reloads the sstable metadata from disk.
* <p>