From 6bb468ca5d9cdab9658b538773c499cc74f5ae06 Mon Sep 17 00:00:00 2001 From: mdecimus Date: Tue, 14 Jan 2025 16:49:28 +0100 Subject: [PATCH] Added queue backpressure event --- crates/smtp/src/inbound/rcpt.rs | 9 ++++- crates/smtp/src/queue/manager.rs | 39 +++++++++++++++++--- crates/trc/src/event/description.rs | 4 +++ crates/trc/src/event/level.rs | 1 + crates/trc/src/lib.rs | 1 + crates/trc/src/serializers/binary.rs | 4 ++- tests/src/smtp/mod.rs | 43 ++++++++++++++++++---- tests/src/smtp/queue/concurrent.rs | 54 +++++++++++++++++++++++----- 8 files changed, 133 insertions(+), 22 deletions(-) diff --git a/crates/smtp/src/inbound/rcpt.rs b/crates/smtp/src/inbound/rcpt.rs index 5c11b1f36..5eea7adbf 100644 --- a/crates/smtp/src/inbound/rcpt.rs +++ b/crates/smtp/src/inbound/rcpt.rs @@ -26,8 +26,15 @@ impl Session { if self.instance.id.ends_with("-debug") { if to.address.contains("fail@") { return self.write(b"503 5.5.1 Invalid recipient.\r\n").await; - } else if to.address.contains("delay@") { + } else if (to.address.contains("delay-random@") && rand::random()) + || to.address.contains("delay@") + { return self.write(b"451 4.5.3 Try again later.\r\n").await; + } else if to.address.contains("slow@") { + tokio::time::sleep(std::time::Duration::from_secs( + rand::random::() % 5 + 5, + )) + .await; } } diff --git a/crates/smtp/src/queue/manager.rs b/crates/smtp/src/queue/manager.rs index d0fdbedf5..ee8522060 100644 --- a/crates/smtp/src/queue/manager.rs +++ b/crates/smtp/src/queue/manager.rs @@ -32,6 +32,7 @@ pub struct Queue { pub rx: mpsc::Receiver, } +#[derive(Debug)] pub enum OnHold { InFlight, ConcurrencyLimited { @@ -52,6 +53,7 @@ impl SpawnQueue for mpsc::Receiver { } const CLEANUP_INTERVAL: Duration = Duration::from_secs(10 * 60); +const BACK_PRESSURE_WARN_INTERVAL: Duration = Duration::from_secs(60); impl Queue { pub fn new(core: Arc, rx: mpsc::Receiver) -> Self { @@ -66,6 +68,7 @@ impl Queue { pub async fn start(&mut self) { let mut is_paused = false; let mut next_cleanup = Instant::now() + CLEANUP_INTERVAL; + let mut last_backpressure_warning = Instant::now() - BACK_PRESSURE_WARN_INTERVAL; let mut in_flight_count = 0; let mut has_back_pressure = false; @@ -129,6 +132,25 @@ impl Queue { let max_in_flight = server.core.smtp.queue.throttle.outbound_concurrency; has_back_pressure = in_flight_count >= max_in_flight; if has_back_pressure { + self.next_wake_up = Instant::now() + Duration::from_secs(QUEUE_REFRESH); + + if last_backpressure_warning.elapsed() >= BACK_PRESSURE_WARN_INTERVAL { + let queue_events = server.next_event().await; + last_backpressure_warning = Instant::now(); + trc::event!( + Queue(trc::QueueEvent::BackPressure), + Reason = + "Queue outbound processing capacity for this node exceeded.", + Total = queue_events.len(), + Details = self + .on_hold + .keys() + .copied() + .map(trc::Value::from) + .collect::>(), + Limit = max_in_flight, + ); + } continue; } @@ -146,11 +168,18 @@ impl Queue { // Enforce global concurrency limits if in_flight_count >= max_in_flight { has_back_pressure = true; - trc::event!( - Queue(trc::QueueEvent::ConcurrencyLimitExceeded), - Details = "Outbound concurrency limit exceeded.", - Limit = max_in_flight, - ); + if last_backpressure_warning.elapsed() + >= BACK_PRESSURE_WARN_INTERVAL + { + last_backpressure_warning = Instant::now(); + trc::event!( + Queue(trc::QueueEvent::BackPressure), + Reason = "Queue outbound processing capacity for this node exceeded.", + Total = queue_events.len(), + Details = self.on_hold.keys().copied().map(trc::Value::from).collect::>(), + Limit = max_in_flight, + ); + } break; } diff --git a/crates/trc/src/event/description.rs b/crates/trc/src/event/description.rs index 2314fa251..eacb9afe9 100644 --- a/crates/trc/src/event/description.rs +++ b/crates/trc/src/event/description.rs @@ -720,6 +720,7 @@ impl QueueEvent { QueueEvent::QueueReport => "Queued report for delivery", QueueEvent::QueueDsn => "Queued DSN for delivery", QueueEvent::QueueAutogenerated => "Queued autogenerated message for delivery", + QueueEvent::BackPressure => "Queue backpressure detected", } } @@ -738,6 +739,9 @@ impl QueueEvent { QueueEvent::QueueReport => "A new report was queued for delivery", QueueEvent::QueueDsn => "A delivery status notification was queued for delivery", QueueEvent::QueueAutogenerated => "A system generated message was queued for delivery", + QueueEvent::BackPressure => { + "Queue congested, processing can't keep up with incoming message rate" + } } } } diff --git a/crates/trc/src/event/level.rs b/crates/trc/src/event/level.rs index 9c6633dff..67ca665ca 100644 --- a/crates/trc/src/event/level.rs +++ b/crates/trc/src/event/level.rs @@ -459,6 +459,7 @@ impl EventType { DeliveryEvent::RawInput | DeliveryEvent::RawOutput => Level::Trace, }, EventType::Queue(event) => match event { + QueueEvent::BackPressure => Level::Warn, QueueEvent::QueueMessage | QueueEvent::QueueMessageAuthenticated | QueueEvent::QueueReport diff --git a/crates/trc/src/lib.rs b/crates/trc/src/lib.rs index 688a119d8..fbf04f599 100644 --- a/crates/trc/src/lib.rs +++ b/crates/trc/src/lib.rs @@ -488,6 +488,7 @@ pub enum QueueEvent { RateLimitExceeded, ConcurrencyLimitExceeded, QuotaExceeded, + BackPressure, } #[event_type] diff --git a/crates/trc/src/serializers/binary.rs b/crates/trc/src/serializers/binary.rs index de6ab99cc..53c89b0e1 100644 --- a/crates/trc/src/serializers/binary.rs +++ b/crates/trc/src/serializers/binary.rs @@ -864,6 +864,7 @@ impl EventType { EventType::Spam(SpamEvent::Dnsbl) => 562, EventType::Spam(SpamEvent::DnsblError) => 563, EventType::Spam(SpamEvent::Pyzor) => 564, + EventType::Queue(QueueEvent::BackPressure) => 48, } } @@ -1465,12 +1466,13 @@ impl EventType { 562 => Some(EventType::Spam(SpamEvent::Dnsbl)), 563 => Some(EventType::Spam(SpamEvent::DnsblError)), 564 => Some(EventType::Spam(SpamEvent::Pyzor)), + 48 => Some(EventType::Queue(QueueEvent::BackPressure)), _ => None, } } } -// 57 48 147 148 335 336 376 458 459 +// 57 147 148 335 336 376 458 459 impl Key { fn code(&self) -> u64 { diff --git a/tests/src/smtp/mod.rs b/tests/src/smtp/mod.rs index 20e28ebf0..285c45ecf 100644 --- a/tests/src/smtp/mod.rs +++ b/tests/src/smtp/mod.rs @@ -137,15 +137,34 @@ cert = '%{file:{CERT}}%' private-key = '%{file:{PK}}%' [storage] -data = "rocksdb" -lookup = "rocksdb" -blob = "rocksdb" -fts = "rocksdb" +data = "{STORE}" +fts = "{STORE}" +blob = "{STORE}" +lookup = "{STORE}" [store."rocksdb"] type = "rocksdb" path = "{TMP}/queue.db" +[store."foundationdb"] +type = "foundationdb" + +[store."postgresql"] +type = "postgresql" +host = "localhost" +port = 5432 +database = "stalwart" +user = "postgres" +password = "mysecretpassword" + +[store."mysql"] +type = "mysql" +host = "localhost" +port = 3307 +database = "stalwart" +user = "root" +password = "password" + "#; impl TestSMTP { @@ -198,9 +217,21 @@ impl TestSMTP { } pub async fn new(name: &str, config: impl AsRef) -> TestSMTP { + Self::with_database(name, config, "rocksdb").await + } + + pub async fn with_database( + name: &str, + config: impl AsRef, + store_id: impl AsRef, + ) -> TestSMTP { let temp_dir = TempDir::new(name, true); - let mut config = - Config::new(temp_dir.update_config(add_test_certs(CONFIG) + config.as_ref())).unwrap(); + let mut config = Config::new( + temp_dir + .update_config(add_test_certs(CONFIG) + config.as_ref()) + .replace("{STORE}", store_id.as_ref()), + ) + .unwrap(); config.resolve_all_macros().await; let stores = Stores::parse_all(&mut config, false).await; let core = Core::parse(&mut config, stores, Default::default()).await; diff --git a/tests/src/smtp/queue/concurrent.rs b/tests/src/smtp/queue/concurrent.rs index 5c9afcf6f..405551a0f 100644 --- a/tests/src/smtp/queue/concurrent.rs +++ b/tests/src/smtp/queue/concurrent.rs @@ -23,8 +23,12 @@ relay = true messages = 2000 [queue.outbound] -concurrency = 1 +concurrency = 4 +[queue.schedule] +retry = "1s" +notify = "1d" +expire = "1d" "#; const REMOTE: &str = r#" @@ -39,7 +43,10 @@ enable = false "#; -#[tokio::test] +const NUM_MESSAGES: usize = 1000; +const NUM_QUEUES: usize = 10; + +#[tokio::test(flavor = "multi_thread", worker_threads = 18)] #[serial_test::serial] async fn concurrent_queue() { // Enable logging @@ -49,7 +56,7 @@ async fn concurrent_queue() { let remote = TestSMTP::new("smtp_concurrent_queue_remote", REMOTE).await; let _rx = remote.start(&[ServerProtocol::Smtp]).await; - let local = TestSMTP::new("smtp_concurrent_queue_local", LOCAL).await; + let local = TestSMTP::with_database("smtp_concurrent_queue_local", LOCAL, "mysql").await; // Add mock DNS entries let core = local.build_smtp(); @@ -72,9 +79,9 @@ async fn concurrent_queue() { session.eval_session_params().await; session.ehlo("mx.test.org").await; - // Spawn 20 concurrent queues + // Spawn concurrent queues let mut inners = vec![]; - for _ in 0..20 { + for _ in 0..NUM_QUEUES { let (inner, rxs) = local.inner_with_rxs(); let server = inner.build_server(); server.mx_add( @@ -99,9 +106,9 @@ async fn concurrent_queue() { tokio::time::sleep(Duration::from_millis(200)).await; // Send 1000 test messages - for _ in 0..100 { + for _ in 0..(NUM_MESSAGES / 2) { session - .send_message("john@test.org", &["bill@foobar.org"], "test:no_dkim", "250") + .send_message("john@test.org", &["slow@foobar.org"], "test:no_dkim", "250") .await; } @@ -109,12 +116,41 @@ async fn concurrent_queue() { for inner in &inners { inner.ipc.queue_tx.send(QueueEvent::Refresh).await.unwrap(); } + for _ in 0..(NUM_MESSAGES / 2) { + session + .send_message( + "john@test.org", + &["delay-random@foobar.org"], + "test:no_dkim", + "250", + ) + .await; + } - tokio::time::sleep(Duration::from_millis(1500)).await; + loop { + tokio::time::sleep(Duration::from_millis(1500)).await; + + let m = local.queue_receiver.read_queued_messages().await.len(); + let e = local.queue_receiver.read_queued_events().await.len(); + + if m + e != 0 { + println!("Queue still has {} messages and {} events", m, e); + for inner in &inners { + inner + .ipc + .queue_tx + .send(QueueEvent::Paused(true)) + .await + .unwrap(); + } + } else { + break; + } + } local.queue_receiver.assert_queue_is_empty().await; let remote_messages = remote.queue_receiver.read_queued_messages().await; - assert_eq!(remote_messages.len(), 100); + assert_eq!(remote_messages.len(), NUM_MESSAGES); // Make sure local store is queue core.core