Skip to content

Commit

Permalink
Fixed distributed locking in non-Redis stores (#1066)
Browse files Browse the repository at this point in the history
  • Loading branch information
mdecimus committed Jan 13, 2025
1 parent 5bec52b commit 76c53af
Show file tree
Hide file tree
Showing 21 changed files with 259 additions and 148 deletions.
15 changes: 9 additions & 6 deletions crates/common/src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,23 +126,26 @@ pub struct EncryptionKeys {

#[derive(Debug)]
pub enum QueueEvent {
Refresh(Option<u64>),
WorkerDone(u64),
OnHold { queue_id: u64, status: OnHold },
Refresh,
WorkerDone {
queue_id: u64,
status: QueueEventStatus,
},
Paused(bool),
Stop,
}

#[derive(Debug)]
pub enum OnHold {
InFlight,
pub enum QueueEventStatus {
Completed,
Locked {
until: u64,
},
ConcurrencyLimited {
Limited {
limiters: Vec<ConcurrencyLimiter>,
next_due: Option<u64>,
},
Deferred,
}

#[derive(Debug, Clone, Copy)]
Expand Down
14 changes: 2 additions & 12 deletions crates/jmap/src/api/management/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,12 +257,7 @@ impl QueueManagement for Server {
}
}

let _ = server
.inner
.ipc
.queue_tx
.send(QueueEvent::Refresh(None))
.await;
let _ = server.inner.ipc.queue_tx.send(QueueEvent::Refresh).await;
});
}

Expand Down Expand Up @@ -314,12 +309,7 @@ impl QueueManagement for Server {
message
.save_changes(self, prev_event.into(), next_event.into())
.await;
let _ = self
.inner
.ipc
.queue_tx
.send(QueueEvent::Refresh(None))
.await;
let _ = self.inner.ipc.queue_tx.send(QueueEvent::Refresh).await;
}

Ok(JsonResponse::new(json!({
Expand Down
7 changes: 1 addition & 6 deletions crates/jmap/src/services/gossip/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,7 @@ impl Gossiper {
trc::event!(Cluster(ClusterEvent::OneOrMorePeersOffline));

server.notify_task_queue();
let _ = server
.inner
.ipc
.queue_tx
.send(QueueEvent::Refresh(None))
.await;
let _ = server.inner.ipc.queue_tx.send(QueueEvent::Refresh).await;
});
}

Expand Down
54 changes: 26 additions & 28 deletions crates/smtp/src/outbound/delivery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ use common::config::{
server::ServerProtocol,
smtp::{queue::RequireOptional, report::AggregateFrequency},
};
use common::ipc::{OnHold, PolicyType, QueueEvent, TlsEvent};
use common::ipc::{PolicyType, QueueEvent, QueueEventStatus, TlsEvent};
use common::Server;
use mail_auth::{
mta_sts::TlsRpt,
report::tlsrpt::{FailureDetails, ResultType},
};
use rand::Rng;
use smtp_proto::MAIL_REQUIRETLS;
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
Expand All @@ -45,7 +46,7 @@ impl DeliveryAttempt {
tokio::spawn(async move {
// Lock queue event
let queue_id = self.event.queue_id;
let queue_event = if server.try_lock_event(queue_id).await {
let status = if server.try_lock_event(queue_id).await {
if let Some(mut message) = server.read_message(queue_id).await {
// Generate span id
message.span_id = server.inner.data.span_id_gen.generate().unwrap_or_else(now);
Expand Down Expand Up @@ -111,19 +112,23 @@ impl DeliveryAttempt {
// Unlock event
server.unlock_event(queue_id).await;

QueueEvent::WorkerDone(queue_id)
QueueEventStatus::Completed
}
} else {
QueueEvent::OnHold {
queue_id: self.event.queue_id,
status: OnHold::Locked {
until: now() + LOCK_EXPIRY + 1,
},
QueueEventStatus::Locked {
until: now() + LOCK_EXPIRY + rand::thread_rng().gen_range(5..10),
}
};

// Notify queue manager
if server.inner.ipc.queue_tx.send(queue_event).await.is_err() {
if server
.inner
.ipc
.queue_tx
.send(QueueEvent::WorkerDone { queue_id, status })
.await
.is_err()
{
trc::event!(
Server(ServerEvent::ThreadError),
Reason = "Channel closed.",
Expand All @@ -133,11 +138,10 @@ impl DeliveryAttempt {
});
}

async fn deliver_task(mut self, server: Server, mut message: Message) -> QueueEvent {
async fn deliver_task(mut self, server: Server, mut message: Message) -> QueueEventStatus {
// Check that the message still has recipients to be delivered
let has_pending_delivery = message.has_pending_delivery();
let span_id = message.span_id;
let queue_id = message.queue_id;

// Send any due Delivery Status Notifications
server.send_dsn(&mut message).await;
Expand All @@ -150,7 +154,7 @@ impl DeliveryAttempt {
message
.save_changes(&server, self.event.due.into(), due.into())
.await;
return QueueEvent::Refresh(queue_id.into());
return QueueEventStatus::Deferred;
}
} else {
trc::event!(
Expand All @@ -162,7 +166,7 @@ impl DeliveryAttempt {
// All message recipients expired, do not re-queue. (DSN has been already sent)
message.remove(&server, self.event.due).await;

return QueueEvent::WorkerDone(queue_id);
return QueueEventStatus::Completed;
}

// Throttle sender
Expand All @@ -183,12 +187,9 @@ impl DeliveryAttempt {
SpanId = span_id,
);

QueueEvent::OnHold {
queue_id: self.event.queue_id,
status: OnHold::ConcurrencyLimited {
next_due,
limiters: vec![limiter],
},
QueueEventStatus::Limited {
limiters: vec![limiter],
next_due,
}
}
throttle::Error::Rate { retry_at } => {
Expand All @@ -209,7 +210,7 @@ impl DeliveryAttempt {
.save_changes(&server, self.event.due.into(), next_event.into())
.await;

QueueEvent::Refresh(queue_id.into())
QueueEventStatus::Deferred
}
};

Expand Down Expand Up @@ -1331,12 +1332,9 @@ impl DeliveryAttempt {
SpanId = span_id,
);

QueueEvent::OnHold {
queue_id: self.event.queue_id,
status: OnHold::ConcurrencyLimited {
next_due,
limiters: on_hold,
},
QueueEventStatus::Limited {
limiters: on_hold,
next_due,
}
} else if let Some(due) = message.next_event() {
trc::event!(
Expand All @@ -1352,7 +1350,7 @@ impl DeliveryAttempt {
.save_changes(&server, self.event.due.into(), due.into())
.await;

QueueEvent::Refresh(queue_id.into())
QueueEventStatus::Deferred
} else {
trc::event!(
Delivery(DeliveryEvent::Completed),
Expand All @@ -1363,7 +1361,7 @@ impl DeliveryAttempt {
// Delete message from queue
message.remove(&server, self.event.due).await;

QueueEvent::WorkerDone(queue_id)
QueueEventStatus::Completed
}
}
}
Expand Down
110 changes: 66 additions & 44 deletions crates/smtp/src/queue/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use std::{
use ahash::{AHashMap, AHashSet};
use common::{
core::BuildServer,
ipc::{OnHold, QueueEvent},
ipc::{QueueEvent, QueueEventStatus},
listener::limiter::ConcurrencyLimiter,
Inner,
};
use rand::seq::SliceRandom;
Expand All @@ -21,7 +22,6 @@ use tokio::sync::mpsc;

use super::{
spool::{SmtpSpool, QUEUE_REFRESH},
throttle::IsAllowed,
DeliveryAttempt, Message, QueueId, Status,
};

Expand All @@ -32,6 +32,17 @@ pub struct Queue {
pub rx: mpsc::Receiver<QueueEvent>,
}

/// Reason a queued message is currently held back by the queue manager.
///
/// Values are stored per queue id in the manager's `on_hold` map, which is
/// consulted before a due message is dispatched for delivery.
#[derive(Debug)]
pub enum OnHold {
    /// A delivery attempt for the message has been spawned and the worker
    /// has not yet reported its outcome.
    InFlight,
    /// The delivery worker reported that one or more concurrency limiters
    /// rejected the attempt.
    ConcurrencyLimited {
        /// Limiters that rejected the delivery attempt.
        limiters: Vec<ConcurrencyLimiter>,
        /// Next due time reported by the worker alongside the limiters, if any.
        next_due: Option<u64>,
    },
    /// The worker could not acquire the lock for the queue event.
    Locked {
        /// Timestamp (on the same clock as `now()`) after which the event
        /// should be retried; also used to bring the next wake-up forward.
        until: u64,
    },
}

impl SpawnQueue for mpsc::Receiver<QueueEvent> {
fn spawn(self, core: Arc<Inner>) {
tokio::spawn(async move {
Expand All @@ -55,6 +66,8 @@ impl Queue {
pub async fn start(&mut self) {
let mut is_paused = false;
let mut next_cleanup = Instant::now() + CLEANUP_INTERVAL;
let mut in_flight_count = 0;
let mut has_back_pressure = false;

loop {
let refresh_queue = match tokio::time::timeout(
Expand All @@ -63,27 +76,37 @@ impl Queue {
)
.await
{
Ok(Some(QueueEvent::Refresh(queue_id))) => {
if let Some(queue_id) = queue_id {
self.on_hold.remove(&queue_id);
}
true
}
Ok(Some(QueueEvent::WorkerDone(queue_id))) => {
self.on_hold.remove(&queue_id);
!self.on_hold.is_empty()
}
Ok(Some(QueueEvent::OnHold { queue_id, status })) => {
if let OnHold::Locked { until } = &status {
let due_in = Instant::now() + Duration::from_secs(*until - now());
if due_in < self.next_wake_up {
self.next_wake_up = due_in;
Ok(Some(QueueEvent::WorkerDone { queue_id, status })) => {
in_flight_count -= 1;

match status {
QueueEventStatus::Completed => {
self.on_hold.remove(&queue_id);
!self.on_hold.is_empty() || has_back_pressure
}
}
QueueEventStatus::Locked { until } => {
let due_in = Instant::now() + Duration::from_secs(until - now());
if due_in < self.next_wake_up {
self.next_wake_up = due_in;
}

self.on_hold.insert(queue_id, status);
self.on_hold.len() > 1
self.on_hold.insert(queue_id, OnHold::Locked { until });
self.on_hold.len() > 1 || has_back_pressure
}
QueueEventStatus::Limited { limiters, next_due } => {
self.on_hold.insert(
queue_id,
OnHold::ConcurrencyLimited { limiters, next_due },
);
!self.on_hold.is_empty() || has_back_pressure
}
QueueEventStatus::Deferred => {
self.on_hold.remove(&queue_id);
true
}
}
}
Ok(Some(QueueEvent::Refresh)) => true,
Ok(Some(QueueEvent::Paused(paused))) => {
self.core
.data
Expand All @@ -101,11 +124,17 @@ impl Queue {
if !is_paused {
// Deliver scheduled messages
if refresh_queue || self.next_wake_up <= Instant::now() {
let now = now();
let mut next_wake_up = QUEUE_REFRESH;
// If the number of in-flight messages is greater than the maximum allowed, skip the queue
let server = self.core.build_server();
let max_in_flight = server.core.smtp.queue.throttle.outbound_concurrency;
has_back_pressure = in_flight_count >= max_in_flight;
if has_back_pressure {
continue;
}

// Process queue events
let now = now();
let mut next_wake_up = QUEUE_REFRESH;
let mut queue_events = server.next_event().await;

if queue_events.len() > 5 {
Expand All @@ -114,6 +143,17 @@ impl Queue {

for queue_event in &queue_events {
if queue_event.due <= now {
// Enforce global concurrency limits
if in_flight_count >= max_in_flight {
has_back_pressure = true;
trc::event!(
Queue(trc::QueueEvent::ConcurrencyLimitExceeded),
Details = "Outbound concurrency limit exceeded.",
Limit = max_in_flight,
);
break;
}

// Check if the message is still on hold
if let Some(on_hold) = self.on_hold.get(&queue_event.queue_id) {
match on_hold {
Expand All @@ -140,28 +180,10 @@ impl Queue {
self.on_hold.remove(&queue_event.queue_id);
}

// Enforce global concurrency limits
let mut in_flight = Vec::new();
match server.is_outbound_allowed(&mut in_flight) {
Ok(_) => {
self.on_hold.insert(queue_event.queue_id, OnHold::InFlight);
DeliveryAttempt {
in_flight,
event: *queue_event,
}
.try_deliver(server.clone());
}

Err(limiter) => {
self.on_hold.insert(
queue_event.queue_id,
OnHold::ConcurrencyLimited {
limiters: vec![limiter],
next_due: None,
},
);
}
}
// Deliver message
in_flight_count += 1;
self.on_hold.insert(queue_event.queue_id, OnHold::InFlight);
DeliveryAttempt::new(*queue_event).try_deliver(server.clone());
} else {
let due_in = queue_event.due - now;
if due_in < next_wake_up {
Expand Down
Loading

0 comments on commit 76c53af

Please sign in to comment.