
Commit f7ba712

tw4l and ikreymer committed
Add seed file support to Browsertrix backend (#2710)
Fixes #2673

Changes in this PR:

- Adds a new `file_uploads.py` module and a corresponding `/files` API prefix with methods/endpoints for uploading, retrieving (GET), and deleting seed files; this can be extended to other file types going forward.
- Seed files are supported via `CrawlConfig.config.seedFileId` on the POST and PATCH endpoints. The `seedFileId` is replaced by a presigned URL when the config is passed to the crawler by the operator (a sketch of this substitution follows this list).
- Seed files are read when first uploaded to calculate `firstSeed` and `seedCount`, which are stored in the database and copied into the workflow and crawl documents when they are created.
- Logic is added to store `firstSeed` and `seedCount` for other workflows as well, and a migration backfills the data. This maintains consistency and fixes some pymongo aggregations that previously assumed every workflow has at least one `Seed` object in `CrawlConfig.seeds`.
- Seed file and thumbnail storage stats are added to org stats.
- Seed file and thumbnail uploads first check that the org's storage quota has not been exceeded, and return a 400 if it has.
- A cron background job (run weekly each Sunday at midnight by default, but configurable) looks for seed files at least a configurable number of minutes old (1440 minutes, i.e. one day, by default) that are not in use in any workflow, and deletes them (see the selection-logic sketch below). The backend pods ensure this k8s batch job exists on startup and create it if it does not already exist. A database entry for each run of the job is created in the operator on job completion so that it appears in the `/jobs` API endpoints; retrying this type of regularly scheduled background job is not supported, since we don't want to accidentally create multiple competing scheduled jobs.
- Adds a `min_seed_file_crawler_image` value to the Helm chart that, if set, is checked before creating a crawl from a workflow (sketched below). If a workflow cannot be run, the detail of the exception is returned in `CrawlConfigAddedResponse.errorDetail` so that the reason can be displayed in the frontend.
- Adds a `SeedFile` model based on `UserFile` (formerly `ImageFile`) and ensures that all APIs returning uploaded files return an absolute presigned URL (with either the external origin or the internal service origin).

---------

Co-authored-by: Ilya Kreymer <[email protected]>
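To make the presigned-URL substitution concrete, here is a minimal sketch of the step the operator performs before handing the crawl config to the crawler. The function name and the `file_ops`/`storage_ops` helpers are illustrative assumptions, not the actual Browsertrix code; only the `seedFileId` field comes from this PR.

```python
from uuid import UUID


async def resolve_seed_file(crawler_config: dict, org, file_ops, storage_ops) -> dict:
    """Replace a stored seedFileId with a presigned URL for the crawler (sketch)."""
    seed_file_id = crawler_config.pop("seedFileId", None)
    if not seed_file_id:
        return crawler_config

    # Look up the uploaded seed file and sign a time-limited URL for it
    seed_file = await file_ops.get_seed_file(UUID(seed_file_id), org)  # assumed helper
    presigned_url = await storage_ops.get_presigned_url(org, seed_file)  # assumed helper

    # The crawler only ever sees the URL, never the internal file id
    crawler_config["seedFile"] = presigned_url
    return crawler_config
```

The indirection means the crawler needs no database or storage credentials; it receives only a short-lived URL that expires on its own.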
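The cleanup job's selection logic can be sketched roughly as follows, assuming Motor-style async collections. The collection handles, the `type` and `created` fields, and `storage_ops.delete_file()` are assumptions; the `config.seedFileId` reference and the 1440-minute default come from the PR description.

```python
from datetime import datetime, timedelta, timezone


async def cleanup_unused_seed_files(
    files_coll, configs_coll, storage_ops, min_age_minutes: int = 1440
) -> None:
    """Delete seed files older than min_age_minutes that no workflow references (sketch)."""
    cutoff = datetime.now(timezone.utc) - timedelta(minutes=min_age_minutes)

    async for seed_file in files_coll.find(
        {"type": "seedFile", "created": {"$lt": cutoff}}
    ):
        # Skip files still referenced by any workflow
        if await configs_coll.find_one({"config.seedFileId": seed_file["_id"]}):
            continue
        # Old enough and unreferenced: remove from object storage and the database
        await storage_ops.delete_file(seed_file)
        await files_coll.delete_one({"_id": seed_file["_id"]})
```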
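A hedged sketch of the `min_seed_file_crawler_image` gate follows. The naive string comparison and the detail value are placeholders (real image tags need proper version parsing); the PR only states that the exception detail is surfaced via `CrawlConfigAddedResponse.errorDetail`.

```python
from fastapi import HTTPException


def check_seed_file_support(crawler_image: str, min_image: str, uses_seed_file: bool) -> None:
    """Refuse to run a seed-file workflow on a crawler image below the minimum (sketch)."""
    if not uses_seed_file or not min_image:
        return
    if crawler_image < min_image:  # placeholder comparison, not real version logic
        # Callers catch this and surface exc.detail to the frontend via
        # CrawlConfigAddedResponse.errorDetail
        raise HTTPException(status_code=400, detail="seed_file_requires_newer_crawler")
```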
1 parent 8107b05 · commit f7ba712


41 files changed: +1649 / -187 lines

backend/btrixcloud/background_jobs.py

Lines changed: 52 additions & 17 deletions
@@ -2,6 +2,7 @@
 
 import asyncio
 import os
+import secrets
 from datetime import datetime
 from typing import Optional, Tuple, Union, List, Dict, TYPE_CHECKING, cast
 from uuid import UUID
@@ -24,6 +25,7 @@
     RecalculateOrgStatsJob,
     ReAddOrgPagesJob,
     OptimizePagesJob,
+    CleanupSeedFilesJob,
     PaginatedBackgroundJobResponse,
     AnyJob,
     StorageRef,
@@ -43,7 +45,7 @@
 
 
 # ============================================================================
-# pylint: disable=too-many-instance-attributes
+# pylint: disable=too-many-instance-attributes, too-many-public-methods
 class BackgroundJobOps:
     """k8s background job management"""
 
@@ -473,17 +475,39 @@ async def create_optimize_crawl_pages_job(
             print(f"warning: optimize pages job could not be started: {exc}")
             return None
 
+    async def ensure_cron_cleanup_jobs_exist(self):
+        """Ensure background job to clean up unused seed files weekly exists"""
+        await self.crawl_manager.ensure_cleanup_seed_file_cron_job_exists()
+
     async def job_finished(
         self,
         job_id: str,
         job_type: str,
         success: bool,
         finished: datetime,
+        started: Optional[datetime] = None,
         oid: Optional[UUID] = None,
     ) -> None:
         """Update job as finished, including
         job-specific task handling"""
 
+        # For seed file cleanup jobs, no database record will exist for each
+        # run before this point, so create it here
+        if job_type == BgJobType.CLEANUP_SEED_FILES:
+            if not started:
+                started = finished
+            cleanup_job = CleanupSeedFilesJob(
+                id=f"seed-files-{secrets.token_hex(5)}",
+                type=BgJobType.CLEANUP_SEED_FILES,
+                started=started,
+                finished=finished,
+                success=success,
+            )
+            await self.jobs.insert_one(cleanup_job.to_dict())
+            if not success:
+                await self._send_bg_job_failure_email(cleanup_job, finished)
+            return
+
         job = await self.get_background_job(job_id)
         if job.finished:
             return
@@ -499,28 +523,31 @@ async def job_finished(
                 cast(DeleteReplicaJob, job)
             )
         else:
-            print(
-                f"Background job {job.id} failed, sending email to superuser",
-                flush=True,
-            )
-            superuser = await self.user_manager.get_superuser()
-            org = None
-            if job.oid:
-                org = await self.org_ops.get_org_by_id(job.oid)
-            await asyncio.get_event_loop().run_in_executor(
-                None,
-                self.email.send_background_job_failed,
-                job,
-                finished,
-                superuser.email,
-                org,
-            )
+            await self._send_bg_job_failure_email(job, finished)
 
         await self.jobs.find_one_and_update(
            {"_id": job_id, "oid": oid},
            {"$set": {"success": success, "finished": finished}},
        )
 
+    async def _send_bg_job_failure_email(self, job: BackgroundJob, finished: datetime):
+        print(
+            f"Background job {job.id} failed, sending email to superuser",
+            flush=True,
+        )
+        superuser = await self.user_manager.get_superuser()
+        org = None
+        if job.oid:
+            org = await self.org_ops.get_org_by_id(job.oid)
+        await asyncio.get_event_loop().run_in_executor(
+            None,
+            self.email.send_background_job_failed,
+            job,
+            finished,
+            superuser.email,
+            org,
+        )
+
     async def get_background_job(
         self, job_id: str, oid: Optional[UUID] = None
     ) -> Union[
@@ -530,6 +557,7 @@ async def get_background_job(
         RecalculateOrgStatsJob,
         ReAddOrgPagesJob,
         OptimizePagesJob,
+        CleanupSeedFilesJob,
     ]:
         """Get background job"""
         query: dict[str, object] = {"_id": job_id}
@@ -542,6 +570,7 @@ async def get_background_job(
 
         return self._get_job_by_type_from_data(res)
 
+    # pylint: disable=too-many-return-statements
     def _get_job_by_type_from_data(self, data: dict[str, object]):
         """convert dict to propert background job type"""
         if data["type"] == BgJobType.CREATE_REPLICA:
@@ -559,6 +588,9 @@ def _get_job_by_type_from_data(self, data: dict[str, object]):
         if data["type"] == BgJobType.OPTIMIZE_PAGES:
             return OptimizePagesJob.from_dict(data)
 
+        if data["type"] == BgJobType.CLEANUP_SEED_FILES:
+            return CleanupSeedFilesJob.from_dict(data)
+
         return DeleteOrgJob.from_dict(data)
 
     async def list_background_jobs(
@@ -736,6 +768,9 @@ async def retry_org_background_job(
             )
             return {"success": True}
 
+        if job.type == BgJobType.CLEANUP_SEED_FILES:
+            raise HTTPException(status_code=400, detail="cron_job_retry_not_supported")
+
         return {"success": False}
 
     async def retry_failed_org_background_jobs(
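For reference, a minimal sketch of how the operator side might report a completed cleanup run through the `job_finished()` hook shown above. The wrapper function and the `BgJobType` import path are assumptions; the `CLEANUP_SEED_FILES` branch itself is part of this diff.

```python
from datetime import datetime

from btrixcloud.models import BgJobType  # import path assumed


async def on_cleanup_job_complete(
    job_ops, job_id: str, succeeded: bool, started: datetime, finished: datetime
) -> None:
    # For CLEANUP_SEED_FILES, job_finished() creates the database record itself,
    # so each cron run shows up in the /jobs API endpoints.
    await job_ops.job_finished(
        job_id,
        BgJobType.CLEANUP_SEED_FILES,
        succeeded,
        finished,
        started=started,
    )
```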

backend/btrixcloud/basecrawls.py

Lines changed: 1 addition & 16 deletions
@@ -432,27 +432,14 @@ async def _resolve_crawl_refs(
         crawl: Union[CrawlOut, CrawlOutWithResources],
         org: Optional[Organization],
         files: Optional[list[dict]],
-        add_first_seed: bool = True,
     ):
         """Resolve running crawl data"""
         # pylint: disable=too-many-branches
-        config = None
-        if crawl.cid:
-            config = await self.crawl_configs.get_crawl_config(
-                crawl.cid, org.id if org else None, active_only=False
-            )
-
         if not org:
             org = await self.orgs.get_org_by_id(crawl.oid)
         if not org:
             raise HTTPException(status_code=400, detail="missing_org")
 
-        if config and config.config.seeds:
-            if add_first_seed:
-                first_seed = config.config.seeds[0]
-                crawl.firstSeed = first_seed.url
-            crawl.seedCount = len(config.config.seeds)
-
         if hasattr(crawl, "profileid") and crawl.profileid:
             crawl.profileName = await self.crawl_configs.profiles.get_profile_name(
                 crawl.profileid, org
@@ -685,9 +672,7 @@ async def list_all_base_crawls(
 
         aggregate = [
            {"$match": query},
-            {"$set": {"firstSeedObject": {"$arrayElemAt": ["$config.seeds", 0]}}},
-            {"$set": {"firstSeed": "$firstSeedObject.url"}},
-            {"$unset": ["firstSeedObject", "errors", "behaviorLogs", "config"]},
+            {"$unset": ["errors", "behaviorLogs", "config"]},
            {"$set": {"activeQAStats": "$qa.stats"}},
            {
                "$set": {
