diff --git a/README.md b/README.md
index 0de3cad..70b4fbb 100644
--- a/README.md
+++ b/README.md
@@ -28,6 +28,8 @@ Indexer:
 
 ### Scrape a website
 
+#### Run a job
+
 Basic usage:
 
 ```bash
@@ -48,8 +50,31 @@ For documentation on all available options, run:
 scrape-it-now scrape run --help
 ```
 
+#### Show job status
+
+Basic usage:
+
+```bash
+export AZURE_STORAGE_CONNECTION_STRING=xxx
+scrape-it-now scrape status [job_name]
+```
+
+Most frequent options are:
+
+| `Options` | Description | `Environment variable` |
+|-|-|-|
+| `--azure-storage-connection-string`<br>`-ascs` | Azure Storage connection string | `AZURE_STORAGE_CONNECTION_STRING` |
+
+For documentation on all available options, run:
+
+```bash
+scrape-it-now scrape status --help
+```
+
 ### Index a scraped website
 
+#### Run a job
+
 Basic usage:
 
 ```bash
diff --git a/app/app.py b/app/app.py
index 9bef753..1cb262a 100644
--- a/app/app.py
+++ b/app/app.py
@@ -7,7 +7,7 @@ from app.helpers.logging import logger
 from app.helpers.monitoring import VERSION
 from app.index import run as index_backend_run
-from app.scrape import run as scrape_backend_run
+from app.scrape import run as scrape_backend_run, state as scrape_backend_state
 
 
 def run_in_async(func):
@@ -205,6 +205,38 @@ async def scrape_run(
     )
 
 
+@click.option(
+    "--azure-storage-connection-string",
+    "-ascs",
+    envvar="AZURE_STORAGE_CONNECTION_STRING",
+    hide_input=True,
+    required=True,
+    type=str,
+)
+@click.argument(
+    "job_name",
+    envvar="JOB_NAME",
+)
+@scrape.command("status")
+@run_in_async
+async def scrape_status(
+    azure_storage_connection_string: str,
+    job_name: str,
+) -> None:
+    """
+    Get the state of a scraping job.
+    """
+    state = await scrape_backend_state(
+        job=job_name,
+        storage_connection_string=azure_storage_connection_string,
+    )
+    # The backend returns None when no state blob exists yet
+    if not state:
+        logger.info("No state found for job %s", job_name)
+        return
+    logger.info(state.model_dump_json())
+
+
 @cli.group
 def index() -> None:
     """
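The `status` command reuses the project's existing `run_in_async` decorator, whose body is not part of this diff. For context, a hypothetical minimal sketch of such a decorator (not the project's actual implementation) could look like this:

```python
import asyncio
from functools import wraps


def run_in_async(func):
    # Wrap an async Click handler so Click can invoke it synchronously
    @wraps(func)
    def wrapper(*args, **kwargs):
        return asyncio.run(func(*args, **kwargs))

    return wrapper
```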
""" - client = BlobServiceClient.from_connection_string( + async with BlobServiceClient.from_connection_string( connection_string - ).get_container_client(container) + ) as x: + client = x.get_container_client(container) - # Create if it does not exist - try: - await client.create_container() - logger.info('Created Blob Storage "%s"', container) - except ResourceExistsError: - pass + # Create if it does not exist + try: + await client.create_container() + logger.info('Created Blob Storage "%s"', container) + except ResourceExistsError: + pass - # Return client - return client + # Return client + yield client +@asynccontextmanager async def queue_client( connection_string: str, queue: str, -) -> QueueClient: +) -> AsyncGenerator[QueueClient, None]: """ Get the Azure Queue Storage client. """ - client = QueueServiceClient.from_connection_string( + async with QueueServiceClient.from_connection_string( connection_string - ).get_queue_client(queue) + ) as x: + client = x.get_queue_client(queue) - # Create if it does not exist - try: - await client.create_queue() - logger.info('Created Queue Storage "%s"', queue) - except ResourceExistsError: - pass + # Create if it does not exist + try: + await client.create_queue() + logger.info('Created Queue Storage "%s"', queue) + except ResourceExistsError: + pass - # Return client - return client + # Return client + yield client diff --git a/app/index.py b/app/index.py index 8daea36..d470fe3 100644 --- a/app/index.py +++ b/app/index.py @@ -371,69 +371,62 @@ async def run( async def _worker( azure_openai_api_key: str, - openai_api_version: str, azure_openai_embedding_deployment: str, azure_openai_embedding_dimensions: int, azure_openai_endpoint: str, job: str, + openai_api_version: str, search_api_key: str, search_endpoint: str, search_index: str, storage_connection_string: str, ) -> None: # Init clients - blob = await blob_client( + async with blob_client( connection_string=storage_connection_string, container=scrape_container_name(job), - ) - openai = await openai_client( - api_key=azure_openai_api_key, - api_version=openai_api_version, - endpoint=azure_openai_endpoint, - ) - queue = await queue_client( - connection_string=storage_connection_string, - queue=chunck_queue_name(job), - ) - search = await search_client( - api_key=search_api_key, - endpoint=search_endpoint, - index=search_index, - ) - - try: - # Process the queue - while messages := queue.receive_messages( - max_messages=32, - visibility_timeout=32 * 10, # 10 secs per message - ): - logger.info("Processing new messages") - async for message in messages: - blob_name = message.content - try: - await _process_one( - blob=blob, - embedding_deployment=azure_openai_embedding_deployment, - embedding_dimensions=azure_openai_embedding_dimensions, - file_name=blob_name, - openai=openai, - search=search, - ) - await queue.delete_message(message) - except Exception: - # TODO: Add a dead-letter queue - # TODO: Add a retry mechanism - # TODO: Narrow the exception type - logger.error("Error processing %s", blob_name, exc_info=True) - - # Wait 3 sec to avoid spamming the queue if it is empty - await asyncio.sleep(3) - - logger.info("No more queued messages, exiting") - - finally: - # Close the clients - await blob.close() - await openai.close() - await queue.close() - await search.close() + ) as blob: + async with openai_client( + api_key=azure_openai_api_key, + api_version=openai_api_version, + endpoint=azure_openai_endpoint, + ) as openai: + async with queue_client( + 
diff --git a/app/index.py b/app/index.py
index 8daea36..d470fe3 100644
--- a/app/index.py
+++ b/app/index.py
@@ -371,69 +371,62 @@ async def run(
 
 async def _worker(
     azure_openai_api_key: str,
-    openai_api_version: str,
     azure_openai_embedding_deployment: str,
     azure_openai_embedding_dimensions: int,
     azure_openai_endpoint: str,
     job: str,
+    openai_api_version: str,
     search_api_key: str,
     search_endpoint: str,
     search_index: str,
     storage_connection_string: str,
 ) -> None:
     # Init clients
-    blob = await blob_client(
+    async with blob_client(
         connection_string=storage_connection_string,
         container=scrape_container_name(job),
-    )
-    openai = await openai_client(
-        api_key=azure_openai_api_key,
-        api_version=openai_api_version,
-        endpoint=azure_openai_endpoint,
-    )
-    queue = await queue_client(
-        connection_string=storage_connection_string,
-        queue=chunck_queue_name(job),
-    )
-    search = await search_client(
-        api_key=search_api_key,
-        endpoint=search_endpoint,
-        index=search_index,
-    )
-
-    try:
-        # Process the queue
-        while messages := queue.receive_messages(
-            max_messages=32,
-            visibility_timeout=32 * 10,  # 10 secs per message
-        ):
-            logger.info("Processing new messages")
-            async for message in messages:
-                blob_name = message.content
-                try:
-                    await _process_one(
-                        blob=blob,
-                        embedding_deployment=azure_openai_embedding_deployment,
-                        embedding_dimensions=azure_openai_embedding_dimensions,
-                        file_name=blob_name,
-                        openai=openai,
-                        search=search,
-                    )
-                    await queue.delete_message(message)
-                except Exception:
-                    # TODO: Add a dead-letter queue
-                    # TODO: Add a retry mechanism
-                    # TODO: Narrow the exception type
-                    logger.error("Error processing %s", blob_name, exc_info=True)
-
-            # Wait 3 sec to avoid spamming the queue if it is empty
-            await asyncio.sleep(3)
-
-        logger.info("No more queued messages, exiting")
-
-    finally:
-        # Close the clients
-        await blob.close()
-        await openai.close()
-        await queue.close()
-        await search.close()
+    ) as blob:
+        async with openai_client(
+            api_key=azure_openai_api_key,
+            api_version=openai_api_version,
+            endpoint=azure_openai_endpoint,
+        ) as openai:
+            async with queue_client(
+                connection_string=storage_connection_string,
+                queue=chunck_queue_name(job),
+            ) as queue:
+                async with search_client(
+                    api_key=search_api_key,
+                    endpoint=search_endpoint,
+                    index=search_index,
+                ) as search:
+
+                    # Process the queue
+                    while messages := queue.receive_messages(
+                        max_messages=32,
+                        visibility_timeout=32 * 10,  # 10 secs per message
+                    ):
+                        logger.info("Processing new messages")
+                        async for message in messages:
+                            blob_name = message.content
+                            try:
+                                await _process_one(
+                                    blob=blob,
+                                    embedding_deployment=azure_openai_embedding_deployment,
+                                    embedding_dimensions=azure_openai_embedding_dimensions,
+                                    file_name=blob_name,
+                                    openai=openai,
+                                    search=search,
+                                )
+                                await queue.delete_message(message)
+
+                            except Exception:
+                                # TODO: Add a dead-letter queue
+                                # TODO: Add a retry mechanism
+                                # TODO: Narrow the exception type
+                                logger.error("Error processing %s", blob_name, exc_info=True)
+
+                        # Wait 3 sec to avoid spamming the queue if it is empty
+                        await asyncio.sleep(3)
+
+                    logger.info("No more queued messages, exiting")
diff --git a/app/models/state.py b/app/models/state.py
new file mode 100644
index 0000000..eaf2c12
--- /dev/null
+++ b/app/models/state.py
@@ -0,0 +1,13 @@
+from datetime import datetime, UTC
+from pydantic import BaseModel, Field
+
+
+class StateJobModel(BaseModel):
+    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
+    last_updated: datetime = Field(default_factory=lambda: datetime.now(UTC))
+    processed: int = 0
+    queued: int = 0
+
+
+class StateScrapedModel(BaseModel):
+    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
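The two new Pydantic models are intentionally small: `StateJobModel` aggregates job-wide counters, while `StateScrapedModel` only timestamps a scraping attempt. A quick illustration of how the job state serializes (counter values are examples, timestamps elided):

```python
from app.models.state import StateJobModel

# Both timestamps default to the current UTC time
state = StateJobModel(processed=12, queued=48)
print(state.model_dump_json())
# e.g. {"created_at":"...","last_updated":"...","processed":12,"queued":48}
```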
+ """ # Skip if the depth is too high new_depth = deph + 1 if new_depth > max_depth: logger.info("Skipping %s (%i), depth too high", referrer, new_depth) - return + # Return the number of URLs queued + return 0 # Add the referrer to the set of scraped URLs - state_data = datetime.now(UTC).isoformat().encode("utf-8") + state_bytes = StateScrapedModel().model_dump_json().encode("utf-8") await blob.upload_blob( - data=state_data, - length=len(state_data), + data=state_bytes, + length=len(state_bytes), name=f"{STATE_PREFIX}/{id}", overwrite=True, ) @@ -99,26 +112,25 @@ async def _add( # Test previous attempts try: # Load from the validity cache - previous = await blob.download_blob( + f = await blob.download_blob( blob=f"{STATE_PREFIX}/{hash_url(url)}", encoding="utf-8", ) + previous = StateScrapedModel.model_validate_json(await f.readall()) # Skip if the previous attempt is too recent # Date is now and not the one from the model, on purposes. Otherwise, if its a cached model, the date would match the frefresher date every time. - previous_at = datetime.fromisoformat(await previous.readall()) - if previous_at >= datetime.now(UTC) - cache_refresh: - logger.debug( - "Skipping %s due to recent attempt at %s", url, previous_at - ) + if previous.created_at >= datetime.now(UTC) - cache_refresh: + logger.debug("Skipping %s due to recent attempt at %s", url, previous.created_at) return False - except ResourceNotFoundError: - pass + + except (ResourceNotFoundError, ValidationError): + logger.debug("State miss for %s", url) # Update the validity cache await blob.upload_blob( - data=state_data, - length=len(state_data), + data=state_bytes, + length=len(state_bytes), name=f"{STATE_PREFIX}/{hash_url(url)}", overwrite=True, ) @@ -143,6 +155,9 @@ async def _add( new_depth, ) + # Return the number of URLs queued + return sum(res) + async def _process_one( blob: ContainerClient, @@ -156,7 +171,14 @@ async def _process_one( user_agents: list[str], viewports: list[ViewportSize], whitelist: dict[re.Pattern, list[re.Pattern]], -) -> None: +) -> tuple[int, int]: + """ + Process one URL from the queue. + + Returns the number of processed and queued URLs. 
+ """ + queued_urls = 0 + # Check if the URL has already been processed current_id = hash_url(current_item.url) cache_name = f"{SCRAPED_PREFIX}/{current_id}.json" @@ -197,7 +219,7 @@ async def _process_one( raise ResourceNotFoundError # Add the links to the queue - await _queue( + queued_urls = await _queue( blob=blob, cache_refresh=cache_refresh, deph=current_item.depth, @@ -208,12 +230,15 @@ async def _process_one( urls=cache_item.links, whitelist=whitelist, ) + logger.info( "Loaded %s (%i) from cache", cache_item.url, current_item.depth, ) - return + + # Return the number of processed and queued URLs + return (0, queued_urls) except (ResourceNotFoundError, ValidationError): logger.debug("Cache miss for %s", current_item.url) @@ -232,7 +257,7 @@ async def _process_one( # Use cache data if scraping fails or is cached if not new_result and cache_item: # Add the links to the queue - await _queue( + queued_urls = await _queue( blob=blob, cache_refresh=cache_refresh, deph=current_item.depth, @@ -243,8 +268,11 @@ async def _process_one( urls=cache_item.links, whitelist=whitelist, ) + logger.info("Used cache for %s (%i)", cache_item.url, current_item.depth) - return + + # Return the number of processed and queued URLs + return (0, queued_urls) # Output to a blob and queue model_bytes = new_result.model_dump_json().encode("utf-8") @@ -259,7 +287,7 @@ async def _process_one( await out_queue.send_message(content=f"{SCRAPED_PREFIX}/{current_id}.json") # Add the links to the queue - await _queue( + queued_urls = await _queue( blob=blob, cache_refresh=cache_refresh, deph=current_item.depth, @@ -273,6 +301,9 @@ async def _process_one( logger.info("Scraped %s (%i)", new_result.url, current_item.depth) + # Return the number of processed and queued URLs + return (1, queued_urls) + async def _worker( cache_refresh: timedelta, @@ -285,75 +316,120 @@ async def _worker( whitelist: dict[re.Pattern, list[re.Pattern]], ) -> None: # Init clients - blob = await blob_client( + async with blob_client( connection_string=storage_connection_string, container=scrape_container_name(job), - ) - in_queue = await queue_client( - connection_string=storage_connection_string, - queue=scrape_queue_name(job), - ) - out_queue = await queue_client( - connection_string=storage_connection_string, - queue=chunck_queue_name(job), - ) - - try: - # Init Playwright context - async with async_playwright() as p: - browser_type = p.chromium - browser = await browser_type.launch() - logger.info("Browser %s launched", browser_type.name) - - # Process the queue - while messages := in_queue.receive_messages( - max_messages=32, - visibility_timeout=32 * 5, # 5 secs per message - ): - logger.info("Processing new messages") - async for message in messages: - current_item = ScrapedQueuedModel.model_validate_json( - message.content - ) - try: - await _process_one( - blob=blob, - browser=browser, - cache_refresh=cache_refresh, - current_item=current_item, - in_queue=in_queue, - max_depth=max_depth, - out_queue=out_queue, - timezones=timezones, - user_agents=user_agents, - viewports=viewports, - whitelist=whitelist, + ) as blob: + async with queue_client( + connection_string=storage_connection_string, + queue=scrape_queue_name(job), + ) as in_queue: + async with queue_client( + connection_string=storage_connection_string, + queue=chunck_queue_name(job), + ) as out_queue: + + # Init Playwright context + async with async_playwright() as p: + browser_type = p.chromium + browser = await browser_type.launch() + logger.info("Browser %s launched", 
@@ -285,75 +316,120 @@ async def _worker(
     whitelist: dict[re.Pattern, list[re.Pattern]],
 ) -> None:
     # Init clients
-    blob = await blob_client(
+    async with blob_client(
         connection_string=storage_connection_string,
         container=scrape_container_name(job),
-    )
-    in_queue = await queue_client(
-        connection_string=storage_connection_string,
-        queue=scrape_queue_name(job),
-    )
-    out_queue = await queue_client(
-        connection_string=storage_connection_string,
-        queue=chunck_queue_name(job),
-    )
-
-    try:
-        # Init Playwright context
-        async with async_playwright() as p:
-            browser_type = p.chromium
-            browser = await browser_type.launch()
-            logger.info("Browser %s launched", browser_type.name)
-
-            # Process the queue
-            while messages := in_queue.receive_messages(
-                max_messages=32,
-                visibility_timeout=32 * 5,  # 5 secs per message
-            ):
-                logger.info("Processing new messages")
-                async for message in messages:
-                    current_item = ScrapedQueuedModel.model_validate_json(
-                        message.content
-                    )
-                    try:
-                        await _process_one(
-                            blob=blob,
-                            browser=browser,
-                            cache_refresh=cache_refresh,
-                            current_item=current_item,
-                            in_queue=in_queue,
-                            max_depth=max_depth,
-                            out_queue=out_queue,
-                            timezones=timezones,
-                            user_agents=user_agents,
-                            viewports=viewports,
-                            whitelist=whitelist,
-                        )
-                        await in_queue.delete_message(message)
-                    except Exception:
-                        # TODO: Add a dead-letter queue
-                        # TODO: Add a retry mechanism
-                        # TODO: Narrow the exception type
-                        logger.error(
-                            "Error processing %s (%i)",
-                            current_item.url,
-                            current_item.depth,
-                            exc_info=True,
-                        )
-
-                # Wait 3 sec to avoid spamming the queue if it is empty
-                await asyncio.sleep(3)
-
-            logger.info("No more queued messages, exiting")
-
-            # Close the browser
-            await browser.close()
-
-    finally:
-        # Close clients
-        await blob.close()
-        await in_queue.close()
-        await out_queue.close()
+    ) as blob:
+        async with queue_client(
+            connection_string=storage_connection_string,
+            queue=scrape_queue_name(job),
+        ) as in_queue:
+            async with queue_client(
+                connection_string=storage_connection_string,
+                queue=chunck_queue_name(job),
+            ) as out_queue:
+
+                # Init Playwright context
+                async with async_playwright() as p:
+                    browser_type = p.chromium
+                    browser = await browser_type.launch()
+                    logger.info("Browser %s launched", browser_type.name)
+
+                    # Process the queue
+                    while messages := in_queue.receive_messages(
+                        max_messages=32,
+                        visibility_timeout=32 * 5,  # 5 secs per message
+                    ):
+                        total_processed = 0
+                        total_queued = 0
+
+                        # Iterate over the messages
+                        logger.info("Processing new messages")
+                        async for message in messages:
+                            current_item = ScrapedQueuedModel.model_validate_json(
+                                message.content
+                            )
+                            try:
+                                processed, queued = await _process_one(
+                                    blob=blob,
+                                    browser=browser,
+                                    cache_refresh=cache_refresh,
+                                    current_item=current_item,
+                                    in_queue=in_queue,
+                                    max_depth=max_depth,
+                                    out_queue=out_queue,
+                                    timezones=timezones,
+                                    user_agents=user_agents,
+                                    viewports=viewports,
+                                    whitelist=whitelist,
+                                )
+                                await in_queue.delete_message(message)
+
+                                # Update counters
+                                total_processed += processed
+                                total_queued += queued
+
+                            except Exception:
+                                # TODO: Add a dead-letter queue
+                                # TODO: Add a retry mechanism
+                                # TODO: Narrow the exception type
+                                logger.error(
+                                    "Error processing %s (%i)",
+                                    current_item.url,
+                                    current_item.depth,
+                                    exc_info=True,
+                                )
+
+                        # Update job state
+                        state_blob = blob.get_blob_client(JOB_STATE_NAME)
+
+                        # Acquire a lease
+                        logger.debug("Acquiring lease for job state")
+                        lease_continue = True
+                        while lease_continue:
+                            try:
+                                state_lease = await state_blob.acquire_lease(
+                                    lease_duration=15,
+                                    lease_id=str(uuid4()),
+                                )
+                                lease_continue = False
+                            except ResourceExistsError:  # Wait for the lease to expire
+                                logger.debug("Lease already exists, waiting")
+                                await asyncio.sleep(1)
+                            except ResourceNotFoundError:  # Create the blob if it does not exist
+                                logger.debug("State blob does not exist, creating an empty one")
+                                await state_blob.upload_blob(
+                                    data=b"",
+                                    length=0,
+                                    overwrite=True,
+                                )
+
+                        # Parse existing state
+                        try:
+                            f = await state_blob.download_blob(encoding="utf-8")
+                            state = StateJobModel.model_validate_json(await f.readall())
+                        except (ResourceNotFoundError, ValidationError):
+                            state = StateJobModel()
+
+                        # Update state
+                        state.last_updated = datetime.now(UTC)
+                        state.processed += total_processed
+                        state.queued += total_queued
+
+                        # Save state
+                        state_bytes = state.model_dump_json().encode("utf-8")
+                        await state_blob.upload_blob(
+                            data=state_bytes,
+                            lease=state_lease.id,
+                            length=len(state_bytes),
+                            overwrite=True,
+                        )
+
+                        # Release the lease
+                        await state_lease.release()
+                        logger.info("Updated job state to %i processed and %i queued", state.processed, state.queued)
+
+                        # Wait 3 sec to avoid spamming the queue if it is empty
+                        await asyncio.sleep(3)
+
+                    logger.info("No more queued messages, exiting")
+
+                    # Close the browser
+                    await browser.close()
 
 
 def _ads_pattern() -> re.Pattern:
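Because several workers may flush their counters at the same time, the job state is read and written under a blob lease. Isolated from the worker loop, the read-modify-write looks like this sketch (the blob name and the 15-second lease follow the diff; error handling is simplified):

```python
import asyncio
from datetime import UTC, datetime
from uuid import uuid4

from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError
from azure.storage.blob.aio import BlobClient
from pydantic import ValidationError

from app.models.state import StateJobModel


async def flush_counters(state_blob: BlobClient, processed: int, queued: int) -> None:
    # Spin until the lease is acquired, creating an empty blob if missing
    while True:
        try:
            lease = await state_blob.acquire_lease(
                lease_duration=15,
                lease_id=str(uuid4()),
            )
            break
        except ResourceExistsError:
            await asyncio.sleep(1)  # another worker holds the lease
        except ResourceNotFoundError:
            await state_blob.upload_blob(data=b"", length=0, overwrite=True)

    # Read the current state, tolerating an empty or invalid blob
    try:
        f = await state_blob.download_blob(encoding="utf-8")
        state = StateJobModel.model_validate_json(await f.readall())
    except ValidationError:
        state = StateJobModel()

    # Update, write back under the lease, then release it
    state.last_updated = datetime.now(UTC)
    state.processed += processed
    state.queued += queued
    payload = state.model_dump_json().encode("utf-8")
    await state_blob.upload_blob(
        data=payload,
        lease=lease.id,
        length=len(payload),
        overwrite=True,
    )
    await lease.release()
```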
@@ -650,38 +726,31 @@ async def run(
         viewports_parsed.append(ViewportSize(width=width, height=height))
 
     # Init clients
-    blob = await blob_client(
+    async with blob_client(
         connection_string=storage_connection_string,
         container=scrape_container_name(job),
-    )
-    in_queue = await queue_client(
-        connection_string=storage_connection_string,
-        queue=scrape_queue_name(job),
-    )
-
-    # Add initial URL to the queue
-    model = ScrapedQueuedModel(
-        depth=0,
-        referrer="https://www.google.com/search",
-        url=url,
-    )
-
-    try:
-        await _queue(
-            blob=blob,
-            cache_refresh=cache_refresh_parsed,
-            deph=model.depth,
-            id=hash_url(model.referrer),
-            in_queue=in_queue,
-            max_depth=max_depth,
-            referrer=model.referrer,
-            urls={model.url},
-            whitelist=whitelist,
-        )
-    finally:
-        # Close clients
-        await blob.close()
-        await in_queue.close()
+    ) as blob:
+        async with queue_client(
+            connection_string=storage_connection_string,
+            queue=scrape_queue_name(job),
+        ) as in_queue:
+            # Add the initial URL to the queue
+            model = ScrapedQueuedModel(
+                depth=0,
+                referrer="https://www.google.com/search",
+                url=url,
+            )
+            await _queue(
+                blob=blob,
+                cache_refresh=cache_refresh_parsed,
+                deph=model.depth,
+                id=hash_url(model.referrer),
+                in_queue=in_queue,
+                max_depth=max_depth,
+                referrer=model.referrer,
+                urls={model.url},
+                whitelist=whitelist,
+            )
 
     run_workers(
         cache_refresh=cache_refresh_parsed,
@@ -696,3 +765,30 @@ async def run(
         viewports=viewports_parsed,
         whitelist=whitelist,
     )
+
+
+async def state(
+    storage_connection_string: str,
+    job: str,
+) -> StateJobModel | None:
+    """
+    Get the state of a scraping job, or None if no state is stored yet.
+    """
+    # Init clients
+    async with blob_client(
+        connection_string=storage_connection_string,
+        container=scrape_container_name(job),
+    ) as blob:
+        # Load the state
+        state = None
+        try:
+            f = await blob.download_blob(
+                blob=JOB_STATE_NAME,
+                encoding="utf-8",
+            )
+            state = StateJobModel.model_validate_json(await f.readall())
+        except (ResourceNotFoundError, ValidationError):
+            pass
+
+    # Return the model
+    return state
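End to end, the new status path can be exercised through the CLI (`scrape-it-now scrape status my-job`) or programmatically. A minimal sketch of the latter, with a placeholder job name and the Azurite development connection string:

```python
import asyncio

from app.scrape import state


async def main() -> None:
    model = await state(
        storage_connection_string="UseDevelopmentStorage=true",  # placeholder
        job="my-job",  # placeholder
    )
    if model:
        print(model.model_dump_json())
    else:
        print("No state stored for this job yet")


asyncio.run(main())
```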