From e5171c8923b0ce1834b06601405179e1f7bb711c Mon Sep 17 00:00:00 2001 From: Nikolay Angelov Date: Tue, 13 May 2025 13:02:12 +0300 Subject: [PATCH] Add configuration options to specify custom keystore and truststore for the jedis client in redis sink --- debezium-bom/pom.xml | 2 +- .../storage/redis/RedisCommonConfig.java | 78 ++++++++++++++- .../storage/redis/RedisConnection.java | 98 ++++++++++++------- .../redis/history/RedisSchemaHistory.java | 3 +- .../redis/offset/RedisOffsetBackingStore.java | 3 +- 5 files changed, 139 insertions(+), 45 deletions(-) diff --git a/debezium-bom/pom.xml b/debezium-bom/pom.xml index 033b7d7836f..9a1083ad7f6 100644 --- a/debezium-bom/pom.xml +++ b/debezium-bom/pom.xml @@ -42,7 +42,7 @@ - 4.1.1 + 6.0.0 2.17.241 diff --git a/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/RedisCommonConfig.java b/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/RedisCommonConfig.java index 5a86eb44abb..716f8fa95f9 100644 --- a/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/RedisCommonConfig.java +++ b/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/RedisCommonConfig.java @@ -44,6 +44,36 @@ public class RedisCommonConfig { .withDescription("Use SSL for Redis connection") .withDefault(DEFAULT_SSL_ENABLED); + private static final String DEFAULT_TRUSTSTORE_PATH = ""; + private static final Field PROP_SSL_TRUSTSTORE_PATH = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.truststore.path") + .withDescription("Path to the truststore file") + .withDefault(DEFAULT_TRUSTSTORE_PATH); + + private static final String DEFAULT_TRUSTSTORE_PASSWORD = ""; + private static final Field PROP_SSL_TRUSTSTORE_PASSWORD = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.truststore.password") + .withDescription("Password for the truststore") + .withDefault(DEFAULT_TRUSTSTORE_PASSWORD); + + private static final String DEFAULT_TRUSTSTORE_TYPE = "JKS"; + private static final Field PROP_SSL_TRUSTSTORE_TYPE = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.truststore.type") + .withDescription("Type of the truststore (e.g., JKS, PKCS12)") + .withDefault(DEFAULT_TRUSTSTORE_TYPE); + + private static final String DEFAULT_KEYSTORE_PATH = ""; + private static final Field PROP_SSL_KEYSTORE_PATH = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.keystore.path") + .withDescription("Path to the keystore file") + .withDefault(DEFAULT_KEYSTORE_PATH); + + private static final String DEFAULT_KEYSTORE_PASSWORD = ""; + private static final Field PROP_SSL_KEYSTORE_PASSWORD = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.keystore.password") + .withDescription("Password for the keystore") + .withDefault(DEFAULT_KEYSTORE_PASSWORD); + + private static final String DEFAULT_KEYSTORE_TYPE = "JKS"; + private static final Field PROP_SSL_KEYSTORE_TYPE = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.keystore.type") + .withDescription("Type of the keystore (e.g., JKS, PKCS12)") + .withDefault(DEFAULT_KEYSTORE_TYPE); + private static final boolean DEFAULT_HOSTNAME_VERIFICATION = false; private static final Field PROP_SSL_HOSTNAME_VERIFICATION_ENABLED = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.hostname.verification.enabled") .withDescription("Enable hostname verification") @@ -99,8 +129,15 @@ public class RedisCommonConfig { private int dbIndex; private String user; private String password; + private boolean sslEnabled; private boolean hostnameVerificationEnabled; + private String truststorePath; + private String truststorePassword; + private String truststoreType; + private String keystorePath; + private String keystorePassword; + private String keystoreType; private Integer initialRetryDelay; private Integer maxRetryDelay; @@ -126,9 +163,14 @@ public RedisCommonConfig(Configuration config, String prefix) { } protected List getAllConfigurationFields() { - return Collect.arrayListOf(PROP_ADDRESS, PROP_DB_INDEX, PROP_USER, PROP_PASSWORD, PROP_SSL_ENABLED, PROP_SSL_HOSTNAME_VERIFICATION_ENABLED, - PROP_CONNECTION_TIMEOUT, PROP_SOCKET_TIMEOUT, PROP_RETRY_INITIAL_DELAY, PROP_RETRY_MAX_DELAY, PROP_WAIT_ENABLED, PROP_WAIT_TIMEOUT, - PROP_WAIT_RETRY_ENABLED, PROP_WAIT_RETRY_DELAY); + return Collect.arrayListOf( + PROP_ADDRESS, PROP_DB_INDEX, PROP_USER, PROP_PASSWORD, + PROP_SSL_ENABLED, PROP_SSL_HOSTNAME_VERIFICATION_ENABLED, + PROP_SSL_TRUSTSTORE_PATH, PROP_SSL_TRUSTSTORE_PASSWORD, PROP_SSL_TRUSTSTORE_TYPE, + PROP_SSL_KEYSTORE_PATH, PROP_SSL_KEYSTORE_PASSWORD, PROP_SSL_KEYSTORE_TYPE, + PROP_CONNECTION_TIMEOUT, PROP_SOCKET_TIMEOUT, + PROP_RETRY_INITIAL_DELAY, PROP_RETRY_MAX_DELAY, + PROP_WAIT_ENABLED, PROP_WAIT_TIMEOUT, PROP_WAIT_RETRY_ENABLED, PROP_WAIT_RETRY_DELAY); } protected void init(Configuration config) { @@ -136,8 +178,15 @@ protected void init(Configuration config) { dbIndex = config.getInteger(PROP_DB_INDEX); user = config.getString(PROP_USER); password = config.getString(PROP_PASSWORD); + sslEnabled = config.getBoolean(PROP_SSL_ENABLED); hostnameVerificationEnabled = config.getBoolean(PROP_SSL_HOSTNAME_VERIFICATION_ENABLED); + truststorePath = config.getString(PROP_SSL_TRUSTSTORE_PATH); + truststorePassword = config.getString(PROP_SSL_TRUSTSTORE_PASSWORD); + truststoreType = config.getString(PROP_SSL_TRUSTSTORE_TYPE); + keystorePath = config.getString(PROP_SSL_KEYSTORE_PATH); + keystorePassword = config.getString(PROP_SSL_KEYSTORE_PASSWORD); + keystoreType = config.getString(PROP_SSL_KEYSTORE_TYPE); initialRetryDelay = config.getInteger(PROP_RETRY_INITIAL_DELAY); maxRetryDelay = config.getInteger(PROP_RETRY_MAX_DELAY); @@ -216,4 +265,27 @@ public long getWaitRetryDelay() { return waitRetryDelay; } + public String getTruststorePath() { + return truststorePath; + } + + public String getTruststorePassword() { + return truststorePassword; + } + + public String getTruststoreType() { + return truststoreType; + } + + public String getKeystorePath() { + return keystorePath; + } + + public String getKeystorePassword() { + return keystorePassword; + } + + public String getKeystoreType() { + return keystoreType; + } } diff --git a/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/RedisConnection.java b/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/RedisConnection.java index 52512b8acf5..879a564b6fd 100644 --- a/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/RedisConnection.java +++ b/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/RedisConnection.java @@ -5,6 +5,7 @@ */ package io.debezium.storage.redis; +import java.io.File; import java.util.regex.Pattern; import javax.net.ssl.SSLParameters; @@ -19,6 +20,8 @@ import redis.clients.jedis.DefaultJedisClientConfig.Builder; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.Jedis; +import redis.clients.jedis.SslOptions; +import redis.clients.jedis.SslVerifyMode; import redis.clients.jedis.exceptions.JedisConnectionException; import redis.clients.jedis.exceptions.JedisDataException; @@ -34,36 +37,38 @@ public class RedisConnection { public static final String DEBEZIUM_SCHEMA_HISTORY = "debezium:schema_history"; private static final String HOST_PORT_ERROR = "Invalid host:port format in '<...>.redis.address' property."; - private String address; - private int dbIndex; - private String user; - private String password; - private int connectionTimeout; - private int socketTimeout; - private boolean sslEnabled; - private boolean hostnameVerificationEnabled; - - /** - * - * @param address - * @param user - * @param password - * @param connectionTimeout - * @param socketTimeout - * @param sslEnabled - */ - public RedisConnection(String address, int dbIndex, String user, String password, int connectionTimeout, int socketTimeout, boolean sslEnabled, - boolean hostnameVerificationEnabled) { - validateHostPort(address); - - this.address = address; - this.dbIndex = dbIndex; - this.user = user; - this.password = password; - this.connectionTimeout = connectionTimeout; - this.socketTimeout = socketTimeout; - this.sslEnabled = sslEnabled; - this.hostnameVerificationEnabled = hostnameVerificationEnabled; + private final String address; + private final int dbIndex; + private final String user; + private final String password; + private final int connectionTimeout; + private final int socketTimeout; + private final boolean sslEnabled; + private final boolean hostnameVerificationEnabled; + private final String truststorePath; + private final String truststorePassword; + private final String truststoreType; + private final String keystorePath; + private final String keystorePassword; + private final String keystoreType; + + public RedisConnection(RedisCommonConfig config) { + validateHostPort(config.getAddress()); + + this.address = config.getAddress(); + this.dbIndex = config.getDbIndex(); + this.user = config.getUser(); + this.password = config.getPassword(); + this.connectionTimeout = config.getConnectionTimeout(); + this.socketTimeout = config.getSocketTimeout(); + this.sslEnabled = config.isSslEnabled(); + this.hostnameVerificationEnabled = config.isHostnameVerificationEnabled(); + this.truststorePath = config.getTruststorePath(); + this.truststorePassword = config.getTruststorePassword(); + this.truststoreType = config.getTruststoreType(); + this.keystorePath = config.getKeystorePath(); + this.keystorePassword = config.getKeystorePassword(); + this.keystoreType = config.getKeystoreType(); } /** @@ -91,6 +96,32 @@ public RedisClient getRedisClient(String clientName, boolean waitEnabled, long w .socketTimeoutMillis(this.socketTimeout) .ssl(this.sslEnabled); + boolean configureSslOptions = this.sslEnabled && (!Strings.isNullOrEmpty(this.truststorePath) || + !Strings.isNullOrEmpty(this.keystorePath)); + + // The SslOptions in Jedis override the default SSL context if explicitly configured. + // - When a custom truststore or keystore is provided for the Jedis client, hostname verification + // must also be configured explicitly through the SslOptions. + // - If no custom truststore or keystore is provided, hostname verification will rely on the + // SSLParameters, which use the truststore or keystore specified via system properties. + if (configureSslOptions) { + var tsPasswordRaw = !Strings.isNullOrEmpty(truststorePassword) ? truststorePassword.toCharArray() : null; + var ksPasswordRaw = !Strings.isNullOrEmpty(keystorePassword) ? keystorePassword.toCharArray() : null; + var sslOptions = SslOptions.builder() + .truststore(new File(truststorePath), tsPasswordRaw) + .trustStoreType(truststoreType) + .keystore(new File(keystorePath), ksPasswordRaw) + .keyStoreType(keystoreType) + .sslVerifyMode(hostnameVerificationEnabled ? SslVerifyMode.FULL : SslVerifyMode.CA) + .build(); + configBuilder.sslOptions(sslOptions); + } else if (hostnameVerificationEnabled) { + // Enforce strict hostname verification to prevent man-in-the-middle attacks. + var sslParameters = new SSLParameters(); + sslParameters.setEndpointIdentificationAlgorithm("HTTPS"); + configBuilder.sslParameters(sslParameters); + } + if (!Strings.isNullOrEmpty(this.user)) { configBuilder = configBuilder.user(this.user); } @@ -99,13 +130,6 @@ public RedisClient getRedisClient(String clientName, boolean waitEnabled, long w configBuilder = configBuilder.password(this.password); } - if (hostnameVerificationEnabled) { - // Enforce strict hostname verification to prevent man-in-the-middle attacks. - var sslParameters = new SSLParameters(); - sslParameters.setEndpointIdentificationAlgorithm("HTTPS"); - configBuilder.sslParameters(sslParameters); - } - client = new Jedis(address, configBuilder.build()); // make sure that client is connected diff --git a/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/history/RedisSchemaHistory.java b/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/history/RedisSchemaHistory.java index 44ffb911492..ae2c55e94a8 100644 --- a/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/history/RedisSchemaHistory.java +++ b/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/history/RedisSchemaHistory.java @@ -55,8 +55,7 @@ public class RedisSchemaHistory extends AbstractSchemaHistory { private RedisSchemaHistoryConfig config; void connect() { - RedisConnection redisConnection = new RedisConnection(config.getAddress(), config.getDbIndex(), config.getUser(), config.getPassword(), - config.getConnectionTimeout(), config.getSocketTimeout(), config.isSslEnabled(), config.isHostnameVerificationEnabled()); + RedisConnection redisConnection = new RedisConnection(config); client = redisConnection.getRedisClient(RedisConnection.DEBEZIUM_SCHEMA_HISTORY, config.isWaitEnabled(), config.getWaitTimeout(), config.isWaitRetryEnabled(), config.getWaitRetryDelay()); } diff --git a/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/offset/RedisOffsetBackingStore.java b/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/offset/RedisOffsetBackingStore.java index 384dffe5b6c..4a254979003 100644 --- a/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/offset/RedisOffsetBackingStore.java +++ b/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/offset/RedisOffsetBackingStore.java @@ -47,8 +47,7 @@ public void setRedisClient(RedisClient client) { void connect() { closeClient(); - RedisConnection redisConnection = new RedisConnection(config.getAddress(), config.getDbIndex(), config.getUser(), config.getPassword(), - config.getConnectionTimeout(), config.getSocketTimeout(), config.isSslEnabled(), config.isHostnameVerificationEnabled()); + RedisConnection redisConnection = new RedisConnection(config); client = redisConnection.getRedisClient(RedisConnection.DEBEZIUM_OFFSETS_CLIENT_NAME, config.isWaitEnabled(), config.getWaitTimeout(), config.isWaitRetryEnabled(), config.getWaitRetryDelay()); }