Skip to content
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
cee27ea
feat: support CDC in databend sink connector
hantmac Jul 23, 2025
648143c
feat(Databend): support CDC mode for databend sink connector
hantmac Aug 4, 2025
5bde12c
fix merge into
hantmac Aug 5, 2025
6ad3c2c
Update seatunnel-connectors-v2/connector-databend/src/main/java/org/a…
hantmac Aug 5, 2025
b78e622
fix config key
hantmac Aug 5, 2025
e31773e
use AggregatedCommitter to do merge into
hantmac Aug 6, 2025
1f38aee
fix
hantmac Aug 7, 2025
51a379f
fix
hantmac Aug 7, 2025
450ff40
fix tests
hantmac Aug 10, 2025
b5ab0ec
Merge remote-tracking branch 'seaup/dev' into feat/databend-sink-cdc
hantmac Aug 10, 2025
d4fffce
check perform merge
hantmac Aug 14, 2025
45983e9
add cdc docs
hantmac Aug 14, 2025
be9c701
add zh docs
hantmac Aug 14, 2025
f8e64b3
fix
hantmac Aug 14, 2025
778954f
fix update_before event
hantmac Aug 21, 2025
014243b
fix some comments
hantmac Aug 21, 2025
dfdb225
remove should merge condition
hantmac Aug 26, 2025
880e138
Merge remote-tracking branch 'seaup/dev' into feat/databend-sink-cdc
hantmac Aug 26, 2025
1fb5cb8
fix
hantmac Aug 26, 2025
d54cef1
enable_delete
hantmac Aug 26, 2025
4fcc845
remove useless
hantmac Aug 27, 2025
ddcca59
fix insert event
hantmac Aug 29, 2025
dd052e7
revert username
hantmac Aug 30, 2025
18e2648
fix
hantmac Sep 1, 2025
f4a8422
Merge remote-tracking branch 'seaup/dev' into feat/databend-sink-cdc
hantmac Sep 1, 2025
31e778e
fix ut
hantmac Sep 3, 2025
8ecb530
fix raw table
hantmac Sep 3, 2025
dffab14
fix import
hantmac Sep 4, 2025
2e1971f
fix
hantmac Sep 4, 2025
96e2ebe
Merge remote-tracking branch 'seaup/dev' into feat/databend-sink-cdc
hantmac Sep 9, 2025
daca685
Merge remote-tracking branch 'seaup/dev' into feat/databend-sink-cdc
hantmac Sep 11, 2025
d01ebcc
fix
hantmac Sep 17, 2025
162a2e7
Merge remote-tracking branch 'seaup/dev' into feat/databend-sink-cdc
hantmac Sep 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions docs/en/connector-v2/sink/Databend.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import ChangeLog from '../changelog/connector-databend.md';

## Key Features

- [ ] [Exactly-Once](../../concept/connector-v2-features.md)
- [ ] [Support Multi-table Writing](../../concept/connector-v2-features.md)
- [ ] [CDC](../../concept/connector-v2-features.md)
- [x] [Exactly-Once](../../concept/connector-v2-features.md)
- [x] [CDC](../../concept/connector-v2-features.md)
- [x] [Parallelism](../../concept/connector-v2-features.md)

## Description
Expand Down Expand Up @@ -49,6 +49,8 @@ The Databend sink internally implements bulk data import through stage attachmen
| custom_sql | String | No | - | Custom write SQL, typically used for complex write scenarios |
| execute_timeout_sec | Integer | No | 300 | SQL execution timeout (seconds) |
| jdbc_config | Map | No | - | Additional JDBC connection configuration, such as connection timeout parameters |
| conflict_key | String | No | - | Conflict key for CDC mode, used to determine the primary key for conflict resolution |
| allow_delete | Boolean | No | false | Whether to allow delete operations in CDC mode |

### schema_save_mode[Enum]

Expand Down Expand Up @@ -152,6 +154,25 @@ sink {
}
```

### CDC mode

```hocon
sink {
Databend {
url = "jdbc:databend://databend:8000/default?ssl=false"
username = "root"
password = ""
database = "default"
table = "sink_table"

# Enable CDC mode
batch_size = 1
conflict_key = "id"
allow_delete = true
}
}
```

## Related Links

- [Databend Official Website](https://databend.rs/)
Expand Down
26 changes: 24 additions & 2 deletions docs/zh/connector-v2/sink/Databend.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import ChangeLog from '../changelog/connector-databend.md';

## 主要特性

- [ ] [精确一次](../../concept/connector-v2-features.md)
- [ ] [支持多表写入](../../concept/connector-v2-features.md)
- [ ] [cdc](../../concept/connector-v2-features.md)
- [x] [精确一次](../../concept/connector-v2-features.md)
- [x] [CDC](../../concept/connector-v2-features.md)
- [x] [并行度](../../concept/connector-v2-features.md)

## 描述
Expand Down Expand Up @@ -49,6 +49,8 @@ Databend sink 内部通过 stage attachment 实现数据的批量导入。
| custom_sql | String | 否 | - | 自定义写入 SQL,通常用于复杂的写入场景 |
| execute_timeout_sec | Integer | 否 | 300 | 执行SQL的超时时间(秒) |
| jdbc_config | Map | 否 | - | 额外的 JDBC 连接配置,如连接超时参数等 |
| conflict_key | String | 否 | - | cdc 模式下的冲突键,用于确定冲突解决的主键 |
| allow_delete | Boolean | 否 | false | cdc 模式下是否允许删除操作 |

### schema_save_mode[Enum]

Expand Down Expand Up @@ -152,6 +154,26 @@ sink {
}
```

### CDC 模式

```hocon
sink {
Databend {
url = "jdbc:databend://databend:8000/default?ssl=false"
username = "root"
password = ""
database = "default"
table = "sink_table"

# 启用 CDC 模式
batch_size = 1
interval = 3
conflict_key = "id"
allow_delete = true
}
}
```

## 相关链接

- [Databend 官方网站](https://databend.rs/)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,10 @@ public class DatabendOptions {
.booleanType()
.defaultValue(true)
.withDescription("Whether to auto commit for sink");

// Conflict (primary) key the sink uses to resolve row conflicts in upsert/CDC mode.
// NOTE(review): DatabendSinkOptions declares an option with the same key
// ("conflict_key"), and DatabendSinkConfig.of(...) appears to read the
// DatabendSinkOptions variant — this duplicate may be dead; confirm and
// consolidate to a single declaration.
public static final Option<String> CONFLICT_KEY =
        Options.key("conflict_key")
                .stringType()
                .noDefaultValue()
                .withDescription("The conflict key for sink, used in upsert mode");
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,131 +18,154 @@
package org.apache.seatunnel.connectors.seatunnel.databend.config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SchemaSaveMode;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import static org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendOptions.AUTO_COMMIT;
import static org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendOptions.BATCH_SIZE;
import static org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendOptions.DATABASE;
import static org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendOptions.JDBC_CONFIG;
import static org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendOptions.MAX_RETRIES;
import static org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendOptions.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendOptions.SSL;
import static org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendOptions.TABLE;
import static org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendOptions.URL;
import static org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendOptions.USERNAME;
import static org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendSinkOptions.CUSTOM_SQL;
import static org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendSinkOptions.DATA_SAVE_MODE;
import static org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendSinkOptions.EXECUTE_TIMEOUT_SEC;
import static org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendSinkOptions.SCHEMA_SAVE_MODE;

@Setter
@Slf4j
@Getter
@ToString
public class DatabendSinkConfig implements Serializable {
private static final long serialVersionUID = 1L;

// common options
private String url;
private String username;
private String password;
private Boolean ssl;
private String database;
private String table;
private Boolean autoCommit;
private Integer batchSize;
private Integer maxRetries;
private Map<String, String> jdbcConfig;

// sink options
private Integer executeTimeoutSec;
private String customSql;
private SchemaSaveMode schemaSaveMode;
private DataSaveMode dataSaveMode;
private Properties properties;
private final String url;
private final String username;
private final String password;
private final String database;
private final String table;
private final boolean autoCommit;
private final int batchSize;
private final int executeTimeoutSec;
private final int interval;
private final String conflictKey;
private final boolean allowDelete;

private DatabendSinkConfig(Builder builder) {
this.url = builder.url;
this.username = builder.username;
this.password = builder.password;
this.database = builder.database;
this.table = builder.table;
this.autoCommit = builder.autoCommit;
this.batchSize = builder.batchSize;
this.executeTimeoutSec = builder.executeTimeoutSec;
this.interval = builder.interval;
this.conflictKey = builder.conflictKey;
this.allowDelete = builder.allowDelete;
}

public static DatabendSinkConfig of(ReadonlyConfig config) {
DatabendSinkConfig sinkConfig = new DatabendSinkConfig();

// common options
sinkConfig.setUrl(config.get(URL));
sinkConfig.setUsername(config.get(USERNAME));
sinkConfig.setPassword(config.get(PASSWORD));
sinkConfig.setDatabase(config.get(DATABASE));
sinkConfig.setTable(config.get(TABLE));
sinkConfig.setAutoCommit(config.get(AUTO_COMMIT));
sinkConfig.setBatchSize(config.get(BATCH_SIZE));
sinkConfig.setMaxRetries(config.get(MAX_RETRIES));
sinkConfig.setJdbcConfig(config.get(JDBC_CONFIG));

// sink options
sinkConfig.setExecuteTimeoutSec(config.get(EXECUTE_TIMEOUT_SEC));
sinkConfig.setCustomSql(config.getOptional(CUSTOM_SQL).orElse(null));
sinkConfig.setSchemaSaveMode(config.get(SCHEMA_SAVE_MODE));
sinkConfig.setDataSaveMode(config.get(DATA_SAVE_MODE));
// Create properties for JDBC connection
Properties properties = new Properties();
if (sinkConfig.getJdbcConfig() != null) {
sinkConfig.getJdbcConfig().forEach(properties::setProperty);
return new Builder()
.withUrl(config.get(DatabendOptions.URL))
.withUsername(config.get(DatabendOptions.USERNAME))
.withPassword(config.get(DatabendOptions.PASSWORD))
.withDatabase(config.get(DatabendOptions.DATABASE))
.withTable(config.get(DatabendOptions.TABLE))
.withAutoCommit(config.get(DatabendOptions.AUTO_COMMIT))
.withBatchSize(config.get(DatabendOptions.BATCH_SIZE))
.withExecuteTimeoutSec(config.get(DatabendSinkOptions.EXECUTE_TIMEOUT_SEC))
.withConflictKey(config.get(DatabendSinkOptions.CONFLICT_KEY))
.withAllowDelete(config.get(DatabendSinkOptions.ALLOW_DELETE))
.build();
}

public static class Builder {
private String url;
private String username;
private String password;
private String database;
private String table;
private boolean autoCommit = true;
private int batchSize = 1000;
private int executeTimeoutSec = 300;
private int interval = 30;
private String conflictKey;
private boolean allowDelete = false;

public Builder withUrl(String url) {
this.url = url;
return this;
}

public Builder withUsername(String username) {
this.username = username;
return this;
}
if (!properties.containsKey("user")) {
properties.setProperty("user", sinkConfig.getUsername());

public Builder withPassword(String password) {
this.password = password;
return this;
}
if (!properties.containsKey("password")) {
properties.setProperty("password", sinkConfig.getPassword());

public Builder withDatabase(String database) {
this.database = database;
return this;
}
if (sinkConfig.getSsl() != null) {
properties.setProperty("ssl", sinkConfig.getSsl().toString());

public Builder withTable(String table) {
this.table = table;
return this;
}
sinkConfig.setProperties(properties);

return sinkConfig;
}
public Builder withAutoCommit(boolean autoCommit) {
this.autoCommit = autoCommit;
return this;
}

// Change UserName, password, jdbcConfig to properties from databendSinkConfig
public Properties toProperties() {
Properties properties = new Properties();
properties.put("user", username);
properties.put("password", password);
properties.put("ssl", ssl);
if (jdbcConfig != null) {
jdbcConfig.forEach(properties::put);
public Builder withBatchSize(int batchSize) {
this.batchSize = batchSize;
return this;
}
return properties;
}
/** Convert this config to a ReadonlyConfig */
public ReadonlyConfig toReadonlyConfig() {
Map<String, Object> map = new HashMap<>();
map.put(URL.key(), url);
map.put(USERNAME.key(), username);
map.put(PASSWORD.key(), password);
if (ssl != null) {
map.put(SSL.key(), ssl);

public Builder withExecuteTimeoutSec(int executeTimeoutSec) {
this.executeTimeoutSec = executeTimeoutSec;
return this;
}
map.put(DATABASE.key(), database);
map.put(TABLE.key(), table);
map.put(AUTO_COMMIT.key(), autoCommit);
map.put(BATCH_SIZE.key(), batchSize);
map.put(MAX_RETRIES.key(), maxRetries);
if (jdbcConfig != null) {
map.put(JDBC_CONFIG.key(), jdbcConfig);

public Builder withInterval(int interval) {
this.interval = interval;
return this;
}
map.put(EXECUTE_TIMEOUT_SEC.key(), executeTimeoutSec);
if (customSql != null) {
map.put(CUSTOM_SQL.key(), customSql);

public Builder withConflictKey(String conflictKey) {
this.conflictKey = conflictKey;
return this;
}
map.put(SCHEMA_SAVE_MODE.key(), schemaSaveMode);
map.put(DATA_SAVE_MODE.key(), dataSaveMode);

return ReadonlyConfig.fromMap(map);
public Builder withAllowDelete(boolean allowDelete) {
this.allowDelete = allowDelete;
return this;
}

public DatabendSinkConfig build() {
return new DatabendSinkConfig(this);
}
}

/**
 * Builds the JDBC credential set for a Databend connection.
 *
 * @return a fresh {@link Properties} instance carrying the configured
 *     {@code user} and {@code password} entries
 */
public Properties getProperties() {
    Properties credentials = new Properties();
    // Properties.setProperty throws NullPointerException on a null value,
    // so username/password are expected to be non-null here.
    credentials.setProperty("user", username);
    credentials.setProperty("password", password);
    return credentials;
}

// Derives the staging ("raw") table name used for CDC ingestion.
// NOTE(review): the name embeds the wall-clock time at CALL time, so two
// invocations — even on the same config instance — return different names.
// Confirm that callers invoke this exactly once and share the result;
// otherwise the writer and committer may target different tables.
public String getRawTableName() {
    long timestamp = System.currentTimeMillis();
    return table + "_raw_" + timestamp;
}

// Derives the change-stream name associated with the sink table.
// NOTE(review): like getRawTableName(), this embeds call-time
// System.currentTimeMillis(), so repeated calls yield different names and the
// stream name is unrelated to the raw table name unless both are captured in
// the same millisecond — verify callers cache the value.
public String getStreamName() {
    long timestamp = System.currentTimeMillis();
    return table + "_stream_" + timestamp;
}

/**
 * Alias of {@link #getProperties()} retained for backward compatibility with
 * callers of the earlier config API.
 *
 * @return the JDBC credential {@link Properties} built by {@code getProperties()}
 */
public Properties toProperties() {
    return this.getProperties();
}

/**
 * Reports whether this sink should run in CDC (merge/upsert) mode.
 *
 * @return {@code true} exactly when a non-empty conflict key was configured
 */
public boolean isCdcMode() {
    // CDC mode is keyed off the conflict key: absent or blank means plain append.
    if (conflictKey == null) {
        return false;
    }
    return !conflictKey.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,22 @@ public class DatabendSinkOptions {
.intType()
.defaultValue(300)
.withDescription("The timeout seconds for Databend client execution");

// Number of rows buffered before a CDC merge flush is triggered.
// NOTE(review): DatabendOptions also declares a "batch_size" option, and
// DatabendSinkConfig.of(...) reads DatabendOptions.BATCH_SIZE — this duplicate
// declaration may be unused; confirm and consolidate.
public static final Option<Integer> BATCH_SIZE =
        Options.key("batch_size")
                .intType()
                .defaultValue(1000)
                .withDescription("Batch size for CDC merge operations");

// Primary/conflict key used by the MERGE INTO statement in CDC mode.
// NOTE(review): DatabendOptions declares an option with the same key
// ("conflict_key") but a different description — keep only one declaration.
public static final Option<String> CONFLICT_KEY =
        Options.key("conflict_key")
                .stringType()
                .noDefaultValue()
                .withDescription("Conflict key for CDC merge operations");

// Whether delete change events are applied in CDC mode (default: ignore deletes).
public static final Option<Boolean> ALLOW_DELETE =
        Options.key("allow_delete")
                .booleanType()
                .defaultValue(false)
                .withDescription("Whether to allow delete operations in CDC mode");
}
Loading