diff --git a/engine/src/builtins/queue.rs b/engine/src/builtins/queue.rs index b73f28e3d..f611a21a6 100644 --- a/engine/src/builtins/queue.rs +++ b/engine/src/builtins/queue.rs @@ -212,6 +212,11 @@ pub struct BuiltinQueue { } impl BuiltinQueue { + fn queue_name_from_job_key(key: &str) -> Option<&str> { + key.strip_prefix("queue:") + .and_then(|rest| rest.rsplit_once(":jobs:").map(|(queue, _)| queue)) + } + pub fn new( kv_store: Arc, pubsub: Arc, @@ -280,14 +285,7 @@ impl BuiltinQueue { job_keys .iter() .filter(|k| k.contains(":jobs:")) - .filter_map(|k| { - let parts: Vec<&str> = k.split(':').collect(); - if parts.len() >= 2 { - Some(parts[1].to_string()) - } else { - None - } - }) + .filter_map(|k| Self::queue_name_from_job_key(k).map(str::to_string)) .collect() }; @@ -1538,6 +1536,50 @@ mod tests { std::fs::remove_dir_all(&dir).unwrap(); } + #[tokio::test] + async fn test_rebuild_from_storage_preserves_queue_names_with_colons() { + let dir = std::env::temp_dir().join(format!("kv_store_{}", uuid::Uuid::new_v4())); + std::fs::create_dir_all(&dir).unwrap(); + + let config_json = serde_json::json!({ + "store_method": "file_based", + "file_path": dir.to_string_lossy(), + "save_interval_ms": 5 + }); + + let queue_name = "billing:jobs:v2"; + let kv_store = make_queue_kv(Some(config_json.clone())); + let pubsub = Arc::new(BuiltInPubSubLite::new(None)); + let config = QueueConfig { + max_attempts: 3, + backoff_ms: 1, + ..Default::default() + }; + let queue = BuiltinQueue::new(kv_store.clone(), pubsub.clone(), config.clone()); + + let data = serde_json::json!({"key": "value"}); + queue.push(queue_name, data, None, None).await; + + let job = queue.pop(queue_name).await.unwrap(); + queue.nack(queue_name, &job.id, "Test error").await.unwrap(); + + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + + drop(queue); + drop(kv_store); + + let new_kv_store = make_queue_kv(Some(config_json)); + let new_pubsub = Arc::new(BuiltInPubSubLite::new(None)); + let new_queue = BuiltinQueue::new(new_kv_store, new_pubsub, config); + new_queue.rebuild_from_storage().await.unwrap(); + + let popped = new_queue.pop(queue_name).await; + assert!(popped.is_some(), "queue name with ':' should survive rebuild"); + assert_eq!(popped.unwrap().id, job.id); + + std::fs::remove_dir_all(&dir).unwrap(); + } + #[tokio::test] async fn test_queue_with_full_persistence_and_rebuild() { let dir = std::env::temp_dir().join(format!("kv_store_{}", uuid::Uuid::new_v4()));