Commit

[model server] Add base multi node tests (#85)
* multi node tests

* Create size-labeler.yml

* Delete .github/workflows/size-labeler.yml

* add multi node base test

* add test

* fix expected res
rnetser authored Jan 12, 2025
1 parent e814822 commit f4c0992
Showing 15 changed files with 397 additions and 90 deletions.
35 changes: 35 additions & 0 deletions tests/model_serving/model_server/conftest.py
@@ -6,11 +6,14 @@
from ocp_resources.cluster_service_version import ClusterServiceVersion
from ocp_resources.inference_service import InferenceService
from ocp_resources.namespace import Namespace
from ocp_resources.persistent_volume_claim import PersistentVolumeClaim
from ocp_resources.secret import Secret
from ocp_resources.service_account import ServiceAccount
from ocp_resources.serving_runtime import ServingRuntime
from ocp_resources.storage_class import StorageClass

from tests.model_serving.model_server.utils import create_isvc
from utilities.constants import StorageClassName
from utilities.infra import s3_endpoint_secret
from utilities.serving_runtime import ServingRuntimeFromTemplate

@@ -125,3 +128,35 @@ def s3_models_inference_service(

    with create_isvc(**isvc_kwargs) as isvc:
        yield isvc


@pytest.fixture(scope="class")
def model_pvc(
    request: FixtureRequest,
    admin_client: DynamicClient,
    model_namespace: Namespace,
) -> Generator[PersistentVolumeClaim, Any, Any]:
    access_mode = "ReadWriteOnce"
    pvc_kwargs = {
        "name": "model-pvc",
        "namespace": model_namespace.name,
        "client": admin_client,
        "size": request.param["pvc-size"],
    }
    if hasattr(request, "param"):
        access_mode = request.param.get("access-modes", access_mode)

        if storage_class_name := request.param.get("storage-class-name"):
            pvc_kwargs["storage_class"] = storage_class_name

    pvc_kwargs["accessmodes"] = access_mode

    with PersistentVolumeClaim(**pvc_kwargs) as pvc:
        pvc.wait_for_status(status=pvc.Status.BOUND, timeout=120)
        yield pvc


@pytest.fixture(scope="session")
def skip_if_no_nfs_storage_class(admin_client: DynamicClient) -> None:
    if not StorageClass(client=admin_client, name=StorageClassName.NFS).exists:
        pytest.skip(f"StorageClass {StorageClassName.NFS} is missing from the cluster")
Empty file.
101 changes: 101 additions & 0 deletions tests/model_serving/model_server/multi_node/conftest.py
@@ -0,0 +1,101 @@
from typing import Any, Generator, List

import pytest
from _pytest.fixtures import FixtureRequest
from kubernetes.dynamic import DynamicClient
from ocp_resources.inference_service import InferenceService
from ocp_resources.namespace import Namespace
from ocp_resources.node import Node
from ocp_resources.persistent_volume_claim import PersistentVolumeClaim
from ocp_resources.pod import Pod
from ocp_resources.serving_runtime import ServingRuntime

from tests.model_serving.model_server.utils import create_isvc
from utilities.constants import KServeDeploymentType
from utilities.general import download_model_data
from utilities.infra import (
get_pods_by_isvc_label,
wait_for_inference_deployment_replicas,
)


@pytest.fixture(scope="session")
def nodes(admin_client: DynamicClient) -> list[Node]:
    return list(Node.get(dyn_client=admin_client))


@pytest.fixture(scope="session")
def nvidia_gpu_nodes(nodes: list[Node]) -> list[Node]:
    return [node for node in nodes if "nvidia.com/gpu.present" in node.labels.keys()]


@pytest.fixture(scope="session")
def skip_if_no_gpu_nodes(nvidia_gpu_nodes: list[Node]) -> None:
    if len(nvidia_gpu_nodes) < 2:
        pytest.skip("Multi-node tests can only run on a cluster with at least 2 GPU worker nodes")


@pytest.fixture(scope="class")
def models_bucket_downloaded_model_data(
    request: FixtureRequest,
    admin_client: DynamicClient,
    model_namespace: Namespace,
    models_s3_bucket_name: str,
    model_pvc: PersistentVolumeClaim,
    aws_secret_access_key: str,
    aws_access_key_id: str,
    models_s3_bucket_endpoint: str,
    models_s3_bucket_region: str,
) -> str:
    return download_model_data(
        admin_client=admin_client,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        model_namespace=model_namespace.name,
        model_pvc_name=model_pvc.name,
        bucket_name=models_s3_bucket_name,
        aws_endpoint_url=models_s3_bucket_endpoint,
        aws_default_region=models_s3_bucket_region,
        model_path=request.param["model-dir"],
    )


@pytest.fixture(scope="class")
def multi_node_inference_service(
    request: FixtureRequest,
    admin_client: DynamicClient,
    model_namespace: Namespace,
    serving_runtime_from_template: ServingRuntime,
    model_pvc: PersistentVolumeClaim,
    models_bucket_downloaded_model_data: str,
) -> Generator[InferenceService, Any, Any]:
    with create_isvc(
        client=admin_client,
        name=request.param["name"],
        namespace=model_namespace.name,
        runtime=serving_runtime_from_template.name,
        storage_uri=f"pvc://{model_pvc.name}/{models_bucket_downloaded_model_data}",
        model_format=serving_runtime_from_template.instance.spec.supportedModelFormats[0].name,
        deployment_mode=KServeDeploymentType.RAW_DEPLOYMENT,
        autoscaler_mode="external",
        multi_node_worker_spec={},
        wait_for_predictor_pods=False,
    ) as isvc:
        wait_for_inference_deployment_replicas(
            client=admin_client,
            isvc=isvc,
            deployment_mode=KServeDeploymentType.RAW_DEPLOYMENT,
            expected_num_deployments=2,
        )
        yield isvc


@pytest.fixture(scope="class")
def multi_node_predictor_pods_scope_class(
    admin_client: DynamicClient,
    multi_node_inference_service: InferenceService,
) -> List[Pod]:
    return get_pods_by_isvc_label(
        client=admin_client,
        isvc=multi_node_inference_service,
    )
@@ -0,0 +1,68 @@
import pytest

from tests.model_serving.model_server.multi_node.utils import (
    verify_nvidia_gpu_status,
    verify_ray_status,
)
from tests.model_serving.model_server.utils import verify_inference_response
from utilities.constants import ModelInferenceRuntime, Protocols, StorageClassName

pytestmark = pytest.mark.usefixtures("skip_if_no_gpu_nodes", "skip_if_no_nfs_storage_class")


@pytest.mark.parametrize(
    "model_namespace, models_bucket_downloaded_model_data, model_pvc, "
    "serving_runtime_from_template, multi_node_inference_service",
    [
        pytest.param(
            {"name": "gpu-multi-node"},
            {"model-dir": "granite-8b-code-base"},
            {
                "access-modes": "ReadWriteMany",
                "storage-class-name": StorageClassName.NFS,
                "pvc-size": "40Gi",
            },
            {
                "name": "granite-runtime",
                "template-name": "vllm-multinode-runtime-template",
                "multi-model": False,
            },
            {"name": "multi-vllm"},
        )
    ],
    indirect=True,
)
class TestMultiNode:
    def test_multi_node_ray_status(self, multi_node_predictor_pods_scope_class):
        """Test multi node ray status"""
        verify_ray_status(pods=multi_node_predictor_pods_scope_class)

    def test_multi_node_nvidia_gpu_status(self, multi_node_predictor_pods_scope_class):
        """Test multi node NVIDIA GPU status"""
        verify_nvidia_gpu_status(pod=multi_node_predictor_pods_scope_class[0])

    def test_multi_node_default_config(self, serving_runtime_from_template, multi_node_predictor_pods_scope_class):
        """Test multi node inference service with default config"""
        runtime_worker_spec = serving_runtime_from_template.instance.spec.workerSpec

        if runtime_worker_spec.tensorParallelSize != 1 or runtime_worker_spec.pipelineParallelSize != 2:
            pytest.fail(f"Multinode runtime default worker spec is not as expected, {runtime_worker_spec}")

    def test_multi_node_pods_distribution(self, multi_node_predictor_pods_scope_class, nvidia_gpu_nodes):
        """Verify multi node pods are distributed between cluster GPU nodes"""
        pods_nodes = {pod.node.name for pod in multi_node_predictor_pods_scope_class}
        assert len(multi_node_predictor_pods_scope_class) == len(pods_nodes), (
            "Pods are not distributed between cluster GPU nodes"
        )

        assert pods_nodes.issubset({node.name for node in nvidia_gpu_nodes}), "Pods are not running on GPU nodes"

    def test_multi_node_basic_inference(self, multi_node_inference_service):
        """Test multi node basic inference"""
        verify_inference_response(
            inference_service=multi_node_inference_service,
            runtime=ModelInferenceRuntime.VLLM_RUNTIME,
            inference_type="completions",
            protocol=Protocols.HTTP,
            use_default_query=True,
        )
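
The verify_inference_response helper drives the actual request in the test above. As a rough illustration only, a "completions" query against the deployed vLLM runtime amounts to something like the following; the service URL and model name are hypothetical, and it assumes the runtime exposes the OpenAI-compatible completions endpoint:

import requests

# Hypothetical in-cluster predictor URL and served model name, for illustration only.
base_url = "http://multi-vllm-predictor.gpu-multi-node.svc.cluster.local:8080"
payload = {
    "model": "granite-8b-code-base",
    "prompt": "def fibonacci(n):",
    "max_tokens": 50,
}

response = requests.post(f"{base_url}/v1/completions", json=payload, timeout=60)
response.raise_for_status()
print(response.json()["choices"][0]["text"])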
40 changes: 40 additions & 0 deletions tests/model_serving/model_server/multi_node/utils.py
@@ -0,0 +1,40 @@
import re
import shlex
from typing import Dict, List

from ocp_resources.pod import Pod


def verify_ray_status(pods: List[Pod]) -> None:
    cmd = shlex.split("ray status")
    ray_failures: Dict[str, List[str]] = {}
    res = None
    for pod in pods:
        res = pod.execute(command=cmd)
        if res_regex := re.search(
            r"Active:\n(?P<active>.*)\nPending:\n(?P<pending>.*)\nRecent.*CPU\n(?P<gpu>.*)GPU",
            res,
            re.IGNORECASE | re.DOTALL,
        ):
            ray_formatted_result = res_regex.groupdict()
            if len(ray_formatted_result["active"].split("\n")) != len(pods):
                ray_failures.setdefault(pod.name, []).append("Wrong number of active nodes")

            if "no pending nodes" not in ray_formatted_result["pending"]:
                ray_failures.setdefault(pod.name, []).append("Some nodes are pending")

            if (gpus := ray_formatted_result["gpu"].strip().split("/")) and gpus[0] != gpus[1]:
                ray_failures.setdefault(pod.name, []).append("Wrong number of GPUs")

    assert not ray_failures, f"Failure in ray status check: {ray_failures}, {res}"


def verify_nvidia_gpu_status(pod: Pod) -> None:
    res = pod.execute(command=shlex.split("nvidia-smi --query-gpu=memory.used --format=csv"))
    mem_regex = re.search(r"(\d+)", res)

    if not mem_regex:
        raise ValueError(f"Could not find memory usage in response, {res}")

    elif mem_regex and int(mem_regex.group(1)) == 0:
        raise ValueError(f"GPU memory is not used, {res}")
3 changes: 0 additions & 3 deletions tests/model_serving/model_server/storage/constants.py
@@ -10,6 +10,3 @@
"multi-model": False,
}
INFERENCE_SERVICE_PARAMS: Dict[str, str] = {"name": ModelFormat.ONNX}

# Storage
NFS_STR: str = "nfs"