Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.util;

import java.util.Objects;
import java.util.concurrent.locks.Lock;

/**
 * A utility class providing helper methods for working with {@link Lock} objects.
 * This class simplifies the usage of locks by encapsulating common patterns,
 * such as acquiring and releasing locks in a safe manner.
 *
 * <p>All members are static; the class is final and cannot be instantiated.
 */
public final class LockUtils {

    private LockUtils() {
        // Static utility class: prevent instantiation and subclassing.
    }

    /**
     * A value-producing action that may throw an exception of type {@code E}.
     *
     * @param <T> the type of the produced value
     * @param <E> the type of exception the action may throw
     */
    @FunctionalInterface
    public interface ThrowingSupplier<T, E extends Exception> {
        T get() throws E;
    }

    /**
     * An action without a result that may throw an exception of type {@code E}.
     *
     * @param <E> the type of exception the action may throw
     */
    @FunctionalInterface
    public interface ThrowingRunnable<E extends Exception> {
        void run() throws E;
    }

    /**
     * Executes the given {@link ThrowingSupplier} within the context of the specified {@link Lock}.
     * The lock is acquired before executing the supplier and released after the execution,
     * ensuring that the lock is always released, even if an exception is thrown.
     *
     * @param <T> the type of the result returned by the supplier
     * @param <E> the type of exception that may be thrown by the supplier
     * @param lock the lock to be acquired and released
     * @param supplier the supplier to be executed within the lock context
     * @return the result of the supplier
     * @throws E if an exception occurs during the execution of the supplier
     * @throws NullPointerException if either {@code lock} or {@code supplier} is null
     */
    public static <T, E extends Exception> T inLock(Lock lock, ThrowingSupplier<T, E> supplier) throws E {
        // Validate both arguments before touching the lock so a null supplier never leaves it held.
        Objects.requireNonNull(lock, "Lock must not be null");
        Objects.requireNonNull(supplier, "Supplier must not be null");

        // Acquire outside the try block: if lock() itself fails there is nothing to unlock.
        lock.lock();
        try {
            return supplier.get();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Executes the given {@link ThrowingRunnable} within the context of the specified {@link Lock}.
     * The lock is acquired before executing the runnable and released after the execution,
     * ensuring that the lock is always released, even if an exception is thrown.
     *
     * @param <E> the type of exception that may be thrown by the runnable
     * @param lock the lock to be acquired and released
     * @param runnable the runnable to be executed within the lock context
     * @throws E if an exception occurs during the execution of the runnable
     * @throws NullPointerException if either {@code lock} or {@code runnable} is null
     */
    public static <E extends Exception> void inLock(Lock lock, ThrowingRunnable<E> runnable) throws E {
        // Validate both arguments before touching the lock so a null runnable never leaves it held.
        Objects.requireNonNull(lock, "Lock must not be null");
        Objects.requireNonNull(runnable, "Runnable must not be null");

        // Acquire outside the try block: if lock() itself fails there is nothing to unlock.
        lock.lock();
        try {
            runnable.run();
        } finally {
            lock.unlock();
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package org.apache.kafka.storage.internals.log;

import org.apache.kafka.common.utils.ByteBufferUnmapper;
import org.apache.kafka.common.utils.OperatingSystem;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.LockUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -33,8 +33,8 @@
import java.nio.file.Files;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* The abstract index class which holds entry format agnostic methods.
Expand All @@ -47,7 +47,18 @@ private enum SearchResultType {

private static final Logger log = LoggerFactory.getLogger(AbstractIndex.class);

protected final ReentrantLock lock = new ReentrantLock();
// Serializes all index operations that mutate internal state.
// Readers do not need to acquire this lock because:
// 1) MappedByteBuffer provides direct access to the OS-level buffer cache,
// which allows concurrent reads in practice.
// 2) Clients only read committed data and are not affected by concurrent appends/truncates.
// In the rare case when the data is truncated, the follower could read inconsistent data.
// The follower has the logic to ignore the inconsistent data through crc and leader epoch.
// 3) Read and remap operations are coordinated via remapLock to ensure visibility of the
// underlying mmap.
private final ReentrantLock lock = new ReentrantLock();
// Allows concurrent read operations while ensuring exclusive access if the underlying mmap is changed
private final ReentrantReadWriteLock remapLock = new ReentrantReadWriteLock();

private final long baseOffset;
private final int maxIndexSize;
Expand Down Expand Up @@ -187,36 +198,32 @@ public void updateParentDir(File parentDir) {
* @return a boolean indicating whether the size of the memory map and the underneath file is changed or not.
*/
public boolean resize(int newSize) throws IOException {
lock.lock();
try {
int roundedNewSize = roundDownToExactMultiple(newSize, entrySize());

if (length == roundedNewSize) {
log.debug("Index {} was not resized because it already has size {}", file.getAbsolutePath(), roundedNewSize);
return false;
} else {
RandomAccessFile raf = new RandomAccessFile(file, "rw");
try {
int position = mmap.position();

/* Windows or z/OS won't let us modify the file length while the file is mmapped :-( */
if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
safeForceUnmap();
raf.setLength(roundedNewSize);
this.length = roundedNewSize;
mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize);
this.maxEntries = mmap.limit() / entrySize();
mmap.position(position);
log.debug("Resized {} to {}, position is {} and limit is {}", file.getAbsolutePath(), roundedNewSize,
mmap.position(), mmap.limit());
return true;
} finally {
Utils.closeQuietly(raf, "index file " + file.getName());
}
}
} finally {
lock.unlock();
}
return inLock(() ->
inRemapWriteLock(() -> {
int roundedNewSize = roundDownToExactMultiple(newSize, entrySize());

if (length == roundedNewSize) {
log.debug("Index {} was not resized because it already has size {}", file.getAbsolutePath(), roundedNewSize);
return false;
} else {
RandomAccessFile raf = new RandomAccessFile(file, "rw");
try {
int position = mmap.position();

safeForceUnmap();
raf.setLength(roundedNewSize);
this.length = roundedNewSize;
mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize);
this.maxEntries = mmap.limit() / entrySize();
mmap.position(position);
log.debug("Resized {} to {}, position is {} and limit is {}", file.getAbsolutePath(), roundedNewSize,
mmap.position(), mmap.limit());
return true;
} finally {
Utils.closeQuietly(raf, "index file " + file.getName());
}
}
}));
}

/**
Expand All @@ -236,12 +243,9 @@ public void renameTo(File f) throws IOException {
* Flush the data in the index to disk
*/
public void flush() {
lock.lock();
try {
inLock(() -> {
mmap.force();
} finally {
lock.unlock();
}
});
}

/**
Expand All @@ -261,12 +265,11 @@ public boolean deleteIfExists() throws IOException {
* the file.
*/
public void trimToValidSize() throws IOException {
lock.lock();
try {
resize(entrySize() * entries);
} finally {
lock.unlock();
}
inLock(() -> {
if (mmap != null) {
resize(entrySize() * entries);
}
});
}

/**
Expand All @@ -286,12 +289,7 @@ public void closeHandler() {
// However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk.
// To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness.
// See https://issues.apache.org/jira/browse/KAFKA-4614 for the details.
lock.lock();
try {
safeForceUnmap();
} finally {
lock.unlock();
}
inLock(() -> inRemapWriteLock(this::safeForceUnmap));
}

/**
Expand Down Expand Up @@ -418,20 +416,28 @@ protected void truncateToEntries0(int entries) {
mmap.position(entries * entrySize());
}

/**
* Execute the given function in a lock only if we are running on windows or z/OS. We do this
* because Windows or z/OS won't let us resize a file while it is mmapped. As a result we have to force unmap it
* and this requires synchronizing reads.
*/
protected final <T, E extends Exception> T maybeLock(Lock lock, StorageAction<T, E> action) throws E {
if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
lock.lock();
try {
return action.execute();
} finally {
if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
lock.unlock();
}
/**
 * Runs the given action while holding this index's {@code lock} and returns its result.
 * Delegates to {@code LockUtils.inLock}, passing the {@code lock} field as the lock to hold.
 *
 * @param <T> the type of the result returned by the action
 * @param <E> the type of exception the action may throw
 * @param action the action to execute
 * @return the value returned by the action
 * @throws E if the action throws
 */
protected final <T, E extends Exception> T inLock(LockUtils.ThrowingSupplier<T, E> action) throws E {
return LockUtils.inLock(lock, action);
}

/**
 * Runs the given action, which produces no result, while holding this index's {@code lock}.
 * Delegates to {@code LockUtils.inLock}, passing the {@code lock} field as the lock to hold.
 *
 * @param <E> the type of exception the action may throw
 * @param action the action to execute
 * @throws E if the action throws
 */
protected final <E extends Exception> void inLock(LockUtils.ThrowingRunnable<E> action) throws E {
LockUtils.inLock(lock, action);
}

/**
 * Runs the given action while holding the read lock of {@code remapLock} and returns its result.
 * Delegates to {@code LockUtils.inLock} with {@code remapLock.readLock()}.
 *
 * @param <T> the type of the result returned by the action
 * @param <E> the type of exception the action may throw
 * @param action the action to execute
 * @return the value returned by the action
 * @throws E if the action throws
 */
protected final <T, E extends Exception> T inRemapReadLock(LockUtils.ThrowingSupplier<T, E> action) throws E {
return LockUtils.inLock(remapLock.readLock(), action);
}

/**
 * Runs the given action, which produces no result, while holding the read lock of {@code remapLock}.
 * Delegates to {@code LockUtils.inLock} with {@code remapLock.readLock()}.
 *
 * @param <E> the type of exception the action may throw
 * @param action the action to execute
 * @throws E if the action throws
 */
protected final <E extends Exception> void inRemapReadLock(LockUtils.ThrowingRunnable<E> action) throws E {
LockUtils.inLock(remapLock.readLock(), action);
}

/**
 * Runs the given action while holding the write lock of {@code remapLock} and returns its result.
 * Delegates to {@code LockUtils.inLock} with {@code remapLock.writeLock()}.
 *
 * @param <T> the type of the result returned by the action
 * @param <E> the type of exception the action may throw
 * @param action the action to execute
 * @return the value returned by the action
 * @throws E if the action throws
 */
protected final <T, E extends Exception> T inRemapWriteLock(LockUtils.ThrowingSupplier<T, E> action) throws E {
return LockUtils.inLock(remapLock.writeLock(), action);
}

/**
 * Runs the given action, which produces no result, while holding the write lock of {@code remapLock}.
 * Delegates to {@code LockUtils.inLock} with {@code remapLock.writeLock()}.
 *
 * @param <E> the type of exception the action may throw
 * @param action the action to execute
 * @throws E if the action throws
 */
protected final <E extends Exception> void inRemapWriteLock(LockUtils.ThrowingRunnable<E> action) throws E {
LockUtils.inLock(remapLock.writeLock(), action);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class OffsetIndex extends AbstractIndex {
private static final int ENTRY_SIZE = 8;

/* the last offset in the index */
private long lastOffset;
private volatile long lastOffset;

public OffsetIndex(File file, long baseOffset) throws IOException {
this(file, baseOffset, -1);
Expand Down Expand Up @@ -96,7 +96,7 @@ public void sanityCheck() {
* the pair (baseOffset, 0) is returned.
*/
public OffsetPosition lookup(long targetOffset) {
return maybeLock(lock, () -> {
return inRemapReadLock(() -> {
ByteBuffer idx = mmap().duplicate();
int slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY);
if (slot == -1)
Expand All @@ -112,7 +112,7 @@ public OffsetPosition lookup(long targetOffset) {
* @return The offset/position pair at that entry
*/
public OffsetPosition entry(int n) {
return maybeLock(lock, () -> {
return inRemapReadLock(() -> {
if (n >= entries())
throw new IllegalArgumentException("Attempt to fetch the " + n + "th entry from index " +
file().getAbsolutePath() + ", which has size " + entries());
Expand All @@ -126,7 +126,7 @@ public OffsetPosition entry(int n) {
* such offset.
*/
public Optional<OffsetPosition> fetchUpperBoundOffset(OffsetPosition fetchOffset, int fetchSize) {
return maybeLock(lock, () -> {
return inRemapReadLock(() -> {
ByteBuffer idx = mmap().duplicate();
int slot = smallestUpperBoundSlotFor(idx, fetchOffset.position + fetchSize, IndexSearchType.VALUE);
if (slot == -1)
Expand All @@ -142,8 +142,7 @@ public Optional<OffsetPosition> fetchUpperBoundOffset(OffsetPosition fetchOffset
* @throws InvalidOffsetException if provided offset is not larger than the last offset
*/
public void append(long offset, int position) {
lock.lock();
try {
inLock(() -> {
if (isFull())
throw new IllegalArgumentException("Attempt to append to a full index (size = " + entries() + ").");

Expand All @@ -158,15 +157,12 @@ public void append(long offset, int position) {
} else
throw new InvalidOffsetException("Attempt to append an offset " + offset + " to position " + entries() +
" no larger than the last offset appended (" + lastOffset + ") to " + file().getAbsolutePath());
} finally {
lock.unlock();
}
});
}

@Override
public void truncateTo(long offset) {
lock.lock();
try {
inLock(() -> {
ByteBuffer idx = mmap().duplicate();
int slot = largestLowerBoundSlotFor(idx, offset, IndexSearchType.KEY);

Expand All @@ -183,9 +179,7 @@ else if (relativeOffset(idx, slot) == offset - baseOffset())
else
newEntries = slot + 1;
truncateToEntries(newEntries);
} finally {
lock.unlock();
}
});
}

public long lastOffset() {
Expand Down Expand Up @@ -219,30 +213,24 @@ private int physical(ByteBuffer buffer, int n) {
* Truncates index to a known number of entries.
*/
private void truncateToEntries(int entries) {
lock.lock();
try {
inLock(() -> {
super.truncateToEntries0(entries);
this.lastOffset = lastEntry().offset;
log.debug("Truncated index {} to {} entries; position is now {} and last offset is now {}",
file().getAbsolutePath(), entries, mmap().position(), lastOffset);
} finally {
lock.unlock();
}
file().getAbsolutePath(), entries, mmap().position(), lastOffset);
});
}

/**
* The last entry in the index
*/
private OffsetPosition lastEntry() {
lock.lock();
try {
return inRemapReadLock(() -> {
int entries = entries();
if (entries == 0)
return new OffsetPosition(baseOffset(), 0);
else
return parseEntry(mmap(), entries - 1);
} finally {
lock.unlock();
}
});
}
}
Loading