Skip to content
Draft
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
9d5f0f3
Introduce push handler
ggivo Jun 20, 2025
0f96e2d
Introduce PushHandlerChain for composable push event handling
ggivo Jun 23, 2025
4aec187
Handle relax timeout for maintenance events
ggivo Jun 25, 2025
67d4afa
Merge branch 'master' into feature/hu-notifications
ggivo Jul 2, 2025
ab62e9d
Support custom Push listeners for Jedis client
ggivo Jul 1, 2025
b36d7f8
Add proactiveRebindEnabled configuration option
ggivo Jul 3, 2025
0452a93
PushHandler is now provided through JedisClientConfig instead through…
ggivo Jul 4, 2025
4bd38da
Fix NPE in CacheConnection
ggivo Jul 4, 2025
4ad3fce
[cleanup] Use weak reference in AdaptiveTimeoutHandler to avoid memor…
ggivo Jul 7, 2025
f475177
[cleanup] Fix javadoc errors
ggivo Jul 7, 2025
56cd409
[cleanup] Fix TransactionCommandsTest mocked test
ggivo Jul 7, 2025
175b773
Moving/Rebind initial support
ggivo Jul 9, 2025
c19d8a5
Mocked relaxed timeout test
ggivo Jul 9, 2025
4ee525d
Mocked rebind test
ggivo Jul 9, 2025
b940ad7
Fix : wrong order connection.rebind pool.clear
ggivo Jul 10, 2025
7efbe72
[clean up] Address review comments from a-TODO-rov
ggivo Jul 10, 2025
b0987e0
add more rebind tests
ggivo Jul 11, 2025
7e11737
clean up
ggivo Jul 11, 2025
b769492
clean up remove unused test method
ggivo Jul 11, 2025
e7ffd1d
fix relaxed timeout on blocking command
ggivo Jul 21, 2025
a67b5cb
format
ggivo Jul 21, 2025
d34f72d
Merge branch 'redis:master' into feature/hu-notifications
ggivo Jul 28, 2025
5a59b85
enforce code formating for new classes
ggivo Jul 28, 2025
5d6c322
reformat to fix java docs
ggivo Jul 28, 2025
c6dec9b
force formating of TimeoutOptions.java
ggivo Jul 28, 2025
f5d57d4
Address review comments
ggivo Jul 28, 2025
15dc654
Address review comments
ggivo Jul 29, 2025
0e39aa4
Address review comments
ggivo Jul 29, 2025
8d9a43f
format ConnectionTestHelper
ggivo Jul 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions src/main/java/redis/clients/jedis/AdaptiveTimeoutHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package redis.clients.jedis;

import java.lang.ref.WeakReference;
import redis.clients.jedis.annots.Experimental;

/**
* Implementation of {@link PushConsumer } that manages connection timeout relaxation
* during Redis server maintenance events like migration and failover.
*/
@Experimental
public class AdaptiveTimeoutHandler implements PushConsumer {

private final WeakReference<Connection> connectionRef;

/**
* Creates a new maintenance listener for the specified connection.
*
* @param connection The connection to manage timeouts for
*/
public AdaptiveTimeoutHandler(Connection connection) {
this.connectionRef = new WeakReference<>(connection);
}

@Override
public void accept(PushConsumerContext context) {
String type = context.getMessage().getType();

switch (type) {
case "MIGRATING":
onMigrating();
break;
case "MIGRATED":
onMigrated();;
break;
case "FAILING_OVER":
onFailOver();
break;
case "FAILED_OVER":
onFailedOver();
break;
}
}

private void onMigrating() {
Connection connection = connectionRef.get();
if (connection != null) {
connection.relaxTimeouts();
}
}

private void onMigrated() {
Connection connection = connectionRef.get();
if (connection != null) {
connection.disableRelaxedTimeout();
}
}

private void onFailOver() {
Connection connection = connectionRef.get();
if (connection != null) {
connection.relaxTimeouts();
}
}

private void onFailedOver() {
Connection connection = connectionRef.get();
if (connection != null) {
connection.disableRelaxedTimeout();
}
}
}
160 changes: 147 additions & 13 deletions src/main/java/redis/clients/jedis/Connection.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package redis.clients.jedis;

import static redis.clients.jedis.PushConsumerChain.PROPAGATE_ALL_HANDLER;
import static redis.clients.jedis.util.SafeEncoder.encode;

import java.io.Closeable;
Expand All @@ -9,13 +10,16 @@
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.concurrent.atomic.AtomicReference;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Protocol.Command;
import redis.clients.jedis.Protocol.Keyword;
import redis.clients.jedis.annots.Experimental;
Expand All @@ -32,6 +36,7 @@
import redis.clients.jedis.util.RedisOutputStream;

public class Connection implements Closeable {
public static Logger logger = LoggerFactory.getLogger(Connection.class);

private ConnectionPool memberOf;
protected RedisProtocol protocol;
Expand All @@ -40,6 +45,7 @@ public class Connection implements Closeable {
private RedisOutputStream outputStream;
private RedisInputStream inputStream;
private int soTimeout = 0;
private Duration relaxedTimeout = TimeoutOptions.DISABLED_TIMEOUT;
private int infiniteSoTimeout = 0;
private boolean broken = false;
private boolean strValActive;
Expand All @@ -48,6 +54,11 @@ public class Connection implements Closeable {
protected String version;
private AtomicReference<RedisCredentials> currentCredentials = new AtomicReference<>(null);
private AuthXManager authXManager;
private boolean relaxedTimeoutActive = false;


protected PushConsumerChain pushConsumer;
private PushHandler pushHandler;

public Connection() {
this(Protocol.DEFAULT_HOST, Protocol.DEFAULT_PORT);
Expand All @@ -68,15 +79,53 @@ public Connection(final HostAndPort hostAndPort, final JedisClientConfig clientC
public Connection(final JedisSocketFactory socketFactory) {
this.socketFactory = socketFactory;
this.authXManager = null;

initPushConsumers(null);
}

public Connection(final JedisSocketFactory socketFactory, JedisClientConfig clientConfig) {
this.socketFactory = socketFactory;
this.soTimeout = clientConfig.getSocketTimeoutMillis();
this.infiniteSoTimeout = clientConfig.getBlockingSocketTimeoutMillis();
this.relaxedTimeout = clientConfig.getTimeoutOptions().getRelaxedTimeout();

initPushConsumers(clientConfig);
initializeFromClientConfig(clientConfig);
}


protected void initPushConsumers(JedisClientConfig config) {
/*
* Default consumers to process push messages.
* Marks all @{link PushMessage}s as processed, except for pub/sub.
* Pub/sub messages are propagated to the client.
*/
this.pushConsumer = PushConsumerChain.of(
PushConsumerChain.CONSUME_ALL_HANDLER,
PushConsumerChain.PUBSUB_ONLY_HANDLER
);

if (config != null) {
/*
* If the user has enabled relaxed timeouts, add consumer to handle push messages
* related to server maintenance events.
*/
if (TimeoutOptions.isRelaxedTimeoutEnabled(config.getTimeoutOptions().getRelaxedTimeout())) {
PushConsumer maintenanceHandler = new AdaptiveTimeoutHandler(Connection.this);
this.pushConsumer.add(maintenanceHandler);
}

/*
* If the user has provided a {@link PushHandler},
* add consumer to notify {@link PushListener}s, without changing the processed flag.
*/
pushHandler = config.getPushHandler();
if (this.pushHandler != null) {
this.pushConsumer.add(new ListenerNotificationConsumer(pushHandler));
}
}
}

@Override
public String toString() {
return getClass().getSimpleName() + "{" + socketFactory + "}";
Expand Down Expand Up @@ -303,7 +352,7 @@ public void setBroken() {

public String getStatusCodeReply() {
flush();
final byte[] resp = (byte[]) readProtocolWithCheckingBroken();
final byte[] resp = (byte[]) readProtocolWithCheckingBroken(pushConsumer);
if (null == resp) {
return null;
} else {
Expand All @@ -322,12 +371,12 @@ public String getBulkReply() {

public byte[] getBinaryBulkReply() {
flush();
return (byte[]) readProtocolWithCheckingBroken();
return (byte[]) readProtocolWithCheckingBroken(pushConsumer);
}

public Long getIntegerReply() {
flush();
return (Long) readProtocolWithCheckingBroken();
return (Long) readProtocolWithCheckingBroken(pushConsumer);
}

public List<String> getMultiBulkReply() {
Expand All @@ -337,7 +386,7 @@ public List<String> getMultiBulkReply() {
@SuppressWarnings("unchecked")
public List<byte[]> getBinaryMultiBulkReply() {
flush();
return (List<byte[]>) readProtocolWithCheckingBroken();
return (List<byte[]>) readProtocolWithCheckingBroken(pushConsumer);
}

/**
Expand All @@ -346,28 +395,28 @@ public List<byte[]> getBinaryMultiBulkReply() {
@Deprecated
@SuppressWarnings("unchecked")
public List<Object> getUnflushedObjectMultiBulkReply() {
return (List<Object>) readProtocolWithCheckingBroken();
return (List<Object>) readProtocolWithCheckingBroken(pushConsumer);
}

@SuppressWarnings("unchecked")
public Object getUnflushedObject() {
return readProtocolWithCheckingBroken();
return readProtocolWithCheckingBroken(pushConsumer);
}

public List<Object> getObjectMultiBulkReply() {
flush();
return (List<Object>) readProtocolWithCheckingBroken();
return (List<Object>) readProtocolWithCheckingBroken(pushConsumer);
}

@SuppressWarnings("unchecked")
public List<Long> getIntegerMultiBulkReply() {
flush();
return (List<Long>) readProtocolWithCheckingBroken();
return (List<Long>) readProtocolWithCheckingBroken(pushConsumer);
}

public Object getOne() {
flush();
return readProtocolWithCheckingBroken();
return readProtocolWithCheckingBroken(pushConsumer);
}

protected void flush() {
Expand All @@ -380,21 +429,39 @@ protected void flush() {
}

@Experimental
protected Object protocolRead(RedisInputStream is) {
return Protocol.read(is);
protected Object protocolRead(RedisInputStream is, PushConsumer handler) {
return Protocol.read(is, handler);
}

@Experimental
protected void protocolReadPushes(RedisInputStream is) {
}

protected Object readProtocolWithCheckingBroken(PushConsumer handler) {
if (broken) {
throw new JedisConnectionException("Attempting to read from a broken connection.");
}

try {
return protocolRead(inputStream, handler);
} catch (JedisConnectionException exc) {
broken = true;
throw exc;
}
}

/**
* @deprecated Use {@link #readProtocolWithCheckingBroken(PushConsumer)}
* @return
*/
@Deprecated
protected Object readProtocolWithCheckingBroken() {
if (broken) {
throw new JedisConnectionException("Attempting to read from a broken connection.");
}

try {
return protocolRead(inputStream);
return protocolRead(inputStream, PROPAGATE_ALL_HANDLER);
} catch (JedisConnectionException exc) {
broken = true;
throw exc;
Expand Down Expand Up @@ -424,7 +491,7 @@ public List<Object> getMany(final int count) {
final List<Object> responses = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
try {
responses.add(readProtocolWithCheckingBroken());
responses.add(readProtocolWithCheckingBroken(pushConsumer));
} catch (JedisDataException e) {
responses.add(e);
}
Expand Down Expand Up @@ -614,4 +681,71 @@ protected boolean isTokenBasedAuthenticationEnabled() {
protected AuthXManager getAuthXManager() {
return authXManager;
}

@Experimental
PushConsumerChain getPushConsumer() {
return this.pushConsumer;
}

@Experimental
public void relaxTimeouts() {
if (!relaxedTimeoutActive && !TimeoutOptions.isRelaxedTimeoutDisabled(relaxedTimeout)) {
relaxedTimeoutActive = true;
try {
if (isConnected()) {
socket.setSoTimeout((int) relaxedTimeout.toMillis());
}
} catch (SocketException ex) {
setBroken();
throw new JedisConnectionException(ex);
}
}
}

@Experimental
public void disableRelaxedTimeout() {
if (relaxedTimeoutActive) {
relaxedTimeoutActive = false;
try {
if (isConnected()) {
socket.setSoTimeout(soTimeout);
}
} catch (SocketException ex) {
setBroken();
throw new JedisConnectionException(ex);
}
}
}

/**
* Push consumer that delegates to a {@link PushHandler} for listener notification.
*/
private static class ListenerNotificationConsumer implements PushConsumer {
private final PushHandler pushHandler;

public ListenerNotificationConsumer(PushHandler pushHandler) {
this.pushHandler = pushHandler;
}

@Override
public void accept(PushConsumerContext context) {
if (pushHandler != null) {
notifyListeners(context.getMessage());
}
}

private void notifyListeners(PushMessage pushMessage) {
try {
pushHandler.getPushListeners().forEach(pushListener -> {
try {
pushListener.onPush(pushMessage);
} catch (Exception e) {
// Log individual listener failures
}
});
} catch (Exception e) {
// Log notification failures
}
}
}
}
Loading
Loading