Skip to content

Commit

Permalink
Added queue backpressure event
Browse files Browse the repository at this point in the history
  • Loading branch information
mdecimus committed Jan 14, 2025
1 parent 7142a8c commit 6bb468c
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 22 deletions.
9 changes: 8 additions & 1 deletion crates/smtp/src/inbound/rcpt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,15 @@ impl<T: SessionStream> Session<T> {
if self.instance.id.ends_with("-debug") {
if to.address.contains("fail@") {
return self.write(b"503 5.5.1 Invalid recipient.\r\n").await;
} else if to.address.contains("delay@") {
} else if (to.address.contains("delay-random@") && rand::random())
|| to.address.contains("delay@")
{
return self.write(b"451 4.5.3 Try again later.\r\n").await;
} else if to.address.contains("slow@") {
tokio::time::sleep(std::time::Duration::from_secs(
rand::random::<u64>() % 5 + 5,
))
.await;
}
}

Expand Down
39 changes: 34 additions & 5 deletions crates/smtp/src/queue/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub struct Queue {
pub rx: mpsc::Receiver<QueueEvent>,
}

#[derive(Debug)]
pub enum OnHold {
InFlight,
ConcurrencyLimited {
Expand All @@ -52,6 +53,7 @@ impl SpawnQueue for mpsc::Receiver<QueueEvent> {
}

const CLEANUP_INTERVAL: Duration = Duration::from_secs(10 * 60);
const BACK_PRESSURE_WARN_INTERVAL: Duration = Duration::from_secs(60);

impl Queue {
pub fn new(core: Arc<Inner>, rx: mpsc::Receiver<QueueEvent>) -> Self {
Expand All @@ -66,6 +68,7 @@ impl Queue {
pub async fn start(&mut self) {
let mut is_paused = false;
let mut next_cleanup = Instant::now() + CLEANUP_INTERVAL;
let mut last_backpressure_warning = Instant::now() - BACK_PRESSURE_WARN_INTERVAL;
let mut in_flight_count = 0;
let mut has_back_pressure = false;

Expand Down Expand Up @@ -129,6 +132,25 @@ impl Queue {
let max_in_flight = server.core.smtp.queue.throttle.outbound_concurrency;
has_back_pressure = in_flight_count >= max_in_flight;
if has_back_pressure {
self.next_wake_up = Instant::now() + Duration::from_secs(QUEUE_REFRESH);

if last_backpressure_warning.elapsed() >= BACK_PRESSURE_WARN_INTERVAL {
let queue_events = server.next_event().await;
last_backpressure_warning = Instant::now();
trc::event!(
Queue(trc::QueueEvent::BackPressure),
Reason =
"Queue outbound processing capacity for this node exceeded.",
Total = queue_events.len(),
Details = self
.on_hold
.keys()
.copied()
.map(trc::Value::from)
.collect::<Vec<_>>(),
Limit = max_in_flight,
);
}
continue;
}

Expand All @@ -146,11 +168,18 @@ impl Queue {
// Enforce global concurrency limits
if in_flight_count >= max_in_flight {
has_back_pressure = true;
trc::event!(
Queue(trc::QueueEvent::ConcurrencyLimitExceeded),
Details = "Outbound concurrency limit exceeded.",
Limit = max_in_flight,
);
if last_backpressure_warning.elapsed()
>= BACK_PRESSURE_WARN_INTERVAL
{
last_backpressure_warning = Instant::now();
trc::event!(
Queue(trc::QueueEvent::BackPressure),
Reason = "Queue outbound processing capacity for this node exceeded.",
Total = queue_events.len(),
Details = self.on_hold.keys().copied().map(trc::Value::from).collect::<Vec<_>>(),
Limit = max_in_flight,
);
}
break;
}

Expand Down
4 changes: 4 additions & 0 deletions crates/trc/src/event/description.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,7 @@ impl QueueEvent {
QueueEvent::QueueReport => "Queued report for delivery",
QueueEvent::QueueDsn => "Queued DSN for delivery",
QueueEvent::QueueAutogenerated => "Queued autogenerated message for delivery",
QueueEvent::BackPressure => "Queue backpressure detected",
}
}

Expand All @@ -738,6 +739,9 @@ impl QueueEvent {
QueueEvent::QueueReport => "A new report was queued for delivery",
QueueEvent::QueueDsn => "A delivery status notification was queued for delivery",
QueueEvent::QueueAutogenerated => "A system generated message was queued for delivery",
QueueEvent::BackPressure => {
"Queue congested, processing can't keep up with incoming message rate"
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/trc/src/event/level.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ impl EventType {
DeliveryEvent::RawInput | DeliveryEvent::RawOutput => Level::Trace,
},
EventType::Queue(event) => match event {
QueueEvent::BackPressure => Level::Warn,
QueueEvent::QueueMessage
| QueueEvent::QueueMessageAuthenticated
| QueueEvent::QueueReport
Expand Down
1 change: 1 addition & 0 deletions crates/trc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ pub enum QueueEvent {
RateLimitExceeded,
ConcurrencyLimitExceeded,
QuotaExceeded,
BackPressure,
}

#[event_type]
Expand Down
4 changes: 3 additions & 1 deletion crates/trc/src/serializers/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -864,6 +864,7 @@ impl EventType {
EventType::Spam(SpamEvent::Dnsbl) => 562,
EventType::Spam(SpamEvent::DnsblError) => 563,
EventType::Spam(SpamEvent::Pyzor) => 564,
EventType::Queue(QueueEvent::BackPressure) => 48,
}
}

Expand Down Expand Up @@ -1465,12 +1466,13 @@ impl EventType {
562 => Some(EventType::Spam(SpamEvent::Dnsbl)),
563 => Some(EventType::Spam(SpamEvent::DnsblError)),
564 => Some(EventType::Spam(SpamEvent::Pyzor)),
48 => Some(EventType::Queue(QueueEvent::BackPressure)),
_ => None,
}
}
}

// 57 48 147 148 335 336 376 458 459
// 57 147 148 335 336 376 458 459

impl Key {
fn code(&self) -> u64 {
Expand Down
43 changes: 37 additions & 6 deletions tests/src/smtp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,34 @@ cert = '%{file:{CERT}}%'
private-key = '%{file:{PK}}%'
[storage]
data = "rocksdb"
lookup = "rocksdb"
blob = "rocksdb"
fts = "rocksdb"
data = "{STORE}"
fts = "{STORE}"
blob = "{STORE}"
lookup = "{STORE}"
[store."rocksdb"]
type = "rocksdb"
path = "{TMP}/queue.db"
[store."foundationdb"]
type = "foundationdb"
[store."postgresql"]
type = "postgresql"
host = "localhost"
port = 5432
database = "stalwart"
user = "postgres"
password = "mysecretpassword"
[store."mysql"]
type = "mysql"
host = "localhost"
port = 3307
database = "stalwart"
user = "root"
password = "password"
"#;

impl TestSMTP {
Expand Down Expand Up @@ -198,9 +217,21 @@ impl TestSMTP {
}

pub async fn new(name: &str, config: impl AsRef<str>) -> TestSMTP {
Self::with_database(name, config, "rocksdb").await
}

pub async fn with_database(
name: &str,
config: impl AsRef<str>,
store_id: impl AsRef<str>,
) -> TestSMTP {
let temp_dir = TempDir::new(name, true);
let mut config =
Config::new(temp_dir.update_config(add_test_certs(CONFIG) + config.as_ref())).unwrap();
let mut config = Config::new(
temp_dir
.update_config(add_test_certs(CONFIG) + config.as_ref())
.replace("{STORE}", store_id.as_ref()),
)
.unwrap();
config.resolve_all_macros().await;
let stores = Stores::parse_all(&mut config, false).await;
let core = Core::parse(&mut config, stores, Default::default()).await;
Expand Down
54 changes: 45 additions & 9 deletions tests/src/smtp/queue/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ relay = true
messages = 2000
[queue.outbound]
concurrency = 1
concurrency = 4
[queue.schedule]
retry = "1s"
notify = "1d"
expire = "1d"
"#;

const REMOTE: &str = r#"
Expand All @@ -39,7 +43,10 @@ enable = false
"#;

#[tokio::test]
const NUM_MESSAGES: usize = 1000;
const NUM_QUEUES: usize = 10;

#[tokio::test(flavor = "multi_thread", worker_threads = 18)]
#[serial_test::serial]
async fn concurrent_queue() {
// Enable logging
Expand All @@ -49,7 +56,7 @@ async fn concurrent_queue() {
let remote = TestSMTP::new("smtp_concurrent_queue_remote", REMOTE).await;
let _rx = remote.start(&[ServerProtocol::Smtp]).await;

let local = TestSMTP::new("smtp_concurrent_queue_local", LOCAL).await;
let local = TestSMTP::with_database("smtp_concurrent_queue_local", LOCAL, "mysql").await;

// Add mock DNS entries
let core = local.build_smtp();
Expand All @@ -72,9 +79,9 @@ async fn concurrent_queue() {
session.eval_session_params().await;
session.ehlo("mx.test.org").await;

// Spawn 20 concurrent queues
// Spawn concurrent queues
let mut inners = vec![];
for _ in 0..20 {
for _ in 0..NUM_QUEUES {
let (inner, rxs) = local.inner_with_rxs();
let server = inner.build_server();
server.mx_add(
Expand All @@ -99,22 +106,51 @@ async fn concurrent_queue() {
tokio::time::sleep(Duration::from_millis(200)).await;

// Send 1000 test messages
for _ in 0..100 {
for _ in 0..(NUM_MESSAGES / 2) {
session
.send_message("[email protected]", &["bill@foobar.org"], "test:no_dkim", "250")
.send_message("[email protected]", &["slow@foobar.org"], "test:no_dkim", "250")
.await;
}

// Wake up all queues
for inner in &inners {
inner.ipc.queue_tx.send(QueueEvent::Refresh).await.unwrap();
}
for _ in 0..(NUM_MESSAGES / 2) {
session
.send_message(
"[email protected]",
&["[email protected]"],
"test:no_dkim",
"250",
)
.await;
}

tokio::time::sleep(Duration::from_millis(1500)).await;
loop {
tokio::time::sleep(Duration::from_millis(1500)).await;

let m = local.queue_receiver.read_queued_messages().await.len();
let e = local.queue_receiver.read_queued_events().await.len();

if m + e != 0 {
println!("Queue still has {} messages and {} events", m, e);
for inner in &inners {
inner
.ipc
.queue_tx
.send(QueueEvent::Paused(true))
.await
.unwrap();
}
} else {
break;
}
}

local.queue_receiver.assert_queue_is_empty().await;
let remote_messages = remote.queue_receiver.read_queued_messages().await;
assert_eq!(remote_messages.len(), 100);
assert_eq!(remote_messages.len(), NUM_MESSAGES);

// Make sure local store is empty
core.core
Expand Down

0 comments on commit 6bb468c

Please sign in to comment.