Skip to content

Commit 6ea75eb

Browse files
committed
MET-6144 Review remarks - part 3 Code formating
1 parent 2e65580 commit 6ea75eb

File tree

20 files changed

+115
-99
lines changed

20 files changed

+115
-99
lines changed

commons/src/main/java/eu/europeana/processing/model/DataPartition.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,25 +8,24 @@
88

99
/**
1010
* Class describing actual partition of data that is delivered to reader
11-
*
1211
*/
1312
@Value
1413
@AllArgsConstructor
1514
public class DataPartition implements AbstractPartition {
1615

17-
@Serial
18-
private static final long serialVersionUID = 1;
16+
@Serial
17+
private static final long serialVersionUID = 1;
1918

20-
long offset;
21-
long limit;
22-
@With
23-
long progress;
24-
@With
25-
UUID enumeratorId;
19+
long offset;
20+
long limit;
21+
@With
22+
long progress;
23+
@With
24+
UUID enumeratorId;
2625

27-
@Override
28-
public String splitId() {
29-
return offset+"_"+limit;
30-
}
26+
@Override
27+
public String splitId() {
28+
return offset + "_" + limit;
29+
}
3130

3231
}

commons/src/main/java/eu/europeana/processing/source/AbstractDbEnumerator.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@
99

1010
/**
1111
* Flink enumerator that provides splits for Metis jobs using as a source PostgresDB
12+
*
1213
* @param <S> state used by an implementation of enumerator which is saved in snapshot
1314
*/
14-
public abstract class AbstractDbEnumerator<S extends DbEnumeratorState> extends AbstractEnumerator<DataPartition,S> {
15+
public abstract class AbstractDbEnumerator<S extends DbEnumeratorState> extends AbstractEnumerator<DataPartition, S> {
16+
1517
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDbEnumerator.class);
1618

1719

@@ -67,7 +69,7 @@ protected DataPartition createNextPartition() {
6769

6870
@Override
6971
public S snapshotState(long checkpointId) {
70-
S state=super.snapshotState(checkpointId);
72+
S state = super.snapshotState(checkpointId);
7173
state.setRecordsToBeProcessed(recordsToBeProcessed);
7274
return state;
7375
}

commons/src/main/java/eu/europeana/processing/source/AbstractEnumerator.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@
2525

2626
/**
2727
* Base Flink enumerator base for different sources, implementing checkpointing failover.
28+
*
2829
* @param <P> split type that is used by the source implementation
2930
* @param <S> state used by an implementation of enumerator which is saved in snapshot
3031
*/
31-
public abstract class AbstractEnumerator<P extends AbstractPartition,S extends AbstractEnumeratorState<P>> implements SplitEnumerator<P, S> {
32+
public abstract class AbstractEnumerator<P extends AbstractPartition, S extends AbstractEnumeratorState<P>> implements
33+
SplitEnumerator<P, S> {
3234

3335
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractEnumerator.class);
3436
private static final int DEFAULT_CHUNK_SIZE = 1000;
@@ -142,14 +144,14 @@ private void addSplitBack(P split, int subtaskId) {
142144
if (inExecutingMap != null) {
143145
split = updateProgress(inExecutingMap, split.getProgress());
144146
}
145-
returnedPartitions.put(split.splitId(), updateProgress(returnedPartitions.get(split.splitId()),split.getProgress()));
147+
returnedPartitions.put(split.splitId(), updateProgress(returnedPartitions.get(split.splitId()), split.getProgress()));
146148
LOGGER.info(
147149
"Added split: {} from subtask: {} back. Currently executing: {} splits, all returned splits: {}",
148150
split, subtaskId, executingPartitions.size(), returnedPartitions.size());
149151
}
150152

151153
private P updateProgress(P previousSplit, long progress) {
152-
LOGGER.debug("Updating progress to: {}, for split: {}",progress, previousSplit);
154+
LOGGER.debug("Updating progress to: {}, for split: {}", progress, previousSplit);
153155
P current = (P) previousSplit.withProgress(progress);
154156
long progressIncrease = current.getProgress() - previousSplit.getProgress();
155157
emittedRecordCount += progressIncrease;
@@ -234,7 +236,6 @@ public void close() throws IOException {
234236
}
235237

236238

237-
238239
private List<P> getIncompletePartitionsSnapshot() {
239240
List<P> incompletePartitions = new ArrayList<>();
240241
incompletePartitions.addAll(executingPartitions.values());

commons/src/main/java/eu/europeana/processing/source/DbEnumeratorState.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
@NoArgsConstructor
2121
@SuperBuilder
2222
public class DbEnumeratorState extends AbstractEnumeratorState<DataPartition> {
23+
2324
@Serial
2425
private static final long serialVersionUID = 2;
2526
}

commons/src/main/java/eu/europeana/processing/source/DbSourceWithProgressHandling.java

Lines changed: 40 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -14,45 +14,45 @@
1414

1515
public class DbSourceWithProgressHandling implements Source<ExecutionRecord, DataPartition, DbEnumeratorState> {
1616

17-
@Serial
18-
private static final long serialVersionUID = 1;
19-
20-
private final ParameterTool parameterTool;
21-
22-
public DbSourceWithProgressHandling(ParameterTool parameterTool) {
23-
this.parameterTool = parameterTool;
24-
}
25-
26-
@Override
27-
public Boundedness getBoundedness() {
28-
return Boundedness.BOUNDED;
29-
}
30-
31-
@Override
32-
public SplitEnumerator<DataPartition, DbEnumeratorState> createEnumerator(SplitEnumeratorContext<DataPartition> enumContext) {
33-
return new RegularDbEnumerator(enumContext, parameterTool);
34-
}
35-
36-
@Override
37-
public SplitEnumerator<DataPartition, DbEnumeratorState> restoreEnumerator(
38-
SplitEnumeratorContext<DataPartition> enumContext,
39-
DbEnumeratorState state) {
40-
return new RegularDbEnumerator(enumContext, parameterTool, state);
41-
}
42-
43-
@Override
44-
public SourceReader<ExecutionRecord, DataPartition> createReader(SourceReaderContext readerContext) {
45-
return new RegularDbReader(readerContext, parameterTool);
46-
}
47-
48-
@Override
49-
public SimpleVersionedSerializer<DataPartition> getSplitSerializer() {
50-
return new ObjectStreamVersionedSerializer<>();
51-
}
52-
53-
@Override
54-
public SimpleVersionedSerializer<DbEnumeratorState> getEnumeratorCheckpointSerializer() {
55-
return new ObjectStreamVersionedSerializer<>();
56-
}
17+
@Serial
18+
private static final long serialVersionUID = 1;
19+
20+
private final ParameterTool parameterTool;
21+
22+
public DbSourceWithProgressHandling(ParameterTool parameterTool) {
23+
this.parameterTool = parameterTool;
24+
}
25+
26+
@Override
27+
public Boundedness getBoundedness() {
28+
return Boundedness.BOUNDED;
29+
}
30+
31+
@Override
32+
public SplitEnumerator<DataPartition, DbEnumeratorState> createEnumerator(SplitEnumeratorContext<DataPartition> enumContext) {
33+
return new RegularDbEnumerator(enumContext, parameterTool);
34+
}
35+
36+
@Override
37+
public SplitEnumerator<DataPartition, DbEnumeratorState> restoreEnumerator(
38+
SplitEnumeratorContext<DataPartition> enumContext,
39+
DbEnumeratorState state) {
40+
return new RegularDbEnumerator(enumContext, parameterTool, state);
41+
}
42+
43+
@Override
44+
public SourceReader<ExecutionRecord, DataPartition> createReader(SourceReaderContext readerContext) {
45+
return new RegularDbReader(readerContext, parameterTool);
46+
}
47+
48+
@Override
49+
public SimpleVersionedSerializer<DataPartition> getSplitSerializer() {
50+
return new ObjectStreamVersionedSerializer<>();
51+
}
52+
53+
@Override
54+
public SimpleVersionedSerializer<DbEnumeratorState> getEnumeratorCheckpointSerializer() {
55+
return new ObjectStreamVersionedSerializer<>();
56+
}
5757

5858
}

commons/src/main/java/eu/europeana/processing/source/ProgressSnapshotEvent.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
import org.apache.flink.api.connector.source.SourceEvent;
77

88
/**
9-
* Event with progress snapshot was created on a reader - number of record emitted by reader to execution.
10-
* The event does not mean that records are already stored in DB, but contains checkpointId, so could be
11-
* held and used when given checkpoint is completed.
9+
* Event with progress snapshot was created on a reader - number of record emitted by reader to execution. The event does not mean
10+
* that records are already stored in DB, but contains checkpointId, so could be held and used when given checkpoint is
11+
* completed.
1212
*/
1313
@Value
1414
public class ProgressSnapshotEvent implements SourceEvent {

commons/src/main/java/eu/europeana/processing/source/ProgressUpdater.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@ public class ProgressUpdater implements Closeable {
2424

2525
/**
2626
* Creates ProgressUpdater
27+
*
2728
* @param parameterTool - all the command line parameters of the job
28-
* @param completedFilesCount - Number of files already completed. It is greater than 0 only if the
29-
* source is restored from a checkpoint.
29+
* @param completedFilesCount - Number of files already completed. It is greater than 0 only if the source is restored from a
30+
* checkpoint.
3031
* @param dbConnectionProvider - db connection provider
31-
*
3232
*/
3333
public ProgressUpdater(DbConnectionProvider dbConnectionProvider, ParameterTool parameterTool, long completedFilesCount) {
3434
this.dbConnectionProvider = dbConnectionProvider;
@@ -40,8 +40,9 @@ public ProgressUpdater(DbConnectionProvider dbConnectionProvider, ParameterTool
4040

4141

4242
/**
43-
* Stores count of emitted files. It is invoked during doing snapshot of enumerator state.
44-
* This count that is later saved as progress into DB after the checkpoint is completed.
43+
* Stores count of emitted files. It is invoked during doing snapshot of enumerator state. This count that is later saved as
44+
* progress into DB after the checkpoint is completed.
45+
*
4546
* @param shapshotedEmittedFilesCount - number of emitted files.
4647
*/
4748
public void snapshotEmittedFilesCount(long shapshotedEmittedFilesCount) {
@@ -50,8 +51,8 @@ public void snapshotEmittedFilesCount(long shapshotedEmittedFilesCount) {
5051
}
5152

5253
/**
53-
* Saves previously stored count of emitted files in the DB. Invoked after the checkpoint is completed,
54-
* what means that the emitted files are already stored in the DB.
54+
* Saves previously stored count of emitted files in the DB. Invoked after the checkpoint is completed, what means that the
55+
* emitted files are already stored in the DB.
5556
*/
5657
public void saveProgressInDB() {
5758
if (snapshottedEmittedFilesCount != lastStoredFilesCount) {
@@ -63,14 +64,14 @@ public void saveProgressInDB() {
6364
taskInfoRepo.update(taskInfo);
6465
lastStoredFilesCount = snapshottedEmittedFilesCount;
6566
LOGGER.info("Updated task progress in DB: {}", taskInfo);
66-
}else{
67+
} else {
6768
LOGGER.info("Need not update progress for: {}", taskId);
6869
}
6970
}
7071

7172
public void close() {
72-
dbConnectionProvider.close();
73-
LOGGER.debug("Closed: {}", ProgressUpdater.class.getSimpleName());
73+
dbConnectionProvider.close();
74+
LOGGER.debug("Closed: {}", ProgressUpdater.class.getSimpleName());
7475
}
7576

7677
}

commons/src/main/java/eu/europeana/processing/source/RegularDbEnumerator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public RegularDbEnumerator(SplitEnumeratorContext<DataPartition> context, Parame
3434
* @param state enumerator state container
3535
*/
3636
public RegularDbEnumerator(SplitEnumeratorContext<DataPartition> context,
37-
ParameterTool parameterTool,DbEnumeratorState state) {
37+
ParameterTool parameterTool, DbEnumeratorState state) {
3838
super(context, parameterTool, state);
3939
}
4040

commons/src/main/java/eu/europeana/processing/source/RegularDbReader.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@
1515
* SourceReader implementation for regular - all but not harvesting jobs.
1616
*/
1717
public class RegularDbReader extends AbstractDbReader<ExecutionRecord> {
18+
1819
private static final Logger LOGGER = LoggerFactory.getLogger(RegularDbReader.class);
1920
private ExecutionRecordRepository executionRecordRepository;
2021

2122
/**
2223
* Creates RegularDbReader
24+
*
2325
* @param context - Flink context
2426
* @param parameterTool - job parameters
2527
*/

http/src/main/java/eu/europeana/processing/http/reader/HttpEnumerator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717

1818
/**
1919
* The SplitEnumerator implementation for HttpSource. It downloads an archive file and provided splits containing lists of the
20-
* files from the archive and chosen extraction mode - depending on the archive type. Depending on the archive type, the archive also
21-
* is or is not extracted to the shared temporary directory - more described in: {@link ArchiveFileNamesExtractor}
20+
* files from the archive and chosen extraction mode - depending on the archive type. Depending on the archive type, the archive
21+
* also is or is not extracted to the shared temporary directory - more described in: {@link ArchiveFileNamesExtractor}
2222
*/
2323
public class HttpEnumerator extends AbstractEnumerator<HttpSourceSplit, HttpEnumeratorState> {
2424

@@ -123,7 +123,7 @@ protected HttpSourceSplit createNextPartition() {
123123

124124
startedRecordsCount += fileNames.size();
125125
return split;
126-
}else{
126+
} else {
127127
return null;
128128
}
129129
}

0 commit comments

Comments
 (0)