Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion debezium-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
<!-- Storages -->

<!-- Redis -->
<version.jedis>4.1.1</version.jedis>
<version.jedis>6.0.0</version.jedis>

<!-- S3 -->
<version.s3>2.17.241</version.s3>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,36 @@ public class RedisCommonConfig {
.withDescription("Use SSL for Redis connection")
.withDefault(DEFAULT_SSL_ENABLED);

private static final String DEFAULT_TRUSTSTORE_PATH = "";
private static final Field PROP_SSL_TRUSTSTORE_PATH = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.truststore.path")
.withDescription("Path to the truststore file")
.withDefault(DEFAULT_TRUSTSTORE_PATH);

private static final String DEFAULT_TRUSTSTORE_PASSWORD = "";
private static final Field PROP_SSL_TRUSTSTORE_PASSWORD = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.truststore.password")
.withDescription("Password for the truststore")
.withDefault(DEFAULT_TRUSTSTORE_PASSWORD);

private static final String DEFAULT_TRUSTSTORE_TYPE = "JKS";
private static final Field PROP_SSL_TRUSTSTORE_TYPE = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.truststore.type")
.withDescription("Type of the truststore (e.g., JKS, PKCS12)")
.withDefault(DEFAULT_TRUSTSTORE_TYPE);

private static final String DEFAULT_KEYSTORE_PATH = "";
private static final Field PROP_SSL_KEYSTORE_PATH = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.keystore.path")
.withDescription("Path to the keystore file")
.withDefault(DEFAULT_KEYSTORE_PATH);

private static final String DEFAULT_KEYSTORE_PASSWORD = "";
private static final Field PROP_SSL_KEYSTORE_PASSWORD = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.keystore.password")
.withDescription("Password for the keystore")
.withDefault(DEFAULT_KEYSTORE_PASSWORD);

private static final String DEFAULT_KEYSTORE_TYPE = "JKS";
private static final Field PROP_SSL_KEYSTORE_TYPE = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.keystore.type")
.withDescription("Type of the keystore (e.g., JKS, PKCS12)")
.withDefault(DEFAULT_KEYSTORE_TYPE);

private static final boolean DEFAULT_HOSTNAME_VERIFICATION = false;
private static final Field PROP_SSL_HOSTNAME_VERIFICATION_ENABLED = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.hostname.verification.enabled")
.withDescription("Enable hostname verification")
Expand Down Expand Up @@ -99,8 +129,15 @@ public class RedisCommonConfig {
private int dbIndex;
private String user;
private String password;

private boolean sslEnabled;
private boolean hostnameVerificationEnabled;
private String truststorePath;
private String truststorePassword;
private String truststoreType;
private String keystorePath;
private String keystorePassword;
private String keystoreType;

private Integer initialRetryDelay;
private Integer maxRetryDelay;
Expand All @@ -126,18 +163,30 @@ public RedisCommonConfig(Configuration config, String prefix) {
}

protected List<Field> getAllConfigurationFields() {
return Collect.arrayListOf(PROP_ADDRESS, PROP_DB_INDEX, PROP_USER, PROP_PASSWORD, PROP_SSL_ENABLED, PROP_SSL_HOSTNAME_VERIFICATION_ENABLED,
PROP_CONNECTION_TIMEOUT, PROP_SOCKET_TIMEOUT, PROP_RETRY_INITIAL_DELAY, PROP_RETRY_MAX_DELAY, PROP_WAIT_ENABLED, PROP_WAIT_TIMEOUT,
PROP_WAIT_RETRY_ENABLED, PROP_WAIT_RETRY_DELAY);
return Collect.arrayListOf(
PROP_ADDRESS, PROP_DB_INDEX, PROP_USER, PROP_PASSWORD,
PROP_SSL_ENABLED, PROP_SSL_HOSTNAME_VERIFICATION_ENABLED,
PROP_SSL_TRUSTSTORE_PATH, PROP_SSL_TRUSTSTORE_PASSWORD, PROP_SSL_TRUSTSTORE_TYPE,
PROP_SSL_KEYSTORE_PATH, PROP_SSL_KEYSTORE_PASSWORD, PROP_SSL_KEYSTORE_TYPE,
PROP_CONNECTION_TIMEOUT, PROP_SOCKET_TIMEOUT,
PROP_RETRY_INITIAL_DELAY, PROP_RETRY_MAX_DELAY,
PROP_WAIT_ENABLED, PROP_WAIT_TIMEOUT, PROP_WAIT_RETRY_ENABLED, PROP_WAIT_RETRY_DELAY);
}

protected void init(Configuration config) {
address = config.getString(PROP_ADDRESS);
dbIndex = config.getInteger(PROP_DB_INDEX);
user = config.getString(PROP_USER);
password = config.getString(PROP_PASSWORD);

sslEnabled = config.getBoolean(PROP_SSL_ENABLED);
hostnameVerificationEnabled = config.getBoolean(PROP_SSL_HOSTNAME_VERIFICATION_ENABLED);
truststorePath = config.getString(PROP_SSL_TRUSTSTORE_PATH);
truststorePassword = config.getString(PROP_SSL_TRUSTSTORE_PASSWORD);
truststoreType = config.getString(PROP_SSL_TRUSTSTORE_TYPE);
keystorePath = config.getString(PROP_SSL_KEYSTORE_PATH);
keystorePassword = config.getString(PROP_SSL_KEYSTORE_PASSWORD);
keystoreType = config.getString(PROP_SSL_KEYSTORE_TYPE);

initialRetryDelay = config.getInteger(PROP_RETRY_INITIAL_DELAY);
maxRetryDelay = config.getInteger(PROP_RETRY_MAX_DELAY);
Expand Down Expand Up @@ -216,4 +265,27 @@ public long getWaitRetryDelay() {
return waitRetryDelay;
}

public String getTruststorePath() {
return truststorePath;
}

public String getTruststorePassword() {
return truststorePassword;
}

public String getTruststoreType() {
return truststoreType;
}

public String getKeystorePath() {
return keystorePath;
}

public String getKeystorePassword() {
return keystorePassword;
}

public String getKeystoreType() {
return keystoreType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package io.debezium.storage.redis;

import java.io.File;
import java.util.regex.Pattern;

import javax.net.ssl.SSLParameters;
Expand All @@ -19,6 +20,8 @@
import redis.clients.jedis.DefaultJedisClientConfig.Builder;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.SslOptions;
import redis.clients.jedis.SslVerifyMode;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException;

Expand All @@ -34,36 +37,38 @@ public class RedisConnection {
public static final String DEBEZIUM_SCHEMA_HISTORY = "debezium:schema_history";
private static final String HOST_PORT_ERROR = "Invalid host:port format in '<...>.redis.address' property.";

private String address;
private int dbIndex;
private String user;
private String password;
private int connectionTimeout;
private int socketTimeout;
private boolean sslEnabled;
private boolean hostnameVerificationEnabled;

/**
*
* @param address
* @param user
* @param password
* @param connectionTimeout
* @param socketTimeout
* @param sslEnabled
*/
public RedisConnection(String address, int dbIndex, String user, String password, int connectionTimeout, int socketTimeout, boolean sslEnabled,
boolean hostnameVerificationEnabled) {
validateHostPort(address);

this.address = address;
this.dbIndex = dbIndex;
this.user = user;
this.password = password;
this.connectionTimeout = connectionTimeout;
this.socketTimeout = socketTimeout;
this.sslEnabled = sslEnabled;
this.hostnameVerificationEnabled = hostnameVerificationEnabled;
private final String address;
private final int dbIndex;
private final String user;
private final String password;
private final int connectionTimeout;
private final int socketTimeout;
private final boolean sslEnabled;
private final boolean hostnameVerificationEnabled;
private final String truststorePath;
private final String truststorePassword;
private final String truststoreType;
private final String keystorePath;
private final String keystorePassword;
private final String keystoreType;

public RedisConnection(RedisCommonConfig config) {
validateHostPort(config.getAddress());

this.address = config.getAddress();
this.dbIndex = config.getDbIndex();
this.user = config.getUser();
this.password = config.getPassword();
this.connectionTimeout = config.getConnectionTimeout();
this.socketTimeout = config.getSocketTimeout();
this.sslEnabled = config.isSslEnabled();
this.hostnameVerificationEnabled = config.isHostnameVerificationEnabled();
this.truststorePath = config.getTruststorePath();
this.truststorePassword = config.getTruststorePassword();
this.truststoreType = config.getTruststoreType();
this.keystorePath = config.getKeystorePath();
this.keystorePassword = config.getKeystorePassword();
this.keystoreType = config.getKeystoreType();
}

/**
Expand Down Expand Up @@ -91,6 +96,32 @@ public RedisClient getRedisClient(String clientName, boolean waitEnabled, long w
.socketTimeoutMillis(this.socketTimeout)
.ssl(this.sslEnabled);

boolean configureSslOptions = this.sslEnabled && (!Strings.isNullOrEmpty(this.truststorePath) ||
!Strings.isNullOrEmpty(this.keystorePath));

// The SslOptions in Jedis override the default SSL context if explicitly configured.
// - When a custom truststore or keystore is provided for the Jedis client, hostname verification
// must also be configured explicitly through the SslOptions.
// - If no custom truststore or keystore is provided, hostname verification will rely on the
// SSLParameters, which use the truststore or keystore specified via system properties.
if (configureSslOptions) {
var tsPasswordRaw = !Strings.isNullOrEmpty(truststorePassword) ? truststorePassword.toCharArray() : null;
var ksPasswordRaw = !Strings.isNullOrEmpty(keystorePassword) ? keystorePassword.toCharArray() : null;
var sslOptions = SslOptions.builder()
.truststore(new File(truststorePath), tsPasswordRaw)
.trustStoreType(truststoreType)
.keystore(new File(keystorePath), ksPasswordRaw)
.keyStoreType(keystoreType)
.sslVerifyMode(hostnameVerificationEnabled ? SslVerifyMode.FULL : SslVerifyMode.CA)
.build();
configBuilder.sslOptions(sslOptions);
} else if (hostnameVerificationEnabled) {
// Enforce strict hostname verification to prevent man-in-the-middle attacks.
var sslParameters = new SSLParameters();
sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
configBuilder.sslParameters(sslParameters);
}

if (!Strings.isNullOrEmpty(this.user)) {
configBuilder = configBuilder.user(this.user);
}
Expand All @@ -99,13 +130,6 @@ public RedisClient getRedisClient(String clientName, boolean waitEnabled, long w
configBuilder = configBuilder.password(this.password);
}

if (hostnameVerificationEnabled) {
// Enforce strict hostname verification to prevent man-in-the-middle attacks.
var sslParameters = new SSLParameters();
sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
configBuilder.sslParameters(sslParameters);
}

client = new Jedis(address, configBuilder.build());

// make sure that client is connected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ public class RedisSchemaHistory extends AbstractSchemaHistory {
private RedisSchemaHistoryConfig config;

void connect() {
RedisConnection redisConnection = new RedisConnection(config.getAddress(), config.getDbIndex(), config.getUser(), config.getPassword(),
config.getConnectionTimeout(), config.getSocketTimeout(), config.isSslEnabled(), config.isHostnameVerificationEnabled());
RedisConnection redisConnection = new RedisConnection(config);
client = redisConnection.getRedisClient(RedisConnection.DEBEZIUM_SCHEMA_HISTORY, config.isWaitEnabled(), config.getWaitTimeout(),
config.isWaitRetryEnabled(), config.getWaitRetryDelay());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ public void setRedisClient(RedisClient client) {

void connect() {
closeClient();
RedisConnection redisConnection = new RedisConnection(config.getAddress(), config.getDbIndex(), config.getUser(), config.getPassword(),
config.getConnectionTimeout(), config.getSocketTimeout(), config.isSslEnabled(), config.isHostnameVerificationEnabled());
RedisConnection redisConnection = new RedisConnection(config);
client = redisConnection.getRedisClient(RedisConnection.DEBEZIUM_OFFSETS_CLIENT_NAME, config.isWaitEnabled(), config.getWaitTimeout(),
config.isWaitRetryEnabled(), config.getWaitRetryDelay());
}
Expand Down