-
Notifications
You must be signed in to change notification settings - Fork 99
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add connector version in RECORD_METADATA #766
base: master
Are you sure you want to change the base?
Changes from 4 commits
09c0bd6
f345bfa
81b39c8
055bf1b
1270dc6
41967cb
a2209be
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,6 +38,12 @@ public class MetaColumnTest { | |
put(SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_TOPIC, "false"); | ||
} | ||
}; | ||
private HashMap<String, String> versionConfig = | ||
new HashMap<String, String>() { | ||
{ | ||
put(SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_SF_CONNECTOR_VERSION, "false"); | ||
} | ||
}; | ||
private HashMap<String, String> offsetAndPartitionConfig = | ||
new HashMap<String, String>() { | ||
{ | ||
|
@@ -113,6 +119,18 @@ public void testConfig() throws IOException { | |
assert result.get(META).has(RecordService.OFFSET); | ||
assert result.get(META).has(RecordService.PARTITION); | ||
assert result.get(META).has(record.timestampType().name); | ||
assert result.get(META).has(RecordService.SF_CONNECTOR_VERSION); | ||
|
||
// test metadata configuration -- remove version | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is for default, can you add a test for what happens when you add true to the metadata version? Also, once you are done, if you dont mind, I would like to add an e2e test here to verify if this works fine. Then we can push the code. (I will open another PR where you will be seen as co-author) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just updated the test - something to note: Checking for default value is a bit different than rest of the metadata fields as they are enabled by default. Here I'm explicitly checking the value of sf connector version config to be "true" for it to enable, it's disabled by default. So this test already tests "what happens when you add true to the metadata version" |
||
metadataConfig = new SnowflakeMetadataConfig(versionConfig); | ||
service.setMetadataConfig(metadataConfig); | ||
result = mapper.readTree(service.getProcessedRecordForSnowpipe(record)); | ||
assert result.has(META); | ||
assert !result.get(META).has(RecordService.SF_CONNECTOR_VERSION); | ||
assert result.get(META).has(RecordService.OFFSET); | ||
assert result.get(META).has(RecordService.PARTITION); | ||
assert result.get(META).has(record.timestampType().name); | ||
assert result.get(META).has(RecordService.TOPIC); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. something doesn add up in this test. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're right. There was a bug and had to update the test. Fixed it in 1270dc6 |
||
|
||
// test metadata configuration -- remove offset and partition | ||
metadataConfig = new SnowflakeMetadataConfig(offsetAndPartitionConfig); | ||
|
@@ -123,6 +141,7 @@ public void testConfig() throws IOException { | |
assert !result.get(META).has(RecordService.PARTITION); | ||
assert result.get(META).has(record.timestampType().name); | ||
assert result.get(META).has(RecordService.TOPIC); | ||
assert result.get(META).has(RecordService.SF_CONNECTOR_VERSION); | ||
|
||
// test metadata configuration -- remove time stamp | ||
metadataConfig = new SnowflakeMetadataConfig(createTimeConfig); | ||
|
@@ -131,6 +150,7 @@ public void testConfig() throws IOException { | |
assert result.has(META); | ||
assert !result.get(META).has(record.timestampType().name); | ||
assert result.get(META).has(RecordService.TOPIC); | ||
assert result.get(META).has(RecordService.SF_CONNECTOR_VERSION); | ||
assert result.get(META).has(RecordService.OFFSET); | ||
assert result.get(META).has(RecordService.PARTITION); | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -264,7 +264,10 @@ public void testSchematizationArrayOfObject() throws JsonProcessingException { | |
service.setEnableSchematization(true); | ||
String value = | ||
"{\"players\":[{\"name\":\"John Doe\",\"age\":30},{\"name\":\"Jane Doe\",\"age\":30}]}"; | ||
byte[] valueContents = (value).getBytes(StandardCharsets.UTF_8); | ||
String value2 = | ||
"{\"cricket\":{\"team\":{\"MI\":{\"players\":[{\"name\":\"John" | ||
+ " Doe\",\"age\":30},{\"name\":\"Jane Doe\",\"age\":30}]}}}}"; | ||
byte[] valueContents = (value2).getBytes(StandardCharsets.UTF_8); | ||
Comment on lines
+267
to
+270
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do you need this? |
||
SchemaAndValue sv = jsonConverter.toConnectData(topic, valueContents); | ||
|
||
SinkRecord record = | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
set this to true only when SNOWFLAKE_METADATA_SF_CONNECTOR_VERSION is true right? default is false now!