add multi node base test
rnetser committed Jan 6, 2025
1 parent 1c73e5b commit b98eb3b
Showing 15 changed files with 317 additions and 147 deletions.
36 changes: 35 additions & 1 deletion tests/model_serving/model_server/conftest.py
@@ -5,11 +5,13 @@
from kubernetes.dynamic import DynamicClient
from ocp_resources.cluster_service_version import ClusterServiceVersion
from ocp_resources.namespace import Namespace
+from ocp_resources.persistent_volume_claim import PersistentVolumeClaim
from ocp_resources.secret import Secret
from ocp_resources.service_account import ServiceAccount
from ocp_resources.serving_runtime import ServingRuntime
+from ocp_resources.storage_class import StorageClass

-from utilities.constants import Protocols, ModelInferenceRuntime, RuntimeTemplates
+from utilities.constants import Protocols, ModelInferenceRuntime, RuntimeTemplates, StorageClassName
from utilities.infra import s3_endpoint_secret
from utilities.serving_runtime import ServingRuntimeFromTemplate

@@ -104,3 +106,35 @@ def serving_runtime_from_template(
@pytest.fixture(scope="class")
def ci_s3_storage_uri(request: FixtureRequest, ci_s3_bucket_name: str) -> str:
return f"s3://{ci_s3_bucket_name}/{request.param['model-dir']}/"


@pytest.fixture(scope="class")
def model_pvc(
request: FixtureRequest,
admin_client: DynamicClient,
model_namespace: Namespace,
) -> Generator[PersistentVolumeClaim, Any, Any]:
access_mode = "ReadWriteOnce"
pvc_kwargs = {
"name": "model-pvc",
"namespace": model_namespace.name,
"client": admin_client,
"size": request.param["pvc-size"],
}
if hasattr(request, "param"):
access_mode = request.param.get("access-modes")

if storage_class_name := request.param.get("storage-class-name"):
pvc_kwargs["storage_class"] = storage_class_name

pvc_kwargs["accessmodes"] = access_mode

with PersistentVolumeClaim(**pvc_kwargs) as pvc:
pvc.wait_for_status(status=pvc.Status.BOUND, timeout=120)
yield pvc


@pytest.fixture(scope="session")
def skip_if_no_nfs_storage_class(admin_client: DynamicClient) -> None:
if not StorageClass(client=admin_client, name=StorageClassName.NFS).exists:
pytest.skip(f"StorageClass {StorageClassName.NFS} is missing from the cluster")
65 changes: 32 additions & 33 deletions tests/model_serving/model_server/multi_node/conftest.py
@@ -1,4 +1,4 @@
-from typing import Any, Generator
+from typing import Any, Generator, List

import pytest
from _pytest.fixtures import FixtureRequest
@@ -7,13 +7,13 @@
from ocp_resources.namespace import Namespace
from ocp_resources.node import Node
from ocp_resources.persistent_volume_claim import PersistentVolumeClaim
+from ocp_resources.pod import Pod
from ocp_resources.serving_runtime import ServingRuntime

from tests.model_serving.model_server.utils import create_isvc
from utilities.constants import KServeDeploymentType
from utilities.general import download_model_data
-from utilities.infra import wait_for_kserve_predictor_deployment_replicas
-from utilities.serving_runtime import ServingRuntimeFromTemplate
+from utilities.infra import get_pods_by_isvc_label, wait_for_kserve_predictor_deployment_replicas


@pytest.fixture(scope="session")
@@ -22,72 +22,71 @@ def nodes(admin_client: DynamicClient) -> list[Node]:


@pytest.fixture(scope="session")
-def gpu_nodes(nodes: list[Node]) -> list[Node]:
+def nvidia_gpu_nodes(nodes: list[Node]) -> list[Node]:
    return [node for node in nodes if "nvidia.com/gpu.present" in node.labels.keys()]


@pytest.fixture(scope="session")
-def skip_if_no_gpu_nodes(gpu_nodes):
-    if len(gpu_nodes) < 2:
+def skip_if_no_gpu_nodes(nvidia_gpu_nodes):
+    if len(nvidia_gpu_nodes) < 2:
        pytest.skip("Multi-node tests can only run on a Cluster with at least 2 GPU Worker nodes")


@pytest.fixture(scope="class")
-def downloaded_model_data(
+def models_bucket_downloaded_model_data(
    request: FixtureRequest,
    admin_client: DynamicClient,
    model_namespace: Namespace,
    s3_models_storage_uri: str,
    models_s3_bucket_name: str,
    model_pvc: PersistentVolumeClaim,
    aws_secret_access_key: str,
    aws_access_key_id: str,
    models_s3_bucket_endpoint: str,
    models_s3_bucket_region: str,
) -> str:
    return download_model_data(
        admin_client=admin_client,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        storage_uri=s3_models_storage_uri,
        model_namespace=model_namespace.name,
        model_pvc_name=model_pvc.name,
        bucket_name=models_s3_bucket_name,
        aws_endpoint_url=models_s3_bucket_endpoint,
        aws_default_region=models_s3_bucket_region,
        model_path=request.param["model-dir"],
    )


@pytest.fixture(scope="class")
def multi_node_serving_runtime(
request: FixtureRequest,
admin_client: DynamicClient,
model_namespace: Namespace,
) -> Generator[ServingRuntime, Any, Any]:
with ServingRuntimeFromTemplate(
client=admin_client,
name=request.param["name"],
namespace=model_namespace.name,
template_name=request.param["template-name"],
) as model_runtime:
yield model_runtime


@pytest.fixture(scope="class")
def multi_node_inference_service(
request: FixtureRequest,
admin_client: DynamicClient,
model_namespace: Namespace,
multi_node_serving_runtime: ServingRuntime,
serving_runtime_from_template: ServingRuntime,
model_pvc: PersistentVolumeClaim,
downloaded_model_data: str,
models_bucket_downloaded_model_data: str,
) -> Generator[InferenceService, Any, Any]:
with create_isvc(
client=admin_client,
name=request.param["name"],
namespace=model_namespace.name,
runtime=multi_node_serving_runtime.name,
storage_uri=f"pvc://{model_pvc.name}/{downloaded_model_data}",
model_format=multi_node_serving_runtime.instance.spec.supportedModelFormats[0].name,
runtime=serving_runtime_from_template.name,
storage_uri=f"pvc://{model_pvc.name}/{models_bucket_downloaded_model_data}",
model_format=serving_runtime_from_template.instance.spec.supportedModelFormats[0].name,
deployment_mode=KServeDeploymentType.RAW_DEPLOYMENT,
autoscaler_mode="external",
multi_node_worker_spec={},
) as isvc:
wait_for_kserve_predictor_deployment_replicas(
client=admin_client,
isvc=isvc,
)
wait_for_kserve_predictor_deployment_replicas(client=admin_client, isvc=isvc, expected_num_deployments=2)
yield isvc


@pytest.fixture(scope="class")
def multi_node_predictor_pods_scope_class(
admin_client: DynamicClient,
multi_node_inference_service: InferenceService,
) -> List[Pod]:
return get_pods_by_isvc_label(
client=admin_client,
isvc=multi_node_inference_service,
)
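
For context, a rough sketch of what a label-based pod lookup like get_pods_by_isvc_label typically does; this is an assumption, not the repo's actual helper from utilities/infra.py. It relies on the KServe convention that predictor pods carry the serving.kserve.io/inferenceservice=<isvc-name> label:

from kubernetes.dynamic import DynamicClient
from ocp_resources.inference_service import InferenceService
from ocp_resources.pod import Pod


def get_pods_by_isvc_label_sketch(client: DynamicClient, isvc: InferenceService) -> list[Pod]:
    # Assumed KServe convention: predictor pods are labeled with the isvc name.
    return list(
        Pod.get(
            dyn_client=client,
            namespace=isvc.namespace,
            label_selector=f"serving.kserve.io/inferenceservice={isvc.name}",
        )
    )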
27 changes: 0 additions & 27 deletions tests/model_serving/model_server/multi_node/test_multi_node.py

This file was deleted.

@@ -0,0 +1,56 @@
import pytest

from tests.model_serving.model_server.multi_node.utils import verify_nvidia_gpu_status, verify_ray_status
from tests.model_serving.model_server.utils import verify_inference_response
from utilities.constants import ModelInferenceRuntime, Protocols, StorageClassName

pytestmark = pytest.mark.usefixtures("skip_if_no_gpu_nodes", "skip_if_no_nfs_storage_class")


@pytest.mark.parametrize(
    "model_namespace, models_bucket_downloaded_model_data, model_pvc, "
    "serving_runtime_from_template, multi_node_inference_service",
    [
        pytest.param(
            {"name": "gpu-multi-node"},
            {"model-dir": "granite-8b-code-base"},
            {
                "access-modes": "ReadWriteMany",
                "storage-class-name": StorageClassName.NFS,
                "pvc-size": "40Gi",
            },
            {
                "name": "granite-runtime",
                "template-name": "vllm-multinode-runtime-template",
                "multi-model": False,
            },
            {"name": "multi-vllm"},
        )
    ],
    indirect=True,
)
class TestMultiNode:
    def test_multi_node_ray_status(self, multi_node_predictor_pods_scope_class):
        """Test multi node ray status"""
        verify_ray_status(pods=multi_node_predictor_pods_scope_class)

    def test_multi_node_nvidia_gpu_status(self, multi_node_predictor_pods_scope_class):
        """Test multi node nvidia gpu status"""
        verify_nvidia_gpu_status(pod=multi_node_predictor_pods_scope_class[0])

    def test_multi_node_default_config(self, serving_runtime_from_template, multi_node_predictor_pods_scope_class):
        """Test multi node inference service with default config"""
        runtime_worker_spec = serving_runtime_from_template.instance.spec.workerSpec

        if runtime_worker_spec.tensorParallelSize != 1 or runtime_worker_spec.pipelineParallelSize != 2:
            pytest.fail(f"Multinode runtime default worker spec is not as expected, {runtime_worker_spec}")

    def test_multi_node_basic_inference(self, multi_node_inference_service):
        """Test multi node basic inference"""
        verify_inference_response(
            inference_service=multi_node_inference_service,
            runtime=ModelInferenceRuntime.VLLM_RUNTIME,
            inference_type="completions",
            protocol=Protocols.HTTP,
            use_default_query=True,
        )
40 changes: 40 additions & 0 deletions tests/model_serving/model_server/multi_node/utils.py
@@ -0,0 +1,40 @@
import re
import shlex
from typing import Dict, List

from ocp_resources.pod import Pod


def verify_ray_status(pods: List[Pod]) -> None:
    cmd = shlex.split("ray status")
    ray_failures: Dict[str, List[str]] = {}
    res = None
    for pod in pods:
        res = pod.execute(command=cmd)
        if res_regex := re.search(
            r"Active:\n(?P<active>.*)\nPending:\n(?P<pending>.*)\nRecent.*CPU\n(?P<gpu>.*)GPU",
            res,
            re.IGNORECASE | re.DOTALL,
        ):
            ray_formatted_result = res_regex.groupdict()
            if len(ray_formatted_result["active"].split("\n")) != len(pods):
                ray_failures.setdefault(pod.name, []).append("Wrong number of active nodes")

            if "no pending nodes" not in ray_formatted_result["pending"]:
                ray_failures.setdefault(pod.name, []).append("Some nodes are pending")

            if (gpus := ray_formatted_result["gpu"].strip().split("/")) and gpus[0] != gpus[1]:
                ray_failures.setdefault(pod.name, []).append("Wrong number of GPUs")

    assert not ray_failures, f"Failure in ray status check: {ray_failures}, {res}"


def verify_nvidia_gpu_status(pod: Pod) -> None:
    res = pod.execute(command=shlex.split("nvidia-smi --query-gpu=memory.used --format=csv"))
    mem_regex = re.search(r"(\d+)", res)

    if not mem_regex:
        raise ValueError(f"Could not find memory usage in response, {res}")

    if int(mem_regex.group(1)) == 0:
        raise ValueError(f"GPU memory is not used, {res}")
3 changes: 0 additions & 3 deletions tests/model_serving/model_server/storage/constants.py
@@ -10,6 +10,3 @@
"multi-model": False,
}
INFERENCE_SERVICE_PARAMS: Dict[str, str] = {"name": ModelFormat.ONNX}

-# Storage
-NFS_STR: str = "nfs"
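
The removed NFS_STR constant is superseded by the shared StorageClassName holder imported from utilities/constants.py elsewhere in this commit; presumably it looks roughly like this sketch (an assumption, since the actual definition is not shown in the diff):

class StorageClassName:
    # Hypothetical reconstruction; the real utilities/constants.py may differ.
    NFS: str = "nfs"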
