Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 122 additions & 6 deletions engine/src/workers/queue/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// See LICENSE and PATENTS files for details.

use std::{
collections::HashMap,
collections::{HashMap, HashSet},
pin::Pin,
sync::{Arc, RwLock},
};
Expand Down Expand Up @@ -434,16 +434,31 @@ impl QueueWorker {
match self.adapter.list_topics().await {
Ok(topics) => {
let mut dlq_topics = Vec::new();
let mut seen_function_queues = HashSet::new();
for topic in &topics {
let dlq_count = self.adapter.dlq_count(&topic.name).await.unwrap_or(0);
dlq_topics.push(json!({
"topic": topic.name,
"broker_type": topic.broker_type,
"message_count": dlq_count,
}));
if let Some(stripped) = topic.name.strip_prefix("__fn_queue::") {
if !seen_function_queues.insert(stripped.to_string()) {
continue;
}
dlq_topics.push(json!({
"topic": stripped,
"broker_type": "function_queue",
"message_count": dlq_count,
}));
} else {
dlq_topics.push(json!({
"topic": topic.name.clone(),
"broker_type": topic.broker_type.clone(),
"message_count": dlq_count,
}));
}
}
// Also include function queue DLQs
for name in self._config.queue_configs.keys() {
if !seen_function_queues.insert(name.clone()) {
continue;
}
let namespaced = format!("__fn_queue::{}", name);
let dlq_count = self.adapter.dlq_count(&namespaced).await.unwrap_or(0);
dlq_topics.push(json!({
Expand Down Expand Up @@ -1739,6 +1754,45 @@ mod tests {
);
}

#[tokio::test]
async fn integration_console_dlq_topics_deduplicates_builtin_function_queue_topics() {
let (_engine, module, adapter) = setup_integration_module().await;
let default_config = module
._config
.queue_configs
.get("default")
.expect("default queue config should exist")
.clone();

adapter
.setup_function_queue("default", &default_config)
.await
.expect("builtin function queue setup should succeed");

let result = module.console_dlq_topics(json!({})).await;
match result {
FunctionResult::Success(Some(val)) => {
let topics: Vec<Value> = serde_json::from_value(val).unwrap();
let default_count = topics.iter().filter(|t| t["topic"] == "default").count();
assert_eq!(
default_count, 1,
"function queue topics should not be duplicated"
);
assert!(
topics
.iter()
.any(|t| t["topic"] == "default" && t["broker_type"] == "function_queue"),
"builtin function queue should be normalized to function_queue"
);
assert!(
topics.iter().all(|t| t["topic"] != "__fn_queue::default"),
"raw builtin function queue topic should not leak into console output"
);
}
_ => panic!("Expected Success with DLQ topics"),
}
}

#[tokio::test]
async fn integration_function_failure_nacks() {
let (engine, module, adapter) = setup_integration_module().await;
Expand Down Expand Up @@ -2319,6 +2373,68 @@ mod tests {
}
}

#[tokio::test]
async fn console_dlq_topics_deduplicates_function_queues_reported_by_adapter() {
let (_engine, module, adapter) = setup_queue_module_with_configs();
*adapter.list_topics_result.lock().await = vec![TopicInfo {
name: "__fn_queue::default".to_string(),
broker_type: "builtin".to_string(),
subscriber_count: 0,
}];

let result = module.console_dlq_topics(json!({})).await;
match result {
FunctionResult::Success(Some(val)) => {
let topics: Vec<Value> = serde_json::from_value(val).unwrap();
let default_count = topics.iter().filter(|t| t["topic"] == "default").count();
assert_eq!(
default_count, 1,
"function queue topics should not be duplicated"
);
}
_ => panic!("Expected Success with DLQ topics"),
}
}

#[tokio::test]
async fn console_dlq_topics_keeps_regular_topics_with_function_queue_name() {
let (_engine, module, adapter) = setup_queue_module_with_configs();
*adapter.list_topics_result.lock().await = vec![TopicInfo {
name: "default".to_string(),
broker_type: "builtin".to_string(),
subscriber_count: 0,
}];

let result = module.console_dlq_topics(json!({})).await;
match result {
FunctionResult::Success(Some(val)) => {
let topics: Vec<Value> = serde_json::from_value(val).unwrap();
let default_topics: Vec<&Value> =
topics.iter().filter(|t| t["topic"] == "default").collect();
assert_eq!(
default_topics.len(),
2,
"regular topic and function queue should both be listed"
);
assert!(
default_topics.iter().any(|t| t["broker_type"] == "builtin"),
"regular topic should keep its original broker type"
);
assert!(
default_topics
.iter()
.any(|t| t["broker_type"] == "function_queue"),
"configured function queue should still be included"
);
assert!(
topics.iter().all(|t| t["topic"] != "__fn_queue::default"),
"raw builtin function queue topic should not be included alongside normalized output"
);
}
_ => panic!("Expected Success with DLQ topics"),
}
}

#[tokio::test]
async fn console_dlq_topics_adapter_error() {
let (_engine, module, adapter) = setup_queue_module();
Expand Down
Loading