Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisContainerInfo;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
import org.apache.seatunnel.connectors.seatunnel.redis.row.TestForDeleteRows;
import org.apache.seatunnel.connectors.seatunnel.redis.row.TestKeyOrValueIsNullRows;
import org.apache.seatunnel.connectors.seatunnel.redis.sink.RedisSinkFactory;
import org.apache.seatunnel.connectors.seatunnel.sink.SinkFlowTestUtils;

Expand All @@ -49,11 +49,8 @@
import redis.clients.jedis.Jedis;

import java.io.IOException;
import java.math.BigDecimal;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -138,13 +135,7 @@ public void testFakeToRedisDeleteHashTest() throws IOException {
getCatalogTable(0, key),
getDefaultReadonlyConfig(RedisDataType.HASH, key, otherParams),
new RedisSinkFactory(),
Arrays.asList(
getSeaTunnelRowInsert1(),
getSeaTunnelRowInsert2(),
getSeaTunnelRowInsert3(),
getSeaTunnelRowUpdateBefore(),
getSeaTunnelRowUpdateAfter(),
getSeaTunnelRowDelete()));
TestForDeleteRows.getRows());
Assertions.assertEquals(2, jedis.hlen(key));
jedis.del(key);
}
Expand All @@ -158,13 +149,7 @@ public void testFakeToRedisDeleteKeyTest() throws IOException {
getCatalogTable(0, key),
getDefaultReadonlyConfig(RedisDataType.KEY, key, otherParams),
new RedisSinkFactory(),
Arrays.asList(
getSeaTunnelRowInsert1(),
getSeaTunnelRowInsert2(),
getSeaTunnelRowInsert3(),
getSeaTunnelRowUpdateBefore(),
getSeaTunnelRowUpdateAfter(),
getSeaTunnelRowDelete()));
TestForDeleteRows.getRows());
int count = 0;
for (int i = 1; i <= 3; i++) {
String data = jedis.get("key_check:" + i);
Expand All @@ -185,13 +170,7 @@ public void testFakeToRedisDeleteListTest() throws IOException {
getCatalogTable(0, key),
getDefaultReadonlyConfig(RedisDataType.LIST, key, new HashMap<>()),
new RedisSinkFactory(),
Arrays.asList(
getSeaTunnelRowInsert1(),
getSeaTunnelRowInsert2(),
getSeaTunnelRowInsert3(),
getSeaTunnelRowUpdateBefore(),
getSeaTunnelRowUpdateAfter(),
getSeaTunnelRowDelete()));
TestForDeleteRows.getRows());
Assertions.assertEquals(2, jedis.llen(key));
jedis.del(key);
}
Expand All @@ -203,13 +182,7 @@ public void testFakeToRedisDeleteSetTest() throws IOException {
getCatalogTable(0, key),
getDefaultReadonlyConfig(RedisDataType.SET, key, new HashMap<>()),
new RedisSinkFactory(),
Arrays.asList(
getSeaTunnelRowInsert1(),
getSeaTunnelRowInsert2(),
getSeaTunnelRowInsert3(),
getSeaTunnelRowUpdateBefore(),
getSeaTunnelRowUpdateAfter(),
getSeaTunnelRowDelete()));
TestForDeleteRows.getRows());
Assertions.assertEquals(2, jedis.scard(key));
jedis.del(key);
}
Expand All @@ -221,17 +194,80 @@ public void testFakeToToRedisDeleteZSetTest() throws IOException {
getCatalogTable(0, key),
getDefaultReadonlyConfig(RedisDataType.ZSET, key, new HashMap<>()),
new RedisSinkFactory(),
Arrays.asList(
getSeaTunnelRowInsert1(),
getSeaTunnelRowInsert2(),
getSeaTunnelRowInsert3(),
getSeaTunnelRowUpdateBefore(),
getSeaTunnelRowUpdateAfter(),
getSeaTunnelRowDelete()));
TestForDeleteRows.getRows());
Assertions.assertEquals(2, jedis.zcard(key));
jedis.del(key);
}

@Test
public void testFakeToRedisCustomKeyIsNullTest() throws IOException {
String key = "key_check:{val_string}";
Map<String, Object> otherParams = new HashMap<>();
otherParams.put("support_custom_key", true);
SinkFlowTestUtils.runBatchWithCheckpointDisabled(
getCatalogTable(0, key),
getDefaultReadonlyConfig(RedisDataType.KEY, key, otherParams),
new RedisSinkFactory(),
TestKeyOrValueIsNullRows.getRows());
int count = 0;
String data = jedis.get("key_check:");
if (data != null) {
count++;
jedis.del("key_check:");
}
for (int i = 2; i <= 3; i++) {
data = jedis.get("key_check:NEW" + i);
if (data != null) {
count++;
jedis.del("key_check:NEW" + i);
}
}
Assertions.assertEquals(2, count);
}

@Test
public void testFakeToRedisOtherTypeValueIsNullTest() throws IOException {
String key = "list_check";
Map<String, Object> otherParams = new HashMap<>();
otherParams.put("value_field", "val_string");
SinkFlowTestUtils.runBatchWithCheckpointDisabled(
getCatalogTable(0, key),
getDefaultReadonlyConfig(RedisDataType.LIST, key, otherParams),
new RedisSinkFactory(),
TestKeyOrValueIsNullRows.getRows());
Assertions.assertEquals(2, jedis.llen(key));
jedis.del(key);
}

@Test
public void testFakeToRedisHashTypeKeyIsNullTest() throws IOException {
String key = "hash_check";
Map<String, Object> otherParams = new HashMap<>();
otherParams.put("hash_key_field", "val_string");
SinkFlowTestUtils.runBatchWithCheckpointDisabled(
getCatalogTable(0, key),
getDefaultReadonlyConfig(RedisDataType.HASH, key, otherParams),
new RedisSinkFactory(),
TestKeyOrValueIsNullRows.getRows());
Assertions.assertEquals(2, jedis.hlen(key));
jedis.del(key);
}

@Test
public void testFakeToRedisHashTypeValueIsNullTest() throws IOException {
String key = "hash_check";
Map<String, Object> otherParams = new HashMap<>();
otherParams.put("hash_key_field", "id");
otherParams.put("hash_value_field", "val_string");
SinkFlowTestUtils.runBatchWithCheckpointDisabled(
getCatalogTable(0, key),
getDefaultReadonlyConfig(RedisDataType.HASH, key, otherParams),
new RedisSinkFactory(),
TestKeyOrValueIsNullRows.getRows());
Assertions.assertEquals(2, jedis.hlen(key));
jedis.del(key);
}

private ReadonlyConfig getDefaultReadonlyConfig(
RedisDataType dataType, String key, Map<String, Object> otherParams) {
Map<String, Object> map = new HashMap<>(otherParams);
Expand All @@ -245,117 +281,6 @@ private ReadonlyConfig getDefaultReadonlyConfig(
return ReadonlyConfig.fromMap(map);
}

private SeaTunnelRow getSeaTunnelRowInsert1() {
return new SeaTunnelRow(
new Object[] {
1,
true,
(byte) 1,
(short) 2,
3,
4L,
4.3f,
5.3d,
BigDecimal.valueOf(6.3).setScale(1),
"NEW",
LocalDateTime.parse("2020-02-02T02:02:02")
});
}

private SeaTunnelRow getSeaTunnelRowInsert2() {
return new SeaTunnelRow(
new Object[] {
2,
true,
(byte) 1,
(short) 2,
3,
4L,
4.3f,
5.3d,
BigDecimal.valueOf(6.3).setScale(1),
"NEW",
LocalDateTime.parse("2020-02-02T02:02:02")
});
}

private SeaTunnelRow getSeaTunnelRowInsert3() {
return new SeaTunnelRow(
new Object[] {
3,
true,
(byte) 1,
(short) 2,
3,
4L,
4.3f,
5.3d,
BigDecimal.valueOf(6.3).setScale(1),
"NEW",
LocalDateTime.parse("2020-02-02T02:02:02")
});
}

private SeaTunnelRow getSeaTunnelRowUpdateBefore() {
final SeaTunnelRow seaTunnelRow =
new SeaTunnelRow(
new Object[] {
1,
true,
(byte) 1,
(short) 2,
3,
4L,
4.3f,
5.3d,
BigDecimal.valueOf(6.3).setScale(1),
"NEW",
LocalDateTime.parse("2020-02-02T02:02:02")
});
seaTunnelRow.setRowKind(RowKind.UPDATE_BEFORE);
return seaTunnelRow;
}

private SeaTunnelRow getSeaTunnelRowUpdateAfter() {
final SeaTunnelRow seaTunnelRow =
new SeaTunnelRow(
new Object[] {
1,
true,
(byte) 2,
(short) 2,
3,
4L,
4.3f,
5.3d,
BigDecimal.valueOf(6.3).setScale(1),
"NEW",
LocalDateTime.parse("2020-02-02T02:02:02")
});
seaTunnelRow.setRowKind(RowKind.UPDATE_AFTER);
return seaTunnelRow;
}

private SeaTunnelRow getSeaTunnelRowDelete() {
final SeaTunnelRow seaTunnelRow =
new SeaTunnelRow(
new Object[] {
2,
true,
(byte) 1,
(short) 2,
3,
4L,
4.3f,
5.3d,
BigDecimal.valueOf(6.3).setScale(1),
"NEW",
LocalDateTime.parse("2020-02-02T02:02:02")
});
seaTunnelRow.setRowKind(RowKind.DELETE);
return seaTunnelRow;
}

private CatalogTable getCatalogTable(Integer dbNum, String key) {
return CatalogTable.of(
TableIdentifier.of(CONNECTOR_IDENTITY, dbNum.toString(), key),
Expand Down
Loading