Commit

Merge branch 'develop'
clemlesne committed Aug 16, 2024
2 parents 555ba0f + 3cc6018 commit 2c92ad2
Showing 4 changed files with 57 additions and 24 deletions.
3 changes: 3 additions & 0 deletions README.md
@@ -17,6 +17,9 @@ Scraper:
- [x] Explore pages in depth by detecting links and de-duplicating them
- [x] Extract markdown content from a page with [html2text](https://github.com/aaronsw/html2text)
- [x] Load dynamic JavaScript content with [Playwright](https://github.com/microsoft/playwright-python)
- [x] Show progress with a status command
- [x] Track progress of total network usage
- [ ] Respect [`robots.txt`](https://en.wikipedia.org/wiki/Robots.txt)

Indexer:

1 change: 1 addition & 0 deletions app/models/scraped.py
@@ -12,6 +12,7 @@ class ScrapedUrlModel(BaseModel):
created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
etag: str | None
links: list[str] = []
network_used_mb: float = 0.0
raw: str | None = None
redirect: str | None = None
status: int
1 change: 1 addition & 0 deletions app/models/state.py
@@ -6,6 +6,7 @@
class StateJobModel(BaseModel):
created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
last_updated: datetime = Field(default_factory=lambda: datetime.now(UTC))
network_used_mb: float = 0.0
processed: int = 0
queued: int = 0

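For context, a minimal sketch of how the new counter sits in the job-state model and why the `0.0` default matters (only the fields visible in the diff are reproduced, and the import lines are an assumption):

```python
from datetime import UTC, datetime

from pydantic import BaseModel, Field


class StateJobModel(BaseModel):
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    last_updated: datetime = Field(default_factory=lambda: datetime.now(UTC))
    network_used_mb: float = 0.0  # new: cumulative network usage for the whole job
    processed: int = 0
    queued: int = 0


# The 0.0 default keeps previously persisted state loadable without a migration:
state = StateJobModel.model_validate({"processed": 3, "queued": 12})
assert state.network_used_mb == 0.0
```

The same `0.0` default on `ScrapedUrlModel.network_used_mb` means cache entries written before this change still validate cleanly.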
76 changes: 52 additions & 24 deletions app/scrape.py
@@ -1,5 +1,6 @@
import asyncio, random, re
from datetime import UTC, datetime, timedelta
from typing import Awaitable, Callable
from urllib.parse import urlparse
from uuid import uuid4

@@ -173,11 +174,11 @@ async def _process_one(
user_agents: list[str],
viewports: list[ViewportSize],
whitelist: dict[re.Pattern, list[re.Pattern]],
) -> tuple[int, int]:
) -> tuple[int, int, float]:
"""
Process one URL from the queue.
Returns the number of processed and queued URLs.
Returns the number of processed URLs, the number of queued URLs, and the network used in MB.
"""
queued_urls = 0

@@ -239,8 +240,8 @@ async def _process_one(
current_item.depth,
)

# Return the number of processed and queued URLs
return (0, queued_urls)
# Return the number of processed URLs, queued URLs, and total network used
return (0, queued_urls, 0)

except (ResourceNotFoundError, ValidationError):
logger.debug("Cache miss for %s", current_item.url)
@@ -273,8 +274,8 @@ async def _process_one(

logger.info("Used cache for %s (%i)", cache_item.url, current_item.depth)

# Return the number of processed and queued URLs
return (0, queued_urls)
# Return the number of processed URLs, queued URLs, and total network used
return (0, queued_urls, 0)

# Output to a blob and queue
model_bytes = new_result.model_dump_json().encode("utf-8")
@@ -303,8 +304,8 @@ async def _process_one(

logger.info("Scraped %s (%i)", new_result.url, current_item.depth)

# Return the number of processed and queued URLs
return (1, queued_urls)
# Return the number of processed URLs, queued URLs, and total network used
return (1, queued_urls, new_result.network_used_mb)


async def _worker(
@@ -342,6 +343,7 @@ async def _worker(
max_messages=32,
visibility_timeout=32 * 5, # 5 secs per message
):
total_network_used_mb = 0
total_processed = 0
total_queued = 0

@@ -352,7 +354,7 @@
message.content
)
try:
processed, queued = await _process_one(
processed, queued, network_used_mb = await _process_one(
blob=blob,
browser=browser,
cache_refresh=cache_refresh,
@@ -368,6 +370,7 @@
await in_queue.delete_message(message)

# Update counters
total_network_used_mb += network_used_mb
total_processed += processed
total_queued += queued

@@ -416,6 +419,7 @@ async def _worker(
state = StateJobModel()
# Update state
state.last_updated = datetime.now(UTC)
state.network_used_mb += total_network_used_mb
state.processed += total_processed
state.queued += total_queued
# Save state
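Seen end to end, the new accounting in `_worker` boils down to the following condensed sketch (not the actual function: `_drain_batch` is a hypothetical name, the queue, blob, and error-handling plumbing is omitted, and the `_process_one` arguments are elided):

```python
async def _drain_batch(batch, state: StateJobModel) -> None:
    # Per-batch counters
    total_network_used_mb = 0.0
    total_processed = 0
    total_queued = 0

    for message in batch:  # placeholder for the queue receive loop
        processed, queued, network_used_mb = await _process_one(...)  # arguments elided
        total_network_used_mb += network_used_mb
        total_processed += processed
        total_queued += queued

    # Fold the batch totals into the persisted job state
    state.last_updated = datetime.now(UTC)
    state.network_used_mb += total_network_used_mb
    state.processed += total_processed
    state.queued += total_queued
```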
@@ -477,27 +481,41 @@ def _ads_pattern() -> re.Pattern:
return _ads_pattern_cache


async def _filter_routes(
route: Route,
) -> None:
def _filter_routes(
size_callback: Callable[[int], None],
) -> Callable[[Route], Awaitable[None]]:
"""
Speed up page loading by aborting some requests.
This includes images, media, fonts, and stylesheets.
"""
# Skip UI resources
if route.request.resource_type in {"image", "media", "font", "stylesheet"}:
logger.debug("Blocked resource type %s", route.request.resource_type)
await route.abort("blockedbyclient")
return

# Check if the request is to a known ad domain
if _ads_pattern().search(route.request.url) is not None:
logger.debug("Blocked ad %s", route.request.url)
await route.abort("blockedbyclient")
return
async def _wrapper(
route: Route,
) -> None:
# Skip UI resources
if route.request.resource_type in {"image", "media", "font", "stylesheet"}:
logger.debug("Blocked resource type %s", route.request.resource_type)
await route.abort("blockedbyclient")
return

# Check if the request is to a known ad domain
if _ads_pattern().search(route.request.url) is not None:
logger.debug("Blocked ad %s", route.request.url)
await route.abort("blockedbyclient")
return

# Continue the request
res = await route.fetch()

await route.continue_()
# Store content size
size_bytes = int(content_length) if (content_length := res.headers.get("content-length")) else 0
size_callback(size_bytes)

# Continue the request
await route.fulfill(response=res)

return _wrapper


async def _scrape_page(
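The hunk above turns `_filter_routes` from a plain route handler into a factory: the returned coroutine function closes over `size_callback`, so the caller can inject its own byte counter while Playwright still receives a handler with the `(Route) -> Awaitable[None]` shape it expects. Below is a minimal self-contained sketch of the same pattern; `make_filter`, `count`, `main`, and the example URL are illustrative names, not part of the repository:

```python
import asyncio
from typing import Awaitable, Callable

from playwright.async_api import Route, async_playwright


def make_filter(size_callback: Callable[[int], None]) -> Callable[[Route], Awaitable[None]]:
    async def _handler(route: Route) -> None:
        # Drop heavy UI resources outright
        if route.request.resource_type in {"image", "media", "font", "stylesheet"}:
            await route.abort("blockedbyclient")
            return
        # Fetch the response ourselves so its size can be measured
        res = await route.fetch()
        content_length = res.headers.get("content-length")
        size_callback(int(content_length) if content_length else 0)
        # Replay the fetched response to the browser
        await route.fulfill(response=res)

    return _handler


async def main() -> None:
    total = 0

    def count(size: int) -> None:
        nonlocal total
        total += size

    async with async_playwright() as p:
        browser = await p.chromium.launch()
        page = await browser.new_page()
        await page.route("**/*", make_filter(count))
        await page.goto("https://example.com")  # illustrative URL
        await page.unroute_all(behavior="ignoreErrors")
        await browser.close()

    print(f"network used: {total / 1024 / 1024:.3f} MB")


if __name__ == "__main__":
    asyncio.run(main())
```

Fetching with `route.fetch()` and replaying with `route.fulfill(response=res)` is what makes the response size observable; the previous `route.continue_()` handed the request straight to the browser and never exposed the response to the handler.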
@@ -523,11 +541,17 @@ def _generic_error(
# Return an empty result if Playwright fails
return ScrapedUrlModel(
etag=etag,
network_used_mb=total_size_bytes / 1024 / 1024,
status=status,
url=url_clean.geturl(),
valid_until=valid_until,
)

total_size_bytes = 0
def _size_callback(size_bytes: int) -> None:
nonlocal total_size_bytes
total_size_bytes += size_bytes

# Parse URL
url_clean = urlparse(url)._replace(
query="",
@@ -547,7 +571,7 @@ def _generic_error(
page = await context.new_page()

# Apply filtering to reduce traffic and CPU usage
await page.route("**/*", _filter_routes)
await page.route("**/*", _filter_routes(_size_callback))

# Caching of unchanged resources
if previous_etag:
@@ -568,6 +592,10 @@
)
return _generic_error(etag=previous_etag)

# Remove all routes; since our filter manipulates all requests, we don't need it anymore
# See: https://github.com/microsoft/playwright/issues/30667#issuecomment-2095788164
await page.unroute_all(behavior="ignoreErrors")

# Skip if the content is not HTML
content_type = res.headers.get("content-type", "")
if "text/html" not in content_type:
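Finally, the size bookkeeping in `_scrape_page` converts the bytes reported by the route filter into megabytes using the `Content-Length` response header; a small standalone sketch of that arithmetic (the header value is illustrative):

```python
headers = {"content-length": "52431"}  # illustrative response headers

# Same expression as in the handler: a missing or empty header counts as 0 bytes
size_bytes = int(content_length) if (content_length := headers.get("content-length")) else 0

network_used_mb = size_bytes / 1024 / 1024
print(f"{network_used_mb:.4f} MB")  # ~0.05 MB for this example
```

Responses that carry no `Content-Length` header contribute nothing to the total, so the tracked figure is best read as a lower bound.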
