From 1dcffc707e76ad1ba18c191fe933f689c3e52548 Mon Sep 17 00:00:00 2001 From: Dariusz Seweryn Date: Fri, 20 Dec 2024 11:19:47 +0100 Subject: [PATCH 1/2] SNOW-1859651 Salt prefix for files mapped from different topics --- .../com/snowflake/kafka/connector/Utils.java | 59 +++++++++++++++++-- .../connector/internal/FileNameUtils.java | 4 +- .../internal/SnowflakeSinkServiceV1.java | 45 +++++++------- .../snowflake/kafka/connector/UtilsTest.java | 31 ++++++++++ 4 files changed, 110 insertions(+), 29 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/Utils.java b/src/main/java/com/snowflake/kafka/connector/Utils.java index 3480de9d2..2abeefaf7 100644 --- a/src/main/java/com/snowflake/kafka/connector/Utils.java +++ b/src/main/java/com/snowflake/kafka/connector/Utils.java @@ -430,6 +430,29 @@ public static boolean isSnowpipeStreamingIngestion(Map config) { return !isSnowpipeIngestion(config); } + /** + * Class for returned GeneratedName. isNameFromMap equal to True indicates that the name was + * resolved by using the map passed to appropriate function. {@link + * Utils#generateTableName(String, Map)} + */ + public static class GeneratedName { + public final String name; + public final boolean isNameFromMap; + + private GeneratedName(String name, boolean isNameFromMap) { + this.name = name; + this.isNameFromMap = isNameFromMap; + } + + private static GeneratedName fromMap(String name) { + return new GeneratedName(name, true); + } + + private static GeneratedName generated(String name) { + return new GeneratedName(name, false); + } + } + /** * modify invalid application name in config and return the generated application name * @@ -438,7 +461,7 @@ public static boolean isSnowpipeStreamingIngestion(Map config) { public static void convertAppName(Map config) { String appName = config.getOrDefault(SnowflakeSinkConnectorConfig.NAME, ""); // If appName is empty the following call will throw error - String validAppName = generateValidName(appName, new HashMap()); + String validAppName = generateValidName(appName, new HashMap<>()); config.put(SnowflakeSinkConnectorConfig.NAME, validAppName); } @@ -454,6 +477,20 @@ public static String tableName(String topic, Map topic2table) { return generateValidName(topic, topic2table); } + /** + * Verify topic name and generate a valid table name. The returned GeneratedName has a flag + * isNameFromMap that indicates if the name was retrieved from the passed topic2table map which + * has particular outcomes for the SnowflakeSinkServiceV1 + * + * @param topic input topic name + * @param topic2table topic to table map + * @return return GeneratedName with valid table name and a flag whether the name was taken from + * the topic2table + */ + public static GeneratedName generateTableName(String topic, Map topic2table) { + return generateValidNameFromMap(topic, topic2table); + } + /** * verify topic name, and generate valid table/application name * @@ -462,23 +499,35 @@ public static String tableName(String topic, Map topic2table) { * @return valid table/application name */ public static String generateValidName(String topic, Map topic2table) { + return generateValidNameFromMap(topic, topic2table).name; + } + + /** + * verify topic name, and generate valid table/application name + * + * @param topic input topic name + * @param topic2table topic to table map + * @return valid generated table/application name + */ + private static GeneratedName generateValidNameFromMap( + String topic, Map topic2table) { final String PLACE_HOLDER = "_"; if (topic == null || topic.isEmpty()) { throw SnowflakeErrors.ERROR_0020.getException("topic name: " + topic); } if (topic2table.containsKey(topic)) { - return topic2table.get(topic); + return GeneratedName.fromMap(topic2table.get(topic)); } // try matching regex tables for (String regexTopic : topic2table.keySet()) { if (topic.matches(regexTopic)) { - return topic2table.get(regexTopic); + return GeneratedName.fromMap(topic2table.get(regexTopic)); } } if (Utils.isValidSnowflakeObjectIdentifier(topic)) { - return topic; + return GeneratedName.generated(topic); } int hash = Math.abs(topic.hashCode()); @@ -507,7 +556,7 @@ public static String generateValidName(String topic, Map topic2t result.append(PLACE_HOLDER); result.append(hash); - return result.toString(); + return GeneratedName.generated(result.toString()); } public static Map parseTopicToTableMap(String input) { diff --git a/src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java b/src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java index ce594f033..2b10ddaef 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java @@ -66,11 +66,11 @@ private static BigInteger calculatePartitionPart(String topic, int partition) { BigInteger partitionPart = BigInteger.valueOf(partition); if (!Strings.isNullOrEmpty(topic)) { // if topic is provided as part of the file prefix, - // 1. lets calculate stable hash code out of it, + // 1. let's calculate stable hash code out of it, // 2. bit shift it by 16 bits left, // 3. add 0x8000 (light up 15th bit as a marker) // 4. add partition id (which should in production use cases never reach a value above 5.000 - // partitions pers topic). + // partitions per topic). // In theory - we would support 32767 partitions, which is more than any reasonable value for // a single topic byte[] bytes = topic.toUpperCase().getBytes(StandardCharsets.UTF_8); diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java index 16f027636..26471d504 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java @@ -1,7 +1,6 @@ package com.snowflake.kafka.connector.internal; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED; -import static com.snowflake.kafka.connector.config.TopicToTableModeExtractor.determineTopic2TableMode; import static com.snowflake.kafka.connector.internal.FileNameUtils.searchForMissingOffsets; import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_RECORD_COUNT; import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_SIZE_BYTES; @@ -126,11 +125,19 @@ class SnowflakeSinkServiceV1 implements SnowflakeSinkService { * Create new ingestion task from existing table and stage, tries to reuse existing pipe and * recover previous task, otherwise, create a new pipe. * - * @param tableName destination table name in Snowflake + * @param ignoredTableName destination table name in Snowflake. Is ignored and recalculated to + * accommodate proper cleaning of staged files. * @param topicPartition TopicPartition passed from Kafka */ @Override - public void startPartition(final String tableName, final TopicPartition topicPartition) { + public void startPartition(final String ignoredTableName, final TopicPartition topicPartition) { + Utils.GeneratedName generatedTableName = + Utils.generateTableName(topicPartition.topic(), topic2TableMap); + final String tableName = generatedTableName.name; + if (!tableName.equals(ignoredTableName)) { + LOGGER.warn( + "tableNames do not match: original={}, recalculated={}", ignoredTableName, tableName); + } String stageName = Utils.stageName(conn.getConnectorName(), tableName); String nameIndex = getNameIndex(topicPartition.topic(), topicPartition.partition()); if (pipes.containsKey(nameIndex)) { @@ -142,7 +149,7 @@ public void startPartition(final String tableName, final TopicPartition topicPar pipes.put( nameIndex, new ServiceContext( - tableName, + generatedTableName, stageName, pipeName, topicPartition.topic(), @@ -486,7 +493,7 @@ private class ServiceContext { private boolean forceCleanerFileReset = false; private ServiceContext( - String tableName, + Utils.GeneratedName generatedTableName, String stageName, String pipeName, String topicName, @@ -494,32 +501,26 @@ private ServiceContext( int partition, ScheduledExecutorService v2CleanerExecutor) { this.pipeName = pipeName; - this.tableName = tableName; + this.tableName = generatedTableName.name; this.stageName = stageName; this.conn = conn; this.fileNames = new LinkedList<>(); this.cleanerFileNames = new LinkedList<>(); this.buffer = new SnowpipeBuffer(); this.ingestionService = conn.buildIngestService(stageName, pipeName); - // SNOW-1642799 = if multiple topics load data into single table, we need to ensure prefix is - // unique per table - otherwise, file cleaners for different channels may run into race - // condition - TopicToTableModeExtractor.Topic2TableMode mode = - determineTopic2TableMode(topic2TableMap, topicName); - if (mode == TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE - && !enableStageFilePrefixExtension) { + // SNOW-1642799 = if multiple topics load data into single table, we need to ensure the file + // prefix is unique per topic - otherwise, file cleaners for different topics will try to + // clean the same prefixed files creating a race condition and a potential to delete + // not yet ingested files created by another topic + if (generatedTableName.isNameFromMap && !enableStageFilePrefixExtension) { LOGGER.warn( - "The table {} is used as ingestion target by multiple topics - including this one" - + " '{}'.\n" - + "To prevent potential data loss consider setting" - + " '" - + SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED - + "' to true", + "The table {} may be used as ingestion target by multiple topics - including this one" + + " '{}'.\nTo prevent potential data loss consider setting '{}' to true", + tableName, topicName, - tableName); + SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED); } - if (mode == TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE - && enableStageFilePrefixExtension) { + if (generatedTableName.isNameFromMap && enableStageFilePrefixExtension) { this.prefix = FileNameUtils.filePrefix(conn.getConnectorName(), tableName, topicName, partition); } else { diff --git a/src/test/java/com/snowflake/kafka/connector/UtilsTest.java b/src/test/java/com/snowflake/kafka/connector/UtilsTest.java index 4a21ee177..7e26323e7 100644 --- a/src/test/java/com/snowflake/kafka/connector/UtilsTest.java +++ b/src/test/java/com/snowflake/kafka/connector/UtilsTest.java @@ -112,6 +112,37 @@ public void testTableName() { assert Utils.tableName(topic, topic2table).equals("_12345_" + Math.abs(topic.hashCode())); } + @Test + public void testGenerateTableName() { + Map topic2table = Utils.parseTopicToTableMap("ab@cd:abcd, 1234:_1234"); + + String topic0 = "ab@cd"; + Utils.GeneratedName generatedTableName1 = Utils.generateTableName(topic0, topic2table); + Assert.assertEquals("abcd", generatedTableName1.name); + Assert.assertTrue(generatedTableName1.isNameFromMap); + + String topic1 = "1234"; + Utils.GeneratedName generatedTableName2 = Utils.generateTableName(topic1, topic2table); + Assert.assertEquals("_1234", generatedTableName2.name); + Assert.assertTrue(generatedTableName2.isNameFromMap); + + String topic2 = "bc*def"; + Utils.GeneratedName generatedTableName3 = Utils.generateTableName(topic2, topic2table); + Assert.assertEquals("bc_def_" + Math.abs(topic2.hashCode()), generatedTableName3.name); + Assert.assertFalse(generatedTableName3.isNameFromMap); + + String topic3 = "12345"; + Utils.GeneratedName generatedTableName4 = Utils.generateTableName(topic3, topic2table); + Assert.assertEquals("_12345_" + Math.abs(topic3.hashCode()), generatedTableName4.name); + Assert.assertFalse(generatedTableName4.isNameFromMap); + + TestUtils.assertError( + SnowflakeErrors.ERROR_0020, () -> Utils.generateTableName("", topic2table)); + //noinspection DataFlowIssue + TestUtils.assertError( + SnowflakeErrors.ERROR_0020, () -> Utils.generateTableName(null, topic2table)); + } + @Test public void testTableNameRegex() { String catTable = "cat_table"; From 9e42f22267559b13fe73d3844749235d8eaf5b02 Mon Sep 17 00:00:00 2001 From: Dariusz Seweryn Date: Fri, 20 Dec 2024 14:14:44 +0100 Subject: [PATCH 2/2] Fix tests and address comment --- .../com/snowflake/kafka/connector/Utils.java | 14 +++++++--- .../internal/SnowflakeSinkServiceV1.java | 27 ++++++++++--------- .../snowflake/kafka/connector/UtilsTest.java | 16 +++++------ 3 files changed, 34 insertions(+), 23 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/Utils.java b/src/main/java/com/snowflake/kafka/connector/Utils.java index 2abeefaf7..2a472bafb 100644 --- a/src/main/java/com/snowflake/kafka/connector/Utils.java +++ b/src/main/java/com/snowflake/kafka/connector/Utils.java @@ -436,8 +436,8 @@ public static boolean isSnowpipeStreamingIngestion(Map config) { * Utils#generateTableName(String, Map)} */ public static class GeneratedName { - public final String name; - public final boolean isNameFromMap; + private final String name; + private final boolean isNameFromMap; private GeneratedName(String name, boolean isNameFromMap) { this.name = name; @@ -448,9 +448,17 @@ private static GeneratedName fromMap(String name) { return new GeneratedName(name, true); } - private static GeneratedName generated(String name) { + public static GeneratedName generated(String name) { return new GeneratedName(name, false); } + + public String getName() { + return name; + } + + public boolean isNameFromMap() { + return isNameFromMap; + } } /** diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java index 26471d504..24869a88d 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java @@ -125,18 +125,21 @@ class SnowflakeSinkServiceV1 implements SnowflakeSinkService { * Create new ingestion task from existing table and stage, tries to reuse existing pipe and * recover previous task, otherwise, create a new pipe. * - * @param ignoredTableName destination table name in Snowflake. Is ignored and recalculated to - * accommodate proper cleaning of staged files. + * @param tableName destination table name in Snowflake * @param topicPartition TopicPartition passed from Kafka */ @Override - public void startPartition(final String ignoredTableName, final TopicPartition topicPartition) { + public void startPartition(final String tableName, final TopicPartition topicPartition) { Utils.GeneratedName generatedTableName = Utils.generateTableName(topicPartition.topic(), topic2TableMap); - final String tableName = generatedTableName.name; - if (!tableName.equals(ignoredTableName)) { + if (!tableName.equals(generatedTableName.getName())) { LOGGER.warn( - "tableNames do not match: original={}, recalculated={}", ignoredTableName, tableName); + "tableNames do not match, this is acceptable in tests but not in production! Resorting to" + + " originalName and assuming no potential clashes on file prefixes. original={}," + + " recalculated={}", + tableName, + generatedTableName.getName()); + generatedTableName = Utils.GeneratedName.generated(tableName); } String stageName = Utils.stageName(conn.getConnectorName(), tableName); String nameIndex = getNameIndex(topicPartition.topic(), topicPartition.partition()); @@ -501,7 +504,7 @@ private ServiceContext( int partition, ScheduledExecutorService v2CleanerExecutor) { this.pipeName = pipeName; - this.tableName = generatedTableName.name; + this.tableName = generatedTableName.getName(); this.stageName = stageName; this.conn = conn; this.fileNames = new LinkedList<>(); @@ -512,7 +515,7 @@ private ServiceContext( // prefix is unique per topic - otherwise, file cleaners for different topics will try to // clean the same prefixed files creating a race condition and a potential to delete // not yet ingested files created by another topic - if (generatedTableName.isNameFromMap && !enableStageFilePrefixExtension) { + if (generatedTableName.isNameFromMap() && !enableStageFilePrefixExtension) { LOGGER.warn( "The table {} may be used as ingestion target by multiple topics - including this one" + " '{}'.\nTo prevent potential data loss consider setting '{}' to true", @@ -520,11 +523,11 @@ private ServiceContext( topicName, SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED); } - if (generatedTableName.isNameFromMap && enableStageFilePrefixExtension) { + { + final String topicForPrefix = + generatedTableName.isNameFromMap() && enableStageFilePrefixExtension ? topicName : ""; this.prefix = - FileNameUtils.filePrefix(conn.getConnectorName(), tableName, topicName, partition); - } else { - this.prefix = FileNameUtils.filePrefix(conn.getConnectorName(), tableName, "", partition); + FileNameUtils.filePrefix(conn.getConnectorName(), tableName, topicForPrefix, partition); } this.processedOffset = new AtomicLong(-1); this.flushedOffset = new AtomicLong(-1); diff --git a/src/test/java/com/snowflake/kafka/connector/UtilsTest.java b/src/test/java/com/snowflake/kafka/connector/UtilsTest.java index 7e26323e7..41e434154 100644 --- a/src/test/java/com/snowflake/kafka/connector/UtilsTest.java +++ b/src/test/java/com/snowflake/kafka/connector/UtilsTest.java @@ -118,23 +118,23 @@ public void testGenerateTableName() { String topic0 = "ab@cd"; Utils.GeneratedName generatedTableName1 = Utils.generateTableName(topic0, topic2table); - Assert.assertEquals("abcd", generatedTableName1.name); - Assert.assertTrue(generatedTableName1.isNameFromMap); + Assert.assertEquals("abcd", generatedTableName1.getName()); + Assert.assertTrue(generatedTableName1.isNameFromMap()); String topic1 = "1234"; Utils.GeneratedName generatedTableName2 = Utils.generateTableName(topic1, topic2table); - Assert.assertEquals("_1234", generatedTableName2.name); - Assert.assertTrue(generatedTableName2.isNameFromMap); + Assert.assertEquals("_1234", generatedTableName2.getName()); + Assert.assertTrue(generatedTableName2.isNameFromMap()); String topic2 = "bc*def"; Utils.GeneratedName generatedTableName3 = Utils.generateTableName(topic2, topic2table); - Assert.assertEquals("bc_def_" + Math.abs(topic2.hashCode()), generatedTableName3.name); - Assert.assertFalse(generatedTableName3.isNameFromMap); + Assert.assertEquals("bc_def_" + Math.abs(topic2.hashCode()), generatedTableName3.getName()); + Assert.assertFalse(generatedTableName3.isNameFromMap()); String topic3 = "12345"; Utils.GeneratedName generatedTableName4 = Utils.generateTableName(topic3, topic2table); - Assert.assertEquals("_12345_" + Math.abs(topic3.hashCode()), generatedTableName4.name); - Assert.assertFalse(generatedTableName4.isNameFromMap); + Assert.assertEquals("_12345_" + Math.abs(topic3.hashCode()), generatedTableName4.getName()); + Assert.assertFalse(generatedTableName4.isNameFromMap()); TestUtils.assertError( SnowflakeErrors.ERROR_0020, () -> Utils.generateTableName("", topic2table));