Skip to content

Commit

Permalink
[Flowinsight] add flow insight frontend and backend (#522)
Browse files Browse the repository at this point in the history
* [Flowinsight] add flow insight frontend and backend

Signed-off-by: sule <[email protected]>

* [Flowinsight] add data flow

Signed-off-by: sule <[email protected]>

* [Flowinsight] use width to represent speed for data flow

Signed-off-by: sule <[email protected]>

---------

Signed-off-by: sule <[email protected]>
  • Loading branch information
xsuler authored Mar 10, 2025
1 parent 20f2d30 commit 32426e1
Show file tree
Hide file tree
Showing 18 changed files with 3,907 additions and 1 deletion.
12 changes: 11 additions & 1 deletion python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,7 @@ def put_object(
# reference will be created. If another reference is created and
# removed before this one, it will corrupt the state in the
# reference counter.
return ray.ObjectRef(
ref = ray.ObjectRef(
self.core_worker.put_serialized_object_and_increment_local_ref(
serialized_value,
object_ref=object_ref,
Expand All @@ -845,6 +845,10 @@ def put_object(
# The initial local reference is already acquired internally.
skip_adding_local_ref=True,
)
from ray.util.insight import record_object_put

record_object_put(ref.hex(), serialized_value.total_bytes)
return ref

def raise_errors(self, data_metadata_pairs, object_refs):
out = self.deserialize_objects(data_metadata_pairs, object_refs)
Expand Down Expand Up @@ -934,6 +938,12 @@ def get_objects(
else:
raise value

from ray.util.insight import record_object_get

for value, object_ref in zip(values, object_refs):
if value is not None:
record_object_get(object_ref.hex(), object_ref.task_id())

return values, debugger_breakpoint

def main_loop(self):
Expand Down
13 changes: 13 additions & 0 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,8 @@ cdef prepare_args_internal(
put_id = CObjectID.FromBinary(
core_worker.put_serialized_object_and_increment_local_ref(
serialized_arg, inline_small_object=False))
from ray.util.insight import record_object_arg_put
record_object_arg_put(put_id.Hex().decode(), size, function_descriptor.repr)
args_vector.push_back(unique_ptr[CTaskArg](
new CTaskArgByReference(
put_id,
Expand Down Expand Up @@ -1834,6 +1836,8 @@ cdef void execute_task(

return function(actor, *arguments, **kwarguments)

from ray.util.insight import record_object_arg_get

with core_worker.profile_event(b"task::" + name, extra_data=extra_data), \
ray._private.worker._changeproctitle(title, next_title):
task_exception = False
Expand All @@ -1846,6 +1850,10 @@ cdef void execute_task(
object_refs = VectorToObjectRefs(
c_arg_refs,
skip_adding_local_ref=False)

for object_ref in object_refs:
record_object_arg_get(object_ref.hex())

if core_worker.current_actor_is_asyncio():
# We deserialize objects in event loop thread to
# prevent segfaults. See #7799
Expand Down Expand Up @@ -4360,12 +4368,17 @@ cdef class CoreWorker:
check_status(
CCoreWorkerProcess.GetCoreWorker().SealReturnObject(
return_id, return_ptr[0], generator_id, caller_address))

from ray.util.insight import record_object_return_put
record_object_return_put(return_id.Hex().decode(), data_size)
return True
else:
with nogil:
success = (
CCoreWorkerProcess.GetCoreWorker().PinExistingReturnObject(
return_id, return_ptr, generator_id, caller_address))
from ray.util.insight import record_object_return_put
record_object_return_put(return_id.Hex().decode(), data_size)
return success

cdef store_task_outputs(self,
Expand Down
42 changes: 42 additions & 0 deletions python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,28 @@ def bind(self, *args, **kwargs):
"""
return self._bind(args, kwargs)

def _get_callee_info(self):
"""
get the callee info of the actor method
this is needed for the insight monitor to record the call
"""
callee_func = self._method_name
actor = self._actor_ref()
callee_class = None
if actor is not None:
callee_class = (
actor._ray_actor_creation_function_descriptor.class_name
+ ":"
+ actor._ray_actor_id.hex()
)
return callee_class, callee_func

def remote(self, *args, **kwargs):
from ray.util.insight import record_control_flow

callee_class, callee_func = self._get_callee_info()
# report the call info to the insight monitor
record_control_flow(callee_class, callee_func)
return self._remote(args, kwargs)

def options(self, **options):
Expand Down Expand Up @@ -1254,8 +1275,29 @@ def _remote(self, args=None, kwargs=None, **actor_options):
original_handle=True,
)

callee_class, callee_func = self._get_callee_info(actor_handle)
from ray.util.insight import record_control_flow

# report the call info to the insight monitor
record_control_flow(callee_class, callee_func)

return actor_handle

def _get_callee_info(self, actor_handle):
"""
get the callee info of the actor method
this is needed for the insight monitor to record the call
"""
callee_func = "__init__"
callee_class = None
if actor_handle is not None:
callee_class = (
actor_handle._ray_actor_creation_function_descriptor.class_name
+ ":"
+ actor_handle._ray_actor_id.hex()
)
return callee_class, callee_func

@DeveloperAPI
def bind(self, *args, **kwargs):
"""
Expand Down
6 changes: 6 additions & 0 deletions python/ray/dashboard/client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,19 @@
"@mui/icons-material": "^5.15.5",
"@mui/material": "^5.15.5",
"@reduxjs/toolkit": "^1.3.1",
"@types/d3": "^7.4.3",
"@types/dagre": "^0.7.52",
"@types/dagre-d3": "^0.6.6",
"@types/jest": "^27.5.2",
"@types/lodash": "^4.14.161",
"@types/node": "13.9.5",
"@types/react-redux": "^7.1.7",
"@types/react-window": "^1.8.2",
"axios": "^0.21.1",
"copy-to-clipboard": "^3.3.2",
"d3": "^7.9.0",
"dagre": "^0.8.5",
"dagre-d3": "^0.6.4",
"dayjs": "^1.9.4",
"js-yaml": "^4.1.0",
"lodash": "^4.17.20",
Expand Down
2 changes: 2 additions & 0 deletions python/ray/dashboard/client/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import ActorDetailPage, { ActorDetailLayout } from "./pages/actor/ActorDetail";
import { ActorLayout } from "./pages/actor/ActorLayout";
import Loading from "./pages/exception/Loading";
import JobList, { JobsLayout } from "./pages/job";
import ActorGraph from "./pages/job/ActorGraph";
import { JobDetailChartsPage } from "./pages/job/JobDetail";
import {
JobDetailActorDetailWrapper,
Expand Down Expand Up @@ -239,6 +240,7 @@ const App = () => {
</Route>
<Route element={<JobsLayout />} path="jobs">
<Route element={<JobList />} path="" />
<Route element={<ActorGraph />} path=":jobId/graph" />
<Route element={<JobPage />} path=":id">
<Route element={<JobDetailLayout />} path="">
<Route
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
.elements-panel {
width: 300px;
height: 100%;
background-color: #f8f9fa;
border-right: 1px solid #e1e4e8;
display: flex;
flex-direction: column;
overflow: hidden;
}

.elements-header {
padding: 15px;
border-bottom: 1px solid #e1e4e8;
}

.elements-header h3 {
margin: 0 0 15px 0;
font-size: 16px;
color: #24292e;
}

.search-container {
width: 100%;
}

.search-input {
width: 100%;
padding: 8px 12px;
border: 1px solid #d1d5da;
border-radius: 4px;
font-size: 14px;
box-sizing: border-box;
}

.tab-container {
display: flex;
border-bottom: 1px solid #e1e4e8;
}

.tab {
flex: 1;
text-align: center;
padding: 10px;
cursor: pointer;
font-size: 14px;
color: #586069;
transition: all 0.2s ease;
}

.tab.active {
color: #0366d6;
border-bottom: 2px solid #0366d6;
font-weight: 600;
}

.tab:hover:not(.active) {
background-color: #f1f2f3;
}

.elements-table-container {
flex: 1;
overflow-y: auto;
padding: 0;
}

.elements-table {
width: 100%;
border-collapse: collapse;
font-size: 13px;
}

.elements-table th {
position: sticky;
top: 0;
background-color: #f8f9fa;
padding: 8px 16px;
text-align: left;
border-bottom: 1px solid #e1e4e8;
color: #586069;
font-weight: 600;
width: 100%;
}

.elements-table td {
padding: 8px 16px;
border-bottom: 1px solid #eaecef;
width: 100%;
}

.elements-table tbody tr {
cursor: pointer;
transition: all 0.2s ease;
width: 100%;
}

.elements-table tbody tr:hover {
background-color: #f1f8ff;
}

/* Responsive adjustments */
@media (max-width: 768px) {
.elements-panel {
width: 100%;
height: 300px;
border-right: none;
border-bottom: 1px solid #e1e4e8;
}
}

/* Actor row styles */
.actor-row {
cursor: pointer;
display: flex;
align-items: center;
padding: 0;
border-bottom: 1px solid #eaecef;
width: 100%;
}

.actor-row td {
display: flex;
align-items: center;
width: 100%;
padding: 8px 16px;
}

.actor-row td span {
flex: 1;
white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
cursor: pointer;
}

/* Expand button */
.expand-button {
background: none;
border: none;
width: 24px;
height: 24px;
padding: 0;
margin-right: 8px;
border-radius: 50%;
display: flex;
align-items: center;
justify-content: center;
cursor: pointer;
font-size: 16px;
font-weight: bold;
color: #586069;
background-color: #f1f2f3;
transition: all 0.2s ease;
flex-shrink: 0;
}

.expand-button:hover {
background-color: #e1e4e8;
}

.expand-button.expanded {
background-color: #0366d6;
color: white;
}

/* Methods container */
.methods-container {
width: 100%;
}

.actor-methods {
width: 100%;
padding: 0;
margin-left: 32px;
}

.actor-methods h4 {
display: none;
}

/* Methods table */
.methods-table {
width: calc(100% - 32px);
border-collapse: separate;
border-spacing: 0;
font-size: 12px;
margin-top: 0;
}

.methods-table tr {
width: 100%;
}

.methods-table td {
padding: 6px 16px;
border-bottom: 1px solid #eaecef;
cursor: pointer;
width: 100%;
}

.methods-table tr:hover {
background-color: #f1f8ff;
}

/* Selected states */
.elements-table tbody tr.selected,
.actor-row.selected,
.methods-table tr.selected {
background-color: #e6f7ff;
border-left: 3px solid #1890ff;
}

.elements-table tbody tr.selected:hover,
.actor-row.selected:hover,
.methods-table tr.selected:hover {
background-color: #e6f7ff;
}
Loading

0 comments on commit 32426e1

Please sign in to comment.