Skip to content

Commit 32b8e5c

Browse files
authored
Merge pull request #8709 from meetjain74/patch-2
Isolation Scheduler Hosts Sorting | Add secondary sort (No of free slots) and tertiary sort (Hostname)
2 parents 4907e6e + 6d0eddd commit 32b8e5c

2 files changed

Lines changed: 146 additions & 4 deletions

File tree

storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -303,10 +303,16 @@ private Map<String, Set<WorkerSlot>> hostToUsedSlots(Cluster cluster) {
303303
return hostUsedSlots;
304304
}
305305

306-
// returns list of list of slots, reverse sorted by number of slots
306+
// Returns list of hosts with their assignable slots, sorted by:
307+
// 1. (primary) total assignable slots, descending
308+
// 2. (secondary) currently free slots, descending — prefer hosts that
309+
// need fewer evictions
310+
// 3. (tertiary) host name, ascending — deterministic order
311+
// for testability
307312
private LinkedList<HostAssignableSlots> hostAssignableSlots(Cluster cluster) {
308313
List<WorkerSlot> assignableSlots = cluster.getAssignableSlots();
309314
Map<String, List<WorkerSlot>> hostAssignableSlots = new HashMap<String, List<WorkerSlot>>();
315+
Map<String, Integer> hostFreeSlotCounts = new HashMap<String, Integer>();
310316
for (WorkerSlot slot : assignableSlots) {
311317
String host = cluster.getHost(slot.getNodeId());
312318
List<WorkerSlot> slots = hostAssignableSlots.get(host);
@@ -315,15 +321,31 @@ private LinkedList<HostAssignableSlots> hostAssignableSlots(Cluster cluster) {
315321
hostAssignableSlots.put(host, slots);
316322
}
317323
slots.add(slot);
324+
if (!cluster.isSlotOccupied(slot)) {
325+
Integer count = hostFreeSlotCounts.get(host);
326+
hostFreeSlotCounts.put(host, count == null ? 1 : count + 1);
327+
}
318328
}
319329
List<HostAssignableSlots> sortHostAssignSlots = new ArrayList<HostAssignableSlots>();
320330
for (Map.Entry<String, List<WorkerSlot>> entry : hostAssignableSlots.entrySet()) {
321-
sortHostAssignSlots.add(new HostAssignableSlots(entry.getKey(), entry.getValue()));
331+
Integer free = hostFreeSlotCounts.get(entry.getKey());
332+
sortHostAssignSlots.add(new HostAssignableSlots(entry.getKey(), entry.getValue(),
333+
free != null ? free.intValue() : 0));
322334
}
323335
Collections.sort(sortHostAssignSlots, new Comparator<HostAssignableSlots>() {
324336
@Override
325337
public int compare(HostAssignableSlots o1, HostAssignableSlots o2) {
326-
return o2.getWorkerSlots().size() - o1.getWorkerSlots().size();
338+
int bySlots = o2.getWorkerSlots().size() - o1.getWorkerSlots().size();
339+
if (bySlots != 0) {
340+
return bySlots;
341+
}
342+
343+
int byFree = o2.getFreeSlots() - o1.getFreeSlots();
344+
if (byFree != 0) {
345+
return byFree;
346+
}
347+
348+
return o1.getHostName().compareTo(o2.getHostName());
327349
}
328350
});
329351

@@ -400,10 +422,12 @@ public Set<ExecutorDetails> getExecutors() {
400422
class HostAssignableSlots {
401423
private String hostName;
402424
private List<WorkerSlot> workerSlots;
425+
private final int freeSlots;
403426

404-
HostAssignableSlots(String hostName, List<WorkerSlot> workerSlots) {
427+
/**
 * Creates the record for a single host.
 *
 * @param hostName    name of the host
 * @param workerSlots slots associated with the host; the list is stored as-is, not copied
 * @param freeSlots   free-slot count reported for the host
 */
HostAssignableSlots(String hostName, List<WorkerSlot> workerSlots, int freeSlots) {
    this.freeSlots = freeSlots;
    this.workerSlots = workerSlots;
    this.hostName = hostName;
}
408432

409433
public String getHostName() {
@@ -414,5 +438,9 @@ public List<WorkerSlot> getWorkerSlots() {
414438
return workerSlots;
415439
}
416440

441+
public int getFreeSlots() {
442+
return freeSlots;
443+
}
444+
417445
}
418446
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
3+
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
4+
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
10+
* and limitations under the License.
11+
*/
12+
13+
package org.apache.storm.scheduler;
14+
15+
import java.lang.reflect.Method;
16+
import java.util.ArrayList;
17+
import java.util.Collections;
18+
import java.util.HashMap;
19+
import java.util.LinkedList;
20+
import java.util.List;
21+
import java.util.Map;
22+
import org.apache.storm.Constants;
23+
import org.apache.storm.metric.StormMetricsRegistry;
24+
import org.apache.storm.scheduler.blacklist.TestUtilsForBlacklistScheduler;
25+
import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
26+
import org.junit.jupiter.api.Test;
27+
28+
import static org.junit.jupiter.api.Assertions.assertEquals;
29+
30+
/**
31+
* Unit tests for {@link IsolationScheduler}.
32+
*/
33+
public class IsolationSchedulerTest {
34+
35+
private static SupervisorDetails mkSupervisor(String id, String host, int numPorts) {
36+
List<Number> ports = new ArrayList<>();
37+
for (int i = 0; i < numPorts; i++) {
38+
ports.add(i);
39+
}
40+
Map<String, Double> resources = new HashMap<>();
41+
resources.put(Constants.COMMON_CPU_RESOURCE_NAME, 400.0);
42+
resources.put(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, 4096.0);
43+
return new SupervisorDetails(id, host, null, ports, resources);
44+
}
45+
46+
private static Cluster mkCluster(Map<String, SupervisorDetails> supervisors, Topologies topologies) {
47+
INimbus iNimbus = new TestUtilsForBlacklistScheduler.INimbusTest();
48+
ResourceMetrics resourceMetrics = new ResourceMetrics(new StormMetricsRegistry());
49+
return new Cluster(iNimbus, resourceMetrics, supervisors, new HashMap<String, SchedulerAssignmentImpl>(),
50+
topologies, new HashMap<String, Object>());
51+
}
52+
53+
@SuppressWarnings("unchecked")
54+
private static LinkedList<IsolationScheduler.HostAssignableSlots> hostAssignableSlots(
55+
IsolationScheduler scheduler, Cluster cluster) throws Exception {
56+
Method method = IsolationScheduler.class.getDeclaredMethod("hostAssignableSlots", Cluster.class);
57+
method.setAccessible(true);
58+
return (LinkedList<IsolationScheduler.HostAssignableSlots>) method.invoke(scheduler, cluster);
59+
}
60+
61+
private static List<String> hostOrder(LinkedList<IsolationScheduler.HostAssignableSlots> slots) {
62+
List<String> hosts = new ArrayList<>();
63+
for (IsolationScheduler.HostAssignableSlots slot : slots) {
64+
hosts.add(slot.getHostName());
65+
}
66+
return hosts;
67+
}
68+
69+
@Test
70+
public void hostAssignableSlots_prefersHostWithMoreFreeSlots() throws Exception {
71+
Map<String, SupervisorDetails> supervisors = new HashMap<>();
72+
supervisors.put("sup-busy", mkSupervisor("sup-busy", "host-busy", 2));
73+
supervisors.put("sup-free", mkSupervisor("sup-free", "host-free", 2));
74+
75+
Map<String, Object> conf = new HashMap<>();
76+
TopologyDetails filler = TestUtilsForBlacklistScheduler.getTopology("filler", conf, 1, 0, 1, 0, 0, false);
77+
Map<String, TopologyDetails> topoMap = new HashMap<>();
78+
topoMap.put(filler.getId(), filler);
79+
Topologies topologies = new Topologies(topoMap);
80+
81+
Cluster cluster = mkCluster(supervisors, topologies);
82+
cluster.assign(new WorkerSlot("sup-busy", 0), filler.getId(),
83+
Collections.singletonList(filler.getExecutors().iterator().next()));
84+
85+
LinkedList<IsolationScheduler.HostAssignableSlots> ranked =
86+
hostAssignableSlots(new IsolationScheduler(), cluster);
87+
88+
assertEquals(2, ranked.size());
89+
assertEquals("host-free", ranked.get(0).getHostName());
90+
assertEquals(2, ranked.get(0).getFreeSlots());
91+
assertEquals("host-busy", ranked.get(1).getHostName());
92+
assertEquals(1, ranked.get(1).getFreeSlots());
93+
assertEquals(hostOrder(ranked), List.of("host-free", "host-busy"));
94+
}
95+
96+
@Test
97+
public void hostAssignableSlots_breaksTiesByHostName() throws Exception {
98+
Map<String, SupervisorDetails> supervisors = new HashMap<>();
99+
supervisors.put("sup-a", mkSupervisor("sup-a", "host-aaa", 2));
100+
supervisors.put("sup-b", mkSupervisor("sup-b", "host-bbb", 2));
101+
102+
Cluster cluster = mkCluster(supervisors, new Topologies());
103+
104+
LinkedList<IsolationScheduler.HostAssignableSlots> ranked =
105+
hostAssignableSlots(new IsolationScheduler(), cluster);
106+
107+
assertEquals(2, ranked.size());
108+
assertEquals(2, ranked.get(0).getWorkerSlots().size());
109+
assertEquals(2, ranked.get(1).getWorkerSlots().size());
110+
assertEquals(2, ranked.get(0).getFreeSlots());
111+
assertEquals(2, ranked.get(1).getFreeSlots());
112+
assertEquals(hostOrder(ranked), List.of("host-aaa", "host-bbb"));
113+
}
114+
}

0 commit comments

Comments
 (0)