Skip to content

Commit 0f9b348

Browse files
committed
modified based on CR
1 parent abe1bc7 commit 0f9b348

File tree

9 files changed

+34
-35
lines changed

9 files changed

+34
-35
lines changed

flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
* A container class hosting the information of a {@link SourceReader}.
3030
*
3131
* <p>The {@code reportedSplitsOnRegistration} can only be provided when the source implements
32-
* {@link SupportSplitReassignmentOnRecovery}.
32+
* {@link SupportsSplitReassignmentOnRecovery}.
3333
*/
3434
@Public
3535
public final class ReaderInfo implements Serializable {
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
import org.apache.flink.annotation.PublicEvolving;
2222

2323
/**
24-
* A decorative interface {@link Source}. Implementing this interface indicates that the source
24+
* A decorative interface for {@link Source}. Implementing this interface indicates that the source
2525
* operator needs to report splits to the enumerator and receive reassignment.
2626
*/
2727
@PublicEvolving
28-
public interface SupportSplitReassignmentOnRecovery {}
28+
public interface SupportsSplitReassignmentOnRecovery {}

flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
/** A mock {@link SplitEnumerator} for unit tests. */
4242
public class MockSplitEnumerator
4343
implements SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>>, SupportsBatchSnapshot {
44-
// 扩成16个partition, unas
4544
private final Map<Integer, Set<MockSourceSplit>> pendingSplitAssignment;
4645
private final Map<String, Integer> globalSplitAssignment;
4746
private final SplitEnumeratorContext<MockSourceSplit> enumContext;
@@ -56,7 +55,7 @@ public MockSplitEnumerator(int numSplits, SplitEnumeratorContext<MockSourceSplit
5655
for (int i = 0; i < numSplits; i++) {
5756
unassignedSplits.add(new MockSourceSplit(i));
5857
}
59-
calculateAndPutPendingAssignments(unassignedSplits);
58+
recalculateAssignments(unassignedSplits);
6059
}
6160

6261
public MockSplitEnumerator(
@@ -69,7 +68,7 @@ public MockSplitEnumerator(
6968
this.successfulCheckpoints = new ArrayList<>();
7069
this.started = false;
7170
this.closed = false;
72-
calculateAndPutPendingAssignments(unassignedSplits);
71+
recalculateAssignments(unassignedSplits);
7372
}
7473

7574
@Override
@@ -100,16 +99,16 @@ public void addReader(int subtaskId) {
10099
List<MockSourceSplit> addBackSplits = new ArrayList<>();
101100
for (MockSourceSplit split : splitsOnRecovery) {
102101
if (!globalSplitAssignment.containsKey(split.splitId())) {
103-
// if split not existed in globalSplitAssignment, mean that it's registered first
104-
// time, can be redistibuted.
102+
// if the split is not present in globalSplitAssignment, it means that this split is
103+
// being registered for the first time and is eligible for redistribution.
105104
redistributedSplits.add(split);
106105
} else if (!globalSplitAssignment.containsKey(split.splitId())) {
107-
// if split already is assigned to other substaskId, just ignore it. Otherwise,
108-
// addback to this subtaskId again.
106+
// if split is already assigned to other sub-task, just ignore it. Otherwise, add
107+
// back to this sub-task again.
109108
addBackSplits.add(split);
110109
}
111110
}
112-
calculateAndPutPendingAssignments(redistributedSplits);
111+
recalculateAssignments(redistributedSplits);
113112
putPendingAssignments(subtaskId, addBackSplits);
114113
assignAllSplits();
115114
}
@@ -167,7 +166,7 @@ private void assignAllSplits() {
167166
assignment.keySet().forEach(pendingSplitAssignment::remove);
168167
}
169168

170-
private void calculateAndPutPendingAssignments(Collection<MockSourceSplit> newSplits) {
169+
private void recalculateAssignments(Collection<MockSourceSplit> newSplits) {
171170
for (MockSourceSplit split : newSplits) {
172171
int subtaskId = Integer.parseInt(split.splitId()) % enumContext.currentParallelism();
173172
putPendingAssignments(subtaskId, Collections.singletonList(split));

flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReaderRegistrationEvent.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ Licensed to the Apache Software Foundation (ASF) under one
1919
package org.apache.flink.runtime.source.event;
2020

2121
import org.apache.flink.api.connector.source.SourceSplit;
22+
import org.apache.flink.api.connector.source.SupportsSplitReassignmentOnRecovery;
2223
import org.apache.flink.core.io.SimpleVersionedSerializer;
2324
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
2425

@@ -30,7 +31,7 @@ Licensed to the Apache Software Foundation (ASF) under one
3031
/**
3132
* The SourceOperator should always send the ReaderRegistrationEvent with the
3233
* `reportedSplitsOnRegistration` list. But it will not add the splits to readers if {@link
33-
* org.apache.flink.api.connector.source.SupportSplitReassignmentOnRecovery} is implemented.
34+
* SupportsSplitReassignmentOnRecovery} is implemented.
3435
*/
3536
public class ReaderRegistrationEvent implements OperatorEvent {
3637

flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ private enum OperatingMode {
215215
/** Watermark identifier to whether the watermark are aligned. */
216216
private final Map<String, Boolean> watermarkIsAlignedMap;
217217

218-
private final boolean supportSupportSplitReassignmentOnRecovery;
218+
private final boolean supportsSplitReassignmentOnRecovery;
219219

220220
public SourceOperator(
221221
StreamOperatorParameters<OUT> parameters,
@@ -230,7 +230,7 @@ public SourceOperator(
230230
boolean emitProgressiveWatermarks,
231231
CanEmitBatchOfRecordsChecker canEmitBatchOfRecords,
232232
Map<String, Boolean> watermarkIsAlignedMap,
233-
boolean supportSupportSplitReassignmentOnRecovery) {
233+
boolean supportsSplitReassignmentOnRecovery) {
234234
super(parameters);
235235
this.readerFactory = checkNotNull(readerFactory);
236236
this.operatorEventGateway = checkNotNull(operatorEventGateway);
@@ -245,7 +245,7 @@ public SourceOperator(
245245
this.allowUnalignedSourceSplits = configuration.get(ALLOW_UNALIGNED_SOURCE_SPLITS);
246246
this.canEmitBatchOfRecords = checkNotNull(canEmitBatchOfRecords);
247247
this.watermarkIsAlignedMap = watermarkIsAlignedMap;
248-
this.supportSupportSplitReassignmentOnRecovery = supportSupportSplitReassignmentOnRecovery;
248+
this.supportsSplitReassignmentOnRecovery = supportsSplitReassignmentOnRecovery;
249249
}
250250

251251
@Override
@@ -415,7 +415,7 @@ public void open() throws Exception {
415415

416416
// restore the state if necessary.
417417
final List<SplitT> splits = CollectionUtil.iterableToList(readerState.get());
418-
if (!splits.isEmpty() && !supportSupportSplitReassignmentOnRecovery) {
418+
if (!splits.isEmpty() && !supportsSplitReassignmentOnRecovery) {
419419
LOG.info("Restoring state for {} split(s) to reader.", splits.size());
420420
for (SplitT s : splits) {
421421
getOrCreateSplitMetricGroup(s.splitId());
@@ -425,8 +425,7 @@ public void open() throws Exception {
425425
}
426426

427427
// Register the reader to the coordinator.
428-
registerReader(
429-
supportSupportSplitReassignmentOnRecovery ? splits : Collections.emptyList());
428+
registerReader(supportsSplitReassignmentOnRecovery ? splits : Collections.emptyList());
430429

431430
sourceMetricGroup.idlingStarted();
432431
// Start the reader after registration, sending messages in start is allowed.

flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ Licensed to the Apache Software Foundation (ASF) under one
2727
import org.apache.flink.api.connector.source.SourceReader;
2828
import org.apache.flink.api.connector.source.SourceReaderContext;
2929
import org.apache.flink.api.connector.source.SourceSplit;
30-
import org.apache.flink.api.connector.source.SupportSplitReassignmentOnRecovery;
30+
import org.apache.flink.api.connector.source.SupportsSplitReassignmentOnRecovery;
3131
import org.apache.flink.configuration.Configuration;
3232
import org.apache.flink.core.io.SimpleVersionedSerializer;
3333
import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -130,7 +130,7 @@ public <T extends StreamOperator<OUT>> T createStreamOperator(
130130
emitProgressiveWatermarks,
131131
parameters.getContainingTask().getCanEmitBatchOfRecords(),
132132
getSourceWatermarkDeclarations(),
133-
source instanceof SupportSplitReassignmentOnRecovery);
133+
source instanceof SupportsSplitReassignmentOnRecovery);
134134

135135
parameters.getOperatorEventDispatcher().registerEventHandler(operatorId, sourceOperator);
136136

@@ -202,7 +202,7 @@ SourceOperator<T, SplitT> instantiateSourceOperator(
202202
boolean emitProgressiveWatermarks,
203203
CanEmitBatchOfRecordsChecker canEmitBatchOfRecords,
204204
Collection<? extends WatermarkDeclaration> watermarkDeclarations,
205-
boolean supportSupportSplitReassignmentOnRecovery) {
205+
boolean supportsSplitReassignmentOnRecovery) {
206206

207207
// jumping through generics hoops: cast the generics away to then cast them back more
208208
// strictly typed
@@ -235,6 +235,6 @@ SourceOperator<T, SplitT> instantiateSourceOperator(
235235
emitProgressiveWatermarks,
236236
canEmitBatchOfRecords,
237237
watermarkIsAlignedMap,
238-
supportSupportSplitReassignmentOnRecovery);
238+
supportsSplitReassignmentOnRecovery);
239239
}
240240
}

flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,21 +104,21 @@ void testInitializeState() throws Exception {
104104

105105
@ParameterizedTest
106106
@ValueSource(booleans = {true, false})
107-
void testOpen(boolean supportSupportSplitReassignmentOnRecovery) throws Exception {
107+
void testOpen(boolean supportsSplitReassignmentOnRecovery) throws Exception {
108108
try (SourceOperatorTestContext context =
109109
new SourceOperatorTestContext(
110110
false,
111111
false,
112112
WatermarkStrategy.noWatermarks(),
113113
new MockOutput<>(new ArrayList<>()),
114-
supportSupportSplitReassignmentOnRecovery)) {
114+
supportsSplitReassignmentOnRecovery)) {
115115
SourceOperator<Integer, MockSourceSplit> operator = context.getOperator();
116116
// Initialize the operator.
117117
operator.initializeState(context.createStateContext());
118118
// Open the operator.
119119
operator.open();
120120
// The source reader should have been assigned a split.
121-
if (supportSupportSplitReassignmentOnRecovery) {
121+
if (supportsSplitReassignmentOnRecovery) {
122122
assertThat(context.getSourceReader().getAssignedSplits()).isEmpty();
123123
} else {
124124
assertThat(context.getSourceReader().getAssignedSplits())
@@ -135,7 +135,7 @@ void testOpen(boolean supportSupportSplitReassignmentOnRecovery) throws Exceptio
135135
ReaderRegistrationEvent registrationEvent = (ReaderRegistrationEvent) operatorEvent;
136136
assertThat(registrationEvent.subtaskId())
137137
.isEqualTo(SourceOperatorTestContext.SUBTASK_INDEX);
138-
if (supportSupportSplitReassignmentOnRecovery) {
138+
if (supportsSplitReassignmentOnRecovery) {
139139
assertThat(registrationEvent.splits(new MockSourceSplitSerializer()))
140140
.containsExactly(SourceOperatorTestContext.MOCK_SPLIT);
141141
} else {

flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public SourceOperatorTestContext(
8383
boolean usePerSplitOutputs,
8484
WatermarkStrategy<Integer> watermarkStrategy,
8585
Output<StreamRecord<Integer>> output,
86-
boolean supportSupportSplitReassignmentOnRecovery)
86+
boolean supportsSplitReassignmentOnRecovery)
8787
throws Exception {
8888
mockSourceReader =
8989
new MockSourceReader(
@@ -111,7 +111,7 @@ public SourceOperatorTestContext(
111111
SUBTASK_INDEX,
112112
5,
113113
true,
114-
supportSupportSplitReassignmentOnRecovery);
114+
supportsSplitReassignmentOnRecovery);
115115
operator.initializeState(
116116
new StreamTaskStateInitializerImpl(env, new HashMapStateBackend()));
117117
}

flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public TestingSourceOperator(
6565
WatermarkStrategy<T> watermarkStrategy,
6666
ProcessingTimeService timeService,
6767
boolean emitProgressiveWatermarks,
68-
boolean supportSupportSplitReassignmentOnRecovery) {
68+
boolean supportsSplitReassignmentOnRecovery) {
6969

7070
this(
7171
parameters,
@@ -76,7 +76,7 @@ public TestingSourceOperator(
7676
1,
7777
5,
7878
emitProgressiveWatermarks,
79-
supportSupportSplitReassignmentOnRecovery);
79+
supportsSplitReassignmentOnRecovery);
8080
}
8181

8282
public TestingSourceOperator(
@@ -88,7 +88,7 @@ public TestingSourceOperator(
8888
int subtaskIndex,
8989
int parallelism,
9090
boolean emitProgressiveWatermarks,
91-
boolean supportSupportSplitReassignmentOnRecovery) {
91+
boolean supportsSplitReassignmentOnRecovery) {
9292

9393
super(
9494
parameters,
@@ -102,7 +102,7 @@ public TestingSourceOperator(
102102
emitProgressiveWatermarks,
103103
() -> false,
104104
Collections.emptyMap(),
105-
supportSupportSplitReassignmentOnRecovery);
105+
supportsSplitReassignmentOnRecovery);
106106

107107
this.subtaskIndex = subtaskIndex;
108108
this.parallelism = parallelism;
@@ -141,7 +141,7 @@ public static <T> SourceOperator<T, MockSourceSplit> createTestOperator(
141141
SourceReader<T, MockSourceSplit> reader,
142142
WatermarkStrategy<T> watermarkStrategy,
143143
boolean emitProgressiveWatermarks,
144-
boolean supportSupportSplitReassignmentOnRecovery)
144+
boolean supportsSplitReassignmentOnRecovery)
145145
throws Exception {
146146

147147
AbstractStateBackend abstractStateBackend = new HashMapStateBackend();
@@ -182,7 +182,7 @@ public static <T> SourceOperator<T, MockSourceSplit> createTestOperator(
182182
watermarkStrategy,
183183
timeService,
184184
emitProgressiveWatermarks,
185-
supportSupportSplitReassignmentOnRecovery);
185+
supportsSplitReassignmentOnRecovery);
186186
sourceOperator.initializeState(stateContext);
187187
sourceOperator.open();
188188

0 commit comments

Comments
 (0)