diff --git a/src/main/java/io/lettuce/core/AutoBatchFlushOptions.java b/src/main/java/io/lettuce/core/AutoBatchFlushOptions.java index 09a754a8c8..1166f4c4d1 100644 --- a/src/main/java/io/lettuce/core/AutoBatchFlushOptions.java +++ b/src/main/java/io/lettuce/core/AutoBatchFlushOptions.java @@ -24,7 +24,7 @@ public class AutoBatchFlushOptions implements Serializable { public static final int DEFAULT_WRITE_SPIN_COUNT = 16; - public static final int DEFAULT_BATCH_SIZE = 20; + public static final int DEFAULT_BATCH_SIZE = 32; public static final boolean DEFAULT_USE_MPSC_QUEUE = true; diff --git a/src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java b/src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java index f433b21329..7326aff060 100644 --- a/src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java +++ b/src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java @@ -650,13 +650,22 @@ private void scheduleSendJobIfNeeded(final ContextualChannel chan) { } if (chan.context.autoBatchFlushEndPointContext.hasOngoingSendLoop.tryEnter()) { - // Benchmark result of using tryEnterSafeGetVolatile() or not (1 thread, async get): - // 1. uses tryEnterSafeGetVolatile() to avoid unnecessary eventLoop.execute() calls - // Avg latency: 3.2956217278663s - // Avg QPS: 495238.50056392356/s - // 2. uses eventLoop.execute() directly - // Avg latency: 3.2677197021496998s - // Avg QPS: 476925.0751855796/s + // Benchmark result: + // Redis: + // engine: 7.1.0 + // server: AWS elasticcache cache.r7g.large + // Client: EC2-c5n.2xlarge + // Test Model: + // multi-thread sync exists (./bench-multi-thread-exists.sh -b 32 -s 10 -n 80000 -t 64) + // Test Parameter: + // thread num: 64, loop num: 80000, batch size: 32, write spin count: 10 + // + // With tryEnter(): + // Avg latency: 0.64917373203125ms + // Avg QPS: 196037.67991971457/s + // Without tryEnter(): + // Avg latency: 0.6618976359375001ms + // Avg QPS: 192240.1301551348/s eventLoop.execute(() -> loopSend(chan, true)); } @@ -681,31 +690,32 @@ private void loopSend(final ContextualChannel chan, boolean entered) { } private void loopSend0(final AutoBatchFlushEndPointContext autoBatchFlushEndPointContext, final ContextualChannel chan, - int remainingSpinnCount, boolean entered) { + int remainingSpinnCount, final boolean entered) { do { - final int count = pollBatch(autoBatchFlushEndPointContext, chan); + final int count = DefaultAutoBatchFlushEndpoint.this.pollBatch(autoBatchFlushEndPointContext, chan); + if (count == 0) { + break; + } if (count < 0) { return; } - if (count < batchSize) { - if (!entered) { - return; - } - // queue was empty - // The send loop will be triggered later when a new task is added, - // // Don't setUnsafe here because loopSend0() may result in a delayed loopSend() call. - autoBatchFlushEndPointContext.hasOngoingSendLoop.exit(); - if (taskQueue.isEmpty()) { - return; - } - entered = false; - // // Guarantee thread-safety: no dangling tasks in the queue. - } } while (--remainingSpinnCount > 0); - final boolean finalEntered = entered; - // Don't need to exitUnsafe since we still have an ongoing consume tasks in this thread. - chan.eventLoop().execute(() -> loopSend(chan, finalEntered)); + if (remainingSpinnCount <= 0) { + // Don't need to exitUnsafe since we still have an ongoing consume tasks in this thread. + chan.eventLoop().execute(() -> loopSend(chan, entered)); + return; + } + + if (entered) { + // queue was empty + // The send loop will be triggered later when a new task is added, + autoBatchFlushEndPointContext.hasOngoingSendLoop.exit(); + // Guarantee thread-safety: no dangling tasks in the queue, see scheduleSendJobIfNeeded() + if (!taskQueue.isEmpty()) { + loopSend0(autoBatchFlushEndPointContext, chan, remainingSpinnCount, false); + } + } } private int pollBatch(final AutoBatchFlushEndPointContext autoBatchFlushEndPointContext, ContextualChannel chan) { @@ -771,24 +781,6 @@ private void trySetEndpointQuiescence(ContextualChannel chan) { } } - private void onEndpointQuiescence() { - if (channel.context.initialState == ConnectionContext.State.ENDPOINT_CLOSED) { - return; - } - - this.logPrefix = null; - // Create happens-before with channelActive() - if (!CHANNEL.compareAndSet(this, DummyContextualChannelInstances.CHANNEL_WILL_RECONNECT, - DummyContextualChannelInstances.CHANNEL_CONNECTING)) { - onUnexpectedState("onEndpointQuiescence", ConnectionContext.State.WILL_RECONNECT); - return; - } - - // neither connectionWatchdog nor doReconnectOnEndpointQuiescence could be null - // noinspection DataFlowIssue - connectionWatchdog.reconnectOnAutoBatchFlushEndpointQuiescence(); - } - private void onWillReconnect(@Nonnull final ConnectionContext.CloseStatus closeStatus, final AutoBatchFlushEndPointContext autoBatchFlushEndPointContext) { final @Nullable Deque> retryableFailedToSendTasks = autoBatchFlushEndPointContext @@ -831,6 +823,25 @@ private void onWontReconnect(@Nonnull final ConnectionContext.CloseStatus closeS } } + private void onEndpointQuiescence() { + if (channel.context.initialState == ConnectionContext.State.ENDPOINT_CLOSED) { + return; + } + + this.logPrefix = null; + // Create happens-before with channelActive() + if (!CHANNEL.compareAndSet(this, DummyContextualChannelInstances.CHANNEL_WILL_RECONNECT, + DummyContextualChannelInstances.CHANNEL_CONNECTING)) { + onUnexpectedState("onEndpointQuiescence", ConnectionContext.State.WILL_RECONNECT); + return; + } + + // notify connectionWatchDog that it is safe to reconnect now. + // neither connectionWatchdog nor doReconnectOnEndpointQuiescence could be null + // noinspection DataFlowIssue + connectionWatchdog.reconnectOnAutoBatchFlushEndpointQuiescence(); + } + private void offerFirstAll(Deque> commands) { commands.forEach(cmd -> { if (cmd instanceof DemandAware.Sink) {