diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 2c11e65b29df..dc36d9b114c6 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -134,6 +134,7 @@ import org.apache.cassandra.repair.consistent.admin.PendingStat; import org.apache.cassandra.replication.ImmutableCoordinatorLogOffsets; import org.apache.cassandra.replication.MutationId; +import org.apache.cassandra.replication.MutationJournal; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.CompactionParams; import org.apache.cassandra.schema.CompactionParams.TombstoneOption; @@ -498,9 +499,16 @@ public ColumnFamilyStore(Keyspace keyspace, logger.info("Initializing {}.{}", getKeyspaceName(), name); - Memtable initialMemtable = DatabaseDescriptor.isDaemonInitialized() ? - createMemtable(new AtomicReference<>(CommitLog.instance.getCurrentPosition())) : - null; + Memtable initialMemtable = null; + if (DatabaseDescriptor.isDaemonInitialized()) + { + CommitLogPosition commitLogPosition; + if (metadata().replicationType().isTracked()) + commitLogPosition = MutationJournal.instance.getCurrentPosition(); + else + commitLogPosition = CommitLog.instance.getCurrentPosition(); + initialMemtable = createMemtable(new AtomicReference<>(commitLogPosition)); + } memtableMetricsReleaser = memtableFactory.createMemtableMetricsReleaser(metadata); data = new Tracker(this, initialMemtable, loadSSTables); @@ -1147,8 +1155,13 @@ public CommitLogPosition call() // If a flush errored out but the error was ignored, make sure we don't discard the commit log. if (flushFailure == null && mainMemtable != null) { + CommitLogPosition commitLogLowerBound = mainMemtable.getCommitLogLowerBound(); commitLogUpperBound = mainMemtable.getFinalCommitLogUpperBound(); - CommitLog.instance.discardCompletedSegments(metadata.id, mainMemtable.getCommitLogLowerBound(), commitLogUpperBound); + TableMetadata metadata = metadata(); + if (metadata.replicationType().isTracked()) + MutationJournal.instance.notifyFlushed(metadata.id, commitLogLowerBound, commitLogUpperBound); + else + CommitLog.instance.discardCompletedSegments(metadata.id, commitLogLowerBound, commitLogUpperBound); } metric.pendingFlushes.dec(); @@ -1215,7 +1228,7 @@ private Flush(boolean truncate) // we then ensure an atomic decision is made about the upper bound of the continuous range of commit log // records owned by this memtable - setCommitLogUpperBound(commitLogUpperBound); + setCommitLogUpperBound(commitLogUpperBound, metadata().replicationType().isTracked()); // we then issue the barrier; this lets us wait for all operations started prior to the barrier to complete; // since this happens after wiring up the commitLogUpperBound, we also know all operations with earlier @@ -1416,7 +1429,7 @@ public Memtable createMemtable(AtomicReference commitLogUpper } // atomically set the upper bound for the commit log - private static void setCommitLogUpperBound(AtomicReference commitLogUpperBound) + private static void setCommitLogUpperBound(AtomicReference commitLogUpperBound, boolean useMutationJournal) { // we attempt to set the holder to the current commit log context. 
at the same time all writes to the memtables are // also maintaining this value, so if somebody sneaks ahead of us somehow (should be rare) we simply retry, @@ -1424,7 +1437,13 @@ private static void setCommitLogUpperBound(AtomicReference co CommitLogPosition lastReplayPosition; while (true) { - lastReplayPosition = new Memtable.LastCommitLogPosition((CommitLog.instance.getCurrentPosition())); + CommitLogPosition commitLogPosition; + if (useMutationJournal) + commitLogPosition = MutationJournal.instance.getCurrentPosition(); + else + commitLogPosition = CommitLog.instance.getCurrentPosition(); + + lastReplayPosition = new Memtable.LastCommitLogPosition(commitLogPosition); CommitLogPosition currentLast = commitLogUpperBound.get(); if ((currentLast == null || currentLast.compareTo(lastReplayPosition) <= 0) && commitLogUpperBound.compareAndSet(currentLast, lastReplayPosition)) @@ -3234,7 +3253,11 @@ void onTableDropped() data.notifyDropped(DatabaseDescriptor.getAutoSnapshotTtl()); - CommitLog.instance.forceRecycleAllSegments(Collections.singleton(metadata.id)); + // TODO (required): test mutation tracking + table dropping + if (metadata().replicationType().isTracked()) + MutationJournal.instance.notifyFlushed(metadata.id, new CommitLogPosition(0, 0), MutationJournal.instance.getCurrentPosition()); + else + CommitLog.instance.forceRecycleAllSegments(Collections.singleton(metadata.id)); compactionStrategyManager.shutdown(); diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java index f8388bfe89c0..c81ea47310cf 100644 --- a/src/java/org/apache/cassandra/db/Mutation.java +++ b/src/java/org/apache/cassandra/db/Mutation.java @@ -201,6 +201,11 @@ public DecoratedKey key() return key; } + public ImmutableMap modifications() + { + return modifications; + } + public ImmutableCollection getPartitionUpdates() { return modifications.values(); diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 52173d44fb45..f331e0b1a7ef 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -121,7 +121,7 @@ public static CommitLogReplayer construct(CommitLog commitLog, UUID localHostId) for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) { - // but, if we've truncated the cf in question, then we need to need to start replay after the truncation + // but, if we've truncated the cf in question, then we need to start replay after the truncation CommitLogPosition truncatedAt = SystemKeyspace.getTruncatedPosition(cfs.metadata.id); if (truncatedAt != null) { @@ -323,7 +323,7 @@ public void runMayThrow() { assert !newPUCollector.isEmpty(); - Keyspace.open(newPUCollector.getKeyspaceName()).apply(newPUCollector.build(), false, true, false); + keyspace.apply(newPUCollector.build(), false, true, false); commitLogReplayer.keyspacesReplayed.add(keyspace); } } @@ -439,7 +439,7 @@ public static ReplayFilter create() if (toReplay.isEmpty()) logger.info("All tables will be included in commit log replay."); else - logger.info("Tables to be replayed: {}", toReplay.asMap().toString()); + logger.info("Tables to be replayed: {}", toReplay.asMap()); return new CustomReplayFilter(toReplay); } diff --git a/src/java/org/apache/cassandra/db/tracked/TrackedKeyspaceWriteHandler.java b/src/java/org/apache/cassandra/db/tracked/TrackedKeyspaceWriteHandler.java index bfd6d593c6d7..8c06e3c30884 100644 
--- a/src/java/org/apache/cassandra/db/tracked/TrackedKeyspaceWriteHandler.java +++ b/src/java/org/apache/cassandra/db/tracked/TrackedKeyspaceWriteHandler.java @@ -24,7 +24,6 @@ import org.apache.cassandra.db.WriteContext; import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.exceptions.RequestExecutionException; -import org.apache.cassandra.journal.RecordPointer; import org.apache.cassandra.replication.MutationJournal; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.concurrent.OpOrder; @@ -40,10 +39,9 @@ public WriteContext beginWrite(Mutation mutation, boolean makeDurable) throws Re group = Keyspace.writeOrder.start(); Tracing.trace("Appending to mutation journal"); - RecordPointer pointer = MutationJournal.instance.write(mutation.id(), mutation); + CommitLogPosition pointer = MutationJournal.instance.write(mutation.id(), mutation); - // TODO (preferred): update journal to return CommitLogPosition or otherwise remove requirement to allocate second object here - return new CassandraWriteContext(group, new CommitLogPosition(pointer.segment, pointer.position)); + return new CassandraWriteContext(group, pointer); } catch (Throwable t) { diff --git a/src/java/org/apache/cassandra/journal/ActiveSegment.java b/src/java/org/apache/cassandra/journal/ActiveSegment.java index 6000e2c9d65f..9e01f493d7de 100644 --- a/src/java/org/apache/cassandra/journal/ActiveSegment.java +++ b/src/java/org/apache/cassandra/journal/ActiveSegment.java @@ -30,10 +30,10 @@ import accord.utils.Invariants; import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; +import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.io.util.*; import org.apache.cassandra.utils.*; import org.apache.cassandra.utils.concurrent.OpOrder; import org.apache.cassandra.utils.concurrent.Ref; @@ -98,6 +98,11 @@ private ActiveSegment( } } + public CommitLogPosition currentPosition() + { + return new CommitLogPosition(id(), (int) allocateOffset); + } + static ActiveSegment create(Descriptor descriptor, Params params, KeySupport keySupport) { InMemoryIndex index = InMemoryIndex.create(keySupport); @@ -134,6 +139,12 @@ StaticSegment asStatic() throw new UnsupportedOperationException(); } + @Override + public void persistMetadata() + { + throw new UnsupportedOperationException("Can not mutate active segment's metadata."); + } + /** * Read the entry and specified offset into the entry holder. * Expects the caller to acquire the ref to the segment and the record to exist. 
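An aside on the currentPosition() addition above: a journal position is simply the pair (segment id, byte offset within the segment), ordered first by segment and then by offset, which is the same total order CommitLogPosition defines. A minimal, self-contained sketch of that ordering (JournalPosition is a hypothetical stand-in, not a class from this patch):

    final class JournalPosition implements Comparable<JournalPosition>
    {
        final long segmentId; // monotonically increasing segment timestamp
        final int position;   // byte offset within the segment

        JournalPosition(long segmentId, int position)
        {
            this.segmentId = segmentId;
            this.position = position;
        }

        @Override
        public int compareTo(JournalPosition that)
        {
            // compare by segment first, then by offset within the segment
            int cmp = Long.compare(this.segmentId, that.segmentId);
            return cmp != 0 ? cmp : Integer.compare(this.position, that.position);
        }
    }

This is why ActiveSegment can answer currentPosition() from nothing more than its descriptor timestamp and the current allocation offset.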
@@ -425,19 +436,21 @@ private int allocateBytes(int size) } } - final class Allocation + final class Allocation extends RecordPointer { private final OpOrder.Group appendOp; private final ByteBuffer buffer; - private final int start; - private final int length; Allocation(OpOrder.Group appendOp, ByteBuffer buffer, int length) { + super(descriptor.timestamp, buffer.position(), length); this.appendOp = appendOp; this.buffer = buffer; - this.start = buffer.position(); - this.length = length; + } + + Segment segment() + { + return ActiveSegment.this; } void write(K id, ByteBuffer record) @@ -446,7 +459,7 @@ void write(K id, ByteBuffer record) { EntrySerializer.write(id, record, keySupport, buffer, descriptor.userVersion); metadata.update(); - index.update(id, start, length); + index.update(id, position, length); } catch (IOException e) { @@ -472,14 +485,12 @@ void consumeBufferUnsafe(Consumer fn) } } - - // Variant of write that does not allocate/return a record pointer void writeInternal(K id, ByteBuffer record) { try { EntrySerializer.write(id, record, keySupport, buffer, descriptor.userVersion); - index.update(id, start, length); + index.update(id, position, length); metadata.update(); } catch (IOException e) @@ -496,13 +507,13 @@ void awaitDurable(Timer waitingOnFlush) { try (Timer.Context ignored = waitingOnFlush.time()) { - waitForFlush(start); + waitForFlush(position); } } boolean isFsynced() { - return fsyncedTo >= start + length; + return fsyncedTo >= position + length; } Descriptor descriptor() @@ -512,12 +523,12 @@ Descriptor descriptor() int start() { - return start; + return position; } RecordPointer recordPointer() { - return new RecordPointer(descriptor.timestamp, start, length); + return this; } } diff --git a/src/java/org/apache/cassandra/journal/DeserializedRecordConsumer.java b/src/java/org/apache/cassandra/journal/DeserializedRecordConsumer.java new file mode 100644 index 000000000000..0f058befc097 --- /dev/null +++ b/src/java/org/apache/cassandra/journal/DeserializedRecordConsumer.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.journal; + +import org.apache.cassandra.io.util.DataInputBuffer; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public abstract class DeserializedRecordConsumer implements RecordConsumer +{ + final ValueSerializer valueSerializer; + + public DeserializedRecordConsumer(ValueSerializer valueSerializer) + { + this.valueSerializer = valueSerializer; + } + + @Override + public void accept(long segment, int position, K key, ByteBuffer buffer, int userVersion) + { + try (DataInputBuffer in = new DataInputBuffer(buffer, false)) + { + V value = valueSerializer.deserialize(key, in, userVersion); + accept(segment, position, key, value); + } + catch (IOException e) + { + // can only throw if serializer is buggy + throw new RuntimeException(e); + } + } + + protected abstract void accept(long segment, int position, K key, V value); +} diff --git a/src/java/org/apache/cassandra/journal/Journal.java b/src/java/org/apache/cassandra/journal/Journal.java index eaab157ea9c5..3176c5b395ea 100644 --- a/src/java/org/apache/cassandra/journal/Journal.java +++ b/src/java/org/apache/cassandra/journal/Journal.java @@ -36,6 +36,7 @@ import javax.annotation.Nullable; import com.google.common.annotations.VisibleForTesting; +import org.agrona.collections.Long2ObjectHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -112,7 +113,7 @@ public class Journal implements Shutdownable // segment that is ready to be used; allocator thread fills this and blocks until consumed private volatile ActiveSegment availableSegment = null; - private final AtomicReference> segments = new AtomicReference<>(); + private final AtomicReference> segments = new AtomicReference<>(new Segments<>(new Long2ObjectHashMap<>())); final AtomicReference state = new AtomicReference<>(State.UNINITIALIZED); @@ -340,6 +341,15 @@ public V readLast(K id) return null; } + public void readLast(K id, long segmentId, DeserializedRecordConsumer consumer) + { + Segment segment = segments.get().get(segmentId); + try (OpOrder.Group group = readOrder.start()) + { + segment.readLast(id, consumer); + } + } + public void readAll(K id, RecordConsumer consumer) { EntrySerializer.EntryHolder holder = new EntrySerializer.EntryHolder<>(); @@ -474,16 +484,16 @@ public RecordPointer lookUpLast(K id) public int sizeOfRecord(RecordPointer pointer) { - Descriptor descriptor = segments.get().descriptor(pointer.segment); + Descriptor descriptor = segments.get().descriptor(pointer.segmentId); Invariants.nonNull(descriptor); - return pointer.size - EntrySerializer.overheadSize(keySupport, descriptor.userVersion); + return pointer.length - EntrySerializer.overheadSize(keySupport, descriptor.userVersion); } public boolean read(RecordPointer pointer, RecordConsumer consumer) { try (OpOrder.Group group = readOrder.start()) { - Segment segment = segments.get().get(pointer.segment); + Segment segment = segments.get().get(pointer.segmentId); return segment != null && segment.read(pointer, consumer); } } @@ -552,7 +562,6 @@ public void unsafeConsumeBytesForTesting(int entrySize, Consumer cor private ActiveSegment.Allocation allocate(int entrySize) { - ActiveSegment segment = currentSegment; ActiveSegment.Allocation alloc; while (null == (alloc = segment.allocate(entrySize))) @@ -807,11 +816,23 @@ ActiveSegment oldestActiveSegment() return oldest; } + public List> getSegments(long lowerBound, long upperBound) + { + List> res = new ArrayList<>(); + segments().select(lowerBound, upperBound, res); + return res; + } + public 
ActiveSegment currentActiveSegment() { return currentSegment; } + @Nullable protected Segment getSegment(long timestamp) + { + return segments().get(timestamp); + } + ActiveSegment getActiveSegment(long timestamp) { // we can race with segment addition to the segments() collection, with a new segment appearing in currentSegment first @@ -853,10 +874,12 @@ else if (timestamp > currentSegmentTimestamp) private class CloseActiveSegmentRunnable implements Runnable { private final ActiveSegment activeSegment; + private final Runnable onDone; - CloseActiveSegmentRunnable(ActiveSegment activeSegment) + CloseActiveSegmentRunnable(ActiveSegment activeSegment, @Nullable Runnable onDone) { this.activeSegment = activeSegment; + this.onDone = onDone; } @Override @@ -868,10 +891,16 @@ public void run() activeSegment.persistComponents(); replaceCompletedSegment(activeSegment, StaticSegment.open(activeSegment.descriptor, keySupport)); activeSegment.release(Journal.this); + if (onDone != null) onDone.run(); } } - void closeActiveSegmentAndOpenAsStatic(ActiveSegment activeSegment) + protected void closeActiveSegmentAndOpenAsStatic(ActiveSegment activeSegment) + { + closeActiveSegmentAndOpenAsStatic(activeSegment, null); + } + + protected void closeActiveSegmentAndOpenAsStatic(ActiveSegment activeSegment, @Nullable Runnable onDone) { if (activeSegment.isEmpty()) { @@ -880,7 +909,7 @@ void closeActiveSegmentAndOpenAsStatic(ActiveSegment activeSegment) return; } - closer.execute(new CloseActiveSegmentRunnable(activeSegment)); + closer.execute(new CloseActiveSegmentRunnable(activeSegment, onDone)); } @VisibleForTesting @@ -981,13 +1010,21 @@ public interface Writer } /** - * Static segment iterator iterates all keys in _static_ segments in order. + * Static segment iterator iterates all keys in _static_ segments in order for the given key range. */ public StaticSegmentKeyIterator staticSegmentKeyIterator(K min, K max) { return new StaticSegmentKeyIterator(min, max); } + /** + * Static segment iterator iterates all keys in selected segments in order. + */ + public StaticSegmentKeyIterator staticSegmentKeyIterator(Predicate> predicate) + { + return new StaticSegmentKeyIterator(null, null, predicate); + } + /** * List of key and a list of segment descriptors referencing this key */ @@ -1013,6 +1050,11 @@ public void segments(LongConsumer consumer) consumer.accept(segments[i]); } + public long lastSegment() + { + return segments[segments.length - 1]; + } + public long[] copyOfSegments() { return segments == null ? 
new long[0] : Arrays.copyOf(segments, size); @@ -1060,10 +1102,17 @@ public class StaticSegmentKeyIterator implements CloseableIterator> public StaticSegmentKeyIterator(K min, K max) { - this.segments = selectAndReference(s -> s.isStatic() - && s.asStatic().index().entryCount() > 0 - && (min == null || keySupport.compare(s.index().lastId(), min) >= 0) - && (max == null || keySupport.compare(s.index().firstId(), max) <= 0)); + this(min, max, s -> { + return s.isStatic() + && s.asStatic().index().entryCount() > 0 + && (min == null || keySupport.compare(s.index().lastId(), min) >= 0) + && (max == null || keySupport.compare(s.index().firstId(), max) <= 0); + }); + } + + public StaticSegmentKeyIterator(K min, K max, Predicate> predicate) + { + this.segments = selectAndReference(predicate); List> iterators = new ArrayList<>(segments.count()); for (Segment segment : segments.allSorted(true)) diff --git a/src/java/org/apache/cassandra/journal/Metadata.java b/src/java/org/apache/cassandra/journal/Metadata.java index 13198fa77aa1..6f9552a22901 100644 --- a/src/java/org/apache/cassandra/journal/Metadata.java +++ b/src/java/org/apache/cassandra/journal/Metadata.java @@ -33,23 +33,25 @@ * - total count of records in this segment file * used for compaction prioritisation */ -final class Metadata +public final class Metadata { private int fsyncLimit; - + // Indicates whether a segment needs to be replayed or not. + private volatile boolean needsReplay; private volatile int recordsCount; private static final AtomicIntegerFieldUpdater recordsCountUpdater = AtomicIntegerFieldUpdater.newUpdater(Metadata.class, "recordsCount"); static Metadata empty() { - return new Metadata(0, 0); + return new Metadata(0, 0, true); } - private Metadata(int recordsCount, int fsyncLimit) + private Metadata(int recordsCount, int fsyncLimit, boolean needsReplay) { this.recordsCount = recordsCount; this.fsyncLimit = fsyncLimit; + this.needsReplay = needsReplay; } void update() @@ -62,17 +64,27 @@ void fsyncLimit(int fsyncLimit) { this.fsyncLimit = fsyncLimit; } + public void clearNeedsReplay() + { + this.needsReplay = false; + } + int fsyncLimit() { return fsyncLimit; } + public boolean needsReplay() + { + return needsReplay; + } + private void incrementRecordsCount() { recordsCountUpdater.incrementAndGet(this); } - int totalCount() + public int totalCount() { return recordsCount; } @@ -82,8 +94,10 @@ void write(DataOutputPlus out) throws IOException { CRC32 crc = Crc.crc32(); out.writeInt(recordsCount); out.writeInt(fsyncLimit); + out.writeBoolean(needsReplay); updateChecksumInt(crc, recordsCount); updateChecksumInt(crc, fsyncLimit); + updateChecksumInt(crc, needsReplay ? 1 : 0); out.writeInt((int) crc.getValue()); } @@ -92,10 +106,12 @@ static Metadata read(DataInputPlus in) throws IOException { CRC32 crc = Crc.crc32(); int recordsCount = in.readInt(); int fsyncLimit = in.readInt(); + boolean needsReplay = in.readBoolean(); updateChecksumInt(crc, recordsCount); updateChecksumInt(crc, fsyncLimit); + updateChecksumInt(crc, needsReplay ?
1 : 0); validateCRC(crc, in.readInt()); - return new Metadata(recordsCount, fsyncLimit); + return new Metadata(recordsCount, fsyncLimit, needsReplay); } void persist(Descriptor descriptor) @@ -145,7 +161,7 @@ static Metadata rebuild(Descriptor descriptor, KeySupport keySupport) throw e; } - return new Metadata(recordsCount, fsyncLimit); + return new Metadata(recordsCount, fsyncLimit, true); } static Metadata rebuildAndPersist(Descriptor descriptor, KeySupport keySupport) @@ -156,11 +172,11 @@ static Metadata rebuildAndPersist(Descriptor descriptor, KeySupport keySu } @Override - public String toString() - { + public String toString() { return "Metadata{" + - "fsyncLimit=" + fsyncLimit + - ", recordsCount=" + recordsCount + - '}'; + "fsyncLimit=" + fsyncLimit + + ", needsReplay=" + needsReplay + + ", recordsCount=" + recordsCount + + '}'; } } diff --git a/src/java/org/apache/cassandra/journal/RecordConsumer.java b/src/java/org/apache/cassandra/journal/RecordConsumer.java index 22d2bc4e9f8d..4d2b28b4195b 100644 --- a/src/java/org/apache/cassandra/journal/RecordConsumer.java +++ b/src/java/org/apache/cassandra/journal/RecordConsumer.java @@ -19,6 +19,7 @@ import java.nio.ByteBuffer; +// TODO (desired): rename to SerializedRecordConsumer @FunctionalInterface public interface RecordConsumer { diff --git a/src/java/org/apache/cassandra/journal/RecordPointer.java b/src/java/org/apache/cassandra/journal/RecordPointer.java index 22d874712d9a..8e5c08ca51d8 100644 --- a/src/java/org/apache/cassandra/journal/RecordPointer.java +++ b/src/java/org/apache/cassandra/journal/RecordPointer.java @@ -18,33 +18,29 @@ package org.apache.cassandra.journal; -import com.google.common.primitives.Ints; -import com.google.common.primitives.Longs; +import org.apache.cassandra.db.commitlog.CommitLogPosition; // TODO: make this available in the accord table as an ID -public class RecordPointer implements Comparable +public class RecordPointer extends CommitLogPosition { - public final long segment; // unique segment id - public final int position; // record start position within the segment - public final int size; // full size of the record + public final int length; // full size of the record public final long writtenAt; // only set for periodic mode - public RecordPointer(long segment, int position, int size) + public RecordPointer(long segment, int position, int length) { - this(segment, position, size, 0); + this(segment, position, length, 0); } - public RecordPointer(long segment, int position, int size, long writtenAt) + public RecordPointer(long segment, int position, int length, long writtenAt) { - this.segment = segment; - this.position = position; - this.size = size; + super(segment, position); + this.length = length; this.writtenAt = writtenAt; } public RecordPointer(RecordPointer pointer) { - this(pointer.segment, pointer.position, pointer.size, pointer.writtenAt); + this(pointer.segmentId, pointer.position, pointer.length, pointer.writtenAt); } @Override @@ -55,26 +51,19 @@ public boolean equals(Object other) if (!(other instanceof RecordPointer)) return false; RecordPointer that = (RecordPointer) other; - return this.segment == that.segment + return this.segmentId == that.segmentId && this.position == that.position; } @Override public int hashCode() { - return Long.hashCode(segment) + position * 31; + return Long.hashCode(segmentId) + position * 31; } @Override public String toString() { - return "(" + segment + ", " + position + ')'; - } - - @Override - public int compareTo(RecordPointer that) - { - int 
cmp = Longs.compare(this.segment, that.segment); - return cmp != 0 ? cmp : Ints.compare(this.position, that.position); + return "(" + segmentId + ", " + position + ')'; } } \ No newline at end of file diff --git a/src/java/org/apache/cassandra/journal/Segment.java b/src/java/org/apache/cassandra/journal/Segment.java index a80bbb4f4cd7..0b34f1451684 100644 --- a/src/java/org/apache/cassandra/journal/Segment.java +++ b/src/java/org/apache/cassandra/journal/Segment.java @@ -66,16 +66,24 @@ public final void tidy() abstract boolean isActive(); abstract boolean isFlushed(long position); - boolean isStatic() { return !isActive(); } abstract ActiveSegment asActive(); abstract StaticSegment asStatic(); + public boolean isStatic() { return !isActive(); } + + public Metadata metadata() + { + return metadata; + } + public long id() { return descriptor.timestamp; } + public abstract void persistMetadata(); + /* * Reading entries (by id, by offset, iterate) */ @@ -110,7 +118,7 @@ boolean readLast(K id, EntrySerializer.EntryHolder into) boolean read(RecordPointer pointer, RecordConsumer consumer) { EntrySerializer.EntryHolder into = new EntrySerializer.EntryHolder<>(); - if (read(pointer.position, pointer.size, into)) + if (read(pointer.position, pointer.length, into)) { consumer.accept(descriptor.timestamp, pointer.position, into.key, into.value, descriptor.userVersion); return true; diff --git a/src/java/org/apache/cassandra/journal/Segments.java b/src/java/org/apache/cassandra/journal/Segments.java index 50c32ed8b693..4e01bd47b42b 100644 --- a/src/java/org/apache/cassandra/journal/Segments.java +++ b/src/java/org/apache/cassandra/journal/Segments.java @@ -128,6 +128,37 @@ void selectActive(long maxTimestamp, Collection> into) into.add(segment.asActive()); } + void select(long minTimestamp, long maxTimestamp, Collection> into) + { + List> sorted = allSorted(true); + int idx = minTimestamp == 0 ? 
0 : findIdxFor(minTimestamp); + while (idx < sorted.size()) + { + Segment segment = sorted.get(idx++); + if (segment.descriptor.timestamp > maxTimestamp) + break; + into.add(segment); + } + } + + int findIdxFor(long timestamp) + { + List> sorted = allSorted(true); + int low = 0, mid = sorted.size(), high = mid - 1, res = -1; + while (low <= high) + { + mid = (low + high) >>> 1; + res = Long.compare(timestamp, sorted.get(mid).descriptor.timestamp); + if (res > 0) + low = mid + 1; + else if (res == 0) + return mid; + else + high = mid - 1; + } + throw new IllegalStateException(String.format("Could not find a segment with timestamp %d among %s", timestamp, sorted)); + } + boolean isSwitched(ActiveSegment active) { for (Segment segment : segments.values()) @@ -232,6 +263,7 @@ public void close() @Override public String toString() { + List> sorted = allSorted(true); return sorted.toString(); } diff --git a/src/java/org/apache/cassandra/journal/StaticSegment.java b/src/java/org/apache/cassandra/journal/StaticSegment.java index 35c987c8a480..e78187b1298d 100644 --- a/src/java/org/apache/cassandra/journal/StaticSegment.java +++ b/src/java/org/apache/cassandra/journal/StaticSegment.java @@ -20,6 +20,7 @@ import org.apache.cassandra.io.util.File; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.Closeable; +import org.apache.cassandra.utils.SyncUtil; import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.concurrent.Ref; import org.apache.cassandra.utils.memory.MemoryUtil; @@ -270,6 +271,13 @@ StaticSegment asStatic() return this; } + @Override + public void persistMetadata() + { + metadata.persist(descriptor); + SyncUtil.trySyncDir(descriptor.directory); + } + /** * Read the entry and specified offset into the entry holder. * Expects the record to have been written at this offset, but potentially not flushed and lost. 
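A note on the metadata changes above: the new needsReplay flag is serialized after the existing fields and participates in the same trailing CRC that Metadata.write()/read() already use. A self-contained sketch of that framing, using plain java.io streams in place of DataOutputPlus (the field set mirrors the patch; the stream plumbing and the helper are illustrative assumptions):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.zip.CRC32;

    final class MetadataCodecSketch
    {
        static byte[] write(int recordsCount, int fsyncLimit, boolean needsReplay) throws IOException
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes))
            {
                CRC32 crc = new CRC32();
                out.writeInt(recordsCount);
                out.writeInt(fsyncLimit);
                out.writeBoolean(needsReplay);
                updateInt(crc, recordsCount);
                updateInt(crc, fsyncLimit);
                updateInt(crc, needsReplay ? 1 : 0); // the new flag is covered by the checksum
                out.writeInt((int) crc.getValue());  // trailing CRC, validated on read
            }
            return bytes.toByteArray();
        }

        // feed a big-endian int to the CRC, in the spirit of FBUtilities.updateChecksumInt
        private static void updateInt(CRC32 crc, int v)
        {
            crc.update(new byte[]{ (byte) (v >>> 24), (byte) (v >>> 16), (byte) (v >>> 8), (byte) v });
        }
    }

One consequence worth noting: metadata files written before this change carry no boolean, so reading them with the new format would presumably fail CRC validation and force a rebuild; Metadata.rebuild() conservatively sets needsReplay to true, which is safe because replaying an already-clean segment is merely redundant work.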
diff --git a/src/java/org/apache/cassandra/replication/MutationId.java b/src/java/org/apache/cassandra/replication/MutationId.java index 5faf4db7d5cd..259b950b3319 100644 --- a/src/java/org/apache/cassandra/replication/MutationId.java +++ b/src/java/org/apache/cassandra/replication/MutationId.java @@ -46,6 +46,12 @@ public class MutationId extends ShortMutationId */ protected final int timestamp; + public MutationId(long logId, int offset, int timestamp) + { + super(logId, offset); + this.timestamp = timestamp; + } + public MutationId(long logId, long sequenceId) { super(logId, offset(sequenceId)); diff --git a/src/java/org/apache/cassandra/replication/MutationJournal.java b/src/java/org/apache/cassandra/replication/MutationJournal.java index aeccc3f052a7..2de55de42413 100644 --- a/src/java/org/apache/cassandra/replication/MutationJournal.java +++ b/src/java/org/apache/cassandra/replication/MutationJournal.java @@ -20,35 +20,54 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collection; +import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; import java.util.zip.Checksum; - import javax.annotation.Nullable; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.cassandra.journal.*; + +import org.cliffc.high_scale_lib.NonBlockingHashMapLong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import accord.utils.Invariants; +import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.db.commitlog.CommitLogPosition; +import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.File; -import org.apache.cassandra.journal.Journal; -import org.apache.cassandra.journal.KeySupport; -import org.apache.cassandra.journal.Params; -import org.apache.cassandra.journal.RecordConsumer; -import org.apache.cassandra.journal.RecordPointer; -import org.apache.cassandra.journal.SegmentCompactor; -import org.apache.cassandra.journal.ValueSerializer; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableId; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.concurrent.Semaphore; +import static org.apache.cassandra.utils.FBUtilities.getAvailableProcessors; + +// TODO (required): handle table truncations public class MutationJournal { public static final MutationJournal instance = new MutationJournal(); + private static final Logger log = LoggerFactory.getLogger(MutationJournal.class); private final Journal journal; + private final Map segmentStateTrackers; + + // Most of the time a write lands in the most recently used segment, so we optimistically cache that segment's tracker, + // without imposing any visibility guarantees. If this field does not hold the tracker for the right segment, + // we fall back to looking it up in the NBHM (segmentStateTrackers).
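The caching idea in the comment above, in isolation: a racy, non-volatile single-slot cache in front of a concurrent map is sound as long as the slot is purely an optimization, because a stale or torn read simply falls through to the authoritative map. A hedged sketch of the pattern (the names are illustrative, not the patch's classes):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    final class TrackerCacheSketch
    {
        static final class Tracker
        {
            final long segmentId; // final field: safely published even when read via a data race
            Tracker(long segmentId) { this.segmentId = segmentId; }
        }

        private final Map<Long, Tracker> trackers = new ConcurrentHashMap<>();
        // deliberately non-volatile: a stale or missing value is harmless,
        // because we re-check segmentId and fall back to the map below
        private Tracker last;

        Tracker trackerFor(long segmentId)
        {
            Tracker t = last;
            if (t == null || t.segmentId != segmentId)
            {
                t = trackers.computeIfAbsent(segmentId, Tracker::new);
                last = t; // best-effort publication; losing this write costs only a map lookup
            }
            return t;
        }
    }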
+ private SegmentStateTracker lastSegmentTracker; private MutationJournal() { @@ -58,7 +77,38 @@ private MutationJournal() @VisibleForTesting MutationJournal(File directory, Params params) { - journal = new Journal<>("MutationJournal", directory, params, new MutationIdSupport(), new MutationSerializer(), SegmentCompactor.noop()); + journal = new Journal<>("MutationJournal", directory, params, MutationIdSupport.INSTANCE, MutationSerializer.INSTANCE, SegmentCompactor.noop()) { + @Override + protected void closeActiveSegmentAndOpenAsStatic(ActiveSegment activeSegment, Runnable onDone) + { + super.closeActiveSegmentAndOpenAsStatic(activeSegment, + () -> { + maybeCleanupStaticSegment(Invariants.nonNull(getSegment(activeSegment.id()))); + if (onDone != null) onDone.run(); + }); + } + }; + segmentStateTrackers = new NonBlockingHashMapLong<>(); + } + + public CommitLogPosition getCurrentPosition() + { + return journal.currentActiveSegment().currentPosition(); + } + + // If all Memtables associated with a given segment were flushed by the time we closed the active segment + // and opened it as static, mark its metadata to indicate it does not need replay. It may happen that we + // crash before persisting this metadata, in which case we will unnecessarily replay the segment, which + // has no correctness implications. + private void maybeCleanupStaticSegment(Segment segment) + { + Invariants.require(segment.isStatic()); + SegmentStateTracker tracker = segmentStateTrackers.get(segment.id()); + if (tracker != null && tracker.removeCleanFromDirty()) + { + segment.metadata().clearNeedsReplay(); + segment.persistMetadata(); + } + } public void start() @@ -73,7 +123,40 @@ public void shutdownBlocking() public RecordPointer write(ShortMutationId id, Mutation mutation) { - return journal.blockingWrite(id, mutation); + // TODO (required): why are we using blocking write here? We can/should wait for completion on `close` of WriteContext. + RecordPointer ptr = journal.blockingWrite(id, mutation); + + // IMPORTANT: there must be no way for a mutation to be applied to a memtable before we mark it as dirty here, + // since that would introduce a race between marking as dirty and marking as clean. + for (TableId tableId : mutation.getTableIds()) + { + SegmentStateTracker tracker = lastSegmentTracker; + if (tracker == null || tracker.segmentId() != ptr.segmentId) + { + tracker = segmentStateTrackers.computeIfAbsent(ptr.segmentId, SegmentStateTracker::new); + lastSegmentTracker = tracker; + } + + tracker.markDirty(tableId, ptr); + } + + return ptr; + } + + /** + * Called by the post-flush callback once the Memtable has been fully flushed to an SSTable. + */ + public void notifyFlushed(TableId tableId, CommitLogPosition lowerBound, CommitLogPosition upperBound) + { + for (Segment segment : journal.getSegments(lowerBound.segmentId, upperBound.segmentId)) + { + Invariants.nonNull(segmentStateTrackers.get(segment.id())).markClean(tableId, lowerBound, upperBound); + + // We can only safely mark static segments as non-replayable. An active segment can still be written to, + // so we only persist this metadata on flush.
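An aside before notifyFlushed() continues below: the bookkeeping that write() and notifyFlushed() implement, reduced to one table and one segment, is that writes record dirty positions, flushes report clean ranges, and a segment needs replay only while some dirty position is not covered by a clean range. A deliberately simplified sketch (the real SegmentStateTracker keeps per-table interval sets and is thread-safe; this toy version is neither):

    final class FlushBookkeepingSketch
    {
        private int dirtyLo = Integer.MAX_VALUE, dirtyHi = Integer.MIN_VALUE;
        private int cleanLo = Integer.MAX_VALUE, cleanHi = Integer.MIN_VALUE;

        void markDirty(int position)         // on every journal write
        {
            dirtyLo = Math.min(dirtyLo, position);
            dirtyHi = Math.max(dirtyHi, position);
        }

        void markClean(int lower, int upper) // from the post-flush callback
        {
            cleanLo = Math.min(cleanLo, lower);
            cleanHi = Math.max(cleanHi, upper);
        }

        boolean needsReplay()                // replay while dirty is not fully covered by clean
        {
            boolean hasDirty = dirtyLo <= dirtyHi;
            return hasDirty && !(cleanLo <= dirtyLo && dirtyHi <= cleanHi);
        }
    }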
+ if (segment.isStatic()) + maybeCleanupStaticSegment(segment); + } + } @Nullable @@ -116,6 +199,89 @@ public void readAll(Iterable ids, Collection into) } } + public void replayStaticSegments() + { + replay(new DeserializedRecordConsumer<>(MutationSerializer.INSTANCE) + { + @Override + protected void accept(long segmentId, int position, ShortMutationId key, Mutation value) + { + if (Schema.instance.getKeyspaceMetadata(value.getKeyspaceName()) == null) + return; + // TODO: if (commitLogReplayer.pointInTimeExceeded(mutation)) + final Keyspace keyspace = Keyspace.open(value.getKeyspaceName()); + + Mutation.PartitionUpdateCollector newPUCollector = null; + // TODO (required): replayFilter + for (Map.Entry e : value.modifications().entrySet()) + { + PartitionUpdate update = e.getValue(); + update.validate(); + if (Schema.instance.getTableMetadata(update.metadata().id) == null) + continue; // dropped + TableId tableId = e.getKey(); + + // Start segment state tracking + segmentStateTrackers.computeIfAbsent(segmentId, SegmentStateTracker::new) + .markDirty(tableId, segmentId, position); + // TODO (required): shouldReplay + if (newPUCollector == null) + newPUCollector = new Mutation.PartitionUpdateCollector(value.id(), value.getKeyspaceName(), value.key()); + newPUCollector.add(update); + // TODO (required): replayedCount + } + if (newPUCollector != null) + { + assert !newPUCollector.isEmpty(); + keyspace.apply(newPUCollector.build(), false, true, false); + } + } + }, getAvailableProcessors()); + } + + @VisibleForTesting + public void replay(DeserializedRecordConsumer replayOne, int parallelism) + { + try (Journal.StaticSegmentKeyIterator iter = + journal.staticSegmentKeyIterator(s -> s.isStatic() + && s.metadata().totalCount() > 0 + && s.metadata().needsReplay())) + { + final Semaphore replayParallelism = Semaphore.newSemaphore(parallelism); + final AtomicBoolean abort = new AtomicBoolean(); + + while (iter.hasNext() && !abort.get()) + { + Journal.KeyRefs v = iter.next(); + v = v; // self-assignment makes v no longer effectively final, so the async lambda below cannot capture it by accident + ShortMutationId key = v.key(); + long lastSegment = v.lastSegment(); + // TODO: respect SystemKeyspace.getTruncatedPosition(cfs.metadata.id); + replayParallelism.acquireThrowUncheckedOnInterrupt(1); + Stage.MUTATION.submit(() -> journal.readLast(key, lastSegment, replayOne)) + .addCallback(new BiConsumer() + { + @Override + public void accept(Object o, Throwable fail) + { + if (fail != null && !journal.handleError("Could not replay mutation " + key, fail)) + abort.set(true); + replayParallelism.release(1); + } + }); + } + + // Wait for all mutations to be applied before returning + replayParallelism.acquireThrowUncheckedOnInterrupt(parallelism); + } + } + + @VisibleForTesting + public void closeCurrentSegmentForTestingIfNonEmpty() + { + journal.closeCurrentSegmentForTestingIfNonEmpty(); + } + static class JournalParams implements Params { @Override @@ -175,6 +341,8 @@ public int userVersion() static class MutationIdSupport implements KeySupport { + static final MutationIdSupport INSTANCE = new MutationIdSupport(); + static final int LOG_ID_OFFSET = 0; static final int OFFSET_OFFSET = LOG_ID_OFFSET + TypeSizes.LONG_SIZE; @@ -245,17 +413,20 @@ public int compare(ShortMutationId id1, ShortMutationId id2) } } - static class MutationSerializer implements ValueSerializer + public static class MutationSerializer implements ValueSerializer + { + public static final MutationSerializer INSTANCE = new MutationSerializer(); @Override public void serialize(ShortMutationId id,
Mutation mutation, DataOutputPlus out, int userVersion) throws IOException { + Invariants.require(id.hostId != Integer.MIN_VALUE); Mutation.serializer.serialize(mutation, out, userVersion); } @Override public Mutation deserialize(ShortMutationId id, DataInputPlus in, int userVersion) throws IOException { + Invariants.require(id.hostId != Integer.MIN_VALUE); return Mutation.serializer.deserialize(in, userVersion); } } diff --git a/src/java/org/apache/cassandra/replication/SegmentStateTracker.java b/src/java/org/apache/cassandra/replication/SegmentStateTracker.java new file mode 100644 index 000000000000..739af6d48ddd --- /dev/null +++ b/src/java/org/apache/cassandra/replication/SegmentStateTracker.java @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.replication; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import accord.utils.Invariants; +import com.google.common.annotations.VisibleForTesting; +import org.apache.cassandra.db.commitlog.CommitLogPosition; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.utils.IntegerInterval; + +/** + * Tracks offsets of clean (i.e. memtable->sstable flushed) and dirty (i.e. not yet durably persisted in sstable) + * allocations. + * + * Mutations in segments marked as clean do not need to be replayed. + * + * Tracks which parts of a commit log segment contain unflushed data for each table, and determines when all + * mutations associated with a segment are fully memtable->sstable flushed + * + * Maintains per-table states: + * - "dirty" high mark (bumped when new allocation is made in the segment) + * - "clean" intervals (min/max bounds reported via memtable flushes) + * + * A segment is considered clean when all dirty intervals are covered by clean intervals for every table. + */ +public class SegmentStateTracker +{ + final long segmentId; + + private final Map states = new HashMap<>(32); + private final Lock lock = new ReentrantLock(); + + public SegmentStateTracker(long segmentId) + { + this.segmentId = segmentId; + } + + public long segmentId() + { + return segmentId; + } + + @VisibleForTesting + public boolean isClean() + { + removeCleanFromDirty(); + lock.lock(); + try + { + return states.isEmpty(); + } + finally + { + lock.unlock(); + } + } + + /** + * Should be called _only_ for a static segment to ensure there can be no way for interval state + * to go back from clean to dirty. + * + * Removes all clean (i.e. 
memtable -> sstable flushed) intervals from the dirty set. Returns true if the dirty intervals of all + * tables are fully covered by clean intervals, i.e. the whole segment is clean; false otherwise. + */ + public boolean removeCleanFromDirty() + { + List> states; + // Take a "snapshot" of states, while holding a lock + lock.lock(); + try + { + states = new ArrayList<>(this.states.entrySet()); + } + finally + { + lock.unlock(); + } + + int[] remove = new int[states.size()]; + int removeCount = 0; + + // Check if any of the remaining items can be cleaned up, without holding a lock + for (int i = 0; i < states.size(); i++) + { + IntervalState state = states.get(i).getValue(); + if (!state.isDirty()) + remove[removeCount++] = i; + } + + // Remove all fully covered items, while holding a lock + if (removeCount > 0) + { + lock.lock(); + try + { + if (this.states.size() == removeCount) + { + this.states.clear(); + return true; + } + + for (int i = 0; i < removeCount; i++) + { + Map.Entry e = states.get(remove[i]); + this.states.remove(e.getKey()); + } + } + finally + { + lock.unlock(); + } + } + + return false; + } + + public void markDirty(TableId tableId, CommitLogPosition ptr) + { + markDirty(tableId, ptr.segmentId, ptr.position); + } + + public void markDirty(TableId tableId, long segmentId, int position) + { + Invariants.require(segmentId == this.segmentId); + IntervalState state; + lock.lock(); + try + { + state = states.computeIfAbsent(tableId, (k) -> { + // Initialize with the given position as both the low and high bound, to ensure we correctly set + // the lower bound when marking as clean + return new IntervalState(position, position); + }); + } + finally + { + lock.unlock(); + } + state.markDirty(position); + } + + public void markClean(TableId tableId, CommitLogPosition lowerBound, CommitLogPosition upperBound) + { + Invariants.require(lowerBound.compareTo(upperBound) <= 0, "%s should not be greater than %s", lowerBound, upperBound); + if (lowerBound.segmentId > segmentId || upperBound.segmentId < segmentId) + return; + + IntervalState state; + lock.lock(); + try + { + state = states.get(tableId); + } + finally + { + lock.unlock(); + } + + if (state != null) + { + // TODO (required): test this logic + // Only mark clean ranges for _this_ segment + int lower = lowerBound.segmentId == segmentId ? lowerBound.position : 0; + int upper = upperBound.segmentId == segmentId ?
upperBound.position : Integer.MAX_VALUE; + state.markClean(lower, upper); + } + } + + private static class IntervalState + { + static final long[] EMPTY = new long[0]; + + // dirty interval in this segment; if interval is not covered by the clean set, the log contains unflushed data + volatile long dirty; + // clean intervals; tracked separately from the dirty interval to permit marking tables clean whilst the segment is still in use + volatile long[] clean = EMPTY; + + private static final AtomicLongFieldUpdater dirtyUpdater = AtomicLongFieldUpdater.newUpdater(IntervalState.class, "dirty"); + private static final AtomicReferenceFieldUpdater cleanUpdater = AtomicReferenceFieldUpdater.newUpdater(IntervalState.class, long[].class, "clean"); + + public IntervalState(int lower, int upper) + { + this(make(lower, upper)); + } + + private IntervalState(long dirty) + { + this.dirty = dirty; + } + + public void markClean(int start, int end) + { + long[] prev; + long[] next; + do + { + prev = this.clean; + next = IntegerInterval.Set.add(prev, start, end); + } + while (!cleanUpdater.compareAndSet(this, prev, next)); + } + + public boolean isDirty() + { + long[] clean = this.clean; + long dirty = this.dirty; + return !IntegerInterval.Set.covers(clean, lower(dirty), upper(dirty)); + } + + /** + * Expands the interval to cover the given value by extending one of its sides if necessary. + * Mutates this. Thread-safe. + */ + public void markDirty(int value) + { + long prev; + int lower; + int upper; + do + { + prev = dirty; + upper = upper(prev); + lower = lower(prev); + if (value > upper) // common case + upper = value; + else if (value < lower) + lower = value; + } + while (!dirtyUpdater.compareAndSet(this, prev, make(lower, upper))); + } + + public String toString() + { + long dirty = this.dirty; + long[] clean = this.clean; + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < clean.length; i++) + { + long l = clean[i]; + if (i > 0) + sb.append(','); + sb.append('[').append(lower(l)).append(',').append(upper(l)).append("]"); + } + return "dirty:[" + lower(dirty) + ',' + upper(dirty) + "], clean:[" + sb + "]"; + } + + private static long make(int lower, int upper) + { + assert lower <= upper; + return ((lower & 0xFFFFFFFFL) << 32) | upper & 0xFFFFFFFFL; + } + + private static int lower(long interval) + { + return (int) (interval >>> 32); + } + + private static int upper(long interval) + { + return (int) interval; + } + } + + @Override + public String toString() + { + return "SegmentStateTracker{" + + "segmentId=" + segmentId + + ", states=" + states + + '}'; + } +} diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index b66611d8f93d..11647410e2f5 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -340,6 +340,7 @@ protected void setup() try { CommitLog.instance.recoverSegmentsOnDisk(); + MutationJournal.instance.replayStaticSegments(); } catch (IOException e) { diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java index 08819934d0a1..aa568104a2fe 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java +++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java @@ -343,7 +343,7 @@ public void saveCommand(int commandStoreId, CommandUpdate update, @Nullable Runn && diff.after.route() != null) journal.onDurable(pointer, () ->
journalTable.safeNotify(index -> - index.update(pointer.segment, key.commandStoreId, key.id, diff.after.route()))); + index.update(pointer.segmentId, key.commandStoreId, key.id, diff.after.route()))); if (onFlush != null) journal.onDurable(pointer, onFlush); } diff --git a/src/java/org/apache/cassandra/tcm/Startup.java b/src/java/org/apache/cassandra/tcm/Startup.java index 7fc336809c53..7333bc468a7f 100644 --- a/src/java/org/apache/cassandra/tcm/Startup.java +++ b/src/java/org/apache/cassandra/tcm/Startup.java @@ -31,6 +31,7 @@ import java.util.function.Supplier; import com.google.common.util.concurrent.Uninterruptibles; +import org.apache.cassandra.replication.MutationJournal; import org.apache.cassandra.replication.MutationTrackingService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -283,6 +284,7 @@ public static void initializeFromGossip(Function wrapProce try { CommitLog.instance.recoverSegmentsOnDisk(); + MutationJournal.instance.replayStaticSegments(); } catch (IOException e) { diff --git a/src/java/org/apache/cassandra/utils/IntegerInterval.java b/src/java/org/apache/cassandra/utils/IntegerInterval.java index b26ac45f0675..d5e43289e625 100644 --- a/src/java/org/apache/cassandra/utils/IntegerInterval.java +++ b/src/java/org/apache/cassandra/utils/IntegerInterval.java @@ -22,9 +22,12 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.stream.Collectors; +import accord.utils.Invariants; import com.google.common.collect.Lists; import com.google.common.primitives.Longs; +import net.nicoulaj.compilecommand.annotations.Inline; + /** * Mutable integer interval class, thread-safe. * Represents the interval [lower,upper]. @@ -32,7 +35,7 @@ public class IntegerInterval { volatile long interval; - private static AtomicLongFieldUpdater intervalUpdater = + private static final AtomicLongFieldUpdater intervalUpdater = AtomicLongFieldUpdater.newUpdater(IntegerInterval.class, "interval"); private IntegerInterval(long interval) @@ -126,7 +129,7 @@ private static int upper(long interval) */ public static class Set { - static long[] EMPTY = new long[0]; + static final long[] EMPTY = new long[0]; private volatile long[] ranges = EMPTY; @@ -135,11 +138,15 @@ public static class Set */ public synchronized void add(int start, int end) { - assert start <= end; - long[] ranges, newRanges; - { - ranges = this.ranges; // take local copy to avoid risk of it changing in the midst of operation + this.ranges = add(this.ranges, start, end); + } + @Inline + public static long[] add(long[] ranges, int start, int end) + { + Invariants.require(start <= end, "Start (%d) should be less than or equal to end (%d)", start, end); + long[] newRanges; + { // extend ourselves to cover any ranges we overlap // record directly preceding our end may extend past us, so take the max of our end and its int rpos = Arrays.binarySearch(ranges, ((end & 0xFFFFFFFFL) << 32) | 0xFFFFFFFFL); // floor (i.e. greatest <=) of the end position @@ -174,7 +181,7 @@ public synchronized void add(int start, int end) for (int i = rpos + 1; i < ranges.length; ++i) newRanges[dest++] = ranges[i]; } - this.ranges = newRanges; + return newRanges; } /** @@ -191,8 +198,13 @@ public boolean covers(IntegerInterval iv) */ public boolean covers(int start, int end) { - long[] ranges = this.ranges; // take local copy to avoid risk of it changing in the midst of operation - int rpos = Arrays.binarySearch(ranges, ((start & 0xFFFFFFFFL) << 32) | 0xFFFFFFFFL); // floor (i.e. 
greatest <=) of the end position + return covers(this.ranges, start, end); + } + + @Inline + public static boolean covers(long[] ranges, int start, int end) + { + int rpos = Arrays.binarySearch(ranges, ((start & 0xFFFFFFFFL) << 32) | 0xFFFFFFFFL); // floor (i.e. greatest <=) of the end position if (rpos < 0) rpos = (-1 - rpos) - 1; if (rpos == -1) @@ -219,7 +231,7 @@ public int upperBound() public Collection intervals() { - return Lists.transform(Longs.asList(ranges), iv -> new IntegerInterval(iv)); + return Lists.transform(Longs.asList(ranges), IntegerInterval::new); } @Override diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java index bd9f69451ab6..dfa676b074a5 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@ -825,6 +825,7 @@ protected void partialStartup(ICluster cluster) throws IOException, NoSuchFie try { CommitLog.instance.recoverSegmentsOnDisk(); + MutationJournal.instance.replayStaticSegments(); } catch (IOException e) { diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounceTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounceTest.java new file mode 100644 index 000000000000..3e0a1ad260dc --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounceTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.test.tracking; + +import java.util.ArrayList; +import java.util.List; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.shared.ClusterUtils; +import org.apache.cassandra.distributed.test.log.FuzzTestBase; +import org.apache.cassandra.harry.SchemaSpec; +import org.apache.cassandra.harry.dsl.HistoryBuilder; +import org.apache.cassandra.harry.dsl.ReplayingHistoryBuilder; +import org.apache.cassandra.harry.execution.InJvmDTestVisitExecutor; +import org.apache.cassandra.harry.gen.Generator; +import org.apache.cassandra.harry.gen.SchemaGenerators; +import org.apache.cassandra.replication.MutationJournal; +import org.junit.Test; + + +import static org.apache.cassandra.harry.checker.TestHelper.withRandom; + +public class MutationTrackingBounceTest extends FuzzTestBase +{ + private static final int POPULATION = 1000; + + @Test + public void bounceTest() throws Throwable + { + try (Cluster cluster = builder().withNodes(1).start()) + { + int tables = 10; + int writesPerKey = 2; + int pks = 100; + withRandom(rng -> { + cluster.schemaChange(String.format("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1} " + + "AND replication_type='tracked'", + KEYSPACE)); + + List builders = new ArrayList<>(); + for (int i = 0; i < tables; i++) + { + Generator schemaGen = SchemaGenerators.trivialSchema(KEYSPACE, () -> "mutation_tracking_bounce_" + (builders.size() + 1), POPULATION, + SchemaSpec.optionsBuilder()); + + SchemaSpec schema = schemaGen.generate(rng); + cluster.schemaChange(schema.compile()); + builders.add(new ReplayingHistoryBuilder(schema.valueGenerators, + hb -> InJvmDTestVisitExecutor.builder() + .consistencyLevel(ConsistencyLevel.QUORUM) + .build(schema, hb, cluster))); + } + + int counter = 0; + for (int pk = 0; pk < pks; pk++) { + for (HistoryBuilder history : builders) + for (int i = 0; i < writesPerKey; i++) + history.insert(pk); + + if (++counter % 10 == 0) + cluster.get(1).runOnInstance(() -> MutationJournal.instance.closeCurrentSegmentForTestingIfNonEmpty()); + } + + ClusterUtils.stopUnchecked(cluster.get(1)); + cluster.get(1).startup(); + + for (int pk = 0; pk < pks; pk++) + for (HistoryBuilder history : builders) + for (int i = 0; i < 10; i++) + history.selectPartition(pk); + + cluster.get(1).runOnInstance(new MutationTrackingBounce_ValidateRunnable(tables * pks * writesPerKey)); + }); + } + } +} \ No newline at end of file diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounce_ValidateRunnable.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounce_ValidateRunnable.java new file mode 100644 index 000000000000..0afb402b1dfb --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounce_ValidateRunnable.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.tracking;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.distributed.api.IIsolatedExecutor;
+import org.apache.cassandra.journal.DeserializedRecordConsumer;
+import org.apache.cassandra.replication.MutationJournal;
+import org.apache.cassandra.replication.MutationTrackingService;
+import org.apache.cassandra.replication.ShortMutationId;
+import org.junit.Assert;
+
+// Kept as a separate top-level class for MutationTrackingBounceTest: inlining it produced
+// non-serializable class exceptions, likely due to captured static fields
+public class MutationTrackingBounce_ValidateRunnable implements IIsolatedExecutor.SerializableRunnable
+{
+    private final int expectedMutations;
+
+    public MutationTrackingBounce_ValidateRunnable(int expectedMutations)
+    {
+        this.expectedMutations = expectedMutations;
+    }
+
+    @Override
+    public void run()
+    {
+        AtomicInteger counter = new AtomicInteger();
+        MutationJournal.instance.replay(new DeserializedRecordConsumer<>(MutationJournal.MutationSerializer.INSTANCE)
+        {
+            Set seen = new HashSet<>();
+
+            @Override
+            protected void accept(long segment, int position, ShortMutationId key, Mutation mutation)
+            {
+                if (!seen.add(key))
+                    throw new AssertionError(String.format("Should have witnessed each key just once, but seen %s already", key));
+
+                for (PartitionUpdate partitionUpdate : mutation.getPartitionUpdates())
+                {
+                    if (!MutationTrackingService.instance.createSummaryForKey(partitionUpdate.partitionKey(), partitionUpdate.metadata().id, false)
+                                                         .contains(key))
+                    {
+                        throw new AssertionError(String.format("Mutation %s should have been witnessed (%s)", mutation, key));
+                    }
+                }
+                counter.incrementAndGet();
+            }
+        }, 1);
+        Assert.assertEquals(expectedMutations, counter.get());
+    }
+}
diff --git a/test/harry/main/org/apache/cassandra/harry/checker/TestHelper.java b/test/harry/main/org/apache/cassandra/harry/checker/TestHelper.java
index f44b6e5fa2c0..ee930bbd3ef1 100644
--- a/test/harry/main/org/apache/cassandra/harry/checker/TestHelper.java
+++ b/test/harry/main/org/apache/cassandra/harry/checker/TestHelper.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.harry.checker;
 
+import java.util.function.Consumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +48,20 @@ public static void withRandom(long seed, ModelChecker.ThrowingConsumer rng)
+    public static void withRandom(long seed, ModelChecker.ThrowingConsumer rng, Consumer onError)
+    {
+        try
+        {
+            logger.info("Seed: {}", seed);
+            rng.accept(new JdkRandomEntropySource(seed));
+        }
+        catch (Throwable t)
+        {
+            onError.accept(t);
+            throw new AssertionError(String.format("Caught an exception at seed:%dL", seed), t);
+        }
+    }
+
     public static void repeat(int num, ExecUtil.ThrowingSerializableRunnable r)
     {
         for (int i = 0; i < num; i++)
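Aside: a hypothetical usage sketch of the withRandom(seed, rng, onError) overload added above; SegmentStateTrackerTest.fuzzTest later in this patch uses the same hook to dump its operation log before the AssertionError is rethrown. The class name, test body, and log lines here are illustrative only:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.cassandra.harry.checker.TestHelper;

    public class WithRandomOnErrorSketch
    {
        public static void main(String[] args)
        {
            List<String> ops = new ArrayList<>();
            TestHelper.withRandom(42L,
                                  rng -> {
                                      ops.add("drew " + rng.nextInt(100));
                                      // ... drive the system under test and assert invariants ...
                                  },
                                  t -> ops.forEach(System.out::println)); // invoked before the AssertionError propagates
        }
    }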
diff --git a/test/unit/org/apache/cassandra/journal/SegmentsTest.java b/test/unit/org/apache/cassandra/journal/SegmentsTest.java
new file mode 100644
index 000000000000..7e5922237a7d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/journal/SegmentsTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.journal;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+import org.apache.cassandra.utils.concurrent.Ref;
+import org.junit.Test;
+
+import static org.apache.cassandra.harry.checker.TestHelper.withRandom;
+
+public class SegmentsTest
+{
+    @Test
+    public void testSelect()
+    {
+        withRandom(0L, rng -> {
+            // Create mock segments with distinct timestamps
+            File file = File.createTempFile("segments", "test");
+            List segmentList = new ArrayList<>();
+            Set taken = new HashSet<>();
+            for (int i = 0; i < 100; i++)
+            {
+                while (true)
+                {
+                    long id = rng.nextLong(0, 10_000);
+                    if (taken.add(id))
+                    {
+                        segmentList.add(new TestSegment<>(file, id));
+                        break;
+                    }
+                }
+            }
+            segmentList.sort(Comparator.comparing(s -> s.descriptor.timestamp));
+
+            Segments segments = Segments.of(segmentList);
+            for (int i = 0; i < 10_000; i++)
+            {
+                // Pick two distinct segment indices
+                int i1 = rng.nextInt(segmentList.size());
+                int i2;
+                do
+                {
+                    i2 = rng.nextInt(segmentList.size());
+                }
+                while (i2 == i1);
+                int min = Math.min(i1, i2);
+                int max = Math.max(i1, i2);
+                List selected = new ArrayList<>();
+                segments.select(segmentList.get(min).id(),
+                                segmentList.get(max).id(),
+                                selected);
+                List expected = segmentList.subList(min, max + 1);
+                if (!Objects.equals(expected, selected))
+                {
+                    throw new AssertionError(String.format("\nExpected: %s\n" +
+                                                           "Selected: %s",
+                                                           expected,
+                                                           selected));
+                }
+            }
+        });
+    }
+
+    private static class TestSegment extends Segment
+    {
+        TestSegment(File dir, long timestamp)
+        {
+            super(Descriptor.create(new org.apache.cassandra.io.util.File(dir), timestamp, 1), null, null);
+        }
+
+        @Override
+        void close(Journal journal)
+        {
+        }
+
+        @Override
+        public boolean isActive()
+        {
+            return false;
+        }
+
+        @Override
+        public boolean isStatic()
+        {
+            return false;
+        }
+
+        @Override Index index() { throw new UnsupportedOperationException(); }
+        @Override boolean isFlushed(long position) { throw new UnsupportedOperationException(); }
+        @Override public void persistMetadata() { throw new UnsupportedOperationException(); }
+        @Override boolean read(int offset, int size, EntrySerializer.EntryHolder into) { throw new UnsupportedOperationException(); }
+        @Override public ActiveSegment asActive() { throw new UnsupportedOperationException(); }
+        @Override public StaticSegment asStatic() { throw new UnsupportedOperationException(); }
+        @Override public Ref selfRef() { throw new UnsupportedOperationException(); }
+        @Override public Ref tryRef() { throw new UnsupportedOperationException(); }
+        @Override public Ref ref() { throw new UnsupportedOperationException(); }
+
+        @Override
+        public String toString()
+        {
+            return "TestSegment{" +
+                   "id=" + descriptor.timestamp +
+                   '}';
+        }
+
+        @Override
+        public boolean equals(Object obj)
+        {
+            // guard against nulls and foreign types instead of casting blindly
+            return obj instanceof TestSegment && descriptor.equals(((TestSegment) obj).descriptor);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return descriptor.hashCode();
+        }
+    }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/replication/MutationJournalReplayTest.java b/test/unit/org/apache/cassandra/replication/MutationJournalReplayTest.java
new file mode 100644
index 000000000000..7a5436b6736d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/replication/MutationJournalReplayTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.replication;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.harry.checker.TestHelper;
+import org.apache.cassandra.journal.DeserializedRecordConsumer;
+import org.apache.cassandra.schema.ReplicationType;
+import org.apache.cassandra.tools.FieldUtil;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.journal.TestParams;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.TableMetadata;
+
+import static org.apache.cassandra.replication.MutationJournalTest.assertMutationsEqual;
+
+public class MutationJournalReplayTest
+{
+    private static final String KEYSPACE = "ks";
+    private static final String TABLE_PREFIX = "tbl";
+    private static final TableMetadata[] TABLES = new TableMetadata[10];
+    private static MutationJournal journal;
+
+    @BeforeClass
+    public static void setUp() throws IOException
+    {
+        SchemaLoader.prepareServer();
+
+        File directory = new File(Files.createTempDirectory("mutation-journal-replay-test"));
+        directory.deleteRecursiveOnExit();
+
+        journal = new MutationJournal(directory, new TestParams(MessagingService.current_version)
+        {
+            @Override
+            public long flushPeriod(TimeUnit units)
+            {
+                return 1;
+            }
+
+            @Override
+            public FlushMode flushMode()
+            {
+                return FlushMode.PERIODIC;
+            }
+        });
+        FieldUtil.setInstanceUnsafe(MutationJournal.class, journal, "instance");
+        journal.start();
+
+        for (int i = 0; i < TABLES.length; i++)
+        {
+            TABLES[i] = TableMetadata.builder(KEYSPACE, String.format("%s_%d", TABLE_PREFIX, i))
+                                     .keyspaceReplicationType(ReplicationType.tracked)
+                                     .addPartitionKeyColumn("pk", UTF8Type.instance)
+                                     .addClusteringColumn("ck", UTF8Type.instance)
+                                     .addRegularColumn("value", UTF8Type.instance)
+                                     .build();
+        }
+        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1, ReplicationType.tracked), TABLES);
+    }
+
+    @AfterClass
+    public static void tearDown()
+    {
+        journal.shutdownBlocking();
+    }
+
+    @Test
+    public void testReplay() throws Throwable
+    {
+        long seed = 0L;
+        TestHelper.withRandom(seed,
+                              rng -> {
+                                  List original = new ArrayList<>();
+                                  for (int i = 1; i <= 10_000; i++)
+                                  {
+                                      MutationId id = new MutationId(100L, i, i);
+                                      Mutation mutation = mutation(i % 10, i).withMutationId(id);
+                                      journal.write(id, mutation);
+                                      original.add(mutation);
+                                      // occasionally close the current segment, so replay has to stitch together many segments
+                                      if (i % rng.nextInt(1, 100) > 90)
+                                          journal.closeCurrentSegmentForTestingIfNonEmpty();
+                                  }
+
+                                  journal.closeCurrentSegmentForTestingIfNonEmpty();
+
+                                  List replayed = new ArrayList<>();
+                                  journal.replay(new DeserializedRecordConsumer(MutationJournal.MutationSerializer.INSTANCE)
+                                  {
+                                      @Override
+                                      protected void accept(long segment, int position, ShortMutationId key, Mutation mutation)
+                                      {
+                                          replayed.add(mutation);
+                                      }
+                                  }, 1);
+
+                                  assertMutationsEqual(original, replayed);
+                              });
+    }
+
+    @Test
+    public void testReplayFlushed() throws Throwable
+    {
+        long seed = 0L;
+        class Bounds
+        {
+            final CommitLogPosition start;
+            final CommitLogPosition end;
+            final int count;
+
+            Bounds(CommitLogPosition start, CommitLogPosition end, int count)
+            {
+                this.start = start;
+                this.end = end;
+                this.count = count;
+            }
+        }
+        TestHelper.withRandom(seed,
+                              rng -> {
+                                  List original = new ArrayList<>();
+
+                                  List testFlushBounds = new ArrayList<>();
+                                  CommitLogPosition prevPos = journal.getCurrentPosition();
+                                  int count = 0;
+                                  for (int i = 1; i <= 1000; i++)
+                                  {
+                                      MutationId id = new MutationId(100L, i, i);
+                                      Mutation mutation = mutation(i % TABLES.length, i).withMutationId(id);
+                                      journal.write(id, mutation);
+                                      count++;
+                                      original.add(mutation);
+                                      if (i % rng.nextInt(1, 100) > 90)
+                                      {
+                                          CommitLogPosition curPos = journal.getCurrentPosition();
+                                          journal.closeCurrentSegmentForTestingIfNonEmpty();
+                                          testFlushBounds.add(new Bounds(prevPos, curPos, count));
+                                          count = 0;
+                                          prevPos = curPos;
+                                      }
+                                  }
+
+                                  journal.closeCurrentSegmentForTestingIfNonEmpty();
+
+                                  int flushed = 0;
+                                  for (Bounds bounds : testFlushBounds)
+                                  {
+                                      if (rng.nextBoolean())
+                                      {
+                                          for (TableMetadata table : TABLES)
+                                              journal.notifyFlushed(table.id, bounds.start, bounds.end);
+                                          flushed += bounds.count;
+                                      }
+                                  }
+
+                                  List replayed = new ArrayList<>();
+                                  journal.replay(new DeserializedRecordConsumer(MutationJournal.MutationSerializer.INSTANCE)
+                                  {
+                                      @Override
+                                      protected void accept(long segment, int position, ShortMutationId key, Mutation mutation)
+                                      {
+                                          replayed.add(mutation);
+                                      }
+                                  }, 1);
+
+                                  Assert.assertEquals(original.size() - flushed,
+                                                      replayed.size());
+                              });
+    }
+
+    private static String CACHED_STRING = null;
+
+    private static Mutation mutation(int table, int value)
+    {
+        if (CACHED_STRING == null)
+        {
+            StringBuilder sb = new StringBuilder();
+            for (int i = 0; i < 512; i++)
+            {
+                sb.append('.');
+            }
+            CACHED_STRING = sb.toString();
+        }
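+        // reuse the cached 512-char filler, so every generated mutation carries a realistically sized payload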
+        return new RowUpdateBuilder(TABLES[table], 0, "key_" + value)
+               .clustering("ck")
+               .add("value", CACHED_STRING)
+               .build();
+    }
+}
diff --git a/test/unit/org/apache/cassandra/replication/MutationJournalTest.java b/test/unit/org/apache/cassandra/replication/MutationJournalTest.java
index a0c778a3e482..ba741799aa1c 100644
--- a/test/unit/org/apache/cassandra/replication/MutationJournalTest.java
+++ b/test/unit/org/apache/cassandra/replication/MutationJournalTest.java
@@ -127,19 +127,20 @@ public void testWriteManyReadMany()
         assertMutationsEqual(expected, actual);
     }
 
-    private static void assertMutationEquals(Mutation expected, Mutation actual)
+    public static void assertMutationEquals(Mutation expected, Mutation actual)
     {
-        assertEquals(serialize(expected), serialize(actual));
+        if (!serialize(expected).equals(serialize(actual)))
+            throw new AssertionError(String.format("Expected %s but got %s", expected, actual));
     }
 
-    private static void assertMutationsEqual(List expected, List actual)
+    public static void assertMutationsEqual(List expected, List actual)
     {
         assertEquals(expected.size(), actual.size());
         for (int i = 0; i < expected.size(); i++)
             assertMutationEquals(expected.get(i), actual.get(i));
     }
 
-    private static ByteBuffer serialize(Mutation mutation)
+    public static ByteBuffer serialize(Mutation mutation)
     {
         try (DataOutputBuffer out = DataOutputBuffer.scratchBuffer.get())
         {
diff --git a/test/unit/org/apache/cassandra/replication/SegmentStateTrackerTest.java b/test/unit/org/apache/cassandra/replication/SegmentStateTrackerTest.java
new file mode 100644
index 000000000000..928deca00890
--- /dev/null
+++ b/test/unit/org/apache/cassandra/replication/SegmentStateTrackerTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.replication;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.harry.checker.TestHelper;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.utils.Pair;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SegmentStateTrackerTest
+{
+    @Test
+    public void trivialTest()
+    {
+        long segment = 123L;
+        SegmentStateTracker tracker = new SegmentStateTracker(segment);
+        TableId tbl = TableId.generate();
+
+        assertTrue(tracker.isClean());
+        for (int i = 1; i <= 10; i++)
+        {
+            tracker.markDirty(tbl, segment, i);
+            assertFalse(tracker.isClean());
+            if (i % 5 == 0)
+            {
+                tracker.markClean(tbl, pos(segment, i - 5), pos(segment, i));
+                assertTrue(tracker.isClean());
+            }
+        }
+        assertTrue(tracker.isClean());
+    }
+
+    @Test
+    public void memtableSpanningMultipleSegmentsTest()
+    {
+        SegmentStateTracker segment1 = new SegmentStateTracker(1);
+        SegmentStateTracker segment2 = new SegmentStateTracker(2);
+        TableId tbl = TableId.fromLong(1);
+
+        segment1.markDirty(tbl, segment1.segmentId(), 10);
+        segment2.markDirty(tbl, segment2.segmentId(), 10);
+        segment2.markDirty(tbl, segment2.segmentId(), 20);
+
+        segment1.markClean(tbl, pos(segment1.segmentId(), 10), pos(segment2.segmentId(), 10)); // first flush
+        segment2.markClean(tbl, pos(segment1.segmentId(), 10), pos(segment2.segmentId(), 10)); // first flush
+        segment2.markClean(tbl, pos(segment2.segmentId(), 10), pos(segment2.segmentId(), 30)); // second flush
+        Assert.assertTrue(segment1.isClean());
+        Assert.assertTrue(segment2.isClean());
+    }
+
+    @Test
+    public void fuzzTest()
+    {
+        int allocationCount = 10_000;
+        long seed = 0;
+        List ops = null; // set to new ArrayList<>() to enable operation logging
+        AtomicLong tableIdGen = new AtomicLong();
+        TestHelper.withRandom(seed,
+                              rng -> {
+                                  // This test tracks state from the perspective of the segment state tracker:
+                                  // we make allocations by bumping the pointer, attributing allocations to different tables.
+                                  // A table can then flush contiguous runs of allocations and report the flushed bounds back to the segment.
+                                  class Table
+                                  {
+                                      final TableId tableId = TableId.fromLong(tableIdGen.getAndIncrement());
+
+                                      // Tracks all allocations for one table id, and performs contiguous "flushes"
+                                      // that report position bounds back to the trackers
+                                      ArrayList unflushedAllocations = new ArrayList<>();
+
+                                      // Memtables _always_ report contiguous chunks of positions (see CFS$Flush): a new memtable
+                                      // will always be created with the commitLogUpperBound of the previous one.
+                                      CommitLogPosition lastFlushMax = null;
+
+                                      void addAllocation(CommitLogPosition pos)
+                                      {
+                                          if (ops != null) ops.add("Allocate " + pos + " in " + tableId);
+                                          unflushedAllocations.add(pos);
+                                      }
+
+                                      void flush(Collection trackers)
+                                      {
+                                          if (unflushedAllocations.isEmpty())
+                                              return;
+
+                                          CommitLogPosition min = lastFlushMax == null ? unflushedAllocations.get(0) : lastFlushMax;
+                                          CommitLogPosition max = unflushedAllocations.get(rng.nextInt(unflushedAllocations.size()));
+                                          lastFlushMax = max;
+
+                                          if (ops != null) ops.add(String.format("Flush %s [%s, %s]", tableId, min, max));
+                                          for (SegmentStateTracker tracker : trackers)
+                                              reportFlushed(tracker, min, max);
+
+                                          // TODO (required): use an array and copying instead
+                                          unflushedAllocations.removeIf(alloc -> alloc.compareTo(min) >= 0 && alloc.compareTo(max) <= 0);
+                                      }
+
+                                      boolean hasUnflushed()
+                                      {
+                                          return !unflushedAllocations.isEmpty();
+                                      }
+
+                                      boolean hasUnflushedFor(long segment)
+                                      {
+                                          for (CommitLogPosition alloc : unflushedAllocations)
+                                          {
+                                              if (alloc.segmentId == segment)
+                                                  return true;
+                                          }
+                                          return false;
+                                      }
+
+                                      List getUnflushedFor(long segment)
+                                      {
+                                          List unflushed = new ArrayList<>();
+                                          for (CommitLogPosition alloc : unflushedAllocations)
+                                          {
+                                              if (alloc.segmentId == segment)
+                                                  unflushed.add(alloc);
+                                          }
+                                          return unflushed;
+                                      }
+
+                                      void reportFlushed(SegmentStateTracker tracker, CommitLogPosition minBound, CommitLogPosition maxBound)
+                                      {
+                                          if (tracker.segmentId() >= minBound.segmentId && tracker.segmentId() <= maxBound.segmentId)
+                                              tracker.markClean(tableId, minBound, maxBound);
+                                      }
+                                  }
+
+                                  int tableCount = 10;
+                                  List tables = new ArrayList<>(tableCount);
+                                  for (int i = 0; i < tableCount; i++)
+                                      tables.add(new Table());
+
+                                  Map segments = new HashMap<>();
+                                  Runnable validateAllSegments = () -> {
+                                      for (SegmentStateTracker segment : segments.values())
+                                      {
+                                          boolean segmentIsClean = segment.isClean();
+                                          boolean allTablesFlushed = tables.stream().noneMatch(t -> t.hasUnflushedFor(segment.segmentId()));
+                                          if (segmentIsClean != allTablesFlushed)
+                                              throw new IllegalArgumentException(String.format("Segment is %sclean, but tables have %sunflushed allocations:\n%s\n%s",
+                                                                                               segmentIsClean ? "" : "not ",
+                                                                                               allTablesFlushed ? "no " : "",
+                                                                                               segment,
+                                                                                               tables.stream().map(t -> Pair.create(t.tableId, t.getUnflushedFor(segment.segmentId())))
+                                                                                                     .filter(t -> !t.right.isEmpty())
+                                                                                                     .collect(Collectors.toList())));
+                                      }
+                                  };
+
+                                  SegmentStateTracker currentSegment = null;
+                                  int currentSegmentOffset = 0;
+                                  for (int i = 0; i < allocationCount; i++)
+                                  {
+                                      if (i > 0 && i % 50 == 0)
+                                      {
+                                          for (int j = 0; j < 3; j++)
+                                          {
+                                              Table table = tables.get(rng.nextInt(tableCount));
+                                              table.flush(segments.values());
+                                              validateAllSegments.run();
+                                          }
+                                      }
+
+                                      if (i % 100 == 0)
+                                      {
+                                          currentSegment = new SegmentStateTracker(segments.size());
+                                          currentSegmentOffset = 0;
+                                          segments.put(currentSegment.segmentId(), currentSegment);
+                                      }
+
+                                      int size = rng.nextInt(100);
+                                      Table table = tables.get(rng.nextInt(tableCount));
+                                      CommitLogPosition pos = pos(currentSegment.segmentId(), currentSegmentOffset);
+                                      table.addAllocation(pos);
+                                      currentSegment.markDirty(table.tableId, pos);
+                                      currentSegmentOffset += size;
+                                      validateAllSegments.run();
+                                  }
+
+                                  while (tables.stream().anyMatch(Table::hasUnflushed))
+                                  {
+                                      Table table = tables.get(rng.nextInt(tableCount));
+                                      if (table.hasUnflushed())
+                                          table.flush(segments.values());
+
+                                      validateAllSegments.run();
+                                  }
+                              },
+                              e -> {
+                                  if (ops != null)
+                                  {
+                                      System.out.println("History: ");
+                                      for (String op : ops)
+                                          System.out.println(op);
+                                  }
+                              });
+    }
+
+    public static CommitLogPosition pos(long segmentId, int pos)
+    {
+        return new CommitLogPosition(segmentId, pos);
+    }
+}
\ No newline at end of file