Skip to content

Commit 154b4f6

Browse files
committed
Merge branch 'MET-6544-flink-code-cleanup-in-the-area-of-exception-handling' into develop
2 parents 36b6fab + 22bb0e8 commit 154b4f6

25 files changed

+214
-82
lines changed

client/src/main/java/eu/europeana/cloud/flink/client/JobExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ private JobDetails getProgressWithRetry(String jobId) throws InterruptedExceptio
120120
return getProgress(jobId);
121121
} catch (RestClientResponseException e) {
122122
if (e.getStatusCode() == HttpStatus.NOT_FOUND &&
123-
e.getResponseBodyAsString().contains("org.apache.flink.runtime.rest.NotFoundException")) {
123+
e.getResponseBodyAsString().contains("org.apache.flink.runtime.rest.NotFoundException")) {
124124
throw new RuntimeException("There is no more job of the id: " + jobId + " on the server", e);
125125
}
126126
throw e;

commons/src/main/java/eu/europeana/processing/MetisJob.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ protected void generateTaskIdIfNeeded() {
113113
}
114114
}
115115
}catch (Exception e){
116-
throw new RuntimeException("Error while generating task id!",e);
116+
throw new RuntimeException("Error while generating task id!", e);
117117
}
118118
}
119119

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package eu.europeana.processing.exception;
2+
3+
public class FlinkWorkflowException extends Exception{
4+
public FlinkWorkflowException(String message, Throwable cause) {
5+
super(message, cause);
6+
}
7+
public FlinkWorkflowException(Throwable cause) {
8+
super(cause);
9+
}
10+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package eu.europeana.processing.exception;
2+
3+
public class FlinkWorkflowRuntimeException extends RuntimeException{
4+
public FlinkWorkflowRuntimeException(String message, Throwable cause) {
5+
super(message, cause);
6+
}
7+
public FlinkWorkflowRuntimeException(Throwable cause) {
8+
super(cause);
9+
}
10+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package eu.europeana.processing.exception;
2+
3+
public class RecoverableException extends FlinkWorkflowRuntimeException {
4+
public RecoverableException(String message, Throwable cause) {
5+
super(message, cause);
6+
}
7+
public RecoverableException(Throwable cause) {
8+
super(cause);
9+
}
10+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package eu.europeana.processing.exception;
2+
3+
4+
5+
public class UnrecoverableJobException extends FlinkWorkflowRuntimeException {
6+
public UnrecoverableJobException(String message, Throwable cause) {
7+
super(message, cause);
8+
}
9+
public UnrecoverableJobException(Throwable cause) {
10+
super(cause);
11+
}
12+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package eu.europeana.processing.exception;
2+
3+
4+
5+
public class UnrecoverableRecordException extends FlinkWorkflowException {
6+
public UnrecoverableRecordException(String message, Throwable cause) {
7+
super(message, cause);
8+
}
9+
public UnrecoverableRecordException(Throwable cause) {
10+
super(cause);
11+
}
12+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package eu.europeana.processing.exception.classifier;
2+
3+
4+
import eu.europeana.processing.exception.RecoverableException;
5+
import eu.europeana.processing.exception.UnrecoverableJobException;
6+
import eu.europeana.processing.exception.UnrecoverableRecordException;
7+
8+
import java.sql.SQLException;
9+
10+
public class DatabaseExceptionClassifier {
11+
12+
private DatabaseExceptionClassifier() {}
13+
14+
public static void classifyAndThrow(SQLException sqlException) throws UnrecoverableRecordException {
15+
String sqlState = sqlException.getSQLState();
16+
if (sqlState == null || sqlState.length() < 2) {
17+
throw new UnrecoverableJobException("Unknown database error", sqlException);
18+
}
19+
20+
// 08 Connection Exception ConnectionException
21+
// 22 Data Exception (invalid data) DataFormatException
22+
// 23 Integrity Constraint Violation IntegrityConstraintException
23+
// 28 Invalid Authorization AuthenticationException
24+
// 40 Transaction Rollback TransactionException
25+
// 42 Syntax Error or Access Rule Violation SqlSyntaxException
26+
27+
switch (sqlState.substring(0, 2)) {
28+
case "08" -> throw new RecoverableException("Connection error", sqlException);
29+
case "22" -> throw new UnrecoverableRecordException("Invalid data format", sqlException);
30+
case "23", "28", "40", "42" -> throw new UnrecoverableJobException("Database error (" + sqlException.getSQLState() + ")", sqlException);
31+
default -> throw new UnrecoverableJobException("Database error", sqlException);
32+
}
33+
}
34+
35+
}

commons/src/main/java/eu/europeana/processing/repository/ExecutionRecordExceptionLogRepository.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,22 @@
11
package eu.europeana.processing.repository;
22

33
import eu.europeana.processing.DbConnectionProvider;
4+
import eu.europeana.processing.exception.FlinkWorkflowException;
45
import eu.europeana.processing.retryable.Retryable;
56
import eu.europeana.processing.model.ExecutionRecord;
67
import eu.europeana.processing.model.ExecutionRecordResult;
78
import java.io.Serial;
89
import org.slf4j.Logger;
910
import org.slf4j.LoggerFactory;
1011

11-
import java.io.IOException;
1212
import java.io.Serializable;
1313
import java.sql.Connection;
1414
import java.sql.PreparedStatement;
1515
import java.sql.ResultSet;
1616
import java.sql.SQLException;
1717

18+
import static eu.europeana.processing.exception.classifier.DatabaseExceptionClassifier.classifyAndThrow;
19+
1820
/**
1921
* Database repository responsible for <b>execution_record_exception_log</b> table
2022
*/
@@ -56,8 +58,9 @@ public ExecutionRecordExceptionLogRepository(DbConnectionProvider dbConnectionPr
5658
* In case of conflict (try to insert the same record twice) the method does nothing.
5759
*
5860
* @param executionRecordResult instance to be saved in the database
61+
* @throws FlinkWorkflowException in case of any DB exception
5962
*/
60-
public void save(ExecutionRecordResult executionRecordResult) {
63+
public void save(ExecutionRecordResult executionRecordResult) throws FlinkWorkflowException {
6164
try (Connection con = dbConnectionProvider.getConnection();
6265
PreparedStatement preparedStatement = con.prepareStatement(
6366
"INSERT INTO \"batch-framework\".execution_record_exception_log (DATASET_ID,EXECUTION_ID,EXECUTION_NAME, RECORD_ID, exception)"
@@ -76,7 +79,7 @@ public void save(ExecutionRecordResult executionRecordResult) {
7679
LOGGER.info("Record error log already existed in the DB: {}", executionRecord.getExecutionRecordKey());
7780
}
7881
} catch (SQLException e) {
79-
throw new RuntimeException(e);
82+
classifyAndThrow(e);
8083
}
8184
}
8285

@@ -85,25 +88,24 @@ public void save(ExecutionRecordResult executionRecordResult) {
8588
* @param datasetId dataset identifier
8689
* @param executionId execution identifier
8790
* @return number of elements in <b>execution_record_exception_log</b> table for specified dataset and execution
88-
* @throws IOException
91+
* @throws FlinkWorkflowException in case of any DB exception
8992
*/
90-
public long countByDatasetIdAndExecutionId(String datasetId, String executionId) throws IOException {
93+
public long countByDatasetIdAndExecutionId(String datasetId, String executionId) throws FlinkWorkflowException {
9194

92-
ResultSet resultSet;
95+
long count = 0L;
9396
try (PreparedStatement preparedStatement = dbConnectionProvider.getConnection().prepareStatement(NO_OF_ELEMENTS)) {
9497
preparedStatement.setString(1, datasetId);
9598
preparedStatement.setString(2, executionId);
9699

97-
resultSet = preparedStatement.executeQuery();
100+
ResultSet resultSet = preparedStatement.executeQuery();
98101

99102
if (resultSet.next()) {
100-
return resultSet.getLong("elements");
101-
} else {
102-
return 0L;
103+
count = resultSet.getLong("elements");
103104
}
104105
} catch(SQLException e){
105-
throw new IOException(e);
106+
classifyAndThrow(e);
106107
}
108+
return count;
107109
}
108110

109111
}

commons/src/main/java/eu/europeana/processing/repository/ExecutionRecordRepository.java

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package eu.europeana.processing.repository;
22

33
import eu.europeana.processing.DbConnectionProvider;
4+
import eu.europeana.processing.exception.FlinkWorkflowException;
45
import eu.europeana.processing.model.ExecutionRecord;
56
import eu.europeana.processing.model.ExecutionRecordKey;
67
import eu.europeana.processing.model.ExecutionRecordResult;
@@ -9,7 +10,6 @@
910
import org.slf4j.Logger;
1011
import org.slf4j.LoggerFactory;
1112

12-
import java.io.IOException;
1313
import java.io.Serializable;
1414
import java.sql.Connection;
1515
import java.sql.PreparedStatement;
@@ -18,6 +18,8 @@
1818
import java.util.ArrayList;
1919
import java.util.List;
2020

21+
import static eu.europeana.processing.exception.classifier.DatabaseExceptionClassifier.classifyAndThrow;
22+
2123
/**
2224
* Database repository responsible for <b>execution_record</b> table
2325
*/
@@ -67,9 +69,9 @@ public ExecutionRecordRepository(DbConnectionProvider dbConnectionProvider) {
6769
* In case of conflict (try to insert the same record twice) the method does nothing.
6870
*
6971
* @param executionRecordResult instance to be saved in the database
70-
* @throws IOException in case of any DB exception
72+
* @throws FlinkWorkflowException in case of any DB exception
7173
*/
72-
public void save(ExecutionRecordResult executionRecordResult) throws IOException {
74+
public void save(ExecutionRecordResult executionRecordResult) throws FlinkWorkflowException {
7375

7476
try (Connection con = dbConnectionProvider.getConnection();
7577
PreparedStatement preparedStatement = con.prepareStatement(
@@ -88,7 +90,7 @@ public void save(ExecutionRecordResult executionRecordResult) throws IOException
8890
LOGGER.info("Execution record already existed in the DB: {}", executionRecord.getExecutionRecordKey());
8991
}
9092
} catch (SQLException e) {
91-
throw new IOException(e);
93+
classifyAndThrow(e);
9294
}
9395
}
9496

@@ -98,26 +100,25 @@ public void save(ExecutionRecordResult executionRecordResult) throws IOException
98100
* @param datasetId dataset identifier
99101
* @param executionId execution identifier
100102
* @return number of elements in <b>execution_record</b> table for specified dataset and execution
101-
* @throws IOException in case of any DB exception
103+
* @throws FlinkWorkflowException in case of any DB exception
102104
*/
103-
public long countByDatasetIdAndExecutionId(String datasetId, String executionId) throws IOException {
105+
public long countByDatasetIdAndExecutionId(String datasetId, String executionId) throws FlinkWorkflowException {
104106

105-
ResultSet resultSet;
107+
long count = 0L;
106108
try (Connection con = dbConnectionProvider.getConnection();
107109
PreparedStatement preparedStatement = con.prepareStatement(NO_OF_ELEMENTS)) {
108110
preparedStatement.setString(1, datasetId);
109111
preparedStatement.setString(2, executionId);
110112

111-
resultSet = preparedStatement.executeQuery();
113+
ResultSet resultSet = preparedStatement.executeQuery();
112114

113115
if (resultSet.next()) {
114-
return resultSet.getLong("elements");
115-
} else {
116-
return 0L;
116+
count = resultSet.getLong("elements");
117117
}
118118
} catch (SQLException e) {
119-
throw new IOException(e);
119+
classifyAndThrow(e);
120120
}
121+
return count;
121122
}
122123

123124
/**
@@ -127,14 +128,15 @@ public long countByDatasetIdAndExecutionId(String datasetId, String executionId)
127128
* @param offset dataset offset
128129
* @param limit dataset limit
129130
* @return list of all {@link ExecutionRecord} fulfilling provided criteria
130-
* @throws IOException in case of any DB exception
131+
* @throws FlinkWorkflowException in case of any DB exception
131132
*/
132133
//TODO to be changed, returned list may be really big
133134
public List<ExecutionRecord> getByDatasetIdAndExecutionIdAndOffsetAndLimit(
134135
String datasetId,
135136
String executionId,
136137
long offset,
137-
long limit) throws IOException {
138+
long limit) throws FlinkWorkflowException {
139+
List<ExecutionRecord> result = new ArrayList<>();
138140
try (Connection con = dbConnectionProvider.getConnection();
139141
PreparedStatement preparedStatement = con.prepareStatement(LIMIT)) {
140142

@@ -143,7 +145,6 @@ public List<ExecutionRecord> getByDatasetIdAndExecutionIdAndOffsetAndLimit(
143145
preparedStatement.setLong(3, offset);
144146
preparedStatement.setLong(4, limit);
145147

146-
List<ExecutionRecord> result = new ArrayList<>();
147148
ResultSet resultSet = preparedStatement.executeQuery();
148149
while (resultSet.next()) {
149150
result.add(
@@ -159,9 +160,9 @@ public List<ExecutionRecord> getByDatasetIdAndExecutionIdAndOffsetAndLimit(
159160
.build()
160161
);
161162
}
162-
return result;
163163
} catch (SQLException e) {
164-
throw new IOException(e);
164+
classifyAndThrow(e);
165165
}
166+
return result;
166167
}
167168
}

0 commit comments

Comments
 (0)