Skip to content

Commit 6697735

Browse files
authored
Merge pull request #8778 from mwkang/8590-even-rebalance-on-idle-supervisor
EvenScheduler: opt-in round-robin rebalance onto returning idle supervisors
2 parents c5680cc + e9a741e commit 6697735

8 files changed

Lines changed: 1006 additions & 4 deletions

File tree

conf/defaults.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,9 @@ topology.max.replication.wait.time.sec: 60
9494
nimbus.credential.renewers.freq.secs: 600
9595
nimbus.queue.size: 100000
9696
scheduler.display.resource: false
97+
nimbus.even.rebalance.idle.supervisor.enabled: false
98+
nimbus.even.rebalance.max.free.per.topology: 0
99+
nimbus.even.rebalance.idle.supervisor.min.stable.rounds: 3
97100
nimbus.local.assignments.backend.class: "org.apache.storm.assignments.InMemoryAssignmentBackend"
98101
nimbus.assignments.service.threads: 10
99102
nimbus.assignments.service.thread.queue.size: 100

storm-server/src/main/java/org/apache/storm/DaemonConfig.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,37 @@ public class DaemonConfig implements Validated {
175175
@IsBoolean
176176
public static final String SCHEDULER_DISPLAY_RESOURCE = "scheduler.display.resource";
177177

178+
/**
179+
* If true, {@link org.apache.storm.scheduler.EvenScheduler} may move already-assigned workers onto non-blacklisted supervisors
180+
* with no slot in use. This lets a freshly returned supervisor pick up workers instead of staying idle. The number of workers
181+
* freed per topology in a single scheduling round is capped by {@link #NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY}, so even
182+
* distribution is approached gradually rather than rebuilt from scratch.
183+
*/
184+
@IsBoolean
185+
public static final String NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED
186+
= "nimbus.even.rebalance.idle.supervisor.enabled";
187+
188+
/**
189+
* Optional upper bound on the number of currently-assigned workers a single topology may release in one scheduling round
190+
* when the idle-supervisor rebalance defined by {@link #NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED} kicks in. The
191+
* default budget already targets an even per-supervisor distribution (idle supervisors absorb roughly {@code numWorkers /
192+
* numSupervisors} workers each in one round), capped by the idle side's free slot capacity. Setting this to a positive
193+
* value tightens that budget; setting it to {@code 0} or a negative value leaves the even-distribution budget unbounded.
194+
*/
195+
@IsInteger
196+
public static final String NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY
197+
= "nimbus.even.rebalance.max.free.per.topology";
198+
199+
/**
200+
* Minimum number of consecutive supervisor monitor rounds that a fully-idle supervisor must have been alive before
201+
* {@link org.apache.storm.scheduler.EvenScheduler} can relocate workers onto it. A positive value avoids moving workers onto a
202+
* supervisor that has only just returned and may still be flapping. Setting this to {@code 0} or a negative value disables the
203+
* uptime guard.
204+
*/
205+
@IsInteger
206+
public static final String NIMBUS_EVEN_REBALANCE_IDLE_SUPERVISOR_MIN_STABLE_ROUNDS
207+
= "nimbus.even.rebalance.idle.supervisor.min.stable.rounds";
208+
178209
/**
179210
* The directory where storm's health scripts go.
180211
*/

storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -985,11 +985,20 @@ private static Map<String, SupervisorDetails> basicSupervisorDetailsMap(IStormCl
985985
String id = entry.getKey();
986986
SupervisorInfo info = entry.getValue();
987987
ret.put(id, new SupervisorDetails(id, info.get_server_port(), info.get_hostname(),
988-
info.get_scheduler_meta(), null, info.get_resources_map()));
988+
info.get_scheduler_meta(), null, info.get_resources_map(),
989+
supervisorUptimeSecs(info)));
989990
}
990991
return ret;
991992
}
992993

994+
private static long supervisorUptimeSecs(SupervisorInfo info) {
995+
// An unset uptime maps to 0L (not the Long.MAX_VALUE default the bare SupervisorDetails constructors use) so a
996+
// freshly (re)registered supervisor is treated as just-returned and must accrue real uptime before
997+
// Cluster#hasMinimumIdleSupervisorStability lets the idle rebalance place workers on it -- the conservative
998+
// choice on the production path.
999+
return info.is_set_uptime_secs() ? info.get_uptime_secs() : 0L;
1000+
}
1001+
9931002
/**
9941003
* NOTE: this can return false when a topology has just been activated. The topology may still be
9951004
* in the STORMS_SUBTREE.
@@ -2273,7 +2282,8 @@ private Map<String, SupervisorDetails> readAllSupervisorDetails(Map<String, Set<
22732282
List<SupervisorDetails> superDetails = new ArrayList<>();
22742283
for (Entry<String, SupervisorInfo> entry : superInfos.entrySet()) {
22752284
SupervisorInfo info = entry.getValue();
2276-
superDetails.add(new SupervisorDetails(entry.getKey(), info.get_meta(), info.get_resources_map()));
2285+
superDetails.add(new SupervisorDetails(entry.getKey(), info.get_meta(), info.get_resources_map(),
2286+
supervisorUptimeSecs(info)));
22772287
}
22782288
// Note that allSlotsAvailableForScheduling
22792289
// only uses the supervisor-details. The rest of the arguments
@@ -2306,7 +2316,7 @@ private Map<String, SupervisorDetails> readAllSupervisorDetails(Map<String, Set<
23062316
allPorts.removeAll(deadPorts);
23072317
}
23082318
ret.put(superId, new SupervisorDetails(superId, hostname, info.get_scheduler_meta(),
2309-
allPorts, info.get_resources_map()));
2319+
allPorts, info.get_resources_map(), supervisorUptimeSecs(info)));
23102320
}
23112321
return ret;
23122322
}
@@ -5526,4 +5536,3 @@ public void run() {
55265536
}
55275537
}
55285538
}
5529-

storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,42 @@ public boolean needsScheduling(TopologyDetails topology) {
363363
return desiredNumWorkers > assignedNumWorkers || getUnassignedExecutors(topology).size() > 0;
364364
}
365365

366+
/**
367+
* Returns true when {@code supervisor} is a stable, non-blacklisted supervisor whose slots are all currently free --
368+
* i.e. a returning idle supervisor the {@link EvenScheduler} idle-rebalance pass may relocate workers onto. The check
369+
* is binary by design -- a supervisor either has zero used slots or it does not -- so the rebalance never fires for an
370+
* "almost balanced" cluster. Stability is gated by {@link #hasMinimumIdleSupervisorStability(SupervisorDetails)} so a
371+
* supervisor that has only just returned (and may still be flapping) is held back until it has been up long enough. The
372+
* opt-in {@link DaemonConfig#NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED} flag is checked once by the caller
373+
* ({@link EvenScheduler#redistributeOntoIdleSupervisors(Topologies, Cluster)}), not here.
374+
*/
375+
public boolean isIdleSupervisorAvailableForEvenRebalance(SupervisorDetails supervisor) {
376+
if (supervisor == null) {
377+
return false;
378+
}
379+
if (isBlackListed(supervisor.getId())) {
380+
return false;
381+
}
382+
if (supervisor.getAllPorts().isEmpty()) {
383+
return false;
384+
}
385+
if (!getUsedPorts(supervisor).isEmpty()) {
386+
return false;
387+
}
388+
return hasMinimumIdleSupervisorStability(supervisor);
389+
}
390+
391+
private boolean hasMinimumIdleSupervisorStability(SupervisorDetails supervisor) {
392+
int minStableRounds = ObjectReader.getInt(
393+
conf.get(DaemonConfig.NIMBUS_EVEN_REBALANCE_IDLE_SUPERVISOR_MIN_STABLE_ROUNDS), 3);
394+
if (minStableRounds <= 0) {
395+
return true;
396+
}
397+
int monitorFrequencySecs = ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_MONITOR_FREQUENCY_SECS), 3);
398+
long requiredUptimeSecs = (long) minStableRounds * Math.max(1, monitorFrequencySecs);
399+
return supervisor.getUptimeSecs() >= requiredUptimeSecs;
400+
}
401+
366402
@Override
367403
public boolean needsSchedulingRas(TopologyDetails topology) {
368404
return getUnassignedExecutors(topology).size() > 0;

storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,16 @@ public static Set<WorkerSlot> slotsCanReassign(Cluster cluster, Set<WorkerSlot>
7272
}
7373

7474
public static void defaultSchedule(Topologies topologies, Cluster cluster) {
75+
EvenScheduler.redistributeOntoIdleSupervisors(topologies, cluster);
7576
for (TopologyDetails topology : cluster.needsSchedulingTopologies()) {
77+
// needsSchedulingTopologies() returns the cluster's full topology set, but this run is scoped to the
78+
// topologies passed in: DefaultScheduler.schedule passes the full set (so the guard is a no-op), while
79+
// IsolationScheduler delegates only its leftover, non-isolated topologies here. redistributeOntoIdleSupervisors
80+
// above acted only on that passed-in set too. Skip topologies outside it so the leftover path never schedules
81+
// one the caller excluded -- e.g. a down isolated topology on a reserved host.
82+
if (topologies.getById(topology.getId()) == null) {
83+
continue;
84+
}
7685
List<WorkerSlot> availableSlots = cluster.getAvailableSlots();
7786
Set<ExecutorDetails> allExecutors = topology.getExecutors();
7887

0 commit comments

Comments
 (0)