-
Notifications
You must be signed in to change notification settings - Fork 2.1k
[Feature][seatunnel-connectors-v2/connector-jdbc]sqlserver support bulk copy write #10099
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
[Feature][seatunnel-connectors-v2/connector-jdbc]sqlserver support bulk copy write #10099
Conversation
...nel/connectors/seatunnel/jdbc/internal/executor/SqlserverBulkCopyBatchStatementExecutor.java
Show resolved
Hide resolved
...nel/connectors/seatunnel/jdbc/internal/executor/SqlserverBulkCopyBatchStatementExecutor.java
Outdated
Show resolved
Hide resolved
| try { | ||
| return connection.createStatement().executeQuery(queryMeta).getMetaData(); | ||
| } catch (SQLException e) { | ||
| throw new SeaTunnelRuntimeException( | ||
| JdbcConnectorErrorCode.NO_SUPPORT_OPERATION_FAILED, | ||
| "get meta data fail:" + schemaTableName); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| try { | |
| return connection.createStatement().executeQuery(queryMeta).getMetaData(); | |
| } catch (SQLException e) { | |
| throw new SeaTunnelRuntimeException( | |
| JdbcConnectorErrorCode.NO_SUPPORT_OPERATION_FAILED, | |
| "get meta data fail:" + schemaTableName); | |
| } | |
| try (Statement statement = connection.createStatement(); | |
| ResultSet resultSet = statement.executeQuery(queryMeta)) { | |
| return resultSet.getMetaData(); | |
| } catch (SQLException e) { | |
| throw new SeaTunnelRuntimeException( | |
| JdbcConnectorErrorCode.NO_SUPPORT_OPERATION_FAILED, | |
| "get meta data fail: " + schemaTableName, e); | |
| } |
| try { | ||
| connection.rollback(); | ||
| } catch (SQLException rollbackEx) { | ||
| log.error("Failed to rollback", rollbackEx); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| try { | |
| connection.rollback(); | |
| } catch (SQLException rollbackEx) { | |
| log.error("Failed to rollback", rollbackEx); | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There‘s no need to handle rollback, JdbcOutputFormat will handle it
| bulkCopy.setBulkCopyOptions(options); | ||
| long start = System.currentTimeMillis(); | ||
| bulkCopy.writeToServer(new MemoryBulkData(resultSetMetaData, buffer)); | ||
| connection.commit(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| connection.commit(); | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SeaTunnel self will handle commit
| case DATE: | ||
| rowData[i] = | ||
| field == null | ||
| ? null | ||
| : java.sql.Date.valueOf((java.time.LocalDate) field); | ||
| break; | ||
| case TIME: | ||
| rowData[i] = | ||
| field == null | ||
| ? null | ||
| : java.sql.Time.valueOf((java.time.LocalTime) field); | ||
| break; | ||
| case TIMESTAMP: | ||
| rowData[i] = | ||
| field == null | ||
| ? null | ||
| : java.sql.Timestamp.valueOf((java.time.LocalDateTime) field); | ||
| break; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any other types need to handle? like Decimal?
| bulkCopy.setDestinationTableName(schemaTableName); | ||
| // BulkCopy config | ||
| SQLServerBulkCopyOptions options = new SQLServerBulkCopyOptions(); | ||
| options.setTableLock(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is enabling table-level locks necessary?
| final String[] split = schemaTableName.split("\\."); | ||
| if (split.length != 2) { | ||
| Map<String, String> params = new HashMap<>(); | ||
| params.put("value", schemaTableName); | ||
| throw new SeaTunnelRuntimeException(CommonErrorCode.SEATUNNEL_CONFIG_ERROR, params); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it have to be schema.tableName? How about we allow users to only configure tableName, and in this case, the default schema will be used?
sink {
Jdbc {
// ...
table = "my_target_table"
}
}
| import com.microsoft.sqlserver.jdbc.ISQLServerBulkData; | ||
| import com.microsoft.sqlserver.jdbc.SQLServerBulkCopy; | ||
| import com.microsoft.sqlserver.jdbc.SQLServerBulkCopyOptions; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@chl-wxp This strong dependency will lead to a strong reliance, where mssql-jdbc is imported regardless of whether it is used, which is inconsistent with the previous design.
cc @davidzollo
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@chl-wxp This strong dependency will lead to a strong reliance, where
mssql-jdbcis imported regardless of whether it is used, which is inconsistent with the previous design. cc @davidzollo
Thanks for the reminder. Such strong dependency is indeed a problem. I need to think about how to solve this problem.
Purpose of this pull request
Does this PR introduce any user-facing change?
How was this patch tested?
Check list
New License Guide
incompatible-changes.mdto describe the incompatibility caused by this PR.