Skip to content

Commit f69e02a

Browse files
committed
Rename scheduler managers to controllers
1 parent a88368d commit f69e02a

File tree

13 files changed

+195
-190
lines changed

13 files changed

+195
-190
lines changed

scaler/io/ymq/ymq.pyi

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ class Message:
2222
address: Bytes
2323
payload: Bytes
2424

25-
def __init__(self, address: Bytes | bytes | SupportsBytes | None, payload: Bytes | bytes | SupportsBytes) -> None: ...
25+
def __init__(
26+
self, address: Bytes | bytes | SupportsBytes | None, payload: Bytes | bytes | SupportsBytes
27+
) -> None: ...
2628
def __repr__(self) -> str: ...
2729
def __str__(self) -> str: ...
2830

@@ -38,32 +40,35 @@ class IOContext:
3840

3941
def __init__(self, num_threads: int = 1) -> None: ...
4042
def __repr__(self) -> str: ...
41-
4243
def createIOSocket(self, /, identity: str, socket_type: IOSocketType) -> Awaitable[IOSocket]:
4344
"""Create an io socket with an identity and socket type"""
4445

45-
4646
class IOSocket:
4747
identity: str
4848
socket_type: IOSocketType
4949

5050
def __repr__(self) -> str: ...
51-
5251
async def send(self, message: Message) -> None:
5352
"""Send a message to one of the socket's peers"""
53+
5454
async def recv(self) -> Message:
5555
"""Receive a message from one of the socket's peers"""
56+
5657
async def bind(self, address: str) -> None:
5758
"""Bind the socket to an address and listen for incoming connections"""
59+
5860
async def connect(self, address: str) -> None:
5961
"""Connect to a remote socket"""
6062

6163
def send_sync(self, message: Message) -> None:
6264
"""Send a message to one of the socket's peers synchronously"""
65+
6366
def recv_sync(self) -> Message:
6467
"""Receive a message from one of the socket's peers synchronously"""
68+
6569
def bind_sync(self, address: str) -> None:
6670
"""Bind the socket to an address and listen for incoming connections synchronously"""
71+
6772
def connect_sync(self, address: str) -> None:
6873
"""Connect to a remote socket synchronously"""
6974

scaler/scheduler/managers/balance_manager.py renamed to scaler/scheduler/controllers/balance_controller.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@
55
from scaler.io.async_connector import AsyncConnector
66
from scaler.protocol.python.message import StateBalanceAdvice
77
from scaler.scheduler.allocate_policy.mixins import TaskAllocatePolicy
8-
from scaler.scheduler.managers.mixins import TaskManager
8+
from scaler.scheduler.controllers.mixins import TaskController
99
from scaler.utility.identifiers import WorkerID, TaskID
1010
from scaler.utility.mixins import Looper
1111

1212

13-
class VanillaBalanceManager(Looper):
13+
class VanillaBalanceController(Looper):
1414
def __init__(self, load_balance_trigger_times: int, task_allocate_policy: TaskAllocatePolicy):
1515
self._load_balance_trigger_times = load_balance_trigger_times
1616

@@ -22,13 +22,13 @@ def __init__(self, load_balance_trigger_times: int, task_allocate_policy: TaskAl
2222
self._binder: Optional[AsyncBinder] = None
2323
self._binder_monitor: Optional[AsyncConnector] = None
2424

25-
self._task_manager: Optional[TaskManager] = None
25+
self._task_controller: Optional[TaskController] = None
2626

27-
def register(self, binder: AsyncBinder, binder_monitor: AsyncConnector, task_manager: TaskManager):
27+
def register(self, binder: AsyncBinder, binder_monitor: AsyncConnector, task_controller: TaskController):
2828
self._binder = binder
2929
self._binder_monitor = binder_monitor
3030

31-
self._task_manager = task_manager
31+
self._task_controller = task_controller
3232

3333
async def routine(self):
3434
current_advice = self._task_allocate_policy.balance()

scaler/scheduler/managers/client_manager.py renamed to scaler/scheduler/controllers/client_controller.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,14 @@
1313
TaskCancel,
1414
)
1515
from scaler.protocol.python.status import ClientManagerStatus
16-
from scaler.scheduler.managers.mixins import ClientManager, ObjectManager, TaskManager, WorkerManager
16+
from scaler.scheduler.controllers.mixins import ClientController, ObjectController, TaskController, WorkerController
1717
from scaler.utility.exceptions import ClientShutdownException
1818
from scaler.utility.identifiers import ClientID, TaskID
1919
from scaler.utility.mixins import Looper, Reporter
2020
from scaler.utility.one_to_many_dict import OneToManyDict
2121

2222

23-
class VanillaClientManager(ClientManager, Looper, Reporter):
23+
class VanillaClientController(ClientController, Looper, Reporter):
2424
def __init__(self, client_timeout_seconds: int, protected: bool, storage_address: ObjectStorageAddress):
2525
self._client_timeout_seconds = client_timeout_seconds
2626
self._protected = protected
@@ -30,25 +30,25 @@ def __init__(self, client_timeout_seconds: int, protected: bool, storage_address
3030

3131
self._binder: Optional[AsyncBinder] = None
3232
self._binder_monitor: Optional[AsyncConnector] = None
33-
self._object_manager: Optional[ObjectManager] = None
34-
self._task_manager: Optional[TaskManager] = None
35-
self._worker_manager: Optional[WorkerManager] = None
33+
self._object_controller: Optional[ObjectController] = None
34+
self._task_controller: Optional[TaskController] = None
35+
self._worker_controller: Optional[WorkerController] = None
3636

3737
self._client_last_seen: Dict[ClientID, Tuple[float, ClientHeartbeat]] = dict()
3838

3939
def register(
4040
self,
4141
binder: AsyncBinder,
4242
binder_monitor: AsyncConnector,
43-
object_manager: ObjectManager,
44-
task_manager: TaskManager,
45-
worker_manager: WorkerManager,
43+
object_controller: ObjectController,
44+
task_controller: TaskController,
45+
worker_controller: WorkerController,
4646
):
4747
self._binder = binder
4848
self._binder_monitor = binder_monitor
49-
self._object_manager = object_manager
50-
self._task_manager = task_manager
51-
self._worker_manager = worker_manager
49+
self._object_controller = object_controller
50+
self._task_controller = task_controller
51+
self._worker_controller = worker_controller
5252

5353
def get_client_task_ids(self, client_id: ClientID) -> Set[TaskID]:
5454
return self._client_to_task_ids.get_values(client_id)
@@ -89,7 +89,7 @@ async def on_client_disconnect(self, client_id: ClientID, request: ClientDisconn
8989
if self._protected:
9090
return
9191

92-
await self._worker_manager.on_client_shutdown(client_id)
92+
await self._worker_controller.on_client_shutdown(client_id)
9393

9494
raise ClientShutdownException(f"received client shutdown from {client_id!r}, quitting")
9595

@@ -118,12 +118,12 @@ async def __on_client_disconnect(self, client_id: ClientID):
118118
self._client_last_seen.pop(client_id)
119119

120120
await self.__cancel_tasks(client_id)
121-
self._object_manager.clean_client(client_id)
121+
self._object_controller.clean_client(client_id)
122122

123123
async def __cancel_tasks(self, client_id: ClientID):
124124
if client_id not in self._client_to_task_ids.keys():
125125
return
126126

127127
tasks = self._client_to_task_ids.get_values(client_id).copy()
128128
for task in tasks:
129-
await self._task_manager.on_task_cancel(client_id, TaskCancel.new_msg(task))
129+
await self._task_controller.on_task_cancel(client_id, TaskCancel.new_msg(task))

scaler/scheduler/managers/graph_manager.py renamed to scaler/scheduler/controllers/graph_controller.py

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from scaler.io.async_object_storage_connector import AsyncObjectStorageConnector
1010
from scaler.protocol.python.common import ObjectMetadata, TaskStatus
1111
from scaler.protocol.python.message import GraphTask, GraphTaskCancel, StateGraphTask, Task, TaskCancel, TaskResult
12-
from scaler.scheduler.managers.mixins import ClientManager, GraphTaskManager, ObjectManager, TaskManager
12+
from scaler.scheduler.controllers.mixins import ClientController, GraphTaskController, ObjectController, TaskController
1313
from scaler.utility.graph.topological_sorter import TopologicalSorter
1414
from scaler.utility.identifiers import ClientID, ObjectID, TaskID
1515
from scaler.utility.many_to_many_dict import ManyToManyDict
@@ -47,7 +47,7 @@ class _Graph:
4747
running_task_ids: Set[TaskID] = dataclasses.field(default_factory=set)
4848

4949

50-
class VanillaGraphTaskManager(GraphTaskManager, Looper, Reporter):
50+
class VanillaGraphTaskController(GraphTaskController, Looper, Reporter):
5151
"""
5252
A = func()
5353
B = func2(A)
@@ -73,9 +73,9 @@ def __init__(self):
7373
self._binder_monitor: Optional[AsyncConnector] = None
7474
self._connector_storage: Optional[AsyncObjectStorageConnector] = None
7575

76-
self._client_manager: Optional[ClientManager] = None
77-
self._task_manager: Optional[TaskManager] = None
78-
self._object_manager: Optional[ObjectManager] = None
76+
self._client_controller: Optional[ClientController] = None
77+
self._task_controller: Optional[TaskController] = None
78+
self._object_controller: Optional[ObjectController] = None
7979

8080
self._unassigned: Queue = Queue()
8181

@@ -87,16 +87,16 @@ def register(
8787
binder: AsyncBinder,
8888
binder_monitor: AsyncConnector,
8989
connector_storage: AsyncObjectStorageConnector,
90-
client_manager: ClientManager,
91-
task_manager: TaskManager,
92-
object_manager: ObjectManager,
90+
client_controller: ClientController,
91+
task_controller: TaskController,
92+
object_controller: ObjectController,
9393
):
9494
self._binder = binder
9595
self._binder_monitor = binder_monitor
9696
self._connector_storage = connector_storage
97-
self._client_manager = client_manager
98-
self._task_manager = task_manager
99-
self._object_manager = object_manager
97+
self._client_controller = client_controller
98+
self._task_controller = task_controller
99+
self._object_controller = object_controller
100100

101101
async def on_graph_task(self, client_id: ClientID, graph_task: GraphTask):
102102
await self._unassigned.put((client_id, graph_task))
@@ -141,7 +141,7 @@ def get_status(self) -> Dict:
141141
async def __add_new_graph(self, client_id: ClientID, graph_task: GraphTask):
142142
graph = {}
143143

144-
self._client_manager.on_task_begin(client_id, graph_task.task_id)
144+
self._client_controller.on_task_begin(client_id, graph_task.task_id)
145145

146146
tasks = dict()
147147
depended_task_id_to_task_id: ManyToManyDict[TaskID, TaskID] = ManyToManyDict()
@@ -199,7 +199,7 @@ async def __check_one_graph(self, graph_task_id: TaskID):
199199
function_args=[self.__get_argument_object(graph_task_id, arg) for arg in task_info.task.function_args],
200200
)
201201

202-
await self._task_manager.on_task_new(graph_info.client, task)
202+
await self._task_controller.on_task_new(graph_info.client, task)
203203

204204
async def __mark_node_done(self, result: TaskResult):
205205
graph_task_id = self._task_id_to_graph_task_id.pop(result.task_id)
@@ -240,7 +240,7 @@ async def __cancel_one_graph(self, graph_task_id: TaskID, result: TaskResult):
240240
result_metadata = result.metadata
241241
result_object_ids = [ObjectID(object_id_bytes) for object_id_bytes in result.results]
242242
result_objects = [
243-
(object_id, self._object_manager.get_object_name(object_id)) for object_id in result_object_ids
243+
(object_id, self._object_controller.get_object_name(object_id)) for object_id in result_object_ids
244244
]
245245
await self.__clean_all_running_nodes(graph_task_id, result_status, result_metadata, result_objects)
246246
await self.__clean_all_inactive_nodes(graph_task_id, result_status, result_metadata, result_objects)
@@ -261,7 +261,7 @@ async def __clean_all_running_nodes(
261261
# cancel all running tasks
262262
for task_id in running_task_ids:
263263
new_result_object_ids = await self.__duplicate_objects(graph_info.client, result_objects)
264-
await self._task_manager.on_task_cancel(graph_info.client, TaskCancel.new_msg(task_id))
264+
await self._task_controller.on_task_cancel(graph_info.client, TaskCancel.new_msg(task_id))
265265
await self.__mark_node_done(
266266
TaskResult.new_msg(
267267
task_id, result_status, result_metadata, [bytes(object_id) for object_id in new_result_object_ids]
@@ -292,7 +292,7 @@ async def __clean_all_inactive_nodes(
292292
)
293293

294294
async def __finish_one_graph(self, graph_task_id: TaskID, result_status: TaskStatus):
295-
self._client_manager.on_task_finish(graph_task_id)
295+
self._client_controller.on_task_finish(graph_task_id)
296296
info = self._graph_task_id_to_graph.pop(graph_task_id)
297297
await self._binder.send(info.client, TaskResult.new_msg(graph_task_id, result_status, metadata=b""))
298298

@@ -329,7 +329,7 @@ async def __clean_intermediate_result(self, graph_task_id: TaskID, task_id: Task
329329
continue
330330

331331
# delete intermediate results as they are not needed anymore
332-
self._object_manager.on_del_objects(graph_info.client, set(graph_info.tasks[argument].result_object_ids))
332+
self._object_controller.on_del_objects(graph_info.client, set(graph_info.tasks[argument].result_object_ids))
333333

334334
async def __duplicate_objects(
335335
self, owner: ClientID, result_objects: List[Tuple[ObjectID, bytes]]
@@ -350,4 +350,6 @@ async def __duplicate_object(
350350
):
351351
await self._connector_storage.duplicate_object_id(object_id, new_object_id)
352352

353-
self._object_manager.on_add_object(owner, new_object_id, ObjectMetadata.ObjectContentType.Object, object_name)
353+
self._object_controller.on_add_object(
354+
owner, new_object_id, ObjectMetadata.ObjectContentType.Object, object_name
355+
)
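scaler/scheduler/controllers/information_controller.py (new file; header missing from this capture, path inferred from the sibling controller file names)

Lines changed: 55 additions & 0 deletions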
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
from typing import Optional
2+
3+
import psutil
4+
5+
from scaler.io.async_binder import AsyncBinder
6+
from scaler.io.async_connector import AsyncConnector
7+
from scaler.protocol.python.message import StateScheduler
8+
from scaler.protocol.python.status import Resource
9+
from scaler.scheduler.controllers.mixins import (
10+
ClientController,
11+
InformationController,
12+
ObjectController,
13+
TaskController,
14+
WorkerController,
15+
)
16+
from scaler.utility.mixins import Looper
17+
18+
19+
class VanillaInformationController(InformationController, Looper):
    """Publish a scheduler status snapshot through the monitor connector.

    Each call to ``routine`` gathers the binder status, this process's CPU and
    RSS figures, the host's available memory, and the status of every
    registered controller, then sends them as one ``StateScheduler`` message.
    """

    def __init__(self, binder: AsyncConnector):
        """Store the monitor connector and create the process handle.

        :param binder: connector that ``routine`` sends the ``StateScheduler``
            message on (kept as ``_monitor_binder``; it is not the scheduler
            binder, which arrives later through ``register_managers``).
        """
        self._process = psutil.Process()
        self._monitor_binder: AsyncConnector = binder

        # Collaborators stay unset until register_managers() is called.
        self._binder: Optional[AsyncBinder] = None
        self._client_controller: Optional[ClientController] = None
        self._object_controller: Optional[ObjectController] = None
        self._task_controller: Optional[TaskController] = None
        self._worker_controller: Optional[WorkerController] = None

    def register_managers(
        self,
        binder: AsyncBinder,
        client_controller: ClientController,
        object_controller: ObjectController,
        task_controller: TaskController,
        worker_controller: WorkerController,
    ):
        """Attach the binder and the controllers whose status ``routine`` reports."""
        self._binder = binder
        self._client_controller = client_controller
        self._object_controller = object_controller
        self._task_controller = task_controller
        self._worker_controller = worker_controller

    async def routine(self):
        """Assemble one ``StateScheduler`` message and send it to the monitor connector.

        Dereferences the collaborators set by ``register_managers`` without a
        ``None`` check, so that method has to be called first.
        """
        binder_status = self._binder.get_status()

        # CPU percent is multiplied by 10 and truncated to int before being reported.
        # NOTE(review): psutil documents the first non-blocking cpu_percent() call as
        # returning a meaningless 0.0 - confirm the first report is expected to be 0.
        scheduler_resource = Resource.new_msg(
            int(self._process.cpu_percent() * 10), self._process.memory_info().rss
        )
        available_memory = psutil.virtual_memory().available

        # The message keywords still use the "*_manager" names.
        state = StateScheduler.new_msg(
            binder=binder_status,
            scheduler=scheduler_resource,
            rss_free=available_memory,
            client_manager=self._client_controller.get_status(),
            object_manager=self._object_controller.get_status(),
            task_manager=self._task_controller.get_status(),
            worker_manager=self._worker_controller.get_status(),
        )
        await self._monitor_binder.send(state)
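scaler/scheduler/managers/mixins.py renamed to scaler/scheduler/controllers/mixins.py (header missing from this capture; path taken from the import lines in the diffs above)

Lines changed: 6 additions & 6 deletions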
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from scaler.utility.mixins import Reporter
1919

2020

21-
class ObjectManager(Reporter):
21+
class ObjectController(Reporter):
2222
@abc.abstractmethod
2323
async def on_object_instruction(self, source: bytes, request: ObjectInstruction):
2424
raise NotImplementedError()
@@ -50,7 +50,7 @@ def get_object_name(self, object_id: ObjectID) -> bytes:
5050
raise NotImplementedError()
5151

5252

53-
class ClientManager(Reporter):
53+
class ClientController(Reporter):
5454
@abc.abstractmethod
5555
def get_client_task_ids(self, client_id: ClientID) -> Set[TaskID]:
5656
raise NotImplementedError()
@@ -80,7 +80,7 @@ async def on_client_disconnect(self, client_id: ClientID, request: ClientDisconn
8080
raise NotImplementedError()
8181

8282

83-
class GraphTaskManager(Reporter):
83+
class GraphTaskController(Reporter):
8484
@abc.abstractmethod
8585
async def on_graph_task(self, client_id: ClientID, graph_task: GraphTask):
8686
raise NotImplementedError()
@@ -98,7 +98,7 @@ def is_graph_sub_task(self, task_id: TaskID) -> bool:
9898
raise NotImplementedError()
9999

100100

101-
class TaskManager(Reporter):
101+
class TaskController(Reporter):
102102
@abc.abstractmethod
103103
async def on_task_new(self, client_id: ClientID, task: Task):
104104
raise NotImplementedError()
@@ -116,7 +116,7 @@ async def on_task_reroute(self, task_id: TaskID):
116116
raise NotImplementedError()
117117

118118

119-
class WorkerManager(Reporter):
119+
class WorkerController(Reporter):
120120
@abc.abstractmethod
121121
async def assign_task_to_worker(self, task: Task) -> bool:
122122
raise NotImplementedError()
@@ -154,5 +154,5 @@ def get_worker_ids(self) -> Set[WorkerID]:
154154
raise NotImplementedError()
155155

156156

157-
class InformationManager(metaclass=abc.ABCMeta):
157+
class InformationController(metaclass=abc.ABCMeta):
158158
pass

0 commit comments

Comments
 (0)