diff --git a/src/promptflow/promptflow/azure/_restclient/flow_service_caller.py b/src/promptflow/promptflow/azure/_restclient/flow_service_caller.py index f17a0b25daa..4988e47c179 100644 --- a/src/promptflow/promptflow/azure/_restclient/flow_service_caller.py +++ b/src/promptflow/promptflow/azure/_restclient/flow_service_caller.py @@ -11,6 +11,7 @@ import sys import time import uuid +from functools import wraps from azure.core.exceptions import HttpResponseError, ResourceExistsError from azure.core.pipeline.policies import RetryPolicy @@ -56,6 +57,29 @@ def _refresh_request_id_for_telemetry(self): self._request_id = str(uuid.uuid4()) +def _request_wrapper(): + """Wrapper for request. Will refress request id and pretty print exception.""" + def exception_wrapper(func): + @wraps(func) + def wrapper(self, *args, **kwargs): + if not isinstance(self, RequestTelemetryMixin): + raise PromptflowException(f"Wrapped function is not RequestTelemetryMixin, got {type(self)}") + # refresh request before each request + self._refresh_request_id_for_telemetry() + try: + return func(self, *args, **kwargs) + except HttpResponseError as e: + raise FlowRequestException( + f"Calling {func.__name__} failed with request id: {self._request_id} \n" + f"Status code: {e.status_code} \n" + f"Reason: {e.reason} \n" + f"Error message: {e.message} \n" + ) + return wrapper + + return exception_wrapper + + class FlowServiceCaller(RequestTelemetryMixin): """FlowServiceCaller. :param workspace: workspace @@ -135,6 +159,7 @@ def _set_headers_with_user_aml_token(self, headers): headers["aml-user-token"] = aml_token + @_request_wrapper() def create_flow( self, subscription_id, # type: str @@ -144,22 +169,18 @@ def create_flow( body=None, # type: Optional["_models.CreateFlowRequest"] **kwargs # type: Any ): - # TODO: move the wrapper to decorator - self._refresh_request_id_for_telemetry() headers = self._get_headers() - try: - return self.caller.flows.create_flow( - subscription_id=subscription_id, - resource_group_name=resource_group_name, - workspace_name=workspace_name, - experiment_id=experiment_id, - body=body, - headers=headers, - **kwargs - ) - except HttpResponseError as e: - raise FlowRequestException(f"Request id: {headers['x-ms-client-request-id']}") from e + return self.caller.flows.create_flow( + subscription_id=subscription_id, + resource_group_name=resource_group_name, + workspace_name=workspace_name, + experiment_id=experiment_id, + body=body, + headers=headers, + **kwargs + ) + @_request_wrapper() def create_component_from_flow( self, subscription_id, # type: str @@ -168,24 +189,22 @@ def create_component_from_flow( body=None, # type: Optional["_models.LoadFlowAsComponentRequest"] **kwargs # type: Any ): - self._refresh_request_id_for_telemetry() headers = self._get_headers() try: return self.caller.flows.load_as_component( - subscription_id=subscription_id, - resource_group_name=resource_group_name, - workspace_name=workspace_name, - body=body, - headers=headers, - **kwargs - ) + subscription_id=subscription_id, + resource_group_name=resource_group_name, + workspace_name=workspace_name, + body=body, + headers=headers, + **kwargs + ) except ResourceExistsError: return f"/subscriptions/{subscription_id}/resourceGroups/{resource_group_name}" \ f"/providers/Microsoft.MachineLearningServices/workspaces/{workspace_name}" \ f"/components/{body.component_name}/versions/{body.component_version}" - except HttpResponseError as e: - raise FlowRequestException(f"Request id: {headers['x-ms-client-request-id']}") from e + @_request_wrapper() def list_flows( self, subscription_id, # type: str @@ -197,23 +216,20 @@ def list_flows( list_view_type=None, # type: Optional[Union[str, "_models.ListViewType"]] **kwargs # type: Any ): - self._refresh_request_id_for_telemetry() headers = self._get_headers() - try: - return self.caller.flows.list_flows( - subscription_id=subscription_id, - resource_group_name=resource_group_name, - workspace_name=workspace_name, - experiment_id=experiment_id, - owned_only=owned_only, - flow_type=flow_type, - list_view_type=list_view_type, - headers=headers, - **kwargs, - ) - except HttpResponseError as e: - raise FlowRequestException(f"Request id: {headers['x-ms-client-request-id']}") from e + return self.caller.flows.list_flows( + subscription_id=subscription_id, + resource_group_name=resource_group_name, + workspace_name=workspace_name, + experiment_id=experiment_id, + owned_only=owned_only, + flow_type=flow_type, + list_view_type=list_view_type, + headers=headers, + **kwargs, + ) + @_request_wrapper() def submit_flow( self, subscription_id, # type: str @@ -224,22 +240,20 @@ def submit_flow( body=None, # type: Optional["_models.SubmitFlowRequest"] **kwargs # type: Any ): - self._refresh_request_id_for_telemetry() + headers = self._get_headers() - try: - return self.caller.flows.submit_flow( - subscription_id=subscription_id, - resource_group_name=resource_group_name, - workspace_name=workspace_name, - experiment_id=experiment_id, - endpoint_name=endpoint_name, - body=body, - headers=headers, - **kwargs - ) - except HttpResponseError as e: - raise FlowRequestException(f"Request id: {headers['x-ms-client-request-id']}") from e + return self.caller.flows.submit_flow( + subscription_id=subscription_id, + resource_group_name=resource_group_name, + workspace_name=workspace_name, + experiment_id=experiment_id, + endpoint_name=endpoint_name, + body=body, + headers=headers, + **kwargs + ) + @_request_wrapper() def get_flow( self, subscription_id, # type: str @@ -249,21 +263,19 @@ def get_flow( experiment_id, # type: str **kwargs # type: Any ): - self._refresh_request_id_for_telemetry() + headers = self._get_headers() - try: - return self.caller.flows.get_flow( - subscription_id=subscription_id, - resource_group_name=resource_group_name, - workspace_name=workspace_name, - experiment_id=experiment_id, - flow_id=flow_id, - headers=headers, - **kwargs - ) - except HttpResponseError as e: - raise FlowRequestException(f"Request id: {headers['x-ms-client-request-id']}") from e + return self.caller.flows.get_flow( + subscription_id=subscription_id, + resource_group_name=resource_group_name, + workspace_name=workspace_name, + experiment_id=experiment_id, + flow_id=flow_id, + headers=headers, + **kwargs + ) + @_request_wrapper() def create_connection( self, subscription_id, # type: str @@ -273,21 +285,19 @@ def create_connection( body=None, # type: Optional["_models.CreateOrUpdateConnectionRequest"] **kwargs # type: Any ): - self._refresh_request_id_for_telemetry() + headers = self._get_headers() - try: - return self.caller.connections.create_connection( - subscription_id=subscription_id, - resource_group_name=resource_group_name, - workspace_name=workspace_name, - connection_name=connection_name, - body=body, - headers=headers, - **kwargs - ) - except HttpResponseError as e: - raise FlowRequestException(f"Request id: {headers['x-ms-client-request-id']}") from e + return self.caller.connections.create_connection( + subscription_id=subscription_id, + resource_group_name=resource_group_name, + workspace_name=workspace_name, + connection_name=connection_name, + body=body, + headers=headers, + **kwargs + ) + @_request_wrapper() def update_connection( self, subscription_id, # type: str @@ -297,10 +307,9 @@ def update_connection( body=None, # type: Optional["_models.CreateOrUpdateConnectionRequestDto"] **kwargs # type: Any ): - self._refresh_request_id_for_telemetry() + headers = self._get_headers() - try: - return self.caller.connections.update_connection( + return self.caller.connections.update_connection( subscription_id=subscription_id, resource_group_name=resource_group_name, workspace_name=workspace_name, @@ -309,9 +318,9 @@ def update_connection( headers=headers, **kwargs ) - except HttpResponseError as e: - raise FlowRequestException(f"Request id: {headers['x-ms-client-request-id']}") from e + + @_request_wrapper() def get_connection( self, subscription_id, # type: str @@ -320,10 +329,9 @@ def get_connection( connection_name, # type: str **kwargs # type: Any ): - self._refresh_request_id_for_telemetry() + headers = self._get_headers() - try: - return self.caller.connections.get_connection( + return self.caller.connections.get_connection( subscription_id=subscription_id, resource_group_name=resource_group_name, workspace_name=workspace_name, @@ -331,9 +339,8 @@ def get_connection( headers=headers, **kwargs ) - except HttpResponseError as e: - raise FlowRequestException(f"Request id: {headers['x-ms-client-request-id']}") from e + @_request_wrapper() def delete_connection( self, subscription_id, # type: str @@ -342,10 +349,9 @@ def delete_connection( connection_name, # type: str **kwargs # type: Any ): - self._refresh_request_id_for_telemetry() + headers = self._get_headers() - try: - return self.caller.connections.delete_connection( + return self.caller.connections.delete_connection( subscription_id=subscription_id, resource_group_name=resource_group_name, workspace_name=workspace_name, @@ -353,9 +359,9 @@ def delete_connection( headers=headers, **kwargs ) - except HttpResponseError as e: - raise FlowRequestException(f"Request id: {headers['x-ms-client-request-id']}") from e + + @_request_wrapper() def list_connections( self, subscription_id, # type: str @@ -363,19 +369,18 @@ def list_connections( workspace_name, # type: str **kwargs # type: Any ): - self._refresh_request_id_for_telemetry() + headers = self._get_headers() - try: - return self.caller.connections.list_connections( + return self.caller.connections.list_connections( subscription_id=subscription_id, resource_group_name=resource_group_name, workspace_name=workspace_name, headers=headers, **kwargs ) - except HttpResponseError as e: - raise FlowRequestException(f"Request id: {headers['x-ms-client-request-id']}") from e + + @_request_wrapper() def list_connection_specs( self, subscription_id, # type: str @@ -383,19 +388,18 @@ def list_connection_specs( workspace_name, # type: str **kwargs # type: Any ): - self._refresh_request_id_for_telemetry() + headers = self._get_headers() - try: - return self.caller.connections.list_connection_specs( + return self.caller.connections.list_connection_specs( subscription_id=subscription_id, resource_group_name=resource_group_name, workspace_name=workspace_name, headers=headers, **kwargs ) - except HttpResponseError as e: - raise FlowRequestException(f"Request id: {headers['x-ms-client-request-id']}") from e + + @_request_wrapper() def list_runs( self, subscription_id, # type: str @@ -408,19 +412,17 @@ def list_runs( :return: A list of runs in the workspace. :rtype: list[~azure.ml._restclient.machinelearningservices.models.Run] """ - self._refresh_request_id_for_telemetry() + headers = self._get_headers() - try: - return self.caller.flows.list_flow_runs( + return self.caller.flows.list_flow_runs( subscription_id=subscription_id, resource_group_name=resource_group_name, workspace_name=workspace_name, headers=headers, **kwargs ) - except HttpResponseError as e: - raise FlowRequestException(f"Request id: {headers['x-ms-client-request-id']}") from e + @_request_wrapper() def submit_bulk_run( self, subscription_id, # type: str @@ -444,12 +446,11 @@ def submit_bulk_run( :rtype: str :raises: ~azure.core.exceptions.HttpResponseError """ - self._refresh_request_id_for_telemetry() + headers = self._get_headers() # pass user aml token to flow run submission self._set_headers_with_user_aml_token(headers) - try: - return self.caller.bulk_runs.submit_bulk_run( + return self.caller.bulk_runs.submit_bulk_run( subscription_id=subscription_id, resource_group_name=resource_group_name, workspace_name=workspace_name, @@ -457,9 +458,9 @@ def submit_bulk_run( body=body, **kwargs ) - except HttpResponseError as e: - raise FlowRequestException(f"Request id: {headers['x-ms-client-request-id']}") from e + + @_request_wrapper() def create_flow_session( self, subscription_id, # type: str @@ -479,98 +480,97 @@ def create_flow_session( from promptflow.azure._constants._flow import SESSION_CREATION_TIMEOUT_SECONDS from promptflow.azure._restclient.flow.models import SetupFlowSessionAction - self._refresh_request_id_for_telemetry() + headers = self._get_headers() # pass user aml token to session create so user don't need to do authentication again in CI self._set_headers_with_user_aml_token(headers) - try: - # did not call self.caller.flow_sessions.create_flow_session because it does not support return headers - cls = kwargs.pop('cls', None) # type: ClsType[Any] - error_map = { - 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError - } - error_map.update(kwargs.pop('error_map', {})) + # did not call self.caller.flow_sessions.create_flow_session because it does not support return headers + cls = kwargs.pop('cls', None) # type: ClsType[Any] + error_map = { + 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError + } + error_map.update(kwargs.pop('error_map', {})) - content_type = kwargs.pop('content_type', "application/json") # type: Optional[str] + content_type = kwargs.pop('content_type', "application/json") # type: Optional[str] - _json = self.caller.flow_sessions._serialize.body(body, 'CreateFlowSessionRequest') + _json = self.caller.flow_sessions._serialize.body(body, 'CreateFlowSessionRequest') - request = build_create_flow_session_request( - subscription_id=subscription_id, - resource_group_name=resource_group_name, - workspace_name=workspace_name, - session_id=session_id, - content_type=content_type, - json=_json, - template_url=self.caller.flow_sessions.create_flow_session.metadata['url'], - headers=headers + request = build_create_flow_session_request( + subscription_id=subscription_id, + resource_group_name=resource_group_name, + workspace_name=workspace_name, + session_id=session_id, + content_type=content_type, + json=_json, + template_url=self.caller.flow_sessions.create_flow_session.metadata['url'], + headers=headers + ) + request = _convert_request(request) + request.url = self.caller.flow_sessions._client.format_url(request.url) + pipeline_response = self.caller.flow_sessions._client._pipeline.run(request, stream=False, **kwargs) + + response = pipeline_response.http_response + + if response.status_code not in [200, 202]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + error = self.caller.flow_sessions._deserialize.failsafe_deserialize(_models.ErrorResponse, + pipeline_response) + raise HttpResponseError(response=response, model=error) + if response.status_code == 200: + return + action = body.action or SetupFlowSessionAction.INSTALL.value + if action == SetupFlowSessionAction.INSTALL.value: + action = "creation" + else: + action = "reset" + + logger.info(f"Start polling until session {action} is completed...") + # start polling status here. + if "azure-asyncoperation" not in response.headers: + raise FlowRequestException( + "No polling url found in response headers. " + f"Request id: {headers['x-ms-client-request-id']}. " + f"Response headers: {response.headers}." ) - request = _convert_request(request) - request.url = self.caller.flow_sessions._client.format_url(request.url) - pipeline_response = self.caller.flow_sessions._client._pipeline.run(request, stream=False, **kwargs) - - response = pipeline_response.http_response - - if response.status_code not in [200, 202]: - map_error(status_code=response.status_code, response=response, error_map=error_map) - error = self.caller.flow_sessions._deserialize.failsafe_deserialize(_models.ErrorResponse, - pipeline_response) - raise HttpResponseError(response=response, model=error) - if response.status_code == 200: - return - action = body.action or SetupFlowSessionAction.INSTALL.value - if action == SetupFlowSessionAction.INSTALL.value: - action = "creation" + polling_url = response.headers["azure-asyncoperation"] + time_run = 0 + sleep_period = 5 + status = None + timeout_seconds = SESSION_CREATION_TIMEOUT_SECONDS + # InProgress is only known non-terminal status for now. + while status in [None, "InProgress"]: + if time_run + sleep_period > timeout_seconds: + message = f"Timeout for session {action} {session_id} for {AUTOMATIC_RUNTIME}.\n" \ + "Please resubmit the flow later." + raise Exception(message) + time_run += sleep_period + time.sleep(sleep_period) + response = self.poll_operation_status(url=polling_url, **kwargs) + status = response["status"] + logger.debug(f"Current polling status: {status}") + if time_run % 30 == 0: + # print the message every 30 seconds to avoid users feeling stuck during the operation + print(f"Waiting for session {action}, current status: {status}") else: - action = "reset" + logger.debug(f"Waiting for session {action}, current status: {status}") + + if status == "Succeeded": + logger.info(f"Session {action} finished with status {status}.") + else: + # refine response error message + try: + response["error"]["message"] = json.loads(response["error"]["message"]) + except Exception: + pass + raise FlowRequestException( + f"Session {action} failed for {session_id}. \n" + f"Session {action} status: {status}. \n" + f"Request id: {headers['x-ms-client-request-id']}. \n" + f"{json.dumps(response, indent=2)}." + ) - logger.info(f"Start polling until session {action} is completed...") - # start polling status here. - if "azure-asyncoperation" not in response.headers: - raise FlowRequestException( - "No polling url found in response headers. " - f"Request id: {headers['x-ms-client-request-id']}. " - f"Response headers: {response.headers}." - ) - polling_url = response.headers["azure-asyncoperation"] - time_run = 0 - sleep_period = 5 - status = None - timeout_seconds = SESSION_CREATION_TIMEOUT_SECONDS - # InProgress is only known non-terminal status for now. - while status in [None, "InProgress"]: - if time_run + sleep_period > timeout_seconds: - message = f"Timeout for session {action} {session_id} for {AUTOMATIC_RUNTIME}.\n" \ - "Please resubmit the flow later." - raise Exception(message) - time_run += sleep_period - time.sleep(sleep_period) - response = self.poll_operation_status(url=polling_url, **kwargs) - status = response["status"] - logger.debug(f"Current polling status: {status}") - if time_run % 30 == 0: - # print the message every 30 seconds to avoid users feeling stuck during the operation - print(f"Waiting for session {action}, current status: {status}") - else: - logger.debug(f"Waiting for session {action}, current status: {status}") - - if status == "Succeeded": - logger.info(f"Session {action} finished with status {status}.") - else: - # refine response error message - try: - response["error"]["message"] = json.loads(response["error"]["message"]) - except Exception: - pass - raise FlowRequestException( - f"Session {action} failed for {session_id}. \n" - f"Session {action} status: {status}. \n" - f"Request id: {headers['x-ms-client-request-id']}. \n" - f"{json.dumps(response, indent=2)}." - ) - except HttpResponseError as e: - raise FlowRequestException(f"Request id: {headers['x-ms-client-request-id']}") from e + @_request_wrapper() def poll_operation_status( self, url, @@ -581,32 +581,29 @@ def poll_operation_status( ResourceNotFoundError, map_error from promptflow.azure._restclient.flow.operations._flow_sessions_operations import _models - self._refresh_request_id_for_telemetry() + headers = self._get_headers() - try: - request = HttpRequest( - method="GET", - url=url, - headers=headers, - **kwargs + request = HttpRequest( + method="GET", + url=url, + headers=headers, + **kwargs + ) + pipeline_response = self.caller.flow_sessions._client._pipeline.run(request, stream=False, **kwargs) + response = pipeline_response.http_response + error_map = { + 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError + } + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + error = self.caller.flow_sessions._deserialize.failsafe_deserialize(_models.ErrorResponse, + pipeline_response) + raise HttpResponseError(response=response, model=error) + + deserialized = self.caller.flow_sessions._deserialize('object', pipeline_response) + if "status" not in deserialized: + raise FlowRequestException( + f"Status not found in response. Request id: {headers['x-ms-client-request-id']}. " + f"Response headers: {response.headers}." ) - pipeline_response = self.caller.flow_sessions._client._pipeline.run(request, stream=False, **kwargs) - response = pipeline_response.http_response - error_map = { - 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError - } - if response.status_code not in [200]: - map_error(status_code=response.status_code, response=response, error_map=error_map) - error = self.caller.flow_sessions._deserialize.failsafe_deserialize(_models.ErrorResponse, - pipeline_response) - raise HttpResponseError(response=response, model=error) - - deserialized = self.caller.flow_sessions._deserialize('object', pipeline_response) - if "status" not in deserialized: - raise FlowRequestException( - f"Status not found in response. Request id: {headers['x-ms-client-request-id']}. " - f"Response headers: {response.headers}." - ) - return deserialized - except HttpResponseError as e: - raise FlowRequestException(f"Request id: {headers['x-ms-client-request-id']}") from e + return deserialized diff --git a/src/promptflow/tests/sdk_cli_azure_test/e2etests/test_run_operations.py b/src/promptflow/tests/sdk_cli_azure_test/e2etests/test_run_operations.py index 9ed66be513d..8361c322232 100644 --- a/src/promptflow/tests/sdk_cli_azure_test/e2etests/test_run_operations.py +++ b/src/promptflow/tests/sdk_cli_azure_test/e2etests/test_run_operations.py @@ -16,7 +16,7 @@ from promptflow._sdk.entities import Run from promptflow._utils.flow_utils import get_flow_lineage_id from promptflow.azure import PFClient -from promptflow.azure._restclient.flow_service_caller import FlowRequestException +from promptflow.azure._restclient.flow_service_caller import FlowRequestException, FlowServiceCaller from promptflow.azure.operations import RunOperations PROMOTFLOW_ROOT = Path(__file__) / "../../../.." @@ -556,3 +556,27 @@ def test_automatic_runtime_creation_failure(self, pf): runtime=None, ) assert "Session creation failed for" in str(e.value) + + def test_run_submission_exception(self, remote_client): + from azure.core.exceptions import HttpResponseError + + from promptflow.azure._restclient.flow.operations import BulkRunsOperations + + with patch.object(BulkRunsOperations, "submit_bulk_run") as mock_request, patch.object( + FlowServiceCaller, "_set_headers_with_user_aml_token" + ): + mock_request.side_effect = HttpResponseError("customized error message.") + with pytest.raises(FlowRequestException) as e: + original_request_id = remote_client.runs._service_caller._request_id + remote_client.runs._service_caller.submit_bulk_run( + subscription_id="fake_subscription_id", + resource_group_name="fake_resource_group", + workspace_name="fake_workspace_name", + ) + # request id has been updated + assert original_request_id != remote_client.runs._service_caller._request_id + + # original error message should be included in FlowRequestException + assert "customized error message" in str(e.value) + # request id should be included in FlowRequestException + assert f"request id: {remote_client.runs._service_caller._request_id}" in str(e.value)