feat: See the job status with persisted stats
clemlesne committed Aug 16, 2024
1 parent a0c2c63 commit 4e6c558
Showing 6 changed files with 361 additions and 202 deletions.
25 changes: 25 additions & 0 deletions README.md

````diff
@@ -28,6 +28,8 @@ Indexer:
 
 ### Scrape a website
 
+#### Run a job
+
 Basic usage:
 
 ```bash
@@ -48,8 +50,31 @@ For documentation on all available options, run:
 scrape-it-now scrape run --help
 ```
 
+#### Show job status
+
+Basic usage:
+
+```bash
+export AZURE_STORAGE_CONNECTION_STRING=xxx
+scrape-it-now scrape status [job_name]
+```
+
+Most frequent options are:
+
+| `Options` | Description | `Environment variable` |
+|-|-|-|
+| `--azure-storage-connection-string`</br>`-ascs` | Azure Storage connection string | `AZURE_STORAGE_CONNECTION_STRING` |
+
+For documentation on all available options, run:
+
+```bash
+scrape-it-now scrape status --help
+```
+
 ### Index a scraped website
 
+#### Run a job
+
 Basic usage:
 
 ```bash
````
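The new `scrape status` command prints the persisted job state with `model_dump_json()` (see `app/app.py` below), so given the fields of the new `StateJobModel`, the output should be a single JSON document along these lines (values are illustrative, not from a real run):

```json
{"created_at":"2024-08-16T10:00:00Z","last_updated":"2024-08-16T10:04:12Z","processed":129,"queued":12}
```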
30 changes: 29 additions & 1 deletion app/app.py

```diff
@@ -7,7 +7,7 @@
 from app.helpers.logging import logger
 from app.helpers.monitoring import VERSION
 from app.index import run as index_backend_run
-from app.scrape import run as scrape_backend_run
+from app.scrape import run as scrape_backend_run, state as scrape_backend_state
 
 
 def run_in_async(func):
@@ -205,6 +205,34 @@ async def scrape_run(
 )
 
 
+@click.option(
+    "--azure-storage-connection-string",
+    "-ascs",
+    envvar="AZURE_STORAGE_CONNECTION_STRING",
+    hide_input=True,
+    required=True,
+    type=str,
+)
+@click.argument(
+    "job_name",
+    envvar="JOB_NAME",
+)
+@scrape.command("status")
+@run_in_async
+async def scrape_status(
+    azure_storage_connection_string: str,
+    job_name: str | None,
+) -> None:
+    """
+    Get the state of a scraping job.
+    """
+    state = await scrape_backend_state(
+        job=job_name,
+        storage_connection_string=azure_storage_connection_string,
+    )
+    logger.info(state.model_dump_json())
+
+
 @cli.group
 def index() -> None:
     """
```
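The sixth changed file, `app/scrape.py` (home of `scrape_backend_state`), is not shown above. As a rough, hypothetical sketch of what a compatible `state()` backend could look like, assuming the scraper persists its counters as a JSON blob named `state` in the job's scrape container (the blob name and the `scrape_container_name` import path are assumptions, not taken from the commit):

```python
from app.helpers.persistence import blob_client
from app.helpers.resources import scrape_container_name  # import path assumed
from app.models.state import StateJobModel


async def state(job: str, storage_connection_string: str) -> StateJobModel:
    # Sketch only: assumes the scraper persists its counters as a JSON
    # blob named "state" at the root of the job's scrape container.
    async with blob_client(
        connection_string=storage_connection_string,
        container=scrape_container_name(job),
    ) as blob:
        downloader = await blob.download_blob("state")
        raw = await downloader.readall()
    return StateJobModel.model_validate_json(raw)
```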
71 changes: 39 additions & 32 deletions app/helpers/persistence.py

```diff
@@ -1,88 +1,95 @@
-from os import environ as env
-
-from app.helpers.logging import logger
 from azure.core.credentials import AzureKeyCredential
 from azure.core.exceptions import ResourceExistsError
 from azure.search.documents.aio import SearchClient
 from azure.storage.blob.aio import BlobServiceClient, ContainerClient
 from azure.storage.queue.aio import QueueClient, QueueServiceClient
+from contextlib import asynccontextmanager
 from openai import AsyncAzureOpenAI
 
+from app.helpers.logging import logger
+from typing import AsyncGenerator
 
 
+@asynccontextmanager
 async def openai_client(
     api_key: str,
     api_version: str,
     endpoint: str,
-) -> AsyncAzureOpenAI:
+) -> AsyncGenerator[AsyncAzureOpenAI, None]:
     """
     Get the Azure OpenAI client.
     """
-    return AsyncAzureOpenAI(
+    async with AsyncAzureOpenAI(
         # Deployment
         api_version=api_version,
         azure_endpoint=endpoint,
         # Authentication
         api_key=api_key,
-    )
+    ) as client:
+        yield client
 
 
+@asynccontextmanager
 async def search_client(
     api_key: str,
     endpoint: str,
     index: str,
-) -> SearchClient:
+) -> AsyncGenerator[SearchClient, None]:
     """
     Get the Azure AI Search client.
     """
-    return SearchClient(
+    async with SearchClient(
         # Deployment
         endpoint=endpoint,
         index_name=index,
         # Authentication
         credential=AzureKeyCredential(api_key),
-    )
+    ) as client:
+        yield client
 
 
+@asynccontextmanager
 async def blob_client(
     connection_string: str,
     container: str,
-) -> ContainerClient:
+) -> AsyncGenerator[ContainerClient, None]:
     """
     Get the Azure Blob Storage client.
     """
-    client = BlobServiceClient.from_connection_string(
+    async with BlobServiceClient.from_connection_string(
         connection_string
-    ).get_container_client(container)
+    ) as x:
+        client = x.get_container_client(container)
 
-    # Create if it does not exist
-    try:
-        await client.create_container()
-        logger.info('Created Blob Storage "%s"', container)
-    except ResourceExistsError:
-        pass
+        # Create if it does not exist
+        try:
+            await client.create_container()
+            logger.info('Created Blob Storage "%s"', container)
+        except ResourceExistsError:
+            pass
 
-    # Return client
-    return client
+        # Return client
+        yield client
 
 
+@asynccontextmanager
 async def queue_client(
     connection_string: str,
     queue: str,
-) -> QueueClient:
+) -> AsyncGenerator[QueueClient, None]:
     """
     Get the Azure Queue Storage client.
     """
-    client = QueueServiceClient.from_connection_string(
+    async with QueueServiceClient.from_connection_string(
         connection_string
-    ).get_queue_client(queue)
+    ) as x:
+        client = x.get_queue_client(queue)
 
-    # Create if it does not exist
-    try:
-        await client.create_queue()
-        logger.info('Created Queue Storage "%s"', queue)
-    except ResourceExistsError:
-        pass
+        # Create if it does not exist
+        try:
+            await client.create_queue()
+            logger.info('Created Queue Storage "%s"', queue)
+        except ResourceExistsError:
+            pass
 
-    # Return client
-    return client
+        # Return client
+        yield client
```
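Every factory above now yields its client from inside an `async with` block, so call sites get deterministic cleanup instead of having to call `close()` by hand. A minimal usage sketch (the Azurite connection string and queue name are placeholders):

```python
import asyncio

from app.helpers.persistence import queue_client


async def main() -> None:
    # The factory yields a ready-to-use client and guarantees the
    # underlying transport is closed when the block exits, even on error.
    async with queue_client(
        connection_string="UseDevelopmentStorage=true",  # placeholder
        queue="demo-queue",
    ) as queue:
        await queue.send_message("hello")


asyncio.run(main())
```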
101 changes: 47 additions & 54 deletions app/index.py

```diff
@@ -371,69 +371,62 @@ async def run(
 
 async def _worker(
     azure_openai_api_key: str,
-    openai_api_version: str,
     azure_openai_embedding_deployment: str,
     azure_openai_embedding_dimensions: int,
     azure_openai_endpoint: str,
     job: str,
+    openai_api_version: str,
     search_api_key: str,
     search_endpoint: str,
     search_index: str,
     storage_connection_string: str,
 ) -> None:
     # Init clients
-    blob = await blob_client(
+    async with blob_client(
         connection_string=storage_connection_string,
         container=scrape_container_name(job),
-    )
-    openai = await openai_client(
-        api_key=azure_openai_api_key,
-        api_version=openai_api_version,
-        endpoint=azure_openai_endpoint,
-    )
-    queue = await queue_client(
-        connection_string=storage_connection_string,
-        queue=chunck_queue_name(job),
-    )
-    search = await search_client(
-        api_key=search_api_key,
-        endpoint=search_endpoint,
-        index=search_index,
-    )
-
-    try:
-        # Process the queue
-        while messages := queue.receive_messages(
-            max_messages=32,
-            visibility_timeout=32 * 10,  # 10 secs per message
-        ):
-            logger.info("Processing new messages")
-            async for message in messages:
-                blob_name = message.content
-                try:
-                    await _process_one(
-                        blob=blob,
-                        embedding_deployment=azure_openai_embedding_deployment,
-                        embedding_dimensions=azure_openai_embedding_dimensions,
-                        file_name=blob_name,
-                        openai=openai,
-                        search=search,
-                    )
-                    await queue.delete_message(message)
-                except Exception:
-                    # TODO: Add a dead-letter queue
-                    # TODO: Add a retry mechanism
-                    # TODO: Narrow the exception type
-                    logger.error("Error processing %s", blob_name, exc_info=True)
-
-            # Wait 3 sec to avoid spamming the queue if it is empty
-            await asyncio.sleep(3)
-
-        logger.info("No more queued messages, exiting")
-
-    finally:
-        # Close the clients
-        await blob.close()
-        await openai.close()
-        await queue.close()
-        await search.close()
+    ) as blob:
+        async with openai_client(
+            api_key=azure_openai_api_key,
+            api_version=openai_api_version,
+            endpoint=azure_openai_endpoint,
+        ) as openai:
+            async with queue_client(
+                connection_string=storage_connection_string,
+                queue=chunck_queue_name(job),
+            ) as queue:
+                async with search_client(
+                    api_key=search_api_key,
+                    endpoint=search_endpoint,
+                    index=search_index,
+                ) as search:
+
+                    # Process the queue
+                    while messages := queue.receive_messages(
+                        max_messages=32,
+                        visibility_timeout=32 * 10,  # 10 secs per message
+                    ):
+                        logger.info("Processing new messages")
+                        async for message in messages:
+                            blob_name = message.content
+                            try:
+                                await _process_one(
+                                    blob=blob,
+                                    embedding_deployment=azure_openai_embedding_deployment,
+                                    embedding_dimensions=azure_openai_embedding_dimensions,
+                                    file_name=blob_name,
+                                    openai=openai,
+                                    search=search,
+                                )
+                                await queue.delete_message(message)
+
+                            except Exception:
+                                # TODO: Add a dead-letter queue
+                                # TODO: Add a retry mechanism
+                                # TODO: Narrow the exception type
+                                logger.error("Error processing %s", blob_name, exc_info=True)
+
+                        # Wait 3 sec to avoid spamming the queue if it is empty
+                        await asyncio.sleep(3)
+
+                    logger.info("No more queued messages, exiting")
```
13 changes: 13 additions & 0 deletions app/models/state.py

```diff
@@ -0,0 +1,13 @@
+from datetime import datetime, UTC
+from pydantic import BaseModel, Field
+
+
+class StateJobModel(BaseModel):
+    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
+    last_updated: datetime = Field(default_factory=lambda: datetime.now(UTC))
+    processed: int = 0
+    queued: int = 0
+
+
+class StateScrapedModel(BaseModel):
+    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
```
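For reference, the new model round-trips through JSON exactly as the `status` command's `model_dump_json()` call implies; a short sketch (the serialized timestamp format shown in the comment is indicative):

```python
from app.models.state import StateJobModel

state = StateJobModel(processed=42, queued=7)
payload = state.model_dump_json()
# e.g. {"created_at":"2024-08-16T10:00:00Z","last_updated":"2024-08-16T10:00:00Z","processed":42,"queued":7}
restored = StateJobModel.model_validate_json(payload)
assert restored.processed == 42
```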
