
Commit

initial work, first test, read jar, parse param
sfc-gh-bzabek committed Nov 5, 2024
1 parent 37745a4 commit 30f6918
Showing 5 changed files with 69 additions and 18 deletions.
2 changes: 2 additions & 0 deletions pom.xml
@@ -345,6 +345,8 @@
<artifactId>snowflake-jdbc</artifactId>
</exclusion>
</exclusions>
<scope>system</scope>
<systemPath>/Users/bzabek/snowflake-kafka-connector/snowflake-ingest-sdk.jar</systemPath>
</dependency>

<dependency>
@@ -23,7 +23,6 @@
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;
import net.snowflake.ingest.utils.SFException;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.kafka.connect.errors.ConnectException;

/** This class handles all calls to manage the streaming ingestion client */
@@ -49,8 +48,6 @@ public SnowflakeStreamingIngestClient createClient(
.setProperties(streamingClientProperties.clientProperties)
.setParameterOverrides(streamingClientProperties.parameterOverrides);

setIcebergEnabled(builder, streamingClientProperties.isIcebergEnabled);

SnowflakeStreamingIngestClient createdClient = builder.build();

LOGGER.info(
@@ -65,17 +62,6 @@ public SnowflakeStreamingIngestClient createClient(
}
}

private static void setIcebergEnabled(
SnowflakeStreamingIngestClientFactory.Builder builder, boolean isIcebergEnabled) {
try {
// TODO reflection should be replaced by proper builder.setIceberg(true) call in SNOW-1728002
FieldUtils.writeField(builder, "isIceberg", isIcebergEnabled, true);
} catch (IllegalAccessException e) {
throw new IllegalStateException(
"Couldn't set iceberg by accessing private field: " + "isIceberg", e);
}
}

/**
* Closes the given client. Swallows any exceptions
*
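
The reflection-based setIcebergEnabled helper (and its FieldUtils import) is removed above; the Iceberg flag now reaches the SDK through the client's parameter overrides instead, as the next diff shows. A minimal sketch of that override-based construction, assuming the builder calls visible in the diff and an "enable_iceberg_streaming" key matching the override added below:

import java.util.Map;
import java.util.Properties;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;

class IcebergClientSketch {
  // Sketch only: enables Iceberg ingestion via a parameter override rather than
  // writing the builder's private isIceberg field through reflection.
  static SnowflakeStreamingIngestClient buildClient(String clientName, Properties clientProperties) {
    Map<String, Object> overrides = Map.of("enable_iceberg_streaming", "true");
    return SnowflakeStreamingIngestClientFactory.builder(clientName)
        .setProperties(clientProperties)
        .setParameterOverrides(overrides)
        .build();
  }
}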
@@ -90,6 +90,10 @@ public StreamingClientProperties(Map<String, String> connectorConfig) {

// Override only if the streaming client properties are explicitly set in config
this.parameterOverrides = new HashMap<>();
if (isIcebergEnabled) {
// todo extract to field
this.parameterOverrides.put("enable_iceberg_streaming", "true");
}
Optional<String> snowpipeStreamingMaxClientLag =
Optional.ofNullable(connectorConfig.get(SNOWPIPE_STREAMING_MAX_CLIENT_LAG));
snowpipeStreamingMaxClientLag.ifPresent(
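
The "todo extract to field" note above suggests pulling the override key out into a named constant. A minimal sketch of that extraction, with illustrative names that are not part of the commit:

import java.util.HashMap;
import java.util.Map;

class StreamingClientPropertiesSketch {
  // Hypothetical constant for the override key added in the constructor above.
  static final String ENABLE_ICEBERG_STREAMING = "enable_iceberg_streaming";

  final Map<String, Object> parameterOverrides = new HashMap<>();

  StreamingClientPropertiesSketch(boolean isIcebergEnabled) {
    if (isIcebergEnabled) {
      parameterOverrides.put(ENABLE_ICEBERG_STREAMING, "true");
    }
  }
}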
@@ -52,7 +52,10 @@ public String mapToColumnType(Schema.Type kafkaType, String schemaName) {
} else {
return "BINARY";
}
case STRUCT:
return "OBJECT()";
case ARRAY:
throw new IllegalArgumentException("Arrays, struct and map not supported!");
default:
// MAP will go here
throw new IllegalArgumentException("Arrays, struct and map not supported!");
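
With the new STRUCT case returning "OBJECT()" and ARRAY rejected explicitly, only MAP falls through to the default branch. A compact sketch of the relevant cases, assuming Kafka Connect's Schema.Type enum (the surrounding primitive cases are omitted):

import org.apache.kafka.connect.data.Schema;

class KafkaTypeToIcebergSketch {
  // Sketch of the struct/array/map handling only; other Kafka types map to
  // primitive Iceberg column types elsewhere in the real method.
  String mapToColumnType(Schema.Type kafkaType) {
    switch (kafkaType) {
      case STRUCT:
        return "OBJECT()";
      case ARRAY:
        throw new IllegalArgumentException("Arrays are not supported!");
      default:
        // MAP and any remaining unsupported types end up here
        throw new IllegalArgumentException("Type " + kafkaType + " is not supported!");
    }
  }
}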
@@ -11,12 +11,10 @@
import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord;
import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord.RecordWithMetadata;
import com.snowflake.kafka.connector.streaming.iceberg.sql.PrimitiveJsonRecord;
import java.util.Collections;
import java.util.List;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
@@ -35,7 +33,7 @@ protected void createIcebergTable() {

@ParameterizedTest(name = "{0}")
@MethodSource("prepareData")
@Disabled
// @Disabled
void shouldEvolveSchemaAndInsertRecords(
String description, String message, DescribeTableRow[] expectedSchema, boolean withSchema)
throws Exception {
@@ -81,6 +79,64 @@ void shouldEvolveSchemaAndInsertRecords(
assertRecordsInTable();
}

@ParameterizedTest(name = "{0}")
@MethodSource("prepareData")
// @Disabled
void shouldEvolveSchemaAndInsertRecords_withObjects(
String description, String message, DescribeTableRow[] expectedSchema, boolean withSchema)
throws Exception {
// start off with just one column
List<DescribeTableRow> rows = describeTable(tableName);
assertThat(rows)
.hasSize(1)
.extracting(DescribeTableRow::getColumn)
.contains(Utils.TABLE_COLUMN_METADATA);

SinkRecord record = createKafkaRecord(message, 0, withSchema);
service.insert(Collections.singletonList(record));
waitForOffset(-1);
rows = describeTable(tableName);
assertThat(rows.size()).isEqualTo(9);

// don't check metadata column schema, we have different tests for that
rows =
rows.stream()
.filter(r -> !r.getColumn().equals(Utils.TABLE_COLUMN_METADATA))
.collect(Collectors.toList());

assertThat(rows).containsExactlyInAnyOrder(expectedSchema);

// resend and store same record without any issues now
service.insert(Collections.singletonList(record));
waitForOffset(1);

// and another record with same schema
service.insert(Collections.singletonList(createKafkaRecord(message, 1, withSchema)));
waitForOffset(2);

String testStruct = "{ \"testStruct\": {" + "\"k1\" : 1," + "\"k2\" : 2" + "} " + "}";

// String testStruct =
// "{ \"testStruct\": {" +
// "\"k1\" : { \"nested_key1\" : 1}," +
// "\"k2\" : { \"nested_key2\" : 2}" +
// "} " +
// "}";

// String testStruct =
// "{ \"testStruct\": {" +
// "\"k1\" : { \"car\" : { \"brand\" : \"vw\" } }," +
// "\"k2\" : { \"car\" : { \"brand\" : \"toyota\" } }" +
// "} " +
// "}";
// reinsert record with extra field
service.insert(Collections.singletonList(createKafkaRecord(testStruct, 2, false)));
rows = describeTable(tableName);
// assertThat(rows).hasSize(15);
service.insert(Collections.singletonList(createKafkaRecord(testStruct, 2, false)));
waitForOffset(3);
}

private void assertRecordsInTable() {
List<RecordWithMetadata<PrimitiveJsonRecord>> recordsWithMetadata =
selectAllSchematizedRecords();
