Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 50 additions & 8 deletions engine/src/builtins/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,11 @@ pub struct BuiltinQueue {
}

impl BuiltinQueue {
fn queue_name_from_job_key(key: &str) -> Option<&str> {
key.strip_prefix("queue:")
.and_then(|rest| rest.rsplit_once(":jobs:").map(|(queue, _)| queue))
}

pub fn new(
kv_store: Arc<QueueKvStore>,
pubsub: Arc<BuiltInPubSubLite>,
Expand Down Expand Up @@ -280,14 +285,7 @@ impl BuiltinQueue {
job_keys
.iter()
.filter(|k| k.contains(":jobs:"))
.filter_map(|k| {
let parts: Vec<&str> = k.split(':').collect();
if parts.len() >= 2 {
Some(parts[1].to_string())
} else {
None
}
})
.filter_map(|k| Self::queue_name_from_job_key(k).map(str::to_string))
.collect()
};

Expand Down Expand Up @@ -1538,6 +1536,50 @@ mod tests {
std::fs::remove_dir_all(&dir).unwrap();
}

#[tokio::test]
async fn test_rebuild_from_storage_preserves_queue_names_with_colons() {
let dir = std::env::temp_dir().join(format!("kv_store_{}", uuid::Uuid::new_v4()));
std::fs::create_dir_all(&dir).unwrap();

let config_json = serde_json::json!({
"store_method": "file_based",
"file_path": dir.to_string_lossy(),
"save_interval_ms": 5
});

let queue_name = "billing:jobs:v2";
let kv_store = make_queue_kv(Some(config_json.clone()));
let pubsub = Arc::new(BuiltInPubSubLite::new(None));
let config = QueueConfig {
max_attempts: 3,
backoff_ms: 1,
..Default::default()
};
let queue = BuiltinQueue::new(kv_store.clone(), pubsub.clone(), config.clone());

let data = serde_json::json!({"key": "value"});
queue.push(queue_name, data, None, None).await;

let job = queue.pop(queue_name).await.unwrap();
queue.nack(queue_name, &job.id, "Test error").await.unwrap();

tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

drop(queue);
drop(kv_store);

let new_kv_store = make_queue_kv(Some(config_json));
let new_pubsub = Arc::new(BuiltInPubSubLite::new(None));
let new_queue = BuiltinQueue::new(new_kv_store, new_pubsub, config);
new_queue.rebuild_from_storage().await.unwrap();

let popped = new_queue.pop(queue_name).await;
assert!(popped.is_some(), "queue name with ':' should survive rebuild");
assert_eq!(popped.unwrap().id, job.id);

std::fs::remove_dir_all(&dir).unwrap();
}

#[tokio::test]
async fn test_queue_with_full_persistence_and_rebuild() {
let dir = std::env::temp_dir().join(format!("kv_store_{}", uuid::Uuid::new_v4()));
Expand Down