diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml deleted file mode 100644 index e70a926..0000000 --- a/.github/workflows/integration.yaml +++ /dev/null @@ -1,29 +0,0 @@ -name: Integration test - -on: - push: - branches: - - master - -concurrency: - group: ${{ github.workflow }}-${{ github.ref }} - cancel-in-progress: true - -jobs: - integration-test: - runs-on: ubuntu-latest - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - TEST_CONFIG_FILE: ${{ secrets.TEST_CONFIG_FILE }} - steps: - - uses: actions/checkout@v4 - - name: Set up Python 3.8 - uses: actions/setup-python@v5 - with: - python-version: "3.8" - - name: Prepare test configuration - run: echo $TEST_CONFIG_FILE > tests/integration/test.ini - - name: Install Nox - run: pip install nox - - name: Run integration test - run: nox -s integration diff --git a/.github/workflows/unit.yaml b/.github/workflows/unit.yaml index de74d41..9d91f5b 100644 --- a/.github/workflows/unit.yaml +++ b/.github/workflows/unit.yaml @@ -7,7 +7,7 @@ concurrency: cancel-in-progress: true jobs: - lint: + unit-test: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 diff --git a/noxfile.py b/noxfile.py index cbba9d7..686429b 100644 --- a/noxfile.py +++ b/noxfile.py @@ -38,7 +38,7 @@ def install_test_dependencies(session: Session): session.install("-r", TEST_REQUIREMENTS) -@nox.session(venv_backend=TEST_VENV_BACKEND, python=INTEGRATION_TEST_PYTHON_VERSIONS) +@nox.session(venv_backend=TEST_VENV_BACKEND) def integration(session: Session): """Run integration test.""" install_test_dependencies(session=session) diff --git a/pai/estimator.py b/pai/estimator.py index 33b6bfd..55887e8 100644 --- a/pai/estimator.py +++ b/pai/estimator.py @@ -867,6 +867,7 @@ def __init__( instance_count=instance_count, user_vpc_config=user_vpc_config, experiment_config=experiment_config, + resource_id=resource_id, session=session, ) @@ -1437,6 +1438,7 @@ def _fit(self, job_name, inputs: Dict[str, Any] = None): 
experiment_config=self.experiment_config.to_dict() if self.experiment_config else None, + labels=self.labels, ) training_job = _TrainingJob.get(training_job_id) print( diff --git a/pai/model.py b/pai/model.py index d2345bc..2ee1e08 100644 --- a/pai/model.py +++ b/pai/model.py @@ -55,6 +55,9 @@ # Reserved ports for internal use, do not use them for service _RESERVED_PORTS = [8080, 9090] +# Default model upstream source +MODEL_TASK_CREATED_BY_QUICKSTART = "QuickStart" + class DefaultServiceConfig(object): """Default configuration used in creating prediction service.""" @@ -851,6 +854,7 @@ def _deploy( wait: bool = True, serializer: "SerializerBase" = None, labels: Optional[Dict[str, str]] = None, + **kwargs, ): """Create a prediction service.""" if not service_name: @@ -1723,6 +1727,20 @@ def deploy( if not self.inference_spec: raise RuntimeError("No inference_spec for the registered model.") + labels = kwargs.pop("labels", dict()) + if self.model_provider == ProviderAlibabaPAI: + default_labels = { + "Task": self.task, + "RootModelName": self.model_name, + "RootModelVersion": self.model_version, + "RootModelID": self.model_id, + "Domain": self.domain, + "CreatedBy": MODEL_TASK_CREATED_BY_QUICKSTART, + "BaseModelUri": self.uri, + } + default_labels.update(labels) + labels = default_labels + if is_local_run_instance_type(instance_type): return self._deploy_local( instance_type=instance_type, @@ -1740,6 +1758,7 @@ def deploy( options=options, wait=wait, serializer=serializer, + labels=labels, **kwargs, ) @@ -1906,7 +1925,7 @@ def get_estimator( if self.model_provider == ProviderAlibabaPAI: default_labels = { "BaseModelUri": self.uri, - "CreatedBy": "QuickStart", + "CreatedBy": MODEL_TASK_CREATED_BY_QUICKSTART, "Domain": self.domain, "RootModelID": self.model_id, "RootModelName": self.model_name, diff --git a/pai/predictor.py b/pai/predictor.py index f4f92e2..689c5e0 100644 --- a/pai/predictor.py +++ b/pai/predictor.py @@ -181,10 +181,19 @@ def service_status(self): 
return self._service_api_object["Status"] @property - def access_token(self): + def access_token(self) -> str: """Access token of the service.""" return self._service_api_object["AccessToken"] + @property + def labels(self) -> Dict[str, str]: + """Labels of the service.""" + labels = { + item["LabelKey"]: item["LabelValue"] + for item in self._service_api_object.get("Labels", []) + } + return labels + @property def console_uri(self): """Returns the console URI of the service.""" @@ -298,17 +307,14 @@ def delete_service(self): """Delete the service.""" self.session.service_api.delete(name=self.service_name) - def wait_for_ready(self, force: bool = False): + def wait_for_ready(self): """Wait until the service enter running status. - Args: - force (bool): Whether to force wait for ready. - Raises: RuntimeError: Raise if the service terminated unexpectedly. """ - if self.service_status == ServiceStatus.Running and not force: + if self.service_status == ServiceStatus.Running: return logger.info( @@ -327,6 +333,10 @@ def wait_for_ready(self, force: bool = False): self._wait_for_gateway_ready() self.refresh() + def wait(self): + """Wait for the service to be ready.""" + return self.wait_for_ready() + def _wait_for_gateway_ready(self, attempts: int = 60, interval: int = 2): """Hacky way to wait for the service gateway to be ready. @@ -337,6 +347,8 @@ def _wait_for_gateway_ready(self, attempts: int = 60, interval: int = 2): """ def _is_gateway_ready(): + # can't use HEAD method to check gateway status because the service will + # block the request until timeout. resp = self._send_request(method="GET") res = not ( # following status code and content indicates the gateway is not ready @@ -730,7 +742,8 @@ def openai(self, **kwargs) -> "OpenAI": raise ImportError( "openai package is not installed, install it with `pip install openai`." 
) - base_url = kwargs.pop("base_url", self.endpoint + "/v1/") + + base_url = kwargs.pop("base_url", posixpath.join(self.endpoint, "v1/")) api_key = kwargs.pop("api_key", self.access_token) return OpenAI(base_url=base_url, api_key=api_key, **kwargs) @@ -1378,6 +1391,9 @@ def wait_for_ready(self): self._wait_local_server_ready() time.sleep(5) + def wait(self): + return self.wait_for_ready() + def _wait_local_server_ready( self, interval: int = 5, diff --git a/pyproject.toml b/pyproject.toml index 2f8fb69..a3e74c4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,7 +25,7 @@ src_paths = ["pai", "tests"] #known_first_party = ["pai", "tests"] [tool.pytest.ini_options] -timeout = 300 +timeout = 600 [doc8] max-line-length=88 diff --git a/tests/integration/test_model.py b/tests/integration/test_model.py index 1b038d0..e2dac19 100644 --- a/tests/integration/test_model.py +++ b/tests/integration/test_model.py @@ -305,8 +305,14 @@ def test_rm_deploy(self): ) p = m.deploy() - self.predictors.append(p) + + self.assertEqual(p.labels.get("RootModelID"), m.model_id) + self.assertEqual(p.labels.get("RootModelName"), m.model_name) + self.assertEqual(p.labels.get("RootModelVersion"), m.model_version) + self.assertEqual(p.labels.get("BaseModelUri"), m.uri) + self.assertEqual(p.labels.get("Task"), m.task) + self.assertEqual(p.labels.get("Domain"), m.domain) self.assertTrue(p.service_name) res = p.predict(["开心", "死亡"]) self.assertTrue(isinstance(res, list))