Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #5044] Data synchronization strong verification in mariadb gtid mode #5045

Merged
merged 7 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public class CanalSinkConfig extends SinkConfig {

private boolean isGTIDMode = true;

private boolean isMariaDB = true;

// skip sink process exception
private Boolean skipException = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class CanalSourceConfig extends SourceConfig {

private String serverUUID;

private boolean isMariaDB = true;

private boolean isGTIDMode = true;

private Integer batchSize = 10000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,9 +476,11 @@ private Exception doCall() {
}
JdbcTemplate template = dbDialect.getJdbcTemplate();
String sourceGtid = context.getGtid();
if (StringUtils.isNotEmpty(sourceGtid)) {
String setGtid = "SET @@session.gtid_next = '" + sourceGtid + "';";
template.execute(setGtid);
if (StringUtils.isNotEmpty(sourceGtid) && !sinkConfig.isMariaDB()) {
String setMySQLGtid = "SET @@session.gtid_next = '" + sourceGtid + "';";
template.execute(setMySQLGtid);
} else if (StringUtils.isNotEmpty(sourceGtid) && sinkConfig.isMariaDB()) {
throw new RuntimeException("unsupport gtid mode for mariaDB");
} else {
log.error("gtid is empty in gtid mode");
throw new RuntimeException("gtid is empty in gtid mode");
Expand Down Expand Up @@ -510,8 +512,13 @@ public void setValues(PreparedStatement ps) throws SQLException {
});

// reset gtid
String resetGtid = "SET @@session.gtid_next = AUTOMATIC;";
dbDialect.getJdbcTemplate().execute(resetGtid);
if (sinkConfig.isMariaDB()) {
throw new RuntimeException("unsupport gtid mode for mariaDB");
} else {
String resetMySQLGtid = "SET @@session.gtid_next = 'AUTOMATIC';";
dbDialect.getJdbcTemplate().execute(resetMySQLGtid);
}

error = null;
exeResult = ExecuteResult.SUCCESS;
} catch (DeadlockLoserDataAccessException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
public class EntryParser {

public static Map<Long, List<CanalConnectRecord>> parse(CanalSourceConfig sourceConfig, List<Entry> datas,
RdbTableMgr tables) {
RdbTableMgr tables) {
List<CanalConnectRecord> recordList = new ArrayList<>();
List<Entry> transactionDataBuffer = new ArrayList<>();
// need check weather the entry is loopback
Expand All @@ -60,9 +60,9 @@ public static Map<Long, List<CanalConnectRecord>> parse(CanalSourceConfig source
switch (entry.getEntryType()) {
case ROWDATA:
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
if (sourceConfig.getServerUUID() != null && sourceConfig.isGTIDMode()) {
String currentGtid = entry.getHeader().getPropsList().get(0).getValue();
if (currentGtid.contains(sourceConfig.getServerUUID())) {
// don't support gtid for mariadb
if (sourceConfig.getServerUUID() != null && sourceConfig.isGTIDMode() && !sourceConfig.isMariaDB()) {
if (checkGtidForEntry(entry, sourceConfig)) {
transactionDataBuffer.add(entry);
}
} else {
Expand Down Expand Up @@ -90,9 +90,14 @@ public static Map<Long, List<CanalConnectRecord>> parse(CanalSourceConfig source
return recordMap;
}

private static boolean checkGtidForEntry(Entry entry, CanalSourceConfig sourceConfig) {
String currentGtid = entry.getHeader().getPropsList().get(0).getValue();
return currentGtid.contains(sourceConfig.getServerUUID());
}

private static void parseRecordListWithEntryBuffer(CanalSourceConfig sourceConfig,
List<CanalConnectRecord> recordList,
List<Entry> transactionDataBuffer, RdbTableMgr tables) {
List<CanalConnectRecord> recordList,
List<Entry> transactionDataBuffer, RdbTableMgr tables) {
for (Entry bufferEntry : transactionDataBuffer) {
List<CanalConnectRecord> recordParsedList = internParse(sourceConfig, bufferEntry, tables);
if (CollectionUtils.isEmpty(recordParsedList)) {
Expand Down Expand Up @@ -130,7 +135,7 @@ private static Column getColumnIgnoreCase(List<Column> columns, String columName
}

private static List<CanalConnectRecord> internParse(CanalSourceConfig sourceConfig, Entry entry,
RdbTableMgr tableMgr) {
RdbTableMgr tableMgr) {
String schemaName = entry.getHeader().getSchemaName();
String tableName = entry.getHeader().getTableName();
if (tableMgr.getTable(schemaName, tableName) == null) {
Expand Down Expand Up @@ -169,7 +174,7 @@ private static List<CanalConnectRecord> internParse(CanalSourceConfig sourceConf
}

private static CanalConnectRecord internParse(CanalSourceConfig canalSourceConfig, Entry entry,
RowChange rowChange, RowData rowData) {
RowChange rowChange, RowData rowData) {
CanalConnectRecord canalConnectRecord = new CanalConnectRecord();
canalConnectRecord.setTableName(entry.getHeader().getTableName());
canalConnectRecord.setSchemaName(entry.getHeader().getSchemaName());
Expand All @@ -179,10 +184,16 @@ private static CanalConnectRecord internParse(CanalSourceConfig canalSourceConfi
canalConnectRecord.setBinLogOffset(entry.getHeader().getLogfileOffset());
// if enabled gtid mode, gtid not null
if (canalSourceConfig.isGTIDMode()) {
String currentGtid = entry.getHeader().getPropsList().get(0).getValue();
String gtidRange = replaceGtidRange(entry.getHeader().getGtid(), currentGtid, canalSourceConfig.getServerUUID());
canalConnectRecord.setGtid(gtidRange);
canalConnectRecord.setCurrentGtid(currentGtid);
if (canalSourceConfig.isMariaDB()) {
String currentGtid = entry.getHeader().getGtid();
canalConnectRecord.setGtid(currentGtid);
canalConnectRecord.setCurrentGtid(currentGtid);
} else {
String currentGtid = entry.getHeader().getPropsList().get(0).getValue();
String gtidRange = replaceGtidRange(entry.getHeader().getGtid(), currentGtid, canalSourceConfig.getServerUUID());
canalConnectRecord.setGtid(gtidRange);
canalConnectRecord.setCurrentGtid(currentGtid);
}
}

EventType eventType = canalConnectRecord.getEventType();
Expand Down Expand Up @@ -276,7 +287,7 @@ public static String replaceGtidRange(String gtid, String currentGtid, String se
}

private static void checkUpdateKeyColumns(Map<String, EventColumn> oldKeyColumns,
Map<String, EventColumn> keyColumns) {
Map<String, EventColumn> keyColumns) {
if (oldKeyColumns.isEmpty()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,16 @@ private Canal buildCanal(CanalSourceConfig sourceConfig) {
recordPositionMap.put("journalName", canalRecordPartition.getJournalName());
recordPositionMap.put("timestamp", canalRecordPartition.getTimeStamp());
recordPositionMap.put("position", canalRecordOffset.getOffset());
String gtidRange = canalRecordOffset.getGtid();
if (gtidRange != null) {
if (canalRecordOffset.getCurrentGtid() != null) {
gtidRange = EntryParser.replaceGtidRange(canalRecordOffset.getGtid(), canalRecordOffset.getCurrentGtid(),
sourceConfig.getServerUUID());
// for mariaDB not support gtid mode
if (sourceConfig.isGTIDMode() && !sourceConfig.isMariaDB()) {
String gtidRange = canalRecordOffset.getGtid();
if (gtidRange != null) {
if (canalRecordOffset.getCurrentGtid() != null) {
gtidRange = EntryParser.replaceGtidRange(canalRecordOffset.getGtid(), canalRecordOffset.getCurrentGtid(),
sourceConfig.getServerUUID());
}
recordPositionMap.put("gtid", gtidRange);
}
recordPositionMap.put("gtid", gtidRange);
}
positions.add(JsonUtils.toJSONString(recordPositionMap));
});
Expand Down
Loading