Skip to content

Commit

Permalink
fix: Hacky way to wait for the prediction service to be ready
Browse files Browse the repository at this point in the history
Signed-off-by: pitt-liang <[email protected]>
  • Loading branch information
pitt-liang committed Dec 29, 2023
1 parent 88d5b44 commit 7c75c17
Showing 1 changed file with 46 additions and 5 deletions.
51 changes: 46 additions & 5 deletions pai/predictor.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ def completed_status(cls):


class EndpointType(object):

# Public Internet Endpoint
INTERNET = "INTERNET"

Expand All @@ -82,7 +81,6 @@ class EndpointType(object):


class ServiceType(object):

Standard = "Standard"
Async = "Async"

Expand Down Expand Up @@ -296,22 +294,65 @@ def delete_service(self):
"""Delete the service."""
self.session.service_api.delete(name=self.service_name)

def wait_for_ready(self):
"""Wait until the service enters running status."""
def wait_for_ready(self, force: bool = False):
    """Block until the service reaches the running status.

    After the status wait succeeds, the service gateway is probed as well,
    and the local state is refreshed.

    Args:
        force (bool): When True, perform the wait even if the service
            already reports the running status.

    Raises:
        RuntimeError: Raise if the service terminated unexpectedly
            (presumably raised by the status wait; not visible in this
            block -- confirm against ``_wait_for_status``).
    """
    already_running = self.service_status == ServiceStatus.Running
    if already_running and not force:
        # Nothing to wait for unless the caller insists.
        return

    logger.info(
        "Service waiting for ready: service_name={}".format(self.service_name)
    )

    # Every completed status other than Running counts as a failure
    # while we are waiting for the service to come up.
    failure_statuses = ServiceStatus.completed_status()
    failure_statuses.remove(ServiceStatus.Running)

    predictor_cls = type(self)
    predictor_cls._wait_for_status(
        service_name=self.service_name,
        status=ServiceStatus.Running,
        unexpected_status=failure_statuses,
        session=self.session,
    )

    # HACK: PAI-EAS gateway may not be ready when the service is ready.
    self._wait_for_gateway_ready()
    self.refresh()

def _wait_for_gateway_ready(self, attempts: int = 30, interval: int = 2):
    """Hacky way to wait for the service gateway to be ready.

    Sends GET probe requests through ``self._send_request`` until the
    response no longer looks like the gateway's "not ready" answer
    (HTTP 503 with the body ``no healthy upstream``). This is best-effort:
    on timeout or repeated request failures a warning is logged and the
    method returns normally instead of raising.

    Args:
        attempts (int): Maximum number of probe requests to send. If it is
            not positive, no probe is sent and the timeout warning is
            logged immediately.
        interval (int): Seconds to sleep between two consecutive probes.
    """

    def _is_gateway_not_ready(resp: requests.Response):
        # The gateway answers 503 with this exact body while no backend
        # is registered yet.
        return resp.status_code == 503 and resp.content == b"no healthy upstream"

    # Give up once this many probes (cumulative, not consecutive) have
    # failed with a request-level error.
    err_count_threshold = 3
    err_count = 0
    while attempts > 0:
        attempts -= 1
        try:
            # Send a probe request to the service.
            resp = self._send_request(method="GET")
            if not _is_gateway_not_ready(resp):
                break
        except requests.exceptions.RequestException as e:
            err_count += 1
            if err_count >= err_count_threshold:
                logger.warning("Failed to check gateway status: %s", e)
                break
        # Only pause when another probe will follow; sleeping after the
        # final attempt would just delay the timeout warning.
        if attempts > 0:
            time.sleep(interval)
    else:
        # Reached only when the loop ran out of attempts without a break.
        logger.warning("Timeout waiting for gateway to be ready.")

@classmethod
def _wait_for_status(
cls,
Expand Down

0 comments on commit 7c75c17

Please sign in to comment.