diff --git a/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/RedisTemplateTest.java b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/RedisTemplateTest.java index b7e03962c56..3f86e04c682 100644 --- a/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/RedisTemplateTest.java +++ b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/RedisTemplateTest.java @@ -25,10 +25,10 @@ import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.api.table.type.DecimalType; import org.apache.seatunnel.api.table.type.LocalTimeType; -import org.apache.seatunnel.api.table.type.RowKind; -import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisContainerInfo; import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType; +import org.apache.seatunnel.connectors.seatunnel.redis.row.TestForDeleteRows; +import org.apache.seatunnel.connectors.seatunnel.redis.row.TestKeyOrValueIsNullRows; import org.apache.seatunnel.connectors.seatunnel.redis.sink.RedisSinkFactory; import org.apache.seatunnel.connectors.seatunnel.sink.SinkFlowTestUtils; @@ -49,11 +49,8 @@ import redis.clients.jedis.Jedis; import java.io.IOException; -import java.math.BigDecimal; import java.time.Duration; -import java.time.LocalDateTime; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -138,13 +135,7 @@ public void testFakeToRedisDeleteHashTest() throws IOException { getCatalogTable(0, key), getDefaultReadonlyConfig(RedisDataType.HASH, key, otherParams), new RedisSinkFactory(), - Arrays.asList( - getSeaTunnelRowInsert1(), - getSeaTunnelRowInsert2(), - getSeaTunnelRowInsert3(), - getSeaTunnelRowUpdateBefore(), - getSeaTunnelRowUpdateAfter(), - getSeaTunnelRowDelete())); + TestForDeleteRows.getRows()); Assertions.assertEquals(2, jedis.hlen(key)); jedis.del(key); } @@ -158,13 +149,7 @@ public void testFakeToRedisDeleteKeyTest() throws IOException { getCatalogTable(0, key), getDefaultReadonlyConfig(RedisDataType.KEY, key, otherParams), new RedisSinkFactory(), - Arrays.asList( - getSeaTunnelRowInsert1(), - getSeaTunnelRowInsert2(), - getSeaTunnelRowInsert3(), - getSeaTunnelRowUpdateBefore(), - getSeaTunnelRowUpdateAfter(), - getSeaTunnelRowDelete())); + TestForDeleteRows.getRows()); int count = 0; for (int i = 1; i <= 3; i++) { String data = jedis.get("key_check:" + i); @@ -185,13 +170,7 @@ public void testFakeToRedisDeleteListTest() throws IOException { getCatalogTable(0, key), getDefaultReadonlyConfig(RedisDataType.LIST, key, new HashMap<>()), new RedisSinkFactory(), - Arrays.asList( - getSeaTunnelRowInsert1(), - getSeaTunnelRowInsert2(), - getSeaTunnelRowInsert3(), - getSeaTunnelRowUpdateBefore(), - getSeaTunnelRowUpdateAfter(), - getSeaTunnelRowDelete())); + TestForDeleteRows.getRows()); Assertions.assertEquals(2, jedis.llen(key)); jedis.del(key); } @@ -203,13 +182,7 @@ public void testFakeToRedisDeleteSetTest() throws IOException { getCatalogTable(0, key), getDefaultReadonlyConfig(RedisDataType.SET, key, new HashMap<>()), new RedisSinkFactory(), - Arrays.asList( - getSeaTunnelRowInsert1(), - getSeaTunnelRowInsert2(), - getSeaTunnelRowInsert3(), - getSeaTunnelRowUpdateBefore(), - getSeaTunnelRowUpdateAfter(), - getSeaTunnelRowDelete())); + TestForDeleteRows.getRows()); Assertions.assertEquals(2, jedis.scard(key)); jedis.del(key); } @@ -221,17 +194,80 @@ public void testFakeToToRedisDeleteZSetTest() throws IOException { getCatalogTable(0, key), getDefaultReadonlyConfig(RedisDataType.ZSET, key, new HashMap<>()), new RedisSinkFactory(), - Arrays.asList( - getSeaTunnelRowInsert1(), - getSeaTunnelRowInsert2(), - getSeaTunnelRowInsert3(), - getSeaTunnelRowUpdateBefore(), - getSeaTunnelRowUpdateAfter(), - getSeaTunnelRowDelete())); + TestForDeleteRows.getRows()); Assertions.assertEquals(2, jedis.zcard(key)); jedis.del(key); } + @Test + public void testFakeToRedisCustomKeyIsNullTest() throws IOException { + String key = "key_check:{val_string}"; + Map otherParams = new HashMap<>(); + otherParams.put("support_custom_key", true); + SinkFlowTestUtils.runBatchWithCheckpointDisabled( + getCatalogTable(0, key), + getDefaultReadonlyConfig(RedisDataType.KEY, key, otherParams), + new RedisSinkFactory(), + TestKeyOrValueIsNullRows.getRows()); + int count = 0; + String data = jedis.get("key_check:"); + if (data != null) { + count++; + jedis.del("key_check:"); + } + for (int i = 2; i <= 3; i++) { + data = jedis.get("key_check:NEW" + i); + if (data != null) { + count++; + jedis.del("key_check:NEW" + i); + } + } + Assertions.assertEquals(2, count); + } + + @Test + public void testFakeToRedisOtherTypeValueIsNullTest() throws IOException { + String key = "list_check"; + Map otherParams = new HashMap<>(); + otherParams.put("value_field", "val_string"); + SinkFlowTestUtils.runBatchWithCheckpointDisabled( + getCatalogTable(0, key), + getDefaultReadonlyConfig(RedisDataType.LIST, key, otherParams), + new RedisSinkFactory(), + TestKeyOrValueIsNullRows.getRows()); + Assertions.assertEquals(2, jedis.llen(key)); + jedis.del(key); + } + + @Test + public void testFakeToRedisHashTypeKeyIsNullTest() throws IOException { + String key = "hash_check"; + Map otherParams = new HashMap<>(); + otherParams.put("hash_key_field", "val_string"); + SinkFlowTestUtils.runBatchWithCheckpointDisabled( + getCatalogTable(0, key), + getDefaultReadonlyConfig(RedisDataType.HASH, key, otherParams), + new RedisSinkFactory(), + TestKeyOrValueIsNullRows.getRows()); + Assertions.assertEquals(2, jedis.hlen(key)); + jedis.del(key); + } + + @Test + public void testFakeToRedisHashTypeValueIsNullTest() throws IOException { + String key = "hash_check"; + Map otherParams = new HashMap<>(); + otherParams.put("hash_key_field", "id"); + otherParams.put("hash_value_field", "val_string"); + SinkFlowTestUtils.runBatchWithCheckpointDisabled( + getCatalogTable(0, key), + getDefaultReadonlyConfig(RedisDataType.HASH, key, otherParams), + new RedisSinkFactory(), + TestKeyOrValueIsNullRows.getRows()); + Assertions.assertEquals(2, jedis.hlen(key)); + jedis.del(key); + } + private ReadonlyConfig getDefaultReadonlyConfig( RedisDataType dataType, String key, Map otherParams) { Map map = new HashMap<>(otherParams); @@ -245,117 +281,6 @@ private ReadonlyConfig getDefaultReadonlyConfig( return ReadonlyConfig.fromMap(map); } - private SeaTunnelRow getSeaTunnelRowInsert1() { - return new SeaTunnelRow( - new Object[] { - 1, - true, - (byte) 1, - (short) 2, - 3, - 4L, - 4.3f, - 5.3d, - BigDecimal.valueOf(6.3).setScale(1), - "NEW", - LocalDateTime.parse("2020-02-02T02:02:02") - }); - } - - private SeaTunnelRow getSeaTunnelRowInsert2() { - return new SeaTunnelRow( - new Object[] { - 2, - true, - (byte) 1, - (short) 2, - 3, - 4L, - 4.3f, - 5.3d, - BigDecimal.valueOf(6.3).setScale(1), - "NEW", - LocalDateTime.parse("2020-02-02T02:02:02") - }); - } - - private SeaTunnelRow getSeaTunnelRowInsert3() { - return new SeaTunnelRow( - new Object[] { - 3, - true, - (byte) 1, - (short) 2, - 3, - 4L, - 4.3f, - 5.3d, - BigDecimal.valueOf(6.3).setScale(1), - "NEW", - LocalDateTime.parse("2020-02-02T02:02:02") - }); - } - - private SeaTunnelRow getSeaTunnelRowUpdateBefore() { - final SeaTunnelRow seaTunnelRow = - new SeaTunnelRow( - new Object[] { - 1, - true, - (byte) 1, - (short) 2, - 3, - 4L, - 4.3f, - 5.3d, - BigDecimal.valueOf(6.3).setScale(1), - "NEW", - LocalDateTime.parse("2020-02-02T02:02:02") - }); - seaTunnelRow.setRowKind(RowKind.UPDATE_BEFORE); - return seaTunnelRow; - } - - private SeaTunnelRow getSeaTunnelRowUpdateAfter() { - final SeaTunnelRow seaTunnelRow = - new SeaTunnelRow( - new Object[] { - 1, - true, - (byte) 2, - (short) 2, - 3, - 4L, - 4.3f, - 5.3d, - BigDecimal.valueOf(6.3).setScale(1), - "NEW", - LocalDateTime.parse("2020-02-02T02:02:02") - }); - seaTunnelRow.setRowKind(RowKind.UPDATE_AFTER); - return seaTunnelRow; - } - - private SeaTunnelRow getSeaTunnelRowDelete() { - final SeaTunnelRow seaTunnelRow = - new SeaTunnelRow( - new Object[] { - 2, - true, - (byte) 1, - (short) 2, - 3, - 4L, - 4.3f, - 5.3d, - BigDecimal.valueOf(6.3).setScale(1), - "NEW", - LocalDateTime.parse("2020-02-02T02:02:02") - }); - seaTunnelRow.setRowKind(RowKind.DELETE); - return seaTunnelRow; - } - private CatalogTable getCatalogTable(Integer dbNum, String key) { return CatalogTable.of( TableIdentifier.of(CONNECTOR_IDENTITY, dbNum.toString(), key), diff --git a/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/row/TestForDeleteRows.java b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/row/TestForDeleteRows.java new file mode 100644 index 00000000000..32a9d8b4983 --- /dev/null +++ b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/row/TestForDeleteRows.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.connectors.seatunnel.redis.row; + +import org.apache.seatunnel.api.table.type.RowKind; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.util.Arrays; +import java.util.List; + +public class TestForDeleteRows { + + public static List getRows() { + return Arrays.asList( + getSeaTunnelRowInsert1(), + getSeaTunnelRowInsert2(), + getSeaTunnelRowInsert3(), + getSeaTunnelRowUpdateBefore(), + getSeaTunnelRowUpdateAfter(), + getSeaTunnelRowDelete()); + } + + private static SeaTunnelRow getSeaTunnelRowInsert1() { + return new SeaTunnelRow( + new Object[] { + 1, + true, + (byte) 1, + (short) 2, + 3, + 4L, + 4.3f, + 5.3d, + BigDecimal.valueOf(6.3).setScale(1), + "NEW", + LocalDateTime.parse("2020-02-02T02:02:02") + }); + } + + private static SeaTunnelRow getSeaTunnelRowInsert2() { + return new SeaTunnelRow( + new Object[] { + 2, + true, + (byte) 1, + (short) 2, + 3, + 4L, + 4.3f, + 5.3d, + BigDecimal.valueOf(6.3).setScale(1), + "NEW", + LocalDateTime.parse("2020-02-02T02:02:02") + }); + } + + private static SeaTunnelRow getSeaTunnelRowInsert3() { + return new SeaTunnelRow( + new Object[] { + 3, + true, + (byte) 1, + (short) 2, + 3, + 4L, + 4.3f, + 5.3d, + BigDecimal.valueOf(6.3).setScale(1), + "NEW", + LocalDateTime.parse("2020-02-02T02:02:02") + }); + } + + private static SeaTunnelRow getSeaTunnelRowUpdateBefore() { + final SeaTunnelRow seaTunnelRow = + new SeaTunnelRow( + new Object[] { + 1, + true, + (byte) 1, + (short) 2, + 3, + 4L, + 4.3f, + 5.3d, + BigDecimal.valueOf(6.3).setScale(1), + "NEW", + LocalDateTime.parse("2020-02-02T02:02:02") + }); + seaTunnelRow.setRowKind(RowKind.UPDATE_BEFORE); + return seaTunnelRow; + } + + private static SeaTunnelRow getSeaTunnelRowUpdateAfter() { + final SeaTunnelRow seaTunnelRow = + new SeaTunnelRow( + new Object[] { + 1, + true, + (byte) 2, + (short) 2, + 3, + 4L, + 4.3f, + 5.3d, + BigDecimal.valueOf(6.3).setScale(1), + "NEW", + LocalDateTime.parse("2020-02-02T02:02:02") + }); + seaTunnelRow.setRowKind(RowKind.UPDATE_AFTER); + return seaTunnelRow; + } + + private static SeaTunnelRow getSeaTunnelRowDelete() { + final SeaTunnelRow seaTunnelRow = + new SeaTunnelRow( + new Object[] { + 2, + true, + (byte) 1, + (short) 2, + 3, + 4L, + 4.3f, + 5.3d, + BigDecimal.valueOf(6.3).setScale(1), + "NEW", + LocalDateTime.parse("2020-02-02T02:02:02") + }); + seaTunnelRow.setRowKind(RowKind.DELETE); + return seaTunnelRow; + } +} diff --git a/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/row/TestKeyOrValueIsNullRows.java b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/row/TestKeyOrValueIsNullRows.java new file mode 100644 index 00000000000..be83768b35e --- /dev/null +++ b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/row/TestKeyOrValueIsNullRows.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.connectors.seatunnel.redis.row; + +import org.apache.seatunnel.api.table.type.RowKind; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.util.Arrays; +import java.util.List; + +public class TestKeyOrValueIsNullRows { + + public static List getRows() { + return Arrays.asList( + getSeaTunnelRowWithStringNullInsert1(), + getSeaTunnelRowInsert2(), + getSeaTunnelRowInsert3(), + getSeaTunnelRowWithStringNullUpdateBefore(), + getSeaTunnelRowWithStringNullUpdateAfter(), + getSeaTunnelRowWithStringNullDelete()); + } + + private static SeaTunnelRow getSeaTunnelRowWithStringNullInsert1() { + return new SeaTunnelRow( + new Object[] { + 1, + true, + (byte) 1, + (short) 2, + 3, + 4L, + 4.3f, + 5.3d, + BigDecimal.valueOf(6.3).setScale(1), + null, + LocalDateTime.parse("2020-02-02T02:02:02") + }); + } + + private static SeaTunnelRow getSeaTunnelRowInsert2() { + return new SeaTunnelRow( + new Object[] { + 2, + true, + (byte) 1, + (short) 2, + 3, + 4L, + 4.3f, + 5.3d, + BigDecimal.valueOf(6.3).setScale(1), + "NEW2", + LocalDateTime.parse("2020-02-02T02:02:02") + }); + } + + private static SeaTunnelRow getSeaTunnelRowInsert3() { + return new SeaTunnelRow( + new Object[] { + 3, + true, + (byte) 1, + (short) 2, + 3, + 4L, + 4.3f, + 5.3d, + BigDecimal.valueOf(6.3).setScale(1), + "NEW3", + LocalDateTime.parse("2020-02-02T02:02:02") + }); + } + + private static SeaTunnelRow getSeaTunnelRowWithStringNullUpdateBefore() { + final SeaTunnelRow seaTunnelRow = + new SeaTunnelRow( + new Object[] { + 1, + true, + (byte) 1, + (short) 2, + 3, + 4L, + 4.3f, + 5.3d, + BigDecimal.valueOf(6.3).setScale(1), + null, + LocalDateTime.parse("2020-02-02T02:02:02") + }); + seaTunnelRow.setRowKind(RowKind.UPDATE_BEFORE); + return seaTunnelRow; + } + + private static SeaTunnelRow getSeaTunnelRowWithStringNullUpdateAfter() { + final SeaTunnelRow seaTunnelRow = + new SeaTunnelRow( + new Object[] { + 1, + true, + (byte) 2, + (short) 2, + 3, + 4L, + 4.3f, + 5.3d, + BigDecimal.valueOf(6.3).setScale(1), + null, + LocalDateTime.parse("2020-02-02T02:02:02") + }); + seaTunnelRow.setRowKind(RowKind.UPDATE_AFTER); + return seaTunnelRow; + } + + private static SeaTunnelRow getSeaTunnelRowWithStringNullDelete() { + final SeaTunnelRow seaTunnelRow = + new SeaTunnelRow( + new Object[] { + 1, + true, + (byte) 1, + (short) 2, + 3, + 4L, + 4.3f, + 5.3d, + BigDecimal.valueOf(6.3).setScale(1), + null, + LocalDateTime.parse("2020-02-02T02:02:02") + }); + seaTunnelRow.setRowKind(RowKind.DELETE); + return seaTunnelRow; + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java index 0124681abbf..77b0733db59 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java @@ -18,25 +18,17 @@ import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.seatunnel.api.configuration.ReadonlyConfig; -import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.catalog.Column; -import org.apache.seatunnel.api.table.catalog.PhysicalColumn; -import org.apache.seatunnel.api.table.catalog.TableIdentifier; -import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.type.ArrayType; import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.api.table.type.DecimalType; import org.apache.seatunnel.api.table.type.LocalTimeType; import org.apache.seatunnel.api.table.type.MapType; import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; -import org.apache.seatunnel.api.table.type.RowKind; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisContainerInfo; -import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType; import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; import org.apache.seatunnel.e2e.common.container.EngineType; @@ -81,7 +73,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.seatunnel.connectors.seatunnel.redis.config.RedisBaseOptions.CONNECTOR_IDENTITY; import static org.awaitility.Awaitility.await; @Slf4j @@ -706,221 +697,5 @@ public void testFakeToRedisNormalKeyIsNullTest(TestContainer container) Assertions.assertEquals(2, count); } - @TestTemplate - public void testFakeToRedisCustomKeyIsNullTest(TestContainer container) - throws IOException, InterruptedException { - Container.ExecResult execResult = - container.executeJob("/fake-to-redis-test-custom-key-is-null.conf"); - Assertions.assertEquals(0, execResult.getExitCode()); - int count = 0; - String data = jedis.get("key_check:"); - if (data != null) { - count++; - jedis.del("key_check:"); - } - for (int i = 2; i <= 3; i++) { - data = jedis.get("key_check:NEW" + i); - if (data != null) { - count++; - jedis.del("key_check:NEW" + i); - } - } - Assertions.assertEquals(2, count); - } - - @TestTemplate - public void testFakeToRedisOtherTypeValueIsNullTest(TestContainer container) - throws IOException, InterruptedException { - Container.ExecResult execResult = - container.executeJob( - "/fake-to-redis-test-custom-value-when-other-type-is-null.conf"); - Assertions.assertEquals(0, execResult.getExitCode()); - Assertions.assertEquals(2, jedis.llen("list_check")); - jedis.del("list_check"); - } - - @TestTemplate - public void testFakeToRedisHashTypeKeyIsNullTest(TestContainer container) - throws IOException, InterruptedException { - Container.ExecResult execResult = - container.executeJob("/fake-to-redis-test-custom-value-when-hash-key-is-null.conf"); - Assertions.assertEquals(0, execResult.getExitCode()); - Assertions.assertEquals(2, jedis.hlen("hash_check")); - jedis.del("hash_check"); - } - - @TestTemplate - public void testFakeToRedisHashTypeValueIsNullTest(TestContainer container) - throws IOException, InterruptedException { - Container.ExecResult execResult = - container.executeJob( - "/fake-to-redis-test-custom-value-when-hash-value-is-null.conf"); - Assertions.assertEquals(0, execResult.getExitCode()); - Assertions.assertEquals(2, jedis.hlen("hash_check")); - jedis.del("hash_check"); - } - public abstract RedisContainerInfo getRedisContainerInfo(); - - private ReadonlyConfig getDefaultReadonlyConfig( - RedisDataType dataType, String key, Map otherParams) { - Map map = new HashMap<>(otherParams); - map.put("host", redisContainer.getHost()); - map.put("port", redisContainer.getFirstMappedPort()); - map.put("db_num", 0); - map.put("auth", password); - map.put("key", key); - map.put("data_type", dataType.name()); - map.put("batch_size", 33); - return ReadonlyConfig.fromMap(map); - } - - private SeaTunnelRow getSeaTunnelRowInsert1() { - return new SeaTunnelRow( - new Object[] { - 1, - true, - (byte) 1, - (short) 2, - 3, - 4L, - 4.3f, - 5.3d, - BigDecimal.valueOf(6.3).setScale(1), - "NEW", - LocalDateTime.parse("2020-02-02T02:02:02") - }); - } - - private SeaTunnelRow getSeaTunnelRowInsert2() { - return new SeaTunnelRow( - new Object[] { - 2, - true, - (byte) 1, - (short) 2, - 3, - 4L, - 4.3f, - 5.3d, - BigDecimal.valueOf(6.3).setScale(1), - "NEW", - LocalDateTime.parse("2020-02-02T02:02:02") - }); - } - - private SeaTunnelRow getSeaTunnelRowInsert3() { - return new SeaTunnelRow( - new Object[] { - 3, - true, - (byte) 1, - (short) 2, - 3, - 4L, - 4.3f, - 5.3d, - BigDecimal.valueOf(6.3).setScale(1), - "NEW", - LocalDateTime.parse("2020-02-02T02:02:02") - }); - } - - private SeaTunnelRow getSeaTunnelRowUpdateBefore() { - final SeaTunnelRow seaTunnelRow = - new SeaTunnelRow( - new Object[] { - 1, - true, - (byte) 1, - (short) 2, - 3, - 4L, - 4.3f, - 5.3d, - BigDecimal.valueOf(6.3).setScale(1), - "NEW", - LocalDateTime.parse("2020-02-02T02:02:02") - }); - seaTunnelRow.setRowKind(RowKind.UPDATE_BEFORE); - return seaTunnelRow; - } - - private SeaTunnelRow getSeaTunnelRowUpdateAfter() { - final SeaTunnelRow seaTunnelRow = - new SeaTunnelRow( - new Object[] { - 1, - true, - (byte) 2, - (short) 2, - 3, - 4L, - 4.3f, - 5.3d, - BigDecimal.valueOf(6.3).setScale(1), - "NEW", - LocalDateTime.parse("2020-02-02T02:02:02") - }); - seaTunnelRow.setRowKind(RowKind.UPDATE_AFTER); - return seaTunnelRow; - } - - private SeaTunnelRow getSeaTunnelRowDelete() { - final SeaTunnelRow seaTunnelRow = - new SeaTunnelRow( - new Object[] { - 2, - true, - (byte) 1, - (short) 2, - 3, - 4L, - 4.3f, - 5.3d, - BigDecimal.valueOf(6.3).setScale(1), - "NEW", - LocalDateTime.parse("2020-02-02T02:02:02") - }); - seaTunnelRow.setRowKind(RowKind.DELETE); - return seaTunnelRow; - } - - private CatalogTable getCatalogTable(Integer dbNum, String key) { - return CatalogTable.of( - TableIdentifier.of(CONNECTOR_IDENTITY, dbNum.toString(), key), - getTableSchema(), - new HashMap<>(), - new ArrayList<>(), - ""); - } - - private TableSchema getTableSchema() { - return new TableSchema(getColumns(), null, null); - } - - private List getColumns() { - List columns = new ArrayList<>(); - columns.add(new PhysicalColumn("id", BasicType.INT_TYPE, 32L, 0, true, "", "")); - columns.add(new PhysicalColumn("val_bool", BasicType.BOOLEAN_TYPE, 1L, 0, true, "", "")); - columns.add(new PhysicalColumn("val_int8", BasicType.BYTE_TYPE, 8L, 0, true, "", "")); - columns.add(new PhysicalColumn("val_int16", BasicType.SHORT_TYPE, 16L, 0, true, "", "")); - columns.add(new PhysicalColumn("val_int32", BasicType.INT_TYPE, 32L, 0, true, "", "")); - columns.add(new PhysicalColumn("val_int64", BasicType.LONG_TYPE, 64L, 0, true, "", "")); - columns.add(new PhysicalColumn("val_float", BasicType.FLOAT_TYPE, 32L, 0, true, "", "")); - columns.add(new PhysicalColumn("val_double", BasicType.DOUBLE_TYPE, 64L, 0, true, "", "")); - columns.add( - new PhysicalColumn("val_decimal", new DecimalType(16, 1), 16L, 1, true, "", "")); - columns.add(new PhysicalColumn("val_string", BasicType.STRING_TYPE, 0L, 0, true, "", "")); - columns.add( - new PhysicalColumn( - "val_unixtime_micros", - LocalTimeType.LOCAL_DATE_TIME_TYPE, - 64L, - 6, - true, - "", - "")); - return columns; - } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-key-is-null.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-key-is-null.conf deleted file mode 100644 index 20ec18450a9..00000000000 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-key-is-null.conf +++ /dev/null @@ -1,87 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -env { - parallelism = 1 - job.mode = "BATCH" - shade.identifier = "base64" - - #spark config - spark.app.name = "SeaTunnel" - spark.executor.instances = 2 - spark.executor.cores = 1 - spark.executor.memory = "1g" - spark.master = local -} - -source { - FakeSource { - schema = { - fields { - id = int - val_bool = boolean - val_int8 = tinyint - val_int16 = smallint - val_int32 = int - val_int64 = bigint - val_float = float - val_double = double - val_decimal = "decimal(16, 1)" - val_string = string - val_unixtime_micros = timestamp - } - } - rows = [ - { - kind = INSERT - fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"] - }, - { - kind = INSERT - fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW2", "2020-02-02T02:02:02"] - }, - { - kind = INSERT - fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW3", "2020-02-02T02:02:02"] - }, - { - kind = UPDATE_BEFORE - fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"] - }, - { - kind = UPDATE_AFTER - fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"] - }, - { - kind = DELETE - fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"] - } - ] - } -} - -sink { - Redis { - host = "redis-e2e" - port = 6379 - auth = "U2VhVHVubmVs" - key = "key_check:{val_string}" - data_type = key - support_custom_key = true - batch_size = 33 - } -} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-hash-key-is-null.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-hash-key-is-null.conf deleted file mode 100644 index 774b4aa379a..00000000000 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-hash-key-is-null.conf +++ /dev/null @@ -1,87 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -env { - parallelism = 1 - job.mode = "BATCH" - shade.identifier = "base64" - - #spark config - spark.app.name = "SeaTunnel" - spark.executor.instances = 2 - spark.executor.cores = 1 - spark.executor.memory = "1g" - spark.master = local -} - -source { - FakeSource { - schema = { - fields { - id = int - val_bool = boolean - val_int8 = tinyint - val_int16 = smallint - val_int32 = int - val_int64 = bigint - val_float = float - val_double = double - val_decimal = "decimal(16, 1)" - val_string = string - val_unixtime_micros = timestamp - } - } - rows = [ - { - kind = INSERT - fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"] - }, - { - kind = INSERT - fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW2", "2020-02-02T02:02:02"] - }, - { - kind = INSERT - fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW3", "2020-02-02T02:02:02"] - }, - { - kind = UPDATE_BEFORE - fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"] - }, - { - kind = UPDATE_AFTER - fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"] - }, - { - kind = DELETE - fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"] - } - ] - } -} - -sink { - Redis { - host = "redis-e2e" - port = 6379 - auth = "U2VhVHVubmVs" - key = "hash_check" - data_type = hash - hash_key_field = "val_string" - batch_size = 33 - } -} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-hash-value-is-null.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-hash-value-is-null.conf deleted file mode 100644 index 8d3c2fee7a9..00000000000 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-hash-value-is-null.conf +++ /dev/null @@ -1,88 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -env { - parallelism = 1 - job.mode = "BATCH" - shade.identifier = "base64" - - #spark config - spark.app.name = "SeaTunnel" - spark.executor.instances = 2 - spark.executor.cores = 1 - spark.executor.memory = "1g" - spark.master = local -} - -source { - FakeSource { - schema = { - fields { - id = int - val_bool = boolean - val_int8 = tinyint - val_int16 = smallint - val_int32 = int - val_int64 = bigint - val_float = float - val_double = double - val_decimal = "decimal(16, 1)" - val_string = string - val_unixtime_micros = timestamp - } - } - rows = [ - { - kind = INSERT - fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"] - }, - { - kind = INSERT - fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW2", "2020-02-02T02:02:02"] - }, - { - kind = INSERT - fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW3", "2020-02-02T02:02:02"] - }, - { - kind = UPDATE_BEFORE - fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"] - }, - { - kind = UPDATE_AFTER - fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"] - }, - { - kind = DELETE - fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"] - } - ] - } -} - -sink { - Redis { - host = "redis-e2e" - port = 6379 - auth = "U2VhVHVubmVs" - key = "hash_check" - data_type = hash - hash_key_field = "id" - hash_value_field = "val_string" - batch_size = 33 - } -} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-other-type-is-null.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-other-type-is-null.conf deleted file mode 100644 index 1eab8030d9b..00000000000 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-other-type-is-null.conf +++ /dev/null @@ -1,87 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -env { - parallelism = 1 - job.mode = "BATCH" - shade.identifier = "base64" - - #spark config - spark.app.name = "SeaTunnel" - spark.executor.instances = 2 - spark.executor.cores = 1 - spark.executor.memory = "1g" - spark.master = local -} - -source { - FakeSource { - schema = { - fields { - id = int - val_bool = boolean - val_int8 = tinyint - val_int16 = smallint - val_int32 = int - val_int64 = bigint - val_float = float - val_double = double - val_decimal = "decimal(16, 1)" - val_string = string - val_unixtime_micros = timestamp - } - } - rows = [ - { - kind = INSERT - fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"] - }, - { - kind = INSERT - fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW2", "2020-02-02T02:02:02"] - }, - { - kind = INSERT - fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW3", "2020-02-02T02:02:02"] - }, - { - kind = UPDATE_BEFORE - fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"] - }, - { - kind = UPDATE_AFTER - fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"] - }, - { - kind = DELETE - fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"] - } - ] - } -} - -sink { - Redis { - host = "redis-e2e" - port = 6379 - auth = "U2VhVHVubmVs" - key = "list_check" - data_type = list - value_field = "val_string" - batch_size = 33 - } -} \ No newline at end of file