diff --git a/docs/en/connector-v2/changelog/connector-fluss.md b/docs/en/connector-v2/changelog/connector-fluss.md new file mode 100644 index 00000000000..97ff1428167 --- /dev/null +++ b/docs/en/connector-v2/changelog/connector-fluss.md @@ -0,0 +1,6 @@ +
Change Log + +| Change | Commit | Version | +|--------|--------|---------| + +
diff --git a/docs/en/connector-v2/sink/Fluss.md b/docs/en/connector-v2/sink/Fluss.md
new file mode 100644
index 00000000000..06f70d0a272
--- /dev/null
+++ b/docs/en/connector-v2/sink/Fluss.md
@@ -0,0 +1,352 @@
+import ChangeLog from '../changelog/connector-fluss.md';
+
+# Fluss
+
+> Fluss sink connector
+
+## Support These Engines
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## Key Features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [cdc](../../concept/connector-v2-features.md)
+- [x] [support multiple table write](../../concept/connector-v2-features.md)
+
+## Description
+
+Used to send data to Fluss. Supports both streaming and batch mode.
+
+## Using Dependency
+
+<dependency>
+    <groupId>com.alibaba.fluss</groupId>
+    <artifactId>fluss-client</artifactId>
+    <version>0.7.0</version>
+</dependency>
+
+## Sink Options
+
+| Name              | Type   | Required | Default | Description                                                                                                 |
+|-------------------|--------|----------|---------|-------------------------------------------------------------------------------------------------------------|
+| bootstrap.servers | string | yes      | -       | The bootstrap servers for the Fluss sink connection.                                                        |
+| database          | string | no       | -       | The name of the Fluss database. If not set, the upstream database name is used.                             |
+| table             | string | no       | -       | The name of the Fluss table. If not set, the upstream table name is used.                                   |
+| client.config     | Map    | no       | -       | Other client configs. Please refer to https://fluss.apache.org/docs/engine-flink/options/#other-options     |
+
+### database [string]
+
+The name of the Fluss database. If not set, the upstream database name is used. Variables such as `${database_name}` and `${schema_name}` are supported and are replaced with the upstream database and schema names.
+
+For example:
+
+1. test_${schema_name}_test
+2. sink_sinkdb
+3. ss_${database_name}
+
+### table [string]
+
+The name of the Fluss table. If not set, the upstream table name is used. Variables such as `${table_name}` and `${schema_name}` are supported and are replaced with the upstream table and schema names.
+
+For example:
+1. test_${table_name}_test
+2. sink_sinktable
+3. ss_${table_name}
+
+## Data Type Mapping
+
+| SeaTunnel Data Type | Fluss Data Type |
+|---------------------|-----------------|
+| BOOLEAN             | BOOLEAN         |
+| TINYINT             | TINYINT         |
+| SMALLINT            | SMALLINT        |
+| INT                 | INT             |
+| BIGINT              | BIGINT          |
+| FLOAT               | FLOAT           |
+| DOUBLE              | DOUBLE          |
+| BYTES               | BYTES           |
+| DATE                | DATE            |
+| TIME                | TIME            |
+| TIMESTAMP           | TIMESTAMP       |
+| TIMESTAMP_TZ        | TIMESTAMP_TZ    |
+| STRING              | STRING          |
+
+## Task Example
+
+### Simple
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # This is an example source plugin **only for test and demonstrate the feature source plugin**
+  FakeSource {
+    parallelism = 1
+    tables_configs = [
+      {
+        row.num = 7
+        schema {
+          table = "test.table1"
+          fields {
+            fbytes = bytes
+            fboolean = boolean
+            fint = int
+            ftinyint = tinyint
+            fsmallint = smallint
+            fbigint = bigint
+            ffloat = float
+            fdouble = double
+            fdecimal = "decimal(30, 8)"
+            fstring = string
+            fdate = date
+            ftime = time
+            ftimestamp = timestamp
+            ftimestamp_ltz = timestamp_tz
+          }
+        }
+        rows = [
+          {
+            kind = INSERT
+            fields = ["bWlJWmo=", true, 1940337748, 73, 17489, 7408919466156976747, 9.434991E37, 3.140411637757371E307, 4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10", "2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"]
+          }
+          {
+            kind = INSERT
+            fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169, 2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb", "2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"]
+          }
+          {
+            kind = INSERT
+            fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", "2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+          }
+          {
+            kind = DELETE
+            fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", "2025-03-25T01:49:14",
"2025-07-03T11:52:06+08:00"] + } + { + kind = INSERT + fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", "2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"] + } + { + kind = UPDATE_BEFORE + fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", "2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"] + } + { + kind = UPDATE_AFTER + fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856, 7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd", "2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"] + } + ] + } + ] +} +} + +transform { +} + +sink { + Fluss { + bootstrap.servers="fluss_coordinator_e2e:9123" + database = "fluss_db_${database_name}" + table = "fluss_tb_${table_name}" + } +} +``` + +### Multiple table + +```hocon +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + parallelism = 1 + tables_configs = [ + { + row.num = 7 + schema { + table = "test2.table1" + fields { + fbytes = bytes + fboolean = boolean + fint = int + ftinyint = tinyint + fsmallint = smallint + fbigint = bigint + ffloat = float + fdouble = double + fdecimal = "decimal(30, 8)" + fstring = string + fdate = date + ftime = time + ftimestamp = timestamp + ftimestamp_ltz = timestamp_tz + } + } + rows = [ + { + kind = INSERT + fields = ["bWlJWmo=", true, 1940337748, 73, 17489, 7408919466156976747, 9.434991E37, 3.140411637757371E307, 4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10", "2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"] + } + { + kind = INSERT + fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169, 2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb", "2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"] + } + { + kind = INSERT + fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", "2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"] + } + { + kind = DELETE + fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", "2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"] + } + { + kind = INSERT + fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", "2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"] + } + { + kind = UPDATE_BEFORE + fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", "2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"] + } + { + kind = UPDATE_AFTER + fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856, 7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd", "2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"] + } + ] + }, + { + row.num = 7 + schema { + table = "test2.table2" + fields { + fbytes = bytes + fboolean = boolean + fint = int + ftinyint = tinyint 
+ fsmallint = smallint + fbigint = bigint + ffloat = float + fdouble = double + fdecimal = "decimal(30, 8)" + fstring = string + fdate = date + ftime = time + ftimestamp = timestamp + ftimestamp_ltz = timestamp_tz + } + } + rows = [ + { + kind = INSERT + fields = ["bWlJWmo=", true, 1940337748, 73, 17489, 7408919466156976747, 9.434991E37, 3.140411637757371E307, 4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10", "2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"] + } + { + kind = INSERT + fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169, 2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb", "2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"] + } + { + kind = INSERT + fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", "2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"] + } + { + kind = DELETE + fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", "2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"] + } + { + kind = INSERT + fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", "2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"] + } + { + kind = UPDATE_BEFORE + fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", "2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"] + } + { + kind = UPDATE_AFTER + fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856, 7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd", "2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"] + } + ] + }, + { + row.num = 7 + schema { + table = "test3.table3" + fields { + fbytes = bytes + fboolean = boolean + fint = int + ftinyint = tinyint + fsmallint = smallint + fbigint = bigint + ffloat = float + fdouble = double + fdecimal = "decimal(30, 8)" + fstring = string + fdate = date + ftime = time + ftimestamp = timestamp + ftimestamp_ltz = timestamp_tz + } + } + rows = [ + { + kind = INSERT + fields = ["bWlJWmo=", true, 1940337748, 73, 17489, 7408919466156976747, 9.434991E37, 3.140411637757371E307, 4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10", "2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"] + } + { + kind = INSERT + fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169, 2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb", "2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"] + } + { + kind = INSERT + fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", "2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"] + } + { + kind = DELETE + fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", "2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"] + } + { + kind = INSERT + fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 2.2137477E37, 
1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", "2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"] + } + { + kind = UPDATE_BEFORE + fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", "2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"] + } + { + kind = UPDATE_AFTER + fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856, 7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd", "2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"] + } + ] + } + ] +} +} + +transform { +} + +sink { + Fluss { + bootstrap.servers="fluss_coordinator_e2e:9123" + database = "fluss_db_${database_name}" + table = "fluss_tb_${table_name}" + } +} +``` + + +## Changelog + + + diff --git a/docs/zh/connector-v2/changelog/connector-fluss.md b/docs/zh/connector-v2/changelog/connector-fluss.md new file mode 100644 index 00000000000..97ff1428167 --- /dev/null +++ b/docs/zh/connector-v2/changelog/connector-fluss.md @@ -0,0 +1,6 @@ +
Change Log + +| Change | Commit | Version | +|--------|--------|---------| + +
diff --git a/docs/zh/connector-v2/sink/Fluss.md b/docs/zh/connector-v2/sink/Fluss.md
new file mode 100644
index 00000000000..bc32f8db8f9
--- /dev/null
+++ b/docs/zh/connector-v2/sink/Fluss.md
@@ -0,0 +1,351 @@
+import ChangeLog from '../changelog/connector-fluss.md';
+
+# Fluss
+
+> Fluss 数据接收器
+
+## 引擎支持
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## 主要特性
+
+- [ ] [精准一次](../../concept/connector-v2-features.md)
+- [x] [cdc](../../concept/connector-v2-features.md)
+- [x] [支持多表写入](../../concept/connector-v2-features.md)
+
+## 描述
+
+该接收器用于将数据写入到 Fluss 中。支持批和流两种模式。
+
+## 依赖
+
+<dependency>
+    <groupId>com.alibaba.fluss</groupId>
+    <artifactId>fluss-client</artifactId>
+    <version>0.7.0</version>
+</dependency>
+
+## 接收器选项
+
+| 名称                | 类型     | 是否必须 | 默认值 | 描述                                                                                  |
+|-------------------|--------|------|-----|----------------------------------------------------------------------------------|
+| bootstrap.servers | string | yes  | -   | Fluss 集群地址                                                                         |
+| database          | string | no   | -   | 指定目标 Fluss 表所在数据库的名称，如果没有设置该值，则使用上游库名                                               |
+| table             | string | no   | -   | 指定目标 Fluss 表的名称，如果没有设置该值，则使用上游表名                                                    |
+| client.config     | Map    | no   | -   | 设置其他客户端配置，参考 https://fluss.apache.org/docs/engine-flink/options/#other-options      |
+
+### database [string]
+
+database 选项可以填入任意库名，这个名字最终会被用作目标表的库名，并且支持变量（`${database_name}`、`${schema_name}`）。
+替换规则如下：`${schema_name}` 将替换传递给目标端的 SCHEMA 名称，`${database_name}` 将替换传递给目标端的库名。
+
+例如：
+1. test_${schema_name}_test
+2. sink_sinkdb
+3. ss_${database_name}
+
+### table [string]
+
+table 选项可以填入任意表名，这个名字最终会被用作目标表的表名，并且支持变量（`${table_name}`、`${schema_name}`）。
+替换规则如下：`${schema_name}` 将替换传递给目标端的 SCHEMA 名称，`${table_name}` 将替换传递给目标端的表名。
+
+例如：
+1. test_${schema_name}_test
+2. sink_sinktable
+3. ss_${table_name}
+
+## 数据类型映射
+
+| SeaTunnel 数据类型 | Fluss 数据类型    |
+|----------------|---------------|
+| BOOLEAN        | BOOLEAN       |
+| TINYINT        | TINYINT       |
+| SMALLINT       | SMALLINT      |
+| INT            | INT           |
+| BIGINT         | BIGINT        |
+| FLOAT          | FLOAT         |
+| DOUBLE         | DOUBLE        |
+| BYTES          | BYTES         |
+| DATE           | DATE          |
+| TIME           | TIME          |
+| TIMESTAMP      | TIMESTAMP     |
+| TIMESTAMP_TZ   | TIMESTAMP_TZ  |
+| STRING         | STRING        |
+
+## 任务示例
+
+### 简单示例
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # This is an example source plugin **only for test and demonstrate the feature source plugin**
+  FakeSource {
+    parallelism = 1
+    tables_configs = [
+      {
+        row.num = 7
+        schema {
+          table = "test.table1"
+          fields {
+            fbytes = bytes
+            fboolean = boolean
+            fint = int
+            ftinyint = tinyint
+            fsmallint = smallint
+            fbigint = bigint
+            ffloat = float
+            fdouble = double
+            fdecimal = "decimal(30, 8)"
+            fstring = string
+            fdate = date
+            ftime = time
+            ftimestamp = timestamp
+            ftimestamp_ltz = timestamp_tz
+          }
+        }
+        rows = [
+          {
+            kind = INSERT
+            fields = ["bWlJWmo=", true, 1940337748, 73, 17489, 7408919466156976747, 9.434991E37, 3.140411637757371E307, 4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10", "2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"]
+          }
+          {
+            kind = INSERT
+            fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169, 2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb", "2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"]
+          }
+          {
+            kind = INSERT
+            fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", "2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+          }
+          {
+            kind = DELETE
+            fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", "2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+          }
+          {
+            kind = INSERT
+            fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", "2025-09-13", "10:32:52", "2025-01-27T19:20:51",
"2025-11-07T02:38:54+08:00"] + } + { + kind = UPDATE_BEFORE + fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", "2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"] + } + { + kind = UPDATE_AFTER + fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856, 7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd", "2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"] + } + ] + } + ] +} +} + +transform { +} + +sink { + Fluss { + bootstrap.servers="fluss_coordinator_e2e:9123" + database = "fluss_db_${database_name}" + table = "fluss_tb_${table_name}" + } +} +``` +### 多表写入 + +```hocon +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + parallelism = 1 + tables_configs = [ + { + row.num = 7 + schema { + table = "test2.table1" + fields { + fbytes = bytes + fboolean = boolean + fint = int + ftinyint = tinyint + fsmallint = smallint + fbigint = bigint + ffloat = float + fdouble = double + fdecimal = "decimal(30, 8)" + fstring = string + fdate = date + ftime = time + ftimestamp = timestamp + ftimestamp_ltz = timestamp_tz + } + } + rows = [ + { + kind = INSERT + fields = ["bWlJWmo=", true, 1940337748, 73, 17489, 7408919466156976747, 9.434991E37, 3.140411637757371E307, 4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10", "2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"] + } + { + kind = INSERT + fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169, 2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb", "2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"] + } + { + kind = INSERT + fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", "2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"] + } + { + kind = DELETE + fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", "2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"] + } + { + kind = INSERT + fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", "2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"] + } + { + kind = UPDATE_BEFORE + fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", "2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"] + } + { + kind = UPDATE_AFTER + fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856, 7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd", "2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"] + } + ] + }, + { + row.num = 7 + schema { + table = "test2.table2" + fields { + fbytes = bytes + fboolean = boolean + fint = int + ftinyint = tinyint + fsmallint = smallint + fbigint = bigint + ffloat = float + fdouble = double + fdecimal = "decimal(30, 8)" + fstring = string + fdate = date + ftime = time + ftimestamp = timestamp + ftimestamp_ltz = timestamp_tz + } + } + rows = [ + { + kind = INSERT + fields = 
["bWlJWmo=", true, 1940337748, 73, 17489, 7408919466156976747, 9.434991E37, 3.140411637757371E307, 4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10", "2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"] + } + { + kind = INSERT + fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169, 2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb", "2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"] + } + { + kind = INSERT + fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", "2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"] + } + { + kind = DELETE + fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", "2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"] + } + { + kind = INSERT + fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", "2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"] + } + { + kind = UPDATE_BEFORE + fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", "2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"] + } + { + kind = UPDATE_AFTER + fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856, 7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd", "2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"] + } + ] + }, + { + row.num = 7 + schema { + table = "test3.table3" + fields { + fbytes = bytes + fboolean = boolean + fint = int + ftinyint = tinyint + fsmallint = smallint + fbigint = bigint + ffloat = float + fdouble = double + fdecimal = "decimal(30, 8)" + fstring = string + fdate = date + ftime = time + ftimestamp = timestamp + ftimestamp_ltz = timestamp_tz + } + } + rows = [ + { + kind = INSERT + fields = ["bWlJWmo=", true, 1940337748, 73, 17489, 7408919466156976747, 9.434991E37, 3.140411637757371E307, 4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10", "2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"] + } + { + kind = INSERT + fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169, 2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb", "2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"] + } + { + kind = INSERT + fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", "2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"] + } + { + kind = DELETE + fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", "2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"] + } + { + kind = INSERT + fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", "2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"] + } + { + kind = UPDATE_BEFORE + fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 2.2137477E37, 1.7737681870839753E308, 
3984670873242882274814.90739768, "ddddd", "2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"] + } + { + kind = UPDATE_AFTER + fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856, 7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd", "2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"] + } + ] + } + ] +} +} + +transform { +} + +sink { + Fluss { + bootstrap.servers="fluss_coordinator_e2e:9123" + database = "fluss_db_${database_name}" + table = "fluss_tb_${table_name}" + } +} +``` + +## 变更日志 + + diff --git a/plugin-mapping.properties b/plugin-mapping.properties index 4172161f5e4..74b84a47582 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -150,6 +150,7 @@ seatunnel.sink.GraphQL = connector-graphql seatunnel.sink.Aerospike = connector-aerospike seatunnel.sink.SensorsData = connector-sensorsdata seatunnel.sink.HugeGraph = connector-hugegraph +seatunnel.sink.Fluss = connector-fluss # For custom transforms, make sure to use the seatunnel.transform.[PluginIdentifier]=[JarPerfix] naming convention. For example: # seatunnel.transform.Sql = seatunnel-transforms-v2 diff --git a/seatunnel-connectors-v2/connector-fluss/pom.xml b/seatunnel-connectors-v2/connector-fluss/pom.xml new file mode 100644 index 00000000000..8db032b6f90 --- /dev/null +++ b/seatunnel-connectors-v2/connector-fluss/pom.xml @@ -0,0 +1,51 @@ + + + + 4.0.0 + + org.apache.seatunnel + seatunnel-connectors-v2 + ${revision} + + + connector-fluss + SeaTunnel : Connectors V2 : Fluss + + + 0.7.0 + connector.fluss + + + + + org.apache.seatunnel + seatunnel-api + ${project.version} + + + org.apache.seatunnel + connector-common + ${project.version} + + + com.alibaba.fluss + fluss-client + ${fluss.client.version} + + + + diff --git a/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/config/FlussBaseOptions.java b/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/config/FlussBaseOptions.java new file mode 100644 index 00000000000..e582c760ec3 --- /dev/null +++ b/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/config/FlussBaseOptions.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.seatunnel.connectors.seatunnel.fluss.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; + +import java.io.Serializable; +import java.util.Map; + +public class FlussBaseOptions implements Serializable { + public static final String CONNECTOR_IDENTITY = "Fluss"; + public static final Option BOOTSTRAP_SERVERS = + Options.key("bootstrap.servers") + .stringType() + .noDefaultValue() + .withDescription("Fluss cluster address"); + public static final Option DATABASE = + Options.key("database") + .stringType() + .noDefaultValue() + .withDescription("The name of Fluss database"); + + public static final Option TABLE = + Options.key("table") + .stringType() + .noDefaultValue() + .withDescription("The name of Fluss table"); + + public static final Option> CLIENT_CONFIG = + Options.key("client.config") + .mapType() + .noDefaultValue() + .withDescription("The parameter of Fluss client add to Connection "); +} diff --git a/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/config/FlussSinkOptions.java b/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/config/FlussSinkOptions.java new file mode 100644 index 00000000000..293770eba96 --- /dev/null +++ b/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/config/FlussSinkOptions.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.fluss.config; + +public class FlussSinkOptions extends FlussBaseOptions {} diff --git a/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/sink/FlussSink.java b/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/sink/FlussSink.java new file mode 100644 index 00000000000..771d414c700 --- /dev/null +++ b/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/sink/FlussSink.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.fluss.sink; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SupportMultiTableSink; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; +import org.apache.seatunnel.connectors.seatunnel.fluss.config.FlussSinkOptions; + +import lombok.extern.slf4j.Slf4j; + +import java.util.Optional; + +@Slf4j +public class FlussSink extends AbstractSimpleSink + implements SupportMultiTableSink { + + private final ReadonlyConfig pluginConfig; + private final CatalogTable catalogTable; + + public FlussSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) { + this.pluginConfig = pluginConfig; + this.catalogTable = catalogTable; + } + + @Override + public FlussSinkWriter createWriter(SinkWriter.Context context) { + return new FlussSinkWriter(context, catalogTable, pluginConfig); + } + + @Override + public Optional getWriteCatalogTable() { + return Optional.of(catalogTable); + } + + @Override + public String getPluginName() { + return FlussSinkOptions.CONNECTOR_IDENTITY; + } +} diff --git a/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/sink/FlussSinkFactory.java b/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/sink/FlussSinkFactory.java new file mode 100644 index 00000000000..13ee1424685 --- /dev/null +++ b/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/sink/FlussSinkFactory.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.fluss.sink; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.connector.TableSink; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; +import org.apache.seatunnel.connectors.seatunnel.fluss.config.FlussSinkOptions; + +import com.google.auto.service.AutoService; + +import static org.apache.seatunnel.api.options.SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA; + +@AutoService(Factory.class) +public class FlussSinkFactory implements TableSinkFactory { + @Override + public String factoryIdentifier() { + return FlussSinkOptions.CONNECTOR_IDENTITY; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder() + .required(FlussSinkOptions.BOOTSTRAP_SERVERS) + .optional(FlussSinkOptions.DATABASE) + .optional(FlussSinkOptions.TABLE) + .optional(FlussSinkOptions.CLIENT_CONFIG) + .optional(MULTI_TABLE_SINK_REPLICA) + .build(); + } + + @Override + public TableSink createSink(TableSinkFactoryContext context) { + return () -> new FlussSink(context.getOptions(), context.getCatalogTable()); + } +} diff --git a/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/sink/FlussSinkWriter.java b/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/sink/FlussSinkWriter.java new file mode 100644 index 00000000000..91881d30e5a --- /dev/null +++ b/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/sink/FlussSinkWriter.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.fluss.sink; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.RowKind; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonError; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.fluss.config.FlussSinkOptions; + +import com.alibaba.fluss.client.Connection; +import com.alibaba.fluss.client.ConnectionFactory; +import com.alibaba.fluss.client.table.Table; +import com.alibaba.fluss.client.table.writer.AppendWriter; +import com.alibaba.fluss.client.table.writer.TableWriter; +import com.alibaba.fluss.client.table.writer.UpsertWriter; +import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.metadata.TablePath; +import com.alibaba.fluss.row.BinaryString; +import com.alibaba.fluss.row.Decimal; +import com.alibaba.fluss.row.GenericRow; +import com.alibaba.fluss.row.TimestampLtz; +import com.alibaba.fluss.row.TimestampNtz; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.math.BigDecimal; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.util.Map; +import java.util.Optional; + +@Slf4j +public class FlussSinkWriter extends AbstractSinkWriter + implements SupportMultiTableSinkWriter { + + private Connection connection; + private TableWriter writer; + private Table table; + private String dbName; + private String tableName; + private final SeaTunnelRowType seaTunnelRowType; + + public FlussSinkWriter( + SinkWriter.Context context, CatalogTable catalogTable, ReadonlyConfig pluginConfig) { + seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType(); + Configuration flussConfig = new Configuration(); + flussConfig.setString( + FlussSinkOptions.BOOTSTRAP_SERVERS.key(), + pluginConfig.get(FlussSinkOptions.BOOTSTRAP_SERVERS)); + Optional> clientConfig = + pluginConfig.getOptional(FlussSinkOptions.CLIENT_CONFIG); + if (clientConfig.isPresent()) { + clientConfig + .get() + .forEach( + (k, v) -> { + flussConfig.setString(k, v); + }); + } + log.info("Connect to Fluss with config: {}", flussConfig); + connection = ConnectionFactory.createConnection(flussConfig); + log.info("Connect to Fluss success"); + dbName = + pluginConfig + .getOptional(FlussSinkOptions.DATABASE) + .orElseGet(() -> catalogTable.getTableId().getDatabaseName()); + tableName = + pluginConfig + .getOptional(FlussSinkOptions.TABLE) + .orElseGet(() -> catalogTable.getTableId().getTableName()); + TablePath tablePath = TablePath.of(dbName, tableName); + table = connection.getTable(tablePath); + if (table.getTableInfo().hasPrimaryKey()) { + log.info("Table {} has primary key, use upsert writer", tableName); + writer = table.newUpsert().createWriter(); + } else { + log.info("Table {} has no primary key, use append writer", tableName); + writer = table.newAppend().createWriter(); + } + } + + @Override + public void write(SeaTunnelRow element) { + RowKind rowKind = 
element.getRowKind(); + GenericRow genericRow = new GenericRow(element.getFields().length); + for (int i = 0; i < element.getFields().length; i++) { + genericRow.setField( + i, + convert( + seaTunnelRowType.getFieldType(i), + seaTunnelRowType.getFieldName(i), + element.getField(i))); + } + + if (writer instanceof UpsertWriter) { + UpsertWriter upsertWriter = (UpsertWriter) writer; + switch (rowKind) { + case INSERT: + case UPDATE_AFTER: + upsertWriter.upsert(genericRow); + break; + case DELETE: + upsertWriter.delete(genericRow); + break; + case UPDATE_BEFORE: + return; + default: + throw CommonError.unsupportedRowKind( + FlussSinkOptions.CONNECTOR_IDENTITY, tableName, rowKind.shortString()); + } + } else if (writer instanceof AppendWriter) { + AppendWriter appendWriter = (AppendWriter) writer; + switch (rowKind) { + case INSERT: + case UPDATE_AFTER: + appendWriter.append(genericRow); + break; + case DELETE: + case UPDATE_BEFORE: + return; + default: + throw CommonError.unsupportedRowKind( + FlussSinkOptions.CONNECTOR_IDENTITY, tableName, rowKind.shortString()); + } + } else { + throw CommonError.unsupportedOperation( + FlussSinkOptions.CONNECTOR_IDENTITY, writer.getClass().getName()); + } + } + + @Override + public Optional prepareCommit(long checkpointId) throws IOException { + writer.flush(); + return super.prepareCommit(checkpointId); + } + + @Override + public void close() { + log.info("Close Fluss table."); + try { + if (table != null) { + table.close(); + } + } catch (Exception e) { + throw CommonError.closeFailed("Close Fluss table failed.", e); + } + + log.info("Close Fluss connection."); + try { + if (connection != null) { + connection.close(); + } + } catch (Exception e) { + throw CommonError.closeFailed("Close Fluss connection failed.", e); + } + } + + protected Object convert(SeaTunnelDataType dataType, String fieldName, Object val) { + if (val == null) { + return null; + } + switch (dataType.getSqlType()) { + case BOOLEAN: + case TINYINT: + case SMALLINT: + case INT: + case BIGINT: + case FLOAT: + case DOUBLE: + case BYTES: + return val; + case STRING: + return BinaryString.fromString((String) val); + case DECIMAL: + return Decimal.fromBigDecimal( + (BigDecimal) val, + ((DecimalType) dataType).getPrecision(), + ((DecimalType) dataType).getScale()); + case DATE: + return (int) ((LocalDate) val).toEpochDay(); + case TIME: + return (int) (((LocalTime) val).toNanoOfDay() / 1_000_000); + case TIMESTAMP: + return TimestampNtz.fromLocalDateTime((LocalDateTime) val); + case TIMESTAMP_TZ: + if (val instanceof Instant) { + return TimestampLtz.fromInstant((Instant) val); + } else if (val instanceof OffsetDateTime) { + return TimestampLtz.fromInstant(((OffsetDateTime) val).toInstant()); + } + throw CommonError.unsupportedDataType( + FlussSinkOptions.CONNECTOR_IDENTITY, + dataType.getSqlType().name(), + fieldName); + default: + throw CommonError.unsupportedDataType( + FlussSinkOptions.CONNECTOR_IDENTITY, + dataType.getSqlType().name(), + fieldName); + } + } +} diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml index c587da3740d..37e0e668a6a 100644 --- a/seatunnel-connectors-v2/pom.xml +++ b/seatunnel-connectors-v2/pom.xml @@ -89,6 +89,7 @@ connector-graphql connector-aerospike connector-sensorsdata + connector-fluss diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml index 8f79459a541..f4196c4918c 100644 --- a/seatunnel-dist/pom.xml +++ b/seatunnel-dist/pom.xml @@ -651,6 +651,13 @@ provided + + org.apache.seatunnel + connector-fluss + ${project.version} 
+ provided + + com.aliyun.phoenix diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/pom.xml new file mode 100644 index 00000000000..4105b14d249 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/pom.xml @@ -0,0 +1,82 @@ + + + + 4.0.0 + + org.apache.seatunnel + seatunnel-connector-v2-e2e + ${revision} + + + connector-fluss-e2e + SeaTunnel : E2E : Connector V2 : Fluss + + + + + org.apache.seatunnel + connector-fluss + ${project.version} + test + + + org.apache.seatunnel + connector-fake + ${project.version} + test + + + org.apache.seatunnel + connector-cdc-mysql + ${project.version} + test-jar + test + + + org.apache.seatunnel + connector-assert + ${project.version} + test + + + + org.testcontainers + mysql + ${testcontainer.version} + test + + + org.testcontainers + postgresql + ${testcontainer.version} + test + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + + + + + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fluss/FlussSinkIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fluss/FlussSinkIT.java new file mode 100644 index 00000000000..04230c362e5 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fluss/FlussSinkIT.java @@ -0,0 +1,402 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.e2e.connector.fluss; + +import org.apache.seatunnel.shade.com.google.common.collect.Lists; + +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.TestContainer; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerLoggerFactory; + +import com.alibaba.fluss.client.Connection; +import com.alibaba.fluss.client.ConnectionFactory; +import com.alibaba.fluss.client.admin.Admin; +import com.alibaba.fluss.client.table.Table; +import com.alibaba.fluss.client.table.scanner.ScanRecord; +import com.alibaba.fluss.client.table.scanner.log.LogScanner; +import com.alibaba.fluss.client.table.scanner.log.ScanRecords; +import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.metadata.DatabaseDescriptor; +import com.alibaba.fluss.metadata.Schema; +import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.metadata.TableDescriptor; +import com.alibaba.fluss.metadata.TablePath; +import com.alibaba.fluss.row.GenericRow; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.types.DataTypes; +import com.alibaba.fluss.utils.CloseableIterator; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.net.Socket; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import static org.awaitility.Awaitility.given; + +@Slf4j +public class FlussSinkIT extends TestSuiteBase implements TestResource { + private static final String DOCKER_IMAGE = "fluss/fluss:0.7.0"; + private static final String DOCKER_ZK_IMAGE = "zookeeper:3.9.2"; + + private static final String FLUSS_Coordinator_HOST = "fluss_coordinator_e2e"; + private static final String FLUSS_Tablet_HOST = "fluss_tablet_e2e"; + private static final String ZK_HOST = "zk_e2e"; + private static final int ZK_PORT = 2181; + private static final int FLUSS_Coordinator_PORT = 9123; + private static final int FLUSS_Tablet_PORT = 9124; + private static final int FLUSS_Coordinator_LOCAL_PORT = 8123; + private static final int FLUSS_Tablet_LOCAL_PORT = 8124; + + private GenericContainer zookeeperServer; + private GenericContainer coordinatorServer; + private GenericContainer tabletServer; + + private Connection flussConnection; + + private static final String DB_NAME = "fluss_db_test"; + private static final String DB_NAME_2 = "fluss_db_test2"; + private static final String DB_NAME_3 = "fluss_db_test3"; + private static final String TABLE_NAME = "fluss_tb_table1"; + private static final String TABLE_NAME_2 = "fluss_tb_table2"; + private static final String TABLE_NAME_3 = "fluss_tb_table3"; + + @BeforeAll + @Override + public void startUp() { + createZookeeperContainer(); + createFlussContainer(); + } + + private void createFlussContainer() { + log.info("Starting FlussServer container..."); + String coordinatorEnv = + String.format( + "zookeeper.address: %s:%d\n" + + "bind.listeners: INTERNAL://%s:%d, LOCALCLIENT://%s:%d \n" + + 
"advertised.listeners: INTERNAL://%s:%d, LOCALCLIENT://localhost:%d\n" + + "internal.listener.name: INTERNAL", + ZK_HOST, + ZK_PORT, + FLUSS_Coordinator_HOST, + FLUSS_Coordinator_PORT, + FLUSS_Coordinator_HOST, + FLUSS_Coordinator_LOCAL_PORT, + FLUSS_Coordinator_HOST, + FLUSS_Coordinator_PORT, + FLUSS_Coordinator_LOCAL_PORT); + coordinatorServer = + new GenericContainer<>(DOCKER_IMAGE) + .withNetwork(NETWORK) + .withNetworkAliases(FLUSS_Coordinator_HOST) + .withEnv("FLUSS_PROPERTIES", coordinatorEnv) + .withCommand("coordinatorServer") + .withLogConsumer( + new Slf4jLogConsumer( + DockerLoggerFactory.getLogger("coordinatorServer"))); + coordinatorServer.setPortBindings( + Lists.newArrayList( + String.format( + "%s:%s", + FLUSS_Coordinator_LOCAL_PORT, FLUSS_Coordinator_LOCAL_PORT))); + Startables.deepStart(Stream.of(coordinatorServer)).join(); + given().ignoreExceptions() + .await() + .atMost(120, TimeUnit.SECONDS) + .pollInterval(5, TimeUnit.SECONDS) + .until( + () -> + checkPort( + coordinatorServer.getHost(), + FLUSS_Coordinator_LOCAL_PORT, + 1000)); + log.info("coordinatorServer container start success"); + + String tabletEnv = + String.format( + "zookeeper.address: %s:%d\n" + + "bind.listeners: INTERNAL://%s:%d, LOCALCLIENT://%s:%d\n" + + "advertised.listeners: INTERNAL://%s:%d, LOCALCLIENT://localhost:%d\n" + + "internal.listener.name: INTERNAL\n" + + "tablet-server.id: 0\n" + + "kv.snapshot.interval: 0s\n" + + "data.dir: /tmp/fluss/data\n" + + "remote.data.dir: /tmp/fluss/remote-data", + ZK_HOST, + ZK_PORT, + FLUSS_Tablet_HOST, + FLUSS_Tablet_PORT, + FLUSS_Tablet_HOST, + FLUSS_Tablet_LOCAL_PORT, + FLUSS_Tablet_HOST, + FLUSS_Tablet_PORT, + FLUSS_Tablet_LOCAL_PORT); + tabletServer = + new GenericContainer<>(DOCKER_IMAGE) + .withNetwork(NETWORK) + .withNetworkAliases(FLUSS_Tablet_HOST) + .withEnv("FLUSS_PROPERTIES", tabletEnv) + .withCommand("tabletServer") + .withLogConsumer( + new Slf4jLogConsumer( + DockerLoggerFactory.getLogger("tabletServer"))); + tabletServer.setPortBindings( + Lists.newArrayList( + String.format("%s:%s", FLUSS_Tablet_LOCAL_PORT, FLUSS_Tablet_LOCAL_PORT))); + Startables.deepStart(Stream.of(tabletServer)).join(); + given().ignoreExceptions() + .await() + .atMost(120, TimeUnit.SECONDS) + .pollInterval(5, TimeUnit.SECONDS) + .untilAsserted(this::initializeConnection); + log.info("tabletServer container start success"); + log.info("FlussServer Containers are started"); + } + + private void createZookeeperContainer() { + log.info("Starting ZookeeperServer container..."); + zookeeperServer = + new GenericContainer<>(DOCKER_ZK_IMAGE) + .withNetwork(NETWORK) + .withNetworkAliases(ZK_HOST) + .withLogConsumer( + new Slf4jLogConsumer( + DockerLoggerFactory.getLogger(DOCKER_ZK_IMAGE))); + zookeeperServer.setPortBindings( + Lists.newArrayList(String.format("%s:%s", ZK_PORT, ZK_PORT))); + Startables.deepStart(Stream.of(zookeeperServer)).join(); + given().ignoreExceptions() + .await() + .atMost(60, TimeUnit.SECONDS) + .pollInterval(5, TimeUnit.SECONDS) + .until(() -> checkPort(zookeeperServer.getHost(), ZK_PORT, 1000)); + log.info("ZookeeperServer Containers are started"); + } + + private void initializeConnection() throws ExecutionException, InterruptedException { + Configuration flussConfig = new Configuration(); + flussConfig.setString( + "bootstrap.servers", + coordinatorServer.getHost() + ":" + FLUSS_Coordinator_LOCAL_PORT); + flussConnection = ConnectionFactory.createConnection(flussConfig); + createDb(flussConnection, DB_NAME); + } + + public void createDb(Connection 
connection, String dbName) + throws ExecutionException, InterruptedException { + Admin admin = connection.getAdmin(); + DatabaseDescriptor descriptor = DatabaseDescriptor.builder().build(); + admin.dropDatabase(dbName, true, true).get(); + admin.createDatabase(dbName, descriptor, true).get(); + } + + public Schema getFlussSchema() { + return Schema.newBuilder() + .column("fbytes", DataTypes.BYTES()) + .column("fboolean", DataTypes.BOOLEAN()) + .column("fint", DataTypes.INT()) + .column("ftinyint", DataTypes.TINYINT()) + .column("fsmallint", DataTypes.SMALLINT()) + .column("fbigint", DataTypes.BIGINT()) + .column("ffloat", DataTypes.FLOAT()) + .column("fdouble", DataTypes.DOUBLE()) + .column("fdecimal", DataTypes.DECIMAL(30, 8)) + .column("fstring", DataTypes.STRING()) + .column("fdate", DataTypes.DATE()) + .column("ftime", DataTypes.TIME()) + .column("ftimestamp", DataTypes.TIMESTAMP()) + .column("ftimestamp_ltz", DataTypes.TIMESTAMP_LTZ()) + .primaryKey("fstring") + .build(); + } + + public void createTable(Connection connection, String dbName, String tableName, Schema schema) + throws ExecutionException, InterruptedException { + Admin admin = connection.getAdmin(); + TableDescriptor tableDescriptor = TableDescriptor.builder().schema(schema).build(); + TablePath tablePath = TablePath.of(dbName, tableName); + admin.dropTable(tablePath, true).get(); + admin.createTable(tablePath, tableDescriptor, true).get(); // blocking call + } + + public static boolean checkPort(String host, int port, int timeoutMs) throws IOException { + try (Socket socket = new Socket()) { + socket.connect(new java.net.InetSocketAddress(host, port), timeoutMs); + return true; + } catch (Exception e) { + throw e; + } + } + + @AfterAll + @Override + public void tearDown() throws Exception { + if (tabletServer != null) { + tabletServer.close(); + } + if (coordinatorServer != null) { + coordinatorServer.close(); + } + if (zookeeperServer != null) { + zookeeperServer.close(); + } + } + + @TestTemplate + public void testFlussSink(TestContainer container) throws Exception { + log.info(" create fluss table"); + createDb(flussConnection, DB_NAME); + createTable(flussConnection, DB_NAME, TABLE_NAME, getFlussSchema()); + Container.ExecResult execFake2fluss = container.executeJob("/fake_to_fluss.conf"); + Assertions.assertEquals(0, execFake2fluss.getExitCode(), execFake2fluss.getStderr()); + checkFlussData(DB_NAME, TABLE_NAME); + } + + @TestTemplate + public void testFlussMultiTableSink(TestContainer container) throws Exception { + log.info(" create fluss tables"); + createDb(flussConnection, DB_NAME_2); + createDb(flussConnection, DB_NAME_3); + createTable(flussConnection, DB_NAME_2, TABLE_NAME, getFlussSchema()); + createTable(flussConnection, DB_NAME_2, TABLE_NAME_2, getFlussSchema()); + createTable(flussConnection, DB_NAME_3, TABLE_NAME_3, getFlussSchema()); + + Container.ExecResult execFake2fluss = + container.executeJob("/fake_to_multipletable_fluss.conf"); + Assertions.assertEquals(0, execFake2fluss.getExitCode(), execFake2fluss.getStderr()); + checkFlussData(DB_NAME_2, TABLE_NAME); + checkFlussData(DB_NAME_2, TABLE_NAME_2); + checkFlussData(DB_NAME_3, TABLE_NAME_3); + } + + public void checkFlussData(String dbName, String tableName) throws IOException { + // check log data + List streamData = + getFlussTableStreamData(flussConnection, dbName, tableName, 10); + checkFlussTableStreamData(streamData); + // check data + List data = getFlussTableData(flussConnection, dbName, tableName, 10); + checkFlussTableData(data); + } + + 
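// Asserts the final table snapshot: once the changelog (delete + update-before/after) is applied, only 3 rows remain.
+    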
+    public void checkFlussTableData(List<GenericRow> tableData) {
+        Assertions.assertEquals(3, tableData.size());
+        List<String> expectedResult =
+                Arrays.asList(
+                        "([109, 105, 73, 90, 106],true,1940337748,73,17489,7408919466156976747,9.434991E37,3.140411637757371E307,4029933791018936000000.00000000,aaaaa,20091,9010000,2025-05-27T21:56:09,2025-09-27T18:54:08Z)",
+                        "([109, 105, 73, 90, 106],true,90650390,37,22504,5851888708829345169,2.6221706E36,1.8915341983748786E307,3093109630614623000000.00000000,bbbbb,20089,76964000,2025-05-08T05:26:18,2025-08-04T08:49:45Z)",
+                        "([109, 105, 73, 90, 106],true,388742243,89,15831,159071788675312856,7.310445E37,1.2166972324288247E308,7994947075691901000000.00000000,ddddd,20092,55687000,2025-07-18T08:59:49,2025-09-12T15:46:25Z)");
+        ArrayList<String> result = new ArrayList<>();
+        for (GenericRow datum : tableData) {
+            result.add(datum.toString());
+        }
+        Assertions.assertEquals(expectedResult, result);
+    }
+
+    public void checkFlussTableStreamData(List<GenericRow> streamData) {
+        Assertions.assertEquals(7, streamData.size());
+        List<String> expectedResult =
+                Arrays.asList(
+                        "([109, 105, 73, 90, 106],true,1940337748,73,17489,7408919466156976747,9.434991E37,3.140411637757371E307,4029933791018936000000.00000000,aaaaa,20091,9010000,2025-05-27T21:56:09,2025-09-27T18:54:08Z)",
+                        "([109, 105, 73, 90, 106],true,90650390,37,22504,5851888708829345169,2.6221706E36,1.8915341983748786E307,3093109630614623000000.00000000,bbbbb,20089,76964000,2025-05-08T05:26:18,2025-08-04T08:49:45Z)",
+                        "([109, 105, 73, 90, 106],true,2146418323,79,19821,6393905306944584839,2.0462337E38,1.4868114385836557E308,5594947262031770000000.00000000,ccccc,20367,79840000,2025-03-25T01:49:14,2025-07-03T03:52:06Z)",
+                        "([109, 105, 73, 90, 106],true,2146418323,79,19821,6393905306944584839,2.0462337E38,1.4868114385836557E308,5594947262031770000000.00000000,ccccc,20367,79840000,2025-03-25T01:49:14,2025-07-03T03:52:06Z)",
+                        "([109, 105, 73, 90, 106],true,82794384,27,30339,5826566947079347516,2.2137477E37,1.7737681870839753E308,3984670873242882300000.00000000,ddddd,20344,37972000,2025-01-27T19:20:51,2025-11-06T18:38:54Z)",
+                        "([109, 105, 73, 90, 106],true,82794384,27,30339,5826566947079347516,2.2137477E37,1.7737681870839753E308,3984670873242882300000.00000000,ddddd,20344,37972000,2025-01-27T19:20:51,2025-11-06T18:38:54Z)",
+                        "([109, 105, 73, 90, 106],true,388742243,89,15831,159071788675312856,7.310445E37,1.2166972324288247E308,7994947075691901000000.00000000,ddddd,20092,55687000,2025-07-18T08:59:49,2025-09-12T15:46:25Z)");
+        ArrayList<String> result = new ArrayList<>();
+        for (GenericRow streamDatum : streamData) {
+            result.add(streamDatum.toString());
+        }
+        Assertions.assertEquals(expectedResult, result);
+    }
+
+    public List<GenericRow> getFlussTableStreamData(
+            Connection connection, String dbName, String tableName, int scanNum) {
+        TablePath tablePath = TablePath.of(dbName, tableName);
+        Table table = connection.getTable(tablePath);
+        LogScanner logScanner = table.newScan().createLogScanner();
+        int numBuckets = table.getTableInfo().getNumBuckets();
+        for (int i = 0; i < numBuckets; i++) {
+            logScanner.subscribeFromBeginning(i);
+        }
+        List<GenericRow> rows = new ArrayList<>();
+        // Poll a bounded number of times so the scan terminates even if no more data arrives.
+        for (int scanned = 0; scanned <= scanNum; scanned++) {
+            log.info("Polling for stream records...");
+            ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
+            for (TableBucket bucket : scanRecords.buckets()) {
+                for (ScanRecord record : scanRecords.records(bucket)) {
+                    GenericRow row = (GenericRow) record.getRow();
+                    rows.add(row);
+                }
+            }
+        }
+        return rows;
+    }
+
+    public List<GenericRow> getFlussTableData(
+            Connection connection, String dbName, String tableName, int scanNum)
+            throws IOException {
+        TablePath tablePath = TablePath.of(dbName, tableName);
+        Table table = connection.getTable(tablePath);
+        // The log scan is only used to discover which buckets hold data;
+        // the snapshot rows themselves are read through a batch scanner per bucket.
+        LogScanner logScanner = table.newScan().createLogScanner();
+        int numBuckets = table.getTableInfo().getNumBuckets();
+        for (int i = 0; i < numBuckets; i++) {
+            logScanner.subscribeFromBeginning(i);
+        }
+        List<GenericRow> rows = new ArrayList<>();
+        for (int scanned = 0; scanned <= scanNum; scanned++) {
+            log.info("Polling for records...");
+            ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
+            for (TableBucket bucket : scanRecords.buckets()) {
+                CloseableIterator<InternalRow> data =
+                        table.newScan()
+                                .limit(10)
+                                .createBatchScanner(bucket)
+                                .pollBatch(Duration.ofSeconds(5));
+                while (data.hasNext()) {
+                    rows.add((GenericRow) data.next());
+                }
+            }
+        }
+        return rows;
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/src/test/resources/fake_to_fluss.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/src/test/resources/fake_to_fluss.conf
new file mode 100644
index 00000000000..27ea0435bb6
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/src/test/resources/fake_to_fluss.conf
@@ -0,0 +1,96 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # This is an example source plugin **only for test and demonstration of the feature source plugin**
+  FakeSource {
+    parallelism = 1
+    tables_configs = [
+      {
+        row.num = 7
+        schema {
+          table = "test.table1"
+          fields {
+            fbytes = bytes
+            fboolean = boolean
+            fint = int
+            ftinyint = tinyint
+            fsmallint = smallint
+            fbigint = bigint
+            ffloat = float
+            fdouble = double
+            fdecimal = "decimal(30, 8)"
+            fstring = string
+            fdate = date
+            ftime = time
+            ftimestamp = timestamp
+            ftimestamp_ltz = timestamp_tz
+          }
+        }
+        rows = [
+          {
+            kind = INSERT
+            fields = ["bWlJWmo=", true, 1940337748, 73, 17489, 7408919466156976747, 9.434991E37, 3.140411637757371E307, 4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10", "2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"]
+          }
+          {
+            kind = INSERT
+            fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169, 2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb", "2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"]
+          }
+          {
+            kind = INSERT
+            fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", "2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+          }
+          {
+            kind = DELETE
+            fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", "2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+          }
+          {
+            kind = INSERT
+            fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", "2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+          }
+          {
+            kind = UPDATE_BEFORE
+            fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", "2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+          }
+          {
+            kind = UPDATE_AFTER
+            fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856, 7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd", "2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"]
+          }
+        ]
+      }
+    ]
+  }
+}
+
+transform {
+}
+
+sink {
+  Fluss {
+    bootstrap.servers = "fluss_coordinator_e2e:9123"
+    database = "fluss_db_${database_name}"
+    table = "fluss_tb_${table_name}"
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/src/test/resources/fake_to_multipletable_fluss.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/src/test/resources/fake_to_multipletable_fluss.conf
new file mode 100644
index 00000000000..b1b1307652f
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/src/test/resources/fake_to_multipletable_fluss.conf
@@ -0,0 +1,200 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # This is an example source plugin **only for test and demonstration of the feature source plugin**
+  # The three table configs below fan out to two Fluss databases through the
+  # ${database_name}/${table_name} placeholders declared in the sink.
+  FakeSource {
+    parallelism = 1
+    tables_configs = [
+      {
+        row.num = 7
+        schema {
+          table = "test2.table1"
+          fields {
+            fbytes = bytes
+            fboolean = boolean
+            fint = int
+            ftinyint = tinyint
+            fsmallint = smallint
+            fbigint = bigint
+            ffloat = float
+            fdouble = double
+            fdecimal = "decimal(30, 8)"
+            fstring = string
+            fdate = date
+            ftime = time
+            ftimestamp = timestamp
+            ftimestamp_ltz = timestamp_tz
+          }
+        }
+        rows = [
+          {
+            kind = INSERT
+            fields = ["bWlJWmo=", true, 1940337748, 73, 17489, 7408919466156976747, 9.434991E37, 3.140411637757371E307, 4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10", "2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"]
+          }
+          {
+            kind = INSERT
+            fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169, 2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb", "2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"]
+          }
+          {
+            kind = INSERT
+            fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", "2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+          }
+          {
+            kind = DELETE
+            fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", "2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+          }
+          {
+            kind = INSERT
+            fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", "2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+          }
+          {
+            kind = UPDATE_BEFORE
+            fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", "2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+          }
+          {
+            kind = UPDATE_AFTER
+            fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856, 7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd", "2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"]
+          }
+        ]
+      },
+      {
+        row.num = 7
+        schema {
+          table = "test2.table2"
+          fields {
+            fbytes = bytes
+            fboolean = boolean
+            fint = int
+            ftinyint = tinyint
+            fsmallint = smallint
+            fbigint = bigint
+            ffloat = float
+            fdouble = double
+            fdecimal = "decimal(30, 8)"
+            fstring = string
+            fdate = date
+            ftime = time
+            ftimestamp = timestamp
+            ftimestamp_ltz = timestamp_tz
+          }
+        }
+        rows = [
+          {
+            kind = INSERT
+            fields = ["bWlJWmo=", true, 1940337748, 73, 17489, 7408919466156976747, 9.434991E37, 3.140411637757371E307, 4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10", "2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"]
+          }
+          {
+            kind = INSERT
+            fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169, 2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb", "2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"]
+          }
+          {
+            kind = INSERT
+            fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", "2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+          }
+          {
+            kind = DELETE
+            fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", "2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+          }
+          {
+            kind = INSERT
+            fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", "2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+          }
+          {
+            kind = UPDATE_BEFORE
+            fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", "2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+          }
+          {
+            kind = UPDATE_AFTER
+            fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856, 7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd", "2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"]
+          }
+        ]
+      },
+      {
+        row.num = 7
+        schema {
+          table = "test3.table3"
+          fields {
+            fbytes = bytes
+            fboolean = boolean
+            fint = int
+            ftinyint = tinyint
+            fsmallint = smallint
+            fbigint = bigint
+            ffloat = float
+            fdouble = double
+            fdecimal = "decimal(30, 8)"
+            fstring = string
+            fdate = date
+            ftime = time
+            ftimestamp = timestamp
+            ftimestamp_ltz = timestamp_tz
+          }
+        }
+        rows = [
+          {
+            kind = INSERT
+            fields = ["bWlJWmo=", true, 1940337748, 73, 17489, 7408919466156976747, 9.434991E37, 3.140411637757371E307, 4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10", "2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"]
+          }
+          {
+            kind = INSERT
+            fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169, 2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb", "2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"]
+          }
+          {
+            kind = INSERT
+            fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", "2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+          }
+          {
+            kind = DELETE
+            fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", "2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+          }
+          {
+            kind = INSERT
+            fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", "2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+          }
+          {
+            kind = UPDATE_BEFORE
+            fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", "2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+          }
+          {
+            kind = UPDATE_AFTER
+            fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856, 7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd", "2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"]
+          }
+        ]
+      }
+    ]
+  }
+}
+
+transform {
+}
+
+sink {
+  Fluss {
+    bootstrap.servers = "fluss_coordinator_e2e:9123"
+    database = "fluss_db_${database_name}"
+    table = "fluss_tb_${table_name}"
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index ffcfc5f0a79..3d826457139 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -90,6 +90,7 @@
         <module>connector-aerospike-e2e</module>
         <module>connector-sensorsdata-e2e</module>
         <module>connector-hugegraph-e2e</module>
+        <module>connector-fluss-e2e</module>
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
index 949392d1724..a452db64366 100644
--- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
@@ -41,6 +41,7 @@
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.time.OffsetDateTime;
 import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeFormatterBuilder;
 import java.time.temporal.ChronoField;
@@ -155,6 +156,13 @@ public Object convert(JsonNode jsonNode, String fieldName) {
                             return convertToLocalDateTime(jsonNode, fieldName);
                         }
                     };
+            case TIMESTAMP_TZ:
+                return new JsonToObjectConverter() {
+                    @Override
+                    public Object convert(JsonNode jsonNode, String fieldName) {
+                        return convertToOffsetDateTime(jsonNode, fieldName);
+                    }
+                };
             case FLOAT:
                 return new JsonToObjectConverter() {
                     @Override
@@ -284,6 +292,11 @@ private LocalDateTime convertToLocalDateTime(JsonNode jsonNode, String fieldName
         return LocalDateTime.of(localDate, localTime);
     }
 
+    private OffsetDateTime convertToOffsetDateTime(JsonNode jsonNode, String fieldName) {
+        String datetimeStr = jsonNode.asText();
+        return OffsetDateTime.parse(datetimeStr);
+    }
+
     private String convertToString(JsonNode jsonNode) {
         if (jsonNode.isContainerNode()) {
             return jsonNode.toString();
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
index 2cf8ae092e7..13a30442d17 100644
--- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
@@ -37,6 +37,7 @@
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.time.OffsetDateTime;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.function.Function;
@@ -44,6 +45,7 @@
 
 import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
 import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE_TIME;
+import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME;
 
 public class RowToJsonConverters implements Serializable {
 
@@ -183,6 +185,14 @@ public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
                                 .textNode(ISO_LOCAL_DATE_TIME.format((LocalDateTime) value));
                     }
                 };
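+            // TIMESTAMP_TZ is carried through SeaTunnel rows as java.time.OffsetDateTime,
+            // so it is rendered with ISO_OFFSET_DATE_TIME to keep the original zone offset.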
+            case TIMESTAMP_TZ:
+                return new RowToJsonConverter() {
+                    @Override
+                    public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+                        return mapper.getNodeFactory()
+                                .textNode(ISO_OFFSET_DATE_TIME.format((OffsetDateTime) value));
+                    }
+                };
             case ARRAY:
                 return createArrayConverter((ArrayType) type);
             case MAP:
diff --git a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
index 8471756b8b5..13d8b5c820c 100644
--- a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
+++ b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
@@ -47,6 +47,7 @@
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.time.OffsetDateTime;
 import java.time.format.DateTimeFormatter;
 import java.time.temporal.TemporalQueries;
 import java.util.HashMap;
@@ -76,6 +77,7 @@ public void testSerDe() throws Exception {
         String name = "asdlkjasjkdla998y1122";
         LocalDate date = LocalDate.parse("1990-10-14");
         LocalTime time = LocalTime.parse("12:12:43");
+        OffsetDateTime offsetDateTime = OffsetDateTime.parse("2025-09-12T23:46:25+08:00");
         Timestamp timestamp3 = Timestamp.valueOf("1990-10-14 12:12:43.123");
         Timestamp timestamp9 = Timestamp.valueOf("1990-10-14 12:12:43.123456789");
         Map map = new HashMap<>();
@@ -100,6 +102,7 @@ public void testSerDe() throws Exception {
         root.put("name", name);
         root.put("date", "1990-10-14");
         root.put("time", "12:12:43");
+        root.put("timestamp_tz", "2025-09-12T23:46:25+08:00");
         root.put("timestamp3", "1990-10-14T12:12:43.123");
         root.put("timestamp9", "1990-10-14T12:12:43.123456789");
         root.putObject("map").put("element", 123);
@@ -121,6 +124,7 @@
             "name",
             "date",
             "time",
+            "timestamp_tz",
             "timestamp3",
             "timestamp9",
             "map",
@@ -136,6 +140,7 @@
             STRING_TYPE,
             LocalTimeType.LOCAL_DATE_TYPE,
             LocalTimeType.LOCAL_TIME_TYPE,
+            LocalTimeType.OFFSET_DATE_TIME_TYPE,
             LocalTimeType.LOCAL_DATE_TIME_TYPE,
             LocalTimeType.LOCAL_DATE_TIME_TYPE,
             new MapType(STRING_TYPE, LONG_TYPE),
@@ -150,6 +155,7 @@
             "name",
             "date",
             "time",
+            "timestamp_tz",
             "timestamp3",
             "timestamp9",
             "map",
@@ -164,6 +170,7 @@
             STRING_TYPE,
             LocalTimeType.LOCAL_DATE_TIME_TYPE,
             LocalTimeType.LOCAL_TIME_TYPE,
+            LocalTimeType.OFFSET_DATE_TIME_TYPE,
             LocalTimeType.LOCAL_DATE_TIME_TYPE,
             LocalTimeType.LOCAL_DATE_TIME_TYPE,
             new MapType(STRING_TYPE, LONG_TYPE),
@@ -175,7 +182,7 @@
         JsonDeserializationSchema deserializationSchema =
                 new JsonDeserializationSchema(catalogTables, false, false);
 
-        SeaTunnelRow expected = new SeaTunnelRow(13);
+        SeaTunnelRow expected = new SeaTunnelRow(14);
         expected.setField(0, true);
         expected.setField(1, intValue);
         expected.setField(2, longValue);
@@ -183,13 +190,14 @@
         expected.setField(4, name);
         expected.setField(5, date);
         expected.setField(6, time);
-        expected.setField(7, timestamp3.toLocalDateTime());
-        expected.setField(8, timestamp9.toLocalDateTime());
-        expected.setField(9, map);
-        expected.setField(10, multiSet);
-        expected.setField(11, nestedMap);
-
-        SeaTunnelRow rowFieldRow = new SeaTunnelRow(12);
+        expected.setField(7, offsetDateTime);
+        expected.setField(8, timestamp3.toLocalDateTime());
+        expected.setField(9, timestamp9.toLocalDateTime());
+        expected.setField(10, map);
+        expected.setField(11, multiSet);
+        expected.setField(12, nestedMap);
+
+        SeaTunnelRow rowFieldRow = new SeaTunnelRow(13);
         rowFieldRow.setField(0, true);
         rowFieldRow.setField(1, intValue);
         rowFieldRow.setField(2, longValue);
@@ -197,13 +205,14 @@
         rowFieldRow.setField(4, name);
         rowFieldRow.setField(5, timestamp3.toLocalDateTime());
         rowFieldRow.setField(6, time);
-        rowFieldRow.setField(7, timestamp3.toLocalDateTime());
-        rowFieldRow.setField(8, timestamp9.toLocalDateTime());
-        rowFieldRow.setField(9, map);
-        rowFieldRow.setField(10, multiSet);
-        rowFieldRow.setField(11, nestedMap);
+        rowFieldRow.setField(7, offsetDateTime);
+        rowFieldRow.setField(8, timestamp3.toLocalDateTime());
+        rowFieldRow.setField(9, timestamp9.toLocalDateTime());
+        rowFieldRow.setField(10, map);
+        rowFieldRow.setField(11, multiSet);
+        rowFieldRow.setField(12, nestedMap);
 
-        expected.setField(12, rowFieldRow);
+        expected.setField(13, rowFieldRow);
 
         SeaTunnelRow seaTunnelRow = deserializationSchema.deserialize(serializedJson);
         assertEquals(expected, seaTunnelRow);
@@ -678,6 +687,16 @@ public void testSerializationWithTimestamp() {
         assertEquals(
                 "{\"timestamp\":\"2022-09-24T22:45:00.000123456\"}",
                 new String(new JsonSerializationSchema(schema, "\\N").serialize(row)));
+
+        schema =
+                new SeaTunnelRowType(
+                        new String[] {"timestamp_tz"},
+                        new SeaTunnelDataType[] {LocalTimeType.OFFSET_DATE_TIME_TYPE});
+        OffsetDateTime offsetDateTime = OffsetDateTime.parse("2025-09-12T23:46:25+08:00");
+        row = new SeaTunnelRow(new Object[] {offsetDateTime});
+        assertEquals(
+                "{\"timestamp_tz\":\"2025-09-12T23:46:25+08:00\"}",
+                new String(new JsonSerializationSchema(schema, "\\N").serialize(row)));
     }
 
     @Test