[tool] feature: scheduling analysis based on profiling data#5248
mayunaise wants to merge 4 commits into verl-project:main
Conversation
Code Review
This pull request introduces a new tool for scheduling analysis based on profiling data. The implementation is comprehensive, including data preprocessing, parsing, visualization, and testing. However, I've identified several critical issues related to code robustness, portability, and correctness that could lead to runtime crashes or unexpected behavior. These include fragile logic that relies on specific file naming conventions, potential null reference exceptions, and incorrect type hint imports. Addressing these issues will significantly improve the tool's reliability and maintainability.
```python
try:
    for (map_key, dir_list) in rank_id_map.items():
        dir_list.sort(key=lambda x: x.split('_')[-3])
        self.data_map[map_key] = dir_list
except Exception as e:
    raise RuntimeError("Found invalid directory name!") from e
```
The sorting key lambda x: x.split('_')[-3] is fragile: it raises an IndexError for any directory name with fewer than three '_'-separated fields. That error is caught by a broad except Exception block which re-raises a generic RuntimeError, crashing the program with an unhelpful message. This should be made more robust so unexpected directory names don't crash the tool.
Suggested change:

```diff
-try:
-    for (map_key, dir_list) in rank_id_map.items():
-        dir_list.sort(key=lambda x: x.split('_')[-3])
-        self.data_map[map_key] = dir_list
-except Exception as e:
-    raise RuntimeError("Found invalid directory name!") from e
+for (map_key, dir_list) in rank_id_map.items():
+    try:
+        dir_list.sort(key=lambda x: x.split('_')[-3])
+    except IndexError:
+        logger.warning(f"Could not sort paths for {map_key} due to unexpected directory name format. The order may be incorrect.")
+    self.data_map[map_key] = dir_list
```
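A stricter alternative, if warn-and-continue feels too permissive, is to validate the expected name shape up front instead of indexing split parts. This is only a sketch: the `<prefix>_<pid>_<timestamp>_ascend_pt` naming convention and the `DIR_PATTERN`/`sort_key` helpers are assumptions for illustration, not part of the PR.

```python
import re

# Assumed naming convention for illustration: <prefix>_<pid>_<timestamp>_ascend_pt,
# in which split('_')[-3] would pick out <timestamp>.
DIR_PATTERN = re.compile(r"^.+_\d+_(?P<ts>\d+)_ascend_pt$")

def sort_key(dir_name: str) -> str:
    # Hypothetical helper: sort by the timestamp field when the name matches,
    # and fall back to the raw name (instead of crashing) when it does not.
    match = DIR_PATTERN.match(dir_name)
    return match.group("ts") if match else dir_name
```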
```python
# Convert to milliseconds
us_to_ms = Constant.US_TO_MS
start_time_ms = start_ids / us_to_ms
duration_ms = (end_ids - start_ids) / us_to_ms
end_time_ms = start_time_ms + duration_ms

event_data = {
    'name': roll,
    "roll": roll,
    'domain': "default",
    'start_time_ms': start_time_ms,
    'end_time_ms': end_time_ms,
    'duration_ms': duration_ms,
    'rank_id': rank_id,
    'tid': row["pid"]
}

events.append(event_data)

return events
```
There are two issues here. First, if no events are found for the process_id, start_ids will remain None, and line 154 will raise a TypeError. Second, if the loop at line 124 finds no matching events, row will be unbound, and line 166 will raise an UnboundLocalError. You should handle the case where no events are found, and use process_id for the tid.
Suggested change:

```python
if start_ids is None:
    return events

# Convert to milliseconds
us_to_ms = Constant.US_TO_MS
start_time_ms = start_ids / us_to_ms
duration_ms = (end_ids - start_ids) / us_to_ms
end_time_ms = start_time_ms + duration_ms

event_data = {
    'name': roll,
    "roll": roll,
    'domain': "default",
    'start_time_ms': start_time_ms,
    'end_time_ms': end_time_ms,
    'duration_ms': duration_ms,
    'rank_id': rank_id,
    'tid': process_id
}

events.append(event_data)
return events
```

```python
t0 = df["Start"].min()
df["Start"] -= t0
df["Finish"] -= t0
df["Duration"] = df["Finish"] - df["Start"]
return df, t0
```
If the dataframe df is empty after the cleaning steps above (e.g., all rows have NaNs or invalid start/finish times), df["Start"].min() yields NaN (or raises, depending on the pandas version and dtype), which then propagates through every derived column. You should handle the case of an empty dataframe gracefully, for example by returning early.
Suggested change:

```diff
+if df.empty:
+    return df, 0.0
 t0 = df["Start"].min()
 df["Start"] -= t0
 df["Finish"] -= t0
 df["Duration"] = df["Finish"] - df["Start"]
 return df, t0
```
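For intuition, here is a quick standalone check of what the normalization does once the empty-frame guard is in place. The sample values are made up:

```python
import pandas as pd

# Made-up sample: two events with absolute start/finish timestamps.
df = pd.DataFrame({"Start": [120.0, 100.0], "Finish": [180.0, 150.0]})

if not df.empty:
    t0 = df["Start"].min()                       # 100.0, the earliest event
    df["Start"] -= t0                            # shifted to [20.0, 0.0]
    df["Finish"] -= t0                           # shifted to [80.0, 50.0]
    df["Duration"] = df["Finish"] - df["Start"]  # [60.0, 50.0]
```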
```python
for dir_name in dirs:
    if dir_name.endswith(Constant.PT_PROF_SUFFIX):
        path = os.path.join(root, dir_name)
        ascend_pt_dirs.append({"roll": os.path.dirname(path).split('/')[-1], "path": path})
```
The use of split('/') to extract the directory name is not portable and will fail on operating systems that use a different path separator, such as Windows. You should use os.path.basename to make this code cross-platform.
Suggested change:

```diff
-ascend_pt_dirs.append({"roll": os.path.dirname(path).split('/')[-1], "path": path})
+ascend_pt_dirs.append({"roll": os.path.basename(os.path.dirname(path)), "path": path})
```
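A small self-contained illustration of why the suggestion matters; the path components are hypothetical:

```python
import os

# Hypothetical layout: <role dir>/<profiler output dir>.
path = os.path.join("logs", "rollout_rank0", "worker_ascend_pt")
parent = os.path.dirname(path)

print(os.path.basename(parent))  # 'rollout_rank0' on every platform
print(parent.split("/")[-1])     # 'rollout_rank0' on POSIX, but the whole
                                 # 'logs\\rollout_rank0' string on Windows,
                                 # because join used '\\' as the separator there
```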
```python
ClusterParserFn = Callable[
    [
        str,
        str,
        Dict
    ],
    pd.DataFrame,
]
```
The type hint ClusterParserFn is defined as Callable[[str, str, Dict], pd.DataFrame], but the registered functions like cluster_parser_mstx have the signature (config: Dict) -> pd.DataFrame. The type hint should match the actual function signature to avoid confusion and potential issues with static analysis tools.
Suggested change:

```python
ClusterParserFn = Callable[[Dict], pd.DataFrame]
```
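To see why the corrected alias is the one that type-checks, here is a minimal sketch of a parser registry built around it. The `register_cluster_parser` decorator and the body of `cluster_parser_mstx` are assumptions for illustration; only the `(config: Dict) -> pd.DataFrame` signature comes from the review comment.

```python
from typing import Callable, Dict

import pandas as pd

ClusterParserFn = Callable[[Dict], pd.DataFrame]
CLUSTER_PARSER_REGISTRY: Dict[str, ClusterParserFn] = {}

def register_cluster_parser(name: str):
    # Hypothetical registration decorator; with the corrected alias, static
    # analysis can verify every registered parser takes a config dict and
    # returns a DataFrame.
    def decorator(fn: ClusterParserFn) -> ClusterParserFn:
        CLUSTER_PARSER_REGISTRY[name] = fn
        return fn
    return decorator

@register_cluster_parser("mstx")
def cluster_parser_mstx(config: Dict) -> pd.DataFrame:
    # Placeholder body; the real parser reads profiling output per `config`.
    return pd.DataFrame()
```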
```python
if dir_name.endswith(Constant.PT_PROF_SUFFIX):
    path = os.path.join(root, dir_name)
    ascend_pt_dirs.append({"roll": os.path.dirname(path).split("/")[-1], "path": path})

data_processor = DataPreprocessor(ascend_pt_dirs)
```
The code here needs to be generic, not Ascend-specific.
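One hypothetical way to address this is to parameterize the discovery walk by suffix, so nothing Ascend-specific is hard-coded. The helper name and signature below are illustrative, not from the PR:

```python
import os

def collect_profiler_dirs(input_path: str, suffix: str) -> list[dict]:
    # Hypothetical backend-agnostic variant: the caller supplies the suffix
    # (looked up per profiler type) instead of hard-coding Constant.PT_PROF_SUFFIX.
    found = []
    for root, dirs, _files in os.walk(input_path):
        for dir_name in dirs:
            if dir_name.endswith(suffix):
                path = os.path.join(root, dir_name)
                found.append({"roll": os.path.basename(os.path.dirname(path)), "path": path})
    return found
```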
```python
def main():
    arg_parser = argparse.ArgumentParser(description="集群调度可视化")
```
Translate the comments and help information into English.
```python
def __init__(self, path_list: list[dict]) -> None:
    self.path_list = path_list
    self.data_map = {}
    pass
```
```python
rank_id_map[(task_roll, rank_id)].append(dir_name)
try:
    for map_key, dir_list in rank_id_map.items():
        dir_list.sort(key=lambda x: x.split("_")[-3])
```
```python
class DataMap(TypedDict):
    rank_id: int
    roll: str
```
What's the meaning of `roll` here?
```python
def main():
    arg_parser = argparse.ArgumentParser(description="Cluster scheduling visualization")
    arg_parser.add_argument("--input-path", default="test", help="Raw path of profiling data")
    arg_parser.add_argument("--profiler-type", default="mstx", help="Profiling data type")
```
Add the full set of supported profiler types to the help text, i.e. CLUSTER_PARSER_REGISTRY.keys().
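A minimal sketch of what that could look like. The stand-in registry content is assumed so the snippet runs on its own, and the `choices` keyword is an extra hardening step beyond what the comment asks for:

```python
import argparse

# Stand-in: assumes CLUSTER_PARSER_REGISTRY maps type name -> parser fn,
# as elsewhere in the PR.
CLUSTER_PARSER_REGISTRY = {"mstx": None}

supported = sorted(CLUSTER_PARSER_REGISTRY.keys())
arg_parser = argparse.ArgumentParser(description="Cluster scheduling visualization")
arg_parser.add_argument(
    "--profiler-type",
    default="mstx",
    choices=supported,  # reject unknown types up front
    help=f"Profiler type, one of: {', '.join(supported)}",
)
```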
```python
def main():
    arg_parser = argparse.ArgumentParser(description="Cluster scheduling visualization")
    arg_parser.add_argument("--input-path", default="test", help="Raw path of profiling data")
    arg_parser.add_argument("--profiler-type", default="mstx", help="Profiler type")
```
Add descriptions of the currently supported options to the help information.
```python
super().__init__(params)

# TODO: Future support for parsing with MSTX events
def _parse_rl_mstx_event(self, profiler_data_path: str, rank_id: int, role: str) -> list[EventRow]:
```
The function is not being called. What is its purpose, and is it necessary?
What does this PR do?
Summary
This PR introduces Cluster Analyse, a visualization tool that transforms raw profiling metrics into actionable timeline views, simplifying the task of debugging and optimizing distributed RL workloads within the VeRL framework.
From Raw Logs to Instant Insights
Manual analysis of discrete profiling data is time-consuming and error-prone. This tool automates the parsing of profiling logs, allowing developers to visualize the entire RL pipeline (Rollout, Evaluation, Training) in a cohesive temporal view.
Democratizing Performance Optimization
By visualizing stage-by-stage latency, this tool helps researchers pinpoint issues without deep-diving into raw JSON traces.
Strengthening the VeRL Ecosystem
As RL scales to larger clusters, observability becomes a first-class requirement. This feature fills a critical gap in the VeRL toolchain, providing a standardized way for the community to share and compare performance profiles.
Sample
Checklist Before Starting
- Format the PR title as `[{modules}] {type}: {description}` (this will be checked by the CI).
  - `{modules}` include `fsdp`, `megatron`, `veomni`, `sglang`, `vllm`, `rollout`, `trainer`, `ci`, `training_utils`, `recipe`, `hardware`, `deployment`, `ray`, `worker`, `single_controller`, `misc`, `perf`, `model`, `algo`, `env`, `tool`, `ckpt`, `doc`, `data`, `cfg`, `reward`, like `[megatron, fsdp, doc]`.
  - `{type}` is in `feat`, `fix`, `refactor`, `chore`, `test`.
  - For breaking changes, prepend `[BREAKING]` to the beginning of the title, e.g. `[BREAKING][fsdp, megatron] feat: dynamic batching`.

Test
API and Usage Example
Design & Code Changes
Checklist Before Submitting
Important
Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review.
- Apply pre-commit checks: `pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always`.
- Once your PR is ready for CI, send a message in the `ci-request` channel in the `verl` Slack workspace. (If not accessible, please try the Feishu group (飞书群).)
- If your PR modifies the `recipe` submodule, please also update the reference to the submodule commit via `git submodule update --remote` or `cd recipe && git pull origin main`.