Commit 0b83c0f

Squashed commit of the following:

commit 43c4dc2
Author: sua yoo <[email protected]>
Date: Mon Nov 3 09:12:50 2025 -0800

    task: Add dedupe form control to workflow (#2932)

    - Adds a new "Deduplication" section to workflows
    - Allows users to use a collection for deduplication
    - Various refactors for consistency

commit 2fcf6d7
Author: Ilya Kreymer <[email protected]>
Date: Tue Oct 28 09:47:25 2025 -0700

    Dedup Backend Initial Implementation (#2868)

    Fixes #2867

    The backend implementation involves:

    Operator:
    - A new CollIndex CRD type; btrix-crds updated to 0.2.0
    - An operator that manages the new CRD type, creating a new Redis instance when the
      index should exist (uses the redis_dedupe_memory and redis_dedupe_storage chart values)
    - dedupe_importer_channel can configure the crawler channel used for index imports
    - The operator starts the crawler in 'indexer' mode

    Workflows & Crawls:
    - Workflows have a new 'dedupeCollId' field for dedupe while crawling. The `dedupeCollId`
      must also be a collection that the crawl is auto-added to.
    - There is a new waiting state, `waiting_for_dedupe_index`, entered if a crawl is starting
      but the index is not yet ready.
    - Each crawl has bi-directional links: the crawls it requires for dedupe via `requiresCrawls`,
      and the crawls for which this crawl is required via `requiredByCrawls`.
    - autoAddCollections is automatically updated to always include the `dedupeCollId` collection.

    Collection:
    - Collections have a new `hasDedupeIndex` field.
    - Items added to or removed from a collection mark the CollIndex object for update by
      updating the collItemsUpdatedAt timestamp, triggering a reindex.
    - The CollIndex object is deleted when the collection is deleted.

    Indexing depends on a version of the crawler from webrecorder/browsertrix-crawler#884
    that supports indexing mode.

    ---------

    Co-authored-by: Tessa Walsh <[email protected]>
1 parent: ca3f226 · commit: 0b83c0f
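For orientation, the sketch below summarizes the data-model additions described in the commit message. It is a minimal, illustrative outline using Pydantic-style models, not the actual definitions in btrixcloud/models.py; the class names are invented for illustration, and only the field names and the waiting state come from the commit message.

# Minimal sketch of the dedupe-related model additions (illustrative only;
# class names are hypothetical, the real models live in btrixcloud/models.py).
from typing import List, Optional
from uuid import UUID

from pydantic import BaseModel


class WorkflowSketch(BaseModel):
    """Workflow: optional collection used for deduplication while crawling."""

    autoAddCollections: List[UUID] = []
    # must also appear in autoAddCollections so finished crawls feed the index
    dedupeCollId: Optional[UUID] = None


class CollectionSketch(BaseModel):
    """Collection: records whether a dedupe index (CollIndex + Redis) exists."""

    hasDedupeIndex: bool = False


class CrawlSketch(BaseModel):
    """Crawl: waiting state and bi-directional dedupe dependencies."""

    # set while the crawl waits for the collection index to become ready
    state: str = "waiting_for_dedupe_index"
    requiresCrawls: List[str] = []    # crawls this crawl dedupes against
    requiredByCrawls: List[str] = []  # crawls that dedupe against this one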

61 files changed (+1658, -266 lines)

backend/btrixcloud/colls.py

Lines changed: 99 additions & 13 deletions
@@ -49,6 +49,7 @@
     UserFilePreparer,
     MIN_UPLOAD_PART_SIZE,
     PublicCollOut,
+    ResourcesOnly,
 )
 from .utils import (
     dt_now,
@@ -57,6 +58,8 @@
     get_origin,
 )
 
+from .crawlmanager import CrawlManager
+
 if TYPE_CHECKING:
     from .orgs import OrgOps
     from .storages import StorageOps
@@ -81,8 +84,16 @@ class CollectionOps:
     event_webhook_ops: EventWebhookOps
     crawl_ops: CrawlOps
     page_ops: PageOps
+    crawl_manager: CrawlManager
 
-    def __init__(self, mdb, storage_ops, orgs, event_webhook_ops):
+    def __init__(
+        self,
+        mdb,
+        orgs: OrgOps,
+        storage_ops: StorageOps,
+        crawl_manager: CrawlManager,
+        event_webhook_ops: EventWebhookOps,
+    ):
         self.collections = mdb["collections"]
         self.crawls = mdb["crawls"]
         self.crawl_configs = mdb["crawl_configs"]
@@ -91,6 +102,7 @@ def __init__(self, mdb, storage_ops, orgs, event_webhook_ops):
 
         self.orgs = orgs
         self.storage_ops = storage_ops
+        self.crawl_manager = crawl_manager
         self.event_webhook_ops = event_webhook_ops
 
     def set_crawl_ops(self, ops):
@@ -141,11 +153,15 @@ async def add_collection(self, oid: UUID, coll_in: CollIn):
             access=coll_in.access,
             defaultThumbnailName=coll_in.defaultThumbnailName,
             allowPublicDownload=coll_in.allowPublicDownload,
+            hasDedupeIndex=coll_in.hasDedupeIndex,
         )
         try:
             await self.collections.insert_one(coll.to_dict())
             org = await self.orgs.get_org_by_id(oid)
             await self.clear_org_previous_slugs_matching_slug(slug, org)
+            # create collection index
+            if coll.hasDedupeIndex:
+                await self.crawl_manager.create_coll_index(coll)
 
             if crawl_ids:
                 await self.crawl_ops.add_to_collection(crawl_ids, coll_id, org)
@@ -194,22 +210,33 @@ async def update_collection(
             db_update["$push"] = {"previousSlugs": previous_slug}
 
         try:
-            result = await self.collections.find_one_and_update(
+            prev_result = await self.collections.find_one_and_update(
                 {"_id": coll_id, "oid": org.id},
                 db_update,
-                return_document=pymongo.ReturnDocument.AFTER,
+                return_document=pymongo.ReturnDocument.BEFORE,
             )
         except pymongo.errors.DuplicateKeyError as err:
             # pylint: disable=raise-missing-from
             field = get_duplicate_key_error_field(err)
             raise HTTPException(status_code=400, detail=f"collection_{field}_taken")
 
-        if not result:
+        if not prev_result:
             raise HTTPException(status_code=404, detail="collection_not_found")
 
         if slug_update:
             await self.clear_org_previous_slugs_matching_slug(slug_update, org)
 
+        # if dedupe index is true, but was false
+        if update.hasDedupeIndex and not prev_result.get("hasDedupeIndex"):
+            # get latest coll, create index
+            coll = await self.get_collection(coll_id, org.id)
+            await self.crawl_manager.create_coll_index(coll)
+
+        # if dedupe is false, but was true
+        if update.hasDedupeIndex is False and prev_result.get("hasDedupeIndex"):
+            # delete index -- may need extra restrictions
+            await self.crawl_manager.delete_coll_index(coll_id)
+
         return {"updated": True}
 
     async def clear_org_previous_slugs_matching_slug(
@@ -221,6 +248,16 @@ async def clear_org_previous_slugs_matching_slug(
             {"$pull": {"previousSlugs": slug}},
         )
 
+    async def get_coll_dedupe_index(self, coll_id: UUID) -> bool:
+        """return true/false if collection has dedupe index, or raise"""
+        result = await self.collections.find_one(
+            {"_id": coll_id}, projection=["hasDedupeIndex"]
+        )
+        if not result:
+            raise HTTPException(status_code=404, detail="collection_not_found")
+
+        return result["hasDedupeIndex"] is True
+
     async def add_crawls_to_collection(
         self,
         coll_id: UUID,
@@ -229,8 +266,6 @@ async def add_crawls_to_collection(
         headers: Optional[dict] = None,
     ) -> CollOut:
         """Add crawls to collection"""
-        await self.crawl_ops.add_to_collection(crawl_ids, coll_id, org)
-
         modified = dt_now()
         result = await self.collections.find_one_and_update(
             {"_id": coll_id},
@@ -240,8 +275,11 @@
         if not result:
             raise HTTPException(status_code=404, detail="collection_not_found")
 
+        # do this after checking if collection exists
+        await self.crawl_ops.add_to_collection(crawl_ids, coll_id, org)
+
         await self.update_collection_counts_and_tags(coll_id)
-        await self.update_collection_dates(coll_id, org.id)
+        await self.update_collection_dates(coll_id, org.id, update_index=True)
 
         asyncio.create_task(
             self.event_webhook_ops.create_added_to_collection_notification(
@@ -270,7 +308,7 @@ async def remove_crawls_from_collection(
             raise HTTPException(status_code=404, detail="collection_not_found")
 
         await self.update_collection_counts_and_tags(coll_id)
-        await self.update_collection_dates(coll_id, org.id)
+        await self.update_collection_dates(coll_id, org.id, update_index=True)
 
         asyncio.create_task(
             self.event_webhook_ops.create_removed_from_collection_notification(
@@ -294,6 +332,24 @@ async def get_collection_raw(
 
         return result
 
+    async def enable_dedupe_index(self, coll_id: UUID):
+        """enable dedupe index if it doesn't exist yet"""
+        result = await self.collections.find_one_and_update(
+            {"_id": coll_id, "hasDedupeIndex": {"$ne": True}},
+            {"$set": {"hasDedupeIndex": True}},
+            return_document=pymongo.ReturnDocument.AFTER,
+        )
+
+        # not changed, nothing to do
+        if not result:
+            return False
+
+        coll = Collection.from_dict(result)
+
+        await self.crawl_manager.create_coll_index(coll)
+
+        return True
+
     async def get_collection_raw_by_slug(
         self,
         coll_slug: str,
@@ -396,6 +452,16 @@ async def get_collection_out(
 
         return CollOut.from_dict(result)
 
+    async def get_internal_replay_list(self, coll_id: UUID, oid: UUID) -> ResourcesOnly:
+        """get list of internally resolved signed WACZ files"""
+        org = await self.orgs.get_org_by_id(oid)
+        resources, _, _ = await self.get_collection_crawl_resources(coll_id, org)
+
+        for file_ in resources:
+            file_.path = self.storage_ops.resolve_internal_access_path(file_.path)
+
+        return ResourcesOnly(resources=resources)
+
     async def get_public_collection_out(
         self,
         coll_id: UUID,
@@ -639,6 +705,9 @@ async def delete_collection(self, coll_id: UUID, org: Organization):
         if coll.thumbnail:
             await self.delete_thumbnail(coll_id, org)
 
+        if coll.hasDedupeIndex:
+            await self.crawl_manager.delete_coll_index(coll.id)
+
         result = await self.collections.delete_one({"_id": coll_id, "oid": org.id})
         if result.deleted_count < 1:
             raise HTTPException(status_code=404, detail="collection_not_found")
@@ -740,7 +809,9 @@ async def update_collection_counts_and_tags(self, collection_id: UUID):
             },
         )
 
-    async def update_collection_dates(self, coll_id: UUID, oid: UUID):
+    async def update_collection_dates(
+        self, coll_id: UUID, oid: UUID, update_index=False
+    ):
         """Update collection earliest and latest dates from page timestamps"""
         # pylint: disable=too-many-locals
         coll = await self.get_collection(coll_id, oid)
@@ -749,6 +820,10 @@ async def update_collection_dates(self, coll_id: UUID, oid: UUID):
         earliest_ts = None
         latest_ts = None
 
+        # update_index is set, update dedupe index if it exists
+        if update_index and coll.hasDedupeIndex:
+            await self.crawl_manager.update_coll_index(coll_id)
+
         match_query = {
             "oid": coll.oid,
             "crawl_id": {"$in": crawl_ids},
@@ -783,13 +858,16 @@
 
     async def update_crawl_collections(self, crawl_id: str, oid: UUID):
         """Update counts, dates, and modified for all collections in crawl"""
+        # accessing directly to handle both crawls and uploads
         crawl = await self.crawls.find_one({"_id": crawl_id})
-        crawl_coll_ids = crawl.get("collectionIds")
+        crawl_coll_ids = crawl.get("collectionIds") or []
         modified = dt_now()
 
         for coll_id in crawl_coll_ids:
             await self.update_collection_counts_and_tags(coll_id)
-            await self.update_collection_dates(coll_id, oid)
+            await self.update_collection_dates(
+                coll_id, oid, crawl.get("dedupeCollId") != coll_id
+            )
             await self.collections.find_one_and_update(
                 {"_id": coll_id},
                 {"$set": {"modified": modified}},
@@ -1000,12 +1078,20 @@ async def calculate_thumbnail_storage(self, oid: UUID) -> int:
 # ============================================================================
 # pylint: disable=too-many-locals
 def init_collections_api(
-    app, mdb, orgs, storage_ops, event_webhook_ops, user_dep
+    app,
+    mdb,
+    orgs: OrgOps,
+    storage_ops: StorageOps,
+    crawl_manager: CrawlManager,
+    event_webhook_ops: EventWebhookOps,
+    user_dep,
 ) -> CollectionOps:
     """init collections api"""
     # pylint: disable=invalid-name, unused-argument, too-many-arguments
 
-    colls: CollectionOps = CollectionOps(mdb, storage_ops, orgs, event_webhook_ops)
+    colls: CollectionOps = CollectionOps(
+        mdb, orgs, storage_ops, crawl_manager, event_webhook_ops
+    )
 
     org_crawl_dep = orgs.org_crawl_dep
     org_viewer_dep = orgs.org_viewer_dep
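A rough usage sketch of the new CollectionOps dedupe hooks added above follows. It assumes an already constructed CollectionOps instance (here named coll_ops) and existing collection and org ids; the helper function itself is hypothetical and only strings together calls whose signatures appear in the diff.

# Hypothetical helper illustrating the intended call order; `coll_ops` is an
# existing CollectionOps instance, and the ids come from the caller.
from uuid import UUID


async def attach_dedupe(coll_ops, coll_id: UUID, org_id: UUID) -> None:
    # sets hasDedupeIndex and creates the CollIndex object only if not already set
    created = await coll_ops.enable_dedupe_index(coll_id)
    if created:
        print(f"dedupe index created for collection {coll_id}")

    # refresh collection dates; update_index=True also marks the dedupe index
    # for update when the collection has one
    await coll_ops.update_collection_dates(coll_id, org_id, update_index=True)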

backend/btrixcloud/crawlconfigs.py

Lines changed: 62 additions & 3 deletions
@@ -4,7 +4,17 @@
 
 # pylint: disable=too-many-lines
 
-from typing import List, Optional, TYPE_CHECKING, cast, Dict, Tuple, Annotated, Union
+from typing import (
+    List,
+    Optional,
+    TYPE_CHECKING,
+    cast,
+    Dict,
+    Tuple,
+    Annotated,
+    Union,
+    Any,
+)
 
 import asyncio
 import json
@@ -319,6 +329,14 @@ async def add_crawl_config(
 
         first_seed = seeds[0].url
 
+        # the dedupe collection id must also be in auto add collections
+        if config_in.dedupeCollId:
+            if config_in.autoAddCollections is None:
+                config_in.autoAddCollections = []
+
+            if config_in.dedupeCollId not in config_in.autoAddCollections:
+                config_in.autoAddCollections.append(config_in.dedupeCollId)
+
         now = dt_now()
         crawlconfig = CrawlConfig(
             id=uuid4(),
@@ -346,6 +364,7 @@
             firstSeed=first_seed,
             seedCount=seed_count,
             shareable=config_in.shareable,
+            dedupeCollId=config_in.dedupeCollId,
         )
 
         if config_in.runNow:
@@ -362,6 +381,9 @@
         storage_quota_reached = False
         exec_mins_quota_reached = False
 
+        if config_in.dedupeCollId:
+            await self.coll_ops.enable_dedupe_index(config_in.dedupeCollId)
+
         if config_in.runNow:
             try:
                 crawl_id = await self.run_now_internal(crawlconfig, org, user)
@@ -605,6 +627,26 @@ async def update_crawl_config(
             update.tags is not None
             and ",".join(orig_crawl_config.tags) != ",".join(update.tags)
         )
+
+        metadata_changed = metadata_changed or (
+            update.dedupeCollId is not None
+            and update.dedupeCollId != orig_crawl_config.dedupeCollId
+        )
+
+        if isinstance(update.dedupeCollId, UUID):
+            dedupe_coll_id = update.dedupeCollId
+        elif update.dedupeCollId == "":
+            dedupe_coll_id = None
+        else:
+            dedupe_coll_id = orig_crawl_config.dedupeCollId
+
+        if (
+            dedupe_coll_id
+            and update.autoAddCollections is not None
+            and dedupe_coll_id not in update.autoAddCollections
+        ):
+            update.autoAddCollections.append(dedupe_coll_id)
+
         metadata_changed = metadata_changed or (
             update.autoAddCollections is not None
             and sorted(orig_crawl_config.autoAddCollections)
@@ -632,14 +674,22 @@
         query["modifiedByName"] = user.name
         query["modified"] = dt_now()
 
-        # if empty str, just clear the profile
+        # profile - if empty str, just clear the profile
         if update.profileid == "":
             query["profileid"] = None
         # else, ensure its a valid profile
         elif update.profileid:
             await self.profiles.get_profile(cast(UUID, update.profileid), org)
             query["profileid"] = update.profileid
 
+        # dedupe - if empty dedupeCollId, clear the coll id
+        if update.dedupeCollId == "":
+            query["dedupeCollId"] = None
+        # else, enable dedupe on collection
+        if isinstance(update.dedupeCollId, UUID):
+            query["dedupeCollId"] = update.dedupeCollId
+            await self.coll_ops.enable_dedupe_index(update.dedupeCollId)
+
         if update.config is not None:
             query["config"] = update.config.dict()
 
@@ -654,10 +704,15 @@
             query["seedCount"] = len(update.config.seeds)
             query["seedFileId"] = None
 
+        update_query: dict[str, Any] = {"$set": query, "$inc": {"rev": 1}}
+        # only add here if not setting autoAddCollections
+        if dedupe_coll_id and "autoAddCollections" not in query:
+            update_query["$addToSet"] = {"autoAddCollections": dedupe_coll_id}
+
         # update in db
         result = await self.crawl_configs.find_one_and_update(
             {"_id": cid, "inactive": {"$ne": True}},
-            {"$set": query, "$inc": {"rev": 1}},
+            update_query,
             return_document=pymongo.ReturnDocument.AFTER,
         )
 
@@ -1123,6 +1178,10 @@ async def remove_collection_from_all_configs(
             {"$pull": {"autoAddCollections": coll_id}},
         )
 
+        await self.crawl_configs.update_many(
+            {"oid": org.id, "dedupeCollId": coll_id}, {"$set": {"dedupeCollId": None}}
+        )
+
     async def get_crawl_config_tags(self, org):
         """get distinct tags from all crawl configs for this org"""
         return await self.crawl_configs.distinct("tags", {"oid": org.id})
