Skip to content

Commit

Permalink
[ISSUE #8765] fix low performance of delay message when enable rocksd…
Browse files Browse the repository at this point in the history
…b consume queue
  • Loading branch information
yuz10 authored Sep 27, 2024
1 parent c3664aa commit c7597fb
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,9 @@ public long getMinOffsetInQueue() {
private int pullNum(long cqOffset, long maxCqOffset) {
long diffLong = maxCqOffset - cqOffset;
if (diffLong < Integer.MAX_VALUE) {
int diffInt = (int) diffLong;
return diffInt > 16 ? 16 : diffInt;
return (int) diffLong;
}
return 16;
return Integer.MAX_VALUE;
}

@Override
Expand All @@ -246,7 +245,7 @@ public ReferredIterator<CqUnit> iterateFrom(final long startIndex) {
long maxCqOffset = getMaxOffsetInQueue();
if (startIndex < maxCqOffset) {
int num = pullNum(startIndex, maxCqOffset);
return iterateFrom0(startIndex, num);
return iterateFrom1(startIndex, num);
}
} catch (RocksDBException e) {
log.error("[RocksDBConsumeQueue] iterateFrom error!", e);
Expand Down Expand Up @@ -333,6 +332,13 @@ private ReferredIterator<CqUnit> iterateFrom0(final long startIndex, final int c
return new RocksDBConsumeQueueIterator(byteBufferList, startIndex);
}

private ReferredIterator<CqUnit> iterateFrom1(final long startIndex, final int count) throws RocksDBException {
if (count <= 16) {
return iterateFrom0(startIndex, count);
}
return new LargeRocksDBConsumeQueueIterator(startIndex, count);
}

@Override
public String getTopic() {
return topic;
Expand Down Expand Up @@ -391,4 +397,83 @@ public CqUnit nextAndRelease() {
}
}
}

private class LargeRocksDBConsumeQueueIterator implements ReferredIterator<CqUnit> {
private List<ByteBuffer> byteBufferList;
private int bufferStartIndex;
private final long startIndex;
private final int totalCount;
private int currentIndex;

public LargeRocksDBConsumeQueueIterator(final long startIndex, final int num) throws RocksDBException {
this.startIndex = startIndex;
this.totalCount = num;
this.currentIndex = 0;
prefetch(0);
}

private void prefetch(int bufferStartIndex) throws RocksDBException {
int pullNum = Math.min(totalCount - bufferStartIndex, 16);
byteBufferList = messageStore.getQueueStore().rangeQuery(topic, queueId, startIndex + bufferStartIndex, pullNum);
if (byteBufferList == null || byteBufferList.isEmpty()) {
if (messageStore.getMessageStoreConfig().isEnableRocksDBLog()) {
log.warn("iterateFrom1 - find nothing, startIndex:{}, count:{}", startIndex + bufferStartIndex, pullNum);
}
}
this.bufferStartIndex = bufferStartIndex;
}

@Override
public boolean hasNext() {
return this.currentIndex < this.totalCount;
}

public boolean needPrefetch(int currentIndex) {
if (byteBufferList == null) {
return true;
}
return currentIndex < bufferStartIndex || currentIndex >= bufferStartIndex + byteBufferList.size();
}

@Override
public CqUnit next() {
if (!hasNext()) {
return null;
}
final int currentIndex = this.currentIndex;
if (needPrefetch(currentIndex)) {
try {
prefetch(currentIndex);
} catch (RocksDBException e) {
return null;
}
}
if (needPrefetch(currentIndex)) {
return null;
}

final ByteBuffer byteBuffer = this.byteBufferList.get(currentIndex - bufferStartIndex);
CqUnit cqUnit = new CqUnit(this.startIndex + currentIndex, byteBuffer.getLong(), byteBuffer.getInt(), byteBuffer.getLong());
this.currentIndex++;
return cqUnit;
}

@Override
public void remove() {
throw new UnsupportedOperationException("remove");
}

@Override
public void release() {
}

@Override
public CqUnit nextAndRelease() {
try {
return next();
} finally {
release();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store.queue;

import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import static org.apache.rocketmq.store.queue.RocksDBConsumeQueueTable.CQ_UNIT_SIZE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class RocksDBConsumeQueueTest {

@Test
public void testIterator() throws Exception {
if (MixAll.isMac()) {
return;
}
DefaultMessageStore messageStore = mock(DefaultMessageStore.class);
RocksDBConsumeQueueStore rocksDBConsumeQueueStore = mock(RocksDBConsumeQueueStore.class);
when(messageStore.getQueueStore()).thenReturn(rocksDBConsumeQueueStore);
when(rocksDBConsumeQueueStore.getMaxOffsetInQueue(anyString(), anyInt())).thenReturn(10000L);
when(rocksDBConsumeQueueStore.rangeQuery(anyString(), anyInt(), anyLong(), anyInt())).then(new Answer<List<ByteBuffer>>() {
@Override
public List<ByteBuffer> answer(InvocationOnMock mock) throws Throwable {
long startIndex = mock.getArgument(2);
int num = mock.getArgument(3);
List<ByteBuffer> result = new ArrayList<>(num);
for (int i = 0; i < num; i++) {
final ByteBuffer byteBuffer = ByteBuffer.allocate(CQ_UNIT_SIZE);
long phyOffset = (startIndex + i) * 10;
byteBuffer.putLong(phyOffset);
byteBuffer.putInt(1);
byteBuffer.putLong(0);
byteBuffer.putLong(0);
byteBuffer.flip();
result.add(byteBuffer);
}
return result;
}
});

RocksDBConsumeQueue consumeQueue = new RocksDBConsumeQueue(messageStore, "topic", 0);
ReferredIterator<CqUnit> it = consumeQueue.iterateFrom(9000);
for (int i = 0; i < 1000; i++) {
assertTrue(it.hasNext());
CqUnit next = it.next();
assertEquals(9000 + i, next.getQueueOffset());
assertEquals(10 * (9000 + i), next.getPos());
}
assertFalse(it.hasNext());
}
}

0 comments on commit c7597fb

Please sign in to comment.