Skip to content

Commit ca20e1d

Browse files
RocMarshalWeiZhong94
authored andcommitted
[FLINK-33389][runtime] Support tasks balancing at slot level for Adaptive Scheduler
1 parent ae962a8 commit ca20e1d

File tree

3 files changed

+331
-3
lines changed

3 files changed

+331
-3
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
package org.apache.flink.runtime.scheduler;
2020

21-
import org.apache.flink.annotation.VisibleForTesting;
2221
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
2322
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
2423
import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
@@ -49,9 +48,8 @@ public void addVertex(final ExecutionVertexID executionVertexId) {
4948
executionVertexIds.add(executionVertexId);
5049
}
5150

52-
@VisibleForTesting
5351
@Nonnull
54-
SlotSharingGroup getSlotSharingGroup() {
52+
public SlotSharingGroup getSlotSharingGroup() {
5553
return slotSharingGroup;
5654
}
5755

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.scheduler.adaptive.allocator;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.runtime.jobgraph.JobVertexID;
23+
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
24+
import org.apache.flink.runtime.scheduler.TaskBalancedExecutionSlotSharingGroupBuilder;
25+
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
26+
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
27+
28+
import java.util.ArrayList;
29+
import java.util.HashMap;
30+
import java.util.List;
31+
import java.util.Map;
32+
import java.util.stream.Collectors;
33+
34+
/** The implementation of the {@link SlotSharingResolver} based on tasks balanced resolver. */
35+
@Internal
36+
public enum TaskBalancedSlotSharingResolver implements SlotSharingResolver {
37+
INSTANCE;
38+
39+
@Override
40+
public List<ExecutionSlotSharingGroup> getExecutionSlotSharingGroups(
41+
JobInformation jobInformation, VertexParallelism vertexParallelism) {
42+
return new TaskBalancedExecutionSlotSharingGroupBuilder(
43+
getAllVertices(jobInformation, vertexParallelism),
44+
jobInformation.getSlotSharingGroups(),
45+
jobInformation.getCoLocationGroups())
46+
.build().values().stream()
47+
.distinct()
48+
.map(
49+
fromGroup ->
50+
new ExecutionSlotSharingGroup(
51+
fromGroup.getSlotSharingGroup(),
52+
fromGroup.getExecutionVertexIds()))
53+
.collect(Collectors.toList());
54+
}
55+
56+
static Map<JobVertexID, List<ExecutionVertexID>> getAllVertices(
57+
JobInformation jobInformation, VertexParallelism vertexParallelism) {
58+
final Map<JobVertexID, List<ExecutionVertexID>> jobVertexToExecutionVertices =
59+
new HashMap<>();
60+
for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) {
61+
slotSharingGroup
62+
.getJobVertexIds()
63+
.forEach(
64+
jobVertexId -> {
65+
int parallelism = vertexParallelism.getParallelism(jobVertexId);
66+
for (int subtaskIdx = 0; subtaskIdx < parallelism; subtaskIdx++) {
67+
jobVertexToExecutionVertices
68+
.computeIfAbsent(
69+
jobVertexId, ignored -> new ArrayList<>())
70+
.add(new ExecutionVertexID(jobVertexId, subtaskIdx));
71+
}
72+
});
73+
}
74+
return jobVertexToExecutionVertices;
75+
}
76+
}
Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.scheduler.adaptive;
20+
21+
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
22+
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
23+
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
24+
import org.apache.flink.runtime.scheduler.adaptive.allocator.FreeSlotFunction;
25+
import org.apache.flink.runtime.scheduler.adaptive.allocator.IsSlotAvailableAndFreeFunction;
26+
import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
27+
import org.apache.flink.runtime.scheduler.adaptive.allocator.ReserveSlotFunction;
28+
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingResolver;
29+
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator;
30+
import org.apache.flink.runtime.scheduler.adaptive.allocator.TaskBalancedSlotSharingResolver;
31+
import org.apache.flink.runtime.scheduler.adaptive.allocator.TestJobInformation;
32+
import org.apache.flink.runtime.scheduler.adaptive.allocator.TestVertexInformation;
33+
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
34+
35+
import org.junit.jupiter.api.BeforeEach;
36+
import org.junit.jupiter.api.Test;
37+
38+
import java.util.Arrays;
39+
import java.util.Collection;
40+
import java.util.Collections;
41+
import java.util.stream.Collectors;
42+
43+
import static org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot.getSlots;
44+
import static org.assertj.core.api.Assertions.assertThat;
45+
46+
/** Test for {@link TaskBalancedSlotSharingResolver}. */
47+
class TaskBalancedSlotSharingResolverTest {
48+
49+
private static final IsSlotAvailableAndFreeFunction is_slot_free_function = ignored -> true;
50+
private static final FreeSlotFunction free_slot_function = (a, c, t) -> {};
51+
private static final ReserveSlotFunction reserve_slot_function =
52+
(allocationId, resourceProfile) ->
53+
TestingPhysicalSlot.builder()
54+
.withAllocationID(allocationId)
55+
.withResourceProfile(resourceProfile)
56+
.build();
57+
private static final boolean disable_local_recovery = false;
58+
private static final String NULL_EXECUTION_TARGET = null;
59+
private static final SlotSharingResolver slotSharingResolver =
60+
TaskBalancedSlotSharingResolver.INSTANCE;
61+
private static final SlotSharingSlotAllocator slotAllocator =
62+
SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
63+
reserve_slot_function,
64+
free_slot_function,
65+
is_slot_free_function,
66+
disable_local_recovery,
67+
NULL_EXECUTION_TARGET,
68+
false);
69+
70+
private SlotSharingGroup slotSharingGroup1;
71+
private SlotSharingGroup slotSharingGroup2;
72+
private TestVertexInformation.TestingCoLocationGroup coLocationGroup1;
73+
private TestVertexInformation.TestingCoLocationGroup coLocationGroup2;
74+
75+
@BeforeEach
76+
void setup() {
77+
slotSharingGroup1 = new SlotSharingGroup();
78+
slotSharingGroup2 = new SlotSharingGroup();
79+
coLocationGroup1 = new TestVertexInformation.TestingCoLocationGroup();
80+
coLocationGroup2 = new TestVertexInformation.TestingCoLocationGroup();
81+
}
82+
83+
@Test
84+
void testGetExecutionSlotSharingGroupsInOneSlotSharingGroup() {
85+
final JobInformation.VertexInformation vertex1 =
86+
new TestVertexInformation(1, slotSharingGroup1);
87+
final JobInformation.VertexInformation vertex2 =
88+
new TestVertexInformation(2, slotSharingGroup1);
89+
final JobInformation.VertexInformation vertex3 =
90+
new TestVertexInformation(3, slotSharingGroup1);
91+
final TestJobInformation testJobInformation =
92+
new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3));
93+
94+
final VertexParallelism vertexParallelism =
95+
getVertexParallelism(testJobInformation, getSlotsFor(vertex1, vertex2, vertex3));
96+
final Collection<SlotSharingSlotAllocator.ExecutionSlotSharingGroup>
97+
executionSlotSharingGroups =
98+
slotSharingResolver.getExecutionSlotSharingGroups(
99+
testJobInformation, vertexParallelism);
100+
101+
assertThat(executionSlotSharingGroups).hasSize(3);
102+
assertExecutionSlotSharingGroupsInSlotSharingGroupsExactlyAnyOrderOf(
103+
executionSlotSharingGroups, slotSharingGroup1);
104+
assertAscendingTasksPerExecutionSlotSharingGroupOfSlotSharingGroup(
105+
slotSharingGroup1, executionSlotSharingGroups, 2, 2, 2);
106+
}
107+
108+
@Test
109+
void testGetExecutionSlotSharingGroupsInOneSlotSharingGroupWithCoLocationGroup() {
110+
final JobInformation.VertexInformation vertex1 =
111+
new TestVertexInformation(1, slotSharingGroup1, coLocationGroup1);
112+
final JobInformation.VertexInformation vertex2 =
113+
new TestVertexInformation(2, slotSharingGroup1, coLocationGroup1);
114+
final JobInformation.VertexInformation vertex3 =
115+
new TestVertexInformation(3, slotSharingGroup1);
116+
final TestJobInformation testJobInformation =
117+
new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3));
118+
119+
final VertexParallelism vertexParallelism =
120+
getVertexParallelism(testJobInformation, getSlotsFor(vertex1, vertex2, vertex3));
121+
final Collection<SlotSharingSlotAllocator.ExecutionSlotSharingGroup>
122+
executionSlotSharingGroups =
123+
slotSharingResolver.getExecutionSlotSharingGroups(
124+
testJobInformation, vertexParallelism);
125+
126+
assertThat(executionSlotSharingGroups).hasSize(3);
127+
assertExecutionSlotSharingGroupsInSlotSharingGroupsExactlyAnyOrderOf(
128+
executionSlotSharingGroups, slotSharingGroup1);
129+
assertAscendingTasksPerExecutionSlotSharingGroupOfSlotSharingGroup(
130+
slotSharingGroup1, executionSlotSharingGroups, 1, 2, 3);
131+
}
132+
133+
@Test
134+
void testGetExecutionSlotSharingGroupsInMultiSlotSharingGroups() {
135+
final JobInformation.VertexInformation vertex1 =
136+
new TestVertexInformation(1, slotSharingGroup1);
137+
final JobInformation.VertexInformation vertex2 =
138+
new TestVertexInformation(2, slotSharingGroup1);
139+
final JobInformation.VertexInformation vertex3 =
140+
new TestVertexInformation(3, slotSharingGroup1);
141+
142+
final JobInformation.VertexInformation vertex4 =
143+
new TestVertexInformation(1, slotSharingGroup2);
144+
final JobInformation.VertexInformation vertex5 =
145+
new TestVertexInformation(2, slotSharingGroup2);
146+
147+
final TestJobInformation testJobInformation =
148+
new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3, vertex4, vertex5));
149+
150+
final VertexParallelism vertexParallelism =
151+
getVertexParallelism(
152+
testJobInformation,
153+
getSlotsFor(vertex1, vertex2, vertex3, vertex4, vertex5));
154+
final Collection<SlotSharingSlotAllocator.ExecutionSlotSharingGroup>
155+
executionSlotSharingGroups =
156+
slotSharingResolver.getExecutionSlotSharingGroups(
157+
testJobInformation, vertexParallelism);
158+
159+
assertThat(executionSlotSharingGroups).hasSize(5);
160+
assertExecutionSlotSharingGroupsInSlotSharingGroupsExactlyAnyOrderOf(
161+
executionSlotSharingGroups, slotSharingGroup1, slotSharingGroup2);
162+
assertAscendingTasksPerExecutionSlotSharingGroupOfSlotSharingGroup(
163+
slotSharingGroup1, executionSlotSharingGroups, 2, 2, 2);
164+
assertAscendingTasksPerExecutionSlotSharingGroupOfSlotSharingGroup(
165+
slotSharingGroup2, executionSlotSharingGroups, 1, 2);
166+
}
167+
168+
@Test
169+
void testGetExecutionSlotSharingGroupsInMultiSlotSharingGroupsWithCoLocationGroups() {
170+
final JobInformation.VertexInformation vertex1 =
171+
new TestVertexInformation(1, slotSharingGroup1, coLocationGroup1);
172+
final JobInformation.VertexInformation vertex2 =
173+
new TestVertexInformation(2, slotSharingGroup1, coLocationGroup1);
174+
final JobInformation.VertexInformation vertex3 =
175+
new TestVertexInformation(3, slotSharingGroup1);
176+
177+
final JobInformation.VertexInformation vertex4 =
178+
new TestVertexInformation(1, slotSharingGroup2, coLocationGroup2);
179+
final JobInformation.VertexInformation vertex5 =
180+
new TestVertexInformation(3, slotSharingGroup2);
181+
final JobInformation.VertexInformation vertex6 =
182+
new TestVertexInformation(2, slotSharingGroup2, coLocationGroup2);
183+
final JobInformation.VertexInformation vertex7 =
184+
new TestVertexInformation(1, slotSharingGroup2);
185+
186+
final TestJobInformation testJobInformation =
187+
new TestJobInformation(
188+
Arrays.asList(
189+
vertex1, vertex2, vertex3, vertex4, vertex5, vertex6, vertex7));
190+
191+
final VertexParallelism vertexParallelism =
192+
getVertexParallelism(
193+
testJobInformation,
194+
getSlotsFor(vertex1, vertex2, vertex3, vertex4, vertex5, vertex6, vertex7));
195+
final Collection<SlotSharingSlotAllocator.ExecutionSlotSharingGroup>
196+
executionSlotSharingGroups =
197+
slotSharingResolver.getExecutionSlotSharingGroups(
198+
testJobInformation, vertexParallelism);
199+
200+
assertThat(executionSlotSharingGroups).hasSize(6);
201+
assertExecutionSlotSharingGroupsInSlotSharingGroupsExactlyAnyOrderOf(
202+
executionSlotSharingGroups, slotSharingGroup1, slotSharingGroup2);
203+
assertAscendingTasksPerExecutionSlotSharingGroupOfSlotSharingGroup(
204+
slotSharingGroup1, executionSlotSharingGroups, 1, 2, 3);
205+
assertAscendingTasksPerExecutionSlotSharingGroupOfSlotSharingGroup(
206+
slotSharingGroup2, executionSlotSharingGroups, 2, 2, 3);
207+
}
208+
209+
private Collection<PhysicalSlot> getSlotsFor(
210+
JobInformation.VertexInformation... verticesInformation) {
211+
if (verticesInformation == null || verticesInformation.length < 1) {
212+
return Collections.emptyList();
213+
}
214+
int slots =
215+
Arrays.stream(verticesInformation)
216+
.map(JobInformation.VertexInformation::getParallelism)
217+
.reduce(0, Integer::sum);
218+
return getSlots(slots);
219+
}
220+
221+
private VertexParallelism getVertexParallelism(
222+
JobInformation jobInformation, Collection<PhysicalSlot> slots) {
223+
return slotAllocator
224+
.determineParallelism(jobInformation, slots)
225+
.orElse(VertexParallelism.empty());
226+
}
227+
228+
private static void assertExecutionSlotSharingGroupsInSlotSharingGroupsExactlyAnyOrderOf(
229+
Collection<SlotSharingSlotAllocator.ExecutionSlotSharingGroup>
230+
executionSlotSharingGroups,
231+
SlotSharingGroup... slotSharingGroups) {
232+
assertThat(
233+
executionSlotSharingGroups.stream()
234+
.map(
235+
SlotSharingSlotAllocator.ExecutionSlotSharingGroup
236+
::getSlotSharingGroup)
237+
.collect(Collectors.toSet()))
238+
.containsExactlyInAnyOrder(slotSharingGroups);
239+
}
240+
241+
private static void assertAscendingTasksPerExecutionSlotSharingGroupOfSlotSharingGroup(
242+
SlotSharingGroup slotSharingGroup,
243+
Collection<SlotSharingSlotAllocator.ExecutionSlotSharingGroup>
244+
executionSlotSharingGroups,
245+
Integer... numbers) {
246+
assertThat(
247+
executionSlotSharingGroups.stream()
248+
.filter(essg -> essg.getSlotSharingGroup().equals(slotSharingGroup))
249+
.map(essg -> essg.getContainedExecutionVertices().size())
250+
.sorted()
251+
.collect(Collectors.toList()))
252+
.containsExactly(numbers);
253+
}
254+
}

0 commit comments

Comments
 (0)