Skip to content

Commit

Permalink
refactor: rename BatchFlushEndpointContext->AutoBatchFlushEndpointCon…
Browse files Browse the repository at this point in the history
…text
  • Loading branch information
okg-cxf committed Aug 8, 2024
1 parent 205813a commit 0c74788
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
/**
* @author chenxiaofan
*/
public class BatchFlushEndPointContext {
public class AutoBatchFlushEndPointContext {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(BatchFlushEndPointContext.class);
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AutoBatchFlushEndPointContext.class);

public static class HasOngoingSendLoop {

Expand Down Expand Up @@ -51,26 +51,26 @@ public void exit() {

}

BatchFlushEndPointContext() {
AutoBatchFlushEndPointContext() {
}

/**
* Tasks that failed to send (probably due to connection errors)
* Commands that failed to send (probably due to connection errors)
*/
@Nullable
Deque<RedisCommand<?, ?, ?>> retryableFailedToSendTasks = null;
Deque<RedisCommand<?, ?, ?>> retryableFailedToSendCommands = null;

Throwable firstDiscontinueReason = null;

public Throwable getFirstDiscontinueReason() {
return firstDiscontinueReason;
}

private int flyingTaskNum;
private int flyingCmdNum;

@SuppressWarnings("unused")
public int getFlyingTaskNum() {
return flyingTaskNum;
public int getFlyingCmdNum() {
return flyingCmdNum;
}

private int total = 0;
Expand All @@ -83,47 +83,48 @@ public int getTotal() {

public void add(int n) {
this.total += n;
this.flyingTaskNum += n;
this.flyingCmdNum += n;
}

public @Nullable Deque<RedisCommand<?, ?, ?>> getAndClearRetryableFailedToSendTasks() {
final Deque<RedisCommand<?, ?, ?>> old = this.retryableFailedToSendTasks;
// don't set to null so give us a chance to expose potential bugs if there is addRetryableFailedToSendTask() afterwards
this.retryableFailedToSendTasks = UnmodifiableDeque.emptyDeque();
public @Nullable Deque<RedisCommand<?, ?, ?>> getAndClearRetryableFailedToSendCommands() {
final Deque<RedisCommand<?, ?, ?>> old = this.retryableFailedToSendCommands;
// don't set to null so give us a chance to expose potential bugs if there is addRetryableFailedToSendCommand()
// afterwards
this.retryableFailedToSendCommands = UnmodifiableDeque.emptyDeque();
return old;
}

public void done(int n) {
this.flyingTaskNum -= n;
this.flyingCmdNum -= n;
}

public boolean isDone() {
if (this.flyingTaskNum < 0) {
logger.error("[unexpected] flyingTaskNum < 0, flyingTaskNum: {}, total: {}", this.flyingTaskNum, this.total);
if (this.flyingCmdNum < 0) {
logger.error("[unexpected] flyingCmdNum < 0, flyingCmdNum: {}, total: {}", this.flyingCmdNum, this.total);
return true;
}
return this.flyingTaskNum == 0;
return this.flyingCmdNum == 0;
}

public boolean hasRetryableFailedToSendTasks() {
return retryableFailedToSendTasks != null;
public boolean hasRetryableFailedToSendCommands() {
return retryableFailedToSendCommands != null;
}

/**
* @param retryableTask retryable task
* @param retryableCommand retryable command
* @param cause fail reason
* @return true if this is the first retryable failed task
* @return true if this is the first retryable failed command
*/
public boolean addRetryableFailedToSendTask(RedisCommand<?, ?, ?> retryableTask, @Nonnull Throwable cause) {
if (retryableFailedToSendTasks == null) {
retryableFailedToSendTasks = new ArrayDeque<>();
retryableFailedToSendTasks.add(retryableTask);
public boolean addRetryableFailedToSendCommand(RedisCommand<?, ?, ?> retryableCommand, @Nonnull Throwable cause) {
if (retryableFailedToSendCommands == null) {
retryableFailedToSendCommands = new ArrayDeque<>();
retryableFailedToSendCommands.add(retryableCommand);

firstDiscontinueReason = cause;
return true;
}

retryableFailedToSendTasks.add(retryableTask);
retryableFailedToSendCommands.add(retryableCommand);
return false;
}

Expand Down
8 changes: 2 additions & 6 deletions src/main/java/io/lettuce/core/context/ConnectionContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ public boolean isConnected() {

public final State initialState;

public final BatchFlushEndPointContext batchFlushEndPointContext;
public final AutoBatchFlushEndPointContext autoBatchFlushEndPointContext;

public ConnectionContext(State initialState) {
this.initialState = initialState;
this.batchFlushEndPointContext = new BatchFlushEndPointContext();
this.autoBatchFlushEndPointContext = new AutoBatchFlushEndPointContext();
}

/* below fields must be accessed by the event loop thread only */
Expand All @@ -92,10 +92,6 @@ public boolean isChannelInactiveEventFired() {

private boolean channelQuiescent = false;

public boolean isChannelQuiescent() {
return channelQuiescent;
}

public boolean setChannelQuiescentOnce() {
if (channelQuiescent) {
return false;
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/io/lettuce/core/protocol/CommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCom

private final Endpoint endpoint;

private final boolean supportsBatchFlush;
private final boolean supportsAutoBatchFlush;

private final ArrayDeque<RedisCommand<?, ?, ?>> stack = new ArrayDeque<>();

Expand Down Expand Up @@ -154,7 +154,7 @@ public CommandHandler(ClientOptions clientOptions, ClientResources clientResourc
this.clientOptions = clientOptions;
this.clientResources = clientResources;
this.endpoint = endpoint;
this.supportsBatchFlush = endpoint instanceof AutoBatchFlushEndpoint;
this.supportsAutoBatchFlush = endpoint instanceof AutoBatchFlushEndpoint;
this.commandLatencyRecorder = clientResources.commandLatencyRecorder();
this.latencyMetricsEnabled = commandLatencyRecorder.isEnabled();
this.boundedQueues = clientOptions.getRequestQueueSize() != Integer.MAX_VALUE;
Expand Down Expand Up @@ -377,9 +377,9 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
setState(LifecycleState.DEACTIVATING);

endpoint.notifyChannelInactive(ctx.channel());
Deque<RedisCommand<?, ?, ?>> batchFlushRetryableDrainQueuedCommands = UnmodifiableDeque.emptyDeque();
if (supportsBatchFlush) {
batchFlushRetryableDrainQueuedCommands = drainStack();
Deque<RedisCommand<?, ?, ?>> autoBatchFlushRetryableDrainQueuedCommands = UnmodifiableDeque.emptyDeque();
if (supportsAutoBatchFlush) {
autoBatchFlushRetryableDrainQueuedCommands = drainStack();
} else {
endpoint.notifyDrainQueuedCommands(this);
}
Expand All @@ -397,10 +397,10 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {

super.channelInactive(ctx);

if (supportsBatchFlush) {
if (supportsAutoBatchFlush) {
// Needs decision of watchdog
((AutoBatchFlushEndpoint) endpoint).notifyChannelInactiveAfterWatchdogDecision(ctx.channel(),
batchFlushRetryableDrainQueuedCommands);
autoBatchFlushRetryableDrainQueuedCommands);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {

private final String epid;

private final boolean useBatchFlushEndpoint;
private final boolean useAutoBatchFlushEndpoint;

private final Endpoint endpoint;

Expand Down Expand Up @@ -149,7 +149,7 @@ public ConnectionWatchdog(Delay reconnectDelay, ClientOptions clientOptions, Boo
this.redisUri = (String) bootstrap.config().attrs().get(ConnectionBuilder.REDIS_URI);
this.epid = endpoint.getId();
this.endpoint = endpoint;
this.useBatchFlushEndpoint = endpoint instanceof AutoBatchFlushEndpoint;
this.useAutoBatchFlushEndpoint = endpoint instanceof AutoBatchFlushEndpoint;

Mono<SocketAddress> wrappedSocketAddressSupplier = socketAddressSupplier.doOnNext(addr -> remoteAddress = addr)
.onErrorResume(t -> {
Expand Down Expand Up @@ -226,7 +226,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
}

doReconnectOnEndpointQuiescence = this::scheduleReconnect;
if (!useBatchFlushEndpoint) {
if (!useAutoBatchFlushEndpoint) {
doReconnectOnEndpointQuiescence.run();
}
// otherwise, will be called later by BatchFlushEndpoint#onEndpointQuiescence
Expand Down Expand Up @@ -307,7 +307,7 @@ private void notifyEndpointFailedToConnectIfNeeded() {
}

private void notifyEndpointFailedToConnectIfNeeded(Exception e) {
if (useBatchFlushEndpoint) {
if (useAutoBatchFlushEndpoint) {
((AutoBatchFlushEndpoint) endpoint).notifyReconnectFailed(e);
}
}
Expand Down
Loading

0 comments on commit 0c74788

Please sign in to comment.