Skip to content

Commit

Permalink
Implement the reentrant PushConsumer message receiving (#547)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaron-ai authored Jun 21, 2023
1 parent 5d4a682 commit cea5343
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
Expand Down Expand Up @@ -242,11 +243,12 @@ private apache.rocketmq.v2.FilterExpression wrapFilterExpression(FilterExpressio
}

ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize, MessageQueueImpl mq,
FilterExpression filterExpression, Duration longPollingTimeout) {
FilterExpression filterExpression, Duration longPollingTimeout, String attemptId) {
attemptId = null == attemptId ? UUID.randomUUID().toString() : attemptId;
return ReceiveMessageRequest.newBuilder().setGroup(getProtobufGroup())
.setMessageQueue(mq.toProtobuf()).setFilterExpression(wrapFilterExpression(filterExpression))
.setLongPollingTimeout(Durations.fromNanos(longPollingTimeout.toNanos()))
.setBatchSize(batchSize).setAutoRenew(true).build();
.setBatchSize(batchSize).setAutoRenew(true).setAttemptId(attemptId).build();
}

ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize, MessageQueueImpl mq,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.grpc.StatusRuntimeException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -175,43 +177,55 @@ public void fetchMessageImmediately() {
*
* <p> Make sure that no exception will be thrown.
*/
public void onReceiveMessageException(Throwable t) {
public void onReceiveMessageException(Throwable t, String attemptId) {
Duration delay = t instanceof TooManyRequestsException ? RECEIVING_FLOW_CONTROL_BACKOFF_DELAY :
RECEIVING_FAILURE_BACKOFF_DELAY;
receiveMessageLater(delay);
receiveMessageLater(delay, attemptId);
}

private void receiveMessageLater(Duration delay) {
private void receiveMessageLater(Duration delay, String attemptId) {
final ClientId clientId = consumer.getClientId();
final ScheduledExecutorService scheduler = consumer.getScheduler();
try {
log.info("Try to receive message later, mq={}, delay={}, clientId={}", mq, delay, clientId);
scheduler.schedule(this::receiveMessage, delay.toNanos(), TimeUnit.NANOSECONDS);
scheduler.schedule(() -> receiveMessage(attemptId), delay.toNanos(), TimeUnit.NANOSECONDS);
} catch (Throwable t) {
if (scheduler.isShutdown()) {
return;
}
// Should never reach here.
log.error("[Bug] Failed to schedule message receiving request, mq={}, clientId={}", mq, clientId, t);
onReceiveMessageException(t);
onReceiveMessageException(t, attemptId);
}
}

private String generateAttemptId() {
return UUID.randomUUID().toString();
}

public void receiveMessage() {
receiveMessage(this.generateAttemptId());
}

public void receiveMessage(String attemptId) {
final ClientId clientId = consumer.getClientId();
if (dropped) {
log.info("Process queue has been dropped, no longer receive message, mq={}, clientId={}", mq, clientId);
return;
}
if (this.isCacheFull()) {
log.warn("Process queue cache is full, would receive message later, mq={}, clientId={}", mq, clientId);
receiveMessageLater(RECEIVING_BACKOFF_DELAY_WHEN_CACHE_IS_FULL);
receiveMessageLater(RECEIVING_BACKOFF_DELAY_WHEN_CACHE_IS_FULL, attemptId);
return;
}
receiveMessageImmediately();
receiveMessageImmediately(attemptId);
}

private void receiveMessageImmediately() {
receiveMessageImmediately(this.generateAttemptId());
}

private void receiveMessageImmediately(String attemptId) {
final ClientId clientId = consumer.getClientId();
if (!consumer.isRunning()) {
log.info("Stop to receive message because consumer is not running, mq={}, clientId={}", mq, clientId);
Expand All @@ -222,7 +236,7 @@ private void receiveMessageImmediately() {
final int batchSize = this.getReceptionBatchSize();
final Duration longPollingTimeout = consumer.getPushConsumerSettings().getLongPollingTimeout();
final ReceiveMessageRequest request = consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression,
longPollingTimeout);
longPollingTimeout, attemptId);
activityNanoTime = System.nanoTime();

// Intercept before message reception.
Expand All @@ -248,27 +262,35 @@ public void onSuccess(ReceiveMessageResult result) {
// Should never reach here.
log.error("[Bug] Exception raised while handling receive result, mq={}, endpoints={}, "
+ "clientId={}", mq, endpoints, clientId, t);
onReceiveMessageException(t);
onReceiveMessageException(t, attemptId);
}
}

@Override
public void onFailure(Throwable t) {
String nextAttemptId = null;
if (t instanceof StatusRuntimeException) {
StatusRuntimeException exception = (StatusRuntimeException) t;
if (io.grpc.Status.DEADLINE_EXCEEDED.equals(exception.getStatus())) {
nextAttemptId = request.getAttemptId();
}
}
// Intercept after message reception.
final MessageInterceptorContextImpl context0 =
new MessageInterceptorContextImpl(context, MessageHookPointsStatus.ERROR);
consumer.doAfter(context0, Collections.emptyList());

log.error("Exception raised during message reception, mq={}, endpoints={}, clientId={}", mq,
endpoints, clientId, t);
onReceiveMessageException(t);
log.error("Exception raised during message reception, mq={}, endpoints={}, attemptId={}, " +
"nextAttemptId={}, clientId={}", mq, endpoints, request.getAttemptId(), nextAttemptId,
clientId, t);
onReceiveMessageException(t, nextAttemptId);
}
}, MoreExecutors.directExecutor());
receptionTimes.getAndIncrement();
consumer.getReceptionTimes().getAndIncrement();
} catch (Throwable t) {
log.error("Exception raised during message reception, mq={}, clientId={}", mq, clientId, t);
onReceiveMessageException(t);
onReceiveMessageException(t, attemptId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
Expand Down Expand Up @@ -71,7 +72,7 @@ public void testReceiveMessage() throws ExecutionException, InterruptedException
any(ReceiveMessageRequest.class), any(Duration.class));
final MessageQueueImpl mq = fakeMessageQueueImpl(FAKE_TOPIC_0);
final ReceiveMessageRequest request = pushConsumer.wrapReceiveMessageRequest(1,
mq, new FilterExpression(), Duration.ofSeconds(15));
mq, new FilterExpression(), Duration.ofSeconds(15), UUID.randomUUID().toString());
final ListenableFuture<ReceiveMessageResult> future0 =
pushConsumer.receiveMessage(request, mq, Duration.ofSeconds(15));
final ReceiveMessageResult receiveMessageResult = future0.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -134,7 +135,7 @@ public void testReceiveMessageImmediately() {
when(pushSubscriptionSettings.getReceiveBatchSize()).thenReturn(32);
ReceiveMessageRequest request = ReceiveMessageRequest.newBuilder().build();
when(pushConsumer.wrapReceiveMessageRequest(anyInt(), any(MessageQueueImpl.class),
any(FilterExpression.class), any(Duration.class))).thenReturn(request);
any(FilterExpression.class), any(Duration.class), nullable(String.class))).thenReturn(request);
processQueue.fetchMessageImmediately();
await().atMost(Duration.ofSeconds(3))
.untilAsserted(() -> verify(pushConsumer, times(cachedMessagesCountThresholdPerQueue))
Expand Down

0 comments on commit cea5343

Please sign in to comment.