Skip to content

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -528,63 +528,24 @@ class PipelineRequestOptimize(BaseModel):
parameters: Optional[Dict[str, Any]]


class EncoderDeviceConfig(BaseModel):
"""
Encoder device configuration used in video output settings.

Attributes:
device_name: Name of the encoder device (for example ``"GPU"``).
gpu_id: Optional GPU index when applicable.

Example:
.. code-block:: json

{
"device_name": "GPU",
"gpu_id": 0
}
"""

device_name: str = Field(
default="GPU",
description="Name of the encoder device (e.g., 'GPU', 'CPU', 'NPU')",
examples=["GPU", "CPU", "NPU"],
)
gpu_id: Optional[int] = Field(
default=None,
description="GPU device index (only applicable when device_name indicates a GPU)",
examples=[0, 1],
)


class VideoOutputConfig(BaseModel):
"""
Generic configuration of optional encoded video output.

Attributes:
enabled: Flag to enable or disable video output generation.
encoder_device: EncoderDeviceConfig used when video output is enabled.

Example:
.. code-block:: json

{
"enabled": false,
"encoder_device": {
"device_name": "GPU",
"gpu_id": 0
}
"enabled": false
}
"""

enabled: bool = Field(
default=False, description="Flag to enable or disable video output generation."
)
encoder_device: EncoderDeviceConfig = Field(
default=EncoderDeviceConfig(device_name="GPU", gpu_id=0),
description="Encoder device configuration (only applicable when video output is enabled).",
examples=[{"device_name": "GPU", "gpu_id": 0}],
)


class PerformanceTestSpec(BaseModel):
Expand All @@ -609,12 +570,9 @@ class PerformanceTestSpec(BaseModel):
video_output: VideoOutputConfig = Field(
default=VideoOutputConfig(
enabled=False,
encoder_device=EncoderDeviceConfig(device_name="GPU", gpu_id=0),
),
description="Video output configuration.",
examples=[
{"enabled": False, "encoder_device": {"device_name": "GPU", "gpu_id": 0}}
],
examples=[{"enabled": False}],
)


Expand Down Expand Up @@ -646,12 +604,9 @@ class DensityTestSpec(BaseModel):
video_output: VideoOutputConfig = Field(
default=VideoOutputConfig(
enabled=False,
encoder_device=EncoderDeviceConfig(device_name="GPU", gpu_id=0),
),
description="Video output configuration.",
examples=[
{"enabled": False, "encoder_device": {"device_name": "GPU", "gpu_id": 0}}
],
examples=[{"enabled": False}],
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def run_performance_test(body: schemas.PerformanceTestSpec):
* pipeline_performance_specs – list of pipelines and number of
streams per pipeline.
* video_output – configuration for optional encoded video output
(enabled flag and encoder_device).
(enabled flag).

Returns:
202 Accepted:
Expand Down Expand Up @@ -78,8 +78,7 @@ def run_performance_test(body: schemas.PerformanceTestSpec):
{"id": "pipeline-b7c2e114", "streams": 4}
],
"video_output": {
"enabled": false,
"encoder_device": {"device_name": "GPU", "gpu_id": 0}
"enabled": false
}
}

Expand Down Expand Up @@ -186,8 +185,7 @@ def run_density_test(body: schemas.DensityTestSpec):
{"id": "pipeline-b7c2e114", "stream_rate": 50}
],
"video_output": {
"enabled": false,
"encoder_device": {"device_name": "GPU", "gpu_id": 0}
"enabled": false
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from utils import generate_unique_filename
from videos import get_videos_manager, OUTPUT_VIDEO_DIR
from models import get_supported_models_manager
from video_encoder import ENCODER_DEVICE_CPU, ENCODER_DEVICE_GPU
from resources import (
get_labels_manager,
get_scripts_manager,
Expand Down Expand Up @@ -367,6 +368,33 @@ def get_input_video_filenames(self) -> list[str]:

return input_filenames

def get_recommended_encoder_device(self) -> str:
"""
Iterate backwards through nodes to find the last video/x-raw caps node
and return the recommended encoder device based on memory type.

Note: NPU variants are not considered because NPUs do not provide dedicated
memory accessible for GStreamer pipeline buffering; they operate exclusively
on system or shared memory.

Returns:
str: ENCODER_DEVICE_GPU if video/x-raw(memory:VAMemory) is detected,
ENCODER_DEVICE_CPU for standard video/x-raw or when no video/x-raw
caps node exists in the pipeline.
"""
for node in reversed(self.nodes):
if node.data.get(NODE_KIND_KEY) != NODE_KIND_CAPS:
continue

if not node.type.startswith("video/x-raw"):
continue

if "memory:VAMemory" in node.type:
return ENCODER_DEVICE_GPU
return ENCODER_DEVICE_CPU

return ENCODER_DEVICE_CPU


@dataclass
class _Token:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,11 +299,14 @@ def build_pipeline_command(

# Handle final video output if enabled
if video_config.enabled and stream_index == 0:
# Get recommended encoder device from the graph
encoder_device = graph.get_recommended_encoder_device()
# Replace fakesink with actual video output element
unique_pipeline_str, generated_paths = (
self.video_encoder.replace_fakesink_with_video_output(
pipeline.id,
unique_pipeline_str,
video_config.encoder_device,
encoder_device,
input_video_filenames,
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,6 @@ def test_run_performance_test_with_gpu_encoder(self, mock_test_manager):

# Verify video output configuration
self.assertTrue(call_args.video_output.enabled)
self.assertEqual(call_args.video_output.encoder_device.device_name, "GPU")
self.assertEqual(call_args.video_output.encoder_device.gpu_id, 0)

# ------------------------------------------------------------------
# /tests/density
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from dataclasses import dataclass
from unittest.mock import MagicMock, patch
from graph import Graph, Node, Edge
from video_encoder import ENCODER_DEVICE_GPU, ENCODER_DEVICE_CPU

mock_models_manager = MagicMock()
mock_videos_manager = MagicMock()
Expand Down Expand Up @@ -1505,5 +1506,133 @@ def test_empty_graph_raises_error(self):
self.assertIn("Empty graph", str(cm.exception))


class TestGetRecommendedEncoderDevice(unittest.TestCase):
"""Test cases for Graph.get_recommended_encoder_device method."""

def test_gpu_encoder_for_va_memory_caps(self):
"""Test that GPU encoder is recommended when video/x-raw(memory:VAMemory) is found."""
graph = Graph(
nodes=[
Node(id="0", type="filesrc", data={"location": "test.mp4"}),
Node(id="1", type="decodebin3", data={}),
Node(
id="2",
type="video/x-raw(memory:VAMemory)",
data={"__node_kind": "caps"},
),
Node(id="3", type="fakesink", data={}),
],
edges=[
Edge(id="0", source="0", target="1"),
Edge(id="1", source="1", target="2"),
Edge(id="2", source="2", target="3"),
],
)

self.assertEqual(graph.get_recommended_encoder_device(), ENCODER_DEVICE_GPU)

def test_cpu_encoder_for_standard_video_raw(self):
"""Test that CPU encoder is recommended for standard video/x-raw caps."""
graph = Graph(
nodes=[
Node(id="0", type="filesrc", data={"location": "test.mp4"}),
Node(id="1", type="decodebin3", data={}),
Node(
id="2",
type="video/x-raw",
data={"__node_kind": "caps", "width": "640", "height": "480"},
),
Node(id="3", type="fakesink", data={}),
],
edges=[
Edge(id="0", source="0", target="1"),
Edge(id="1", source="1", target="2"),
Edge(id="2", source="2", target="3"),
],
)

self.assertEqual(graph.get_recommended_encoder_device(), ENCODER_DEVICE_CPU)

def test_cpu_encoder_when_no_video_raw_caps(self):
"""Test that CPU encoder is recommended when no video/x-raw caps exist."""
graph = Graph(
nodes=[
Node(id="0", type="filesrc", data={"location": "test.mp4"}),
Node(id="1", type="decodebin3", data={}),
Node(id="2", type="queue", data={}),
Node(id="3", type="fakesink", data={}),
],
edges=[
Edge(id="0", source="0", target="1"),
Edge(id="1", source="1", target="2"),
Edge(id="2", source="2", target="3"),
],
)

self.assertEqual(graph.get_recommended_encoder_device(), ENCODER_DEVICE_CPU)

def test_uses_last_video_raw_caps_when_multiple_exist(self):
"""Test that the method uses the last video/x-raw caps in the pipeline."""
graph = Graph(
nodes=[
Node(id="0", type="filesrc", data={"location": "test.mp4"}),
Node(
id="1",
type="video/x-raw",
data={"__node_kind": "caps", "width": "640"},
),
Node(id="2", type="queue", data={}),
Node(
id="3",
type="video/x-raw(memory:VAMemory)",
data={"__node_kind": "caps"},
),
Node(id="4", type="fakesink", data={}),
],
edges=[
Edge(id="0", source="0", target="1"),
Edge(id="1", source="1", target="2"),
Edge(id="2", source="2", target="3"),
Edge(id="3", source="3", target="4"),
],
)

# Should return GPU because the last video/x-raw has VAMemory
self.assertEqual(graph.get_recommended_encoder_device(), ENCODER_DEVICE_GPU)

def test_iterates_backwards_through_nodes(self):
"""Test that the method iterates backwards (uses last occurrence, not first)."""
graph = Graph(
nodes=[
Node(id="0", type="filesrc", data={"location": "test.mp4"}),
Node(
id="1",
type="video/x-raw(memory:VAMemory)",
data={"__node_kind": "caps"},
),
Node(id="2", type="queue", data={}),
Node(
id="3", type="video/x-raw", data={"__node_kind": "caps"}
), # Last one, no VAMemory
Node(id="4", type="fakesink", data={}),
],
edges=[
Edge(id="0", source="0", target="1"),
Edge(id="1", source="1", target="2"),
Edge(id="2", source="2", target="3"),
Edge(id="3", source="3", target="4"),
],
)

# Should return CPU because iterating backwards finds node 3 first (no VAMemory)
self.assertEqual(graph.get_recommended_encoder_device(), ENCODER_DEVICE_CPU)

def test_empty_graph(self):
"""Test that CPU encoder is recommended for an empty graph."""
graph = Graph(nodes=[], edges=[])

self.assertEqual(graph.get_recommended_encoder_device(), ENCODER_DEVICE_CPU)


if __name__ == "__main__":
unittest.main()
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from api.api_schemas import (
Node,
Edge,
EncoderDeviceConfig,
PipelineType,
PipelineSource,
PipelineGraph,
Expand Down Expand Up @@ -458,7 +457,6 @@ def test_build_pipeline_command_with_video_output_enabled(self):
pipeline_performance_specs = [PipelinePerformanceSpec(id=added.id, streams=1)]
video_config = VideoOutputConfig(
enabled=True,
encoder_device=EncoderDeviceConfig(device_name="CPU", gpu_id=None),
)

command, output_paths = manager.build_pipeline_command(
Expand Down Expand Up @@ -497,7 +495,6 @@ def test_build_pipeline_command_with_gpu_encoder(self):
pipeline_performance_specs = [PipelinePerformanceSpec(id=added.id, streams=2)]
video_config = VideoOutputConfig(
enabled=True,
encoder_device=EncoderDeviceConfig(device_name="GPU", gpu_id=0),
)

command, output_paths = manager.build_pipeline_command(
Expand Down Expand Up @@ -544,7 +541,6 @@ def test_build_pipeline_command_video_output_multiple_pipelines(self):
]
video_config = VideoOutputConfig(
enabled=True,
encoder_device=EncoderDeviceConfig(device_name="CPU", gpu_id=None),
)

command, output_paths = manager.build_pipeline_command(
Expand Down
Loading
Loading