Skip to content

Commit

Permalink
[ISSUE #8736] fix searchOffset corner case in rocksdb consume queue (#…
Browse files Browse the repository at this point in the history
…8737)

* fix searchOffset  for ConsumeQueue backed by RocksDB
  • Loading branch information
yuz10 authored Sep 29, 2024
1 parent b7a2a3d commit 551c8c3
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,39 @@ public long binarySearchInCQByTime(String topic, int queueId, long high, long lo
long result = -1L;
long targetOffset = -1L, leftOffset = -1L, rightOffset = -1L;
long ceiling = high, floor = low;
// Handle the following corner cases first:
// 1. store time of (high) < timestamp
ByteBuffer buffer = getCQInKV(topic, queueId, ceiling);
if (buffer != null) {
long storeTime = buffer.getLong(MSG_STORE_TIME_SIZE_OFFSET);
if (storeTime < timestamp) {
switch (boundaryType) {
case LOWER:
return ceiling + 1;
case UPPER:
return ceiling;
default:
log.warn("Unknown boundary type");
break;
}
}
}
// 2. store time of (low) > timestamp
buffer = getCQInKV(topic, queueId, floor);
if (buffer != null) {
long storeTime = buffer.getLong(MSG_STORE_TIME_SIZE_OFFSET);
if (storeTime > timestamp) {
switch (boundaryType) {
case LOWER:
return floor;
case UPPER:
return 0;
default:
log.warn("Unknown boundary type");
break;
}
}
}
while (high >= low) {
long midOffset = low + ((high - low) >>> 1);
ByteBuffer byteBuffer = getCQInKV(topic, queueId, midOffset);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store.queue;

import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.rocksdb.ConsumeQueueRocksDBStorage;
import org.junit.Test;
import org.mockito.stubbing.Answer;
import org.rocksdb.RocksDBException;

import java.nio.ByteBuffer;

import static org.apache.rocketmq.store.queue.RocksDBConsumeQueueTable.CQ_UNIT_SIZE;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

public class RocksDBConsumeQueueTableTest {

@Test
public void testBinarySearchInCQByTime() throws RocksDBException {
if (MixAll.isMac()) {
return;
}
ConsumeQueueRocksDBStorage rocksDBStorage = mock(ConsumeQueueRocksDBStorage.class);
DefaultMessageStore store = mock(DefaultMessageStore.class);
RocksDBConsumeQueueTable table = new RocksDBConsumeQueueTable(rocksDBStorage, store);
doAnswer((Answer<byte[]>) mock -> {
/*
* queueOffset timestamp
* 100 1000
* 200 2000
* 201 2010
* 1000 10000
*/
byte[] keyBytes = mock.getArgument(0);
ByteBuffer keyBuffer = ByteBuffer.wrap(keyBytes);
int len = keyBuffer.getInt(0);
long offset = keyBuffer.getLong(4 + 1 + len + 1 + 4 + 1);
long phyOffset = offset;
long timestamp = offset * 10;
final ByteBuffer byteBuffer = ByteBuffer.allocate(CQ_UNIT_SIZE);
byteBuffer.putLong(phyOffset);
byteBuffer.putInt(1);
byteBuffer.putLong(0);
byteBuffer.putLong(timestamp);
return byteBuffer.array();
}).when(rocksDBStorage).getCQ(any());
assertEquals(1001, table.binarySearchInCQByTime("topic", 0, 1000, 100, 20000, 0, BoundaryType.LOWER));
assertEquals(1000, table.binarySearchInCQByTime("topic", 0, 1000, 100, 20000, 0, BoundaryType.UPPER));
assertEquals(100, table.binarySearchInCQByTime("topic", 0, 1000, 100, 1, 0, BoundaryType.LOWER));
assertEquals(0, table.binarySearchInCQByTime("topic", 0, 1000, 100, 1, 0, BoundaryType.UPPER));
assertEquals(201, table.binarySearchInCQByTime("topic", 0, 1000, 100, 2001, 0, BoundaryType.LOWER));
assertEquals(200, table.binarySearchInCQByTime("topic", 0, 1000, 100, 2001, 0, BoundaryType.UPPER));
assertEquals(200, table.binarySearchInCQByTime("topic", 0, 1000, 100, 2000, 0, BoundaryType.LOWER));
assertEquals(200, table.binarySearchInCQByTime("topic", 0, 1000, 100, 2000, 0, BoundaryType.UPPER));
}
}

0 comments on commit 551c8c3

Please sign in to comment.