Skip to content

Commit

Permalink
fix: activiation command should be sent immediately upon channelActiv…
Browse files Browse the repository at this point in the history
…e events
  • Loading branch information
okg-cxf committed Aug 16, 2024
1 parent 8ed038c commit 7a54c8b
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 48 deletions.
4 changes: 0 additions & 4 deletions src/main/java/io/lettuce/core/ContextualChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ public class ContextualChannel implements Channel {

public final ConnectionContext context;

public ConnectionContext getContext() {
return context;
}

public Channel getDelegate() {
return delegate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
*/
package io.lettuce.core.cluster;

import static io.lettuce.core.protocol.CommandType.*;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.time.Duration;
Expand Down Expand Up @@ -57,6 +55,12 @@
import io.lettuce.core.protocol.ConnectionIntent;
import io.lettuce.core.protocol.ConnectionWatchdog;
import io.lettuce.core.protocol.RedisCommand;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import static io.lettuce.core.protocol.CommandType.AUTH;
import static io.lettuce.core.protocol.CommandType.READONLY;
import static io.lettuce.core.protocol.CommandType.READWRITE;

/**
* A thread-safe connection to a Redis Cluster. Multiple threads may share one {@link StatefulRedisClusterConnectionImpl}
Expand All @@ -70,6 +74,8 @@
public class StatefulRedisClusterConnectionImpl<K, V> extends RedisChannelHandler<K, V>
implements StatefulRedisClusterConnection<K, V> {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(StatefulRedisClusterConnectionImpl.class);

private final ClusterPushHandler pushHandler;

protected final RedisCodec<K, V> codec;
Expand Down Expand Up @@ -208,7 +214,13 @@ public CompletableFuture<StatefulRedisConnection<K, V>> getConnectionAsync(Strin
public void activated() {
super.activated();

async.clusterMyId().thenAccept(connectionState::setNodeId);
async.clusterMyId().whenComplete((nodeId, throwable) -> {
if (throwable != null) {
logger.warn("Failed to retrieve current cluster node ID: {}", throwable);
} else {
connectionState.setNodeId(nodeId);
}
});
}

ClusterDistributionChannelWriter getClusterDistributionChannelWriter() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
*
* @author Mark Paluch
*/
@SuppressWarnings("DuplicatedCode")
public class DefaultAutoBatchFlushEndpoint implements RedisChannelWriter, AutoBatchFlushEndpoint, PushHandler {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(AutoBatchFlushEndpoint.class);
Expand Down Expand Up @@ -235,27 +236,27 @@ public List<PushListener> getPushListeners() {

@Override
public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {

LettuceAssert.notNull(command, "Command must not be null");

final Throwable validation = validateWrite(1);
final ContextualChannel chan = this.channel;
final Throwable validation = validateWrite(chan, 1, inActivation);
if (validation != null) {
command.completeExceptionally(validation);
return command;
}

try {
if (inActivation) {
// needs write and flush activation command immediately, cannot queue it.
command = processActivationCommand(command);
}

this.taskQueue.offer(command);
QUEUE_SIZE.incrementAndGet(this);
writeAndFlushActivationCommand(chan, command);
} else {
this.taskQueue.offer(command);
QUEUE_SIZE.incrementAndGet(this);

if (autoFlushCommands) {
flushCommands();
if (autoFlushCommands) {
flushCommands();
}
}

} finally {
if (debugEnabled) {
logger.debug("{} write() done", logPrefix());
Expand All @@ -268,25 +269,27 @@ public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
@SuppressWarnings("unchecked")
@Override
public <K, V> Collection<RedisCommand<K, V, ?>> write(Collection<? extends RedisCommand<K, V, ?>> commands) {

LettuceAssert.notNull(commands, "Commands must not be null");

final Throwable validation = validateWrite(commands.size());
final ContextualChannel chan = this.channel;
final Throwable validation = validateWrite(chan, commands.size(), inActivation);
if (validation != null) {
commands.forEach(it -> it.completeExceptionally(validation));
return (Collection<RedisCommand<K, V, ?>>) commands;
}

try {
if (inActivation) {
// needs write and flush activation commands immediately, cannot queue it.
commands = processActivationCommands(commands);
}

this.taskQueue.offer(commands);
QUEUE_SIZE.addAndGet(this, commands.size());
writeAndFlushActivationCommands(chan, commands);
} else {
this.taskQueue.offer(commands);
QUEUE_SIZE.addAndGet(this, commands.size());

if (autoFlushCommands) {
flushCommands();
if (autoFlushCommands) {
flushCommands();
}
}
} finally {
if (debugEnabled) {
Expand All @@ -297,6 +300,19 @@ public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
return (Collection<RedisCommand<K, V, ?>>) commands;
}

private <V, K> void writeAndFlushActivationCommand(ContextualChannel chan, RedisCommand<K, V, ?> command) {
channelWrite(chan, command).addListener(WrittenToChannel.newInstance(this, chan, command, true));
channelFlush(chan);
}

private <V, K> void writeAndFlushActivationCommands(ContextualChannel chan,
Collection<? extends RedisCommand<K, V, ?>> commands) {
for (RedisCommand<?, ?, ?> command : commands) {
channelWrite(chan, command).addListener(WrittenToChannel.newInstance(this, chan, command, true));
}
channelFlush(chan);
}

@Override
public void notifyChannelActive(Channel channel) {
final ContextualChannel contextualChannel = new ContextualChannel(channel, ConnectionContext.State.CONNECTED);
Expand Down Expand Up @@ -494,8 +510,8 @@ public void close() {
}

@Override
@SuppressWarnings("java:S125" /* The comments are necessary to prove the correctness code */)
public CompletableFuture<Void> closeAsync() {

if (debugEnabled) {
logger.debug("{} closeAsync()", logPrefix());
}
Expand Down Expand Up @@ -650,7 +666,7 @@ private void scheduleSendJobIfNeeded(final ContextualChannel chan) {
// 2. hasOngoingSendLoop.safe.get() == 1 (volatile read) synchronizes-before
// hasOngoingSendLoop.safe.set(0) (volatile write) in first loopSend0()
// 3. hasOngoingSendLoop.safe.set(0) (volatile write) synchronizes-before
// second loopSend0(), which will call poll() (volatile read of producerIndex)
// taskQueue.isEmpty() (volatile read of producerIndex), which guarantees to see the offered task.
}

private void loopSend(final ContextualChannel chan, boolean entered) {
Expand Down Expand Up @@ -703,13 +719,13 @@ private int pollBatch(final AutoBatchFlushEndPointContext autoBatchFlushEndPoint

if (o instanceof RedisCommand<?, ?, ?>) {
RedisCommand<?, ?, ?> cmd = (RedisCommand<?, ?, ?>) o;
channelWrite(chan, cmd).addListener(WrittenToChannel.newInstance(this, chan, cmd));
channelWrite(chan, cmd).addListener(WrittenToChannel.newInstance(this, chan, cmd, false));
count++;
} else {
@SuppressWarnings("unchecked")
Collection<? extends RedisCommand<?, ?, ?>> commands = (Collection<? extends RedisCommand<?, ?, ?>>) o;
for (RedisCommand<?, ?, ?> cmd : commands) {
channelWrite(chan, cmd).addListener(WrittenToChannel.newInstance(this, chan, cmd));
channelWrite(chan, cmd).addListener(WrittenToChannel.newInstance(this, chan, cmd, false));
}
count += commands.size();
}
Expand Down Expand Up @@ -770,6 +786,7 @@ private void onEndpointQuiescence() {
}

// neither connectionWatchdog nor doReconnectOnEndpointQuiescence could be null
// noinspection DataFlowIssue
connectionWatchdog.reconnectOnAutoBatchFlushEndpointQuiescence();
}

Expand Down Expand Up @@ -839,6 +856,7 @@ private final void onReconnectFailed() {
}

@SafeVarargs
@SuppressWarnings("java:S3776" /* Suppress cognitive complexity warning */)
private final void fulfillCommands(String message, Consumer<RedisCommand<?, ?, ?>> commandConsumer,
Queue<RedisCommand<?, ?, ?>>... queues) {
int totalCancelledTaskNum = 0;
Expand Down Expand Up @@ -901,7 +919,6 @@ private Throwable getFailedToReconnectReason() {
}

private <K, V, T> RedisCommand<K, V, T> processActivationCommand(RedisCommand<K, V, T> command) {

if (!ActivationCommand.isActivationCommand(command)) {
return new ActivationCommand<>(command);
}
Expand All @@ -926,7 +943,7 @@ private <K, V, T> RedisCommand<K, V, T> processActivationCommand(RedisCommand<K,
return commandsToReturn;
}

private Throwable validateWrite(@SuppressWarnings("unused") int commands) {
private Throwable validateWrite(ContextualChannel chan, int commands, boolean isActivationCommand) {
if (isClosed()) {
return new RedisException("Connection is closed");
}
Expand All @@ -936,25 +953,29 @@ private Throwable validateWrite(@SuppressWarnings("unused") int commands) {
return localConnectionErr;
}

if (boundedQueues && queueSize + commands > clientOptions.getRequestQueueSize()) {
if (!isActivationCommand /* activation command should never be excluded due to queue full */ && boundedQueues
&& queueSize + commands > clientOptions.getRequestQueueSize()) {
return new RedisException("Request queue size exceeded: " + clientOptions.getRequestQueueSize()
+ ". Commands are not accepted until the queue size drops.");
}

final ContextualChannel chan = this.channel;
switch (chan.context.initialState) {
final ConnectionContext.State initialState = chan.context.initialState;
final boolean rejectCommandsWhileDisconnectedLocal = this.rejectCommandsWhileDisconnected || isActivationCommand;
switch (initialState) {
case ENDPOINT_CLOSED:
return new RedisException("Connection is closed");
case RECONNECT_FAILED:
return failedToReconnectReason;
case WILL_RECONNECT:
case CONNECTING:
return rejectCommandsWhileDisconnected ? new RedisException("Currently not connected. Commands are rejected.")
return rejectCommandsWhileDisconnectedLocal
? new RedisException("Currently not connected. Commands are rejected.")
: null;
case CONNECTED:
return !chan.isActive() && rejectCommandsWhileDisconnected ? new RedisException("Connection is closed") : null;
return !chan.isActive() && rejectCommandsWhileDisconnectedLocal ? new RedisException("Channel is closed")
: null;
default:
throw new IllegalStateException("unexpected state: " + chan.context.initialState);
throw new IllegalStateException("unexpected state: " + initialState);
}
}

Expand Down Expand Up @@ -1023,6 +1044,8 @@ protected WrittenToChannel newObject(Recycler.Handle<WrittenToChannel> handle) {

private RedisCommand<?, ?, ?> cmd;

private boolean isActivationCommand;

private ContextualChannel chan;

private WrittenToChannel(Recycler.Handle<WrittenToChannel> handle) {
Expand All @@ -1035,21 +1058,32 @@ private WrittenToChannel(Recycler.Handle<WrittenToChannel> handle) {
* @return new instance
*/
static WrittenToChannel newInstance(DefaultAutoBatchFlushEndpoint endpoint, ContextualChannel chan,
RedisCommand<?, ?, ?> command) {
RedisCommand<?, ?, ?> command, boolean isActivationCommand) {

WrittenToChannel entry = RECYCLER.get();

entry.endpoint = endpoint;
entry.chan = chan;
entry.cmd = command;
entry.isActivationCommand = isActivationCommand;

LettuceAssert.assertState(isActivationCommand == ActivationCommand.isActivationCommand(command),
"unexpected: isActivationCommand not match");

return entry;
}

@Override
public void operationComplete(Future<Void> future) {
final AutoBatchFlushEndPointContext autoBatchFlushEndPointContext = chan.context.autoBatchFlushEndPointContext;
try {
if (isActivationCommand) {
if (!future.isSuccess()) {
cmd.completeExceptionally(future.cause());
}
return;
}

final AutoBatchFlushEndPointContext autoBatchFlushEndPointContext = chan.context.autoBatchFlushEndPointContext;
QUEUE_SIZE.decrementAndGet(endpoint);
autoBatchFlushEndPointContext.done(1);

Expand Down Expand Up @@ -1105,7 +1139,7 @@ private Throwable checkSendResult(Future<?> sendFuture) {
}

private boolean shouldNotRetry(Throwable cause) {
return endpoint.reliability == Reliability.AT_MOST_ONCE || ActivationCommand.isActivationCommand(cmd)
return endpoint.reliability == Reliability.AT_MOST_ONCE
|| ExceptionUtils.oneOf(cause, SHOULD_NOT_RETRY_EXCEPTION_TYPES);
}

Expand All @@ -1129,6 +1163,7 @@ private void recycle() {
this.endpoint = null;
this.chan = null;
this.cmd = null;
this.isActivationCommand = false;

handle.recycle(this);
}
Expand Down
20 changes: 12 additions & 8 deletions src/main/java/io/lettuce/core/utils/ExceptionUtils.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package io.lettuce.core.utils;

import io.lettuce.core.output.CommandOutput;
import io.lettuce.core.protocol.RedisCommand;
import io.netty.channel.socket.ChannelOutputShutdownException;
import io.netty.util.internal.logging.InternalLogger;

import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;

import io.lettuce.core.output.CommandOutput;
import io.lettuce.core.protocol.RedisCommand;
import io.netty.channel.socket.ChannelOutputShutdownException;
import io.netty.util.internal.logging.InternalLogLevel;
import io.netty.util.internal.logging.InternalLogger;

public class ExceptionUtils {

Expand All @@ -26,12 +26,16 @@ public static void maybeLogSendError(InternalLogger logger, Throwable cause) {
return;
}

final String message = "Unexpected exception during request: {}";
final InternalLogLevel logLevel;

if (cause instanceof IOException && (SUPPRESS_IO_EXCEPTION_MESSAGES.contains(cause.getMessage())
|| cause instanceof ChannelOutputShutdownException)) {
logger.debug("[maybeLogSendError] error during request: {}", cause.getMessage(), cause);
logLevel = InternalLogLevel.DEBUG;
} else {
logger.error("[maybeLogSendError][attention] unexpected exception during request: {}", cause.getMessage(), cause);
logLevel = InternalLogLevel.WARN;
}
logger.log(logLevel, message, cause.toString(), cause);
}

/**
Expand Down

0 comments on commit 7a54c8b

Please sign in to comment.