Skip to content

Commit

Permalink
add removeOne method for ReceiptHandleGroup
Browse files Browse the repository at this point in the history
  • Loading branch information
xdkxlk committed Jun 27, 2023
1 parent 8ab99ac commit f91366b
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
Expand Down Expand Up @@ -77,6 +78,22 @@ public String toString() {
.append("offset", offset)
.toString();
}

public String getOriginalHandle() {
return originalHandle;
}

public String getBroker() {
return broker;
}

public int getQueueId() {
return queueId;
}

public long getOffset() {
return offset;
}
}

public static class HandleData {
Expand All @@ -100,6 +117,10 @@ public void unlock() {
this.semaphore.release();
}

public MessageReceiptHandle getMessageReceiptHandle() {
return messageReceiptHandle;
}

@Override
public boolean equals(Object o) {
return this == o;
Expand Down Expand Up @@ -196,6 +217,21 @@ public MessageReceiptHandle remove(String msgID, String handle) {
return res.get();
}

public MessageReceiptHandle removeOne(String msgID) {
Map<HandleKey, HandleData> handleMap = this.receiptHandleMap.get(msgID);
if (handleMap == null) {
return null;
}
Set<HandleKey> keys = handleMap.keySet();
for (HandleKey key : keys) {
MessageReceiptHandle res = this.remove(msgID, key.originalHandle);
if (res != null) {
return res;
}
}
return null;
}

public void computeIfPresent(String msgID, String handle,
Function<MessageReceiptHandle, CompletableFuture<MessageReceiptHandle>> function) {
Map<HandleKey, HandleData> handleMap = this.receiptHandleMap.get(msgID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,6 @@ public void testGetWhenComputeIfPresentReturnNull() {
assertTrue(receiptHandleGroup.isEmpty());
}



@Test
public void testRemoveWhenComputeIfPresent() {
String handle1 = createHandle();
Expand Down Expand Up @@ -281,6 +279,36 @@ public void testRemoveMultiThread() {
assertTrue(receiptHandleGroup.isEmpty());
}

@Test
public void testRemoveOne() {
String handle1 = createHandle();
AtomicReference<MessageReceiptHandle> removeHandleRef = new AtomicReference<>();
AtomicInteger count = new AtomicInteger();

receiptHandleGroup.put(msgID, createMessageReceiptHandle(handle1, msgID));
int threadNum = Math.max(Runtime.getRuntime().availableProcessors(), 3);
CountDownLatch latch = new CountDownLatch(threadNum);
for (int i = 0; i < threadNum; i++) {
Thread thread = new Thread(() -> {
try {
latch.countDown();
latch.await();
MessageReceiptHandle handle = receiptHandleGroup.removeOne(msgID);
if (handle != null) {
removeHandleRef.set(handle);
count.incrementAndGet();
}
} catch (Exception ignored) {
}
});
thread.start();
}

await().atMost(Duration.ofSeconds(1)).untilAsserted(() -> assertEquals(1, count.get()));
assertEquals(handle1, removeHandleRef.get().getReceiptHandleStr());
assertTrue(receiptHandleGroup.isEmpty());
}

private MessageReceiptHandle createMessageReceiptHandle(String handle, String msgID) {
return new MessageReceiptHandle(GROUP, TOPIC, 0, handle, msgID, 0, 0);
}
Expand Down

0 comments on commit f91366b

Please sign in to comment.