7 changes: 6 additions & 1 deletion docs/en/connector-v2/sink/Jdbc.md
@@ -35,7 +35,7 @@ support `Xa transactions`. You can set `is_exactly_once=true` to enable it.
|-------------------------------------------|---------|----------|------------------------------|
| url | String | Yes | - |
| driver | String | Yes | - |
| username                                  | String  | No       | -                            |
| password | String | No | - |
| query | String | No | - |
| compatible_mode | String | No | - |
@@ -64,6 +64,7 @@ support `Xa transactions`. You can set `is_exactly_once=true` to enable it.
| access_key_id | String | No | |
| secret_access_key | String | No | |
| region | String | No | |
| use_sqlserver_bulk_copy | Boolean | No | false |

### driver [string]

@@ -245,6 +246,10 @@ The secret_access_key in AWS authentication. Only valid for dialect="dsql"
### region [String]
The region where Amazon Aurora DSQL is located. Only valid for dialect="dsql"

### use_sqlserver_bulk_copy [boolean]
When SQL Server is the sink, you can set `use_sqlserver_bulk_copy=true` to improve write performance: data is written with SQL Server's bulk copy mechanism instead of individual insert statements. This applies to inserts only.
Note that enabling this option makes the `auto_commit`, `is_exactly_once`, and `enable_upsert` parameters ineffective, and that the `smalldatetime` and `sql_variant` field types are not supported when writing data this way.
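
Below is a minimal sketch of a sink configuration with bulk copy enabled. The connection values are placeholders, and the `table` key is used here as in the review discussion further down, in the schema-qualified `schema.table` form the executor currently expects:

sink {
  Jdbc {
    url = "jdbc:sqlserver://localhost:1433;databaseName=mydb"
    driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    username = "sa"
    password = "your_password"
    table = "dbo.my_target_table"
    use_sqlserver_bulk_copy = true
  }
}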


## tips

11 changes: 8 additions & 3 deletions docs/zh/connector-v2/sink/Jdbc.md
@@ -58,9 +58,10 @@ import ChangeLog from '../changelog/connector-jdbc.md';
| custom_sql | String | 否 | - |
| enable_upsert | Boolean | 否 | true |
| use_copy_statement | Boolean | 否 | false |
| access_key_id | String | 否 | |
| secret_access_key | String | 否 | |
| region | String | 否 | |
| use_sqlserver_bulk_copy | Boolean | 否 | false |

### driver [string]

@@ -234,6 +235,10 @@ AWS IAM 认证中所需要的 secret_access_key。该参数仅适用于 dialect="dsql"
### region [String]
Amazon Aurora DSQL 所在的区域。该参数仅适用于 dialect="dsql"

### use_sqlserver_bulk_copy [boolean]
当你使用sqlserver作为目标端的时候,为了提高插入效率,可以开启`use_sqlserver_bulk_copy=true`,这会使用sqlserver的bulk copy方式写入数据,但仅限于插入。
另外,使用该参数会使参数`auto_commit`、`is_exactly_once`以及`enable_upsert`都不会生效。使用该方式插入数据不支持的字段类型:`smalldatetime`、`sql_variant`。

## tips

在 is_exactly_once = "true" 的情况下,使用 XA 事务。这需要数据库支持,有些数据库需要一些设置:<br/>
@@ -81,7 +81,9 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
KERBEROS_AUTHORIZED_FAILED("COMMON-35", "Kerberos authorized failed"),
CLOSE_FAILED("COMMON-36", "'<identifier>' close failed."),
SEATUNNEL_ROW_SERIALIZE_FAILED("COMMON-37", "Seatunnel row serialize failed. Row={ '<row>' }"),
SEATUNNEL_CONFIG_ERROR(
"COMMON-38",
"The parameter is configured incorrectly; it should not be '<value>', please check!");

private final String code;
private final String description;
@@ -40,6 +40,7 @@ public class JdbcSinkConfig implements Serializable {
@Builder.Default private boolean isPrimaryKeyUpdated = true;
private boolean supportUpsertByInsertOnly;
private boolean useCopyStatement;
private boolean useSqlserverBulkCopy;
@Builder.Default private boolean createIndex = true;

@@ -55,6 +56,7 @@ public static JdbcSinkConfig of(ReadonlyConfig config) {
config.get(JdbcSinkOptions.SUPPORT_UPSERT_BY_INSERT_ONLY));
builder.simpleSql(config.get(JdbcSinkOptions.QUERY));
builder.useCopyStatement(config.get(JdbcSinkOptions.USE_COPY_STATEMENT));
builder.useSqlserverBulkCopy(config.get(JdbcSinkOptions.USE_SQLSERVER_BULK_COPY));
builder.createIndex(config.get(JdbcSinkOptions.CREATE_INDEX));
return builder.build();
}
@@ -118,6 +118,12 @@ public class JdbcSinkOptions extends JdbcCommonOptions {
.defaultValue(false)
.withDescription("support copy in statement (postgresql)");

public static final Option<Boolean> USE_SQLSERVER_BULK_COPY =
Options.key("use_sqlserver_bulk_copy")
.booleanType()
.defaultValue(false)
.withDescription("support bulk copy in statement (sqlserver)");

public static final Option<FieldIdeEnum> FIELD_IDE =
Options.key("field_ide")
.enumType(FieldIdeEnum.class)
@@ -33,6 +33,7 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.InsertOrUpdateBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SqlserverBulkCopyBatchStatementExecutor;

import lombok.NonNull;
import lombok.RequiredArgsConstructor;
@@ -68,6 +69,11 @@ public JdbcOutputFormat build() {
createCopyInBufferStatementExecutor(
createCopyInBatchStatementExecutor(
dialect, table, tableSchema));
} else if (jdbcSinkConfig.isUseSqlserverBulkCopy()) {
statementExecutorFactory =
() ->
createSqlserverBulkCopyInBufferStatementExecutor(
jdbcSinkConfig.getTable(), tableSchema.getColumns());
} else if (StringUtils.isNotBlank(jdbcSinkConfig.getSimpleSql())) {
statementExecutorFactory =
() ->
@@ -222,6 +228,12 @@ private static JdbcBatchStatementExecutor<SeaTunnelRow> createCopyInBufferStatementExecutor(
copyManagerBatchStatementExecutor, Function.identity());
}

private static JdbcBatchStatementExecutor<SeaTunnelRow>
createSqlserverBulkCopyInBufferStatementExecutor(
String schemaTableName, List<Column> columns) {
return new SqlserverBulkCopyBatchStatementExecutor(schemaTableName, columns);
}

private static CopyManagerBatchStatementExecutor createCopyInBatchStatementExecutor(
JdbcDialect dialect, String table, TableSchema tableSchema) {
String columns =
@@ -0,0 +1,220 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;

import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;

import com.microsoft.sqlserver.jdbc.ISQLServerBulkData;
import com.microsoft.sqlserver.jdbc.SQLServerBulkCopy;
import com.microsoft.sqlserver.jdbc.SQLServerBulkCopyOptions;
Comment on lines +27 to +29
Member: @chl-wxp This creates a hard dependency on mssql-jdbc: it is imported regardless of whether it is used, which is inconsistent with the previous design. cc @davidzollo
Contributor Author: Thanks for the reminder. Such a hard dependency is indeed a problem; I need to think about how to solve it.
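
One way to avoid such a hard dependency, sketched here purely as an illustration (it is not the approach adopted in this PR), is to probe for the mssql-jdbc classes reflectively and only offer the bulk-copy path when they are present:

// Hypothetical guard, not part of this PR: enable the bulk-copy executor
// only if the mssql-jdbc classes are actually on the classpath.
private static boolean isSqlServerBulkCopyAvailable() {
    try {
        Class.forName("com.microsoft.sqlserver.jdbc.SQLServerBulkCopy");
        return true;
    } catch (ClassNotFoundException e) {
        return false;
    }
}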

import lombok.NonNull;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.sql.Connection;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

@Slf4j
public class SqlserverBulkCopyBatchStatementExecutor
implements JdbcBatchStatementExecutor<SeaTunnelRow> {

@NonNull private final String schemaTableName;
@NonNull private final List<Column> columns;
@NonNull private final List<Object[]> buffer = new ArrayList<>();

private Connection connection;
private ResultSetMetaData resultSetMetaData;

public SqlserverBulkCopyBatchStatementExecutor(String schemaTableName, List<Column> columns) {
this.columns = columns;
this.schemaTableName = schemaTableName;
}

@Override
public void prepareStatements(Connection connection) throws SQLException {
this.connection = connection.unwrap(com.microsoft.sqlserver.jdbc.SQLServerConnection.class);
this.connection.setAutoCommit(false);
this.resultSetMetaData = getResultSetMetaData(this.connection, schemaTableName);
}

@Override
public void addToBatch(SeaTunnelRow record) throws SQLException {
Object[] rowData = new Object[columns.size()];
for (int i = 0; i < columns.size(); i++) {
Object field = record.getField(i);
SeaTunnelDataType<?> type = columns.get(i).getDataType();
// Convert java.time values to their java.sql equivalents before handing
// rows to the bulk copy API.
switch (type.getSqlType()) {
case DATE:
rowData[i] =
field == null
? null
: java.sql.Date.valueOf((java.time.LocalDate) field);
break;
case TIME:
rowData[i] =
field == null
? null
: java.sql.Time.valueOf((java.time.LocalTime) field);
break;
case TIMESTAMP:
rowData[i] =
field == null
? null
: java.sql.Timestamp.valueOf((java.time.LocalDateTime) field);
break;
Comment on lines +75 to +92
Contributor: Are there any other types that need to be handled, like Decimal?

default:
rowData[i] = field;
}
}
buffer.add(rowData);
}

@Override
public void executeBatch() throws SQLException {
if (!buffer.isEmpty()) {
executeBatchInternal();
}
}

private void executeBatchInternal() {
try (SQLServerBulkCopy bulkCopy = new SQLServerBulkCopy(connection)) {
bulkCopy.setDestinationTableName(schemaTableName);
// BulkCopy config
SQLServerBulkCopyOptions options = new SQLServerBulkCopyOptions();
options.setTableLock(true);
Contributor: Is enabling table-level locks necessary?

options.setUseInternalTransaction(false);
options.setCheckConstraints(false);
options.setFireTriggers(false);
options.setBatchSize(buffer.size());
bulkCopy.setBulkCopyOptions(options);
long start = System.currentTimeMillis();
bulkCopy.writeToServer(new MemoryBulkData(resultSetMetaData, buffer));
connection.commit();
Contributor (suggested change): remove the connection.commit() call; SeaTunnel itself will handle the commit.

log.info(
"Bulk copied {} rows to table {}, cost {}s",
buffer.size(),
schemaTableName,
(System.currentTimeMillis() - start) / 1000);
buffer.clear();
} catch (SQLException e) {
try {
connection.rollback();
} catch (SQLException rollbackEx) {
log.error("Failed to rollback", rollbackEx);
}
Comment on lines +128 to +132
Contributor (suggested change): remove this rollback try/catch; there is no need to handle rollback here, JdbcOutputFormat will handle it.

throw new JdbcConnectorException(
JdbcConnectorErrorCode.TRANSACTION_OPERATION_FAILED, e);
}
}

@Override
public void closeStatements() throws SQLException {
executeBatch();
}

private ResultSetMetaData getResultSetMetaData(Connection connection, String schemaTableName) {
final String[] split = schemaTableName.split("\\.");
if (split.length != 2) {
Map<String, String> params = new HashMap<>();
params.put("value", schemaTableName);
throw new SeaTunnelRuntimeException(CommonErrorCode.SEATUNNEL_CONFIG_ERROR, params);
}
Comment on lines +144 to +149
Member: Does it have to be schema.tableName? How about allowing users to configure only tableName, with the default schema used in that case?

sink {
  Jdbc {
    // ...
    table = "my_target_table"
  }
}
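
A minimal sketch of that fallback, assuming dbo as SQL Server's default schema (illustration only, not code from this PR):

// Hypothetical fallback: accept both "schema.table" and a bare table name.
String[] parts = schemaTableName.split("\\.");
String schema = parts.length == 2 ? parts[0] : "dbo";
String table = parts.length == 2 ? parts[1] : parts[0];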

// The always-false predicate returns no rows but still exposes the
// destination table's column metadata for the bulk copy mapping.
String queryMeta =
String.format("select * from \"%s\".\"%s\" where 1=0", split[0], split[1]);
try {
return connection.createStatement().executeQuery(queryMeta).getMetaData();
} catch (SQLException e) {
throw new SeaTunnelRuntimeException(
JdbcConnectorErrorCode.NO_SUPPORT_OPERATION_FAILED,
"get meta data fail:" + schemaTableName);
}
Comment on lines +152 to +158
Contributor (suggested change): close the statement and result set with try-with-resources, and attach the cause:

try (Statement statement = connection.createStatement();
        ResultSet resultSet = statement.executeQuery(queryMeta)) {
    return resultSet.getMetaData();
} catch (SQLException e) {
    throw new SeaTunnelRuntimeException(
            JdbcConnectorErrorCode.NO_SUPPORT_OPERATION_FAILED,
            "get meta data fail: " + schemaTableName, e);
}
}

static class MemoryBulkData implements ISQLServerBulkData {
private final ResultSetMetaData metaData;
private final Iterator<Object[]> iterator;
private Object[] current;

public MemoryBulkData(ResultSetMetaData metaData, List<Object[]> rows) {
this.metaData = metaData;
this.iterator = rows.iterator();
}

@SneakyThrows
@Override
public Set<Integer> getColumnOrdinals() {
int columnCount = metaData.getColumnCount();
Set<Integer> ordinals = new LinkedHashSet<>();
for (int i = 1; i <= columnCount; i++) {
ordinals.add(i);
}
return ordinals;
}

@Override
public Object[] getRowData() {
return current;
}

@Override
public boolean next() {
if (iterator.hasNext()) {
current = iterator.next();
return true;
}
return false;
}

@SneakyThrows
@Override
public String getColumnName(int column) {
return metaData.getColumnName(column);
}

@SneakyThrows
@Override
public int getColumnType(int column) {
return metaData.getColumnType(column);
}

@SneakyThrows
@Override
public int getPrecision(int column) {
return metaData.getPrecision(column);
}

@SneakyThrows
@Override
public int getScale(int column) {
return metaData.getScale(column);
}
}
}
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkOptions;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class JdbcSinkConfigChecker {

public static void check(ReadonlyConfig readonlyConfig) {
if (readonlyConfig.get(JdbcSinkOptions.USE_SQLSERVER_BULK_COPY)) {
if (readonlyConfig.get(JdbcSinkOptions.AUTO_COMMIT)) {
log.warn(
"When use_sqlserver_bulk_copy is enabled, auto_commit is true and does not take effect.");
}
if (readonlyConfig.get(JdbcSinkOptions.IS_EXACTLY_ONCE)) {
log.warn(
"When use_sqlserver_bulk_copy is enabled, is_exactly_once is true and does not take effect.");
}
if (readonlyConfig.get(JdbcSinkOptions.ENABLE_UPSERT)) {
log.warn(
"When use_sqlserver_bulk_copy is enabled, enable_upsert is true and does not take effect.");
}
}
}
}
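
As a usage note, a job that combines the bulk-copy switch with one of the overridden options passes this check but logs the matching warning; the checker deliberately warns rather than fails. For example (option values assumed for illustration):

sink {
  Jdbc {
    // ...
    use_sqlserver_bulk_copy = true
    is_exactly_once = true   // warned as ineffective, then ignored
  }
}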