
Commit 8107b05

ikreymer and SuaYoo authored
Profiles: Make browser commit API call idempotent (#2728)
- Fix race condition related to browser commit time: the profile commit request previously waited for the browser to actually finish and for the profile to be saved, which could cause the request to time out and be retried after the browser had already been closed.
- With these changes, the commit is now idempotent and returns 'waiting_for_browser' until the profile is actually committed.
- On the frontend, keep pinging the commit endpoint with a timeout while 'waiting_for_browser' is returned; the profile is committed once the endpoint returns a profile id.

Co-authored-by: sua yoo <[email protected]>
1 parent 3043b67 commit 8107b05
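
The new flow turns the commit request into a cheap, repeatable poll instead of a single long-running call. Below is a minimal client-side sketch of that loop, modeled on the updated test fixtures in backend/test/conftest.py; the endpoint path, payload fields, and profile name are assumptions, while the 'waiting_for_browser' detail string and the returned 'id' come from this commit.

```python
import time

import requests


def commit_profile(api_base: str, org_id: str, browser_id: str, headers: dict) -> str:
    """Poll the idempotent commit endpoint until it returns a profile id."""
    start = time.monotonic()
    time_limit = 30  # seconds; matches the tightened timeout in conftest.py
    while True:
        # Hypothetical endpoint path and payload; adjust to the deployed API.
        r = requests.post(
            f"{api_base}/orgs/{org_id}/profiles",
            headers=headers,
            json={"browserid": browser_id, "name": "My Profile"},
        )
        data = r.json()
        # While the backend is still committing, it answers 200 with
        # detail="waiting_for_browser" instead of letting the request hang.
        if data.get("detail") == "waiting_for_browser":
            if time.monotonic() - start > time_limit:
                raise RuntimeError("profile commit timed out")
            time.sleep(2)
            continue
        # Once the browser job reports the commit as done,
        # the endpoint returns the new profile's id.
        return data["id"]
```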

File tree

7 files changed: +233 -116 lines


backend/btrixcloud/crawlmanager.py

Lines changed: 15 additions & 6 deletions
@@ -37,6 +37,7 @@ async def run_profile_browser(
         baseprofile: str = "",
         profile_filename: str = "",
         proxy_id: str = "",
+        profileid: str = "",
     ) -> str:
         """run browser for profile creation"""

@@ -60,6 +61,7 @@ async def run_profile_browser(
             "crawler_image": crawler_image,
             "image_pull_policy": image_pull_policy,
             "proxy_id": proxy_id or DEFAULT_PROXY_ID,
+            "profileid": profileid,
         }

         data = self.templates.env.get_template("profile_job.yaml").render(params)
@@ -365,14 +367,21 @@ async def get_profile_browser_metadata(self, browserid: str) -> dict[str, str]:
         except:
             return {}

-        return browser["metadata"]["labels"]
+        metadata = browser["metadata"]["labels"]

-    async def ping_profile_browser(self, browserid: str) -> None:
-        """return ping profile browser"""
+        metadata["committing"] = browser.get("spec", {}).get("committing")
+
+        return metadata
+
+    async def keep_alive_profile_browser(self, browserid: str, committing="") -> None:
+        """update profile browser to not expire"""
         expire_at = dt_now() + timedelta(seconds=30)
-        await self._patch_job(
-            browserid, {"expireTime": date_to_str(expire_at)}, "profilejobs"
-        )
+
+        update = {"expireTime": date_to_str(expire_at)}
+        if committing:
+            update["committing"] = committing
+
+        await self._patch_job(browserid, update, "profilejobs")

     async def rollover_restart_crawl(self, crawl_id: str) -> dict:
         """Rolling restart of crawl by updating restartTime field"""

backend/btrixcloud/profiles.py

Lines changed: 127 additions & 70 deletions
@@ -3,6 +3,7 @@
 from typing import Optional, TYPE_CHECKING, Any, cast, Dict, List, Tuple
 from uuid import UUID, uuid4
 import os
+import asyncio

 from urllib.parse import urlencode

@@ -61,6 +62,8 @@ class ProfileOps:
     browser_fqdn_suffix: str
     router: APIRouter

+    bg_tasks: set
+
     def __init__(self, mdb, orgs, crawl_manager, storage_ops, background_job_ops):
         self.profiles = mdb["profiles"]
         self.orgs = orgs
@@ -79,6 +82,10 @@ def __init__(self, mdb, orgs, crawl_manager, storage_ops, background_job_ops):

         self.crawlconfigs = cast(CrawlConfigOps, None)

+        # to avoid background tasks being garbage collected
+        # see: https://stackoverflow.com/a/74059981
+        self.bg_tasks = set()
+
     def set_crawlconfigs(self, crawlconfigs):
         """set crawlconfigs ops"""
         self.crawlconfigs = crawlconfigs
@@ -128,6 +135,7 @@ async def create_new_browser(
             baseprofile=prev_profile_id,
             profile_filename=prev_profile_path,
             proxy_id=proxy_id,
+            profileid=str(uuid4()),
         )

         if not browserid:
@@ -166,8 +174,6 @@ async def get_profile_browser_url(

     async def ping_profile_browser(self, browserid: str) -> dict[str, Any]:
         """ping profile browser to keep it running"""
-        await self.crawl_manager.ping_profile_browser(browserid)
-
         json = await self._send_browser_req(browserid, "/ping")

         return {"success": True, "origins": json.get("origins") or []}
@@ -182,93 +188,130 @@ async def navigate_profile_browser(

     async def commit_to_profile(
         self,
+        metadata: dict,
         browser_commit: ProfileCreate,
         org: Organization,
         user: User,
-        metadata: dict,
         existing_profile: Optional[Profile] = None,
     ) -> dict[str, Any]:
-        """commit profile and shutdown profile browser"""
-        # pylint: disable=too-many-locals
+        """commit to profile async, returning if committed, or waiting"""
+        profileid = metadata.get("profileid")
+        if not profileid:
+            raise HTTPException(status_code=400, detail="browser_not_valid")

-        now = dt_now()
+        self.orgs.can_write_data(org, include_time=False)

-        if existing_profile:
-            profileid = existing_profile.id
-            created = existing_profile.created
-            created_by = existing_profile.createdBy
-            created_by_name = existing_profile.createdByName
-        else:
-            profileid = uuid4()
-            created = now
-            created_by = user.id
-            created_by_name = user.name if user.name else user.email
+        committing = metadata.get("committing")
+        if not committing:
+            self._run_task(
+                self.do_commit_to_profile(
+                    metadata=metadata,
+                    browser_commit=browser_commit,
+                    org=org,
+                    user=user,
+                    existing_profile=existing_profile,
+                )
+            )

-        filename_data = {"filename": f"profiles/profile-{profileid}.tar.gz"}
+        if committing == "done":
+            await self.crawl_manager.delete_profile_browser(browser_commit.browserid)
+            return {
+                "added": True,
+                "id": profileid,
+                "storageQuotaReached": self.orgs.storage_quota_reached(org),
+            }

-        json = await self._send_browser_req(
-            browser_commit.browserid, "/createProfileJS", "POST", json=filename_data
-        )
+        raise HTTPException(status_code=200, detail="waiting_for_browser")

+    async def do_commit_to_profile(
+        self,
+        metadata: dict,
+        browser_commit: ProfileCreate,
+        org: Organization,
+        user: User,
+        existing_profile: Optional[Profile] = None,
+    ) -> bool:
+        """commit profile and shutdown profile browser"""
+        # pylint: disable=too-many-locals
         try:
+            now = dt_now()
+
+            if existing_profile:
+                profileid = existing_profile.id
+                created = existing_profile.created
+                created_by = existing_profile.createdBy
+                created_by_name = existing_profile.createdByName
+            else:
+                profileid = UUID(metadata["profileid"])
+                created = now
+                created_by = user.id
+                created_by_name = user.name if user.name else user.email
+
+            filename_data = {"filename": f"profiles/profile-{profileid}.tar.gz"}
+
+            json = await self._send_browser_req(
+                browser_commit.browserid,
+                "/createProfileJS",
+                "POST",
+                json=filename_data,
+                committing="committing",
+            )
             resource = json["resource"]
-        except:
-            # pylint: disable=raise-missing-from
-            raise HTTPException(status_code=400, detail="browser_not_valid")
-
-        await self.crawl_manager.delete_profile_browser(browser_commit.browserid)

-        # backwards compatibility
-        file_size = resource.get("size") or resource.get("bytes")
+            # backwards compatibility
+            file_size = resource.get("size") or resource.get("bytes")

-        profile_file = ProfileFile(
-            hash=resource["hash"],
-            size=file_size,
-            filename=resource["path"],
-            storage=org.storage,
-        )
+            profile_file = ProfileFile(
+                hash=resource["hash"],
+                size=file_size,
+                filename=resource["path"],
+                storage=org.storage,
+            )

-        baseid = metadata.get("btrix.baseprofile")
-        if baseid:
-            print("baseid", baseid)
-            baseid = UUID(baseid)
+            baseid = metadata.get("btrix.baseprofile")
+            if baseid:
+                print("baseid", baseid)
+                baseid = UUID(baseid)
+
+            profile = Profile(
+                id=profileid,
+                name=browser_commit.name,
+                description=browser_commit.description,
+                created=created,
+                createdBy=created_by,
+                createdByName=created_by_name,
+                modified=now,
+                modifiedBy=user.id,
+                modifiedByName=user.name if user.name else user.email,
+                origins=json["origins"],
+                resource=profile_file,
+                userid=UUID(metadata.get("btrix.user")),
+                oid=org.id,
+                baseid=baseid,
+                crawlerChannel=browser_commit.crawlerChannel,
+                proxyId=browser_commit.proxyId,
+            )

-        self.orgs.can_write_data(org, include_time=False)
+            await self.profiles.find_one_and_update(
+                {"_id": profile.id}, {"$set": profile.to_dict()}, upsert=True
+            )

-        profile = Profile(
-            id=profileid,
-            name=browser_commit.name,
-            description=browser_commit.description,
-            created=created,
-            createdBy=created_by,
-            createdByName=created_by_name,
-            modified=now,
-            modifiedBy=user.id,
-            modifiedByName=user.name if user.name else user.email,
-            origins=json["origins"],
-            resource=profile_file,
-            userid=UUID(metadata.get("btrix.user")),
-            oid=org.id,
-            baseid=baseid,
-            crawlerChannel=browser_commit.crawlerChannel,
-            proxyId=browser_commit.proxyId,
-        )
+            await self.background_job_ops.create_replica_jobs(
+                org.id, profile_file, str(profileid), "profile"
+            )

-        await self.profiles.find_one_and_update(
-            {"_id": profile.id}, {"$set": profile.to_dict()}, upsert=True
-        )
+            await self.orgs.inc_org_bytes_stored(org.id, file_size, "profile")

-        await self.background_job_ops.create_replica_jobs(
-            org.id, profile_file, str(profileid), "profile"
-        )
+            await self.crawl_manager.keep_alive_profile_browser(
+                browser_commit.browserid, committing="done"
+            )

-        await self.orgs.inc_org_bytes_stored(org.id, file_size, "profile")
+        # pylint: disable=broad-except
+        except Exception as e:
+            print("Profile commit failed", e)
+            return False

-        return {
-            "added": True,
-            "id": str(profile.id),
-            "storageQuotaReached": self.orgs.storage_quota_reached(org),
-        }
+        return True

     async def update_profile_metadata(
         self, profileid: UUID, update: ProfileUpdate, user: User
@@ -432,8 +475,13 @@ async def _send_browser_req(
         path: str,
         method: str = "GET",
         json: Optional[dict[str, Any]] = None,
+        committing="",
     ) -> dict[str, Any]:
         """make request to browser api to get state"""
+        await self.crawl_manager.keep_alive_profile_browser(
+            browserid, committing=committing
+        )
+
         try:
             async with aiohttp.ClientSession() as session:
                 async with session.request(
@@ -443,7 +491,8 @@
                 ) as resp:
                     json = await resp.json()

-        except Exception:
+        except Exception as e:
+            print(e)
             # pylint: disable=raise-missing-from
             raise HTTPException(status_code=200, detail="waiting_for_browser")

@@ -470,6 +519,12 @@ async def calculate_org_profile_file_storage(self, oid: UUID) -> int:

         return total_size

+    def _run_task(self, func) -> None:
+        """add bg tasks to set to avoid premature garbage collection"""
+        task = asyncio.create_task(func)
+        self.bg_tasks.add(task)
+        task.add_done_callback(self.bg_tasks.discard)
+

 # ============================================================================
 # pylint: disable=redefined-builtin,invalid-name,too-many-locals,too-many-arguments
@@ -529,7 +584,9 @@ async def commit_browser_to_new(
     ):
         metadata = await browser_get_metadata(browser_commit.browserid, org)

-        return await ops.commit_to_profile(browser_commit, org, user, metadata)
+        return await ops.commit_to_profile(
+            browser_commit=browser_commit, org=org, user=user, metadata=metadata
+        )

     @router.patch("/{profileid}", response_model=UpdatedResponse)
     async def commit_browser_to_existing(
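
The _run_task() helper added above is the standard asyncio fire-and-forget pattern: keep a strong reference to each task in a set so it is not garbage collected before it completes (per the Stack Overflow answer cited in the diff). A self-contained sketch with illustrative names:

```python
import asyncio


class BackgroundRunner:
    """Illustrative stand-in for the bg_tasks bookkeeping in ProfileOps."""

    def __init__(self) -> None:
        self.bg_tasks: set[asyncio.Task] = set()

    def run_task(self, coro) -> None:
        # Keep a strong reference so the event loop's weak reference doesn't
        # let the task be collected mid-flight; drop it once the task finishes.
        task = asyncio.create_task(coro)
        self.bg_tasks.add(task)
        task.add_done_callback(self.bg_tasks.discard)


async def slow_commit(profile_id: str) -> None:
    await asyncio.sleep(0.1)
    print("committed", profile_id)


async def main() -> None:
    runner = BackgroundRunner()
    # The caller returns immediately, just as commit_to_profile() answers
    # "waiting_for_browser" while do_commit_to_profile() runs in the background.
    runner.run_task(slow_commit("abc123"))
    await asyncio.sleep(0.2)  # give the background task time to finish


asyncio.run(main())
```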

backend/test/conftest.py

Lines changed: 2 additions & 2 deletions
@@ -702,7 +702,7 @@ def profile_id(admin_auth_headers, default_org_id, profile_browser_id):

     # Create profile
     start_time = time.monotonic()
-    time_limit = 300
+    time_limit = 30
     while True:
         try:
             r = requests.post(
@@ -793,7 +793,7 @@ def profile_2_id(admin_auth_headers, default_org_id, profile_browser_2_id):

     # Create profile
     start_time = time.monotonic()
-    time_limit = 300
+    time_limit = 30
     while True:
         try:
             r = requests.post(
