1+ /**
2+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
3+ * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
4+ * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
5+ *
6+ * http://www.apache.org/licenses/LICENSE-2.0
7+ *
8+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
9+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
10+ * and limitations under the License.
11+ */
12+
13+ package org .apache .storm .scheduler ;
14+
15+ import java .lang .reflect .Method ;
16+ import java .util .ArrayList ;
17+ import java .util .Collections ;
18+ import java .util .HashMap ;
19+ import java .util .LinkedList ;
20+ import java .util .List ;
21+ import java .util .Map ;
22+ import org .apache .storm .Constants ;
23+ import org .apache .storm .metric .StormMetricsRegistry ;
24+ import org .apache .storm .scheduler .blacklist .TestUtilsForBlacklistScheduler ;
25+ import org .apache .storm .scheduler .resource .normalization .ResourceMetrics ;
26+ import org .junit .jupiter .api .Test ;
27+
28+ import static org .junit .jupiter .api .Assertions .assertEquals ;
29+
30+ /**
31+ * Unit tests for {@link IsolationScheduler}.
32+ */
33+ public class IsolationSchedulerTest {
34+
35+ private static SupervisorDetails mkSupervisor (String id , String host , int numPorts ) {
36+ List <Number > ports = new ArrayList <>();
37+ for (int i = 0 ; i < numPorts ; i ++) {
38+ ports .add (i );
39+ }
40+ Map <String , Double > resources = new HashMap <>();
41+ resources .put (Constants .COMMON_CPU_RESOURCE_NAME , 400.0 );
42+ resources .put (Constants .COMMON_TOTAL_MEMORY_RESOURCE_NAME , 4096.0 );
43+ return new SupervisorDetails (id , host , null , ports , resources );
44+ }
45+
46+ private static Cluster mkCluster (Map <String , SupervisorDetails > supervisors , Topologies topologies ) {
47+ INimbus iNimbus = new TestUtilsForBlacklistScheduler .INimbusTest ();
48+ ResourceMetrics resourceMetrics = new ResourceMetrics (new StormMetricsRegistry ());
49+ return new Cluster (iNimbus , resourceMetrics , supervisors , new HashMap <String , SchedulerAssignmentImpl >(),
50+ topologies , new HashMap <String , Object >());
51+ }
52+
53+ @ SuppressWarnings ("unchecked" )
54+ private static LinkedList <IsolationScheduler .HostAssignableSlots > hostAssignableSlots (
55+ IsolationScheduler scheduler , Cluster cluster ) throws Exception {
56+ Method method = IsolationScheduler .class .getDeclaredMethod ("hostAssignableSlots" , Cluster .class );
57+ method .setAccessible (true );
58+ return (LinkedList <IsolationScheduler .HostAssignableSlots >) method .invoke (scheduler , cluster );
59+ }
60+
61+ private static List <String > hostOrder (LinkedList <IsolationScheduler .HostAssignableSlots > slots ) {
62+ List <String > hosts = new ArrayList <>();
63+ for (IsolationScheduler .HostAssignableSlots slot : slots ) {
64+ hosts .add (slot .getHostName ());
65+ }
66+ return hosts ;
67+ }
68+
69+ @ Test
70+ public void hostAssignableSlots_prefersHostWithMoreFreeSlots () throws Exception {
71+ Map <String , SupervisorDetails > supervisors = new HashMap <>();
72+ supervisors .put ("sup-busy" , mkSupervisor ("sup-busy" , "host-busy" , 2 ));
73+ supervisors .put ("sup-free" , mkSupervisor ("sup-free" , "host-free" , 2 ));
74+
75+ Map <String , Object > conf = new HashMap <>();
76+ TopologyDetails filler = TestUtilsForBlacklistScheduler .getTopology ("filler" , conf , 1 , 0 , 1 , 0 , 0 , false );
77+ Map <String , TopologyDetails > topoMap = new HashMap <>();
78+ topoMap .put (filler .getId (), filler );
79+ Topologies topologies = new Topologies (topoMap );
80+
81+ Cluster cluster = mkCluster (supervisors , topologies );
82+ cluster .assign (new WorkerSlot ("sup-busy" , 0 ), filler .getId (),
83+ Collections .singletonList (filler .getExecutors ().iterator ().next ()));
84+
85+ LinkedList <IsolationScheduler .HostAssignableSlots > ranked =
86+ hostAssignableSlots (new IsolationScheduler (), cluster );
87+
88+ assertEquals (2 , ranked .size ());
89+ assertEquals ("host-free" , ranked .get (0 ).getHostName ());
90+ assertEquals (2 , ranked .get (0 ).getFreeSlots ());
91+ assertEquals ("host-busy" , ranked .get (1 ).getHostName ());
92+ assertEquals (1 , ranked .get (1 ).getFreeSlots ());
93+ assertEquals (hostOrder (ranked ), List .of ("host-free" , "host-busy" ));
94+ }
95+
96+ @ Test
97+ public void hostAssignableSlots_breaksTiesByHostName () throws Exception {
98+ Map <String , SupervisorDetails > supervisors = new HashMap <>();
99+ supervisors .put ("sup-a" , mkSupervisor ("sup-a" , "host-aaa" , 2 ));
100+ supervisors .put ("sup-b" , mkSupervisor ("sup-b" , "host-bbb" , 2 ));
101+
102+ Cluster cluster = mkCluster (supervisors , new Topologies ());
103+
104+ LinkedList <IsolationScheduler .HostAssignableSlots > ranked =
105+ hostAssignableSlots (new IsolationScheduler (), cluster );
106+
107+ assertEquals (2 , ranked .size ());
108+ assertEquals (2 , ranked .get (0 ).getWorkerSlots ().size ());
109+ assertEquals (2 , ranked .get (1 ).getWorkerSlots ().size ());
110+ assertEquals (2 , ranked .get (0 ).getFreeSlots ());
111+ assertEquals (2 , ranked .get (1 ).getFreeSlots ());
112+ assertEquals (hostOrder (ranked ), List .of ("host-aaa" , "host-bbb" ));
113+ }
114+ }
0 commit comments