From 96b2ab6e386729f3e9d9522f6c0ae7736f04accf Mon Sep 17 00:00:00 2001 From: Han Wang Date: Fri, 8 Sep 2023 09:44:01 +0800 Subject: [PATCH] [SDK]Refine error message (#337) # Description Please add an informative description that covers that changes made by the pull request and link all relevant issues. # All Promptflow Contribution checklist: - [ ] **The pull request does not introduce [breaking changes]** - [ ] **CHANGELOG is updated for new features, bug fixes or other significant changes.** - [ ] **I have read the [contribution guidelines](../CONTRIBUTING.md).** ## General Guidelines and Best Practices - [ ] Title of the pull request is clear and informative. - [ ] There are a small number of commits, each of which have an informative message. This means that previously merged commits do not appear in the history of the PR. For more information on cleaning up the commits in your PR, [see this page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md). ### Testing Guidelines - [ ] Pull request includes test coverage for the included changes. --- .../azure/_restclient/flow_service_caller.py | 443 +++++++++--------- .../e2etests/test_run_operations.py | 26 +- 2 files changed, 245 insertions(+), 224 deletions(-) diff --git a/src/promptflow/promptflow/azure/_restclient/flow_service_caller.py b/src/promptflow/promptflow/azure/_restclient/flow_service_caller.py index f17a0b25daa..4988e47c179 100644 --- a/src/promptflow/promptflow/azure/_restclient/flow_service_caller.py +++ b/src/promptflow/promptflow/azure/_restclient/flow_service_caller.py @@ -11,6 +11,7 @@ import sys import time import uuid +from functools import wraps from azure.core.exceptions import HttpResponseError, ResourceExistsError from azure.core.pipeline.policies import RetryPolicy @@ -56,6 +57,29 @@ def _refresh_request_id_for_telemetry(self): self._request_id = str(uuid.uuid4()) +def _request_wrapper(): + """Wrapper for request. Will refress request id and pretty print exception.""" + def exception_wrapper(func): + @wraps(func) + def wrapper(self, *args, **kwargs): + if not isinstance(self, RequestTelemetryMixin): + raise PromptflowException(f"Wrapped function is not RequestTelemetryMixin, got {type(self)}") + # refresh request before each request + self._refresh_request_id_for_telemetry() + try: + return func(self, *args, **kwargs) + except HttpResponseError as e: + raise FlowRequestException( + f"Calling {func.__name__} failed with request id: {self._request_id} \n" + f"Status code: {e.status_code} \n" + f"Reason: {e.reason} \n" + f"Error message: {e.message} \n" + ) + return wrapper + + return exception_wrapper + + class FlowServiceCaller(RequestTelemetryMixin): """FlowServiceCaller. :param workspace: workspace @@ -135,6 +159,7 @@ def _set_headers_with_user_aml_token(self, headers): headers["aml-user-token"] = aml_token + @_request_wrapper() def create_flow( self, subscription_id, # type: str @@ -144,22 +169,18 @@ def create_flow( body=None, # type: Optional["_models.CreateFlowRequest"] **kwargs # type: Any ): - # TODO: move the wrapper to decorator - self._refresh_request_id_for_telemetry() headers = self._get_headers() - try: - return self.caller.flows.create_flow( - subscription_id=subscription_id, - resource_group_name=resource_group_name, - workspace_name=workspace_name, - experiment_id=experiment_id, - body=body, - headers=headers, - **kwargs - ) - except HttpResponseError as e: - raise FlowRequestException(f"Request id: {headers['x-ms-client-request-id']}") from e + return self.caller.flows.create_flow( + subscription_id=subscription_id, + resource_group_name=resource_group_name, + workspace_name=workspace_name, + experiment_id=experiment_id, + body=body, + headers=headers, + **kwargs + ) + @_request_wrapper() def create_component_from_flow( self, subscription_id, # type: str @@ -168,24 +189,22 @@ def create_component_from_flow( body=None, # type: Optional["_models.LoadFlowAsComponentRequest"] **kwargs # type: Any ): - self._refresh_request_id_for_telemetry() headers = self._get_headers() try: return self.caller.flows.load_as_component( - subscription_id=subscription_id, - resource_group_name=resource_group_name, - workspace_name=workspace_name, - body=body, - headers=headers, - **kwargs - ) + subscription_id=subscription_id, + resource_group_name=resource_group_name, + workspace_name=workspace_name, + body=body, + headers=headers, + **kwargs + ) except ResourceExistsError: return f"/subscriptions/{subscription_id}/resourceGroups/{resource_group_name}" \ f"/providers/Microsoft.MachineLearningServices/workspaces/{workspace_name}" \ f"/components/{body.component_name}/versions/{body.component_version}" - except HttpResponseError as e: - raise FlowRequestException(f"Request id: {headers['x-ms-client-request-id']}") from e + @_request_wrapper() def list_flows( self, subscription_id, # type: str @@ -197,23 +216,20 @@ def list_flows( list_view_type=None, # type: Optional[Union[str, "_models.ListViewType"]] **kwargs # type: Any ): - self._refresh_request_id_for_telemetry() headers = self._get_headers() - try: - return self.caller.flows.list_flows( - subscription_id=subscription_id, - resource_group_name=resource_group_name, - workspace_name=workspace_name, - experiment_id=experiment_id, - owned_only=owned_only, - flow_type=flow_type, - list_view_type=list_view_type, - headers=headers, - **kwargs, - ) - except HttpResponseError as e: - raise FlowRequestException(f"Request id: {headers['x-ms-client-request-id']}") from e + return self.caller.flows.list_flows( + subscription_id=subscription_id, + resource_group_name=resource_group_name, + workspace_name=workspace_name, + experiment_id=experiment_id, + owned_only=owned_only, + flow_type=flow_type, + list_view_type=list_view_type, + headers=headers, + **kwargs, + ) + @_request_wrapper() def submit_flow( self, subscription_id, # type: str @@ -224,22 +240,20 @@ def submit_flow( body=None, # type: Optional["_models.SubmitFlowRequest"] **kwargs # type: Any ): - self._refresh_request_id_for_telemetry() + headers = self._get_headers() - try: - return self.caller.flows.submit_flow( - subscription_id=subscription_id, - resource_group_name=resource_group_name, - workspace_name=workspace_name, - experiment_id=experiment_id, - endpoint_name=endpoint_name, - body=body, - headers=headers, - **kwargs - ) - except HttpResponseError as e: - raise FlowRequestException(f"Request id: {headers['x-ms-client-request-id']}") from e + return self.caller.flows.submit_flow( + subscription_id=subscription_id, + resource_group_name=resource_group_name, + workspace_name=workspace_name, + experiment_id=experiment_id, + endpoint_name=endpoint_name, + body=body, + headers=headers, + **kwargs + ) + @_request_wrapper() def get_flow( self, subscription_id, # type: str @@ -249,21 +263,19 @@ def get_flow( experiment_id, # type: str **kwargs # type: Any ): - self._refresh_request_id_for_telemetry() + headers = self._get_headers() - try: - return self.caller.flows.get_flow( - subscription_id=subscription_id, - resource_group_name=resource_group_name, - workspace_name=workspace_name, - experiment_id=experiment_id, - flow_id=flow_id, - headers=headers, - **kwargs - ) - except HttpResponseError as e: - raise FlowRequestException(f"Request id: {headers['x-ms-client-request-id']}") from e + return self.caller.flows.get_flow( + subscription_id=subscription_id, + resource_group_name=resource_group_name, + workspace_name=workspace_name, + experiment_id=experiment_id, + flow_id=flow_id, + headers=headers, + **kwargs + ) + @_request_wrapper() def create_connection( self, subscription_id, # type: str @@ -273,21 +285,19 @@ def create_connection( body=None, # type: Optional["_models.CreateOrUpdateConnectionRequest"] **kwargs # type: Any ): - self._refresh_request_id_for_telemetry() + headers = self._get_headers() - try: - return self.caller.connections.create_connection( - subscription_id=subscription_id, - resource_group_name=resource_group_name, - workspace_name=workspace_name, - connection_name=connection_name, - body=body, - headers=headers, - **kwargs - ) - except HttpResponseError as e: - raise FlowRequestException(f"Request id: {headers['x-ms-client-request-id']}") from e + return self.caller.connections.create_connection( + subscription_id=subscription_id, + resource_group_name=resource_group_name, + workspace_name=workspace_name, + connection_name=connection_name, + body=body, + headers=headers, + **kwargs + ) + @_request_wrapper() def update_connection( self, subscription_id, # type: str @@ -297,10 +307,9 @@ def update_connection( body=None, # type: Optional["_models.CreateOrUpdateConnectionRequestDto"] **kwargs # type: Any ): - self._refresh_request_id_for_telemetry() + headers = self._get_headers() - try: - return self.caller.connections.update_connection( + return self.caller.connections.update_connection( subscription_id=subscription_id, resource_group_name=resource_group_name, workspace_name=workspace_name, @@ -309,9 +318,9 @@ def update_connection( headers=headers, **kwargs ) - except HttpResponseError as e: - raise FlowRequestException(f"Request id: {headers['x-ms-client-request-id']}") from e + + @_request_wrapper() def get_connection( self, subscription_id, # type: str @@ -320,10 +329,9 @@ def get_connection( connection_name, # type: str **kwargs # type: Any ): - self._refresh_request_id_for_telemetry() + headers = self._get_headers() - try: - return self.caller.connections.get_connection( + return self.caller.connections.get_connection( subscription_id=subscription_id, resource_group_name=resource_group_name, workspace_name=workspace_name, @@ -331,9 +339,8 @@ def get_connection( headers=headers, **kwargs ) - except HttpResponseError as e: - raise FlowRequestException(f"Request id: {headers['x-ms-client-request-id']}") from e + @_request_wrapper() def delete_connection( self, subscription_id, # type: str @@ -342,10 +349,9 @@ def delete_connection( connection_name, # type: str **kwargs # type: Any ): - self._refresh_request_id_for_telemetry() + headers = self._get_headers() - try: - return self.caller.connections.delete_connection( + return self.caller.connections.delete_connection( subscription_id=subscription_id, resource_group_name=resource_group_name, workspace_name=workspace_name, @@ -353,9 +359,9 @@ def delete_connection( headers=headers, **kwargs ) - except HttpResponseError as e: - raise FlowRequestException(f"Request id: {headers['x-ms-client-request-id']}") from e + + @_request_wrapper() def list_connections( self, subscription_id, # type: str @@ -363,19 +369,18 @@ def list_connections( workspace_name, # type: str **kwargs # type: Any ): - self._refresh_request_id_for_telemetry() + headers = self._get_headers() - try: - return self.caller.connections.list_connections( + return self.caller.connections.list_connections( subscription_id=subscription_id, resource_group_name=resource_group_name, workspace_name=workspace_name, headers=headers, **kwargs ) - except HttpResponseError as e: - raise FlowRequestException(f"Request id: {headers['x-ms-client-request-id']}") from e + + @_request_wrapper() def list_connection_specs( self, subscription_id, # type: str @@ -383,19 +388,18 @@ def list_connection_specs( workspace_name, # type: str **kwargs # type: Any ): - self._refresh_request_id_for_telemetry() + headers = self._get_headers() - try: - return self.caller.connections.list_connection_specs( + return self.caller.connections.list_connection_specs( subscription_id=subscription_id, resource_group_name=resource_group_name, workspace_name=workspace_name, headers=headers, **kwargs ) - except HttpResponseError as e: - raise FlowRequestException(f"Request id: {headers['x-ms-client-request-id']}") from e + + @_request_wrapper() def list_runs( self, subscription_id, # type: str @@ -408,19 +412,17 @@ def list_runs( :return: A list of runs in the workspace. :rtype: list[~azure.ml._restclient.machinelearningservices.models.Run] """ - self._refresh_request_id_for_telemetry() + headers = self._get_headers() - try: - return self.caller.flows.list_flow_runs( + return self.caller.flows.list_flow_runs( subscription_id=subscription_id, resource_group_name=resource_group_name, workspace_name=workspace_name, headers=headers, **kwargs ) - except HttpResponseError as e: - raise FlowRequestException(f"Request id: {headers['x-ms-client-request-id']}") from e + @_request_wrapper() def submit_bulk_run( self, subscription_id, # type: str @@ -444,12 +446,11 @@ def submit_bulk_run( :rtype: str :raises: ~azure.core.exceptions.HttpResponseError """ - self._refresh_request_id_for_telemetry() + headers = self._get_headers() # pass user aml token to flow run submission self._set_headers_with_user_aml_token(headers) - try: - return self.caller.bulk_runs.submit_bulk_run( + return self.caller.bulk_runs.submit_bulk_run( subscription_id=subscription_id, resource_group_name=resource_group_name, workspace_name=workspace_name, @@ -457,9 +458,9 @@ def submit_bulk_run( body=body, **kwargs ) - except HttpResponseError as e: - raise FlowRequestException(f"Request id: {headers['x-ms-client-request-id']}") from e + + @_request_wrapper() def create_flow_session( self, subscription_id, # type: str @@ -479,98 +480,97 @@ def create_flow_session( from promptflow.azure._constants._flow import SESSION_CREATION_TIMEOUT_SECONDS from promptflow.azure._restclient.flow.models import SetupFlowSessionAction - self._refresh_request_id_for_telemetry() + headers = self._get_headers() # pass user aml token to session create so user don't need to do authentication again in CI self._set_headers_with_user_aml_token(headers) - try: - # did not call self.caller.flow_sessions.create_flow_session because it does not support return headers - cls = kwargs.pop('cls', None) # type: ClsType[Any] - error_map = { - 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError - } - error_map.update(kwargs.pop('error_map', {})) + # did not call self.caller.flow_sessions.create_flow_session because it does not support return headers + cls = kwargs.pop('cls', None) # type: ClsType[Any] + error_map = { + 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError + } + error_map.update(kwargs.pop('error_map', {})) - content_type = kwargs.pop('content_type', "application/json") # type: Optional[str] + content_type = kwargs.pop('content_type', "application/json") # type: Optional[str] - _json = self.caller.flow_sessions._serialize.body(body, 'CreateFlowSessionRequest') + _json = self.caller.flow_sessions._serialize.body(body, 'CreateFlowSessionRequest') - request = build_create_flow_session_request( - subscription_id=subscription_id, - resource_group_name=resource_group_name, - workspace_name=workspace_name, - session_id=session_id, - content_type=content_type, - json=_json, - template_url=self.caller.flow_sessions.create_flow_session.metadata['url'], - headers=headers + request = build_create_flow_session_request( + subscription_id=subscription_id, + resource_group_name=resource_group_name, + workspace_name=workspace_name, + session_id=session_id, + content_type=content_type, + json=_json, + template_url=self.caller.flow_sessions.create_flow_session.metadata['url'], + headers=headers + ) + request = _convert_request(request) + request.url = self.caller.flow_sessions._client.format_url(request.url) + pipeline_response = self.caller.flow_sessions._client._pipeline.run(request, stream=False, **kwargs) + + response = pipeline_response.http_response + + if response.status_code not in [200, 202]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + error = self.caller.flow_sessions._deserialize.failsafe_deserialize(_models.ErrorResponse, + pipeline_response) + raise HttpResponseError(response=response, model=error) + if response.status_code == 200: + return + action = body.action or SetupFlowSessionAction.INSTALL.value + if action == SetupFlowSessionAction.INSTALL.value: + action = "creation" + else: + action = "reset" + + logger.info(f"Start polling until session {action} is completed...") + # start polling status here. + if "azure-asyncoperation" not in response.headers: + raise FlowRequestException( + "No polling url found in response headers. " + f"Request id: {headers['x-ms-client-request-id']}. " + f"Response headers: {response.headers}." ) - request = _convert_request(request) - request.url = self.caller.flow_sessions._client.format_url(request.url) - pipeline_response = self.caller.flow_sessions._client._pipeline.run(request, stream=False, **kwargs) - - response = pipeline_response.http_response - - if response.status_code not in [200, 202]: - map_error(status_code=response.status_code, response=response, error_map=error_map) - error = self.caller.flow_sessions._deserialize.failsafe_deserialize(_models.ErrorResponse, - pipeline_response) - raise HttpResponseError(response=response, model=error) - if response.status_code == 200: - return - action = body.action or SetupFlowSessionAction.INSTALL.value - if action == SetupFlowSessionAction.INSTALL.value: - action = "creation" + polling_url = response.headers["azure-asyncoperation"] + time_run = 0 + sleep_period = 5 + status = None + timeout_seconds = SESSION_CREATION_TIMEOUT_SECONDS + # InProgress is only known non-terminal status for now. + while status in [None, "InProgress"]: + if time_run + sleep_period > timeout_seconds: + message = f"Timeout for session {action} {session_id} for {AUTOMATIC_RUNTIME}.\n" \ + "Please resubmit the flow later." + raise Exception(message) + time_run += sleep_period + time.sleep(sleep_period) + response = self.poll_operation_status(url=polling_url, **kwargs) + status = response["status"] + logger.debug(f"Current polling status: {status}") + if time_run % 30 == 0: + # print the message every 30 seconds to avoid users feeling stuck during the operation + print(f"Waiting for session {action}, current status: {status}") else: - action = "reset" + logger.debug(f"Waiting for session {action}, current status: {status}") + + if status == "Succeeded": + logger.info(f"Session {action} finished with status {status}.") + else: + # refine response error message + try: + response["error"]["message"] = json.loads(response["error"]["message"]) + except Exception: + pass + raise FlowRequestException( + f"Session {action} failed for {session_id}. \n" + f"Session {action} status: {status}. \n" + f"Request id: {headers['x-ms-client-request-id']}. \n" + f"{json.dumps(response, indent=2)}." + ) - logger.info(f"Start polling until session {action} is completed...") - # start polling status here. - if "azure-asyncoperation" not in response.headers: - raise FlowRequestException( - "No polling url found in response headers. " - f"Request id: {headers['x-ms-client-request-id']}. " - f"Response headers: {response.headers}." - ) - polling_url = response.headers["azure-asyncoperation"] - time_run = 0 - sleep_period = 5 - status = None - timeout_seconds = SESSION_CREATION_TIMEOUT_SECONDS - # InProgress is only known non-terminal status for now. - while status in [None, "InProgress"]: - if time_run + sleep_period > timeout_seconds: - message = f"Timeout for session {action} {session_id} for {AUTOMATIC_RUNTIME}.\n" \ - "Please resubmit the flow later." - raise Exception(message) - time_run += sleep_period - time.sleep(sleep_period) - response = self.poll_operation_status(url=polling_url, **kwargs) - status = response["status"] - logger.debug(f"Current polling status: {status}") - if time_run % 30 == 0: - # print the message every 30 seconds to avoid users feeling stuck during the operation - print(f"Waiting for session {action}, current status: {status}") - else: - logger.debug(f"Waiting for session {action}, current status: {status}") - - if status == "Succeeded": - logger.info(f"Session {action} finished with status {status}.") - else: - # refine response error message - try: - response["error"]["message"] = json.loads(response["error"]["message"]) - except Exception: - pass - raise FlowRequestException( - f"Session {action} failed for {session_id}. \n" - f"Session {action} status: {status}. \n" - f"Request id: {headers['x-ms-client-request-id']}. \n" - f"{json.dumps(response, indent=2)}." - ) - except HttpResponseError as e: - raise FlowRequestException(f"Request id: {headers['x-ms-client-request-id']}") from e + @_request_wrapper() def poll_operation_status( self, url, @@ -581,32 +581,29 @@ def poll_operation_status( ResourceNotFoundError, map_error from promptflow.azure._restclient.flow.operations._flow_sessions_operations import _models - self._refresh_request_id_for_telemetry() + headers = self._get_headers() - try: - request = HttpRequest( - method="GET", - url=url, - headers=headers, - **kwargs + request = HttpRequest( + method="GET", + url=url, + headers=headers, + **kwargs + ) + pipeline_response = self.caller.flow_sessions._client._pipeline.run(request, stream=False, **kwargs) + response = pipeline_response.http_response + error_map = { + 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError + } + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + error = self.caller.flow_sessions._deserialize.failsafe_deserialize(_models.ErrorResponse, + pipeline_response) + raise HttpResponseError(response=response, model=error) + + deserialized = self.caller.flow_sessions._deserialize('object', pipeline_response) + if "status" not in deserialized: + raise FlowRequestException( + f"Status not found in response. Request id: {headers['x-ms-client-request-id']}. " + f"Response headers: {response.headers}." ) - pipeline_response = self.caller.flow_sessions._client._pipeline.run(request, stream=False, **kwargs) - response = pipeline_response.http_response - error_map = { - 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError - } - if response.status_code not in [200]: - map_error(status_code=response.status_code, response=response, error_map=error_map) - error = self.caller.flow_sessions._deserialize.failsafe_deserialize(_models.ErrorResponse, - pipeline_response) - raise HttpResponseError(response=response, model=error) - - deserialized = self.caller.flow_sessions._deserialize('object', pipeline_response) - if "status" not in deserialized: - raise FlowRequestException( - f"Status not found in response. Request id: {headers['x-ms-client-request-id']}. " - f"Response headers: {response.headers}." - ) - return deserialized - except HttpResponseError as e: - raise FlowRequestException(f"Request id: {headers['x-ms-client-request-id']}") from e + return deserialized diff --git a/src/promptflow/tests/sdk_cli_azure_test/e2etests/test_run_operations.py b/src/promptflow/tests/sdk_cli_azure_test/e2etests/test_run_operations.py index 9ed66be513d..8361c322232 100644 --- a/src/promptflow/tests/sdk_cli_azure_test/e2etests/test_run_operations.py +++ b/src/promptflow/tests/sdk_cli_azure_test/e2etests/test_run_operations.py @@ -16,7 +16,7 @@ from promptflow._sdk.entities import Run from promptflow._utils.flow_utils import get_flow_lineage_id from promptflow.azure import PFClient -from promptflow.azure._restclient.flow_service_caller import FlowRequestException +from promptflow.azure._restclient.flow_service_caller import FlowRequestException, FlowServiceCaller from promptflow.azure.operations import RunOperations PROMOTFLOW_ROOT = Path(__file__) / "../../../.." @@ -556,3 +556,27 @@ def test_automatic_runtime_creation_failure(self, pf): runtime=None, ) assert "Session creation failed for" in str(e.value) + + def test_run_submission_exception(self, remote_client): + from azure.core.exceptions import HttpResponseError + + from promptflow.azure._restclient.flow.operations import BulkRunsOperations + + with patch.object(BulkRunsOperations, "submit_bulk_run") as mock_request, patch.object( + FlowServiceCaller, "_set_headers_with_user_aml_token" + ): + mock_request.side_effect = HttpResponseError("customized error message.") + with pytest.raises(FlowRequestException) as e: + original_request_id = remote_client.runs._service_caller._request_id + remote_client.runs._service_caller.submit_bulk_run( + subscription_id="fake_subscription_id", + resource_group_name="fake_resource_group", + workspace_name="fake_workspace_name", + ) + # request id has been updated + assert original_request_id != remote_client.runs._service_caller._request_id + + # original error message should be included in FlowRequestException + assert "customized error message" in str(e.value) + # request id should be included in FlowRequestException + assert f"request id: {remote_client.runs._service_caller._request_id}" in str(e.value)