diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java index 1cb654776b5..98201bfce74 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java @@ -223,7 +223,7 @@ private void createDataEventsForTable( rows++; final Object[] row = new Object[columnArray.getGreatestColumnPosition()]; for (int i = 0; i < columnArray.getColumns().length; i++) { - Column actualColumn = table.columns().get(i); + Column actualColumn = columnArray.getColumns()[i]; row[columnArray.getColumns()[i].position() - 1] = readField(rs, i + 1, actualColumn, table); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/AbstractMysqlCDCITBase.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/AbstractMysqlCDCITBase.java index 63bf6433ecd..af7686c109e 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/AbstractMysqlCDCITBase.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/AbstractMysqlCDCITBase.java @@ -296,6 +296,64 @@ public void testMysqlCdcCheckDataWithNoPrimaryKey(TestContainer container) { }); } + 
@TestTemplate + public void testMysqlCdcWithColumnIncludeList(TestContainer container) { + // Clear SOURCE_TABLE_1 first so that earlier operations do not affect this test + clearTable(MYSQL_DATABASE, SOURCE_TABLE_1); + + CompletableFuture.supplyAsync( + () -> { + try { + container.executeJob("/mysqlcdc_to_mysql_with_column_include_list.conf"); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + return null; + }); + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + // Compare only once the sink table exists; otherwise fail so await() retries + if (tableExists( + MYSQL_DATABASE, "mysql_cdc_e2e_sink_table_column_include")) { + log.info( + query( + getColumnIncludeQuerySQL( + MYSQL_DATABASE, + "mysql_cdc_e2e_sink_table_column_include")) + .toString()); + Assertions.assertIterableEquals( + query( + getColumnIncludeSourceQuerySQL( + MYSQL_DATABASE, SOURCE_TABLE_1)), + query( + getColumnIncludeQuerySQL( + MYSQL_DATABASE, + "mysql_cdc_e2e_sink_table_column_include"))); + } else { + Assertions.fail("Sink table not created yet"); + } + }); + + // Apply inserts, a delete and an update to the source table + upsertDeleteSourceTableColumnInclude(MYSQL_DATABASE, SOURCE_TABLE_1); + + // Stream stage: source and sink must match again on the included columns + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertIterableEquals( + query( + getColumnIncludeSourceQuerySQL( + MYSQL_DATABASE, SOURCE_TABLE_1)), + query( + getColumnIncludeQuerySQL( + MYSQL_DATABASE, + "mysql_cdc_e2e_sink_table_column_include"))); + }); + } + @TestTemplate @DisabledOnContainer( value = {}, @@ -757,4 +815,73 @@ private String getSinkQuerySQL(String database, String tableName) { private String getQuerySQL(String database, String tableName) { return String.format(QUERY_SQL, database, tableName); } + + // Query SQL for the column include list test: selects only the 10 included columns (neither f_longblob nor f_tinyblob) + private static final String COLUMN_INCLUDE_SOURCE_SQL_TEMPLATE = + "select id, cast(f_binary as char) as f_binary, cast(f_blob as 
char) as f_blob, cast(f_long_varbinary as char) as f_long_varbinary," + + " cast(f_varbinary as char) as f_varbinary, f_smallint, f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int from %s.%s"; + + private static final String COLUMN_INCLUDE_SINK_SQL_TEMPLATE = + "select id, cast(f_binary as char) as f_binary, cast(f_blob as char) as f_blob, cast(f_long_varbinary as char) as f_long_varbinary," + + " cast(f_varbinary as char) as f_varbinary, f_smallint, f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int from %s.%s"; + + private String getColumnIncludeSourceQuerySQL(String database, String tableName) { + return String.format(COLUMN_INCLUDE_SOURCE_SQL_TEMPLATE, database, tableName); + } + + private String getColumnIncludeQuerySQL(String database, String tableName) { + return String.format(COLUMN_INCLUDE_SINK_SQL_TEMPLATE, database, tableName); + } + + private void upsertDeleteSourceTableColumnInclude(String database, String tableName) { + executeSql( + "INSERT INTO " + + database + + "." 
+ + tableName + + " ( id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint,\n" + + " f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer,\n" + + " f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double,\n" + + " f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime,\n" + + " f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, f_real, f_time,\n" + + " f_tinyint, f_tinyint_unsigned, f_json, f_year )\n" + + "VALUES ( 5, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n" + + " 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,\n" + + " 0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321,\n" + + " 123456789, 987654321, 123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field',\n" + + " 'This is a text field', 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00',\n" + + " '2023-04-27 11:08:40', 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2',\n" + + " 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field',\n" + + " 12.345, '14:30:00', -128, 255, '{ \"key\": \"value\" }', 1992 )"); + executeSql( + "INSERT INTO " + + database + + "." 
+ + tableName + + " ( id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint,\n" + + " f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer,\n" + + " f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double,\n" + + " f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime,\n" + + " f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, f_real, f_time,\n" + + " f_tinyint, f_tinyint_unsigned, f_json, f_year )\n" + + "VALUES ( 6, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n" + + " 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,\n" + + " 0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321,\n" + + " 123456789, 987654321, 123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field',\n" + + " 'This is a text field', 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00',\n" + + " '2023-04-27 11:08:40', 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2',\n" + + " 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field',\n" + + " 12.345, '14:30:00', -128, 255, '{ \"key\": \"value\" }', 1999 )"); + executeSql("DELETE FROM " + database + "." + tableName + " where id = 2"); + executeSql("UPDATE " + database + "." + tableName + " SET f_bigint = 10000 where id = 3"); + } + + private boolean tableExists(String database, String tableName) { + try { + executeSql("SELECT 1 FROM " + database + "." 
+ tableName + " LIMIT 1"); + return true; + } catch (Exception e) { + return false; + } + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_column_include_list.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_column_include_list.conf new file mode 100644 index 00000000000..4d9e2e72046 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_column_include_list.conf @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +###### +###### This config file drives the MySQL-CDC column.include.list e2e test: a streaming job from a MySQL-CDC source to a JDBC sink +###### + +env { + # You can set engine configuration here + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 + read_limit.bytes_per_second=7000000 + read_limit.rows_per_second=400 +} + +source { + MySQL-CDC { + plugin_output = "customers_mysql_cdc" + server-id = 5658 + username = "st_user_source" + password = "mysqlpw" + table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"] + url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc" + + # Debezium configuration with column.include.list + # Includes only 10 of the source table's columns; f_longblob and f_tinyblob, among others, are not included + debezium = { + "column.include.list" = "mysql_cdc.mysql_cdc_e2e_source_table.id,mysql_cdc.mysql_cdc_e2e_source_table.f_binary,mysql_cdc.mysql_cdc_e2e_source_table.f_blob,mysql_cdc.mysql_cdc_e2e_source_table.f_long_varbinary,mysql_cdc.mysql_cdc_e2e_source_table.f_varbinary,mysql_cdc.mysql_cdc_e2e_source_table.f_smallint,mysql_cdc.mysql_cdc_e2e_source_table.f_smallint_unsigned,mysql_cdc.mysql_cdc_e2e_source_table.f_mediumint,mysql_cdc.mysql_cdc_e2e_source_table.f_mediumint_unsigned,mysql_cdc.mysql_cdc_e2e_source_table.f_int" + } + } +} + +sink { + jdbc { + plugin_input = "customers_mysql_cdc" + url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc" + driver = "com.mysql.cj.jdbc.Driver" + user = "st_user_sink" + password = "mysqlpw" + + generate_sink_sql = true + database = mysql_cdc + table = mysql_cdc_e2e_sink_table_column_include + primary_keys = ["id"] + } +}