Skip to content

Commit

Permalink
Used PK as key for rowstore tables (#50)
Browse files Browse the repository at this point in the history
In rowstore tables, internalId is not populated when there is a PK.
Thus changed to use PK as record key when rowstore table has it.
Later same changes should be done for columnstore tables.
  • Loading branch information
AdalbertMemSQL authored Jul 4, 2024
1 parent f50bb83 commit 98789a2
Show file tree
Hide file tree
Showing 11 changed files with 243 additions and 67 deletions.
12 changes: 7 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,9 @@ describes the structure of the payload, while the payload contains the actual da
### Change event keys
For each changed table, the change event key payload consists of a single field named `internalid`.
The change event key payload for rowstore tables that have a primary key consists of primary key
fields.
The change event key payload for all other tables consists of a single field named `internalid`.
It represents a unique ID assigned to each row in the database.
```
Expand All @@ -176,10 +178,10 @@ It represents a unique ID assigned to each row in the database.
}
```
| Item | Field name | Description
|------|------------|-------------------------------------------------------------------------------------------------------------------
| 1 | `schema` | Specifies a Kafka Connect schema that describes the structure of the key's `payload`.
| 2 | `payload` | Contains a single field `internalId` used to specify the ID of the row for which this change event was generated.
| Item | Field name | Description
|------|------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| 1 | `schema` | Specifies a Kafka Connect schema that describes the structure of the key's `payload`.
| 2 | `payload` | For rowstore tables that have a primary key, contains primary key fields. For all other tables, contains a single field `internalId` used to specify the ID of the row for which this change event was generated.
### Change event values
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.singlestore.debezium;

import com.singlestore.debezium.util.InternalIdUtils;
import io.debezium.relational.Table;
import java.util.Objects;

import org.apache.kafka.connect.data.Schema;
Expand All @@ -25,25 +27,25 @@ public class SingleStoreChangeRecordEmitter extends
private final Object[] before;
private final Object[] after;
private final long internalId;

private static final String INTERNAL_ID = "internalId";
private final Table table;

public SingleStoreChangeRecordEmitter(SingleStorePartition partition, OffsetContext offset,
Clock clock, Operation operation, Object[] before,
Object[] after, long internalId, SingleStoreConnectorConfig connectorConfig) {
Object[] after, long internalId, SingleStoreConnectorConfig connectorConfig, Table table) {
super(partition, offset, clock, connectorConfig);
this.offset = offset;
this.operation = operation;
this.before = before;
this.after = after;
this.internalId = internalId;
this.table = table;
}

@Override
protected void emitCreateRecord(Receiver<SingleStorePartition> receiver, TableSchema tableSchema)
throws InterruptedException {
Object[] newColumnValues = getNewColumnValues();
Struct newKey = keyFromInternalId();
Struct newKey = InternalIdUtils.generateKey(table, tableSchema, newColumnValues, internalId);
Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
Struct envelope = tableSchema.getEnvelopeSchema()
.create(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
Expand All @@ -64,7 +66,7 @@ protected void emitUpdateRecord(Receiver<SingleStorePartition> receiver, TableSc
Object[] oldColumnValues = getOldColumnValues();
Object[] newColumnValues = getNewColumnValues();

Struct newKey = keyFromInternalId();
Struct newKey = InternalIdUtils.generateKey(table, tableSchema, newColumnValues, internalId);

Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues);
Expand Down Expand Up @@ -99,7 +101,8 @@ protected void emitUpdateRecord(Receiver<SingleStorePartition> receiver, TableSc
protected void emitDeleteRecord(Receiver<SingleStorePartition> receiver, TableSchema tableSchema)
throws InterruptedException {
Object[] oldColumnValues = getOldColumnValues();
Struct newKey = keyFromInternalId();
Object[] newColumnValues = getNewColumnValues();
Struct newKey = InternalIdUtils.generateKey(table, tableSchema, newColumnValues, internalId);

Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues);

Expand Down Expand Up @@ -134,11 +137,4 @@ protected Object[] getOldColumnValues() {
protected Object[] getNewColumnValues() {
return after;
}

private Struct keyFromInternalId() {
Struct result = new Struct(
SchemaBuilder.struct().field(INTERNAL_ID, Schema.INT64_SCHEMA).build());
result.put(INTERNAL_ID, internalId);
return result;
}
}
53 changes: 53 additions & 0 deletions src/main/java/com/singlestore/debezium/SingleStoreConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,35 @@
import io.debezium.config.Configuration;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Attribute;
import io.debezium.relational.ColumnId;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.relational.Tables.ColumnNameFilter;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.util.Strings;
import java.sql.DatabaseMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* {@link JdbcConnection} extension to be used with SingleStore
*/
public class SingleStoreConnection extends JdbcConnection {

private static final Logger LOGGER = LoggerFactory
.getLogger(SingleStoreConnection.class);
private static final String QUOTED_CHARACTER = "`";
protected static final String URL_PATTERN = "jdbc:singlestore://${hostname}:${port}/?connectTimeout=${connectTimeout}";
protected static final String URL_PATTERN_DATABASE = "jdbc:singlestore://${hostname}:${port}/${dbname}?connectTimeout=${connectTimeout}";
Expand All @@ -38,6 +48,43 @@ public SingleStoreConnection(SingleStoreConnectionConfiguration connectionConfig
this.connectionConfig = connectionConfig;
}

public boolean isRowstoreTable(TableId tableId) throws SQLException {
AtomicBoolean res = new AtomicBoolean(false);
prepareQuery(
"SELECT storage_type = 'INMEMORY_ROWSTORE' as isRowstore FROM information_schema.tables WHERE table_schema = ? && table_name = ?",
statement -> {
statement.setString(1, tableId.catalog());
statement.setString(2, tableId.table());
},
rs -> {
rs.next();
res.set(rs.getBoolean(1));
});

return res.get();
}

public void addIsRowstoreAttribute(Tables tables) throws SQLException {
for (TableId tableId : tables.tableIds()) {
Attribute isRowstore = Attribute.editor()
.name("IS_ROWSTORE")
.value(isRowstoreTable(tableId))
.create();

tables.updateTable(tableId, table ->
table.edit().addAttribute(isRowstore).create());
}
}

@Override
public void readSchema(Tables tables, String databaseCatalog, String schemaNamePattern,
TableFilter tableFilter, ColumnNameFilter columnFilter, boolean removeTablesNotFoundInJdbc)
throws SQLException {
super.readSchema(tables, databaseCatalog, schemaNamePattern, tableFilter, columnFilter,
removeTablesNotFoundInJdbc);
addIsRowstoreAttribute(tables);
}

private static void validateServerVersion(Statement statement) throws SQLException {
DatabaseMetaData metaData = statement.getConnection().getMetaData();
int majorVersion = metaData.getDatabaseMajorVersion();
Expand Down Expand Up @@ -288,4 +335,10 @@ public CommonConnectorConfig.EventProcessingFailureHandlingMode eventProcessingF
return CommonConnectorConfig.EventProcessingFailureHandlingMode.parse(mode);
}
}

@Override
protected List<String> readPrimaryKeyOrUniqueIndexNames(DatabaseMetaData metadata, TableId id)
throws SQLException {
return readPrimaryKeyNames(metadata, id);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchemaBuilder;
import io.debezium.relational.Key.KeyMapper;
import io.debezium.spi.topic.TopicNamingStrategy;

import java.sql.SQLException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,8 @@ private SingleStoreOffsetContext doCreateDataEventsForTable(
setSnapshotMarker(offset, firstTable, lastTable, rows == 1,
ObserveResultSetUtils.isCommitSnapshot(rs) && numPartitions == 1);
dispatcher.dispatchSnapshotEvent(partition, table.id(),
getChangeRecordEmitter(partition, offset, table.id(), row, internalId,
getChangeRecordEmitter(partition, offset, table.id(), table, row,
internalId,
sourceTableSnapshotTimestamp), snapshotReceiver);
}
}
Expand Down Expand Up @@ -396,11 +397,11 @@ private void updateSnapshotOffset(SingleStoreOffsetContext offset, ResultSet rs)
* Returns a {@link ChangeRecordEmitter} producing the change records for the given table row.
*/
protected ChangeRecordEmitter<SingleStorePartition> getChangeRecordEmitter(
SingleStorePartition partition, SingleStoreOffsetContext offset, TableId tableId,
SingleStorePartition partition, SingleStoreOffsetContext offset, TableId tableId, Table table,
Object[] row, long internalId, Instant timestamp) {
offset.event(tableId, timestamp);
return new SingleStoreSnapshotChangeRecordEmitter(partition, offset, row, internalId,
getClock(), connectorConfig);
getClock(), connectorConfig, table);
}

private Threads.Timer getTableScanLogTimer() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package com.singlestore.debezium;

import com.singlestore.debezium.util.InternalIdUtils;
import io.debezium.data.Envelope;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.SnapshotChangeRecordEmitter;
import io.debezium.relational.Table;
import io.debezium.relational.TableSchema;
import io.debezium.util.Clock;
import org.apache.kafka.connect.data.Schema;
Expand All @@ -13,15 +15,15 @@
public class SingleStoreSnapshotChangeRecordEmitter extends
SnapshotChangeRecordEmitter<SingleStorePartition> {

private static final String INTERNAL_ID = "internalId";

private final long internalId;
private final Table table;

public SingleStoreSnapshotChangeRecordEmitter(SingleStorePartition partition,
OffsetContext offset, Object[] row, long internalId, Clock clock,
RelationalDatabaseConnectorConfig connectorConfig) {
RelationalDatabaseConnectorConfig connectorConfig, Table table) {
super(partition, offset, row, clock, connectorConfig);
this.internalId = internalId;
this.table = table;
}

@Override
Expand All @@ -32,14 +34,8 @@ protected void emitReadRecord(Receiver<SingleStorePartition> receiver, TableSche
Struct envelope = tableSchema.getEnvelopeSchema()
.read(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());

receiver.changeRecord(getPartition(), tableSchema, Envelope.Operation.READ, keyFromInternalId(),
receiver.changeRecord(getPartition(), tableSchema, Envelope.Operation.READ,
InternalIdUtils.generateKey(table, tableSchema, newColumnValues, internalId),
envelope, getOffset(), null);
}

private Struct keyFromInternalId() {
Struct result = new Struct(
SchemaBuilder.struct().field(INTERNAL_ID, Schema.INT64_SCHEMA).build());
result.put(INTERNAL_ID, internalId);
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ public void run() {
null,
after,
internalId,
connectorConfig));
connectorConfig,
schema.tableFor(table)));
} catch (InterruptedException e) {
interrupted[0] = e;
break;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.singlestore.debezium;

import com.singlestore.debezium.util.InternalIdUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand All @@ -26,7 +27,6 @@

public class SingleStoreTableSchemaBuilder extends TableSchemaBuilder {

public static final String INTERNAL_ID = "internalId";
private final Boolean populateInternalId;

public SingleStoreTableSchemaBuilder(ValueConverterProvider valueConverterProvider,
Expand All @@ -40,48 +40,21 @@ public SingleStoreTableSchemaBuilder(ValueConverterProvider valueConverterProvid
this.populateInternalId = populateInternalId;
}

private Schema addInternalId(Schema s) {
SchemaBuilder res = SchemaBuilder.struct();
for (Field f : s.fields()) {
res.field(f.name(), f.schema());
}
res.field(INTERNAL_ID, Schema.INT64_SCHEMA);

return res.optional().build();
}

private final List<Column> addInternalId(List<Column> columns) {
List<Column> result = new ArrayList<>(columns);
result.add(Column.editor()
.name(INTERNAL_ID)
.position(result.size() + 1)
.type("INT64")
.autoIncremented(false)
.generated(false)
.optional(false)
.jdbcType(-5)
.nativeType(-1)
.length(19)
.scale(0)
.create());

return result;
}

public TableSchema create(TopicNamingStrategy topicNamingStrategy, Table table,
ColumnNameFilter filter, ColumnMappers mappers, KeyMapper keysMapper) {
TableSchema schema = super.create(topicNamingStrategy, table, filter, mappers, keysMapper);
Schema keySchema = InternalIdUtils.getKeySchema(table, schema);

if (!populateInternalId) {
return new TableSchema(schema.id(),
SchemaBuilder.struct().field(INTERNAL_ID, Schema.INT64_SCHEMA).build(),
keySchema,
(row) -> schema.keyFromColumnData(row),
schema.getEnvelopeSchema(),
schema.valueSchema(),
(row) -> schema.valueFromColumnData(row));
} else {
Schema valSchema = addInternalId(schema.valueSchema());
List<Column> valColumns = addInternalId(table.columns());
Schema valSchema = InternalIdUtils.addInternalId(schema.valueSchema());
List<Column> valColumns = InternalIdUtils.addInternalId(table.columns());
Envelope envelopeWithoutInternalId = schema.getEnvelopeSchema();
Envelope envelope = Envelope.defineSchema()
.withName(envelopeWithoutInternalId.schema().name())
Expand All @@ -90,7 +63,7 @@ public TableSchema create(TopicNamingStrategy topicNamingStrategy, Table table,
.build();

return new TableSchema(schema.id(),
SchemaBuilder.struct().field(INTERNAL_ID, Schema.INT64_SCHEMA).build(),
keySchema,
(row) -> schema.keyFromColumnData(row),
envelope,
valSchema,
Expand Down
Loading

0 comments on commit 98789a2

Please sign in to comment.