|
25 | 25 | import io.confluent.kafka.connect.utils.config.ValidEnum;
|
26 | 26 | import io.confluent.kafka.connect.utils.config.ValidPort;
|
27 | 27 | import io.connect.scylladb.topictotable.TopicConfigs;
|
| 28 | +import org.slf4j.Logger; |
| 29 | +import org.slf4j.LoggerFactory; |
28 | 30 |
|
29 | 31 | /**
|
30 | 32 | * Configuration class for {@link ScyllaDbSinkConnector}.
|
@@ -62,6 +64,10 @@ public class ScyllaDbSinkConnectorConfig extends AbstractConfig {
|
62 | 64 | private static final Pattern TOPIC_KS_TABLE_SETTING_PATTERN =
|
63 | 65 | Pattern.compile("topic\\.([a-zA-Z0-9._-]+)\\.([^.]+|\"[\"]+\")\\.([^.]+|\"[\"]+\")\\.(mapping|consistencyLevel|ttlSeconds|deletesEnabled)$");
|
64 | 66 |
|
| 67 | + private static final String[] TOPIC_WISE_CONFIGS_VALID_SUFFIXES = {".mapping",".consistencyLevel",".ttlSeconds",".deletesEnabled"}; |
| 68 | + |
| 69 | + private static final Logger log = LoggerFactory.getLogger(ScyllaDbSinkConnectorConfig.class); |
| 70 | + |
65 | 71 | static final Set<String> CLIENT_COMPRESSION = ImmutableSet.of("none", "lz4", "snappy");
|
66 | 72 |
|
67 | 73 | static final Set<String> TABLE_COMPRESSORS = ImmutableSet.of("SnappyCompressor", "LZ4Compressor", "DeflateCompressor", "none");
|
@@ -111,8 +117,9 @@ public ScyllaDbSinkConnectorConfig(Map<?, ?> originals) {
|
111 | 117 | Map<String, Map<String, String>> topicWiseConfigsMap = new HashMap<>();
|
112 | 118 | for (final Map.Entry<String, String> entry : ((Map<String, String>) originals).entrySet()) {
|
113 | 119 | final String name2 = entry.getKey();
|
114 |
| - if (name2.startsWith("topic.")) { |
| 120 | + if (name2.startsWith("topic.") && hasTopicWiseConfigSuffix(name2)) { |
115 | 121 | final String topicName = this.tryMatchTopicName(name2);
|
| 122 | + log.debug("Interpreting " + name2 + " as custom TopicWiseConfig for topic " + topicName); |
116 | 123 | final Map<String, String> topicMap = topicWiseConfigsMap.computeIfAbsent(topicName, t -> new HashMap());
|
117 | 124 | topicMap.put(name2.split("\\.")[name2.split("\\.").length - 1], entry.getValue());
|
118 | 125 | }
|
@@ -567,6 +574,13 @@ private String tryMatchTopicName(final String name) {
|
567 | 574 | throw new IllegalArgumentException("The setting: " + name + " does not match topic.keyspace.table nor topic.codec regular expression pattern");
|
568 | 575 | }
|
569 | 576 |
|
| 577 | + private boolean hasTopicWiseConfigSuffix(final String name) { |
| 578 | + for (String suffix : TOPIC_WISE_CONFIGS_VALID_SUFFIXES) { |
| 579 | + if (name.endsWith(suffix)) return true; |
| 580 | + } |
| 581 | + return false; |
| 582 | + } |
| 583 | + |
570 | 584 | private static String[] toStringArray(Object[] arr){
|
571 | 585 | return Arrays.stream(arr).map(Object::toString).toArray(String[]::new);
|
572 | 586 | }
|
|
0 commit comments