From 7506b3da07014e8ef22d6b05c24822464ecdb51b Mon Sep 17 00:00:00 2001 From: Parag Date: Mon, 17 Oct 2022 20:43:15 +0530 Subject: [PATCH] CCMSG-1613 | Use the configured schema.generation.key.name and schema.generation.value.name while creating the schema's. (#204) --- .../spooldir/AbstractSchemaGenerator.java | 4 +-- .../spooldir/JsonSchemaGeneratorTest.java | 32 +++++++++++++++++++ 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSchemaGenerator.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSchemaGenerator.java index 04b253c..7cdc80b 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSchemaGenerator.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSchemaGenerator.java @@ -191,7 +191,7 @@ public Map.Entry generate(File inputFile, List keyFields log.trace("generate() - Building key schema."); SchemaBuilder keySchemaBuilder = SchemaBuilder.struct() - .name("com.github.jcustenborder.kafka.connect.model.Key"); + .name(this.config.schemaGenerationKeyName); for (String keyFieldName : keyFields) { log.trace("generate() - Adding keyFieldName field '{}'", keyFieldName); @@ -205,7 +205,7 @@ public Map.Entry generate(File inputFile, List keyFields log.trace("generate() - Building value schema."); SchemaBuilder valueSchemaBuilder = SchemaBuilder.struct() - .name("com.github.jcustenborder.kafka.connect.model.Value"); + .name(this.config.schemaGenerationValueName); for (Map.Entry kvp : fieldTypes.entrySet()) { addField(valueSchemaBuilder, kvp.getKey(), kvp.getValue()); diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/JsonSchemaGeneratorTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/JsonSchemaGeneratorTest.java index 65c1318..be46ce0 100644 --- a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/JsonSchemaGeneratorTest.java +++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/JsonSchemaGeneratorTest.java @@ -15,6 +15,7 @@ */ package com.github.jcustenborder.kafka.connect.spooldir; +import java.util.HashMap; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.junit.jupiter.api.Test; @@ -57,4 +58,35 @@ public void schema() throws IOException { } + @Test + public void schemaWithCustomSchemaName() throws IOException { + File inputFile = new File("src/test/resources/com/github/jcustenborder/kafka/connect/spooldir/json/FieldsMatch.data"); + Map configs = new HashMap<>(settings); + configs.put(AbstractSpoolDirSourceConnectorConfig.SCHEMA_GENERATION_KEY_NAME_CONF, "com.foo.key"); + configs.put(AbstractSpoolDirSourceConnectorConfig.SCHEMA_GENERATION_VALUE_NAME_CONF, "com.foo.value"); + JsonSchemaGenerator schemaGenerator = new JsonSchemaGenerator(configs); + Map.Entry kvp = schemaGenerator.generate(inputFile, Arrays.asList("id")); + final Schema expectedKeySchema = SchemaBuilder.struct() + .name("com.foo.key") + .field("id", Schema.OPTIONAL_STRING_SCHEMA) + .build(); + + final Schema expectedValueSchema = SchemaBuilder.struct() + .name("com.foo.value") + .field("id", Schema.OPTIONAL_STRING_SCHEMA) + .field("first_name", Schema.OPTIONAL_STRING_SCHEMA) + .field("last_name", Schema.OPTIONAL_STRING_SCHEMA) + .field("email", Schema.OPTIONAL_STRING_SCHEMA) + .field("gender", Schema.OPTIONAL_STRING_SCHEMA) + .field("ip_address", Schema.OPTIONAL_STRING_SCHEMA) + .field("last_login", Schema.OPTIONAL_STRING_SCHEMA) + .field("account_balance", Schema.OPTIONAL_STRING_SCHEMA) + .field("country", Schema.OPTIONAL_STRING_SCHEMA) + .field("favorite_color", Schema.OPTIONAL_STRING_SCHEMA) + .build(); + + assertSchema(expectedKeySchema, kvp.getKey(), "key schema does not match."); + assertSchema(expectedValueSchema, kvp.getValue(), "value schema does not match."); + } + }