diff --git a/README.md b/README.md index 5965de3..23fc680 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ Ensure that the following are installed: * Kafka Connect ### Installation - * Download the [SingleStore connector for Debezium plugin archive](https://github.com/singlestore-labs/singlestore-debezium-connector/releases/download/v0.1.1/singlestore-debezium-connector-0.1.1-plugin.tar.gz). + * Download the [SingleStore connector for Debezium plugin archive](https://github.com/singlestore-labs/singlestore-debezium-connector/releases/download/v0.1.2/singlestore-debezium-connector-0.1.2-plugin.tar.gz). * Extract the archive to a directory. * Add the directory from the previous step to Kafka Connect’s plugin path. Set the `plugin.path` property in the `connect-distributed.properties` file. @@ -160,7 +160,7 @@ The following example demonstrates the value of a change event that the connecto "a":33 }, "source":{ - "version":"0.1.1", + "version":"0.1.2", "connector":"singlestore", "name":"singlestore", "ts_ms":1706197043473, @@ -191,7 +191,7 @@ The following example shows an update change event that the connector captures f "a":22 }, "source":{ - "version":"0.1.1", + "version":"0.1.2", "connector":"singlestore", "name":"singlestore", "ts_ms":1706197446500, @@ -227,7 +227,7 @@ The following example shows a delete event for the table that is shown in the pr "before":null, "after":null, "source":{ - "version":"0.1.1", + "version":"0.1.2", "connector":"singlestore", "name":"singlestore", "ts_ms":1706197665407, diff --git a/pom.xml b/pom.xml index 368beb5..7ca1aaf 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.singlestore singlestore-debezium-connector - 0.1.1 + 0.1.2 11 diff --git a/src/main/java/com/singlestore/debezium/Module.java b/src/main/java/com/singlestore/debezium/Module.java index 3bc4fb1..82a2bff 100644 --- a/src/main/java/com/singlestore/debezium/Module.java +++ b/src/main/java/com/singlestore/debezium/Module.java @@ -8,12 +8,18 @@ * Information about this module. */ public class Module { - private static final Properties INFO = IoUtil.loadProperties(Module.class, "com/singlestore/debezium/build.version"); + + private static final Properties INFO = IoUtil.loadProperties(Module.class, + "com/singlestore/debezium/build.version"); public static String version() { return INFO.getProperty("version"); } + public static String debeziumVersion() { + return INFO.getProperty("debezium.version"); + } + /** * @return symbolic name of the connector plugin */ diff --git a/src/main/java/com/singlestore/debezium/SingleStoreConnection.java b/src/main/java/com/singlestore/debezium/SingleStoreConnection.java index 924f945..f38cb38 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreConnection.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreConnection.java @@ -1,6 +1,5 @@ package com.singlestore.debezium; - import static io.debezium.config.CommonConnectorConfig.DATABASE_CONFIG_PREFIX; import static io.debezium.config.CommonConnectorConfig.DRIVER_CONFIG_PREFIX; @@ -52,14 +51,14 @@ private static void validateServerVersion(Statement statement) throws SQLExcepti /** * Executes OBSERVE query for CDC output stream events. * - * @param tableFilter tables filter to observe + * @param tableFilter tables filter to observe * @param resultSetConsumer the consumer of the query results * @return this object for chaining methods together * @throws SQLException if there is an error connecting to the database or executing the - * statements + * statements */ - public JdbcConnection observe(Set tableFilter, ResultSetConsumer resultSetConsumer) - throws SQLException { + public JdbcConnection observe(Set tableFilter, + ResultSetConsumer resultSetConsumer) throws SQLException { return observe(null, tableFilter, Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), resultSetConsumer); } @@ -67,12 +66,12 @@ public JdbcConnection observe(Set tableFilter, ResultSetConsumer result /** * Executes OBSERVE query for CDC output stream events. * - * @param fieldFilter columns filter to observe - * @param tableFilter tables filter to observe + * @param fieldFilter columns filter to observe + * @param tableFilter tables filter to observe * @param resultSetConsumer the consumer of the query results * @return this object for chaining methods together * @throws SQLException if there is an error connecting to the database or executing the - * statements + * statements */ public JdbcConnection observe(Set fieldFilter, Set tableFilter, ResultSetConsumer resultSetConsumer) throws SQLException { @@ -83,16 +82,16 @@ public JdbcConnection observe(Set fieldFilter, Set tableFilte /** * Executes OBSERVE query for CDC output stream events. * - * @param fieldFilter columns filter to observe - * @param tableFilter tables filter to observe - * @param format output format(SQL | JSON) - * @param outputConfig FS | S3 | GCS - * @param offSetConfig offset config ( | NULL),+ // # of partitions - * @param recordFilter filter on record metadata or content + * @param fieldFilter columns filter to observe + * @param tableFilter tables filter to observe + * @param format output format(SQL | JSON) + * @param outputConfig FS | S3 | GCS + * @param offSetConfig offset config ( | NULL),+ // # of partitions + * @param recordFilter filter on record metadata or content * @param resultSetConsumer the consumer of the query results * @return this object for chaining methods together * @throws SQLException if there is an error connecting to the database or executing the - * statements + * statements */ public JdbcConnection observe(Set fieldFilter, Set tableFilter, Optional format, @@ -179,6 +178,9 @@ public SingleStoreConnectionConfiguration(Configuration config) { .with("sslMode", sslMode().getValue()) .with("defaultFetchSize", 1) .with("tinyInt1IsBit", "false") + .with("connectionAttributes", String.format( + "_connector_name:%s,_connector_version:%s,_product_version:%s", + "SingleStore Debezium Connector", Module.version(), Module.debeziumVersion())) .without("parameters"); if (useSSL) { if (!Strings.isNullOrBlank(sslTrustStore())) { @@ -276,9 +278,9 @@ public Duration getConnectionTimeout() { public Map driverParameters() { final String driverParametersString = config .getString(SingleStoreConnectorConfig.DRIVER_PARAMETERS); - return driverParametersString == null ? Collections.emptyMap() : - Arrays.stream(driverParametersString.split(";")) - .map(s -> s.split("=")).collect(Collectors.toMap(s -> s[0].trim(), s -> s[1].trim())); + return driverParametersString == null ? Collections.emptyMap() : Arrays.stream( + driverParametersString.split(";")) + .map(s -> s.split("=")).collect(Collectors.toMap(s -> s[0].trim(), s -> s[1].trim())); } public CommonConnectorConfig.EventProcessingFailureHandlingMode eventProcessingFailureHandlingMode() { diff --git a/src/main/resources/com/singlestore/debezium/build.version b/src/main/resources/com/singlestore/debezium/build.version index e5683df..b763cfb 100644 --- a/src/main/resources/com/singlestore/debezium/build.version +++ b/src/main/resources/com/singlestore/debezium/build.version @@ -1 +1,2 @@ -version=${project.version} \ No newline at end of file +version=${project.version} +debezium.version=${version.debezium} diff --git a/src/test/java/com/singlestore/debezium/StreamingIT.java b/src/test/java/com/singlestore/debezium/StreamingIT.java index 42160d6..6c7ed7e 100644 --- a/src/test/java/com/singlestore/debezium/StreamingIT.java +++ b/src/test/java/com/singlestore/debezium/StreamingIT.java @@ -158,7 +158,7 @@ public void populatesSourceInfo() throws SQLException, InterruptedException { SourceRecord record = records.get(0); Struct source = (Struct) ((Struct) record.value()).get("source"); - assertEquals(source.get("version"), "0.1.1"); + assertEquals(source.get("version"), "0.1.2"); assertEquals(source.get("connector"), "singlestore"); assertEquals(source.get("name"), "singlestore_topic"); assertNotNull(source.get("ts_ms"));