Skip to content

Commit d54cef1

Browse files
committed
enable_delete
1 parent 1fb5cb8 commit d54cef1

File tree

8 files changed

+50
-50
lines changed

8 files changed

+50
-50
lines changed

docs/en/connector-v2/sink/Databend.md

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,23 +34,23 @@ The Databend sink internally implements bulk data import through stage attachmen
3434
3535
## Sink Options
3636

37-
| Name | Type | Required | Default Value | Description |
38-
|------|------|----------|---------------|---------------------------------------------|
39-
| url | String | Yes | - | Databend JDBC connection URL |
40-
| username | String | Yes | - | Databend database username |
41-
| password | String | Yes | - | Databend database password |
42-
| database | String | No | - | Databend database name, defaults to the database name specified in the connection URL |
43-
| table | String | No | - | Databend table name |
44-
| batch_size | Integer | No | 1000 | Number of records for batch writing |
45-
| auto_commit | Boolean | No | true | Whether to auto-commit transactions |
46-
| max_retries | Integer | No | 3 | Maximum retry attempts on write failure |
47-
| schema_save_mode | Enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | Schema save mode |
48-
| data_save_mode | Enum | No | APPEND_DATA | Data save mode |
49-
| custom_sql | String | No | - | Custom write SQL, typically used for complex write scenarios |
37+
| Name | Type | Required | Default Value | Description |
38+
|---------------------|------|----------|---------------|---------------------------------------------|
39+
| url | String | Yes | - | Databend JDBC connection URL |
40+
| username | String | Yes | - | Databend database username |
41+
| password | String | Yes | - | Databend database password |
42+
| database | String | No | - | Databend database name, defaults to the database name specified in the connection URL |
43+
| table | String | No | - | Databend table name |
44+
| batch_size | Integer | No | 1000 | Number of records for batch writing |
45+
| auto_commit | Boolean | No | true | Whether to auto-commit transactions |
46+
| max_retries | Integer | No | 3 | Maximum retry attempts on write failure |
47+
| schema_save_mode | Enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | Schema save mode |
48+
| data_save_mode | Enum | No | APPEND_DATA | Data save mode |
49+
| custom_sql | String | No | - | Custom write SQL, typically used for complex write scenarios |
5050
| execute_timeout_sec | Integer | No | 300 | SQL execution timeout (seconds) |
51-
| jdbc_config | Map | No | - | Additional JDBC connection configuration, such as connection timeout parameters |
52-
| conflict_key | String | No | - | Conflict key for CDC mode, used to determine the primary key for conflict resolution |
53-
| allow_delete | Boolean | No | false | Whether to allow delete operations in CDC mode |
51+
| jdbc_config | Map | No | - | Additional JDBC connection configuration, such as connection timeout parameters |
52+
| conflict_key | String | No | - | Conflict key for CDC mode, used to determine the primary key for conflict resolution |
53+
| enable_delete | Boolean | No | false | Whether to allow delete operations in CDC mode |
5454

5555
### schema_save_mode[Enum]
5656

@@ -168,7 +168,7 @@ sink {
168168
# Enable CDC mode
169169
batch_size = 1
170170
conflict_key = "id"
171-
allow_delete = true
171+
enable_delete = true
172172
}
173173
}
174174
```

docs/zh/connector-v2/sink/Databend.md

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,23 +34,23 @@ Databend sink 内部通过 stage attachment 实现数据的批量导入。
3434
3535
## Sink 选项
3636

37-
| 名称 | 类型 | 是否必须 | 默认值 | 描述 |
38-
|------|------|----------|--------|------------------------------------|
39-
| url | String || - | Databend JDBC 连接 URL |
40-
| username | String || - | Databend 数据库用户名 |
41-
| password | String || - | Databend 数据库密码 |
42-
| database | String || - | Databend 数据库名称,默认使用连接 URL 中指定的数据库名 |
43-
| table | String || - | Databend 表名称 |
44-
| batch_size | Integer || 1000 | 批量写入的记录数 |
45-
| auto_commit | Boolean || true | 是否自动提交事务 |
46-
| max_retries | Integer || 3 | 写入失败时的最大重试次数 |
47-
| schema_save_mode | Enum || CREATE_SCHEMA_WHEN_NOT_EXIST | 保存 Schema 的模式 |
48-
| data_save_mode | Enum || APPEND_DATA | 保存数据的模式 |
49-
| custom_sql | String || - | 自定义写入 SQL,通常用于复杂的写入场景 |
37+
| 名称 | 类型 | 是否必须 | 默认值 | 描述 |
38+
|---------------------|------|----------|--------|------------------------------------|
39+
| url | String || - | Databend JDBC 连接 URL |
40+
| username | String || - | Databend 数据库用户名 |
41+
| password | String || - | Databend 数据库密码 |
42+
| database | String || - | Databend 数据库名称,默认使用连接 URL 中指定的数据库名 |
43+
| table | String || - | Databend 表名称 |
44+
| batch_size | Integer || 1000 | 批量写入的记录数 |
45+
| auto_commit | Boolean || true | 是否自动提交事务 |
46+
| max_retries | Integer || 3 | 写入失败时的最大重试次数 |
47+
| schema_save_mode | Enum || CREATE_SCHEMA_WHEN_NOT_EXIST | 保存 Schema 的模式 |
48+
| data_save_mode | Enum || APPEND_DATA | 保存数据的模式 |
49+
| custom_sql | String || - | 自定义写入 SQL,通常用于复杂的写入场景 |
5050
| execute_timeout_sec | Integer || 300 | 执行SQL的超时时间(秒) |
51-
| jdbc_config | Map || - | 额外的 JDBC 连接配置,如连接超时参数等 |
52-
| conflict_key | String || - | cdc 模式下的冲突键,用于确定冲突解决的主键 |
53-
| allow_delete | Boolean || false | cdc 模式下是否允许删除操作 |
51+
| jdbc_config | Map || - | 额外的 JDBC 连接配置,如连接超时参数等 |
52+
| conflict_key | String || - | cdc 模式下的冲突键,用于确定冲突解决的主键 |
53+
| enable_delete | Boolean || false | cdc 模式下是否允许删除操作 |
5454

5555
### schema_save_mode[Enum]
5656

@@ -169,7 +169,7 @@ sink {
169169
batch_size = 1
170170
interval = 3
171171
conflict_key = "id"
172-
allow_delete = true
172+
enable_delete = true
173173
}
174174
}
175175
```

seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/config/DatabendSinkConfig.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public class DatabendSinkConfig implements Serializable {
4040
private final int executeTimeoutSec;
4141
private final int interval;
4242
private final String conflictKey;
43-
private final boolean allowDelete;
43+
private final boolean enableDelete;
4444

4545
private DatabendSinkConfig(Builder builder) {
4646
this.url = builder.url;
@@ -53,7 +53,7 @@ private DatabendSinkConfig(Builder builder) {
5353
this.executeTimeoutSec = builder.executeTimeoutSec;
5454
this.interval = builder.interval;
5555
this.conflictKey = builder.conflictKey;
56-
this.allowDelete = builder.allowDelete;
56+
this.enableDelete = builder.enableDelete;
5757
}
5858

5959
public static DatabendSinkConfig of(ReadonlyConfig config) {
@@ -67,7 +67,7 @@ public static DatabendSinkConfig of(ReadonlyConfig config) {
6767
.withBatchSize(config.get(DatabendOptions.BATCH_SIZE))
6868
.withExecuteTimeoutSec(config.get(DatabendSinkOptions.EXECUTE_TIMEOUT_SEC))
6969
.withConflictKey(config.get(DatabendSinkOptions.CONFLICT_KEY))
70-
.withAllowDelete(config.get(DatabendSinkOptions.ALLOW_DELETE))
70+
.withAllowDelete(config.get(DatabendSinkOptions.ENABLE_DELETE))
7171
.build();
7272
}
7373

@@ -82,7 +82,7 @@ public static class Builder {
8282
private int executeTimeoutSec = 300;
8383
private int interval = 30;
8484
private String conflictKey;
85-
private boolean allowDelete = false;
85+
private boolean enableDelete = false;
8686

8787
public Builder withUrl(String url) {
8888
this.url = url;
@@ -135,7 +135,7 @@ public Builder withConflictKey(String conflictKey) {
135135
}
136136

137137
public Builder withAllowDelete(boolean allowDelete) {
138-
this.allowDelete = allowDelete;
138+
this.enableDelete = allowDelete;
139139
return this;
140140
}
141141

seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/config/DatabendSinkOptions.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ public class DatabendSinkOptions {
6060
.noDefaultValue()
6161
.withDescription("Conflict key for CDC merge operations");
6262

63-
public static final Option<Boolean> ALLOW_DELETE =
64-
Options.key("allow_delete")
63+
public static final Option<Boolean> ENABLE_DELETE =
64+
Options.key("enable_delete")
6565
.booleanType()
6666
.defaultValue(false)
6767
.withDescription("Whether to allow delete operations in CDC mode");

seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public DatabendSink(CatalogTable catalogTable, ReadonlyConfig options) {
132132
// CDC mode info
133133
if (databendSinkConfig.isCdcMode()) {
134134
log.info("CDC mode enabled with conflict key: {}", databendSinkConfig.getConflictKey());
135-
log.info("Allow delete: {}", databendSinkConfig.isAllowDelete());
135+
log.info("Enable delete: {}", databendSinkConfig.isEnableDelete());
136136
log.info("Interval: {} seconds", databendSinkConfig.getInterval());
137137
}
138138
}

seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkAggregatedCommitter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ private String generateMergeSql() {
195195

196196
sql.append("WHEN MATCHED AND b.action = 'update' THEN UPDATE * ");
197197

198-
if (databendSinkConfig.isAllowDelete()) {
198+
if (databendSinkConfig.isEnableDelete()) {
199199
sql.append("WHEN MATCHED AND b.action = 'delete' THEN DELETE ");
200200
}
201201

seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkWriter.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public class DatabendSinkWriter
8282
private String targetTableName;
8383
private PreparedStatement cdcPreparedStatement;
8484
private String conflictKey;
85-
private boolean allowDelete;
85+
private boolean enableDelete;
8686
private int interval;
8787

8888
public DatabendSinkWriter(
@@ -116,7 +116,7 @@ public DatabendSinkWriter(
116116
log.info("DatabendSinkWriter initialized in traditional mode");
117117
}
118118
this.conflictKey = databendSinkConfig.getConflictKey();
119-
this.allowDelete = databendSinkConfig.isAllowDelete();
119+
this.enableDelete = databendSinkConfig.isEnableDelete();
120120
this.interval = databendSinkConfig.getInterval();
121121
this.targetTableName = table;
122122

@@ -272,7 +272,7 @@ String generateMergeSql() {
272272

273273
sql.append("WHEN MATCHED AND b.action = 'update' THEN UPDATE * ");
274274

275-
if (allowDelete) {
275+
if (enableDelete) {
276276
sql.append("WHEN MATCHED AND b.action = 'delete' THEN DELETE ");
277277
}
278278

@@ -387,7 +387,7 @@ private void processCdcRow(SeaTunnelRow row) throws SQLException {
387387
return;
388388
}
389389

390-
if ("delete".equals(action) && !allowDelete) {
390+
if ("delete".equals(action) && !enableDelete) {
391391
log.debug("DELETE operation not allowed, skipping row");
392392
return;
393393
}
@@ -970,8 +970,8 @@ String getStreamName() {
970970
return streamName;
971971
}
972972

973-
boolean isAllowDelete() {
974-
return allowDelete;
973+
boolean isEnableDelete() {
974+
return enableDelete;
975975
}
976976

977977
CatalogTable getCatalogTable() {

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-databend-e2e/src/test/resources/databend/fake_to_databend_cdc.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,6 @@ sink {
8787
# Enable CDC mode
8888
batch_size = 1
8989
conflict_key = "id"
90-
allow_delete = true
90+
enable_delete = true
9191
}
9292
}

0 commit comments

Comments
 (0)