Skip to content

Commit 1a2b5df

Browse files
cloakedclocksharpener6
authored andcommitted
webui: add capabilities support
Signed-off-by: Cloaked Clock <213850831+cloakedclock@users.noreply.github.com>
1 parent 1e5529c commit 1a2b5df

File tree

8 files changed

+510
-153
lines changed

8 files changed

+510
-153
lines changed

scaler/protocol/capnp/message.capnp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,8 @@ struct StateTask {
150150
functionName @1 :Data;
151151
state @2 :CommonType.TaskState;
152152
worker @3 :Data;
153-
metadata @4 :Data;
153+
capabilities @4 :List(CommonType.TaskCapability);
154+
metadata @5 :Data;
154155
}
155156

156157
struct StateGraphTask {

scaler/protocol/python/message.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -588,20 +588,32 @@ def state(self) -> TaskState:
588588
def worker(self) -> WorkerID:
589589
return WorkerID(self._msg.worker)
590590

591+
@property
592+
def capabilities(self) -> Dict[str, int]:
593+
return {capability.name: capability.value for capability in self._msg.capabilities}
594+
591595
@property
592596
def metadata(self) -> bytes:
593597
return self._msg.metadata
594598

595599
@staticmethod
596600
def new_msg(
597-
task_id: TaskID, function_name: bytes, task_state: TaskState, worker: WorkerID, metadata: bytes = b""
601+
task_id: TaskID,
602+
function_name: bytes,
603+
task_state: TaskState,
604+
worker: WorkerID,
605+
capabilities: Dict[str, int],
606+
metadata: bytes = b"",
598607
) -> "StateTask":
599608
return StateTask(
600609
_message.StateTask(
601610
taskId=bytes(task_id),
602611
functionName=function_name,
603612
state=task_state.value,
604613
worker=bytes(worker),
614+
capabilities=[
615+
TaskCapability.new_msg(name, value).get_message() for name, value in capabilities.items()
616+
],
605617
metadata=metadata,
606618
)
607619
)

scaler/scheduler/controllers/task_controller.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,10 @@ async def __send_task_cancel_confirm_to_client(self, task_cancel_confirm: TaskCa
332332
async def __send_monitor(self, task_id: TaskID, function_name: bytes, metadata: bytes = b""):
333333
worker = self._worker_controller.get_worker_by_task_id(task_id)
334334
task_state = self._task_state_manager.get_state_machine(task_id).current_state()
335-
await self._binder_monitor.send(StateTask.new_msg(task_id, function_name, task_state, worker, metadata))
335+
capabilities = self._task_id_to_task[task_id].capabilities if task_id in self._task_id_to_task else {}
336+
await self._binder_monitor.send(
337+
StateTask.new_msg(task_id, function_name, task_state, worker, capabilities, metadata)
338+
)
336339

337340
async def __routing(self, task_id: TaskID, transition: TaskTransition, **kwargs):
338341
state_machine = self._task_state_manager.on_transition(task_id, transition)

scaler/ui/memory_window.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,16 +69,15 @@ def setup_memory_chart(self, settings: Settings):
6969
self._plot = ui.plotly(self._figure).classes("w-full h-full")
7070
self._settings = settings
7171

72-
def handle_task_state(self, state: StateTask):
72+
def handle_task_state(self, state_task: StateTask):
7373
"""
7474
Only completed tasks have profiling data.
7575
Use this data to fill in history.
7676
"""
77-
78-
if state.metadata == b"":
77+
if state_task.metadata == b"":
7978
return
8079

81-
profile_result = ProfileResult.deserialize(state.metadata)
80+
profile_result = ProfileResult.deserialize(state_task.metadata)
8281

8382
worker_memory = profile_result.memory_peak
8483
worker_duration = profile_result.duration_s

0 commit comments

Comments
 (0)