Skip to content

Commit 778954f

Browse files
committed
fix update_before event
1 parent f8e64b3 commit 778954f

File tree

1 file changed

+7
-6
lines changed
  • seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink

1 file changed

+7
-6
lines changed

seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkWriter.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,10 @@ private void processCdcRow(SeaTunnelRow row) throws SQLException {
418418
log.info("Processing CDC row with kind: {}", row.getRowKind());
419419

420420
String action = mapRowKindToAction(row.getRowKind());
421+
if ("update_before".equals(action)) {
422+
log.debug("UPDATE_BEFORE operation detected, skipping row");
423+
return;
424+
}
421425

422426
if ("delete".equals(action) && !allowDelete) {
423427
log.debug("DELETE operation not allowed, skipping row");
@@ -492,11 +496,9 @@ private String mapRowKindToAction(RowKind rowKind) {
492496
case UPDATE_AFTER:
493497
return "update";
494498
case DELETE:
495-
case UPDATE_BEFORE:
496499
return "delete";
497-
default:
498-
return "update";
499500
}
501+
return "update_before";
500502
}
501503

502504
/**
@@ -704,7 +706,7 @@ private void processRow(SeaTunnelRow row) throws SQLException {
704706
preparedStatement.addBatch();
705707
log.info("Added row to batch, current batch count: {}", batchCount + 1);
706708
}
707-
// 在写入 raw table 后,立即查询验证
709+
708710
private void verifyRawTableData(String rawTableName, String database) throws SQLException {
709711
try (Statement stmt = connection.createStatement();
710712
ResultSet rs =
@@ -722,7 +724,6 @@ private void verifyRawTableData(String rawTableName, String database) throws SQL
722724
}
723725
}
724726

725-
// 打印详细数据
726727
try (Statement stmt = connection.createStatement();
727728
ResultSet dataRs =
728729
stmt.executeQuery(
@@ -733,7 +734,7 @@ private void verifyRawTableData(String rawTableName, String database) throws SQL
733734
+ " ORDER BY add_time"); ) {
734735
while (dataRs.next()) {
735736
log.info(
736-
"Raw data ssj: {}, action: {}, time: {}",
737+
"Raw data : {}, action: {}, time: {}",
737738
dataRs.getString(1),
738739
dataRs.getString(2),
739740
dataRs.getTimestamp(3));

0 commit comments

Comments
 (0)