diff --git a/README.md b/README.md index 63f6b2b..40d6e45 100644 --- a/README.md +++ b/README.md @@ -154,7 +154,9 @@ describes the structure of the payload, while the payload contains the actual da ### Change event keys -For each changed table, the change event key payload consists of a single field named `internalid`. +The change event key payload for rowstore tables that have a primary key consists of primary key +fields. +The change event key payload for all other tables consists of a single field named `internalid`. It represents a unique ID assigned to each row in the database. ``` @@ -176,10 +178,10 @@ It represents a unique ID assigned to each row in the database. } ``` -| Item | Field name | Description -|------|------------|------------------------------------------------------------------------------------------------------------------- -| 1 | `schema` | Specifies a Kafka Connect schema that describes the structure of the key's `payload`. -| 2 | `payload` | Contains a single field `internalId` used to specify the ID of the row for which this change event was generated. +| Item | Field name | Description +|------|------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- +| 1 | `schema` | Specifies a Kafka Connect schema that describes the structure of the key's `payload`. +| 2 | `payload` | For rowstore tables that have a primary key, contains primary key fields. For all other tables, contains a single field `internalId` used to specify the ID of the row for which this change event was generated. ### Change event values diff --git a/src/main/java/com/singlestore/debezium/SingleStoreChangeRecordEmitter.java b/src/main/java/com/singlestore/debezium/SingleStoreChangeRecordEmitter.java index af806f7..7bf4c48 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreChangeRecordEmitter.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreChangeRecordEmitter.java @@ -1,5 +1,7 @@ package com.singlestore.debezium; +import com.singlestore.debezium.util.InternalIdUtils; +import io.debezium.relational.Table; import java.util.Objects; import org.apache.kafka.connect.data.Schema; @@ -25,25 +27,25 @@ public class SingleStoreChangeRecordEmitter extends private final Object[] before; private final Object[] after; private final long internalId; - - private static final String INTERNAL_ID = "internalId"; + private final Table table; public SingleStoreChangeRecordEmitter(SingleStorePartition partition, OffsetContext offset, Clock clock, Operation operation, Object[] before, - Object[] after, long internalId, SingleStoreConnectorConfig connectorConfig) { + Object[] after, long internalId, SingleStoreConnectorConfig connectorConfig, Table table) { super(partition, offset, clock, connectorConfig); this.offset = offset; this.operation = operation; this.before = before; this.after = after; this.internalId = internalId; + this.table = table; } @Override protected void emitCreateRecord(Receiver receiver, TableSchema tableSchema) throws InterruptedException { Object[] newColumnValues = getNewColumnValues(); - Struct newKey = keyFromInternalId(); + Struct newKey = InternalIdUtils.generateKey(table, tableSchema, newColumnValues, internalId); Struct newValue = tableSchema.valueFromColumnData(newColumnValues); Struct envelope = tableSchema.getEnvelopeSchema() .create(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant()); @@ -64,7 +66,7 @@ protected void emitUpdateRecord(Receiver receiver, TableSc Object[] oldColumnValues = getOldColumnValues(); Object[] newColumnValues = getNewColumnValues(); - Struct newKey = keyFromInternalId(); + Struct newKey = InternalIdUtils.generateKey(table, tableSchema, newColumnValues, internalId); Struct newValue = tableSchema.valueFromColumnData(newColumnValues); Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues); @@ -99,7 +101,8 @@ protected void emitUpdateRecord(Receiver receiver, TableSc protected void emitDeleteRecord(Receiver receiver, TableSchema tableSchema) throws InterruptedException { Object[] oldColumnValues = getOldColumnValues(); - Struct newKey = keyFromInternalId(); + Object[] newColumnValues = getNewColumnValues(); + Struct newKey = InternalIdUtils.generateKey(table, tableSchema, newColumnValues, internalId); Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues); @@ -134,11 +137,4 @@ protected Object[] getOldColumnValues() { protected Object[] getNewColumnValues() { return after; } - - private Struct keyFromInternalId() { - Struct result = new Struct( - SchemaBuilder.struct().field(INTERNAL_ID, Schema.INT64_SCHEMA).build()); - result.put(INTERNAL_ID, internalId); - return result; - } } diff --git a/src/main/java/com/singlestore/debezium/SingleStoreConnection.java b/src/main/java/com/singlestore/debezium/SingleStoreConnection.java index f791f94..b7a9b1b 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreConnection.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreConnection.java @@ -7,8 +7,12 @@ import io.debezium.config.Configuration; import io.debezium.jdbc.JdbcConfiguration; import io.debezium.jdbc.JdbcConnection; +import io.debezium.relational.Attribute; import io.debezium.relational.ColumnId; import io.debezium.relational.TableId; +import io.debezium.relational.Tables; +import io.debezium.relational.Tables.ColumnNameFilter; +import io.debezium.relational.Tables.TableFilter; import io.debezium.util.Strings; import java.sql.DatabaseMetaData; import java.sql.SQLException; @@ -16,16 +20,22 @@ import java.time.Duration; import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * {@link JdbcConnection} extension to be used with SingleStore */ public class SingleStoreConnection extends JdbcConnection { + private static final Logger LOGGER = LoggerFactory + .getLogger(SingleStoreConnection.class); private static final String QUOTED_CHARACTER = "`"; protected static final String URL_PATTERN = "jdbc:singlestore://${hostname}:${port}/?connectTimeout=${connectTimeout}"; protected static final String URL_PATTERN_DATABASE = "jdbc:singlestore://${hostname}:${port}/${dbname}?connectTimeout=${connectTimeout}"; @@ -38,6 +48,43 @@ public SingleStoreConnection(SingleStoreConnectionConfiguration connectionConfig this.connectionConfig = connectionConfig; } + public boolean isRowstoreTable(TableId tableId) throws SQLException { + AtomicBoolean res = new AtomicBoolean(false); + prepareQuery( + "SELECT storage_type = 'INMEMORY_ROWSTORE' as isRowstore FROM information_schema.tables WHERE table_schema = ? && table_name = ?", + statement -> { + statement.setString(1, tableId.catalog()); + statement.setString(2, tableId.table()); + }, + rs -> { + rs.next(); + res.set(rs.getBoolean(1)); + }); + + return res.get(); + } + + public void addIsRowstoreAttribute(Tables tables) throws SQLException { + for (TableId tableId : tables.tableIds()) { + Attribute isRowstore = Attribute.editor() + .name("IS_ROWSTORE") + .value(isRowstoreTable(tableId)) + .create(); + + tables.updateTable(tableId, table -> + table.edit().addAttribute(isRowstore).create()); + } + } + + @Override + public void readSchema(Tables tables, String databaseCatalog, String schemaNamePattern, + TableFilter tableFilter, ColumnNameFilter columnFilter, boolean removeTablesNotFoundInJdbc) + throws SQLException { + super.readSchema(tables, databaseCatalog, schemaNamePattern, tableFilter, columnFilter, + removeTablesNotFoundInJdbc); + addIsRowstoreAttribute(tables); + } + private static void validateServerVersion(Statement statement) throws SQLException { DatabaseMetaData metaData = statement.getConnection().getMetaData(); int majorVersion = metaData.getDatabaseMajorVersion(); @@ -288,4 +335,10 @@ public CommonConnectorConfig.EventProcessingFailureHandlingMode eventProcessingF return CommonConnectorConfig.EventProcessingFailureHandlingMode.parse(mode); } } + + @Override + protected List readPrimaryKeyOrUniqueIndexNames(DatabaseMetaData metadata, TableId id) + throws SQLException { + return readPrimaryKeyNames(metadata, id); + } } diff --git a/src/main/java/com/singlestore/debezium/SingleStoreDatabaseSchema.java b/src/main/java/com/singlestore/debezium/SingleStoreDatabaseSchema.java index d60e100..a633de0 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreDatabaseSchema.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreDatabaseSchema.java @@ -4,7 +4,6 @@ import io.debezium.relational.RelationalDatabaseSchema; import io.debezium.relational.TableId; import io.debezium.relational.TableSchemaBuilder; -import io.debezium.relational.Key.KeyMapper; import io.debezium.spi.topic.TopicNamingStrategy; import java.sql.SQLException; diff --git a/src/main/java/com/singlestore/debezium/SingleStoreSnapshotChangeEventSource.java b/src/main/java/com/singlestore/debezium/SingleStoreSnapshotChangeEventSource.java index 4837bb1..96e13f9 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreSnapshotChangeEventSource.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreSnapshotChangeEventSource.java @@ -328,7 +328,8 @@ private SingleStoreOffsetContext doCreateDataEventsForTable( setSnapshotMarker(offset, firstTable, lastTable, rows == 1, ObserveResultSetUtils.isCommitSnapshot(rs) && numPartitions == 1); dispatcher.dispatchSnapshotEvent(partition, table.id(), - getChangeRecordEmitter(partition, offset, table.id(), row, internalId, + getChangeRecordEmitter(partition, offset, table.id(), table, row, + internalId, sourceTableSnapshotTimestamp), snapshotReceiver); } } @@ -396,11 +397,11 @@ private void updateSnapshotOffset(SingleStoreOffsetContext offset, ResultSet rs) * Returns a {@link ChangeRecordEmitter} producing the change records for the given table row. */ protected ChangeRecordEmitter getChangeRecordEmitter( - SingleStorePartition partition, SingleStoreOffsetContext offset, TableId tableId, + SingleStorePartition partition, SingleStoreOffsetContext offset, TableId tableId, Table table, Object[] row, long internalId, Instant timestamp) { offset.event(tableId, timestamp); return new SingleStoreSnapshotChangeRecordEmitter(partition, offset, row, internalId, - getClock(), connectorConfig); + getClock(), connectorConfig, table); } private Threads.Timer getTableScanLogTimer() { diff --git a/src/main/java/com/singlestore/debezium/SingleStoreSnapshotChangeRecordEmitter.java b/src/main/java/com/singlestore/debezium/SingleStoreSnapshotChangeRecordEmitter.java index 29f2df5..ca7c8da 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreSnapshotChangeRecordEmitter.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreSnapshotChangeRecordEmitter.java @@ -1,9 +1,11 @@ package com.singlestore.debezium; +import com.singlestore.debezium.util.InternalIdUtils; import io.debezium.data.Envelope; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.SnapshotChangeRecordEmitter; +import io.debezium.relational.Table; import io.debezium.relational.TableSchema; import io.debezium.util.Clock; import org.apache.kafka.connect.data.Schema; @@ -13,15 +15,15 @@ public class SingleStoreSnapshotChangeRecordEmitter extends SnapshotChangeRecordEmitter { - private static final String INTERNAL_ID = "internalId"; - private final long internalId; + private final Table table; public SingleStoreSnapshotChangeRecordEmitter(SingleStorePartition partition, OffsetContext offset, Object[] row, long internalId, Clock clock, - RelationalDatabaseConnectorConfig connectorConfig) { + RelationalDatabaseConnectorConfig connectorConfig, Table table) { super(partition, offset, row, clock, connectorConfig); this.internalId = internalId; + this.table = table; } @Override @@ -32,14 +34,8 @@ protected void emitReadRecord(Receiver receiver, TableSche Struct envelope = tableSchema.getEnvelopeSchema() .read(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant()); - receiver.changeRecord(getPartition(), tableSchema, Envelope.Operation.READ, keyFromInternalId(), + receiver.changeRecord(getPartition(), tableSchema, Envelope.Operation.READ, + InternalIdUtils.generateKey(table, tableSchema, newColumnValues, internalId), envelope, getOffset(), null); } - - private Struct keyFromInternalId() { - Struct result = new Struct( - SchemaBuilder.struct().field(INTERNAL_ID, Schema.INT64_SCHEMA).build()); - result.put(INTERNAL_ID, internalId); - return result; - } } diff --git a/src/main/java/com/singlestore/debezium/SingleStoreStreamingChangeEventSource.java b/src/main/java/com/singlestore/debezium/SingleStoreStreamingChangeEventSource.java index 0c00433..85d5fa1 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreStreamingChangeEventSource.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreStreamingChangeEventSource.java @@ -144,7 +144,8 @@ public void run() { null, after, internalId, - connectorConfig)); + connectorConfig, + schema.tableFor(table))); } catch (InterruptedException e) { interrupted[0] = e; break; diff --git a/src/main/java/com/singlestore/debezium/SingleStoreTableSchemaBuilder.java b/src/main/java/com/singlestore/debezium/SingleStoreTableSchemaBuilder.java index a2658a5..8b2f9fa 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreTableSchemaBuilder.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreTableSchemaBuilder.java @@ -1,5 +1,6 @@ package com.singlestore.debezium; +import com.singlestore.debezium.util.InternalIdUtils; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -26,7 +27,6 @@ public class SingleStoreTableSchemaBuilder extends TableSchemaBuilder { - public static final String INTERNAL_ID = "internalId"; private final Boolean populateInternalId; public SingleStoreTableSchemaBuilder(ValueConverterProvider valueConverterProvider, @@ -40,48 +40,21 @@ public SingleStoreTableSchemaBuilder(ValueConverterProvider valueConverterProvid this.populateInternalId = populateInternalId; } - private Schema addInternalId(Schema s) { - SchemaBuilder res = SchemaBuilder.struct(); - for (Field f : s.fields()) { - res.field(f.name(), f.schema()); - } - res.field(INTERNAL_ID, Schema.INT64_SCHEMA); - - return res.optional().build(); - } - - private final List addInternalId(List columns) { - List result = new ArrayList<>(columns); - result.add(Column.editor() - .name(INTERNAL_ID) - .position(result.size() + 1) - .type("INT64") - .autoIncremented(false) - .generated(false) - .optional(false) - .jdbcType(-5) - .nativeType(-1) - .length(19) - .scale(0) - .create()); - - return result; - } - public TableSchema create(TopicNamingStrategy topicNamingStrategy, Table table, ColumnNameFilter filter, ColumnMappers mappers, KeyMapper keysMapper) { TableSchema schema = super.create(topicNamingStrategy, table, filter, mappers, keysMapper); + Schema keySchema = InternalIdUtils.getKeySchema(table, schema); if (!populateInternalId) { return new TableSchema(schema.id(), - SchemaBuilder.struct().field(INTERNAL_ID, Schema.INT64_SCHEMA).build(), + keySchema, (row) -> schema.keyFromColumnData(row), schema.getEnvelopeSchema(), schema.valueSchema(), (row) -> schema.valueFromColumnData(row)); } else { - Schema valSchema = addInternalId(schema.valueSchema()); - List valColumns = addInternalId(table.columns()); + Schema valSchema = InternalIdUtils.addInternalId(schema.valueSchema()); + List valColumns = InternalIdUtils.addInternalId(table.columns()); Envelope envelopeWithoutInternalId = schema.getEnvelopeSchema(); Envelope envelope = Envelope.defineSchema() .withName(envelopeWithoutInternalId.schema().name()) @@ -90,7 +63,7 @@ public TableSchema create(TopicNamingStrategy topicNamingStrategy, Table table, .build(); return new TableSchema(schema.id(), - SchemaBuilder.struct().field(INTERNAL_ID, Schema.INT64_SCHEMA).build(), + keySchema, (row) -> schema.keyFromColumnData(row), envelope, valSchema, diff --git a/src/main/java/com/singlestore/debezium/util/InternalIdUtils.java b/src/main/java/com/singlestore/debezium/util/InternalIdUtils.java new file mode 100644 index 0000000..ac97de9 --- /dev/null +++ b/src/main/java/com/singlestore/debezium/util/InternalIdUtils.java @@ -0,0 +1,76 @@ +package com.singlestore.debezium.util; + +import io.debezium.relational.Column; +import io.debezium.relational.Table; +import io.debezium.relational.TableSchema; +import java.util.ArrayList; +import java.util.List; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; + +public final class InternalIdUtils { + + public static final String INTERNAL_ID = "internalId"; + + public static Schema getKeySchema(Table table, TableSchema schema) { + if (useInternalIdAsKey(table)) { + return SchemaBuilder.struct().field(INTERNAL_ID, Schema.INT64_SCHEMA).build(); + } else { + return schema.keySchema(); + } + } + + private static boolean useInternalIdAsKey(Table table) { + return table.primaryKeyColumnNames().isEmpty() || + !table.attributeWithName("IS_ROWSTORE").asBoolean(); + } + + public static Struct generateKey(Table table, TableSchema tableSchema, Object[] values, + Long internalId) { + if (useInternalIdAsKey(table)) { + return keyFromInternalId(internalId); + } else { + return tableSchema.keyFromColumnData(values); + } + } + + private static Struct keyFromInternalId(Long internalId) { + Struct result = new Struct( + SchemaBuilder.struct().field(INTERNAL_ID, Schema.INT64_SCHEMA).build()); + result.put(INTERNAL_ID, internalId); + return result; + } + + public static Schema addInternalId(Schema s) { + SchemaBuilder res = SchemaBuilder.struct(); + for (Field f : s.fields()) { + res.field(f.name(), f.schema()); + } + res.field(INTERNAL_ID, Schema.INT64_SCHEMA); + + return res.optional().build(); + } + + public static List addInternalId(List columns) { + List result = new ArrayList<>(columns); + result.add(Column.editor() + .name(INTERNAL_ID) + .position(result.size() + 1) + .type("INT64") + .autoIncremented(false) + .generated(false) + .optional(false) + .jdbcType(-5) + .nativeType(-1) + .length(19) + .scale(0) + .create()); + + return result; + } + + private InternalIdUtils() { + } +} diff --git a/src/test/java/com/singlestore/debezium/SnapshotIT.java b/src/test/java/com/singlestore/debezium/SnapshotIT.java index 6408a9c..3b03189 100644 --- a/src/test/java/com/singlestore/debezium/SnapshotIT.java +++ b/src/test/java/com/singlestore/debezium/SnapshotIT.java @@ -238,4 +238,42 @@ public void testSnapshotDelay() throws Exception { stopConnector(); } } + + @Test + public void testPKInRowstore() throws Exception { + try (SingleStoreConnection createTableConn = new SingleStoreConnection( + defaultJdbcConnectionConfigWithTable("product"))) { + createTableConn.execute( + "CREATE ROWSTORE TABLE IF NOT EXISTS pkInRowstoreSnapshot(a INT, b TEXT, c TEXT, PRIMARY KEY(a, b));" + + "DELETE FROM pkInRowstoreSnapshot WHERE 1 = 1;"); + try (SingleStoreConnection conn = new SingleStoreConnection( + defaultJdbcConnectionConfigWithTable("pkInRowstoreSnapshot"))) { + conn.execute("INSERT INTO pkInRowstoreSnapshot VALUES (2, 'd', 'e')"); + conn.execute("INSERT INTO pkInRowstoreSnapshot VALUES (1, 'b', 'c')"); + conn.execute("SNAPSHOT DATABASE " + TEST_DATABASE + ";"); + + Configuration config = defaultJdbcConfigWithTable("pkInRowstoreSnapshot"); + config = config.edit().withDefault(SingleStoreConnectorConfig.COLUMN_INCLUDE_LIST, + "db.pkInRowstoreSnapshot.a,db.pkInRowstoreSnapshot.c").build(); + start(SingleStoreConnector.class, config); + assertConnectorIsRunning(); + try { + List records = consumeRecordsByTopic(2).allRecordsInOrder(); + assertEquals(2, records.size()); + + List keyA = Arrays.asList(2, 1); + List keyB = Arrays.asList("d", "b"); + + for (int i = 0; i < records.size(); i++) { + SourceRecord record = records.get(i); + Struct key = (Struct) record.key(); + assertEquals(key.getInt32("a"), keyA.get(i)); + assertEquals(key.getString("b"), keyB.get(i)); + } + } finally { + stopConnector(); + } + } + } + } } diff --git a/src/test/java/com/singlestore/debezium/StreamingIT.java b/src/test/java/com/singlestore/debezium/StreamingIT.java index 15f0e15..1f624a4 100644 --- a/src/test/java/com/singlestore/debezium/StreamingIT.java +++ b/src/test/java/com/singlestore/debezium/StreamingIT.java @@ -479,4 +479,45 @@ public void testStaleOffset() throws Exception { } } } + + @Test + public void testPKInRowstore() throws Exception { + try (SingleStoreConnection createTableConn = new SingleStoreConnection( + defaultJdbcConnectionConfigWithTable("product"))) { + createTableConn.execute( + "CREATE ROWSTORE TABLE IF NOT EXISTS pkInRowstore(a INT, b TEXT, c TEXT, PRIMARY KEY(a, b));" + + "DELETE FROM pkInRowstore WHERE 1 = 1;"); + try (SingleStoreConnection conn = new SingleStoreConnection( + defaultJdbcConnectionConfigWithTable("pkInRowstore"))) { + Configuration config = defaultJdbcConfigWithTable("pkInRowstore"); + config = config.edit().withDefault(SingleStoreConnectorConfig.COLUMN_INCLUDE_LIST, + "db.pkInRowstore.a,db.pkInRowstore.c").build(); + conn.execute("SNAPSHOT DATABASE " + TEST_DATABASE + ";"); + start(SingleStoreConnector.class, config); + assertConnectorIsRunning(); + waitForStreamingToStart(); + + try { + conn.execute("INSERT INTO pkInRowstore VALUES (2, 'd', 'e')"); + conn.execute("INSERT INTO pkInRowstore VALUES (1, 'b', 'c')"); + conn.execute("DELETE FROM pkInRowstore WHERE a = 1"); + + List records = consumeRecordsByTopic(3).allRecordsInOrder(); + assertEquals(3, records.size()); + + List keyA = Arrays.asList(2, 1, 1); + List keyB = Arrays.asList("d", "b", "b"); + + for (int i = 0; i < records.size(); i++) { + SourceRecord record = records.get(i); + Struct key = (Struct) record.key(); + assertEquals(key.getInt32("a"), keyA.get(i)); + assertEquals(key.getString("b"), keyB.get(i)); + } + } finally { + stopConnector(); + } + } + } + } }