diff --git a/engine/src/workers/queue/adapters/rabbitmq/adapter.rs b/engine/src/workers/queue/adapters/rabbitmq/adapter.rs index e11a082a5..b838be1ae 100644 --- a/engine/src/workers/queue/adapters/rabbitmq/adapter.rs +++ b/engine/src/workers/queue/adapters/rabbitmq/adapter.rs @@ -215,6 +215,10 @@ fn delivery_stable_id(delivery: &Delivery) -> String { format!("dlq-{:016x}", hasher.finish()) } +fn subscription_topic(key: &str) -> Option<&str> { + key.rsplit_once(':').map(|(topic, _)| topic) +} + impl Clone for RabbitMQAdapter { fn clone(&self) -> Self { Self { @@ -1194,7 +1198,7 @@ impl QueueAdapter for RabbitMQAdapter { let consumer_count = { let subs = self.subscriptions.read().await; subs.keys() - .filter(|key| key.split(':').next().map(|t| t == topic).unwrap_or(false)) + .filter(|key| subscription_topic(key).map(|t| t == topic).unwrap_or(false)) .count() as u64 }; @@ -1210,8 +1214,7 @@ impl QueueAdapter for RabbitMQAdapter { let subs = self.subscriptions.read().await; let mut topics: HashMap = HashMap::new(); for key in subs.keys() { - // subscription key format is "topic:id" - if let Some(topic) = key.split(':').next() { + if let Some(topic) = subscription_topic(key) { *topics.entry(topic.to_string()).or_insert(0u64) += 1; } } @@ -1227,3 +1230,16 @@ impl QueueAdapter for RabbitMQAdapter { } crate::register_adapter!( name: "rabbitmq", make_adapter); + +#[cfg(test)] +mod tests { + use super::subscription_topic; + + #[test] + fn subscription_topic_preserves_topics_with_colons() { + assert_eq!( + subscription_topic("orders:v1:123e4567-e89b-12d3-a456-426614174000"), + Some("orders:v1") + ); + } +}