diff --git a/README.md b/README.md
index b22a140..6fc8d0f 100644
--- a/README.md
+++ b/README.md
@@ -17,6 +17,9 @@ Scraper:
 - [x] Explore pages in depth by detecting links and de-duplicating them
 - [x] Extract markdown content from a page with [html2text](https://github.com/aaronsw/html2text)
 - [x] Load dynamic JavaScript content with [Playwright](https://github.com/microsoft/playwright-python)
+- [x] Show progress with a status command
+- [x] Track total network usage
+- [ ] Respect [`robots.txt`](https://en.wikipedia.org/wiki/Robots.txt)

 Indexer:

diff --git a/app/models/scraped.py b/app/models/scraped.py
index 17fa50a..bab5edc 100644
--- a/app/models/scraped.py
+++ b/app/models/scraped.py
@@ -12,6 +12,7 @@ class ScrapedUrlModel(BaseModel):
     created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
     etag: str | None
     links: list[str] = []
+    network_used_mb: float = 0.0
     raw: str | None = None
     redirect: str | None = None
     status: int
diff --git a/app/models/state.py b/app/models/state.py
index be21916..931cbc6 100644
--- a/app/models/state.py
+++ b/app/models/state.py
@@ -6,6 +6,7 @@ class StateJobModel(BaseModel):
     created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
     last_updated: datetime = Field(default_factory=lambda: datetime.now(UTC))
+    network_used_mb: float = 0.0
     processed: int = 0
     queued: int = 0
diff --git a/app/scrape.py b/app/scrape.py
index 553ee14..887a27e 100644
--- a/app/scrape.py
+++ b/app/scrape.py
@@ -1,5 +1,6 @@
 import asyncio, random, re
 from datetime import UTC, datetime, timedelta
+from typing import Awaitable, Callable
 from urllib.parse import urlparse
 from uuid import uuid4

@@ -173,11 +174,11 @@ async def _process_one(
     user_agents: list[str],
     viewports: list[ViewportSize],
     whitelist: dict[re.Pattern, list[re.Pattern]],
-) -> tuple[int, int]:
+) -> tuple[int, int, float]:
     """
     Process one URL from the queue.

-    Returns the number of processed and queued URLs.
+    Returns the number of processed URLs, queued URLs, and the network used, in MB.
     """

     queued_urls = 0
@@ -239,8 +240,8 @@ async def _process_one(
             current_item.depth,
         )

-        # Return the number of processed and queued URLs
-        return (0, queued_urls)
+        # Return the number of processed URLs, queued URLs, and network used (MB)
+        return (0, queued_urls, 0)

     except (ResourceNotFoundError, ValidationError):
         logger.debug("Cache miss for %s", current_item.url)
@@ -273,8 +274,8 @@ async def _process_one(
         logger.info("Used cache for %s (%i)", cache_item.url, current_item.depth)

-        # Return the number of processed and queued URLs
-        return (0, queued_urls)
+        # Return the number of processed URLs, queued URLs, and network used (MB)
+        return (0, queued_urls, 0)

     # Output to a blob and queue
     model_bytes = new_result.model_dump_json().encode("utf-8")
@@ -303,8 +304,8 @@ async def _process_one(
     logger.info("Scraped %s (%i)", new_result.url, current_item.depth)

-    # Return the number of processed and queued URLs
-    return (1, queued_urls)
+    # Return the number of processed URLs, queued URLs, and network used (MB)
+    return (1, queued_urls, new_result.network_used_mb)


 async def _worker(
@@ -342,6 +343,7 @@ async def _worker(
             max_messages=32,
             visibility_timeout=32 * 5,  # 5 secs per message
         ):
+            total_network_used_mb = 0
             total_processed = 0
             total_queued = 0

@@ -352,7 +354,7 @@ async def _worker(
                     message.content
                 )
                 try:
-                    processed, queued = await _process_one(
+                    processed, queued, network_used_mb = await _process_one(
                         blob=blob,
                         browser=browser,
                         cache_refresh=cache_refresh,
@@ -368,6 +370,7 @@ async def _worker(
                 await in_queue.delete_message(message)

                 # Update counters
+                total_network_used_mb += network_used_mb
                 total_processed += processed
                 total_queued += queued

@@ -416,6 +419,7 @@ async def _worker(
                 state = StateJobModel()
             # Update state
             state.last_updated = datetime.now(UTC)
+            state.network_used_mb += total_network_used_mb
             state.processed += total_processed
             state.queued += total_queued
             # Save state
@@ -477,27 +481,41 @@ def _ads_pattern() -> re.Pattern:
     return _ads_pattern_cache


-async def _filter_routes(
-    route: Route,
-) -> None:
+def _filter_routes(
+    size_callback: Callable[[int], None],
+) -> Callable[[Route], Awaitable[None]]:
     """
     Speed up page loading by aborting some requests.

     It includes for images, media, fonts, and stylesheets.
     """
-    # Skip UI resources
-    if route.request.resource_type in {"image", "media", "font", "stylesheet"}:
-        logger.debug("Blocked resource type %s", route.request.resource_type)
-        await route.abort("blockedbyclient")
-        return
-    # Check if the request is to a known ad domain
-    if _ads_pattern().search(route.request.url) is not None:
-        logger.debug("Blocked ad %s", route.request.url)
-        await route.abort("blockedbyclient")
-        return
+    async def _wrapper(
+        route: Route,
+    ) -> None:
+        # Skip UI resources
+        if route.request.resource_type in {"image", "media", "font", "stylesheet"}:
+            logger.debug("Blocked resource type %s", route.request.resource_type)
+            await route.abort("blockedbyclient")
+            return
+
+        # Check if the request is to a known ad domain
+        if _ads_pattern().search(route.request.url) is not None:
+            logger.debug("Blocked ad %s", route.request.url)
+            await route.abort("blockedbyclient")
+            return
+
+        # Fetch the response so its size can be measured
+        res = await route.fetch()

-    await route.continue_()
+        # Store content size
+        size_bytes = int(content_length) if (content_length := res.headers.get("content-length")) else 0
+        size_callback(size_bytes)
+
+        # Fulfill the request with the fetched response
+        await route.fulfill(response=res)
+
+    return _wrapper


 async def _scrape_page(
@@ -523,11 +541,17 @@ def _generic_error(
         # Return an empty result if Playwright fails
         return ScrapedUrlModel(
             etag=etag,
+            network_used_mb=total_size_bytes / 1024 / 1024,
             status=status,
             url=url_clean.geturl(),
             valid_until=valid_until,
         )
+    total_size_bytes = 0
+    def _size_callback(size_bytes: int) -> None:
+        nonlocal total_size_bytes
+        total_size_bytes += size_bytes
+
     # Parse URL
     url_clean = urlparse(url)._replace(
         query="",
@@ -547,7 +571,7 @@ def _generic_error(
     page = await context.new_page()

     # Apply filtering to reduce traffic and CPU usage
-    await page.route("**/*", _filter_routes)
+    await page.route("**/*", _filter_routes(_size_callback))

     # Caching of unchanged resources
     if previous_etag:
@@ -568,6 +592,10 @@ def _generic_error(
         )
         return _generic_error(etag=previous_etag)

+    # Remove all routes; the filter has already fetched every response, so it is no longer needed
+    # See: https://github.com/microsoft/playwright/issues/30667#issuecomment-2095788164
+    await page.unroute_all(behavior="ignoreErrors")
+
     # Skip if the content is not HTML
     content_type = res.headers.get("content-type", "")
     if "text/html" not in content_type: