Skip to content

Commit

Permalink
[ISSUE #5044] Data synchronization strong verification in mariadb gti…
Browse files Browse the repository at this point in the history
…d mode (#5045)

* [ISSUE #5040] Support gtid mode for sync data with mysql

* fix conflicts with master

* fix checkstyle error

* [ISSUE #5044] Data synchronization strong verification in mariadb gtid mode

* fix checkstyle error
  • Loading branch information
xwm1992 committed Jul 26, 2024
1 parent b3a42e1 commit 294e4c2
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public class CanalSinkConfig extends SinkConfig {

private boolean isGTIDMode = true;

private boolean isMariaDB = true;

// skip sink process exception
private Boolean skipException = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class CanalSourceConfig extends SourceConfig {

private String serverUUID;

private boolean isMariaDB = true;

private boolean isGTIDMode = true;

private Integer batchSize = 10000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,9 +476,11 @@ private Exception doCall() {
}
JdbcTemplate template = dbDialect.getJdbcTemplate();
String sourceGtid = context.getGtid();
if (StringUtils.isNotEmpty(sourceGtid)) {
String setGtid = "SET @@session.gtid_next = '" + sourceGtid + "';";
template.execute(setGtid);
if (StringUtils.isNotEmpty(sourceGtid) && !sinkConfig.isMariaDB()) {
String setMySQLGtid = "SET @@session.gtid_next = '" + sourceGtid + "';";
template.execute(setMySQLGtid);
} else if (StringUtils.isNotEmpty(sourceGtid) && sinkConfig.isMariaDB()) {
throw new RuntimeException("unsupport gtid mode for mariaDB");
} else {
log.error("gtid is empty in gtid mode");
throw new RuntimeException("gtid is empty in gtid mode");
Expand Down Expand Up @@ -510,8 +512,13 @@ public void setValues(PreparedStatement ps) throws SQLException {
});

// reset gtid
String resetGtid = "SET @@session.gtid_next = AUTOMATIC;";
dbDialect.getJdbcTemplate().execute(resetGtid);
if (sinkConfig.isMariaDB()) {
throw new RuntimeException("unsupport gtid mode for mariaDB");
} else {
String resetMySQLGtid = "SET @@session.gtid_next = 'AUTOMATIC';";
dbDialect.getJdbcTemplate().execute(resetMySQLGtid);
}

error = null;
exeResult = ExecuteResult.SUCCESS;
} catch (DeadlockLoserDataAccessException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
public class EntryParser {

public static Map<Long, List<CanalConnectRecord>> parse(CanalSourceConfig sourceConfig, List<Entry> datas,
RdbTableMgr tables) {
RdbTableMgr tables) {
List<CanalConnectRecord> recordList = new ArrayList<>();
List<Entry> transactionDataBuffer = new ArrayList<>();
// need check weather the entry is loopback
Expand All @@ -60,9 +60,9 @@ public static Map<Long, List<CanalConnectRecord>> parse(CanalSourceConfig source
switch (entry.getEntryType()) {
case ROWDATA:
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
if (sourceConfig.getServerUUID() != null && sourceConfig.isGTIDMode()) {
String currentGtid = entry.getHeader().getPropsList().get(0).getValue();
if (currentGtid.contains(sourceConfig.getServerUUID())) {
// don't support gtid for mariadb
if (sourceConfig.getServerUUID() != null && sourceConfig.isGTIDMode() && !sourceConfig.isMariaDB()) {
if (checkGtidForEntry(entry, sourceConfig)) {
transactionDataBuffer.add(entry);
}
} else {
Expand Down Expand Up @@ -90,9 +90,14 @@ public static Map<Long, List<CanalConnectRecord>> parse(CanalSourceConfig source
return recordMap;
}

private static boolean checkGtidForEntry(Entry entry, CanalSourceConfig sourceConfig) {
String currentGtid = entry.getHeader().getPropsList().get(0).getValue();
return currentGtid.contains(sourceConfig.getServerUUID());
}

private static void parseRecordListWithEntryBuffer(CanalSourceConfig sourceConfig,
List<CanalConnectRecord> recordList,
List<Entry> transactionDataBuffer, RdbTableMgr tables) {
List<CanalConnectRecord> recordList,
List<Entry> transactionDataBuffer, RdbTableMgr tables) {
for (Entry bufferEntry : transactionDataBuffer) {
List<CanalConnectRecord> recordParsedList = internParse(sourceConfig, bufferEntry, tables);
if (CollectionUtils.isEmpty(recordParsedList)) {
Expand Down Expand Up @@ -130,7 +135,7 @@ private static Column getColumnIgnoreCase(List<Column> columns, String columName
}

private static List<CanalConnectRecord> internParse(CanalSourceConfig sourceConfig, Entry entry,
RdbTableMgr tableMgr) {
RdbTableMgr tableMgr) {
String schemaName = entry.getHeader().getSchemaName();
String tableName = entry.getHeader().getTableName();
if (tableMgr.getTable(schemaName, tableName) == null) {
Expand Down Expand Up @@ -169,7 +174,7 @@ private static List<CanalConnectRecord> internParse(CanalSourceConfig sourceConf
}

private static CanalConnectRecord internParse(CanalSourceConfig canalSourceConfig, Entry entry,
RowChange rowChange, RowData rowData) {
RowChange rowChange, RowData rowData) {
CanalConnectRecord canalConnectRecord = new CanalConnectRecord();
canalConnectRecord.setTableName(entry.getHeader().getTableName());
canalConnectRecord.setSchemaName(entry.getHeader().getSchemaName());
Expand All @@ -179,10 +184,16 @@ private static CanalConnectRecord internParse(CanalSourceConfig canalSourceConfi
canalConnectRecord.setBinLogOffset(entry.getHeader().getLogfileOffset());
// if enabled gtid mode, gtid not null
if (canalSourceConfig.isGTIDMode()) {
String currentGtid = entry.getHeader().getPropsList().get(0).getValue();
String gtidRange = replaceGtidRange(entry.getHeader().getGtid(), currentGtid, canalSourceConfig.getServerUUID());
canalConnectRecord.setGtid(gtidRange);
canalConnectRecord.setCurrentGtid(currentGtid);
if (canalSourceConfig.isMariaDB()) {
String currentGtid = entry.getHeader().getGtid();
canalConnectRecord.setGtid(currentGtid);
canalConnectRecord.setCurrentGtid(currentGtid);
} else {
String currentGtid = entry.getHeader().getPropsList().get(0).getValue();
String gtidRange = replaceGtidRange(entry.getHeader().getGtid(), currentGtid, canalSourceConfig.getServerUUID());
canalConnectRecord.setGtid(gtidRange);
canalConnectRecord.setCurrentGtid(currentGtid);
}
}

EventType eventType = canalConnectRecord.getEventType();
Expand Down Expand Up @@ -276,7 +287,7 @@ public static String replaceGtidRange(String gtid, String currentGtid, String se
}

private static void checkUpdateKeyColumns(Map<String, EventColumn> oldKeyColumns,
Map<String, EventColumn> keyColumns) {
Map<String, EventColumn> keyColumns) {
if (oldKeyColumns.isEmpty()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,16 @@ private Canal buildCanal(CanalSourceConfig sourceConfig) {
recordPositionMap.put("journalName", canalRecordPartition.getJournalName());
recordPositionMap.put("timestamp", canalRecordPartition.getTimeStamp());
recordPositionMap.put("position", canalRecordOffset.getOffset());
String gtidRange = canalRecordOffset.getGtid();
if (gtidRange != null) {
if (canalRecordOffset.getCurrentGtid() != null) {
gtidRange = EntryParser.replaceGtidRange(canalRecordOffset.getGtid(), canalRecordOffset.getCurrentGtid(),
sourceConfig.getServerUUID());
// for mariaDB not support gtid mode
if (sourceConfig.isGTIDMode() && !sourceConfig.isMariaDB()) {
String gtidRange = canalRecordOffset.getGtid();
if (gtidRange != null) {
if (canalRecordOffset.getCurrentGtid() != null) {
gtidRange = EntryParser.replaceGtidRange(canalRecordOffset.getGtid(), canalRecordOffset.getCurrentGtid(),
sourceConfig.getServerUUID());
}
recordPositionMap.put("gtid", gtidRange);
}
recordPositionMap.put("gtid", gtidRange);
}
positions.add(JsonUtils.toJSONString(recordPositionMap));
});
Expand Down

0 comments on commit 294e4c2

Please sign in to comment.