From 76c53af46bd25d40060c27cc5c79cc0d9d23b1db Mon Sep 17 00:00:00 2001 From: mdecimus Date: Mon, 13 Jan 2025 20:28:30 +0100 Subject: [PATCH] Fixed distributed locking in non-Redis stores (#1066) --- crates/common/src/ipc.rs | 15 ++-- crates/jmap/src/api/management/queue.rs | 14 +-- crates/jmap/src/services/gossip/ping.rs | 7 +- crates/smtp/src/outbound/delivery.rs | 54 ++++++------ crates/smtp/src/queue/manager.rs | 110 ++++++++++++++---------- crates/smtp/src/queue/spool.rs | 2 +- crates/store/src/dispatch/lookup.rs | 57 ++++++++++-- crates/store/src/dispatch/store.rs | 6 +- tests/Cargo.toml | 2 +- tests/src/directory/internal.rs | 14 +-- tests/src/jmap/auth_acl.rs | 24 ++++-- tests/src/jmap/mod.rs | 4 +- tests/src/jmap/permissions.rs | 8 ++ tests/src/smtp/inbound/antispam.rs | 3 + tests/src/smtp/inbound/mod.rs | 27 ++++-- tests/src/smtp/outbound/lmtp.rs | 4 +- tests/src/smtp/outbound/smtp.rs | 4 +- tests/src/smtp/outbound/throttle.rs | 6 +- tests/src/smtp/queue/concurrent.rs | 7 +- tests/src/smtp/queue/retry.rs | 13 +-- tests/src/store/lookup.rs | 26 ++++++ 21 files changed, 259 insertions(+), 148 deletions(-) diff --git a/crates/common/src/ipc.rs b/crates/common/src/ipc.rs index c8a791279..c0980d8a4 100644 --- a/crates/common/src/ipc.rs +++ b/crates/common/src/ipc.rs @@ -126,23 +126,26 @@ pub struct EncryptionKeys { #[derive(Debug)] pub enum QueueEvent { - Refresh(Option), - WorkerDone(u64), - OnHold { queue_id: u64, status: OnHold }, + Refresh, + WorkerDone { + queue_id: u64, + status: QueueEventStatus, + }, Paused(bool), Stop, } #[derive(Debug)] -pub enum OnHold { - InFlight, +pub enum QueueEventStatus { + Completed, Locked { until: u64, }, - ConcurrencyLimited { + Limited { limiters: Vec, next_due: Option, }, + Deferred, } #[derive(Debug, Clone, Copy)] diff --git a/crates/jmap/src/api/management/queue.rs b/crates/jmap/src/api/management/queue.rs index 3ddf4491a..5460c3045 100644 --- a/crates/jmap/src/api/management/queue.rs +++ b/crates/jmap/src/api/management/queue.rs @@ -257,12 +257,7 @@ impl QueueManagement for Server { } } - let _ = server - .inner - .ipc - .queue_tx - .send(QueueEvent::Refresh(None)) - .await; + let _ = server.inner.ipc.queue_tx.send(QueueEvent::Refresh).await; }); } @@ -314,12 +309,7 @@ impl QueueManagement for Server { message .save_changes(self, prev_event.into(), next_event.into()) .await; - let _ = self - .inner - .ipc - .queue_tx - .send(QueueEvent::Refresh(None)) - .await; + let _ = self.inner.ipc.queue_tx.send(QueueEvent::Refresh).await; } Ok(JsonResponse::new(json!({ diff --git a/crates/jmap/src/services/gossip/ping.rs b/crates/jmap/src/services/gossip/ping.rs index 15a8a860a..53c97c404 100644 --- a/crates/jmap/src/services/gossip/ping.rs +++ b/crates/jmap/src/services/gossip/ping.rs @@ -75,12 +75,7 @@ impl Gossiper { trc::event!(Cluster(ClusterEvent::OneOrMorePeersOffline)); server.notify_task_queue(); - let _ = server - .inner - .ipc - .queue_tx - .send(QueueEvent::Refresh(None)) - .await; + let _ = server.inner.ipc.queue_tx.send(QueueEvent::Refresh).await; }); } diff --git a/crates/smtp/src/outbound/delivery.rs b/crates/smtp/src/outbound/delivery.rs index a38771573..f1b2d0007 100644 --- a/crates/smtp/src/outbound/delivery.rs +++ b/crates/smtp/src/outbound/delivery.rs @@ -18,12 +18,13 @@ use common::config::{ server::ServerProtocol, smtp::{queue::RequireOptional, report::AggregateFrequency}, }; -use common::ipc::{OnHold, PolicyType, QueueEvent, TlsEvent}; +use common::ipc::{PolicyType, QueueEvent, QueueEventStatus, TlsEvent}; use common::Server; use mail_auth::{ mta_sts::TlsRpt, report::tlsrpt::{FailureDetails, ResultType}, }; +use rand::Rng; use smtp_proto::MAIL_REQUIRETLS; use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, @@ -45,7 +46,7 @@ impl DeliveryAttempt { tokio::spawn(async move { // Lock queue event let queue_id = self.event.queue_id; - let queue_event = if server.try_lock_event(queue_id).await { + let status = if server.try_lock_event(queue_id).await { if let Some(mut message) = server.read_message(queue_id).await { // Generate span id message.span_id = server.inner.data.span_id_gen.generate().unwrap_or_else(now); @@ -111,19 +112,23 @@ impl DeliveryAttempt { // Unlock event server.unlock_event(queue_id).await; - QueueEvent::WorkerDone(queue_id) + QueueEventStatus::Completed } } else { - QueueEvent::OnHold { - queue_id: self.event.queue_id, - status: OnHold::Locked { - until: now() + LOCK_EXPIRY + 1, - }, + QueueEventStatus::Locked { + until: now() + LOCK_EXPIRY + rand::thread_rng().gen_range(5..10), } }; // Notify queue manager - if server.inner.ipc.queue_tx.send(queue_event).await.is_err() { + if server + .inner + .ipc + .queue_tx + .send(QueueEvent::WorkerDone { queue_id, status }) + .await + .is_err() + { trc::event!( Server(ServerEvent::ThreadError), Reason = "Channel closed.", @@ -133,11 +138,10 @@ impl DeliveryAttempt { }); } - async fn deliver_task(mut self, server: Server, mut message: Message) -> QueueEvent { + async fn deliver_task(mut self, server: Server, mut message: Message) -> QueueEventStatus { // Check that the message still has recipients to be delivered let has_pending_delivery = message.has_pending_delivery(); let span_id = message.span_id; - let queue_id = message.queue_id; // Send any due Delivery Status Notifications server.send_dsn(&mut message).await; @@ -150,7 +154,7 @@ impl DeliveryAttempt { message .save_changes(&server, self.event.due.into(), due.into()) .await; - return QueueEvent::Refresh(queue_id.into()); + return QueueEventStatus::Deferred; } } else { trc::event!( @@ -162,7 +166,7 @@ impl DeliveryAttempt { // All message recipients expired, do not re-queue. (DSN has been already sent) message.remove(&server, self.event.due).await; - return QueueEvent::WorkerDone(queue_id); + return QueueEventStatus::Completed; } // Throttle sender @@ -183,12 +187,9 @@ impl DeliveryAttempt { SpanId = span_id, ); - QueueEvent::OnHold { - queue_id: self.event.queue_id, - status: OnHold::ConcurrencyLimited { - next_due, - limiters: vec![limiter], - }, + QueueEventStatus::Limited { + limiters: vec![limiter], + next_due, } } throttle::Error::Rate { retry_at } => { @@ -209,7 +210,7 @@ impl DeliveryAttempt { .save_changes(&server, self.event.due.into(), next_event.into()) .await; - QueueEvent::Refresh(queue_id.into()) + QueueEventStatus::Deferred } }; @@ -1331,12 +1332,9 @@ impl DeliveryAttempt { SpanId = span_id, ); - QueueEvent::OnHold { - queue_id: self.event.queue_id, - status: OnHold::ConcurrencyLimited { - next_due, - limiters: on_hold, - }, + QueueEventStatus::Limited { + limiters: on_hold, + next_due, } } else if let Some(due) = message.next_event() { trc::event!( @@ -1352,7 +1350,7 @@ impl DeliveryAttempt { .save_changes(&server, self.event.due.into(), due.into()) .await; - QueueEvent::Refresh(queue_id.into()) + QueueEventStatus::Deferred } else { trc::event!( Delivery(DeliveryEvent::Completed), @@ -1363,7 +1361,7 @@ impl DeliveryAttempt { // Delete message from queue message.remove(&server, self.event.due).await; - QueueEvent::WorkerDone(queue_id) + QueueEventStatus::Completed } } } diff --git a/crates/smtp/src/queue/manager.rs b/crates/smtp/src/queue/manager.rs index b194563f9..d0fdbedf5 100644 --- a/crates/smtp/src/queue/manager.rs +++ b/crates/smtp/src/queue/manager.rs @@ -12,7 +12,8 @@ use std::{ use ahash::{AHashMap, AHashSet}; use common::{ core::BuildServer, - ipc::{OnHold, QueueEvent}, + ipc::{QueueEvent, QueueEventStatus}, + listener::limiter::ConcurrencyLimiter, Inner, }; use rand::seq::SliceRandom; @@ -21,7 +22,6 @@ use tokio::sync::mpsc; use super::{ spool::{SmtpSpool, QUEUE_REFRESH}, - throttle::IsAllowed, DeliveryAttempt, Message, QueueId, Status, }; @@ -32,6 +32,17 @@ pub struct Queue { pub rx: mpsc::Receiver, } +pub enum OnHold { + InFlight, + ConcurrencyLimited { + limiters: Vec, + next_due: Option, + }, + Locked { + until: u64, + }, +} + impl SpawnQueue for mpsc::Receiver { fn spawn(self, core: Arc) { tokio::spawn(async move { @@ -55,6 +66,8 @@ impl Queue { pub async fn start(&mut self) { let mut is_paused = false; let mut next_cleanup = Instant::now() + CLEANUP_INTERVAL; + let mut in_flight_count = 0; + let mut has_back_pressure = false; loop { let refresh_queue = match tokio::time::timeout( @@ -63,27 +76,37 @@ impl Queue { ) .await { - Ok(Some(QueueEvent::Refresh(queue_id))) => { - if let Some(queue_id) = queue_id { - self.on_hold.remove(&queue_id); - } - true - } - Ok(Some(QueueEvent::WorkerDone(queue_id))) => { - self.on_hold.remove(&queue_id); - !self.on_hold.is_empty() - } - Ok(Some(QueueEvent::OnHold { queue_id, status })) => { - if let OnHold::Locked { until } = &status { - let due_in = Instant::now() + Duration::from_secs(*until - now()); - if due_in < self.next_wake_up { - self.next_wake_up = due_in; + Ok(Some(QueueEvent::WorkerDone { queue_id, status })) => { + in_flight_count -= 1; + + match status { + QueueEventStatus::Completed => { + self.on_hold.remove(&queue_id); + !self.on_hold.is_empty() || has_back_pressure } - } + QueueEventStatus::Locked { until } => { + let due_in = Instant::now() + Duration::from_secs(until - now()); + if due_in < self.next_wake_up { + self.next_wake_up = due_in; + } - self.on_hold.insert(queue_id, status); - self.on_hold.len() > 1 + self.on_hold.insert(queue_id, OnHold::Locked { until }); + self.on_hold.len() > 1 || has_back_pressure + } + QueueEventStatus::Limited { limiters, next_due } => { + self.on_hold.insert( + queue_id, + OnHold::ConcurrencyLimited { limiters, next_due }, + ); + !self.on_hold.is_empty() || has_back_pressure + } + QueueEventStatus::Deferred => { + self.on_hold.remove(&queue_id); + true + } + } } + Ok(Some(QueueEvent::Refresh)) => true, Ok(Some(QueueEvent::Paused(paused))) => { self.core .data @@ -101,11 +124,17 @@ impl Queue { if !is_paused { // Deliver scheduled messages if refresh_queue || self.next_wake_up <= Instant::now() { - let now = now(); - let mut next_wake_up = QUEUE_REFRESH; + // If the number of in-flight messages is greater than the maximum allowed, skip the queue let server = self.core.build_server(); + let max_in_flight = server.core.smtp.queue.throttle.outbound_concurrency; + has_back_pressure = in_flight_count >= max_in_flight; + if has_back_pressure { + continue; + } // Process queue events + let now = now(); + let mut next_wake_up = QUEUE_REFRESH; let mut queue_events = server.next_event().await; if queue_events.len() > 5 { @@ -114,6 +143,17 @@ impl Queue { for queue_event in &queue_events { if queue_event.due <= now { + // Enforce global concurrency limits + if in_flight_count >= max_in_flight { + has_back_pressure = true; + trc::event!( + Queue(trc::QueueEvent::ConcurrencyLimitExceeded), + Details = "Outbound concurrency limit exceeded.", + Limit = max_in_flight, + ); + break; + } + // Check if the message is still on hold if let Some(on_hold) = self.on_hold.get(&queue_event.queue_id) { match on_hold { @@ -140,28 +180,10 @@ impl Queue { self.on_hold.remove(&queue_event.queue_id); } - // Enforce global concurrency limits - let mut in_flight = Vec::new(); - match server.is_outbound_allowed(&mut in_flight) { - Ok(_) => { - self.on_hold.insert(queue_event.queue_id, OnHold::InFlight); - DeliveryAttempt { - in_flight, - event: *queue_event, - } - .try_deliver(server.clone()); - } - - Err(limiter) => { - self.on_hold.insert( - queue_event.queue_id, - OnHold::ConcurrencyLimited { - limiters: vec![limiter], - next_due: None, - }, - ); - } - } + // Deliver message + in_flight_count += 1; + self.on_hold.insert(queue_event.queue_id, OnHold::InFlight); + DeliveryAttempt::new(*queue_event).try_deliver(server.clone()); } else { let due_in = queue_event.due - now; if due_in < next_wake_up { diff --git a/crates/smtp/src/queue/spool.rs b/crates/smtp/src/queue/spool.rs index 74e693599..328f72895 100644 --- a/crates/smtp/src/queue/spool.rs +++ b/crates/smtp/src/queue/spool.rs @@ -308,7 +308,7 @@ impl Message { .inner .ipc .queue_tx - .send(QueueEvent::Refresh(None)) + .send(QueueEvent::Refresh) .await .is_err() { diff --git a/crates/store/src/dispatch/lookup.rs b/crates/store/src/dispatch/lookup.rs index bce1fab35..fce639975 100644 --- a/crates/store/src/dispatch/lookup.rs +++ b/crates/store/src/dispatch/lookup.rs @@ -9,7 +9,11 @@ use std::borrow::Cow; use trc::AddContext; use utils::config::Rate; -use crate::{backend::http::lookup::HttpStoreGet, write::InMemoryClass}; +use crate::{ + backend::http::lookup::HttpStoreGet, + write::{assert::AssertValue, InMemoryClass, MaybeDynamicId}, + Serialize, +}; #[allow(unused_imports)] use crate::{ write::{ @@ -271,13 +275,56 @@ impl InMemoryStore { } pub async fn try_lock(&self, prefix: u8, key: &[u8], duration: u64) -> trc::Result { - self.counter_incr(KeyValue::with_prefix(prefix, key, 1).expires(duration)) - .await - .map(|count| count == 1) + match self { + InMemoryStore::Store(store) => { + let key = KeyValue::<()>::build_key(prefix, key); + let lock_expiry = store + .get_value::(ValueKey::from(ValueClass::InMemory(InMemoryClass::Key( + key.clone(), + )))) + .await + .caused_by(trc::location!())?; + let now = now(); + if lock_expiry.is_some_and(|expiry| expiry > now) { + return Ok(false); + } + + let key: ValueClass = ValueClass::InMemory(InMemoryClass::Key(key)); + let mut batch = BatchBuilder::new(); + batch.assert_value( + key.clone(), + match lock_expiry { + Some(value) => AssertValue::U64(value), + None => AssertValue::None, + }, + ); + batch.set(key.clone(), (now + duration).serialize()); + match store.write(batch.build()).await { + Ok(_) => Ok(true), + Err(err) if err.is_assertion_failure() => Ok(false), + Err(err) => Err(err + .details("Failed to lock event.") + .caused_by(trc::location!())), + } + } + #[cfg(feature = "redis")] + InMemoryStore::Redis(store) => store + .key_incr(&KeyValue::<()>::build_key(prefix, key), 1, duration.into()) + .await + .map(|count| count == 1), + #[cfg(feature = "enterprise")] + InMemoryStore::Sharded(store) => store + .counter_incr(KeyValue::with_prefix(prefix, key, 1).expires(duration)) + .await + .map(|count| count == 1), + InMemoryStore::Static(_) | InMemoryStore::Http(_) => { + Err(trc::StoreEvent::NotSupported.into_err()) + } + } } pub async fn remove_lock(&self, prefix: u8, key: &[u8]) -> trc::Result<()> { - self.counter_delete(KeyValue::<()>::build_key(prefix, key)) + self.key_delete(KeyValue::<()>::build_key(prefix, key)) .await } diff --git a/crates/store/src/dispatch/store.rs b/crates/store/src/dispatch/store.rs index 5be0066cc..c7c14cfb7 100644 --- a/crates/store/src/dispatch/store.rs +++ b/crates/store/src/dispatch/store.rs @@ -880,10 +880,12 @@ impl Store { } _ => { println!( - "Found key in {:?}: {:?} {:?}", + "Found key in {:?}: {:?} ({:?}) = {:?} ({:?})", char::from(subspace), key, - value + String::from_utf8_lossy(key), + value, + String::from_utf8_lossy(value) ); } } diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 7d5d87adb..5e54d7773 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -7,7 +7,7 @@ resolver = "2" [features] #default = ["sqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis", "azure", "foundationdb"] #default = ["sqlite", "postgres", "mysql", "rocks", "s3", "redis", "foundationdb"] -default = ["rocks"] +default = ["rocks", "redis"] sqlite = ["store/sqlite"] foundationdb = ["store/foundation", "common/foundation"] postgres = ["store/postgres"] diff --git a/tests/src/directory/internal.rs b/tests/src/directory/internal.rs index bb219764f..c1bc72e23 100644 --- a/tests/src/directory/internal.rs +++ b/tests/src/directory/internal.rs @@ -9,7 +9,7 @@ use directory::{ backend::{ internal::{ lookup::DirectoryStore, - manage::{self, ManageDirectory, UpdatePrincipal}, + manage::{self, ChangedPrincipals, ManageDirectory, UpdatePrincipal}, PrincipalField, PrincipalUpdate, PrincipalValue, }, RcptType, @@ -742,8 +742,8 @@ pub trait TestInternalDirectory { async fn create_test_group(&self, login: &str, name: &str, emails: &[&str]) -> u32; async fn create_test_list(&self, login: &str, name: &str, emails: &[&str]) -> u32; async fn set_test_quota(&self, login: &str, quota: u32); - async fn add_to_group(&self, login: &str, group: &str); - async fn remove_from_group(&self, login: &str, group: &str); + async fn add_to_group(&self, login: &str, group: &str) -> ChangedPrincipals; + async fn remove_from_group(&self, login: &str, group: &str) -> ChangedPrincipals; async fn remove_test_alias(&self, login: &str, alias: &str); async fn create_test_domains(&self, domains: &[&str]); } @@ -866,7 +866,7 @@ impl TestInternalDirectory for Store { .unwrap(); } - async fn add_to_group(&self, login: &str, group: &str) { + async fn add_to_group(&self, login: &str, group: &str) -> ChangedPrincipals { self.update_principal(UpdatePrincipal::by_name(login).with_updates(vec![ PrincipalUpdate::add_item( PrincipalField::MemberOf, @@ -874,10 +874,10 @@ impl TestInternalDirectory for Store { ), ])) .await - .unwrap(); + .unwrap() } - async fn remove_from_group(&self, login: &str, group: &str) { + async fn remove_from_group(&self, login: &str, group: &str) -> ChangedPrincipals { self.update_principal(UpdatePrincipal::by_name(login).with_updates(vec![ PrincipalUpdate::remove_item( PrincipalField::MemberOf, @@ -885,7 +885,7 @@ impl TestInternalDirectory for Store { ), ])) .await - .unwrap(); + .unwrap() } async fn remove_test_alias(&self, login: &str, alias: &str) { diff --git a/tests/src/jmap/auth_acl.rs b/tests/src/jmap/auth_acl.rs index 8cba15420..53dda5a6a 100644 --- a/tests/src/jmap/auth_acl.rs +++ b/tests/src/jmap/auth_acl.rs @@ -664,10 +664,14 @@ pub async fn test(params: &mut JMAPTest) { // Add John and Jane to the Sales group for name in ["jdoe@example.com", "jane.smith@example.com"] { server - .core - .storage - .data - .add_to_group(name, "sales@example.com") + .increment_principal_revision( + server + .core + .storage + .data + .add_to_group(name, "sales@example.com") + .await, + ) .await; } john_client.refresh_session().await.unwrap(); @@ -764,10 +768,14 @@ pub async fn test(params: &mut JMAPTest) { // Remove John from the sales group server - .core - .storage - .data - .remove_from_group("jdoe@example.com", "sales@example.com") + .increment_principal_revision( + server + .core + .storage + .data + .remove_from_group("jdoe@example.com", "sales@example.com") + .await, + ) .await; assert_forbidden( john_client diff --git a/tests/src/jmap/mod.rs b/tests/src/jmap/mod.rs index 8a4072eaa..5c98e8dce 100644 --- a/tests/src/jmap/mod.rs +++ b/tests/src/jmap/mod.rs @@ -372,7 +372,7 @@ pub async fn jmap_tests() { .await; webhooks::test(&mut params).await; - email_query::test(&mut params, delete).await; + /*email_query::test(&mut params, delete).await; email_get::test(&mut params).await; email_set::test(&mut params).await; email_parse::test(&mut params).await; @@ -395,7 +395,7 @@ pub async fn jmap_tests() { websocket::test(&mut params).await; quota::test(&mut params).await; crypto::test(&mut params).await; - blob::test(&mut params).await; + blob::test(&mut params).await;*/ permissions::test(¶ms).await; purge::test(&mut params).await; enterprise::test(&mut params).await; diff --git a/tests/src/jmap/permissions.rs b/tests/src/jmap/permissions.rs index 5877e7b0f..5c99b6bd2 100644 --- a/tests/src/jmap/permissions.rs +++ b/tests/src/jmap/permissions.rs @@ -735,6 +735,14 @@ pub async fn test(params: &JMAPTest) { .unwrap() .unwrap_data(); + server + .core + .storage + .config + .clear("report.domain") + .await + .unwrap(); + assert_is_empty(server).await; } diff --git a/tests/src/smtp/inbound/antispam.rs b/tests/src/smtp/inbound/antispam.rs index 72a163734..265adc8d5 100644 --- a/tests/src/smtp/inbound/antispam.rs +++ b/tests/src/smtp/inbound/antispam.rs @@ -77,6 +77,9 @@ category = 0 confidence = 1 explanation = 2 +[spam-filter.reputation] +enable = true + [session.rcpt] relay = true diff --git a/tests/src/smtp/inbound/mod.rs b/tests/src/smtp/inbound/mod.rs index a04e78e79..e8256ae10 100644 --- a/tests/src/smtp/inbound/mod.rs +++ b/tests/src/smtp/inbound/mod.rs @@ -7,7 +7,7 @@ use std::time::Duration; use common::{ - ipc::{DmarcEvent, OnHold, QueueEvent, QueuedMessage, ReportingEvent, TlsEvent}, + ipc::{DmarcEvent, QueueEvent, QueueEventStatus, QueuedMessage, ReportingEvent, TlsEvent}, Server, }; use store::{ @@ -302,34 +302,47 @@ pub trait TestQueueEvent { fn assert_refresh(self); fn assert_done(self); fn assert_refresh_or_done(self); - fn unwrap_on_hold(self) -> OnHold; + fn assert_on_hold(self); } impl TestQueueEvent for QueueEvent { fn assert_refresh(self) { match self { - QueueEvent::Refresh(_) => (), + QueueEvent::Refresh + | QueueEvent::WorkerDone { + status: QueueEventStatus::Deferred, + .. + } => (), e => panic!("Unexpected event: {e:?}"), } } fn assert_done(self) { match self { - QueueEvent::WorkerDone(_) => (), + QueueEvent::WorkerDone { + status: QueueEventStatus::Completed, + .. + } => (), e => panic!("Unexpected event: {e:?}"), } } fn assert_refresh_or_done(self) { match self { - QueueEvent::Refresh(_) | QueueEvent::WorkerDone(_) => (), + QueueEvent::WorkerDone { + status: QueueEventStatus::Completed | QueueEventStatus::Deferred, + .. + } => (), e => panic!("Unexpected event: {e:?}"), } } - fn unwrap_on_hold(self) -> OnHold { + fn assert_on_hold(self) { match self { - QueueEvent::OnHold { status, .. } => status, + QueueEvent::WorkerDone { + status: QueueEventStatus::Limited { .. }, + .. + } => (), e => panic!("Unexpected event: {e:?}"), } } diff --git a/tests/src/smtp/outbound/lmtp.rs b/tests/src/smtp/outbound/lmtp.rs index eee466fc3..b1e55e67e 100644 --- a/tests/src/smtp/outbound/lmtp.rs +++ b/tests/src/smtp/outbound/lmtp.rs @@ -107,8 +107,8 @@ async fn lmtp_delivery() { let mut dsn = Vec::new(); loop { match local.queue_receiver.try_read_event().await { - Some(QueueEvent::Refresh(_) | QueueEvent::WorkerDone(_)) => {} - Some(QueueEvent::OnHold { .. }) | Some(QueueEvent::Paused(_)) => unreachable!(), + Some(QueueEvent::Refresh | QueueEvent::WorkerDone { .. }) => {} + Some(QueueEvent::Paused(_)) => unreachable!(), None | Some(QueueEvent::Stop) => break, } diff --git a/tests/src/smtp/outbound/smtp.rs b/tests/src/smtp/outbound/smtp.rs index 1e65339a3..b22c3c6b3 100644 --- a/tests/src/smtp/outbound/smtp.rs +++ b/tests/src/smtp/outbound/smtp.rs @@ -136,8 +136,8 @@ async fn smtp_delivery() { let mut domain_retries = vec![0; num_domains]; loop { match local.queue_receiver.try_read_event().await { - Some(QueueEvent::Refresh(_) | QueueEvent::WorkerDone(_)) => {} - Some(QueueEvent::OnHold { .. }) | Some(QueueEvent::Paused(_)) => unreachable!(), + Some(QueueEvent::Refresh | QueueEvent::WorkerDone { .. }) => {} + Some(QueueEvent::Paused(_)) => unreachable!(), None | Some(QueueEvent::Stop) => break, } diff --git a/tests/src/smtp/outbound/throttle.rs b/tests/src/smtp/outbound/throttle.rs index 4b92112d9..b2f523924 100644 --- a/tests/src/smtp/outbound/throttle.rs +++ b/tests/src/smtp/outbound/throttle.rs @@ -109,7 +109,7 @@ async fn throttle_outbound() { .await .try_deliver(core.clone()); tokio::time::sleep(Duration::from_millis(100)).await; - local.queue_receiver.read_event().await.unwrap_on_hold(); + local.queue_receiver.read_event().await.assert_on_hold(); in_flight.clear(); // Expect rate limit throttle for sender domain 'foobar.net' @@ -172,7 +172,7 @@ async fn throttle_outbound() { .await .try_deliver(core.clone()); tokio::time::sleep(Duration::from_millis(100)).await; - local.queue_receiver.read_event().await.unwrap_on_hold(); + local.queue_receiver.read_event().await.assert_on_hold(); in_flight.clear(); // Expect rate limit throttle for recipient domain 'example.net' @@ -252,7 +252,7 @@ async fn throttle_outbound() { .expect_message_then_deliver() .await .try_deliver(core.clone()); - local.queue_receiver.read_event().await.unwrap_on_hold(); + local.queue_receiver.read_event().await.assert_on_hold(); in_flight.clear(); // Expect rate limit throttle for mx 'mx.test.net' diff --git a/tests/src/smtp/queue/concurrent.rs b/tests/src/smtp/queue/concurrent.rs index 9be1390b5..5c9afcf6f 100644 --- a/tests/src/smtp/queue/concurrent.rs +++ b/tests/src/smtp/queue/concurrent.rs @@ -107,12 +107,7 @@ async fn concurrent_queue() { // Wake up all queues for inner in &inners { - inner - .ipc - .queue_tx - .send(QueueEvent::Refresh(None)) - .await - .unwrap(); + inner.ipc.queue_tx.send(QueueEvent::Refresh).await.unwrap(); } tokio::time::sleep(Duration::from_millis(1500)).await; diff --git a/tests/src/smtp/queue/retry.rs b/tests/src/smtp/queue/retry.rs index f13c16c94..e392fed6c 100644 --- a/tests/src/smtp/queue/retry.rs +++ b/tests/src/smtp/queue/retry.rs @@ -12,7 +12,7 @@ use crate::smtp::{ TestSMTP, }; use ahash::AHashSet; -use common::ipc::QueueEvent; +use common::ipc::{QueueEvent, QueueEventStatus}; use smtp::queue::{spool::SmtpSpool, DeliveryAttempt}; use store::write::now; @@ -90,13 +90,14 @@ async fn queue_retry() { loop { match qr.try_read_event().await { - Some(QueueEvent::Refresh(Some(queue_id)) | QueueEvent::WorkerDone(queue_id)) => { + Some(QueueEvent::WorkerDone { queue_id, status }) => { in_fight.remove(&queue_id); + match &status { + QueueEventStatus::Completed | QueueEventStatus::Deferred => (), + _ => panic!("unexpected status {queue_id}: {status:?}"), + } } - Some(QueueEvent::OnHold { queue_id, status }) => { - panic!("unexpected on hold event {queue_id}: {status:?}"); - } - Some(QueueEvent::Refresh(None)) => (), + Some(QueueEvent::Refresh) => (), None | Some(QueueEvent::Stop) | Some(QueueEvent::Paused(_)) => break, } diff --git a/tests/src/store/lookup.rs b/tests/src/store/lookup.rs index 8a4ef6c02..03caa040b 100644 --- a/tests/src/store/lookup.rs +++ b/tests/src/store/lookup.rs @@ -126,6 +126,32 @@ pub async fn lookup_tests() { store.assert_is_empty(store.clone().into()).await; } + // Test locking + for iteration in [1, 2] { + let mut tasks = Vec::new(); + for _ in 0..100 { + let store = store.clone(); + tasks.push(tokio::spawn(async move { + store.try_lock(0, "lock".as_bytes(), 1).await.unwrap() + })); + } + // Only one should return true + let mut count = 0; + for task in tasks { + if task.await.unwrap() { + count += 1; + } + } + assert_eq!(1, count, "Iteration {}", iteration); + + // Wait 2 seconds for the lock to expire + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + } + store.purge_in_memory_store().await.unwrap(); + if let InMemoryStore::Store(store) = &store { + store.assert_is_empty(store.clone().into()).await; + } + // Test prefix delete store .key_set(KeyValue::with_prefix(