diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java index 273ad91963f..f0e8b3ab503 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java @@ -442,6 +442,10 @@ public CompletableFuture queryMessageAsync( if (flatFile == null) { continue; } + if (indexItem.getOffset() < flatFile.getCommitLogMinOffset() || + indexItem.getOffset() > flatFile.getCommitLogMaxOffset()) { + continue; + } CompletableFuture getMessageFuture = flatFile .getCommitLogAsync(indexItem.getOffset(), indexItem.getSize()) .thenApply(messageBuffer -> new SelectMappedBufferResult( diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java index 00566d68723..2385628ed45 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java @@ -28,7 +28,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.TimeUnit; @@ -238,7 +237,7 @@ public CompletableFuture> queryAsync( ConcurrentNavigableMap pendingMap = this.timeStoreTable.subMap(beginTime, true, endTime, true); List> futureList = new ArrayList<>(pendingMap.size()); - ConcurrentHashMap result = new ConcurrentHashMap<>(); + ConcurrentSkipListMap result = new ConcurrentSkipListMap<>(); for (Map.Entry entry : pendingMap.descendingMap().entrySet()) { CompletableFuture completableFuture = entry.getValue() @@ -246,7 +245,7 @@ public CompletableFuture> queryAsync( .thenAccept(itemList -> itemList.forEach(indexItem -> { if (result.size() < maxCount) { result.put(String.format( - "%d-%d", indexItem.getQueueId(), indexItem.getOffset()), indexItem); + "%d-%20d", indexItem.getQueueId(), indexItem.getOffset()), indexItem); } })); futureList.add(completableFuture); @@ -349,7 +348,8 @@ public void destroyExpiredFile(long expireTimestamp) { flatAppendFile.destroyExpiredFile(expireTimestamp); timeStoreTable.entrySet().removeIf(entry -> IndexFile.IndexStatusEnum.UPLOAD.equals(entry.getValue().getFileStatus()) && - entry.getKey() < flatAppendFile.getMinTimestamp()); + (flatAppendFile.getFileSegmentList().isEmpty() || + entry.getKey() < flatAppendFile.getMinTimestamp())); int tableSize = (int) timeStoreTable.entrySet().stream() .filter(entry -> IndexFile.IndexStatusEnum.UPLOAD.equals(entry.getValue().getFileStatus())) .count();