Skip to content

Deduplicate function queue DLQ topics#1421

Open
abishekgiri wants to merge 7 commits intoiii-hq:mainfrom
abishekgiri:dlq-topic-deduplication
Open

Deduplicate function queue DLQ topics#1421
abishekgiri wants to merge 7 commits intoiii-hq:mainfrom
abishekgiri:dlq-topic-deduplication

Conversation

@abishekgiri
Copy link
Copy Markdown
Contributor

@abishekgiri abishekgiri commented Apr 7, 2026

Summary

  • normalize function queue topics reported by the adapter before building the console DLQ topics response
  • deduplicate adapter-reported function queues against configured function queues so the same DLQ topic is not returned twice
  • add a regression test covering the built-in adapter case

Testing

  • cargo test -p iii console_dlq_topics_deduplicates_function_queues_reported_by_adapter -- --nocapture

Summary by CodeRabbit

  • Bug Fixes
    • Prevented duplicate Dead Letter Queue (DLQ) entries by normalizing adapter-reported function-queue names (strip internal prefix) and deduplicating; function-queue entries are now labeled as broker_type "function_queue" while regular topics remain distinct.
  • Tests
    • Added integration and unit tests to verify deduplication, normalization, and that normalized names appear once without leaking raw adapter labels.

@vercel
Copy link
Copy Markdown
Contributor

vercel bot commented Apr 7, 2026

@abishekgiri is attempting to deploy a commit to the motia Team on Vercel.

A member of the Team first needs to authorize it.

@abishekgiri
Copy link
Copy Markdown
Contributor Author

Updated the console DLQ topics response so adapter-reported function queues are normalized to the same display name as configured function queues before the list is assembled. That prevents duplicate entries for the same function queue and adds a regression test for the built-in adapter path.

@abishekgiri abishekgiri marked this pull request as ready for review April 7, 2026 00:42
@abishekgiri abishekgiri marked this pull request as draft April 7, 2026 00:42
@abishekgiri abishekgiri marked this pull request as ready for review April 7, 2026 00:57
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai bot commented Apr 8, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: c4848a81-1f5d-42b9-a2f3-99e8f2a946a3

📥 Commits

Reviewing files that changed from the base of the PR and between a0f63a6 and 6c04a9a.

📒 Files selected for processing (1)
  • engine/src/workers/queue/queue.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • engine/src/workers/queue/queue.rs

📝 Walkthrough

Walkthrough

QueueWorker::console_dlq_topics now normalizes adapter-reported function-queue topic names by stripping the __fn_queue:: prefix, emits those entries with broker_type: "function_queue", and deduplicates function-queue DLQs so adapter-reported and configured entries are not duplicated.

Changes

Cohort / File(s) Summary
DLQ Topic Handling
engine/src/workers/queue/queue.rs
Normalize adapter-reported function-queue topics by removing the __fn_queue:: prefix, set broker_type to "function_queue" for those entries, and deduplicate using a HashSet (seen_function_queues).
Tests (added/updated)
engine/src/workers/queue/*_test.rs, engine/tests/...
Added/updated unit and integration tests to verify deduplication and normalization (adapter reports __fn_queue::default vs configured default, ensure normalized entry appears once and raw __fn_queue::... does not leak).

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Suggested reviewers

  • ytallo

Poem

🐰 I twitch my nose and hop along the queue,
I strip prefixes clean and skip repeats too.
Function queues tidy, broker types aligned,
No double-defaults left behind. 🥕

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'Deduplicate function queue DLQ topics' directly and accurately summarizes the main change: deduplicating and normalizing DLQ topic entries for function queues.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
engine/src/workers/queue/queue.rs (1)

437-459: Narrow deduplication to function-queue entries only

seen is keyed by display_name for every topic, so a regular topic and a function queue with the same visible name can collapse into one entry (order-dependent). This is broader than the PR intent (dedupe adapter-reported function queues vs configured function queues).

Proposed refactor
-                let mut seen = HashSet::new();
+                let mut seen_function_queues = HashSet::new();
                 for topic in &topics {
-                    let (display_name, broker_type) =
-                        if let Some(stripped) = topic.name.strip_prefix("__fn_queue::") {
-                            (stripped.to_string(), "function_queue".to_string())
-                        } else {
-                            (topic.name.clone(), topic.broker_type.clone())
-                        };
-                    if !seen.insert(display_name.clone()) {
-                        continue;
-                    }
-                    let dlq_count = self.adapter.dlq_count(&topic.name).await.unwrap_or(0);
-                    dlq_topics.push(json!({
-                        "topic": display_name,
-                        "broker_type": broker_type,
-                        "message_count": dlq_count,
-                    }));
+                    if let Some(stripped) = topic.name.strip_prefix("__fn_queue::") {
+                        if !seen_function_queues.insert(stripped.to_string()) {
+                            continue;
+                        }
+                        let dlq_count = self.adapter.dlq_count(&topic.name).await.unwrap_or(0);
+                        dlq_topics.push(json!({
+                            "topic": stripped,
+                            "broker_type": "function_queue",
+                            "message_count": dlq_count,
+                        }));
+                    } else {
+                        let dlq_count = self.adapter.dlq_count(&topic.name).await.unwrap_or(0);
+                        dlq_topics.push(json!({
+                            "topic": topic.name,
+                            "broker_type": topic.broker_type,
+                            "message_count": dlq_count,
+                        }));
+                    }
                 }
                 // Also include function queue DLQs
                 for name in self._config.queue_configs.keys() {
-                    if !seen.insert(name.clone()) {
+                    if !seen_function_queues.insert(name.clone()) {
                         continue;
                     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@engine/src/workers/queue/queue.rs` around lines 437 - 459, The current dedup
uses seen keyed by display_name for all topics (variables: seen, display_name,
broker_type, topics, dlq_topics), which collapses regular topics and
function_queue entries with the same visible name; restrict dedup to function
queues only by changing the dedup key or logic: either make seen track
(display_name, broker_type) so non-function brokers are distinct, or keep seen
as-is but only insert/check it when broker_type == "function_queue" during the
topics loop (and when iterating self._config.queue_configs.keys() only consult
seen for configured function queues), ensuring dlq_count lookup
(self.adapter.dlq_count(&topic.name)) and dlq_topics population remain unchanged
for non-function topics.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@engine/src/workers/queue/queue.rs`:
- Around line 2335-2353: The test
console_dlq_topics_deduplicates_function_queues_reported_by_adapter currently
uses the mock setup_queue_module_with_configs() and so doesn't exercise the
built-in adapter path; update the test (or add a new test) to use
setup_integration_module() (or call it alongside the existing mock setup) and
seed its built-in adapter topics with an entry whose name starts with
"__fn_queue::" (similar to the existing TopicInfo) so console_dlq_topics(...)
runs through the builtin adapter code path; ensure you still assert that the
deduplicated "default" topic appears exactly once.

---

Nitpick comments:
In `@engine/src/workers/queue/queue.rs`:
- Around line 437-459: The current dedup uses seen keyed by display_name for all
topics (variables: seen, display_name, broker_type, topics, dlq_topics), which
collapses regular topics and function_queue entries with the same visible name;
restrict dedup to function queues only by changing the dedup key or logic:
either make seen track (display_name, broker_type) so non-function brokers are
distinct, or keep seen as-is but only insert/check it when broker_type ==
"function_queue" during the topics loop (and when iterating
self._config.queue_configs.keys() only consult seen for configured function
queues), ensuring dlq_count lookup (self.adapter.dlq_count(&topic.name)) and
dlq_topics population remain unchanged for non-function topics.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 959638a5-c3ec-4e47-be46-07d7e567fec9

📥 Commits

Reviewing files that changed from the base of the PR and between b31d5dd and d03414b.

📒 Files selected for processing (1)
  • engine/src/workers/queue/queue.rs

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
engine/src/workers/queue/queue.rs (1)

444-468: ⚠️ Potential issue | 🟠 Major

Return a stable DLQ key; topic is no longer unique.

This can now legitimately emit both {"topic":"default","broker_type":"builtin"} and {"topic":"default","broker_type":"function_queue"}. The follow-up handlers in this file still resolve DLQ actions by name only, so once a configured function queue shares that name, the regular topic entry becomes impossible to browse/redrive/discard.

Suggested direction
                         dlq_topics.push(json!({
+                            "queue_key": topic.name.clone(),
                             "topic": stripped,
                             "broker_type": "function_queue",
                             "message_count": dlq_count,
                         }));
                     } else {
                         dlq_topics.push(json!({
+                            "queue_key": topic.name.clone(),
                             "topic": topic.name.clone(),
                             "broker_type": topic.broker_type.clone(),
                             "message_count": dlq_count,
                         }));
                     }
@@
                     dlq_topics.push(json!({
+                        "queue_key": namespaced.clone(),
                         "topic": name,
                         "broker_type": "function_queue",
                         "message_count": dlq_count,
                     }));

Then have console_dlq_messages / redrive* / discard_message prefer queue_key instead of re-deriving from the display name.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@engine/src/workers/queue/queue.rs` around lines 444 - 468, The DLQ entries
currently emit a non-unique "topic" display name; change the DLQ JSON created
where dlq_topics.push(...) is called (both the topic.branch and function queue
loop) to include a stable unique key (e.g., "queue_key") that encodes broker
type or the namespaced topic (for function queues use the namespaced value like
"__fn_queue::{name}" and for builtins use the original topic identifier), and
then update the DLQ handlers console_dlq_messages, redrive*
(redrive_topic/redrive_message or similarly named functions), and
discard_message to prefer this "queue_key" when resolving actions instead of
deriving from the human-facing "topic" field so function queues and builtin
topics with the same display name remain distinct.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@engine/src/workers/queue/queue.rs`:
- Around line 1775-1786: The test currently only counts entries with t["topic"]
== "default" and can be fooled by coexistence of a raw "__fn_queue::default"
plus a normalized "default"; update the assertions around the topics Vec
(variable topics and default_count) to either assert the exact normalized shape
and count for the function queue (e.g., one entry with topic == "default" and
broker_type == "function_queue") and assert that no entry exists with a prefixed
topic (t["topic"] == "__fn_queue::default"), or replace the loose checks with a
single equality check against the expected normalized JSON entry list; make the
same changes in the other test block referenced (around the similar code at
lines 2384-2389) so the normalization path is strictly enforced.

---

Outside diff comments:
In `@engine/src/workers/queue/queue.rs`:
- Around line 444-468: The DLQ entries currently emit a non-unique "topic"
display name; change the DLQ JSON created where dlq_topics.push(...) is called
(both the topic.branch and function queue loop) to include a stable unique key
(e.g., "queue_key") that encodes broker type or the namespaced topic (for
function queues use the namespaced value like "__fn_queue::{name}" and for
builtins use the original topic identifier), and then update the DLQ handlers
console_dlq_messages, redrive* (redrive_topic/redrive_message or similarly named
functions), and discard_message to prefer this "queue_key" when resolving
actions instead of deriving from the human-facing "topic" field so function
queues and builtin topics with the same display name remain distinct.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: b4f3f961-f0ab-45e3-8ff9-56c31a074a6b

📥 Commits

Reviewing files that changed from the base of the PR and between d03414b and a0f63a6.

📒 Files selected for processing (1)
  • engine/src/workers/queue/queue.rs

@abishekgiri
Copy link
Copy Markdown
Contributor Author

Addressed the remaining DLQ topic normalization review point in the latest push.

I tightened both regression tests so they now verify the normalized default entry is present and that the raw __fn_queue::default entry does not appear in the console output.

Validated with:

  • cargo fmt --all
  • cargo test -p iii console_dlq_topics_ -- --nocapture
  • cargo test -p iii integration_console_dlq_topics_deduplicates_builtin_function_queue_topics -- --nocapture

Latest commit: 6c04a9a2

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant