Skip to content
Draft
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
9d5f0f3
Introduce push handler
ggivo Jun 20, 2025
0f96e2d
Introduce PushHandlerChain for composable push event handling
ggivo Jun 23, 2025
4aec187
Handle relax timeout for maintenance events
ggivo Jun 25, 2025
67d4afa
Merge branch 'master' into feature/hu-notifications
ggivo Jul 2, 2025
ab62e9d
Support custom Push listeners for Jedis client
ggivo Jul 1, 2025
b36d7f8
Add proactiveRebindEnabled configuration option
ggivo Jul 3, 2025
0452a93
PushHandler is now provided through JedisClientConfig instead through…
ggivo Jul 4, 2025
4bd38da
Fix NPE in CacheConnection
ggivo Jul 4, 2025
4ad3fce
[cleanup] Use weak reference in AdaptiveTimeoutHandler to avoid memor…
ggivo Jul 7, 2025
f475177
[cleanup] Fix javadoc errors
ggivo Jul 7, 2025
56cd409
[cleanup] Fix TransactionCommandsTest mocked test
ggivo Jul 7, 2025
175b773
Moving/Rebind initial support
ggivo Jul 9, 2025
c19d8a5
Mocked relaxed timeout test
ggivo Jul 9, 2025
4ee525d
Mocked rebind test
ggivo Jul 9, 2025
b940ad7
Fix : wrong order connection.rebind pool.clear
ggivo Jul 10, 2025
7efbe72
[clean up] Address review comments from a-TODO-rov
ggivo Jul 10, 2025
b0987e0
add more rebind tests
ggivo Jul 11, 2025
7e11737
clean up
ggivo Jul 11, 2025
b769492
clean up remove unused test method
ggivo Jul 11, 2025
e7ffd1d
fix relaxed timeout on blocking command
ggivo Jul 21, 2025
a67b5cb
format
ggivo Jul 21, 2025
d34f72d
Merge branch 'redis:master' into feature/hu-notifications
ggivo Jul 28, 2025
5a59b85
enforce code formating for new classes
ggivo Jul 28, 2025
5d6c322
reformat to fix java docs
ggivo Jul 28, 2025
c6dec9b
force formating of TimeoutOptions.java
ggivo Jul 28, 2025
f5d57d4
Address review comments
ggivo Jul 28, 2025
15dc654
Address review comments
ggivo Jul 29, 2025
0e39aa4
Address review comments
ggivo Jul 29, 2025
8d9a43f
format ConnectionTestHelper
ggivo Jul 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
325 changes: 310 additions & 15 deletions src/main/java/redis/clients/jedis/Connection.java

Large diffs are not rendered by default.

16 changes: 15 additions & 1 deletion src/main/java/redis/clients/jedis/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
/**
* PoolableObjectFactory custom impl.
*/
public class ConnectionFactory implements PooledObjectFactory<Connection> {
public class ConnectionFactory implements PooledObjectFactory<Connection> , RebindAware {

private static final Logger logger = LoggerFactory.getLogger(ConnectionFactory.class);

Expand Down Expand Up @@ -140,4 +140,18 @@ private void reAuthenticate(Connection jedis) throws Exception {
throw e;
}
}


@Override
public void rebind(HostAndPort newHostAndPort) {
// TODO : extract interface from DefaultJedisSocketFactory so that we can support custom socket factories
if (!(jedisSocketFactory instanceof DefaultJedisSocketFactory)) {
throw new IllegalStateException("Rebind not supported for custom JedisSocketFactory implementations");
}

DefaultJedisSocketFactory factory = (DefaultJedisSocketFactory) jedisSocketFactory;
logger.debug("Rebinding to {}", newHostAndPort);
factory.updateHostAndPort(newHostAndPort);
}

}
35 changes: 35 additions & 0 deletions src/main/java/redis/clients/jedis/ConnectionPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,32 @@
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.authentication.AuthXManager;
import redis.clients.jedis.csc.Cache;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.util.Pool;

import java.util.concurrent.atomic.AtomicReference;

public class ConnectionPool extends Pool<Connection> {

private AuthXManager authXManager;

public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig) {
this(new ConnectionFactory(hostAndPort, clientConfig));
attachAuthenticationListener(clientConfig.getAuthXManager());
attachRebindHandler(clientConfig, (ConnectionFactory) this.getFactory());
}

@Experimental
public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig,
Cache clientSideCache) {
this(new ConnectionFactory(hostAndPort, clientConfig, clientSideCache));
attachAuthenticationListener(clientConfig.getAuthXManager());
attachRebindHandler(clientConfig, (ConnectionFactory) this.getFactory());
}

public ConnectionPool(PooledObjectFactory<Connection> factory) {
Expand All @@ -33,13 +39,15 @@ public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig) {
this(new ConnectionFactory(hostAndPort, clientConfig), poolConfig);
attachAuthenticationListener(clientConfig.getAuthXManager());
attachRebindHandler(clientConfig, (ConnectionFactory) this.getFactory());
}

@Experimental
public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig,
Cache clientSideCache, GenericObjectPoolConfig<Connection> poolConfig) {
this(new ConnectionFactory(hostAndPort, clientConfig, clientSideCache), poolConfig);
attachAuthenticationListener(clientConfig.getAuthXManager());
attachRebindHandler(clientConfig, (ConnectionFactory) this.getFactory());
}

public ConnectionPool(PooledObjectFactory<Connection> factory,
Expand Down Expand Up @@ -78,4 +86,31 @@ private void attachAuthenticationListener(AuthXManager authXManager) {
});
}
}

private void attachRebindHandler(JedisClientConfig clientConfig, ConnectionFactory factory) {
if (clientConfig.isProactiveRebindEnabled()) {
RebindHandler rebindHandler = new RebindHandler(this, factory);
clientConfig.getMaintenanceEventHandler().addListener(rebindHandler);
}
}

private static class RebindHandler implements MaintenanceEventListener {
private final ConnectionPool pool;
private final ConnectionFactory factory;
private final AtomicReference<HostAndPort> rebindTarget = new AtomicReference<>();

public RebindHandler(ConnectionPool pool, ConnectionFactory factory) {
this.pool = pool;
this.factory = factory;
}

@Override
public void onRebind(HostAndPort target) {
HostAndPort previous = rebindTarget.getAndSet(target);
if (previous != target) {
this.factory.rebind(target);
this.pool.clear();
}
}
}
}
76 changes: 76 additions & 0 deletions src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ public final class DefaultJedisClientConfig implements JedisClientConfig {

private final AuthXManager authXManager;

private final TimeoutOptions timeoutOptions;

private final boolean proactiveRebindEnabled;

private final PushHandler pushHandler;

private final MaintenanceEventHandler maintenanceEventHandler;

private DefaultJedisClientConfig(DefaultJedisClientConfig.Builder builder) {
this.redisProtocol = builder.redisProtocol;
this.connectionTimeoutMillis = builder.connectionTimeoutMillis;
Expand All @@ -50,6 +58,17 @@ private DefaultJedisClientConfig(DefaultJedisClientConfig.Builder builder) {
this.clientSetInfoConfig = builder.clientSetInfoConfig;
this.readOnlyForRedisClusterReplicas = builder.readOnlyForRedisClusterReplicas;
this.authXManager = builder.authXManager;
this.timeoutOptions = builder.timeoutOptions;
this.proactiveRebindEnabled = builder.proactiveRebindEnabled;
this.pushHandler = builder.pushHandler;

if ((builder.proactiveRebindEnabled || TimeoutOptions.isRelaxedTimeoutEnabled(builder.timeoutOptions.getRelaxedTimeout()))
&& builder.maintenanceEventHandler == null) {
// Proactive rebind or relaxed timeouts require a maintenance event handler
this.maintenanceEventHandler = new MaintenanceEventHandlerImpl();
} else {
this.maintenanceEventHandler = builder.maintenanceEventHandler;
}
}

@Override
Expand Down Expand Up @@ -143,6 +162,27 @@ public boolean isReadOnlyForRedisClusterReplicas() {
return readOnlyForRedisClusterReplicas;
}

@Override
public TimeoutOptions getTimeoutOptions() {
return timeoutOptions;
}

@Override
public boolean isProactiveRebindEnabled() {
return proactiveRebindEnabled;
}

@Override
public PushHandler getPushHandler() {
return pushHandler;
}


@Override
public MaintenanceEventHandler getMaintenanceEventHandler() {
return maintenanceEventHandler;
}

public static Builder builder() {
return new Builder();
}
Expand Down Expand Up @@ -175,6 +215,14 @@ public static class Builder {

private AuthXManager authXManager = null;

private TimeoutOptions timeoutOptions = TimeoutOptions.create();

private boolean proactiveRebindEnabled = false;

private PushHandler pushHandler = null;

private MaintenanceEventHandler maintenanceEventHandler = null;

private Builder() {
}

Expand Down Expand Up @@ -297,6 +345,26 @@ public Builder authXManager(AuthXManager authXManager) {
return this;
}

public Builder timeoutOptions(TimeoutOptions timeoutOptions) {
this.timeoutOptions = timeoutOptions;
return this;
}

public Builder proactiveRebindEnabled(boolean proactiveRebindEnabled) {
this.proactiveRebindEnabled = proactiveRebindEnabled;
return this;
}

public Builder pushHandler(PushHandler pushHandler) {
this.pushHandler = pushHandler;
return this;
}

public Builder maintenanceEventHandler(MaintenanceEventHandler maintenanceEventHandler) {
this.maintenanceEventHandler = maintenanceEventHandler;
return this;
}

public Builder from(JedisClientConfig instance) {
this.redisProtocol = instance.getRedisProtocol();
this.connectionTimeoutMillis = instance.getConnectionTimeoutMillis();
Expand All @@ -314,6 +382,10 @@ public Builder from(JedisClientConfig instance) {
this.clientSetInfoConfig = instance.getClientSetInfoConfig();
this.readOnlyForRedisClusterReplicas = instance.isReadOnlyForRedisClusterReplicas();
this.authXManager = instance.getAuthXManager();
this.timeoutOptions = instance.getTimeoutOptions();
this.proactiveRebindEnabled = instance.isProactiveRebindEnabled();
this.pushHandler = instance.getPushHandler();
this.maintenanceEventHandler = instance.getMaintenanceEventHandler();
return this;
}
}
Expand Down Expand Up @@ -375,6 +447,10 @@ public static DefaultJedisClientConfig copyConfig(JedisClientConfig copy) {
}

builder.authXManager(copy.getAuthXManager());
builder.timeoutOptions(copy.getTimeoutOptions());
if (copy.isProactiveRebindEnabled()) {
builder.proactiveRebindEnabled(true);
}

return builder.build();
}
Expand Down
28 changes: 28 additions & 0 deletions src/main/java/redis/clients/jedis/JedisClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,32 @@ default boolean isReadOnlyForRedisClusterReplicas() {
default ClientSetInfoConfig getClientSetInfoConfig() {
return ClientSetInfoConfig.DEFAULT;
}

default TimeoutOptions getTimeoutOptions() {
return TimeoutOptions.create();
}

/**
* Configure whether the driver should listen for server events that indicate the current endpoint is being re-bound.
* When enabled, the proactive re-bind will help with the connection handover and reduce the number of failed commands.
* This feature requires the server to support proactive re-binds.
* Enabling this feature requires also setting a {@link #getMaintenanceEventHandler() maintenance event handler}
*
* Defaults to {@code false}.
*/
default boolean isProactiveRebindEnabled() {
return false;
}

default PushHandler getPushHandler() {
return null;
}

/**
* @return The event handler to use for server maintenance events.
*/
default MaintenanceEventHandler getMaintenanceEventHandler(){
return null;
}

}
18 changes: 18 additions & 0 deletions src/main/java/redis/clients/jedis/MaintenanceEventHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package redis.clients.jedis;

import java.util.Collection;

public interface MaintenanceEventHandler {


void addListener(MaintenanceEventListener listener);


void removeListener(MaintenanceEventListener listener);


void removeAllListeners();


Collection<MaintenanceEventListener> getListeners();
}
29 changes: 29 additions & 0 deletions src/main/java/redis/clients/jedis/MaintenanceEventHandlerImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package redis.clients.jedis;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class MaintenanceEventHandlerImpl implements MaintenanceEventHandler {
private final List<MaintenanceEventListener> listeners = new CopyOnWriteArrayList<>();

@Override
public void addListener(MaintenanceEventListener listener) {
listeners.add(listener);
}

@Override
public void removeListener(MaintenanceEventListener listener) {
listeners.remove(listener);
}

@Override
public void removeAllListeners() {
listeners.clear();
}

@Override
public Collection<MaintenanceEventListener> getListeners() {
return listeners;
}
}
15 changes: 15 additions & 0 deletions src/main/java/redis/clients/jedis/MaintenanceEventListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package redis.clients.jedis;


public interface MaintenanceEventListener {

default void onMigrating(){};

default void onMigrated(){};

default void onFailOver(){};

default void onFailedOver(){};

default void onRebind( HostAndPort target){};
}
Loading
Loading