Skip to content

Commit 255055f

Browse files
SNOW-1055561: Check that an SMT returning null values no longer stops data ingestion. (#816)
1 parent 20a8b37 commit 255055f

File tree

8 files changed

+212
-184
lines changed

8 files changed

+212
-184
lines changed

src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProvider.java

+47-7
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_STREAMING_CLIENT_OPTIMIZATION_DEFAULT;
2121

2222
import com.google.common.annotations.VisibleForTesting;
23+
import com.google.common.base.Preconditions;
2324
import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
2425
import com.snowflake.kafka.connector.internal.KCLogger;
2526
import java.util.Map;
27+
import java.util.function.Supplier;
2628
import net.snowflake.ingest.internal.com.github.benmanes.caffeine.cache.Caffeine;
2729
import net.snowflake.ingest.internal.com.github.benmanes.caffeine.cache.LoadingCache;
2830
import net.snowflake.ingest.internal.com.github.benmanes.caffeine.cache.RemovalCause;
@@ -38,18 +40,56 @@
3840
* node with equal {@link StreamingClientProperties} will use the same client
3941
*/
4042
public class StreamingClientProvider {
41-
private static class StreamingClientProviderSingleton {
42-
private static final StreamingClientProvider streamingClientProvider =
43-
new StreamingClientProvider();
44-
}
43+
private static volatile StreamingClientProvider streamingClientProvider = null;
44+
45+
private static Supplier<StreamingClientHandler> clientHandlerSupplier =
46+
DirectStreamingClientHandler::new;
4547

4648
/**
4749
* Gets the current streaming provider
4850
*
4951
* @return The streaming client provider
5052
*/
5153
public static StreamingClientProvider getStreamingClientProviderInstance() {
52-
return StreamingClientProviderSingleton.streamingClientProvider;
54+
if (streamingClientProvider == null) {
55+
synchronized (StreamingClientProvider.class) {
56+
if (streamingClientProvider == null) {
57+
streamingClientProvider = new StreamingClientProvider(clientHandlerSupplier.get());
58+
}
59+
}
60+
}
61+
62+
return streamingClientProvider;
63+
}
64+
65+
/**
66+
* Resets the provider to its pre-initialization state. This method is currently used by the test
67+
* code only.
68+
*/
69+
@VisibleForTesting
70+
public static void reset() {
71+
synchronized (StreamingClientProvider.class) {
72+
streamingClientProvider = null;
73+
clientHandlerSupplier = DirectStreamingClientHandler::new;
74+
}
75+
}
76+
77+
/**
78+
* This method allows providing a custom {@link StreamingClientHandler} to be used by the connector
79+
* instead of the default, which is {@link DirectStreamingClientHandler}.
80+
*
81+
* This method is currently used by the test code only.
82+
*
83+
* @param streamingClientHandler The handler that will be used by the connector.
84+
*/
85+
@VisibleForTesting
86+
public static void overrideStreamingClientHandler(StreamingClientHandler streamingClientHandler) {
87+
Preconditions.checkState(
88+
streamingClientProvider == null,
89+
"StreamingClientProvider is already initialized and cannot be overridden.");
90+
synchronized (StreamingClientProvider.class) {
91+
clientHandlerSupplier = () -> streamingClientHandler;
92+
}
5393
}
5494

5595
/**
@@ -92,8 +132,8 @@ public static StreamingClientProvider getStreamingClientProviderInstance() {
92132
* When a client is evicted, the cache will try closing the client; however, it is best to still
93133
* close the client manually, as eviction is executed lazily
94134
*/
95-
private StreamingClientProvider() {
96-
this.streamingClientHandler = new DirectStreamingClientHandler();
135+
private StreamingClientProvider(StreamingClientHandler streamingClientHandler) {
136+
this.streamingClientHandler = streamingClientHandler;
97137
this.registeredClients = buildLoadingCache(this.streamingClientHandler);
98138
}
99139

Original file line numberDiff line numberDiff line change
@@ -1,48 +1,35 @@
11
package com.snowflake.kafka.connector;
22

3-
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BUFFER_COUNT_RECORDS;
43
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC;
4+
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT;
55
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.NAME;
6-
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_DATABASE;
7-
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY;
8-
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_SCHEMA;
9-
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_URL;
10-
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_USER;
116
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
127
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
138
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
149
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
1510
import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG;
16-
import static org.assertj.core.api.Assertions.assertThat;
17-
import static org.awaitility.Awaitility.await;
1811

19-
import com.snowflake.kafka.connector.fake.SnowflakeFakeSinkConnector;
20-
import com.snowflake.kafka.connector.fake.SnowflakeFakeSinkTask;
21-
import java.time.Duration;
22-
import java.util.HashMap;
23-
import java.util.List;
12+
import com.snowflake.kafka.connector.internal.TestUtils;
13+
import com.snowflake.kafka.connector.internal.streaming.FakeStreamingClientHandler;
14+
import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
15+
import com.snowflake.kafka.connector.internal.streaming.StreamingClientProvider;
2416
import java.util.Map;
25-
import org.apache.kafka.connect.runtime.AbstractStatus;
26-
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
27-
import org.apache.kafka.connect.sink.SinkRecord;
2817
import org.apache.kafka.connect.storage.StringConverter;
2918
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
3019
import org.junit.jupiter.api.AfterAll;
3120
import org.junit.jupiter.api.AfterEach;
3221
import org.junit.jupiter.api.BeforeAll;
3322
import org.junit.jupiter.api.BeforeEach;
34-
import org.junit.jupiter.api.Test;
3523
import org.junit.jupiter.api.TestInstance;
3624

3725
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
38-
public class ConnectClusterBaseIT {
26+
class ConnectClusterBaseIT {
3927

40-
protected EmbeddedConnectCluster connectCluster;
28+
EmbeddedConnectCluster connectCluster;
4129

42-
protected static final String TEST_TOPIC = "kafka-int-test";
43-
protected static final String TEST_CONNECTOR_NAME = "test-connector";
44-
protected static final Integer TASK_NUMBER = 1;
45-
private static final Duration CONNECTOR_MAX_STARTUP_TIME = Duration.ofSeconds(20);
30+
FakeStreamingClientHandler fakeStreamingClientHandler;
31+
32+
static final Integer TASK_NUMBER = 1;
4633

4734
@BeforeAll
4835
public void beforeAll() {
@@ -52,14 +39,18 @@ public void beforeAll() {
5239
.numWorkers(3)
5340
.build();
5441
connectCluster.start();
55-
connectCluster.kafka().createTopic(TEST_TOPIC);
56-
connectCluster.configureConnector(TEST_CONNECTOR_NAME, createProperties());
57-
await().timeout(CONNECTOR_MAX_STARTUP_TIME).until(this::isConnectorRunning);
5842
}
5943

6044
@BeforeEach
61-
public void before() {
62-
SnowflakeFakeSinkTask.resetRecords();
45+
public void beforeEach() {
46+
StreamingClientProvider.reset();
47+
fakeStreamingClientHandler = new FakeStreamingClientHandler();
48+
StreamingClientProvider.overrideStreamingClientHandler(fakeStreamingClientHandler);
49+
}
50+
51+
@AfterEach
52+
public void afterEach() {
53+
StreamingClientProvider.reset();
6354
}
6455

6556
@AfterAll
@@ -70,55 +61,30 @@ public void afterAll() {
7061
}
7162
}
7263

73-
@AfterEach
74-
public void after() {
75-
SnowflakeFakeSinkTask.resetRecords();
76-
}
77-
78-
@Test
79-
public void connectorShouldConsumeMessagesFromTopic() {
80-
connectCluster.kafka().produce(TEST_TOPIC, "test1");
81-
connectCluster.kafka().produce(TEST_TOPIC, "test2");
64+
final Map<String, String> defaultProperties(String topicName, String connectorName) {
65+
Map<String, String> config = TestUtils.getConf();
8266

83-
await()
84-
.untilAsserted(
85-
() -> {
86-
List<SinkRecord> records = SnowflakeFakeSinkTask.getRecords();
87-
assertThat(records).hasSize(2);
88-
assertThat(records.stream().map(SinkRecord::value)).containsExactly("test1", "test2");
89-
});
90-
}
91-
92-
protected Map<String, String> createProperties() {
93-
Map<String, String> config = new HashMap<>();
94-
95-
// kafka connect specific
96-
// real connector will be specified with SNOW-1055561
97-
config.put(CONNECTOR_CLASS_CONFIG, SnowflakeFakeSinkConnector.class.getName());
98-
config.put(TOPICS_CONFIG, TEST_TOPIC);
67+
config.put(CONNECTOR_CLASS_CONFIG, SnowflakeSinkConnector.class.getName());
68+
config.put(NAME, connectorName);
69+
config.put(TOPICS_CONFIG, topicName);
70+
config.put(INGESTION_METHOD_OPT, IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
71+
config.put(Utils.SF_ROLE, "testrole_kafka");
72+
config.put(BUFFER_FLUSH_TIME_SEC, "1");
9973
config.put(TASKS_MAX_CONFIG, TASK_NUMBER.toString());
10074
config.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
10175
config.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
10276

103-
// kafka push specific
104-
config.put(NAME, TEST_CONNECTOR_NAME);
105-
config.put(SNOWFLAKE_URL, "https://test.testregion.snowflakecomputing.com:443");
106-
config.put(SNOWFLAKE_USER, "testName");
107-
config.put(SNOWFLAKE_PRIVATE_KEY, "testPrivateKey");
108-
config.put(SNOWFLAKE_DATABASE, "testDbName");
109-
config.put(SNOWFLAKE_SCHEMA, "testSchema");
110-
config.put(BUFFER_COUNT_RECORDS, "1000000");
111-
config.put(BUFFER_FLUSH_TIME_SEC, "1");
112-
11377
return config;
11478
}
11579

116-
private boolean isConnectorRunning() {
117-
ConnectorStateInfo status = connectCluster.connectorStatus(TEST_CONNECTOR_NAME);
118-
return status != null
119-
&& status.connector().state().equals(AbstractStatus.State.RUNNING.toString())
120-
&& status.tasks().size() >= TASK_NUMBER
121-
&& status.tasks().stream()
122-
.allMatch(state -> state.state().equals(AbstractStatus.State.RUNNING.toString()));
80+
final void waitForConnectorRunning(String connectorName) {
81+
try {
82+
connectCluster
83+
.assertions()
84+
.assertConnectorAndAtLeastNumTasksAreRunning(
85+
connectorName, 1, "The connector did not start.");
86+
} catch (InterruptedException e) {
87+
throw new IllegalStateException("The connector is not running");
88+
}
12389
}
12490
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package com.snowflake.kafka.connector;
2+
3+
import static org.apache.kafka.connect.runtime.ConnectorConfig.TRANSFORMS_CONFIG;
4+
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
5+
import static org.assertj.core.api.Assertions.assertThat;
6+
import static org.awaitility.Awaitility.await;
7+
8+
import com.snowflake.kafka.connector.internal.TestUtils;
9+
import java.time.Duration;
10+
import java.util.Map;
11+
import java.util.function.UnaryOperator;
12+
import java.util.stream.Stream;
13+
import org.apache.kafka.connect.json.JsonConverter;
14+
import org.junit.jupiter.api.AfterEach;
15+
import org.junit.jupiter.api.BeforeEach;
16+
import org.junit.jupiter.params.ParameterizedTest;
17+
import org.junit.jupiter.params.provider.CsvSource;
18+
19+
public class SmtIT extends ConnectClusterBaseIT {
20+
21+
private String smtTopic;
22+
private String smtConnector;
23+
24+
@BeforeEach
25+
void before() {
26+
smtTopic = TestUtils.randomTableName();
27+
smtConnector = String.format("%s_connector", smtTopic);
28+
connectCluster.kafka().createTopic(smtTopic);
29+
}
30+
31+
@AfterEach
32+
void after() {
33+
connectCluster.kafka().deleteTopic(smtTopic);
34+
connectCluster.deleteConnector(smtConnector);
35+
}
36+
37+
private Map<String, String> smtProperties(
38+
String smtTopic, String smtConnector, String behaviorOnNull) {
39+
Map<String, String> config = defaultProperties(smtTopic, smtConnector);
40+
41+
config.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
42+
config.put("value.converter.schemas.enable", "false");
43+
config.put("behavior.on.null.values", behaviorOnNull);
44+
45+
config.put(TRANSFORMS_CONFIG, "extractField");
46+
config.put(
47+
"transforms.extractField.type", "org.apache.kafka.connect.transforms.ExtractField$Value");
48+
config.put("transforms.extractField.field", "message");
49+
50+
return config;
51+
}
52+
53+
@ParameterizedTest
54+
@CsvSource({"DEFAULT, 20", "IGNORE, 10"})
55+
void testIfSmtReturningNullsIngestDataCorrectly(String behaviorOnNull, int expectedRecordNumber) {
56+
// given
57+
connectCluster.configureConnector(
58+
smtConnector, smtProperties(smtTopic, smtConnector, behaviorOnNull));
59+
waitForConnectorRunning(smtConnector);
60+
61+
// when
62+
Stream.iterate(0, UnaryOperator.identity())
63+
.limit(10)
64+
.flatMap(v -> Stream.of("{}", "{\"message\":\"value\"}"))
65+
.forEach(message -> connectCluster.kafka().produce(smtTopic, message));
66+
67+
// then
68+
await()
69+
.timeout(Duration.ofSeconds(60))
70+
.untilAsserted(
71+
() -> {
72+
assertThat(fakeStreamingClientHandler.ingestedRows()).hasSize(expectedRecordNumber);
73+
assertThat(fakeStreamingClientHandler.getLatestCommittedOffsetTokensPerChannel())
74+
.hasSize(1)
75+
.containsValue("19");
76+
});
77+
}
78+
}

src/test/java/com/snowflake/kafka/connector/fake/SnowflakeFakeSinkConnector.java

-53
This file was deleted.

0 commit comments

Comments
 (0)