diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java index a869f6ca8..442aaf3e5 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java @@ -262,7 +262,7 @@ public synchronized void close() { flush(); } catch (Exception e) { LOG.warn("Writing records to JDBC failed.", e); - throw new RuntimeException("Writing records to JDBC failed.", e); + flushException = e; } } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcRowOutputFormatTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcRowOutputFormatTest.java index 3150a1537..91dee7ff0 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcRowOutputFormatTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcRowOutputFormatTest.java @@ -39,6 +39,7 @@ import static org.apache.flink.connector.jdbc.JdbcTestFixture.OUTPUT_TABLE; import static org.apache.flink.connector.jdbc.JdbcTestFixture.OUTPUT_TABLE_2; import static org.apache.flink.connector.jdbc.JdbcTestFixture.OUTPUT_TABLE_3; +import static org.apache.flink.connector.jdbc.JdbcTestFixture.OUTPUT_TABLE_4; import static org.apache.flink.connector.jdbc.JdbcTestFixture.SELECT_ALL_NEWBOOKS; import static org.apache.flink.connector.jdbc.JdbcTestFixture.SELECT_ALL_NEWBOOKS_2; import static org.apache.flink.connector.jdbc.JdbcTestFixture.SELECT_ALL_NEWBOOKS_3; @@ -311,6 +312,30 @@ void testFlush() throws SQLException, IOException { } } + @Test + public void testExceptionOnFlush() { + JdbcRowOutputFormat jdbcOutputFormat = + JdbcRowOutputFormat.buildJdbcOutputFormat() + .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass()) + .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl()) + .setQuery(String.format(INSERT_TEMPLATE, OUTPUT_TABLE_4)) + .setBatchSize(2) + .finish(); + setRuntimeContext(jdbcOutputFormat, true); + try { + jdbcOutputFormat.open(0, 1); + + jdbcOutputFormat.writeRecord(toRow(TEST_DATA[1])); + jdbcOutputFormat.writeRecord(toRow(TEST_DATA[1])); + } catch (IOException e) { + try { + jdbcOutputFormat.close(); + } catch (Exception e1) { + assertThat(jdbcOutputFormat.getConnection()).isEqualTo(null); + } + } + } + @Test void testInvalidConnectionInJdbcOutputFormat() throws IOException, SQLException { jdbcOutputFormat = diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestFixture.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestFixture.java index e9b295312..d90e3356b 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestFixture.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestFixture.java @@ -43,6 +43,7 @@ public class JdbcTestFixture { public static final String OUTPUT_TABLE = "newbooks"; public static final String OUTPUT_TABLE_2 = "newbooks2"; public static final String OUTPUT_TABLE_3 = "newbooks3"; + public static final String OUTPUT_TABLE_4 = "newbooks4"; public static final String WORDS_TABLE = "words"; public static final String SELECT_ALL_BOOKS = "select * from " + INPUT_TABLE; public static final String SELECT_ID_BOOKS = "select id from " + INPUT_TABLE; @@ -199,6 +200,7 @@ public static void initSchema(DbMetadata dbMetadata) createTable(conn, OUTPUT_TABLE); createTable(conn, OUTPUT_TABLE_2); createTable(conn, OUTPUT_TABLE_3); + createTable(conn, OUTPUT_TABLE_4); createWordsTable(conn); } } @@ -243,6 +245,7 @@ public static void cleanUpDatabasesStatic(DbMetadata dbMetadata) stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE); stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE_2); stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE_3); + stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE_4); stat.executeUpdate("DROP TABLE " + WORDS_TABLE); } }