Skip to content

Commit

Permalink
Watsonx-Serving: pipelining to the prepare steps (openshift-psap#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
kpouget committed Aug 26, 2023
2 parents db10ab6 + 5202aa5 commit b761c62
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 31 deletions.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ flake8
pylint
awscli
numpy
joblib

jsonpath_ng
state-signals==0.5.2
42 changes: 42 additions & 0 deletions testing/common/run.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
import sys, os, signal
import traceback
import logging
logging.getLogger().setLevel(logging.INFO)

import subprocess

import joblib

# create new process group, become its leader
os.setpgrp()

def run(command, capture_stdout=False, capture_stderr=False, check=True, protect_shell=True, cwd=None, stdin_file=None):
logging.info(f"run: {command}")
args = {}
Expand All @@ -27,3 +34,38 @@ def run(command, capture_stdout=False, capture_stderr=False, check=True, protect
if capture_stderr: proc.stderr = proc.stderr.decode("utf8")

return proc

class Parallel(object):
    """Context manager that collects tasks via delayed() and runs them in parallel on exit.

    Usage:
        with Parallel() as parallel:
            parallel.delayed(fct, arg1, kw=val)
        # all registered tasks run here, via joblib's threading backend

    If a task raises and exit_on_exception is True (the default), the whole
    process group is killed (see os.setpgrp() at module load) and the process
    exits with status 1; if False, the exception is re-raised to the caller.
    """

    def __init__(self, exit_on_exception=True):
        # The task list is created lazily in __enter__, so one instance
        # could be reused for several `with` blocks.
        self.parallel_tasks = None
        self.exit_on_exception = exit_on_exception

    def __enter__(self):
        self.parallel_tasks = []

        return self

    def delayed(self, function, *args, **kwargs):
        """Register function(*args, **kwargs) for execution when the `with` block exits."""
        self.parallel_tasks.append(joblib.delayed(function)(*args, **kwargs))

    def __exit__(self, ex_type, ex_value, exc_traceback):

        if ex_value:
            # An exception was raised inside the `with` body itself, before the
            # tasks were launched: run nothing and let the exception propagate.
            logging.warning("An exception occurred while preparing the Parallel execution ...")
            return False

        try:
            # threading backend (not processes): the tasks are expected to be
            # I/O / subprocess bound, so the GIL is not a bottleneck here.
            joblib.Parallel(n_jobs=-1, backend="threading")(self.parallel_tasks)
        except Exception:
            if not self.exit_on_exception:
                raise  # bare raise preserves the original traceback

            traceback.print_exc()

            logging.error("Exception caught during the parallel execution. Exiting.")
            # kill all processes in my group
            # (the group was started with the os.setpgrp() above)
            os.killpg(0, signal.SIGKILL)
            sys.exit(1)

        return False # If we returned True here, any exception would be suppressed!
42 changes: 28 additions & 14 deletions testing/watsonx-serving/prepare_scale.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,37 @@
import prepare_watsonx_serving


def prepare_sutest():
    """Prepare the SUTest cluster in two parallel stages.

    Stage 1: Watsonx-Serving preparation alongside the cluster scale-up.
    Stage 2: runtime-image preloading alongside the GPU stack deployment
    (stage 2 needs the nodes brought up by stage 1).
    """
    stages = (
        (prepare_watsonx_serving.prepare, prepare_watsonx_serving.scale_up_sutest),
        (prepare_watsonx_serving.preload_image, prepare_gpu),
    )
    for stage_tasks in stages:
        with run.Parallel() as parallel:
            for task in stage_tasks:
                parallel.delayed(task)


def prepare_gpu():
    """Deploy the GPU operator stack, if the test configuration requests GPUs.

    No-op when `tests.want_gpu` is disabled. When the SUTest compute nodes are
    dedicated (tainted), a matching toleration is added so the GPU operator
    Pods can be scheduled on them.
    """
    get_cfg = config.ci_artifacts.get_config

    if not get_cfg("tests.want_gpu"):
        return

    prepare_gpu_operator.prepare_gpu_operator()

    if get_cfg("clusters.sutest.compute.dedicated"):
        key = get_cfg("clusters.sutest.compute.machineset.taint.key")
        effect = get_cfg("clusters.sutest.compute.machineset.taint.effect")
        prepare_gpu_operator.add_toleration(effect, key)

    run.run("./run_toolbox.py gpu_operator wait_stack_deployed")


def prepare():
"""
Prepares the cluster and the namespace for running the Watsonx scale tests
"""

prepare_watsonx_serving.prepare()
prepare_watsonx_serving.prepare_sutest()

if config.ci_artifacts.get_config("tests.want_gpu"):
prepare_gpu_operator.prepare_gpu_operator()

if config.ci_artifacts.get_config("clusters.sutest.compute.dedicated"):
toleration_key = config.ci_artifacts.get_config("clusters.sutest.compute.machineset.taint.key")
toleration_effect = config.ci_artifacts.get_config("clusters.sutest.compute.machineset.taint.effect")
prepare_gpu_operator.add_toleration(toleration_effect, toleration_key)

run.run("./run_toolbox.py gpu_operator wait_stack_deployed")
with run.Parallel() as parallel:
parallel.delayed(prepare_sutest)

namespace = config.ci_artifacts.get_config("base_image.namespace")
prepare_user_pods.prepare_user_pods(namespace)
namespace = config.ci_artifacts.get_config("base_image.namespace")
parallel.delayed(prepare_user_pods.prepare_user_pods, namespace)
42 changes: 25 additions & 17 deletions testing/watsonx-serving/prepare_watsonx_serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ def prepare():
token_file = PSAP_ODS_SECRET_PATH / config.ci_artifacts.get_config("secrets.brew_registry_redhat_io_token_file")
rhods.install(token_file)

for operator in config.ci_artifacts.get_config("prepare.operators"):
run.run(f"./run_toolbox.py cluster deploy_operator {operator['catalog']} {operator['name']} {operator['namespace']}")
with run.Parallel() as parallel:
for operator in config.ci_artifacts.get_config("prepare.operators"):
parallel.delayed(run.run, f"./run_toolbox.py cluster deploy_operator {operator['catalog']} {operator['name']} {operator['namespace']}")

run.run("testing/watsonx-serving/poc/prepare.sh | tee -a $ARTIFACT_DIR/000_prepare_sh.log")


def prepare_sutest():
def scale_up_sutest():
if config.ci_artifacts.get_config("clusters.sutest.is_metal"):
return

Expand All @@ -44,17 +45,24 @@ def prepare_sutest():

run.run(f"./run_toolbox.py from_config cluster set_scale --prefix=sutest")

if config.ci_artifacts.get_config("clusters.sutest.compute.dedicated"):
# this is required to properly create the namespace used to preload the image
test_namespace = config.ci_artifacts.get_config("tests.scale.namespace")
test_scale.prepare_user_namespace(test_namespace)

RETRIES = 3
for i in range(RETRIES):
try:
run.run("./run_toolbox.py from_config cluster preload_image --prefix sutest --suffix watsonx-serving-runtime")
break
except Exception:
logging.warning("Watsonx Serving Runtime image preloading try #{i}/{RETRIES} failed :/")
if i == RETRIES:
raise

def preload_image():
    """Preload the Watsonx Serving runtime image on the SUTest compute nodes.

    No-op on bare-metal clusters and when the compute nodes are not dedicated.
    The preload toolbox command is retried up to RETRIES times; the last
    failure is re-raised.
    """
    if config.ci_artifacts.get_config("clusters.sutest.is_metal"):
        return

    if not config.ci_artifacts.get_config("clusters.sutest.compute.dedicated"):
        return

    # this is required to properly create the namespace used to preload the image
    test_namespace = config.ci_artifacts.get_config("tests.scale.namespace")
    test_scale.prepare_user_namespace(test_namespace)

    RETRIES = 3
    # 1-based so the `i == RETRIES` exhaustion check below is reachable
    # (with range(RETRIES), i never equals RETRIES and the last failure
    # was silently swallowed).
    for i in range(1, RETRIES + 1):
        try:
            run.run("./run_toolbox.py from_config cluster preload_image --prefix sutest --suffix watsonx-serving-runtime")
            break
        except Exception:
            # f-prefix was missing, so {i}/{RETRIES} never interpolated
            logging.warning(f"Watsonx Serving Runtime image preloading try #{i}/{RETRIES} failed :/")
            if i == RETRIES:
                raise
3 changes: 3 additions & 0 deletions testing/watsonx-serving/test_scale.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ def validate_model_deployment(namespace):

if tries == 1:
run.run(f"""grpcurl -vv -insecure -d '{{"text": "At what temperature does liquid Nitrogen boil?"}}' -H "mm-model-id: flan-t5-small-caikit" {ksvc_hostname}:443 caikit.runtime.Nlp.NlpService/TextGenerationTaskPredict &> {env.ARTIFACT_DIR}/artifacts/Invalid.answer""", check=False)
run.run(f"oc get pods -n {namespace} > {env.ARTIFACT_DIR}/artifacts/Invalid.answer.pod.status")
run.run(f"oc get pods -n {namespace} -oyaml > {env.ARTIFACT_DIR}/artifacts/Invalid.answer.pod.yaml")

time.sleep(0.5)

Expand All @@ -179,6 +181,7 @@ def validate_model_deployment(namespace):
yaml.dump(model_ready, f, indent=4)

logging.info(f"The model responded properly after {model_ready['duration_s']:.0f} seconds.")
run.run(f"oc get pods -n {namespace} -oyaml > {env.ARTIFACT_DIR}/artifacts/Valid.answer.pod.yaml")

logging.info(f"Querying the ServerStreamingTextGenerationTaskPredict endpoint ...")
run.run(f"""grpcurl -insecure -d '{{"text": "At what temperature does liquid Nitrogen boil?"}}' -H "mm-model-id: flan-t5-small-caikit" {ksvc_hostname}:443 caikit.runtime.Nlp.NlpService/ServerStreamingTextGenerationTaskPredict > {env.ARTIFACT_DIR}/artifacts/ServerStreamingTextGenerationTaskPredict.answer""")
Expand Down

0 comments on commit b761c62

Please sign in to comment.