Skip to content

Commit 83a4cd4

Browse files
committed
MET-6679 more tests for ProgressUpdater and RegularDbReader
1 parent 57f404a commit 83a4cd4

File tree

12 files changed

+353
-48
lines changed

12 files changed

+353
-48
lines changed

commons/src/main/java/eu/europeana/processing/source/AbstractDbReader.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,8 @@ protected AbstractDbReader(
6767
public void start() {
6868
LOGGER.info("Starting: {}", getClass().getSimpleName());
6969
dbConnectionProvider = new DbConnectionProvider(parameterTool);
70-
createRepositories();
7170
}
7271

73-
protected abstract void createRepositories() ;
74-
7572
@Override
7673
public InputStatus pollNext(ReaderOutput<R> output) throws Exception {
7774
LOGGER.debug("Pooling next record");

commons/src/main/java/eu/europeana/processing/source/AbstractEnumerator.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,10 @@ protected AbstractEnumerator(SplitEnumeratorContext<P> context, ParameterTool pa
9090
public void start() {
9191
LOGGER.info("Starting enumerator");
9292
dbConnectionProvider = new DbConnectionProvider(parameterTool);
93-
progressUpdater = new ProgressUpdater(dbConnectionProvider, parameterTool, emittedRecordCount);
93+
progressUpdater = new ProgressUpdater(
94+
new TaskInfoRepository(dbConnectionProvider),
95+
parameterTool,
96+
emittedRecordCount);
9497
createDbRepositories();
9598
taskInfoRepo = RetryableMethodExecutor.createRetryProxy(new TaskInfoRepository(dbConnectionProvider));
9699
validateTaskExists();

commons/src/main/java/eu/europeana/processing/source/DbSourceWithProgressHandling.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
11
package eu.europeana.processing.source;
22

3+
import eu.europeana.processing.DbConnectionProvider;
34
import eu.europeana.processing.model.DataPartition;
45
import eu.europeana.processing.model.ExecutionRecord;
6+
import eu.europeana.processing.repository.ExecutionRecordRepository;
7+
import eu.europeana.processing.retryable.RetryableMethodExecutor;
58
import java.io.Serial;
69
import org.apache.flink.api.connector.source.Boundedness;
710
import org.apache.flink.api.connector.source.Source;
811
import org.apache.flink.api.connector.source.SourceReader;
912
import org.apache.flink.api.connector.source.SourceReaderContext;
1013
import org.apache.flink.api.connector.source.SplitEnumerator;
1114
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
12-
import org.apache.flink.util.ParameterTool;
1315
import org.apache.flink.core.io.SimpleVersionedSerializer;
16+
import org.apache.flink.util.ParameterTool;
1417

1518
public class DbSourceWithProgressHandling implements Source<ExecutionRecord, DataPartition, DbEnumeratorState> {
1619

@@ -42,7 +45,15 @@ public SplitEnumerator<DataPartition, DbEnumeratorState> restoreEnumerator(
4245

4346
@Override
4447
public SourceReader<ExecutionRecord, DataPartition> createReader(SourceReaderContext readerContext) {
45-
return new RegularDbReader(readerContext, parameterTool);
48+
return new RegularDbReader(
49+
readerContext,
50+
parameterTool,
51+
//TODO Using retry proxy is maybe not optimal strategy in this case. This source implements asynchronous interface, so
52+
// we could do this retries in poolNext() method by returning InputStatus.NOTHING_AVAILABLE, wait a bit and notify
53+
// completable future to poll source again. Or simple wait a bit in pollNext() but only once per one retry.
54+
// In such cases we would less block checkpointing mechanism, which should work smoothly in case of infrastructure problems
55+
// and potential job restarts. And when we do not block we could do more retries or longer pauses.
56+
RetryableMethodExecutor.createRetryProxy(new ExecutionRecordRepository(new DbConnectionProvider(parameterTool))));
4657
}
4758

4859
@Override

commons/src/main/java/eu/europeana/processing/source/ProgressUpdater.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ public class ProgressUpdater implements Closeable {
1717

1818
private static final Logger LOGGER = LoggerFactory.getLogger(ProgressUpdater.class);
1919
private final long taskId;
20-
private final DbConnectionProvider dbConnectionProvider;
2120
private final TaskInfoRepository taskInfoRepo;
2221
private long lastStoredFilesCount;
2322
private long snapshottedEmittedFilesCount = -1;
@@ -28,13 +27,12 @@ public class ProgressUpdater implements Closeable {
2827
* @param parameterTool - all the command line parameters of the job
2928
* @param completedFilesCount - Number of files already completed. It is greater than 0 only if the source is restored from a
3029
* checkpoint.
31-
* @param dbConnectionProvider - db connection provider
30+
* @param taskInfoRepository - repository for TaskInfo table
3231
*/
33-
public ProgressUpdater(DbConnectionProvider dbConnectionProvider, ParameterTool parameterTool, long completedFilesCount) {
34-
this.dbConnectionProvider = dbConnectionProvider;
32+
public ProgressUpdater(TaskInfoRepository taskInfoRepository, ParameterTool parameterTool, long completedFilesCount) {
33+
taskInfoRepo = taskInfoRepository;
3534
this.taskId = parameterTool.getLong(JobParamName.TASK_ID);
3635
lastStoredFilesCount = completedFilesCount;
37-
taskInfoRepo = RetryableMethodExecutor.createRetryProxy(new TaskInfoRepository(dbConnectionProvider));
3836
LOGGER.debug("Created ProgressUpdater");
3937
}
4038

@@ -70,7 +68,6 @@ public void saveProgressInDB() {
7068
}
7169

7270
public void close() {
73-
dbConnectionProvider.close();
7471
LOGGER.debug("Closed: {}", ProgressUpdater.class.getSimpleName());
7572
}
7673

commons/src/main/java/eu/europeana/processing/source/RegularDbReader.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,20 @@
1717
public class RegularDbReader extends AbstractDbReader<ExecutionRecord> {
1818

1919
private static final Logger LOGGER = LoggerFactory.getLogger(RegularDbReader.class);
20-
private ExecutionRecordRepository executionRecordRepository;
20+
private final ExecutionRecordRepository executionRecordRepository;
2121

2222
/**
2323
* Creates RegularDbReader
2424
*
2525
* @param context - Flink context
2626
* @param parameterTool - job parameters
2727
*/
28-
public RegularDbReader(SourceReaderContext context, ParameterTool parameterTool) {
28+
public RegularDbReader(SourceReaderContext context, ParameterTool parameterTool, ExecutionRecordRepository executionRecordRepository) {
2929
super(context, parameterTool);
30+
this.executionRecordRepository = executionRecordRepository;
3031
LOGGER.info("Created RegularDbReader");
3132
}
3233

33-
protected void createRepositories() {
34-
//TODO Using retry proxy is maybe not optimal strategy in this case. This source implements asynchronous interface, so
35-
// we could do this retries in poolNext() method by returning InputStatus.NOTHING_AVAILABLE, wait a bit and notify
36-
// completable future to poll source again. Or simple wait a bit in pollNext() but only once per one retry.
37-
// In such cases we would less block checkpointing mechanism, which should work smoothly in case of infrastructure problems
38-
// and potential job restarts. And when we do not block we could do more retries or longer pauses.
39-
executionRecordRepository = RetryableMethodExecutor.createRetryProxy(new ExecutionRecordRepository(dbConnectionProvider));
40-
}
41-
4234
protected List<ExecutionRecord> fetchRecords() throws IOException {
4335
return executionRecordRepository.getByDatasetIdAndExecutionIdAndOffsetAndLimit(
4436
parameterTool.getRequired(JobParamName.DATASET_ID),

commons/src/test/java/eu/europeana/processing/repository/TaskInfoRepositoryTest.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import org.assertj.db.type.AssertDbConnection;
1111
import org.assertj.db.type.AssertDbConnectionFactory;
1212
import org.assertj.db.type.Request.Builder;
13-
import org.junit.jupiter.api.AfterAll;
1413
import org.junit.jupiter.api.BeforeAll;
1514
import org.junit.jupiter.api.Test;
1615

@@ -25,12 +24,6 @@ static void before() {
2524
prepareRequests();
2625
}
2726

28-
@AfterAll
29-
static void after() {
30-
postgres.stop();
31-
}
32-
33-
3427
@Test
3528
void shouldSaveCorrectTaskInfo() {
3629
TaskInfoRepository taskInfoRepository = prepareRepository();
@@ -42,18 +35,21 @@ void shouldSaveCorrectTaskInfo() {
4235
assertThat(
4336
request.parameters(1).build()
4437
).hasNumberOfRows(1)
38+
.row(0).value(TaskInfoRepository.TASK_ID_COL_NAME).isEqualTo(1)
4539
.row(0).value(TaskInfoRepository.COMMIT_COUNT_COL_NAME).isEqualTo(12)
4640
.row(0).value(TaskInfoRepository.WRITE_COUNT_COL_NAME).isEqualTo(14);
4741

4842
assertThat(
4943
request.parameters(2).build()
5044
).hasNumberOfRows(1)
45+
.row(0).value(TaskInfoRepository.TASK_ID_COL_NAME).isEqualTo(2)
5146
.row(0).value(TaskInfoRepository.COMMIT_COUNT_COL_NAME).isEqualTo(0)
5247
.row(0).value(TaskInfoRepository.WRITE_COUNT_COL_NAME).isEqualTo(10);
5348

5449
assertThat(
5550
request.parameters(3).build()
5651
).hasNumberOfRows(1)
52+
.row(0).value(TaskInfoRepository.TASK_ID_COL_NAME).isEqualTo(3)
5753
.row(0).value(TaskInfoRepository.COMMIT_COUNT_COL_NAME).isEqualTo(20)
5854
.row(0).value(TaskInfoRepository.WRITE_COUNT_COL_NAME).isEqualTo(30);
5955
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package eu.europeana.processing.source;
2+
3+
import eu.europeana.processing.job.JobParamName;
4+
import eu.europeana.processing.model.TaskInfo;
5+
import eu.europeana.processing.repository.TaskInfoRepository;
6+
import org.apache.flink.util.ParameterTool;
7+
import org.junit.jupiter.api.Assertions;
8+
import org.junit.jupiter.api.Test;
9+
import org.mockito.ArgumentCaptor;
10+
import org.mockito.Mockito;
11+
12+
class ProgressUpdaterTest {
13+
14+
@Test
15+
void shouldSavePropperProgressInDbForFirstExecution() {
16+
TaskInfoRepository taskInfoRepository = Mockito.mock(TaskInfoRepository.class);
17+
18+
try (ProgressUpdater progressUpdater = new ProgressUpdater(taskInfoRepository, ParameterTool.fromArgs(
19+
new String[]{
20+
"-" + JobParamName.TASK_ID, "12"
21+
}
22+
), 100);) {
23+
progressUpdater.saveProgressInDB();
24+
25+
ArgumentCaptor<TaskInfo> captor = ArgumentCaptor.forClass(TaskInfo.class);
26+
27+
Mockito.verify(taskInfoRepository, Mockito.times(1)).update(captor.capture());
28+
TaskInfo emitted = captor.getValue();
29+
Assertions.assertEquals(12, emitted.taskId());
30+
Assertions.assertEquals(0, emitted.commitCount());
31+
Assertions.assertEquals(-1, emitted.writeCount());
32+
}
33+
}
34+
35+
@Test
36+
void shouldSavePropperProgressInDbForConsecutiveExecution() {
37+
TaskInfoRepository taskInfoRepository = Mockito.mock(TaskInfoRepository.class);
38+
39+
try (ProgressUpdater progressUpdater = new ProgressUpdater(taskInfoRepository, ParameterTool.fromArgs(
40+
new String[]{
41+
"-" + JobParamName.TASK_ID, "12"
42+
}
43+
), 80);) {
44+
45+
progressUpdater.snapshotEmittedFilesCount(100);
46+
progressUpdater.saveProgressInDB();
47+
48+
ArgumentCaptor<TaskInfo> captor = ArgumentCaptor.forClass(TaskInfo.class);
49+
50+
Mockito.verify(taskInfoRepository, Mockito.times(1)).update(captor.capture());
51+
TaskInfo emitted = captor.getValue();
52+
Assertions.assertEquals(12, emitted.taskId());
53+
Assertions.assertEquals(0, emitted.commitCount());
54+
Assertions.assertEquals(100, emitted.writeCount());
55+
}
56+
}
57+
58+
@Test
59+
void shouldNotSavePropperProgressInDb() {
60+
TaskInfoRepository taskInfoRepository = Mockito.mock(TaskInfoRepository.class);
61+
62+
try (ProgressUpdater progressUpdater = new ProgressUpdater(taskInfoRepository, ParameterTool.fromArgs(
63+
new String[]{
64+
"-" + JobParamName.TASK_ID, "12"
65+
}
66+
), 100);) {
67+
progressUpdater.snapshotEmittedFilesCount(100);
68+
progressUpdater.saveProgressInDB();
69+
70+
Mockito.verify(taskInfoRepository, Mockito.times(0)).update(Mockito.any());
71+
}
72+
}
73+
}

0 commit comments

Comments
 (0)