Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions exb_sdk/workflow_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
import logging
import time
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Self
from uuid import UUID
Expand All @@ -26,6 +27,7 @@
)
from exb_sdk.workflow_client.exceptions import (
DocumentProcessingError,
DocumentProcessingTimeoutError,
HttpError,
WaitForResultCancelledError,
)
Expand Down Expand Up @@ -61,13 +63,17 @@ class Client:
dev_mode: When enabled allows to cache file upload and faster result polling. Useful
during development when extracting results of the same file multiple times.
Do not use when running this in production! Defaults to False.
precessing_timeout_seconds: The timeout in seconds for document processing.
For cases when the document faíled due internal reasons within the processing and
the error is not recoverable. Defaults to 30 minutes.
"""

base_url: str
customer_id: UUID
solution_id: UUID
token: str
dev_mode: bool = False
precessing_timeout_seconds: int = 30 * 60

_http_client: AsyncClient = field(init=False)
_poll_interval: float = field(init=False)
Expand Down Expand Up @@ -109,6 +115,7 @@ async def get_result(self, document_path: Path) -> Result:
Raises:
WaitForResultCancelledError: if the client gets closed while waiting for a result.
DocumentProcessingError: if the extraction failed.
DocumentProcessingTimeoutError: if the extraction was not processed in time.
HTTPError: if any operation to upload or getting the result fails with an
unexpected error.

Expand All @@ -119,6 +126,7 @@ async def get_result(self, document_path: Path) -> Result:
document_id = await self._upload_document(document_path=document_path)

# Poll for result
start_time = time.time()
while True:
await asyncio.sleep(self._poll_interval)
if self._closing:
Expand All @@ -131,6 +139,18 @@ async def get_result(self, document_path: Path) -> Result:
if state == DocumentState.PROCESSING_ERROR:
raise DocumentProcessingError

# in cases where the document was not processed in time
elapsed_time = time.time() - start_time
if elapsed_time > self.precessing_timeout_seconds:
# the processing failed (crash loops)
# and the state never set to DOCUMENT_PROCESSED or PROCESSING_ERROR
logger.error(
f"Document processing failed due to timeout"
f" ({self.precessing_timeout_seconds}s)"
f" after {int(elapsed_time)} seconds.",
)
raise DocumentProcessingTimeoutError

# Get result and return it
return await self._download_result(document_id)

Expand Down
9 changes: 8 additions & 1 deletion exb_sdk/workflow_client/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,15 @@
class DocumentProcessingError(Exception):
"""Raised when an error occurs during an extraction."""

def __init__(self, message: str = "Error processing document") -> None: # noqa: D107
super().__init__(message)


class DocumentProcessingTimeoutError(Exception):
"""Raised when an error occurs during an extraction."""

def __init__(self) -> None: # noqa: D107
message = "Error processing document"
message = "Error processing document due to timeout"
super().__init__(message)


Expand Down
35 changes: 34 additions & 1 deletion tests/workflow_client/test_get_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@
from typing import TYPE_CHECKING

import pytest
from httpx import Request, Response

from exb_sdk.workflow_client import Client
from exb_sdk.workflow_client.api_constants import DocumentState
from exb_sdk.workflow_client.exceptions import DocumentProcessingError, WaitForResultCancelledError
from exb_sdk.workflow_client.exceptions import (
DocumentProcessingError,
DocumentProcessingTimeoutError,
WaitForResultCancelledError,
)
from tests.workflow_client.conftest import Context, response_result, response_state, response_upload

if TYPE_CHECKING:
Expand Down Expand Up @@ -89,6 +94,34 @@ async def test_error_in_processing(
await client.get_result(document_path=document_path)


async def test_timeout_in_processing(
ctx: Context,
client: Client,
document_path: Path,
httpx_mock: HTTPXMock,
) -> None:
async def simulate_network_latency(_: Request) -> Response:
await asyncio.sleep(2)
return Response(
status_code=200,
json={"state": str(DocumentState.QUEUED_FOR_PROCESSING)},
)

# arrange
client.precessing_timeout_seconds = 1

httpx_mock.add_response(**response_upload(ctx=ctx))
httpx_mock.add_callback(simulate_network_latency)

async with client:
# assert
with pytest.raises(
DocumentProcessingTimeoutError, match="Error processing document due to timeout"
):
# act
await client.get_result(document_path=document_path)


async def test_get_result_httpx_client_lifecycle(
ctx: Context,
client: Client,
Expand Down