39 changes: 31 additions & 8 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -134,6 +134,7 @@
import org.apache.cassandra.repair.consistent.admin.PendingStat;
import org.apache.cassandra.replication.ImmutableCoordinatorLogOffsets;
import org.apache.cassandra.replication.MutationId;
import org.apache.cassandra.replication.MutationJournal;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.CompactionParams;
import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
@@ -498,9 +499,16 @@ public ColumnFamilyStore(Keyspace keyspace,

logger.info("Initializing {}.{}", getKeyspaceName(), name);

Memtable initialMemtable = DatabaseDescriptor.isDaemonInitialized() ?
createMemtable(new AtomicReference<>(CommitLog.instance.getCurrentPosition())) :
null;
Memtable initialMemtable = null;
if (DatabaseDescriptor.isDaemonInitialized())
{
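// Tables with tracked replication journal their mutations in the MutationJournal,
// so take the initial replay position from the journal rather than the commit log.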
CommitLogPosition commitLogPosition;
if (metadata().replicationType().isTracked())
commitLogPosition = MutationJournal.instance.getCurrentPosition();
else
commitLogPosition = CommitLog.instance.getCurrentPosition();
initialMemtable = createMemtable(new AtomicReference<>(commitLogPosition));
}
memtableMetricsReleaser = memtableFactory.createMemtableMetricsReleaser(metadata);

data = new Tracker(this, initialMemtable, loadSSTables);
@@ -1147,8 +1155,13 @@ public CommitLogPosition call()
// If a flush errored out but the error was ignored, make sure we don't discard the commit log.
if (flushFailure == null && mainMemtable != null)
{
CommitLogPosition commitLogLowerBound = mainMemtable.getCommitLogLowerBound();
commitLogUpperBound = mainMemtable.getFinalCommitLogUpperBound();
CommitLog.instance.discardCompletedSegments(metadata.id, mainMemtable.getCommitLogLowerBound(), commitLogUpperBound);
TableMetadata metadata = metadata();
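// Tracked tables persist their mutations in the journal; report the flushed range
// to it instead of discarding commit log segments.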
if (metadata.replicationType().isTracked())
MutationJournal.instance.notifyFlushed(metadata.id, commitLogLowerBound, commitLogUpperBound);
else
CommitLog.instance.discardCompletedSegments(metadata.id, commitLogLowerBound, commitLogUpperBound);
}

metric.pendingFlushes.dec();
@@ -1215,7 +1228,7 @@ private Flush(boolean truncate)

// we then ensure an atomic decision is made about the upper bound of the continuous range of commit log
// records owned by this memtable
setCommitLogUpperBound(commitLogUpperBound);
setCommitLogUpperBound(commitLogUpperBound, metadata().replicationType().isTracked());

// we then issue the barrier; this lets us wait for all operations started prior to the barrier to complete;
// since this happens after wiring up the commitLogUpperBound, we also know all operations with earlier
@@ -1416,15 +1429,21 @@ public Memtable createMemtable(AtomicReference<CommitLogPosition> commitLogUpperBound)
}

// atomically set the upper bound for the commit log
private static void setCommitLogUpperBound(AtomicReference<CommitLogPosition> commitLogUpperBound)
private static void setCommitLogUpperBound(AtomicReference<CommitLogPosition> commitLogUpperBound, boolean useMutationJournal)
{
// we attempt to set the holder to the current commit log context. at the same time all writes to the memtables are
// also maintaining this value, so if somebody sneaks ahead of us somehow (should be rare) we simply retry,
// so that we know all operations prior to the position have not reached it yet
CommitLogPosition lastReplayPosition;
while (true)
{
lastReplayPosition = new Memtable.LastCommitLogPosition((CommitLog.instance.getCurrentPosition()));
CommitLogPosition commitLogPosition;
if (useMutationJournal)
commitLogPosition = MutationJournal.instance.getCurrentPosition();
else
commitLogPosition = CommitLog.instance.getCurrentPosition();

lastReplayPosition = new Memtable.LastCommitLogPosition(commitLogPosition);
CommitLogPosition currentLast = commitLogUpperBound.get();
if ((currentLast == null || currentLast.compareTo(lastReplayPosition) <= 0)
&& commitLogUpperBound.compareAndSet(currentLast, lastReplayPosition))
@@ -3234,7 +3253,11 @@ void onTableDropped()

data.notifyDropped(DatabaseDescriptor.getAutoSnapshotTtl());

CommitLog.instance.forceRecycleAllSegments(Collections.singleton(metadata.id));
// TODO (required): test mutation tracking + table dropping
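// For tracked tables, marking everything up to the current journal position as
// flushed lets the journal reclaim the dropped table's records.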
if (metadata().replicationType().isTracked())
MutationJournal.instance.notifyFlushed(metadata.id, new CommitLogPosition(0, 0), MutationJournal.instance.getCurrentPosition());
else
CommitLog.instance.forceRecycleAllSegments(Collections.singleton(metadata.id));

compactionStrategyManager.shutdown();

5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/db/Mutation.java
@@ -201,6 +201,11 @@ public DecoratedKey key()
return key;
}

public ImmutableMap<TableId, PartitionUpdate> modifications()
{
return modifications;
}

public ImmutableCollection<PartitionUpdate> getPartitionUpdates()
{
return modifications.values();
src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -121,7 +121,7 @@ public static CommitLogReplayer construct(CommitLog commitLog, UUID localHostId)

for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
{
// but, if we've truncated the cf in question, then we need to need to start replay after the truncation
// but, if we've truncated the cf in question, then we need to start replay after the truncation
CommitLogPosition truncatedAt = SystemKeyspace.getTruncatedPosition(cfs.metadata.id);
if (truncatedAt != null)
{
@@ -323,7 +323,7 @@ public void runMayThrow()
{
assert !newPUCollector.isEmpty();

Keyspace.open(newPUCollector.getKeyspaceName()).apply(newPUCollector.build(), false, true, false);
keyspace.apply(newPUCollector.build(), false, true, false);
commitLogReplayer.keyspacesReplayed.add(keyspace);
}
}
@@ -439,7 +439,7 @@ public static ReplayFilter create()
if (toReplay.isEmpty())
logger.info("All tables will be included in commit log replay.");
else
logger.info("Tables to be replayed: {}", toReplay.asMap().toString());
logger.info("Tables to be replayed: {}", toReplay.asMap());

return new CustomReplayFilter(toReplay);
}
@@ -24,7 +24,6 @@
import org.apache.cassandra.db.WriteContext;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.journal.RecordPointer;
import org.apache.cassandra.replication.MutationJournal;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -40,10 +39,9 @@ public WriteContext beginWrite(Mutation mutation, boolean makeDurable) throws RequestExecutionException
group = Keyspace.writeOrder.start();

Tracing.trace("Appending to mutation journal");
RecordPointer pointer = MutationJournal.instance.write(mutation.id(), mutation);
CommitLogPosition pointer = MutationJournal.instance.write(mutation.id(), mutation);

// TODO (preferred): update journal to return CommitLogPosition or otherwise remove requirement to allocate second object here
return new CassandraWriteContext(group, new CommitLogPosition(pointer.segment, pointer.position));
return new CassandraWriteContext(group, pointer);
}
catch (Throwable t)
{
39 changes: 25 additions & 14 deletions src/java/org/apache/cassandra/journal/ActiveSegment.java
@@ -30,10 +30,10 @@
import accord.utils.Invariants;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.io.util.*;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.concurrent.Ref;
@@ -98,6 +98,11 @@ private ActiveSegment(
}
}

public CommitLogPosition currentPosition()
{
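// Journal positions reuse CommitLogPosition: the segment id stands in for the
// commit log segment, the current allocation offset for the position within it.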
return new CommitLogPosition(id(), (int) allocateOffset);
}

static <K, V> ActiveSegment<K, V> create(Descriptor descriptor, Params params, KeySupport<K> keySupport)
{
InMemoryIndex<K> index = InMemoryIndex.create(keySupport);
@@ -134,6 +139,12 @@ StaticSegment<K, V> asStatic()
throw new UnsupportedOperationException();
}

@Override
public void persistMetadata()
{
throw new UnsupportedOperationException("Cannot mutate an active segment's metadata.");
}

/**
* Read the entry and specified offset into the entry holder.
* Expects the caller to acquire the ref to the segment and the record to exist.
@@ -425,19 +436,21 @@ private int allocateBytes(int size)
}
}

final class Allocation
final class Allocation extends RecordPointer
{
private final OpOrder.Group appendOp;
private final ByteBuffer buffer;
private final int start;
private final int length;

Allocation(OpOrder.Group appendOp, ByteBuffer buffer, int length)
{
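// The RecordPointer superclass now carries the pointer state: segment id
// (descriptor.timestamp), start offset (buffer.position()), and record length.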
super(descriptor.timestamp, buffer.position(), length);
this.appendOp = appendOp;
this.buffer = buffer;
this.start = buffer.position();
this.length = length;
}

Segment<K, V> segment()
{
return ActiveSegment.this;
}

void write(K id, ByteBuffer record)
Expand All @@ -446,7 +459,7 @@ void write(K id, ByteBuffer record)
{
EntrySerializer.write(id, record, keySupport, buffer, descriptor.userVersion);
metadata.update();
index.update(id, start, length);
index.update(id, position, length);
}
catch (IOException e)
{
@@ -472,14 +485,12 @@ void consumeBufferUnsafe(Consumer<ByteBuffer> fn)
}
}


// Variant of write that does not allocate/return a record pointer
void writeInternal(K id, ByteBuffer record)
{
try
{
EntrySerializer.write(id, record, keySupport, buffer, descriptor.userVersion);
index.update(id, start, length);
index.update(id, position, length);
metadata.update();
}
catch (IOException e)
@@ -496,13 +507,13 @@ void awaitDurable(Timer waitingOnFlush)
{
try (Timer.Context ignored = waitingOnFlush.time())
{
waitForFlush(start);
waitForFlush(position);
}
}

boolean isFsynced()
{
return fsyncedTo >= start + length;
return fsyncedTo >= position + length;
}

Descriptor descriptor()
@@ -512,12 +523,12 @@ Descriptor descriptor()

int start()
{
return start;
return position;
}

RecordPointer recordPointer()
{
return new RecordPointer(descriptor.timestamp, start, length);
return this;
}
}

51 changes: 51 additions & 0 deletions src/java/org/apache/cassandra/journal/DeserializedRecordConsumer.java
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.journal;

import org.apache.cassandra.io.util.DataInputBuffer;

import java.io.IOException;
import java.nio.ByteBuffer;

public abstract class DeserializedRecordConsumer<K, V> implements RecordConsumer<K>
{
final ValueSerializer<K, V> valueSerializer;

public DeserializedRecordConsumer(ValueSerializer<K, V> valueSerializer)
{
this.valueSerializer = valueSerializer;
}

@Override
public void accept(long segment, int position, K key, ByteBuffer buffer, int userVersion)
{
try (DataInputBuffer in = new DataInputBuffer(buffer, false))
{
V value = valueSerializer.deserialize(key, in, userVersion);
accept(segment, position, key, value);
}
catch (IOException e)
{
// can only throw if serializer is buggy
throw new RuntimeException(e);
}
}

protected abstract void accept(long segment, int position, K key, V value);
}