diff --git a/exb_sdk/workflow_client/client.py b/exb_sdk/workflow_client/client.py index 48c93a7..7273bac 100644 --- a/exb_sdk/workflow_client/client.py +++ b/exb_sdk/workflow_client/client.py @@ -2,6 +2,7 @@ import asyncio import logging +import time from dataclasses import dataclass, field from typing import TYPE_CHECKING, Any, Self from uuid import UUID @@ -26,6 +27,7 @@ ) from exb_sdk.workflow_client.exceptions import ( DocumentProcessingError, + DocumentProcessingTimeoutError, HttpError, WaitForResultCancelledError, ) @@ -61,6 +63,9 @@ class Client: dev_mode: When enabled allows to cache file upload and faster result polling. Useful during development when extracting results of the same file multiple times. Do not use when running this in production! Defaults to False. + precessing_timeout_seconds: The timeout in seconds for document processing. + For cases when the document faĆ­led due internal reasons within the processing and + the error is not recoverable. Defaults to 30 minutes. """ base_url: str @@ -68,6 +73,7 @@ class Client: solution_id: UUID token: str dev_mode: bool = False + precessing_timeout_seconds: int = 30 * 60 _http_client: AsyncClient = field(init=False) _poll_interval: float = field(init=False) @@ -109,6 +115,7 @@ async def get_result(self, document_path: Path) -> Result: Raises: WaitForResultCancelledError: if the client gets closed while waiting for a result. DocumentProcessingError: if the extraction failed. + DocumentProcessingTimeoutError: if the extraction was not processed in time. HTTPError: if any operation to upload or getting the result fails with an unexpected error. @@ -119,6 +126,7 @@ async def get_result(self, document_path: Path) -> Result: document_id = await self._upload_document(document_path=document_path) # Poll for result + start_time = time.time() while True: await asyncio.sleep(self._poll_interval) if self._closing: @@ -131,6 +139,18 @@ async def get_result(self, document_path: Path) -> Result: if state == DocumentState.PROCESSING_ERROR: raise DocumentProcessingError + # in cases where the document was not processed in time + elapsed_time = time.time() - start_time + if elapsed_time > self.precessing_timeout_seconds: + # the processing failed (crash loops) + # and the state never set to DOCUMENT_PROCESSED or PROCESSING_ERROR + logger.error( + f"Document processing failed due to timeout" + f" ({self.precessing_timeout_seconds}s)" + f" after {int(elapsed_time)} seconds.", + ) + raise DocumentProcessingTimeoutError + # Get result and return it return await self._download_result(document_id) diff --git a/exb_sdk/workflow_client/exceptions.py b/exb_sdk/workflow_client/exceptions.py index e3db659..bee7975 100644 --- a/exb_sdk/workflow_client/exceptions.py +++ b/exb_sdk/workflow_client/exceptions.py @@ -4,8 +4,15 @@ class DocumentProcessingError(Exception): """Raised when an error occurs during an extraction.""" + def __init__(self, message: str = "Error processing document") -> None: # noqa: D107 + super().__init__(message) + + +class DocumentProcessingTimeoutError(Exception): + """Raised when an error occurs during an extraction.""" + def __init__(self) -> None: # noqa: D107 - message = "Error processing document" + message = "Error processing document due to timeout" super().__init__(message) diff --git a/tests/workflow_client/test_get_result.py b/tests/workflow_client/test_get_result.py index c37fcae..58d1bde 100644 --- a/tests/workflow_client/test_get_result.py +++ b/tests/workflow_client/test_get_result.py @@ -4,10 +4,15 @@ from typing import TYPE_CHECKING import pytest +from httpx import Request, Response from exb_sdk.workflow_client import Client from exb_sdk.workflow_client.api_constants import DocumentState -from exb_sdk.workflow_client.exceptions import DocumentProcessingError, WaitForResultCancelledError +from exb_sdk.workflow_client.exceptions import ( + DocumentProcessingError, + DocumentProcessingTimeoutError, + WaitForResultCancelledError, +) from tests.workflow_client.conftest import Context, response_result, response_state, response_upload if TYPE_CHECKING: @@ -89,6 +94,34 @@ async def test_error_in_processing( await client.get_result(document_path=document_path) +async def test_timeout_in_processing( + ctx: Context, + client: Client, + document_path: Path, + httpx_mock: HTTPXMock, +) -> None: + async def simulate_network_latency(_: Request) -> Response: + await asyncio.sleep(2) + return Response( + status_code=200, + json={"state": str(DocumentState.QUEUED_FOR_PROCESSING)}, + ) + + # arrange + client.precessing_timeout_seconds = 1 + + httpx_mock.add_response(**response_upload(ctx=ctx)) + httpx_mock.add_callback(simulate_network_latency) + + async with client: + # assert + with pytest.raises( + DocumentProcessingTimeoutError, match="Error processing document due to timeout" + ): + # act + await client.get_result(document_path=document_path) + + async def test_get_result_httpx_client_lifecycle( ctx: Context, client: Client,