Skip to content

Commit

Permalink
Added connection attributes (#39)
Browse files Browse the repository at this point in the history
* Added connection attributes

* Incremented version
  • Loading branch information
AdalbertMemSQL authored Apr 1, 2024
1 parent 9c3bd54 commit 295f35f
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 26 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Ensure that the following are installed:
* Kafka Connect

### Installation
* Download the [SingleStore connector for Debezium plugin archive](https://github.com/singlestore-labs/singlestore-debezium-connector/releases/download/v0.1.1/singlestore-debezium-connector-0.1.1-plugin.tar.gz).
* Download the [SingleStore connector for Debezium plugin archive](https://github.com/singlestore-labs/singlestore-debezium-connector/releases/download/v0.1.2/singlestore-debezium-connector-0.1.2-plugin.tar.gz).
* Extract the archive to a directory.
* Add the directory from the previous step to Kafka Connect’s plugin path.
Set the `plugin.path` property in the `connect-distributed.properties` file.
Expand Down Expand Up @@ -160,7 +160,7 @@ The following example demonstrates the value of a change event that the connecto
"a":33
},
"source":{
"version":"0.1.1",
"version":"0.1.2",
"connector":"singlestore",
"name":"singlestore",
"ts_ms":1706197043473,
Expand Down Expand Up @@ -191,7 +191,7 @@ The following example shows an update change event that the connector captures f
"a":22
},
"source":{
"version":"0.1.1",
"version":"0.1.2",
"connector":"singlestore",
"name":"singlestore",
"ts_ms":1706197446500,
Expand Down Expand Up @@ -227,7 +227,7 @@ The following example shows a delete event for the table that is shown in the pr
"before":null,
"after":null,
"source":{
"version":"0.1.1",
"version":"0.1.2",
"connector":"singlestore",
"name":"singlestore",
"ts_ms":1706197665407,
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.singlestore</groupId>
<artifactId>singlestore-debezium-connector</artifactId>
<version>0.1.1</version>
<version>0.1.2</version>

<properties>
<maven.compiler.source>11</maven.compiler.source>
Expand Down
8 changes: 7 additions & 1 deletion src/main/java/com/singlestore/debezium/Module.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,18 @@
* Information about this module.
*/
public class Module {
private static final Properties INFO = IoUtil.loadProperties(Module.class, "com/singlestore/debezium/build.version");

private static final Properties INFO = IoUtil.loadProperties(Module.class,
"com/singlestore/debezium/build.version");

public static String version() {
return INFO.getProperty("version");
}

public static String debeziumVersion() {
return INFO.getProperty("debezium.version");
}

/**
* @return symbolic name of the connector plugin
*/
Expand Down
38 changes: 20 additions & 18 deletions src/main/java/com/singlestore/debezium/SingleStoreConnection.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.singlestore.debezium;


import static io.debezium.config.CommonConnectorConfig.DATABASE_CONFIG_PREFIX;
import static io.debezium.config.CommonConnectorConfig.DRIVER_CONFIG_PREFIX;

Expand Down Expand Up @@ -52,27 +51,27 @@ private static void validateServerVersion(Statement statement) throws SQLExcepti
/**
* Executes OBSERVE query for CDC output stream events.
*
* @param tableFilter tables filter to observe
* @param tableFilter tables filter to observe
* @param resultSetConsumer the consumer of the query results
* @return this object for chaining methods together
* @throws SQLException if there is an error connecting to the database or executing the
* statements
* statements
*/
public JdbcConnection observe(Set<TableId> tableFilter, ResultSetConsumer resultSetConsumer)
throws SQLException {
public JdbcConnection observe(Set<TableId> tableFilter,
ResultSetConsumer resultSetConsumer) throws SQLException {
return observe(null, tableFilter, Optional.empty(), Optional.empty(), Optional.empty(),
Optional.empty(), resultSetConsumer);
}

/**
* Executes OBSERVE query for CDC output stream events.
*
* @param fieldFilter columns filter to observe
* @param tableFilter tables filter to observe
* @param fieldFilter columns filter to observe
* @param tableFilter tables filter to observe
* @param resultSetConsumer the consumer of the query results
* @return this object for chaining methods together
* @throws SQLException if there is an error connecting to the database or executing the
* statements
* statements
*/
public JdbcConnection observe(Set<ColumnId> fieldFilter, Set<TableId> tableFilter,
ResultSetConsumer resultSetConsumer) throws SQLException {
Expand All @@ -83,16 +82,16 @@ public JdbcConnection observe(Set<ColumnId> fieldFilter, Set<TableId> tableFilte
/**
* Executes OBSERVE query for CDC output stream events.
*
* @param fieldFilter columns filter to observe
* @param tableFilter tables filter to observe
* @param format output format(SQL | JSON)
* @param outputConfig FS <FsConfig> | S3 <S3Config> | GCS <GCSConfig>
* @param offSetConfig offset config (<offset> | NULL),+ // # of partitions
* @param recordFilter filter on record metadata or content
* @param fieldFilter columns filter to observe
* @param tableFilter tables filter to observe
* @param format output format(SQL | JSON)
* @param outputConfig FS <FsConfig> | S3 <S3Config> | GCS <GCSConfig>
* @param offSetConfig offset config (<offset> | NULL),+ // # of partitions
* @param recordFilter filter on record metadata or content
* @param resultSetConsumer the consumer of the query results
* @return this object for chaining methods together
* @throws SQLException if there is an error connecting to the database or executing the
* statements
* statements
*/
public JdbcConnection observe(Set<ColumnId> fieldFilter, Set<TableId> tableFilter,
Optional<OBSERVE_OUTPUT_FORMAT> format,
Expand Down Expand Up @@ -179,6 +178,9 @@ public SingleStoreConnectionConfiguration(Configuration config) {
.with("sslMode", sslMode().getValue())
.with("defaultFetchSize", 1)
.with("tinyInt1IsBit", "false")
.with("connectionAttributes", String.format(
"_connector_name:%s,_connector_version:%s,_product_version:%s",
"SingleStore Debezium Connector", Module.version(), Module.debeziumVersion()))
.without("parameters");
if (useSSL) {
if (!Strings.isNullOrBlank(sslTrustStore())) {
Expand Down Expand Up @@ -276,9 +278,9 @@ public Duration getConnectionTimeout() {
public Map<String, String> driverParameters() {
final String driverParametersString = config
.getString(SingleStoreConnectorConfig.DRIVER_PARAMETERS);
return driverParametersString == null ? Collections.emptyMap() :
Arrays.stream(driverParametersString.split(";"))
.map(s -> s.split("=")).collect(Collectors.toMap(s -> s[0].trim(), s -> s[1].trim()));
return driverParametersString == null ? Collections.emptyMap() : Arrays.stream(
driverParametersString.split(";"))
.map(s -> s.split("=")).collect(Collectors.toMap(s -> s[0].trim(), s -> s[1].trim()));
}

public CommonConnectorConfig.EventProcessingFailureHandlingMode eventProcessingFailureHandlingMode() {
Expand Down
3 changes: 2 additions & 1 deletion src/main/resources/com/singlestore/debezium/build.version
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
version=${project.version}
version=${project.version}
debezium.version=${version.debezium}
2 changes: 1 addition & 1 deletion src/test/java/com/singlestore/debezium/StreamingIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public void populatesSourceInfo() throws SQLException, InterruptedException {
SourceRecord record = records.get(0);

Struct source = (Struct) ((Struct) record.value()).get("source");
assertEquals(source.get("version"), "0.1.1");
assertEquals(source.get("version"), "0.1.2");
assertEquals(source.get("connector"), "singlestore");
assertEquals(source.get("name"), "singlestore_topic");
assertNotNull(source.get("ts_ms"));
Expand Down

0 comments on commit 295f35f

Please sign in to comment.