From c4dffec0c749e2b947c69def6951a8091413c624 Mon Sep 17 00:00:00 2001 From: "xiaofan.chen" Date: Thu, 29 Aug 2024 16:47:16 +0800 Subject: [PATCH] fix: two bugs: 1, autoBatchFlushEndPointContext.add() should always be before autoBatchFlushEndPointContext.done(1) othewise the flyingTaskNum could be negative; 2, make sure lastEventLoop is never null --- .../DefaultAutoBatchFlushEndpoint.java | 62 ++++++++++--------- 1 file changed, 32 insertions(+), 30 deletions(-) diff --git a/src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java b/src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java index 7326aff060..c5c84e30f2 100644 --- a/src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java +++ b/src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java @@ -15,24 +15,6 @@ */ package io.lettuce.core.protocol; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Deque; -import java.util.HashSet; -import java.util.List; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import java.util.function.Consumer; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - import io.lettuce.core.ClientOptions; import io.lettuce.core.ConnectionEvents; import io.lettuce.core.ContextualChannel; @@ -55,11 +37,29 @@ import io.netty.channel.EventLoop; import io.netty.handler.codec.EncoderException; import io.netty.util.Recycler; +import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Deque; +import java.util.HashSet; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Consumer; + /** * Default {@link Endpoint} implementation. * @@ -161,7 +161,7 @@ protected static void cancelCommandOnEndpointClose(RedisCommand cmd) { private final boolean canFire; - private volatile EventLoop lastEventLoop = null; + private volatile EventExecutor lastEventExecutor; private volatile Throwable connectionError; @@ -202,6 +202,7 @@ protected DefaultAutoBatchFlushEndpoint(ClientOptions clientOptions, ClientResou this.callbackOnClose = callbackOnClose; this.writeSpinCount = clientOptions.getAutoBatchFlushOptions().getWriteSpinCount(); this.batchSize = clientOptions.getAutoBatchFlushOptions().getBatchSize(); + this.lastEventExecutor = clientResources.eventExecutorGroup().next(); } @Override @@ -322,7 +323,7 @@ public void notifyChannelActive(Channel channel) { return; } - this.lastEventLoop = channel.eventLoop(); + this.lastEventExecutor = channel.eventLoop(); this.connectionError = null; this.inProtectMode = false; this.logPrefix = null; @@ -585,7 +586,7 @@ private void resetInternal() { if (chan.context.initialState.isConnected()) { chan.pipeline().fireUserEventTriggered(new ConnectionEvents.Reset()); } - LettuceAssert.assertState(lastEventLoop.inEventLoop(), "must be called in lastEventLoop thread"); + LettuceAssert.assertState(lastEventExecutor.inEventLoop(), "must be called in lastEventLoop thread"); cancelCommands("resetInternal"); } @@ -727,22 +728,23 @@ private int pollBatch(final AutoBatchFlushEndPointContext autoBatchFlushEndPoint } if (o instanceof RedisCommand) { + autoBatchFlushEndPointContext.add(1); RedisCommand cmd = (RedisCommand) o; channelWrite(chan, cmd).addListener(WrittenToChannel.newInstance(this, chan, cmd, false)); count++; } else { @SuppressWarnings("unchecked") Collection> commands = (Collection>) o; + final int commandsSize = commands.size(); // size() could be expensive for some collections so cache it! + autoBatchFlushEndPointContext.add(commandsSize); for (RedisCommand cmd : commands) { channelWrite(chan, cmd).addListener(WrittenToChannel.newInstance(this, chan, cmd, false)); } - count += commands.size(); + count += commandsSize; } } if (count > 0) { - autoBatchFlushEndPointContext.add(count); - channelFlush(chan); if (autoBatchFlushEndPointContext.hasRetryableFailedToSendCommands()) { // Wait for onConnectionClose event() @@ -755,7 +757,7 @@ private int pollBatch(final AutoBatchFlushEndPointContext autoBatchFlushEndPoint private void trySetEndpointQuiescence(ContextualChannel chan) { final EventLoop eventLoop = chan.eventLoop(); LettuceAssert.isTrue(eventLoop.inEventLoop(), "unexpected: not in event loop"); - LettuceAssert.isTrue(eventLoop == lastEventLoop, "unexpected: lastEventLoop not match"); + LettuceAssert.isTrue(eventLoop == lastEventExecutor, "unexpected: lastEventLoop not match"); final ConnectionContext connectionContext = chan.context; final @Nullable ConnectionContext.CloseStatus closeStatus = connectionContext.getCloseStatus(); @@ -1019,14 +1021,14 @@ private ChannelFuture channelWrite(Channel channel, RedisCommand comman * is terminated (state is RECONNECT_FAILED/ENDPOINT_CLOSED) */ private void syncAfterTerminated(Runnable runnable) { - final EventLoop localLastEventLoop = lastEventLoop; - LettuceAssert.notNull(localLastEventLoop, "lastEventLoop must not be null after terminated"); - if (localLastEventLoop.inEventLoop()) { + final EventExecutor localLastEventExecutor = lastEventExecutor; + if (localLastEventExecutor.inEventLoop()) { runnable.run(); } else { - localLastEventLoop.execute(() -> { + localLastEventExecutor.execute(() -> { runnable.run(); - LettuceAssert.isTrue(lastEventLoop == localLastEventLoop, "lastEventLoop must not be changed after terminated"); + LettuceAssert.isTrue(lastEventExecutor == localLastEventExecutor, + "lastEventLoop must not be changed after terminated"); }); } }