Skip to content

Commit 8dc699a

Browse files
committed
[FLINK-33391][runtime] Support tasks balancing at TM level for Adaptive Scheduler.
1 parent bc84294 commit 8dc699a

File tree

1 file changed

+5
-2
lines changed

1 file changed

+5
-2
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ public Optional<ReservedSlots> tryReserveResources(JobSchedulingPlan jobScheduli
296296
final Map<ExecutionVertexID, LogicalSlot> assignedSlots = new HashMap<>();
297297

298298
for (SlotAssignment assignment : jobSchedulingPlan.getSlotAssignments()) {
299-
final SharedSlot sharedSlot = reserveSharedSlot(assignment.getSlotInfo());
299+
final SharedSlot sharedSlot = reserveSharedSlot(assignment);
300300
for (ExecutionVertexID executionVertexId :
301301
assignment
302302
.getTargetAs(ExecutionSlotSharingGroup.class)
@@ -332,7 +332,10 @@ private boolean areAllExpectedSlotsAvailableAndFree(
332332
return true;
333333
}
334334

335-
private SharedSlot reserveSharedSlot(SlotInfo slotInfo) {
335+
private SharedSlot reserveSharedSlot(SlotAssignment slotAssignment) {
336+
final SlotInfo slotInfo = slotAssignment.getSlotInfo();
337+
final ExecutionSlotSharingGroup requestedGroup =
338+
slotAssignment.getTargetAs(ExecutionSlotSharingGroup.class);
336339
final PhysicalSlot physicalSlot =
337340
reserveSlotFunction.reserveSlot(
338341
slotInfo.getAllocationId(), ResourceProfile.UNKNOWN);

0 commit comments

Comments
 (0)