Skip to content
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
cee27ea
feat: support CDC in databend sink connector
hantmac Jul 23, 2025
648143c
feat(Databend): support CDC mode for databend sink connector
hantmac Aug 4, 2025
5bde12c
fix merge into
hantmac Aug 5, 2025
6ad3c2c
Update seatunnel-connectors-v2/connector-databend/src/main/java/org/a…
hantmac Aug 5, 2025
b78e622
fix config key
hantmac Aug 5, 2025
e31773e
use AggregatedCommitter to do merge into
hantmac Aug 6, 2025
1f38aee
fix
hantmac Aug 7, 2025
51a379f
fix
hantmac Aug 7, 2025
450ff40
fix tests
hantmac Aug 10, 2025
b5ab0ec
Merge remote-tracking branch 'seaup/dev' into feat/databend-sink-cdc
hantmac Aug 10, 2025
d4fffce
check perform merge
hantmac Aug 14, 2025
45983e9
add cdc docs
hantmac Aug 14, 2025
be9c701
add zh docs
hantmac Aug 14, 2025
f8e64b3
fix
hantmac Aug 14, 2025
778954f
fix update_before event
hantmac Aug 21, 2025
014243b
fix some comments
hantmac Aug 21, 2025
dfdb225
remove should merge condition
hantmac Aug 26, 2025
880e138
Merge remote-tracking branch 'seaup/dev' into feat/databend-sink-cdc
hantmac Aug 26, 2025
1fb5cb8
fix
hantmac Aug 26, 2025
d54cef1
enable_delete
hantmac Aug 26, 2025
4fcc845
remove useless
hantmac Aug 27, 2025
ddcca59
fix insert event
hantmac Aug 29, 2025
dd052e7
revert username
hantmac Aug 30, 2025
18e2648
fix
hantmac Sep 1, 2025
f4a8422
Merge remote-tracking branch 'seaup/dev' into feat/databend-sink-cdc
hantmac Sep 1, 2025
31e778e
fix ut
hantmac Sep 3, 2025
8ecb530
fix raw table
hantmac Sep 3, 2025
dffab14
fix import
hantmac Sep 4, 2025
2e1971f
fix
hantmac Sep 4, 2025
96e2ebe
Merge remote-tracking branch 'seaup/dev' into feat/databend-sink-cdc
hantmac Sep 9, 2025
daca685
Merge remote-tracking branch 'seaup/dev' into feat/databend-sink-cdc
hantmac Sep 11, 2025
d01ebcc
fix
hantmac Sep 17, 2025
162a2e7
Merge remote-tracking branch 'seaup/dev' into feat/databend-sink-cdc
hantmac Sep 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 28 additions & 6 deletions docs/en/connector-v2/sink/Databend.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import ChangeLog from '../changelog/connector-databend.md';

## Key Features

- [ ] [Exactly-Once](../../concept/connector-v2-features.md)
- [ ] [Support Multi-table Writing](../../concept/connector-v2-features.md)
- [ ] [CDC](../../concept/connector-v2-features.md)
- [x] [Exactly-Once](../../concept/connector-v2-features.md)
- [x] [CDC](../../concept/connector-v2-features.md)
- [x] [Parallelism](../../concept/connector-v2-features.md)

## Description
Expand All @@ -37,7 +37,7 @@ The Databend sink internally implements bulk data import through stage attachmen
| Name | Type | Required | Default Value | Description |
|------|------|----------|---------------|---------------------------------------------|
| url | String | Yes | - | Databend JDBC connection URL |
| username | String | Yes | - | Databend database username |
| user | String | Yes | - | Databend database username |
| password | String | Yes | - | Databend database password |
| database | String | No | - | Databend database name, defaults to the database name specified in the connection URL |
| table | String | No | - | Databend table name |
Expand All @@ -49,6 +49,8 @@ The Databend sink internally implements bulk data import through stage attachmen
| custom_sql | String | No | - | Custom write SQL, typically used for complex write scenarios |
| execute_timeout_sec | Integer | No | 300 | SQL execution timeout (seconds) |
| jdbc_config | Map | No | - | Additional JDBC connection configuration, such as connection timeout parameters |
| conflict_key | String | No | - | Conflict key for CDC mode, used to determine the primary key for conflict resolution |
| allow_delete | Boolean | No | false | Whether to allow delete operations in CDC mode |

### schema_save_mode[Enum]

Expand Down Expand Up @@ -112,7 +114,7 @@ source {
sink {
Databend {
url = "jdbc:databend://localhost:8000"
username = "root"
user = "root"
password = ""
database = "default"
table = "target_table"
Expand All @@ -127,7 +129,7 @@ sink {
sink {
Databend {
url = "jdbc:databend://localhost:8000"
username = "root"
user = "root"
password = ""
database = "default"
table = "target_table"
Expand All @@ -142,7 +144,7 @@ sink {
sink {
Databend {
url = "jdbc:databend://localhost:8000"
username = "root"
user = "root"
password = ""
database = "default"
table = "target_table"
Expand All @@ -152,6 +154,26 @@ sink {
}
```

### CDC mode

```hocon
sink {
Databend {
url = "jdbc:databend://databend:8000/default?ssl=false"
user = "root"
password = ""
database = "default"
table = "sink_table"

# Enable CDC mode
batch_size = 1
interval = 3
conflict_key = "id"
allow_delete = true
}
}
```

## Related Links

- [Databend Official Website](https://databend.rs/)
Expand Down
26 changes: 24 additions & 2 deletions docs/zh/connector-v2/sink/Databend.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import ChangeLog from '../changelog/connector-databend.md';

## 主要特性

- [ ] [精确一次](../../concept/connector-v2-features.md)
- [ ] [支持多表写入](../../concept/connector-v2-features.md)
- [ ] [cdc](../../concept/connector-v2-features.md)
- [x] [精确一次](../../concept/connector-v2-features.md)
- [x] [cdc](../../concept/connector-v2-features.md)
- [x] [并行度](../../concept/connector-v2-features.md)

## 描述
Expand Down Expand Up @@ -49,6 +49,8 @@ Databend sink 内部通过 stage attachment 实现数据的批量导入。
| custom_sql | String | 否 | - | 自定义写入 SQL,通常用于复杂的写入场景 |
| execute_timeout_sec | Integer | 否 | 300 | 执行SQL的超时时间(秒) |
| jdbc_config | Map | 否 | - | 额外的 JDBC 连接配置,如连接超时参数等 |
| conflict_key | String | 否 | - | cdc 模式下的冲突键,用于确定冲突解决的主键 |
| allow_delete | Boolean | 否 | false | cdc 模式下是否允许删除操作 |

### schema_save_mode[Enum]

Expand Down Expand Up @@ -152,6 +154,26 @@ sink {
}
```

### CDC mode

```hocon
sink {
Databend {
url = "jdbc:databend://databend:8000/default?ssl=false"
user = "root"
password = ""
database = "default"
table = "sink_table"

# Enable CDC mode
batch_size = 1
interval = 3
conflict_key = "id"
allow_delete = true
}
}
```

## 相关链接

- [Databend 官方网站](https://databend.rs/)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class DatabendOptions {
.withDescription("Whether to use SSL for the Databend connection");

public static final Option<String> USERNAME =
Options.key("username")
Options.key("user")
.stringType()
.noDefaultValue()
.withDescription("The username for Databend database authentication");
Expand Down Expand Up @@ -97,4 +97,10 @@ public class DatabendOptions {
.booleanType()
.defaultValue(true)
.withDescription("Whether to auto commit for sink");

public static final Option<String> CONFLICT_KEY =
Options.key("conflict_key")
.stringType()
.noDefaultValue()
.withDescription("The conflict key for sink, used in upsert mode");
}
Loading