diff --git a/README.md b/README.md
index 28c6d30e2..1c591d8ef 100644
--- a/README.md
+++ b/README.md
@@ -57,6 +57,7 @@ Features
 | **[Sequence Distributions](docs/tutorials/sequence-distributions.md)** | Mixed ISL/OSL pairings | Benchmarking mixed use cases |
 | **[Goodput](docs/tutorials/goodput.md)** | Throughput of requests meeting user-defined SLOs | SLO validation, capacity planning, runtime/model comparisons |
 | **[Request Rate with Max Concurrency](docs/tutorials/request-rate-concurrency.md)** | Dual control of request timing and concurrent connection ceiling (Poisson or constant modes) | Testing API rate/concurrency limits, avoiding thundering herd, realistic client simulation |
+| **[GPU Telemetry](docs/tutorials/gpu-telemetry.md)** | Real-time GPU metrics collection via DCGM (power, utilization, memory, temperature, etc.) | Performance optimization, resource monitoring, multi-node telemetry |
 | **[Template Endpoint](docs/tutorials/template-endpoint.md)** | Benchmark custom APIs with flexible Jinja2 request templates | Custom API formats, rapid prototyping, non-standard endpoints |
 
 ### Working with Benchmark Data
diff --git a/docs/tutorials/gpu-telemetry.md b/docs/tutorials/gpu-telemetry.md
index b01e48e94..80c5ea55c 100644
--- a/docs/tutorials/gpu-telemetry.md
+++ b/docs/tutorials/gpu-telemetry.md
@@ -12,7 +12,7 @@ This guide shows you how to collect GPU metrics (power, utilization, memory, tem
 This guide covers two setup paths depending on your inference backend:
 
 ### Path 1: Dynamo (Built-in DCGM)
-If you're using **Dynamo**, it comes with DCGM pre-configured on port 9401. No additional setup needed! Just use the `--gpu-telemetry` flag to enable console display and optionally add additional DCGM url endpoints.
+If you're using **Dynamo**, it comes with DCGM pre-configured on port 9401. No additional setup needed! Just use the `--gpu-telemetry` flag to enable console display and optionally specify additional DCGM endpoint URLs. URLs can be specified with or without the `http://` prefix (e.g., `localhost:9400` or `http://localhost:9400`).
 
 ### Path 2: Other Inference Servers (Custom DCGM)
 If you're using **any other inference backend**, you'll need to set up DCGM separately.
@@ -28,15 +28,28 @@ AIPerf provides GPU telemetry collection with the `--gpu-telemetry` flag. Here's
 
 ### How the `--gpu-telemetry` Flag Works
 
-| Usage | Command | What Gets Collected (If Available) | Console Display | CSV/JSON Export |
-|-------|---------|---------------------|-----------------|-----------------|
-| **No flag** | `aiperf profile --model MODEL ...` | `http://localhost:9400/metrics` + `http://localhost:9401/metrics` | ❌ No | ✅ Yes |
-| **Flag only** | `aiperf profile --model MODEL ... --gpu-telemetry` | `http://localhost:9400/metrics` + `http://localhost:9401/metrics` | ✅ Yes | ✅ Yes |
-| **Custom URLs** | `aiperf profile --model MODEL ... --gpu-telemetry http://node1:9400/metrics http://node2:9400/metrics` | `http://localhost:9400/metrics` + `http://localhost:9401/metrics` + custom URLs | ✅ Yes | ✅ Yes |
+| Usage | Command | What Gets Collected (If Available) | Console Display | Dashboard View | CSV/JSON Export |
+|-------|---------|---------------------|-----------------|----------------|-----------------|
+| **No flag** | `aiperf profile --model MODEL ...` | `http://localhost:9400/metrics` + `http://localhost:9401/metrics` | ❌ No | ❌ No | ✅ Yes |
+| **Flag only** | `aiperf profile --model MODEL ... 
--gpu-telemetry` | `http://localhost:9400/metrics` + `http://localhost:9401/metrics` | ✅ Yes | ❌ No | ✅ Yes | +| **Dashboard mode** | `aiperf profile --model MODEL ... --gpu-telemetry dashboard` | `http://localhost:9400/metrics` + `http://localhost:9401/metrics` | ✅ Yes | ✅ Yes | ✅ Yes | +| **Custom URLs** | `aiperf profile --model MODEL ... --gpu-telemetry node1:9400 http://node2:9400/metrics` | `http://localhost:9400/metrics` + `http://localhost:9401/metrics` + custom URLs | ✅ Yes | ❌ No | ✅ Yes | +| **Dashboard + URLs** | `aiperf profile --model MODEL ... --gpu-telemetry dashboard localhost:9400` | `http://localhost:9400/metrics` + `http://localhost:9401/metrics` + custom URLs | ✅ Yes | ✅ Yes | ✅ Yes | > [!IMPORTANT] > The default endpoints `http://localhost:9400/metrics` and `http://localhost:9401/metrics` are ALWAYS attempted for telemetry collection, regardless of whether the `--gpu-telemetry` flag is used. The flag primarily controls whether metrics are displayed on the console and allows you to specify additional custom DCGM exporter endpoints. +> [!NOTE] +> When specifying custom DCGM exporter URLs, the `http://` prefix is optional. URLs like `localhost:9400` will automatically be treated as `http://localhost:9400`. Both formats work identically. + +### Real-Time Dashboard View + +Adding `dashboard` to the `--gpu-telemetry` flag enables a live terminal UI (TUI) that displays GPU metrics in real-time during your benchmark runs: + +```bash +aiperf profile --model MODEL ... --gpu-telemetry dashboard +``` + --- # 1: Using Dynamo @@ -48,7 +61,7 @@ Dynamo includes DCGM out of the box on port 9401 - no extra setup needed! ```bash # Set environment variables export AIPERF_REPO_TAG="main" -export DYNAMO_PREBUILT_IMAGE_TAG="nvcr.io/nvidia/ai-dynamo/vllm-runtime:0.5.0" +export DYNAMO_PREBUILT_IMAGE_TAG="nvcr.io/nvidia/ai-dynamo/vllm-runtime:0.5.1" export MODEL="Qwen/Qwen3-0.6B" # Download the Dynamo container @@ -99,7 +112,7 @@ uv pip install ./aiperf ```bash # Wait for Dynamo API to be ready (up to 15 minutes) -timeout 900 bash -c 'while [ "$(curl -s -o /dev/null -w "%{http_code}" localhost:8080/v1/chat/completions -H "Content-Type: application/json" -d "{\"model\":\"Qwen/Qwen3-0.6B\",\"messages\":[{\"role\":\"user\",\"content\":\"a\"}],\"max_completion_tokens\":1}")" != "200" ]; do sleep 2; done' || { echo "Dynamo not ready after 15min"; exit 1; } +timeout 900 bash -c 'while [ "$(curl -s -o /dev/null -w "%{http_code}" localhost:8000/v1/chat/completions -H "Content-Type: application/json" -d "{\"model\":\"Qwen/Qwen3-0.6B\",\"messages\":[{\"role\":\"user\",\"content\":\"a\"}],\"max_completion_tokens\":1}")" != "200" ]; do sleep 2; done' || { echo "Dynamo not ready after 15min"; exit 1; } ``` ```bash # Wait for DCGM Exporter to be ready (up to 2 minutes after Dynamo is ready) @@ -116,7 +129,7 @@ aiperf profile \ --endpoint-type chat \ --endpoint /v1/chat/completions \ --streaming \ - --url localhost:8080 \ + --url localhost:8000 \ --synthetic-input-tokens-mean 100 \ --synthetic-input-tokens-stddev 0 \ --output-tokens-mean 200 \ @@ -131,6 +144,9 @@ aiperf profile \ --gpu-telemetry ``` +> [!TIP] +> The `dashboard` keyword enables a live terminal UI for real-time GPU telemetry visualization. Press `5` to maximize the GPU Telemetry panel during the benchmark run. + --- # 2: Using Other Inference Server @@ -279,6 +295,12 @@ aiperf profile \ --gpu-telemetry ``` +> [!TIP] +> The `dashboard` keyword enables a live terminal UI for real-time GPU telemetry visualization. 
Press `5` to maximize the GPU Telemetry panel during the benchmark run.
+
 ## Multi-Node GPU Telemetry Example
 
 For distributed setups with multiple nodes, you can collect GPU telemetry from all nodes simultaneously:
@@ -287,12 +309,13 @@ For distributed setups with multiple nodes, you can collect GPU telemetry from a
 # Example: Collecting telemetry from 3 nodes in a distributed setup
 # Note: The default endpoints http://localhost:9400/metrics and http://localhost:9401/metrics
 # are always attempted in addition to these custom URLs
+# URLs can be specified with or without the http:// prefix
 aiperf profile \
   --model Qwen/Qwen3-0.6B \
   --endpoint-type chat \
   --endpoint /v1/chat/completions \
   --streaming \
-  --url localhost:8080 \
+  --url localhost:8000 \
   --synthetic-input-tokens-mean 100 \
   --synthetic-input-tokens-stddev 0 \
   --output-tokens-mean 200 \
@@ -304,14 +327,14 @@ aiperf profile \
   --warmup-request-count 1 \
   --conversation-num 8 \
   --random-seed 100 \
-  --gpu-telemetry http://node1:9400/metrics http://node2:9400/metrics http://node3:9400/metrics
+  --gpu-telemetry node1:9400 node2:9400 http://node3:9400/metrics
 ```
 
 This will collect GPU metrics from:
 - `http://localhost:9400/metrics` (default, always attempted)
 - `http://localhost:9401/metrics` (default, always attempted)
-- `http://node1:9400/metrics` (custom node 1)
-- `http://node2:9400/metrics` (custom node 2)
+- `http://node1:9400` (custom node 1, normalized from `node1:9400`)
+- `http://node2:9400` (custom node 2, normalized from `node2:9400`)
 - `http://node3:9400/metrics` (custom node 3)
 
 All metrics are displayed on the console and saved to the output CSV and JSON files, with GPU indices and hostnames distinguishing metrics from different nodes.
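For reference, the URL-handling rule described in the tutorial above (bare `host:port` values gain an `http://` prefix, and the literal `dashboard` keyword switches the display mode) is a single pass over the `--gpu-telemetry` values. Below is a minimal standalone sketch mirroring the `_parse_gpu_telemetry_config` validator added to `user_config.py` later in this diff; the function name and mode strings here are illustrative, not part of the patch:

```python
# Standalone sketch of --gpu-telemetry value parsing as described above.
# Not part of the patch; the real logic lives in UserConfig._parse_gpu_telemetry_config.
def parse_gpu_telemetry(values: list[str]) -> tuple[str, list[str]]:
    """Split --gpu-telemetry values into a display mode and normalized DCGM URLs."""
    mode = "summary"
    urls: list[str] = []
    for item in values:
        if item == "dashboard":
            mode = "realtime_dashboard"
        elif item.startswith("http") or ":" in item:
            # Bare host:port endpoints are treated as http:// URLs.
            urls.append(item if item.startswith("http") else f"http://{item}")
    return mode, urls


if __name__ == "__main__":
    print(parse_gpu_telemetry(["dashboard", "node1:9400", "http://node3:9400/metrics"]))
    # -> ('realtime_dashboard', ['http://node1:9400', 'http://node3:9400/metrics'])
```

The default `http://localhost:9400/metrics` and `http://localhost:9401/metrics` endpoints are merged in separately by the telemetry manager, so they are attempted even when this list is empty, as the note above states.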
diff --git a/mkdocs.yml b/mkdocs.yml index 77cc62f1b..850bc506b 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -17,6 +17,8 @@ nav: - Time-based Benchmarking: tutorials/time-based-benchmarking.md - Sequence Distributions: tutorials/sequence-distributions.md - Goodput: tutorials/goodput.md + - Request Rate with Max Concurrency: tutorials/request-rate-concurrency.md + - GPU Telemetry: tutorials/gpu-telemetry.md - Template Endpoint: tutorials/template-endpoint.md - Reference: - Architecture: architecture.md diff --git a/src/aiperf/common/config/user_config.py b/src/aiperf/common/config/user_config.py index c9fe568c3..b912fd450 100644 --- a/src/aiperf/common/config/user_config.py +++ b/src/aiperf/common/config/user_config.py @@ -19,7 +19,7 @@ from aiperf.common.config.loadgen_config import LoadGeneratorConfig from aiperf.common.config.output_config import OutputConfig from aiperf.common.config.tokenizer_config import TokenizerConfig -from aiperf.common.enums import CustomDatasetType +from aiperf.common.enums import CustomDatasetType, GPUTelemetryMode from aiperf.common.enums.timing_enums import RequestRateMode, TimingMode from aiperf.common.utils import load_json_str @@ -224,6 +224,44 @@ def _count_dataset_entries(self) -> int: ), ] + _gpu_telemetry_mode: GPUTelemetryMode = GPUTelemetryMode.SUMMARY + _gpu_telemetry_urls: list[str] = [] + + @model_validator(mode="after") + def _parse_gpu_telemetry_config(self) -> Self: + """Parse gpu_telemetry list into mode and URLs.""" + if not self.gpu_telemetry: + return self + + mode = GPUTelemetryMode.SUMMARY + urls = [] + + for item in self.gpu_telemetry: + if item in ["dashboard"]: + mode = GPUTelemetryMode.REALTIME_DASHBOARD + elif item.startswith("http") or ":" in item: + normalized_url = item if item.startswith("http") else f"http://{item}" + urls.append(normalized_url) + + self._gpu_telemetry_mode = mode + self._gpu_telemetry_urls = urls + return self + + @property + def gpu_telemetry_mode(self) -> GPUTelemetryMode: + """Get the GPU telemetry display mode (parsed from gpu_telemetry list).""" + return self._gpu_telemetry_mode + + @gpu_telemetry_mode.setter + def gpu_telemetry_mode(self, value: GPUTelemetryMode) -> None: + """Set the GPU telemetry display mode.""" + self._gpu_telemetry_mode = value + + @property + def gpu_telemetry_urls(self) -> list[str]: + """Get the parsed GPU telemetry DCGM endpoint URLs.""" + return self._gpu_telemetry_urls + @model_validator(mode="after") def _compute_config(self) -> Self: """Compute additional configuration. 
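The two properties added to `UserConfig` above are consumed elsewhere in this patch: `TelemetryManager` reads `gpu_telemetry_urls`, and `RecordsManager` checks (and later flips) `gpu_telemetry_mode`. A small consumer-side sketch under stated assumptions: the enum below is a stand-in for `aiperf.common.enums.GPUTelemetryMode`, and the `SimpleNamespace` stands in for a fully constructed `UserConfig`:

```python
from enum import Enum
from types import SimpleNamespace


class GPUTelemetryMode(str, Enum):
    # Stand-in for aiperf.common.enums.GPUTelemetryMode introduced in this diff.
    SUMMARY = "summary"
    REALTIME_DASHBOARD = "realtime_dashboard"


def should_stream_telemetry(user_config) -> bool:
    # RecordsManager keeps its telemetry background task asleep unless this is True;
    # the START_REALTIME_TELEMETRY command flips the mode via the property setter.
    return user_config.gpu_telemetry_mode == GPUTelemetryMode.REALTIME_DASHBOARD


def dcgm_targets(user_config) -> list[str]:
    # TelemetryManager probes these endpoints in addition to the built-in defaults.
    return list(user_config.gpu_telemetry_urls)


if __name__ == "__main__":
    cfg = SimpleNamespace(
        gpu_telemetry_mode=GPUTelemetryMode.REALTIME_DASHBOARD,
        gpu_telemetry_urls=["http://node1:9400"],
    )
    print(should_stream_telemetry(cfg), dcgm_targets(cfg))  # True ['http://node1:9400']
```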
diff --git a/src/aiperf/common/enums/__init__.py b/src/aiperf/common/enums/__init__.py index c562d5094..0f887d8e9 100644 --- a/src/aiperf/common/enums/__init__.py +++ b/src/aiperf/common/enums/__init__.py @@ -96,6 +96,9 @@ from aiperf.common.enums.system_enums import ( SystemState, ) +from aiperf.common.enums.telemetry_enums import ( + GPUTelemetryMode, +) from aiperf.common.enums.timing_enums import ( CreditPhase, RequestRateMode, @@ -131,6 +134,7 @@ "ExportLevel", "FrequencyMetricUnit", "FrequencyMetricUnitInfo", + "GPUTelemetryMode", "GenericMetricUnit", "ImageFormat", "LifecycleState", diff --git a/src/aiperf/common/enums/command_enums.py b/src/aiperf/common/enums/command_enums.py index 0b7416459..3dcf9bd9c 100644 --- a/src/aiperf/common/enums/command_enums.py +++ b/src/aiperf/common/enums/command_enums.py @@ -14,6 +14,7 @@ class CommandType(CaseInsensitiveStrEnum): SHUTDOWN = "shutdown" SHUTDOWN_WORKERS = "shutdown_workers" SPAWN_WORKERS = "spawn_workers" + START_REALTIME_TELEMETRY = "start_realtime_telemetry" class CommandResponseStatus(CaseInsensitiveStrEnum): diff --git a/src/aiperf/common/enums/message_enums.py b/src/aiperf/common/enums/message_enums.py index d18db39e3..3ba913cef 100644 --- a/src/aiperf/common/enums/message_enums.py +++ b/src/aiperf/common/enums/message_enums.py @@ -41,6 +41,7 @@ class MessageType(CaseInsensitiveStrEnum): PROFILE_PROGRESS = "profile_progress" PROFILE_RESULTS = "profile_results" REALTIME_METRICS = "realtime_metrics" + REALTIME_TELEMETRY_METRICS = "realtime_telemetry_metrics" REGISTRATION = "registration" SERVICE_ERROR = "service_error" STATUS = "status" diff --git a/src/aiperf/common/enums/telemetry_enums.py b/src/aiperf/common/enums/telemetry_enums.py new file mode 100644 index 000000000..1efd870de --- /dev/null +++ b/src/aiperf/common/enums/telemetry_enums.py @@ -0,0 +1,11 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from aiperf.common.enums.base_enums import CaseInsensitiveStrEnum + + +class GPUTelemetryMode(CaseInsensitiveStrEnum): + """GPU telemetry display mode.""" + + SUMMARY = "summary" + REALTIME_DASHBOARD = "realtime_dashboard" diff --git a/src/aiperf/common/hooks.py b/src/aiperf/common/hooks.py index 92adf80f6..9887332d9 100644 --- a/src/aiperf/common/hooks.py +++ b/src/aiperf/common/hooks.py @@ -44,6 +44,7 @@ class AIPerfHook(CaseInsensitiveStrEnum): ON_INIT = "@on_init" ON_MESSAGE = "@on_message" ON_REALTIME_METRICS = "@on_realtime_metrics" + ON_REALTIME_TELEMETRY_METRICS = "@on_realtime_telemetry_metrics" ON_PROFILING_PROGRESS = "@on_profiling_progress" ON_PULL_MESSAGE = "@on_pull_message" ON_RECORDS_PROGRESS = "@on_records_progress" @@ -348,6 +349,21 @@ def _on_realtime_metrics(self, metrics: list[MetricResult]) -> None: return _hook_decorator(AIPerfHook.ON_REALTIME_METRICS, func) +def on_realtime_telemetry_metrics(func: Callable) -> Callable: + """Decorator to specify that the function is a hook that should be called when real-time GPU telemetry metrics are received. + See :func:`aiperf.common.hooks._hook_decorator`. 
+ + Example: + ```python + class MyPlugin(RealtimeMetricsMixin): + @on_realtime_telemetry_metrics + def _on_realtime_telemetry_metrics(self, metrics: list[MetricResult]) -> None: + pass + ``` + """ + return _hook_decorator(AIPerfHook.ON_REALTIME_TELEMETRY_METRICS, func) + + def on_pull_message( *message_types: MessageTypeT | Callable[[SelfT], Iterable[MessageTypeT]], ) -> Callable: diff --git a/src/aiperf/common/messages/__init__.py b/src/aiperf/common/messages/__init__.py index 315faa854..5551e2c65 100644 --- a/src/aiperf/common/messages/__init__.py +++ b/src/aiperf/common/messages/__init__.py @@ -31,6 +31,7 @@ ShutdownCommand, ShutdownWorkersCommand, SpawnWorkersCommand, + StartRealtimeTelemetryCommand, TargetedServiceMessage, ) from aiperf.common.messages.credit_messages import ( @@ -75,6 +76,7 @@ ) from aiperf.common.messages.telemetry_messages import ( ProcessTelemetryResultMessage, + RealtimeTelemetryMetricsMessage, TelemetryRecordsMessage, TelemetryStatusMessage, ) @@ -127,6 +129,7 @@ "ProfileStartCommand", "RealtimeMetricsCommand", "RealtimeMetricsMessage", + "RealtimeTelemetryMetricsMessage", "RecordsProcessingStatsMessage", "RegisterServiceCommand", "RegistrationMessage", @@ -134,6 +137,7 @@ "ShutdownCommand", "ShutdownWorkersCommand", "SpawnWorkersCommand", + "StartRealtimeTelemetryCommand", "StatusMessage", "TargetedServiceMessage", "TelemetryRecordsMessage", diff --git a/src/aiperf/common/messages/command_messages.py b/src/aiperf/common/messages/command_messages.py index 45b271453..f417dd6dd 100644 --- a/src/aiperf/common/messages/command_messages.py +++ b/src/aiperf/common/messages/command_messages.py @@ -242,6 +242,17 @@ class RealtimeMetricsCommand(CommandMessage): command: CommandTypeT = CommandType.REALTIME_METRICS +class StartRealtimeTelemetryCommand(CommandMessage): + """Command to start the realtime telemetry background task in RecordsManager. + + This command is sent when the user dynamically enables the telemetry dashboard + by pressing the telemetry option in the UI. This always sets the GPU telemetry + mode to REALTIME_DASHBOARD. 
+ """ + + command: CommandTypeT = CommandType.START_REALTIME_TELEMETRY + + class SpawnWorkersCommand(CommandMessage): command: CommandTypeT = CommandType.SPAWN_WORKERS diff --git a/src/aiperf/common/messages/telemetry_messages.py b/src/aiperf/common/messages/telemetry_messages.py index cb05608d9..f8c42b764 100644 --- a/src/aiperf/common/messages/telemetry_messages.py +++ b/src/aiperf/common/messages/telemetry_messages.py @@ -5,7 +5,12 @@ from aiperf.common.enums import MessageType from aiperf.common.messages.service_messages import BaseServiceMessage -from aiperf.common.models import ErrorDetails, ProcessTelemetryResult, TelemetryRecord +from aiperf.common.models import ( + ErrorDetails, + MetricResult, + ProcessTelemetryResult, + TelemetryRecord, +) from aiperf.common.types import MessageTypeT @@ -19,6 +24,10 @@ class TelemetryRecordsMessage(BaseServiceMessage): ..., description="The ID of the telemetry data collector that collected the records.", ) + dcgm_url: str = Field( + ..., + description="The DCGM endpoint URL that was contacted (e.g., 'http://localhost:9400/metrics')", + ) records: list[TelemetryRecord] = Field( ..., description="The telemetry records collected from GPU monitoring" ) @@ -62,3 +71,13 @@ class TelemetryStatusMessage(BaseServiceMessage): default_factory=list, description="List of DCGM endpoint URLs that were reachable and will provide data", ) + + +class RealtimeTelemetryMetricsMessage(BaseServiceMessage): + """Message from the records manager to show real-time GPU telemetry metrics.""" + + message_type: MessageTypeT = MessageType.REALTIME_TELEMETRY_METRICS + + metrics: list[MetricResult] = Field( + ..., description="The current real-time GPU telemetry metrics." + ) diff --git a/src/aiperf/common/mixins/__init__.py b/src/aiperf/common/mixins/__init__.py index 0ad0d87f5..17a3d71a0 100644 --- a/src/aiperf/common/mixins/__init__.py +++ b/src/aiperf/common/mixins/__init__.py @@ -44,6 +44,9 @@ from aiperf.common.mixins.realtime_metrics_mixin import ( RealtimeMetricsMixin, ) +from aiperf.common.mixins.realtime_telemetry_metrics_mixin import ( + RealtimeTelemetryMetricsMixin, +) from aiperf.common.mixins.reply_client_mixin import ( ReplyClientMixin, ) @@ -67,6 +70,7 @@ "ProgressTrackerMixin", "PullClientMixin", "RealtimeMetricsMixin", + "RealtimeTelemetryMetricsMixin", "ReplyClientMixin", "TaskManagerMixin", "WorkerTrackerMixin", diff --git a/src/aiperf/common/mixins/realtime_telemetry_metrics_mixin.py b/src/aiperf/common/mixins/realtime_telemetry_metrics_mixin.py new file mode 100644 index 000000000..e9410dec9 --- /dev/null +++ b/src/aiperf/common/mixins/realtime_telemetry_metrics_mixin.py @@ -0,0 +1,40 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 +import asyncio + +from aiperf.common.config import ServiceConfig +from aiperf.common.enums import MessageType +from aiperf.common.hooks import AIPerfHook, on_message, provides_hooks +from aiperf.common.messages import RealtimeTelemetryMetricsMessage +from aiperf.common.mixins.message_bus_mixin import MessageBusClientMixin +from aiperf.common.models import MetricResult +from aiperf.controller.system_controller import SystemController + + +@provides_hooks(AIPerfHook.ON_REALTIME_TELEMETRY_METRICS) +class RealtimeTelemetryMetricsMixin(MessageBusClientMixin): + """A mixin that provides a hook for real-time GPU telemetry metrics.""" + + def __init__( + self, service_config: ServiceConfig, controller: SystemController, **kwargs + ): + super().__init__(service_config=service_config, controller=controller, **kwargs) + self._controller = controller + self._telemetry_metrics: list[MetricResult] = [] + self._telemetry_metrics_lock = asyncio.Lock() + + @on_message(MessageType.REALTIME_TELEMETRY_METRICS) + async def _on_realtime_telemetry_metrics( + self, message: RealtimeTelemetryMetricsMessage + ): + """Update the telemetry metrics from a real-time telemetry metrics message.""" + self.debug( + f"Mixin received telemetry message with {len(message.metrics)} metrics, triggering hook" + ) + + async with self._telemetry_metrics_lock: + self._telemetry_metrics = message.metrics + await self.run_hooks( + AIPerfHook.ON_REALTIME_TELEMETRY_METRICS, + metrics=message.metrics, + ) diff --git a/src/aiperf/common/models/record_models.py b/src/aiperf/common/models/record_models.py index 13af89cd5..57cd19c2f 100644 --- a/src/aiperf/common/models/record_models.py +++ b/src/aiperf/common/models/record_models.py @@ -44,6 +44,10 @@ class MetricResult(JsonMetricResult): default=None, description="The total number of records used to calculate the metric", ) + current: float | None = Field( + default=None, + description="The most recent value of the metric (used for realtime dashboard display only)", + ) def to_display_unit(self) -> "MetricResult": """Convert the metric result to its display unit.""" diff --git a/src/aiperf/common/models/telemetry_models.py b/src/aiperf/common/models/telemetry_models.py index 90c0d8d5d..46ee7a7e2 100644 --- a/src/aiperf/common/models/telemetry_models.py +++ b/src/aiperf/common/models/telemetry_models.py @@ -212,6 +212,7 @@ def to_metric_result( avg=float(np.mean(values)), std=float(np.std(values)), count=len(values), + current=float(data_points[-1][0]), p1=p1, p5=p5, p10=p10, diff --git a/src/aiperf/exporters/csv_exporter.py b/src/aiperf/exporters/csv_exporter.py index e79470020..0b6783a18 100644 --- a/src/aiperf/exporters/csv_exporter.py +++ b/src/aiperf/exporters/csv_exporter.py @@ -38,7 +38,6 @@ class CsvExporter(AIPerfLoggerMixin): def __init__(self, exporter_config: ExporterConfig, **kwargs) -> None: super().__init__(**kwargs) - self.debug(lambda: f"Initializing CsvExporter with config: {exporter_config}") self._results = exporter_config.results self._telemetry_results = exporter_config.telemetry_results self._output_directory = exporter_config.user_config.output.artifact_directory diff --git a/src/aiperf/exporters/json_exporter.py b/src/aiperf/exporters/json_exporter.py index b0d2d30b6..2704feb2b 100644 --- a/src/aiperf/exporters/json_exporter.py +++ b/src/aiperf/exporters/json_exporter.py @@ -38,7 +38,6 @@ class JsonExporter(AIPerfLoggerMixin): def __init__(self, exporter_config: ExporterConfig, **kwargs) -> None: super().__init__(**kwargs) - 
self.debug(lambda: f"Initializing JsonExporter with config: {exporter_config}") self._results = exporter_config.results self._telemetry_results = exporter_config.telemetry_results self._input_config = exporter_config.user_config diff --git a/src/aiperf/gpu_telemetry/telemetry_data_collector.py b/src/aiperf/gpu_telemetry/telemetry_data_collector.py index ac75345bf..726f62197 100644 --- a/src/aiperf/gpu_telemetry/telemetry_data_collector.py +++ b/src/aiperf/gpu_telemetry/telemetry_data_collector.py @@ -235,7 +235,6 @@ def _parse_metrics_to_records(self, metrics_data: str) -> list[TelemetryRecord]: Returns empty list if metrics_data is empty or parsing fails. """ if not metrics_data.strip(): - self.warning("Response from DCGM metrics endpoint is empty") return [] current_timestamp = time.time_ns() diff --git a/src/aiperf/gpu_telemetry/telemetry_manager.py b/src/aiperf/gpu_telemetry/telemetry_manager.py index 2662f5262..fb161dda2 100644 --- a/src/aiperf/gpu_telemetry/telemetry_manager.py +++ b/src/aiperf/gpu_telemetry/telemetry_manager.py @@ -2,7 +2,6 @@ # SPDX-License-Identifier: Apache-2.0 import asyncio -from urllib.parse import urlparse from aiperf.common.base_component_service import BaseComponentService from aiperf.common.config import ServiceConfig, UserConfig @@ -16,7 +15,6 @@ from aiperf.common.factories import ServiceFactory from aiperf.common.hooks import on_command, on_init, on_stop from aiperf.common.messages import ( - CommandAcknowledgedResponse, ProfileCancelCommand, ProfileConfigureCommand, TelemetryRecordsMessage, @@ -71,29 +69,17 @@ def __init__( ) self._collectors: dict[str, TelemetryDataCollector] = {} + self._collector_id_to_url: dict[str, str] = {} self._user_explicitly_configured_telemetry = ( user_config.gpu_telemetry is not None ) - # Normalize to list - user_endpoints = ( - [] - if user_config.gpu_telemetry is None - else [user_config.gpu_telemetry] - if isinstance(user_config.gpu_telemetry, str) - else list(user_config.gpu_telemetry) - ) + user_endpoints = user_config.gpu_telemetry_urls or [] + if isinstance(user_endpoints, str): + user_endpoints = [user_endpoints] - # Validate and normalize URLs - valid_endpoints = [] - for endpoint in user_endpoints: - try: - parsed = urlparse(endpoint) - if parsed.scheme in ("http", "https") and parsed.netloc: - valid_endpoints.append(self._normalize_dcgm_url(endpoint)) - except Exception: - continue + valid_endpoints = [self._normalize_dcgm_url(url) for url in user_endpoints] # Store user-provided endpoints separately for display filtering (excluding auto-inserted defaults) self._user_provided_endpoints = [ @@ -179,9 +165,11 @@ async def _profile_configure_command( """ self._collectors.clear() + self._collector_id_to_url.clear() for dcgm_url in self._dcgm_endpoints: self.debug(f"GPU Telemetry: Testing reachability of {dcgm_url}") collector_id = f"collector_{dcgm_url.replace(':', '_').replace('/', '_')}" + self._collector_id_to_url[collector_id] = dcgm_url collector = TelemetryDataCollector( dcgm_url=dcgm_url, collection_interval=self._collection_interval, @@ -238,10 +226,6 @@ async def _on_start_profiling(self, message) -> None: Args: message: Profile start command from SystemController """ - await self.publish( - CommandAcknowledgedResponse.from_command_message(message, self.service_id) - ) - if not self._collectors: # Telemetry disabled status already sent in _profile_configure_command, only shutdown here self._shutdown_task = asyncio.create_task(self._delayed_shutdown()) @@ -337,9 +321,11 @@ async def _on_telemetry_records( 
return try: + dcgm_url = self._collector_id_to_url.get(collector_id, "") message = TelemetryRecordsMessage( service_id=self.service_id, collector_id=collector_id, + dcgm_url=dcgm_url, records=records, error=None, ) @@ -361,9 +347,11 @@ async def _on_telemetry_error(self, error: ErrorDetails, collector_id: str) -> N """ try: + dcgm_url = self._collector_id_to_url.get(collector_id, "") error_message = TelemetryRecordsMessage( service_id=self.service_id, collector_id=collector_id, + dcgm_url=dcgm_url, records=[], error=error, ) diff --git a/src/aiperf/post_processors/telemetry_results_processor.py b/src/aiperf/post_processors/telemetry_results_processor.py index 0329c0071..acba26871 100644 --- a/src/aiperf/post_processors/telemetry_results_processor.py +++ b/src/aiperf/post_processors/telemetry_results_processor.py @@ -13,6 +13,7 @@ from aiperf.common.protocols import ( TelemetryResultsProcessorProtocol, ) +from aiperf.exporters.display_units_utils import normalize_endpoint_display from aiperf.gpu_telemetry.constants import GPU_TELEMETRY_METRICS_CONFIG from aiperf.post_processors.base_metrics_processor import BaseMetricsProcessor @@ -27,13 +28,16 @@ def __init__(self, user_config: UserConfig, **kwargs: Any): self._telemetry_hierarchy = TelemetryHierarchy() + def get_telemetry_hierarchy(self) -> TelemetryHierarchy: + """Get the accumulated telemetry hierarchy.""" + return self._telemetry_hierarchy + async def process_telemetry_record(self, record: TelemetryRecord) -> None: """Process individual telemetry record into hierarchical storage. Args: record: TelemetryRecord containing GPU metrics and hierarchical metadata """ - self._telemetry_hierarchy.add_record(record) async def summarize(self) -> list[MetricResult]: @@ -41,19 +45,20 @@ async def summarize(self) -> list[MetricResult]: This method is called by RecordsManager for: 1. Final results generation when profiling completes - 2. [AIP-355] TODO: @ilana-n [FUTURE] real-time dashboard updates (every DEFAULT_REALTIME_METRICS_INTERVAL) - when user-set flag is enabled + 2. Real-time dashboard updates when --gpu-telemetry dashboard is enabled Returns: List of MetricResult objects, one per GPU per metric type. Tags follow hierarchical naming pattern for dashboard filtering. 
""" - results = [] for dcgm_url, gpu_data in self._telemetry_hierarchy.dcgm_endpoints.items(): + endpoint_display = normalize_endpoint_display(dcgm_url) + for gpu_uuid, telemetry_data in gpu_data.items(): gpu_index = telemetry_data.metadata.gpu_index + model_name = telemetry_data.metadata.model_name for ( metric_display, @@ -66,12 +71,9 @@ async def summarize(self) -> list[MetricResult]: .replace("/", "_") .replace(".", "_") ) - # Use first 12 chars of UUID for readability while maintaining uniqueness tag = f"{metric_name}_dcgm_{dcgm_tag}_gpu{gpu_index}_{gpu_uuid[:12]}" - header = ( - f"{metric_display} (GPU {gpu_index}, {gpu_uuid[:12]}...)" - ) + header = f"{metric_display} | {endpoint_display} | GPU {gpu_index} | {model_name}" unit = unit_enum.value diff --git a/src/aiperf/records/__init__.py b/src/aiperf/records/__init__.py index 02e44de59..ced5b2392 100644 --- a/src/aiperf/records/__init__.py +++ b/src/aiperf/records/__init__.py @@ -24,6 +24,7 @@ ) from aiperf.records.records_manager import ( RecordsManager, + TelemetryTrackingState, ) __all__ = [ @@ -36,4 +37,5 @@ "PhaseCompletionContext", "RecordProcessor", "RecordsManager", + "TelemetryTrackingState", ] diff --git a/src/aiperf/records/records_manager.py b/src/aiperf/records/records_manager.py index 55e828ee7..d5f347dba 100644 --- a/src/aiperf/records/records_manager.py +++ b/src/aiperf/records/records_manager.py @@ -3,6 +3,8 @@ import asyncio import copy import time +from collections import defaultdict +from dataclasses import dataclass, field from aiperf.common.base_component_service import BaseComponentService from aiperf.common.config import ServiceConfig, UserConfig @@ -13,6 +15,7 @@ CommAddress, CommandType, CreditPhase, + GPUTelemetryMode, MessageType, ServiceType, ) @@ -30,7 +33,9 @@ ProcessTelemetryResultMessage, ProfileCancelCommand, RealtimeMetricsMessage, + RealtimeTelemetryMetricsMessage, RecordsProcessingStatsMessage, + StartRealtimeTelemetryCommand, TelemetryRecordsMessage, ) from aiperf.common.messages.command_messages import RealtimeMetricsCommand @@ -59,6 +64,28 @@ from aiperf.records.phase_completion import PhaseCompletionChecker +@dataclass +class TelemetryTrackingState: + """ + Tracks telemetry-related state and performance metrics. + + Consolidates error tracking, warnings, endpoint status, and performance + statistics for GPU telemetry collection and processing. 
+ """ + + error_counts: dict[ErrorDetails, int] = field( + default_factory=lambda: defaultdict(int) + ) + error_counts_lock: asyncio.Lock = field(default_factory=asyncio.Lock) + + task_runs: int = 0 + total_gen_time_ms: float = 0.0 + total_pub_time_ms: float = 0.0 + total_metrics_generated: int = 0 + mode_enabled_time: float | None = None + last_metric_values: dict[str, float | None] | None = None + + @implements_protocol(ServiceProtocol) @ServiceFactory.register(ServiceType.RECORDS_MANAGER) class RecordsManager(PullClientMixin, BaseComponentService): @@ -105,13 +132,8 @@ def __init__( self._previous_realtime_records: int | None = None - # Telemetry data storage - self._telemetry_hierarchy = TelemetryHierarchy() - self._telemetry_hierarchy_lock = asyncio.Lock() - self._telemetry_error_counts: dict[ - ErrorDetails, int - ] = {} # Track telemetry-specific errors with counts - self._telemetry_error_counts_lock = asyncio.Lock() + self._telemetry_state = TelemetryTrackingState() + self._telemetry_enable_event = asyncio.Event() self._metric_results_processors: list[ResultsProcessorProtocol] = [] self._telemetry_results_processors: list[TelemetryResultsProcessorProtocol] = [] @@ -200,7 +222,6 @@ async def _on_telemetry_records(self, message: TelemetryRecordsMessage) -> None: Args: message: Batch of telemetry records from a DCGM collector """ - if message.valid: try: await self._send_telemetry_to_results_processors(message.records) @@ -208,21 +229,13 @@ async def _on_telemetry_records(self, message: TelemetryRecordsMessage) -> None: error_details = ErrorDetails( message=f"Telemetry processor error: {str(e)}" ) - async with self._telemetry_error_counts_lock: - self._telemetry_error_counts[error_details] = ( - self._telemetry_error_counts.get(error_details, 0) + 1 - ) + async with self._telemetry_state.error_counts_lock: + self._telemetry_state.error_counts[error_details] += 1 self.debug(f"Failed to process telemetry batch: {e}") - - async with self._telemetry_hierarchy_lock: - for record in message.records: - self._telemetry_hierarchy.add_record(record) else: if message.error: - async with self._telemetry_error_counts_lock: - self._telemetry_error_counts[message.error] = ( - self._telemetry_error_counts.get(message.error, 0) + 1 - ) + async with self._telemetry_state.error_counts_lock: + self._telemetry_state.error_counts[message.error] += 1 def _should_include_request_by_duration( self, record_data: MetricRecordsData @@ -418,10 +431,11 @@ async def _on_profile_cancel_command( return await self._process_results(cancelled=True) @background_task(interval=None, immediate=True) - async def _report_realtime_metrics_task(self) -> None: - """Report the real-time metrics at a regular interval (only if the UI type is dashboard).""" + async def _report_realtime_inference_metrics_task(self) -> None: + """Report inference metrics at regular intervals (dashboard only).""" if self.service_config.ui_type != AIPerfUIType.DASHBOARD: return + while not self.stop_requested: await asyncio.sleep(Environment.UI.REALTIME_METRICS_INTERVAL) async with self.processing_status_lock: @@ -433,6 +447,41 @@ async def _report_realtime_metrics_task(self) -> None: self._previous_realtime_records = self.processing_stats.processed await self._report_realtime_metrics() + @background_task(interval=None, immediate=True) + async def _report_realtime_telemetry_metrics_task(self) -> None: + """Report telemetry metrics - sleeps when disabled, resumes on command.""" + if self.service_config.ui_type != AIPerfUIType.DASHBOARD: + return + + 
while not self.stop_requested: + if ( + self.user_config.gpu_telemetry_mode + != GPUTelemetryMode.REALTIME_DASHBOARD + ): + # Disabled - sleep until command wakes us + await self._telemetry_enable_event.wait() + self._telemetry_enable_event.clear() + + telemetry_metrics = await self._generate_realtime_telemetry_metrics() + self._telemetry_state.total_metrics_generated += len(telemetry_metrics) + + if telemetry_metrics: + # Only publish if values have changed - extract once for efficiency + new_values = {m.tag: m.current for m in telemetry_metrics} + if ( + self._telemetry_state.last_metric_values is None + or new_values != self._telemetry_state.last_metric_values + ): + await self.publish( + RealtimeTelemetryMetricsMessage( + service_id=self.service_id, + metrics=telemetry_metrics, + ) + ) + self._telemetry_state.last_metric_values = new_values + + await asyncio.sleep(Environment.UI.REALTIME_METRICS_INTERVAL) + @on_command(CommandType.REALTIME_METRICS) async def _on_realtime_metrics_command( self, message: RealtimeMetricsCommand @@ -440,17 +489,43 @@ async def _on_realtime_metrics_command( """Handle a real-time metrics command.""" await self._report_realtime_metrics() + @on_command(CommandType.START_REALTIME_TELEMETRY) + async def _on_start_realtime_telemetry_command( + self, message: StartRealtimeTelemetryCommand + ) -> None: + """Handle command to start the realtime telemetry background task. + + This is called when the user dynamically enables the telemetry dashboard + by pressing the telemetry option in the UI without having passed the 'dashboard' parameter + at startup. + """ + self.info("Received START_REALTIME_TELEMETRY command") + + self.user_config.gpu_telemetry_mode = GPUTelemetryMode.REALTIME_DASHBOARD + + # Wake up the sleeping telemetry task + self._telemetry_enable_event.set() + async def _report_realtime_metrics(self) -> None: - """Report the real-time metrics.""" + """Report both inference and telemetry metrics (used by command handler).""" metrics = await self._generate_realtime_metrics() - if not metrics: - return - await self.publish( - RealtimeMetricsMessage( - service_id=self.service_id, - metrics=metrics, + if metrics: + await self.publish( + RealtimeMetricsMessage( + service_id=self.service_id, + metrics=metrics, + ) ) - ) + + if self.user_config.gpu_telemetry_mode == GPUTelemetryMode.REALTIME_DASHBOARD: + telemetry_metrics = await self._generate_realtime_telemetry_metrics() + if telemetry_metrics: + await self.publish( + RealtimeTelemetryMetricsMessage( + service_id=self.service_id, + metrics=telemetry_metrics, + ) + ) async def _generate_realtime_metrics(self) -> list[MetricResult]: """Generate the real-time metrics for the profile run.""" @@ -461,7 +536,10 @@ async def _generate_realtime_metrics(self) -> list[MetricResult]: ], return_exceptions=True, ) - return [ + + # Flatten results: each processor returns list[MetricResult], so we have + # list[list[MetricResult] | Exception]. Flatten to single list[MetricResult]. 
+ metric_results = [ res for result in results if isinstance(result, list) @@ -469,6 +547,29 @@ async def _generate_realtime_metrics(self) -> list[MetricResult]: if isinstance(res, MetricResult) ] + return metric_results + + async def _generate_realtime_telemetry_metrics(self) -> list[MetricResult]: + """Generate the real-time GPU telemetry metrics.""" + telemetry_results = await asyncio.gather( + *[ + results_processor.summarize() + for results_processor in self._telemetry_results_processors + ], + return_exceptions=True, + ) + + # Flatten results: each processor returns list[MetricResult], so we have + # list[list[MetricResult] | Exception]. Flatten to single list[MetricResult]. + telemetry_metrics = [ + res + for result in telemetry_results + if isinstance(result, list) + for res in result + if isinstance(res, MetricResult) + ] + return telemetry_metrics + async def _process_results(self, cancelled: bool) -> ProcessRecordsResult: """Process the results.""" self.debug(lambda: f"Processing records (cancelled: {cancelled})") @@ -521,27 +622,33 @@ async def export_telemetry_independently(self) -> TelemetryResults | None: This method provides a separate export path for telemetry data that doesn't interfere with the inference results pipeline. + Retrieves the accumulated telemetry hierarchy from the TelemetryResultsProcessor, + which serves as the single source of truth for all telemetry data. + Returns: TelemetryResults if telemetry data was collected, None otherwise """ - async with self._telemetry_hierarchy_lock: - if not self._telemetry_hierarchy.dcgm_endpoints: - return None - - telemetry_results = TelemetryResults( - telemetry_data=self._telemetry_hierarchy, - start_ns=self.start_time_ns or time.time_ns(), - end_ns=self.end_time_ns or time.time_ns(), - endpoints_configured=list( - self._telemetry_hierarchy.dcgm_endpoints.keys() - ), - endpoints_successful=list( - self._telemetry_hierarchy.dcgm_endpoints.keys() - ), - error_summary=await self.get_telemetry_error_summary(), - ) + # Get hierarchy from the telemetry results processor (single source of truth) + if not self._telemetry_results_processors: + return None + + # Get hierarchy from first processor (typically only one telemetry processor) + processor = self._telemetry_results_processors[0] + telemetry_hierarchy = processor.get_telemetry_hierarchy() + + if not telemetry_hierarchy.dcgm_endpoints: + return None + + telemetry_results = TelemetryResults( + telemetry_data=telemetry_hierarchy, + start_ns=self.start_time_ns or time.time_ns(), + end_ns=self.end_time_ns or time.time_ns(), + endpoints_tested=list(telemetry_hierarchy.dcgm_endpoints.keys()), + endpoints_successful=list(telemetry_hierarchy.dcgm_endpoints.keys()), + error_summary=await self.get_telemetry_error_summary(), + ) - return telemetry_results + return telemetry_results async def _process_telemetry_results(self) -> ProcessTelemetryResult: """Process telemetry results by calling summarize on all telemetry processors. 
@@ -578,13 +685,8 @@ async def _process_telemetry_results(self) -> ProcessTelemetryResult: end_ns=self.end_time_ns or time.time_ns(), ) - async with self._telemetry_hierarchy_lock: - # Reset hierarchy once we've captured a snapshot for this result - self._telemetry_hierarchy = TelemetryHierarchy() - - async with self._telemetry_error_counts_lock: - unique_errors = list(self._telemetry_error_counts.keys()) - self._telemetry_error_counts.clear() + async with self._telemetry_state.error_counts_lock: + unique_errors = list(self._telemetry_state.error_counts.keys()) return ProcessTelemetryResult( results=telemetry_results, @@ -616,10 +718,10 @@ async def get_error_summary(self) -> list[ErrorDetailsCount]: async def get_telemetry_error_summary(self) -> list[ErrorDetailsCount]: """Generate a summary of the telemetry error records.""" - async with self._telemetry_error_counts_lock: + async with self._telemetry_state.error_counts_lock: return [ ErrorDetailsCount(error_details=error_details, count=count) - for error_details, count in self._telemetry_error_counts.items() + for error_details, count in self._telemetry_state.error_counts.items() ] diff --git a/src/aiperf/ui/__init__.py b/src/aiperf/ui/__init__.py index da56f4a5b..2b0c88f2c 100644 --- a/src/aiperf/ui/__init__.py +++ b/src/aiperf/ui/__init__.py @@ -17,6 +17,7 @@ WORKER_STATUS_STYLES, AIPerfDashboardUI, AIPerfTextualApp, + GPUMetricsTable, LogConsumer, MaximizableWidget, NonFocusableDataTable, @@ -24,7 +25,9 @@ ProgressHeader, RealtimeMetricsDashboard, RealtimeMetricsTable, + RealtimeTelemetryDashboard, RichLogViewer, + SingleNodeView, WorkerDashboard, WorkerStatusTable, ) @@ -46,6 +49,7 @@ "AIPerfDashboardUI", "AIPerfTextualApp", "BaseAIPerfUI", + "GPUMetricsTable", "LogConsumer", "MaximizableWidget", "NoUI", @@ -55,7 +59,9 @@ "ProgressHeader", "RealtimeMetricsDashboard", "RealtimeMetricsTable", + "RealtimeTelemetryDashboard", "RichLogViewer", + "SingleNodeView", "TQDMProgressUI", "WORKER_STATUS_STYLES", "WorkerDashboard", diff --git a/src/aiperf/ui/base_ui.py b/src/aiperf/ui/base_ui.py index 280be9e21..50414cc93 100644 --- a/src/aiperf/ui/base_ui.py +++ b/src/aiperf/ui/base_ui.py @@ -4,19 +4,27 @@ from aiperf.common.mixins import ( ProgressTrackerMixin, RealtimeMetricsMixin, + RealtimeTelemetryMetricsMixin, WorkerTrackerMixin, ) -class BaseAIPerfUI(ProgressTrackerMixin, WorkerTrackerMixin, RealtimeMetricsMixin): +class BaseAIPerfUI( + ProgressTrackerMixin, + WorkerTrackerMixin, + RealtimeMetricsMixin, + RealtimeTelemetryMetricsMixin, +): """Base class for AIPerf UI implementations. This class provides a simple starting point for a UI for AIPerf components. - It inherits from the :class:`ProgressTrackerMixin`, :class:`WorkerTrackerMixin`, and :class:`RealtimeMetricsMixin` + It inherits from the :class:`ProgressTrackerMixin`, :class:`WorkerTrackerMixin`, + :class:`RealtimeMetricsMixin`, and :class:`RealtimeTelemetryMetricsMixin` to provide a simple starting point for a UI for AIPerf components. - Now, you can use the various hooks defined in the :class:`ProgressTrackerMixin`, :class:`WorkerTrackerMixin`, and :class:`RealtimeMetricsMixin` - to create a UI for AIPerf components. + Now, you can use the various hooks defined in the :class:`ProgressTrackerMixin`, + :class:`WorkerTrackerMixin`, :class:`RealtimeMetricsMixin`, and + :class:`RealtimeTelemetryMetricsMixin` to create a UI for AIPerf components. 
Example: ```python diff --git a/src/aiperf/ui/dashboard/__init__.py b/src/aiperf/ui/dashboard/__init__.py index eeccf5c30..3bb95585a 100644 --- a/src/aiperf/ui/dashboard/__init__.py +++ b/src/aiperf/ui/dashboard/__init__.py @@ -31,6 +31,11 @@ RealtimeMetricsDashboard, RealtimeMetricsTable, ) +from aiperf.ui.dashboard.realtime_telemetry_dashboard import ( + GPUMetricsTable, + RealtimeTelemetryDashboard, + SingleNodeView, +) from aiperf.ui.dashboard.rich_log_viewer import ( LogConsumer, RichLogViewer, @@ -47,6 +52,7 @@ "AIPERF_THEME", "AIPerfDashboardUI", "AIPerfTextualApp", + "GPUMetricsTable", "LogConsumer", "MaximizableWidget", "NonFocusableDataTable", @@ -54,7 +60,9 @@ "ProgressHeader", "RealtimeMetricsDashboard", "RealtimeMetricsTable", + "RealtimeTelemetryDashboard", "RichLogViewer", + "SingleNodeView", "WORKER_STATUS_STYLES", "WorkerDashboard", "WorkerStatusTable", diff --git a/src/aiperf/ui/dashboard/aiperf_dashboard_ui.py b/src/aiperf/ui/dashboard/aiperf_dashboard_ui.py index 5d2358ebf..a89718f0d 100644 --- a/src/aiperf/ui/dashboard/aiperf_dashboard_ui.py +++ b/src/aiperf/ui/dashboard/aiperf_dashboard_ui.py @@ -70,6 +70,10 @@ def __init__( AIPerfHook.ON_WORKER_STATUS_SUMMARY, self.app.on_worker_status_summary ) self.attach_hook(AIPerfHook.ON_REALTIME_METRICS, self.app.on_realtime_metrics) + self.attach_hook( + AIPerfHook.ON_REALTIME_TELEMETRY_METRICS, + self.app.on_realtime_telemetry_metrics, + ) @on_start async def _run_app(self) -> None: diff --git a/src/aiperf/ui/dashboard/aiperf_textual_app.py b/src/aiperf/ui/dashboard/aiperf_textual_app.py index e99cc4389..b4cafbcc2 100644 --- a/src/aiperf/ui/dashboard/aiperf_textual_app.py +++ b/src/aiperf/ui/dashboard/aiperf_textual_app.py @@ -12,14 +12,16 @@ from textual.widgets import Footer from aiperf.common.config.service_config import ServiceConfig -from aiperf.common.enums import WorkerStatus +from aiperf.common.enums import GPUTelemetryMode, WorkerStatus from aiperf.common.environment import Environment +from aiperf.common.messages import StartRealtimeTelemetryCommand from aiperf.common.models import MetricResult, RecordsStats, RequestsStats, WorkerStats from aiperf.controller.system_controller import SystemController from aiperf.ui.dashboard.aiperf_theme import AIPERF_THEME from aiperf.ui.dashboard.progress_dashboard import ProgressDashboard from aiperf.ui.dashboard.progress_header import ProgressHeader from aiperf.ui.dashboard.realtime_metrics_dashboard import RealtimeMetricsDashboard +from aiperf.ui.dashboard.realtime_telemetry_dashboard import RealtimeTelemetryDashboard from aiperf.ui.dashboard.rich_log_viewer import RichLogViewer from aiperf.ui.dashboard.worker_dashboard import WorkerDashboard @@ -53,6 +55,10 @@ class AIPerfTextualApp(App): #workers-section { height: 3; } + #telemetry-section { + height: 3fr; + min-height: 14; + } #progress-section { width: 1fr; } @@ -70,7 +76,8 @@ class AIPerfTextualApp(App): ("2", "toggle_maximize('progress')", "Progress"), ("3", "toggle_maximize('metrics')", "Metrics"), ("4", "toggle_maximize('workers')", "Workers"), - ("5", "toggle_maximize('logs')", "Logs"), + ("5", "toggle_maximize_telemetry", "GPU Telemetry"), + ("6", "toggle_maximize('logs')", "Logs"), ("escape", "restore_all_panels", "Restore View"), Binding("ctrl+s", "screenshot", "Save Screenshot", show=False), Binding("l", "toggle_hide_log_viewer", "Toggle Logs", show=False), @@ -90,6 +97,7 @@ def __init__( self.progress_header: ProgressHeader | None = None self.worker_dashboard: WorkerDashboard | None = None 
self.realtime_metrics_dashboard: RealtimeMetricsDashboard | None = None + self.realtime_telemetry_dashboard: RealtimeTelemetryDashboard | None = None self.profile_results: list[RenderableType] = [] self.service_config = service_config self.controller: SystemController = controller @@ -137,6 +145,12 @@ def compose(self) -> ComposeResult: self.worker_dashboard = WorkerDashboard(id="workers") yield self.worker_dashboard + with Container(id="telemetry-section", classes="hidden"): + self.realtime_telemetry_dashboard = RealtimeTelemetryDashboard( + service_config=self.service_config, id="telemetry" + ) + yield self.realtime_telemetry_dashboard + with Container(id="logs-section"): self.log_viewer = RichLogViewer(id="logs") yield self.log_viewer @@ -180,6 +194,28 @@ async def action_toggle_maximize(self, panel_id: str) -> None: else: self.screen.maximize(panel) + async def action_toggle_maximize_telemetry(self) -> None: + """Toggle the maximize state of the telemetry panel and enable realtime GPU telemetry if needed.""" + if ( + self.controller.user_config.gpu_telemetry_mode + != GPUTelemetryMode.REALTIME_DASHBOARD + ): + self.controller.user_config.gpu_telemetry_mode = ( + GPUTelemetryMode.REALTIME_DASHBOARD + ) + if self.realtime_telemetry_dashboard: + self.realtime_telemetry_dashboard.set_status_message( + "Enabling live GPU telemetry..." + ) + + await self.controller.publish( + StartRealtimeTelemetryCommand( + service_id=self.controller.service_id, + ) + ) + + await self.action_toggle_maximize("telemetry") + async def on_warmup_progress(self, warmup_stats: RequestsStats) -> None: """Forward warmup progress updates to the Textual App.""" if not self._has_result_data: @@ -245,3 +281,9 @@ async def on_realtime_metrics(self, metrics: list[MetricResult]) -> None: if self.realtime_metrics_dashboard: async with self.realtime_metrics_dashboard.batch(): self.realtime_metrics_dashboard.on_realtime_metrics(metrics) + + async def on_realtime_telemetry_metrics(self, metrics: list[MetricResult]) -> None: + """Forward real-time GPU telemetry metrics updates to the Textual App.""" + if self.realtime_telemetry_dashboard: + async with self.realtime_telemetry_dashboard.batch(): + self.realtime_telemetry_dashboard.on_realtime_telemetry_metrics(metrics) diff --git a/src/aiperf/ui/dashboard/realtime_telemetry_dashboard.py b/src/aiperf/ui/dashboard/realtime_telemetry_dashboard.py new file mode 100644 index 000000000..df9de5a35 --- /dev/null +++ b/src/aiperf/ui/dashboard/realtime_telemetry_dashboard.py @@ -0,0 +1,304 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +from contextlib import suppress + +from rich.text import Text +from textual.app import ComposeResult +from textual.containers import Container, VerticalScroll +from textual.widget import Widget +from textual.widgets import Static +from textual.widgets.data_table import ColumnKey, RowDoesNotExist, RowKey + +from aiperf.common.aiperf_logger import AIPerfLogger +from aiperf.common.config.service_config import ServiceConfig +from aiperf.common.models.record_models import MetricResult +from aiperf.ui.dashboard.custom_widgets import MaximizableWidget, NonFocusableDataTable + +_logger = AIPerfLogger(__name__) + + +class GPUMetricsTable(Widget): + """Display metrics table for a single GPU.""" + + DEFAULT_CSS = """ + GPUMetricsTable { + height: auto; + margin: 0 0 1 0; + } + GPUMetricsTable Static { + background: $boost; + padding: 0 1; + margin: 0 0 0 0; + text-style: bold; + } + NonFocusableDataTable { + height: auto; + max-height: 20; + } + """ + + STATS_FIELDS = ["current", "avg", "min", "max", "p99", "p90", "p50", "std"] + COLUMNS = ["Metric", *STATS_FIELDS] + + def __init__( + self, endpoint: str, gpu_uuid: str, gpu_index: int, model_name: str, **kwargs + ): + super().__init__(**kwargs) + self.endpoint = endpoint + self.gpu_uuid = gpu_uuid + self.gpu_index = gpu_index + self.model_name = model_name + self.data_table: NonFocusableDataTable | None = None + self._columns_initialized = False + self._column_keys: dict[str, ColumnKey] = {} + self._metric_row_keys: dict[str, RowKey] = {} + + def compose(self) -> ComposeResult: + """Compose GPU metrics table with header and data table.""" + yield Static(f"{self.endpoint} | GPU {self.gpu_index} | {self.model_name}") + self.data_table = NonFocusableDataTable( + cursor_type="row", show_cursor=False, zebra_stripes=True + ) + yield self.data_table + + def on_mount(self) -> None: + """Initialize table columns on mount.""" + if self.data_table and not self._columns_initialized: + self._initialize_columns() + + def _initialize_columns(self) -> None: + """Initialize table columns.""" + for i, col in enumerate(self.COLUMNS): + if i == 0: + self._column_keys[col] = self.data_table.add_column( # type: ignore + Text(col, justify="left") + ) + else: + self._column_keys[col] = self.data_table.add_column( # type: ignore + Text(col, justify="right") + ) + self._columns_initialized = True + + def update(self, metrics: list[MetricResult]) -> None: + """Update table with metrics for this GPU.""" + if not self.data_table or not self.data_table.is_mounted: + return + + if not self._columns_initialized: + self._initialize_columns() + + for metric in sorted(metrics, key=lambda m: m.header): + row_cells = self._format_metric_row(metric) + if metric.tag in self._metric_row_keys: + row_key = self._metric_row_keys[metric.tag] + try: + _ = self.data_table.get_row_index(row_key) + self._update_single_row(row_cells, row_key) + continue + except RowDoesNotExist: + _logger.warning(f"Row key {row_key} no longer exists, re-adding") + + row_key = self.data_table.add_row(*row_cells) + self._metric_row_keys[metric.tag] = row_key + + if self.data_table: + self.data_table.refresh() + + def _update_single_row(self, row_cells: list[Text], row_key: RowKey) -> None: + """Update a single row's cells.""" + for col_name, cell_value in zip(self.COLUMNS, row_cells, strict=True): + try: + self.data_table.update_cell( # type: ignore + row_key, self._column_keys[col_name], cell_value, update_width=True + ) + except Exception as e: + _logger.warning( + f"Error 
updating cell {col_name} with value {cell_value}: {e!r}" + ) + + def _format_metric_row(self, metric: MetricResult) -> list[Text]: + """Format metric data into table row cells. + + Header format: "GPU Power Usage | localhost:9401 | GPU 0 | Model" + We only want the metric name (first part before |). + """ + metric_name = ( + metric.header.split(" | ")[0] if " | " in metric.header else metric.header + ) + + return [ + Text(metric_name, style="bold cyan", justify="left"), + *[ + self._format_value(getattr(metric, field)) + for field in self.STATS_FIELDS + ], + ] + + def _format_value(self, value) -> Text: + """Format a metric value for display (matches console exporter format).""" + if value is None: + return Text("N/A", justify="right", style="dim") + + if not isinstance(value, int | float): + return Text(str(value), justify="right", style="green") + + value_str = f"{value:.2e}" if abs(value) >= 1000000 else f"{value:,.2f}" + + return Text(value_str, justify="right", style="green") + + +class SingleNodeView(VerticalScroll): + """Display all GPUs for a single node directly.""" + + DEFAULT_CSS = """ + SingleNodeView { + height: 100%; + padding: 1 2; + } + SingleNodeView > Static { + background: $panel; + padding: 0 1; + margin: 0 0 1 0; + text-style: bold; + } + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.gpu_tables: dict[str, GPUMetricsTable] = {} + + def compose(self) -> ComposeResult: + """Compose single node view (no initial widgets - GPU tables added dynamically).""" + yield from () + + def update(self, metrics: list[MetricResult]) -> None: + """Update display with all GPUs from all nodes.""" + + gpus = self._group_metrics_by_gpu(metrics) + + for gpu_key, gpu_metrics in gpus.items(): + if gpu_key not in self.gpu_tables: + if not self.is_mounted: + continue + + endpoint, gpu_index, gpu_uuid, model_name = self._extract_gpu_info( + gpu_metrics[0] + ) + gpu_table = GPUMetricsTable(endpoint, gpu_uuid, gpu_index, model_name) + self.gpu_tables[gpu_key] = gpu_table + self.mount(gpu_table) + + self.gpu_tables[gpu_key].update(gpu_metrics) + + def _group_metrics_by_gpu( + self, metrics: list[MetricResult] + ) -> dict[str, list[MetricResult]]: + """Group metrics by GPU.""" + gpus = {} + for metric in metrics: + gpu_key = self._extract_gpu_key_from_tag(metric.tag) + gpus.setdefault(gpu_key, []).append(metric) + return gpus + + def _extract_gpu_key_from_tag(self, tag: str) -> str: + """Extract GPU identifier from metric tag including endpoint info. + + Tag format: metric_name_dcgm_http___localhost_9400_metrics_gpu0_uuid + Returns: dcgm_http___localhost_9400_metrics_0_uuid + """ + if "_dcgm_" not in tag or "_gpu" not in tag: + return "unknown" + + dcgm_and_gpu = tag.split("_dcgm_")[1] + return dcgm_and_gpu.replace("_gpu", "_") + + def _extract_gpu_info(self, metric: MetricResult) -> tuple[str, int, str, str]: + """Extract endpoint, GPU index, UUID, and model name from metric. + + Header format: "GPU Power Usage | localhost:9401 | GPU 0 | NVIDIA RTX 6000..." 
+ Tag format: metric_name_dcgm_url_gpu0_uuid + """ + parts = metric.header.split(" | ") + if len(parts) >= 4: + endpoint = parts[1] + gpu_text = parts[2] + model_name = parts[3] + gpu_index = int(gpu_text.split()[1]) if len(gpu_text.split()) > 1 else 0 + else: + endpoint = "unknown" + gpu_index = 0 + model_name = "GPU" + + tag_parts = metric.tag.split("_") + gpu_uuid = tag_parts[-1] if tag_parts else "unknown" + + return endpoint, gpu_index, gpu_uuid, model_name + + +class RealtimeTelemetryDashboard(Container, MaximizableWidget): + """Main telemetry dashboard - auto-switches between single/multi node.""" + + DEFAULT_CSS = """ + RealtimeTelemetryDashboard { + border: round $primary; + border-title-color: $primary; + border-title-style: bold; + border-title-align: center; + height: 1fr; + layout: vertical; + } + #all-nodes-view { + height: 100%; + } + .hidden { + display: none; + } + #telemetry-status { + height: 100%; + width: 100%; + color: $warning; + text-style: italic; + content-align: center middle; + } + """ + + def __init__(self, service_config: ServiceConfig, **kwargs): + super().__init__(**kwargs) + self.service_config = service_config + self.all_nodes_view: SingleNodeView | None = None + self.metrics: list[MetricResult] = [] + self.border_title = "Real-Time GPU Telemetry" + + def compose(self) -> ComposeResult: + """Compose the dashboard.""" + yield Static( + "No telemetry data available yet. Please wait...", + id="telemetry-status", + ) + + self.all_nodes_view = SingleNodeView(id="all-nodes-view", classes="hidden") + yield self.all_nodes_view + + def set_status_message(self, message: str) -> None: + """Update the status message text. + + Args: + message: The new status message to display. + """ + with suppress(Exception): + status_widget = self.query_one("#telemetry-status") + status_widget.update(message) + status_widget.remove_class("hidden") + self.all_nodes_view.add_class("hidden") + + def on_realtime_telemetry_metrics(self, metrics: list[MetricResult]) -> None: + """Handle GPU telemetry metrics updates.""" + + if not self.metrics: + with suppress(Exception): + self.query_one("#telemetry-status").add_class("hidden") + self.all_nodes_view.remove_class("hidden") + + self.metrics = metrics + self.all_nodes_view.update(metrics) diff --git a/tests/config/test_user_config.py b/tests/config/test_user_config.py index 08bc26c91..af37dbef3 100644 --- a/tests/config/test_user_config.py +++ b/tests/config/test_user_config.py @@ -17,7 +17,7 @@ TurnDelayConfig, UserConfig, ) -from aiperf.common.enums import EndpointType +from aiperf.common.enums import EndpointType, GPUTelemetryMode from aiperf.common.enums.dataset_enums import CustomDatasetType from aiperf.common.enums.timing_enums import TimingMode @@ -279,3 +279,191 @@ def test_compute_artifact_directory( artifact_dir = config._compute_artifact_directory() assert artifact_dir == Path(expected_dir) + + +@pytest.mark.parametrize( + "gpu_telemetry_input,expected_mode,expected_urls", + [ + # No telemetry configured + ([], GPUTelemetryMode.SUMMARY, []), + # Dashboard mode only + (["dashboard"], GPUTelemetryMode.REALTIME_DASHBOARD, []), + # URLs only (no dashboard) + ( + ["http://node1:9401/metrics"], + GPUTelemetryMode.SUMMARY, + ["http://node1:9401/metrics"], + ), + # Dashboard + URLs + ( + ["dashboard", "http://node1:9401/metrics"], + GPUTelemetryMode.REALTIME_DASHBOARD, + ["http://node1:9401/metrics"], + ), + # Multiple URLs + ( + ["http://node1:9401/metrics", "http://node2:9401/metrics"], + GPUTelemetryMode.SUMMARY, + 
["http://node1:9401/metrics", "http://node2:9401/metrics"], + ), + # Dashboard + multiple URLs + ( + [ + "dashboard", + "http://node1:9401/metrics", + "http://node2:9401/metrics", + ], + GPUTelemetryMode.REALTIME_DASHBOARD, + ["http://node1:9401/metrics", "http://node2:9401/metrics"], + ), + ], +) +def test_parse_gpu_telemetry_config(gpu_telemetry_input, expected_mode, expected_urls): + """Test parsing of gpu_telemetry list into mode and URLs.""" + config = UserConfig( + endpoint=EndpointConfig( + model_names=["test-model"], + type=EndpointType.CHAT, + custom_endpoint="test", + ), + gpu_telemetry=gpu_telemetry_input, + ) + + assert config.gpu_telemetry_mode == expected_mode + assert config.gpu_telemetry_urls == expected_urls + + +def test_parse_gpu_telemetry_config_with_defaults(): + """Test that gpu_telemetry_mode and gpu_telemetry_urls have correct defaults.""" + config = UserConfig( + endpoint=EndpointConfig( + model_names=["test-model"], + type=EndpointType.CHAT, + custom_endpoint="test", + ) + ) + + # Should have default values + assert config.gpu_telemetry_mode == GPUTelemetryMode.SUMMARY + assert config.gpu_telemetry_urls == [] + + +def test_parse_gpu_telemetry_config_preserves_existing_fields(): + """Test that parsing GPU telemetry config doesn't affect other fields.""" + config = UserConfig( + endpoint=EndpointConfig( + model_names=["test-model"], + type=EndpointType.CHAT, + custom_endpoint="test", + streaming=True, + ), + gpu_telemetry=["dashboard", "http://custom:9401/metrics"], + ) + + # Telemetry fields should be set + assert config.gpu_telemetry_mode == GPUTelemetryMode.REALTIME_DASHBOARD + assert config.gpu_telemetry_urls == ["http://custom:9401/metrics"] + + # Other fields should be unchanged + assert config.endpoint.streaming is True + assert config.endpoint.model_names == ["test-model"] + + +def test_gpu_telemetry_urls_extraction(): + """Test that only http URLs are extracted from gpu_telemetry list.""" + config = UserConfig( + endpoint=EndpointConfig( + model_names=["test-model"], + type=EndpointType.CHAT, + custom_endpoint="test", + ), + gpu_telemetry=[ + "dashboard", # Not a URL + "http://node1:9401/metrics", # Valid URL + "https://node2:9401/metrics", # Valid URL + "summary", # Not a URL + ], + ) + + # Should extract only http/https URLs + assert len(config.gpu_telemetry_urls) == 2 + assert "http://node1:9401/metrics" in config.gpu_telemetry_urls + assert "https://node2:9401/metrics" in config.gpu_telemetry_urls + assert "dashboard" not in config.gpu_telemetry_urls + assert "summary" not in config.gpu_telemetry_urls + + +def test_gpu_telemetry_mode_detection(): + """Test that dashboard mode is detected correctly in various positions.""" + # Dashboard at beginning + config1 = UserConfig( + endpoint=EndpointConfig( + model_names=["test-model"], + type=EndpointType.CHAT, + custom_endpoint="test", + ), + gpu_telemetry=["dashboard", "http://node1:9401/metrics"], + ) + assert config1.gpu_telemetry_mode == GPUTelemetryMode.REALTIME_DASHBOARD + + # Dashboard at end + config2 = UserConfig( + endpoint=EndpointConfig( + model_names=["test-model"], + type=EndpointType.CHAT, + custom_endpoint="test", + ), + gpu_telemetry=["http://node1:9401/metrics", "dashboard"], + ) + assert config2.gpu_telemetry_mode == GPUTelemetryMode.REALTIME_DASHBOARD + + # No dashboard + config3 = UserConfig( + endpoint=EndpointConfig( + model_names=["test-model"], + type=EndpointType.CHAT, + custom_endpoint="test", + ), + gpu_telemetry=["http://node1:9401/metrics"], + ) + assert config3.gpu_telemetry_mode 
== GPUTelemetryMode.SUMMARY + + +def test_gpu_telemetry_url_normalization(): + """Test that URLs without http:// prefix are normalized correctly.""" + config = UserConfig( + endpoint=EndpointConfig( + model_names=["test-model"], + type=EndpointType.CHAT, + custom_endpoint="test", + ), + gpu_telemetry=[ + "localhost:9400", + "node1:9401/metrics", + "http://node2:9400", + "https://node3:9401/metrics", + ], + ) + + assert len(config.gpu_telemetry_urls) == 4 + assert "http://localhost:9400" in config.gpu_telemetry_urls + assert "http://node1:9401/metrics" in config.gpu_telemetry_urls + assert "http://node2:9400" in config.gpu_telemetry_urls + assert "https://node3:9401/metrics" in config.gpu_telemetry_urls + + +def test_gpu_telemetry_mixed_formats(): + """Test that mixed URL formats (with and without http://) work correctly.""" + config = UserConfig( + endpoint=EndpointConfig( + model_names=["test-model"], + type=EndpointType.CHAT, + custom_endpoint="test", + ), + gpu_telemetry=["dashboard", "localhost:9400", "http://node1:9401"], + ) + + assert config.gpu_telemetry_mode == GPUTelemetryMode.REALTIME_DASHBOARD + assert len(config.gpu_telemetry_urls) == 2 + assert "http://localhost:9400" in config.gpu_telemetry_urls + assert "http://node1:9401" in config.gpu_telemetry_urls diff --git a/tests/gpu_telemetry/conftest.py b/tests/gpu_telemetry/conftest.py index c57799afa..0ce6e0528 100644 --- a/tests/gpu_telemetry/conftest.py +++ b/tests/gpu_telemetry/conftest.py @@ -7,114 +7,35 @@ import pytest from aiperf.common.models.telemetry_models import TelemetryMetrics, TelemetryRecord +from tests.aiperf_mock_server.dcgm_faker import DCGMFaker @pytest.fixture def sample_dcgm_data(): - """Sample DCGM metrics data in Prometheus format (single GPU).""" - - return """# HELP DCGM_FI_DEV_SM_CLOCK SM clock frequency (in MHz) -# TYPE DCGM_FI_DEV_SM_CLOCK gauge -DCGM_FI_DEV_SM_CLOCK{gpu="0",UUID="GPU-ef6ef310-f8e2-cef9-036e-8f12d59b5ffc",pci_bus_id="00000000:02:00.0",device="nvidia0",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 210 -# HELP DCGM_FI_DEV_MEM_CLOCK Memory clock frequency (in MHz) -# TYPE DCGM_FI_DEV_MEM_CLOCK gauge -DCGM_FI_DEV_MEM_CLOCK{gpu="0",UUID="GPU-ef6ef310-f8e2-cef9-036e-8f12d59b5ffc",pci_bus_id="00000000:02:00.0",device="nvidia0",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 405 -# HELP DCGM_FI_DEV_POWER_USAGE Power draw (in W) -# TYPE DCGM_FI_DEV_POWER_USAGE gauge -DCGM_FI_DEV_POWER_USAGE{gpu="0",UUID="GPU-ef6ef310-f8e2-cef9-036e-8f12d59b5ffc",pci_bus_id="00000000:02:00.0",device="nvidia0",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 22.582000 -# HELP DCGM_FI_DEV_POWER_MGMT_LIMIT Power management limit (in W) -# TYPE DCGM_FI_DEV_POWER_MGMT_LIMIT gauge -DCGM_FI_DEV_POWER_MGMT_LIMIT{gpu="0",UUID="GPU-ef6ef310-f8e2-cef9-036e-8f12d59b5ffc",pci_bus_id="00000000:02:00.0",device="nvidia0",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 300.0 -# HELP DCGM_FI_DEV_TOTAL_ENERGY_CONSUMPTION Total energy consumption since boot (in mJ) -# TYPE DCGM_FI_DEV_TOTAL_ENERGY_CONSUMPTION counter -DCGM_FI_DEV_TOTAL_ENERGY_CONSUMPTION{gpu="0",UUID="GPU-ef6ef310-f8e2-cef9-036e-8f12d59b5ffc",pci_bus_id="00000000:02:00.0",device="nvidia0",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 955287014 -# HELP DCGM_FI_DEV_GPU_UTIL GPU utilization (in %) -# TYPE DCGM_FI_DEV_GPU_UTIL gauge 
-DCGM_FI_DEV_GPU_UTIL{gpu="0",UUID="GPU-ef6ef310-f8e2-cef9-036e-8f12d59b5ffc",pci_bus_id="00000000:02:00.0",device="nvidia0",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 1 -# HELP DCGM_FI_DEV_FB_USED Framebuffer memory used (in MiB) -# TYPE DCGM_FI_DEV_FB_USED gauge -DCGM_FI_DEV_FB_USED{gpu="0",UUID="GPU-ef6ef310-f8e2-cef9-036e-8f12d59b5ffc",pci_bus_id="00000000:02:00.0",device="nvidia0",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 46614 -# HELP DCGM_FI_DEV_FB_FREE Framebuffer memory free (in MiB) -# TYPE DCGM_FI_DEV_FB_FREE gauge -DCGM_FI_DEV_FB_FREE{gpu="0",UUID="GPU-ef6ef310-f8e2-cef9-036e-8f12d59b5ffc",pci_bus_id="00000000:02:00.0",device="nvidia0",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 2048 -# HELP DCGM_FI_DEV_FB_TOTAL Total framebuffer memory (in MiB) -# TYPE DCGM_FI_DEV_FB_TOTAL gauge -DCGM_FI_DEV_FB_TOTAL{gpu="0",UUID="GPU-ef6ef310-f8e2-cef9-036e-8f12d59b5ffc",pci_bus_id="00000000:02:00.0",device="nvidia0",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 48662 -# HELP DCGM_FI_DEV_MEM_COPY_UTIL Memory copy utilization (in %) -# TYPE DCGM_FI_DEV_MEM_COPY_UTIL gauge -DCGM_FI_DEV_MEM_COPY_UTIL{gpu="0",UUID="GPU-ef6ef310-f8e2-cef9-036e-8f12d59b5ffc",pci_bus_id="00000000:02:00.0",device="nvidia0",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 15 -# HELP DCGM_FI_DEV_XID_ERRORS Value of the last XID error encountered -# TYPE DCGM_FI_DEV_XID_ERRORS gauge -DCGM_FI_DEV_XID_ERRORS{gpu="0",UUID="GPU-ef6ef310-f8e2-cef9-036e-8f12d59b5ffc",pci_bus_id="00000000:02:00.0",device="nvidia0",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 0 -# HELP DCGM_FI_DEV_POWER_VIOLATION Throttling duration due to power constraints (in us) -# TYPE DCGM_FI_DEV_POWER_VIOLATION counter -DCGM_FI_DEV_POWER_VIOLATION{gpu="0",UUID="GPU-ef6ef310-f8e2-cef9-036e-8f12d59b5ffc",pci_bus_id="00000000:02:00.0",device="nvidia0",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 12000 -# HELP DCGM_FI_DEV_THERMAL_VIOLATION Throttling duration due to thermal constraints (in us) -# TYPE DCGM_FI_DEV_THERMAL_VIOLATION counter -DCGM_FI_DEV_THERMAL_VIOLATION{gpu="0",UUID="GPU-ef6ef310-f8e2-cef9-036e-8f12d59b5ffc",pci_bus_id="00000000:02:00.0",device="nvidia0",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 5000 -""" + """Sample DCGM metrics from DCGMFaker (single GPU).""" + + faker = DCGMFaker( + gpu_name="rtx6000", + num_gpus=1, + seed=42, + hostname="ed7e7a5e585f", + initial_load=0.1, + ) + return faker.generate() @pytest.fixture def multi_gpu_dcgm_data(): - """Sample DCGM metrics data with multiple GPUs.""" - - return """# HELP DCGM_FI_DEV_POWER_USAGE Power draw (in W) -# TYPE DCGM_FI_DEV_POWER_USAGE gauge -DCGM_FI_DEV_POWER_USAGE{gpu="0",UUID="GPU-ef6ef310-f8e2-cef9-036e-8f12d59b5ffc",pci_bus_id="00000000:02:00.0",device="nvidia0",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 79.60 -DCGM_FI_DEV_POWER_USAGE{gpu="1",UUID="GPU-12345678-1234-1234-1234-123456789abc",pci_bus_id="00000000:03:00.0",device="nvidia1",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 42.09 -DCGM_FI_DEV_POWER_USAGE{gpu="2",UUID="GPU-87654321-4321-4321-4321-cba987654321",pci_bus_id="00000000:04:00.0",device="nvidia2",modelName="NVIDIA H100 PCIe",Hostname="ed7e7a5e585f"} 43.99 -# HELP DCGM_FI_DEV_POWER_MGMT_LIMIT Power management limit (in W) -# TYPE DCGM_FI_DEV_POWER_MGMT_LIMIT gauge 
-DCGM_FI_DEV_POWER_MGMT_LIMIT{gpu="0",UUID="GPU-ef6ef310-f8e2-cef9-036e-8f12d59b5ffc",pci_bus_id="00000000:02:00.0",device="nvidia0",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 300.0 -DCGM_FI_DEV_POWER_MGMT_LIMIT{gpu="1",UUID="GPU-12345678-1234-1234-1234-123456789abc",pci_bus_id="00000000:03:00.0",device="nvidia1",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 300.0 -DCGM_FI_DEV_POWER_MGMT_LIMIT{gpu="2",UUID="GPU-87654321-4321-4321-4321-cba987654321",pci_bus_id="00000000:04:00.0",device="nvidia2",modelName="NVIDIA H100 PCIe",Hostname="ed7e7a5e585f"} 700.0 -# HELP DCGM_FI_DEV_TOTAL_ENERGY_CONSUMPTION Total energy consumption since boot (in mJ) -# TYPE DCGM_FI_DEV_TOTAL_ENERGY_CONSUMPTION counter -DCGM_FI_DEV_TOTAL_ENERGY_CONSUMPTION{gpu="0",UUID="GPU-ef6ef310-f8e2-cef9-036e-8f12d59b5ffc",pci_bus_id="00000000:02:00.0",device="nvidia0",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 280000000 -DCGM_FI_DEV_TOTAL_ENERGY_CONSUMPTION{gpu="1",UUID="GPU-12345678-1234-1234-1234-123456789abc",pci_bus_id="00000000:03:00.0",device="nvidia1",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 230000000 -DCGM_FI_DEV_TOTAL_ENERGY_CONSUMPTION{gpu="2",UUID="GPU-87654321-4321-4321-4321-cba987654321",pci_bus_id="00000000:04:00.0",device="nvidia2",modelName="NVIDIA H100 PCIe",Hostname="ed7e7a5e585f"} 250000000 -# HELP DCGM_FI_DEV_GPU_UTIL GPU utilization (in %) -# TYPE DCGM_FI_DEV_GPU_UTIL gauge -DCGM_FI_DEV_GPU_UTIL{gpu="0",UUID="GPU-ef6ef310-f8e2-cef9-036e-8f12d59b5ffc",pci_bus_id="00000000:02:00.0",device="nvidia0",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 34 -DCGM_FI_DEV_GPU_UTIL{gpu="1",UUID="GPU-12345678-1234-1234-1234-123456789abc",pci_bus_id="00000000:03:00.0",device="nvidia1",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 0 -DCGM_FI_DEV_GPU_UTIL{gpu="2",UUID="GPU-87654321-4321-4321-4321-cba987654321",pci_bus_id="00000000:04:00.0",device="nvidia2",modelName="NVIDIA H100 PCIe",Hostname="ed7e7a5e585f"} 0 -# HELP DCGM_FI_DEV_FB_USED Framebuffer memory used (in MiB) -# TYPE DCGM_FI_DEV_FB_USED gauge -DCGM_FI_DEV_FB_USED{gpu="0",UUID="GPU-ef6ef310-f8e2-cef9-036e-8f12d59b5ffc",pci_bus_id="00000000:02:00.0",device="nvidia0",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 15640 -DCGM_FI_DEV_FB_USED{gpu="1",UUID="GPU-12345678-1234-1234-1234-123456789abc",pci_bus_id="00000000:03:00.0",device="nvidia1",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 0 -DCGM_FI_DEV_FB_USED{gpu="2",UUID="GPU-87654321-4321-4321-4321-cba987654321",pci_bus_id="00000000:04:00.0",device="nvidia2",modelName="NVIDIA H100 PCIe",Hostname="ed7e7a5e585f"} 0 -# HELP DCGM_FI_DEV_FB_FREE Framebuffer memory free (in MiB) -# TYPE DCGM_FI_DEV_FB_FREE gauge -DCGM_FI_DEV_FB_FREE{gpu="0",UUID="GPU-ef6ef310-f8e2-cef9-036e-8f12d59b5ffc",pci_bus_id="00000000:02:00.0",device="nvidia0",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 33022 -DCGM_FI_DEV_FB_FREE{gpu="1",UUID="GPU-12345678-1234-1234-1234-123456789abc",pci_bus_id="00000000:03:00.0",device="nvidia1",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 48662 -DCGM_FI_DEV_FB_FREE{gpu="2",UUID="GPU-87654321-4321-4321-4321-cba987654321",pci_bus_id="00000000:04:00.0",device="nvidia2",modelName="NVIDIA H100 PCIe",Hostname="ed7e7a5e585f"} 81920 -# HELP DCGM_FI_DEV_FB_TOTAL Total framebuffer memory (in MiB) -# TYPE DCGM_FI_DEV_FB_TOTAL gauge 
-DCGM_FI_DEV_FB_TOTAL{gpu="0",UUID="GPU-ef6ef310-f8e2-cef9-036e-8f12d59b5ffc",pci_bus_id="00000000:02:00.0",device="nvidia0",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 48662 -DCGM_FI_DEV_FB_TOTAL{gpu="1",UUID="GPU-12345678-1234-1234-1234-123456789abc",pci_bus_id="00000000:03:00.0",device="nvidia1",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 48662 -DCGM_FI_DEV_FB_TOTAL{gpu="2",UUID="GPU-87654321-4321-4321-4321-cba987654321",pci_bus_id="00000000:04:00.0",device="nvidia2",modelName="NVIDIA H100 PCIe",Hostname="ed7e7a5e585f"} 81920 -# HELP DCGM_FI_DEV_MEM_COPY_UTIL Memory copy utilization (in %) -# TYPE DCGM_FI_DEV_MEM_COPY_UTIL gauge -DCGM_FI_DEV_MEM_COPY_UTIL{gpu="0",UUID="GPU-ef6ef310-f8e2-cef9-036e-8f12d59b5ffc",pci_bus_id="00000000:02:00.0",device="nvidia0",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 20 -DCGM_FI_DEV_MEM_COPY_UTIL{gpu="1",UUID="GPU-12345678-1234-1234-1234-123456789abc",pci_bus_id="00000000:03:00.0",device="nvidia1",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 0 -DCGM_FI_DEV_MEM_COPY_UTIL{gpu="2",UUID="GPU-87654321-4321-4321-4321-cba987654321",pci_bus_id="00000000:04:00.0",device="nvidia2",modelName="NVIDIA H100 PCIe",Hostname="ed7e7a5e585f"} 10 -# HELP DCGM_FI_DEV_XID_ERRORS Value of the last XID error encountered -# TYPE DCGM_FI_DEV_XID_ERRORS gauge -DCGM_FI_DEV_XID_ERRORS{gpu="0",UUID="GPU-ef6ef310-f8e2-cef9-036e-8f12d59b5ffc",pci_bus_id="00000000:02:00.0",device="nvidia0",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 0 -DCGM_FI_DEV_XID_ERRORS{gpu="1",UUID="GPU-12345678-1234-1234-1234-123456789abc",pci_bus_id="00000000:03:00.0",device="nvidia1",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 0 -DCGM_FI_DEV_XID_ERRORS{gpu="2",UUID="GPU-87654321-4321-4321-4321-cba987654321",pci_bus_id="00000000:04:00.0",device="nvidia2",modelName="NVIDIA H100 PCIe",Hostname="ed7e7a5e585f"} 0 -# HELP DCGM_FI_DEV_POWER_VIOLATION Throttling duration due to power constraints (in us) -# TYPE DCGM_FI_DEV_POWER_VIOLATION counter -DCGM_FI_DEV_POWER_VIOLATION{gpu="0",UUID="GPU-ef6ef310-f8e2-cef9-036e-8f12d59b5ffc",pci_bus_id="00000000:02:00.0",device="nvidia0",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 15000 -DCGM_FI_DEV_POWER_VIOLATION{gpu="1",UUID="GPU-12345678-1234-1234-1234-123456789abc",pci_bus_id="00000000:03:00.0",device="nvidia1",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 0 -DCGM_FI_DEV_POWER_VIOLATION{gpu="2",UUID="GPU-87654321-4321-4321-4321-cba987654321",pci_bus_id="00000000:04:00.0",device="nvidia2",modelName="NVIDIA H100 PCIe",Hostname="ed7e7a5e585f"} 8000 -# HELP DCGM_FI_DEV_THERMAL_VIOLATION Throttling duration due to thermal constraints (in us) -# TYPE DCGM_FI_DEV_THERMAL_VIOLATION counter -DCGM_FI_DEV_THERMAL_VIOLATION{gpu="0",UUID="GPU-ef6ef310-f8e2-cef9-036e-8f12d59b5ffc",pci_bus_id="00000000:02:00.0",device="nvidia0",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 3000 -DCGM_FI_DEV_THERMAL_VIOLATION{gpu="1",UUID="GPU-12345678-1234-1234-1234-123456789abc",pci_bus_id="00000000:03:00.0",device="nvidia1",modelName="NVIDIA RTX 6000 Ada Generation",Hostname="ed7e7a5e585f"} 0 -DCGM_FI_DEV_THERMAL_VIOLATION{gpu="2",UUID="GPU-87654321-4321-4321-4321-cba987654321",pci_bus_id="00000000:04:00.0",device="nvidia2",modelName="NVIDIA H100 PCIe",Hostname="ed7e7a5e585f"} 5000 -""" + """Multi-GPU DCGM metrics from DCGMFaker (3 GPUs, mixed types).""" + + faker = DCGMFaker( + gpu_name="rtx6000", 
+ num_gpus=3, + seed=42, + hostname="ed7e7a5e585f", + initial_load=0.3, + ) + return faker.generate() @pytest.fixture diff --git a/tests/gpu_telemetry/test_telemetry_data_collector.py b/tests/gpu_telemetry/test_telemetry_data_collector.py index 062c40fc7..a350bf33f 100644 --- a/tests/gpu_telemetry/test_telemetry_data_collector.py +++ b/tests/gpu_telemetry/test_telemetry_data_collector.py @@ -85,14 +85,14 @@ def test_complete_parsing_single_gpu(self, sample_dcgm_data): assert record.dcgm_url == "http://localhost:9401/metrics" assert record.gpu_index == 0 assert record.gpu_model_name == "NVIDIA RTX 6000 Ada Generation" - assert record.gpu_uuid == "GPU-ef6ef310-f8e2-cef9-036e-8f12d59b5ffc" - assert record.telemetry_data.gpu_power_usage == 22.582000 + assert record.gpu_uuid.startswith("GPU-") + assert record.hostname == "ed7e7a5e585f" - # Test unit scaling applied correctly - assert ( - abs(record.telemetry_data.energy_consumption - 0.955287014) < 0.001 - ) # mJ to MJ - assert abs(record.telemetry_data.gpu_memory_used - 48.878) < 0.001 # MiB to GB + # Verify telemetry data has reasonable values from DCGMFaker + assert record.telemetry_data.gpu_power_usage is not None + assert 0 < record.telemetry_data.gpu_power_usage < 400 + assert record.telemetry_data.energy_consumption is not None + assert record.telemetry_data.gpu_memory_used is not None def test_complete_parsing_multi_gpu(self, multi_gpu_dcgm_data): """Test parsing complete DCGM response for multiple GPUs. @@ -111,9 +111,15 @@ def test_complete_parsing_multi_gpu(self, multi_gpu_dcgm_data): # Verify each GPU has correct metadata assert records[0].gpu_index == 0 assert records[0].gpu_model_name == "NVIDIA RTX 6000 Ada Generation" + assert records[0].gpu_uuid.startswith("GPU-") assert records[1].gpu_index == 1 + assert records[1].gpu_model_name == "NVIDIA RTX 6000 Ada Generation" assert records[2].gpu_index == 2 - assert records[2].gpu_model_name == "NVIDIA H100 PCIe" + assert records[2].gpu_model_name == "NVIDIA RTX 6000 Ada Generation" + + # Verify all GPUs have unique UUIDs + uuids = {r.gpu_uuid for r in records} + assert len(uuids) == 3 def test_empty_response_handling(self): """Test parsing logic with empty or comment-only DCGM responses. 
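The collector tests above exercise parsing of DCGM's Prometheus-style exposition format, where each data line looks like `DCGM_FI_DEV_POWER_USAGE{gpu="0",UUID="GPU-...",modelName="..."} 22.582`. As a rough illustration of that text format only (the `parse_dcgm_line` helper below is hypothetical, not AIPerf's actual parser), one way to split such a line into metric name, labels, and value:

```python
import re

# Illustrative only: a minimal parser for one DCGM exposition line; the label names
# (gpu, UUID, modelName, Hostname, ...) follow the fixture data shown above.
LINE_RE = re.compile(r'^(?P<name>\w+)\{(?P<labels>[^}]*)\}\s+(?P<value>[-+.\deE]+)$')

def parse_dcgm_line(line: str) -> tuple[str, dict[str, str], float] | None:
    """Return (metric_name, labels, value) for a data line, or None for comments/blank lines."""
    match = LINE_RE.match(line.strip())
    if not match:
        return None  # skips "# HELP" / "# TYPE" comment lines and anything non-matching
    labels = {
        key: value.strip('"')
        for key, _, value in (
            part.partition("=") for part in match.group("labels").split(",")
        )
    }
    return match.group("name"), labels, float(match.group("value"))

sample = 'DCGM_FI_DEV_POWER_USAGE{gpu="0",UUID="GPU-ef6ef310",modelName="NVIDIA RTX 6000 Ada Generation"} 22.582'
print(parse_dcgm_line(sample))
# ('DCGM_FI_DEV_POWER_USAGE', {'gpu': '0', 'UUID': 'GPU-ef6ef310', 'modelName': 'NVIDIA RTX 6000 Ada Generation'}, 22.582)
```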
diff --git a/tests/gpu_telemetry/test_telemetry_manager.py b/tests/gpu_telemetry/test_telemetry_manager.py index 14904c80f..3480e51fa 100644 --- a/tests/gpu_telemetry/test_telemetry_manager.py +++ b/tests/gpu_telemetry/test_telemetry_manager.py @@ -8,7 +8,6 @@ from aiperf.common.config import UserConfig from aiperf.common.environment import Environment from aiperf.common.messages import ( - CommandAcknowledgedResponse, ProfileConfigureCommand, ProfileStartCommand, TelemetryRecordsMessage, @@ -48,6 +47,7 @@ def test_initialization_default_endpoint(self): """Test initialization with no user-provided endpoints uses defaults.""" mock_user_config = MagicMock(spec=UserConfig) mock_user_config.gpu_telemetry = None + mock_user_config.gpu_telemetry_urls = None manager = self._create_manager_with_mocked_base(mock_user_config) assert manager._dcgm_endpoints == list(Environment.GPU.DEFAULT_DCGM_ENDPOINTS) @@ -56,7 +56,8 @@ def test_initialization_custom_endpoints(self): """Test initialization with custom user-provided endpoints.""" mock_user_config = MagicMock(spec=UserConfig) custom_endpoint = "http://gpu-node-01:9401/metrics" - mock_user_config.gpu_telemetry = [custom_endpoint] + mock_user_config.gpu_telemetry = ["dashboard"] # User configured telemetry + mock_user_config.gpu_telemetry_urls = [custom_endpoint] manager = self._create_manager_with_mocked_base(mock_user_config) @@ -69,7 +70,8 @@ def test_initialization_custom_endpoints(self): def test_initialization_string_endpoint(self): """Test initialization converts single string endpoint to list and prepends defaults.""" mock_user_config = MagicMock(spec=UserConfig) - mock_user_config.gpu_telemetry = "http://single-node:9401/metrics" + mock_user_config.gpu_telemetry = ["dashboard"] # User configured telemetry + mock_user_config.gpu_telemetry_urls = "http://single-node:9401/metrics" manager = self._create_manager_with_mocked_base(mock_user_config) @@ -80,15 +82,16 @@ def test_initialization_string_endpoint(self): assert len(manager._dcgm_endpoints) == 3 def test_initialization_filters_invalid_urls(self): - """Test initialization filters out invalid URLs.""" + """Test initialization with only valid URLs (invalid ones filtered by user_config validator).""" mock_user_config = MagicMock(spec=UserConfig) - mock_user_config.gpu_telemetry = [ - "http://valid:9401/metrics", # Valid - "not-a-url", # Invalid - no scheme - "ftp://wrong-scheme:9401", # Invalid - wrong scheme - "http://another-valid:9401", # Valid - "", # Invalid - empty + # user_config validator would have already filtered out invalid URLs + # so telemetry_manager only receives valid ones + valid_urls = [ + "http://valid:9401/metrics", + "http://another-valid:9401/metrics", ] + mock_user_config.gpu_telemetry = ["dashboard"] # User configured telemetry + mock_user_config.gpu_telemetry_urls = valid_urls manager = self._create_manager_with_mocked_base(mock_user_config) @@ -102,11 +105,13 @@ def test_initialization_filters_invalid_urls(self): def test_initialization_deduplicates_endpoints(self): """Test initialization removes duplicate endpoints while preserving order.""" mock_user_config = MagicMock(spec=UserConfig) - mock_user_config.gpu_telemetry = [ + urls_with_duplicates = [ "http://node1:9401/metrics", "http://node2:9401/metrics", "http://node1:9401/metrics", # Duplicate ] + mock_user_config.gpu_telemetry = ["dashboard"] # User configured telemetry + mock_user_config.gpu_telemetry_urls = urls_with_duplicates manager = self._create_manager_with_mocked_base(mock_user_config) @@ -120,11 +125,13 
@@ def test_initialization_deduplicates_endpoints(self): def test_user_provides_default_endpoint(self): """Test that explicitly providing a default endpoint doesn't duplicate it.""" mock_user_config = MagicMock(spec=UserConfig) - mock_user_config.gpu_telemetry = [ + urls = [ "http://localhost:9400/metrics", # This is a default "http://node1:9401/metrics", "http://localhost:9401/metrics", # This is also a default ] + mock_user_config.gpu_telemetry = ["dashboard"] # User configured telemetry + mock_user_config.gpu_telemetry_urls = urls manager = self._create_manager_with_mocked_base(mock_user_config) @@ -183,6 +190,7 @@ def _create_test_manager(self): manager = TelemetryManager.__new__(TelemetryManager) manager.service_id = "test_manager" manager._collectors = {} + manager._collector_id_to_url = {} manager._dcgm_endpoints = [] manager._user_provided_endpoints = [] manager._user_explicitly_configured_telemetry = False @@ -193,6 +201,7 @@ def _create_test_manager(self): async def test_on_telemetry_records_valid(self, sample_telemetry_records): """Test _on_telemetry_records with valid records.""" manager = self._create_test_manager() + manager._collector_id_to_url["test_collector"] = "http://localhost:9400/metrics" # Mock the push client mock_push_client = AsyncMock() @@ -207,6 +216,7 @@ async def test_on_telemetry_records_valid(self, sample_telemetry_records): assert isinstance(call_args, TelemetryRecordsMessage) assert call_args.service_id == "test_manager" assert call_args.collector_id == "test_collector" + assert call_args.dcgm_url == "http://localhost:9400/metrics" assert call_args.records == sample_telemetry_records assert call_args.error is None @@ -248,6 +258,7 @@ async def test_on_telemetry_records_exception_handling( async def test_on_telemetry_error(self): """Test _on_telemetry_error callback.""" manager = self._create_test_manager() + manager._collector_id_to_url["test_collector"] = "http://localhost:9400/metrics" # Mock the push client mock_push_client = AsyncMock() @@ -264,6 +275,7 @@ async def test_on_telemetry_error(self): assert isinstance(call_args, TelemetryRecordsMessage) assert call_args.service_id == "test_manager" assert call_args.collector_id == "test_collector" + assert call_args.dcgm_url == "http://localhost:9400/metrics" assert call_args.records == [] assert call_args.error == error_details @@ -295,6 +307,7 @@ def _create_test_manager(self): manager = TelemetryManager.__new__(TelemetryManager) manager.service_id = "test_manager" manager._collectors = {} + manager._collector_id_to_url = {} manager._dcgm_endpoints = [] manager._user_provided_endpoints = [] manager._user_explicitly_configured_telemetry = False @@ -397,15 +410,11 @@ def close_coroutine(coro): ) await manager._on_start_profiling(start_msg) - # Should have published acknowledgment AND disabled status - assert manager.publish.call_count == 2 - - # First call is acknowledgment - first_call = manager.publish.call_args_list[0][0][0] - assert isinstance(first_call, CommandAcknowledgedResponse) + # Should have published disabled status + assert manager.publish.call_count == 1 - # Second call is disabled status - second_call = manager.publish.call_args_list[1][0][0] + # Verify disabled status was published + second_call = manager.publish.call_args_list[0][0][0] assert isinstance(second_call, TelemetryStatusMessage) assert second_call.enabled is False assert second_call.reason == "all collectors failed to start" @@ -423,6 +432,7 @@ def _create_test_manager(self): manager = TelemetryManager.__new__(TelemetryManager) 
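+        # Bypass __init__ via __new__ so the manager's collaborators (collectors, endpoints, logging) can be stubbed in manually below.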
manager.service_id = "test_manager" manager._collectors = {} + manager._collector_id_to_url = {} manager._dcgm_endpoints = [] manager._user_provided_endpoints = [] manager._user_explicitly_configured_telemetry = False @@ -526,13 +536,12 @@ def _create_manager_with_mocked_base(self, user_config): return manager def test_invalid_endpoints_filtered_during_init(self): - """Test that empty string and invalid URLs are filtered during initialization.""" + """Test that only valid URLs reach telemetry_manager (invalid ones filtered by user_config validator).""" mock_user_config = MagicMock(spec=UserConfig) - mock_user_config.gpu_telemetry = [ - "", # Empty string - filtered - "/metrics", # No scheme - filtered - "http://valid:9401/metrics", # Valid - ] + # user_config validator would have already filtered out invalid URLs + # so telemetry_manager only receives valid ones + mock_user_config.gpu_telemetry = ["dashboard"] # User configured telemetry + mock_user_config.gpu_telemetry_urls = ["http://valid:9401/metrics"] manager = self._create_manager_with_mocked_base(mock_user_config) @@ -578,6 +587,7 @@ def test_both_defaults_included_when_no_user_config(self): """Test that both default endpoints (9400 and 9401) are included with no user config.""" mock_user_config = MagicMock(spec=UserConfig) mock_user_config.gpu_telemetry = None + mock_user_config.gpu_telemetry_urls = None manager = self._create_manager_with_mocked_base(mock_user_config) @@ -591,16 +601,19 @@ def test_user_explicitly_configured_telemetry_flag(self): # Test with None (not configured) mock_user_config = MagicMock(spec=UserConfig) mock_user_config.gpu_telemetry = None + mock_user_config.gpu_telemetry_urls = None manager = self._create_manager_with_mocked_base(mock_user_config) assert manager._user_explicitly_configured_telemetry is False # Test with value (configured) - mock_user_config.gpu_telemetry = "http://custom:9401" + mock_user_config.gpu_telemetry = ["dashboard"] # User configured telemetry + mock_user_config.gpu_telemetry_urls = ["http://custom:9401/metrics"] manager = self._create_manager_with_mocked_base(mock_user_config) assert manager._user_explicitly_configured_telemetry is True # Test with empty list (configured) - mock_user_config.gpu_telemetry = [] + mock_user_config.gpu_telemetry = [] # User explicitly passed --gpu-telemetry with no args + mock_user_config.gpu_telemetry_urls = [] manager = self._create_manager_with_mocked_base(mock_user_config) assert manager._user_explicitly_configured_telemetry is True @@ -613,6 +626,7 @@ def _create_test_manager(self): manager = TelemetryManager.__new__(TelemetryManager) manager.service_id = "test_manager" manager._collectors = {} + manager._collector_id_to_url = {} manager._dcgm_endpoints = list(Environment.GPU.DEFAULT_DCGM_ENDPOINTS) manager._user_provided_endpoints = [] manager._user_explicitly_configured_telemetry = False @@ -697,26 +711,6 @@ def _create_test_manager(self): manager.warning = MagicMock() return manager - @pytest.mark.asyncio - async def test_start_acknowledges_command_immediately(self): - """Test that start command is acknowledged at the beginning.""" - manager = self._create_test_manager() - manager.publish = AsyncMock() - - # Add a mock collector - mock_collector = AsyncMock(spec=TelemetryDataCollector) - manager._collectors["http://localhost:9400/metrics"] = mock_collector - - start_msg = ProfileStartCommand( - command_id="test", service_id="system_controller" - ) - await manager._on_start_profiling(start_msg) - - # Should have published command acknowledgment 
- manager.publish.assert_called_once() - call_args = manager.publish.call_args[0][0] - assert isinstance(call_args, CommandAcknowledgedResponse) - @pytest.mark.asyncio async def test_start_triggers_shutdown_when_no_collectors(self): """Test that start triggers shutdown when no collectors available.""" @@ -737,11 +731,6 @@ def close_coroutine(coro): ) await manager._on_start_profiling(start_msg) - # Should only have acknowledged (status already sent in configure phase) - assert manager.publish.call_count == 1 - call_args = manager.publish.call_args[0][0] - assert isinstance(call_args, CommandAcknowledgedResponse) - # Verify shutdown was scheduled mock_create_task.assert_called_once() assert hasattr(manager, "_shutdown_task") @@ -775,6 +764,7 @@ def _create_test_manager(self, user_requested, user_endpoints): manager = TelemetryManager.__new__(TelemetryManager) manager.service_id = "test_manager" manager._collectors = {} + manager._collector_id_to_url = {} manager._dcgm_endpoints = ( list(Environment.GPU.DEFAULT_DCGM_ENDPOINTS) + user_endpoints ) diff --git a/tests/mixins/test_realtime_telemetry_metrics_mixin.py b/tests/mixins/test_realtime_telemetry_metrics_mixin.py new file mode 100644 index 000000000..3ebdb56b8 --- /dev/null +++ b/tests/mixins/test_realtime_telemetry_metrics_mixin.py @@ -0,0 +1,196 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from aiperf.common.config import ServiceConfig +from aiperf.common.hooks import AIPerfHook +from aiperf.common.messages import RealtimeTelemetryMetricsMessage +from aiperf.common.mixins.realtime_telemetry_metrics_mixin import ( + RealtimeTelemetryMetricsMixin, +) +from aiperf.common.models import MetricResult + + +class TestRealtimeTelemetryMetricsMixin: + """Test suite for RealtimeTelemetryMetricsMixin functionality.""" + + @pytest.fixture + def mocked_mixin(self): + """Create a RealtimeTelemetryMetricsMixin instance with mocked dependencies.""" + service_config = ServiceConfig() + mock_controller = MagicMock() + + # Mock the MessageBusClientMixin.__init__ to avoid initialization issues + with patch( + "aiperf.common.mixins.message_bus_mixin.MessageBusClientMixin.__init__", + return_value=None, + ): + mixin = RealtimeTelemetryMetricsMixin( + service_config=service_config, controller=mock_controller + ) + # Manually set attributes that would be set by parent __init__ + mixin._controller = mock_controller + mixin._telemetry_metrics = [] + mixin.run_hooks = AsyncMock() + mixin.debug = MagicMock() + + return mixin + + def test_mixin_initialization(self, mocked_mixin): + """Test that mixin initializes with correct attributes.""" + assert hasattr(mocked_mixin, "_controller") + assert hasattr(mocked_mixin, "_telemetry_metrics") + assert hasattr(mocked_mixin, "_telemetry_metrics_lock") + assert mocked_mixin._telemetry_metrics == [] + + @pytest.mark.asyncio + async def test_on_realtime_telemetry_metrics_stores_metrics(self, mocked_mixin): + """Test that telemetry metrics are stored when message is received.""" + metrics = [ + MetricResult(tag="gpu_util", header="GPU Utilization", unit="%", avg=75.0), + MetricResult( + tag="gpu_memory", header="GPU Memory Used", unit="GB", avg=8.5 + ), + ] + + message = RealtimeTelemetryMetricsMessage( + service_id="records_manager", metrics=metrics + ) + + await mocked_mixin._on_realtime_telemetry_metrics(message) + + # Verify metrics were 
stored + assert mocked_mixin._telemetry_metrics == metrics + + @pytest.mark.asyncio + async def test_on_realtime_telemetry_metrics_triggers_hook(self, mocked_mixin): + """Test that receiving telemetry metrics triggers the appropriate hook.""" + + metrics = [ + MetricResult(tag="gpu_util", header="GPU Utilization", unit="%", avg=75.0) + ] + + message = RealtimeTelemetryMetricsMessage( + service_id="records_manager", metrics=metrics + ) + + await mocked_mixin._on_realtime_telemetry_metrics(message) + + # Verify hook was triggered with correct arguments + mocked_mixin.run_hooks.assert_called_once_with( + AIPerfHook.ON_REALTIME_TELEMETRY_METRICS, metrics=metrics + ) + + @pytest.mark.asyncio + async def test_on_realtime_telemetry_metrics_replaces_previous_metrics( + self, mocked_mixin + ): + """Test that new metrics replace previous metrics (not append).""" + # Set initial metrics + initial_metrics = [ + MetricResult(tag="old_metric", header="Old Metric", unit="ms", avg=10.0) + ] + mocked_mixin._telemetry_metrics = initial_metrics + + # Receive new metrics + new_metrics = [ + MetricResult(tag="new_metric", header="New Metric", unit="%", avg=50.0) + ] + message = RealtimeTelemetryMetricsMessage( + service_id="records_manager", metrics=new_metrics + ) + + await mocked_mixin._on_realtime_telemetry_metrics(message) + + # Verify old metrics were replaced, not appended + assert mocked_mixin._telemetry_metrics == new_metrics + assert len(mocked_mixin._telemetry_metrics) == 1 + + @pytest.mark.asyncio + async def test_on_realtime_telemetry_metrics_with_empty_list(self, mocked_mixin): + """Test that receiving empty metrics list is handled correctly.""" + message = RealtimeTelemetryMetricsMessage( + service_id="records_manager", metrics=[] + ) + + await mocked_mixin._on_realtime_telemetry_metrics(message) + + # Should store empty list and still trigger hook + assert mocked_mixin._telemetry_metrics == [] + mocked_mixin.run_hooks.assert_called_once() + + @pytest.mark.asyncio + async def test_concurrent_access_with_lock(self, mocked_mixin): + """Test that the lock protects concurrent access to telemetry metrics.""" + + # Track lock acquisition order + lock_acquired_order = [] + + async def acquire_lock_and_update(metrics_value, delay): + """Helper to simulate concurrent updates.""" + async with mocked_mixin._telemetry_metrics_lock: + lock_acquired_order.append(metrics_value) + await asyncio.sleep(delay) + mocked_mixin._telemetry_metrics = [ + MetricResult( + tag=f"metric_{metrics_value}", + header=f"Metric {metrics_value}", + unit="ms", + avg=float(metrics_value), + ) + ] + + # Start two concurrent operations + await asyncio.gather( + acquire_lock_and_update(1, 0.01), acquire_lock_and_update(2, 0.005) + ) + + # Both should have acquired the lock (order doesn't matter for this test) + assert len(lock_acquired_order) == 2 + assert set(lock_acquired_order) == {1, 2} + + # Final value should be from the last completed operation + assert len(mocked_mixin._telemetry_metrics) == 1 + + @pytest.mark.asyncio + async def test_multiple_metrics_handling(self, mocked_mixin): + """Test handling of message with multiple metrics.""" + metrics = [ + MetricResult( + tag=f"metric_{i}", header=f"Metric {i}", unit="ms", avg=float(i) + ) + for i in range(10) + ] + + message = RealtimeTelemetryMetricsMessage( + service_id="records_manager", metrics=metrics + ) + + await mocked_mixin._on_realtime_telemetry_metrics(message) + + # All metrics should be stored + assert len(mocked_mixin._telemetry_metrics) == 10 + assert 
mocked_mixin._telemetry_metrics == metrics + + @pytest.mark.asyncio + async def test_integration_with_controller(self): + """Test that mixin integrates correctly with controller.""" + service_config = ServiceConfig() + mock_controller = MagicMock() + mock_controller.some_method = MagicMock(return_value="test_value") + + with patch( + "aiperf.common.mixins.message_bus_mixin.MessageBusClientMixin.__init__", + return_value=None, + ): + mixin = RealtimeTelemetryMetricsMixin( + service_config=service_config, controller=mock_controller + ) + + # Verify controller is accessible + assert mixin._controller == mock_controller + assert mixin._controller.some_method() == "test_value" diff --git a/tests/post_processors/test_telemetry_results_processor.py b/tests/post_processors/test_telemetry_results_processor.py index f94041655..64e94cbb0 100644 --- a/tests/post_processors/test_telemetry_results_processor.py +++ b/tests/post_processors/test_telemetry_results_processor.py @@ -82,6 +82,26 @@ async def test_process_telemetry_record( assert dcgm_url in processor._telemetry_hierarchy.dcgm_endpoints assert gpu_uuid in processor._telemetry_hierarchy.dcgm_endpoints[dcgm_url] + @pytest.mark.asyncio + async def test_get_telemetry_hierarchy( + self, mock_user_config: UserConfig, sample_telemetry_record: TelemetryRecord + ) -> None: + """Test get_telemetry_hierarchy returns accumulated data.""" + processor = TelemetryResultsProcessor(mock_user_config) + + # Add some records + await processor.process_telemetry_record(sample_telemetry_record) + + # Get hierarchy + hierarchy = processor.get_telemetry_hierarchy() + + assert isinstance(hierarchy, TelemetryHierarchy) + assert sample_telemetry_record.dcgm_url in hierarchy.dcgm_endpoints + assert ( + sample_telemetry_record.gpu_uuid + in hierarchy.dcgm_endpoints[sample_telemetry_record.dcgm_url] + ) + @pytest.mark.asyncio async def test_summarize_with_valid_data( self, mock_user_config: UserConfig, sample_telemetry_record: TelemetryRecord diff --git a/tests/records/test_records_filtering.py b/tests/records/test_records_filtering.py index 89a13952a..83616b5bc 100644 --- a/tests/records/test_records_filtering.py +++ b/tests/records/test_records_filtering.py @@ -352,6 +352,7 @@ async def test_on_telemetry_records_valid(self): message = TelemetryRecordsMessage( service_id="test_service", collector_id="test_collector", + dcgm_url="http://localhost:9400/metrics", records=records, error=None, ) @@ -385,6 +386,7 @@ async def test_on_telemetry_records_invalid(self): message = TelemetryRecordsMessage( service_id="test_service", collector_id="test_collector", + dcgm_url="http://localhost:9400/metrics", records=[], error=error, ) diff --git a/tests/ui/test_aiperf_dashboard_ui.py b/tests/ui/test_aiperf_dashboard_ui.py new file mode 100644 index 000000000..1ec01bf19 --- /dev/null +++ b/tests/ui/test_aiperf_dashboard_ui.py @@ -0,0 +1,152 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +from unittest.mock import MagicMock, Mock, patch + +import pytest + +from aiperf.ui.dashboard.aiperf_dashboard_ui import AIPerfDashboardUI + + +class TestAIPerfDashboardUIInitialization: + """Test AIPerfDashboardUI initialization.""" + + @pytest.fixture + def mock_dependencies(self, service_config, user_config): + """Create mocked dependencies for AIPerfDashboardUI.""" + mock_log_queue = Mock() + mock_controller = MagicMock() + mock_controller.service_id = "test_controller" + + return { + "log_queue": mock_log_queue, + "service_config": service_config, + "user_config": user_config, + "controller": mock_controller, + } + + @pytest.fixture + def dashboard_ui(self, mock_dependencies): + """Create AIPerfDashboardUI instance with mocked dependencies.""" + with ( + patch( + "aiperf.ui.dashboard.aiperf_dashboard_ui.AIPerfTextualApp" + ) as mock_app_class, + patch( + "aiperf.ui.dashboard.aiperf_dashboard_ui.LogConsumer" + ) as mock_consumer_class, + ): + mock_app_instance = Mock() + mock_app_class.return_value = mock_app_instance + + mock_consumer_instance = Mock() + mock_consumer_class.return_value = mock_consumer_instance + + dashboard = AIPerfDashboardUI(**mock_dependencies) + + dashboard.mock_app = mock_app_instance + dashboard.mock_consumer = mock_consumer_instance + + return dashboard + + def test_init_creates_log_consumer(self, mock_dependencies): + """Test that initialization creates LogConsumer with log_queue.""" + with ( + patch( + "aiperf.ui.dashboard.aiperf_dashboard_ui.AIPerfTextualApp" + ) as mock_app_class, + patch( + "aiperf.ui.dashboard.aiperf_dashboard_ui.LogConsumer" + ) as mock_consumer_class, + ): + mock_app_instance = Mock() + mock_app_class.return_value = mock_app_instance + + AIPerfDashboardUI(**mock_dependencies) + + mock_consumer_class.assert_called_once() + call_kwargs = mock_consumer_class.call_args[1] + assert call_kwargs["log_queue"] == mock_dependencies["log_queue"] + assert call_kwargs["app"] == mock_app_instance + + def test_init_attaches_child_lifecycle(self, dashboard_ui): + """Test that log consumer lifecycle is attached.""" + assert dashboard_ui.log_consumer == dashboard_ui.mock_consumer + + def test_init_attaches_all_hooks(self, dashboard_ui): + """Test that all required hooks are attached correctly to the app.""" + assert hasattr(dashboard_ui.app, "on_records_progress") + assert hasattr(dashboard_ui.app, "on_profiling_progress") + assert hasattr(dashboard_ui.app, "on_warmup_progress") + assert hasattr(dashboard_ui.app, "on_worker_update") + assert hasattr(dashboard_ui.app, "on_worker_status_summary") + assert hasattr(dashboard_ui.app, "on_realtime_metrics") + assert hasattr(dashboard_ui.app, "on_realtime_telemetry_metrics") + + def test_init_stores_references(self, dashboard_ui, mock_dependencies): + """Test that controller and service_config are stored.""" + assert dashboard_ui.controller == mock_dependencies["controller"] + assert dashboard_ui.service_config == mock_dependencies["service_config"] + + def test_init_creates_textual_app(self, mock_dependencies): + """Test that AIPerfTextualApp is created with correct parameters.""" + with ( + patch( + "aiperf.ui.dashboard.aiperf_dashboard_ui.AIPerfTextualApp" + ) as mock_app_class, + patch("aiperf.ui.dashboard.aiperf_dashboard_ui.LogConsumer"), + ): + AIPerfDashboardUI(**mock_dependencies) + + mock_app_class.assert_called_once() + call_kwargs = mock_app_class.call_args[1] + assert call_kwargs["service_config"] == mock_dependencies["service_config"] + assert 
call_kwargs["controller"] == mock_dependencies["controller"] + + +class TestAIPerfDashboardUILifecycle: + """Test lifecycle hooks.""" + + @pytest.fixture + def dashboard_ui(self, service_config, user_config): + """Create AIPerfDashboardUI instance with mocked dependencies.""" + mock_log_queue = Mock() + mock_controller = MagicMock() + + with ( + patch( + "aiperf.ui.dashboard.aiperf_dashboard_ui.AIPerfTextualApp" + ) as mock_app_class, + patch("aiperf.ui.dashboard.aiperf_dashboard_ui.LogConsumer"), + ): + mock_app_instance = Mock() + mock_app_class.return_value = mock_app_instance + + dashboard = AIPerfDashboardUI( + log_queue=mock_log_queue, + service_config=service_config, + user_config=user_config, + controller=mock_controller, + ) + + dashboard.mock_app = mock_app_instance + + return dashboard + + @pytest.mark.asyncio + async def test_run_app_starts_textual(self, dashboard_ui): + """Test that _run_app starts the Textual app via execute_async.""" + mock_coroutine = Mock() + dashboard_ui.app.run_async.return_value = mock_coroutine + + with patch.object(dashboard_ui, "execute_async") as mock_execute: + await dashboard_ui._run_app() + + mock_execute.assert_called_once_with(mock_coroutine) + + @pytest.mark.asyncio + async def test_on_stop_exits_app(self, dashboard_ui): + """Test that _on_stop calls app.exit with return_code=0.""" + await dashboard_ui._on_stop() + + dashboard_ui.app.exit.assert_called_once_with(return_code=0) diff --git a/tests/ui/test_aiperf_textual_app.py b/tests/ui/test_aiperf_textual_app.py new file mode 100644 index 000000000..362656c0a --- /dev/null +++ b/tests/ui/test_aiperf_textual_app.py @@ -0,0 +1,357 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from unittest.mock import AsyncMock, MagicMock, Mock, patch + +import pytest + +from aiperf.common.enums import GPUTelemetryMode +from aiperf.common.messages import StartRealtimeTelemetryCommand +from aiperf.common.models import MetricResult +from aiperf.ui.dashboard.aiperf_textual_app import AIPerfTextualApp + + +class TestAIPerfTextualAppInitialization: + """Test AIPerfTextualApp initialization.""" + + @pytest.fixture + def mock_controller(self): + """Create a mock controller.""" + controller = MagicMock() + controller.service_id = "test_controller" + controller.user_config = MagicMock() + controller.user_config.gpu_telemetry_mode = GPUTelemetryMode.SUMMARY + return controller + + @pytest.fixture + def app(self, service_config, mock_controller): + """Create AIPerfTextualApp instance.""" + return AIPerfTextualApp( + service_config=service_config, controller=mock_controller + ) + + def test_init_sets_title(self, app): + """Test that initialization sets the correct title.""" + assert app.title == "NVIDIA AIPerf" + + def test_init_widget_references_none(self, app): + """Test that widget references start as None.""" + assert app.log_viewer is None + assert app.progress_dashboard is None + assert app.progress_header is None + assert app.worker_dashboard is None + assert app.realtime_metrics_dashboard is None + assert app.realtime_telemetry_dashboard is None + + +class TestAIPerfTextualAppActions: + """Test action handlers.""" + + @pytest.fixture + def mock_controller(self): + """Create a mock controller.""" + controller = AsyncMock() + controller.service_id = "test_controller" + controller.user_config = MagicMock() + controller.user_config.gpu_telemetry_mode = GPUTelemetryMode.SUMMARY + controller.publish = AsyncMock() + return 
controller + + @pytest.fixture + def app(self, service_config, mock_controller): + """Create AIPerfTextualApp instance.""" + return AIPerfTextualApp( + service_config=service_config, controller=mock_controller + ) + + @pytest.mark.asyncio + async def test_action_quit_cleanup(self, app): + """Test that action_quit clears widget references and signals.""" + app.worker_dashboard = Mock() + app.progress_dashboard = Mock() + app.progress_header = Mock() + app.realtime_metrics_dashboard = Mock() + app.log_viewer = Mock() + + with ( + patch("os.kill") as mock_kill, + patch("os.getpid", return_value=12345), + ): + await app.action_quit() + + assert app.worker_dashboard is None + assert app.progress_dashboard is None + assert app.progress_header is None + assert app.realtime_metrics_dashboard is None + assert app.log_viewer is None + + mock_kill.assert_called_once() + + @pytest.mark.asyncio + async def test_action_toggle_hide_log_viewer(self, app): + """Test that toggle_hide_log_viewer toggles the hidden class.""" + mock_logs_section = Mock() + + with patch.object(app, "query_one", return_value=mock_logs_section): + await app.action_toggle_hide_log_viewer() + + mock_logs_section.toggle_class.assert_called_once_with("hidden") + + @pytest.mark.asyncio + async def test_action_restore_all_panels(self, app): + """Test that restore_all_panels minimizes screen and unhides logs.""" + mock_screen = Mock() + mock_logs_section = Mock() + + with ( + patch.object( + type(app), + "screen", + new_callable=lambda: property(lambda self: mock_screen), + ), + patch.object(app, "query_one", return_value=mock_logs_section), + ): + await app.action_restore_all_panels() + + mock_screen.minimize.assert_called_once() + mock_logs_section.remove_class.assert_called_once_with("hidden") + + @pytest.mark.asyncio + async def test_action_minimize_all_panels(self, app): + """Test that minimize_all_panels minimizes the screen.""" + mock_screen = Mock() + + with patch.object( + type(app), "screen", new_callable=lambda: property(lambda self: mock_screen) + ): + await app.action_minimize_all_panels() + + mock_screen.minimize.assert_called_once() + + @pytest.mark.asyncio + async def test_action_toggle_maximize_regular_panel(self, app): + """Test toggle_maximize for a regular panel.""" + mock_screen = Mock() + mock_panel = Mock() + mock_panel.is_maximized = False + + with ( + patch.object( + type(app), + "screen", + new_callable=lambda: property(lambda self: mock_screen), + ), + patch.object(app, "query_one", return_value=mock_panel), + ): + await app.action_toggle_maximize("progress") + + mock_screen.maximize.assert_called_once_with(mock_panel) + + @pytest.mark.asyncio + async def test_action_toggle_maximize_minimizes_when_maximized(self, app): + """Test toggle_maximize minimizes when panel is already maximized.""" + mock_screen = Mock() + mock_panel = Mock() + mock_panel.is_maximized = True + + with ( + patch.object( + type(app), + "screen", + new_callable=lambda: property(lambda self: mock_screen), + ), + patch.object(app, "query_one", return_value=mock_panel), + ): + await app.action_toggle_maximize("progress") + + mock_screen.minimize.assert_called_once() + mock_screen.maximize.assert_not_called() + + @pytest.mark.asyncio + async def test_action_toggle_maximize_telemetry_enables_mode( + self, app, mock_controller + ): + """Test that toggling telemetry panel enables telemetry and publishes command.""" + mock_screen = Mock() + mock_panel = Mock() + mock_panel.is_maximized = False + app.realtime_telemetry_dashboard = Mock() + + with ( 
+ patch.object( + type(app), + "screen", + new_callable=lambda: property(lambda self: mock_screen), + ), + patch.object(app, "query_one", return_value=mock_panel), + ): + await app.action_toggle_maximize_telemetry() + + assert ( + mock_controller.user_config.gpu_telemetry_mode + == GPUTelemetryMode.REALTIME_DASHBOARD + ) + app.realtime_telemetry_dashboard.set_status_message.assert_called_once_with( + "Enabling live GPU telemetry..." + ) + mock_controller.publish.assert_called_once() + + call_args = mock_controller.publish.call_args[0][0] + assert isinstance(call_args, StartRealtimeTelemetryCommand) + + +class TestAIPerfTextualAppProgressHandlers: + """Test progress update handlers.""" + + @pytest.fixture + def mock_controller(self): + """Create a mock controller.""" + controller = MagicMock() + controller.service_id = "test_controller" + controller.user_config = MagicMock() + return controller + + @pytest.fixture + def app(self, service_config, mock_controller): + """Create AIPerfTextualApp instance.""" + return AIPerfTextualApp( + service_config=service_config, controller=mock_controller + ) + + @pytest.mark.asyncio + async def test_on_warmup_progress(self, app): + """Test on_warmup_progress updates dashboard and header.""" + app.progress_dashboard = Mock() + app.progress_dashboard.batch = MagicMock() + app.progress_header = Mock() + app._has_result_data = True + mock_section = Mock() + + warmup_stats = Mock() + warmup_stats.finished = 50 + warmup_stats.total_expected_requests = 100 + + with patch.object(app, "query_one", return_value=mock_section): + await app.on_warmup_progress(warmup_stats) + + app.progress_dashboard.on_warmup_progress.assert_called_once_with( + warmup_stats + ) + app.progress_header.update_progress.assert_called_once_with( + header="Warmup", progress=50, total=100 + ) + + @pytest.mark.asyncio + async def test_on_profiling_progress(self, app): + """Test on_profiling_progress updates dashboard and header.""" + app.progress_dashboard = Mock() + app.progress_dashboard.batch = MagicMock() + app.progress_header = Mock() + app._has_result_data = True + mock_section = Mock() + + profiling_stats = Mock() + profiling_stats.finished = 75 + profiling_stats.total_expected_requests = 150 + + with patch.object(app, "query_one", return_value=mock_section): + await app.on_profiling_progress(profiling_stats) + + app.progress_dashboard.on_profiling_progress.assert_called_once_with( + profiling_stats + ) + app.progress_header.update_progress.assert_called_once_with( + header="Profiling", progress=75, total=150 + ) + + @pytest.mark.asyncio + async def test_on_records_progress(self, app): + """Test on_records_progress updates dashboard.""" + app.progress_dashboard = Mock() + app.progress_dashboard.batch = MagicMock() + app.progress_header = Mock() + app._profiling_stats = Mock() + app._profiling_stats.finished = 100 + app._profiling_stats.total_expected_requests = 100 + app._profiling_stats.is_complete = True + + records_stats = Mock() + + await app.on_records_progress(records_stats) + + app.progress_dashboard.on_records_progress.assert_called_once_with( + records_stats + ) + app.progress_header.update_progress.assert_called_once_with( + header="Records", progress=100, total=100 + ) + + +class TestAIPerfTextualAppMetricsHandlers: + """Test metrics and telemetry handlers.""" + + @pytest.fixture + def mock_controller(self): + """Create a mock controller.""" + controller = MagicMock() + controller.service_id = "test_controller" + controller.user_config = MagicMock() + return controller + + 
@pytest.fixture + def app(self, service_config, mock_controller): + """Create AIPerfTextualApp instance.""" + return AIPerfTextualApp( + service_config=service_config, controller=mock_controller + ) + + @pytest.mark.asyncio + async def test_on_worker_update(self, app): + """Test on_worker_update forwards to worker_dashboard.""" + app.worker_dashboard = Mock() + app.worker_dashboard.batch = MagicMock() + + worker_stats = Mock() + + await app.on_worker_update("worker1", worker_stats) + + app.worker_dashboard.on_worker_update.assert_called_once_with( + "worker1", worker_stats + ) + + @pytest.mark.asyncio + async def test_on_realtime_metrics(self, app): + """Test on_realtime_metrics forwards to realtime_metrics_dashboard.""" + app.realtime_metrics_dashboard = Mock() + app.realtime_metrics_dashboard.batch = MagicMock() + + metrics = [ + MetricResult(tag="test_metric", header="Test Metric", unit="ms", avg=10.5) + ] + + await app.on_realtime_metrics(metrics) + + app.realtime_metrics_dashboard.on_realtime_metrics.assert_called_once_with( + metrics + ) + + @pytest.mark.asyncio + async def test_on_realtime_telemetry_metrics(self, app): + """Test on_realtime_telemetry_metrics forwards to realtime_telemetry_dashboard.""" + app.realtime_telemetry_dashboard = Mock() + app.realtime_telemetry_dashboard.batch = MagicMock() + + metrics = [ + MetricResult( + tag="gpu_util_dcgm_http___localhost_9400_metrics_gpu0_GPU-12345678", + header="GPU Utilization | localhost:9400 | GPU 0 | NVIDIA RTX 4090", + unit="%", + avg=75.0, + ) + ] + + await app.on_realtime_telemetry_metrics(metrics) + + app.realtime_telemetry_dashboard.on_realtime_telemetry_metrics.assert_called_once_with( + metrics + ) diff --git a/tests/ui/test_realtime_telemetry_dashboard.py b/tests/ui/test_realtime_telemetry_dashboard.py new file mode 100644 index 000000000..ff5339c18 --- /dev/null +++ b/tests/ui/test_realtime_telemetry_dashboard.py @@ -0,0 +1,651 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0
+
+from unittest.mock import Mock, patch
+
+import pytest
+from rich.text import Text
+from textual.widgets.data_table import RowDoesNotExist
+
+from aiperf.common.config.service_config import ServiceConfig
+from aiperf.common.models import MetricResult
+from aiperf.ui.dashboard.realtime_telemetry_dashboard import (
+    GPUMetricsTable,
+    RealtimeTelemetryDashboard,
+    SingleNodeView,
+)
+
+
+class TestGPUMetricsTable:
+    """Test utility methods in GPUMetricsTable."""
+
+    @pytest.fixture
+    def gpu_metrics_table(self):
+        """Create a GPUMetricsTable instance for testing."""
+        return GPUMetricsTable(
+            endpoint="localhost:9400",
+            gpu_uuid="GPU-12345678-90ab",
+            gpu_index=0,
+            model_name="NVIDIA RTX 4090",
+        )
+
+    def test_format_metric_row_with_all_stats(self, gpu_metrics_table):
+        """Test _format_metric_row formats all statistics correctly."""
+        metric = MetricResult(
+            tag="gpu_util_dcgm_http___localhost_9400_metrics_gpu0_GPU-12345678",
+            header="GPU Power Usage | localhost:9400 | GPU 0 | NVIDIA RTX 4090",
+            unit="W",
+            current=250.5,
+            avg=245.0,
+            min=200.0,
+            max=300.0,
+            p99=290.0,
+            p90=280.0,
+            p50=245.0,
+            std=15.5,
+        )
+
+        row_cells = gpu_metrics_table._format_metric_row(metric)
+
+        # Should have 9 cells: metric name + 8 stats
+        assert len(row_cells) == 9
+
+        # First cell should be the metric name (before |)
+        assert row_cells[0].plain == "GPU Power Usage"
+
+        # All cells should be Text objects
+        assert all(isinstance(cell, Text) for cell in row_cells)
+
+    def test_format_metric_row_simple_header(self, gpu_metrics_table):
+        """Test _format_metric_row with simple header (no | separator)."""
+        metric = MetricResult(
+            tag="simple_metric",
+            header="Simple Metric",
+            unit="ms",
+            avg=10.0,
+        )
+
+        row_cells = gpu_metrics_table._format_metric_row(metric)
+
+        # First cell should be the full header
+        assert row_cells[0].plain == "Simple Metric"
+
+    def test_format_value_none(self, gpu_metrics_table):
+        """Test _format_value with None returns 'N/A'."""
+        result = gpu_metrics_table._format_value(None)
+
+        assert isinstance(result, Text)
+        assert result.plain == "N/A"
+        assert result.style == "dim"
+
+    def test_format_value_small_number(self, gpu_metrics_table):
+        """Test _format_value with small number (< 1,000,000)."""
+        result = gpu_metrics_table._format_value(1234.567)
+
+        assert isinstance(result, Text)
+        assert result.plain == "1,234.57"
+        assert result.style == "green"
+
+    def test_format_value_large_number(self, gpu_metrics_table):
+        """Test _format_value with large number (>= 1,000,000) uses scientific notation."""
+        result = gpu_metrics_table._format_value(1234567.89)
+
+        assert isinstance(result, Text)
+        assert result.plain == "1.23e+06"
+        assert result.style == "green"
+
+    def test_format_value_zero(self, gpu_metrics_table):
+        """Test _format_value with zero."""
+        result = gpu_metrics_table._format_value(0.0)
+
+        assert isinstance(result, Text)
+        assert result.plain == "0.00"
+
+    def test_format_value_negative(self, gpu_metrics_table):
+        """Test _format_value with negative number."""
+        result = gpu_metrics_table._format_value(-123.45)
+
+        assert isinstance(result, Text)
+        assert result.plain == "-123.45"
+
+    def test_format_value_non_numeric(self, gpu_metrics_table):
+        """Test _format_value with non-numeric value."""
+        result = gpu_metrics_table._format_value("text_value")
+
+        assert isinstance(result, Text)
+        assert result.plain == "text_value"
+
+
+class TestSingleNodeView:
+    """Test utility methods in SingleNodeView."""
+
+    @pytest.fixture
+    def single_node_view(self):
+        """Create a SingleNodeView instance for testing."""
+        return SingleNodeView()
+
+    def test_group_metrics_by_gpu_single_gpu(self, single_node_view):
+        """Test _group_metrics_by_gpu with metrics from a single GPU."""
+        metrics = [
+            MetricResult(
+                tag="gpu_util_dcgm_http___localhost_9400_metrics_gpu0_GPU-12345678",
+                header="GPU Utilization | localhost:9400 | GPU 0 | Model",
+                unit="%",
+                avg=75.0,
+            ),
+            MetricResult(
+                tag="gpu_memory_dcgm_http___localhost_9400_metrics_gpu0_GPU-12345678",
+                header="GPU Memory | localhost:9400 | GPU 0 | Model",
+                unit="GB",
+                avg=8.5,
+            ),
+        ]
+
+        grouped = single_node_view._group_metrics_by_gpu(metrics)
+
+        # Should have 1 GPU group
+        assert len(grouped) == 1
+
+        # Both metrics should be in the same group
+        gpu_key = list(grouped.keys())[0]
+        assert len(grouped[gpu_key]) == 2
+
+    def test_group_metrics_by_gpu_multiple_gpus(self, single_node_view):
+        """Test _group_metrics_by_gpu with metrics from multiple GPUs."""
+        metrics = [
+            MetricResult(
+                tag="gpu_util_dcgm_http___localhost_9400_metrics_gpu0_GPU-12345678",
+                header="GPU Utilization | localhost:9400 | GPU 0 | Model",
+                unit="%",
+                avg=75.0,
+            ),
+            MetricResult(
+                tag="gpu_util_dcgm_http___localhost_9400_metrics_gpu1_GPU-87654321",
+                header="GPU Utilization | localhost:9400 | GPU 1 | Model",
+                unit="%",
+                avg=80.0,
+            ),
+            MetricResult(
+                tag="gpu_memory_dcgm_http___localhost_9400_metrics_gpu0_GPU-12345678",
+                header="GPU Memory | localhost:9400 | GPU 0 | Model",
+                unit="GB",
+                avg=8.5,
+            ),
+        ]
+
+        grouped = single_node_view._group_metrics_by_gpu(metrics)
+
+        # Should have 2 GPU groups
+        assert len(grouped) == 2
+
+        # Verify metrics are grouped correctly
+        all_metrics = [m for group in grouped.values() for m in group]
+        assert len(all_metrics) == 3
+
+    def test_group_metrics_by_gpu_empty_list(self, single_node_view):
+        """Test _group_metrics_by_gpu with empty metrics list."""
+        grouped = single_node_view._group_metrics_by_gpu([])
+
+        assert grouped == {}
+
+    def test_extract_gpu_key_from_tag_valid(self, single_node_view):
+        """Test _extract_gpu_key_from_tag with valid tag."""
+        tag = "gpu_util_dcgm_http___localhost_9400_metrics_gpu0_GPU-12345678"
+
+        gpu_key = single_node_view._extract_gpu_key_from_tag(tag)
+
+        # Should extract everything after _dcgm_ with _gpu replaced by _
+        assert "http___localhost_9400_metrics" in gpu_key
+        assert "0_GPU-12345678" in gpu_key
+
+    def test_extract_gpu_key_from_tag_no_dcgm(self, single_node_view):
+        """Test _extract_gpu_key_from_tag with tag missing _dcgm_."""
+        tag = "simple_metric_tag"
+
+        gpu_key = single_node_view._extract_gpu_key_from_tag(tag)
+
+        assert gpu_key == "unknown"
+
+    def test_extract_gpu_key_from_tag_no_gpu(self, single_node_view):
+        """Test _extract_gpu_key_from_tag with tag missing _gpu."""
+        tag = "metric_dcgm_http___localhost_9400_metrics"
+
+        gpu_key = single_node_view._extract_gpu_key_from_tag(tag)
+
+        assert gpu_key == "unknown"
+
+    def test_extract_gpu_info_full_header(self, single_node_view):
+        """Test _extract_gpu_info with complete header and tag."""
+        metric = MetricResult(
+            tag="gpu_util_dcgm_http___localhost_9400_metrics_gpu0_GPU-12345678",
+            header="GPU Power Usage | localhost:9400 | GPU 0 | NVIDIA RTX 4090",
+            unit="W",
+            avg=250.0,
+        )
+
+        endpoint, gpu_index, gpu_uuid, model_name = single_node_view._extract_gpu_info(
+            metric
+        )
+
+        assert endpoint == "localhost:9400"
+        assert gpu_index == 0
+        assert gpu_uuid == "GPU-12345678"
+        assert model_name == "NVIDIA RTX 4090"
+
+    def test_extract_gpu_info_incomplete_header(self, single_node_view):
+        """Test _extract_gpu_info with incomplete header uses defaults."""
+        metric = MetricResult(
+            tag="simple_metric_tag",
+            header="Simple Metric",
+            unit="ms",
+            avg=10.0,
+        )
+
+        endpoint, gpu_index, gpu_uuid, model_name = single_node_view._extract_gpu_info(
+            metric
+        )
+
+        # Should use default values
+        assert endpoint == "unknown"
+        assert gpu_index == 0
+        assert model_name == "GPU"
+
+    def test_extract_gpu_info_different_gpu_index(self, single_node_view):
+        """Test _extract_gpu_info correctly parses different GPU indices."""
+        metric = MetricResult(
+            tag="gpu_util_dcgm_http___localhost_9401_metrics_gpu7_UUID-999",
+            header="GPU Utilization | localhost:9401 | GPU 7 | Tesla V100",
+            unit="%",
+            avg=85.0,
+        )
+
+        endpoint, gpu_index, gpu_uuid, model_name = single_node_view._extract_gpu_info(
+            metric
+        )
+
+        assert endpoint == "localhost:9401"
+        assert gpu_index == 7
+        assert gpu_uuid == "UUID-999"
+        assert model_name == "Tesla V100"
+
+    def test_extract_gpu_info_uuid_from_tag(self, single_node_view):
+        """Test _extract_gpu_info extracts UUID from tag (last part)."""
+        metric = MetricResult(
+            tag="metric_name_dcgm_endpoint_gpu0_my-custom-uuid",
+            header="Metric | endpoint | GPU 0 | Model",
+            unit="ms",
+            avg=10.0,
+        )
+
+        endpoint, gpu_index, gpu_uuid, model_name = single_node_view._extract_gpu_info(
+            metric
+        )
+
+        assert gpu_uuid == "my-custom-uuid"
+
+    def test_group_metrics_preserves_order(self, single_node_view):
+        """Test that _group_metrics_by_gpu preserves metric order within groups."""
+        metrics = [
+            MetricResult(
+                tag="metric1_dcgm_endpoint_gpu0_uuid",
+                header="Metric 1 | endpoint | GPU 0 | Model",
+                unit="ms",
+                avg=10.0,
+            ),
+            MetricResult(
+                tag="metric2_dcgm_endpoint_gpu0_uuid",
+                header="Metric 2 | endpoint | GPU 0 | Model",
+                unit="ms",
+                avg=20.0,
+            ),
+            MetricResult(
+                tag="metric3_dcgm_endpoint_gpu0_uuid",
+                header="Metric 3 | endpoint | GPU 0 | Model",
+                unit="ms",
+                avg=30.0,
+            ),
+        ]
+
+        grouped = single_node_view._group_metrics_by_gpu(metrics)
+
+        # All metrics should be in one group and maintain order
+        gpu_key = list(grouped.keys())[0]
+        assert [m.header.split(" | ")[0] for m in grouped[gpu_key]] == [
+            "Metric 1",
+            "Metric 2",
+            "Metric 3",
+        ]
+
+
+class TestGPUMetricsTableLifecycle:
+    """Test lifecycle methods of GPUMetricsTable."""
+
+    @pytest.fixture
+    def gpu_metrics_table(self):
+        """Create a GPUMetricsTable instance for testing."""
+        return GPUMetricsTable(
+            endpoint="localhost:9400",
+            gpu_uuid="GPU-12345678-90ab",
+            gpu_index=0,
+            model_name="NVIDIA RTX 4090",
+        )
+
+    def test_compose_creates_widgets(self, gpu_metrics_table):
+        """Test that compose yields the correct widgets."""
+        widgets = list(gpu_metrics_table.compose())
+
+        assert len(widgets) == 2
+        assert widgets[0].__class__.__name__ == "Static"
+        assert widgets[1].__class__.__name__ == "NonFocusableDataTable"
+
+    def test_initialize_columns(self, gpu_metrics_table):
+        """Test that _initialize_columns sets up all columns correctly."""
+        mock_table = Mock()
+        mock_column_key = Mock()
+        mock_table.add_column.return_value = mock_column_key
+
+        gpu_metrics_table.data_table = mock_table
+        gpu_metrics_table._initialize_columns()
+
+        assert mock_table.add_column.call_count == len(gpu_metrics_table.COLUMNS)
+        assert gpu_metrics_table._columns_initialized is True
+        assert len(gpu_metrics_table._column_keys) == len(gpu_metrics_table.COLUMNS)
+
+    def test_update_with_no_data_table(self, gpu_metrics_table):
+        """Test update method when data_table is None."""
+        gpu_metrics_table.data_table = None
+        metrics = [
+            MetricResult(
+                tag="gpu_util_dcgm_http___localhost_9400_metrics_gpu0_GPU-12345678",
+                header="GPU Utilization | localhost:9400 | GPU 0 | NVIDIA RTX 4090",
+                unit="%",
+                avg=75.0,
+            )
+        ]
+
+        gpu_metrics_table.update(metrics)
+
+    def test_update_with_unmounted_table(self, gpu_metrics_table):
+        """Test update method when data_table is not mounted."""
+        mock_table = Mock()
+        mock_table.is_mounted = False
+        gpu_metrics_table.data_table = mock_table
+
+        metrics = [
+            MetricResult(
+                tag="gpu_util_dcgm_http___localhost_9400_metrics_gpu0_GPU-12345678",
+                header="GPU Utilization | localhost:9400 | GPU 0 | NVIDIA RTX 4090",
+                unit="%",
+                avg=75.0,
+            )
+        ]
+
+        gpu_metrics_table.update(metrics)
+        mock_table.add_row.assert_not_called()
+
+    def test_update_adds_new_row(self, gpu_metrics_table):
+        """Test that update adds a new row for a new metric."""
+        mock_table = Mock()
+        mock_table.is_mounted = True
+        mock_column_key = Mock()
+        mock_row_key = Mock()
+        mock_table.add_column.return_value = mock_column_key
+        mock_table.add_row.return_value = mock_row_key
+
+        gpu_metrics_table.data_table = mock_table
+        gpu_metrics_table._initialize_columns()
+
+        metrics = [
+            MetricResult(
+                tag="gpu_util_dcgm_http___localhost_9400_metrics_gpu0_GPU-12345678",
+                header="GPU Utilization | localhost:9400 | GPU 0 | NVIDIA RTX 4090",
+                unit="%",
+                current=75.0,
+                avg=75.0,
+                min=70.0,
+                max=80.0,
+                p99=79.0,
+                p90=78.0,
+                p50=75.0,
+                std=2.5,
+            )
+        ]
+
+        gpu_metrics_table.update(metrics)
+
+        mock_table.add_row.assert_called_once()
+        assert (
+            gpu_metrics_table._metric_row_keys[
+                "gpu_util_dcgm_http___localhost_9400_metrics_gpu0_GPU-12345678"
+            ]
+            == mock_row_key
+        )
+
+    def test_update_single_row_with_exception(self, gpu_metrics_table):
+        """Test that _update_single_row handles exceptions gracefully."""
+        mock_table = Mock()
+        mock_table.update_cell.side_effect = Exception("Update failed")
+        gpu_metrics_table.data_table = mock_table
+
+        mock_row_key = Mock()
+        mock_column_key = Mock()
+        gpu_metrics_table._column_keys = {
+            col: mock_column_key for col in gpu_metrics_table.COLUMNS
+        }
+
+        row_cells = [Text("Test", justify="left") for _ in gpu_metrics_table.COLUMNS]
+
+        gpu_metrics_table._update_single_row(row_cells, mock_row_key)
+
+    def test_update_handles_row_does_not_exist(self, gpu_metrics_table):
+        """Test that update handles RowDoesNotExist exception and re-adds row."""
+        mock_table = Mock()
+        mock_table.is_mounted = True
+        mock_column_key = Mock()
+        mock_row_key = Mock()
+        mock_table.add_column.return_value = mock_column_key
+        mock_table.add_row.return_value = mock_row_key
+
+        mock_table.get_row_index.side_effect = RowDoesNotExist("Row not found")
+
+        gpu_metrics_table.data_table = mock_table
+        gpu_metrics_table._initialize_columns()
+
+        metrics = [
+            MetricResult(
+                tag="gpu_util_dcgm_http___localhost_9400_metrics_gpu0_GPU-12345678",
+                header="GPU Utilization | localhost:9400 | GPU 0 | NVIDIA RTX 4090",
+                unit="%",
+                current=75.0,
+                avg=75.0,
+                min=70.0,
+                max=80.0,
+                p99=79.0,
+                p90=78.0,
+                p50=75.0,
+                std=2.5,
+            )
+        ]
+
+        gpu_metrics_table._metric_row_keys[
+            "gpu_util_dcgm_http___localhost_9400_metrics_gpu0_GPU-12345678"
+        ] = Mock()
+
+        gpu_metrics_table.update(metrics)
+
+        assert mock_table.add_row.call_count == 1
+
+
+class TestSingleNodeViewLifecycle:
+    """Test lifecycle methods of SingleNodeView."""
+
+    @pytest.fixture
+    def single_node_view(self):
+        """Create a SingleNodeView instance for testing."""
+        return SingleNodeView()
+
+    def test_compose_yields_nothing(self, single_node_view):
+        """Test that compose yields nothing initially (GPU tables added dynamically)."""
+        widgets = list(single_node_view.compose())
+        assert len(widgets) == 0
+
+    def test_update_creates_gpu_table_when_mounted(self, single_node_view):
+        """Test that update creates GPU tables for new GPUs."""
+        with (
+            patch.object(
+                type(single_node_view),
+                "is_mounted",
+                new_callable=lambda: property(lambda self: True),
+            ),
+            patch.object(single_node_view, "mount") as mock_mount,
+        ):
+            metrics = [
+                MetricResult(
+                    tag="gpu_util_dcgm_http___localhost_9400_metrics_gpu0_GPU-12345678",
+                    header="GPU Utilization | localhost:9400 | GPU 0 | NVIDIA RTX 4090",
+                    unit="%",
+                    avg=75.0,
+                )
+            ]
+
+            single_node_view.update(metrics)
+
+            assert len(single_node_view.gpu_tables) == 1
+            mock_mount.assert_called_once()
+
+    def test_update_skips_creation_when_unmounted(self, single_node_view):
+        """Test that update doesn't create GPU tables when not mounted."""
+        with patch.object(
+            type(single_node_view),
+            "is_mounted",
+            new_callable=lambda: property(lambda self: False),
+        ):
+            metrics = [
+                MetricResult(
+                    tag="gpu_util_dcgm_http___localhost_9400_metrics_gpu0_GPU-12345678",
+                    header="GPU Utilization | localhost:9400 | GPU 0 | NVIDIA RTX 4090",
+                    unit="%",
+                    avg=75.0,
+                )
+            ]
+
+            single_node_view.update(metrics)
+
+            assert len(single_node_view.gpu_tables) == 0
+
+    def test_update_updates_existing_gpu_table(self, single_node_view):
+        """Test that update calls update on existing GPU tables."""
+        mock_gpu_table = Mock()
+        gpu_key = "http___localhost_9400_metrics_0_GPU-12345678"
+        single_node_view.gpu_tables[gpu_key] = mock_gpu_table
+
+        with patch.object(
+            type(single_node_view),
+            "is_mounted",
+            new_callable=lambda: property(lambda self: True),
+        ):
+            metrics = [
+                MetricResult(
+                    tag="gpu_util_dcgm_http___localhost_9400_metrics_gpu0_GPU-12345678",
+                    header="GPU Utilization | localhost:9400 | GPU 0 | NVIDIA RTX 4090",
+                    unit="%",
+                    avg=75.0,
+                )
+            ]
+
+            single_node_view.update(metrics)
+
+            mock_gpu_table.update.assert_called_once()
+
+
+class TestRealtimeTelemetryDashboard:
+    """Test RealtimeTelemetryDashboard widget."""
+
+    @pytest.fixture
+    def service_config(self):
+        """Create a mock ServiceConfig."""
+        return Mock(spec=ServiceConfig)
+
+    @pytest.fixture
+    def dashboard(self, service_config):
+        """Create a RealtimeTelemetryDashboard instance for testing."""
+        return RealtimeTelemetryDashboard(service_config=service_config)
+
+    def test_init(self, dashboard, service_config):
+        """Test dashboard initialization."""
+        assert dashboard.service_config == service_config
+        assert dashboard.all_nodes_view is None
+        assert dashboard.metrics == []
+        assert dashboard.border_title == "Real-Time GPU Telemetry"
+
+    def test_compose_creates_widgets(self, dashboard):
+        """Test that compose yields the correct widgets."""
+        widgets = list(dashboard.compose())
+
+        assert len(widgets) == 2
+        assert widgets[0].__class__.__name__ == "Static"
+        assert widgets[1].__class__.__name__ == "SingleNodeView"
+
+    def test_set_status_message(self, dashboard):
+        """Test set_status_message updates the status widget."""
+        mock_status = Mock()
+        mock_all_nodes = Mock()
+
+        with patch.object(dashboard, "query_one", return_value=mock_status):
+            dashboard.all_nodes_view = mock_all_nodes
+
+            dashboard.set_status_message("Test message")
+
+            mock_status.update.assert_called_once_with("Test message")
+            mock_status.remove_class.assert_called_once_with("hidden")
+            mock_all_nodes.add_class.assert_called_once_with("hidden")
+
+    def test_set_status_message_handles_exception(self, dashboard):
+        """Test set_status_message handles exceptions gracefully."""
+        with patch.object(
+            dashboard, "query_one", side_effect=Exception("Widget not found")
+        ):
+            dashboard.set_status_message("Test message")
+
+    def test_on_realtime_telemetry_metrics_first_update(self, dashboard):
+        """Test on_realtime_telemetry_metrics on first metrics update."""
+        mock_all_nodes = Mock()
+        mock_status = Mock()
+        dashboard.all_nodes_view = mock_all_nodes
+
+        with patch.object(dashboard, "query_one", return_value=mock_status):
+            metrics = [
+                MetricResult(
+                    tag="gpu_util_dcgm_http___localhost_9400_metrics_gpu0_GPU-12345678",
+                    header="GPU Utilization | localhost:9400 | GPU 0 | NVIDIA RTX 4090",
+                    unit="%",
+                    avg=75.0,
+                )
+            ]
+
+            dashboard.on_realtime_telemetry_metrics(metrics)
+
+            assert dashboard.metrics == metrics
+            mock_status.add_class.assert_called_once_with("hidden")
+            mock_all_nodes.remove_class.assert_called_once_with("hidden")
+            mock_all_nodes.update.assert_called_once_with(metrics)
+
+    def test_on_realtime_telemetry_metrics_subsequent_update(self, dashboard):
+        """Test on_realtime_telemetry_metrics on subsequent updates."""
+        mock_all_nodes = Mock()
+        dashboard.all_nodes_view = mock_all_nodes
+        dashboard.metrics = [Mock()]
+
+        metrics = [
+            MetricResult(
+                tag="gpu_util_dcgm_http___localhost_9400_metrics_gpu0_GPU-12345678",
+                header="GPU Utilization | localhost:9400 | GPU 0 | NVIDIA RTX 4090",
+                unit="%",
+                avg=80.0,
+            )
+        ]
+
+        dashboard.on_realtime_telemetry_metrics(metrics)
+
+        assert dashboard.metrics == metrics
+        mock_all_nodes.update.assert_called_once_with(metrics)