Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: auto batch flush (more than 2x times performance gain) #2950

Open
wants to merge 35 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
e63ef39
perf: auto batch flush
okg-cxf Jul 24, 2024
63bb957
fix: add notifyChannelInactiveAfterWatchdogDecision
okg-cxf Jul 24, 2024
f5cc938
chore: watchdog pass onEndpointQuiescence callback instead
okg-cxf Jul 24, 2024
96cfe7b
perf: use platform dependent mpsc queue, refine code style
okg-cxf Jul 25, 2024
1357f49
chore: use compareAndSet instead of weakCompareAndSet
okg-cxf Jul 25, 2024
f627587
chore: remove unused log
okg-cxf Jul 25, 2024
de67728
perf: use RECYCLE for WrittenToChannel
okg-cxf Jul 25, 2024
70c4677
chore: remove getters
okg-cxf Jul 29, 2024
ae8b533
fix: queue size not correct
okg-cxf Jul 29, 2024
aa9aec3
perf: add busyLoop mode
okg-cxf Aug 1, 2024
275b7fc
fix: ensure hasOngoingSendLoop.exitSafe()
okg-cxf Aug 7, 2024
6897b05
perf: batch offer
okg-cxf Aug 8, 2024
0d3df3b
Revert "perf: add busyLoop mode"
okg-cxf Aug 8, 2024
b4a90ab
chore: add asserts, revert jetbrain#NotNull
okg-cxf Aug 8, 2024
fb65b0a
refactor: remove tryEnterUnsafe/exitUnsafe
okg-cxf Aug 8, 2024
3241d42
chore: handle eventLoop.inEventLoop() case of scheduleSendJobIfNeeded()
okg-cxf Aug 8, 2024
205813a
refactor: rename BatchFlushEndpoint->AutoBatchFlushEndpoint
okg-cxf Aug 8, 2024
2b57334
refactor: rename BatchFlushEndpointContext->AutoBatchFlushEndpointCon…
okg-cxf Aug 8, 2024
19e6463
refactor: better naming
okg-cxf Aug 8, 2024
0d94a5b
refactor: better error msg for ConnectionWatchDog
okg-cxf Aug 9, 2024
4b41a12
chore: remove unused code
okg-cxf Aug 9, 2024
a0da050
chore: default batch size to 20
okg-cxf Aug 9, 2024
51c57cb
chore: simplify code, avoid recursion
okg-cxf Aug 9, 2024
799a3b5
chore: add isEmpty(), fix logPrefix
okg-cxf Aug 9, 2024
63ec4f9
chore: ensure thread safety for taskQueue consuming
okg-cxf Aug 14, 2024
c88b52f
chore: add useMpscQueue to AutoBatchFlushOptions, default to true
okg-cxf Aug 14, 2024
cc01006
chore: remove unused code
okg-cxf Aug 14, 2024
e8d515a
chore: cancel commands in initialState()/reset() if not using mpsc queue
okg-cxf Aug 14, 2024
8ed038c
fix: closing unexpected channel in internalCloseConnectionIfNeeded
okg-cxf Aug 16, 2024
7a54c8b
fix: activiation command should be sent immediately upon channelActiv…
okg-cxf Aug 16, 2024
e328aa0
fix: revert protectMode upon channelActive event, complete non-retrya…
okg-cxf Aug 17, 2024
a80453e
chore: default batch size to 32, refine code style
okg-cxf Aug 20, 2024
5ad30d0
fix: two bugs: 1, autoBatchFlushEndPointContext.add() should always b…
okg-cxf Aug 29, 2024
82ca33f
chore: refine logging
okg-cxf Aug 29, 2024
24233a8
feat: add OwnershipSynchronizer to abstract consumer migration
okg-cxf Aug 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 160 additions & 0 deletions src/main/java/io/lettuce/core/AutoBatchFlushOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package io.lettuce.core;

import java.io.Serializable;

import io.lettuce.core.internal.LettuceAssert;

/**
* Options for command timeouts. These options configure how and whether commands time out once they were dispatched. Command
* timeout begins:
* <ul>
* <li>When the command is sent successfully to the transport</li>
* <li>Queued while the connection was inactive</li>
* </ul>
*
* The timeout is canceled upon command completion/cancellation. Timeouts are not tied to a specific API and expire commands
* regardless of the synchronization method provided by the API that was used to enqueue the command.
*
* @author Mark Paluch
* @since 5.1
*/
public class AutoBatchFlushOptions implements Serializable {

public static final boolean DEFAULT_ENABLE_AUTO_BATCH_FLUSH = false;

public static final int DEFAULT_WRITE_SPIN_COUNT = 16;

public static final int DEFAULT_BATCH_SIZE = 32;

public static final boolean DEFAULT_USE_MPSC_QUEUE = true;

private final boolean enableAutoBatchFlush;

private final int writeSpinCount;

private final int batchSize;

private final boolean useMpscQueue;

public AutoBatchFlushOptions(AutoBatchFlushOptions.Builder builder) {
this.enableAutoBatchFlush = builder.enableAutoBatchFlush;
this.writeSpinCount = builder.writeSpinCount;
this.batchSize = builder.batchSize;
this.useMpscQueue = builder.useMpscQueue;
}

/**
* Returns a new {@link AutoBatchFlushOptions.Builder} to construct {@link AutoBatchFlushOptions}.
*/
public static Builder builder() {
return new Builder();
}

/**
* Create a new instance of {@link AutoBatchFlushOptions} with default settings.
*/
public static AutoBatchFlushOptions create() {
return builder().build();
}

/**
* Builder for {@link AutoBatchFlushOptions}.
*/
public static class Builder {

private boolean enableAutoBatchFlush = DEFAULT_ENABLE_AUTO_BATCH_FLUSH;

private int writeSpinCount = DEFAULT_WRITE_SPIN_COUNT;

private int batchSize = DEFAULT_BATCH_SIZE;

private boolean useMpscQueue = DEFAULT_USE_MPSC_QUEUE;

/**
* Enable auto batch flush.
*
* @param enableAutoBatchFlush {@code true} to enable auto batch flush.
* @return {@code this}
*/
public Builder enableAutoBatchFlush(boolean enableAutoBatchFlush) {
this.enableAutoBatchFlush = enableAutoBatchFlush;
return this;
}

/**
* how many times to spin batchPoll() from the task queue
*
* @param writeSpinCount the write spin count
* @return {@code this}
*/
public Builder writeSpinCount(int writeSpinCount) {
LettuceAssert.isPositive(writeSpinCount, "Batch size must be greater than 0");

this.writeSpinCount = writeSpinCount;
return this;
}

/**
* how many commands to batch in a single flush
*
* @param batchSize the batch size
* @return {@code this}
*/
public Builder batchSize(int batchSize) {
LettuceAssert.isPositive(batchSize, "Batch size must be greater than 0");

this.batchSize = batchSize;
return this;
}

/**
* @param useMpscQueue use MPSC queue. If {@code false}, a {@link java.util.concurrent.ConcurrentLinkedQueue} is used,
* which has lower performance but is safer to consume across multiple threads, the option may be removed in the
* future if the mpsc queue is proven to be safe.
* @return {@code this}
*/
public Builder useMpscQueue(boolean useMpscQueue) {
this.useMpscQueue = useMpscQueue;
return this;
}

/**
* Create a new instance of {@link AutoBatchFlushOptions}.
*
* @return new instance of {@link AutoBatchFlushOptions}
*/
public AutoBatchFlushOptions build() {
return new AutoBatchFlushOptions(this);
}

}

/**
* @return {@code true} if auto batch flush is enabled.
*/
public boolean isAutoBatchFlushEnabled() {
return enableAutoBatchFlush;
}

/**
* @return the write spin count
*/
public int getWriteSpinCount() {
return writeSpinCount;
}

/**
* @return the batch size
*/
public int getBatchSize() {
return batchSize;
}

/**
* @return {@code true} if the queue is a MPSC queue
*/
public boolean usesMpscQueue() {
return useMpscQueue;
}

}
31 changes: 26 additions & 5 deletions src/main/java/io/lettuce/core/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public class ClientOptions implements Serializable {

public static final TimeoutOptions DEFAULT_TIMEOUT_OPTIONS = TimeoutOptions.create();

public static final AutoBatchFlushOptions DEFAULT_AUTO_BATCH_FLUSH_OPTIONS = AutoBatchFlushOptions.create();

private final boolean autoReconnect;

private final boolean cancelCommandsOnReconnectFailure;
Expand Down Expand Up @@ -97,6 +99,8 @@ public class ClientOptions implements Serializable {

private final TimeoutOptions timeoutOptions;

private final AutoBatchFlushOptions autoBatchFlushOptions;

protected ClientOptions(Builder builder) {
this.autoReconnect = builder.autoReconnect;
this.cancelCommandsOnReconnectFailure = builder.cancelCommandsOnReconnectFailure;
Expand All @@ -112,6 +116,7 @@ protected ClientOptions(Builder builder) {
this.sslOptions = builder.sslOptions;
this.suspendReconnectOnProtocolFailure = builder.suspendReconnectOnProtocolFailure;
this.timeoutOptions = builder.timeoutOptions;
this.autoBatchFlushOptions = builder.autoBatchFlushOptions;
}

protected ClientOptions(ClientOptions original) {
Expand All @@ -129,6 +134,7 @@ protected ClientOptions(ClientOptions original) {
this.sslOptions = original.getSslOptions();
this.suspendReconnectOnProtocolFailure = original.isSuspendReconnectOnProtocolFailure();
this.timeoutOptions = original.getTimeoutOptions();
this.autoBatchFlushOptions = original.getAutoBatchFlushOptions();
}

/**
Expand Down Expand Up @@ -192,6 +198,8 @@ public static class Builder {

private TimeoutOptions timeoutOptions = DEFAULT_TIMEOUT_OPTIONS;

private AutoBatchFlushOptions autoBatchFlushOptions = DEFAULT_AUTO_BATCH_FLUSH_OPTIONS;

protected Builder() {
}

Expand Down Expand Up @@ -247,8 +255,8 @@ public Builder bufferUsageRatio(int bufferUsageRatio) {
*
* @param policy the policy to use in {@link io.lettuce.core.protocol.CommandHandler}
* @return {@code this}
* @since 6.0
* @see DecodeBufferPolicies
* @since 6.0
*/
public Builder decodeBufferPolicy(DecodeBufferPolicy policy) {

Expand Down Expand Up @@ -295,8 +303,8 @@ public Builder pingBeforeActivateConnection(boolean pingBeforeActivateConnection
*
* @param protocolVersion version to use.
* @return {@code this}
* @since 6.0
* @see ProtocolVersion#newestSupported()
* @since 6.0
*/
public Builder protocolVersion(ProtocolVersion protocolVersion) {

Expand All @@ -315,9 +323,9 @@ public Builder protocolVersion(ProtocolVersion protocolVersion) {
*
* @param publishOnScheduler true/false
* @return {@code this}
* @since 5.2
* @see org.reactivestreams.Subscriber#onNext(Object)
* @see ClientResources#eventExecutorGroup()
* @since 5.2
*/
public Builder publishOnScheduler(boolean publishOnScheduler) {
this.publishOnScheduler = publishOnScheduler;
Expand Down Expand Up @@ -422,6 +430,17 @@ public Builder timeoutOptions(TimeoutOptions timeoutOptions) {
return this;
}

/**
* Sets the {@link AutoBatchFlushOptions}
*
* @param autoBatchFlushOptions must not be {@code null}.
*/
public Builder autoBatchFlushOptions(AutoBatchFlushOptions autoBatchFlushOptions) {
LettuceAssert.notNull(autoBatchFlushOptions, "AutoBatchFlushOptions must not be null");
this.autoBatchFlushOptions = autoBatchFlushOptions;
return this;
}

/**
* Create a new instance of {@link ClientOptions}.
*
Expand All @@ -439,7 +458,6 @@ public ClientOptions build() {
*
* @return a {@link ClientOptions.Builder} to create new {@link ClientOptions} whose settings are replicated from the
* current {@link ClientOptions}.
*
* @since 5.1
*/
public ClientOptions.Builder mutate() {
Expand Down Expand Up @@ -498,7 +516,6 @@ public DecodeBufferPolicy getDecodeBufferPolicy() {
*
* @return zero.
* @since 5.2
*
* @deprecated since 6.0 in favor of {@link DecodeBufferPolicy}.
*/
@Deprecated
Expand Down Expand Up @@ -637,6 +654,10 @@ public TimeoutOptions getTimeoutOptions() {
return timeoutOptions;
}

public AutoBatchFlushOptions getAutoBatchFlushOptions() {
return autoBatchFlushOptions;
}

/**
* Behavior of connections in disconnected state.
*/
Expand Down
Loading
Loading