Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import io.netty.util.internal.PlatformDependent;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.SystemClock;
Expand Down Expand Up @@ -69,7 +71,6 @@
import org.apache.rocketmq.store.queue.ReferredIterator;
import org.apache.rocketmq.store.util.LibC;
import org.rocksdb.RocksDBException;
import sun.nio.ch.DirectBuffer;

/**
* Store all metadata downtime for recovery, data protection reliability
Expand Down Expand Up @@ -2433,7 +2434,7 @@ private byte[] sampling(byte[] pageCacheTable, int sampleStep) {

private byte[] checkFileInPageCache(MappedFile mappedFile) {
long fileSize = mappedFile.getFileSize();
final long address = ((DirectBuffer) mappedFile.getMappedByteBuffer()).address();
final long address = PlatformDependent.directBufferAddress(mappedFile.getMappedByteBuffer());
int pageNums = (int) (fileSize + this.pageSize - 1) / this.pageSize;
byte[] pageCacheRst = new byte[pageNums];
int mincore = LibC.INSTANCE.mincore(new Pointer(address), new NativeLong(fileSize), pageCacheRst);
Expand Down Expand Up @@ -2509,7 +2510,7 @@ private int setFileReadMode(MappedFile mappedFile, int mode) {
log.error("setFileReadMode mappedFile is null");
return -1;
}
final long address = ((DirectBuffer) mappedFile.getMappedByteBuffer()).address();
final long address = PlatformDependent.directBufferAddress(mappedFile.getMappedByteBuffer());
int madvise = LibC.INSTANCE.madvise(new Pointer(address), new NativeLong(mappedFile.getFileSize()), mode);
if (madvise != 0) {
log.error("setFileReadMode error fileName: {}, madvise: {}, mode:{}", mappedFile.getFileName(), madvise, mode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@
*/
package org.apache.rocketmq.store;

import com.sun.jna.NativeLong;
import com.sun.jna.Pointer;
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;

import com.sun.jna.NativeLong;
import com.sun.jna.Pointer;
import io.netty.util.internal.PlatformDependent;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.util.LibC;
import sun.nio.ch.DirectBuffer;

public class TransientStorePool {
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
Expand All @@ -48,7 +49,7 @@ public void init() {
for (int i = 0; i < poolSize; i++) {
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);

final long address = ((DirectBuffer) byteBuffer).address();
final long address = PlatformDependent.directBufferAddress(byteBuffer);
Pointer pointer = new Pointer(address);
LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));

Expand All @@ -58,7 +59,7 @@ public void init() {

public void destroy() {
for (ByteBuffer byteBuffer : availableBuffers) {
final long address = ((DirectBuffer) byteBuffer).address();
final long address = PlatformDependent.directBufferAddress(byteBuffer);
Pointer pointer = new Pointer(address);
LibC.INSTANCE.munlock(pointer, new NativeLong(fileSize));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.util.LibC;
import sun.misc.Unsafe;
import sun.nio.ch.DirectBuffer;


public class DefaultMappedFile extends AbstractMappedFile {
public static final int OS_PAGE_SIZE = 1024 * 4;
Expand Down Expand Up @@ -914,7 +914,7 @@ public void setFirstCreateInQueue(boolean firstCreateInQueue) {
@Override
public void mlock() {
final long beginTime = System.currentTimeMillis();
final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
final long address = PlatformDependent.directBufferAddress(this.mappedByteBuffer);
Pointer pointer = new Pointer(address);
{
int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
Expand All @@ -930,7 +930,7 @@ public void mlock() {
@Override
public void munlock() {
final long beginTime = System.currentTimeMillis();
final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
final long address = PlatformDependent.directBufferAddress(this.mappedByteBuffer);
Pointer pointer = new Pointer(address);
int ret = LibC.INSTANCE.munlock(pointer, new NativeLong(this.fileSize));
log.info("munlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
Expand Down
Loading