perf(mongodb): batch finding writes (bulk_write) instead of one DB call per item #1176
ocervell wants to merge 1 commit into
Conversation
Findings were persisted one-by-one (one update_one/insert_one round-trip per item), which means millions of DB calls on a large crawl. Now:
- mint the Mongo _id client-side (ObjectId() needs no round-trip) so each item has a stable id immediately;
- buffer an upsert per finding and flush in a single bulk_write on a size cap (MONGODB_FLUSH_SIZE), on_interval (throttled by runners.backend_update_frequency; mongodb is a backend), and on_end;
- on_end flush is mandatory for correctness: the next runner re-hydrates findings from the DB (get_results), so they must be persisted before the runner finishes.

What is persisted is unchanged (same OUTPUT_TYPES guard); only how it is persisted changes (batched). With backend_update_frequency=-1 the buffer flushes only on size cap + on_end (maximal batching).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
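The buffering scheme described in the commit message can be sketched roughly as follows. This is a minimal illustration, not the code in secator/hooks/mongodb.py: FakeCollection, FindingBuffer and the FLUSH_SIZE value are hypothetical stand-ins, and the fake bulk_write only counts round-trips instead of talking to MongoDB.

```python
from typing import Any, Dict, List

FLUSH_SIZE = 3  # hypothetical stand-in for MONGODB_FLUSH_SIZE (real value not shown in this PR page)


class FakeCollection:
    """Hypothetical stand-in for db.findings that counts round-trips."""

    def __init__(self) -> None:
        self.docs: Dict[str, Dict[str, Any]] = {}
        self.round_trips = 0

    def bulk_write(self, ops: List[Dict[str, Any]]) -> None:
        # One call is one round-trip, however many operations it carries.
        self.round_trips += 1
        for op in ops:
            self.docs.setdefault(op["_id"], {}).update(op["set"])


class FindingBuffer:
    """Buffers one upsert per finding and flushes on a size cap."""

    def __init__(self, collection: FakeCollection, flush_size: int = FLUSH_SIZE) -> None:
        self.collection = collection
        self.flush_size = flush_size
        self.ops: List[Dict[str, Any]] = []

    def add(self, finding_id: str, fields: Dict[str, Any]) -> None:
        self.ops.append({"_id": finding_id, "set": fields})
        if len(self.ops) >= self.flush_size:
            self.flush()

    def flush(self) -> None:
        if not self.ops:
            return  # nothing buffered, so no round-trip
        self.collection.bulk_write(self.ops)
        self.ops = []


collection = FakeCollection()
buffer = FindingBuffer(collection)
for i in range(10):
    buffer.add(f"id-{i}", {"url": f"https://example.com/{i}"})
buffer.flush()  # plays the role of the mandatory on_end flush
```

With ten findings and a cap of three, the sketch issues three size-cap flushes plus one final flush. Skipping the final flush would leave the tenth finding unpersisted, which is the correctness concern the commit message raises for on_end.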
Walkthrough
Changes: MongoDB Buffered Bulk Upsert for Findings
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks: ✅ 5 passed
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@secator/hooks/mongodb.py`:
- Around line 253-255: The hook ordering in the MongoDB hooks file needs to be
adjusted so that flush operations execute before update_runner, because
update_runner sets self.last_updated_db which the runner uses to determine
whether to skip interval-based flushes. In the on_interval list (lines 253-255),
move flush_findings before update_runner in the list. In the on_end list (lines
269-270), move flush_findings_final before update_runner so that findings are
persisted before the runner is marked as complete. For the on_duplicate hooks at
lines 260-262, apply the same principle if flush operations are present there.
- Around line 122-127: The _mongodb_findings_buffer currently uses a list to
append every UpdateOne operation, but when flushed with unordered bulk writes,
operations for the same finding (same item._uuid) can execute out of order,
leaving the document in a non-deterministic state. Replace the list-based buffer
with a dict keyed by item._uuid so that repeated updates for the same finding
are coalesced and only the latest UpdateOne operation per UUID is retained. This
ensures that when the buffer is flushed at the bulk_write call, each finding is
updated only once with its most recent state, regardless of the unordered
execution.
📒 Files selected for processing (1)
secator/hooks/mongodb.py
buffer = getattr(self, '_mongodb_findings_buffer', None)
if buffer is None:
    buffer = self._mongodb_findings_buffer = []
buffer.append(
    pymongo.UpdateOne({'_id': ObjectId(item._uuid)}, {'$set': item.toDict()}, upsert=True)
)
🧩 Analysis chain
🌐 Web query:
In PyMongo and MongoDB bulk_write with ordered=False, are operations guaranteed to execute in the order provided, especially multiple UpdateOne upserts targeting the same _id?
💡 Result:
When using PyMongo or any MongoDB driver with bulk_write and ordered=False, the operations are not guaranteed to execute in the order provided [1][2][3]. In unordered mode, MongoDB may execute operations in parallel and reorder them for performance optimization [1][2][3]. Consequently, applications should not rely on the execution order of these operations [2][3]. Regarding multiple UpdateOne upserts targeting the same _id in an unordered bulk write: because there is no guarantee of serial execution, the final state of the document is non-deterministic if multiple operations in the same batch target the same _id [1][2][3]. You cannot rely on the operations "winning" based on their index in the input array. If your application logic requires a specific final state for a document, you must either use ordered=True (the default) or ensure that your bulk operations do not have overlapping targets [1][2][3].
Citations:
- 1: https://www.mongodb.com/docs/manual/reference/method/db.collection.bulkwrite/
- 2: https://www.mongodb.com/docs/manual/reference/command/bulkwrite/
- 3: https://www.mongodb.com/docs/v8.2/reference/command/bulkwrite/
🏁 Script executed:
cd secator && wc -l hooks/mongodb.py
Repository: freelabz/secator
Length of output: 81
🏁 Script executed:
cd secator && sed -n '115,150p' hooks/mongodb.py
Repository: freelabz/secator
Length of output: 1379
Coalesce repeated finding updates before unordered bulk writes.
The buffer appends every UpdateOne, then Line 142 flushes with ordered=False. With unordered bulk writes, operations are not guaranteed to execute in order, making the final document state non-deterministic when the same finding is buffered multiple times before a flush—an older snapshot can be applied after the newer one. Keep only the latest operation per item._uuid by using a dict keyed by UUID, or switch to ordered writes.
Proposed coalescing fix
- buffer = getattr(self, '_mongodb_findings_buffer', None)
- if buffer is None:
-     buffer = self._mongodb_findings_buffer = []
- buffer.append(
-     pymongo.UpdateOne({'_id': ObjectId(item._uuid)}, {'$set': item.toDict()}, upsert=True)
- )
+ buffer = getattr(self, '_mongodb_findings_buffer', None)
+ if buffer is None:
+     buffer = self._mongodb_findings_buffer = {}
+ buffer[item._uuid] = pymongo.UpdateOne(
+     {'_id': ObjectId(item._uuid)},
+     {'$set': item.toDict()},
+     upsert=True,
+ )
  if len(buffer) >= MONGODB_FLUSH_SIZE:
      flush_findings_buffer(self)
  return item
@@
- count = len(buffer)
- db.findings.bulk_write(buffer, ordered=False)
- self._mongodb_findings_buffer = []
+ operations = list(buffer.values())
+ count = len(operations)
+ db.findings.bulk_write(operations, ordered=False)
+ self._mongodb_findings_buffer = {}
Also applies to: 141-143
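To see why coalescing matters, the sketch below models an unordered bulk write as "the operations may be applied in any order" and enumerates every order. That model is an assumption made for illustration; it does not call MongoDB, and the names final_statuses, list_buffer and dict_buffer are hypothetical.

```python
from itertools import permutations
from typing import Any, Dict, List, Set, Tuple

Op = Tuple[str, Dict[str, Any]]  # (finding id, fields to $set)


def final_statuses(ops: List[Op], finding_id: str) -> Set[str]:
    """Collect every final 'status' the finding can end with across all orders."""
    outcomes: Set[str] = set()
    for order in permutations(ops):
        docs: Dict[str, Dict[str, Any]] = {}
        for fid, fields in order:
            docs.setdefault(fid, {}).update(fields)
        outcomes.add(docs[finding_id]["status"])
    return outcomes


# List-based buffer: the same finding "a" is buffered twice before a flush.
list_buffer: List[Op] = [
    ("a", {"status": "old"}),
    ("a", {"status": "new"}),
    ("b", {"status": "new"}),
]

# Dict-based buffer: a later write for the same id replaces the earlier one.
dict_buffer: Dict[str, Dict[str, Any]] = {}
for fid, fields in list_buffer:
    dict_buffer[fid] = fields

list_outcomes = final_statuses(list_buffer, "a")
dict_outcomes = final_statuses(list(dict_buffer.items()), "a")
```

Under this model the list buffer can leave finding "a" in either state, while the dict buffer always ends on the latest snapshot. Coalescing also means len(buffer) counts distinct findings rather than buffered operations, which slightly changes when the size cap triggers.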
'on_interval': [update_runner, flush_findings],
'on_duplicate': [update_finding],
'on_end': [update_runner],
'on_end': [update_runner, flush_findings_final],
Run findings flush hooks before update_runner.
For on_interval, secator/runners/_base.py:948-983 re-checks self.last_updated_db before each hook; update_runner sets it on Line 96, so placing flush_findings after update_runner causes the interval flush to be skipped. For on_end, persisting the runner first can mark completion before the mandatory final findings flush. Move flush hooks first.
Proposed hook ordering fix
  Scan: {
      'on_init': [update_runner],
      'on_start': [update_runner],
-     'on_interval': [update_runner, flush_findings],
+     'on_interval': [flush_findings, update_runner],
      'on_duplicate': [update_finding],
-     'on_end': [update_runner, flush_findings_final],
+     'on_end': [flush_findings_final, update_runner],
  },
  Workflow: {
      'on_init': [update_runner],
      'on_start': [update_runner],
-     'on_interval': [update_runner, flush_findings],
+     'on_interval': [flush_findings, update_runner],
      'on_duplicate': [update_finding],
-     'on_end': [update_runner, flush_findings_final],
+     'on_end': [flush_findings_final, update_runner],
  },
  Task: {
      'on_init': [update_runner],
      'on_start': [update_runner],
      'on_item': [update_finding],
      'on_duplicate': [update_finding],
-     'on_interval': [update_runner, flush_findings],
-     'on_end': [update_runner, flush_findings_final]
+     'on_interval': [flush_findings, update_runner],
+     'on_end': [flush_findings_final, update_runner]
  }
Also applies to: 260-262, 269-270
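The ordering problem can be reproduced with a toy runner. The sketch assumes, as the review comment states, that the throttle on last_updated_db is re-checked before each hook and that update_runner is the hook that sets it. MiniRunner and its method names are hypothetical; this is not the code in secator/runners/_base.py.

```python
from typing import Callable, List, Optional


class MiniRunner:
    """Hypothetical miniature runner with a per-hook throttle check."""

    def __init__(self, update_frequency: float) -> None:
        self.update_frequency = update_frequency
        self.last_updated_db: Optional[float] = None
        self.calls: List[str] = []

    def run_interval_hooks(self, hooks: List[Callable[["MiniRunner", float], None]], now: float) -> None:
        for hook in hooks:
            # Assumed behaviour: skip the hook if the DB was updated too recently.
            recently_updated = (
                self.last_updated_db is not None
                and now - self.last_updated_db < self.update_frequency
            )
            if recently_updated:
                continue
            hook(self, now)


def update_runner(runner: MiniRunner, now: float) -> None:
    runner.calls.append("update_runner")
    runner.last_updated_db = now  # this is what trips the throttle for later hooks


def flush_findings(runner: MiniRunner, now: float) -> None:
    runner.calls.append("flush_findings")


old_order = MiniRunner(update_frequency=5)
old_order.run_interval_hooks([update_runner, flush_findings], now=100.0)

new_order = MiniRunner(update_frequency=5)
new_order.run_interval_hooks([flush_findings, update_runner], now=100.0)
```

In the old order the flush never runs, because update_runner has just refreshed the timestamp. In the proposed order both hooks run. Whether the real runner behaves this way should be confirmed against the lines cited in the review.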
Motivation
Findings are written one-by-one today: one update_one/insert_one round-trip per item in hooks/mongodb.py:update_finding (called on on_item). A URL crawl with ~1M findings = ~1M DB round-trips.

Approach
- Mint the _id client-side. ObjectId() generates locally with no round-trip, so each finding gets a stable id immediately (everything downstream, that is the Celery chain, get_results, and dedup, keys off it). This decouples identity from persistence.
- update_finding appends a pymongo.UpdateOne(..., upsert=True) to a per-runner buffer and flushes via a single db.findings.bulk_write on:
  - size cap (MONGODB_FLUSH_SIZE, bounds worker memory),
  - on_interval, throttled by CONFIG.runners.backend_update_frequency (mongodb is a backend: -1 ⇒ flush only on size cap + on_end = maximal batching; 5 ⇒ ~every 5s for live UI),
  - on_end, which is mandatory: the next runner re-hydrates these findings from the DB, so they must be persisted before the runner finishes.

What is persisted is unchanged (same OUTPUT_TYPES guard), only how.

Win
~1M update_one/insert_one → ~1M / MONGODB_FLUSH_SIZE bulk_writes (e.g. ~1k). Same precedent already used by tag_duplicates.

I can't run the suite locally (deps). Please validate against your local MongoDB repro: run a crawl/domain scan and confirm (a) finding counts match, (b) cross-workflow chaining still works (subdomains → host_recon, the bug we just fixed), (c) dedup/tag_duplicates still tags correctly. CI runs unit/integration; any test that called update_finding expecting an immediate write will need updating.

Follow-up (not here): the per-runner update_runner writes could be throttled/batched similarly, but findings are the dominant cost.

🤖 Generated with Claude Code
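As background for the claim in the description that ObjectId() needs no round-trip: an ObjectId is 12 bytes made of a 4-byte timestamp, a 5-byte per-process random value, and a 3-byte counter, all of which a client can produce locally. The stdlib-only sketch below imitates that layout purely for illustration. mint_object_id_hex is a hypothetical helper; the PR itself calls the driver's ObjectId() and does not hand-roll ids.

```python
import itertools
import os
import struct
import threading
import time

# Per-process random value and a counter seeded randomly, as in the ObjectId layout.
_PROCESS_RANDOM = os.urandom(5)
_COUNTER = itertools.count(int.from_bytes(os.urandom(3), "big"))
_COUNTER_LOCK = threading.Lock()


def mint_object_id_hex() -> str:
    """Build a 24-hex-character id locally, with no database involved."""
    timestamp = struct.pack(">I", int(time.time()))  # 4 bytes, big-endian seconds
    with _COUNTER_LOCK:
        counter = next(_COUNTER) % (1 << 24)  # keep the counter within 3 bytes
    return (timestamp + _PROCESS_RANDOM + counter.to_bytes(3, "big")).hex()


first_id = mint_object_id_hex()
second_id = mint_object_id_hex()
```

Because nothing here depends on the server, a finding can carry its id from the moment it is created, and the write itself can be deferred to a later bulk flush.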