Skip to content

Commit ae962a8

Browse files
RocMarshalWeiZhong94
authored andcommitted
[FLINK-33389][runtime] Add the co-location related methods for related abstractions of adaptive scheduler.
1 parent 08050f8 commit ae962a8

File tree

4 files changed

+111
-5
lines changed

4 files changed

+111
-5
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.flink.runtime.jobgraph.JobVertex;
2323
import org.apache.flink.runtime.jobgraph.JobVertexID;
2424
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
25+
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
2526
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
2627
import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
2728
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
@@ -30,6 +31,8 @@
3031

3132
import org.apache.flink.shaded.guava33.com.google.common.collect.Iterables;
3233

34+
import javax.annotation.Nullable;
35+
3336
import java.util.Collection;
3437

3538
/** {@link JobInformation} created from a {@link JobGraph}. */
@@ -53,6 +56,11 @@ public Collection<SlotSharingGroup> getSlotSharingGroups() {
5356
return jobGraph.getSlotSharingGroups();
5457
}
5558

59+
@Override
60+
public Collection<CoLocationGroup> getCoLocationGroups() {
61+
return jobGraph.getCoLocationGroups();
62+
}
63+
5664
@Override
5765
public JobInformation.VertexInformation getVertexInformation(JobVertexID jobVertexId) {
5866
return new JobVertexInformation(
@@ -123,5 +131,11 @@ public int getMaxParallelism() {
123131
public SlotSharingGroup getSlotSharingGroup() {
124132
return jobVertex.getSlotSharingGroup();
125133
}
134+
135+
@Nullable
136+
@Override
137+
public CoLocationGroup getCoLocationGroup() {
138+
return jobVertex.getCoLocationGroup();
139+
}
126140
}
127141
}

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@
1818
package org.apache.flink.runtime.scheduler.adaptive.allocator;
1919

2020
import org.apache.flink.runtime.jobgraph.JobVertexID;
21+
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
2122
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
2223

24+
import javax.annotation.Nullable;
25+
2326
import java.util.Collection;
2427

2528
/** Information about the job. */
@@ -34,6 +37,16 @@ public interface JobInformation {
3437
*/
3538
Collection<SlotSharingGroup> getSlotSharingGroups();
3639

40+
/**
41+
* Returns all co-location groups of the job.
42+
*
43+
* <p>Attention: The returned co-location groups should never be modified (its are indeed
44+
* mutable)!
45+
*
46+
* @return all co-location groups of the job
47+
*/
48+
Collection<CoLocationGroup> getCoLocationGroups();
49+
3750
VertexInformation getVertexInformation(JobVertexID jobVertexId);
3851

3952
Iterable<VertexInformation> getVertices();
@@ -49,5 +62,8 @@ interface VertexInformation {
4962
int getMaxParallelism();
5063

5164
SlotSharingGroup getSlotSharingGroup();
65+
66+
@Nullable
67+
CoLocationGroup getCoLocationGroup();
5268
}
5369
}

flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestJobInformation.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,24 @@
1818
package org.apache.flink.runtime.scheduler.adaptive.allocator;
1919

2020
import org.apache.flink.runtime.jobgraph.JobVertexID;
21+
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
2122
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
2223

2324
import java.util.Collection;
25+
import java.util.Collections;
2426
import java.util.Map;
27+
import java.util.Objects;
28+
import java.util.Set;
2529
import java.util.function.Function;
2630
import java.util.stream.Collectors;
2731

28-
class TestJobInformation implements JobInformation {
32+
public class TestJobInformation implements JobInformation {
2933

3034
private final Map<JobVertexID, VertexInformation> vertexIdToInformation;
31-
private final Collection<SlotSharingGroup> slotSharingGroups;
35+
private final Set<SlotSharingGroup> slotSharingGroups;
36+
private final Set<CoLocationGroup> coLocationGroups;
3237

33-
TestJobInformation(Collection<? extends VertexInformation> vertexIdToInformation) {
38+
public TestJobInformation(Collection<? extends VertexInformation> vertexIdToInformation) {
3439
this.vertexIdToInformation =
3540
vertexIdToInformation.stream()
3641
.collect(
@@ -40,13 +45,24 @@ class TestJobInformation implements JobInformation {
4045
vertexIdToInformation.stream()
4146
.map(VertexInformation::getSlotSharingGroup)
4247
.collect(Collectors.toSet());
48+
this.coLocationGroups =
49+
Collections.unmodifiableSet(
50+
vertexIdToInformation.stream()
51+
.map(VertexInformation::getCoLocationGroup)
52+
.filter(Objects::nonNull)
53+
.collect(Collectors.toSet()));
4354
}
4455

4556
@Override
4657
public Collection<SlotSharingGroup> getSlotSharingGroups() {
4758
return slotSharingGroups;
4859
}
4960

61+
@Override
62+
public Collection<CoLocationGroup> getCoLocationGroups() {
63+
return coLocationGroups;
64+
}
65+
5066
@Override
5167
public VertexInformation getVertexInformation(JobVertexID jobVertexId) {
5268
return vertexIdToInformation.get(jobVertexId);

flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestVertexInformation.java

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,34 @@
1818
package org.apache.flink.runtime.scheduler.adaptive.allocator;
1919

2020
import org.apache.flink.runtime.jobgraph.JobVertexID;
21+
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupImpl;
2122
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
2223

23-
class TestVertexInformation implements JobInformation.VertexInformation {
24+
import javax.annotation.Nullable;
25+
26+
import java.util.ArrayList;
27+
import java.util.Arrays;
28+
import java.util.List;
29+
import java.util.stream.Collectors;
30+
31+
public class TestVertexInformation implements JobInformation.VertexInformation {
2432

2533
private final JobVertexID jobVertexId;
2634
private final int minParallelism;
2735
private final int parallelism;
2836
private final SlotSharingGroup slotSharingGroup;
37+
@Nullable private final TestingCoLocationGroup coLocationGroup;
38+
39+
public TestVertexInformation(int parallelism, SlotSharingGroup slotSharingGroup) {
40+
this(new JobVertexID(), 1, parallelism, slotSharingGroup);
41+
}
42+
43+
public TestVertexInformation(
44+
int parallelism,
45+
SlotSharingGroup slotSharingGroup,
46+
TestingCoLocationGroup coLocationGroup) {
47+
this(new JobVertexID(), 1, parallelism, slotSharingGroup, coLocationGroup);
48+
}
2949

3050
TestVertexInformation(
3151
JobVertexID jobVertexId, int parallelism, SlotSharingGroup slotSharingGroup) {
@@ -37,11 +57,24 @@ class TestVertexInformation implements JobInformation.VertexInformation {
3757
int minParallelism,
3858
int parallelism,
3959
SlotSharingGroup slotSharingGroup) {
60+
this(jobVertexId, minParallelism, parallelism, slotSharingGroup, null);
61+
}
62+
63+
TestVertexInformation(
64+
JobVertexID jobVertexId,
65+
int minParallelism,
66+
int parallelism,
67+
SlotSharingGroup slotSharingGroup,
68+
@Nullable TestingCoLocationGroup coLocationGroup) {
4069
this.jobVertexId = jobVertexId;
4170
this.minParallelism = minParallelism;
4271
this.parallelism = parallelism;
4372
this.slotSharingGroup = slotSharingGroup;
44-
slotSharingGroup.addVertexToGroup(jobVertexId);
73+
this.slotSharingGroup.addVertexToGroup(jobVertexId);
74+
this.coLocationGroup = coLocationGroup;
75+
if (this.coLocationGroup != null) {
76+
this.coLocationGroup.addVertex(jobVertexId);
77+
}
4578
}
4679

4780
@Override
@@ -68,4 +101,31 @@ public int getMaxParallelism() {
68101
public SlotSharingGroup getSlotSharingGroup() {
69102
return slotSharingGroup;
70103
}
104+
105+
@Nullable
106+
@Override
107+
public TestingCoLocationGroup getCoLocationGroup() {
108+
return coLocationGroup;
109+
}
110+
111+
/** Testing util class. */
112+
public static class TestingCoLocationGroup extends CoLocationGroupImpl {
113+
private final List<JobVertexID> verticesIDs = new ArrayList<>();
114+
115+
public TestingCoLocationGroup(JobVertexID... verticesIDs) {
116+
super();
117+
if (verticesIDs != null && verticesIDs.length > 0) {
118+
this.verticesIDs.addAll(Arrays.stream(verticesIDs).collect(Collectors.toList()));
119+
}
120+
}
121+
122+
public void addVertex(JobVertexID vertexID) {
123+
this.verticesIDs.add(vertexID);
124+
}
125+
126+
@Override
127+
public List<JobVertexID> getVertexIds() {
128+
return verticesIDs;
129+
}
130+
}
71131
}

0 commit comments

Comments
 (0)