Skip to content

Commit

Permalink
NIFI-12010: Handle auto-commit and commit based on driver capabilitie…
Browse files Browse the repository at this point in the history
…s in SQL components
  • Loading branch information
mattyb149 committed Aug 31, 2023
1 parent a9ac8fb commit a277151
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.Statement;

/**
Expand Down Expand Up @@ -146,7 +147,14 @@ public Object getObject() throws Exception {
try {
// get a connection
connection = connectionPool.getConnection();
connection.setAutoCommit(false);
final boolean isAutoCommit = connection.getAutoCommit();
if (isAutoCommit) {
try {
connection.setAutoCommit(false);
} catch (SQLFeatureNotSupportedException sfnse) {
logger.debug("setAutoCommit(false) not supported by this driver");
}
}

// create a statement for initializing the database
statement = connection.createStatement();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import javax.sql.DataSource;
import org.apache.nifi.admin.service.transaction.Transaction;
import org.apache.nifi.admin.service.transaction.TransactionBuilder;
Expand All @@ -35,7 +36,14 @@ public Transaction start() throws TransactionException {
try {
// get a new connection
Connection connection = dataSource.getConnection();
connection.setAutoCommit(false);
final boolean isAutoCommit = connection.getAutoCommit();
if (isAutoCommit) {
try {
connection.setAutoCommit(false);
} catch (SQLFeatureNotSupportedException sfnse) {
throw new TransactionException("setAutoCommit(false) not supported by this driver");
}
}

// create a new transaction
return new StandardTransaction(connection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.io.File;
import java.lang.reflect.Method;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -353,7 +354,11 @@ private void onInitSQL(Map<String, Object> SQL) throws SQLException {
//try to set autocommit to false
try {
if (sql.getConnection().getAutoCommit()) {
sql.getConnection().setAutoCommit(false);
try {
sql.getConnection().setAutoCommit(false);
} catch (SQLFeatureNotSupportedException sfnse) {
getLogger().debug("setAutoCommit(false) not supported by this driver");
}
}
} catch (Throwable ei) {
getLogger().warn("Failed to set autocommit=false for `" + e.getKey() + "`", ei);
Expand Down Expand Up @@ -382,7 +387,11 @@ private void onFinitSQL(Map<String, Object> SQL) {
OSql sql = (OSql) e.getValue();
try {
if (!sql.getConnection().getAutoCommit()) {
sql.getConnection().setAutoCommit(true); //default autocommit value in nifi
try {
sql.getConnection().setAutoCommit(true); //default autocommit value in nifi
} catch (SQLFeatureNotSupportedException sfnse) {
getLogger().debug("setAutoCommit(true) not supported by this driver");
}
}
} catch (Throwable ei) {
getLogger().warn("Failed to set autocommit=true for `" + e.getKey() + "`", ei);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ public class ScriptRunner {

public ScriptRunner(Connection connection) throws SQLException {
this.connection = connection;
this.connection.setAutoCommit(true);
if (!this.connection.getAutoCommit()) {
// May throw SQLFeatureNotSupportedException which is a subclass of SQLException
this.connection.setAutoCommit(true);
}
}

public void runScript(Reader reader) throws IOException, SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -255,7 +256,16 @@ public void onTrigger(final ProcessContext context, final ProcessSession session

int resultCount = 0;
try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes())) {
con.setAutoCommit(context.getProperty(AUTO_COMMIT).asBoolean());
final boolean isAutoCommit = con.getAutoCommit();
final boolean setAutoCommitValue = context.getProperty(AUTO_COMMIT).asBoolean();
// Only set auto-commit if necessary, log any "feature not supported" exceptions
if (isAutoCommit != setAutoCommitValue) {
try {
con.setAutoCommit(setAutoCommitValue);
} catch (SQLFeatureNotSupportedException sfnse) {
logger.debug("setAutoCommit({}) not supported by this driver", setAutoCommitValue);
}
}
try (final PreparedStatement st = con.prepareStatement(selectQuery)) {
if (fetchSize != null && fetchSize > 0) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLNonTransientException;
import java.sql.Statement;
import java.util.ArrayList;
Expand Down Expand Up @@ -290,7 +291,11 @@ private boolean isSupportBatching() {
fc.originalAutoCommit = connection.getAutoCommit();
final boolean autocommit = c.getProperty(AUTO_COMMIT).asBoolean();
if(fc.originalAutoCommit != autocommit) {
connection.setAutoCommit(autocommit);
try {
connection.setAutoCommit(autocommit);
} catch (SQLFeatureNotSupportedException sfnse) {
getLogger().debug("setAutoCommit({}) not supported by this driver", autocommit);
}
}
} catch (SQLException e) {
throw new ProcessException("Failed to disable auto commit due to " + e, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -786,7 +787,7 @@ public Connection getConnection() throws ProcessException {
try {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
return con;
return Mockito.spy(con);
} catch (final Exception e) {
throw new ProcessException("getConnection failed: " + e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.sql.ResultSetMetaData;
import java.sql.SQLDataException;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -202,7 +203,13 @@ public WriteResult sendData(RecordSet recordSet, Map<String, String> attributes,
try {
connection = dbcpService.getConnection(attributes);
originalAutoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
if (originalAutoCommit) {
try {
connection.setAutoCommit(false);
} catch (SQLFeatureNotSupportedException sfnse) {
getLogger().debug("setAutoCommit(false) not supported by this driver");
}
}
final DMLSettings settings = new DMLSettings(context);
final String catalog = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions().getValue();
final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions().getValue();
Expand Down

0 comments on commit a277151

Please sign in to comment.